diff options
Diffstat (limited to 'tester.py')
-rwxr-xr-x | tester.py | 76 |
1 files changed, 76 insertions, 0 deletions
diff --git a/tester.py b/tester.py new file mode 100755 index 0000000..9900b4a --- /dev/null +++ b/tester.py @@ -0,0 +1,76 @@ +#!/usr/bin/env python3 +import itertools +from multiprocessing.pool import ThreadPool as Pool +from pyinflux.client import InfluxDB, Line + + +class TwoWayInfluxDB(InfluxDB): + def __init__(self, write_url): + super().__init__('test', 'localhost') + self._write_url = write_url + +influxdb = TwoWayInfluxDB('http://localhost:8001/write/test?') +value_generator = itertools.count() + +def test(number): + for value in value_generator: + if value > 200: + return + if (value + 1) % 50 == 0: + print("thread {} at {}".format(number, value)) + expected = "value{value}".format(value=value) + line = Line('series' + str(value), + {'tag': 'tag' + expected}, + {'field': 'field' + expected}) + + try: + influxdb.write([line]) + except: + print(line) + raise + + query = """\ + SELECT * + FROM "series{value}" + WHERE time >= now() - 2s""".format(value=value) + try: + results = influxdb.query(query).as_json()['results'] + except: + print("""\ + Failure. With the following INSERT: + {line}""".format(line=line)) + raise + try: + assert len(results) == 1 + series = results[0]['series'] + assert len(series) == 1 + series_result = series[0] + assert series_result['name'] == 'series' + str(value) + assert len(series_result['values']) == 1 + values = series_result['values'] + assert len(values) == 1 + assert len(values[0]) == 3 + columns = series_result['columns'] + assert columns == ['time', 'field', 'tag'] + + assert values[0][columns.index('field')] == 'fieldvalue' + str(value) + assert values[0][columns.index('tag')] == 'tagvalue' + str(value) + except: + print("""\ + Failure. With the following INSERT: + {line} + + Got this result: + {results}""".format(line=line, results=results)) + raise + + +print(influxdb.query('DROP DATABASE test').as_text()) +print(influxdb.query('CREATE DATABASE test').as_text()) + +N_PROC = 1 +with Pool(processes=N_PROC) as pool: + for res in pool.imap_unordered(test, range(1, N_PROC + 1)): + # they are suppossed to run endlessly, if one returns then there was a failure + pool.terminate() +print('Done') |