diff options
Diffstat (limited to 'datastore/store/controller.py')
-rw-r--r-- | datastore/store/controller.py | 93 |
1 files changed, 0 insertions, 93 deletions
diff --git a/datastore/store/controller.py b/datastore/store/controller.py deleted file mode 100644 index 0d694d1..0000000 --- a/datastore/store/controller.py +++ /dev/null @@ -1,93 +0,0 @@ -import traceback -import time - -class SubscriptionManager: - def __init__(self): - self.s = {} - - def subscribe(self,name,replyTo): - if not self.s.has_key(name): - self.s[name] = [] - assert self.s.has_key(name) - - if replyTo not in self.s[name]: - self.s[name].append(replyTo) - - def unsubscribe(self,name,replyTo): - assert self.s.has_key(name) - assert replyTo in self.s[name] - self.s[name].remove(replyTo) - - def notify(self, name, timestamp, data, controller): - if not self.s.has_key(name): - return - for replyTo in self.s[name]: - controller.client.send(replyTo, - "{0}={1}".format(name,dat), - extra_headers={'timestamp':timestamp}) - -class StoreController: - def __init__(self, client, channels, processors): - self.client = client - self.channels = channels - self.processors = processors - self.subscriptions = SubscriptionManager() - - def put(self, name, timestamp, data): - assert self.channels.has_key(name) - - self.channels[name].add(timestamp, data, self) - self.subscriptions.notify(name, timestamp, data, self) - if self.processors.has_key(name): - for processor in self.processors[name]: - processor.process(name, timestamp, data, self) - - def get(self, name, query): - assert self.channels.has_key(name) - return self.channels[name].get(query) - - def subscribe(self,name, replyTo): - assert self.channels.has_key(frame.name) - self.subscriptions.subscribe(name, replyTo) - - def unsubscribe(self,name,replyTo): - assert self.channels.has_key(frame.name) - self.subscriptions.unsubscribe(name,replyTo) - -class MQStoreController(StoreController): - def cb_put(self, frame): - try: - if not frame.headers.has_key("timestamp"): - frame.headers['timestamp'] = str(time.time()) - assert len(frame.timestamp) > 0 - values = map(lambda kv: kv.split("="), frame.body.split("\n")) - values = filter(lambda x: len(x)==2, values) #filter non-pairs from empty lines - for (name,data) in values: - if name not in self.channels.keys(): - print "No channel named {0}".format(name) - continue - print name,"=",data - self.put(name, frame.timestamp, data) - except Exception,e: traceback.print_exc() - - - def cb_get(self, frame): - try: - assert len(frame.headers['reply-to']) > 0 - r = self.get(frame.name, frame.body) - print "send",frame.name,r - self.client.send(frame.headers['reply-to'], - body=r) - except Exception,e: traceback.print_exc() - - - def cb_subscribe(self, frame): - try: - assert len(frame.headers['reply-to']) > 0 - assert frame.action in ["subscribe","unsubscribe"] - - if frame.action == "subscribe": - self.subscribe(frame.name, frame.headers['reply-to']) - elif frame.action == "unsubscribe": - self.unsubscribe(frame.name, frame.headers['reply-to']) - except Exception,e: traceback.print_exc() |