summaryrefslogtreecommitdiff
path: root/heap/ebus/datastore.py
diff options
context:
space:
mode:
Diffstat (limited to 'heap/ebus/datastore.py')
-rw-r--r--heap/ebus/datastore.py116
1 files changed, 116 insertions, 0 deletions
diff --git a/heap/ebus/datastore.py b/heap/ebus/datastore.py
new file mode 100644
index 0000000..66303f3
--- /dev/null
+++ b/heap/ebus/datastore.py
@@ -0,0 +1,116 @@
+#
+import os
+from threading import Lock
+import tables
+import logging
+
+class ValueFloat(tables.IsDescription):
+ timestamp = tables.Time32Col() #index on Time64 is broken on pytables 2.4
+ value = tables.Float32Col()
+
+class ValueInt(tables.IsDescription):
+ timestamp = tables.Time32Col()
+ value = tables.Int32Col()
+
+class ValueString(tables.IsDescription):
+ timestamp = tables.Time32Col()
+ value = tables.StringCol(20)
+
+
+class Datastore(object):
+ def __init__(self,basepath):
+ self.basepath = basepath
+ self.files = {}
+ self.fileLock = Lock()
+ self.tables = {}
+
+ # list all table in a dictionary { path.dot.name : object }
+ def listTables(self):
+ return dict(map(lambda e: (e.title, e), self.files.root))
+
+ def __enter__(self):
+ self.fileLock.__enter__()
+
+ def __exit__(self, *a):
+ self.fileLock.__exit__(*a)
+
+ def getTable(self, name, klass=None):
+ name = name.replace(".","_")
+ with self.fileLock:
+ if not name in self.files:
+ path = os.path.join(self.basepath, name+".hdf")
+ print "open {0}".format(path)
+ if os.path.exists(path):
+ self.files[name] = tables.openFile(path, "a", title = "eBus Datastore")
+ elif klass:
+ self.files[name] = tables.openFile(path, "w", title = "eBus Datastore")
+ else:
+ raise Exception("No such sensor {0}".format(name))
+
+ if not name in self.tables:
+ t = None
+ try:
+ t = self.files[name].getNode("/"+name)
+ except tables.NoSuchNodeError,e:
+ if not klass: raise e
+
+ t = self.files[name].createTable("/",
+ name,
+ klass,
+ 'Description ' + name,
+ filters=tables.Filters(complevel=1),
+ createparents=True)
+ t.cols.timestamp.createCSIndex()
+
+ self.tables[name] = t
+
+ return self.tables[name]
+
+ def addValue(self, name, ts, value, klass):
+ t = self.getTable(name, klass)
+ if klass != None:
+ assert klass.columns['value'].type == t.cols.value.type, "Type check failed"
+
+ with self.fileLock:
+ t.row['timestamp'] = ts
+ t.row['value'] = value
+ t.row.append()
+
+ def addValueInt(self, name, ts, value): self.addValue(name, ts, value, ValueInt)
+ def addValueString(self, name, ts, value): self.addValue(name, ts, value, ValueString)
+ def addValueFloat(self, name, ts, value): self.addValue(name, ts, value, ValueFloat)
+
+ def flush(self):
+ with self.fileLock:
+ for file in self.files.values():
+ logging.debug("flush file: %s" % file.filename)
+ file.flush()
+
+ def flush_table(self,name,table,close=False,index=False):
+ if close or table.cols.timestamp.index.nelements == 0:
+ logging.debug("close table: %s" % table.name)
+ name = name.replace(".","_")
+ #close file after first insert
+ #to force a index build on re-open
+ self.tables.pop(name).close()
+ self.files.pop(name).close()
+ elif 'row' in table.__dict__:
+ logging.debug("flush buffer {name}".format(name=name))
+ table.row._flushBufferedRows()
+
+ if index and table.indexed and table.autoIndex:
+ logging.debug("index {name}".format(name=name))
+ # Flush any unindexed row
+ rowsadded = table.flushRowsToIndex(_lastrow=True)
+ if table._dirtyindexes:
+ # Finally, re-index any dirty column
+ table.reIndexDirty()
+
+
+
+ def close(self):
+ with self.fileLock:
+ for file in self.files.values():
+ file.close()
+
+