summaryrefslogtreecommitdiff
path: root/heap/ebus/webhdf/__init__.py
blob: 44b5ab44344360f8f30829812d539026a17ecb7e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# -*- coding: utf-8 -*-
import os
import json
import datetime
import time
import itertools
import traceback
import logging
logging.basicConfig(level=logging.DEBUG)

import numpy

import bottle 

import ebus.datastore

datastore = ebus.datastore.Datastore("hdf-data")
app = bottle.Bottle("ebus")

lastflushes = dict()

def get_lastflush(name):
    if name not in lastflushes:
        lastflushes[name] = -1

    return lastflushes[name]

@app.route('/')
def index_file():
    return static_files("index.html")

@app.route('/static/:filename#.+#')
def static_files(filename):
    return bottle.static_file(filename, root=os.path.join(os.path.dirname(__file__),"static"))


@app.get('/sensor/:name')
def sensor_data_get(name):
    try:
        table = datastore.getTable(name)
        with datastore:
            data = table.readSorted(sortby="timestamp", checkCSI=True, start=0, stop=1,step=-1).tolist()[0]
            return {'sensor':name,'error':None,'data':data}
    except Exception,e:
        return {'sensor':name,'data':None, 'error':str(e)}

@app.put('/sensor/:name')
@app.put('/sensor/:name/:timestamp')
def sensor_data_put(name,timestamp=None):
    FLUSH_INTERVAL=240 #sec
    global lastflush

    if not timestamp: timestamp = int(time.time())
    try:
        value = bottle.request.POST.value
        type = bottle.request.POST.type

        if type == "int":
            klass = ebus.datastore.ValueInt
        elif type == "float":
            klass = ebus.datastore.ValueFloat
        elif type == "string":
            klass = ebus.datastore.ValueString
	elif type == "":
	    klass = None
        else:
            return {'error':'INVALID_TYPE', msg:'Type {0} is invalid'.format(type)}

	msg = "Storing {0} of type {1} with timestamp {2} to {3}".format(value,type,timestamp,name)
	logging.info(msg)

        datastore.addValue(name, timestamp, value, klass)

        if time.time() - FLUSH_INTERVAL > get_lastflush(name):
	    table = datastore.getTable(name)
            with datastore.fileLock:
                datastore.flush_table(name,table,close=False,index=True)
	    lastflushes[name] = time.time()

        return {'error':None,'msg':msg}
    except Exception,e:
	traceback.print_exc()
	logging.error("Error: " + "{0} name={1} value={2} type={3}".format(e, name, value, type))
        return {'error':str(e),'msg':str(e)}



@app.route('/sensor/:name/:startdate/:enddate')
def sensor_name_start_end(name,startdate,enddate):
    SAMPLING_STEPSIZE=500
    NUMBER_OF_VALUES=250
    try:
        startdate, enddate = int(startdate), int(enddate)
	logging.info("/sensor/ start={0} end={1}".format(startdate, enddate))
        table=datastore.getTable(name)
        with datastore:
	    i = table.where("(timestamp >= startdate) & (timestamp <= enddate)",
			    condvars={'startdate':startdate,'enddate':enddate,'timestamp':table.cols.timestamp},
			    step=SAMPLING_STEPSIZE)
	    timestamps = []
	    try:
	        for x in range(20):
		    i.next()
		    timestamps.append(i['timestamp'])
	    except: pass

	    if len(timestamps) > 10:
		    diff = map(lambda (x1,x2): (x2-x1)/SAMPLING_STEPSIZE, zip(timestamps[:-1], timestamps[1:]))
		    diff_avg = numpy.average(diff)
		    time_period = enddate - startdate
		    samples = time_period / diff_avg
		    step = numpy.ceil(samples / float(NUMBER_OF_VALUES))

		    data = [(x['timestamp'], x['value']) 
			    for x in table.where("(timestamp >= startdate) & (timestamp <= enddate)", step=step)]
		    logging.info("diff={0} samples={1} step={2} len={3} ({4})".format(diff_avg, samples, step, len(data),name))
	    else:
		    # Not enough data, sampling approach useless, deliver all data
		    data = [(x['timestamp'], x['value']) for x in table.where("(timestamp >= startdate) & (timestamp <= enddate)")]
            return {'sensor':name, 'error':None,'data':data}
    except Exception,e:
	logging.error("Error: " + str(e) + str(type(e)))
        return {'sensor':name,'data':None, 'error':str(e)}

@app.route('/avg/:name/:startdate')
@app.route('/avg/:name/:startdate/:period')
def sensor_avg_start(name, startdate, period=60*15): #15min
    AVG_STEPSIZE=50
    try:
        startdate, enddate = int(startdate), int(time.time())
	logging.info("/avg/ start={0} end={1}".format(startdate, enddate))
        table=datastore.getTable(name)
        with datastore:
            sel_rows = table.where("(timestamp >= startdate) & (timestamp <=enddate)", step=AVG_STEPSIZE)
            f_group = range(startdate, enddate, period)
            data = map(lambda (group_id, grouped_rows): (group_id, numpy.average([row['value'] for row in grouped_rows])),
                    itertools.groupby(sel_rows, lambda t: (t['timestamp']/period)*period))
            data = map(lambda (timestamp,value): (timestamp, value), data)
            return {'sensor':name, 'error':None,'data':data}
    except Exception,e:
            return {'sensor':name, 'error':str(e), 'data':None}