diff options
Diffstat (limited to 'heap/ebus/webhdf/__init__.py')
-rw-r--r-- | heap/ebus/webhdf/__init__.py | 142 |
1 files changed, 142 insertions, 0 deletions
diff --git a/heap/ebus/webhdf/__init__.py b/heap/ebus/webhdf/__init__.py new file mode 100644 index 0000000..44b5ab4 --- /dev/null +++ b/heap/ebus/webhdf/__init__.py @@ -0,0 +1,142 @@ +# -*- coding: utf-8 -*- +import os +import json +import datetime +import time +import itertools +import traceback +import logging +logging.basicConfig(level=logging.DEBUG) + +import numpy + +import bottle + +import ebus.datastore + +datastore = ebus.datastore.Datastore("hdf-data") +app = bottle.Bottle("ebus") + +lastflushes = dict() + +def get_lastflush(name): + if name not in lastflushes: + lastflushes[name] = -1 + + return lastflushes[name] + +@app.route('/') +def index_file(): + return static_files("index.html") + +@app.route('/static/:filename#.+#') +def static_files(filename): + return bottle.static_file(filename, root=os.path.join(os.path.dirname(__file__),"static")) + + +@app.get('/sensor/:name') +def sensor_data_get(name): + try: + table = datastore.getTable(name) + with datastore: + data = table.readSorted(sortby="timestamp", checkCSI=True, start=0, stop=1,step=-1).tolist()[0] + return {'sensor':name,'error':None,'data':data} + except Exception,e: + return {'sensor':name,'data':None, 'error':str(e)} + +@app.put('/sensor/:name') +@app.put('/sensor/:name/:timestamp') +def sensor_data_put(name,timestamp=None): + FLUSH_INTERVAL=240 #sec + global lastflush + + if not timestamp: timestamp = int(time.time()) + try: + value = bottle.request.POST.value + type = bottle.request.POST.type + + if type == "int": + klass = ebus.datastore.ValueInt + elif type == "float": + klass = ebus.datastore.ValueFloat + elif type == "string": + klass = ebus.datastore.ValueString + elif type == "": + klass = None + else: + return {'error':'INVALID_TYPE', msg:'Type {0} is invalid'.format(type)} + + msg = "Storing {0} of type {1} with timestamp {2} to {3}".format(value,type,timestamp,name) + logging.info(msg) + + datastore.addValue(name, timestamp, value, klass) + + if time.time() - FLUSH_INTERVAL > get_lastflush(name): + table = datastore.getTable(name) + with datastore.fileLock: + datastore.flush_table(name,table,close=False,index=True) + lastflushes[name] = time.time() + + return {'error':None,'msg':msg} + except Exception,e: + traceback.print_exc() + logging.error("Error: " + "{0} name={1} value={2} type={3}".format(e, name, value, type)) + return {'error':str(e),'msg':str(e)} + + + +@app.route('/sensor/:name/:startdate/:enddate') +def sensor_name_start_end(name,startdate,enddate): + SAMPLING_STEPSIZE=500 + NUMBER_OF_VALUES=250 + try: + startdate, enddate = int(startdate), int(enddate) + logging.info("/sensor/ start={0} end={1}".format(startdate, enddate)) + table=datastore.getTable(name) + with datastore: + i = table.where("(timestamp >= startdate) & (timestamp <= enddate)", + condvars={'startdate':startdate,'enddate':enddate,'timestamp':table.cols.timestamp}, + step=SAMPLING_STEPSIZE) + timestamps = [] + try: + for x in range(20): + i.next() + timestamps.append(i['timestamp']) + except: pass + + if len(timestamps) > 10: + diff = map(lambda (x1,x2): (x2-x1)/SAMPLING_STEPSIZE, zip(timestamps[:-1], timestamps[1:])) + diff_avg = numpy.average(diff) + time_period = enddate - startdate + samples = time_period / diff_avg + step = numpy.ceil(samples / float(NUMBER_OF_VALUES)) + + data = [(x['timestamp'], x['value']) + for x in table.where("(timestamp >= startdate) & (timestamp <= enddate)", step=step)] + logging.info("diff={0} samples={1} step={2} len={3} ({4})".format(diff_avg, samples, step, len(data),name)) + else: + # Not enough data, sampling approach useless, deliver all data + data = [(x['timestamp'], x['value']) for x in table.where("(timestamp >= startdate) & (timestamp <= enddate)")] + return {'sensor':name, 'error':None,'data':data} + except Exception,e: + logging.error("Error: " + str(e) + str(type(e))) + return {'sensor':name,'data':None, 'error':str(e)} + +@app.route('/avg/:name/:startdate') +@app.route('/avg/:name/:startdate/:period') +def sensor_avg_start(name, startdate, period=60*15): #15min + AVG_STEPSIZE=50 + try: + startdate, enddate = int(startdate), int(time.time()) + logging.info("/avg/ start={0} end={1}".format(startdate, enddate)) + table=datastore.getTable(name) + with datastore: + sel_rows = table.where("(timestamp >= startdate) & (timestamp <=enddate)", step=AVG_STEPSIZE) + f_group = range(startdate, enddate, period) + data = map(lambda (group_id, grouped_rows): (group_id, numpy.average([row['value'] for row in grouped_rows])), + itertools.groupby(sel_rows, lambda t: (t['timestamp']/period)*period)) + data = map(lambda (timestamp,value): (timestamp, value), data) + return {'sensor':name, 'error':None,'data':data} + except Exception,e: + return {'sensor':name, 'error':str(e), 'data':None} + |