summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.gitignore5
-rw-r--r--README.md34
-rw-r--r--jobs/__init__.py1
-rwxr-xr-xjobs/clever_tanken.py114
-rwxr-xr-xjobs/davis_vantage.py21
-rwxr-xr-xjobs/esg.py88
-rwxr-xr-xjobs/hplq1300n.py26
-rwxr-xr-xjobs/prix_carburant.py107
-rwxr-xr-xjobs/swr_wetter.py50
-rwxr-xr-xjobs/sys_network.py15
-rwxr-xr-xjobs/sys_network_rate.py22
-rwxr-xr-xjobs/tankerkoenig.py36
-rwxr-xr-xjobs/telexoo.py55
-rwxr-xr-xjobs/transferwise.py55
-rw-r--r--scheduler/__init__.py300
-rw-r--r--scheduler/influxdb.py68
-rw-r--r--scheduler/test_scheduler.py171
-rwxr-xr-xtab_main.py133
18 files changed, 1301 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..2dba87a
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,5 @@
+__pycache__
+*.pyc
+error.log
+/.idea/
+/.mypy_cache/ \ No newline at end of file
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..66ec733
--- /dev/null
+++ b/README.md
@@ -0,0 +1,34 @@
+# Währungen
+
+* Kurzbezeichung
+ * EURCHF
+ * EURCHF=X
+ * 1 EUR = x CHF
+* meint: Agentur gibt x CHF für 1 EUR.
+ * Yahoo: EURCHF=X - Bid = Geldkurs = NOTEN, Ask = Briefkurs = DEVISEN
+ * Bid = Verlange = Kaufangebot, Ask = Biete = Verkaufsangebot
+
+# Usage
+
+Before executing any code: Check usage policy on fetched site. Acquire API-Keys where necessary.
+
+## Unittests Scheduler
+
+```python
+python3 -munittest
+```
+
+## Testing jobs
+
+```python
+python3 -mjobs.esg
+```
+
+# Requirements
+
+```
+# (for influxdb) pyinflux (github.com/yvesf/pyinflux, without [parser]), not on pypi
+currencies==2014.7.13
+funcparserlib==0.3.6
+lxml
+``` \ No newline at end of file
diff --git a/jobs/__init__.py b/jobs/__init__.py
new file mode 100644
index 0000000..8b13789
--- /dev/null
+++ b/jobs/__init__.py
@@ -0,0 +1 @@
+
diff --git a/jobs/clever_tanken.py b/jobs/clever_tanken.py
new file mode 100755
index 0000000..8418b11
--- /dev/null
+++ b/jobs/clever_tanken.py
@@ -0,0 +1,114 @@
+import codecs
+import logging
+from enum import Enum
+from html.parser import HTMLParser
+from urllib import request
+
+State = Enum('State', 'fuel_name fuel_price station_name idle')
+
+
+class Tankstelle:
+ def __init__(self):
+ self.name = ""
+ self.preise = {}
+ self.id = None
+
+ def __repr__(self):
+ return "{}: {} {}".format(type(self).__name__, self.name, self.preise)
+
+
+class Parser(HTMLParser):
+ def error(self, message):
+ logging.error("Parser error: %s", message)
+
+ def __init__(self):
+ super().__init__()
+ self.tankstelle = Tankstelle()
+ self._current_fuel_name = None
+ self._state = State.idle
+
+ def get_prix(self):
+ for key, value in self.tankstelle.preise.items():
+ self.tankstelle.preise[key] = float(value)
+ return self.tankstelle
+
+ def handle_starttag(self, tag, attrs):
+ attrs = dict(attrs)
+ if self._state == State.idle:
+ if tag == "div" and attrs.get('class') == 'fuel-price-type':
+ self._state = State.fuel_name
+ self._current_fuel_name = ""
+ if tag == "span" and (attrs.get('id') == "main-content-fuel-station-header-name"
+ or attrs.get('itemprop') == "http://schema.org/addressCountry"):
+ self._state = State.station_name
+ elif self._current_fuel_name is not None and tag == "span" and attrs.get('ng-bind') == "display_preis":
+ self._state = State.fuel_price
+
+ def handle_endtag(self, tag):
+ if self._state == State.fuel_name and tag in ('span', 'div'):
+ self._state = State.idle
+ elif self._state == State.station_name and tag in ('span'):
+ self._state = State.idle
+ elif self._state == State.fuel_price and tag == 'span':
+ self._state = State.idle
+ preis = self.tankstelle.preise[self._current_fuel_name].strip()
+ if preis == "":
+ del self.tankstelle.preise[self._current_fuel_name]
+ else:
+ self.tankstelle.preise[self._current_fuel_name] = float(preis)
+ self._current_fuel_name = None
+
+ def handle_data(self, data: str):
+ if self._state == State.fuel_name:
+ self._current_fuel_name += data.strip().replace(':', '')
+ self.tankstelle.preise[self._current_fuel_name] = ""
+ elif self._state == State.fuel_price:
+ self.tankstelle.preise[self._current_fuel_name] += data
+ elif self._state == State.station_name:
+ if len(data.strip()) > 0:
+ if len(self.tankstelle.name) > 0:
+ self.tankstelle.name += " "
+ self.tankstelle.name += data.strip()
+
+
+URL = "http://www.clever-tanken.de/tankstelle_details/"
+
+
+def execute(station_id: str):
+ parser = Parser()
+ r = request.Request(URL + station_id)
+ r.add_header('Host', 'www.clever-tanken.de')
+ r.add_header('User-Agent', 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:30.0) Gecko/20100101 Firefox/30.0')
+ try:
+ with request.urlopen(r) as f:
+ f2 = codecs.getreader('utf-8')(f)
+ f2.errors = 'ignore'
+ for line in f2.readlines():
+ parser.feed(line)
+
+ tankstelle = parser.tankstelle
+ tankstelle.id = station_id
+ return tankstelle
+ except Exception as e:
+ logging.error("Failed for station: %s", station_id)
+ raise e
+
+
+
+if __name__ == "__main__":
+ from pprint import pprint
+
+ pprint(list(map(execute, [
+ '20219', '11985', '17004',
+ '19715', # Kaiserst. Mineralölvertrieb Schwärzle
+ '54296', # ESSO Endingen
+ '10355', # ARAL Tiengen
+ '20144', # bft Rankackerweg
+ '27534', # EXTROL Freiburg
+ '55690', # Rheinmünster
+ '15220', # Esso Achern
+ '5853', # JET Rastatt
+ '24048', # Bodersweier
+ '27534',
+ '3819']) # JET Freiburg
+ ))
diff --git a/jobs/davis_vantage.py b/jobs/davis_vantage.py
new file mode 100755
index 0000000..7a46515
--- /dev/null
+++ b/jobs/davis_vantage.py
@@ -0,0 +1,21 @@
+import codecs
+import json
+from urllib.request import urlopen, Request
+
+
+def load(url: str):
+ request = Request(url)
+ request.add_header(
+ "User-Agent",
+ "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; FSL 7.0.6.01001)")
+ with urlopen(request) as f:
+ f2 = codecs.getreader('utf-8')(f)
+ data = json.load(f2)
+ data = {i: data[i] for i in data if type(data[i]) in (int, float)}
+ return data
+
+
+if __name__ == "__main__":
+ from pprint import pprint
+
+ pprint(list(load('http://wettermichel.de/davis/con_davis.php')))
diff --git a/jobs/esg.py b/jobs/esg.py
new file mode 100755
index 0000000..a2bb025
--- /dev/null
+++ b/jobs/esg.py
@@ -0,0 +1,88 @@
+import codecs
+import logging
+import urllib.parse
+import urllib.request
+from enum import Enum
+from html.parser import HTMLParser
+
+
+class Product:
+ def __init__(self):
+ self.price = ""
+ self.name = ""
+ self.sku = None
+
+ def __repr__(self):
+ return "<{} name={} price={} sku={}>".format(
+ self.__class__, self.name, self.price, self.sku)
+
+
+State = Enum('State', 'parsing product product_name price idle')
+
+
+class Parser(HTMLParser):
+ def error(self, message):
+ logging.error("Parser error: %s", message)
+
+ def __init__(self):
+ super().__init__()
+ self.products = []
+ self.current = None
+ self.state = State.idle
+
+ def handle_starttag(self, tag, attrs):
+ attrs = dict(attrs)
+ if self.state == State.idle and tag == "tr" and "data-sku" in attrs:
+ self.current = Product()
+ self.current.sku = attrs["data-sku"]
+ self.state = State.parsing
+ elif self.state == State.parsing and tag == 'h3' and \
+ "class" in attrs and attrs['class'] == 'product-name':
+ self.state = State.product_name
+ elif self.state == State.parsing and tag == 'span' and \
+ "class" in attrs and attrs['class'] == "price":
+ self.state = State.price
+
+ def handle_endtag(self, tag):
+ if self.state == State.product_name and tag == 'a':
+ self.state = State.parsing
+ elif self.state == State.price and tag == 'span':
+ self.state = State.parsing
+
+ if self.current and self.current.name and \
+ self.current.price and self.current.sku:
+ self.current.name = self.current.name.strip()
+ price = self.current.price
+ price = price.replace(".", "").replace(",", ".").split("\xa0")[0]
+ self.current.price = float(price)
+ self.products += [self.current]
+ self.current = None
+ self.state = State.idle
+
+ def handle_data(self, data):
+ if self.state == State.product_name:
+ self.current.name += data
+ if self.state == State.price:
+ self.current.price += data
+
+
+URL = "http://www.edelmetall-handel.de/quickbuy/twozero/"
+
+
+def execute():
+ """Always fetches full catalog"""
+ request = urllib.request.Request(URL)
+ with urllib.request.urlopen(request) as f:
+ # with open("index.html", 'rb') as f:
+ f2 = codecs.getreader('utf-8')(f)
+ f2.errors = 'ignore'
+ parser = Parser()
+ for line in f2.readlines():
+ parser.feed(line)
+ return parser.products
+
+
+if __name__ == "__main__":
+ from pprint import pprint
+
+ pprint(execute())
diff --git a/jobs/hplq1300n.py b/jobs/hplq1300n.py
new file mode 100755
index 0000000..b77a323
--- /dev/null
+++ b/jobs/hplq1300n.py
@@ -0,0 +1,26 @@
+import codecs
+import re
+import urllib.request
+from collections import namedtuple
+
+Data = namedtuple('Data', ['hostname', 'value'])
+
+URL = "http://{}/hp/device/info_suppliesStatus.html"
+
+
+def job(host: str) -> Data:
+ url = URL.format(host)
+ name = host.replace(".", "_")
+ request = urllib.request.Request(url)
+ with urllib.request.urlopen(request) as f:
+ f2 = codecs.getreader('utf-8')(f)
+ for line in f2.readlines():
+ m = re.match(".*>([0-9]*)%<br", line)
+ if m:
+ return Data(name, int(m.groups()[0]))
+
+
+if __name__ == "__main__":
+ from pprint import pprint
+
+ pprint(job("10.1.0.10"))
diff --git a/jobs/prix_carburant.py b/jobs/prix_carburant.py
new file mode 100755
index 0000000..21e6c26
--- /dev/null
+++ b/jobs/prix_carburant.py
@@ -0,0 +1,107 @@
+import codecs
+import logging
+import typing
+from enum import Enum
+from html.parser import HTMLParser
+from urllib import request
+
+
+class Station:
+ def __init__(self):
+ self.station_name = ""
+ self.prices = {}
+ self.id = None
+
+ def clean(self):
+ self.prices = filter(lambda kv: kv[1] != '', self.prices.items())
+ self.prices = dict(map(lambda kv: (kv[0], float(kv[1])), self.prices))
+
+ def __repr__(self):
+ return "Prix: {} {}".format(self.station_name, self.prices)
+
+
+State = Enum('State', 'pricelist fuel_name fuel_price station_name idle')
+
+
+class Parser(HTMLParser):
+ def error(self, message):
+ logging.error("Parser error: %s", message)
+
+ def __init__(self):
+ super().__init__()
+ self._prix = Station()
+ self._current_fuel_name = ""
+ self._state = State.idle
+
+ def get_prix(self):
+ self._prix.clean()
+ return self._prix
+
+ def handle_starttag(self, tag, attrs):
+ attrs = dict(attrs)
+ if self._state == State.idle and tag == "div" and "id" in attrs and attrs['id'] == 'prix':
+ self._state = State.pricelist
+ elif self._state in [State.pricelist, State.fuel_price] and tag == 'strong':
+ self._state = State.fuel_name
+ self._current_fuel_name = ''
+ elif self._state == State.idle and tag == 'div' and 'id' in attrs and attrs['id'] == 'colg':
+ self._state = State.station_name
+
+ def handle_endtag(self, tag):
+ if self._state == State.pricelist and tag == 'div':
+ self._state = State.idle
+ elif self._state == State.fuel_name and tag == 'strong':
+ self._state = State.fuel_price
+ elif self._state == State.fuel_price and tag == 'div':
+ self._state = State.idle
+ elif self._state == State.station_name and tag == 'p':
+ self._state = State.idle
+
+ def handle_data(self, data: str):
+ if self._state == State.fuel_name:
+ self._current_fuel_name += data.strip().replace(':', '')
+ self._prix.prices[self._current_fuel_name] = ""
+ elif self._state == State.fuel_price:
+ if data.strip() != "0.000":
+ self._prix.prices[self._current_fuel_name] += data.strip()
+ elif self._state == State.station_name:
+ if len(data.strip()) > 0:
+ self._prix.station_name += data.strip() + ". "
+
+
+URL = "http://www.prix-carburants.gouv.fr/map/recupererInfosPdv/"
+
+
+def _execute(station_id: str):
+ parser = Parser()
+ r = request.Request(URL + station_id)
+ r.add_header('Host', 'www.prix-carburants.gouv.fr')
+ r.add_header('Referer', 'http://www.prix-carburants.gouv.fr/recherche/map')
+ r.add_header('User-Agent', 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:30.0) Gecko/20100101 Firefox/30.0')
+ r.add_header('X-Requested-With', 'XMLHttpRequest')
+ r.add_header('X-Prototype-Version', '1.7')
+ r.add_header('Connection', 'close')
+ with request.urlopen(r) as f:
+ # with open("info.html", 'rb') as f:
+ f2 = codecs.getreader('utf-8')(f)
+ f2.errors = 'ignore'
+ for line in f2.readlines():
+ parser.feed(line)
+
+ try:
+ prix = parser.get_prix()
+ prix.id = station_id
+ return prix
+ except Exception as e:
+ raise Exception("Failed for station: {}".format(station_id), e)
+
+
+def execute(*ids) -> typing.Iterable[Station]:
+ for station_id in ids:
+ yield _execute(station_id)
+
+
+if __name__ == "__main__":
+ from pprint import pprint
+
+ pprint(list(execute('1630001', '67760001', '1210003', '1630003', '1210002', '1710001')))
diff --git a/jobs/swr_wetter.py b/jobs/swr_wetter.py
new file mode 100755
index 0000000..bb464c1
--- /dev/null
+++ b/jobs/swr_wetter.py
@@ -0,0 +1,50 @@
+import codecs
+import json
+import re
+import urllib.parse
+import urllib.request
+
+URL = "http://www.swr.de/-/id=5491998/cf=42/did=13968954/format=json/nid=5491998/17ag7cb/index.json"
+
+
+def job(cc):
+ """
+ cc: id of the region. See webpage: http://www.swr.de/wetter
+ """
+ params = urllib.parse.urlencode({'cc': cc})
+ request = urllib.request.Request(URL + "?" + params)
+ request.add_header(
+ "User-Agent",
+ "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; FSL 7.0.6.01001)")
+
+ def transformDict(name, value_dict):
+ for (key, value) in value_dict.items():
+ if key in ['timestamp', 'dayForecast']:
+ continue
+ if value == "k. A.":
+ continue
+ elif re.match("^-?[1-9]+[0-9]*$", value) or value == "0":
+ value = int(value)
+ elif re.match("^-?[1-9]+[0-9]*.?[0-9]*$", value):
+ value = float(value)
+ yield {'name' : "{}.{}.{}".format(basename, name, key), 'value': value}
+
+ with urllib.request.urlopen(request) as f:
+ f2 = codecs.getreader('utf-8')(f)
+ response = json.load(f2)
+ basename = "swr_wetter.{stateCode}.{regionCode}.{name}".format(
+ **response['availableLocations'][cc])
+
+ for d in transformDict("current", response['current'][cc]):
+ yield d
+
+ for (day, value) in response['forecast'].items():
+ value = value[cc]
+ for d in transformDict("forecast." + day, value):
+ yield d
+
+
+if __name__ == "__main__":
+ from pprint import pprint
+
+ pprint(list(job("DE0008834")))
diff --git a/jobs/sys_network.py b/jobs/sys_network.py
new file mode 100755
index 0000000..bbd6a0b
--- /dev/null
+++ b/jobs/sys_network.py
@@ -0,0 +1,15 @@
+import socket
+
+
+def job(device, stat_type):
+ f = open('/sys/class/net/' + device + '/statistics/' + stat_type, 'r')
+ value = f.read()
+ ivalue = int(value.replace("\n", ""))
+ f.close()
+ return {'hostname': socket.gethostname(), 'device': device, 'stat_type': stat_type, 'value': ivalue}
+
+
+if __name__ == "__main__":
+ from pprint import pprint
+
+ pprint(job("eth1", "rx_bytes"))
diff --git a/jobs/sys_network_rate.py b/jobs/sys_network_rate.py
new file mode 100755
index 0000000..3a5cc71
--- /dev/null
+++ b/jobs/sys_network_rate.py
@@ -0,0 +1,22 @@
+from collections import namedtuple
+
+Data = namedtuple('Data', ['hostname', 'device', 'entry', 'value'])
+temp = {}
+
+def job(device, entry, intervals):
+ global temp
+ f = open('/sys/class/net/' + device + '/statistics/' + entry, 'r')
+ value = f.read()
+ ivalue = int(value.replace("\n", ""))
+ f.close()
+
+ return_value = []
+ if entry in temp:
+ rate = (ivalue - temp[entry]) / intervals # bytes/s
+ if rate > 0:
+ # prevent counter overflows
+ return_value = [Data('laer.2.localnet.cc', device, entry, rate)]
+
+ temp[entry] = ivalue
+
+ return return_value
diff --git a/jobs/tankerkoenig.py b/jobs/tankerkoenig.py
new file mode 100755
index 0000000..e78853d
--- /dev/null
+++ b/jobs/tankerkoenig.py
@@ -0,0 +1,36 @@
+import codecs
+import json
+import logging
+import typing
+from collections import namedtuple
+from urllib import request
+
+Data = namedtuple('Data', ['name', 'id', 'type', 'price'])
+
+
+URL = "https://creativecommons.tankerkoenig.de/json/list.php?lat={lat}&lng={lng}&rad={rad}&sort=dist&type=all&apikey={api_key}"
+
+
+def execute(api_key : str, lat: float, lng : float, rad: float) -> typing.Iterable[Data]:
+ url = URL.format(api_key=api_key, rad=rad, lat=lat, lng=lng)
+ r = request.Request(url)
+ try:
+ with request.urlopen(r) as f:
+ f2 = codecs.getreader('utf-8')(f)
+ data = json.load(f2)
+ if not data['status'] == 'ok':
+ raise Exception("Error %s", data['message'])
+ for station in data['stations']:
+ name = "{} - {} - {}".format(station['place'], station['brand'], station['name'])
+ if not station['isOpen'] == True:
+ continue
+
+ if "diesel" in station:
+ yield Data(name, station['id'], 'Diesel', station['diesel'])
+ if "e5" in station:
+ yield Data(name, station['id'], 'SP95-E5', station['e5'])
+ if "e10" in station:
+ yield Data(name, station['id'], 'SP95-E10', station['e10'])
+ except Exception as e:
+ logging.error("Failed for: %f %f %f", lat, lng, rad)
+ raise e
diff --git a/jobs/telexoo.py b/jobs/telexoo.py
new file mode 100755
index 0000000..110ba72
--- /dev/null
+++ b/jobs/telexoo.py
@@ -0,0 +1,55 @@
+import codecs
+import json
+import random
+import re
+import urllib.parse
+import urllib.request
+from collections import namedtuple
+from decimal import Decimal
+
+from currencies.config import *
+
+URL = "https://telexoo.tegona.com/convert/"
+
+Quote = namedtuple('Quote', ['curr_from', 'curr_to', 'rate'])
+
+
+def execute(curr_from, curr_to):
+ MULT = random.randint(1000, 9999)
+ CURRENCY = {
+ MONEY_CURRENCY_EUR: "EUR",
+ MONEY_CURRENCY_CHF: "CHF",
+ MONEY_CURRENCY_USD: "USD",
+ MONEY_CURRENCY_GBP: "GBP",
+ MONEY_CURRENCY_PLN: "PLN"
+ }
+ curr_from = CURRENCY[curr_from]
+ curr_to = CURRENCY[curr_to]
+ params = urllib.parse.urlencode({
+ 's1': curr_from,
+ 's2': curr_to,
+ 'amount': str(MULT),
+ 'action': 'sell',
+ 'language': 'en',
+ 'verbose': '0',
+ })
+ request = urllib.request.Request(URL + "?" + params)
+ request.add_header(
+ "User-Agent",
+ "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; FSL 7.0.6.01001)")
+ with urllib.request.urlopen(request) as f:
+ f2 = codecs.getreader('utf-8')(f)
+ response = json.load(f2)
+ result_raw = response[0]['result'].replace(",", "")
+ match = re.match("^{} ([0-9\.]*)$".format(curr_to), result_raw)
+ if not match:
+ raise Exception("Invalid response in 'result' field")
+ result = Decimal(match.groups()[0]) / MULT
+ return Quote(curr_to, curr_to, float(result))
+
+
+if __name__ == "__main__":
+ from pprint import pprint
+
+ pprint(execute("CHF", "EUR"))
+ pprint(execute("CHF", "GBP"))
diff --git a/jobs/transferwise.py b/jobs/transferwise.py
new file mode 100755
index 0000000..a2e30d1
--- /dev/null
+++ b/jobs/transferwise.py
@@ -0,0 +1,55 @@
+import codecs
+import json
+import random
+import re
+import urllib.parse
+import urllib.request
+from collections import namedtuple
+
+APP_URL = "https://transferwise.com/fr/"
+URL = "https://transferwise.com/api/v1/payment/calculate"
+UA = "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:30.0) Gecko/20100101 Firefox/30.0"
+
+Data = namedtuple('Data', ['curr_from', 'curr_to', 'rate'])
+
+def get_token():
+ request = urllib.request.Request(APP_URL)
+ request.add_header("User-Agent", UA)
+
+ with urllib.request.urlopen(request) as f:
+ f2 = codecs.getreader('utf-8')(f)
+ for line in f2.readlines():
+ m = re.match(".*config.appToken.*'(.+)'.*", line)
+ if m:
+ g = m.groups()
+ return g[0]
+
+
+def job(currFrom, currTo):
+ token = get_token()
+ MULT = random.randint(100, 100000)
+ data = urllib.parse.urlencode({
+ 'amount': str(MULT),
+ 'amountCurrency': 'source',
+ 'hasDiscount': 'false',
+ 'isFixedRate': 'false',
+ 'isGuaranteedFixedTarget': 'false',
+ 'sourceCurrency': currFrom,
+ 'targetCurrency': currTo,
+ })
+ # print (URL + "?" + data)
+ request = urllib.request.Request(URL + "?" + data)
+ request.add_header("X-Authorization-key", token)
+ request.add_header("X-Authorization-token", "")
+ request.add_header("User-Agent", UA)
+ with urllib.request.urlopen(request) as f:
+ f2 = codecs.getreader('utf-8')(f)
+ response = json.load(f2)
+ return Data(currFrom, currTo, float(response['transferwiseRate']))
+
+
+if __name__ == "__main__":
+ from pprint import pprint
+
+ pprint(job("CHF", "EUR"))
+ pprint(job("CHF", "GBP"))
diff --git a/scheduler/__init__.py b/scheduler/__init__.py
new file mode 100644
index 0000000..3212678
--- /dev/null
+++ b/scheduler/__init__.py
@@ -0,0 +1,300 @@
+import calendar
+import datetime
+import inspect
+import logging
+import re
+import reprlib
+import sched
+import time
+import typing
+
+
+def time_ns() -> int:
+ """:return: the current time in nanoseconds"""
+ t = time.clock_gettime(time.CLOCK_REALTIME)
+ t_ns = int(t * 1000 * 1000 * 1000)
+ return t_ns
+
+
+def datetime_from_ns(ns: int) -> datetime.datetime:
+ """:return: nanoseconds converted to python datetime class"""
+ return datetime.datetime.fromtimestamp(ns / 1000 / 1000 / 1000)
+
+
+def timedelta_ns(**kwargs) -> int:
+ """:class:`datetime.timedelta` converted to nanoseconds"""
+ return int(datetime.timedelta(**kwargs).total_seconds() * 1000 * 1000 * 1000)
+
+
+def sleep_ns(t) -> None:
+ time.sleep(t / 1000 / 1000 / 1000)
+
+
+class Job(object):
+ """Base-Class for jobs that are scheduled in :class:`Scheduler`"""
+ def __init__(self, name: str, **kwargs) -> None:
+ self.name: str = name
+ self.properties = kwargs
+ self._execute_funcs: typing.List[typing.Callable[..., typing.Any]] = []
+
+ def next(self, start_ns: int, t_ns: int, t_max_ns: int) -> typing.Optional[int]:
+ """
+ :param start_ns: start of scheduler
+ :return: next run of this job after time 't' or None if no run can be calculated in t_max_ns time
+ """
+ raise NotImplementedError()
+
+ def add_action(self, func):
+ if func is not None:
+ sig = inspect.signature(func)
+ if not len(sig.parameters) in [0, 1, 2]:
+ raise Exception("Wrong number of parameters to action")
+ self._execute_funcs.append(func)
+ return self
+
+ def execute(self, scheduler) -> typing.Any:
+ for e in self._execute_funcs:
+ sig = inspect.signature(e)
+ if len(sig.parameters) == 0:
+ return e()
+ elif len(sig.parameters) == 1:
+ return e(self)
+ elif len(sig.parameters) == 2:
+ return e(scheduler, self)
+
+ def __repr__(self) -> str:
+ return "<{cls.__name__} name={name} {conf}>".format(cls=self.__class__, name=repr(self.name),
+ conf=self.__repr_config__())
+
+ def __repr_config__(self) -> str:
+ return " "
+
+
+def every(seconds: int = 0, minutes: int = 0, hours: int = 0, name='Unnamed-Job', action=None) -> Job:
+ """
+ Run a job in intervals.
+
+ :param seconds: add seconds to the interval. Defalut: 0
+ :param minutes: add minutes to the interval. Default: 0
+ :param hours: add hours to the interval. Default: 0
+ :param name: Name of the Job
+ :param action: a function to be executed, see :func:`Job.execute`
+ :return: The job to be added to :class:`Scheduler`
+ """
+ n = seconds * 1000 * 1000 * 1000 + \
+ minutes * 1000 * 1000 * 1000 * 60 + \
+ hours * 1000 * 1000 * 1000 * 60 * 60
+ j = PeriodicJob(name, n)
+ j.add_action(action)
+ return j
+
+
+class PeriodicJob(Job):
+ def __init__(self, name: str, interval: float, **kwargs) -> None:
+ super().__init__(name, **kwargs)
+ self.interval: float = interval
+
+ def next(self, start_ns: int, t_ns: int, t_max_ns: int) -> typing.Optional[int]:
+ t_since_start = t_ns - start_ns
+ tn = t_since_start % self.interval
+ t_next_ns = int(t_ns + (self.interval - tn))
+ if t_next_ns < t_max_ns:
+ return t_next_ns
+ else:
+ return None
+
+ def __repr_config__(self) -> str:
+ return " interval=" + str(self.interval)
+
+
+_pattern_value = re.compile(r'^[0-9]+$')
+_pattern_range = re.compile(r'([0-9]+)-([0-9]+)$')
+_pattern_asterisk = re.compile(r'\*/([0-9]+)$')
+
+
+def make_test_expr(expr: str) -> typing.Callable[[int], bool]:
+ def parse(s: str):
+ if s == '*':
+ return lambda val: True
+ else:
+ m = _pattern_value.match(s)
+ if m is not None:
+ return lambda val: str(val) == s
+ m = _pattern_range.match(s)
+ if m is not None:
+ start, end = map(int, m.groups())
+ return lambda val: start <= val <= end
+ m = _pattern_asterisk.match(s)
+ if m is not None:
+ mod = int(m.groups()[0])
+ return lambda val: val % mod == 0
+ raise Exception("More complex cron expression is not supported")
+
+ exprs = expr.split(',')
+ if len(expr) == 1:
+ return parse(expr)
+ else:
+ funcs = map(parse, exprs)
+ return lambda val: any(map(lambda func: func(val), funcs))
+
+
+def _generator(timedelta_func, datetime_func, expr_func):
+ def f(start_ns: int, stop_ns: int):
+ dt = datetime_from_ns(start_ns)
+ while int(dt.timestamp() * 1000 * 1000 * 1000) < stop_ns:
+ _dt = datetime_func(dt)
+ delta = timedelta_func(_dt)
+ # print("{} - {}".format(_dt, delta))
+ t_ns = int(dt.timestamp() * 1000 * 1000 * 1000)
+ if t_ns >= start_ns and expr_func(_dt):
+ yield (int(dt.timestamp() * 1000 * 1000 * 1000),
+ int(((_dt + delta - datetime.timedelta(minutes=1)).timestamp() * 1000 * 1000 * 1000)))
+ dt = _dt + delta
+
+ return f
+
+
+def combine(start_ns: int, stop_ns: int, funcs):
+ f = funcs[0]
+ for (_start_ns, stop_ns) in f(start_ns, stop_ns):
+ # print("{}: {} {} -> {} {}".format(repr(f), _start_ns, datetime_from_ns(start_ns), stop_ns, datetime_from_ns(stop_ns)))
+ if (len(funcs)) > 1:
+ r = combine(_start_ns, stop_ns, funcs[1:])
+ if r is not None:
+ return r
+ elif _start_ns >= start_ns:
+ # print("Found {} ".format(start_ns))
+ return _start_ns
+
+
+class CronJob(Job):
+ def __init__(self, name: str, minute: str, hour: str, dow: str, dom: str, month: str, **kwargs) -> None:
+ super().__init__(name, **kwargs)
+ self.minute = minute
+ self.minute_f = _generator(lambda dt: datetime.timedelta(minutes=1),
+ lambda dt: datetime.datetime(year=dt.year, month=dt.month, day=dt.day,
+ hour=dt.hour, minute=dt.minute),
+ lambda dt: make_test_expr(minute)(dt.minute))
+ self.hour = hour
+ self.hour_f = _generator(lambda dt: datetime.timedelta(hours=1),
+ lambda dt: datetime.datetime(year=dt.year, month=dt.month, day=dt.day, hour=dt.hour),
+ lambda dt: make_test_expr(hour)(dt.hour))
+ self.dow = dow
+ self.dow_f = _generator(lambda dt: datetime.timedelta(days=1),
+ lambda dt: datetime.datetime(year=dt.year, month=dt.month, day=dt.day),
+ lambda dt: make_test_expr(dow)(dt.isoweekday()))
+ self.dom = dom
+ self.dom_f = _generator(lambda dt: datetime.timedelta(days=1),
+ lambda dt: datetime.datetime(year=dt.year, month=dt.month, day=dt.day),
+ lambda dt: make_test_expr(dom)(dt.day))
+ self.month = month
+ self.month_f = _generator(lambda dt: datetime.timedelta(days=calendar.monthrange(dt.year, dt.month)[1]),
+ lambda dt: datetime.datetime(year=dt.year, month=dt.month, day=1),
+ lambda dt: make_test_expr(month)(dt.month))
+
+ def next(self, start_ns: int, t_ns: int, t_max_ns: int):
+ stop_ns = t_ns + t_max_ns
+ t_ns = t_ns - t_ns % (1000 * 1000 * 1000) # round current timestamp down to minutes
+ n = combine(t_ns + timedelta_ns(minutes=1), stop_ns,
+ [self.month_f, self.dom_f, self.dow_f, self.hour_f, self.minute_f])
+ return n
+
+ def __repr_config__(self):
+ return " minute={_.minute} hour={_.hour} dow={_.dow} dom={_.dom} month={_.month}".format(_=self)
+
+
+def at(minute: str = '*', hour: str = '*', day_of_week: str = '*', day_of_month: str = '*', month: str = '*',
+ name='Unnamed-Job', action=None, **kwargs):
+ job = CronJob(name, minute, hour, day_of_week, day_of_month, month, **kwargs)
+ job.add_action(action)
+ return job
+
+
+def cron(cron_expr: str, name='Unnamed-Job', action=None, **kwargs):
+ groups = list(filter(lambda x: x != '', cron_expr.split(' ')))
+ if len(groups) != 5:
+ raise Exception("Invalid cron expression, failed to find minute-hour-dow-dom-month pattern")
+
+ minute, hour, day_of_month, month, day_of_week = groups
+ return at(minute, hour, day_of_week, day_of_month, month, name, action, **kwargs)
+
+
+# noinspection PyProtectedMember
+class Scheduler(object):
+ def __init__(self):
+ self._scheduler = sched.scheduler(timefunc=time_ns, delayfunc=sleep_ns)
+ self._jobs : typing.Dict[Job, typing.Optional[int, None]] = {}
+ self._processors : typing.List[typing.Callable[[Job, typing.Any], None]] = []
+ self._time_start_ns :int = time_ns()
+ self._lookahead_ns : int = 1000 * 1000 * 1000 * 60 * 120
+ self._repr = reprlib.Repr()
+
+ def remove_job_by_name(self, name : str):
+ with self._scheduler._lock:
+ remove = []
+ for job in filter(lambda j: j.name == name, self._jobs.keys()):
+ remove.append(job)
+ self._scheduler.cancel(self._jobs[job])
+
+ for job in remove:
+ del self._jobs[job]
+
+ def get_job_by_name(self, name : str) -> typing.Optional[Job]:
+ with self._scheduler._lock:
+ for job in filter(lambda j: j.name == name, self._jobs.keys()):
+ return job
+ return None
+
+ def add_job(self, job: Job):
+ with self._scheduler._lock:
+ if job.name in map(lambda j: j.name, self._jobs.keys()):
+ raise Exception("Job with name '{}' exists".format(job.name))
+ self._jobs[job] = None
+ self._schedule_job_run(job)
+ return self
+
+ def add_processor(self, processor : typing.Callable[[Job, typing.Any], None]) -> None:
+ with self._scheduler._lock:
+ logging.info("Add processor %s", processor)
+ self._processors.append(processor)
+
+ def remove_processor(self, processor : typing.Callable[[Job, typing.Any], None]) -> None:
+ with self._scheduler._lock:
+ self._processors.remove(processor)
+
+ def _process_func(self, job: Job):
+ def execute():
+ try:
+ logging.info("Execute job %s", job)
+ result = job.execute(self)
+ for p in self._processors:
+ value_repr = self._repr.repr(result)
+ logging.info("Execute result processor %s for job %s result: %s", p, job, value_repr)
+ try:
+ p(job, result)
+ except:
+ logging.exception("Execute result processor %s for job %s failed", p, job)
+ logging.info("Execution finished for job %s", job)
+ except:
+ logging.exception("Exception while job %s", job)
+ finally:
+ # re-schedule for next execution
+ self._schedule_job_run(job)
+
+ return execute
+
+ def _schedule_job_run(self, job):
+ now_ns = time_ns()
+ stop_ns = now_ns + self._lookahead_ns
+ next_ns = job.next(self._time_start_ns, now_ns, stop_ns)
+ if next_ns is not None:
+ logging.info("Schedule {} in {}ns / at {}".format(job, next_ns - now_ns, datetime_from_ns(next_ns)))
+ id = self._scheduler.enterabs(next_ns, 0, self._process_func(job))
+ else:
+ logging.info("No next schedule for job {}. Retry in 10min".format(job))
+ id = self._scheduler.enterabs(now_ns + (1000 * 1000 * 1000 * 10 * 60), 0, lambda: self._schedule_job_run(job))
+ self._jobs[job] = id
+
+ def start(self, blocking: bool = True):
+ logging.info("Start scheduler (blocking=%s)", blocking)
+ self._scheduler.run(blocking)
diff --git a/scheduler/influxdb.py b/scheduler/influxdb.py
new file mode 100644
index 0000000..ecfa6ea
--- /dev/null
+++ b/scheduler/influxdb.py
@@ -0,0 +1,68 @@
+import logging
+import reprlib
+import typing
+from urllib.request import urlopen
+
+import collections
+from pyinflux.client import Line
+
+from . import Job
+
+
+def _get_measurement_name(job: Job):
+ if 'measurement' in job.properties:
+ return job.properties['measurement']
+ else:
+ return job.name
+
+
+class Dumper:
+ def __init__(self):
+ self._repr = reprlib.Repr()
+
+ def __call__(self, *args, **kwargs) -> None:
+ if len(args) == 2 and isinstance(args[0], Job):
+ # assuming second is list of objects that str() to influx protocol lines
+ data = self._convert(args[0], args[1])
+ self._insert(data)
+ else:
+ raise Exception("Wrong arguments for InfluxDB inserter.")
+
+ def _insert(self, lines: typing.Iterable):
+ data = "\n".join(map(str, lines))
+ print("===== Would insert:\n" + data)
+
+ def _convert(self, job: Job, data) -> typing.Iterable[Line]:
+ def c(name, value):
+ if isinstance(value, Line):
+ return value
+ elif isinstance(value, int) or isinstance(value, str) or isinstance(value, float):
+ return Line(name, {}, {'value': value})
+ else:
+ raise Exception("Cannot simply insert value of type: {} for job {}".format(type(value), job))
+
+ measurement = _get_measurement_name(job)
+ if isinstance(data, collections.Iterable):
+ return map(lambda v: c(measurement, v), data)
+ else:
+ return [c(measurement, data)]
+
+
+class Inserter(Dumper):
+ def __init__(self, url: str) -> None:
+ super().__init__()
+ self._url: str = url
+
+ def _insert(self, lines: typing.Iterable):
+ try:
+ data = "\n".join(map(str, lines)).encode('utf-8')
+ try:
+ with urlopen(self._url, data) as fh:
+ logging.debug("InfluxDB successful answer: %s", self._repr.repr(fh.read().decode('utf-8')))
+ except Exception:
+ logging.exception("Failed insert of:\n%s", self._repr.repr(lines))
+ except Exception:
+ logging.exception("Failed formatting of:\n%s", self._repr.repr(lines))
+
+ def __repr__(self):
+ return f"<{self.__class__.__module__}.{self.__class__.__name__} url={repr(self._url)}>"
diff --git a/scheduler/test_scheduler.py b/scheduler/test_scheduler.py
new file mode 100644
index 0000000..ee041cf
--- /dev/null
+++ b/scheduler/test_scheduler.py
@@ -0,0 +1,171 @@
+import unittest
+
+from scheduler import *
+
+
+class TestPeriodicJob(unittest.TestCase):
+ def test1(self):
+ job = every(seconds=10)
+ next_run = job.next(0, 0, 20 * 1000 * 1000 * 1000)
+ self.assertEqual(next_run, 10 * 1000 * 1000 * 1000)
+
+ next_run = job.next(0, next_run, next_run + (20 * 1000 * 1000 * 1000))
+ self.assertEqual(next_run, 20 * 1000 * 1000 * 1000)
+
+ self.assertIsNone(job.next(0, next_run, 10))
+ self.assertIsNone(job.next(0, next_run, next_run + (10 * 1000 * 1000 * 1000)))
+ next_run = job.next(0, next_run, next_run + (10 * 1000 * 1000 * 1000) + 1)
+ self.assertEqual(next_run, 30 * 1000 * 1000 * 1000)
+
+
+class TestProcessor(unittest.TestCase):
+ def test1(self):
+ class P:
+ def __init__(self) -> None:
+ self.values = None
+
+ def __call__(self, *args, **kwargs) -> None:
+ self.values = args
+
+ s = Scheduler()
+ p = P()
+ s.add_processor(p)
+
+ job = every(seconds=10, action=lambda: 1234)
+ s._process_func(job)()
+
+ self.assertEqual(p.values, (job, 1234))
+
+
+class TestCronJob(unittest.TestCase):
+ def testas(self):
+ now = 1531173610000000000
+ next_run = at(minute='10').next(0, now, 999000000000000000)
+ self.assertEqual(datetime_from_ns(next_run),
+ datetime.datetime(year=2018, month=7, day=10, hour=0, minute=10))
+
+ def testAt2(self):
+ now = 1531173610000000000
+ next_run = at(minute="0", hour="8,10,12,14,16,18,20", name="Test").next(0, now, 999000000000000000)
+ self.assertEqual(datetime_from_ns(next_run),
+ datetime.datetime(year=2018, month=7, day=10, hour=8, minute=0))
+
+ next_run += datetime.timedelta(minutes=1).total_seconds() * 1000 * 1000 * 1000
+ next_run = at(minute="0", hour="8,10,12,14,16,18,20", name="Test").next(0, next_run, 999000000000000000)
+ self.assertEqual(datetime_from_ns(next_run),
+ datetime.datetime(year=2018, month=7, day=10, hour=10, minute=0))
+
+ def test3(self):
+ start = datetime.datetime(year=2018, month=7, day=10, hour=10, minute=0).timestamp() * 1000 * 1000 * 1000
+ next_run = start
+ runs = []
+ while next_run < start + timedelta_ns(days=60):
+ runs.append(next_run)
+ next_run = at(minute="0", hour="8,10,14", name="Test").next(0, next_run, next_run + timedelta_ns(days=2))
+
+ self.assertEqual(len(runs), 60 * 3)
+ self.assertEqual(datetime_from_ns(next_run),
+ datetime.datetime(year=2018, month=9, day=8, hour=10, minute=0, second=0))
+ self.assertTrue(
+ all(map(lambda t: t.hour in [8, 10, 14] and t.minute == 0 and t.second == 0, map(datetime_from_ns, runs))))
+
+
+ def testas3(self):
+ now = 1531173610000000000
+ next_run = cron('10,20,30 * * * *').next(0, now, 999000000000000000)
+ self.assertEqual(datetime_from_ns(next_run),
+ datetime.datetime(year=2018, month=7, day=10, hour=0, minute=10))
+
+ now = next_run
+ next_run = cron('10,20,30 * * * *').next(0, now, 999000000000000000)
+ self.assertEqual(datetime_from_ns(next_run),
+ datetime.datetime(year=2018, month=7, day=10, hour=0, minute=20))
+
+ now = next_run
+ next_run = cron('10,20,30 * * * *').next(0, now, 999000000000000000)
+ self.assertEqual(datetime_from_ns(next_run),
+ datetime.datetime(year=2018, month=7, day=10, hour=0, minute=30))
+
+ now = next_run
+ next_run = cron('10,20,30 * * * *').next(0, now, 999000000000000000)
+ self.assertEqual(datetime_from_ns(next_run),
+ datetime.datetime(year=2018, month=7, day=10, hour=1, minute=10))
+
+
+ def test4(self):
+ expr = '10-15 * * * *'
+ now = 1531173610000000000
+ next_run = cron(expr).next(0, now, 999000000000000000)
+ self.assertEqual(datetime_from_ns(next_run),
+ datetime.datetime(year=2018, month=7, day=10, hour=0, minute=10))
+ next_run = cron(expr).next(0, next_run, 999000000000000000)
+ self.assertEqual(datetime_from_ns(next_run),
+ datetime.datetime(year=2018, month=7, day=10, hour=0, minute=11))
+ next_run = cron(expr).next(0, next_run, 999000000000000000)
+ self.assertEqual(datetime_from_ns(next_run),
+ datetime.datetime(year=2018, month=7, day=10, hour=0, minute=12))
+ next_run = cron(expr).next(0, next_run, 999000000000000000)
+ self.assertEqual(datetime_from_ns(next_run),
+ datetime.datetime(year=2018, month=7, day=10, hour=0, minute=13))
+ next_run = cron(expr).next(0, next_run, 999000000000000000)
+ self.assertEqual(datetime_from_ns(next_run),
+ datetime.datetime(year=2018, month=7, day=10, hour=0, minute=14))
+ next_run = cron(expr).next(0, next_run, 999000000000000000)
+ self.assertEqual(datetime_from_ns(next_run),
+ datetime.datetime(year=2018, month=7, day=10, hour=0, minute=15))
+ next_run = cron(expr).next(0, next_run, 999000000000000000)
+ self.assertEqual(datetime_from_ns(next_run),
+ datetime.datetime(year=2018, month=7, day=10, hour=1, minute=10))
+
+
+ def test5(self):
+ expr = '10 */20 * * *'
+ now = int(datetime.datetime(year=2018, month=7, day=10, hour=0, minute=0,
+ second=0).timestamp() * 1000 * 1000 * 1000)
+ next_run = cron(expr).next(0, now, 999000000000000000)
+ self.assertEqual(datetime_from_ns(next_run),
+ datetime.datetime(year=2018, month=7, day=10, hour=0, minute=10))
+ next_run = cron(expr).next(0, next_run, 999000000000000000)
+ self.assertEqual(datetime_from_ns(next_run),
+ datetime.datetime(year=2018, month=7, day=10, hour=20, minute=10))
+ next_run = cron(expr).next(0, next_run, 999000000000000000)
+ self.assertEqual(datetime_from_ns(next_run),
+ datetime.datetime(year=2018, month=7, day=11, hour=0, minute=10))
+
+ def test6(self):
+ job = at(minute='*/15', hour='5-24', name='Clever-Tanken', action=None)
+ next_run = int(datetime.datetime(year=2018, month=7, day=17, hour=11, minute=51,
+ second=10).timestamp() * 1000 * 1000 * 1000)
+
+ next_run = job.next(0, next_run, next_run + 60 * 60 * 1000 * 1000 * 1000)
+ self.assertEqual(datetime_from_ns(next_run),
+ datetime.datetime(year=2018, month=7, day=17, hour=12, minute=0))
+
+ next_run = job.next(0, next_run, next_run + 60 * 60 * 1000 * 1000 * 1000)
+ self.assertEqual(datetime_from_ns(next_run),
+ datetime.datetime(year=2018, month=7, day=17, hour=12, minute=15))
+
+
+ def test6(self):
+ job = at(minute="0", hour="8,10,12,14,16,18,20", name="ESG", action=None)
+ next_run = time_ns() #now
+ hours = [8,10,12,14,16,18,20]
+
+ while hours != []:
+ next_run = job.next(0, next_run, next_run + 60 * 60 * 1000 * 1000 * 1000)
+ next_run_dt = datetime_from_ns(next_run)
+ self.assertTrue(next_run_dt.hour in hours)
+ self.assertTrue(next_run_dt.minute == 0)
+ hours.remove(next_run_dt.hour)
+
+class TestScheduler(unittest.TestCase):
+ def setUp(self):
+ self.scheduler = Scheduler()
+
+ def test(self):
+ self.scheduler.add_job(at(minute='0', name='Test'))
+ try:
+ self.scheduler.add_job(at(minute='0', name='Test'))
+ self.assertFalse(True, 'must not happen')
+ except:
+ pass
diff --git a/tab_main.py b/tab_main.py
new file mode 100755
index 0000000..46ef699
--- /dev/null
+++ b/tab_main.py
@@ -0,0 +1,133 @@
+#!/usr/bin/env python3
+import logging
+
+from pyinflux.client import Line
+
+import jobs.clever_tanken
+import jobs.davis_vantage
+import jobs.esg
+import jobs.hplq1300n
+import jobs.prix_carburant
+import jobs.swr_wetter
+import jobs.tankerkoenig
+import jobs.telexoo
+import jobs.transferwise
+import scheduler.influxdb
+
+logging.basicConfig(level=logging.INFO)
+
+s = scheduler.Scheduler()
+
+transform_swr = lambda v: Line(v['name'], {}, {'value': v['value']})
+s.add_job(scheduler.at(minute='0,15,30,45', name='SWR Wetter',
+ action=lambda: map(transform_swr, jobs.swr_wetter.job('DE0008834'))))
+
+transform_laserjet = lambda v: Line('hplq1300n.toner.{}'.format(v.hostname), {}, {'value': v.value})
+s.add_job(scheduler.at(minute='*/5', name="Laserjet Status",
+ action=lambda: transform_laserjet(jobs.hplq1300n.job('10.1.0.10'))))
+
+transform_telexoo = lambda qoute: Line('telexoo.{}{}_X'.format(qoute.curr_from, qoute.curr_to), {},
+ {'value': qoute.rate})
+s.add_job(scheduler.every(minutes=10, name="Telexoo.com-CHFGBP",
+ action=lambda: transform_telexoo(jobs.telexoo.execute("CHF", "GBP"))))
+s.add_job(scheduler.every(minutes=10, name="Telexoo.com-CHFEUR",
+ action=lambda: transform_telexoo(jobs.telexoo.execute("CHF", "EUR"))))
+s.add_job(scheduler.every(minutes=10, name="Telexoo.com-EURCHF",
+ action=lambda: transform_telexoo(jobs.telexoo.execute("EUR", "CHF"))))
+s.add_job(scheduler.every(minutes=10, name="Telexoo.com-CHFPLN",
+ action=lambda: transform_telexoo(jobs.telexoo.execute("CHF", "PLN"))))
+
+transform_transferwise = lambda d: Line('transferwise.{}{}_X'.format(d.curr_from, d.curr_to), {}, {
+ 'value': d.rate})
+s.add_job(scheduler.every(minutes=10, name='Transferwise-CHFEUR',
+ action=lambda: transform_transferwise(jobs.transferwise.job('CHF', 'EUR'))))
+s.add_job(scheduler.every(minutes=10, name='Transferwise-EURCHF',
+ action=lambda: transform_transferwise(jobs.transferwise.job('EUR', 'CHF'))))
+
+transform_esg = lambda products: [Line('esg', {'sku': product.sku, 'product_name': product.name},
+ {'price': product.price}) for product in products]
+s.add_job(scheduler.at(minute="0", hour="8,10,12,14,16,18,20", name="ESG",
+ action=lambda: transform_esg(jobs.esg.execute())))
+
+s.add_job(scheduler.every(hours=2, name="Wettermichel.de", action=
+lambda: [Line('wettermichel.{}'.format(name), {}, {'value': value})
+ for name, value in
+ jobs.davis_vantage.load('http://wettermichel.de/davis/con_davis.php').items()]))
+
+
+def execute_prix_carburant():
+ for station in jobs.prix_carburant.execute('1630001', '1210003', '1630003', '1210002', '1710001',
+ '67760001', '67240002', '67452001',
+ '68740001', # Fessenheim
+ '67500009', # Hagenau
+ '67116002'): # Reichstett
+ for fuelname, price in station.prices.items():
+ tags = {'name': station.station_name, 'id': 'prix_carburant:{}'.format(station.id)}
+ fields = {'value': price}
+ if fuelname == "SP95":
+ yield Line('tankstelle.SP95-E5', tags, fields)
+ elif fuelname == "SP95-E10":
+ yield Line('tankstelle.SP95-E10', tags, fields)
+ elif fuelname == "Gazole":
+ yield Line('tankstelle.Diesel', tags, fields)
+ elif fuelname == "E85":
+ yield Line('tankstelle.E85', tags, fields)
+
+
+s.add_job(scheduler.at(minute='10', hour='5-22', name="prix_carburant",
+ action=execute_prix_carburant))
+
+
+def transform_clever(tankstelle: jobs.clever_tanken.Tankstelle):
+ for fuelname, price in tankstelle.preise.items():
+ tags = {'name': tankstelle.name, 'id': 'clever_tanken:{}'.format(tankstelle.id)}
+ fields = {'value': price}
+ if fuelname == "Super E5":
+ yield Line('tankstelle.SP95-E5', tags, fields)
+ elif fuelname == "Super E10":
+ yield Line('tankstelle.SP95-E10', tags, fields)
+ elif fuelname == "Diesel":
+ yield Line('tankstelle.Diesel', tags, fields)
+
+
+s.add_job(scheduler.at(minute='*/15', hour='5-24', name='Clever-Tanken', action=
+lambda: [line for station in map(transform_clever, map(jobs.clever_tanken.execute, [
+ '20219', '11985', '17004',
+ '19715', # Kaiserst. Mineralölvertrieb Schwärzle
+ '54296', # ESSO Endingen
+ '10355', # ARAL Tiengen
+ '20144', # bft Rankackerweg
+ '27534', # EXTROL Freiburg
+ '55690', # Rheinmünster
+ '15220', # Esso Achern
+ '5853', # JET Rastatt
+ '24048', # Bodersweier
+ '3819', # JET Freiburg
+])) for line in station]))
+
+
+def transform_tankerkoenig(job):
+ api_key = job.properties['api_key']
+ for data in jobs.tankerkoenig.execute(api_key, 48.651822, 7.927891, 15.0):
+ yield Line("tankerkoenig.{}".format(data.type), {'name': data.name, 'id': data.id}, {'value': data.price})
+
+
+s.add_job(scheduler.at(minute='*/10', hour='5-24', name='Tankerkönig',
+ action=transform_tankerkoenig))
+
+if __name__ == '__main__':
+ import argparse
+
+ parser = argparse.ArgumentParser()
+ parser.add_argument('--influx-url', nargs=1, default=None)
+ parser.add_argument('--tankerkoenig', nargs=1, default='00000000-0000-0000-0000-000000000002')
+ args = parser.parse_args()
+
+ tanker: scheduler.Job = s.get_job_by_name('Tankerkönig')
+ tanker.properties['api_key'] = args.tankerkoenig[0]
+
+ if args.influx_url is not None:
+ s.add_processor(scheduler.influxdb.Inserter(args.influx_url[0]))
+ else:
+ s.add_processor(scheduler.influxdb.Dumper())
+ s.start(True)