import traceback class SubscriptionManager: def __init__(self): self.s = {} def subscribe(self,name,replyTo): if not self.s.has_key(name): self.s[name] = [] assert self.s.has_key(name) if replyTo not in self.s[name]: self.s[name].append(replyTo) def unsubscribe(self,name,replyTo): assert self.s.has_key(name) assert replyTo in self.s[name] self.s[name].remove(replyTo) def notify(self, name, timestamp, data, controller): if not self.s.has_key(name): return for replyTo in self.s[name]: controller.client.send(replyTo, "{0}={1}".format(name,dat), extra_headers={'timestamp':timestamp}) class StoreController: def __init__(self, client, channels, processors): self.client = client self.channels = channels self.processors = processors self.subscriptions = SubscriptionManager() def put(self, name, timestamp, data): assert self.channels.has_key(name) self.channels[name].add(timestamp, data, self) self.subscriptions.notify(name, timestamp, data, self) if self.processors.has_key(name): for processor in self.processors[name]: processor.process(name, timestamp, data, self) def get(self, name, query): assert self.channels.has_key(name) return self.channels[name].get(query) def subscribe(self,name, replyTo): assert self.channels.has_key(frame.name) self.subscriptions.subscribe(name, replyTo) def unsubscribe(self,name,replyTo): assert self.channels.has_key(frame.name) self.subscriptions.unsubscribe(name,replyTo) class MQStoreController(StoreController): def cb_put(self, frame): try: assert len(frame.timestamp) > 0 values = map(lambda kv: kv.split("="), frame.body.split("\n")) for (name,data) in values: assert self.channels.has_key(name) print name,"=",data for (name,data) in values: self.put(name, frame.timestamp, data) except Exception,e: traceback.print_exc() def cb_get(self, frame): try: assert len(frame.headers['reply-to']) > 0 r = self.get(frame.name, frame.body) print "send",frame.name,r self.client.send(frame.headers['reply-to'], body=r) except Exception,e: traceback.print_exc() def cb_subscribe(self, frame): try: assert len(frame.headers['reply-to']) > 0 assert frame.action in ["subscribe","unsubscribe"] if frame.action == "subscribe": self.subscribe(frame.name, frame.headers['reply-to']) elif frame.action == "unsubscribe": self.unsubscribe(frame.name, frame.headers['reply-to']) except Exception,e: traceback.print_exc()