# -*- coding: utf-8 -*-
import os
import json
import datetime
import time
import itertools
import traceback
import logging
logging.basicConfig(level=logging.DEBUG)

import numpy
import bottle

import ebus.datastore

# PyTables-backed datastore and the Bottle application serving the REST API.
datastore = ebus.datastore.Datastore("hdf-data")
app = bottle.Bottle("ebus")

# Per-sensor timestamp of the last table flush (-1 = never flushed).
lastflushes = dict()

def get_lastflush(name):
    if name not in lastflushes:
        lastflushes[name] = -1
    return lastflushes[name]

@app.route('/')
def index_file():
    return static_files("index.html")


@app.route('/static/:filename#.+#')
def static_files(filename):
    return bottle.static_file(filename, root=os.path.join(os.path.dirname(__file__), "static"))


@app.get('/sensor/:name')
def sensor_data_get(name):
    """Return the most recent value stored for the given sensor."""
    try:
        table = datastore.getTable(name)
        with datastore:
            # Read a single row from the timestamp-sorted table in reverse order,
            # i.e. the newest entry.
            data = table.readSorted(sortby="timestamp", checkCSI=True,
                                    start=0, stop=1, step=-1).tolist()[0]
        return {'sensor': name, 'error': None, 'data': data}
    except Exception as e:
        return {'sensor': name, 'data': None, 'error': str(e)}

@app.put('/sensor/:name')
@app.put('/sensor/:name/:timestamp')
def sensor_data_put(name, timestamp=None):
    """Store a new value for a sensor; the timestamp defaults to 'now'."""
    FLUSH_INTERVAL = 240  # seconds between forced table flushes
    timestamp = int(timestamp) if timestamp else int(time.time())
    value = type = None  # defined up front so the error handler can always log them
    try:
        value = bottle.request.POST.value
        type = bottle.request.POST.type
        if type == "int":
            klass = ebus.datastore.ValueInt
        elif type == "float":
            klass = ebus.datastore.ValueFloat
        elif type == "string":
            klass = ebus.datastore.ValueString
        elif type == "":
            klass = None
        else:
            return {'error': 'INVALID_TYPE', 'msg': 'Type {0} is invalid'.format(type)}
        msg = "Storing {0} of type {1} with timestamp {2} to {3}".format(value, type, timestamp, name)
        logging.info(msg)
        datastore.addValue(name, timestamp, value, klass)
        # Flush and reindex the table at most once every FLUSH_INTERVAL seconds.
        if time.time() - FLUSH_INTERVAL > get_lastflush(name):
            table = datastore.getTable(name)
            with datastore.fileLock:
                datastore.flush_table(name, table, close=False, index=True)
            lastflushes[name] = time.time()
        return {'error': None, 'msg': msg}
    except Exception as e:
        traceback.print_exc()
        logging.error("Error: {0} name={1} value={2} type={3}".format(e, name, value, type))
        return {'error': str(e), 'msg': str(e)}
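
# Hypothetical usage sketch (not part of the original module): assuming the app is
# served at http://localhost:8080, a reading could be pushed and read back roughly
# like this. The sensor name "livingroom", host and port are placeholders.
#
#   curl -X PUT -d "value=21.5" -d "type=float" http://localhost:8080/sensor/livingroom
#   curl http://localhost:8080/sensor/livingroom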

@app.route('/sensor/:name/:startdate/:enddate')
def sensor_name_start_end(name, startdate, enddate):
    """Return (timestamp, value) pairs between startdate and enddate,
    downsampled to roughly NUMBER_OF_VALUES points."""
    SAMPLING_STEPSIZE = 500
    NUMBER_OF_VALUES = 250
    try:
        startdate, enddate = int(startdate), int(enddate)
        logging.info("/sensor/ start={0} end={1}".format(startdate, enddate))
        table = datastore.getTable(name)
        with datastore:
            # Probe every SAMPLING_STEPSIZE-th row to estimate the data density.
            probe = table.where("(timestamp >= startdate) & (timestamp <= enddate)",
                                condvars={'startdate': startdate, 'enddate': enddate,
                                          'timestamp': table.cols.timestamp},
                                step=SAMPLING_STEPSIZE)
            timestamps = [row['timestamp'] for row in itertools.islice(probe, 20)]
            if len(timestamps) > 10:
                # Average spacing between consecutive rows, derived from the probe.
                diff = [(t2 - t1) / SAMPLING_STEPSIZE
                        for t1, t2 in zip(timestamps[:-1], timestamps[1:])]
                diff_avg = numpy.average(diff)
                time_period = enddate - startdate
                samples = time_period / diff_avg
                # Choose a step so that about NUMBER_OF_VALUES rows are returned.
                step = max(int(numpy.ceil(samples / float(NUMBER_OF_VALUES))), 1)
                data = [(x['timestamp'], x['value'])
                        for x in table.where("(timestamp >= startdate) & (timestamp <= enddate)",
                                             step=step)]
                logging.info("diff={0} samples={1} step={2} len={3} ({4})".format(
                    diff_avg, samples, step, len(data), name))
            else:
                # Not enough data, sampling approach useless, deliver all rows.
                data = [(x['timestamp'], x['value'])
                        for x in table.where("(timestamp >= startdate) & (timestamp <= enddate)")]
        return {'sensor': name, 'error': None, 'data': data}
    except Exception as e:
        logging.error("Error: " + str(e) + str(type(e)))
        return {'sensor': name, 'data': None, 'error': str(e)}

@app.route('/avg/:name/:startdate')
@app.route('/avg/:name/:startdate/:period')
def sensor_avg_start(name, startdate, period=60 * 15):  # default period: 15 min
    """Return per-period averages of a sensor's values from startdate until now."""
    AVG_STEPSIZE = 50
    try:
        startdate, enddate = int(startdate), int(time.time())
        period = int(period)  # may arrive as a string from the URL
        logging.info("/avg/ start={0} end={1}".format(startdate, enddate))
        table = datastore.getTable(name)
        with datastore:
            # Only look at every AVG_STEPSIZE-th row to keep the query cheap.
            sel_rows = table.where("(timestamp >= startdate) & (timestamp <= enddate)",
                                   step=AVG_STEPSIZE)
            # Group the sampled rows into period-sized buckets and average each bucket.
            data = [(group_id, numpy.average([row['value'] for row in grouped_rows]))
                    for group_id, grouped_rows in itertools.groupby(
                        sel_rows, lambda row: (row['timestamp'] // period) * period)]
        return {'sensor': name, 'error': None, 'data': data}
    except Exception as e:
        return {'sensor': name, 'error': str(e), 'data': None}
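
# Example query URLs for the two read endpoints above (a sketch; host, sensor name
# and the epoch timestamps are placeholders):
#
#   GET /sensor/livingroom/1370000000/1370086400   -> raw values, downsampled to ~250 points
#   GET /avg/livingroom/1370000000/3600            -> one average per 3600 s bucket since the start time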
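
# A minimal way to run the service for local testing. This block is an addition,
# not part of the original module; host and port are assumptions.
if __name__ == "__main__":
    bottle.run(app, host="127.0.0.1", port=8080, debug=True)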