summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Yves Fischer <yvesf-git@xapek.org> 2013-01-11 20:01:09 +0100
committer Yves Fischer <yvesf-git@xapek.org> 2013-01-11 20:01:09 +0100
commit 934d642b410d75dae1966c993fc61c371959876e (patch)
tree ca9bcd22d98d88424be0d21d6387ddba779a9daa
parent 69fbdff8e8ba69034111038ce01184ef5472728a (diff)
downloadebus-alt-934d642b410d75dae1966c993fc61c371959876e.tar.gz
ebus-alt-934d642b410d75dae1966c993fc61c371959876e.zip
datastore
-rw-r--r-- datastore/.gitignore 2
-rw-r--r-- datastore/DEP 3
-rw-r--r-- datastore/config.py 34
-rw-r--r-- datastore/interactive.py 51
-rw-r--r-- datastore/server.py 36
-rw-r--r-- datastore/store/__init__.py 1
-rw-r--r-- datastore/store/channel.py 25
-rw-r--r-- datastore/store/controller.py 89
-rw-r--r-- datastore/store/processor.py 16
-rw-r--r-- datastore/store/timeseries.py 264
10 files changed, 521 insertions, 0 deletions
diff --git a/datastore/.gitignore b/datastore/.gitignore
new file mode 100644
index 0000000..f3d74a9
--- /dev/null
+++ b/datastore/.gitignore
@@ -0,0 +1,2 @@
+*.pyc
+*~
diff --git a/datastore/DEP b/datastore/DEP
new file mode 100644
index 0000000..f9f913f
--- /dev/null
+++ b/datastore/DEP
@@ -0,0 +1,3 @@
+coilmq
+stompclient
+pytc (aptitude install libtokyocabinet-dev)
diff --git a/datastore/config.py b/datastore/config.py
new file mode 100644
index 0000000..9f615d2
--- /dev/null
+++ b/datastore/config.py
@@ -0,0 +1,34 @@
+from store.channel import PrinterChannel, SimpleMemoryChannel
+from store.processor import CountingProcessor
+
+# Channel name -> channel object that receives the values put under that name.
+CHANNELS = {
+    'org.xapek.test1': PrinterChannel("org.xapek.test1"),
+    'org.xapek.test1.count': SimpleMemoryChannel(),
+}
+
+# Channel name -> list of processors handed to the controller for that channel.
+PROCESSORS = {
+    # key : queue-name subscribed with
+    'org.xapek.test1': [CountingProcessor("org.xapek.test1.count")],
+}
+
+
+# STOMP broker endpoint, credentials and the three request queues.
+STOMP_HOST = "localhost"
+STOMP_PORT = 61613
+STOMP_LOGIN = ""
+STOMP_PASSCODE = ""
+STOMP_QUEUE_PUT = "/queue/queue-put"
+STOMP_QUEUE_GET = "/queue/queue-get"
+STOMP_QUEUE_SUBSCRIBE = "/queue/queue-subscribe"
+
+
+# PROCESSORS is part of the public configuration as well (the server reads
+# config.PROCESSORS), so it is exported together with CHANNELS.
+__all__ = [
+    'CHANNELS',
+    'PROCESSORS',
+    'STOMP_HOST',
+    'STOMP_PORT',
+    'STOMP_LOGIN',
+    'STOMP_PASSCODE',
+    'STOMP_QUEUE_PUT',
+    'STOMP_QUEUE_GET',
+    'STOMP_QUEUE_SUBSCRIBE',
+]
+
diff --git a/datastore/interactive.py b/datastore/interactive.py
new file mode 100644
index 0000000..76a9636
--- /dev/null
+++ b/datastore/interactive.py
@@ -0,0 +1,51 @@
+from IPython import embed
+from time import time
+from stompclient import PublishSubscribeClient
+import config
+from uuid import uuid1 as uuid
+from threading import Thread, Event
+
+# Connect to the STOMP broker named in config; frames are received on a
+# separate 'Frame-Receiver' thread that runs client.listen_forever.
+client = PublishSubscribeClient(config.STOMP_HOST, config.STOMP_PORT)
+listener = Thread(target=client.listen_forever, name='Frame-Receiver')
+listener.start()
+# Wait for the client's listening_event before sending the connect request.
+client.listening_event.wait()
+r = client.connect(config.STOMP_LOGIN, config.STOMP_PASSCODE)
+# Without a connection: print the connect result and exit with status 1.
+if not client.connection:
+ print r
+ exit(1)
+
+def put(valuemap):
+    """Send all name/value pairs of *valuemap* to the put queue in one frame.
+
+    The frame body holds one "name=value" line per entry; the current time
+    in milliseconds is sent along as the "timestamp" header.
+    """
+    lines = ["{0}={1}".format(name, value)
+             for name, value in valuemap.iteritems()]
+    headers = {"timestamp": int(time() * 1000)}
+    client.send(config.STOMP_QUEUE_PUT,
+                body="\n".join(lines),
+                extra_headers=headers)
+
+def get(name, query=None, timeout=5.0):
+    """Request the value of channel *name* and wait for the answer.
+
+    A temporary reply topic is subscribed, the request is sent to the get
+    queue with *query* as body, and the call then blocks for at most
+    *timeout* seconds.
+
+    Returns the body of the first reply frame, or None if no reply arrived
+    within the timeout.
+    """
+    reply = []
+    cond = Event()
+
+    def unlock(frame):
+        reply.append(frame.body)
+        cond.set()
+
+    replyTo = '/topic/reply-' + str(uuid())
+    client.subscribe(replyTo, unlock)
+    try:
+        client.send(config.STOMP_QUEUE_GET,
+                    body=query,
+                    extra_headers={'name': name,
+                                   'reply-to': replyTo})
+        cond.wait(timeout)
+    finally:
+        # Drop the temporary subscription even if sending failed.
+        client.unsubscribe(replyTo)
+    # Test for presence, not truthiness: an empty reply body is a valid
+    # answer and must not be reported as a timeout (None).
+    return reply[0] if reply else None
+
+# Print usage examples for put() and get(), then hand control to the
+# embedded IPython shell.
+print """
+EXAMPLES
+ put({"org.xapek.test1":"asd1234"})
+ get("org.xapek.test1")
+
+"""
+
+embed()
+
+# Reached once embed() returns: close the broker connection.
+client.disconnect()
+
+
diff --git a/datastore/server.py b/datastore/server.py
new file mode 100644
index 0000000..8b9ca4e
--- /dev/null
+++ b/datastore/server.py
@@ -0,0 +1,36 @@
+from sys import exit
+from stompclient import PublishSubscribeClient
+from threading import Thread
+from store.controller import MQStoreController
+from time import sleep
+
+import config
+
+# Connect to the STOMP broker; frames are received on a separate
+# 'Frame-Receiver' thread that runs client.listen_forever.
+client = PublishSubscribeClient(config.STOMP_HOST, config.STOMP_PORT)
+listener = Thread(target=client.listen_forever, name='Frame-Receiver')
+listener.start()
+client.listening_event.wait()
+r = client.connect(config.STOMP_LOGIN, config.STOMP_PASSCODE)
+if r.command != "CONNECTED":
+    print("Failed to connect to {0}:{1}".format(config.STOMP_HOST, config.STOMP_PORT))
+    print(r)
+    exit(1)
+
+
+# Route the three request queues to the controller callbacks.
+controller = MQStoreController(client, config.CHANNELS, config.PROCESSORS)
+client.subscribe(config.STOMP_QUEUE_PUT, controller.cb_put)
+client.subscribe(config.STOMP_QUEUE_GET, controller.cb_get)
+client.subscribe(config.STOMP_QUEUE_SUBSCRIBE, controller.cb_subscribe)
+
+# All work happens in the callbacks; the main thread only waits for Ctrl-C.
+while True:
+    try:
+        sleep(1)
+    except KeyboardInterrupt:
+        print("shutdown")
+        try:
+            client.shutdown_event.set()
+        except Exception as e:
+            # Best effort: report the problem instead of hiding it, then
+            # exit anyway.
+            print("Could not signal shutdown: {0}".format(e))
+        exit(0)
+
+
diff --git a/datastore/store/__init__.py b/datastore/store/__init__.py
new file mode 100644
index 0000000..792d600
--- /dev/null
+++ b/datastore/store/__init__.py
@@ -0,0 +1 @@
+#
diff --git a/datastore/store/channel.py b/datastore/store/channel.py
new file mode 100644
index 0000000..2a5d5f4
--- /dev/null
+++ b/datastore/store/channel.py
@@ -0,0 +1,25 @@
+class IChannel:
+    """Interface of a channel that accepts timestamped values and answers queries.
+
+    Cannot be instantiated directly; subclasses supply their own __init__
+    and override add() and/or get(), which do nothing here.
+    """
+
+    def __init__(self):
+        raise NotImplementedError()
+
+    def add(self, timestamp, data, controller):
+        """Accept *data* for *timestamp*; this default ignores it."""
+        return None
+
+    def get(self, query):
+        """Answer *query*; this default has nothing to return."""
+        return None
+
+class PrinterChannel(IChannel):
+    """Channel that prints every added value, prefixed with the channel name."""
+
+    def __init__(self, name):
+        self.name = name
+
+    def add(self, timestamp, data, controller):
+        """Print "<name>: <timestamp>,<data>"."""
+        line = "{0}: {1},{2}".format(self.name, timestamp, data)
+        print(line)
+
+class SimpleMemoryChannel(IChannel):
+    """Channel that keeps only the most recently added value in memory."""
+
+    def __init__(self):
+        # Nothing added yet: get() answers with the empty string.
+        self.data = ""
+
+    def add(self, timestamp, data, controller):
+        """Replace the remembered value with *data*."""
+        self.data = data
+
+    def get(self, query):
+        """Return the remembered value as a string; *query* is not used."""
+        latest = self.data
+        return str(latest)
+
+class IntervalStoreChannel(IChannel):
+    """Stub channel that so far only defines time-unit constants.
+
+    NOTE(review): the units are presumably milliseconds (SECONDS is 1000).
+    """
+    SECONDS = 1000
+    MINUTES = 60 * SECONDS
+    HOURS = 60 * MINUTES
+    DAYS = 24 * HOURS
+
+    def __init__(self):
+        pass
diff --git a/datastore/store/controller.py b/datastore/store/controller.py
new file mode 100644
index 0000000..0814488
--- /dev/null
+++ b/datastore/store/controller.py
@@ -0,0 +1,89 @@
+import traceback
+
+class SubscriptionManager:
+    """Remember which reply destinations are subscribed to which channel."""
+
+    def __init__(self):
+        # channel name -> list of reply destinations, in subscription order
+        self.s = {}
+
+    def subscribe(self, name, replyTo):
+        """Register *replyTo* for channel *name*; duplicates are ignored."""
+        destinations = self.s.setdefault(name, [])
+        if replyTo not in destinations:
+            destinations.append(replyTo)
+
+    def unsubscribe(self, name, replyTo):
+        """Remove *replyTo* from channel *name*.
+
+        Raises AssertionError if that pair is not currently subscribed.
+        """
+        assert name in self.s
+        assert replyTo in self.s[name]
+        self.s[name].remove(replyTo)
+
+    def notify(self, name, timestamp, data, controller):
+        """Send "name=data" to every destination subscribed to *name*.
+
+        Frames are sent through controller.client with *timestamp* as the
+        "timestamp" header. Nothing happens for channels without subscribers.
+        """
+        for replyTo in self.s.get(name, ()):
+            controller.client.send(replyTo,
+                                   "{0}={1}".format(name, data),
+                                   extra_headers={'timestamp': timestamp})
+
+class StoreController:
+    """Route values between channels, subscribers and processors.
+
+    *channels* maps a channel name to a channel object, *processors* maps a
+    channel name to a list of processors, and *client* is used to send
+    notifications to subscribers.
+    """
+
+    def __init__(self, client, channels, processors):
+        self.client = client
+        self.channels = channels
+        self.processors = processors
+        self.subscriptions = SubscriptionManager()
+
+    def put(self, name, timestamp, data):
+        """Add *data* to channel *name*, notify subscribers, run processors.
+
+        Raises AssertionError for an unknown channel name.
+        """
+        assert name in self.channels
+
+        self.channels[name].add(timestamp, data, self)
+        self.subscriptions.notify(name, timestamp, data, self)
+        if name in self.processors:
+            for processor in self.processors[name]:
+                processor.process(name, timestamp, data, self)
+
+    def get(self, name, query):
+        """Return what channel *name* answers to *query*.
+
+        Raises AssertionError for an unknown channel name.
+        """
+        assert name in self.channels
+        return self.channels[name].get(query)
+
+    def subscribe(self, name, replyTo):
+        """Subscribe *replyTo* to updates of channel *name*.
+
+        Raises AssertionError for an unknown channel name.
+        """
+        # Validate the *name* argument; there is no frame in this scope.
+        assert name in self.channels
+        self.subscriptions.subscribe(name, replyTo)
+
+    def unsubscribe(self, name, replyTo):
+        """Cancel the subscription of *replyTo* to channel *name*.
+
+        Raises AssertionError for an unknown channel name or subscription.
+        """
+        assert name in self.channels
+        self.subscriptions.unsubscribe(name, replyTo)
+
+class MQStoreController(StoreController):
+    """StoreController driven by message-queue frames.
+
+    The cb_* methods are frame callbacks; any exception raised while
+    handling a frame is caught and printed as a traceback.
+    NOTE(review): frame.timestamp, frame.name and frame.action are read as
+    frame attributes - presumably the client exposes headers that way; confirm.
+    """
+
+    def cb_put(self, frame):
+        """Store every "name=value" line of the frame body in its channel.
+
+        All names are checked before anything is stored, so a frame naming
+        an unknown channel stores nothing.
+        """
+        try:
+            assert len(frame.timestamp) > 0
+            # Split on the first "=" only so that values may contain "=";
+            # blank lines (e.g. from a trailing newline) carry no pair.
+            values = [line.split("=", 1)
+                      for line in frame.body.split("\n") if line]
+            for (name, data) in values:
+                assert name in self.channels
+                print("{0} = {1}".format(name, data))
+
+            for (name, data) in values:
+                self.put(name, frame.timestamp, data)
+        except Exception:
+            traceback.print_exc()
+
+    def cb_get(self, frame):
+        """Answer a get request on the destination named by "reply-to"."""
+        try:
+            assert len(frame.headers['reply-to']) > 0
+            r = self.get(frame.name, frame.body)
+            print("send {0} {1}".format(frame.name, r))
+            self.client.send(frame.headers['reply-to'],
+                             body=r)
+        except Exception:
+            traceback.print_exc()
+
+    def cb_subscribe(self, frame):
+        """Subscribe or unsubscribe "reply-to", depending on frame.action."""
+        try:
+            assert len(frame.headers['reply-to']) > 0
+            assert frame.action in ["subscribe", "unsubscribe"]
+
+            if frame.action == "subscribe":
+                self.subscribe(frame.name, frame.headers['reply-to'])
+            elif frame.action == "unsubscribe":
+                self.unsubscribe(frame.name, frame.headers['reply-to'])
+        except Exception:
+            traceback.print_exc()
diff --git a/datastore/store/processor.py b/datastore/store/processor.py
new file mode 100644
index 0000000..9d39d9a
--- /dev/null
+++ b/datastore/store/processor.py
@@ -0,0 +1,16 @@
+class IProcessor:
+    """Interface of a processor that is handed every value put on a channel.
+
+    Cannot be instantiated directly; subclasses supply their own __init__
+    and override process(), which does nothing here.
+    """
+
+    def __init__(self):
+        raise NotImplementedError()
+
+    def process(self, name, timestamp, data, controller):
+        """Handle *data* put on channel *name*; this default ignores it."""
+        return None
+
+class CountingProcessor(IProcessor):
+    """Count processed values in the channel named *destination*."""
+
+    def __init__(self, destination):
+        self.destination = destination
+
+    def process(self, name, timestamp, data, controller):
+        """Read the counter from the destination channel and store it plus one.
+
+        An empty or missing counter value counts as zero.
+        """
+        stored = controller.get(self.destination, None)
+        count = int(stored or 0)
+        controller.put(self.destination, timestamp, count + 1)
+
diff --git a/datastore/store/timeseries.py b/datastore/store/timeseries.py
new file mode 100644
index 0000000..8e30e4e
--- /dev/null
+++ b/datastore/store/timeseries.py
@@ -0,0 +1,264 @@
+#!/usr/bin/env python
+import glob
+import logging
+import numbers
+import os
+import struct
+import time
+from collections import namedtuple
+
+class Constants:
+    """Limits of the store and interval lengths expressed in milliseconds."""
+    # Upper bound checked against database names.
+    DB_MAX_NAME_LENGTH = 10
+    # The write buffer is flushed once it holds more entries than this.
+    DB_WRITEBUF_LENGTH = 10
+    DBDAY_FORMAT_DAY = "%d%m%Y"
+    DB_OPEN_FILES = 5
+
+    # Interval lengths, each built from the next smaller unit.
+    INT_MILLI = 1
+    INT_SEC = 1000 * INT_MILLI
+    INT_MIN = 60 * INT_SEC
+    INT_HOUR = 60 * INT_MIN
+    INT_DAY = 24 * INT_HOUR
+    INT_WEEK = 7 * INT_DAY
+    INT_MONTH = 31 * INT_DAY
+    INT_YEAR = 365 * INT_DAY
+
+
+# Short alias for the constants class.
+C = Constants
+
+def time_ms():
+    """Return the current time.time() value as whole milliseconds."""
+    seconds = time.time()
+    return int(seconds * 1000)
+
+class DB(object):
+    """Time series store that spreads fixed-size items over segment files.
+
+    Keys are millisecond timestamps. A timestamp selects a segment file (one
+    per *fileinterval*) and a slot inside it (one per *interval*). Writes are
+    collected in a small buffer and flushed to the segments in sorted order.
+    Subclasses implement get(), set() and item_size().
+    """
+
+    def __init__(self, interval, fileinterval, directory, name):
+        """Set up the store and open all existing segments of *name*.
+
+        Raises AssertionError if *name* is too long for the segment header.
+        """
+        # The segment header stores the name in a struct "p" field of
+        # DB_MAX_NAME_LENGTH bytes, one of which is the length byte. A name
+        # of exactly that length would be truncated on disk and fail the
+        # header check on reopening, so it has to be strictly shorter.
+        assert len(name) < Constants.DB_MAX_NAME_LENGTH, \
+            "name must be smaller than {0} (is {1})".format(Constants.DB_MAX_NAME_LENGTH, len(name))
+        self.interval = interval
+        self.fileinterval = fileinterval
+        self.directory = directory
+        self.name = name
+        self.isize = self.item_size()  # size of one stored item in bytes
+        self._files = {}      # segment key -> DBSegment
+        self._write_buf = {}  # (segment key, slot) -> packed item
+        self.last_access = []
+        for path in glob.glob(os.path.join(self.directory, name + "*")):
+            dbday = DBSegment.fopen(path, self)
+            self._files[dbday.filekey] = dbday
+
+    def __getitem__(self, key):
+        return self.get(key)
+
+    def get(self, key):
+        """Return the decoded item stored for timestamp *key* (subclass hook)."""
+        raise NotImplementedError()
+
+    def _get(self, key):
+        """Return the packed item for timestamp *key*.
+
+        Buffered writes take precedence over what the segment file holds.
+        Raises TypeError if *key* is not an integer.
+        """
+        # numbers.Integral also accepts Python 2 ``long``, which is what
+        # millisecond timestamps become wherever a plain int is 32 bits wide.
+        if not isinstance(key, numbers.Integral):
+            raise TypeError("DB keys must be millisecond timestamps (int)")
+
+        logging.debug("get -> key %s fileinterval %s interval %s", key, self.fileinterval, self.interval)
+        filekey = self._calc_segment_key(key)
+        key = self._calc_key(key)
+        logging.debug("get => filekey %s key %s", filekey, key)
+
+        try:
+            value = self._write_buf[(filekey, key)]
+        except KeyError:
+            return self.get_segment(filekey)[key]
+        logging.debug("Write-Cache hit %s", key)
+        return value
+
+    def __setitem__(self, key, value):
+        return self.set(key, value)
+
+    def set(self, key, value):
+        """Encode and store *value* for timestamp *key* (subclass hook)."""
+        raise NotImplementedError()
+
+    def _set(self, key, value):
+        """Buffer the packed item *value* for timestamp *key* and return it.
+
+        The buffer is flushed once it holds more than DB_WRITEBUF_LENGTH
+        entries. Raises TypeError if *key* is not an integer.
+        """
+        if not isinstance(key, numbers.Integral):
+            raise TypeError("DB keys must be millisecond timestamps (int)")
+
+        logging.debug("set -> key %s fileinterval %s interval %s", key, self.fileinterval, self.interval)
+        filekey = self._calc_segment_key(key)
+        key = self._calc_key(key)
+        logging.debug("set => filekey %s key %s", filekey, key)
+
+        self._write_buf[(filekey, key)] = value
+        if len(self._write_buf) > Constants.DB_WRITEBUF_LENGTH:
+            self._flush_buf()
+
+        return value
+
+    def get_segment(self, filekey):
+        """Return the segment for *filekey*, creating its file if necessary."""
+        try:
+            return self._files[filekey]
+        except KeyError:
+            pass
+        header = DBSegment.header(filekey=filekey,
+                                  isize=self.isize, interval=self.interval,
+                                  fileinterval=self.fileinterval, dbname=self.name)
+        path = os.path.join(self.directory, self.name + str(filekey))
+        db_segment = DBSegment(self, path, header)
+        logging.info("New File %s", db_segment.path)
+        self._files[filekey] = db_segment
+        return db_segment
+
+    def _calc_key(self, key):
+        """Return the slot index of timestamp *key* inside its segment."""
+        return int((key % self.fileinterval) // self.interval)
+
+    def _calc_segment_key(self, key):
+        """Return the key of the segment that timestamp *key* falls into."""
+        return int(key // self.fileinterval)
+
+    def _flush_buf(self):
+        """Write all buffered items to their segments, in sorted key order."""
+        logging.debug("flush")
+        for (filekey, key) in sorted(self._write_buf):
+            db_segment = self.get_segment(filekey)
+            db_segment[key] = self._write_buf[(filekey, key)]
+        self._write_buf.clear()
+
+    def close(self):
+        """Flush the buffered writes to the segment files."""
+        self._flush_buf()
+
+    def item_size(self):
+        """Return the size in bytes of one stored item (subclass hook)."""
+        raise NotImplementedError()
+
+class InvalidSegmentError(Exception):
+    """Raised when a segment file's header does not match the opening DB.
+
+    The header that was read is available as the ``header`` attribute.
+    """
+
+    def __init__(self, header, *args):
+        super(InvalidSegmentError, self).__init__(*args)
+        self.header = header
+
+class _LazyEval(object):
+    """Postpone a function call until the object is converted to a string.
+
+    Every str() conversion calls func(*args, **kwargs) and returns the result.
+    """
+
+    def __init__(self, func, *args, **kwargs):
+        self.func = func
+        self.args = args
+        self.kwargs = kwargs
+
+    def __str__(self):
+        call, positional, named = self.func, self.args, self.kwargs
+        return call(*positional, **named)
+
+def _to_hexstring(input_string):
+    """Wrap *input_string* so that str() renders it as spaced upper-case hex.
+
+    The conversion itself is deferred until the result is turned into a
+    string.
+    """
+    def _render(raw):
+        return " ".join("%02X" % ord(char) for char in raw)
+    return _LazyEval(_render, input_string)
+
+class DBSegment(object):
+    """One segment file: a fixed header followed by equally sized item slots.
+
+    Items are addressed by slot index; consecutive accesses continue at the
+    current file position instead of seeking again.
+    """
+    header = namedtuple('Header', 'filekey isize interval fileinterval dbname')
+    header_format = "iiii%dp" % Constants.DB_MAX_NAME_LENGTH
+    header_size = struct.calcsize(header_format)
+
+    @staticmethod
+    def fopen(path, db):
+        """Open the existing segment file *path* on behalf of *db*.
+
+        The segment key is taken from the file name after the db name.
+        Raises InvalidSegmentError if the stored header does not match the
+        file name or the db's interval, fileinterval or name.
+        """
+        filekey = int(os.path.basename(path)[len(db.name):])
+        with open(path, "rb") as fh:
+            raw = fh.read(DBSegment.header_size)
+        header = DBSegment.header._make(struct.unpack(DBSegment.header_format, raw))
+        matches = (header.filekey == filekey
+                   and header.interval == db.interval
+                   and header.fileinterval == db.fileinterval
+                   and header.dbname == db.name)
+        if not matches:
+            raise InvalidSegmentError(header)
+        return DBSegment(db, path, header)
+
+    def __init__(self, db, path, header):
+        """Attach to the segment at *path*, creating the file if it is missing."""
+        self.db = db
+        self.path = path
+        self.header = header
+
+        self.filekey = header.filekey
+        # Define both attributes before open()/_seek() read them, whether or
+        # not the file had to be created.
+        self.fh = None
+        self.lastpos = None
+        if not os.path.exists(self.path):
+            self._create()
+        self.open()
+
+    def _create(self):
+        """Write the header and extend the new file to its full size."""
+        logging.info("Write header in new file %s", self.path)
+        slots = self.db.fileinterval // self.db.interval
+        end = DBSegment.header_size + slots * self.db.isize
+        with open(self.path, "wb") as fh:
+            fh.write(struct.pack(DBSegment.header_format, *self.header))
+            # Writing one byte at the end reserves room for every slot.
+            fh.seek(end)
+            fh.write(b"\0")
+
+    def __setitem__(self, key, data):
+        """Write *data*, cut to the item size, into slot *key*."""
+        logging.debug("Set %s = %s", key, _to_hexstring(data))
+        self._seek(key)
+        self.fh.write(data[:self.db.isize])
+        self.lastpos = self.fh.tell()
+
+    def __getitem__(self, key):
+        """Return the raw bytes stored in slot *key*."""
+        self._seek(key)
+        data = self.fh.read(self.db.isize)
+        self.lastpos = self.fh.tell()
+        logging.debug("Get %s => %s", key, _to_hexstring(data))
+        return data
+
+    def _seek(self, key):
+        """Position the file at slot *key*, reopening the file if needed."""
+        offset = DBSegment.header_size + key * self.db.isize
+
+        # Reopen before comparing positions: after close() there is no
+        # handle, and the remembered position must not be trusted.
+        if not self.fh:
+            self.open()
+
+        if self.lastpos == offset:
+            logging.debug("continue at %s", self.lastpos)
+        else:
+            logging.debug("seek to %s in %s instance %s (lastpos = %s)", offset, self.path, id(self), self.lastpos)
+            self.fh.seek(offset)
+
+    def open(self):
+        """Open the segment file for reading and writing unless already open."""
+        if not self.fh:
+            logging.debug("Open %s", self.path)
+            self.fh = open(self.path, "r+b")
+            self.lastpos = self.fh.tell()
+
+    def close(self):
+        """Close the file handle; the next access reopens the file."""
+        logging.info("Close %s instance %s", self.path, id(self))
+        if self.fh:
+            self.fh.close()
+            self.fh = None
+            # Without a handle there is no current position to continue from.
+            self.lastpos = None
+
+class DBInt(DB):
+    """DB whose items are three C ints, exposed as (x, y, z) named tuples."""
+    payload = namedtuple("payload", "x y z")
+    payload_format = "iii"
+
+    def get(self, key):
+        """Return the item stored for timestamp *key* as a payload tuple."""
+        raw = self._get(key)
+        fields = struct.unpack(DBInt.payload_format, raw)
+        return DBInt.payload(*fields)
+
+    def set(self, key, value):
+        """Pack the three values of *value* and store them for *key*."""
+        packed = struct.pack(DBInt.payload_format, *value)
+        return self._set(key, packed)
+
+    def item_size(self):
+        """Return the packed size of one payload in bytes."""
+        return struct.calcsize(DBInt.payload_format)
+
+class DBSimpleInt(DB):
+    """DB whose items are single C ints."""
+
+    def get(self, key):
+        """Return the integer stored for timestamp *key*."""
+        (number,) = struct.unpack("i", self._get(key))
+        return number
+
+    def set(self, key, value):
+        """Pack the integer *value* and store it for *key*."""
+        packed = struct.pack("i", value)
+        return self._set(key, packed)
+
+    def item_size(self):
+        """Return the packed size of one integer in bytes."""
+        return struct.calcsize("i")
+
+import random
+if __name__ == "__main__":
+    #logging.basicConfig(level=logging.DEBUG)
+    logging.basicConfig(level=logging.INFO)
+    #logging.basicConfig(level=logging.ERROR)
+
+    # Self-test: one sample per second over the next two days, written and
+    # read back in ascending order, then verified again in descending order.
+    step = 1000 * Constants.INT_MILLI
+    start = time_ms()
+    stop = start + Constants.INT_DAY * 2
+
+    db = DBSimpleInt(step, Constants.INT_DAY, ".", "testdb")
+    rand = 123456
+    try:
+        for ts in range(start, stop, step):
+            db[ts] = -ts % rand
+            assert db[ts] == -ts % rand, "%s != %s" % (db[ts], -ts % rand)
+
+        for ts in reversed(range(start, stop, step)):
+            assert db[ts] == -ts % rand, "%s != %s" % (db[ts], -ts % rand)
+    finally:
+        # Flush whatever is still buffered, even if a check failed.
+        db.close()
+