diff options
author | Ebus-at-dockstar <ebus@dockstar> | 2013-03-08 01:46:12 +0100 |
---|---|---|
committer | Ebus-at-dockstar <ebus@dockstar> | 2013-03-08 01:46:12 +0100 |
commit | c3d899867b26192562d9e4797e09f73842df241d (patch) | |
tree | 51aee9f1ae41ee7c798b77dd032d4a2df5253241 | |
parent | 604e92559787ed7e7b590321f6a1ff8fc515e06d (diff) | |
download | ebus-alt-c3d899867b26192562d9e4797e09f73842df241d.tar.gz ebus-alt-c3d899867b26192562d9e4797e09f73842df241d.zip |
flush every 30sec
-rw-r--r-- | ebus/datastore.py | 3 | ||||
-rw-r--r-- | ebus/webhdf/__init__.py | 23 |
2 files changed, 20 insertions, 6 deletions
diff --git a/ebus/datastore.py b/ebus/datastore.py index 930b4fb..3ac63e1 100644 --- a/ebus/datastore.py +++ b/ebus/datastore.py @@ -1,8 +1,8 @@ # - import os from threading import Lock import tables +import logging class ValueFloat(tables.IsDescription): timestamp = tables.Time32Col() #index on Time64 is broken on pytables 2.4 @@ -97,3 +97,4 @@ class Datastore(object): for file in self.files.values(): file.close() + diff --git a/ebus/webhdf/__init__.py b/ebus/webhdf/__init__.py index c1f34dd..f661800 100644 --- a/ebus/webhdf/__init__.py +++ b/ebus/webhdf/__init__.py @@ -14,9 +14,10 @@ import bottle import ebus.datastore datastore = ebus.datastore.Datastore("hdf-data") - app = bottle.Bottle("ebus") +lastflush = -1 + @app.route('/') def index_file(): return static_files("index.html") @@ -39,6 +40,9 @@ def sensor_data_get(name): @app.put('/sensor/:name') @app.put('/sensor/:name/:timestamp') def sensor_data_put(name,timestamp=None): + FLUSH_INTERVAL=30 #sec + global lastflush + if not timestamp: timestamp = int(time.time()) try: value = bottle.request.POST.value @@ -55,9 +59,16 @@ def sensor_data_put(name,timestamp=None): else: return {'error':'INVALID_TYPE', msg:'Type {0} is invalid'.format(type)} - datastore.addValue(name, timestamp, value, klass, flush=True) - msg = "Stored {0} of type {1} with timestamp {2} to {3}".format(value,type,timestamp,name) + msg = "Storing {0} of type {1} with timestamp {2} to {3}".format(value,type,timestamp,name) logging.info(msg) + + datastore.addValue(name, timestamp, value, klass, flush=False) + + if time.time() - FLUSH_INTERVAL > lastflush: + logging.info("call datastore.flush()") + datastore.flush() + lastflush = time.time() + return {'error':None,'msg':msg} except Exception,e: logging.error("Error: " + "{0} name={1} value={2} type={3}".format(e, name, value, type)) @@ -68,6 +79,7 @@ def sensor_data_put(name,timestamp=None): @app.route('/sensor/:name/:startdate/:enddate') def sensor_name_start_end(name,startdate,enddate): SAMPLING_STEPSIZE=500 + NUMBER_OF_VALUES=250 try: startdate, enddate = int(startdate), int(enddate) logging.info("/sensor/ start={0} end={1}".format(startdate, enddate)) @@ -88,7 +100,7 @@ def sensor_name_start_end(name,startdate,enddate): diff_avg = numpy.average(diff) time_period = enddate - startdate samples = time_period / diff_avg - step = numpy.ceil(samples / 400.0) + step = numpy.ceil(samples / float(NUMBER_OF_VALUES)) data = [(x['timestamp'], x['value']) for x in table.where("(timestamp >= startdate) & (timestamp <= enddate)", step=step)] @@ -104,12 +116,13 @@ def sensor_name_start_end(name,startdate,enddate): @app.route('/avg/:name/:startdate') @app.route('/avg/:name/:startdate/:period') def sensor_avg_start(name, startdate, period=60*15): #15min + AVG_STEPSIZE=50 try: startdate, enddate = int(startdate), int(time.time()) logging.info("/avg/ start={0} end={1}".format(startdate, enddate)) table=datastore.getTable(name) with datastore: - sel_rows = table.where("(timestamp >= startdate) & (timestamp <=enddate)") + sel_rows = table.where("(timestamp >= startdate) & (timestamp <=enddate)", step=AVG_STEPSIZE) f_group = range(startdate, enddate, period) data = map(lambda (group_id, grouped_rows): (group_id, numpy.average([row['value'] for row in grouped_rows])), itertools.groupby(sel_rows, lambda t: (t['timestamp']/period)*period)) |