summaryrefslogtreecommitdiff
path: root/scheduler/influxdb.py
blob: ecfa6ea8c351a2929305f614f8e0d21aec5dceb6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import collections
import collections.abc
import logging
import reprlib
import typing
from urllib.request import urlopen

from pyinflux.client import Line

from . import Job


def _get_measurement_name(job: Job):
    """Return the measurement name to use for *job*.

    The ``'measurement'`` entry of ``job.properties`` takes precedence when
    it is present; otherwise the job's own ``name`` is used.
    """
    props = job.properties
    return props['measurement'] if 'measurement' in props else job.name


class Dumper:
    """Dry-run sink: converts a job's result into lines and prints them.

    Instances are called as ``dumper(job, data)``.  ``data`` is either a
    single value or an iterable of values; each value must already be a
    ``Line`` or be an ``int``/``float``/``str``, which is wrapped in a
    ``Line`` carrying a single ``value`` field.  Subclasses override
    :meth:`_insert` to decide what happens with the converted lines.
    """

    def __init__(self):
        # Size-limited repr helper, kept on the instance for abbreviated
        # log output.
        self._repr = reprlib.Repr()

    def __call__(self, *args, **kwargs) -> None:
        """Convert ``data`` for ``job`` and hand the lines to :meth:`_insert`.

        Exactly two positional arguments ``(job, data)`` are expected.
        Keyword arguments are accepted but not used.

        Raises:
            Exception: if the positional arguments are not ``(Job, data)``.
        """
        if len(args) != 2 or not isinstance(args[0], Job):
            raise Exception("Wrong arguments for InfluxDB inserter.")
        job, data = args
        # The second argument is assumed to hold objects whose str() is an
        # influx protocol line (or plain values that _convert can wrap).
        self._insert(self._convert(job, data))

    def _insert(self, lines: typing.Iterable):
        """Print the text that would be inserted, one ``str(line)`` per row."""
        data = "\n".join(map(str, lines))
        print("===== Would insert:\n" + data)

    def _convert(self, job: Job, data) -> typing.Iterable[Line]:
        """Turn ``data`` into an iterable of ``Line`` objects for ``job``.

        A single value (including a bare ``str`` or ``Line``) is converted
        immediately and returned as a one-element list.  Any other iterable
        is converted lazily, element by element, so a conversion error for
        an element is only raised once the result is consumed.

        Raises:
            Exception: for a value that is neither a ``Line`` nor an
                ``int``/``float``/``str``.
        """
        measurement = _get_measurement_name(job)

        def as_line(value):
            if isinstance(value, Line):
                return value
            if isinstance(value, (int, str, float)):
                return Line(measurement, {}, {'value': value})
            raise Exception("Cannot simply insert value of type: {} for job {}".format(type(value), job))

        # A str is itself iterable; without this check a plain string result
        # would be split into one point per character instead of being stored
        # as a single value.  A bare Line is likewise always one point.
        is_single = isinstance(data, (str, Line)) or not isinstance(data, collections.abc.Iterable)
        if is_single:
            return [as_line(data)]
        # collections.abc.Iterable: the alias on the ``collections`` package
        # itself no longer exists on Python 3.10+.
        return map(as_line, data)


class Inserter(Dumper):
    """Sink that sends the converted lines to the configured URL.

    Sending is best-effort: formatting and transport failures are logged
    with their traceback and then swallowed, never raised to the caller.
    """

    def __init__(self, url: str) -> None:
        """Remember ``url`` as the target passed to ``urlopen`` on insert."""
        super().__init__()
        self._url: str = url

    def _insert(self, lines: typing.Iterable):
        """Join ``lines`` with newlines and submit them to ``self._url``.

        The text is UTF-8 encoded and passed to ``urlopen`` as request data.
        Nothing is returned and nothing is raised; the outcome is only
        visible in the log.
        """
        # Stage 1: render.  ``lines`` may be a one-shot iterator, so it is
        # materialised here; otherwise it would be exhausted by the time a
        # failure is logged and the log would only show an opaque iterator.
        try:
            rendered = [str(line) for line in lines]
            payload = "\n".join(rendered).encode('utf-8')
        except Exception:
            logging.exception("Failed formatting of:\n%s", self._repr.repr(lines))
            return

        # Stage 2: send.  Any failure (connection, HTTP error, undecodable
        # answer) is logged together with an abbreviated view of the lines.
        try:
            with urlopen(self._url, payload) as fh:
                logging.debug("InfluxDB successful answer: %s", self._repr.repr(fh.read().decode('utf-8')))
        except Exception:
            logging.exception("Failed insert of:\n%s", self._repr.repr(rendered))

    def __repr__(self):
        """Return ``<module.Class url='...'>`` for debugging output."""
        return f"<{self.__class__.__module__}.{self.__class__.__name__} url={repr(self._url)}>"