summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYves Fischer <yvesf-git@xapek.org>2016-07-23 18:42:06 +0200
committerYves Fischer <yvesf-git@xapek.org>2016-07-23 18:42:40 +0200
commit76289024543fbfb6df3d07f61f15eabc623dd246 (patch)
tree5f7e7a86e42f26e8afe3b4437f77e9e4100280ee
parentf019accc6f20ef97e95b9b6a3f776aefb5ebd62b (diff)
downloadpyinflux-76289024543fbfb6df3d07f61f15eabc623dd246.tar.gz
pyinflux-76289024543fbfb6df3d07f61f15eabc623dd246.zip
restructure code
Rename the module to pyinflux and split it into a client and a parser package. The parser additionally depends on funcparserlib. Also contains 3 examples of strange InfluxDB behavior.
-rw-r--r--.gitignore2
-rw-r--r--LICENSE20
-rw-r--r--MANIFEST.in1
-rw-r--r--README.md3
-rwxr-xr-xanotherBug.py37
-rwxr-xr-xfuzzer.py132
-rwxr-xr-xfuzzer1.py74
-rwxr-xr-xfuzzer2.py132
-rw-r--r--pyinflux/__init__.py1
-rw-r--r--pyinflux/client/__init__.py149
-rw-r--r--pyinflux/parser/__init__.py222
-rw-r--r--pyinfluxtools/__init__.py293
-rw-r--r--setup.py41
-rwxr-xr-xtest.py7
14 files changed, 668 insertions, 446 deletions
diff --git a/.gitignore b/.gitignore
index 72723e5..ffd574f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1 +1,3 @@
*pyc
+/MANIFEST
+/.idea \ No newline at end of file
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..ff8652d
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,20 @@
+The MIT License (MIT)
+
+Copyright (c) 2016 Yves Fischer <yvesf+git@xapek.org>
+
+Permission is hereby granted, free of charge, to any person obtaining a copy of
+this software and associated documentation files (the "Software"), to deal in
+the Software without restriction, including without limitation the rights to
+use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+the Software, and to permit persons to whom the Software is furnished to do so,
+subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. \ No newline at end of file
diff --git a/MANIFEST.in b/MANIFEST.in
new file mode 100644
index 0000000..45d276d
--- /dev/null
+++ b/MANIFEST.in
@@ -0,0 +1 @@
+include pyinflux/* \ No newline at end of file
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..e931f93
--- /dev/null
+++ b/README.md
@@ -0,0 +1,3 @@
+# pyinflux tools
+
+the files `fuzzer1.py` and `fuzzer2.py` contain usage examples. \ No newline at end of file
diff --git a/anotherBug.py b/anotherBug.py
new file mode 100755
index 0000000..ce52cf4
--- /dev/null
+++ b/anotherBug.py
@@ -0,0 +1,37 @@
+#!/usr/bin/env python3
+"""
+another bug?
+in 2016/07/23 18:37:37 InfluxDB starting, version 0.13.0, branch 0.13, commit e57fb88a051ee40fd9277094345fbd47bb4783ce
+the escaping of the 'key' or 'name' is strange
+"""
+from pyinflux.client import Influx, Line
+
+influxdb = Influx('localhost')
+
+print(influxdb.execute('DROP DATABASE test').as_text())
+print(influxdb.execute('CREATE DATABASE test').as_text())
+
+
def write(line):
    """Echo *line* and send it to the ``test`` database.

    Prints the debug ``repr`` first and the rendered line-protocol text
    second, then returns whatever ``influxdb.write_db`` returns.
    """
    for render in (repr, str):
        print(render(line))
    return influxdb.write_db('test', [line])
+
+
+print(write(Line('asd\\', {'tag1': ''}, {'field1': ''})))
+print(write(Line('asd\\\\', {'tag1': ''}, {'field1': ''})))
+
+
def query(q):
    """Echo the query text *q* and run it against the ``test`` database.

    Returns the decoded JSON response. If anything goes wrong, the
    exception's message is returned as a string instead of being raised.
    """
    print(q)
    try:
        response = influxdb.query_db('test', q)
        return response.as_json()
    except Exception as error:
        return str(error)
+
+# at least some of these queries must return something...
+print(query('SELECT * FROM asd'))
+print(query('SELECT * FROM asd\\'))
+print(query('SELECT * FROM asd\\\\'))
+print(query('SELECT * FROM "asd"'))
+print(query('SELECT * FROM "asd\\\\"'))
diff --git a/fuzzer.py b/fuzzer.py
deleted file mode 100755
index ffdb0e7..0000000
--- a/fuzzer.py
+++ /dev/null
@@ -1,132 +0,0 @@
-#!/usr/bin/env python3
-import functools
-import random
-from multiprocessing.pool import ThreadPool as Pool
-
-import requests
-from pyinfluxtools import *
-from influxdb import InfluxDBClient
-
-settings = {
- 'host': 'localhost',
- 'port': 8086,
- 'username': 'root',
- 'password': 'root',
- 'database': 'test',
-}
-
-write_url = ("http://{host}:{port}/write?db={database}" +
- "&username={username}&password={password}"
- ).format(**settings)
-
-client = InfluxDBClient(settings['host'], settings['port'],
- settings['username'], settings['password'],
- settings['database'])
-
-
-class Generator:
- all = lambda x: bytes(chr(random.randint(0x00, 0xff)), 'latin-1')
- _printables = list(map(chr, list(range(0x20, 0x7E))))
- printable = lambda x: random.choice(Generator._printables)
- numericText = lambda x: chr(random.randint(ord("0"), ord("9")))
- text = lambda x: chr(random.choice(
- list(range(ord("a"), ord("z"))) +
- list(range(ord("A"), ord("Z")))))
-
-
-class Filter:
- regex = lambda regex: re.compile(regex).match
- pass_all = lambda _: True
-
-
-def generate(length_min, length_max, filter, generator):
- length = random.randint(length_min, length_max)
- while True:
- text = functools.reduce(
- lambda a, b: a + b, map(generator, range(length)))
- if filter(text):
- return text
-
-
-def run(*a):
- while True:
- test()
-
-
-def test():
- # influxdb doesn't allow measurements starting with {
- noBrace = Filter.regex("^[^\\{]")
- noHash = Filter.regex("^[^#]") # hash sign starts a comment
- # TODO: only allow valid escape sequences
- noEscape = Filter.regex("^[^\\\\]+$")
-
- def generateKey(min=3,max=6):
- return generate(min, max, lambda text:
- noBrace(text) and noHash(text) and noEscape(text),
- Generator.printable)
-
- def generateTags(min, max):
- def generateTagPairs():
- for i in range(random.randint(min, max)):
- yield (generateKey(), generateKey())
-
- return dict(tuple(generateTagPairs()))
-
- def generateFields(min, max):
- def generateFieldPairs():
- for i in range(random.randint(min,max)):
- yield (generateKey(), generate(3, 6, lambda _: True, Generator.printable))
- return dict(tuple(generateFieldPairs()))
-
- w = Write(generateKey(8,12), # key
- generateTags(1, 4), # tags
- generateFields(1, 4)) # fields
- line = str(w)
- print(line)
- try:
- r = requests.post(write_url, line)
- INFO = ("Data:\nline={}\nwrite={}\nr.status_code={}\n" +
- "r.content={}").format(line, repr(w), r.status_code, r.text)
- assert r.status_code == 204, INFO
- except Exception as e:
- print(e)
- assert False, "Data\nline={}\nwrite={}".format(str(w), repr(w))
-
- measurement = Write.escape_value(w.key)
- query = """\
-SELECT *
-FROM {measurement}
-WHERE time >= now() - 2s""".format(**locals())
- result = []
- try:
- result = list(client.query(query))
- except Exception as e:
- print(e)
- wrepr=repr(w)
- DEBUGINFO = ("DEBUG:\nquery={query}\nwrite={wrepr}\n" +
- "result={result}\nline={line}").format(**locals())
- assert len(result) == 1, DEBUGINFO
- assert len(result[0][0]) == len(w.tags) + len(w.fields) + 1, DEBUGINFO # + time
-
- # assert fields
- for (key, value) in w.fields:
- assert key in result[0][0], DEBUGINFO
- assert result[0][0][key] == value, DEBUGINFO + \
- "\n"+ key +"=" + result[0][0][key] + "=" + value
-
- # assert tags
- for (tagKey, tagValue) in w.tags:
- assert tagKey in result[0][0], DEBUGINFO
- assert result[0][0][tagKey] == tagValue, DEBUGINFO
-
-
-N_PROC = 4
-with Pool(processes=N_PROC) as pool:
- try:
- for res in pool.imap_unordered(run, [None] * N_PROC):
- pool.terminate()
- print("Exit")
- raise SystemExit(0)
- except Exception as e:
- pool.terminate()
- raise e
diff --git a/fuzzer1.py b/fuzzer1.py
new file mode 100755
index 0000000..57b5de6
--- /dev/null
+++ b/fuzzer1.py
@@ -0,0 +1,74 @@
+#!/usr/bin/env python3
+"""
+in 2016/07/23 18:37:37 InfluxDB starting, version 0.13.0, branch 0.13, commit e57fb88a051ee40fd9277094345fbd47bb4783ce
+this fuzzer can cause a concurrent read write crash
+"""
+import itertools
+from multiprocessing.pool import ThreadPool as Pool
+from pyinflux.client import InfluxDB, Line
+
+influxdb = InfluxDB('test', 'localhost')
+
+value_generator = itertools.count()
+
+
def test(number):
    """Endlessly write one point per fresh series and verify it reads back.

    Values are drawn from the module-level ``value_generator`` counter. For
    each value a point is written to its own series ``series<value>`` and
    queried back immediately; the response must contain exactly that one row
    with the expected tag and field.

    :param number: worker number, used only in the progress output
    :raises AssertionError: if the response does not match what was written
    """
    for value in value_generator:
        # Parenthesised on purpose: ``value + 1 % 500`` parses as
        # ``value + (1 % 500)`` and is never zero for these values.
        if (value + 1) % 500 == 0:
            print("thread {} at {}".format(number, value))
        expected = "value{value}".format(value=value)
        series_name = 'series' + str(value)
        line = Line(series_name,
                    {'tag': 'tag' + expected},
                    {'field': 'field' + expected})

        try:
            influxdb.write([line])
        except Exception:
            # Show the offending line before the error propagates.
            print(line)
            raise

        query = """\
 SELECT *
 FROM "series{value}"
 WHERE time >= now() - 2s""".format(value=value)
        try:
            results = influxdb.query(query).as_json()['results']
        except Exception:
            print("""\
 Failure. With the following INSERT:
 {line}""".format(line=line))
            raise
        try:
            assert len(results) == 1
            series = results[0]['series']
            assert len(series) == 1
            series_result = series[0]
            assert series_result['name'] == series_name
            values = series_result['values']
            assert len(values) == 1
            assert len(values[0]) == 3
            columns = series_result['columns']
            assert columns == ['time', 'field', 'tag']

            assert values[0][columns.index('field')] == 'field' + expected
            assert values[0][columns.index('tag')] == 'tag' + expected
        except Exception:
            print("""\
 Failure. With the following INSERT:
 {line}

 Got this result:
 {results}""".format(line=line, results=results))
            raise
+
+
+print(influxdb.query('DROP DATABASE test').as_text())
+print(influxdb.query('CREATE DATABASE test').as_text())
+
+N_PROC = 2
+with Pool(processes=N_PROC) as pool:
+ for res in pool.imap_unordered(test, range(1, N_PROC + 1)):
+ # they are supposed to run endlessly; if one returns, there was a failure
+ pool.terminate()
+print('Done')
diff --git a/fuzzer2.py b/fuzzer2.py
new file mode 100755
index 0000000..14932a8
--- /dev/null
+++ b/fuzzer2.py
@@ -0,0 +1,132 @@
+#!/usr/bin/env python3
+"""
+in 2016/07/23 18:37:37 InfluxDB starting, version 0.13.0, branch 0.13, commit e57fb88a051ee40fd9277094345fbd47bb4783ce
+this fuzzer can generate write line-statements that are parsed differently than how they are meant
+"""
+import re
+import functools
+import random
+from multiprocessing.pool import ThreadPool as Pool
+
+from pyinflux.client import Line, InfluxDB
+
+influxdb = InfluxDB('test', 'localhost')
+
+
class Generator:
    """Single-character generators for the fuzzer.

    Every generator takes one positional argument and ignores it, so it can
    be passed directly as the function in ``map(generator, range(length))``.
    """

    # Printable ASCII from 0x20 (space) through 0x7E ('~'), both inclusive.
    # range() excludes its stop value, hence 0x7F.
    _printables = [chr(code) for code in range(0x20, 0x7F)]
    # a-z and A-Z; the "+ 1" keeps 'z' and 'Z' in the alphabet.
    _letters = ([chr(code) for code in range(ord("a"), ord("z") + 1)] +
                [chr(code) for code in range(ord("A"), ord("Z") + 1)])
    # Letters plus the three characters that need escaping in line protocol.
    _basic = ["\"", " ", "\\"] + _letters

    @staticmethod
    def all(x):
        """Return one random byte (0x00-0xff) as a ``bytes`` object."""
        return bytes(chr(random.randint(0x00, 0xff)), 'latin-1')

    @staticmethod
    def printable(x):
        """Return one random printable ASCII character."""
        return random.choice(Generator._printables)

    @staticmethod
    def numericText(x):
        """Return one random decimal digit."""
        return chr(random.randint(ord("0"), ord("9")))

    @staticmethod
    def text(x):
        """Return one random ASCII letter."""
        return random.choice(Generator._letters)

    @staticmethod
    def basicText(x):
        """Return one random ASCII letter, double quote, space or backslash."""
        return random.choice(Generator._basic)
+
+
class Filter:
    """Factories for the text predicates accepted by ``generate``."""

    @staticmethod
    def regex(regex):
        """Return a predicate that matches *regex* at the start of a text.

        The predicate is the compiled pattern's ``match`` method, so it
        returns a match object (truthy) or ``None``.
        """
        return re.compile(regex).match

    @staticmethod
    def pass_all(_):
        """Accept every text."""
        return True
+
+
def generate(length_min, length_max, filter, generator):
    """Build a random text that is accepted by *filter*.

    The length is drawn once, uniformly from ``length_min`` to
    ``length_max`` inclusive. Candidates of that length are then generated
    until one passes the filter, so a filter that can never pass makes this
    loop forever.

    :param length_min: smallest allowed length
    :param length_max: largest allowed length
    :param filter: predicate called with each candidate; a truthy result
        accepts it
    :param generator: called once per position with the position index;
        returns one ``str`` (or ``bytes``) piece
    :return: the first accepted candidate; ``""`` when the drawn length is 0
        and the filter accepts the empty string
    """
    length = random.randint(length_min, length_max)
    while True:
        pieces = [generator(index) for index in range(length)]
        # pieces[0][:0] is an empty value of the pieces' own type, so str and
        # bytes generators both work. With no pieces at all (length 0) fall
        # back to an empty str instead of failing like a seedless reduce().
        text = pieces[0][:0].join(pieces) if pieces else ""
        if filter(text):
            return text
+
+
def run(*a):
    """Worker entry point: call ``test()`` over and over.

    The positional arguments (the pool's work item) are accepted and
    ignored. The loop only ends when ``test()`` raises.
    """
    keep_going = True
    while keep_going:
        test()
+
+
+usedKeys = set()
+
+
def test():
    """Write one randomly generated point and check how InfluxDB stored it.

    A measurement with 1-4 random tags and 1-4 random string fields is
    written, then selected again by its measurement name. Every returned
    series must carry that name and have one column per tag and field plus
    the ``time`` column.

    :raises AssertionError: if the stored data differs from what was written
    """
    # The old brace/hash start-of-key filters are gone: they were defined
    # here but never applied to the generated keys.
    # TODO: only allow valid escape sequences
    noEscape = Filter.regex("^[^\\\\]+$")

    def isUnused(term):
        return term not in usedKeys

    def generateKey(min_length=3, max_length=6):
        return generate(min_length, max_length,
                        lambda text: isUnused(text)
                        and noEscape(text),  # for now, see anotherBug.py
                        Generator.basicText)

    def generateTags(count_min, count_max):
        def generateTagPairs():
            for _ in range(random.randint(count_min, count_max)):
                yield (generateKey(), generateKey())

        return dict(generateTagPairs())

    def generateFields(count_min, count_max):
        def generateFieldPairs():
            for _ in range(random.randint(count_min, count_max)):
                yield (generateKey(),
                       generate(3, 6, Filter.pass_all, Generator.basicText))

        return dict(generateFieldPairs())

    w = Line(generateKey(8, 12),  # key
             generateTags(1, 4),  # tags
             generateFields(1, 4))  # fields
    usedKeys.add(w.key)
    try:
        influxdb.write([w])
    except Exception:
        print("Data\nline={}\nwrite={}".format(str(w), repr(w)))
        raise

    # escape_value() double-quotes the key so it can be used as an
    # identifier in the query.
    measurement = Line.escape_value(w.key)
    query = """\
SELECT *
FROM {measurement}
WHERE time >= now() - 2s""".format(measurement=measurement)
    try:
        query_response = influxdb.query(query).as_json()
    except Exception:
        print("Data\nline={}\nwrite={}".format(str(w), repr(w)))
        raise
    DEBUGINFO = ("DEBUG:\nquery={query}\nwrite={w}\n" +
                 "query_response={query_response}").format(
        query=query, w=w, query_response=query_response)
    try:
        assert len(query_response) == 1, DEBUGINFO

        results = query_response['results']
        assert len(results) == 1

        result = results[0]
        series = result['series']
        expected_columns = len(w.tags) + len(w.fields) + 1  # + time
        for serie in series:
            assert serie['name'] == w.key
            # Report the same number the assertion compares against.
            assert len(serie['columns']) == expected_columns, \
                "Length of colums: {} != {}".format(len(serie['columns']), expected_columns)
    except Exception:
        print("Data\nline={}\nwrite={}\nquery={}\nquery_response={}".format(str(w), repr(w), query, query_response))
        raise
+
+
+print(influxdb.query('DROP DATABASE test').as_text())
+print(influxdb.query('CREATE DATABASE test').as_text())
+
+N_PROC = 2
+with Pool(processes=N_PROC) as pool:
+ try:
+ for res in pool.imap_unordered(run, [None] * N_PROC):
+ pool.terminate()
+ print("Exit")
+ raise SystemExit(0)
+ except:
+ pool.terminate()
+ raise
diff --git a/pyinflux/__init__.py b/pyinflux/__init__.py
new file mode 100644
index 0000000..bc63beb
--- /dev/null
+++ b/pyinflux/__init__.py
@@ -0,0 +1 @@
+# empty \ No newline at end of file
diff --git a/pyinflux/client/__init__.py b/pyinflux/client/__init__.py
new file mode 100644
index 0000000..b485861
--- /dev/null
+++ b/pyinflux/client/__init__.py
@@ -0,0 +1,149 @@
+import typing
+import io
+import re
+from urllib.request import urlopen
+from urllib.parse import quote as urlquote, urlencode
+import json
+import codecs
+
+
class Line(object):
    """One point in InfluxDB line-protocol form.

    ``str(line)`` renders ``key[,tags] fields [timestamp]`` with the
    escaping implemented by the static helpers below.

    :param key: measurement name
    :param tags: dict or iterable of ``(name, value)`` string pairs
    :param fields: dict or iterable of ``(name, value)`` pairs
    :param timestamp: optional timestamp, rendered with ``str()``
    """

    def __init__(self, key, tags, fields, timestamp=None):
        self.key = key
        self.tags = tags
        self.fields = fields
        self.timestamp = timestamp

        # Dicts are stored as their items view so that the rest of the class
        # only ever deals with iterables of pairs.
        if isinstance(self.tags, dict):
            self.tags = self.tags.items()

        if isinstance(self.fields, dict):
            self.fields = self.fields.items()

    @staticmethod
    def escape_identifier(string):
        """Backslash-escape backslash, comma, equal sign and space."""
        return re.sub(r'([\\,= ])', '\\\\\\1', string)

    @staticmethod
    def escape_tags(taglist):
        """Render ``(name, value)`` tag pairs as ``name=value,name=value``."""
        return ",".join(
            Line.escape_identifier(name) + "=" + Line.escape_identifier(value)
            for name, value in taglist)

    @staticmethod
    def escape_value(obj):
        """Render a field value.

        Numbers and booleans are rendered with ``str()``. Anything else is
        converted to a string and double-quoted, with backslashes and double
        quotes escaped.
        """
        # NOTE(review): integers are emitted without a type suffix; confirm
        # against the line protocol how the server interprets them.
        if isinstance(obj, (float, int, bool)):
            return str(obj)
        obj = str(obj)
        return "\"" + obj.replace("\\", "\\\\").replace("\"", "\\\"") + "\""

    @staticmethod
    def escape_fields(kvlist):
        """Render ``(name, value)`` field pairs as ``name=value,name=value``."""
        def escape_key(string):
            # Same as escape_identifier, but double quotes are escaped too.
            return re.sub(r'(["\\,= ])', '\\\\\\1', string)

        return ",".join(
            escape_key(name) + "=" + Line.escape_value(value)
            for name, value in kvlist)

    def __repr__(self):
        """
        >>> print(repr(Line('test', [('a','b')], [('value','asd\\\\')])))
        <Line key=test tags=[('a', 'b')] fields=[('value', 'asd\\\\')] timestamp=None>
        """
        return "<{} key={} tags={} fields={} timestamp={}>".format(
            self.__class__.__name__, self.key, self.tags, self.fields, self.timestamp)

    def __str__(self):
        """
        >>> print(Line('test', [('a','b')], [('value','asd\\\\')]))
        test,a=b value="asd\\\\"
        """
        result = self.escape_identifier(self.key)

        if self.tags:
            result += ","
            result += self.escape_tags(self.tags)

        if self.fields:
            result += " "
            result += self.escape_fields(self.fields)

        # Compare against None: a timestamp of 0 is falsy but must still be
        # written out.
        if self.timestamp is not None:
            result += " "
            result += str(self.timestamp)

        return result
+
+
class QueryResultOption:
    """Lazily executed query whose response can be read as text or JSON.

    Nothing is executed until ``as_text()`` or ``as_json()`` is first
    called. The response is then read once, decoded as UTF-8 and cached, so
    asking for both representations does not run the query a second time.

    :param exec_func: zero-argument callable that performs the request and
        returns the binary response stream
    """

    CODEC = codecs.getreader('utf-8')

    def __init__(self, exec_func: typing.Callable[[], io.IOBase]):
        self.exec_func = exec_func
        self._json = None
        self._text = None

    def as_json(self):
        """Return the response parsed as JSON.

        :raises ValueError: if the response is not valid JSON
        """
        if self._json is None:
            # Parse the cached text instead of issuing the request again.
            self._json = json.loads(self.as_text())
        return self._json

    def as_text(self):
        """Return the response body decoded as UTF-8 text."""
        if self._text is None:
            # The stream reader is a context manager, so the underlying
            # stream is closed even if reading or decoding fails.
            with self.CODEC(self.exec_func()) as fh:
                self._text = fh.read()
        return self._text
+
+
class Influx:
    """Minimal HTTP client for the InfluxDB ``/write`` and ``/query`` endpoints."""

    def __init__(self, host: str, port: int = 8086, username: str = None, password: str = None):
        """
        :param host: server host name
        :param port: server HTTP port
        :param username: username and password:
        :param password: if set both must be set
        """
        base = "http://{host}:{port}".format(host=host, port=port)
        self._write_url = base + "/write?"
        self._query_url_get = base + "/query?"
        self._query_url_post = base + "/query"
        if username and password:
            # urlencode() supplies the '=' separators and percent-encodes
            # characters that would otherwise break the query string.
            credentials = urlencode([('username', username),
                                     ('password', password)])
            self._write_url += credentials + '&'
            self._query_url_get += credentials + '&'
            self._query_url_post += '?' + credentials

    def write_db(self, db: str, lines: "typing.Iterable[Line]"):
        """Send *lines* to database *db* and return the response body as text.

        Each element is rendered with ``str()``; the results are joined with
        newlines and posted as UTF-8.
        """
        url = self._write_url + "db=" + urlquote(db)
        request_data = "\n".join(map(str, lines)).encode('utf-8')
        with urlopen(url, request_data) as fh:
            response = fh.read()
        return response.decode('utf-8')

    def query_db(self, db: str, query: str) -> "QueryResultOption":
        """Prepare a GET query against database *db*.

        The request is only sent when the returned object is read.
        """
        def get_fh() -> io.IOBase:
            url = self._query_url_get + 'db=' + urlquote(db) + '&q=' + urlquote(query)
            return urlopen(url)

        return QueryResultOption(get_fh)

    def execute(self, query: str) -> "QueryResultOption":
        """Prepare a POST of *query* that is not bound to a database.

        The request is only sent when the returned object is read.
        """
        def get_fh() -> io.IOBase:
            return urlopen(self._query_url_post, urlencode({'q': query}).encode('utf-8'))

        return QueryResultOption(get_fh)
+
+
class InfluxDB(Influx):
    """An ``Influx`` client bound to a single database.

    ``write`` and ``query`` forward to ``write_db`` and ``query_db`` with
    the database name given at construction time.
    """

    def __init__(self, db: str, host: str, port: int = 8086, username: str = None, password: str = None):
        self._db = db
        super().__init__(host, port, username, password)

    def write(self, lines: [Line]):
        """Write *lines* to the bound database."""
        database = self._db
        return self.write_db(database, lines)

    def query(self, query: str):
        """Prepare *query* against the bound database."""
        database = self._db
        return self.query_db(database, query)
diff --git a/pyinflux/parser/__init__.py b/pyinflux/parser/__init__.py
new file mode 100644
index 0000000..2de6c0e
--- /dev/null
+++ b/pyinflux/parser/__init__.py
@@ -0,0 +1,222 @@
+from functools import reduce
+
+try:
+ from funcparserlib.lexer import make_tokenizer
+ from funcparserlib.parser import (
+ some, maybe, many, finished, skip, NoParseError)
+except ImportError as e:
+ print("Missing funcparserlib library. You need to install the 'parser' extra dependency set.")
+ raise e
+
+from pyinflux import client
+
+
def parse_lines(lines):
    """
    Parse multiple Line objects separated by the new-line character.

    Blank lines, such as the empty remainder after a trailing new-line,
    are skipped instead of being handed to the parser.

    >>> print(parse_lines("foo b=1\\n")[0])
    foo b=1

    >>> lines = []
    >>> lines += ['cpu field=123']
    >>> lines += ['cpu,host=serverA,region=us-west field1=1,field2=2']
    >>> lines += ['cpu,host=serverA,region=us-west field1=1,field2=2 1234']
    >>> print("\\n".join(map(str, parse_lines("\\n".join(lines)))))
    cpu field=123
    cpu,host=serverA,region=us-west field1=1,field2=2
    cpu,host=serverA,region=us-west field1=1,field2=2 1234
    """
    # Only the emptiness test strips whitespace; the text passed to the
    # parser is left untouched.
    return [LineParser.parse(line)
            for line in lines.split("\n")
            if line.strip()]
+
+
class LineParser(object):
    """Tokenizer and parser that turn line-protocol text into ``client.Line``."""

    # Token specifications for funcparserlib's make_tokenizer, in the order
    # given here.
    # NOTE(review): in the Float pattern the leading "-?" belongs to the
    # first alternative only (".5" style numbers) - confirm that is intended.
    specs = [
        ('Comma', (r',',)),
        ('Space', (r' ',)),
        ('Equal', (r'=',)),
        ('Quote', (r'"',)),
        ('Escape', (r'\\',)),
        ('Int', (r'[0-9]+(?![0-9\.])',)),
        ('Float', (r'-?(\.[0-9]+)|([0-9]+(\.[0-9]*)?)',)),
        ('Char', (r'.',)),
    ]

    @classmethod
    def tokenize(cls, line: str):
        """Return the list of tokens for *line* according to ``specs``."""
        tokenizer = make_tokenizer(cls.specs)
        return list(tokenizer(line))

    @staticmethod
    def parse(line):
        """
        Parse a line from the POST request into a Line object.

        >>> line='cpu a=1'; LineParser.parse(line); print(LineParser.parse(line))
        <Line key=cpu tags=[] fields=[('a', 1)] timestamp=None>
        cpu a=1

        >>> print(LineParser.parse('yahoo.CHFGBP\\=X.ask,tag=foobar value=10.2'))
        yahoo.CHFGBP\=X.ask,tag=foobar value=10.2

        >>> LineParser.parse('cpu,host=serverA,region=us-west foo="bar"')
        <Line key=cpu tags=[('host', 'serverA'), ('region', 'us-west')] fields=[('foo', 'bar')] timestamp=None>

        >>> print(LineParser.parse('cpu host="serverA",region="us-west"'))
        cpu host="serverA",region="us-west"

        >>> line='cpu\\,01 host="serverA",region="us-west"'; \\
        ... LineParser.parse(line); print(LineParser.parse(line))
        <Line key=cpu,01 tags=[] fields=[('host', 'serverA'), ('region', 'us-west')] timestamp=None>
        cpu\,01 host="serverA",region="us-west"

        >>> LineParser.parse('cpu host="server A",region="us west"')
        <Line key=cpu tags=[] fields=[('host', 'server A'), ('region', 'us west')] timestamp=None>

        >>> line='cpu ho\\=st="server A",region="us west"'; \\
        ... LineParser.parse(line); print(LineParser.parse(line))
        <Line key=cpu tags=[] fields=[('ho=st', 'server A'), ('region', 'us west')] timestamp=None>
        cpu ho\=st="server A",region="us west"

        >>> print(LineParser.parse('cpu,ho\=st=server\ A field=123'))
        cpu,ho\=st=server\ A field=123

        # error: double name is accepted
        >>> print(LineParser.parse('cpu,foo=bar,foo=bar field=123,field=123'))
        cpu,foo=bar,foo=bar field=123,field=123

        >>> print(LineParser.parse('cpu field12=12'))
        cpu field12=12

        >>> print(LineParser.parse('cpu field12=12 123123123'))
        cpu field12=12 123123123

        >>> try: print(LineParser.parse('cpu field12=12 1231abcdef123'))
        ... except NoParseError: pass

        >>> print(LineParser.parse('cpu,x=3,y=4,z=6 field\ name="HH \\\\\\"World",x="asdf foo"'))
        cpu,x=3,y=4,z=6 field\\ name="HH \\"World",x="asdf foo"

        >>> print(LineParser.parse("cpu,x=3 field\ name=\\"HH \\\\\\"World\\",x=\\"asdf foo\\""))
        cpu,x=3 field\\ name="HH \\"World",x="asdf foo"

        >>> print(LineParser.parse("cpu foo=\\"bar\\" 12345"))
        cpu foo="bar" 12345

        >>> line='"measurement\ with\ quotes",tag\ key\ with\ spaces=tag\,value\,with"commas" field_key\\\\\\="string field value, only \\\\" need be quoted"'; \\
        ... LineParser.parse(line); print(LineParser.parse(line))
        <Line key="measurement with quotes" tags=[('tag key with spaces', 'tag,value,with"commas"')] fields=[('field_key\\\\', 'string field value, only " need be quoted')] timestamp=None>
        "measurement\ with\ quotes",tag\ key\ with\ spaces=tag\,value\,with"commas" field_key\\\\="string field value, only \\\" need be quoted"

        >>> LineParser.parse('disk_free value=442221834240,working\ directories="C:\My Documents\Stuff for examples,C:\My Documents"')
        <Line key=disk_free tags=[] fields=[('value', 442221834240), ('working directories', 'C:\\\\My Documents\\\\Stuff for examples,C:\\\\My Documents')] timestamp=None>

        >>> LineParser.parse('disk_free value=442221834240,working\ directories="C:\My Documents\Stuff for examples,C:\My Documents" 123')
        <Line key=disk_free tags=[] fields=[('value', 442221834240), ('working directories', 'C:\\\\My Documents\\\\Stuff for examples,C:\\\\My Documents')] timestamp=123>

        >>> print(LineParser.parse('foo,foo=2 "field key with space"="string field"'))
        foo,foo=2 field\ key\ with\ space="string field"

        >>> print(LineParser.parse('foo,foo=2 field_key\\\\\\="string field"'))
        foo,foo=2 field_key\\\\="string field"

        >>> print(LineParser.parse('foo,foo=2 field_key="string\\\\" field"'))
        foo,foo=2 field_key="string\\" field"

        >>> line='foo field0="tag",field1=t,field2=true,field3=True,field4=TRUE'; \\
        ... LineParser.parse(line); print(LineParser.parse(line))
        <Line key=foo tags=[] fields=[('field0', 'tag'), ('field1', True), ('field2', True), ('field3', True), ('field4', True)] timestamp=None>
        foo field0="tag",field1=True,field2=True,field3=True,field4=True

        >>> line='foo field1=f,field2=false,field3=False,field4=FALSE,field5="fag"'; \\
        ... LineParser.parse(line); print(LineParser.parse(line))
        <Line key=foo tags=[] fields=[('field1', False), ('field2', False), ('field3', False), ('field4', False), ('field5', 'fag')] timestamp=None>
        foo field1=False,field2=False,field3=False,field4=False,field5="fag"

        >>> line='"measurement\ with\ quotes",tag\ key\ with\ spaces=tag\,value\,with"commas" field_key\\\\\\="string field value, only \\\\" need be quoted"'; \\
        ... LineParser.parse(line); print(LineParser.parse(line))
        <Line key="measurement with quotes" tags=[('tag key with spaces', 'tag,value,with"commas"')] fields=[('field_key\\\\', 'string field value, only " need be quoted')] timestamp=None>
        "measurement\ with\ quotes",tag\ key\ with\ spaces=tag\,value\,with"commas" field_key\\\\="string field value, only \\" need be quoted"

        >>> LineParser.parse('"measurement\ with\ quotes" foo=1')
        <Line key="measurement with quotes" tags=[] fields=[('foo', 1)] timestamp=None>

        >>> print(LineParser.parse('K.5S,Ccpvo="eSLyE" value="7F\\\\\\\\\\\\""'))
        K.5S,Ccpvo="eSLyE" value="7F\\\\\\\\\\\""

        >>> print(LineParser.parse('K.5S,Ccpvo=a\\ b value=1'))
        K.5S,Ccpvo=a\\ b value=1
        """

        # --- small helpers for building token parsers -------------------
        tokval = lambda t: t.value
        joinval = "".join
        someToken = lambda type: some(lambda t: t.type == type)
        # Parser for a literal word: one parser per character, matched on
        # the token value, chained with "+" and joined back into a string.
        someCharValue = lambda string: \
            reduce(lambda a, b: a + b,
                   map(lambda char:
                       some(lambda t: t.value == char) >> tokval,
                       string)) >> joinval

        # --- single tokens ----------------------------------------------
        char = someToken('Char') >> tokval
        space = someToken('Space') >> tokval
        comma = someToken('Comma') >> tokval
        quote = someToken('Quote') >> tokval
        escape = someToken('Escape') >> tokval
        equal = someToken('Equal') >> tokval
        true_value = (someCharValue("true") | someCharValue("t") |
                      someCharValue("True") | someCharValue("TRUE") | someCharValue("T"))
        false_value = (someCharValue("false") | someCharValue("f") |
                       someCharValue("False") | someCharValue("FALSE") | someCharValue("F"))

        # --- escape sequences: the backslash is dropped, the escaped
        # character is kept --------------------------------------------
        escape_space = skip(escape) + space >> joinval
        escape_comma = skip(escape) + comma >> joinval
        escape_equal = skip(escape) + equal >> joinval
        escape_quote = skip(escape) + quote >> joinval
        escape_escape = skip(escape) + escape >> joinval

        # --- numbers ----------------------------------------------------
        plain_int_text = someToken('Int') >> tokval
        plain_int = plain_int_text >> (lambda v: int(v))
        plain_float_text = someToken('Float') >> tokval
        plain_float = plain_float_text >> (lambda v: float(v))

        # --- texts ------------------------------------------------------
        # (the original listed plain_int_text twice here; the second,
        # unreachable occurrence is dropped)
        identifier = many(char | plain_float_text | plain_int_text |
                          escape_space | escape_comma | escape_equal |
                          escape_escape | quote) >> joinval
        quoted_text_ = many(escape_quote | space | plain_int_text |
                            plain_float_text | char | comma |
                            escape) >> joinval
        quoted_text = skip(quote) + quoted_text_ + skip(quote)
        unquoted_text = many(escape_space | escape_comma |
                             escape_equal | escape_escape |
                             plain_int_text | char | quote) >> joinval
        boolean_value = (true_value >> (lambda s: True)
                         | false_value >> (lambda s: False))

        # --- key/value pairs --------------------------------------------
        kv_value = plain_int | plain_float | quoted_text | boolean_value
        kv = (quoted_text | unquoted_text) + skip(equal) + kv_value >> \
             (lambda x: (x[0], x[1]))

        tag = identifier + skip(equal) + identifier >> (lambda x: (x[0], x[1]))

        def setter(obj, propert):
            """Return a parse action that stores its value on *obj*."""
            def r(val):
                setattr(obj, propert, val)
                return (propert, val)

            return r

        tags = many(skip(comma) + tag) >> (lambda x: x)
        fields = (kv + many(skip(comma) + kv)) >> \
                 (lambda x: [x[0]] + x[1])

        # The parse result itself is not used: the setter actions fill in
        # this Line object while the grammar matches.
        write = client.Line(None, None, None, None)
        toplevel = (identifier >> setter(write, "key")) + \
                   maybe(tags >> setter(write, "tags")) + \
                   (skip(space) + (fields >> setter(write, "fields"))) + \
                   maybe(skip(space) + plain_int >> setter(write, "timestamp")) + \
                   skip(finished) >> (lambda x: x)

        toplevel.parse(LineParser.tokenize(line))
        return write
diff --git a/pyinfluxtools/__init__.py b/pyinfluxtools/__init__.py
deleted file mode 100644
index 68146ff..0000000
--- a/pyinfluxtools/__init__.py
+++ /dev/null
@@ -1,293 +0,0 @@
-#!/usr/bin/env python3
-import re
-import sys
-from functools import reduce
-
-from funcparserlib.lexer import make_tokenizer
-from funcparserlib.parser import (
- some, maybe, many, finished, skip, NoParseError)
-
-
-class WriteRequest(object):
-
- @staticmethod
- def parse(lines):
- """
- Parse multiple Write objects separeted by new-line character.
-
- >>> print(Write.parse("foo b=1"))
- foo b=1
-
- >>> lines = []
- >>> lines += ['cpu field=123']
- >>> lines += ['cpu,host=serverA,region=us-west field1=1,field2=2']
- >>> lines += ['cpu,host=serverA,region=us-west field1=1,field2=2 1234']
- >>> print("\\n".join(map(str, WriteRequest.parse("\\n".join(lines)))))
- cpu field=123
- cpu,host=serverA,region=us-west field1=1,field2=2
- cpu,host=serverA,region=us-west field1=1,field2=2 1234
- """
- writes = map(Write.parse, lines.split("\n"))
- return list(writes)
-
-
-class Write(object):
-
- def __init__(self, key, tags, fields, timestamp=None):
- self.key = key
- self.tags = tags
- self.fields = fields
- self.timestamp = timestamp
-
- if isinstance(self.tags, dict):
- self.tags = self.tags.items()
-
- if isinstance(self.fields, dict):
- self.fields = self.fields.items()
-
- specs = [
- ('Comma', (r',',)),
- ('Space', (r' ',)),
- ('Equal', (r'=',)),
- ('Quote', (r'"',)),
- ('Escape', (r'\\',)),
- ('Int', (r'[0-9]+(?![0-9\.])',)),
- ('Float', (r'-?(\.[0-9]+)|([0-9]+(\.[0-9]*)?)',)),
- ('Char', (r'.',)),
- ]
-
- @staticmethod
- def tokenize(line):
- tokenizer = make_tokenizer(Write.specs)
- return list(tokenizer(line))
-
- @staticmethod
- def parse(line):
- """
- Parse a line from the POST request into a Write object.
-
- >>> line='cpu a=1'; Write.parse(line); print(Write.parse(line))
- <Write key=cpu tags=[] fields=[('a', 1)] timestamp=None>
- cpu a=1
-
- >>> print(Write.parse('yahoo.CHFGBP\\=X.ask,tag=foobar value=10.2'))
- yahoo.CHFGBP\=X.ask,tag=foobar value=10.2
-
- >>> Write.parse('cpu,host=serverA,region=us-west foo="bar"')
- <Write key=cpu tags=[('host', 'serverA'), ('region', 'us-west')] fields=[('foo', 'bar')] timestamp=None>
-
- >>> print(Write.parse('cpu host="serverA",region="us-west"'))
- cpu host="serverA",region="us-west"
-
- >>> line='cpu\\,01 host="serverA",region="us-west"'; \\
- ... Write.parse(line); print(Write.parse(line))
- <Write key=cpu,01 tags=[] fields=[('host', 'serverA'), ('region', 'us-west')] timestamp=None>
- cpu\,01 host="serverA",region="us-west"
-
- >>> Write.parse('cpu host="server A",region="us west"')
- <Write key=cpu tags=[] fields=[('host', 'server A'), ('region', 'us west')] timestamp=None>
-
- >>> line='cpu ho\\=st="server A",region="us west"'; \\
- ... Write.parse(line); print(Write.parse(line))
- <Write key=cpu tags=[] fields=[('ho=st', 'server A'), ('region', 'us west')] timestamp=None>
- cpu ho\=st="server A",region="us west"
-
- >>> print(Write.parse('cpu,ho\=st=server\ A field=123'))
- cpu,ho\=st=server\ A field=123
-
- # error: double name is accepted
- >>> print(Write.parse('cpu,foo=bar,foo=bar field=123,field=123'))
- cpu,foo=bar,foo=bar field=123,field=123
-
- >>> print(Write.parse('cpu field12=12'))
- cpu field12=12
-
- >>> print(Write.parse('cpu field12=12 123123123'))
- cpu field12=12 123123123
-
- >>> try: print(Write.parse('cpu field12=12 1231abcdef123'))
- ... except NoParseError: pass
-
- >>> print(Write.parse('cpu,x=3,y=4,z=6 field\ name="HH \\\\\\"World",x="asdf foo"'))
- cpu,x=3,y=4,z=6 field\\ name="HH \\"World",x="asdf foo"
-
- >>> print(Write.parse("cpu,x=3 field\ name=\\"HH \\\\\\"World\\",x=\\"asdf foo\\""))
- cpu,x=3 field\\ name="HH \\"World",x="asdf foo"
-
- >>> print(Write.parse("cpu foo=\\"bar\\" 12345"))
- cpu foo="bar" 12345
-
- >>> line='"measurement\ with\ quotes",tag\ key\ with\ spaces=tag\,value\,with"commas" field_key\\\\\\="string field value, only \\\\" need be quoted"'; \\
- ... Write.parse(line); print(Write.parse(line))
- <Write key="measurement with quotes" tags=[('tag key with spaces', 'tag,value,with"commas"')] fields=[('field_key\\\\', 'string field value, only " need be quoted')] timestamp=None>
- "measurement\ with\ quotes",tag\ key\ with\ spaces=tag\,value\,with"commas" field_key\\\\="string field value, only \\\" need be quoted"
-
- >>> Write.parse('disk_free value=442221834240,working\ directories="C:\My Documents\Stuff for examples,C:\My Documents"')
- <Write key=disk_free tags=[] fields=[('value', 442221834240), ('working directories', 'C:\\\\My Documents\\\\Stuff for examples,C:\\\\My Documents')] timestamp=None>
-
- >>> Write.parse('disk_free value=442221834240,working\ directories="C:\My Documents\Stuff for examples,C:\My Documents" 123')
- <Write key=disk_free tags=[] fields=[('value', 442221834240), ('working directories', 'C:\\\\My Documents\\\\Stuff for examples,C:\\\\My Documents')] timestamp=123>
-
- >>> print(Write.parse('foo,foo=2 "field key with space"="string field"'))
- foo,foo=2 field\ key\ with\ space="string field"
-
- >>> print(Write.parse('foo,foo=2 field_key\\\\\\="string field"'))
- foo,foo=2 field_key\\\\="string field"
-
- >>> print(Write.parse('foo,foo=2 field_key="string\\\\" field"'))
- foo,foo=2 field_key="string\\" field"
-
- >>> line='foo field0="tag",field1=t,field2=true,field3=True,field4=TRUE'; \\
- ... Write.parse(line); print(Write.parse(line))
- <Write key=foo tags=[] fields=[('field0', 'tag'), ('field1', True), ('field2', True), ('field3', True), ('field4', True)] timestamp=None>
- foo field0="tag",field1=True,field2=True,field3=True,field4=True
-
- >>> line='foo field1=f,field2=false,field3=False,field4=FALSE,field5="fag"'; \\
- ... Write.parse(line); print(Write.parse(line))
- <Write key=foo tags=[] fields=[('field1', False), ('field2', False), ('field3', False), ('field4', False), ('field5', 'fag')] timestamp=None>
- foo field1=False,field2=False,field3=False,field4=False,field5="fag"
-
- >>> line='"measurement\ with\ quotes",tag\ key\ with\ spaces=tag\,value\,with"commas" field_key\\\\\\="string field value, only \\\\" need be quoted"'; \\
- ... Write.parse(line); print(Write.parse(line))
- <Write key="measurement with quotes" tags=[('tag key with spaces', 'tag,value,with"commas"')] fields=[('field_key\\\\', 'string field value, only " need be quoted')] timestamp=None>
- "measurement\ with\ quotes",tag\ key\ with\ spaces=tag\,value\,with"commas" field_key\\\\="string field value, only \\" need be quoted"
-
- >>> Write.parse('"measurement\ with\ quotes" foo=1')
- <Write key="measurement with quotes" tags=[] fields=[('foo', 1)] timestamp=None>
-
- >>> print(Write.parse('K.5S,Ccpvo="eSLyE" value="7F\\\\\\\\\\\\""'))
- K.5S,Ccpvo="eSLyE" value="7F\\\\\\\\\\\""
-
- >>> print(Write.parse('K.5S,Ccpvo=a\\ b value=1'))
- K.5S,Ccpvo=a\\ b value=1
-
- >>> print(Write('test', [('a','b')], [('value','asd\\\\')]))
- test,a=b value="asd\\\\"
- """
-
- tokval = lambda t: t.value
- joinval = "".join
- someToken = lambda type: some(lambda t: t.type == type)
- someCharValue = lambda string: \
- reduce(lambda a, b: a + b,
- map(lambda char:
- some(lambda t: t.value == char) >> tokval,
- string)) >> joinval
-
- char = someToken('Char') >> tokval
- space = someToken('Space') >> tokval
- comma = someToken('Comma') >> tokval
- quote = someToken('Quote') >> tokval
- escape = someToken('Escape') >> tokval
- equal = someToken('Equal') >> tokval
- true_value = (someCharValue("true") | someCharValue("t") |
- someCharValue("True") | someCharValue("TRUE") | someCharValue("T"))
- false_value = (someCharValue("false") | someCharValue("f") |
- someCharValue("False") | someCharValue("FALSE") | someCharValue("F"))
-
- escape_space = skip(escape) + space >> joinval
- escape_comma = skip(escape) + comma >> joinval
- escape_equal = skip(escape) + equal >> joinval
- escape_quote = skip(escape) + quote >> joinval
- escape_escape = skip(escape) + escape >> joinval
-
- plain_int_text = someToken('Int') >> tokval
- plain_int = plain_int_text >> (lambda v: int(v))
- plain_float_text = someToken('Float') >> tokval
- plain_float = plain_float_text >> (lambda v: float(v))
-
- identifier = many(char | plain_float_text | plain_int_text |
- escape_space | escape_comma | escape_equal |
- escape_escape | plain_int_text | quote) >> joinval
- quoted_text_ = many(escape_quote | space | plain_int_text |
- plain_float_text | char | comma |
- escape) >> joinval
- quoted_text = skip(quote) + quoted_text_ + skip(quote)
- unquoted_text = many(escape_space | escape_comma |
- escape_equal | escape_escape |
- plain_int_text | char | quote) >> joinval
- boolean_value = (true_value >> (lambda s: True)
- | false_value >> (lambda s: False))
-
- kv_value = plain_int | plain_float | quoted_text | boolean_value
- kv = (quoted_text | unquoted_text) + skip(equal) + kv_value >> \
- (lambda x: (x[0], x[1]))
-
- tag = identifier + skip(equal) + identifier >> (lambda x: (x[0], x[1]))
-
- def setter(obj, propert):
- def r(val):
- setattr(obj, propert, val)
- return (propert, val)
- return r
-
- tags = many(skip(comma) + tag) >> (lambda x: x)
- fields = (kv + many(skip(comma) + kv)) >> \
- (lambda x: [x[0]] + x[1])
-
- write = Write(None, None, None, None)
- toplevel = (identifier >> setter(write, "key")) + \
- maybe(tags >> setter(write, "tags")) + \
- (skip(space) + (fields >> setter(write, "fields"))) + \
- maybe(skip(space) + plain_int >> setter(write, "timestamp")) + \
- skip(finished) >> (lambda x: x)
-
- result = toplevel.parse(Write.tokenize(line))
- # pprint(result)
- return write
-
- def __repr__(self):
- return "<{} key={} tags={} fields={} timestamp={}>".format(
- self.__class__.__name__, self.key, self.tags, self.fields, self.timestamp)
-
- @staticmethod
- def escape_identifier(string):
- return re.sub(r'([\\,= ])', '\\\\\\1', string)
-
- @staticmethod
- def escape_tags(taglist):
- return ",".join(map(lambda kv:
- (Write.escape_identifier(kv[0])
- + "=" + Write.escape_identifier(kv[1])),
- taglist))
-
- @staticmethod
- def escape_value(obj):
- if (isinstance(obj, float)
- or isinstance(obj, int)
- or isinstance(obj, bool)):
- return str(obj)
- else:
- obj = str(obj)
- return "\"" + obj.replace("\\", "\\\\").replace("\"", "\\\"") + "\""
-
- @staticmethod
- def escape_fields(kvlist):
- def escape_key(string):
- return re.sub(r'(["\\,= ])', '\\\\\\1', string)
-
- return ",".join(
- map(lambda kv: escape_key(kv[0]) + "=" + Write.escape_value(kv[1]),
- kvlist))
-
- def __str__(self):
- result = self.escape_identifier(self.key)
-
- if self.tags:
- result += ","
- result += self.escape_tags(self.tags)
-
- if self.fields:
- result += " "
- result += self.escape_fields(self.fields)
-
- if self.timestamp:
- result += " "
- result += str(self.timestamp)
-
- return result
-
-
-if __name__ == "__main__":
- import doctest
- doctest.testmod()
diff --git a/setup.py b/setup.py
index 7126f2e..eff3f20 100644
--- a/setup.py
+++ b/setup.py
@@ -1,24 +1,27 @@
-#!/usr/bin/env python
-
+#!/usr/bin/env python3
from distutils.core import setup
-setup(name='pyinfluxtools',
- version='0.1',
- description='Python classes to work with influxdb',
- author='Yves Fischer',
- author_email='yvesf+github@xapek.org',
- license="MIT",
- packages = ['pyinfluxtools'],
-# url='https://github.com/',
-# scripts=[],
- install_requires=['funcparserlib==0.3.6'],
- classifiers = [
- "Programming Language :: Python",
- "Programming Language :: Python :: 3",
- "Development Status :: 3 - Alpha",
- "Intended Audience :: Developers",
- "Operating System :: OS Independent",
- "License :: OSI Approved :: MIT License",
- ]
-)
+version = '0.1'
+setup(name='pyinflux',
+ version=version,
+ description='Python classes to work with influxdb',
+ author='Yves Fischer',
+ author_email='yvesf+git@xapek.org',
+ license="MIT",
+ # pyinflux is a package directory with client/ and parser/ subpackages;
+ # py_modules would look for a single pyinflux.py and install nothing.
+ packages=['pyinflux', 'pyinflux.client', 'pyinflux.parser'],
+ # url='https://github.com/',
+ # NOTE: install_requires/extras_require are setuptools keywords; plain
+ # distutils warns about them as unknown options and ignores them.
+ install_requires=[],
+ extras_require={'parser': ['funcparserlib==0.3.6']},
+ classifiers=[
+ "Programming Language :: Python",
+ "Programming Language :: Python :: 3",
+ "Development Status :: 3 - Alpha",
+ "Intended Audience :: Developers",
+ "Operating System :: OS Independent",
+ "License :: OSI Approved :: MIT License",
+ ])
diff --git a/test.py b/test.py
new file mode 100755
index 0000000..a87b2e0
--- /dev/null
+++ b/test.py
@@ -0,0 +1,14 @@
+#!/usr/bin/env python3
+"""Run the doctests of the pyinflux client and parser packages."""
+import sys
+from doctest import testmod
+
+from pyinflux import client, parser
+
+if __name__ == '__main__':
+    # testmod() only prints failures and returns a (failed, attempted)
+    # result; sum the failure counts so the exit status tells the caller
+    # (shell, CI) whether any doctest failed instead of always exiting 0.
+    failed = sum(testmod(m=module).failed
+                 for module in (client, parser))
+    sys.exit(1 if failed else 0)