diff options
author | Random Hacker <random_hacker@xapek.org> | 2013-02-24 16:44:17 +0100 |
---|---|---|
committer | Random Hacker <random_hacker@xapek.org> | 2013-02-24 16:44:17 +0100 |
commit | 37e334d24a099e5367597ee18ac0c6d5ae2fba32 (patch) | |
tree | 057a2ad096b44e8bda448d35ebd2a7f835673f05 /importtest.py | |
parent | 946eb7d95fc04d465802c8fc00e5d4130a52c8f2 (diff) | |
download | ebus-alt-37e334d24a099e5367597ee18ac0c6d5ae2fba32.tar.gz ebus-alt-37e334d24a099e5367597ee18ac0c6d5ae2fba32.zip |
hdf support
Diffstat (limited to 'importtest.py')
-rw-r--r-- | importtest.py | 216 |
1 files changed, 216 insertions, 0 deletions
diff --git a/importtest.py b/importtest.py new file mode 100644 index 0000000..c51aab9 --- /dev/null +++ b/importtest.py @@ -0,0 +1,216 @@ +# -*- coding: utf-8 -*- +import sys +import time +import psycopg2 +from ebus.datastore import Datastore +from subprocess import Popen,PIPE + + +conn = psycopg2.connect("dbname=ebus user=ebus") +cur = conn.cursor() +sql = """SELECT * from sensor""" +cur.execute(sql) +sensors = cur.fetchall() +sensors = map(lambda a: (a[0], a[1]), sensors) +sensors = dict(sensors) +cur.close() +conn.close() + +d = Datastore("testhdffiles") + + +pg_dump = Popen(["psql"], stdin=PIPE, stdout=PIPE) +pg_dump.stdin.write("COPY value TO stdout WITH DELIMITER '|' NULL AS '';\n") + +c=0 +for line in pg_dump.stdout.xreadlines(): + (oid, timestamp, sensor_id, type, value_float, value_int, value_string) = line.split("|") + name = sensors[int(sensor_id)] + # if value_float == "\N": value_float = None + # else: value_float = float(value_float) + # if value_int == "\N": value_int = None + # else: value_int = int(value_int) + # if value_string =="\N": value_string = None + # + + timestamp = time.strptime(timestamp.split(".")[0], "%Y-%m-%d %H:%M:%S") + timestamp = time.mktime(timestamp) + + c += 1 + if not c % 100000: + print c + d.flush() + + if value_int != '': + d.addValueInt(name, timestamp, int(value_int)) + elif value_float != '': + d.addValueFloat(name, timestamp, float(value_float)) + elif value_string != '': + d.addValueString(name, timestamp, value_string) + else: + print 'skip: %s' % (name, timestamp, value_int, value_float, value_string) + +d.close() + + #@app.route('/sensor/:name') + #def sensor_data(soup,name): + # try: + # sensor_id = soup.sensor.filter(soup.sensor.name == name).one().id + # + # conn = soup.connection() + # sql = text("""SELECT timestamp, COALESCE(value_int, value_float) as "value_real" + # FROM value + # WHERE sensor_id = :sensor_id + # ORDER BY timestamp DESC +from sqlsoup import SQLSoup as SqlSoup + # LIMIT 1""") + # value = conn.execute(sql, sensor_id=sensor_id).first() + # + # return {'sensor':name,'data':[maketime(value.timestamp), float(value.value_real)], 'error':None} + # except Exception,e: + # return {'sensor':name,'data':None, 'error':str(e)} + # + #@app.route('/sensor/:name/:startdate/:enddate') + #def sensor_data_fromto(soup,name,startdate,enddate): + # try: + # interval = float(enddate) - float(startdate) + # modulo = interval / 500 #500 values + # + # startdate = datetime.datetime.fromtimestamp(float(startdate)) + # enddate = datetime.datetime.fromtimestamp(float(enddate)) + # + # if interval <= 0: raise Exception("Invalid interval") + # if interval >= 14 * 24 * 60 * 60: raise Exception("interval too big") + # + # sensor_id = soup.sensor.filter(soup.sensor.name == name).one().id + # + # conn = soup.connection() + # sql = text(""" + # SELECT to_timestamp( extract(epoch from timestamp)::int - extract(epoch from timestamp)::int % :modulo ) "round_timestamp", + # AVG(COALESCE(value_int,value_float)) "value_real" + # FROM value + # WHERE timestamp > :startdate + # AND timestamp < :enddate + # AND sensor_id = :sensor_id + # GROUP BY "round_timestamp" + # ORDER BY "round_timestamp" + # """) + # + # values = conn.execute(sql, + # sensor_id=sensor_id, + # startdate=startdate, + # enddate=enddate, + # modulo=modulo).fetchall() + # + # values = map(lambda row: (maketime(row.round_timestamp), row.value_real), + # values) + # + # return {'sensor':name,'data':values, 'error':None} + # except Exception,e: + # return {'sensor':name, 'data':None, 'error':str(e)} + # + #@app.route('/sensor_cached/:name/:timestamp_from') + #def sensor_data_cached_fromto(soup, name, timestamp_from): + # try: + # timestamp_from = datetime.datetime.fromtimestamp(float(timestamp_from)) + # sensor_id = soup.sensor.filter(soup.sensor.name == name).one().id + # + # # select data from cache-view + # conn = soup.connection() + # sql = text("""SELECT timestamp, value_real AS "value_real" + # FROM vi_value_cache + # WHERE timestamp >= :timestamp_from + # AND sensor_id = :sensor_id + # ORDER BY timestamp""") + # values = conn.execute(sql, timestamp_from=timestamp_from, sensor_id=sensor_id).fetchall() + # values = map(lambda row: (maketime(row.timestamp), row.value_real.__float__()), + # values) + # + # return {'sensor':name,'data':values, 'error':None} + # except Exception,e: + # return {'sensor':name, 'data':None, 'error':str(e)} + # + #import select + #import psycopg2 + # + #@app.route('/stream') + #@app.route('/stream/:startdate') + #def stream(soup, startdate=None): + # connection = soup.connection() + # conn = connection.connection + # + # time_start = startdate != None and parsetime(float(startdate)) \ + # or now(connection) + # + # + # conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) + # + # cursor = conn.cursor() + # cursor.execute("LISTEN evt_ebus_value_insert;") + # + # values = [] + # fails = 0 + # while fails < 5: + # if select.select([conn],[],[],10) == ([],[],[]): + # time_stop = now(connection) + # fails += 1 + # else: + # conn.poll() + # + # notify = conn.notifies.pop() + # if not notify: + # continue + # + # time_stop = now(connection) + # print "time_stop %s"%time_stop + # sql = text("""SELECT sensor.name, + # value.timestamp, + # COALESCE(value.value_int,value.value_float) "value_real", + # value_string + # FROM value, sensor + # WHERE value.sensor_id = sensor.id + # AND timestamp >= :time_start + # AND timestamp < :time_stop""") + # values = map(lambda row: { + # "name":row.name, + # "timestamp":maketime(row.timestamp), + # "value_real":row.value_real != None and row.value_real.__float__() or None, + # "value_string":row.value_string + # }, + # connection.execute(sql, time_start=time_start, time_stop=time_stop)) + # + # break + # + # cursor.close() + # return {'time_start' : maketime(time_start), 'time_stop':maketime(time_stop), + # 'data':values} + # + #@app.route("/all_values") + #def all_values(soup): + # conn = soup.connection() + # + # sql = text(""" + # SELECT sensor.name, + # value.timestamp, + # COALESCE(value.value_int,value.value_float) "value_real", + # value_string + # FROM value, sensor, (SELECT MAX(timestamp) as timestamp, + # sensor_id + # FROM value + # WHERE timestamp > CURRENT_TIMESTAMP - '15 minutes'::interval + # GROUP BY sensor_id) last_value + # WHERE value.timestamp = last_value.timestamp + # AND value.sensor_id = last_value.sensor_id + # AND value.sensor_id = sensor.id""") + # + # time_stop = now(conn) + # values = map(lambda row: { + # "name":row.name, + # "timestamp":maketime(row.timestamp), + # "value_real":row.value_real != None and row.value_real.__float__() or None, + # "value_string":row.value_string}, + # conn.execute(sql)) + # + # return {'data':values, 'time_stop':maketime(time_stop)} + # + ## vim: autoindent tabstop=4 shiftwidth=4 expandtab softtabstop=4 filetype=python |