"""Time-series datastore backed by chunked Berkeley DB btree files.

Each table is a directory below the datastore directory.  It holds a
``config.json`` describing the table and one ``.bdb`` btree file per chunk
interval.  Record keys are the decimal string of the timestamp
(milliseconds, see :class:`Interval`) and record values are packed with
:mod:`struct` using the table's format.
"""
import bsddb
from _bsddb import DBNotFoundError
import json
import time
import os
import numpy
import struct
from threading import Lock

try:
    from collections import OrderedDict
except ImportError:
    from ordereddict import OrderedDict

__all__ = ['Datastore', 'Interval']


class UnknownTableError(Exception):
    """Raised when a table directory does not exist."""
    pass


class EmptyTableError(Exception):
    """Raised in read-only mode when a chunk file does not exist."""
    pass


class ReadOnlyError(Exception):
    """Raised when a write is attempted on a read-only datastore."""
    pass


class Interval:
    """Time spans expressed in milliseconds."""
    MS = 1
    SEC = 1000 * MS
    MIN = 60 * SEC
    HOUR = 60 * MIN
    DAY = 24 * HOUR
    # 30.5 days.  Kept integral so it can be used as a chunk interval
    # (chunk keys are formatted with the integer ``d`` format code).
    MONTH = 61 * DAY // 2
    # Was 365 * MONTH, i.e. roughly 30.5 years.
    YEAR = 365 * DAY


