diff options
Diffstat (limited to 'datastore')
-rw-r--r-- | datastore/.gitignore | 2 | ||||
-rw-r--r-- | datastore/DEP | 3 | ||||
-rw-r--r-- | datastore/config.py | 37 | ||||
-rw-r--r-- | datastore/dump.py | 29 | ||||
-rw-r--r-- | datastore/interactive.py | 51 | ||||
-rw-r--r-- | datastore/server.py | 36 | ||||
-rw-r--r-- | datastore/store/__init__.py | 1 | ||||
-rw-r--r-- | datastore/store/channel.py | 25 | ||||
-rw-r--r-- | datastore/store/controller.py | 93 | ||||
-rw-r--r-- | datastore/store/processor.py | 16 | ||||
-rw-r--r-- | datastore/store/timeseries.py | 264 |
11 files changed, 0 insertions, 557 deletions
diff --git a/datastore/.gitignore b/datastore/.gitignore deleted file mode 100644 index f3d74a9..0000000 --- a/datastore/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -*.pyc -*~ diff --git a/datastore/DEP b/datastore/DEP deleted file mode 100644 index f9f913f..0000000 --- a/datastore/DEP +++ /dev/null @@ -1,3 +0,0 @@ -coilmq -stompclient -pytc (aptitude install libtokyocabinet-dev) diff --git a/datastore/config.py b/datastore/config.py deleted file mode 100644 index cd85f02..0000000 --- a/datastore/config.py +++ /dev/null @@ -1,37 +0,0 @@ -from store.channel import PrinterChannel, SimpleMemoryChannel -from store.processor import CountingProcessor - -CHANNELS={ - 'org.xapek.test1' : PrinterChannel("org.xapek.test1"), - 'org.xapek.test1.count' : SimpleMemoryChannel(), - 'arduino.a0' : PrinterChannel("a0"), - 'arduino.a0.dba' : PrinterChannel("a0.dba"), - 'arduino.a0.mv' : PrinterChannel("a0.mv"), -} - -PROCESSORS={ - # key : queue-name subscribed with - 'org.xapek.test1' : [ CountingProcessor("org.xapek.test1.count"), ], -} - - -STOMP_HOST="10.2.2.26" -STOMP_PORT=61613 -STOMP_LOGIN="" -STOMP_PASSCODE="" -STOMP_QUEUE_PUT="/queue/queue-put" -STOMP_QUEUE_GET="/queue/queue-get" -STOMP_QUEUE_SUBSCRIBE="/queue/queue-subscribe" - - -__all__ = [ - 'CHANNELS', - 'STOMP_HOST', - 'STOMP_PORT', - 'STOMP_LOGIN', - 'STOMP_PASSCODE', - 'STOMP_QUEUE_PUT', - 'STOMP_QUEUE_GET', - 'STOMP_QUEUE_SUBSCRIBE', -] - diff --git a/datastore/dump.py b/datastore/dump.py deleted file mode 100644 index ac7ec29..0000000 --- a/datastore/dump.py +++ /dev/null @@ -1,29 +0,0 @@ -from time import sleep -from stompclient import PublishSubscribeClient -import config -from threading import Thread -from pprint import pprint - -client = PublishSubscribeClient(config.STOMP_HOST, config.STOMP_PORT) -listener = Thread(target=client.listen_forever, name='Frame-Receiver') -listener.start() -client.listening_event.wait() -r = client.connect(config.STOMP_LOGIN, config.STOMP_PASSCODE) -if not client.connection: - print r - exit(1) - - -def dump(x): - pprint(x.headers) - for line in x.body.split("\n"): - print "\t", - pprint(line) - -client.subscribe("/queue/queue-new", dump) - -try: - while True: - sleep(1) -except: - client.disconnect() diff --git a/datastore/interactive.py b/datastore/interactive.py deleted file mode 100644 index 76a9636..0000000 --- a/datastore/interactive.py +++ /dev/null @@ -1,51 +0,0 @@ -from IPython import embed -from time import time -from stompclient import PublishSubscribeClient -import config -from uuid import uuid1 as uuid -from threading import Thread, Event - -client = PublishSubscribeClient(config.STOMP_HOST, config.STOMP_PORT) -listener = Thread(target=client.listen_forever, name='Frame-Receiver') -listener.start() -client.listening_event.wait() -r = client.connect(config.STOMP_LOGIN, config.STOMP_PASSCODE) -if not client.connection: - print r - exit(1) - -def put(valuemap): - body="\n".join(map(lambda (k,v): "{0}={1}".format(k,v), valuemap.iteritems())) - client.send(config.STOMP_QUEUE_PUT, - body=body, - extra_headers={"timestamp":int(time()*1000)}) - -def get(name,query=None,timeout=5.0): - reply=[] - def unlock(frame,reply): - reply.append(frame.body) - cond.set() - - replyTo='/topic/reply-' + str(uuid()) - cond = Event() - client.subscribe(replyTo, lambda frame: unlock(frame,reply)) - client.send(config.STOMP_QUEUE_GET, - body=query, - extra_headers={'name':name, - 'reply-to':replyTo}) - cond.wait(timeout) - client.unsubscribe(replyTo) - return len(reply)>0 and reply[0] or None - -print """ -EXAMPLES - put({"org.xapek.test1":"asd1234"}) - get("org.xapek.test1") - -""" - -embed() - -client.disconnect() - - diff --git a/datastore/server.py b/datastore/server.py deleted file mode 100644 index 8b9ca4e..0000000 --- a/datastore/server.py +++ /dev/null @@ -1,36 +0,0 @@ -from sys import exit -from stompclient import PublishSubscribeClient -from threading import Thread -from store.controller import MQStoreController -from time import sleep - -import config - -client = PublishSubscribeClient(config.STOMP_HOST, config.STOMP_PORT) -listener = Thread(target=client.listen_forever, name='Frame-Receiver') -listener.start() -client.listening_event.wait() -r = client.connect(config.STOMP_LOGIN, config.STOMP_PASSCODE) -if r.command != "CONNECTED": - print "Failed to connect to {0}:{1}".format(config.STOMP_HOST,config.STOMP_PORT) - print r - exit(1) - - -controller = MQStoreController(client, config.CHANNELS, config.PROCESSORS) -client.subscribe(config.STOMP_QUEUE_PUT, controller.cb_put) -client.subscribe(config.STOMP_QUEUE_GET, controller.cb_get) -client.subscribe(config.STOMP_QUEUE_SUBSCRIBE, controller.cb_subscribe) - -while True: - try: - sleep(1) - except KeyboardInterrupt: - print "shutdown" - try: - client.shutdown_event.set() - except: - pass - exit(0) - - diff --git a/datastore/store/__init__.py b/datastore/store/__init__.py deleted file mode 100644 index 792d600..0000000 --- a/datastore/store/__init__.py +++ /dev/null @@ -1 +0,0 @@ -# diff --git a/datastore/store/channel.py b/datastore/store/channel.py deleted file mode 100644 index 1e083b5..0000000 --- a/datastore/store/channel.py +++ /dev/null @@ -1,25 +0,0 @@ -class IChannel: - def __init__(self): raise NotImplementedError() - def add(self,timestamp,data,controller): pass - def get(self,query): pass - -class PrinterChannel(IChannel): - def __init__(self,name): - self.name = name - def add(self,timestamp,data,controller): - print "{0}: timestamp={1} value={2}".format(self.name, timestamp, data) - -class SimpleMemoryChannel(IChannel): - def __init__(self): - self.data = "" - def add(self,timestamp,data,controller): - self.data = data - def get(self,query): - return str(self.data) - -class IntervalStoreChannel(IChannel): - SECONDS=1000 - MINUTES=60*1000 - HOURS=60*60*1000 - DAYS=24*60*60*1000 - def __init__(self): pass diff --git a/datastore/store/controller.py b/datastore/store/controller.py deleted file mode 100644 index 0d694d1..0000000 --- a/datastore/store/controller.py +++ /dev/null @@ -1,93 +0,0 @@ -import traceback -import time - -class SubscriptionManager: - def __init__(self): - self.s = {} - - def subscribe(self,name,replyTo): - if not self.s.has_key(name): - self.s[name] = [] - assert self.s.has_key(name) - - if replyTo not in self.s[name]: - self.s[name].append(replyTo) - - def unsubscribe(self,name,replyTo): - assert self.s.has_key(name) - assert replyTo in self.s[name] - self.s[name].remove(replyTo) - - def notify(self, name, timestamp, data, controller): - if not self.s.has_key(name): - return - for replyTo in self.s[name]: - controller.client.send(replyTo, - "{0}={1}".format(name,dat), - extra_headers={'timestamp':timestamp}) - -class StoreController: - def __init__(self, client, channels, processors): - self.client = client - self.channels = channels - self.processors = processors - self.subscriptions = SubscriptionManager() - - def put(self, name, timestamp, data): - assert self.channels.has_key(name) - - self.channels[name].add(timestamp, data, self) - self.subscriptions.notify(name, timestamp, data, self) - if self.processors.has_key(name): - for processor in self.processors[name]: - processor.process(name, timestamp, data, self) - - def get(self, name, query): - assert self.channels.has_key(name) - return self.channels[name].get(query) - - def subscribe(self,name, replyTo): - assert self.channels.has_key(frame.name) - self.subscriptions.subscribe(name, replyTo) - - def unsubscribe(self,name,replyTo): - assert self.channels.has_key(frame.name) - self.subscriptions.unsubscribe(name,replyTo) - -class MQStoreController(StoreController): - def cb_put(self, frame): - try: - if not frame.headers.has_key("timestamp"): - frame.headers['timestamp'] = str(time.time()) - assert len(frame.timestamp) > 0 - values = map(lambda kv: kv.split("="), frame.body.split("\n")) - values = filter(lambda x: len(x)==2, values) #filter non-pairs from empty lines - for (name,data) in values: - if name not in self.channels.keys(): - print "No channel named {0}".format(name) - continue - print name,"=",data - self.put(name, frame.timestamp, data) - except Exception,e: traceback.print_exc() - - - def cb_get(self, frame): - try: - assert len(frame.headers['reply-to']) > 0 - r = self.get(frame.name, frame.body) - print "send",frame.name,r - self.client.send(frame.headers['reply-to'], - body=r) - except Exception,e: traceback.print_exc() - - - def cb_subscribe(self, frame): - try: - assert len(frame.headers['reply-to']) > 0 - assert frame.action in ["subscribe","unsubscribe"] - - if frame.action == "subscribe": - self.subscribe(frame.name, frame.headers['reply-to']) - elif frame.action == "unsubscribe": - self.unsubscribe(frame.name, frame.headers['reply-to']) - except Exception,e: traceback.print_exc() diff --git a/datastore/store/processor.py b/datastore/store/processor.py deleted file mode 100644 index 9d39d9a..0000000 --- a/datastore/store/processor.py +++ /dev/null @@ -1,16 +0,0 @@ -class IProcessor: - def __init__(self): raise NotImplementedError() - def process(self, name, timestamp, data, controller): pass - -class CountingProcessor(IProcessor): - def __init__(self, destination): - self.destination = destination - - def process(self,name,timestamp,data,controller): - current = controller.get(self.destination,None) - if not current: - current = 0 - current = int(current) - - controller.put(self.destination, timestamp, current + 1) - diff --git a/datastore/store/timeseries.py b/datastore/store/timeseries.py deleted file mode 100644 index 8e30e4e..0000000 --- a/datastore/store/timeseries.py +++ /dev/null @@ -1,264 +0,0 @@ -#!/usr/bin/env python -from collections import namedtuple -import struct -import time -import glob -import os - -import logging - -class Constants: - DB_MAX_NAME_LENGTH = 10 - DB_WRITEBUF_LENGTH = 10 - DBDAY_FORMAT_DAY = "%d%m%Y" - DB_OPEN_FILES = 5 - - INT_MILLI = 1 - INT_SEC = 1000 - INT_MIN = 60 * INT_SEC - INT_HOUR = 60 * INT_MIN - INT_DAY = 24 * INT_HOUR - INT_WEEK = 7 * INT_DAY - INT_MONTH = 31 * INT_DAY - INT_YEAR = 365 * INT_DAY - -C = Constants - -def time_ms(): - return int(time.time() * 1000) - -class DB(object): - def __init__(self, interval, fileinterval, directory, name): - assert len(name) <= Constants.DB_MAX_NAME_LENGTH, \ - "name must be smaller than {0} (is {1})".format(Constants.DB_MAX_NAME_LENGTH, len(name)) - self.interval = interval - self.fileinterval = fileinterval - self.directory = directory - self.name = name - self.isize = self.item_size() # item_size - self._files = {} - self._write_buf = {} - self.last_access = [] - for path in glob.glob(self.directory + os.path.sep + name + "*"): - dbday = DBSegment.fopen(path, self) - self._files[dbday.filekey] = dbday - - def __getitem__(self, key): - return self.get(key) - - def get(self, key): - raise NotImplementedError() - - def _get(self, key): - if not isinstance(key, int): - raise TypeError("DB keys must be millisecond timestamps (int)") - - logging.debug("get -> key %s fileinterval %s interval %s", key, self.fileinterval, self.interval) - filekey = self._calc_segment_key(key) - key = self._calc_key(key) - logging.debug("get => filekey %s key %s", filekey, key ) - - if (filekey, key) in self._write_buf.keys(): - logging.debug("Write-Cache hit %s", key) - return self._write_buf[(filekey, key)] - - db_segment = self.get_segment(filekey) - return db_segment[key] - - def __setitem__(self, key, value): - return self.set(key, value) - - def set(self, key, value): - raise NotImplementedError() - - def _set(self, key, value): - if not isinstance(key, int): - raise TypeError("DB keys must be millisecond timestamps (int)") - - logging.debug("get -> key %s fileinterval %s interval %s", key, self.fileinterval, self.interval) - filekey = self._calc_segment_key(key) - key = self._calc_key(key) - logging.debug("get => filekey %s key %s", filekey, key ) - - self._write_buf[(filekey, key)] = value - if len(self._write_buf) > Constants.DB_WRITEBUF_LENGTH: - self._flush_buf() - - return value - - def get_segment(self, filekey): - if filekey in self._files.keys(): - return self._files[filekey] - else: - header = DBSegment.header( filekey=filekey, - isize=self.isize, interval=self.interval, - fileinterval=self.fileinterval, dbname=self.name) - path = self.directory + os.path.sep + self.name + str(filekey) - db_segment = DBSegment(self, path, header) - logging.info("New File %s", db_segment.path) - self._files[filekey] = db_segment - return db_segment - - - - def _calc_key(self, key): - return int( (key % self.fileinterval) / self.interval) - - def _calc_segment_key(self, key): - return int( key / self.fileinterval ) - - def _flush_buf(self): - logging.debug("flush") - keys = self._write_buf.keys() - keys.sort() - for (filekey,key) in keys: - db_segment = self.get_segment(filekey) - db_segment[key] = self._write_buf[(filekey, key)] - self._write_buf.clear() - - def close(self): - self._flush_buf() - - def item_size(self): - """""" - raise NotImplementedError() - -class InvalidSegmentError(Exception): - def __init__(self, header, *args): - Exception.__init__(self, *args) - self.header = header - -class _LazyEval(object): - def __init__(self, func, *args, **kwargs): - self.func, self.args, self.kwargs = func, args, kwargs - def __str__(self): - return self.func(*self.args, **self.kwargs) - -def _to_hexstring(input_string): - def _do(string): - return " ".join(map(lambda c:"%02x"%ord(c), string)).upper() - return _LazyEval(_do, input_string) - -class DBSegment(object): - header = namedtuple('Header', 'filekey isize interval fileinterval dbname') - header_format = "iiii%dp"%Constants.DB_MAX_NAME_LENGTH - header_size = struct.calcsize( "iiii%dp"%Constants.DB_MAX_NAME_LENGTH ) - - @staticmethod - def fopen(path, db): - filekey = int( os.path.basename(path)[len(db.name):] ) - with file(path, "rb") as fh: - header = DBSegment.header._make( struct.unpack( DBSegment.header_format, fh.read( DBSegment.header_size ) ) ) - if not (header.filekey == filekey - and header.interval == db.interval - and header.fileinterval == db.fileinterval - and header.dbname == db.name): - raise InvalidSegmentError(header) - f = DBSegment(db, path, header) - return f - - def __init__(self, db, path, header): - self.db = db - self.path = path - self.header = header - - self.filekey = header.filekey - if not os.path.exists(self.path): - logging.info("Write header in new file %s", self.path) - self.fh = file(self.path, "wb") - self.fh.write( struct.pack( DBSegment.header_format, *list(header) ) ) - end = ( DBSegment.header_size + - ( self.db.fileinterval / self.db.interval ) * self.db.isize) - self.fh.seek( end ) - self.fh.write( "\0" ) - self.fh.close() - self.fh = None - self.open() - - def __setitem__(self, key, data): - logging.debug("Set %s = %s", key, _to_hexstring(data)) - self._seek(key) - self.fh.write( data[:self.db.isize] ) - self.lastpos = self.fh.tell() - - def __getitem__(self, key): - self._seek(key) - data = self.fh.read( self.db.isize ) - self.lastpos = self.fh.tell() - logging.debug("Get %s => %s", key, _to_hexstring(data)) - return data - - def _seek(self, key): - offset = DBSegment.header_size + \ - key * self.db.isize - - if self.lastpos == offset: - logging.debug("continue at %s", self.lastpos) - else: - logging.debug("seek to %s in %s instance %s (lastpos = %s)", offset, self.path, id(self), self.lastpos) - if not self.fh: - self.open() - self.fh.seek( offset ) - - def open(self): - if not self.fh: - logging.debug("Open %s",self.path) - self.fh = file(self.path, "r+b") - self.lastpos = self.fh.tell() - - def close(self): - logging.info("Close %s instance %s", self.path, id(self)) - if self.fh: - self.fh.close() - self.fh = None - -class DBInt(DB): - payload = namedtuple("payload", "x y z") - payload_format = "iii" - def get(self, key): - return DBInt.payload._make( - struct.unpack( - DBInt.payload_format, self._get(key) ) ) - - def set(self, key, value): - return self._set(key, - struct.pack(DBInt.payload_format, *list(value))) - - def item_size(self): - return struct.calcsize( DBInt.payload_format ) - -class DBSimpleInt(DB): - def get(self, key): - return struct.unpack("i", self._get(key))[0] - - def set(self, key, value): - return self._set(key, - struct.pack("i", value)) - - def item_size(self): - return struct.calcsize( "i" ) - -import random -if __name__ == "__main__": - #logging.basicConfig(level=logging.DEBUG) - logging.basicConfig(level=logging.INFO) - #logging.basicConfig(level=logging.ERROR) - - # Alle 5 sek. - start = time_ms() - stop = time_ms() + Constants.INT_DAY*2 - - db = DBSimple(1000 * Constants.INT_MILLI, Constants.INT_DAY, ".", "testdb") - rand = 123456 - for ts in range(start, stop, 1000*Constants.INT_MILLI): - db[ts] = -ts % rand - assert db[ts] == -ts%rand, "%s != %s"%(db[ts], ts) - - r = range(start, stop, 1000*Constants.INT_MILLI) - r.reverse() - for ts in r: - assert db[ts] == -ts%rand, "%s != %s"%(db[ts], -ts%rand) - - - db.close() - |