From 604e92559787ed7e7b590321f6a1ff8fc515e06d Mon Sep 17 00:00:00 2001 From: Ebus-at-dockstar Date: Thu, 7 Mar 2013 14:20:42 +0100 Subject: python datastore + webhdf --- ebus/webhdf/__init__.py | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) (limited to 'ebus/webhdf/__init__.py') diff --git a/ebus/webhdf/__init__.py b/ebus/webhdf/__init__.py index af13f87..c1f34dd 100644 --- a/ebus/webhdf/__init__.py +++ b/ebus/webhdf/__init__.py @@ -13,7 +13,7 @@ import bottle import ebus.datastore -datastore = ebus.datastore.Datastore("testhdffiles") +datastore = ebus.datastore.Datastore("hdf-data") app = bottle.Bottle("ebus") @@ -50,6 +50,8 @@ def sensor_data_put(name,timestamp=None): klass = ebus.datastore.ValueFloat elif type == "string": klass = ebus.datastore.ValueString + elif type == "": + klass = None else: return {'error':'INVALID_TYPE', msg:'Type {0} is invalid'.format(type)} @@ -58,38 +60,42 @@ def sensor_data_put(name,timestamp=None): logging.info(msg) return {'error':None,'msg':msg} except Exception,e: - return {'error':e,'msg':e} + logging.error("Error: " + "{0} name={1} value={2} type={3}".format(e, name, value, type)) + return {'error':str(e),'msg':str(e)} @app.route('/sensor/:name/:startdate/:enddate') def sensor_name_start_end(name,startdate,enddate): + SAMPLING_STEPSIZE=500 try: startdate, enddate = int(startdate), int(enddate) logging.info("/sensor/ start={0} end={1}".format(startdate, enddate)) table=datastore.getTable(name) with datastore: - i = table.where("(timestamp >= startdate) & (timestamp <= enddate)",step=100) + i = table.where("(timestamp >= startdate) & (timestamp <= enddate)", + condvars={'startdate':startdate,'enddate':enddate,'timestamp':table.cols.timestamp}, + step=SAMPLING_STEPSIZE) timestamps = [] try: for x in range(20): i.next() timestamps.append(i['timestamp']) - except: - pass + except: pass + if len(timestamps) > 10: - diff = map(lambda (x1,x2): (x2-x1)/100, zip(timestamps[:-1], timestamps[1:])) + diff = map(lambda (x1,x2): (x2-x1)/SAMPLING_STEPSIZE, zip(timestamps[:-1], timestamps[1:])) diff_avg = numpy.average(diff) time_period = enddate - startdate samples = time_period / diff_avg step = numpy.ceil(samples / 400.0) - data = [(x['timestamp']*1000, x['value']) + data = [(x['timestamp'], x['value']) for x in table.where("(timestamp >= startdate) & (timestamp <= enddate)", step=step)] logging.info("diff={0} samples={1} step={2} len={3} ({4})".format(diff_avg, samples, step, len(data),name)) else: - logging.info("No data found ({0})".format(name)) - data = [] + # Not enough data, sampling approach useless, deliver all data + data = [(x['timestamp'], x['value']) for x in table.where("(timestamp >= startdate) & (timestamp <= enddate)")] return {'sensor':name, 'error':None,'data':data} except Exception,e: logging.error("Error: " + str(e) + str(type(e))) @@ -107,7 +113,7 @@ def sensor_avg_start(name, startdate, period=60*15): #15min f_group = range(startdate, enddate, period) data = map(lambda (group_id, grouped_rows): (group_id, numpy.average([row['value'] for row in grouped_rows])), itertools.groupby(sel_rows, lambda t: (t['timestamp']/period)*period)) - data = map(lambda (timestamp,value): (timestamp*1000, value), data) + data = map(lambda (timestamp,value): (timestamp, value), data) return {'sensor':name, 'error':None,'data':data} except Exception,e: return {'sensor':name, 'error':str(e), 'data':None} -- cgit v1.2.1