summaryrefslogtreecommitdiff
path: root/scheduler
diff options
context:
space:
mode:
authorYves Fischer <yvesf-git@xapek.org>2018-07-17 12:34:04 +0200
committerYves Fischer <yvesf-git@xapek.org>2018-07-17 12:49:49 +0200
commit69c220cda3d8c0a95327630f5752dad36cb82261 (patch)
treefad9fbe78cf717f4dd17b8e9d996ab9a54b7d3e2 /scheduler
downloaddatasources-69c220cda3d8c0a95327630f5752dad36cb82261.tar.gz
datasources-69c220cda3d8c0a95327630f5752dad36cb82261.zip
Squashed commit
Diffstat (limited to 'scheduler')
-rw-r--r--scheduler/__init__.py300
-rw-r--r--scheduler/influxdb.py68
-rw-r--r--scheduler/test_scheduler.py171
3 files changed, 539 insertions, 0 deletions
diff --git a/scheduler/__init__.py b/scheduler/__init__.py
new file mode 100644
index 0000000..3212678
--- /dev/null
+++ b/scheduler/__init__.py
@@ -0,0 +1,300 @@
+import calendar
+import datetime
+import inspect
+import logging
+import re
+import reprlib
+import sched
+import time
+import typing
+
+
+def time_ns() -> int:
+ """:return: the current time in nanoseconds"""
+ t = time.clock_gettime(time.CLOCK_REALTIME)
+ t_ns = int(t * 1000 * 1000 * 1000)
+ return t_ns
+
+
+def datetime_from_ns(ns: int) -> datetime.datetime:
+ """:return: nanoseconds converted to python datetime class"""
+ return datetime.datetime.fromtimestamp(ns / 1000 / 1000 / 1000)
+
+
+def timedelta_ns(**kwargs) -> int:
+ """:class:`datetime.timedelta` converted to nanoseconds"""
+ return int(datetime.timedelta(**kwargs).total_seconds() * 1000 * 1000 * 1000)
+
+
+def sleep_ns(t) -> None:
+ time.sleep(t / 1000 / 1000 / 1000)
+
+
+class Job(object):
+ """Base-Class for jobs that are scheduled in :class:`Scheduler`"""
+ def __init__(self, name: str, **kwargs) -> None:
+ self.name: str = name
+ self.properties = kwargs
+ self._execute_funcs: typing.List[typing.Callable[..., typing.Any]] = []
+
+ def next(self, start_ns: int, t_ns: int, t_max_ns: int) -> typing.Optional[int]:
+ """
+ :param start_ns: start of scheduler
+ :return: next run of this job after time 't' or None if no run can be calculated in t_max_ns time
+ """
+ raise NotImplementedError()
+
+ def add_action(self, func):
+ if func is not None:
+ sig = inspect.signature(func)
+ if not len(sig.parameters) in [0, 1, 2]:
+ raise Exception("Wrong number of parameters to action")
+ self._execute_funcs.append(func)
+ return self
+
+ def execute(self, scheduler) -> typing.Any:
+ for e in self._execute_funcs:
+ sig = inspect.signature(e)
+ if len(sig.parameters) == 0:
+ return e()
+ elif len(sig.parameters) == 1:
+ return e(self)
+ elif len(sig.parameters) == 2:
+ return e(scheduler, self)
+
+ def __repr__(self) -> str:
+ return "<{cls.__name__} name={name} {conf}>".format(cls=self.__class__, name=repr(self.name),
+ conf=self.__repr_config__())
+
+ def __repr_config__(self) -> str:
+ return " "
+
+
+def every(seconds: int = 0, minutes: int = 0, hours: int = 0, name='Unnamed-Job', action=None) -> Job:
+ """
+ Run a job in intervals.
+
+ :param seconds: add seconds to the interval. Defalut: 0
+ :param minutes: add minutes to the interval. Default: 0
+ :param hours: add hours to the interval. Default: 0
+ :param name: Name of the Job
+ :param action: a function to be executed, see :func:`Job.execute`
+ :return: The job to be added to :class:`Scheduler`
+ """
+ n = seconds * 1000 * 1000 * 1000 + \
+ minutes * 1000 * 1000 * 1000 * 60 + \
+ hours * 1000 * 1000 * 1000 * 60 * 60
+ j = PeriodicJob(name, n)
+ j.add_action(action)
+ return j
+
+
+class PeriodicJob(Job):
+ def __init__(self, name: str, interval: float, **kwargs) -> None:
+ super().__init__(name, **kwargs)
+ self.interval: float = interval
+
+ def next(self, start_ns: int, t_ns: int, t_max_ns: int) -> typing.Optional[int]:
+ t_since_start = t_ns - start_ns
+ tn = t_since_start % self.interval
+ t_next_ns = int(t_ns + (self.interval - tn))
+ if t_next_ns < t_max_ns:
+ return t_next_ns
+ else:
+ return None
+
+ def __repr_config__(self) -> str:
+ return " interval=" + str(self.interval)
+
+
+_pattern_value = re.compile(r'^[0-9]+$')
+_pattern_range = re.compile(r'([0-9]+)-([0-9]+)$')
+_pattern_asterisk = re.compile(r'\*/([0-9]+)$')
+
+
+def make_test_expr(expr: str) -> typing.Callable[[int], bool]:
+ def parse(s: str):
+ if s == '*':
+ return lambda val: True
+ else:
+ m = _pattern_value.match(s)
+ if m is not None:
+ return lambda val: str(val) == s
+ m = _pattern_range.match(s)
+ if m is not None:
+ start, end = map(int, m.groups())
+ return lambda val: start <= val <= end
+ m = _pattern_asterisk.match(s)
+ if m is not None:
+ mod = int(m.groups()[0])
+ return lambda val: val % mod == 0
+ raise Exception("More complex cron expression is not supported")
+
+ exprs = expr.split(',')
+ if len(expr) == 1:
+ return parse(expr)
+ else:
+ funcs = map(parse, exprs)
+ return lambda val: any(map(lambda func: func(val), funcs))
+
+
+def _generator(timedelta_func, datetime_func, expr_func):
+ def f(start_ns: int, stop_ns: int):
+ dt = datetime_from_ns(start_ns)
+ while int(dt.timestamp() * 1000 * 1000 * 1000) < stop_ns:
+ _dt = datetime_func(dt)
+ delta = timedelta_func(_dt)
+ # print("{} - {}".format(_dt, delta))
+ t_ns = int(dt.timestamp() * 1000 * 1000 * 1000)
+ if t_ns >= start_ns and expr_func(_dt):
+ yield (int(dt.timestamp() * 1000 * 1000 * 1000),
+ int(((_dt + delta - datetime.timedelta(minutes=1)).timestamp() * 1000 * 1000 * 1000)))
+ dt = _dt + delta
+
+ return f
+
+
+def combine(start_ns: int, stop_ns: int, funcs):
+ f = funcs[0]
+ for (_start_ns, stop_ns) in f(start_ns, stop_ns):
+ # print("{}: {} {} -> {} {}".format(repr(f), _start_ns, datetime_from_ns(start_ns), stop_ns, datetime_from_ns(stop_ns)))
+ if (len(funcs)) > 1:
+ r = combine(_start_ns, stop_ns, funcs[1:])
+ if r is not None:
+ return r
+ elif _start_ns >= start_ns:
+ # print("Found {} ".format(start_ns))
+ return _start_ns
+
+
+class CronJob(Job):
+ def __init__(self, name: str, minute: str, hour: str, dow: str, dom: str, month: str, **kwargs) -> None:
+ super().__init__(name, **kwargs)
+ self.minute = minute
+ self.minute_f = _generator(lambda dt: datetime.timedelta(minutes=1),
+ lambda dt: datetime.datetime(year=dt.year, month=dt.month, day=dt.day,
+ hour=dt.hour, minute=dt.minute),
+ lambda dt: make_test_expr(minute)(dt.minute))
+ self.hour = hour
+ self.hour_f = _generator(lambda dt: datetime.timedelta(hours=1),
+ lambda dt: datetime.datetime(year=dt.year, month=dt.month, day=dt.day, hour=dt.hour),
+ lambda dt: make_test_expr(hour)(dt.hour))
+ self.dow = dow
+ self.dow_f = _generator(lambda dt: datetime.timedelta(days=1),
+ lambda dt: datetime.datetime(year=dt.year, month=dt.month, day=dt.day),
+ lambda dt: make_test_expr(dow)(dt.isoweekday()))
+ self.dom = dom
+ self.dom_f = _generator(lambda dt: datetime.timedelta(days=1),
+ lambda dt: datetime.datetime(year=dt.year, month=dt.month, day=dt.day),
+ lambda dt: make_test_expr(dom)(dt.day))
+ self.month = month
+ self.month_f = _generator(lambda dt: datetime.timedelta(days=calendar.monthrange(dt.year, dt.month)[1]),
+ lambda dt: datetime.datetime(year=dt.year, month=dt.month, day=1),
+ lambda dt: make_test_expr(month)(dt.month))
+
+ def next(self, start_ns: int, t_ns: int, t_max_ns: int):
+ stop_ns = t_ns + t_max_ns
+ t_ns = t_ns - t_ns % (1000 * 1000 * 1000) # round current timestamp down to minutes
+ n = combine(t_ns + timedelta_ns(minutes=1), stop_ns,
+ [self.month_f, self.dom_f, self.dow_f, self.hour_f, self.minute_f])
+ return n
+
+ def __repr_config__(self):
+ return " minute={_.minute} hour={_.hour} dow={_.dow} dom={_.dom} month={_.month}".format(_=self)
+
+
+def at(minute: str = '*', hour: str = '*', day_of_week: str = '*', day_of_month: str = '*', month: str = '*',
+ name='Unnamed-Job', action=None, **kwargs):
+ job = CronJob(name, minute, hour, day_of_week, day_of_month, month, **kwargs)
+ job.add_action(action)
+ return job
+
+
+def cron(cron_expr: str, name='Unnamed-Job', action=None, **kwargs):
+ groups = list(filter(lambda x: x != '', cron_expr.split(' ')))
+ if len(groups) != 5:
+ raise Exception("Invalid cron expression, failed to find minute-hour-dow-dom-month pattern")
+
+ minute, hour, day_of_month, month, day_of_week = groups
+ return at(minute, hour, day_of_week, day_of_month, month, name, action, **kwargs)
+
+
+# noinspection PyProtectedMember
+class Scheduler(object):
+ def __init__(self):
+ self._scheduler = sched.scheduler(timefunc=time_ns, delayfunc=sleep_ns)
+ self._jobs : typing.Dict[Job, typing.Optional[int, None]] = {}
+ self._processors : typing.List[typing.Callable[[Job, typing.Any], None]] = []
+ self._time_start_ns :int = time_ns()
+ self._lookahead_ns : int = 1000 * 1000 * 1000 * 60 * 120
+ self._repr = reprlib.Repr()
+
+ def remove_job_by_name(self, name : str):
+ with self._scheduler._lock:
+ remove = []
+ for job in filter(lambda j: j.name == name, self._jobs.keys()):
+ remove.append(job)
+ self._scheduler.cancel(self._jobs[job])
+
+ for job in remove:
+ del self._jobs[job]
+
+ def get_job_by_name(self, name : str) -> typing.Optional[Job]:
+ with self._scheduler._lock:
+ for job in filter(lambda j: j.name == name, self._jobs.keys()):
+ return job
+ return None
+
+ def add_job(self, job: Job):
+ with self._scheduler._lock:
+ if job.name in map(lambda j: j.name, self._jobs.keys()):
+ raise Exception("Job with name '{}' exists".format(job.name))
+ self._jobs[job] = None
+ self._schedule_job_run(job)
+ return self
+
+ def add_processor(self, processor : typing.Callable[[Job, typing.Any], None]) -> None:
+ with self._scheduler._lock:
+ logging.info("Add processor %s", processor)
+ self._processors.append(processor)
+
+ def remove_processor(self, processor : typing.Callable[[Job, typing.Any], None]) -> None:
+ with self._scheduler._lock:
+ self._processors.remove(processor)
+
+ def _process_func(self, job: Job):
+ def execute():
+ try:
+ logging.info("Execute job %s", job)
+ result = job.execute(self)
+ for p in self._processors:
+ value_repr = self._repr.repr(result)
+ logging.info("Execute result processor %s for job %s result: %s", p, job, value_repr)
+ try:
+ p(job, result)
+ except:
+ logging.exception("Execute result processor %s for job %s failed", p, job)
+ logging.info("Execution finished for job %s", job)
+ except:
+ logging.exception("Exception while job %s", job)
+ finally:
+ # re-schedule for next execution
+ self._schedule_job_run(job)
+
+ return execute
+
+ def _schedule_job_run(self, job):
+ now_ns = time_ns()
+ stop_ns = now_ns + self._lookahead_ns
+ next_ns = job.next(self._time_start_ns, now_ns, stop_ns)
+ if next_ns is not None:
+ logging.info("Schedule {} in {}ns / at {}".format(job, next_ns - now_ns, datetime_from_ns(next_ns)))
+ id = self._scheduler.enterabs(next_ns, 0, self._process_func(job))
+ else:
+ logging.info("No next schedule for job {}. Retry in 10min".format(job))
+ id = self._scheduler.enterabs(now_ns + (1000 * 1000 * 1000 * 10 * 60), 0, lambda: self._schedule_job_run(job))
+ self._jobs[job] = id
+
+ def start(self, blocking: bool = True):
+ logging.info("Start scheduler (blocking=%s)", blocking)
+ self._scheduler.run(blocking)
diff --git a/scheduler/influxdb.py b/scheduler/influxdb.py
new file mode 100644
index 0000000..ecfa6ea
--- /dev/null
+++ b/scheduler/influxdb.py
@@ -0,0 +1,68 @@
+import logging
+import reprlib
+import typing
+from urllib.request import urlopen
+
+import collections
+from pyinflux.client import Line
+
+from . import Job
+
+
+def _get_measurement_name(job: Job):
+ if 'measurement' in job.properties:
+ return job.properties['measurement']
+ else:
+ return job.name
+
+
+class Dumper:
+ def __init__(self):
+ self._repr = reprlib.Repr()
+
+ def __call__(self, *args, **kwargs) -> None:
+ if len(args) == 2 and isinstance(args[0], Job):
+ # assuming second is list of objects that str() to influx protocol lines
+ data = self._convert(args[0], args[1])
+ self._insert(data)
+ else:
+ raise Exception("Wrong arguments for InfluxDB inserter.")
+
+ def _insert(self, lines: typing.Iterable):
+ data = "\n".join(map(str, lines))
+ print("===== Would insert:\n" + data)
+
+ def _convert(self, job: Job, data) -> typing.Iterable[Line]:
+ def c(name, value):
+ if isinstance(value, Line):
+ return value
+ elif isinstance(value, int) or isinstance(value, str) or isinstance(value, float):
+ return Line(name, {}, {'value': value})
+ else:
+ raise Exception("Cannot simply insert value of type: {} for job {}".format(type(value), job))
+
+ measurement = _get_measurement_name(job)
+ if isinstance(data, collections.Iterable):
+ return map(lambda v: c(measurement, v), data)
+ else:
+ return [c(measurement, data)]
+
+
+class Inserter(Dumper):
+ def __init__(self, url: str) -> None:
+ super().__init__()
+ self._url: str = url
+
+ def _insert(self, lines: typing.Iterable):
+ try:
+ data = "\n".join(map(str, lines)).encode('utf-8')
+ try:
+ with urlopen(self._url, data) as fh:
+ logging.debug("InfluxDB successful answer: %s", self._repr.repr(fh.read().decode('utf-8')))
+ except Exception:
+ logging.exception("Failed insert of:\n%s", self._repr.repr(lines))
+ except Exception:
+ logging.exception("Failed formatting of:\n%s", self._repr.repr(lines))
+
+ def __repr__(self):
+ return f"<{self.__class__.__module__}.{self.__class__.__name__} url={repr(self._url)}>"
diff --git a/scheduler/test_scheduler.py b/scheduler/test_scheduler.py
new file mode 100644
index 0000000..ee041cf
--- /dev/null
+++ b/scheduler/test_scheduler.py
@@ -0,0 +1,171 @@
+import unittest
+
+from scheduler import *
+
+
+class TestPeriodicJob(unittest.TestCase):
+ def test1(self):
+ job = every(seconds=10)
+ next_run = job.next(0, 0, 20 * 1000 * 1000 * 1000)
+ self.assertEqual(next_run, 10 * 1000 * 1000 * 1000)
+
+ next_run = job.next(0, next_run, next_run + (20 * 1000 * 1000 * 1000))
+ self.assertEqual(next_run, 20 * 1000 * 1000 * 1000)
+
+ self.assertIsNone(job.next(0, next_run, 10))
+ self.assertIsNone(job.next(0, next_run, next_run + (10 * 1000 * 1000 * 1000)))
+ next_run = job.next(0, next_run, next_run + (10 * 1000 * 1000 * 1000) + 1)
+ self.assertEqual(next_run, 30 * 1000 * 1000 * 1000)
+
+
+class TestProcessor(unittest.TestCase):
+ def test1(self):
+ class P:
+ def __init__(self) -> None:
+ self.values = None
+
+ def __call__(self, *args, **kwargs) -> None:
+ self.values = args
+
+ s = Scheduler()
+ p = P()
+ s.add_processor(p)
+
+ job = every(seconds=10, action=lambda: 1234)
+ s._process_func(job)()
+
+ self.assertEqual(p.values, (job, 1234))
+
+
+class TestCronJob(unittest.TestCase):
+ def testas(self):
+ now = 1531173610000000000
+ next_run = at(minute='10').next(0, now, 999000000000000000)
+ self.assertEqual(datetime_from_ns(next_run),
+ datetime.datetime(year=2018, month=7, day=10, hour=0, minute=10))
+
+ def testAt2(self):
+ now = 1531173610000000000
+ next_run = at(minute="0", hour="8,10,12,14,16,18,20", name="Test").next(0, now, 999000000000000000)
+ self.assertEqual(datetime_from_ns(next_run),
+ datetime.datetime(year=2018, month=7, day=10, hour=8, minute=0))
+
+ next_run += datetime.timedelta(minutes=1).total_seconds() * 1000 * 1000 * 1000
+ next_run = at(minute="0", hour="8,10,12,14,16,18,20", name="Test").next(0, next_run, 999000000000000000)
+ self.assertEqual(datetime_from_ns(next_run),
+ datetime.datetime(year=2018, month=7, day=10, hour=10, minute=0))
+
+ def test3(self):
+ start = datetime.datetime(year=2018, month=7, day=10, hour=10, minute=0).timestamp() * 1000 * 1000 * 1000
+ next_run = start
+ runs = []
+ while next_run < start + timedelta_ns(days=60):
+ runs.append(next_run)
+ next_run = at(minute="0", hour="8,10,14", name="Test").next(0, next_run, next_run + timedelta_ns(days=2))
+
+ self.assertEqual(len(runs), 60 * 3)
+ self.assertEqual(datetime_from_ns(next_run),
+ datetime.datetime(year=2018, month=9, day=8, hour=10, minute=0, second=0))
+ self.assertTrue(
+ all(map(lambda t: t.hour in [8, 10, 14] and t.minute == 0 and t.second == 0, map(datetime_from_ns, runs))))
+
+
+ def testas3(self):
+ now = 1531173610000000000
+ next_run = cron('10,20,30 * * * *').next(0, now, 999000000000000000)
+ self.assertEqual(datetime_from_ns(next_run),
+ datetime.datetime(year=2018, month=7, day=10, hour=0, minute=10))
+
+ now = next_run
+ next_run = cron('10,20,30 * * * *').next(0, now, 999000000000000000)
+ self.assertEqual(datetime_from_ns(next_run),
+ datetime.datetime(year=2018, month=7, day=10, hour=0, minute=20))
+
+ now = next_run
+ next_run = cron('10,20,30 * * * *').next(0, now, 999000000000000000)
+ self.assertEqual(datetime_from_ns(next_run),
+ datetime.datetime(year=2018, month=7, day=10, hour=0, minute=30))
+
+ now = next_run
+ next_run = cron('10,20,30 * * * *').next(0, now, 999000000000000000)
+ self.assertEqual(datetime_from_ns(next_run),
+ datetime.datetime(year=2018, month=7, day=10, hour=1, minute=10))
+
+
+ def test4(self):
+ expr = '10-15 * * * *'
+ now = 1531173610000000000
+ next_run = cron(expr).next(0, now, 999000000000000000)
+ self.assertEqual(datetime_from_ns(next_run),
+ datetime.datetime(year=2018, month=7, day=10, hour=0, minute=10))
+ next_run = cron(expr).next(0, next_run, 999000000000000000)
+ self.assertEqual(datetime_from_ns(next_run),
+ datetime.datetime(year=2018, month=7, day=10, hour=0, minute=11))
+ next_run = cron(expr).next(0, next_run, 999000000000000000)
+ self.assertEqual(datetime_from_ns(next_run),
+ datetime.datetime(year=2018, month=7, day=10, hour=0, minute=12))
+ next_run = cron(expr).next(0, next_run, 999000000000000000)
+ self.assertEqual(datetime_from_ns(next_run),
+ datetime.datetime(year=2018, month=7, day=10, hour=0, minute=13))
+ next_run = cron(expr).next(0, next_run, 999000000000000000)
+ self.assertEqual(datetime_from_ns(next_run),
+ datetime.datetime(year=2018, month=7, day=10, hour=0, minute=14))
+ next_run = cron(expr).next(0, next_run, 999000000000000000)
+ self.assertEqual(datetime_from_ns(next_run),
+ datetime.datetime(year=2018, month=7, day=10, hour=0, minute=15))
+ next_run = cron(expr).next(0, next_run, 999000000000000000)
+ self.assertEqual(datetime_from_ns(next_run),
+ datetime.datetime(year=2018, month=7, day=10, hour=1, minute=10))
+
+
+ def test5(self):
+ expr = '10 */20 * * *'
+ now = int(datetime.datetime(year=2018, month=7, day=10, hour=0, minute=0,
+ second=0).timestamp() * 1000 * 1000 * 1000)
+ next_run = cron(expr).next(0, now, 999000000000000000)
+ self.assertEqual(datetime_from_ns(next_run),
+ datetime.datetime(year=2018, month=7, day=10, hour=0, minute=10))
+ next_run = cron(expr).next(0, next_run, 999000000000000000)
+ self.assertEqual(datetime_from_ns(next_run),
+ datetime.datetime(year=2018, month=7, day=10, hour=20, minute=10))
+ next_run = cron(expr).next(0, next_run, 999000000000000000)
+ self.assertEqual(datetime_from_ns(next_run),
+ datetime.datetime(year=2018, month=7, day=11, hour=0, minute=10))
+
+ def test6(self):
+ job = at(minute='*/15', hour='5-24', name='Clever-Tanken', action=None)
+ next_run = int(datetime.datetime(year=2018, month=7, day=17, hour=11, minute=51,
+ second=10).timestamp() * 1000 * 1000 * 1000)
+
+ next_run = job.next(0, next_run, next_run + 60 * 60 * 1000 * 1000 * 1000)
+ self.assertEqual(datetime_from_ns(next_run),
+ datetime.datetime(year=2018, month=7, day=17, hour=12, minute=0))
+
+ next_run = job.next(0, next_run, next_run + 60 * 60 * 1000 * 1000 * 1000)
+ self.assertEqual(datetime_from_ns(next_run),
+ datetime.datetime(year=2018, month=7, day=17, hour=12, minute=15))
+
+
+ def test6(self):
+ job = at(minute="0", hour="8,10,12,14,16,18,20", name="ESG", action=None)
+ next_run = time_ns() #now
+ hours = [8,10,12,14,16,18,20]
+
+ while hours != []:
+ next_run = job.next(0, next_run, next_run + 60 * 60 * 1000 * 1000 * 1000)
+ next_run_dt = datetime_from_ns(next_run)
+ self.assertTrue(next_run_dt.hour in hours)
+ self.assertTrue(next_run_dt.minute == 0)
+ hours.remove(next_run_dt.hour)
+
+class TestScheduler(unittest.TestCase):
+ def setUp(self):
+ self.scheduler = Scheduler()
+
+ def test(self):
+ self.scheduler.add_job(at(minute='0', name='Test'))
+ try:
+ self.scheduler.add_job(at(minute='0', name='Test'))
+ self.assertFalse(True, 'must not happen')
+ except:
+ pass