summaryrefslogtreecommitdiff
path: root/datastore/store/timeseries.py
diff options
context:
space:
mode:
Diffstat (limited to 'datastore/store/timeseries.py')
-rw-r--r--datastore/store/timeseries.py264
1 files changed, 0 insertions, 264 deletions
diff --git a/datastore/store/timeseries.py b/datastore/store/timeseries.py
deleted file mode 100644
index 8e30e4e..0000000
--- a/datastore/store/timeseries.py
+++ /dev/null
@@ -1,264 +0,0 @@
-#!/usr/bin/env python
-from collections import namedtuple
-import struct
-import time
-import glob
-import os
-
-import logging
-
-class Constants:
- DB_MAX_NAME_LENGTH = 10
- DB_WRITEBUF_LENGTH = 10
- DBDAY_FORMAT_DAY = "%d%m%Y"
- DB_OPEN_FILES = 5
-
- INT_MILLI = 1
- INT_SEC = 1000
- INT_MIN = 60 * INT_SEC
- INT_HOUR = 60 * INT_MIN
- INT_DAY = 24 * INT_HOUR
- INT_WEEK = 7 * INT_DAY
- INT_MONTH = 31 * INT_DAY
- INT_YEAR = 365 * INT_DAY
-
-C = Constants
-
-def time_ms():
- return int(time.time() * 1000)
-
-class DB(object):
- def __init__(self, interval, fileinterval, directory, name):
- assert len(name) <= Constants.DB_MAX_NAME_LENGTH, \
- "name must be smaller than {0} (is {1})".format(Constants.DB_MAX_NAME_LENGTH, len(name))
- self.interval = interval
- self.fileinterval = fileinterval
- self.directory = directory
- self.name = name
- self.isize = self.item_size() # item_size
- self._files = {}
- self._write_buf = {}
- self.last_access = []
- for path in glob.glob(self.directory + os.path.sep + name + "*"):
- dbday = DBSegment.fopen(path, self)
- self._files[dbday.filekey] = dbday
-
- def __getitem__(self, key):
- return self.get(key)
-
- def get(self, key):
- raise NotImplementedError()
-
- def _get(self, key):
- if not isinstance(key, int):
- raise TypeError("DB keys must be millisecond timestamps (int)")
-
- logging.debug("get -> key %s fileinterval %s interval %s", key, self.fileinterval, self.interval)
- filekey = self._calc_segment_key(key)
- key = self._calc_key(key)
- logging.debug("get => filekey %s key %s", filekey, key )
-
- if (filekey, key) in self._write_buf.keys():
- logging.debug("Write-Cache hit %s", key)
- return self._write_buf[(filekey, key)]
-
- db_segment = self.get_segment(filekey)
- return db_segment[key]
-
- def __setitem__(self, key, value):
- return self.set(key, value)
-
- def set(self, key, value):
- raise NotImplementedError()
-
- def _set(self, key, value):
- if not isinstance(key, int):
- raise TypeError("DB keys must be millisecond timestamps (int)")
-
- logging.debug("get -> key %s fileinterval %s interval %s", key, self.fileinterval, self.interval)
- filekey = self._calc_segment_key(key)
- key = self._calc_key(key)
- logging.debug("get => filekey %s key %s", filekey, key )
-
- self._write_buf[(filekey, key)] = value
- if len(self._write_buf) > Constants.DB_WRITEBUF_LENGTH:
- self._flush_buf()
-
- return value
-
- def get_segment(self, filekey):
- if filekey in self._files.keys():
- return self._files[filekey]
- else:
- header = DBSegment.header( filekey=filekey,
- isize=self.isize, interval=self.interval,
- fileinterval=self.fileinterval, dbname=self.name)
- path = self.directory + os.path.sep + self.name + str(filekey)
- db_segment = DBSegment(self, path, header)
- logging.info("New File %s", db_segment.path)
- self._files[filekey] = db_segment
- return db_segment
-
-
-
- def _calc_key(self, key):
- return int( (key % self.fileinterval) / self.interval)
-
- def _calc_segment_key(self, key):
- return int( key / self.fileinterval )
-
- def _flush_buf(self):
- logging.debug("flush")
- keys = self._write_buf.keys()
- keys.sort()
- for (filekey,key) in keys:
- db_segment = self.get_segment(filekey)
- db_segment[key] = self._write_buf[(filekey, key)]
- self._write_buf.clear()
-
- def close(self):
- self._flush_buf()
-
- def item_size(self):
- """"""
- raise NotImplementedError()
-
-class InvalidSegmentError(Exception):
- def __init__(self, header, *args):
- Exception.__init__(self, *args)
- self.header = header
-
-class _LazyEval(object):
- def __init__(self, func, *args, **kwargs):
- self.func, self.args, self.kwargs = func, args, kwargs
- def __str__(self):
- return self.func(*self.args, **self.kwargs)
-
-def _to_hexstring(input_string):
- def _do(string):
- return " ".join(map(lambda c:"%02x"%ord(c), string)).upper()
- return _LazyEval(_do, input_string)
-
-class DBSegment(object):
- header = namedtuple('Header', 'filekey isize interval fileinterval dbname')
- header_format = "iiii%dp"%Constants.DB_MAX_NAME_LENGTH
- header_size = struct.calcsize( "iiii%dp"%Constants.DB_MAX_NAME_LENGTH )
-
- @staticmethod
- def fopen(path, db):
- filekey = int( os.path.basename(path)[len(db.name):] )
- with file(path, "rb") as fh:
- header = DBSegment.header._make( struct.unpack( DBSegment.header_format, fh.read( DBSegment.header_size ) ) )
- if not (header.filekey == filekey
- and header.interval == db.interval
- and header.fileinterval == db.fileinterval
- and header.dbname == db.name):
- raise InvalidSegmentError(header)
- f = DBSegment(db, path, header)
- return f
-
- def __init__(self, db, path, header):
- self.db = db
- self.path = path
- self.header = header
-
- self.filekey = header.filekey
- if not os.path.exists(self.path):
- logging.info("Write header in new file %s", self.path)
- self.fh = file(self.path, "wb")
- self.fh.write( struct.pack( DBSegment.header_format, *list(header) ) )
- end = ( DBSegment.header_size +
- ( self.db.fileinterval / self.db.interval ) * self.db.isize)
- self.fh.seek( end )
- self.fh.write( "\0" )
- self.fh.close()
- self.fh = None
- self.open()
-
- def __setitem__(self, key, data):
- logging.debug("Set %s = %s", key, _to_hexstring(data))
- self._seek(key)
- self.fh.write( data[:self.db.isize] )
- self.lastpos = self.fh.tell()
-
- def __getitem__(self, key):
- self._seek(key)
- data = self.fh.read( self.db.isize )
- self.lastpos = self.fh.tell()
- logging.debug("Get %s => %s", key, _to_hexstring(data))
- return data
-
- def _seek(self, key):
- offset = DBSegment.header_size + \
- key * self.db.isize
-
- if self.lastpos == offset:
- logging.debug("continue at %s", self.lastpos)
- else:
- logging.debug("seek to %s in %s instance %s (lastpos = %s)", offset, self.path, id(self), self.lastpos)
- if not self.fh:
- self.open()
- self.fh.seek( offset )
-
- def open(self):
- if not self.fh:
- logging.debug("Open %s",self.path)
- self.fh = file(self.path, "r+b")
- self.lastpos = self.fh.tell()
-
- def close(self):
- logging.info("Close %s instance %s", self.path, id(self))
- if self.fh:
- self.fh.close()
- self.fh = None
-
-class DBInt(DB):
- payload = namedtuple("payload", "x y z")
- payload_format = "iii"
- def get(self, key):
- return DBInt.payload._make(
- struct.unpack(
- DBInt.payload_format, self._get(key) ) )
-
- def set(self, key, value):
- return self._set(key,
- struct.pack(DBInt.payload_format, *list(value)))
-
- def item_size(self):
- return struct.calcsize( DBInt.payload_format )
-
-class DBSimpleInt(DB):
- def get(self, key):
- return struct.unpack("i", self._get(key))[0]
-
- def set(self, key, value):
- return self._set(key,
- struct.pack("i", value))
-
- def item_size(self):
- return struct.calcsize( "i" )
-
-import random
-if __name__ == "__main__":
- #logging.basicConfig(level=logging.DEBUG)
- logging.basicConfig(level=logging.INFO)
- #logging.basicConfig(level=logging.ERROR)
-
- # Alle 5 sek.
- start = time_ms()
- stop = time_ms() + Constants.INT_DAY*2
-
- db = DBSimple(1000 * Constants.INT_MILLI, Constants.INT_DAY, ".", "testdb")
- rand = 123456
- for ts in range(start, stop, 1000*Constants.INT_MILLI):
- db[ts] = -ts % rand
- assert db[ts] == -ts%rand, "%s != %s"%(db[ts], ts)
-
- r = range(start, stop, 1000*Constants.INT_MILLI)
- r.reverse()
- for ts in r:
- assert db[ts] == -ts%rand, "%s != %s"%(db[ts], -ts%rand)
-
-
- db.close()
-