summaryrefslogtreecommitdiff
path: root/tester.py
diff options
context:
space:
mode:
Diffstat (limited to 'tester.py')
-rwxr-xr-xtester.py76
1 files changed, 76 insertions, 0 deletions
diff --git a/tester.py b/tester.py
new file mode 100755
index 0000000..9900b4a
--- /dev/null
+++ b/tester.py
@@ -0,0 +1,76 @@
+#!/usr/bin/env python3
+import itertools
+from multiprocessing.pool import ThreadPool as Pool
+from pyinflux.client import InfluxDB, Line
+
+
+class TwoWayInfluxDB(InfluxDB):
+ def __init__(self, write_url):
+ super().__init__('test', 'localhost')
+ self._write_url = write_url
+
+influxdb = TwoWayInfluxDB('http://localhost:8001/write/test?')
+value_generator = itertools.count()
+
+def test(number):
+ for value in value_generator:
+ if value > 200:
+ return
+ if (value + 1) % 50 == 0:
+ print("thread {} at {}".format(number, value))
+ expected = "value{value}".format(value=value)
+ line = Line('series' + str(value),
+ {'tag': 'tag' + expected},
+ {'field': 'field' + expected})
+
+ try:
+ influxdb.write([line])
+ except:
+ print(line)
+ raise
+
+ query = """\
+ SELECT *
+ FROM "series{value}"
+ WHERE time >= now() - 2s""".format(value=value)
+ try:
+ results = influxdb.query(query).as_json()['results']
+ except:
+ print("""\
+ Failure. With the following INSERT:
+ {line}""".format(line=line))
+ raise
+ try:
+ assert len(results) == 1
+ series = results[0]['series']
+ assert len(series) == 1
+ series_result = series[0]
+ assert series_result['name'] == 'series' + str(value)
+ assert len(series_result['values']) == 1
+ values = series_result['values']
+ assert len(values) == 1
+ assert len(values[0]) == 3
+ columns = series_result['columns']
+ assert columns == ['time', 'field', 'tag']
+
+ assert values[0][columns.index('field')] == 'fieldvalue' + str(value)
+ assert values[0][columns.index('tag')] == 'tagvalue' + str(value)
+ except:
+ print("""\
+ Failure. With the following INSERT:
+ {line}
+
+ Got this result:
+ {results}""".format(line=line, results=results))
+ raise
+
+
+print(influxdb.query('DROP DATABASE test').as_text())
+print(influxdb.query('CREATE DATABASE test').as_text())
+
+N_PROC = 1
+with Pool(processes=N_PROC) as pool:
+ for res in pool.imap_unordered(test, range(1, N_PROC + 1)):
+ # they are suppossed to run endlessly, if one returns then there was a failure
+ pool.terminate()
+print('Done')