From 69c220cda3d8c0a95327630f5752dad36cb82261 Mon Sep 17 00:00:00 2001 From: Yves Fischer Date: Tue, 17 Jul 2018 12:34:04 +0200 Subject: Squashed commit --- scheduler/influxdb.py | 68 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 68 insertions(+) create mode 100644 scheduler/influxdb.py (limited to 'scheduler/influxdb.py') diff --git a/scheduler/influxdb.py b/scheduler/influxdb.py new file mode 100644 index 0000000..ecfa6ea --- /dev/null +++ b/scheduler/influxdb.py @@ -0,0 +1,68 @@ +import logging +import reprlib +import typing +from urllib.request import urlopen + +import collections +from pyinflux.client import Line + +from . import Job + + +def _get_measurement_name(job: Job): + if 'measurement' in job.properties: + return job.properties['measurement'] + else: + return job.name + + +class Dumper: + def __init__(self): + self._repr = reprlib.Repr() + + def __call__(self, *args, **kwargs) -> None: + if len(args) == 2 and isinstance(args[0], Job): + # assuming second is list of objects that str() to influx protocol lines + data = self._convert(args[0], args[1]) + self._insert(data) + else: + raise Exception("Wrong arguments for InfluxDB inserter.") + + def _insert(self, lines: typing.Iterable): + data = "\n".join(map(str, lines)) + print("===== Would insert:\n" + data) + + def _convert(self, job: Job, data) -> typing.Iterable[Line]: + def c(name, value): + if isinstance(value, Line): + return value + elif isinstance(value, int) or isinstance(value, str) or isinstance(value, float): + return Line(name, {}, {'value': value}) + else: + raise Exception("Cannot simply insert value of type: {} for job {}".format(type(value), job)) + + measurement = _get_measurement_name(job) + if isinstance(data, collections.Iterable): + return map(lambda v: c(measurement, v), data) + else: + return [c(measurement, data)] + + +class Inserter(Dumper): + def __init__(self, url: str) -> None: + super().__init__() + self._url: str = url + + def _insert(self, lines: typing.Iterable): + try: + data = "\n".join(map(str, lines)).encode('utf-8') + try: + with urlopen(self._url, data) as fh: + logging.debug("InfluxDB successful answer: %s", self._repr.repr(fh.read().decode('utf-8'))) + except Exception: + logging.exception("Failed insert of:\n%s", self._repr.repr(lines)) + except Exception: + logging.exception("Failed formatting of:\n%s", self._repr.repr(lines)) + + def __repr__(self): + return f"<{self.__class__.__module__}.{self.__class__.__name__} url={repr(self._url)}>" -- cgit v1.2.1