diff options
author | Ebus-at-dockstar <ebus@dockstar> | 2013-03-25 10:24:28 +0100 |
---|---|---|
committer | Ebus-at-dockstar <ebus@dockstar> | 2013-03-25 10:24:43 +0100 |
commit | 862282ce99760832d3e9e5b4b1171b861105e004 (patch) | |
tree | 0e229418e391917b79d42a8bdee46fb7a8612895 /heap/datastore/store/controller.py | |
parent | 9522cdffa94f278eb5e1894600535986e22c2890 (diff) | |
download | ebus-alt-862282ce99760832d3e9e5b4b1171b861105e004.tar.gz ebus-alt-862282ce99760832d3e9e5b4b1171b861105e004.zip |
move old stuff away
Diffstat (limited to 'heap/datastore/store/controller.py')
-rw-r--r-- | heap/datastore/store/controller.py | 93 |
1 files changed, 93 insertions, 0 deletions
diff --git a/heap/datastore/store/controller.py b/heap/datastore/store/controller.py new file mode 100644 index 0000000..0d694d1 --- /dev/null +++ b/heap/datastore/store/controller.py @@ -0,0 +1,93 @@ +import traceback +import time + +class SubscriptionManager: + def __init__(self): + self.s = {} + + def subscribe(self,name,replyTo): + if not self.s.has_key(name): + self.s[name] = [] + assert self.s.has_key(name) + + if replyTo not in self.s[name]: + self.s[name].append(replyTo) + + def unsubscribe(self,name,replyTo): + assert self.s.has_key(name) + assert replyTo in self.s[name] + self.s[name].remove(replyTo) + + def notify(self, name, timestamp, data, controller): + if not self.s.has_key(name): + return + for replyTo in self.s[name]: + controller.client.send(replyTo, + "{0}={1}".format(name,dat), + extra_headers={'timestamp':timestamp}) + +class StoreController: + def __init__(self, client, channels, processors): + self.client = client + self.channels = channels + self.processors = processors + self.subscriptions = SubscriptionManager() + + def put(self, name, timestamp, data): + assert self.channels.has_key(name) + + self.channels[name].add(timestamp, data, self) + self.subscriptions.notify(name, timestamp, data, self) + if self.processors.has_key(name): + for processor in self.processors[name]: + processor.process(name, timestamp, data, self) + + def get(self, name, query): + assert self.channels.has_key(name) + return self.channels[name].get(query) + + def subscribe(self,name, replyTo): + assert self.channels.has_key(frame.name) + self.subscriptions.subscribe(name, replyTo) + + def unsubscribe(self,name,replyTo): + assert self.channels.has_key(frame.name) + self.subscriptions.unsubscribe(name,replyTo) + +class MQStoreController(StoreController): + def cb_put(self, frame): + try: + if not frame.headers.has_key("timestamp"): + frame.headers['timestamp'] = str(time.time()) + assert len(frame.timestamp) > 0 + values = map(lambda kv: kv.split("="), frame.body.split("\n")) + values = filter(lambda x: len(x)==2, values) #filter non-pairs from empty lines + for (name,data) in values: + if name not in self.channels.keys(): + print "No channel named {0}".format(name) + continue + print name,"=",data + self.put(name, frame.timestamp, data) + except Exception,e: traceback.print_exc() + + + def cb_get(self, frame): + try: + assert len(frame.headers['reply-to']) > 0 + r = self.get(frame.name, frame.body) + print "send",frame.name,r + self.client.send(frame.headers['reply-to'], + body=r) + except Exception,e: traceback.print_exc() + + + def cb_subscribe(self, frame): + try: + assert len(frame.headers['reply-to']) > 0 + assert frame.action in ["subscribe","unsubscribe"] + + if frame.action == "subscribe": + self.subscribe(frame.name, frame.headers['reply-to']) + elif frame.action == "unsubscribe": + self.unsubscribe(frame.name, frame.headers['reply-to']) + except Exception,e: traceback.print_exc() |