# -*- coding: utf-8 -*-
"""One-off migration: copy the PostgreSQL ``value`` table into a Datastore.

Steps performed when the script runs:

1. Read the ``sensor`` table and build a sensor-id -> name lookup.
2. Start ``psql`` and let it stream ``COPY value TO stdout`` as
   '|'-delimited text, with NULLs written as empty strings.
3. For every row, store the first non-empty of value_int / value_float /
   value_string in the Datastore under the sensor's name.

The Datastore is flushed every FLUSH_INTERVAL rows and closed at the end.
"""
import sys
import time
from subprocess import PIPE, Popen

import psycopg2

from ebus.datastore import Datastore

# Command sent to psql.  NULL AS '' makes NULL columns arrive as empty
# strings (COPY's default NULL marker would be \N).
COPY_COMMAND = "COPY value TO stdout WITH DELIMITER '|' NULL AS '';\n"
TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"
# Number of rows between progress reports / Datastore flushes.
FLUSH_INTERVAL = 100000


def load_sensor_names():
    """Return a dict built from the first two columns of the ``sensor`` table.

    Column 0 is used as the key (it is looked up by integer sensor id later)
    and column 1 as the sensor name.  The cursor and connection are closed
    even if the query fails.
    """
    conn = psycopg2.connect("dbname=ebus user=ebus")
    try:
        cur = conn.cursor()
        try:
            cur.execute("""SELECT * from sensor""")
            return dict((row[0], row[1]) for row in cur.fetchall())
        finally:
            cur.close()
    finally:
        conn.close()


def parse_timestamp(text):
    """Convert ``YYYY-MM-DD HH:MM:SS[.fraction]`` text to epoch seconds.

    Everything from the first '.' on is discarded, so sub-second precision
    is lost.  ``time.mktime`` interprets the value as local time.
    """
    return time.mktime(time.strptime(text.split(".")[0], TIMESTAMP_FORMAT))


def store_value(store, name, timestamp, value_int, value_float, value_string):
    """Write the first non-empty value column of one row to ``store``.

    Precedence is int, then float, then string.  A row whose three value
    columns are all empty is reported on stdout and not stored.
    """
    if value_int != '':
        store.addValueInt(name, timestamp, int(value_int))
    elif value_float != '':
        store.addValueFloat(name, timestamp, float(value_float))
    elif value_string != '':
        store.addValueString(name, timestamp, value_string)
    else:
        # The tuple is wrapped in a 1-tuple so it fills the single %s;
        # passing it bare raises "not all arguments converted".
        print('skip: %s' % ((name, timestamp, value_int, value_float,
                             value_string),))


def copy_values(store, sensors):
    """Stream every row of ``value`` out of psql into ``store``.

    ``sensors`` maps integer sensor ids to names; an unknown id raises
    KeyError.  Returns the number of rows read.
    """
    # universal_newlines=True gives text-mode pipes, so the same str-based
    # code works on Python 2 and Python 3.
    pg_dump = Popen(["psql"], stdin=PIPE, stdout=PIPE, universal_newlines=True)
    c = 0
    try:
        try:
            pg_dump.stdin.write(COPY_COMMAND)
        finally:
            # psql only exits (and stdout only reaches EOF) once its stdin
            # is closed; without this the read loop below never ends.
            pg_dump.stdin.close()

        for line in pg_dump.stdout:
            # Strip the row terminator so an empty last column is really ''.
            # Splitting at most 6 times keeps any further '|' inside the
            # string column instead of breaking the unpacking.
            # NOTE(review): COPY's backslash escapes in the string column
            # (e.g. "\|", "\\") are stored undecoded, as before.
            (_oid, timestamp, sensor_id, _type,
             value_float, value_int, value_string) = \
                line.rstrip("\n").split("|", 6)

            name = sensors[int(sensor_id)]
            timestamp = parse_timestamp(timestamp)

            c += 1
            if not c % FLUSH_INTERVAL:
                print(c)
                store.flush()

            store_value(store, name, timestamp,
                        value_int, value_float, value_string)
    finally:
        pg_dump.stdout.close()
        returncode = pg_dump.wait()

    if returncode != 0:
        sys.stderr.write("psql exited with status %d\n" % returncode)
    return c


sensors = load_sensor_names()

d = Datastore("testhdffiles")
try:
    copy_values(d, sensors)
finally:
    # Close the Datastore even when the copy fails part-way.
    d.close()

