diff options
-rw-r--r-- | .gitignore | 5 | ||||
-rw-r--r-- | README.md | 34 | ||||
-rw-r--r-- | jobs/__init__.py | 1 | ||||
-rwxr-xr-x | jobs/clever_tanken.py | 114 | ||||
-rwxr-xr-x | jobs/davis_vantage.py | 21 | ||||
-rwxr-xr-x | jobs/esg.py | 88 | ||||
-rwxr-xr-x | jobs/hplq1300n.py | 26 | ||||
-rwxr-xr-x | jobs/prix_carburant.py | 107 | ||||
-rwxr-xr-x | jobs/swr_wetter.py | 50 | ||||
-rwxr-xr-x | jobs/sys_network.py | 15 | ||||
-rwxr-xr-x | jobs/sys_network_rate.py | 22 | ||||
-rwxr-xr-x | jobs/tankerkoenig.py | 36 | ||||
-rwxr-xr-x | jobs/telexoo.py | 55 | ||||
-rwxr-xr-x | jobs/transferwise.py | 55 | ||||
-rw-r--r-- | scheduler/__init__.py | 300 | ||||
-rw-r--r-- | scheduler/influxdb.py | 68 | ||||
-rw-r--r-- | scheduler/test_scheduler.py | 171 | ||||
-rwxr-xr-x | tab_main.py | 133 |
18 files changed, 1301 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..2dba87a --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +__pycache__ +*.pyc +error.log +/.idea/ +/.mypy_cache/
\ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..66ec733 --- /dev/null +++ b/README.md @@ -0,0 +1,34 @@ +# Währungen + +* Kurzbezeichung + * EURCHF + * EURCHF=X + * 1 EUR = x CHF +* meint: Agentur gibt x CHF für 1 EUR. + * Yahoo: EURCHF=X - Bid = Geldkurs = NOTEN, Ask = Briefkurs = DEVISEN + * Bid = Verlange = Kaufangebot, Ask = Biete = Verkaufsangebot + +# Usage + +Before executing any code: Check usage policy on fetched site. Acquire API-Keys where necessary. + +## Unittests Scheduler + +```python +python3 -munittest +``` + +## Testing jobs + +```python +python3 -mjobs.esg +``` + +# Requirements + +``` +# (for influxdb) pyinflux (github.com/yvesf/pyinflux, without [parser]), not on pypi +currencies==2014.7.13 +funcparserlib==0.3.6 +lxml +```
\ No newline at end of file diff --git a/jobs/__init__.py b/jobs/__init__.py new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/jobs/__init__.py @@ -0,0 +1 @@ + diff --git a/jobs/clever_tanken.py b/jobs/clever_tanken.py new file mode 100755 index 0000000..8418b11 --- /dev/null +++ b/jobs/clever_tanken.py @@ -0,0 +1,114 @@ +import codecs +import logging +from enum import Enum +from html.parser import HTMLParser +from urllib import request + +State = Enum('State', 'fuel_name fuel_price station_name idle') + + +class Tankstelle: + def __init__(self): + self.name = "" + self.preise = {} + self.id = None + + def __repr__(self): + return "{}: {} {}".format(type(self).__name__, self.name, self.preise) + + +class Parser(HTMLParser): + def error(self, message): + logging.error("Parser error: %s", message) + + def __init__(self): + super().__init__() + self.tankstelle = Tankstelle() + self._current_fuel_name = None + self._state = State.idle + + def get_prix(self): + for key, value in self.tankstelle.preise.items(): + self.tankstelle.preise[key] = float(value) + return self.tankstelle + + def handle_starttag(self, tag, attrs): + attrs = dict(attrs) + if self._state == State.idle: + if tag == "div" and attrs.get('class') == 'fuel-price-type': + self._state = State.fuel_name + self._current_fuel_name = "" + if tag == "span" and (attrs.get('id') == "main-content-fuel-station-header-name" + or attrs.get('itemprop') == "http://schema.org/addressCountry"): + self._state = State.station_name + elif self._current_fuel_name is not None and tag == "span" and attrs.get('ng-bind') == "display_preis": + self._state = State.fuel_price + + def handle_endtag(self, tag): + if self._state == State.fuel_name and tag in ('span', 'div'): + self._state = State.idle + elif self._state == State.station_name and tag in ('span'): + self._state = State.idle + elif self._state == State.fuel_price and tag == 'span': + self._state = State.idle + preis = self.tankstelle.preise[self._current_fuel_name].strip() + if preis == "": + del self.tankstelle.preise[self._current_fuel_name] + else: + self.tankstelle.preise[self._current_fuel_name] = float(preis) + self._current_fuel_name = None + + def handle_data(self, data: str): + if self._state == State.fuel_name: + self._current_fuel_name += data.strip().replace(':', '') + self.tankstelle.preise[self._current_fuel_name] = "" + elif self._state == State.fuel_price: + self.tankstelle.preise[self._current_fuel_name] += data + elif self._state == State.station_name: + if len(data.strip()) > 0: + if len(self.tankstelle.name) > 0: + self.tankstelle.name += " " + self.tankstelle.name += data.strip() + + +URL = "http://www.clever-tanken.de/tankstelle_details/" + + +def execute(station_id: str): + parser = Parser() + r = request.Request(URL + station_id) + r.add_header('Host', 'www.clever-tanken.de') + r.add_header('User-Agent', 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:30.0) Gecko/20100101 Firefox/30.0') + try: + with request.urlopen(r) as f: + f2 = codecs.getreader('utf-8')(f) + f2.errors = 'ignore' + for line in f2.readlines(): + parser.feed(line) + + tankstelle = parser.tankstelle + tankstelle.id = station_id + return tankstelle + except Exception as e: + logging.error("Failed for station: %s", station_id) + raise e + + + +if __name__ == "__main__": + from pprint import pprint + + pprint(list(map(execute, [ + '20219', '11985', '17004', + '19715', # Kaiserst. Mineralölvertrieb Schwärzle + '54296', # ESSO Endingen + '10355', # ARAL Tiengen + '20144', # bft Rankackerweg + '27534', # EXTROL Freiburg + '55690', # Rheinmünster + '15220', # Esso Achern + '5853', # JET Rastatt + '24048', # Bodersweier + '27534', + '3819']) # JET Freiburg + )) diff --git a/jobs/davis_vantage.py b/jobs/davis_vantage.py new file mode 100755 index 0000000..7a46515 --- /dev/null +++ b/jobs/davis_vantage.py @@ -0,0 +1,21 @@ +import codecs +import json +from urllib.request import urlopen, Request + + +def load(url: str): + request = Request(url) + request.add_header( + "User-Agent", + "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; FSL 7.0.6.01001)") + with urlopen(request) as f: + f2 = codecs.getreader('utf-8')(f) + data = json.load(f2) + data = {i: data[i] for i in data if type(data[i]) in (int, float)} + return data + + +if __name__ == "__main__": + from pprint import pprint + + pprint(list(load('http://wettermichel.de/davis/con_davis.php'))) diff --git a/jobs/esg.py b/jobs/esg.py new file mode 100755 index 0000000..a2bb025 --- /dev/null +++ b/jobs/esg.py @@ -0,0 +1,88 @@ +import codecs +import logging +import urllib.parse +import urllib.request +from enum import Enum +from html.parser import HTMLParser + + +class Product: + def __init__(self): + self.price = "" + self.name = "" + self.sku = None + + def __repr__(self): + return "<{} name={} price={} sku={}>".format( + self.__class__, self.name, self.price, self.sku) + + +State = Enum('State', 'parsing product product_name price idle') + + +class Parser(HTMLParser): + def error(self, message): + logging.error("Parser error: %s", message) + + def __init__(self): + super().__init__() + self.products = [] + self.current = None + self.state = State.idle + + def handle_starttag(self, tag, attrs): + attrs = dict(attrs) + if self.state == State.idle and tag == "tr" and "data-sku" in attrs: + self.current = Product() + self.current.sku = attrs["data-sku"] + self.state = State.parsing + elif self.state == State.parsing and tag == 'h3' and \ + "class" in attrs and attrs['class'] == 'product-name': + self.state = State.product_name + elif self.state == State.parsing and tag == 'span' and \ + "class" in attrs and attrs['class'] == "price": + self.state = State.price + + def handle_endtag(self, tag): + if self.state == State.product_name and tag == 'a': + self.state = State.parsing + elif self.state == State.price and tag == 'span': + self.state = State.parsing + + if self.current and self.current.name and \ + self.current.price and self.current.sku: + self.current.name = self.current.name.strip() + price = self.current.price + price = price.replace(".", "").replace(",", ".").split("\xa0")[0] + self.current.price = float(price) + self.products += [self.current] + self.current = None + self.state = State.idle + + def handle_data(self, data): + if self.state == State.product_name: + self.current.name += data + if self.state == State.price: + self.current.price += data + + +URL = "http://www.edelmetall-handel.de/quickbuy/twozero/" + + +def execute(): + """Always fetches full catalog""" + request = urllib.request.Request(URL) + with urllib.request.urlopen(request) as f: + # with open("index.html", 'rb') as f: + f2 = codecs.getreader('utf-8')(f) + f2.errors = 'ignore' + parser = Parser() + for line in f2.readlines(): + parser.feed(line) + return parser.products + + +if __name__ == "__main__": + from pprint import pprint + + pprint(execute()) diff --git a/jobs/hplq1300n.py b/jobs/hplq1300n.py new file mode 100755 index 0000000..b77a323 --- /dev/null +++ b/jobs/hplq1300n.py @@ -0,0 +1,26 @@ +import codecs +import re +import urllib.request +from collections import namedtuple + +Data = namedtuple('Data', ['hostname', 'value']) + +URL = "http://{}/hp/device/info_suppliesStatus.html" + + +def job(host: str) -> Data: + url = URL.format(host) + name = host.replace(".", "_") + request = urllib.request.Request(url) + with urllib.request.urlopen(request) as f: + f2 = codecs.getreader('utf-8')(f) + for line in f2.readlines(): + m = re.match(".*>([0-9]*)%<br", line) + if m: + return Data(name, int(m.groups()[0])) + + +if __name__ == "__main__": + from pprint import pprint + + pprint(job("10.1.0.10")) diff --git a/jobs/prix_carburant.py b/jobs/prix_carburant.py new file mode 100755 index 0000000..21e6c26 --- /dev/null +++ b/jobs/prix_carburant.py @@ -0,0 +1,107 @@ +import codecs +import logging +import typing +from enum import Enum +from html.parser import HTMLParser +from urllib import request + + +class Station: + def __init__(self): + self.station_name = "" + self.prices = {} + self.id = None + + def clean(self): + self.prices = filter(lambda kv: kv[1] != '', self.prices.items()) + self.prices = dict(map(lambda kv: (kv[0], float(kv[1])), self.prices)) + + def __repr__(self): + return "Prix: {} {}".format(self.station_name, self.prices) + + +State = Enum('State', 'pricelist fuel_name fuel_price station_name idle') + + +class Parser(HTMLParser): + def error(self, message): + logging.error("Parser error: %s", message) + + def __init__(self): + super().__init__() + self._prix = Station() + self._current_fuel_name = "" + self._state = State.idle + + def get_prix(self): + self._prix.clean() + return self._prix + + def handle_starttag(self, tag, attrs): + attrs = dict(attrs) + if self._state == State.idle and tag == "div" and "id" in attrs and attrs['id'] == 'prix': + self._state = State.pricelist + elif self._state in [State.pricelist, State.fuel_price] and tag == 'strong': + self._state = State.fuel_name + self._current_fuel_name = '' + elif self._state == State.idle and tag == 'div' and 'id' in attrs and attrs['id'] == 'colg': + self._state = State.station_name + + def handle_endtag(self, tag): + if self._state == State.pricelist and tag == 'div': + self._state = State.idle + elif self._state == State.fuel_name and tag == 'strong': + self._state = State.fuel_price + elif self._state == State.fuel_price and tag == 'div': + self._state = State.idle + elif self._state == State.station_name and tag == 'p': + self._state = State.idle + + def handle_data(self, data: str): + if self._state == State.fuel_name: + self._current_fuel_name += data.strip().replace(':', '') + self._prix.prices[self._current_fuel_name] = "" + elif self._state == State.fuel_price: + if data.strip() != "0.000": + self._prix.prices[self._current_fuel_name] += data.strip() + elif self._state == State.station_name: + if len(data.strip()) > 0: + self._prix.station_name += data.strip() + ". " + + +URL = "http://www.prix-carburants.gouv.fr/map/recupererInfosPdv/" + + +def _execute(station_id: str): + parser = Parser() + r = request.Request(URL + station_id) + r.add_header('Host', 'www.prix-carburants.gouv.fr') + r.add_header('Referer', 'http://www.prix-carburants.gouv.fr/recherche/map') + r.add_header('User-Agent', 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:30.0) Gecko/20100101 Firefox/30.0') + r.add_header('X-Requested-With', 'XMLHttpRequest') + r.add_header('X-Prototype-Version', '1.7') + r.add_header('Connection', 'close') + with request.urlopen(r) as f: + # with open("info.html", 'rb') as f: + f2 = codecs.getreader('utf-8')(f) + f2.errors = 'ignore' + for line in f2.readlines(): + parser.feed(line) + + try: + prix = parser.get_prix() + prix.id = station_id + return prix + except Exception as e: + raise Exception("Failed for station: {}".format(station_id), e) + + +def execute(*ids) -> typing.Iterable[Station]: + for station_id in ids: + yield _execute(station_id) + + +if __name__ == "__main__": + from pprint import pprint + + pprint(list(execute('1630001', '67760001', '1210003', '1630003', '1210002', '1710001'))) diff --git a/jobs/swr_wetter.py b/jobs/swr_wetter.py new file mode 100755 index 0000000..bb464c1 --- /dev/null +++ b/jobs/swr_wetter.py @@ -0,0 +1,50 @@ +import codecs +import json +import re +import urllib.parse +import urllib.request + +URL = "http://www.swr.de/-/id=5491998/cf=42/did=13968954/format=json/nid=5491998/17ag7cb/index.json" + + +def job(cc): + """ + cc: id of the region. See webpage: http://www.swr.de/wetter + """ + params = urllib.parse.urlencode({'cc': cc}) + request = urllib.request.Request(URL + "?" + params) + request.add_header( + "User-Agent", + "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; FSL 7.0.6.01001)") + + def transformDict(name, value_dict): + for (key, value) in value_dict.items(): + if key in ['timestamp', 'dayForecast']: + continue + if value == "k. A.": + continue + elif re.match("^-?[1-9]+[0-9]*$", value) or value == "0": + value = int(value) + elif re.match("^-?[1-9]+[0-9]*.?[0-9]*$", value): + value = float(value) + yield {'name' : "{}.{}.{}".format(basename, name, key), 'value': value} + + with urllib.request.urlopen(request) as f: + f2 = codecs.getreader('utf-8')(f) + response = json.load(f2) + basename = "swr_wetter.{stateCode}.{regionCode}.{name}".format( + **response['availableLocations'][cc]) + + for d in transformDict("current", response['current'][cc]): + yield d + + for (day, value) in response['forecast'].items(): + value = value[cc] + for d in transformDict("forecast." + day, value): + yield d + + +if __name__ == "__main__": + from pprint import pprint + + pprint(list(job("DE0008834"))) diff --git a/jobs/sys_network.py b/jobs/sys_network.py new file mode 100755 index 0000000..bbd6a0b --- /dev/null +++ b/jobs/sys_network.py @@ -0,0 +1,15 @@ +import socket + + +def job(device, stat_type): + f = open('/sys/class/net/' + device + '/statistics/' + stat_type, 'r') + value = f.read() + ivalue = int(value.replace("\n", "")) + f.close() + return {'hostname': socket.gethostname(), 'device': device, 'stat_type': stat_type, 'value': ivalue} + + +if __name__ == "__main__": + from pprint import pprint + + pprint(job("eth1", "rx_bytes")) diff --git a/jobs/sys_network_rate.py b/jobs/sys_network_rate.py new file mode 100755 index 0000000..3a5cc71 --- /dev/null +++ b/jobs/sys_network_rate.py @@ -0,0 +1,22 @@ +from collections import namedtuple + +Data = namedtuple('Data', ['hostname', 'device', 'entry', 'value']) +temp = {} + +def job(device, entry, intervals): + global temp + f = open('/sys/class/net/' + device + '/statistics/' + entry, 'r') + value = f.read() + ivalue = int(value.replace("\n", "")) + f.close() + + return_value = [] + if entry in temp: + rate = (ivalue - temp[entry]) / intervals # bytes/s + if rate > 0: + # prevent counter overflows + return_value = [Data('laer.2.localnet.cc', device, entry, rate)] + + temp[entry] = ivalue + + return return_value diff --git a/jobs/tankerkoenig.py b/jobs/tankerkoenig.py new file mode 100755 index 0000000..e78853d --- /dev/null +++ b/jobs/tankerkoenig.py @@ -0,0 +1,36 @@ +import codecs +import json +import logging +import typing +from collections import namedtuple +from urllib import request + +Data = namedtuple('Data', ['name', 'id', 'type', 'price']) + + +URL = "https://creativecommons.tankerkoenig.de/json/list.php?lat={lat}&lng={lng}&rad={rad}&sort=dist&type=all&apikey={api_key}" + + +def execute(api_key : str, lat: float, lng : float, rad: float) -> typing.Iterable[Data]: + url = URL.format(api_key=api_key, rad=rad, lat=lat, lng=lng) + r = request.Request(url) + try: + with request.urlopen(r) as f: + f2 = codecs.getreader('utf-8')(f) + data = json.load(f2) + if not data['status'] == 'ok': + raise Exception("Error %s", data['message']) + for station in data['stations']: + name = "{} - {} - {}".format(station['place'], station['brand'], station['name']) + if not station['isOpen'] == True: + continue + + if "diesel" in station: + yield Data(name, station['id'], 'Diesel', station['diesel']) + if "e5" in station: + yield Data(name, station['id'], 'SP95-E5', station['e5']) + if "e10" in station: + yield Data(name, station['id'], 'SP95-E10', station['e10']) + except Exception as e: + logging.error("Failed for: %f %f %f", lat, lng, rad) + raise e diff --git a/jobs/telexoo.py b/jobs/telexoo.py new file mode 100755 index 0000000..110ba72 --- /dev/null +++ b/jobs/telexoo.py @@ -0,0 +1,55 @@ +import codecs +import json +import random +import re +import urllib.parse +import urllib.request +from collections import namedtuple +from decimal import Decimal + +from currencies.config import * + +URL = "https://telexoo.tegona.com/convert/" + +Quote = namedtuple('Quote', ['curr_from', 'curr_to', 'rate']) + + +def execute(curr_from, curr_to): + MULT = random.randint(1000, 9999) + CURRENCY = { + MONEY_CURRENCY_EUR: "EUR", + MONEY_CURRENCY_CHF: "CHF", + MONEY_CURRENCY_USD: "USD", + MONEY_CURRENCY_GBP: "GBP", + MONEY_CURRENCY_PLN: "PLN" + } + curr_from = CURRENCY[curr_from] + curr_to = CURRENCY[curr_to] + params = urllib.parse.urlencode({ + 's1': curr_from, + 's2': curr_to, + 'amount': str(MULT), + 'action': 'sell', + 'language': 'en', + 'verbose': '0', + }) + request = urllib.request.Request(URL + "?" + params) + request.add_header( + "User-Agent", + "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; FSL 7.0.6.01001)") + with urllib.request.urlopen(request) as f: + f2 = codecs.getreader('utf-8')(f) + response = json.load(f2) + result_raw = response[0]['result'].replace(",", "") + match = re.match("^{} ([0-9\.]*)$".format(curr_to), result_raw) + if not match: + raise Exception("Invalid response in 'result' field") + result = Decimal(match.groups()[0]) / MULT + return Quote(curr_to, curr_to, float(result)) + + +if __name__ == "__main__": + from pprint import pprint + + pprint(execute("CHF", "EUR")) + pprint(execute("CHF", "GBP")) diff --git a/jobs/transferwise.py b/jobs/transferwise.py new file mode 100755 index 0000000..a2e30d1 --- /dev/null +++ b/jobs/transferwise.py @@ -0,0 +1,55 @@ +import codecs +import json +import random +import re +import urllib.parse +import urllib.request +from collections import namedtuple + +APP_URL = "https://transferwise.com/fr/" +URL = "https://transferwise.com/api/v1/payment/calculate" +UA = "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:30.0) Gecko/20100101 Firefox/30.0" + +Data = namedtuple('Data', ['curr_from', 'curr_to', 'rate']) + +def get_token(): + request = urllib.request.Request(APP_URL) + request.add_header("User-Agent", UA) + + with urllib.request.urlopen(request) as f: + f2 = codecs.getreader('utf-8')(f) + for line in f2.readlines(): + m = re.match(".*config.appToken.*'(.+)'.*", line) + if m: + g = m.groups() + return g[0] + + +def job(currFrom, currTo): + token = get_token() + MULT = random.randint(100, 100000) + data = urllib.parse.urlencode({ + 'amount': str(MULT), + 'amountCurrency': 'source', + 'hasDiscount': 'false', + 'isFixedRate': 'false', + 'isGuaranteedFixedTarget': 'false', + 'sourceCurrency': currFrom, + 'targetCurrency': currTo, + }) + # print (URL + "?" + data) + request = urllib.request.Request(URL + "?" + data) + request.add_header("X-Authorization-key", token) + request.add_header("X-Authorization-token", "") + request.add_header("User-Agent", UA) + with urllib.request.urlopen(request) as f: + f2 = codecs.getreader('utf-8')(f) + response = json.load(f2) + return Data(currFrom, currTo, float(response['transferwiseRate'])) + + +if __name__ == "__main__": + from pprint import pprint + + pprint(job("CHF", "EUR")) + pprint(job("CHF", "GBP")) diff --git a/scheduler/__init__.py b/scheduler/__init__.py new file mode 100644 index 0000000..3212678 --- /dev/null +++ b/scheduler/__init__.py @@ -0,0 +1,300 @@ +import calendar +import datetime +import inspect +import logging +import re +import reprlib +import sched +import time +import typing + + +def time_ns() -> int: + """:return: the current time in nanoseconds""" + t = time.clock_gettime(time.CLOCK_REALTIME) + t_ns = int(t * 1000 * 1000 * 1000) + return t_ns + + +def datetime_from_ns(ns: int) -> datetime.datetime: + """:return: nanoseconds converted to python datetime class""" + return datetime.datetime.fromtimestamp(ns / 1000 / 1000 / 1000) + + +def timedelta_ns(**kwargs) -> int: + """:class:`datetime.timedelta` converted to nanoseconds""" + return int(datetime.timedelta(**kwargs).total_seconds() * 1000 * 1000 * 1000) + + +def sleep_ns(t) -> None: + time.sleep(t / 1000 / 1000 / 1000) + + +class Job(object): + """Base-Class for jobs that are scheduled in :class:`Scheduler`""" + def __init__(self, name: str, **kwargs) -> None: + self.name: str = name + self.properties = kwargs + self._execute_funcs: typing.List[typing.Callable[..., typing.Any]] = [] + + def next(self, start_ns: int, t_ns: int, t_max_ns: int) -> typing.Optional[int]: + """ + :param start_ns: start of scheduler + :return: next run of this job after time 't' or None if no run can be calculated in t_max_ns time + """ + raise NotImplementedError() + + def add_action(self, func): + if func is not None: + sig = inspect.signature(func) + if not len(sig.parameters) in [0, 1, 2]: + raise Exception("Wrong number of parameters to action") + self._execute_funcs.append(func) + return self + + def execute(self, scheduler) -> typing.Any: + for e in self._execute_funcs: + sig = inspect.signature(e) + if len(sig.parameters) == 0: + return e() + elif len(sig.parameters) == 1: + return e(self) + elif len(sig.parameters) == 2: + return e(scheduler, self) + + def __repr__(self) -> str: + return "<{cls.__name__} name={name} {conf}>".format(cls=self.__class__, name=repr(self.name), + conf=self.__repr_config__()) + + def __repr_config__(self) -> str: + return " " + + +def every(seconds: int = 0, minutes: int = 0, hours: int = 0, name='Unnamed-Job', action=None) -> Job: + """ + Run a job in intervals. + + :param seconds: add seconds to the interval. Defalut: 0 + :param minutes: add minutes to the interval. Default: 0 + :param hours: add hours to the interval. Default: 0 + :param name: Name of the Job + :param action: a function to be executed, see :func:`Job.execute` + :return: The job to be added to :class:`Scheduler` + """ + n = seconds * 1000 * 1000 * 1000 + \ + minutes * 1000 * 1000 * 1000 * 60 + \ + hours * 1000 * 1000 * 1000 * 60 * 60 + j = PeriodicJob(name, n) + j.add_action(action) + return j + + +class PeriodicJob(Job): + def __init__(self, name: str, interval: float, **kwargs) -> None: + super().__init__(name, **kwargs) + self.interval: float = interval + + def next(self, start_ns: int, t_ns: int, t_max_ns: int) -> typing.Optional[int]: + t_since_start = t_ns - start_ns + tn = t_since_start % self.interval + t_next_ns = int(t_ns + (self.interval - tn)) + if t_next_ns < t_max_ns: + return t_next_ns + else: + return None + + def __repr_config__(self) -> str: + return " interval=" + str(self.interval) + + +_pattern_value = re.compile(r'^[0-9]+$') +_pattern_range = re.compile(r'([0-9]+)-([0-9]+)$') +_pattern_asterisk = re.compile(r'\*/([0-9]+)$') + + +def make_test_expr(expr: str) -> typing.Callable[[int], bool]: + def parse(s: str): + if s == '*': + return lambda val: True + else: + m = _pattern_value.match(s) + if m is not None: + return lambda val: str(val) == s + m = _pattern_range.match(s) + if m is not None: + start, end = map(int, m.groups()) + return lambda val: start <= val <= end + m = _pattern_asterisk.match(s) + if m is not None: + mod = int(m.groups()[0]) + return lambda val: val % mod == 0 + raise Exception("More complex cron expression is not supported") + + exprs = expr.split(',') + if len(expr) == 1: + return parse(expr) + else: + funcs = map(parse, exprs) + return lambda val: any(map(lambda func: func(val), funcs)) + + +def _generator(timedelta_func, datetime_func, expr_func): + def f(start_ns: int, stop_ns: int): + dt = datetime_from_ns(start_ns) + while int(dt.timestamp() * 1000 * 1000 * 1000) < stop_ns: + _dt = datetime_func(dt) + delta = timedelta_func(_dt) + # print("{} - {}".format(_dt, delta)) + t_ns = int(dt.timestamp() * 1000 * 1000 * 1000) + if t_ns >= start_ns and expr_func(_dt): + yield (int(dt.timestamp() * 1000 * 1000 * 1000), + int(((_dt + delta - datetime.timedelta(minutes=1)).timestamp() * 1000 * 1000 * 1000))) + dt = _dt + delta + + return f + + +def combine(start_ns: int, stop_ns: int, funcs): + f = funcs[0] + for (_start_ns, stop_ns) in f(start_ns, stop_ns): + # print("{}: {} {} -> {} {}".format(repr(f), _start_ns, datetime_from_ns(start_ns), stop_ns, datetime_from_ns(stop_ns))) + if (len(funcs)) > 1: + r = combine(_start_ns, stop_ns, funcs[1:]) + if r is not None: + return r + elif _start_ns >= start_ns: + # print("Found {} ".format(start_ns)) + return _start_ns + + +class CronJob(Job): + def __init__(self, name: str, minute: str, hour: str, dow: str, dom: str, month: str, **kwargs) -> None: + super().__init__(name, **kwargs) + self.minute = minute + self.minute_f = _generator(lambda dt: datetime.timedelta(minutes=1), + lambda dt: datetime.datetime(year=dt.year, month=dt.month, day=dt.day, + hour=dt.hour, minute=dt.minute), + lambda dt: make_test_expr(minute)(dt.minute)) + self.hour = hour + self.hour_f = _generator(lambda dt: datetime.timedelta(hours=1), + lambda dt: datetime.datetime(year=dt.year, month=dt.month, day=dt.day, hour=dt.hour), + lambda dt: make_test_expr(hour)(dt.hour)) + self.dow = dow + self.dow_f = _generator(lambda dt: datetime.timedelta(days=1), + lambda dt: datetime.datetime(year=dt.year, month=dt.month, day=dt.day), + lambda dt: make_test_expr(dow)(dt.isoweekday())) + self.dom = dom + self.dom_f = _generator(lambda dt: datetime.timedelta(days=1), + lambda dt: datetime.datetime(year=dt.year, month=dt.month, day=dt.day), + lambda dt: make_test_expr(dom)(dt.day)) + self.month = month + self.month_f = _generator(lambda dt: datetime.timedelta(days=calendar.monthrange(dt.year, dt.month)[1]), + lambda dt: datetime.datetime(year=dt.year, month=dt.month, day=1), + lambda dt: make_test_expr(month)(dt.month)) + + def next(self, start_ns: int, t_ns: int, t_max_ns: int): + stop_ns = t_ns + t_max_ns + t_ns = t_ns - t_ns % (1000 * 1000 * 1000) # round current timestamp down to minutes + n = combine(t_ns + timedelta_ns(minutes=1), stop_ns, + [self.month_f, self.dom_f, self.dow_f, self.hour_f, self.minute_f]) + return n + + def __repr_config__(self): + return " minute={_.minute} hour={_.hour} dow={_.dow} dom={_.dom} month={_.month}".format(_=self) + + +def at(minute: str = '*', hour: str = '*', day_of_week: str = '*', day_of_month: str = '*', month: str = '*', + name='Unnamed-Job', action=None, **kwargs): + job = CronJob(name, minute, hour, day_of_week, day_of_month, month, **kwargs) + job.add_action(action) + return job + + +def cron(cron_expr: str, name='Unnamed-Job', action=None, **kwargs): + groups = list(filter(lambda x: x != '', cron_expr.split(' '))) + if len(groups) != 5: + raise Exception("Invalid cron expression, failed to find minute-hour-dow-dom-month pattern") + + minute, hour, day_of_month, month, day_of_week = groups + return at(minute, hour, day_of_week, day_of_month, month, name, action, **kwargs) + + +# noinspection PyProtectedMember +class Scheduler(object): + def __init__(self): + self._scheduler = sched.scheduler(timefunc=time_ns, delayfunc=sleep_ns) + self._jobs : typing.Dict[Job, typing.Optional[int, None]] = {} + self._processors : typing.List[typing.Callable[[Job, typing.Any], None]] = [] + self._time_start_ns :int = time_ns() + self._lookahead_ns : int = 1000 * 1000 * 1000 * 60 * 120 + self._repr = reprlib.Repr() + + def remove_job_by_name(self, name : str): + with self._scheduler._lock: + remove = [] + for job in filter(lambda j: j.name == name, self._jobs.keys()): + remove.append(job) + self._scheduler.cancel(self._jobs[job]) + + for job in remove: + del self._jobs[job] + + def get_job_by_name(self, name : str) -> typing.Optional[Job]: + with self._scheduler._lock: + for job in filter(lambda j: j.name == name, self._jobs.keys()): + return job + return None + + def add_job(self, job: Job): + with self._scheduler._lock: + if job.name in map(lambda j: j.name, self._jobs.keys()): + raise Exception("Job with name '{}' exists".format(job.name)) + self._jobs[job] = None + self._schedule_job_run(job) + return self + + def add_processor(self, processor : typing.Callable[[Job, typing.Any], None]) -> None: + with self._scheduler._lock: + logging.info("Add processor %s", processor) + self._processors.append(processor) + + def remove_processor(self, processor : typing.Callable[[Job, typing.Any], None]) -> None: + with self._scheduler._lock: + self._processors.remove(processor) + + def _process_func(self, job: Job): + def execute(): + try: + logging.info("Execute job %s", job) + result = job.execute(self) + for p in self._processors: + value_repr = self._repr.repr(result) + logging.info("Execute result processor %s for job %s result: %s", p, job, value_repr) + try: + p(job, result) + except: + logging.exception("Execute result processor %s for job %s failed", p, job) + logging.info("Execution finished for job %s", job) + except: + logging.exception("Exception while job %s", job) + finally: + # re-schedule for next execution + self._schedule_job_run(job) + + return execute + + def _schedule_job_run(self, job): + now_ns = time_ns() + stop_ns = now_ns + self._lookahead_ns + next_ns = job.next(self._time_start_ns, now_ns, stop_ns) + if next_ns is not None: + logging.info("Schedule {} in {}ns / at {}".format(job, next_ns - now_ns, datetime_from_ns(next_ns))) + id = self._scheduler.enterabs(next_ns, 0, self._process_func(job)) + else: + logging.info("No next schedule for job {}. Retry in 10min".format(job)) + id = self._scheduler.enterabs(now_ns + (1000 * 1000 * 1000 * 10 * 60), 0, lambda: self._schedule_job_run(job)) + self._jobs[job] = id + + def start(self, blocking: bool = True): + logging.info("Start scheduler (blocking=%s)", blocking) + self._scheduler.run(blocking) diff --git a/scheduler/influxdb.py b/scheduler/influxdb.py new file mode 100644 index 0000000..ecfa6ea --- /dev/null +++ b/scheduler/influxdb.py @@ -0,0 +1,68 @@ +import logging +import reprlib +import typing +from urllib.request import urlopen + +import collections +from pyinflux.client import Line + +from . import Job + + +def _get_measurement_name(job: Job): + if 'measurement' in job.properties: + return job.properties['measurement'] + else: + return job.name + + +class Dumper: + def __init__(self): + self._repr = reprlib.Repr() + + def __call__(self, *args, **kwargs) -> None: + if len(args) == 2 and isinstance(args[0], Job): + # assuming second is list of objects that str() to influx protocol lines + data = self._convert(args[0], args[1]) + self._insert(data) + else: + raise Exception("Wrong arguments for InfluxDB inserter.") + + def _insert(self, lines: typing.Iterable): + data = "\n".join(map(str, lines)) + print("===== Would insert:\n" + data) + + def _convert(self, job: Job, data) -> typing.Iterable[Line]: + def c(name, value): + if isinstance(value, Line): + return value + elif isinstance(value, int) or isinstance(value, str) or isinstance(value, float): + return Line(name, {}, {'value': value}) + else: + raise Exception("Cannot simply insert value of type: {} for job {}".format(type(value), job)) + + measurement = _get_measurement_name(job) + if isinstance(data, collections.Iterable): + return map(lambda v: c(measurement, v), data) + else: + return [c(measurement, data)] + + +class Inserter(Dumper): + def __init__(self, url: str) -> None: + super().__init__() + self._url: str = url + + def _insert(self, lines: typing.Iterable): + try: + data = "\n".join(map(str, lines)).encode('utf-8') + try: + with urlopen(self._url, data) as fh: + logging.debug("InfluxDB successful answer: %s", self._repr.repr(fh.read().decode('utf-8'))) + except Exception: + logging.exception("Failed insert of:\n%s", self._repr.repr(lines)) + except Exception: + logging.exception("Failed formatting of:\n%s", self._repr.repr(lines)) + + def __repr__(self): + return f"<{self.__class__.__module__}.{self.__class__.__name__} url={repr(self._url)}>" diff --git a/scheduler/test_scheduler.py b/scheduler/test_scheduler.py new file mode 100644 index 0000000..ee041cf --- /dev/null +++ b/scheduler/test_scheduler.py @@ -0,0 +1,171 @@ +import unittest + +from scheduler import * + + +class TestPeriodicJob(unittest.TestCase): + def test1(self): + job = every(seconds=10) + next_run = job.next(0, 0, 20 * 1000 * 1000 * 1000) + self.assertEqual(next_run, 10 * 1000 * 1000 * 1000) + + next_run = job.next(0, next_run, next_run + (20 * 1000 * 1000 * 1000)) + self.assertEqual(next_run, 20 * 1000 * 1000 * 1000) + + self.assertIsNone(job.next(0, next_run, 10)) + self.assertIsNone(job.next(0, next_run, next_run + (10 * 1000 * 1000 * 1000))) + next_run = job.next(0, next_run, next_run + (10 * 1000 * 1000 * 1000) + 1) + self.assertEqual(next_run, 30 * 1000 * 1000 * 1000) + + +class TestProcessor(unittest.TestCase): + def test1(self): + class P: + def __init__(self) -> None: + self.values = None + + def __call__(self, *args, **kwargs) -> None: + self.values = args + + s = Scheduler() + p = P() + s.add_processor(p) + + job = every(seconds=10, action=lambda: 1234) + s._process_func(job)() + + self.assertEqual(p.values, (job, 1234)) + + +class TestCronJob(unittest.TestCase): + def testas(self): + now = 1531173610000000000 + next_run = at(minute='10').next(0, now, 999000000000000000) + self.assertEqual(datetime_from_ns(next_run), + datetime.datetime(year=2018, month=7, day=10, hour=0, minute=10)) + + def testAt2(self): + now = 1531173610000000000 + next_run = at(minute="0", hour="8,10,12,14,16,18,20", name="Test").next(0, now, 999000000000000000) + self.assertEqual(datetime_from_ns(next_run), + datetime.datetime(year=2018, month=7, day=10, hour=8, minute=0)) + + next_run += datetime.timedelta(minutes=1).total_seconds() * 1000 * 1000 * 1000 + next_run = at(minute="0", hour="8,10,12,14,16,18,20", name="Test").next(0, next_run, 999000000000000000) + self.assertEqual(datetime_from_ns(next_run), + datetime.datetime(year=2018, month=7, day=10, hour=10, minute=0)) + + def test3(self): + start = datetime.datetime(year=2018, month=7, day=10, hour=10, minute=0).timestamp() * 1000 * 1000 * 1000 + next_run = start + runs = [] + while next_run < start + timedelta_ns(days=60): + runs.append(next_run) + next_run = at(minute="0", hour="8,10,14", name="Test").next(0, next_run, next_run + timedelta_ns(days=2)) + + self.assertEqual(len(runs), 60 * 3) + self.assertEqual(datetime_from_ns(next_run), + datetime.datetime(year=2018, month=9, day=8, hour=10, minute=0, second=0)) + self.assertTrue( + all(map(lambda t: t.hour in [8, 10, 14] and t.minute == 0 and t.second == 0, map(datetime_from_ns, runs)))) + + + def testas3(self): + now = 1531173610000000000 + next_run = cron('10,20,30 * * * *').next(0, now, 999000000000000000) + self.assertEqual(datetime_from_ns(next_run), + datetime.datetime(year=2018, month=7, day=10, hour=0, minute=10)) + + now = next_run + next_run = cron('10,20,30 * * * *').next(0, now, 999000000000000000) + self.assertEqual(datetime_from_ns(next_run), + datetime.datetime(year=2018, month=7, day=10, hour=0, minute=20)) + + now = next_run + next_run = cron('10,20,30 * * * *').next(0, now, 999000000000000000) + self.assertEqual(datetime_from_ns(next_run), + datetime.datetime(year=2018, month=7, day=10, hour=0, minute=30)) + + now = next_run + next_run = cron('10,20,30 * * * *').next(0, now, 999000000000000000) + self.assertEqual(datetime_from_ns(next_run), + datetime.datetime(year=2018, month=7, day=10, hour=1, minute=10)) + + + def test4(self): + expr = '10-15 * * * *' + now = 1531173610000000000 + next_run = cron(expr).next(0, now, 999000000000000000) + self.assertEqual(datetime_from_ns(next_run), + datetime.datetime(year=2018, month=7, day=10, hour=0, minute=10)) + next_run = cron(expr).next(0, next_run, 999000000000000000) + self.assertEqual(datetime_from_ns(next_run), + datetime.datetime(year=2018, month=7, day=10, hour=0, minute=11)) + next_run = cron(expr).next(0, next_run, 999000000000000000) + self.assertEqual(datetime_from_ns(next_run), + datetime.datetime(year=2018, month=7, day=10, hour=0, minute=12)) + next_run = cron(expr).next(0, next_run, 999000000000000000) + self.assertEqual(datetime_from_ns(next_run), + datetime.datetime(year=2018, month=7, day=10, hour=0, minute=13)) + next_run = cron(expr).next(0, next_run, 999000000000000000) + self.assertEqual(datetime_from_ns(next_run), + datetime.datetime(year=2018, month=7, day=10, hour=0, minute=14)) + next_run = cron(expr).next(0, next_run, 999000000000000000) + self.assertEqual(datetime_from_ns(next_run), + datetime.datetime(year=2018, month=7, day=10, hour=0, minute=15)) + next_run = cron(expr).next(0, next_run, 999000000000000000) + self.assertEqual(datetime_from_ns(next_run), + datetime.datetime(year=2018, month=7, day=10, hour=1, minute=10)) + + + def test5(self): + expr = '10 */20 * * *' + now = int(datetime.datetime(year=2018, month=7, day=10, hour=0, minute=0, + second=0).timestamp() * 1000 * 1000 * 1000) + next_run = cron(expr).next(0, now, 999000000000000000) + self.assertEqual(datetime_from_ns(next_run), + datetime.datetime(year=2018, month=7, day=10, hour=0, minute=10)) + next_run = cron(expr).next(0, next_run, 999000000000000000) + self.assertEqual(datetime_from_ns(next_run), + datetime.datetime(year=2018, month=7, day=10, hour=20, minute=10)) + next_run = cron(expr).next(0, next_run, 999000000000000000) + self.assertEqual(datetime_from_ns(next_run), + datetime.datetime(year=2018, month=7, day=11, hour=0, minute=10)) + + def test6(self): + job = at(minute='*/15', hour='5-24', name='Clever-Tanken', action=None) + next_run = int(datetime.datetime(year=2018, month=7, day=17, hour=11, minute=51, + second=10).timestamp() * 1000 * 1000 * 1000) + + next_run = job.next(0, next_run, next_run + 60 * 60 * 1000 * 1000 * 1000) + self.assertEqual(datetime_from_ns(next_run), + datetime.datetime(year=2018, month=7, day=17, hour=12, minute=0)) + + next_run = job.next(0, next_run, next_run + 60 * 60 * 1000 * 1000 * 1000) + self.assertEqual(datetime_from_ns(next_run), + datetime.datetime(year=2018, month=7, day=17, hour=12, minute=15)) + + + def test6(self): + job = at(minute="0", hour="8,10,12,14,16,18,20", name="ESG", action=None) + next_run = time_ns() #now + hours = [8,10,12,14,16,18,20] + + while hours != []: + next_run = job.next(0, next_run, next_run + 60 * 60 * 1000 * 1000 * 1000) + next_run_dt = datetime_from_ns(next_run) + self.assertTrue(next_run_dt.hour in hours) + self.assertTrue(next_run_dt.minute == 0) + hours.remove(next_run_dt.hour) + +class TestScheduler(unittest.TestCase): + def setUp(self): + self.scheduler = Scheduler() + + def test(self): + self.scheduler.add_job(at(minute='0', name='Test')) + try: + self.scheduler.add_job(at(minute='0', name='Test')) + self.assertFalse(True, 'must not happen') + except: + pass diff --git a/tab_main.py b/tab_main.py new file mode 100755 index 0000000..46ef699 --- /dev/null +++ b/tab_main.py @@ -0,0 +1,133 @@ +#!/usr/bin/env python3 +import logging + +from pyinflux.client import Line + +import jobs.clever_tanken +import jobs.davis_vantage +import jobs.esg +import jobs.hplq1300n +import jobs.prix_carburant +import jobs.swr_wetter +import jobs.tankerkoenig +import jobs.telexoo +import jobs.transferwise +import scheduler.influxdb + +logging.basicConfig(level=logging.INFO) + +s = scheduler.Scheduler() + +transform_swr = lambda v: Line(v['name'], {}, {'value': v['value']}) +s.add_job(scheduler.at(minute='0,15,30,45', name='SWR Wetter', + action=lambda: map(transform_swr, jobs.swr_wetter.job('DE0008834')))) + +transform_laserjet = lambda v: Line('hplq1300n.toner.{}'.format(v.hostname), {}, {'value': v.value}) +s.add_job(scheduler.at(minute='*/5', name="Laserjet Status", + action=lambda: transform_laserjet(jobs.hplq1300n.job('10.1.0.10')))) + +transform_telexoo = lambda qoute: Line('telexoo.{}{}_X'.format(qoute.curr_from, qoute.curr_to), {}, + {'value': qoute.rate}) +s.add_job(scheduler.every(minutes=10, name="Telexoo.com-CHFGBP", + action=lambda: transform_telexoo(jobs.telexoo.execute("CHF", "GBP")))) +s.add_job(scheduler.every(minutes=10, name="Telexoo.com-CHFEUR", + action=lambda: transform_telexoo(jobs.telexoo.execute("CHF", "EUR")))) +s.add_job(scheduler.every(minutes=10, name="Telexoo.com-EURCHF", + action=lambda: transform_telexoo(jobs.telexoo.execute("EUR", "CHF")))) +s.add_job(scheduler.every(minutes=10, name="Telexoo.com-CHFPLN", + action=lambda: transform_telexoo(jobs.telexoo.execute("CHF", "PLN")))) + +transform_transferwise = lambda d: Line('transferwise.{}{}_X'.format(d.curr_from, d.curr_to), {}, { + 'value': d.rate}) +s.add_job(scheduler.every(minutes=10, name='Transferwise-CHFEUR', + action=lambda: transform_transferwise(jobs.transferwise.job('CHF', 'EUR')))) +s.add_job(scheduler.every(minutes=10, name='Transferwise-EURCHF', + action=lambda: transform_transferwise(jobs.transferwise.job('EUR', 'CHF')))) + +transform_esg = lambda products: [Line('esg', {'sku': product.sku, 'product_name': product.name}, + {'price': product.price}) for product in products] +s.add_job(scheduler.at(minute="0", hour="8,10,12,14,16,18,20", name="ESG", + action=lambda: transform_esg(jobs.esg.execute()))) + +s.add_job(scheduler.every(hours=2, name="Wettermichel.de", action= +lambda: [Line('wettermichel.{}'.format(name), {}, {'value': value}) + for name, value in + jobs.davis_vantage.load('http://wettermichel.de/davis/con_davis.php').items()])) + + +def execute_prix_carburant(): + for station in jobs.prix_carburant.execute('1630001', '1210003', '1630003', '1210002', '1710001', + '67760001', '67240002', '67452001', + '68740001', # Fessenheim + '67500009', # Hagenau + '67116002'): # Reichstett + for fuelname, price in station.prices.items(): + tags = {'name': station.station_name, 'id': 'prix_carburant:{}'.format(station.id)} + fields = {'value': price} + if fuelname == "SP95": + yield Line('tankstelle.SP95-E5', tags, fields) + elif fuelname == "SP95-E10": + yield Line('tankstelle.SP95-E10', tags, fields) + elif fuelname == "Gazole": + yield Line('tankstelle.Diesel', tags, fields) + elif fuelname == "E85": + yield Line('tankstelle.E85', tags, fields) + + +s.add_job(scheduler.at(minute='10', hour='5-22', name="prix_carburant", + action=execute_prix_carburant)) + + +def transform_clever(tankstelle: jobs.clever_tanken.Tankstelle): + for fuelname, price in tankstelle.preise.items(): + tags = {'name': tankstelle.name, 'id': 'clever_tanken:{}'.format(tankstelle.id)} + fields = {'value': price} + if fuelname == "Super E5": + yield Line('tankstelle.SP95-E5', tags, fields) + elif fuelname == "Super E10": + yield Line('tankstelle.SP95-E10', tags, fields) + elif fuelname == "Diesel": + yield Line('tankstelle.Diesel', tags, fields) + + +s.add_job(scheduler.at(minute='*/15', hour='5-24', name='Clever-Tanken', action= +lambda: [line for station in map(transform_clever, map(jobs.clever_tanken.execute, [ + '20219', '11985', '17004', + '19715', # Kaiserst. Mineralölvertrieb Schwärzle + '54296', # ESSO Endingen + '10355', # ARAL Tiengen + '20144', # bft Rankackerweg + '27534', # EXTROL Freiburg + '55690', # Rheinmünster + '15220', # Esso Achern + '5853', # JET Rastatt + '24048', # Bodersweier + '3819', # JET Freiburg +])) for line in station])) + + +def transform_tankerkoenig(job): + api_key = job.properties['api_key'] + for data in jobs.tankerkoenig.execute(api_key, 48.651822, 7.927891, 15.0): + yield Line("tankerkoenig.{}".format(data.type), {'name': data.name, 'id': data.id}, {'value': data.price}) + + +s.add_job(scheduler.at(minute='*/10', hour='5-24', name='Tankerkönig', + action=transform_tankerkoenig)) + +if __name__ == '__main__': + import argparse + + parser = argparse.ArgumentParser() + parser.add_argument('--influx-url', nargs=1, default=None) + parser.add_argument('--tankerkoenig', nargs=1, default='00000000-0000-0000-0000-000000000002') + args = parser.parse_args() + + tanker: scheduler.Job = s.get_job_by_name('Tankerkönig') + tanker.properties['api_key'] = args.tankerkoenig[0] + + if args.influx_url is not None: + s.add_processor(scheduler.influxdb.Inserter(args.influx_url[0])) + else: + s.add_processor(scheduler.influxdb.Dumper()) + s.start(True) |