From 76289024543fbfb6df3d07f61f15eabc623dd246 Mon Sep 17 00:00:00 2001 From: Yves Fischer Date: Sat, 23 Jul 2016 18:42:06 +0200 Subject: restructure code rename module to pyinflux split in client and parser package. the parser depends also on funcparserlib contains 3 examples of strange influxdb behavior --- .gitignore | 2 + LICENSE | 20 +++ MANIFEST.in | 1 + README.md | 3 + anotherBug.py | 37 ++++++ fuzzer.py | 132 -------------------- fuzzer1.py | 74 +++++++++++ fuzzer2.py | 132 ++++++++++++++++++++ pyinflux/__init__.py | 1 + pyinflux/client/__init__.py | 149 ++++++++++++++++++++++ pyinflux/parser/__init__.py | 222 +++++++++++++++++++++++++++++++++ pyinfluxtools/__init__.py | 293 -------------------------------------------- setup.py | 41 +++---- test.py | 7 ++ 14 files changed, 668 insertions(+), 446 deletions(-) create mode 100644 LICENSE create mode 100644 MANIFEST.in create mode 100644 README.md create mode 100755 anotherBug.py delete mode 100755 fuzzer.py create mode 100755 fuzzer1.py create mode 100755 fuzzer2.py create mode 100644 pyinflux/__init__.py create mode 100644 pyinflux/client/__init__.py create mode 100644 pyinflux/parser/__init__.py delete mode 100644 pyinfluxtools/__init__.py create mode 100755 test.py diff --git a/.gitignore b/.gitignore index 72723e5..ffd574f 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,3 @@ *pyc +/MANIFEST +/.idea \ No newline at end of file diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..ff8652d --- /dev/null +++ b/LICENSE @@ -0,0 +1,20 @@ +The MIT License (MIT) + +Copyright (c) 2016 Yves Fischer + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. \ No newline at end of file diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 0000000..45d276d --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1 @@ +include pyinflux/* \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..e931f93 --- /dev/null +++ b/README.md @@ -0,0 +1,3 @@ +# pyinflux tools + +the files `fuzzer1.py` and `fuzzer2.py` contain usage examples. \ No newline at end of file diff --git a/anotherBug.py b/anotherBug.py new file mode 100755 index 0000000..ce52cf4 --- /dev/null +++ b/anotherBug.py @@ -0,0 +1,37 @@ +#!/usr/bin/env python3 +""" +another bug? +in 2016/07/23 18:37:37 InfluxDB starting, version 0.13.0, branch 0.13, commit e57fb88a051ee40fd9277094345fbd47bb4783ce +the escaping of the 'key' or 'name' is strange +""" +from pyinflux.client import Influx, Line + +influxdb = Influx('localhost') + +print(influxdb.execute('DROP DATABASE test').as_text()) +print(influxdb.execute('CREATE DATABASE test').as_text()) + + +def write(line): + print(repr(line)) + print(str(line)) + return influxdb.write_db('test', [line]) + + +print(write(Line('asd\\', {'tag1': ''}, {'field1': ''}))) +print(write(Line('asd\\\\', {'tag1': ''}, {'field1': ''}))) + + +def query(q): + print(q) + try: + return influxdb.query_db('test', q).as_json() + except Exception as e: + return str(e) + +# at least some of these query must return something... +print(query('SELECT * FROM asd')) +print(query('SELECT * FROM asd\\')) +print(query('SELECT * FROM asd\\\\')) +print(query('SELECT * FROM "asd"')) +print(query('SELECT * FROM "asd\\\\"')) diff --git a/fuzzer.py b/fuzzer.py deleted file mode 100755 index ffdb0e7..0000000 --- a/fuzzer.py +++ /dev/null @@ -1,132 +0,0 @@ -#!/usr/bin/env python3 -import functools -import random -from multiprocessing.pool import ThreadPool as Pool - -import requests -from pyinfluxtools import * -from influxdb import InfluxDBClient - -settings = { - 'host': 'localhost', - 'port': 8086, - 'username': 'root', - 'password': 'root', - 'database': 'test', -} - -write_url = ("http://{host}:{port}/write?db={database}" + - "&username={username}&password={password}" - ).format(**settings) - -client = InfluxDBClient(settings['host'], settings['port'], - settings['username'], settings['password'], - settings['database']) - - -class Generator: - all = lambda x: bytes(chr(random.randint(0x00, 0xff)), 'latin-1') - _printables = list(map(chr, list(range(0x20, 0x7E)))) - printable = lambda x: random.choice(Generator._printables) - numericText = lambda x: chr(random.randint(ord("0"), ord("9"))) - text = lambda x: chr(random.choice( - list(range(ord("a"), ord("z"))) + - list(range(ord("A"), ord("Z"))))) - - -class Filter: - regex = lambda regex: re.compile(regex).match - pass_all = lambda _: True - - -def generate(length_min, length_max, filter, generator): - length = random.randint(length_min, length_max) - while True: - text = functools.reduce( - lambda a, b: a + b, map(generator, range(length))) - if filter(text): - return text - - -def run(*a): - while True: - test() - - -def test(): - # influxdb doesn't allow measurements starting with { - noBrace = Filter.regex("^[^\\{]") - noHash = Filter.regex("^[^#]") # hash sign starts a comment - # TODO: only allow valid escape sequences - noEscape = Filter.regex("^[^\\\\]+$") - - def generateKey(min=3,max=6): - return generate(min, max, lambda text: - noBrace(text) and noHash(text) and noEscape(text), - Generator.printable) - - def generateTags(min, max): - def generateTagPairs(): - for i in range(random.randint(min, max)): - yield (generateKey(), generateKey()) - - return dict(tuple(generateTagPairs())) - - def generateFields(min, max): - def generateFieldPairs(): - for i in range(random.randint(min,max)): - yield (generateKey(), generate(3, 6, lambda _: True, Generator.printable)) - return dict(tuple(generateFieldPairs())) - - w = Write(generateKey(8,12), # key - generateTags(1, 4), # tags - generateFields(1, 4)) # fields - line = str(w) - print(line) - try: - r = requests.post(write_url, line) - INFO = ("Data:\nline={}\nwrite={}\nr.status_code={}\n" + - "r.content={}").format(line, repr(w), r.status_code, r.text) - assert r.status_code == 204, INFO - except Exception as e: - print(e) - assert False, "Data\nline={}\nwrite={}".format(str(w), repr(w)) - - measurement = Write.escape_value(w.key) - query = """\ -SELECT * -FROM {measurement} -WHERE time >= now() - 2s""".format(**locals()) - result = [] - try: - result = list(client.query(query)) - except Exception as e: - print(e) - wrepr=repr(w) - DEBUGINFO = ("DEBUG:\nquery={query}\nwrite={wrepr}\n" + - "result={result}\nline={line}").format(**locals()) - assert len(result) == 1, DEBUGINFO - assert len(result[0][0]) == len(w.tags) + len(w.fields) + 1, DEBUGINFO # + time - - # assert fields - for (key, value) in w.fields: - assert key in result[0][0], DEBUGINFO - assert result[0][0][key] == value, DEBUGINFO + \ - "\n"+ key +"=" + result[0][0][key] + "=" + value - - # assert tags - for (tagKey, tagValue) in w.tags: - assert tagKey in result[0][0], DEBUGINFO - assert result[0][0][tagKey] == tagValue, DEBUGINFO - - -N_PROC = 4 -with Pool(processes=N_PROC) as pool: - try: - for res in pool.imap_unordered(run, [None] * N_PROC): - pool.terminate() - print("Exit") - raise SystemExit(0) - except Exception as e: - pool.terminate() - raise e diff --git a/fuzzer1.py b/fuzzer1.py new file mode 100755 index 0000000..57b5de6 --- /dev/null +++ b/fuzzer1.py @@ -0,0 +1,74 @@ +#!/usr/bin/env python3 +""" +in 2016/07/23 18:37:37 InfluxDB starting, version 0.13.0, branch 0.13, commit e57fb88a051ee40fd9277094345fbd47bb4783ce +this fuzzer can cause a concurrent read write crash +""" +import itertools +from multiprocessing.pool import ThreadPool as Pool +from pyinflux.client import InfluxDB, Line + +influxdb = InfluxDB('test', 'localhost') + +value_generator = itertools.count() + + +def test(number): + for value in value_generator: + if value + 1 % 500 == 0: + print("thread {} at {}".format(number, value)) + expected = "value{value}".format(value=value) + line = Line('series' + str(value), + {'tag': 'tag' + expected}, + {'field': 'field' + expected}) + + try: + influxdb.write([line]) + except: + print(line) + raise + + query = """\ + SELECT * + FROM "series{value}" + WHERE time >= now() - 2s""".format(value=value) + try: + results = influxdb.query(query).as_json()['results'] + except: + print("""\ + Failure. With the following INSERT: + {line}""".format(line=line)) + raise + try: + assert len(results) == 1 + series = results[0]['series'] + assert len(series) == 1 + series_result = series[0] + assert series_result['name'] == 'series' + str(value) + assert len(series_result['values']) == 1 + values = series_result['values'] + assert len(values) == 1 + assert len(values[0]) == 3 + columns = series_result['columns'] + assert columns == ['time', 'field', 'tag'] + + assert values[0][columns.index('field')] == 'fieldvalue' + str(value) + assert values[0][columns.index('tag')] == 'tagvalue' + str(value) + except: + print("""\ + Failure. With the following INSERT: + {line} + + Got this result: + {results}""".format(line=line, results=results)) + raise + + +print(influxdb.query('DROP DATABASE test').as_text()) +print(influxdb.query('CREATE DATABASE test').as_text()) + +N_PROC = 2 +with Pool(processes=N_PROC) as pool: + for res in pool.imap_unordered(test, range(1, N_PROC + 1)): + # they are suppossed to run endlessly, if one returns then there was a failure + pool.terminate() +print('Done') diff --git a/fuzzer2.py b/fuzzer2.py new file mode 100755 index 0000000..14932a8 --- /dev/null +++ b/fuzzer2.py @@ -0,0 +1,132 @@ +#!/usr/bin/env python3 +""" +in 2016/07/23 18:37:37 InfluxDB starting, version 0.13.0, branch 0.13, commit e57fb88a051ee40fd9277094345fbd47bb4783ce +this fuzzer can generate write line-statements that are parsed differently than how they are meant +""" +import re +import functools +import random +from multiprocessing.pool import ThreadPool as Pool + +from pyinflux.client import Line, InfluxDB + +influxdb = InfluxDB('test', 'localhost') + + +class Generator: + all = lambda x: bytes(chr(random.randint(0x00, 0xff)), 'latin-1') + _printables = list(map(chr, list(range(0x20, 0x7E)))) + printable = lambda x: random.choice(Generator._printables) + numericText = lambda x: chr(random.randint(ord("0"), ord("9"))) + text = lambda x: chr(random.choice( + list(range(ord("a"), ord("z"))) + + list(range(ord("A"), ord("Z"))))) + basicText = lambda x: chr(random.choice( + [ord("\""), ord(" "), ord("\\")] + + list(range(ord("a"), ord("z"))) + + list(range(ord("A"), ord("Z"))))) + + +class Filter: + regex = lambda regex: re.compile(regex).match + pass_all = lambda _: True + + +def generate(length_min, length_max, filter, generator): + length = random.randint(length_min, length_max) + while True: + text = functools.reduce( + lambda a, b: a + b, map(generator, range(length))) + if filter(text): + return text + + +def run(*a): + while True: + test() + + +usedKeys = set() + + +def test(): + # influxdb doesn't allow measurements starting with { + noBrace = Filter.regex("^[^\\{]") + noHash = Filter.regex("^[^#]") # hash sign starts a comment + # TODO: only allow valid escape sequences + noEscape = Filter.regex("^[^\\\\]+$") + noUsed = lambda term: term not in usedKeys + + def generateKey(min=3, max=6): + return generate(min, max, lambda text: noUsed(text) + and noEscape(text), # for now, see anotherBug.py + Generator.basicText) + + def generateTags(min, max): + def generateTagPairs(): + for i in range(random.randint(min, max)): + yield (generateKey(), generateKey()) + + return dict(tuple(generateTagPairs())) + + def generateFields(min, max): + def generateFieldPairs(): + for i in range(random.randint(min, max)): + yield (generateKey(), generate(3, 6, lambda text: True + , Generator.basicText)) + + return dict(tuple(generateFieldPairs())) + + w = Line(generateKey(8, 12), # key + generateTags(1, 4), # tags + generateFields(1, 4)) # fields + usedKeys.add(w.key) + try: + text = influxdb.write([w]) + except Exception as e: + print("Data\nline={}\nwrite={}".format(str(w), repr(w))) + raise e + + measurement = Line.escape_value(w.key) + query = """\ +SELECT * +FROM {measurement} +WHERE time >= now() - 2s""".format(**locals()) + query_response = [] + try: + query_response = influxdb.query(query).as_json() + except Exception as e: + print("Data\nline={}\nwrite={}".format(str(w), repr(w))) + raise e + DEBUGINFO = ("DEBUG:\nquery={query}\nwrite={w}\n" + + "query_response={query_response}").format(**locals()) + try: + assert len(query_response) == 1, DEBUGINFO + + results = query_response['results'] + assert len(results) == 1 + + result = results[0] + series = result['series'] + for serie in series: + assert serie['name'] == w.key + assert len(serie['columns']) == len(w.tags) + len(w.fields) + 1, \ + "Length of colums: {} != {}".format(len(serie['columns']), len(w.tags) + len(w.fields)) + except: + print("Data\nline={}\nwrite={}\nquery={}\nquery_response={}".format(str(w), repr(w), query, query_response)) + raise + + +print(influxdb.query('DROP DATABASE test').as_text()) +print(influxdb.query('CREATE DATABASE test').as_text()) + +N_PROC = 2 +with Pool(processes=N_PROC) as pool: + try: + for res in pool.imap_unordered(run, [None] * N_PROC): + pool.terminate() + print("Exit") + raise SystemExit(0) + except: + pool.terminate() + raise diff --git a/pyinflux/__init__.py b/pyinflux/__init__.py new file mode 100644 index 0000000..bc63beb --- /dev/null +++ b/pyinflux/__init__.py @@ -0,0 +1 @@ +# empty \ No newline at end of file diff --git a/pyinflux/client/__init__.py b/pyinflux/client/__init__.py new file mode 100644 index 0000000..b485861 --- /dev/null +++ b/pyinflux/client/__init__.py @@ -0,0 +1,149 @@ +import typing +import io +import re +from urllib.request import urlopen +from urllib.parse import quote as urlquote, urlencode +import json +import codecs + + +class Line(object): + def __init__(self, key, tags, fields, timestamp=None): + self.key = key + self.tags = tags + self.fields = fields + self.timestamp = timestamp + + if isinstance(self.tags, dict): + self.tags = self.tags.items() + + if isinstance(self.fields, dict): + self.fields = self.fields.items() + + @staticmethod + def escape_identifier(string): + return re.sub(r'([\\,= ])', '\\\\\\1', string) + + @staticmethod + def escape_tags(taglist): + return ",".join(map(lambda kv: + (Line.escape_identifier(kv[0]) + "=" + Line.escape_identifier(kv[1])), + taglist)) + + @staticmethod + def escape_value(obj): + if (isinstance(obj, float) or + isinstance(obj, int) or + isinstance(obj, bool)): + return str(obj) + else: + obj = str(obj) + return "\"" + obj.replace("\\", "\\\\").replace("\"", "\\\"") + "\"" + + @staticmethod + def escape_fields(kvlist): + def escape_key(string): + return re.sub(r'(["\\,= ])', '\\\\\\1', string) + + return ",".join( + map(lambda kv: escape_key(kv[0]) + "=" + Line.escape_value(kv[1]), + kvlist)) + + def __repr__(self): + """ + >>> print(repr(Line('test', [('a','b')], [('value','asd\\\\')]))) + + """ + return "<{} key={} tags={} fields={} timestamp={}>".format( + self.__class__.__name__, self.key, self.tags, self.fields, self.timestamp) + + def __str__(self): + """ + >>> print(Line('test', [('a','b')], [('value','asd\\\\')])) + test,a=b value="asd\\\\" + """ + result = self.escape_identifier(self.key) + + if self.tags: + result += "," + result += self.escape_tags(self.tags) + + if self.fields: + result += " " + result += self.escape_fields(self.fields) + + if self.timestamp: + result += " " + result += str(self.timestamp) + + return result + + +class QueryResultOption: + CODEC = codecs.getreader('utf-8') + + def __init__(self, exec_func: typing.Callable[[], io.IOBase]): + self.exec_func = exec_func + self._json = None + self._text = None + + def as_json(self): + if self._json is None: + fh = self.CODEC(self.exec_func()) + self._json = json.load(fh) + fh.close() + return self._json + + def as_text(self): + if self._text is None: + fh = self.CODEC(self.exec_func()) + self._text = fh.read() + fh.close() + return self._text + + +class Influx: + def __init__(self, host: str, port: int = 8086, username: str = None, password: str = None): + """ + :param username: username and password: + :param password: if set both must be set + """ + self._write_url = "http://{host}:{port}/write?".format(**locals()) + self._query_url_get = "http://{host}:{port}/query?".format(**locals()) + self._query_url_post = "http://{host}:{port}/query".format(**locals()) + if username and password: + self._write_url += 'username=' + username + '&password' + password + '&' + self._query_url_get += 'username=' + username + '&password' + password + '&' + self._query_url_post += '?username=' + username + '&password' + password + + def write_db(self, db: str, lines: [Line]): + url = self._write_url + "db=" + urlquote(db) + request_data = "\n".join(map(str, lines)).encode('utf-8') + with urlopen(url, request_data) as fh: + response = fh.read() + return response.decode('utf-8') + + def query_db(self, db: str, query: str) -> QueryResultOption: + def get_fh() -> io.IOBase: + url = self._query_url_get + 'db=' + urlquote(db) + '&q=' + urlquote(query) + return urlopen(url) + + return QueryResultOption(get_fh) + + def execute(self, query: str) -> QueryResultOption: + def get_fh() -> io.IOBase: + return urlopen(self._query_url_post, urlencode({'q': query}).encode('utf-8')) + + return QueryResultOption(get_fh) + + +class InfluxDB(Influx): + def __init__(self, db: str, host: str, port: int = 8086, username: str = None, password: str = None): + super().__init__(host, port, username, password) + self._db = db + + def write(self, lines: [Line]): + return self.write_db(self._db, lines) + + def query(self, query: str): + return self.query_db(self._db, query) diff --git a/pyinflux/parser/__init__.py b/pyinflux/parser/__init__.py new file mode 100644 index 0000000..2de6c0e --- /dev/null +++ b/pyinflux/parser/__init__.py @@ -0,0 +1,222 @@ +from functools import reduce + +try: + from funcparserlib.lexer import make_tokenizer + from funcparserlib.parser import ( + some, maybe, many, finished, skip, NoParseError) +except ImportError as e: + print("Missing funcparserlib library. You need to install the 'parser' extra dependency set.") + raise e + +from pyinflux import client + + +def parse_lines(lines): + """ + Parse multiple Write objects separeted by new-line character. + + >>> print(LineParser.parse("foo b=1")) + foo b=1 + + >>> lines = [] + >>> lines += ['cpu field=123'] + >>> lines += ['cpu,host=serverA,region=us-west field1=1,field2=2'] + >>> lines += ['cpu,host=serverA,region=us-west field1=1,field2=2 1234'] + >>> print("\\n".join(map(str, parse_lines("\\n".join(lines))))) + cpu field=123 + cpu,host=serverA,region=us-west field1=1,field2=2 + cpu,host=serverA,region=us-west field1=1,field2=2 1234 + """ + writes = map(LineParser.parse, lines.split("\n")) + return list(writes) + + +class LineParser(object): + specs = [ + ('Comma', (r',',)), + ('Space', (r' ',)), + ('Equal', (r'=',)), + ('Quote', (r'"',)), + ('Escape', (r'\\',)), + ('Int', (r'[0-9]+(?![0-9\.])',)), + ('Float', (r'-?(\.[0-9]+)|([0-9]+(\.[0-9]*)?)',)), + ('Char', (r'.',)), + ] + + @classmethod + def tokenize(klass, line : str): + tokenizer = make_tokenizer(klass.specs) + return list(tokenizer(line)) + + @staticmethod + def parse(line): + """ + Parse a line from the POST request into a Write object. + + >>> line='cpu a=1'; LineParser.parse(line); print(LineParser.parse(line)) + + cpu a=1 + + >>> print(LineParser.parse('yahoo.CHFGBP\\=X.ask,tag=foobar value=10.2')) + yahoo.CHFGBP\=X.ask,tag=foobar value=10.2 + + >>> LineParser.parse('cpu,host=serverA,region=us-west foo="bar"') + + + >>> print(LineParser.parse('cpu host="serverA",region="us-west"')) + cpu host="serverA",region="us-west" + + >>> line='cpu\\,01 host="serverA",region="us-west"'; \\ + ... LineParser.parse(line); print(LineParser.parse(line)) + + cpu\,01 host="serverA",region="us-west" + + >>> LineParser.parse('cpu host="server A",region="us west"') + + + >>> line='cpu ho\\=st="server A",region="us west"'; \\ + ... LineParser.parse(line); print(LineParser.parse(line)) + + cpu ho\=st="server A",region="us west" + + >>> print(LineParser.parse('cpu,ho\=st=server\ A field=123')) + cpu,ho\=st=server\ A field=123 + + # error: double name is accepted + >>> print(LineParser.parse('cpu,foo=bar,foo=bar field=123,field=123')) + cpu,foo=bar,foo=bar field=123,field=123 + + >>> print(LineParser.parse('cpu field12=12')) + cpu field12=12 + + >>> print(LineParser.parse('cpu field12=12 123123123')) + cpu field12=12 123123123 + + >>> try: print(LineParser.parse('cpu field12=12 1231abcdef123')) + ... except NoParseError: pass + + >>> print(LineParser.parse('cpu,x=3,y=4,z=6 field\ name="HH \\\\\\"World",x="asdf foo"')) + cpu,x=3,y=4,z=6 field\\ name="HH \\"World",x="asdf foo" + + >>> print(LineParser.parse("cpu,x=3 field\ name=\\"HH \\\\\\"World\\",x=\\"asdf foo\\"")) + cpu,x=3 field\\ name="HH \\"World",x="asdf foo" + + >>> print(LineParser.parse("cpu foo=\\"bar\\" 12345")) + cpu foo="bar" 12345 + + >>> line='"measurement\ with\ quotes",tag\ key\ with\ spaces=tag\,value\,with"commas" field_key\\\\\\="string field value, only \\\\" need be quoted"'; \\ + ... LineParser.parse(line); print(LineParser.parse(line)) + + "measurement\ with\ quotes",tag\ key\ with\ spaces=tag\,value\,with"commas" field_key\\\\="string field value, only \\\" need be quoted" + + >>> LineParser.parse('disk_free value=442221834240,working\ directories="C:\My Documents\Stuff for examples,C:\My Documents"') + + + >>> LineParser.parse('disk_free value=442221834240,working\ directories="C:\My Documents\Stuff for examples,C:\My Documents" 123') + + + >>> print(LineParser.parse('foo,foo=2 "field key with space"="string field"')) + foo,foo=2 field\ key\ with\ space="string field" + + >>> print(LineParser.parse('foo,foo=2 field_key\\\\\\="string field"')) + foo,foo=2 field_key\\\\="string field" + + >>> print(LineParser.parse('foo,foo=2 field_key="string\\\\" field"')) + foo,foo=2 field_key="string\\" field" + + >>> line='foo field0="tag",field1=t,field2=true,field3=True,field4=TRUE'; \\ + ... LineParser.parse(line); print(LineParser.parse(line)) + + foo field0="tag",field1=True,field2=True,field3=True,field4=True + + >>> line='foo field1=f,field2=false,field3=False,field4=FALSE,field5="fag"'; \\ + ... LineParser.parse(line); print(LineParser.parse(line)) + + foo field1=False,field2=False,field3=False,field4=False,field5="fag" + + >>> line='"measurement\ with\ quotes",tag\ key\ with\ spaces=tag\,value\,with"commas" field_key\\\\\\="string field value, only \\\\" need be quoted"'; \\ + ... LineParser.parse(line); print(LineParser.parse(line)) + + "measurement\ with\ quotes",tag\ key\ with\ spaces=tag\,value\,with"commas" field_key\\\\="string field value, only \\" need be quoted" + + >>> LineParser.parse('"measurement\ with\ quotes" foo=1') + + + >>> print(LineParser.parse('K.5S,Ccpvo="eSLyE" value="7F\\\\\\\\\\\\""')) + K.5S,Ccpvo="eSLyE" value="7F\\\\\\\\\\\"" + + >>> print(LineParser.parse('K.5S,Ccpvo=a\\ b value=1')) + K.5S,Ccpvo=a\\ b value=1 + """ + + tokval = lambda t: t.value + joinval = "".join + someToken = lambda type: some(lambda t: t.type == type) + someCharValue = lambda string: \ + reduce(lambda a, b: a + b, + map(lambda char: + some(lambda t: t.value == char) >> tokval, + string)) >> joinval + + char = someToken('Char') >> tokval + space = someToken('Space') >> tokval + comma = someToken('Comma') >> tokval + quote = someToken('Quote') >> tokval + escape = someToken('Escape') >> tokval + equal = someToken('Equal') >> tokval + true_value = (someCharValue("true") | someCharValue("t") | + someCharValue("True") | someCharValue("TRUE") | someCharValue("T")) + false_value = (someCharValue("false") | someCharValue("f") | + someCharValue("False") | someCharValue("FALSE") | someCharValue("F")) + + escape_space = skip(escape) + space >> joinval + escape_comma = skip(escape) + comma >> joinval + escape_equal = skip(escape) + equal >> joinval + escape_quote = skip(escape) + quote >> joinval + escape_escape = skip(escape) + escape >> joinval + + plain_int_text = someToken('Int') >> tokval + plain_int = plain_int_text >> (lambda v: int(v)) + plain_float_text = someToken('Float') >> tokval + plain_float = plain_float_text >> (lambda v: float(v)) + + identifier = many(char | plain_float_text | plain_int_text | + escape_space | escape_comma | escape_equal | + escape_escape | plain_int_text | quote) >> joinval + quoted_text_ = many(escape_quote | space | plain_int_text | + plain_float_text | char | comma | + escape) >> joinval + quoted_text = skip(quote) + quoted_text_ + skip(quote) + unquoted_text = many(escape_space | escape_comma | + escape_equal | escape_escape | + plain_int_text | char | quote) >> joinval + boolean_value = (true_value >> (lambda s: True) + | false_value >> (lambda s: False)) + + kv_value = plain_int | plain_float | quoted_text | boolean_value + kv = (quoted_text | unquoted_text) + skip(equal) + kv_value >> \ + (lambda x: (x[0], x[1])) + + tag = identifier + skip(equal) + identifier >> (lambda x: (x[0], x[1])) + + def setter(obj, propert): + def r(val): + setattr(obj, propert, val) + return (propert, val) + + return r + + tags = many(skip(comma) + tag) >> (lambda x: x) + fields = (kv + many(skip(comma) + kv)) >> \ + (lambda x: [x[0]] + x[1]) + + write = client.Line(None, None, None, None) + toplevel = (identifier >> setter(write, "key")) + \ + maybe(tags >> setter(write, "tags")) + \ + (skip(space) + (fields >> setter(write, "fields"))) + \ + maybe(skip(space) + plain_int >> setter(write, "timestamp")) + \ + skip(finished) >> (lambda x: x) + + result = toplevel.parse(LineParser.tokenize(line)) + # pprint(result) + return write diff --git a/pyinfluxtools/__init__.py b/pyinfluxtools/__init__.py deleted file mode 100644 index 68146ff..0000000 --- a/pyinfluxtools/__init__.py +++ /dev/null @@ -1,293 +0,0 @@ -#!/usr/bin/env python3 -import re -import sys -from functools import reduce - -from funcparserlib.lexer import make_tokenizer -from funcparserlib.parser import ( - some, maybe, many, finished, skip, NoParseError) - - -class WriteRequest(object): - - @staticmethod - def parse(lines): - """ - Parse multiple Write objects separeted by new-line character. - - >>> print(Write.parse("foo b=1")) - foo b=1 - - >>> lines = [] - >>> lines += ['cpu field=123'] - >>> lines += ['cpu,host=serverA,region=us-west field1=1,field2=2'] - >>> lines += ['cpu,host=serverA,region=us-west field1=1,field2=2 1234'] - >>> print("\\n".join(map(str, WriteRequest.parse("\\n".join(lines))))) - cpu field=123 - cpu,host=serverA,region=us-west field1=1,field2=2 - cpu,host=serverA,region=us-west field1=1,field2=2 1234 - """ - writes = map(Write.parse, lines.split("\n")) - return list(writes) - - -class Write(object): - - def __init__(self, key, tags, fields, timestamp=None): - self.key = key - self.tags = tags - self.fields = fields - self.timestamp = timestamp - - if isinstance(self.tags, dict): - self.tags = self.tags.items() - - if isinstance(self.fields, dict): - self.fields = self.fields.items() - - specs = [ - ('Comma', (r',',)), - ('Space', (r' ',)), - ('Equal', (r'=',)), - ('Quote', (r'"',)), - ('Escape', (r'\\',)), - ('Int', (r'[0-9]+(?![0-9\.])',)), - ('Float', (r'-?(\.[0-9]+)|([0-9]+(\.[0-9]*)?)',)), - ('Char', (r'.',)), - ] - - @staticmethod - def tokenize(line): - tokenizer = make_tokenizer(Write.specs) - return list(tokenizer(line)) - - @staticmethod - def parse(line): - """ - Parse a line from the POST request into a Write object. - - >>> line='cpu a=1'; Write.parse(line); print(Write.parse(line)) - - cpu a=1 - - >>> print(Write.parse('yahoo.CHFGBP\\=X.ask,tag=foobar value=10.2')) - yahoo.CHFGBP\=X.ask,tag=foobar value=10.2 - - >>> Write.parse('cpu,host=serverA,region=us-west foo="bar"') - - - >>> print(Write.parse('cpu host="serverA",region="us-west"')) - cpu host="serverA",region="us-west" - - >>> line='cpu\\,01 host="serverA",region="us-west"'; \\ - ... Write.parse(line); print(Write.parse(line)) - - cpu\,01 host="serverA",region="us-west" - - >>> Write.parse('cpu host="server A",region="us west"') - - - >>> line='cpu ho\\=st="server A",region="us west"'; \\ - ... Write.parse(line); print(Write.parse(line)) - - cpu ho\=st="server A",region="us west" - - >>> print(Write.parse('cpu,ho\=st=server\ A field=123')) - cpu,ho\=st=server\ A field=123 - - # error: double name is accepted - >>> print(Write.parse('cpu,foo=bar,foo=bar field=123,field=123')) - cpu,foo=bar,foo=bar field=123,field=123 - - >>> print(Write.parse('cpu field12=12')) - cpu field12=12 - - >>> print(Write.parse('cpu field12=12 123123123')) - cpu field12=12 123123123 - - >>> try: print(Write.parse('cpu field12=12 1231abcdef123')) - ... except NoParseError: pass - - >>> print(Write.parse('cpu,x=3,y=4,z=6 field\ name="HH \\\\\\"World",x="asdf foo"')) - cpu,x=3,y=4,z=6 field\\ name="HH \\"World",x="asdf foo" - - >>> print(Write.parse("cpu,x=3 field\ name=\\"HH \\\\\\"World\\",x=\\"asdf foo\\"")) - cpu,x=3 field\\ name="HH \\"World",x="asdf foo" - - >>> print(Write.parse("cpu foo=\\"bar\\" 12345")) - cpu foo="bar" 12345 - - >>> line='"measurement\ with\ quotes",tag\ key\ with\ spaces=tag\,value\,with"commas" field_key\\\\\\="string field value, only \\\\" need be quoted"'; \\ - ... Write.parse(line); print(Write.parse(line)) - - "measurement\ with\ quotes",tag\ key\ with\ spaces=tag\,value\,with"commas" field_key\\\\="string field value, only \\\" need be quoted" - - >>> Write.parse('disk_free value=442221834240,working\ directories="C:\My Documents\Stuff for examples,C:\My Documents"') - - - >>> Write.parse('disk_free value=442221834240,working\ directories="C:\My Documents\Stuff for examples,C:\My Documents" 123') - - - >>> print(Write.parse('foo,foo=2 "field key with space"="string field"')) - foo,foo=2 field\ key\ with\ space="string field" - - >>> print(Write.parse('foo,foo=2 field_key\\\\\\="string field"')) - foo,foo=2 field_key\\\\="string field" - - >>> print(Write.parse('foo,foo=2 field_key="string\\\\" field"')) - foo,foo=2 field_key="string\\" field" - - >>> line='foo field0="tag",field1=t,field2=true,field3=True,field4=TRUE'; \\ - ... Write.parse(line); print(Write.parse(line)) - - foo field0="tag",field1=True,field2=True,field3=True,field4=True - - >>> line='foo field1=f,field2=false,field3=False,field4=FALSE,field5="fag"'; \\ - ... Write.parse(line); print(Write.parse(line)) - - foo field1=False,field2=False,field3=False,field4=False,field5="fag" - - >>> line='"measurement\ with\ quotes",tag\ key\ with\ spaces=tag\,value\,with"commas" field_key\\\\\\="string field value, only \\\\" need be quoted"'; \\ - ... Write.parse(line); print(Write.parse(line)) - - "measurement\ with\ quotes",tag\ key\ with\ spaces=tag\,value\,with"commas" field_key\\\\="string field value, only \\" need be quoted" - - >>> Write.parse('"measurement\ with\ quotes" foo=1') - - - >>> print(Write.parse('K.5S,Ccpvo="eSLyE" value="7F\\\\\\\\\\\\""')) - K.5S,Ccpvo="eSLyE" value="7F\\\\\\\\\\\"" - - >>> print(Write.parse('K.5S,Ccpvo=a\\ b value=1')) - K.5S,Ccpvo=a\\ b value=1 - - >>> print(Write('test', [('a','b')], [('value','asd\\\\')])) - test,a=b value="asd\\\\" - """ - - tokval = lambda t: t.value - joinval = "".join - someToken = lambda type: some(lambda t: t.type == type) - someCharValue = lambda string: \ - reduce(lambda a, b: a + b, - map(lambda char: - some(lambda t: t.value == char) >> tokval, - string)) >> joinval - - char = someToken('Char') >> tokval - space = someToken('Space') >> tokval - comma = someToken('Comma') >> tokval - quote = someToken('Quote') >> tokval - escape = someToken('Escape') >> tokval - equal = someToken('Equal') >> tokval - true_value = (someCharValue("true") | someCharValue("t") | - someCharValue("True") | someCharValue("TRUE") | someCharValue("T")) - false_value = (someCharValue("false") | someCharValue("f") | - someCharValue("False") | someCharValue("FALSE") | someCharValue("F")) - - escape_space = skip(escape) + space >> joinval - escape_comma = skip(escape) + comma >> joinval - escape_equal = skip(escape) + equal >> joinval - escape_quote = skip(escape) + quote >> joinval - escape_escape = skip(escape) + escape >> joinval - - plain_int_text = someToken('Int') >> tokval - plain_int = plain_int_text >> (lambda v: int(v)) - plain_float_text = someToken('Float') >> tokval - plain_float = plain_float_text >> (lambda v: float(v)) - - identifier = many(char | plain_float_text | plain_int_text | - escape_space | escape_comma | escape_equal | - escape_escape | plain_int_text | quote) >> joinval - quoted_text_ = many(escape_quote | space | plain_int_text | - plain_float_text | char | comma | - escape) >> joinval - quoted_text = skip(quote) + quoted_text_ + skip(quote) - unquoted_text = many(escape_space | escape_comma | - escape_equal | escape_escape | - plain_int_text | char | quote) >> joinval - boolean_value = (true_value >> (lambda s: True) - | false_value >> (lambda s: False)) - - kv_value = plain_int | plain_float | quoted_text | boolean_value - kv = (quoted_text | unquoted_text) + skip(equal) + kv_value >> \ - (lambda x: (x[0], x[1])) - - tag = identifier + skip(equal) + identifier >> (lambda x: (x[0], x[1])) - - def setter(obj, propert): - def r(val): - setattr(obj, propert, val) - return (propert, val) - return r - - tags = many(skip(comma) + tag) >> (lambda x: x) - fields = (kv + many(skip(comma) + kv)) >> \ - (lambda x: [x[0]] + x[1]) - - write = Write(None, None, None, None) - toplevel = (identifier >> setter(write, "key")) + \ - maybe(tags >> setter(write, "tags")) + \ - (skip(space) + (fields >> setter(write, "fields"))) + \ - maybe(skip(space) + plain_int >> setter(write, "timestamp")) + \ - skip(finished) >> (lambda x: x) - - result = toplevel.parse(Write.tokenize(line)) - # pprint(result) - return write - - def __repr__(self): - return "<{} key={} tags={} fields={} timestamp={}>".format( - self.__class__.__name__, self.key, self.tags, self.fields, self.timestamp) - - @staticmethod - def escape_identifier(string): - return re.sub(r'([\\,= ])', '\\\\\\1', string) - - @staticmethod - def escape_tags(taglist): - return ",".join(map(lambda kv: - (Write.escape_identifier(kv[0]) - + "=" + Write.escape_identifier(kv[1])), - taglist)) - - @staticmethod - def escape_value(obj): - if (isinstance(obj, float) - or isinstance(obj, int) - or isinstance(obj, bool)): - return str(obj) - else: - obj = str(obj) - return "\"" + obj.replace("\\", "\\\\").replace("\"", "\\\"") + "\"" - - @staticmethod - def escape_fields(kvlist): - def escape_key(string): - return re.sub(r'(["\\,= ])', '\\\\\\1', string) - - return ",".join( - map(lambda kv: escape_key(kv[0]) + "=" + Write.escape_value(kv[1]), - kvlist)) - - def __str__(self): - result = self.escape_identifier(self.key) - - if self.tags: - result += "," - result += self.escape_tags(self.tags) - - if self.fields: - result += " " - result += self.escape_fields(self.fields) - - if self.timestamp: - result += " " - result += str(self.timestamp) - - return result - - -if __name__ == "__main__": - import doctest - doctest.testmod() diff --git a/setup.py b/setup.py index 7126f2e..eff3f20 100644 --- a/setup.py +++ b/setup.py @@ -1,24 +1,23 @@ -#!/usr/bin/env python - +#!/usr/bin/env python3 from distutils.core import setup -setup(name='pyinfluxtools', - version='0.1', - description='Python classes to work with influxdb', - author='Yves Fischer', - author_email='yvesf+github@xapek.org', - license="MIT", - packages = ['pyinfluxtools'], -# url='https://github.com/', -# scripts=[], - install_requires=['funcparserlib==0.3.6'], - classifiers = [ - "Programming Language :: Python", - "Programming Language :: Python :: 3", - "Development Status :: 3 - Alpha", - "Intended Audience :: Developers", - "Operating System :: OS Independent", - "License :: OSI Approved :: MIT License", - ] -) +version = '0.1' +setup(name='pyinflux', + version=version, + description='Python classes to work with influxdb', + author='Yves Fischer', + author_email='yvesf+git@xapek.org', + license="MIT", + py_modules=['pyinflux'], + # url='https://github.com/', + install_requires=[], + extras_require={'parser': ['funcparserlib==0.3.6']}, + classifiers=[ + "Programming Language :: Python", + "Programming Language :: Python :: 3", + "Development Status :: 3 - Alpha", + "Intended Audience :: Developers", + "Operating System :: OS Independent", + "License :: OSI Approved :: MIT License", + ]) diff --git a/test.py b/test.py new file mode 100755 index 0000000..a87b2e0 --- /dev/null +++ b/test.py @@ -0,0 +1,7 @@ +#!/usr/bin/env python3 +from doctest import testmod +from pyinflux import client, parser + +if __name__ == '__main__': + testmod(m=client) + testmod(m=parser) -- cgit v1.2.1