summaryrefslogtreecommitdiff
path: root/heap/ebus/webhdf/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'heap/ebus/webhdf/__init__.py')
-rw-r--r--heap/ebus/webhdf/__init__.py142
1 files changed, 142 insertions, 0 deletions
diff --git a/heap/ebus/webhdf/__init__.py b/heap/ebus/webhdf/__init__.py
new file mode 100644
index 0000000..44b5ab4
--- /dev/null
+++ b/heap/ebus/webhdf/__init__.py
@@ -0,0 +1,142 @@
+# -*- coding: utf-8 -*-
+import os
+import json
+import datetime
+import time
+import itertools
+import traceback
+import logging
+logging.basicConfig(level=logging.DEBUG)
+
+import numpy
+
+import bottle
+
+import ebus.datastore
+
+datastore = ebus.datastore.Datastore("hdf-data")
+app = bottle.Bottle("ebus")
+
+lastflushes = dict()
+
+def get_lastflush(name):
+ if name not in lastflushes:
+ lastflushes[name] = -1
+
+ return lastflushes[name]
+
+@app.route('/')
+def index_file():
+ return static_files("index.html")
+
+@app.route('/static/:filename#.+#')
+def static_files(filename):
+ return bottle.static_file(filename, root=os.path.join(os.path.dirname(__file__),"static"))
+
+
+@app.get('/sensor/:name')
+def sensor_data_get(name):
+ try:
+ table = datastore.getTable(name)
+ with datastore:
+ data = table.readSorted(sortby="timestamp", checkCSI=True, start=0, stop=1,step=-1).tolist()[0]
+ return {'sensor':name,'error':None,'data':data}
+ except Exception,e:
+ return {'sensor':name,'data':None, 'error':str(e)}
+
+@app.put('/sensor/:name')
+@app.put('/sensor/:name/:timestamp')
+def sensor_data_put(name,timestamp=None):
+ FLUSH_INTERVAL=240 #sec
+ global lastflush
+
+ if not timestamp: timestamp = int(time.time())
+ try:
+ value = bottle.request.POST.value
+ type = bottle.request.POST.type
+
+ if type == "int":
+ klass = ebus.datastore.ValueInt
+ elif type == "float":
+ klass = ebus.datastore.ValueFloat
+ elif type == "string":
+ klass = ebus.datastore.ValueString
+ elif type == "":
+ klass = None
+ else:
+ return {'error':'INVALID_TYPE', msg:'Type {0} is invalid'.format(type)}
+
+ msg = "Storing {0} of type {1} with timestamp {2} to {3}".format(value,type,timestamp,name)
+ logging.info(msg)
+
+ datastore.addValue(name, timestamp, value, klass)
+
+ if time.time() - FLUSH_INTERVAL > get_lastflush(name):
+ table = datastore.getTable(name)
+ with datastore.fileLock:
+ datastore.flush_table(name,table,close=False,index=True)
+ lastflushes[name] = time.time()
+
+ return {'error':None,'msg':msg}
+ except Exception,e:
+ traceback.print_exc()
+ logging.error("Error: " + "{0} name={1} value={2} type={3}".format(e, name, value, type))
+ return {'error':str(e),'msg':str(e)}
+
+
+
+@app.route('/sensor/:name/:startdate/:enddate')
+def sensor_name_start_end(name,startdate,enddate):
+ SAMPLING_STEPSIZE=500
+ NUMBER_OF_VALUES=250
+ try:
+ startdate, enddate = int(startdate), int(enddate)
+ logging.info("/sensor/ start={0} end={1}".format(startdate, enddate))
+ table=datastore.getTable(name)
+ with datastore:
+ i = table.where("(timestamp >= startdate) & (timestamp <= enddate)",
+ condvars={'startdate':startdate,'enddate':enddate,'timestamp':table.cols.timestamp},
+ step=SAMPLING_STEPSIZE)
+ timestamps = []
+ try:
+ for x in range(20):
+ i.next()
+ timestamps.append(i['timestamp'])
+ except: pass
+
+ if len(timestamps) > 10:
+ diff = map(lambda (x1,x2): (x2-x1)/SAMPLING_STEPSIZE, zip(timestamps[:-1], timestamps[1:]))
+ diff_avg = numpy.average(diff)
+ time_period = enddate - startdate
+ samples = time_period / diff_avg
+ step = numpy.ceil(samples / float(NUMBER_OF_VALUES))
+
+ data = [(x['timestamp'], x['value'])
+ for x in table.where("(timestamp >= startdate) & (timestamp <= enddate)", step=step)]
+ logging.info("diff={0} samples={1} step={2} len={3} ({4})".format(diff_avg, samples, step, len(data),name))
+ else:
+ # Not enough data, sampling approach useless, deliver all data
+ data = [(x['timestamp'], x['value']) for x in table.where("(timestamp >= startdate) & (timestamp <= enddate)")]
+ return {'sensor':name, 'error':None,'data':data}
+ except Exception,e:
+ logging.error("Error: " + str(e) + str(type(e)))
+ return {'sensor':name,'data':None, 'error':str(e)}
+
+@app.route('/avg/:name/:startdate')
+@app.route('/avg/:name/:startdate/:period')
+def sensor_avg_start(name, startdate, period=60*15): #15min
+ AVG_STEPSIZE=50
+ try:
+ startdate, enddate = int(startdate), int(time.time())
+ logging.info("/avg/ start={0} end={1}".format(startdate, enddate))
+ table=datastore.getTable(name)
+ with datastore:
+ sel_rows = table.where("(timestamp >= startdate) & (timestamp <=enddate)", step=AVG_STEPSIZE)
+ f_group = range(startdate, enddate, period)
+ data = map(lambda (group_id, grouped_rows): (group_id, numpy.average([row['value'] for row in grouped_rows])),
+ itertools.groupby(sel_rows, lambda t: (t['timestamp']/period)*period))
+ data = map(lambda (timestamp,value): (timestamp, value), data)
+ return {'sensor':name, 'error':None,'data':data}
+ except Exception,e:
+ return {'sensor':name, 'error':str(e), 'data':None}
+