summaryrefslogtreecommitdiff
path: root/scheduler/influxdb.py
diff options
context:
space:
mode:
authorYves Fischer <yvesf-git@xapek.org>2018-07-17 12:34:04 +0200
committerYves Fischer <yvesf-git@xapek.org>2018-07-17 12:49:49 +0200
commit69c220cda3d8c0a95327630f5752dad36cb82261 (patch)
treefad9fbe78cf717f4dd17b8e9d996ab9a54b7d3e2 /scheduler/influxdb.py
downloaddatasources-69c220cda3d8c0a95327630f5752dad36cb82261.tar.gz
datasources-69c220cda3d8c0a95327630f5752dad36cb82261.zip
Squashed commit
Diffstat (limited to 'scheduler/influxdb.py')
-rw-r--r--scheduler/influxdb.py68
1 files changed, 68 insertions, 0 deletions
diff --git a/scheduler/influxdb.py b/scheduler/influxdb.py
new file mode 100644
index 0000000..ecfa6ea
--- /dev/null
+++ b/scheduler/influxdb.py
@@ -0,0 +1,68 @@
+import logging
+import reprlib
+import typing
+from urllib.request import urlopen
+
+import collections
+from pyinflux.client import Line
+
+from . import Job
+
+
+def _get_measurement_name(job: Job):
+ if 'measurement' in job.properties:
+ return job.properties['measurement']
+ else:
+ return job.name
+
+
+class Dumper:
+ def __init__(self):
+ self._repr = reprlib.Repr()
+
+ def __call__(self, *args, **kwargs) -> None:
+ if len(args) == 2 and isinstance(args[0], Job):
+ # assuming second is list of objects that str() to influx protocol lines
+ data = self._convert(args[0], args[1])
+ self._insert(data)
+ else:
+ raise Exception("Wrong arguments for InfluxDB inserter.")
+
+ def _insert(self, lines: typing.Iterable):
+ data = "\n".join(map(str, lines))
+ print("===== Would insert:\n" + data)
+
+ def _convert(self, job: Job, data) -> typing.Iterable[Line]:
+ def c(name, value):
+ if isinstance(value, Line):
+ return value
+ elif isinstance(value, int) or isinstance(value, str) or isinstance(value, float):
+ return Line(name, {}, {'value': value})
+ else:
+ raise Exception("Cannot simply insert value of type: {} for job {}".format(type(value), job))
+
+ measurement = _get_measurement_name(job)
+ if isinstance(data, collections.Iterable):
+ return map(lambda v: c(measurement, v), data)
+ else:
+ return [c(measurement, data)]
+
+
+class Inserter(Dumper):
+ def __init__(self, url: str) -> None:
+ super().__init__()
+ self._url: str = url
+
+ def _insert(self, lines: typing.Iterable):
+ try:
+ data = "\n".join(map(str, lines)).encode('utf-8')
+ try:
+ with urlopen(self._url, data) as fh:
+ logging.debug("InfluxDB successful answer: %s", self._repr.repr(fh.read().decode('utf-8')))
+ except Exception:
+ logging.exception("Failed insert of:\n%s", self._repr.repr(lines))
+ except Exception:
+ logging.exception("Failed formatting of:\n%s", self._repr.repr(lines))
+
+ def __repr__(self):
+ return f"<{self.__class__.__module__}.{self.__class__.__name__} url={repr(self._url)}>"