diff options
Diffstat (limited to 'heap/datastore/interactive.py')
-rw-r--r-- | heap/datastore/interactive.py | 51 |
1 files changed, 51 insertions, 0 deletions
diff --git a/heap/datastore/interactive.py b/heap/datastore/interactive.py new file mode 100644 index 0000000..76a9636 --- /dev/null +++ b/heap/datastore/interactive.py @@ -0,0 +1,51 @@ +from IPython import embed +from time import time +from stompclient import PublishSubscribeClient +import config +from uuid import uuid1 as uuid +from threading import Thread, Event + +client = PublishSubscribeClient(config.STOMP_HOST, config.STOMP_PORT) +listener = Thread(target=client.listen_forever, name='Frame-Receiver') +listener.start() +client.listening_event.wait() +r = client.connect(config.STOMP_LOGIN, config.STOMP_PASSCODE) +if not client.connection: + print r + exit(1) + +def put(valuemap): + body="\n".join(map(lambda (k,v): "{0}={1}".format(k,v), valuemap.iteritems())) + client.send(config.STOMP_QUEUE_PUT, + body=body, + extra_headers={"timestamp":int(time()*1000)}) + +def get(name,query=None,timeout=5.0): + reply=[] + def unlock(frame,reply): + reply.append(frame.body) + cond.set() + + replyTo='/topic/reply-' + str(uuid()) + cond = Event() + client.subscribe(replyTo, lambda frame: unlock(frame,reply)) + client.send(config.STOMP_QUEUE_GET, + body=query, + extra_headers={'name':name, + 'reply-to':replyTo}) + cond.wait(timeout) + client.unsubscribe(replyTo) + return len(reply)>0 and reply[0] or None + +print """ +EXAMPLES + put({"org.xapek.test1":"asd1234"}) + get("org.xapek.test1") + +""" + +embed() + +client.disconnect() + + |