summaryrefslogtreecommitdiff
path: root/heap/ebus-datastore/ebus/datastore.py
diff options
context:
space:
mode:
authorEbus-at-dockstar <ebus@dockstar>2014-07-26 19:57:28 +0200
committerEbus-at-dockstar <ebus@dockstar>2014-07-26 19:57:28 +0200
commit773093acabfb84b88c37bb0342d6a8d4a6f8267a (patch)
treeeed343d56e36dfcc87bfc0e81af762cbed5938d7 /heap/ebus-datastore/ebus/datastore.py
parentd624ed7b00203ec3beb3337a44ec1cf8075df453 (diff)
downloadebus-alt-773093acabfb84b88c37bb0342d6a8d4a6f8267a.tar.gz
ebus-alt-773093acabfb84b88c37bb0342d6a8d4a6f8267a.zip
move ebus-datastore to heap
Diffstat (limited to 'heap/ebus-datastore/ebus/datastore.py')
-rw-r--r--heap/ebus-datastore/ebus/datastore.py170
1 file changed, 170 insertions, 0 deletions
diff --git a/heap/ebus-datastore/ebus/datastore.py b/heap/ebus-datastore/ebus/datastore.py
new file mode 100644
index 0000000..4d37dc7
--- /dev/null
+++ b/heap/ebus-datastore/ebus/datastore.py
@@ -0,0 +1,170 @@
+import bsddb
+from _bsddb import DBNotFoundError
+import json
+import time
+import os
+import numpy
+import struct
+from threading import Lock
+try:
+ from collections import OrderedDict
+except ImportError:
+ from ordereddict import OrderedDict
+
+__all__ = ['Datastore', 'Interval']
+
class UnknownTableError(Exception): pass  # no table directory exists for the requested name
class EmptyTableError(Exception): pass    # chunk file absent when opening a table read-only
class ReadOnlyError(Exception): pass      # write attempted on a datastore opened read-only
class Interval:
    """Common time intervals expressed in milliseconds, the datastore's
    native timestamp unit (e.g. used as ``chunkInterval``)."""
    MS = 1
    SEC = 1000 * MS
    MIN = 60 * SEC
    HOUR = 60 * MIN
    DAY = 24 * HOUR
    MONTH = 30.5 * DAY  # average month length; note this is a float
    # Bug fix: was ``365 * MONTH``, i.e. 365 average months (~30.5 years).
    YEAR = 365 * DAY
