summaryrefslogtreecommitdiff
path: root/heap/datastore/store/controller.py
blob: 0d694d18398be6af4b7e00dfd4299a19090b8d5a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import traceback
import time

class SubscriptionManager:
    """Keep track of which reply-to destinations subscribed to which name."""

    def __init__(self):
        # Maps a name to the list of reply-to destinations subscribed to it,
        # kept in the order the subscriptions were made.
        self.s = {}

    def subscribe(self, name, replyTo):
        """Add replyTo to the subscribers of name.

        Subscribing the same replyTo to the same name twice is a no-op, so a
        destination is notified at most once per update.
        """
        subscribers = self.s.setdefault(name, [])
        if replyTo not in subscribers:
            subscribers.append(replyTo)

    def unsubscribe(self, name, replyTo):
        """Remove replyTo from the subscribers of name.

        Raises AssertionError if name has never been subscribed to or if
        replyTo is not currently subscribed to it.
        """
        assert name in self.s
        assert replyTo in self.s[name]
        self.s[name].remove(replyTo)

    def notify(self, name, timestamp, data, controller):
        """Send "name=data" to every subscriber of name.

        Each message is sent through controller.client.send with the
        timestamp passed along in the extra headers. Names without any
        subscription entry are ignored.
        """
        if name not in self.s:
            return
        for replyTo in self.s[name]:
            controller.client.send(replyTo,
                                   "{0}={1}".format(name, data),
                                   extra_headers={'timestamp': timestamp})

class StoreController:
    """Route stored values to channels, subscribers and processors."""

    def __init__(self, client, channels, processors):
        """Remember the collaborators and start with no subscriptions.

        client     -- object used to send messages (handed to subscribers
                      via this controller).
        channels   -- mapping of channel name to channel object; channels
                      are used through add(timestamp, data, controller) and
                      get(query).
        processors -- mapping of channel name to an iterable of objects
                      with a process(name, timestamp, data, controller)
                      method.
        """
        self.client = client
        self.channels = channels
        self.processors = processors
        self.subscriptions = SubscriptionManager()

    def put(self, name, timestamp, data):
        """Store data in channel name, then notify subscribers and processors.

        Raises AssertionError if name is not a known channel.
        """
        assert name in self.channels

        self.channels[name].add(timestamp, data, self)
        self.subscriptions.notify(name, timestamp, data, self)
        if name in self.processors:
            for processor in self.processors[name]:
                processor.process(name, timestamp, data, self)

    def get(self, name, query):
        """Return whatever channel name answers for query.

        Raises AssertionError if name is not a known channel.
        """
        assert name in self.channels
        return self.channels[name].get(query)

    def subscribe(self, name, replyTo):
        """Subscribe replyTo to updates of channel name.

        Raises AssertionError if name is not a known channel.
        """
        # The check uses the name argument; there is no frame in this scope.
        assert name in self.channels
        self.subscriptions.subscribe(name, replyTo)

    def unsubscribe(self, name, replyTo):
        """Remove replyTo's subscription to channel name.

        Raises AssertionError if name is not a known channel.
        """
        # The check uses the name argument; there is no frame in this scope.
        assert name in self.channels
        self.subscriptions.unsubscribe(name, replyTo)

class MQStoreController(StoreController):
    def cb_put(self, frame):
        try:
            if not frame.headers.has_key("timestamp"):
                frame.headers['timestamp'] = str(time.time())
            assert len(frame.timestamp) > 0
            values = map(lambda kv: kv.split("="), frame.body.split("\n"))
            values = filter(lambda x: len(x)==2, values) #filter non-pairs from empty lines
            for (name,data) in values:
                if name not in self.channels.keys():
                    print "No channel named {0}".format(name)
                    continue
                print name,"=",data
                self.put(name, frame.timestamp, data)
        except Exception,e: traceback.print_exc()


    def cb_get(self, frame):
        try:
            assert len(frame.headers['reply-to']) > 0
            r = self.get(frame.name, frame.body)
            print "send",frame.name,r
            self.client.send(frame.headers['reply-to'],
                             body=r)
        except Exception,e: traceback.print_exc()


    def cb_subscribe(self, frame):
        try:
            assert len(frame.headers['reply-to']) > 0
            assert frame.action in ["subscribe","unsubscribe"]
        
            if frame.action == "subscribe":
                self.subscribe(frame.name, frame.headers['reply-to'])
            elif frame.action == "unsubscribe":
                self.unsubscribe(frame.name, frame.headers['reply-to'])
        except Exception,e: traceback.print_exc()