summaryrefslogtreecommitdiff
path: root/datastore/store/controller.py
diff options
context:
space:
mode:
Diffstat (limited to 'datastore/store/controller.py')
-rw-r--r--datastore/store/controller.py93
1 files changed, 0 insertions, 93 deletions
diff --git a/datastore/store/controller.py b/datastore/store/controller.py
deleted file mode 100644
index 0d694d1..0000000
--- a/datastore/store/controller.py
+++ /dev/null
@@ -1,93 +0,0 @@
-import traceback
-import time
-
-class SubscriptionManager:
- def __init__(self):
- self.s = {}
-
- def subscribe(self,name,replyTo):
- if not self.s.has_key(name):
- self.s[name] = []
- assert self.s.has_key(name)
-
- if replyTo not in self.s[name]:
- self.s[name].append(replyTo)
-
- def unsubscribe(self,name,replyTo):
- assert self.s.has_key(name)
- assert replyTo in self.s[name]
- self.s[name].remove(replyTo)
-
- def notify(self, name, timestamp, data, controller):
- if not self.s.has_key(name):
- return
- for replyTo in self.s[name]:
- controller.client.send(replyTo,
- "{0}={1}".format(name,dat),
- extra_headers={'timestamp':timestamp})
-
-class StoreController:
- def __init__(self, client, channels, processors):
- self.client = client
- self.channels = channels
- self.processors = processors
- self.subscriptions = SubscriptionManager()
-
- def put(self, name, timestamp, data):
- assert self.channels.has_key(name)
-
- self.channels[name].add(timestamp, data, self)
- self.subscriptions.notify(name, timestamp, data, self)
- if self.processors.has_key(name):
- for processor in self.processors[name]:
- processor.process(name, timestamp, data, self)
-
- def get(self, name, query):
- assert self.channels.has_key(name)
- return self.channels[name].get(query)
-
- def subscribe(self,name, replyTo):
- assert self.channels.has_key(frame.name)
- self.subscriptions.subscribe(name, replyTo)
-
- def unsubscribe(self,name,replyTo):
- assert self.channels.has_key(frame.name)
- self.subscriptions.unsubscribe(name,replyTo)
-
-class MQStoreController(StoreController):
- def cb_put(self, frame):
- try:
- if not frame.headers.has_key("timestamp"):
- frame.headers['timestamp'] = str(time.time())
- assert len(frame.timestamp) > 0
- values = map(lambda kv: kv.split("="), frame.body.split("\n"))
- values = filter(lambda x: len(x)==2, values) #filter non-pairs from empty lines
- for (name,data) in values:
- if name not in self.channels.keys():
- print "No channel named {0}".format(name)
- continue
- print name,"=",data
- self.put(name, frame.timestamp, data)
- except Exception,e: traceback.print_exc()
-
-
- def cb_get(self, frame):
- try:
- assert len(frame.headers['reply-to']) > 0
- r = self.get(frame.name, frame.body)
- print "send",frame.name,r
- self.client.send(frame.headers['reply-to'],
- body=r)
- except Exception,e: traceback.print_exc()
-
-
- def cb_subscribe(self, frame):
- try:
- assert len(frame.headers['reply-to']) > 0
- assert frame.action in ["subscribe","unsubscribe"]
-
- if frame.action == "subscribe":
- self.subscribe(frame.name, frame.headers['reply-to'])
- elif frame.action == "unsubscribe":
- self.unsubscribe(frame.name, frame.headers['reply-to'])
- except Exception,e: traceback.print_exc()