summaryrefslogtreecommitdiff
path: root/schall/lib
diff options
context:
space:
mode:
Diffstat (limited to 'schall/lib')
-rwxr-xr-xschall/lib/timeseries/runtests.sh2
-rw-r--r--schall/lib/timeseries/tests/__init__.py0
-rw-r--r--schall/lib/timeseries/tests/testDBSimple.py50
-rw-r--r--schall/lib/timeseries/timeseries.py264
4 files changed, 316 insertions, 0 deletions
diff --git a/schall/lib/timeseries/runtests.sh b/schall/lib/timeseries/runtests.sh
new file mode 100755
index 0000000..5d8f2e6
--- /dev/null
+++ b/schall/lib/timeseries/runtests.sh
@@ -0,0 +1,2 @@
+#!/bin/sh
+python -munittest discover -s tests $*
diff --git a/schall/lib/timeseries/tests/__init__.py b/schall/lib/timeseries/tests/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/schall/lib/timeseries/tests/__init__.py
diff --git a/schall/lib/timeseries/tests/testDBSimple.py b/schall/lib/timeseries/tests/testDBSimple.py
new file mode 100644
index 0000000..b1f7630
--- /dev/null
+++ b/schall/lib/timeseries/tests/testDBSimple.py
@@ -0,0 +1,50 @@
+import os
+import random
+import tempfile
+import logging
+import unittest
+
+import timeseries
+
+
+class TestDB(unittest.TestCase):
+ def setUp(self):
+ logging.basicConfig(level=logging.INFO)
+ self.tempdir = tempfile.mkdtemp()
+ self.db = timeseries.DBSimpleInt(timeseries.C.INT_MIN,
+ timeseries.C.INT_DAY,
+ self.tempdir, "t-dbsimple")
+
+ def tearDown(self):
+ self.db.close()
+ for path in os.listdir(self.tempdir):
+ os.remove(os.path.join(self.tempdir, path))
+ os.rmdir(self.tempdir)
+
+class TestEmptyDB(TestDB):
+ def testEmptyDB(self):
+ start=0
+ end=timeseries.C.INT_DAY
+
+ self.assertEqual(self.db[start], 0)
+ self.assertEqual(len(os.listdir(self.tempdir)), 1)
+
+ self.assertEqual(self.db[end-1], 0)
+ self.assertEqual(len(os.listdir(self.tempdir)), 1)
+
+ self.assertEqual(self.db[end], 0)
+ self.assertEqual(len(os.listdir(self.tempdir)), 2)
+
+class TestRandomValues(TestDB):
+ def testRandomValues(self):
+ values={}
+ for i in range(0,100):
+ key = random.randint(0, timeseries.time_ms())
+ value = random.randint(-2147483648, 2147483647)
+ values[key] = value
+ self.db[key] = value
+
+ keys = values.keys()
+ random.shuffle(keys)
+ for key in keys:
+ self.assertEqual( self.db[key], values[key] )
diff --git a/schall/lib/timeseries/timeseries.py b/schall/lib/timeseries/timeseries.py
new file mode 100644
index 0000000..fbf0f34
--- /dev/null
+++ b/schall/lib/timeseries/timeseries.py
@@ -0,0 +1,264 @@
+#!/usr/bin/env python
+from collections import namedtuple
+import struct
+import time
+import glob
+import os
+
+import logging
+
+class Constants:
+ DB_MAX_NAME_LENGTH = 10
+ DB_WRITEBUF_LENGTH = 10
+ DBDAY_FORMAT_DAY = "%d%m%Y"
+ DB_OPEN_FILES = 5
+
+ INT_MILLI = 1
+ INT_SEC = 1000
+ INT_MIN = 60 * INT_SEC
+ INT_HOUR = 60 * INT_MIN
+ INT_DAY = 24 * INT_HOUR
+ INT_WEEK = 7 * INT_DAY
+ INT_MONTH = 31 * INT_DAY
+ INT_YEAR = 365 * INT_DAY
+
+C = Constants
+
+def time_ms():
+ return int(time.time() * 1000)
+
+class DB(object):
+ def __init__(self, interval, fileinterval, directory, name):
+ assert len(name) <= Constants.DB_MAX_NAME_LENGTH, \
+ "name must be smaller than {0} (is {1})".format(Constants.DB_MAX_NAME_LENGTH, len(name))
+ self.interval = interval
+ self.fileinterval = fileinterval
+ self.directory = directory
+ self.name = name
+ self.isize = self.item_size() # item_size
+ self._files = {}
+ self._write_buf = {}
+ self.last_access = []
+ for path in glob.glob(self.directory + os.path.sep + name + "*"):
+ dbday = DBSegment.fopen(path, self)
+ self._files[dbday.filekey] = dbday
+
+ def __getitem__(self, key):
+ return self.get(key)
+
+ def get(self, key):
+ raise NotImplementedError()
+
+ def _get(self, key):
+ if not isinstance(key, int) and not isinstance(key, long):
+ raise TypeError("DB keys must be millisecond timestamps (int)")
+
+ logging.debug("get -> key %s fileinterval %s interval %s", key, self.fileinterval, self.interval)
+ filekey = self._calc_segment_key(key)
+ key = self._calc_key(key)
+ logging.debug("get => filekey %s key %s", filekey, key )
+
+ if (filekey, key) in self._write_buf.keys():
+ logging.debug("Write-Cache hit %s", key)
+ return self._write_buf[(filekey, key)]
+
+ db_segment = self.get_segment(filekey)
+ return db_segment[key]
+
+ def __setitem__(self, key, value):
+ return self.set(key, value)
+
+ def set(self, key, value):
+ raise NotImplementedError()
+
+ def _set(self, key, value):
+ if not isinstance(key, int) and not isinstance(key, long):
+ raise TypeError("DB keys must be millisecond timestamps (int)")
+
+ logging.debug("get -> key %s fileinterval %s interval %s", key, self.fileinterval, self.interval)
+ filekey = self._calc_segment_key(key)
+ key = self._calc_key(key)
+ logging.debug("get => filekey %s key %s", filekey, key )
+
+ self._write_buf[(filekey, key)] = value
+ if len(self._write_buf) > Constants.DB_WRITEBUF_LENGTH:
+ self._flush_buf()
+
+ return value
+
+ def get_segment(self, filekey):
+ if filekey in self._files.keys():
+ return self._files[filekey]
+ else:
+ header = DBSegment.header( filekey=filekey,
+ isize=self.isize, interval=self.interval,
+ fileinterval=self.fileinterval, dbname=self.name)
+ path = self.directory + os.path.sep + self.name + str(filekey)
+ db_segment = DBSegment(self, path, header)
+ logging.info("New File %s", db_segment.path)
+ self._files[filekey] = db_segment
+ return db_segment
+
+
+
+ def _calc_key(self, key):
+ return int( (key % self.fileinterval) / self.interval)
+
+ def _calc_segment_key(self, key):
+ return int( key / self.fileinterval )
+
+ def _flush_buf(self):
+ logging.debug("flush")
+ keys = self._write_buf.keys()
+ keys.sort()
+ for (filekey,key) in keys:
+ db_segment = self.get_segment(filekey)
+ db_segment[key] = self._write_buf[(filekey, key)]
+ self._write_buf.clear()
+
+ def close(self):
+ self._flush_buf()
+
+ def item_size(self):
+ """"""
+ raise NotImplementedError()
+
+class InvalidSegmentError(Exception):
+ def __init__(self, header, *args):
+ Exception.__init__(self, *args)
+ self.header = header
+
+class _LazyEval(object):
+ def __init__(self, func, *args, **kwargs):
+ self.func, self.args, self.kwargs = func, args, kwargs
+ def __str__(self):
+ return self.func(*self.args, **self.kwargs)
+
+def _to_hexstring(input_string):
+ def _do(string):
+ return " ".join(map(lambda c:"%02x"%ord(c), string)).upper()
+ return _LazyEval(_do, input_string)
+
+class DBSegment(object):
+ header = namedtuple('Header', 'filekey isize interval fileinterval dbname')
+ header_format = "iiii%dp"%Constants.DB_MAX_NAME_LENGTH
+ header_size = struct.calcsize( "iiii%dp"%Constants.DB_MAX_NAME_LENGTH )
+
+ @staticmethod
+ def fopen(path, db):
+ filekey = int( os.path.basename(path)[len(db.name):] )
+ with file(path, "rb") as fh:
+ header = DBSegment.header._make( struct.unpack( DBSegment.header_format, fh.read( DBSegment.header_size ) ) )
+ if not (header.filekey == filekey
+ and header.interval == db.interval
+ and header.fileinterval == db.fileinterval
+ and header.dbname == db.name):
+ raise InvalidSegmentError(header)
+ f = DBSegment(db, path, header)
+ return f
+
+ def __init__(self, db, path, header):
+ self.db = db
+ self.path = path
+ self.header = header
+
+ self.filekey = header.filekey
+ if not os.path.exists(self.path):
+ logging.info("Write header in new file %s", self.path)
+ self.fh = file(self.path, "wb")
+ self.fh.write( struct.pack( DBSegment.header_format, *list(header) ) )
+ end = ( DBSegment.header_size +
+ ( self.db.fileinterval / self.db.interval ) * self.db.isize)
+ self.fh.seek( end )
+ self.fh.write( "\0" )
+ self.fh.close()
+ self.fh = None
+ self.open()
+
+ def __setitem__(self, key, data):
+ logging.debug("Set %s = %s", key, _to_hexstring(data))
+ self._seek(key)
+ self.fh.write( data[:self.db.isize] )
+ self.lastpos = self.fh.tell()
+
+ def __getitem__(self, key):
+ self._seek(key)
+ data = self.fh.read( self.db.isize )
+ self.lastpos = self.fh.tell()
+ logging.debug("Get %s => %s", key, _to_hexstring(data))
+ return data
+
+ def _seek(self, key):
+ offset = DBSegment.header_size + \
+ key * self.db.isize
+
+ if self.lastpos == offset:
+ logging.debug("continue at %s", self.lastpos)
+ else:
+ logging.debug("seek to %s in %s instance %s (lastpos = %s)", offset, self.path, id(self), self.lastpos)
+ if not self.fh:
+ self.open()
+ self.fh.seek( offset )
+
+ def open(self):
+ if not self.fh:
+ logging.debug("Open %s",self.path)
+ self.fh = file(self.path, "r+b")
+ self.lastpos = self.fh.tell()
+
+ def close(self):
+ logging.info("Close %s instance %s", self.path, id(self))
+ if self.fh:
+ self.fh.close()
+ self.fh = None
+
+class DBInt(DB):
+ payload = namedtuple("payload", "x y z")
+ payload_format = "iii"
+ def get(self, key):
+ return DBInt.payload._make(
+ struct.unpack(
+ DBInt.payload_format, self._get(key) ) )
+
+ def set(self, key, value):
+ return self._set(key,
+ struct.pack(DBInt.payload_format, *list(value)))
+
+ def item_size(self):
+ return struct.calcsize( DBInt.payload_format )
+
+class DBSimpleInt(DB):
+ def get(self, key):
+ return struct.unpack("i", self._get(key))[0]
+
+ def set(self, key, value):
+ return self._set(key,
+ struct.pack("i", value))
+
+ def item_size(self):
+ return struct.calcsize( "i" )
+
+import random
+if __name__ == "__main__":
+ #logging.basicConfig(level=logging.DEBUG)
+ logging.basicConfig(level=logging.INFO)
+ #logging.basicConfig(level=logging.ERROR)
+
+ # Alle 5 sek.
+ start = time_ms()
+ stop = time_ms() + Constants.INT_DAY*2
+
+ db = DBSimple(1000 * Constants.INT_MILLI, Constants.INT_DAY, ".", "testdb")
+ rand = 123456
+ for ts in range(start, stop, 1000*Constants.INT_MILLI):
+ db[ts] = -ts % rand
+ assert db[ts] == -ts%rand, "%s != %s"%(db[ts], ts)
+
+ r = range(start, stop, 1000*Constants.INT_MILLI)
+ r.reverse()
+ for ts in r:
+ assert db[ts] == -ts%rand, "%s != %s"%(db[ts], -ts%rand)
+
+
+ db.close()
+