summaryrefslogtreecommitdiff
path: root/datastore/store/controller.py
diff options
context:
space:
mode:
authorYves Fischer <yvesf-git@xapek.org>2013-01-11 20:01:09 +0100
committerYves Fischer <yvesf-git@xapek.org>2013-01-11 20:01:09 +0100
commit934d642b410d75dae1966c993fc61c371959876e (patch)
treeca9bcd22d98d88424be0d21d6387ddba779a9daa /datastore/store/controller.py
parent69fbdff8e8ba69034111038ce01184ef5472728a (diff)
downloadebus-alt-934d642b410d75dae1966c993fc61c371959876e.tar.gz
ebus-alt-934d642b410d75dae1966c993fc61c371959876e.zip
datastore
Diffstat (limited to 'datastore/store/controller.py')
-rw-r--r--datastore/store/controller.py89
1 files changed, 89 insertions, 0 deletions
diff --git a/datastore/store/controller.py b/datastore/store/controller.py
new file mode 100644
index 0000000..0814488
--- /dev/null
+++ b/datastore/store/controller.py
@@ -0,0 +1,89 @@
+import traceback
+
+class SubscriptionManager:
+ def __init__(self):
+ self.s = {}
+
+ def subscribe(self,name,replyTo):
+ if not self.s.has_key(name):
+ self.s[name] = []
+ assert self.s.has_key(name)
+
+ if replyTo not in self.s[name]:
+ self.s[name].append(replyTo)
+
+ def unsubscribe(self,name,replyTo):
+ assert self.s.has_key(name)
+ assert replyTo in self.s[name]
+ self.s[name].remove(replyTo)
+
+ def notify(self, name, timestamp, data, controller):
+ if not self.s.has_key(name):
+ return
+ for replyTo in self.s[name]:
+ controller.client.send(replyTo,
+ "{0}={1}".format(name,dat),
+ extra_headers={'timestamp':timestamp})
+
+class StoreController:
+ def __init__(self, client, channels, processors):
+ self.client = client
+ self.channels = channels
+ self.processors = processors
+ self.subscriptions = SubscriptionManager()
+
+ def put(self, name, timestamp, data):
+ assert self.channels.has_key(name)
+
+ self.channels[name].add(timestamp, data, self)
+ self.subscriptions.notify(name, timestamp, data, self)
+ if self.processors.has_key(name):
+ for processor in self.processors[name]:
+ processor.process(name, timestamp, data, self)
+
+ def get(self, name, query):
+ assert self.channels.has_key(name)
+ return self.channels[name].get(query)
+
+ def subscribe(self,name, replyTo):
+ assert self.channels.has_key(frame.name)
+ self.subscriptions.subscribe(name, replyTo)
+
+ def unsubscribe(self,name,replyTo):
+ assert self.channels.has_key(frame.name)
+ self.subscriptions.unsubscribe(name,replyTo)
+
+class MQStoreController(StoreController):
+ def cb_put(self, frame):
+ try:
+ assert len(frame.timestamp) > 0
+ values = map(lambda kv: kv.split("="), frame.body.split("\n"))
+ for (name,data) in values:
+ assert self.channels.has_key(name)
+ print name,"=",data
+
+ for (name,data) in values:
+ self.put(name, frame.timestamp, data)
+ except Exception,e: traceback.print_exc()
+
+
+ def cb_get(self, frame):
+ try:
+ assert len(frame.headers['reply-to']) > 0
+ r = self.get(frame.name, frame.body)
+ print "send",frame.name,r
+ self.client.send(frame.headers['reply-to'],
+ body=r)
+ except Exception,e: traceback.print_exc()
+
+
+ def cb_subscribe(self, frame):
+ try:
+ assert len(frame.headers['reply-to']) > 0
+ assert frame.action in ["subscribe","unsubscribe"]
+
+ if frame.action == "subscribe":
+ self.subscribe(frame.name, frame.headers['reply-to'])
+ elif frame.action == "unsubscribe":
+ self.unsubscribe(frame.name, frame.headers['reply-to'])
+ except Exception,e: traceback.print_exc()