# NOTE(review): everything below is commented-out web API handler code that
# is unrelated to the migration above; it is kept only for reference.
#@app.route('/sensor/:name')
#def sensor_data(soup,name):
#    try:
#        sensor_id = soup.sensor.filter(soup.sensor.name == name).one().id
#
#        conn = soup.connection()
#        sql = text("""SELECT timestamp, COALESCE(value_int, value_float) as "value_real"
#                        FROM value
#                       WHERE sensor_id = :sensor_id
#                       ORDER BY timestamp DESC
# NOTE(review): the next line carried no '#' of its own in the source; it is
# kept commented because no live code uses sqlsoup -- confirm before removing.
#from sqlsoup import SQLSoup as SqlSoup
#                       LIMIT 1""")
#        value = conn.execute(sql, sensor_id=sensor_id).first()
#
#        return {'sensor':name,'data':[maketime(value.timestamp), float(value.value_real)], 'error':None}
#    except Exception,e:
#        return {'sensor':name,'data':None, 'error':str(e)}
#
#@app.route('/sensor/:name/:startdate/:enddate')
#def sensor_data_fromto(soup,name,startdate,enddate):
#    try:
#        interval = float(enddate) - float(startdate)
#        modulo = interval / 500 #500 values
#
#        startdate = datetime.datetime.fromtimestamp(float(startdate))
#        enddate = datetime.datetime.fromtimestamp(float(enddate))
#
#        if interval <= 0: raise Exception("Invalid interval")
#        if interval >= 14 * 24 * 60 * 60: raise Exception("interval too big")
#
#        sensor_id = soup.sensor.filter(soup.sensor.name == name).one().id
#
#        conn = soup.connection()
#        sql = text("""
#        SELECT to_timestamp( extract(epoch from timestamp)::int - extract(epoch from timestamp)::int % :modulo ) "round_timestamp",
#               AVG(COALESCE(value_int,value_float)) "value_real"
#          FROM value
#         WHERE timestamp > :startdate
#           AND timestamp < :enddate
#           AND sensor_id = :sensor_id
#         GROUP BY "round_timestamp"
#         ORDER BY "round_timestamp"
#        """)
#
#        values = conn.execute(sql,
#                              sensor_id=sensor_id,
#                              startdate=startdate,
#                              enddate=enddate,
#                              modulo=modulo).fetchall()
#
#        values = map(lambda row: (maketime(row.round_timestamp), row.value_real),
#                     values)
#
#        return {'sensor':name,'data':values, 'error':None}
#    except Exception,e:
#        return {'sensor':name, 'data':None, 'error':str(e)}
#
#@app.route('/sensor_cached/:name/:timestamp_from')
#def sensor_data_cached_fromto(soup, name, timestamp_from):
#    try:
#        timestamp_from = datetime.datetime.fromtimestamp(float(timestamp_from))
#        sensor_id = soup.sensor.filter(soup.sensor.name == name).one().id
#
#        # select data from cache-view
#        conn = soup.connection()
#        sql = text("""SELECT timestamp, value_real AS "value_real"
#                        FROM vi_value_cache
#                       WHERE timestamp >= :timestamp_from
#                         AND sensor_id = :sensor_id
#                       ORDER BY timestamp""")
#        values = conn.execute(sql, timestamp_from=timestamp_from, sensor_id=sensor_id).fetchall()
#        values = map(lambda row: (maketime(row.timestamp), row.value_real.__float__()),
#                     values)
#
#        return {'sensor':name,'data':values, 'error':None}
#    except Exception,e:
#        return {'sensor':name, 'data':None, 'error':str(e)}
#
#import select
#import psycopg2
#
#@app.route('/stream')
#@app.route('/stream/:startdate')
#def stream(soup, startdate=None):
#    connection = soup.connection()
#    conn = connection.connection
#
#    time_start = startdate != None and parsetime(float(startdate)) \
#                 or now(connection)
#
#
#    conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
#
#    cursor = conn.cursor()
#    cursor.execute("LISTEN evt_ebus_value_insert;")
#
#    values = []
#    fails = 0
#    while fails < 5:
#        if select.select([conn],[],[],10) == ([],[],[]):
#            time_stop = now(connection)
#            fails += 1
#        else:
#            conn.poll()
#
#            notify = conn.notifies.pop()
#            if not notify:
#                continue
#
#            time_stop = now(connection)
#            print "time_stop %s"%time_stop
#            sql = text("""SELECT sensor.name,
#                                 value.timestamp,
#                                 COALESCE(value.value_int,value.value_float) "value_real",
#                                 value_string
#                            FROM value, sensor
#                           WHERE value.sensor_id = sensor.id
#                             AND timestamp >= :time_start
#                             AND timestamp < :time_stop""")
#            values = map(lambda row: {
#                             "name":row.name,
#                             "timestamp":maketime(row.timestamp),
#                             "value_real":row.value_real != None and row.value_real.__float__() or None,
#                             "value_string":row.value_string
#                         },
#                         connection.execute(sql, time_start=time_start, time_stop=time_stop))
#
#            break
#
#    cursor.close()
#    return {'time_start' : maketime(time_start), 'time_stop':maketime(time_stop),
#            'data':values}
#
#@app.route("/all_values")
#def all_values(soup):
#    conn = soup.connection()
#
#    sql = text("""
#        SELECT sensor.name,
#               value.timestamp,
#               COALESCE(value.value_int,value.value_float) "value_real",
#               value_string
#          FROM value, sensor, (SELECT MAX(timestamp) as timestamp,
#                                      sensor_id
#                                 FROM value
#                                WHERE timestamp > CURRENT_TIMESTAMP - '15 minutes'::interval
#                                GROUP BY sensor_id) last_value
#         WHERE value.timestamp = last_value.timestamp
#           AND value.sensor_id = last_value.sensor_id
#           AND value.sensor_id = sensor.id""")
#
#    time_stop = now(conn)
#    values = map(lambda row: {
#                     "name":row.name,
#                     "timestamp":maketime(row.timestamp),
#                     "value_real":row.value_real != None and row.value_real.__float__() or None,
#                     "value_string":row.value_string},
#                 conn.execute(sql))
#
#    return {'data':values, 'time_stop':maketime(time_stop)}
#
## vim: autoindent tabstop=4 shiftwidth=4 expandtab softtabstop=4 filetype=python