diff options
-rw-r--r-- | datastore/.gitignore | 2 | ||||
-rw-r--r-- | datastore/DEP | 3 | ||||
-rw-r--r-- | datastore/config.py | 34 | ||||
-rw-r--r-- | datastore/interactive.py | 51 | ||||
-rw-r--r-- | datastore/server.py | 36 | ||||
-rw-r--r-- | datastore/store/__init__.py | 1 | ||||
-rw-r--r-- | datastore/store/channel.py | 25 | ||||
-rw-r--r-- | datastore/store/controller.py | 89 | ||||
-rw-r--r-- | datastore/store/processor.py | 16 | ||||
-rw-r--r-- | datastore/store/timeseries.py | 264 |
10 files changed, 521 insertions, 0 deletions
diff --git a/datastore/.gitignore b/datastore/.gitignore new file mode 100644 index 0000000..f3d74a9 --- /dev/null +++ b/datastore/.gitignore @@ -0,0 +1,2 @@ +*.pyc +*~ diff --git a/datastore/DEP b/datastore/DEP new file mode 100644 index 0000000..f9f913f --- /dev/null +++ b/datastore/DEP @@ -0,0 +1,3 @@ +coilmq +stompclient +pytc (aptitude install libtokyocabinet-dev) diff --git a/datastore/config.py b/datastore/config.py new file mode 100644 index 0000000..9f615d2 --- /dev/null +++ b/datastore/config.py @@ -0,0 +1,34 @@ +from store.channel import PrinterChannel, SimpleMemoryChannel +from store.processor import CountingProcessor + +CHANNELS={ + 'org.xapek.test1' : PrinterChannel("org.xapek.test1"), + 'org.xapek.test1.count' : SimpleMemoryChannel(), +} + +PROCESSORS={ + # key : queue-name subscribed with + 'org.xapek.test1' : [ CountingProcessor("org.xapek.test1.count"), ], +} + + +STOMP_HOST="localhost" +STOMP_PORT=61613 +STOMP_LOGIN="" +STOMP_PASSCODE="" +STOMP_QUEUE_PUT="/queue/queue-put" +STOMP_QUEUE_GET="/queue/queue-get" +STOMP_QUEUE_SUBSCRIBE="/queue/queue-subscribe" + + +__all__ = [ + 'CHANNELS', + 'STOMP_HOST', + 'STOMP_PORT', + 'STOMP_LOGIN', + 'STOMP_PASSCODE', + 'STOMP_QUEUE_PUT', + 'STOMP_QUEUE_GET', + 'STOMP_QUEUE_SUBSCRIBE', +] + diff --git a/datastore/interactive.py b/datastore/interactive.py new file mode 100644 index 0000000..76a9636 --- /dev/null +++ b/datastore/interactive.py @@ -0,0 +1,51 @@ +from IPython import embed +from time import time +from stompclient import PublishSubscribeClient +import config +from uuid import uuid1 as uuid +from threading import Thread, Event + +client = PublishSubscribeClient(config.STOMP_HOST, config.STOMP_PORT) +listener = Thread(target=client.listen_forever, name='Frame-Receiver') +listener.start() +client.listening_event.wait() +r = client.connect(config.STOMP_LOGIN, config.STOMP_PASSCODE) +if not client.connection: + print r + exit(1) + +def put(valuemap): + body="\n".join(map(lambda (k,v): "{0}={1}".format(k,v), valuemap.iteritems())) + client.send(config.STOMP_QUEUE_PUT, + body=body, + extra_headers={"timestamp":int(time()*1000)}) + +def get(name,query=None,timeout=5.0): + reply=[] + def unlock(frame,reply): + reply.append(frame.body) + cond.set() + + replyTo='/topic/reply-' + str(uuid()) + cond = Event() + client.subscribe(replyTo, lambda frame: unlock(frame,reply)) + client.send(config.STOMP_QUEUE_GET, + body=query, + extra_headers={'name':name, + 'reply-to':replyTo}) + cond.wait(timeout) + client.unsubscribe(replyTo) + return len(reply)>0 and reply[0] or None + +print """ +EXAMPLES + put({"org.xapek.test1":"asd1234"}) + get("org.xapek.test1") + +""" + +embed() + +client.disconnect() + + diff --git a/datastore/server.py b/datastore/server.py new file mode 100644 index 0000000..8b9ca4e --- /dev/null +++ b/datastore/server.py @@ -0,0 +1,36 @@ +from sys import exit +from stompclient import PublishSubscribeClient +from threading import Thread +from store.controller import MQStoreController +from time import sleep + +import config + +client = PublishSubscribeClient(config.STOMP_HOST, config.STOMP_PORT) +listener = Thread(target=client.listen_forever, name='Frame-Receiver') +listener.start() +client.listening_event.wait() +r = client.connect(config.STOMP_LOGIN, config.STOMP_PASSCODE) +if r.command != "CONNECTED": + print "Failed to connect to {0}:{1}".format(config.STOMP_HOST,config.STOMP_PORT) + print r + exit(1) + + +controller = MQStoreController(client, config.CHANNELS, config.PROCESSORS) +client.subscribe(config.STOMP_QUEUE_PUT, controller.cb_put) +client.subscribe(config.STOMP_QUEUE_GET, controller.cb_get) +client.subscribe(config.STOMP_QUEUE_SUBSCRIBE, controller.cb_subscribe) + +while True: + try: + sleep(1) + except KeyboardInterrupt: + print "shutdown" + try: + client.shutdown_event.set() + except: + pass + exit(0) + + diff --git a/datastore/store/__init__.py b/datastore/store/__init__.py new file mode 100644 index 0000000..792d600 --- /dev/null +++ b/datastore/store/__init__.py @@ -0,0 +1 @@ +# diff --git a/datastore/store/channel.py b/datastore/store/channel.py new file mode 100644 index 0000000..2a5d5f4 --- /dev/null +++ b/datastore/store/channel.py @@ -0,0 +1,25 @@ +class IChannel: + def __init__(self): raise NotImplementedError() + def add(self,timestamp,data,controller): pass + def get(self,query): pass + +class PrinterChannel(IChannel): + def __init__(self,name): + self.name = name + def add(self,timestamp,data,controller): + print "{0}: {1},{2}".format(self.name, timestamp, data) + +class SimpleMemoryChannel(IChannel): + def __init__(self): + self.data = "" + def add(self,timestamp,data,controller): + self.data = data + def get(self,query): + return str(self.data) + +class IntervalStoreChannel(IChannel): + SECONDS=1000 + MINUTES=60*1000 + HOURS=60*60*1000 + DAYS=24*60*60*1000 + def __init__(self): pass diff --git a/datastore/store/controller.py b/datastore/store/controller.py new file mode 100644 index 0000000..0814488 --- /dev/null +++ b/datastore/store/controller.py @@ -0,0 +1,89 @@ +import traceback + +class SubscriptionManager: + def __init__(self): + self.s = {} + + def subscribe(self,name,replyTo): + if not self.s.has_key(name): + self.s[name] = [] + assert self.s.has_key(name) + + if replyTo not in self.s[name]: + self.s[name].append(replyTo) + + def unsubscribe(self,name,replyTo): + assert self.s.has_key(name) + assert replyTo in self.s[name] + self.s[name].remove(replyTo) + + def notify(self, name, timestamp, data, controller): + if not self.s.has_key(name): + return + for replyTo in self.s[name]: + controller.client.send(replyTo, + "{0}={1}".format(name,dat), + extra_headers={'timestamp':timestamp}) + +class StoreController: + def __init__(self, client, channels, processors): + self.client = client + self.channels = channels + self.processors = processors + self.subscriptions = SubscriptionManager() + + def put(self, name, timestamp, data): + assert self.channels.has_key(name) + + self.channels[name].add(timestamp, data, self) + self.subscriptions.notify(name, timestamp, data, self) + if self.processors.has_key(name): + for processor in self.processors[name]: + processor.process(name, timestamp, data, self) + + def get(self, name, query): + assert self.channels.has_key(name) + return self.channels[name].get(query) + + def subscribe(self,name, replyTo): + assert self.channels.has_key(frame.name) + self.subscriptions.subscribe(name, replyTo) + + def unsubscribe(self,name,replyTo): + assert self.channels.has_key(frame.name) + self.subscriptions.unsubscribe(name,replyTo) + +class MQStoreController(StoreController): + def cb_put(self, frame): + try: + assert len(frame.timestamp) > 0 + values = map(lambda kv: kv.split("="), frame.body.split("\n")) + for (name,data) in values: + assert self.channels.has_key(name) + print name,"=",data + + for (name,data) in values: + self.put(name, frame.timestamp, data) + except Exception,e: traceback.print_exc() + + + def cb_get(self, frame): + try: + assert len(frame.headers['reply-to']) > 0 + r = self.get(frame.name, frame.body) + print "send",frame.name,r + self.client.send(frame.headers['reply-to'], + body=r) + except Exception,e: traceback.print_exc() + + + def cb_subscribe(self, frame): + try: + assert len(frame.headers['reply-to']) > 0 + assert frame.action in ["subscribe","unsubscribe"] + + if frame.action == "subscribe": + self.subscribe(frame.name, frame.headers['reply-to']) + elif frame.action == "unsubscribe": + self.unsubscribe(frame.name, frame.headers['reply-to']) + except Exception,e: traceback.print_exc() diff --git a/datastore/store/processor.py b/datastore/store/processor.py new file mode 100644 index 0000000..9d39d9a --- /dev/null +++ b/datastore/store/processor.py @@ -0,0 +1,16 @@ +class IProcessor: + def __init__(self): raise NotImplementedError() + def process(self, name, timestamp, data, controller): pass + +class CountingProcessor(IProcessor): + def __init__(self, destination): + self.destination = destination + + def process(self,name,timestamp,data,controller): + current = controller.get(self.destination,None) + if not current: + current = 0 + current = int(current) + + controller.put(self.destination, timestamp, current + 1) + diff --git a/datastore/store/timeseries.py b/datastore/store/timeseries.py new file mode 100644 index 0000000..8e30e4e --- /dev/null +++ b/datastore/store/timeseries.py @@ -0,0 +1,264 @@ +#!/usr/bin/env python +from collections import namedtuple +import struct +import time +import glob +import os + +import logging + +class Constants: + DB_MAX_NAME_LENGTH = 10 + DB_WRITEBUF_LENGTH = 10 + DBDAY_FORMAT_DAY = "%d%m%Y" + DB_OPEN_FILES = 5 + + INT_MILLI = 1 + INT_SEC = 1000 + INT_MIN = 60 * INT_SEC + INT_HOUR = 60 * INT_MIN + INT_DAY = 24 * INT_HOUR + INT_WEEK = 7 * INT_DAY + INT_MONTH = 31 * INT_DAY + INT_YEAR = 365 * INT_DAY + +C = Constants + +def time_ms(): + return int(time.time() * 1000) + +class DB(object): + def __init__(self, interval, fileinterval, directory, name): + assert len(name) <= Constants.DB_MAX_NAME_LENGTH, \ + "name must be smaller than {0} (is {1})".format(Constants.DB_MAX_NAME_LENGTH, len(name)) + self.interval = interval + self.fileinterval = fileinterval + self.directory = directory + self.name = name + self.isize = self.item_size() # item_size + self._files = {} + self._write_buf = {} + self.last_access = [] + for path in glob.glob(self.directory + os.path.sep + name + "*"): + dbday = DBSegment.fopen(path, self) + self._files[dbday.filekey] = dbday + + def __getitem__(self, key): + return self.get(key) + + def get(self, key): + raise NotImplementedError() + + def _get(self, key): + if not isinstance(key, int): + raise TypeError("DB keys must be millisecond timestamps (int)") + + logging.debug("get -> key %s fileinterval %s interval %s", key, self.fileinterval, self.interval) + filekey = self._calc_segment_key(key) + key = self._calc_key(key) + logging.debug("get => filekey %s key %s", filekey, key ) + + if (filekey, key) in self._write_buf.keys(): + logging.debug("Write-Cache hit %s", key) + return self._write_buf[(filekey, key)] + + db_segment = self.get_segment(filekey) + return db_segment[key] + + def __setitem__(self, key, value): + return self.set(key, value) + + def set(self, key, value): + raise NotImplementedError() + + def _set(self, key, value): + if not isinstance(key, int): + raise TypeError("DB keys must be millisecond timestamps (int)") + + logging.debug("get -> key %s fileinterval %s interval %s", key, self.fileinterval, self.interval) + filekey = self._calc_segment_key(key) + key = self._calc_key(key) + logging.debug("get => filekey %s key %s", filekey, key ) + + self._write_buf[(filekey, key)] = value + if len(self._write_buf) > Constants.DB_WRITEBUF_LENGTH: + self._flush_buf() + + return value + + def get_segment(self, filekey): + if filekey in self._files.keys(): + return self._files[filekey] + else: + header = DBSegment.header( filekey=filekey, + isize=self.isize, interval=self.interval, + fileinterval=self.fileinterval, dbname=self.name) + path = self.directory + os.path.sep + self.name + str(filekey) + db_segment = DBSegment(self, path, header) + logging.info("New File %s", db_segment.path) + self._files[filekey] = db_segment + return db_segment + + + + def _calc_key(self, key): + return int( (key % self.fileinterval) / self.interval) + + def _calc_segment_key(self, key): + return int( key / self.fileinterval ) + + def _flush_buf(self): + logging.debug("flush") + keys = self._write_buf.keys() + keys.sort() + for (filekey,key) in keys: + db_segment = self.get_segment(filekey) + db_segment[key] = self._write_buf[(filekey, key)] + self._write_buf.clear() + + def close(self): + self._flush_buf() + + def item_size(self): + """""" + raise NotImplementedError() + +class InvalidSegmentError(Exception): + def __init__(self, header, *args): + Exception.__init__(self, *args) + self.header = header + +class _LazyEval(object): + def __init__(self, func, *args, **kwargs): + self.func, self.args, self.kwargs = func, args, kwargs + def __str__(self): + return self.func(*self.args, **self.kwargs) + +def _to_hexstring(input_string): + def _do(string): + return " ".join(map(lambda c:"%02x"%ord(c), string)).upper() + return _LazyEval(_do, input_string) + +class DBSegment(object): + header = namedtuple('Header', 'filekey isize interval fileinterval dbname') + header_format = "iiii%dp"%Constants.DB_MAX_NAME_LENGTH + header_size = struct.calcsize( "iiii%dp"%Constants.DB_MAX_NAME_LENGTH ) + + @staticmethod + def fopen(path, db): + filekey = int( os.path.basename(path)[len(db.name):] ) + with file(path, "rb") as fh: + header = DBSegment.header._make( struct.unpack( DBSegment.header_format, fh.read( DBSegment.header_size ) ) ) + if not (header.filekey == filekey + and header.interval == db.interval + and header.fileinterval == db.fileinterval + and header.dbname == db.name): + raise InvalidSegmentError(header) + f = DBSegment(db, path, header) + return f + + def __init__(self, db, path, header): + self.db = db + self.path = path + self.header = header + + self.filekey = header.filekey + if not os.path.exists(self.path): + logging.info("Write header in new file %s", self.path) + self.fh = file(self.path, "wb") + self.fh.write( struct.pack( DBSegment.header_format, *list(header) ) ) + end = ( DBSegment.header_size + + ( self.db.fileinterval / self.db.interval ) * self.db.isize) + self.fh.seek( end ) + self.fh.write( "\0" ) + self.fh.close() + self.fh = None + self.open() + + def __setitem__(self, key, data): + logging.debug("Set %s = %s", key, _to_hexstring(data)) + self._seek(key) + self.fh.write( data[:self.db.isize] ) + self.lastpos = self.fh.tell() + + def __getitem__(self, key): + self._seek(key) + data = self.fh.read( self.db.isize ) + self.lastpos = self.fh.tell() + logging.debug("Get %s => %s", key, _to_hexstring(data)) + return data + + def _seek(self, key): + offset = DBSegment.header_size + \ + key * self.db.isize + + if self.lastpos == offset: + logging.debug("continue at %s", self.lastpos) + else: + logging.debug("seek to %s in %s instance %s (lastpos = %s)", offset, self.path, id(self), self.lastpos) + if not self.fh: + self.open() + self.fh.seek( offset ) + + def open(self): + if not self.fh: + logging.debug("Open %s",self.path) + self.fh = file(self.path, "r+b") + self.lastpos = self.fh.tell() + + def close(self): + logging.info("Close %s instance %s", self.path, id(self)) + if self.fh: + self.fh.close() + self.fh = None + +class DBInt(DB): + payload = namedtuple("payload", "x y z") + payload_format = "iii" + def get(self, key): + return DBInt.payload._make( + struct.unpack( + DBInt.payload_format, self._get(key) ) ) + + def set(self, key, value): + return self._set(key, + struct.pack(DBInt.payload_format, *list(value))) + + def item_size(self): + return struct.calcsize( DBInt.payload_format ) + +class DBSimpleInt(DB): + def get(self, key): + return struct.unpack("i", self._get(key))[0] + + def set(self, key, value): + return self._set(key, + struct.pack("i", value)) + + def item_size(self): + return struct.calcsize( "i" ) + +import random +if __name__ == "__main__": + #logging.basicConfig(level=logging.DEBUG) + logging.basicConfig(level=logging.INFO) + #logging.basicConfig(level=logging.ERROR) + + # Alle 5 sek. + start = time_ms() + stop = time_ms() + Constants.INT_DAY*2 + + db = DBSimple(1000 * Constants.INT_MILLI, Constants.INT_DAY, ".", "testdb") + rand = 123456 + for ts in range(start, stop, 1000*Constants.INT_MILLI): + db[ts] = -ts % rand + assert db[ts] == -ts%rand, "%s != %s"%(db[ts], ts) + + r = range(start, stop, 1000*Constants.INT_MILLI) + r.reverse() + for ts in r: + assert db[ts] == -ts%rand, "%s != %s"%(db[ts], -ts%rand) + + + db.close() + |