#!/usr/bin/env python from collections import namedtuple import struct import time import glob import os import logging class Constants: DB_MAX_NAME_LENGTH = 10 DB_WRITEBUF_LENGTH = 10 DBDAY_FORMAT_DAY = "%d%m%Y" DB_OPEN_FILES = 5 INT_MILLI = 1 INT_SEC = 1000 INT_MIN = 60 * INT_SEC INT_HOUR = 60 * INT_MIN INT_DAY = 24 * INT_HOUR INT_WEEK = 7 * INT_DAY INT_MONTH = 31 * INT_DAY INT_YEAR = 365 * INT_DAY C = Constants def time_ms(): return int(time.time() * 1000) class DB(object): def __init__(self, interval, fileinterval, directory, name): assert len(name) <= Constants.DB_MAX_NAME_LENGTH, \ "name must be smaller than {0} (is {1})".format(Constants.DB_MAX_NAME_LENGTH, len(name)) self.interval = interval self.fileinterval = fileinterval self.directory = directory self.name = name self.isize = self.item_size() # item_size self._files = {} self._write_buf = {} self.last_access = [] for path in glob.glob(self.directory + os.path.sep + name + "*"): dbday = DBSegment.fopen(path, self) self._files[dbday.filekey] = dbday def __getitem__(self, key): return self.get(key) def get(self, key): raise NotImplementedError() def _get(self, key): if not isinstance(key, int): raise TypeError("DB keys must be millisecond timestamps (int)") logging.debug("get -> key %s fileinterval %s interval %s", key, self.fileinterval, self.interval) filekey = self._calc_segment_key(key) key = self._calc_key(key) logging.debug("get => filekey %s key %s", filekey, key ) if (filekey, key) in self._write_buf.keys(): logging.debug("Write-Cache hit %s", key) return self._write_buf[(filekey, key)] db_segment = self.get_segment(filekey) return db_segment[key] def __setitem__(self, key, value): return self.set(key, value) def set(self, key, value): raise NotImplementedError() def _set(self, key, value): if not isinstance(key, int): raise TypeError("DB keys must be millisecond timestamps (int)") logging.debug("get -> key %s fileinterval %s interval %s", key, self.fileinterval, self.interval) filekey = self._calc_segment_key(key) key = self._calc_key(key) logging.debug("get => filekey %s key %s", filekey, key ) self._write_buf[(filekey, key)] = value if len(self._write_buf) > Constants.DB_WRITEBUF_LENGTH: self._flush_buf() return value def get_segment(self, filekey): if filekey in self._files.keys(): return self._files[filekey] else: header = DBSegment.header( filekey=filekey, isize=self.isize, interval=self.interval, fileinterval=self.fileinterval, dbname=self.name) path = self.directory + os.path.sep + self.name + str(filekey) db_segment = DBSegment(self, path, header) logging.info("New File %s", db_segment.path) self._files[filekey] = db_segment return db_segment def _calc_key(self, key): return int( (key % self.fileinterval) / self.interval) def _calc_segment_key(self, key): return int( key / self.fileinterval ) def _flush_buf(self): logging.debug("flush") keys = self._write_buf.keys() keys.sort() for (filekey,key) in keys: db_segment = self.get_segment(filekey) db_segment[key] = self._write_buf[(filekey, key)] self._write_buf.clear() def close(self): self._flush_buf() def item_size(self): """""" raise NotImplementedError() class InvalidSegmentError(Exception): def __init__(self, header, *args): Exception.__init__(self, *args) self.header = header class _LazyEval(object): def __init__(self, func, *args, **kwargs): self.func, self.args, self.kwargs = func, args, kwargs def __str__(self): return self.func(*self.args, **self.kwargs) def _to_hexstring(input_string): def _do(string): return " ".join(map(lambda c:"%02x"%ord(c), string)).upper() return _LazyEval(_do, input_string) class DBSegment(object): header = namedtuple('Header', 'filekey isize interval fileinterval dbname') header_format = "iiii%dp"%Constants.DB_MAX_NAME_LENGTH header_size = struct.calcsize( "iiii%dp"%Constants.DB_MAX_NAME_LENGTH ) @staticmethod def fopen(path, db): filekey = int( os.path.basename(path)[len(db.name):] ) with file(path, "rb") as fh: header = DBSegment.header._make( struct.unpack( DBSegment.header_format, fh.read( DBSegment.header_size ) ) ) if not (header.filekey == filekey and header.interval == db.interval and header.fileinterval == db.fileinterval and header.dbname == db.name): raise InvalidSegmentError(header) f = DBSegment(db, path, header) return f def __init__(self, db, path, header): self.db = db self.path = path self.header = header self.filekey = header.filekey if not os.path.exists(self.path): logging.info("Write header in new file %s", self.path) self.fh = file(self.path, "wb") self.fh.write( struct.pack( DBSegment.header_format, *list(header) ) ) end = ( DBSegment.header_size + ( self.db.fileinterval / self.db.interval ) * self.db.isize) self.fh.seek( end ) self.fh.write( "\0" ) self.fh.close() self.fh = None self.open() def __setitem__(self, key, data): logging.debug("Set %s = %s", key, _to_hexstring(data)) self._seek(key) self.fh.write( data[:self.db.isize] ) self.lastpos = self.fh.tell() def __getitem__(self, key): self._seek(key) data = self.fh.read( self.db.isize ) self.lastpos = self.fh.tell() logging.debug("Get %s => %s", key, _to_hexstring(data)) return data def _seek(self, key): offset = DBSegment.header_size + \ key * self.db.isize if self.lastpos == offset: logging.debug("continue at %s", self.lastpos) else: logging.debug("seek to %s in %s instance %s (lastpos = %s)", offset, self.path, id(self), self.lastpos) if not self.fh: self.open() self.fh.seek( offset ) def open(self): if not self.fh: logging.debug("Open %s",self.path) self.fh = file(self.path, "r+b") self.lastpos = self.fh.tell() def close(self): logging.info("Close %s instance %s", self.path, id(self)) if self.fh: self.fh.close() self.fh = None class DBInt(DB): payload = namedtuple("payload", "x y z") payload_format = "iii" def get(self, key): return DBInt.payload._make( struct.unpack( DBInt.payload_format, self._get(key) ) ) def set(self, key, value): return self._set(key, struct.pack(DBInt.payload_format, *list(value))) def item_size(self): return struct.calcsize( DBInt.payload_format ) class DBSimpleInt(DB): def get(self, key): return struct.unpack("i", self._get(key))[0] def set(self, key, value): return self._set(key, struct.pack("i", value)) def item_size(self): return struct.calcsize( "i" ) import random if __name__ == "__main__": #logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.INFO) #logging.basicConfig(level=logging.ERROR) # Alle 5 sek. start = time_ms() stop = time_ms() + Constants.INT_DAY*2 db = DBSimple(1000 * Constants.INT_MILLI, Constants.INT_DAY, ".", "testdb") rand = 123456 for ts in range(start, stop, 1000*Constants.INT_MILLI): db[ts] = -ts % rand assert db[ts] == -ts%rand, "%s != %s"%(db[ts], ts) r = range(start, stop, 1000*Constants.INT_MILLI) r.reverse() for ts in r: assert db[ts] == -ts%rand, "%s != %s"%(db[ts], -ts%rand) db.close()