From 76289024543fbfb6df3d07f61f15eabc623dd246 Mon Sep 17 00:00:00 2001 From: Yves Fischer Date: Sat, 23 Jul 2016 18:42:06 +0200 Subject: restructure code rename module to pyinflux split in client and parser package. the parser depends also on funcparserlib contains 3 examples of strange influxdb behavior --- fuzzer1.py | 74 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 74 insertions(+) create mode 100755 fuzzer1.py (limited to 'fuzzer1.py') diff --git a/fuzzer1.py b/fuzzer1.py new file mode 100755 index 0000000..57b5de6 --- /dev/null +++ b/fuzzer1.py @@ -0,0 +1,74 @@ +#!/usr/bin/env python3 +""" +in 2016/07/23 18:37:37 InfluxDB starting, version 0.13.0, branch 0.13, commit e57fb88a051ee40fd9277094345fbd47bb4783ce +this fuzzer can cause a concurrent read write crash +""" +import itertools +from multiprocessing.pool import ThreadPool as Pool +from pyinflux.client import InfluxDB, Line + +influxdb = InfluxDB('test', 'localhost') + +value_generator = itertools.count() + + +def test(number): + for value in value_generator: + if value + 1 % 500 == 0: + print("thread {} at {}".format(number, value)) + expected = "value{value}".format(value=value) + line = Line('series' + str(value), + {'tag': 'tag' + expected}, + {'field': 'field' + expected}) + + try: + influxdb.write([line]) + except: + print(line) + raise + + query = """\ + SELECT * + FROM "series{value}" + WHERE time >= now() - 2s""".format(value=value) + try: + results = influxdb.query(query).as_json()['results'] + except: + print("""\ + Failure. With the following INSERT: + {line}""".format(line=line)) + raise + try: + assert len(results) == 1 + series = results[0]['series'] + assert len(series) == 1 + series_result = series[0] + assert series_result['name'] == 'series' + str(value) + assert len(series_result['values']) == 1 + values = series_result['values'] + assert len(values) == 1 + assert len(values[0]) == 3 + columns = series_result['columns'] + assert columns == ['time', 'field', 'tag'] + + assert values[0][columns.index('field')] == 'fieldvalue' + str(value) + assert values[0][columns.index('tag')] == 'tagvalue' + str(value) + except: + print("""\ + Failure. With the following INSERT: + {line} + + Got this result: + {results}""".format(line=line, results=results)) + raise + + +print(influxdb.query('DROP DATABASE test').as_text()) +print(influxdb.query('CREATE DATABASE test').as_text()) + +N_PROC = 2 +with Pool(processes=N_PROC) as pool: + for res in pool.imap_unordered(test, range(1, N_PROC + 1)): + # they are suppossed to run endlessly, if one returns then there was a failure + pool.terminate() +print('Done') -- cgit v1.2.1