summaryrefslogtreecommitdiff
path: root/heap/datastore/store/controller.py
diff options
context:
space:
mode:
Diffstat (limited to 'heap/datastore/store/controller.py')
-rw-r--r--heap/datastore/store/controller.py93
1 files changed, 93 insertions, 0 deletions
diff --git a/heap/datastore/store/controller.py b/heap/datastore/store/controller.py
new file mode 100644
index 0000000..0d694d1
--- /dev/null
+++ b/heap/datastore/store/controller.py
@@ -0,0 +1,93 @@
+import traceback
+import time
+
+class SubscriptionManager:
+ def __init__(self):
+ self.s = {}
+
+ def subscribe(self,name,replyTo):
+ if not self.s.has_key(name):
+ self.s[name] = []
+ assert self.s.has_key(name)
+
+ if replyTo not in self.s[name]:
+ self.s[name].append(replyTo)
+
+ def unsubscribe(self,name,replyTo):
+ assert self.s.has_key(name)
+ assert replyTo in self.s[name]
+ self.s[name].remove(replyTo)
+
+ def notify(self, name, timestamp, data, controller):
+ if not self.s.has_key(name):
+ return
+ for replyTo in self.s[name]:
+ controller.client.send(replyTo,
+ "{0}={1}".format(name,dat),
+ extra_headers={'timestamp':timestamp})
+
+class StoreController:
+ def __init__(self, client, channels, processors):
+ self.client = client
+ self.channels = channels
+ self.processors = processors
+ self.subscriptions = SubscriptionManager()
+
+ def put(self, name, timestamp, data):
+ assert self.channels.has_key(name)
+
+ self.channels[name].add(timestamp, data, self)
+ self.subscriptions.notify(name, timestamp, data, self)
+ if self.processors.has_key(name):
+ for processor in self.processors[name]:
+ processor.process(name, timestamp, data, self)
+
+ def get(self, name, query):
+ assert self.channels.has_key(name)
+ return self.channels[name].get(query)
+
+ def subscribe(self,name, replyTo):
+ assert self.channels.has_key(frame.name)
+ self.subscriptions.subscribe(name, replyTo)
+
+ def unsubscribe(self,name,replyTo):
+ assert self.channels.has_key(frame.name)
+ self.subscriptions.unsubscribe(name,replyTo)
+
+class MQStoreController(StoreController):
+ def cb_put(self, frame):
+ try:
+ if not frame.headers.has_key("timestamp"):
+ frame.headers['timestamp'] = str(time.time())
+ assert len(frame.timestamp) > 0
+ values = map(lambda kv: kv.split("="), frame.body.split("\n"))
+ values = filter(lambda x: len(x)==2, values) #filter non-pairs from empty lines
+ for (name,data) in values:
+ if name not in self.channels.keys():
+ print "No channel named {0}".format(name)
+ continue
+ print name,"=",data
+ self.put(name, frame.timestamp, data)
+ except Exception,e: traceback.print_exc()
+
+
+ def cb_get(self, frame):
+ try:
+ assert len(frame.headers['reply-to']) > 0
+ r = self.get(frame.name, frame.body)
+ print "send",frame.name,r
+ self.client.send(frame.headers['reply-to'],
+ body=r)
+ except Exception,e: traceback.print_exc()
+
+
+ def cb_subscribe(self, frame):
+ try:
+ assert len(frame.headers['reply-to']) > 0
+ assert frame.action in ["subscribe","unsubscribe"]
+
+ if frame.action == "subscribe":
+ self.subscribe(frame.name, frame.headers['reply-to'])
+ elif frame.action == "unsubscribe":
+ self.unsubscribe(frame.name, frame.headers['reply-to'])
+ except Exception,e: traceback.print_exc()