diff options
Diffstat (limited to 'datastore')
-rw-r--r-- | datastore/config.py | 5 | ||||
-rw-r--r-- | datastore/dump.py | 29 | ||||
-rw-r--r-- | datastore/store/channel.py | 2 | ||||
-rw-r--r-- | datastore/store/controller.py | 10 |
4 files changed, 41 insertions, 5 deletions
diff --git a/datastore/config.py b/datastore/config.py index 9f615d2..cd85f02 100644 --- a/datastore/config.py +++ b/datastore/config.py @@ -4,6 +4,9 @@ from store.processor import CountingProcessor CHANNELS={ 'org.xapek.test1' : PrinterChannel("org.xapek.test1"), 'org.xapek.test1.count' : SimpleMemoryChannel(), + 'arduino.a0' : PrinterChannel("a0"), + 'arduino.a0.dba' : PrinterChannel("a0.dba"), + 'arduino.a0.mv' : PrinterChannel("a0.mv"), } PROCESSORS={ @@ -12,7 +15,7 @@ PROCESSORS={ } -STOMP_HOST="localhost" +STOMP_HOST="10.2.2.26" STOMP_PORT=61613 STOMP_LOGIN="" STOMP_PASSCODE="" diff --git a/datastore/dump.py b/datastore/dump.py new file mode 100644 index 0000000..ac7ec29 --- /dev/null +++ b/datastore/dump.py @@ -0,0 +1,29 @@ +from time import sleep +from stompclient import PublishSubscribeClient +import config +from threading import Thread +from pprint import pprint + +client = PublishSubscribeClient(config.STOMP_HOST, config.STOMP_PORT) +listener = Thread(target=client.listen_forever, name='Frame-Receiver') +listener.start() +client.listening_event.wait() +r = client.connect(config.STOMP_LOGIN, config.STOMP_PASSCODE) +if not client.connection: + print r + exit(1) + + +def dump(x): + pprint(x.headers) + for line in x.body.split("\n"): + print "\t", + pprint(line) + +client.subscribe("/queue/queue-new", dump) + +try: + while True: + sleep(1) +except: + client.disconnect() diff --git a/datastore/store/channel.py b/datastore/store/channel.py index 2a5d5f4..1e083b5 100644 --- a/datastore/store/channel.py +++ b/datastore/store/channel.py @@ -7,7 +7,7 @@ class PrinterChannel(IChannel): def __init__(self,name): self.name = name def add(self,timestamp,data,controller): - print "{0}: {1},{2}".format(self.name, timestamp, data) + print "{0}: timestamp={1} value={2}".format(self.name, timestamp, data) class SimpleMemoryChannel(IChannel): def __init__(self): diff --git a/datastore/store/controller.py b/datastore/store/controller.py index 0814488..0d694d1 100644 --- a/datastore/store/controller.py +++ b/datastore/store/controller.py @@ -1,4 +1,5 @@ import traceback +import time class SubscriptionManager: def __init__(self): @@ -56,13 +57,16 @@ class StoreController: class MQStoreController(StoreController): def cb_put(self, frame): try: + if not frame.headers.has_key("timestamp"): + frame.headers['timestamp'] = str(time.time()) assert len(frame.timestamp) > 0 values = map(lambda kv: kv.split("="), frame.body.split("\n")) + values = filter(lambda x: len(x)==2, values) #filter non-pairs from empty lines for (name,data) in values: - assert self.channels.has_key(name) + if name not in self.channels.keys(): + print "No channel named {0}".format(name) + continue print name,"=",data - - for (name,data) in values: self.put(name, frame.timestamp, data) except Exception,e: traceback.print_exc() |