+
+class Datastore(object):
+ def __init__(self,directory,readonly=False,chunkInterval=Interval.DAY):
+ if not os.path.exists(directory):
+ raise Exception("No such directory {d}".format(d=directory))
+ self.directory = directory
+ self.readonly = readonly
+ self.chunkInterval = chunkInterval
+ self.tables = OrderedDict()
+ self.tableLock = Lock()
+
+ def openChunkTable(self, name, timestamp):
+ with self.tableLock:
+ chunkTimestamp = timestamp - (timestamp % self.chunkInterval)
+ key = "{n:s}:{ts:d}".format(n=name, ts=chunkTimestamp)
+ if key in self.tables:
+ return self.tables[key]
+
+ path_dir = os.path.join(self.directory, name)
+ if not os.path.exists(path_dir):
+ raise UnknownTableError("no table with name {0}".format(name))
+
+ path_time = time.strftime("%Y_%m_%d-%H_%M_%S",
+ time.gmtime(chunkTimestamp/Interval.SEC))
+ path = os.path.join(path_dir, path_time + ".bdb")
+ print "open key={k} open-files={c}".format(k=key,c=len(self.tables))
+
+ if len(self.tables) > 800:
+ while len(self.tables) > 500:
+ closekey = self.tables.keys()[0]
+ self.tables.pop( closekey )
+ print "closed key={name} open-files={c}".format(name=closekey,c=len(self.tables))
+
+ if self.readonly:
+ try:
+ self.tables[key] = bsddb.btopen(path,"r")
+ except bsddb.db.DBNoSuchFileError:
+ raise EmptyTableError(key)
+ else:
+ self.tables[key] = bsddb.btopen(path)
+ return self.tables[key]
+
+ def createTable(self, name, format):
+ if self.readonly:
+ raise ReadOnlyError()
+ with self.tableLock:
+ path_dir = os.path.join(self.directory, name)
+ if os.path.exists(path_dir):
+ raise Exception("table exists")
+ os.mkdir(path_dir)
+ configfile = open(os.path.join(path_dir, "config.json"), "w")
+ config = {'name':name, 'format':format, 'created':time.time()}
+ print "Create config={config}".format(config=config)
+ json.dump(config, configfile)
+ configfile.close()
+
+ def get_config(self, name):
+ with self.tableLock:
+ path_dir = os.path.join(self.directory, name)
+ if not os.path.exists(path_dir):
+ raise UnknownTableError(name)
+ return json.load(open(os.path.join(path_dir, "config.json"), "r"))
+
+ def has_table(self,name):
+ with self.tableLock:
+ return os.path.exists( os.path.join(self.directory, name) )
+
+ def _addValue(self, name, ts, value, format):
+ try:
+ t = self.openChunkTable(name, ts)
+ except UnknownTableError:
+ self.createTable(name, format)
+ t = self.openChunkTable(name, ts)
+
+ key = str(ts)
+ value = struct.pack(format, value)
+ t[key] = value
+
+ def addValueInt(self, name, ts, value): self._addValue(name, ts, value, "q")
+ def addValueString(self, name, ts, value): self._addValue(name, ts, value, "30s")
+ def addValueFloat(self, name, ts, value): self._addValue(name, ts, value, "d")
+
+ def get_range_n(self, name, start, end, number=500):
+ print "get_range_n start={s} end={e} number={n}".format(
+ s=start, e=end, n=number)
+ for kv in self.get_range(name, start, end,
+ step=long(numpy.ceil((end-start)/float(number)))):
+ yield kv
+
+ def get_range(self, name, start, end, step=None):
+ print "get_range name={n} start={s} end={e} step={p}({t})".format(
+ n=name, s=start, e=end, p=step, t=type(step))
+ assert step == None or step > 0
+ config = self.get_config(name)
+ unpackFunc = lambda v: struct.unpack(config['format'], v)[0]
+ currentChunk = start - (start % self.chunkInterval)
+ key = start
+ while key<=end:
+ try:
+ t = self.openChunkTable(name,currentChunk)
+ except EmptyTableError:
+ # if the current chunk is empty switch to the next
+ currentChunk += self.chunkInterval
+ key = currentChunk
+ continue
+
+ try:
+ key_b = str(key)
+ (key_b, value) = t.set_location(key_b)
+ key = long(key_b)
+ except DBNotFoundError:
+ # if a item does not exist it will point to the next item
+ # in sorted order. If event that does not exist something is terrible
+ # wrong and we continue with the next chunk
+ currentChunk += self.chunkInterval
+ key = currentChunk
+ continue
+
+ while key <= currentChunk + self.chunkInterval:
+ #print time.strftime("%c", time.gmtime(key/1000)), unpack(value)
+ yield (key, unpackFunc(value))
+ try:
+ if step:
+ key += step
+ key_b = str(key)
+ key_b, value = t.set_location(key_b)
+ else:
+ key_b, value = t.next()
+ key = long(key_b)
+ except DBNotFoundError,e:
+ # if key is not found then no data is
+ # left in this btree chunk
+ break
+
+ currentChunk += self.chunkInterval
+
+ def close(self):
+ with self.tableLock:
+ while len(self.tables) > 0:
+ closekey = self.tables.keys()[0]
+ self.tables.pop( closekey )
+ print "closed key={name} open-files={c}".format(name=closekey,c=len(self.tables))
+
+# vim: autoindent tabstop=4 shiftwidth=4 expandtab softtabstop=4 filetype=python