diff options
author | Ebus-at-dockstar <ebus@dockstar> | 2014-07-26 19:57:28 +0200 |
---|---|---|
committer | Ebus-at-dockstar <ebus@dockstar> | 2014-07-26 19:57:28 +0200 |
commit | 773093acabfb84b88c37bb0342d6a8d4a6f8267a (patch) | |
tree | eed343d56e36dfcc87bfc0e81af762cbed5938d7 /heap/ebus-datastore/ebus/datastore.py | |
parent | d624ed7b00203ec3beb3337a44ec1cf8075df453 (diff) | |
download | ebus-alt-773093acabfb84b88c37bb0342d6a8d4a6f8267a.tar.gz ebus-alt-773093acabfb84b88c37bb0342d6a8d4a6f8267a.zip |
move ebus-datastore to heap
Diffstat (limited to 'heap/ebus-datastore/ebus/datastore.py')
-rw-r--r-- | heap/ebus-datastore/ebus/datastore.py | 170 |
1 files changed, 170 insertions, 0 deletions
diff --git a/heap/ebus-datastore/ebus/datastore.py b/heap/ebus-datastore/ebus/datastore.py new file mode 100644 index 0000000..4d37dc7 --- /dev/null +++ b/heap/ebus-datastore/ebus/datastore.py @@ -0,0 +1,170 @@ +import bsddb +from _bsddb import DBNotFoundError +import json +import time +import os +import numpy +import struct +from threading import Lock +try: + from collections import OrderedDict +except ImportError: + from ordereddict import OrderedDict + +__all__ = ['Datastore', 'Interval'] + +class UnknownTableError(Exception): pass +class EmptyTableError(Exception): pass +class ReadOnlyError(Exception): pass +class Interval: + MS=1 + SEC=1000*MS + MIN=60*SEC + HOUR=60*MIN + DAY=24*HOUR + MONTH=30.5*DAY + YEAR=365*MONTH + +class Datastore(object): + def __init__(self,directory,readonly=False,chunkInterval=Interval.DAY): + if not os.path.exists(directory): + raise Exception("No such directory {d}".format(d=directory)) + self.directory = directory + self.readonly = readonly + self.chunkInterval = chunkInterval + self.tables = OrderedDict() + self.tableLock = Lock() + + def openChunkTable(self, name, timestamp): + with self.tableLock: + chunkTimestamp = timestamp - (timestamp % self.chunkInterval) + key = "{n:s}:{ts:d}".format(n=name, ts=chunkTimestamp) + if key in self.tables: + return self.tables[key] + + path_dir = os.path.join(self.directory, name) + if not os.path.exists(path_dir): + raise UnknownTableError("no table with name {0}".format(name)) + + path_time = time.strftime("%Y_%m_%d-%H_%M_%S", + time.gmtime(chunkTimestamp/Interval.SEC)) + path = os.path.join(path_dir, path_time + ".bdb") + print "open key={k} open-files={c}".format(k=key,c=len(self.tables)) + + if len(self.tables) > 800: + while len(self.tables) > 500: + closekey = self.tables.keys()[0] + self.tables.pop( closekey ) + print "closed key={name} open-files={c}".format(name=closekey,c=len(self.tables)) + + if self.readonly: + try: + self.tables[key] = bsddb.btopen(path,"r") + except bsddb.db.DBNoSuchFileError: + raise EmptyTableError(key) + else: + self.tables[key] = bsddb.btopen(path) + return self.tables[key] + + def createTable(self, name, format): + if self.readonly: + raise ReadOnlyError() + with self.tableLock: + path_dir = os.path.join(self.directory, name) + if os.path.exists(path_dir): + raise Exception("table exists") + os.mkdir(path_dir) + configfile = open(os.path.join(path_dir, "config.json"), "w") + config = {'name':name, 'format':format, 'created':time.time()} + print "Create config={config}".format(config=config) + json.dump(config, configfile) + configfile.close() + + def get_config(self, name): + with self.tableLock: + path_dir = os.path.join(self.directory, name) + if not os.path.exists(path_dir): + raise UnknownTableError(name) + return json.load(open(os.path.join(path_dir, "config.json"), "r")) + + def has_table(self,name): + with self.tableLock: + return os.path.exists( os.path.join(self.directory, name) ) + + def _addValue(self, name, ts, value, format): + try: + t = self.openChunkTable(name, ts) + except UnknownTableError: + self.createTable(name, format) + t = self.openChunkTable(name, ts) + + key = str(ts) + value = struct.pack(format, value) + t[key] = value + + def addValueInt(self, name, ts, value): self._addValue(name, ts, value, "q") + def addValueString(self, name, ts, value): self._addValue(name, ts, value, "30s") + def addValueFloat(self, name, ts, value): self._addValue(name, ts, value, "d") + + def get_range_n(self, name, start, end, number=500): + print "get_range_n start={s} end={e} number={n}".format( + s=start, e=end, n=number) + for kv in self.get_range(name, start, end, + step=long(numpy.ceil((end-start)/float(number)))): + yield kv + + def get_range(self, name, start, end, step=None): + print "get_range name={n} start={s} end={e} step={p}({t})".format( + n=name, s=start, e=end, p=step, t=type(step)) + assert step == None or step > 0 + config = self.get_config(name) + unpackFunc = lambda v: struct.unpack(config['format'], v)[0] + currentChunk = start - (start % self.chunkInterval) + key = start + while key<=end: + try: + t = self.openChunkTable(name,currentChunk) + except EmptyTableError: + # if the current chunk is empty switch to the next + currentChunk += self.chunkInterval + key = currentChunk + continue + + try: + key_b = str(key) + (key_b, value) = t.set_location(key_b) + key = long(key_b) + except DBNotFoundError: + # if a item does not exist it will point to the next item + # in sorted order. If event that does not exist something is terrible + # wrong and we continue with the next chunk + currentChunk += self.chunkInterval + key = currentChunk + continue + + while key <= currentChunk + self.chunkInterval: + #print time.strftime("%c", time.gmtime(key/1000)), unpack(value) + yield (key, unpackFunc(value)) + try: + if step: + key += step + key_b = str(key) + key_b, value = t.set_location(key_b) + else: + key_b, value = t.next() + key = long(key_b) + except DBNotFoundError,e: + # if key is not found then no data is + # left in this btree chunk + break + + currentChunk += self.chunkInterval + + def close(self): + with self.tableLock: + while len(self.tables) > 0: + closekey = self.tables.keys()[0] + self.tables.pop( closekey ) + print "closed key={name} open-files={c}".format(name=closekey,c=len(self.tables)) + +# vim: autoindent tabstop=4 shiftwidth=4 expandtab softtabstop=4 filetype=python |