summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEbus-at-dockstar <ebus@dockstar>2013-03-08 01:46:12 +0100
committerEbus-at-dockstar <ebus@dockstar>2013-03-08 01:46:12 +0100
commitc3d899867b26192562d9e4797e09f73842df241d (patch)
tree51aee9f1ae41ee7c798b77dd032d4a2df5253241
parent604e92559787ed7e7b590321f6a1ff8fc515e06d (diff)
downloadebus-alt-c3d899867b26192562d9e4797e09f73842df241d.tar.gz
ebus-alt-c3d899867b26192562d9e4797e09f73842df241d.zip
flush every 30sec
-rw-r--r--ebus/datastore.py3
-rw-r--r--ebus/webhdf/__init__.py23
2 files changed, 20 insertions, 6 deletions
diff --git a/ebus/datastore.py b/ebus/datastore.py
index 930b4fb..3ac63e1 100644
--- a/ebus/datastore.py
+++ b/ebus/datastore.py
@@ -1,8 +1,8 @@
#
-
import os
from threading import Lock
import tables
+import logging
class ValueFloat(tables.IsDescription):
timestamp = tables.Time32Col() #index on Time64 is broken on pytables 2.4
@@ -97,3 +97,4 @@ class Datastore(object):
for file in self.files.values():
file.close()
+
diff --git a/ebus/webhdf/__init__.py b/ebus/webhdf/__init__.py
index c1f34dd..f661800 100644
--- a/ebus/webhdf/__init__.py
+++ b/ebus/webhdf/__init__.py
@@ -14,9 +14,10 @@ import bottle
import ebus.datastore
datastore = ebus.datastore.Datastore("hdf-data")
-
app = bottle.Bottle("ebus")
+lastflush = -1
+
@app.route('/')
def index_file():
return static_files("index.html")
@@ -39,6 +40,9 @@ def sensor_data_get(name):
@app.put('/sensor/:name')
@app.put('/sensor/:name/:timestamp')
def sensor_data_put(name,timestamp=None):
+ FLUSH_INTERVAL=30 #sec
+ global lastflush
+
if not timestamp: timestamp = int(time.time())
try:
value = bottle.request.POST.value
@@ -55,9 +59,16 @@ def sensor_data_put(name,timestamp=None):
else:
return {'error':'INVALID_TYPE', msg:'Type {0} is invalid'.format(type)}
- datastore.addValue(name, timestamp, value, klass, flush=True)
- msg = "Stored {0} of type {1} with timestamp {2} to {3}".format(value,type,timestamp,name)
+ msg = "Storing {0} of type {1} with timestamp {2} to {3}".format(value,type,timestamp,name)
logging.info(msg)
+
+ datastore.addValue(name, timestamp, value, klass, flush=False)
+
+ if time.time() - FLUSH_INTERVAL > lastflush:
+ logging.info("call datastore.flush()")
+ datastore.flush()
+ lastflush = time.time()
+
return {'error':None,'msg':msg}
except Exception,e:
logging.error("Error: " + "{0} name={1} value={2} type={3}".format(e, name, value, type))
@@ -68,6 +79,7 @@ def sensor_data_put(name,timestamp=None):
@app.route('/sensor/:name/:startdate/:enddate')
def sensor_name_start_end(name,startdate,enddate):
SAMPLING_STEPSIZE=500
+ NUMBER_OF_VALUES=250
try:
startdate, enddate = int(startdate), int(enddate)
logging.info("/sensor/ start={0} end={1}".format(startdate, enddate))
@@ -88,7 +100,7 @@ def sensor_name_start_end(name,startdate,enddate):
diff_avg = numpy.average(diff)
time_period = enddate - startdate
samples = time_period / diff_avg
- step = numpy.ceil(samples / 400.0)
+ step = numpy.ceil(samples / float(NUMBER_OF_VALUES))
data = [(x['timestamp'], x['value'])
for x in table.where("(timestamp >= startdate) & (timestamp <= enddate)", step=step)]
@@ -104,12 +116,13 @@ def sensor_name_start_end(name,startdate,enddate):
@app.route('/avg/:name/:startdate')
@app.route('/avg/:name/:startdate/:period')
def sensor_avg_start(name, startdate, period=60*15): #15min
+ AVG_STEPSIZE=50
try:
startdate, enddate = int(startdate), int(time.time())
logging.info("/avg/ start={0} end={1}".format(startdate, enddate))
table=datastore.getTable(name)
with datastore:
- sel_rows = table.where("(timestamp >= startdate) & (timestamp <=enddate)")
+ sel_rows = table.where("(timestamp >= startdate) & (timestamp <=enddate)", step=AVG_STEPSIZE)
f_group = range(startdate, enddate, period)
data = map(lambda (group_id, grouped_rows): (group_id, numpy.average([row['value'] for row in grouped_rows])),
itertools.groupby(sel_rows, lambda t: (t['timestamp']/period)*period))