"""HDF5-backed time-series datastore built on PyTables.

Every sensor is stored in its own ``<name>.hdf`` file below a base directory.
Each file holds one table (named after the sensor) of ``timestamp``/``value``
rows using one of the ``Value*`` row descriptions defined here.
"""
import logging
import os
from threading import Lock

import tables


class ValueFloat(tables.IsDescription):
    """Row description for samples stored as 32-bit floats."""
    timestamp = tables.Time32Col()  # index on Time64 is broken on pytables 2.4
    value = tables.Float32Col()


class ValueInt(tables.IsDescription):
    """Row description for samples stored as 32-bit integers."""
    timestamp = tables.Time32Col()
    value = tables.Int32Col()


class ValueString(tables.IsDescription):
    """Row description for samples stored as strings (``StringCol(20)``)."""
    timestamp = tables.Time32Col()
    value = tables.StringCol(20)


class Datastore(object):
    """Cache of open per-sensor HDF5 files and their tables.

    ``files`` and ``tables`` are both keyed by the sensor name with every
    ``.`` replaced by ``_``. ``fileLock`` guards opening files/tables,
    appending rows, flushing and closing. The instance can be used as a
    context manager, in which case ``fileLock`` is held for the duration of
    the ``with`` block.
    """

    def __init__(self, basepath):
        """Create an empty datastore rooted at directory *basepath*."""
        self.basepath = basepath
        self.files = {}
        self.fileLock = Lock()
        self.tables = {}

    def listTables(self):
        """Return ``{node.title: node}`` for the nodes of every open file.

        Only files already opened through :meth:`getTable` are inspected;
        nothing is read from ``basepath``. The lock is not taken here.

        NOTE(review): the original body iterated ``self.files.root``, which
        cannot work because ``self.files`` is a dict. Walking the root group
        of each open file is the presumed intent -- confirm against callers.
        """
        found = {}
        for handle in self.files.values():
            for node in handle.root:
                found[node.title] = node
        return found

    def __enter__(self):
        """Acquire ``fileLock`` and return the datastore itself."""
        self.fileLock.__enter__()
        # Returning self makes ``with Datastore(...) as ds`` bind the store.
        return self

    def __exit__(self, *a):
        """Release ``fileLock``; exceptions from the block propagate."""
        self.fileLock.__exit__(*a)

    def getTable(self, name, klass=None):
        """Return the table for sensor *name*, opening/creating as needed.

        Args:
            name: Sensor name; ``.`` is replaced by ``_`` to form the file
                name, the node name and the cache key.
            klass: Row description class used to create the file and/or
                table when they do not exist yet. With a falsy *klass*
                nothing is created.

        Raises:
            Exception: the sensor file does not exist and *klass* is falsy.
            tables.NoSuchNodeError: the file exists but holds no table for
                the sensor and *klass* is falsy.
        """
        name = name.replace(".", "_")
        with self.fileLock:
            if name not in self.files:
                path = os.path.join(self.basepath, name + ".hdf")
                # Parenthesised form prints the same text on Python 2 and 3.
                print("open {0}".format(path))
                if os.path.exists(path):
                    self.files[name] = tables.openFile(
                        path, "a", title="eBus Datastore")
                elif klass:
                    self.files[name] = tables.openFile(
                        path, "w", title="eBus Datastore")
                else:
                    raise Exception("No such sensor {0}".format(name))

            if name not in self.tables:
                try:
                    t = self.files[name].getNode("/" + name)
                except tables.NoSuchNodeError:
                    if not klass:
                        # Bare raise keeps the original traceback.
                        raise
                    t = self.files[name].createTable(
                        "/", name, klass, 'Description ' + name,
                        filters=tables.Filters(complevel=1),
                        createparents=True)
                    # Only a freshly created table gets its timestamp index.
                    t.cols.timestamp.createCSIndex()
                self.tables[name] = t

            return self.tables[name]

    def addValue(self, name, ts, value, klass):
        """Append one ``(ts, value)`` row to the table of sensor *name*.

        Args:
            name: Sensor name (see :meth:`getTable`).
            ts: Value stored in the ``timestamp`` column.
            value: Value stored in the ``value`` column.
            klass: Row description class, or ``None`` to skip both table
                creation and the column type check.

        Raises:
            AssertionError: *klass* is given and its ``value`` column type
                differs from the one of the existing table.
        """
        t = self.getTable(name, klass)
        if klass is not None:
            assert klass.columns['value'].type == t.cols.value.type, \
                "Type check failed"
        with self.fileLock:
            t.row['timestamp'] = ts
            t.row['value'] = value
            t.row.append()

    def addValueInt(self, name, ts, value):
        """Append a row using the :class:`ValueInt` description."""
        self.addValue(name, ts, value, ValueInt)

    def addValueString(self, name, ts, value):
        """Append a row using the :class:`ValueString` description."""
        self.addValue(name, ts, value, ValueString)

    def addValueFloat(self, name, ts, value):
        """Append a row using the :class:`ValueFloat` description."""
        self.addValue(name, ts, value, ValueFloat)

    def flush(self):
        """Flush every open file while holding ``fileLock``."""
        with self.fileLock:
            for handle in self.files.values():
                logging.debug("flush file: %s" % handle.filename)
                handle.flush()

    def flush_table(self, name, table, close=False, index=False):
        """Flush buffered rows of *table*, or close it together with its file.

        Args:
            name: Sensor name the table is cached under (dots allowed).
            table: The table object, as returned by :meth:`getTable`.
            close: Force the close path instead of a flush.
            index: On the flush path, also bring the table indexes up to
                date when the table is indexed and has auto-indexing on.

        ``fileLock`` is not taken here.
        """
        if close or table.cols.timestamp.index.nelements == 0:
            logging.debug("close table: %s" % table.name)
            name = name.replace(".", "_")
            # close file after first insert
            # to force an index build on re-open
            self.tables.pop(name).close()
            self.files.pop(name).close()
        elif 'row' in table.__dict__:
            # presumably the Row accessor is cached in the instance dict once
            # it has been used, so nothing is buffered before that -- verify
            # against the PyTables version in use.
            logging.debug("flush buffer {name}".format(name=name))
            table.row._flushBufferedRows()
            # Index maintenance stays on this path so it never touches a
            # table that the branch above has just closed.
            if index and table.indexed and table.autoIndex:
                logging.debug("index {name}".format(name=name))
                # Flush any unindexed row
                table.flushRowsToIndex(_lastrow=True)
                if table._dirtyindexes:
                    # Finally, re-index any dirty column
                    table.reIndexDirty()

    def close(self):
        """Close every open file and forget all cached handles."""
        with self.fileLock:
            for handle in self.files.values():
                handle.close()
            # Drop the closed handles so a later getTable() reopens the file
            # instead of handing out a dead table object.
            self.tables.clear()
            self.files.clear()