class Datastore(object):
    """Store of timestamped values, split into per-interval btree chunks."""

    # Once more than MAX_OPEN_TABLES chunk handles are cached, the oldest
    # ones are dropped until only EVICT_DOWN_TO remain.
    MAX_OPEN_TABLES = 800
    EVICT_DOWN_TO = 500

    def __init__(self, directory, readonly=False, chunkInterval=Interval.DAY):
        """Bind the datastore to an existing directory.

        directory     -- root directory holding one sub-directory per table
        readonly      -- open chunk files read-only and refuse createTable
        chunkInterval -- time span (ms) covered by a single chunk file

        Raises Exception if the directory does not exist.
        """
        if not os.path.exists(directory):
            raise Exception("No such directory {d}".format(d=directory))
        self.directory = directory
        self.readonly = readonly
        self.chunkInterval = chunkInterval
        # Cache of open chunk handles, keyed "<table>:<chunk start>", in
        # insertion order so the oldest handles are evicted first.
        self.tables = OrderedDict()
        self.tableLock = Lock()

    def openChunkTable(self, name, timestamp):
        """Return the (cached) btree handle of the chunk containing timestamp.

        Raises UnknownTableError if the table directory does not exist and,
        in read-only mode, EmptyTableError if the chunk file does not exist.
        """
        with self.tableLock:
            chunkTimestamp = timestamp - (timestamp % self.chunkInterval)
            key = "{n:s}:{ts:d}".format(n=name, ts=chunkTimestamp)
            if key in self.tables:
                return self.tables[key]

            path_dir = os.path.join(self.directory, name)
            if not os.path.exists(path_dir):
                raise UnknownTableError(
                    "no table with name {0}".format(name))

            path_time = time.strftime(
                "%Y_%m_%d-%H_%M_%S",
                time.gmtime(chunkTimestamp // Interval.SEC))
            path = os.path.join(path_dir, path_time + ".bdb")

            print("open key={k} open-files={c}".format(
                k=key, c=len(self.tables)))

            if len(self.tables) > self.MAX_OPEN_TABLES:
                while len(self.tables) > self.EVICT_DOWN_TO:
                    # Only the cache reference is dropped here: a running
                    # get_range() generator may still be using the handle.
                    # NOTE(review): this relies on bsddb closing the file
                    # when the handle is garbage-collected -- confirm.
                    closekey, _ = self.tables.popitem(last=False)
                    print("closed key={name} open-files={c}".format(
                        name=closekey, c=len(self.tables)))

            if self.readonly:
                try:
                    self.tables[key] = bsddb.btopen(path, "r")
                except bsddb.db.DBNoSuchFileError:
                    raise EmptyTableError(key)
            else:
                self.tables[key] = bsddb.btopen(path)
            return self.tables[key]

    def createTable(self, name, format):
        """Create the table directory and write its config.json.

        format is the struct format string used to pack the table's values.

        Raises ReadOnlyError on a read-only datastore and Exception if the
        table already exists.
        """
        if self.readonly:
            raise ReadOnlyError()
        with self.tableLock:
            path_dir = os.path.join(self.directory, name)
            if os.path.exists(path_dir):
                raise Exception("table exists")
            os.mkdir(path_dir)
            config = {'name': name, 'format': format, 'created': time.time()}
            print("Create config={config}".format(config=config))
            # "with" closes the file even if json.dump raises.
            with open(os.path.join(path_dir, "config.json"), "w") as configfile:
                json.dump(config, configfile)

    def get_config(self, name):
        """Return the parsed config.json of a table.

        Raises UnknownTableError if the table directory does not exist.
        """
        with self.tableLock:
            path_dir = os.path.join(self.directory, name)
            if not os.path.exists(path_dir):
                raise UnknownTableError(name)
            with open(os.path.join(path_dir, "config.json"), "r") as configfile:
                return json.load(configfile)

    def has_table(self, name):
        """Return True if a directory for the table exists."""
        with self.tableLock:
            return os.path.exists(os.path.join(self.directory, name))

    def _addValue(self, name, ts, value, format):
        """Pack value with format and store it under timestamp ts.

        The table is created with the given format on first use.
        """
        try:
            t = self.openChunkTable(name, ts)
        except UnknownTableError:
            self.createTable(name, format)
            t = self.openChunkTable(name, ts)
        key = str(ts)
        value = struct.pack(format, value)
        t[key] = value

    def addValueInt(self, name, ts, value):
        """Store value as a signed 64-bit integer (struct format "q")."""
        self._addValue(name, ts, value, "q")

    def addValueString(self, name, ts, value):
        """Store value as a 30-byte string (struct format "30s")."""
        self._addValue(name, ts, value, "30s")

    def addValueFloat(self, name, ts, value):
        """Store value as a double (struct format "d")."""
        self._addValue(name, ts, value, "d")

    def get_range_n(self, name, start, end, number=500):
        """Yield (timestamp, value) pairs sampled from start..end.

        The step passed to get_range() is the span divided by number,
        rounded up.
        """
        print("get_range_n start={s} end={e} number={n}".format(
            s=start, e=end, n=number))
        step = long(numpy.ceil((end - start) / float(number)))
        if step < 1:
            # start == end (or an inverted range) would otherwise produce a
            # step of 0 or less and trip the step check in get_range().
            step = long(1)
        for kv in self.get_range(name, start, end, step=step):
            yield kv

    def get_range(self, name, start, end, step=None):
        """Yield (timestamp, value) pairs with start <= timestamp <= end.

        Without step every record is returned.  With step, after each
        yielded record the cursor seeks to the first record whose key is at
        or after the current key plus step.
        """
        print("get_range name={n} start={s} end={e} step={p}({t})".format(
            n=name, s=start, e=end, p=step, t=type(step)))
        assert step is None or step > 0

        config = self.get_config(name)
        # Parse the format once instead of once per record.  str() because
        # json.load returns unicode on Python 2.
        unpack = struct.Struct(str(config['format'])).unpack

        currentChunk = start - (start % self.chunkInterval)
        key = start
        while key <= end:
            try:
                t = self.openChunkTable(name, currentChunk)
            except EmptyTableError:
                # No file for this chunk: continue with the next chunk.
                currentChunk += self.chunkInterval
                key = currentChunk
                continue

            try:
                # set_location() positions the cursor on the given key or,
                # if it does not exist, on the next key in sorted order.
                key_b, value = t.set_location(str(key))
                key = long(key_b)
            except DBNotFoundError:
                # Nothing at or after the key in this chunk: continue with
                # the next chunk.
                currentChunk += self.chunkInterval
                key = currentChunk
                continue

            chunkEnd = currentChunk + self.chunkInterval
            # Stop at `end` as well as at the chunk boundary; otherwise all
            # remaining records of the chunk would be returned even when
            # they lie past the requested range.
            while key <= end and key < chunkEnd:
                yield (key, unpack(value)[0])
                try:
                    if step:
                        key += step
                        key_b, value = t.set_location(str(key))
                    else:
                        key_b, value = t.next()
                    key = long(key_b)
                except DBNotFoundError:
                    # No data left in this btree chunk.
                    break
            currentChunk = chunkEnd

    def close(self):
        """Close all cached chunk handles, oldest first."""
        with self.tableLock:
            while self.tables:
                closekey, handle = self.tables.popitem(last=False)
                # Close explicitly instead of waiting for garbage
                # collection to release the file.
                handle.close()
                print("closed key={name} open-files={c}".format(
                    name=closekey, c=len(self.tables)))

# vim: autoindent tabstop=4 shiftwidth=4 expandtab softtabstop=4 filetype=python