diff options
-rw-r--r-- | .gitignore | 2 | ||||
-rw-r--r-- | LICENSE | 20 | ||||
-rw-r--r-- | MANIFEST.in | 1 | ||||
-rw-r--r-- | README.md | 3 | ||||
-rwxr-xr-x | anotherBug.py | 37 | ||||
-rwxr-xr-x | fuzzer.py | 132 | ||||
-rwxr-xr-x | fuzzer1.py | 74 | ||||
-rwxr-xr-x | fuzzer2.py | 132 | ||||
-rw-r--r-- | pyinflux/__init__.py | 1 | ||||
-rw-r--r-- | pyinflux/client/__init__.py | 149 | ||||
-rw-r--r-- | pyinflux/parser/__init__.py | 222 | ||||
-rw-r--r-- | pyinfluxtools/__init__.py | 293 | ||||
-rw-r--r-- | setup.py | 41 | ||||
-rwxr-xr-x | test.py | 7 |
14 files changed, 668 insertions, 446 deletions
@@ -1 +1,3 @@ *pyc +/MANIFEST +/.idea
\ No newline at end of file @@ -0,0 +1,20 @@ +The MIT License (MIT) + +Copyright (c) 2016 Yves Fischer <yvesf+git@xapek.org> + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
\ No newline at end of file diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 0000000..45d276d --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1 @@ +include pyinflux/*
\ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..e931f93 --- /dev/null +++ b/README.md @@ -0,0 +1,3 @@ +# pyinflux tools + +the files `fuzzer1.py` and `fuzzer2.py` contain usage examples.
\ No newline at end of file diff --git a/anotherBug.py b/anotherBug.py new file mode 100755 index 0000000..ce52cf4 --- /dev/null +++ b/anotherBug.py @@ -0,0 +1,37 @@ +#!/usr/bin/env python3 +""" +another bug? +in 2016/07/23 18:37:37 InfluxDB starting, version 0.13.0, branch 0.13, commit e57fb88a051ee40fd9277094345fbd47bb4783ce +the escaping of the 'key' or 'name' is strange +""" +from pyinflux.client import Influx, Line + +influxdb = Influx('localhost') + +print(influxdb.execute('DROP DATABASE test').as_text()) +print(influxdb.execute('CREATE DATABASE test').as_text()) + + +def write(line): + print(repr(line)) + print(str(line)) + return influxdb.write_db('test', [line]) + + +print(write(Line('asd\\', {'tag1': ''}, {'field1': ''}))) +print(write(Line('asd\\\\', {'tag1': ''}, {'field1': ''}))) + + +def query(q): + print(q) + try: + return influxdb.query_db('test', q).as_json() + except Exception as e: + return str(e) + +# at least some of these query must return something... +print(query('SELECT * FROM asd')) +print(query('SELECT * FROM asd\\')) +print(query('SELECT * FROM asd\\\\')) +print(query('SELECT * FROM "asd"')) +print(query('SELECT * FROM "asd\\\\"')) diff --git a/fuzzer.py b/fuzzer.py deleted file mode 100755 index ffdb0e7..0000000 --- a/fuzzer.py +++ /dev/null @@ -1,132 +0,0 @@ -#!/usr/bin/env python3 -import functools -import random -from multiprocessing.pool import ThreadPool as Pool - -import requests -from pyinfluxtools import * -from influxdb import InfluxDBClient - -settings = { - 'host': 'localhost', - 'port': 8086, - 'username': 'root', - 'password': 'root', - 'database': 'test', -} - -write_url = ("http://{host}:{port}/write?db={database}" + - "&username={username}&password={password}" - ).format(**settings) - -client = InfluxDBClient(settings['host'], settings['port'], - settings['username'], settings['password'], - settings['database']) - - -class Generator: - all = lambda x: bytes(chr(random.randint(0x00, 0xff)), 'latin-1') - _printables = list(map(chr, list(range(0x20, 0x7E)))) - printable = lambda x: random.choice(Generator._printables) - numericText = lambda x: chr(random.randint(ord("0"), ord("9"))) - text = lambda x: chr(random.choice( - list(range(ord("a"), ord("z"))) + - list(range(ord("A"), ord("Z"))))) - - -class Filter: - regex = lambda regex: re.compile(regex).match - pass_all = lambda _: True - - -def generate(length_min, length_max, filter, generator): - length = random.randint(length_min, length_max) - while True: - text = functools.reduce( - lambda a, b: a + b, map(generator, range(length))) - if filter(text): - return text - - -def run(*a): - while True: - test() - - -def test(): - # influxdb doesn't allow measurements starting with { - noBrace = Filter.regex("^[^\\{]") - noHash = Filter.regex("^[^#]") # hash sign starts a comment - # TODO: only allow valid escape sequences - noEscape = Filter.regex("^[^\\\\]+$") - - def generateKey(min=3,max=6): - return generate(min, max, lambda text: - noBrace(text) and noHash(text) and noEscape(text), - Generator.printable) - - def generateTags(min, max): - def generateTagPairs(): - for i in range(random.randint(min, max)): - yield (generateKey(), generateKey()) - - return dict(tuple(generateTagPairs())) - - def generateFields(min, max): - def generateFieldPairs(): - for i in range(random.randint(min,max)): - yield (generateKey(), generate(3, 6, lambda _: True, Generator.printable)) - return dict(tuple(generateFieldPairs())) - - w = Write(generateKey(8,12), # key - generateTags(1, 4), # tags - generateFields(1, 4)) # fields - line = str(w) - print(line) - try: - r = requests.post(write_url, line) - INFO = ("Data:\nline={}\nwrite={}\nr.status_code={}\n" + - "r.content={}").format(line, repr(w), r.status_code, r.text) - assert r.status_code == 204, INFO - except Exception as e: - print(e) - assert False, "Data\nline={}\nwrite={}".format(str(w), repr(w)) - - measurement = Write.escape_value(w.key) - query = """\ -SELECT * -FROM {measurement} -WHERE time >= now() - 2s""".format(**locals()) - result = [] - try: - result = list(client.query(query)) - except Exception as e: - print(e) - wrepr=repr(w) - DEBUGINFO = ("DEBUG:\nquery={query}\nwrite={wrepr}\n" + - "result={result}\nline={line}").format(**locals()) - assert len(result) == 1, DEBUGINFO - assert len(result[0][0]) == len(w.tags) + len(w.fields) + 1, DEBUGINFO # + time - - # assert fields - for (key, value) in w.fields: - assert key in result[0][0], DEBUGINFO - assert result[0][0][key] == value, DEBUGINFO + \ - "\n"+ key +"=" + result[0][0][key] + "=" + value - - # assert tags - for (tagKey, tagValue) in w.tags: - assert tagKey in result[0][0], DEBUGINFO - assert result[0][0][tagKey] == tagValue, DEBUGINFO - - -N_PROC = 4 -with Pool(processes=N_PROC) as pool: - try: - for res in pool.imap_unordered(run, [None] * N_PROC): - pool.terminate() - print("Exit") - raise SystemExit(0) - except Exception as e: - pool.terminate() - raise e diff --git a/fuzzer1.py b/fuzzer1.py new file mode 100755 index 0000000..57b5de6 --- /dev/null +++ b/fuzzer1.py @@ -0,0 +1,74 @@ +#!/usr/bin/env python3 +""" +in 2016/07/23 18:37:37 InfluxDB starting, version 0.13.0, branch 0.13, commit e57fb88a051ee40fd9277094345fbd47bb4783ce +this fuzzer can cause a concurrent read write crash +""" +import itertools +from multiprocessing.pool import ThreadPool as Pool +from pyinflux.client import InfluxDB, Line + +influxdb = InfluxDB('test', 'localhost') + +value_generator = itertools.count() + + +def test(number): + for value in value_generator: + if value + 1 % 500 == 0: + print("thread {} at {}".format(number, value)) + expected = "value{value}".format(value=value) + line = Line('series' + str(value), + {'tag': 'tag' + expected}, + {'field': 'field' + expected}) + + try: + influxdb.write([line]) + except: + print(line) + raise + + query = """\ + SELECT * + FROM "series{value}" + WHERE time >= now() - 2s""".format(value=value) + try: + results = influxdb.query(query).as_json()['results'] + except: + print("""\ + Failure. With the following INSERT: + {line}""".format(line=line)) + raise + try: + assert len(results) == 1 + series = results[0]['series'] + assert len(series) == 1 + series_result = series[0] + assert series_result['name'] == 'series' + str(value) + assert len(series_result['values']) == 1 + values = series_result['values'] + assert len(values) == 1 + assert len(values[0]) == 3 + columns = series_result['columns'] + assert columns == ['time', 'field', 'tag'] + + assert values[0][columns.index('field')] == 'fieldvalue' + str(value) + assert values[0][columns.index('tag')] == 'tagvalue' + str(value) + except: + print("""\ + Failure. With the following INSERT: + {line} + + Got this result: + {results}""".format(line=line, results=results)) + raise + + +print(influxdb.query('DROP DATABASE test').as_text()) +print(influxdb.query('CREATE DATABASE test').as_text()) + +N_PROC = 2 +with Pool(processes=N_PROC) as pool: + for res in pool.imap_unordered(test, range(1, N_PROC + 1)): + # they are suppossed to run endlessly, if one returns then there was a failure + pool.terminate() +print('Done') diff --git a/fuzzer2.py b/fuzzer2.py new file mode 100755 index 0000000..14932a8 --- /dev/null +++ b/fuzzer2.py @@ -0,0 +1,132 @@ +#!/usr/bin/env python3 +""" +in 2016/07/23 18:37:37 InfluxDB starting, version 0.13.0, branch 0.13, commit e57fb88a051ee40fd9277094345fbd47bb4783ce +this fuzzer can generate write line-statements that are parsed differently than how they are meant +""" +import re +import functools +import random +from multiprocessing.pool import ThreadPool as Pool + +from pyinflux.client import Line, InfluxDB + +influxdb = InfluxDB('test', 'localhost') + + +class Generator: + all = lambda x: bytes(chr(random.randint(0x00, 0xff)), 'latin-1') + _printables = list(map(chr, list(range(0x20, 0x7E)))) + printable = lambda x: random.choice(Generator._printables) + numericText = lambda x: chr(random.randint(ord("0"), ord("9"))) + text = lambda x: chr(random.choice( + list(range(ord("a"), ord("z"))) + + list(range(ord("A"), ord("Z"))))) + basicText = lambda x: chr(random.choice( + [ord("\""), ord(" "), ord("\\")] + + list(range(ord("a"), ord("z"))) + + list(range(ord("A"), ord("Z"))))) + + +class Filter: + regex = lambda regex: re.compile(regex).match + pass_all = lambda _: True + + +def generate(length_min, length_max, filter, generator): + length = random.randint(length_min, length_max) + while True: + text = functools.reduce( + lambda a, b: a + b, map(generator, range(length))) + if filter(text): + return text + + +def run(*a): + while True: + test() + + +usedKeys = set() + + +def test(): + # influxdb doesn't allow measurements starting with { + noBrace = Filter.regex("^[^\\{]") + noHash = Filter.regex("^[^#]") # hash sign starts a comment + # TODO: only allow valid escape sequences + noEscape = Filter.regex("^[^\\\\]+$") + noUsed = lambda term: term not in usedKeys + + def generateKey(min=3, max=6): + return generate(min, max, lambda text: noUsed(text) + and noEscape(text), # for now, see anotherBug.py + Generator.basicText) + + def generateTags(min, max): + def generateTagPairs(): + for i in range(random.randint(min, max)): + yield (generateKey(), generateKey()) + + return dict(tuple(generateTagPairs())) + + def generateFields(min, max): + def generateFieldPairs(): + for i in range(random.randint(min, max)): + yield (generateKey(), generate(3, 6, lambda text: True + , Generator.basicText)) + + return dict(tuple(generateFieldPairs())) + + w = Line(generateKey(8, 12), # key + generateTags(1, 4), # tags + generateFields(1, 4)) # fields + usedKeys.add(w.key) + try: + text = influxdb.write([w]) + except Exception as e: + print("Data\nline={}\nwrite={}".format(str(w), repr(w))) + raise e + + measurement = Line.escape_value(w.key) + query = """\ +SELECT * +FROM {measurement} +WHERE time >= now() - 2s""".format(**locals()) + query_response = [] + try: + query_response = influxdb.query(query).as_json() + except Exception as e: + print("Data\nline={}\nwrite={}".format(str(w), repr(w))) + raise e + DEBUGINFO = ("DEBUG:\nquery={query}\nwrite={w}\n" + + "query_response={query_response}").format(**locals()) + try: + assert len(query_response) == 1, DEBUGINFO + + results = query_response['results'] + assert len(results) == 1 + + result = results[0] + series = result['series'] + for serie in series: + assert serie['name'] == w.key + assert len(serie['columns']) == len(w.tags) + len(w.fields) + 1, \ + "Length of colums: {} != {}".format(len(serie['columns']), len(w.tags) + len(w.fields)) + except: + print("Data\nline={}\nwrite={}\nquery={}\nquery_response={}".format(str(w), repr(w), query, query_response)) + raise + + +print(influxdb.query('DROP DATABASE test').as_text()) +print(influxdb.query('CREATE DATABASE test').as_text()) + +N_PROC = 2 +with Pool(processes=N_PROC) as pool: + try: + for res in pool.imap_unordered(run, [None] * N_PROC): + pool.terminate() + print("Exit") + raise SystemExit(0) + except: + pool.terminate() + raise diff --git a/pyinflux/__init__.py b/pyinflux/__init__.py new file mode 100644 index 0000000..bc63beb --- /dev/null +++ b/pyinflux/__init__.py @@ -0,0 +1 @@ +# empty
\ No newline at end of file diff --git a/pyinflux/client/__init__.py b/pyinflux/client/__init__.py new file mode 100644 index 0000000..b485861 --- /dev/null +++ b/pyinflux/client/__init__.py @@ -0,0 +1,149 @@ +import typing +import io +import re +from urllib.request import urlopen +from urllib.parse import quote as urlquote, urlencode +import json +import codecs + + +class Line(object): + def __init__(self, key, tags, fields, timestamp=None): + self.key = key + self.tags = tags + self.fields = fields + self.timestamp = timestamp + + if isinstance(self.tags, dict): + self.tags = self.tags.items() + + if isinstance(self.fields, dict): + self.fields = self.fields.items() + + @staticmethod + def escape_identifier(string): + return re.sub(r'([\\,= ])', '\\\\\\1', string) + + @staticmethod + def escape_tags(taglist): + return ",".join(map(lambda kv: + (Line.escape_identifier(kv[0]) + "=" + Line.escape_identifier(kv[1])), + taglist)) + + @staticmethod + def escape_value(obj): + if (isinstance(obj, float) or + isinstance(obj, int) or + isinstance(obj, bool)): + return str(obj) + else: + obj = str(obj) + return "\"" + obj.replace("\\", "\\\\").replace("\"", "\\\"") + "\"" + + @staticmethod + def escape_fields(kvlist): + def escape_key(string): + return re.sub(r'(["\\,= ])', '\\\\\\1', string) + + return ",".join( + map(lambda kv: escape_key(kv[0]) + "=" + Line.escape_value(kv[1]), + kvlist)) + + def __repr__(self): + """ + >>> print(repr(Line('test', [('a','b')], [('value','asd\\\\')]))) + <Line key=test tags=[('a', 'b')] fields=[('value', 'asd\\\\')] timestamp=None> + """ + return "<{} key={} tags={} fields={} timestamp={}>".format( + self.__class__.__name__, self.key, self.tags, self.fields, self.timestamp) + + def __str__(self): + """ + >>> print(Line('test', [('a','b')], [('value','asd\\\\')])) + test,a=b value="asd\\\\" + """ + result = self.escape_identifier(self.key) + + if self.tags: + result += "," + result += self.escape_tags(self.tags) + + if self.fields: + result += " " + result += self.escape_fields(self.fields) + + if self.timestamp: + result += " " + result += str(self.timestamp) + + return result + + +class QueryResultOption: + CODEC = codecs.getreader('utf-8') + + def __init__(self, exec_func: typing.Callable[[], io.IOBase]): + self.exec_func = exec_func + self._json = None + self._text = None + + def as_json(self): + if self._json is None: + fh = self.CODEC(self.exec_func()) + self._json = json.load(fh) + fh.close() + return self._json + + def as_text(self): + if self._text is None: + fh = self.CODEC(self.exec_func()) + self._text = fh.read() + fh.close() + return self._text + + +class Influx: + def __init__(self, host: str, port: int = 8086, username: str = None, password: str = None): + """ + :param username: username and password: + :param password: if set both must be set + """ + self._write_url = "http://{host}:{port}/write?".format(**locals()) + self._query_url_get = "http://{host}:{port}/query?".format(**locals()) + self._query_url_post = "http://{host}:{port}/query".format(**locals()) + if username and password: + self._write_url += 'username=' + username + '&password' + password + '&' + self._query_url_get += 'username=' + username + '&password' + password + '&' + self._query_url_post += '?username=' + username + '&password' + password + + def write_db(self, db: str, lines: [Line]): + url = self._write_url + "db=" + urlquote(db) + request_data = "\n".join(map(str, lines)).encode('utf-8') + with urlopen(url, request_data) as fh: + response = fh.read() + return response.decode('utf-8') + + def query_db(self, db: str, query: str) -> QueryResultOption: + def get_fh() -> io.IOBase: + url = self._query_url_get + 'db=' + urlquote(db) + '&q=' + urlquote(query) + return urlopen(url) + + return QueryResultOption(get_fh) + + def execute(self, query: str) -> QueryResultOption: + def get_fh() -> io.IOBase: + return urlopen(self._query_url_post, urlencode({'q': query}).encode('utf-8')) + + return QueryResultOption(get_fh) + + +class InfluxDB(Influx): + def __init__(self, db: str, host: str, port: int = 8086, username: str = None, password: str = None): + super().__init__(host, port, username, password) + self._db = db + + def write(self, lines: [Line]): + return self.write_db(self._db, lines) + + def query(self, query: str): + return self.query_db(self._db, query) diff --git a/pyinflux/parser/__init__.py b/pyinflux/parser/__init__.py new file mode 100644 index 0000000..2de6c0e --- /dev/null +++ b/pyinflux/parser/__init__.py @@ -0,0 +1,222 @@ +from functools import reduce + +try: + from funcparserlib.lexer import make_tokenizer + from funcparserlib.parser import ( + some, maybe, many, finished, skip, NoParseError) +except ImportError as e: + print("Missing funcparserlib library. You need to install the 'parser' extra dependency set.") + raise e + +from pyinflux import client + + +def parse_lines(lines): + """ + Parse multiple Write objects separeted by new-line character. + + >>> print(LineParser.parse("foo b=1")) + foo b=1 + + >>> lines = [] + >>> lines += ['cpu field=123'] + >>> lines += ['cpu,host=serverA,region=us-west field1=1,field2=2'] + >>> lines += ['cpu,host=serverA,region=us-west field1=1,field2=2 1234'] + >>> print("\\n".join(map(str, parse_lines("\\n".join(lines))))) + cpu field=123 + cpu,host=serverA,region=us-west field1=1,field2=2 + cpu,host=serverA,region=us-west field1=1,field2=2 1234 + """ + writes = map(LineParser.parse, lines.split("\n")) + return list(writes) + + +class LineParser(object): + specs = [ + ('Comma', (r',',)), + ('Space', (r' ',)), + ('Equal', (r'=',)), + ('Quote', (r'"',)), + ('Escape', (r'\\',)), + ('Int', (r'[0-9]+(?![0-9\.])',)), + ('Float', (r'-?(\.[0-9]+)|([0-9]+(\.[0-9]*)?)',)), + ('Char', (r'.',)), + ] + + @classmethod + def tokenize(klass, line : str): + tokenizer = make_tokenizer(klass.specs) + return list(tokenizer(line)) + + @staticmethod + def parse(line): + """ + Parse a line from the POST request into a Write object. + + >>> line='cpu a=1'; LineParser.parse(line); print(LineParser.parse(line)) + <Line key=cpu tags=[] fields=[('a', 1)] timestamp=None> + cpu a=1 + + >>> print(LineParser.parse('yahoo.CHFGBP\\=X.ask,tag=foobar value=10.2')) + yahoo.CHFGBP\=X.ask,tag=foobar value=10.2 + + >>> LineParser.parse('cpu,host=serverA,region=us-west foo="bar"') + <Line key=cpu tags=[('host', 'serverA'), ('region', 'us-west')] fields=[('foo', 'bar')] timestamp=None> + + >>> print(LineParser.parse('cpu host="serverA",region="us-west"')) + cpu host="serverA",region="us-west" + + >>> line='cpu\\,01 host="serverA",region="us-west"'; \\ + ... LineParser.parse(line); print(LineParser.parse(line)) + <Line key=cpu,01 tags=[] fields=[('host', 'serverA'), ('region', 'us-west')] timestamp=None> + cpu\,01 host="serverA",region="us-west" + + >>> LineParser.parse('cpu host="server A",region="us west"') + <Line key=cpu tags=[] fields=[('host', 'server A'), ('region', 'us west')] timestamp=None> + + >>> line='cpu ho\\=st="server A",region="us west"'; \\ + ... LineParser.parse(line); print(LineParser.parse(line)) + <Line key=cpu tags=[] fields=[('ho=st', 'server A'), ('region', 'us west')] timestamp=None> + cpu ho\=st="server A",region="us west" + + >>> print(LineParser.parse('cpu,ho\=st=server\ A field=123')) + cpu,ho\=st=server\ A field=123 + + # error: double name is accepted + >>> print(LineParser.parse('cpu,foo=bar,foo=bar field=123,field=123')) + cpu,foo=bar,foo=bar field=123,field=123 + + >>> print(LineParser.parse('cpu field12=12')) + cpu field12=12 + + >>> print(LineParser.parse('cpu field12=12 123123123')) + cpu field12=12 123123123 + + >>> try: print(LineParser.parse('cpu field12=12 1231abcdef123')) + ... except NoParseError: pass + + >>> print(LineParser.parse('cpu,x=3,y=4,z=6 field\ name="HH \\\\\\"World",x="asdf foo"')) + cpu,x=3,y=4,z=6 field\\ name="HH \\"World",x="asdf foo" + + >>> print(LineParser.parse("cpu,x=3 field\ name=\\"HH \\\\\\"World\\",x=\\"asdf foo\\"")) + cpu,x=3 field\\ name="HH \\"World",x="asdf foo" + + >>> print(LineParser.parse("cpu foo=\\"bar\\" 12345")) + cpu foo="bar" 12345 + + >>> line='"measurement\ with\ quotes",tag\ key\ with\ spaces=tag\,value\,with"commas" field_key\\\\\\="string field value, only \\\\" need be quoted"'; \\ + ... LineParser.parse(line); print(LineParser.parse(line)) + <Line key="measurement with quotes" tags=[('tag key with spaces', 'tag,value,with"commas"')] fields=[('field_key\\\\', 'string field value, only " need be quoted')] timestamp=None> + "measurement\ with\ quotes",tag\ key\ with\ spaces=tag\,value\,with"commas" field_key\\\\="string field value, only \\\" need be quoted" + + >>> LineParser.parse('disk_free value=442221834240,working\ directories="C:\My Documents\Stuff for examples,C:\My Documents"') + <Line key=disk_free tags=[] fields=[('value', 442221834240), ('working directories', 'C:\\\\My Documents\\\\Stuff for examples,C:\\\\My Documents')] timestamp=None> + + >>> LineParser.parse('disk_free value=442221834240,working\ directories="C:\My Documents\Stuff for examples,C:\My Documents" 123') + <Line key=disk_free tags=[] fields=[('value', 442221834240), ('working directories', 'C:\\\\My Documents\\\\Stuff for examples,C:\\\\My Documents')] timestamp=123> + + >>> print(LineParser.parse('foo,foo=2 "field key with space"="string field"')) + foo,foo=2 field\ key\ with\ space="string field" + + >>> print(LineParser.parse('foo,foo=2 field_key\\\\\\="string field"')) + foo,foo=2 field_key\\\\="string field" + + >>> print(LineParser.parse('foo,foo=2 field_key="string\\\\" field"')) + foo,foo=2 field_key="string\\" field" + + >>> line='foo field0="tag",field1=t,field2=true,field3=True,field4=TRUE'; \\ + ... LineParser.parse(line); print(LineParser.parse(line)) + <Line key=foo tags=[] fields=[('field0', 'tag'), ('field1', True), ('field2', True), ('field3', True), ('field4', True)] timestamp=None> + foo field0="tag",field1=True,field2=True,field3=True,field4=True + + >>> line='foo field1=f,field2=false,field3=False,field4=FALSE,field5="fag"'; \\ + ... LineParser.parse(line); print(LineParser.parse(line)) + <Line key=foo tags=[] fields=[('field1', False), ('field2', False), ('field3', False), ('field4', False), ('field5', 'fag')] timestamp=None> + foo field1=False,field2=False,field3=False,field4=False,field5="fag" + + >>> line='"measurement\ with\ quotes",tag\ key\ with\ spaces=tag\,value\,with"commas" field_key\\\\\\="string field value, only \\\\" need be quoted"'; \\ + ... LineParser.parse(line); print(LineParser.parse(line)) + <Line key="measurement with quotes" tags=[('tag key with spaces', 'tag,value,with"commas"')] fields=[('field_key\\\\', 'string field value, only " need be quoted')] timestamp=None> + "measurement\ with\ quotes",tag\ key\ with\ spaces=tag\,value\,with"commas" field_key\\\\="string field value, only \\" need be quoted" + + >>> LineParser.parse('"measurement\ with\ quotes" foo=1') + <Line key="measurement with quotes" tags=[] fields=[('foo', 1)] timestamp=None> + + >>> print(LineParser.parse('K.5S,Ccpvo="eSLyE" value="7F\\\\\\\\\\\\""')) + K.5S,Ccpvo="eSLyE" value="7F\\\\\\\\\\\"" + + >>> print(LineParser.parse('K.5S,Ccpvo=a\\ b value=1')) + K.5S,Ccpvo=a\\ b value=1 + """ + + tokval = lambda t: t.value + joinval = "".join + someToken = lambda type: some(lambda t: t.type == type) + someCharValue = lambda string: \ + reduce(lambda a, b: a + b, + map(lambda char: + some(lambda t: t.value == char) >> tokval, + string)) >> joinval + + char = someToken('Char') >> tokval + space = someToken('Space') >> tokval + comma = someToken('Comma') >> tokval + quote = someToken('Quote') >> tokval + escape = someToken('Escape') >> tokval + equal = someToken('Equal') >> tokval + true_value = (someCharValue("true") | someCharValue("t") | + someCharValue("True") | someCharValue("TRUE") | someCharValue("T")) + false_value = (someCharValue("false") | someCharValue("f") | + someCharValue("False") | someCharValue("FALSE") | someCharValue("F")) + + escape_space = skip(escape) + space >> joinval + escape_comma = skip(escape) + comma >> joinval + escape_equal = skip(escape) + equal >> joinval + escape_quote = skip(escape) + quote >> joinval + escape_escape = skip(escape) + escape >> joinval + + plain_int_text = someToken('Int') >> tokval + plain_int = plain_int_text >> (lambda v: int(v)) + plain_float_text = someToken('Float') >> tokval + plain_float = plain_float_text >> (lambda v: float(v)) + + identifier = many(char | plain_float_text | plain_int_text | + escape_space | escape_comma | escape_equal | + escape_escape | plain_int_text | quote) >> joinval + quoted_text_ = many(escape_quote | space | plain_int_text | + plain_float_text | char | comma | + escape) >> joinval + quoted_text = skip(quote) + quoted_text_ + skip(quote) + unquoted_text = many(escape_space | escape_comma | + escape_equal | escape_escape | + plain_int_text | char | quote) >> joinval + boolean_value = (true_value >> (lambda s: True) + | false_value >> (lambda s: False)) + + kv_value = plain_int | plain_float | quoted_text | boolean_value + kv = (quoted_text | unquoted_text) + skip(equal) + kv_value >> \ + (lambda x: (x[0], x[1])) + + tag = identifier + skip(equal) + identifier >> (lambda x: (x[0], x[1])) + + def setter(obj, propert): + def r(val): + setattr(obj, propert, val) + return (propert, val) + + return r + + tags = many(skip(comma) + tag) >> (lambda x: x) + fields = (kv + many(skip(comma) + kv)) >> \ + (lambda x: [x[0]] + x[1]) + + write = client.Line(None, None, None, None) + toplevel = (identifier >> setter(write, "key")) + \ + maybe(tags >> setter(write, "tags")) + \ + (skip(space) + (fields >> setter(write, "fields"))) + \ + maybe(skip(space) + plain_int >> setter(write, "timestamp")) + \ + skip(finished) >> (lambda x: x) + + result = toplevel.parse(LineParser.tokenize(line)) + # pprint(result) + return write diff --git a/pyinfluxtools/__init__.py b/pyinfluxtools/__init__.py deleted file mode 100644 index 68146ff..0000000 --- a/pyinfluxtools/__init__.py +++ /dev/null @@ -1,293 +0,0 @@ -#!/usr/bin/env python3 -import re -import sys -from functools import reduce - -from funcparserlib.lexer import make_tokenizer -from funcparserlib.parser import ( - some, maybe, many, finished, skip, NoParseError) - - -class WriteRequest(object): - - @staticmethod - def parse(lines): - """ - Parse multiple Write objects separeted by new-line character. - - >>> print(Write.parse("foo b=1")) - foo b=1 - - >>> lines = [] - >>> lines += ['cpu field=123'] - >>> lines += ['cpu,host=serverA,region=us-west field1=1,field2=2'] - >>> lines += ['cpu,host=serverA,region=us-west field1=1,field2=2 1234'] - >>> print("\\n".join(map(str, WriteRequest.parse("\\n".join(lines))))) - cpu field=123 - cpu,host=serverA,region=us-west field1=1,field2=2 - cpu,host=serverA,region=us-west field1=1,field2=2 1234 - """ - writes = map(Write.parse, lines.split("\n")) - return list(writes) - - -class Write(object): - - def __init__(self, key, tags, fields, timestamp=None): - self.key = key - self.tags = tags - self.fields = fields - self.timestamp = timestamp - - if isinstance(self.tags, dict): - self.tags = self.tags.items() - - if isinstance(self.fields, dict): - self.fields = self.fields.items() - - specs = [ - ('Comma', (r',',)), - ('Space', (r' ',)), - ('Equal', (r'=',)), - ('Quote', (r'"',)), - ('Escape', (r'\\',)), - ('Int', (r'[0-9]+(?![0-9\.])',)), - ('Float', (r'-?(\.[0-9]+)|([0-9]+(\.[0-9]*)?)',)), - ('Char', (r'.',)), - ] - - @staticmethod - def tokenize(line): - tokenizer = make_tokenizer(Write.specs) - return list(tokenizer(line)) - - @staticmethod - def parse(line): - """ - Parse a line from the POST request into a Write object. - - >>> line='cpu a=1'; Write.parse(line); print(Write.parse(line)) - <Write key=cpu tags=[] fields=[('a', 1)] timestamp=None> - cpu a=1 - - >>> print(Write.parse('yahoo.CHFGBP\\=X.ask,tag=foobar value=10.2')) - yahoo.CHFGBP\=X.ask,tag=foobar value=10.2 - - >>> Write.parse('cpu,host=serverA,region=us-west foo="bar"') - <Write key=cpu tags=[('host', 'serverA'), ('region', 'us-west')] fields=[('foo', 'bar')] timestamp=None> - - >>> print(Write.parse('cpu host="serverA",region="us-west"')) - cpu host="serverA",region="us-west" - - >>> line='cpu\\,01 host="serverA",region="us-west"'; \\ - ... Write.parse(line); print(Write.parse(line)) - <Write key=cpu,01 tags=[] fields=[('host', 'serverA'), ('region', 'us-west')] timestamp=None> - cpu\,01 host="serverA",region="us-west" - - >>> Write.parse('cpu host="server A",region="us west"') - <Write key=cpu tags=[] fields=[('host', 'server A'), ('region', 'us west')] timestamp=None> - - >>> line='cpu ho\\=st="server A",region="us west"'; \\ - ... Write.parse(line); print(Write.parse(line)) - <Write key=cpu tags=[] fields=[('ho=st', 'server A'), ('region', 'us west')] timestamp=None> - cpu ho\=st="server A",region="us west" - - >>> print(Write.parse('cpu,ho\=st=server\ A field=123')) - cpu,ho\=st=server\ A field=123 - - # error: double name is accepted - >>> print(Write.parse('cpu,foo=bar,foo=bar field=123,field=123')) - cpu,foo=bar,foo=bar field=123,field=123 - - >>> print(Write.parse('cpu field12=12')) - cpu field12=12 - - >>> print(Write.parse('cpu field12=12 123123123')) - cpu field12=12 123123123 - - >>> try: print(Write.parse('cpu field12=12 1231abcdef123')) - ... except NoParseError: pass - - >>> print(Write.parse('cpu,x=3,y=4,z=6 field\ name="HH \\\\\\"World",x="asdf foo"')) - cpu,x=3,y=4,z=6 field\\ name="HH \\"World",x="asdf foo" - - >>> print(Write.parse("cpu,x=3 field\ name=\\"HH \\\\\\"World\\",x=\\"asdf foo\\"")) - cpu,x=3 field\\ name="HH \\"World",x="asdf foo" - - >>> print(Write.parse("cpu foo=\\"bar\\" 12345")) - cpu foo="bar" 12345 - - >>> line='"measurement\ with\ quotes",tag\ key\ with\ spaces=tag\,value\,with"commas" field_key\\\\\\="string field value, only \\\\" need be quoted"'; \\ - ... Write.parse(line); print(Write.parse(line)) - <Write key="measurement with quotes" tags=[('tag key with spaces', 'tag,value,with"commas"')] fields=[('field_key\\\\', 'string field value, only " need be quoted')] timestamp=None> - "measurement\ with\ quotes",tag\ key\ with\ spaces=tag\,value\,with"commas" field_key\\\\="string field value, only \\\" need be quoted" - - >>> Write.parse('disk_free value=442221834240,working\ directories="C:\My Documents\Stuff for examples,C:\My Documents"') - <Write key=disk_free tags=[] fields=[('value', 442221834240), ('working directories', 'C:\\\\My Documents\\\\Stuff for examples,C:\\\\My Documents')] timestamp=None> - - >>> Write.parse('disk_free value=442221834240,working\ directories="C:\My Documents\Stuff for examples,C:\My Documents" 123') - <Write key=disk_free tags=[] fields=[('value', 442221834240), ('working directories', 'C:\\\\My Documents\\\\Stuff for examples,C:\\\\My Documents')] timestamp=123> - - >>> print(Write.parse('foo,foo=2 "field key with space"="string field"')) - foo,foo=2 field\ key\ with\ space="string field" - - >>> print(Write.parse('foo,foo=2 field_key\\\\\\="string field"')) - foo,foo=2 field_key\\\\="string field" - - >>> print(Write.parse('foo,foo=2 field_key="string\\\\" field"')) - foo,foo=2 field_key="string\\" field" - - >>> line='foo field0="tag",field1=t,field2=true,field3=True,field4=TRUE'; \\ - ... Write.parse(line); print(Write.parse(line)) - <Write key=foo tags=[] fields=[('field0', 'tag'), ('field1', True), ('field2', True), ('field3', True), ('field4', True)] timestamp=None> - foo field0="tag",field1=True,field2=True,field3=True,field4=True - - >>> line='foo field1=f,field2=false,field3=False,field4=FALSE,field5="fag"'; \\ - ... Write.parse(line); print(Write.parse(line)) - <Write key=foo tags=[] fields=[('field1', False), ('field2', False), ('field3', False), ('field4', False), ('field5', 'fag')] timestamp=None> - foo field1=False,field2=False,field3=False,field4=False,field5="fag" - - >>> line='"measurement\ with\ quotes",tag\ key\ with\ spaces=tag\,value\,with"commas" field_key\\\\\\="string field value, only \\\\" need be quoted"'; \\ - ... Write.parse(line); print(Write.parse(line)) - <Write key="measurement with quotes" tags=[('tag key with spaces', 'tag,value,with"commas"')] fields=[('field_key\\\\', 'string field value, only " need be quoted')] timestamp=None> - "measurement\ with\ quotes",tag\ key\ with\ spaces=tag\,value\,with"commas" field_key\\\\="string field value, only \\" need be quoted" - - >>> Write.parse('"measurement\ with\ quotes" foo=1') - <Write key="measurement with quotes" tags=[] fields=[('foo', 1)] timestamp=None> - - >>> print(Write.parse('K.5S,Ccpvo="eSLyE" value="7F\\\\\\\\\\\\""')) - K.5S,Ccpvo="eSLyE" value="7F\\\\\\\\\\\"" - - >>> print(Write.parse('K.5S,Ccpvo=a\\ b value=1')) - K.5S,Ccpvo=a\\ b value=1 - - >>> print(Write('test', [('a','b')], [('value','asd\\\\')])) - test,a=b value="asd\\\\" - """ - - tokval = lambda t: t.value - joinval = "".join - someToken = lambda type: some(lambda t: t.type == type) - someCharValue = lambda string: \ - reduce(lambda a, b: a + b, - map(lambda char: - some(lambda t: t.value == char) >> tokval, - string)) >> joinval - - char = someToken('Char') >> tokval - space = someToken('Space') >> tokval - comma = someToken('Comma') >> tokval - quote = someToken('Quote') >> tokval - escape = someToken('Escape') >> tokval - equal = someToken('Equal') >> tokval - true_value = (someCharValue("true") | someCharValue("t") | - someCharValue("True") | someCharValue("TRUE") | someCharValue("T")) - false_value = (someCharValue("false") | someCharValue("f") | - someCharValue("False") | someCharValue("FALSE") | someCharValue("F")) - - escape_space = skip(escape) + space >> joinval - escape_comma = skip(escape) + comma >> joinval - escape_equal = skip(escape) + equal >> joinval - escape_quote = skip(escape) + quote >> joinval - escape_escape = skip(escape) + escape >> joinval - - plain_int_text = someToken('Int') >> tokval - plain_int = plain_int_text >> (lambda v: int(v)) - plain_float_text = someToken('Float') >> tokval - plain_float = plain_float_text >> (lambda v: float(v)) - - identifier = many(char | plain_float_text | plain_int_text | - escape_space | escape_comma | escape_equal | - escape_escape | plain_int_text | quote) >> joinval - quoted_text_ = many(escape_quote | space | plain_int_text | - plain_float_text | char | comma | - escape) >> joinval - quoted_text = skip(quote) + quoted_text_ + skip(quote) - unquoted_text = many(escape_space | escape_comma | - escape_equal | escape_escape | - plain_int_text | char | quote) >> joinval - boolean_value = (true_value >> (lambda s: True) - | false_value >> (lambda s: False)) - - kv_value = plain_int | plain_float | quoted_text | boolean_value - kv = (quoted_text | unquoted_text) + skip(equal) + kv_value >> \ - (lambda x: (x[0], x[1])) - - tag = identifier + skip(equal) + identifier >> (lambda x: (x[0], x[1])) - - def setter(obj, propert): - def r(val): - setattr(obj, propert, val) - return (propert, val) - return r - - tags = many(skip(comma) + tag) >> (lambda x: x) - fields = (kv + many(skip(comma) + kv)) >> \ - (lambda x: [x[0]] + x[1]) - - write = Write(None, None, None, None) - toplevel = (identifier >> setter(write, "key")) + \ - maybe(tags >> setter(write, "tags")) + \ - (skip(space) + (fields >> setter(write, "fields"))) + \ - maybe(skip(space) + plain_int >> setter(write, "timestamp")) + \ - skip(finished) >> (lambda x: x) - - result = toplevel.parse(Write.tokenize(line)) - # pprint(result) - return write - - def __repr__(self): - return "<{} key={} tags={} fields={} timestamp={}>".format( - self.__class__.__name__, self.key, self.tags, self.fields, self.timestamp) - - @staticmethod - def escape_identifier(string): - return re.sub(r'([\\,= ])', '\\\\\\1', string) - - @staticmethod - def escape_tags(taglist): - return ",".join(map(lambda kv: - (Write.escape_identifier(kv[0]) - + "=" + Write.escape_identifier(kv[1])), - taglist)) - - @staticmethod - def escape_value(obj): - if (isinstance(obj, float) - or isinstance(obj, int) - or isinstance(obj, bool)): - return str(obj) - else: - obj = str(obj) - return "\"" + obj.replace("\\", "\\\\").replace("\"", "\\\"") + "\"" - - @staticmethod - def escape_fields(kvlist): - def escape_key(string): - return re.sub(r'(["\\,= ])', '\\\\\\1', string) - - return ",".join( - map(lambda kv: escape_key(kv[0]) + "=" + Write.escape_value(kv[1]), - kvlist)) - - def __str__(self): - result = self.escape_identifier(self.key) - - if self.tags: - result += "," - result += self.escape_tags(self.tags) - - if self.fields: - result += " " - result += self.escape_fields(self.fields) - - if self.timestamp: - result += " " - result += str(self.timestamp) - - return result - - -if __name__ == "__main__": - import doctest - doctest.testmod() @@ -1,24 +1,23 @@ -#!/usr/bin/env python - +#!/usr/bin/env python3 from distutils.core import setup -setup(name='pyinfluxtools', - version='0.1', - description='Python classes to work with influxdb', - author='Yves Fischer', - author_email='yvesf+github@xapek.org', - license="MIT", - packages = ['pyinfluxtools'], -# url='https://github.com/', -# scripts=[], - install_requires=['funcparserlib==0.3.6'], - classifiers = [ - "Programming Language :: Python", - "Programming Language :: Python :: 3", - "Development Status :: 3 - Alpha", - "Intended Audience :: Developers", - "Operating System :: OS Independent", - "License :: OSI Approved :: MIT License", - ] -) +version = '0.1' +setup(name='pyinflux', + version=version, + description='Python classes to work with influxdb', + author='Yves Fischer', + author_email='yvesf+git@xapek.org', + license="MIT", + py_modules=['pyinflux'], + # url='https://github.com/', + install_requires=[], + extras_require={'parser': ['funcparserlib==0.3.6']}, + classifiers=[ + "Programming Language :: Python", + "Programming Language :: Python :: 3", + "Development Status :: 3 - Alpha", + "Intended Audience :: Developers", + "Operating System :: OS Independent", + "License :: OSI Approved :: MIT License", + ]) @@ -0,0 +1,7 @@ +#!/usr/bin/env python3 +from doctest import testmod +from pyinflux import client, parser + +if __name__ == '__main__': + testmod(m=client) + testmod(m=parser) |