summaryrefslogtreecommitdiff
path: root/datastore
diff options
context:
space:
mode:
authorYves Fischer <yvesf-git@xapek.org>2013-01-20 23:53:52 +0100
committerYves Fischer <yvesf-git@xapek.org>2013-01-20 23:53:52 +0100
commit1068ff9bf74cf411be701bb50fae92e8476b5da7 (patch)
tree5f54b2cb37bace46a0508544ec5116468c8f584b /datastore
parenta40afda64c61089d6c108eea9cb3eac8aeaaa072 (diff)
downloadebus-alt-1068ff9bf74cf411be701bb50fae92e8476b5da7.tar.gz
ebus-alt-1068ff9bf74cf411be701bb50fae92e8476b5da7.zip
datastore foo
Diffstat (limited to 'datastore')
-rw-r--r--datastore/config.py5
-rw-r--r--datastore/dump.py29
-rw-r--r--datastore/store/channel.py2
-rw-r--r--datastore/store/controller.py10
4 files changed, 41 insertions, 5 deletions
diff --git a/datastore/config.py b/datastore/config.py
index 9f615d2..cd85f02 100644
--- a/datastore/config.py
+++ b/datastore/config.py
@@ -4,6 +4,9 @@ from store.processor import CountingProcessor
CHANNELS={
'org.xapek.test1' : PrinterChannel("org.xapek.test1"),
'org.xapek.test1.count' : SimpleMemoryChannel(),
+ 'arduino.a0' : PrinterChannel("a0"),
+ 'arduino.a0.dba' : PrinterChannel("a0.dba"),
+ 'arduino.a0.mv' : PrinterChannel("a0.mv"),
}
PROCESSORS={
@@ -12,7 +15,7 @@ PROCESSORS={
}
-STOMP_HOST="localhost"
+STOMP_HOST="10.2.2.26"
STOMP_PORT=61613
STOMP_LOGIN=""
STOMP_PASSCODE=""
diff --git a/datastore/dump.py b/datastore/dump.py
new file mode 100644
index 0000000..ac7ec29
--- /dev/null
+++ b/datastore/dump.py
@@ -0,0 +1,29 @@
+from time import sleep
+from stompclient import PublishSubscribeClient
+import config
+from threading import Thread
+from pprint import pprint
+
+client = PublishSubscribeClient(config.STOMP_HOST, config.STOMP_PORT)
+listener = Thread(target=client.listen_forever, name='Frame-Receiver')
+listener.start()
+client.listening_event.wait()
+r = client.connect(config.STOMP_LOGIN, config.STOMP_PASSCODE)
+if not client.connection:
+ print r
+ exit(1)
+
+
+def dump(x):
+ pprint(x.headers)
+ for line in x.body.split("\n"):
+ print "\t",
+ pprint(line)
+
+client.subscribe("/queue/queue-new", dump)
+
+try:
+ while True:
+ sleep(1)
+except:
+ client.disconnect()
diff --git a/datastore/store/channel.py b/datastore/store/channel.py
index 2a5d5f4..1e083b5 100644
--- a/datastore/store/channel.py
+++ b/datastore/store/channel.py
@@ -7,7 +7,7 @@ class PrinterChannel(IChannel):
def __init__(self,name):
self.name = name
def add(self,timestamp,data,controller):
- print "{0}: {1},{2}".format(self.name, timestamp, data)
+ print "{0}: timestamp={1} value={2}".format(self.name, timestamp, data)
class SimpleMemoryChannel(IChannel):
def __init__(self):
diff --git a/datastore/store/controller.py b/datastore/store/controller.py
index 0814488..0d694d1 100644
--- a/datastore/store/controller.py
+++ b/datastore/store/controller.py
@@ -1,4 +1,5 @@
import traceback
+import time
class SubscriptionManager:
def __init__(self):
@@ -56,13 +57,16 @@ class StoreController:
class MQStoreController(StoreController):
def cb_put(self, frame):
try:
+ if not frame.headers.has_key("timestamp"):
+ frame.headers['timestamp'] = str(time.time())
assert len(frame.timestamp) > 0
values = map(lambda kv: kv.split("="), frame.body.split("\n"))
+ values = filter(lambda x: len(x)==2, values) #filter non-pairs from empty lines
for (name,data) in values:
- assert self.channels.has_key(name)
+ if name not in self.channels.keys():
+ print "No channel named {0}".format(name)
+ continue
print name,"=",data
-
- for (name,data) in values:
self.put(name, frame.timestamp, data)
except Exception,e: traceback.print_exc()