From 773093acabfb84b88c37bb0342d6a8d4a6f8267a Mon Sep 17 00:00:00 2001 From: Ebus-at-dockstar Date: Sat, 26 Jul 2014 19:57:28 +0200 Subject: move ebus-datastore to heap --- ebus-datastore/ebus/datastore.py | 170 --------------------------------------- 1 file changed, 170 deletions(-) delete mode 100644 ebus-datastore/ebus/datastore.py (limited to 'ebus-datastore/ebus/datastore.py') diff --git a/ebus-datastore/ebus/datastore.py b/ebus-datastore/ebus/datastore.py deleted file mode 100644 index 4d37dc7..0000000 --- a/ebus-datastore/ebus/datastore.py +++ /dev/null @@ -1,170 +0,0 @@ -import bsddb -from _bsddb import DBNotFoundError -import json -import time -import os -import numpy -import struct -from threading import Lock -try: - from collections import OrderedDict -except ImportError: - from ordereddict import OrderedDict - -__all__ = ['Datastore', 'Interval'] - -class UnknownTableError(Exception): pass -class EmptyTableError(Exception): pass -class ReadOnlyError(Exception): pass -class Interval: - MS=1 - SEC=1000*MS - MIN=60*SEC - HOUR=60*MIN - DAY=24*HOUR - MONTH=30.5*DAY - YEAR=365*MONTH - -class Datastore(object): - def __init__(self,directory,readonly=False,chunkInterval=Interval.DAY): - if not os.path.exists(directory): - raise Exception("No such directory {d}".format(d=directory)) - self.directory = directory - self.readonly = readonly - self.chunkInterval = chunkInterval - self.tables = OrderedDict() - self.tableLock = Lock() - - def openChunkTable(self, name, timestamp): - with self.tableLock: - chunkTimestamp = timestamp - (timestamp % self.chunkInterval) - key = "{n:s}:{ts:d}".format(n=name, ts=chunkTimestamp) - if key in self.tables: - return self.tables[key] - - path_dir = os.path.join(self.directory, name) - if not os.path.exists(path_dir): - raise UnknownTableError("no table with name {0}".format(name)) - - path_time = time.strftime("%Y_%m_%d-%H_%M_%S", - time.gmtime(chunkTimestamp/Interval.SEC)) - path = os.path.join(path_dir, path_time + ".bdb") - print "open key={k} open-files={c}".format(k=key,c=len(self.tables)) - - if len(self.tables) > 800: - while len(self.tables) > 500: - closekey = self.tables.keys()[0] - self.tables.pop( closekey ) - print "closed key={name} open-files={c}".format(name=closekey,c=len(self.tables)) - - if self.readonly: - try: - self.tables[key] = bsddb.btopen(path,"r") - except bsddb.db.DBNoSuchFileError: - raise EmptyTableError(key) - else: - self.tables[key] = bsddb.btopen(path) - return self.tables[key] - - def createTable(self, name, format): - if self.readonly: - raise ReadOnlyError() - with self.tableLock: - path_dir = os.path.join(self.directory, name) - if os.path.exists(path_dir): - raise Exception("table exists") - os.mkdir(path_dir) - configfile = open(os.path.join(path_dir, "config.json"), "w") - config = {'name':name, 'format':format, 'created':time.time()} - print "Create config={config}".format(config=config) - json.dump(config, configfile) - configfile.close() - - def get_config(self, name): - with self.tableLock: - path_dir = os.path.join(self.directory, name) - if not os.path.exists(path_dir): - raise UnknownTableError(name) - return json.load(open(os.path.join(path_dir, "config.json"), "r")) - - def has_table(self,name): - with self.tableLock: - return os.path.exists( os.path.join(self.directory, name) ) - - def _addValue(self, name, ts, value, format): - try: - t = self.openChunkTable(name, ts) - except UnknownTableError: - self.createTable(name, format) - t = self.openChunkTable(name, ts) - - key = str(ts) - value = struct.pack(format, value) - t[key] = value - - def addValueInt(self, name, ts, value): self._addValue(name, ts, value, "q") - def addValueString(self, name, ts, value): self._addValue(name, ts, value, "30s") - def addValueFloat(self, name, ts, value): self._addValue(name, ts, value, "d") - - def get_range_n(self, name, start, end, number=500): - print "get_range_n start={s} end={e} number={n}".format( - s=start, e=end, n=number) - for kv in self.get_range(name, start, end, - step=long(numpy.ceil((end-start)/float(number)))): - yield kv - - def get_range(self, name, start, end, step=None): - print "get_range name={n} start={s} end={e} step={p}({t})".format( - n=name, s=start, e=end, p=step, t=type(step)) - assert step == None or step > 0 - config = self.get_config(name) - unpackFunc = lambda v: struct.unpack(config['format'], v)[0] - currentChunk = start - (start % self.chunkInterval) - key = start - while key<=end: - try: - t = self.openChunkTable(name,currentChunk) - except EmptyTableError: - # if the current chunk is empty switch to the next - currentChunk += self.chunkInterval - key = currentChunk - continue - - try: - key_b = str(key) - (key_b, value) = t.set_location(key_b) - key = long(key_b) - except DBNotFoundError: - # if a item does not exist it will point to the next item - # in sorted order. If event that does not exist something is terrible - # wrong and we continue with the next chunk - currentChunk += self.chunkInterval - key = currentChunk - continue - - while key <= currentChunk + self.chunkInterval: - #print time.strftime("%c", time.gmtime(key/1000)), unpack(value) - yield (key, unpackFunc(value)) - try: - if step: - key += step - key_b = str(key) - key_b, value = t.set_location(key_b) - else: - key_b, value = t.next() - key = long(key_b) - except DBNotFoundError,e: - # if key is not found then no data is - # left in this btree chunk - break - - currentChunk += self.chunkInterval - - def close(self): - with self.tableLock: - while len(self.tables) > 0: - closekey = self.tables.keys()[0] - self.tables.pop( closekey ) - print "closed key={name} open-files={c}".format(name=closekey,c=len(self.tables)) - -# vim: autoindent tabstop=4 shiftwidth=4 expandtab softtabstop=4 filetype=python -- cgit v1.2.1