diff options
Diffstat (limited to 'datastore/store/controller.py')
-rw-r--r-- | datastore/store/controller.py | 89 |
1 files changed, 89 insertions, 0 deletions
diff --git a/datastore/store/controller.py b/datastore/store/controller.py new file mode 100644 index 0000000..0814488 --- /dev/null +++ b/datastore/store/controller.py @@ -0,0 +1,89 @@ +import traceback + +class SubscriptionManager: + def __init__(self): + self.s = {} + + def subscribe(self,name,replyTo): + if not self.s.has_key(name): + self.s[name] = [] + assert self.s.has_key(name) + + if replyTo not in self.s[name]: + self.s[name].append(replyTo) + + def unsubscribe(self,name,replyTo): + assert self.s.has_key(name) + assert replyTo in self.s[name] + self.s[name].remove(replyTo) + + def notify(self, name, timestamp, data, controller): + if not self.s.has_key(name): + return + for replyTo in self.s[name]: + controller.client.send(replyTo, + "{0}={1}".format(name,dat), + extra_headers={'timestamp':timestamp}) + +class StoreController: + def __init__(self, client, channels, processors): + self.client = client + self.channels = channels + self.processors = processors + self.subscriptions = SubscriptionManager() + + def put(self, name, timestamp, data): + assert self.channels.has_key(name) + + self.channels[name].add(timestamp, data, self) + self.subscriptions.notify(name, timestamp, data, self) + if self.processors.has_key(name): + for processor in self.processors[name]: + processor.process(name, timestamp, data, self) + + def get(self, name, query): + assert self.channels.has_key(name) + return self.channels[name].get(query) + + def subscribe(self,name, replyTo): + assert self.channels.has_key(frame.name) + self.subscriptions.subscribe(name, replyTo) + + def unsubscribe(self,name,replyTo): + assert self.channels.has_key(frame.name) + self.subscriptions.unsubscribe(name,replyTo) + +class MQStoreController(StoreController): + def cb_put(self, frame): + try: + assert len(frame.timestamp) > 0 + values = map(lambda kv: kv.split("="), frame.body.split("\n")) + for (name,data) in values: + assert self.channels.has_key(name) + print name,"=",data + + for (name,data) in values: + self.put(name, frame.timestamp, data) + except Exception,e: traceback.print_exc() + + + def cb_get(self, frame): + try: + assert len(frame.headers['reply-to']) > 0 + r = self.get(frame.name, frame.body) + print "send",frame.name,r + self.client.send(frame.headers['reply-to'], + body=r) + except Exception,e: traceback.print_exc() + + + def cb_subscribe(self, frame): + try: + assert len(frame.headers['reply-to']) > 0 + assert frame.action in ["subscribe","unsubscribe"] + + if frame.action == "subscribe": + self.subscribe(frame.name, frame.headers['reply-to']) + elif frame.action == "unsubscribe": + self.unsubscribe(frame.name, frame.headers['reply-to']) + except Exception,e: traceback.print_exc() |