summaryrefslogtreecommitdiff
path: root/ebus-datastore/ebus/datastore.py
diff options
context:
space:
mode:
Diffstat (limited to 'ebus-datastore/ebus/datastore.py')
-rw-r--r--ebus-datastore/ebus/datastore.py170
1 files changed, 0 insertions, 170 deletions
diff --git a/ebus-datastore/ebus/datastore.py b/ebus-datastore/ebus/datastore.py
deleted file mode 100644
index 4d37dc7..0000000
--- a/ebus-datastore/ebus/datastore.py
+++ /dev/null
@@ -1,170 +0,0 @@
-import bsddb
-from _bsddb import DBNotFoundError
-import json
-import time
-import os
-import numpy
-import struct
-from threading import Lock
-try:
- from collections import OrderedDict
-except ImportError:
- from ordereddict import OrderedDict
-
-__all__ = ['Datastore', 'Interval']
-
-class UnknownTableError(Exception): pass
-class EmptyTableError(Exception): pass
-class ReadOnlyError(Exception): pass
-class Interval:
- MS=1
- SEC=1000*MS
- MIN=60*SEC
- HOUR=60*MIN
- DAY=24*HOUR
- MONTH=30.5*DAY
- YEAR=365*MONTH
-
-class Datastore(object):
- def __init__(self,directory,readonly=False,chunkInterval=Interval.DAY):
- if not os.path.exists(directory):
- raise Exception("No such directory {d}".format(d=directory))
- self.directory = directory
- self.readonly = readonly
- self.chunkInterval = chunkInterval
- self.tables = OrderedDict()
- self.tableLock = Lock()
-
- def openChunkTable(self, name, timestamp):
- with self.tableLock:
- chunkTimestamp = timestamp - (timestamp % self.chunkInterval)
- key = "{n:s}:{ts:d}".format(n=name, ts=chunkTimestamp)
- if key in self.tables:
- return self.tables[key]
-
- path_dir = os.path.join(self.directory, name)
- if not os.path.exists(path_dir):
- raise UnknownTableError("no table with name {0}".format(name))
-
- path_time = time.strftime("%Y_%m_%d-%H_%M_%S",
- time.gmtime(chunkTimestamp/Interval.SEC))
- path = os.path.join(path_dir, path_time + ".bdb")
- print "open key={k} open-files={c}".format(k=key,c=len(self.tables))
-
- if len(self.tables) > 800:
- while len(self.tables) > 500:
- closekey = self.tables.keys()[0]
- self.tables.pop( closekey )
- print "closed key={name} open-files={c}".format(name=closekey,c=len(self.tables))
-
- if self.readonly:
- try:
- self.tables[key] = bsddb.btopen(path,"r")
- except bsddb.db.DBNoSuchFileError:
- raise EmptyTableError(key)
- else:
- self.tables[key] = bsddb.btopen(path)
- return self.tables[key]
-
- def createTable(self, name, format):
- if self.readonly:
- raise ReadOnlyError()
- with self.tableLock:
- path_dir = os.path.join(self.directory, name)
- if os.path.exists(path_dir):
- raise Exception("table exists")
- os.mkdir(path_dir)
- configfile = open(os.path.join(path_dir, "config.json"), "w")
- config = {'name':name, 'format':format, 'created':time.time()}
- print "Create config={config}".format(config=config)
- json.dump(config, configfile)
- configfile.close()
-
- def get_config(self, name):
- with self.tableLock:
- path_dir = os.path.join(self.directory, name)
- if not os.path.exists(path_dir):
- raise UnknownTableError(name)
- return json.load(open(os.path.join(path_dir, "config.json"), "r"))
-
- def has_table(self,name):
- with self.tableLock:
- return os.path.exists( os.path.join(self.directory, name) )
-
- def _addValue(self, name, ts, value, format):
- try:
- t = self.openChunkTable(name, ts)
- except UnknownTableError:
- self.createTable(name, format)
- t = self.openChunkTable(name, ts)
-
- key = str(ts)
- value = struct.pack(format, value)
- t[key] = value
-
- def addValueInt(self, name, ts, value): self._addValue(name, ts, value, "q")
- def addValueString(self, name, ts, value): self._addValue(name, ts, value, "30s")
- def addValueFloat(self, name, ts, value): self._addValue(name, ts, value, "d")
-
- def get_range_n(self, name, start, end, number=500):
- print "get_range_n start={s} end={e} number={n}".format(
- s=start, e=end, n=number)
- for kv in self.get_range(name, start, end,
- step=long(numpy.ceil((end-start)/float(number)))):
- yield kv
-
- def get_range(self, name, start, end, step=None):
- print "get_range name={n} start={s} end={e} step={p}({t})".format(
- n=name, s=start, e=end, p=step, t=type(step))
- assert step == None or step > 0
- config = self.get_config(name)
- unpackFunc = lambda v: struct.unpack(config['format'], v)[0]
- currentChunk = start - (start % self.chunkInterval)
- key = start
- while key<=end:
- try:
- t = self.openChunkTable(name,currentChunk)
- except EmptyTableError:
- # if the current chunk is empty switch to the next
- currentChunk += self.chunkInterval
- key = currentChunk
- continue
-
- try:
- key_b = str(key)
- (key_b, value) = t.set_location(key_b)
- key = long(key_b)
- except DBNotFoundError:
- # if a item does not exist it will point to the next item
- # in sorted order. If event that does not exist something is terrible
- # wrong and we continue with the next chunk
- currentChunk += self.chunkInterval
- key = currentChunk
- continue
-
- while key <= currentChunk + self.chunkInterval:
- #print time.strftime("%c", time.gmtime(key/1000)), unpack(value)
- yield (key, unpackFunc(value))
- try:
- if step:
- key += step
- key_b = str(key)
- key_b, value = t.set_location(key_b)
- else:
- key_b, value = t.next()
- key = long(key_b)
- except DBNotFoundError,e:
- # if key is not found then no data is
- # left in this btree chunk
- break
-
- currentChunk += self.chunkInterval
-
- def close(self):
- with self.tableLock:
- while len(self.tables) > 0:
- closekey = self.tables.keys()[0]
- self.tables.pop( closekey )
- print "closed key={name} open-files={c}".format(name=closekey,c=len(self.tables))
-
-# vim: autoindent tabstop=4 shiftwidth=4 expandtab softtabstop=4 filetype=python