diff options
Diffstat (limited to 'heap/ebus/datastore.py')
-rw-r--r-- | heap/ebus/datastore.py | 116 |
1 files changed, 116 insertions, 0 deletions
diff --git a/heap/ebus/datastore.py b/heap/ebus/datastore.py new file mode 100644 index 0000000..66303f3 --- /dev/null +++ b/heap/ebus/datastore.py @@ -0,0 +1,116 @@ +# +import os +from threading import Lock +import tables +import logging + +class ValueFloat(tables.IsDescription): + timestamp = tables.Time32Col() #index on Time64 is broken on pytables 2.4 + value = tables.Float32Col() + +class ValueInt(tables.IsDescription): + timestamp = tables.Time32Col() + value = tables.Int32Col() + +class ValueString(tables.IsDescription): + timestamp = tables.Time32Col() + value = tables.StringCol(20) + + +class Datastore(object): + def __init__(self,basepath): + self.basepath = basepath + self.files = {} + self.fileLock = Lock() + self.tables = {} + + # list all table in a dictionary { path.dot.name : object } + def listTables(self): + return dict(map(lambda e: (e.title, e), self.files.root)) + + def __enter__(self): + self.fileLock.__enter__() + + def __exit__(self, *a): + self.fileLock.__exit__(*a) + + def getTable(self, name, klass=None): + name = name.replace(".","_") + with self.fileLock: + if not name in self.files: + path = os.path.join(self.basepath, name+".hdf") + print "open {0}".format(path) + if os.path.exists(path): + self.files[name] = tables.openFile(path, "a", title = "eBus Datastore") + elif klass: + self.files[name] = tables.openFile(path, "w", title = "eBus Datastore") + else: + raise Exception("No such sensor {0}".format(name)) + + if not name in self.tables: + t = None + try: + t = self.files[name].getNode("/"+name) + except tables.NoSuchNodeError,e: + if not klass: raise e + + t = self.files[name].createTable("/", + name, + klass, + 'Description ' + name, + filters=tables.Filters(complevel=1), + createparents=True) + t.cols.timestamp.createCSIndex() + + self.tables[name] = t + + return self.tables[name] + + def addValue(self, name, ts, value, klass): + t = self.getTable(name, klass) + if klass != None: + assert klass.columns['value'].type == t.cols.value.type, "Type check failed" + + with self.fileLock: + t.row['timestamp'] = ts + t.row['value'] = value + t.row.append() + + def addValueInt(self, name, ts, value): self.addValue(name, ts, value, ValueInt) + def addValueString(self, name, ts, value): self.addValue(name, ts, value, ValueString) + def addValueFloat(self, name, ts, value): self.addValue(name, ts, value, ValueFloat) + + def flush(self): + with self.fileLock: + for file in self.files.values(): + logging.debug("flush file: %s" % file.filename) + file.flush() + + def flush_table(self,name,table,close=False,index=False): + if close or table.cols.timestamp.index.nelements == 0: + logging.debug("close table: %s" % table.name) + name = name.replace(".","_") + #close file after first insert + #to force a index build on re-open + self.tables.pop(name).close() + self.files.pop(name).close() + elif 'row' in table.__dict__: + logging.debug("flush buffer {name}".format(name=name)) + table.row._flushBufferedRows() + + if index and table.indexed and table.autoIndex: + logging.debug("index {name}".format(name=name)) + # Flush any unindexed row + rowsadded = table.flushRowsToIndex(_lastrow=True) + if table._dirtyindexes: + # Finally, re-index any dirty column + table.reIndexDirty() + + + + def close(self): + with self.fileLock: + for file in self.files.values(): + file.close() + + |