summaryrefslogtreecommitdiff
path: root/datastore/interactive.py
blob: 76a9636d30cde7eec4db5bbdea3dee9068ec13c7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
from IPython import embed
from time import time
from stompclient import PublishSubscribeClient
import config
from uuid import uuid1 as uuid
from threading import Thread, Event

# Create the STOMP client shared by put() and get().
client = PublishSubscribeClient(config.STOMP_HOST, config.STOMP_PORT)

# Incoming frames are read on a separate thread. Block on the client's
# ``listening_event`` before connecting (presumably it is set once
# ``listen_forever`` is running -- confirm against stompclient).
listener = Thread(target=client.listen_forever, name='Frame-Receiver')
listener.start()
client.listening_event.wait()

# ``r`` keeps whatever connect() returned so it can be shown on failure and
# inspected from the interactive shell afterwards.
r = client.connect(config.STOMP_LOGIN, config.STOMP_PASSCODE)
if not client.connection:
    # Parenthesised form prints the same on Python 2 and is valid on Python 3.
    print(r)
    # ``exit`` is injected by the ``site`` module for interactive sessions and
    # is not guaranteed to exist; raising SystemExit is the equivalent that
    # always works.
    # NOTE(review): ``listener`` is a non-daemon thread; if listen_forever
    # does not return after a failed connect the process will not terminate
    # here -- confirm.
    raise SystemExit(1)

def put(valuemap):
    """Send a set of key/value pairs to ``config.STOMP_QUEUE_PUT``.

    The mapping is serialised as one ``key=value`` line per entry, in the
    mapping's iteration order, and sent as the message body. A ``timestamp``
    header carries the current time in integer milliseconds.

    :param valuemap: mapping whose keys and values are rendered with
        ``str.format``.
    """
    # Generator expression instead of ``map`` with a tuple-unpacking lambda:
    # tuple parameters were removed in Python 3 (PEP 3113), and ``items()``
    # exists on both Python 2 and Python 3 mappings, unlike ``iteritems()``.
    body = "\n".join("{0}={1}".format(k, v) for k, v in valuemap.items())
    client.send(config.STOMP_QUEUE_PUT,
                body=body,
                extra_headers={"timestamp": int(time() * 1000)})

def get(name, query=None, timeout=5.0):
    """Send a request to ``config.STOMP_QUEUE_GET`` and wait for the reply.

    A uniquely named reply topic is subscribed to, its name is passed in the
    ``reply-to`` header of the request, and the call blocks until the first
    frame arrives on that topic or ``timeout`` elapses.

    :param name: value of the ``name`` header sent with the request.
    :param query: optional request body.
    :param timeout: maximum time in seconds to wait for a reply.
    :returns: the body of the first reply frame, or ``None`` if no frame
        arrived before the timeout.
    """
    replies = []
    answered = Event()

    def on_reply(frame):
        # Record the body first, then wake the waiting caller.
        replies.append(frame.body)
        answered.set()

    reply_to = '/topic/reply-' + str(uuid())
    client.subscribe(reply_to, on_reply)
    try:
        client.send(config.STOMP_QUEUE_GET,
                    body=query,
                    extra_headers={'name': name,
                                   'reply-to': reply_to})
        answered.wait(timeout)
    finally:
        # Always drop the temporary subscription, even if send() raised.
        client.unsubscribe(reply_to)

    # Test the list rather than the body so that a reply with an empty body
    # is returned as-is instead of being mistaken for a timeout.
    return replies[0] if replies else None

# Show a short usage hint before handing control to the interactive shell.
# The parenthesised form prints the same on Python 2 and is valid on Python 3.
print("""
EXAMPLES
  put({"org.xapek.test1":"asd1234"})
  get("org.xapek.test1")

""")

# Start the IPython shell; disconnect when it returns, and also when it
# terminates with an exception, so the connection is never left open.
try:
    embed()
finally:
    client.disconnect()