1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
|
import traceback
import time
class SubscriptionManager:
def __init__(self):
self.s = {}
def subscribe(self,name,replyTo):
if not self.s.has_key(name):
self.s[name] = []
assert self.s.has_key(name)
if replyTo not in self.s[name]:
self.s[name].append(replyTo)
def unsubscribe(self,name,replyTo):
assert self.s.has_key(name)
assert replyTo in self.s[name]
self.s[name].remove(replyTo)
def notify(self, name, timestamp, data, controller):
if not self.s.has_key(name):
return
for replyTo in self.s[name]:
controller.client.send(replyTo,
"{0}={1}".format(name,dat),
extra_headers={'timestamp':timestamp})
class StoreController:
def __init__(self, client, channels, processors):
self.client = client
self.channels = channels
self.processors = processors
self.subscriptions = SubscriptionManager()
def put(self, name, timestamp, data):
assert self.channels.has_key(name)
self.channels[name].add(timestamp, data, self)
self.subscriptions.notify(name, timestamp, data, self)
if self.processors.has_key(name):
for processor in self.processors[name]:
processor.process(name, timestamp, data, self)
def get(self, name, query):
assert self.channels.has_key(name)
return self.channels[name].get(query)
def subscribe(self,name, replyTo):
assert self.channels.has_key(frame.name)
self.subscriptions.subscribe(name, replyTo)
def unsubscribe(self,name,replyTo):
assert self.channels.has_key(frame.name)
self.subscriptions.unsubscribe(name,replyTo)
class MQStoreController(StoreController):
def cb_put(self, frame):
try:
if not frame.headers.has_key("timestamp"):
frame.headers['timestamp'] = str(time.time())
assert len(frame.timestamp) > 0
values = map(lambda kv: kv.split("="), frame.body.split("\n"))
values = filter(lambda x: len(x)==2, values) #filter non-pairs from empty lines
for (name,data) in values:
if name not in self.channels.keys():
print "No channel named {0}".format(name)
continue
print name,"=",data
self.put(name, frame.timestamp, data)
except Exception,e: traceback.print_exc()
def cb_get(self, frame):
try:
assert len(frame.headers['reply-to']) > 0
r = self.get(frame.name, frame.body)
print "send",frame.name,r
self.client.send(frame.headers['reply-to'],
body=r)
except Exception,e: traceback.print_exc()
def cb_subscribe(self, frame):
try:
assert len(frame.headers['reply-to']) > 0
assert frame.action in ["subscribe","unsubscribe"]
if frame.action == "subscribe":
self.subscribe(frame.name, frame.headers['reply-to'])
elif frame.action == "unsubscribe":
self.unsubscribe(frame.name, frame.headers['reply-to'])
except Exception,e: traceback.print_exc()
|