summaryrefslogtreecommitdiff
path: root/heap/datastore/store/timeseries.py
diff options
context:
space:
mode:
authorEbus-at-dockstar <ebus@dockstar>2013-03-25 10:24:28 +0100
committerEbus-at-dockstar <ebus@dockstar>2013-03-25 10:24:43 +0100
commit862282ce99760832d3e9e5b4b1171b861105e004 (patch)
tree0e229418e391917b79d42a8bdee46fb7a8612895 /heap/datastore/store/timeseries.py
parent9522cdffa94f278eb5e1894600535986e22c2890 (diff)
downloadebus-alt-862282ce99760832d3e9e5b4b1171b861105e004.tar.gz
ebus-alt-862282ce99760832d3e9e5b4b1171b861105e004.zip
move old stuff away
Diffstat (limited to 'heap/datastore/store/timeseries.py')
-rw-r--r--heap/datastore/store/timeseries.py264
1 files changed, 264 insertions, 0 deletions
diff --git a/heap/datastore/store/timeseries.py b/heap/datastore/store/timeseries.py
new file mode 100644
index 0000000..8e30e4e
--- /dev/null
+++ b/heap/datastore/store/timeseries.py
@@ -0,0 +1,264 @@
+#!/usr/bin/env python
+from collections import namedtuple
+import struct
+import time
+import glob
+import os
+
+import logging
+
+class Constants:
+ DB_MAX_NAME_LENGTH = 10
+ DB_WRITEBUF_LENGTH = 10
+ DBDAY_FORMAT_DAY = "%d%m%Y"
+ DB_OPEN_FILES = 5
+
+ INT_MILLI = 1
+ INT_SEC = 1000
+ INT_MIN = 60 * INT_SEC
+ INT_HOUR = 60 * INT_MIN
+ INT_DAY = 24 * INT_HOUR
+ INT_WEEK = 7 * INT_DAY
+ INT_MONTH = 31 * INT_DAY
+ INT_YEAR = 365 * INT_DAY
+
+C = Constants
+
+def time_ms():
+ return int(time.time() * 1000)
+
+class DB(object):
+ def __init__(self, interval, fileinterval, directory, name):
+ assert len(name) <= Constants.DB_MAX_NAME_LENGTH, \
+ "name must be smaller than {0} (is {1})".format(Constants.DB_MAX_NAME_LENGTH, len(name))
+ self.interval = interval
+ self.fileinterval = fileinterval
+ self.directory = directory
+ self.name = name
+ self.isize = self.item_size() # item_size
+ self._files = {}
+ self._write_buf = {}
+ self.last_access = []
+ for path in glob.glob(self.directory + os.path.sep + name + "*"):
+ dbday = DBSegment.fopen(path, self)
+ self._files[dbday.filekey] = dbday
+
+ def __getitem__(self, key):
+ return self.get(key)
+
+ def get(self, key):
+ raise NotImplementedError()
+
+ def _get(self, key):
+ if not isinstance(key, int):
+ raise TypeError("DB keys must be millisecond timestamps (int)")
+
+ logging.debug("get -> key %s fileinterval %s interval %s", key, self.fileinterval, self.interval)
+ filekey = self._calc_segment_key(key)
+ key = self._calc_key(key)
+ logging.debug("get => filekey %s key %s", filekey, key )
+
+ if (filekey, key) in self._write_buf.keys():
+ logging.debug("Write-Cache hit %s", key)
+ return self._write_buf[(filekey, key)]
+
+ db_segment = self.get_segment(filekey)
+ return db_segment[key]
+
+ def __setitem__(self, key, value):
+ return self.set(key, value)
+
+ def set(self, key, value):
+ raise NotImplementedError()
+
+ def _set(self, key, value):
+ if not isinstance(key, int):
+ raise TypeError("DB keys must be millisecond timestamps (int)")
+
+ logging.debug("get -> key %s fileinterval %s interval %s", key, self.fileinterval, self.interval)
+ filekey = self._calc_segment_key(key)
+ key = self._calc_key(key)
+ logging.debug("get => filekey %s key %s", filekey, key )
+
+ self._write_buf[(filekey, key)] = value
+ if len(self._write_buf) > Constants.DB_WRITEBUF_LENGTH:
+ self._flush_buf()
+
+ return value
+
+ def get_segment(self, filekey):
+ if filekey in self._files.keys():
+ return self._files[filekey]
+ else:
+ header = DBSegment.header( filekey=filekey,
+ isize=self.isize, interval=self.interval,
+ fileinterval=self.fileinterval, dbname=self.name)
+ path = self.directory + os.path.sep + self.name + str(filekey)
+ db_segment = DBSegment(self, path, header)
+ logging.info("New File %s", db_segment.path)
+ self._files[filekey] = db_segment
+ return db_segment
+
+
+
+ def _calc_key(self, key):
+ return int( (key % self.fileinterval) / self.interval)
+
+ def _calc_segment_key(self, key):
+ return int( key / self.fileinterval )
+
+ def _flush_buf(self):
+ logging.debug("flush")
+ keys = self._write_buf.keys()
+ keys.sort()
+ for (filekey,key) in keys:
+ db_segment = self.get_segment(filekey)
+ db_segment[key] = self._write_buf[(filekey, key)]
+ self._write_buf.clear()
+
+ def close(self):
+ self._flush_buf()
+
+ def item_size(self):
+ """"""
+ raise NotImplementedError()
+
+class InvalidSegmentError(Exception):
+ def __init__(self, header, *args):
+ Exception.__init__(self, *args)
+ self.header = header
+
+class _LazyEval(object):
+ def __init__(self, func, *args, **kwargs):
+ self.func, self.args, self.kwargs = func, args, kwargs
+ def __str__(self):
+ return self.func(*self.args, **self.kwargs)
+
+def _to_hexstring(input_string):
+ def _do(string):
+ return " ".join(map(lambda c:"%02x"%ord(c), string)).upper()
+ return _LazyEval(_do, input_string)
+
+class DBSegment(object):
+ header = namedtuple('Header', 'filekey isize interval fileinterval dbname')
+ header_format = "iiii%dp"%Constants.DB_MAX_NAME_LENGTH
+ header_size = struct.calcsize( "iiii%dp"%Constants.DB_MAX_NAME_LENGTH )
+
+ @staticmethod
+ def fopen(path, db):
+ filekey = int( os.path.basename(path)[len(db.name):] )
+ with file(path, "rb") as fh:
+ header = DBSegment.header._make( struct.unpack( DBSegment.header_format, fh.read( DBSegment.header_size ) ) )
+ if not (header.filekey == filekey
+ and header.interval == db.interval
+ and header.fileinterval == db.fileinterval
+ and header.dbname == db.name):
+ raise InvalidSegmentError(header)
+ f = DBSegment(db, path, header)
+ return f
+
+ def __init__(self, db, path, header):
+ self.db = db
+ self.path = path
+ self.header = header
+
+ self.filekey = header.filekey
+ if not os.path.exists(self.path):
+ logging.info("Write header in new file %s", self.path)
+ self.fh = file(self.path, "wb")
+ self.fh.write( struct.pack( DBSegment.header_format, *list(header) ) )
+ end = ( DBSegment.header_size +
+ ( self.db.fileinterval / self.db.interval ) * self.db.isize)
+ self.fh.seek( end )
+ self.fh.write( "\0" )
+ self.fh.close()
+ self.fh = None
+ self.open()
+
+ def __setitem__(self, key, data):
+ logging.debug("Set %s = %s", key, _to_hexstring(data))
+ self._seek(key)
+ self.fh.write( data[:self.db.isize] )
+ self.lastpos = self.fh.tell()
+
+ def __getitem__(self, key):
+ self._seek(key)
+ data = self.fh.read( self.db.isize )
+ self.lastpos = self.fh.tell()
+ logging.debug("Get %s => %s", key, _to_hexstring(data))
+ return data
+
+ def _seek(self, key):
+ offset = DBSegment.header_size + \
+ key * self.db.isize
+
+ if self.lastpos == offset:
+ logging.debug("continue at %s", self.lastpos)
+ else:
+ logging.debug("seek to %s in %s instance %s (lastpos = %s)", offset, self.path, id(self), self.lastpos)
+ if not self.fh:
+ self.open()
+ self.fh.seek( offset )
+
+ def open(self):
+ if not self.fh:
+ logging.debug("Open %s",self.path)
+ self.fh = file(self.path, "r+b")
+ self.lastpos = self.fh.tell()
+
+ def close(self):
+ logging.info("Close %s instance %s", self.path, id(self))
+ if self.fh:
+ self.fh.close()
+ self.fh = None
+
+class DBInt(DB):
+ payload = namedtuple("payload", "x y z")
+ payload_format = "iii"
+ def get(self, key):
+ return DBInt.payload._make(
+ struct.unpack(
+ DBInt.payload_format, self._get(key) ) )
+
+ def set(self, key, value):
+ return self._set(key,
+ struct.pack(DBInt.payload_format, *list(value)))
+
+ def item_size(self):
+ return struct.calcsize( DBInt.payload_format )
+
+class DBSimpleInt(DB):
+ def get(self, key):
+ return struct.unpack("i", self._get(key))[0]
+
+ def set(self, key, value):
+ return self._set(key,
+ struct.pack("i", value))
+
+ def item_size(self):
+ return struct.calcsize( "i" )
+
+import random
+if __name__ == "__main__":
+ #logging.basicConfig(level=logging.DEBUG)
+ logging.basicConfig(level=logging.INFO)
+ #logging.basicConfig(level=logging.ERROR)
+
+ # Alle 5 sek.
+ start = time_ms()
+ stop = time_ms() + Constants.INT_DAY*2
+
+ db = DBSimple(1000 * Constants.INT_MILLI, Constants.INT_DAY, ".", "testdb")
+ rand = 123456
+ for ts in range(start, stop, 1000*Constants.INT_MILLI):
+ db[ts] = -ts % rand
+ assert db[ts] == -ts%rand, "%s != %s"%(db[ts], ts)
+
+ r = range(start, stop, 1000*Constants.INT_MILLI)
+ r.reverse()
+ for ts in r:
+ assert db[ts] == -ts%rand, "%s != %s"%(db[ts], -ts%rand)
+
+
+ db.close()
+