From c3d899867b26192562d9e4797e09f73842df241d Mon Sep 17 00:00:00 2001 From: Ebus-at-dockstar Date: Fri, 8 Mar 2013 01:46:12 +0100 Subject: flush every 30sec --- ebus/datastore.py | 3 ++- ebus/webhdf/__init__.py | 23 ++++++++++++++++++----- 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/ebus/datastore.py b/ebus/datastore.py index 930b4fb..3ac63e1 100644 --- a/ebus/datastore.py +++ b/ebus/datastore.py @@ -1,8 +1,8 @@ # - import os from threading import Lock import tables +import logging class ValueFloat(tables.IsDescription): timestamp = tables.Time32Col() #index on Time64 is broken on pytables 2.4 @@ -97,3 +97,4 @@ class Datastore(object): for file in self.files.values(): file.close() + diff --git a/ebus/webhdf/__init__.py b/ebus/webhdf/__init__.py index c1f34dd..f661800 100644 --- a/ebus/webhdf/__init__.py +++ b/ebus/webhdf/__init__.py @@ -14,9 +14,10 @@ import bottle import ebus.datastore datastore = ebus.datastore.Datastore("hdf-data") - app = bottle.Bottle("ebus") +lastflush = -1 + @app.route('/') def index_file(): return static_files("index.html") @@ -39,6 +40,9 @@ def sensor_data_get(name): @app.put('/sensor/:name') @app.put('/sensor/:name/:timestamp') def sensor_data_put(name,timestamp=None): + FLUSH_INTERVAL=30 #sec + global lastflush + if not timestamp: timestamp = int(time.time()) try: value = bottle.request.POST.value @@ -55,9 +59,16 @@ def sensor_data_put(name,timestamp=None): else: return {'error':'INVALID_TYPE', msg:'Type {0} is invalid'.format(type)} - datastore.addValue(name, timestamp, value, klass, flush=True) - msg = "Stored {0} of type {1} with timestamp {2} to {3}".format(value,type,timestamp,name) + msg = "Storing {0} of type {1} with timestamp {2} to {3}".format(value,type,timestamp,name) logging.info(msg) + + datastore.addValue(name, timestamp, value, klass, flush=False) + + if time.time() - FLUSH_INTERVAL > lastflush: + logging.info("call datastore.flush()") + datastore.flush() + lastflush = time.time() + return {'error':None,'msg':msg} except Exception,e: logging.error("Error: " + "{0} name={1} value={2} type={3}".format(e, name, value, type)) @@ -68,6 +79,7 @@ def sensor_data_put(name,timestamp=None): @app.route('/sensor/:name/:startdate/:enddate') def sensor_name_start_end(name,startdate,enddate): SAMPLING_STEPSIZE=500 + NUMBER_OF_VALUES=250 try: startdate, enddate = int(startdate), int(enddate) logging.info("/sensor/ start={0} end={1}".format(startdate, enddate)) @@ -88,7 +100,7 @@ def sensor_name_start_end(name,startdate,enddate): diff_avg = numpy.average(diff) time_period = enddate - startdate samples = time_period / diff_avg - step = numpy.ceil(samples / 400.0) + step = numpy.ceil(samples / float(NUMBER_OF_VALUES)) data = [(x['timestamp'], x['value']) for x in table.where("(timestamp >= startdate) & (timestamp <= enddate)", step=step)] @@ -104,12 +116,13 @@ def sensor_name_start_end(name,startdate,enddate): @app.route('/avg/:name/:startdate') @app.route('/avg/:name/:startdate/:period') def sensor_avg_start(name, startdate, period=60*15): #15min + AVG_STEPSIZE=50 try: startdate, enddate = int(startdate), int(time.time()) logging.info("/avg/ start={0} end={1}".format(startdate, enddate)) table=datastore.getTable(name) with datastore: - sel_rows = table.where("(timestamp >= startdate) & (timestamp <=enddate)") + sel_rows = table.where("(timestamp >= startdate) & (timestamp <=enddate)", step=AVG_STEPSIZE) f_group = range(startdate, enddate, period) data = map(lambda (group_id, grouped_rows): (group_id, numpy.average([row['value'] for row in grouped_rows])), itertools.groupby(sel_rows, lambda t: (t['timestamp']/period)*period)) -- cgit v1.2.1