summary refs log tree commit diff
path: root/datastore/store
diff options
context:
space:
mode:
Diffstat (limited to 'datastore/store')
-rw-r--r--datastore/store/__init__.py1
-rw-r--r--datastore/store/channel.py25
-rw-r--r--datastore/store/controller.py93
-rw-r--r--datastore/store/processor.py16
-rw-r--r--datastore/store/timeseries.py264
5 files changed, 0 insertions, 399 deletions
diff --git a/datastore/store/__init__.py b/datastore/store/__init__.py
deleted file mode 100644
index 792d600..0000000
--- a/datastore/store/__init__.py
+++ /dev/null
@@ -1 +0,0 @@
-#
diff --git a/datastore/store/channel.py b/datastore/store/channel.py
deleted file mode 100644
index 1e083b5..0000000
--- a/datastore/store/channel.py
+++ /dev/null
@@ -1,25 +0,0 @@
class IChannel:
    """Interface for a named data channel: accepts timestamped values and answers queries."""

    def __init__(self):
        # Abstract base: concrete channels must define their own constructor.
        raise NotImplementedError()

    def add(self, timestamp, data, controller):
        """Store *data* observed at *timestamp*; *controller* allows write-through side effects."""
        pass

    def get(self, query):
        """Answer *query* against the stored data; the base implementation returns None."""
        pass
-
class PrinterChannel(IChannel):
    """Channel that logs every incoming value to stdout; nothing is stored."""

    def __init__(self, name):
        # Intentionally does not call IChannel.__init__ (which raises).
        self.name = name

    def add(self, timestamp, data, controller):
        """Print the incoming sample; *controller* is unused."""
        # fix: Python 2 print statement -> print() function (same output format).
        print("{0}: timestamp={1} value={2}".format(self.name, timestamp, data))
-
class SimpleMemoryChannel(IChannel):
    """Keeps only the most recently added value; queries return it as a string."""

    def __init__(self):
        self.data = ""  # last value added; empty string until the first add()

    def add(self, timestamp, data, controller):
        # Timestamp and controller are ignored; only the latest value is retained.
        self.data = data

    def get(self, query):
        # The query itself is ignored; always answers with the latest value.
        return str(self.data)
-
class IntervalStoreChannel(IChannel):
    """Placeholder for an interval-bucketed store; currently only interval constants."""

    # Bucket widths, expressed in milliseconds.
    SECONDS = 1000
    MINUTES = 60 * SECONDS
    HOURS = 60 * MINUTES
    DAYS = 24 * HOURS

    def __init__(self):
        # Unlike the IChannel base, construction is allowed (and a no-op).
        pass
diff --git a/datastore/store/controller.py b/datastore/store/controller.py
deleted file mode 100644
index 0d694d1..0000000
--- a/datastore/store/controller.py
+++ /dev/null
@@ -1,93 +0,0 @@
-import traceback
-import time
-
class SubscriptionManager:
    """Tracks, per channel name, the destinations to notify when a value arrives."""

    def __init__(self):
        # name -> list of reply-to destinations (insertion order, no duplicates)
        self.s = {}

    def subscribe(self, name, replyTo):
        """Register *replyTo* for channel *name*; duplicate subscriptions are ignored."""
        self.s.setdefault(name, [])
        if replyTo not in self.s[name]:
            self.s[name].append(replyTo)

    def unsubscribe(self, name, replyTo):
        """Remove *replyTo* from channel *name*; the subscription must exist."""
        assert name in self.s
        assert replyTo in self.s[name]
        self.s[name].remove(replyTo)

    def notify(self, name, timestamp, data, controller):
        """Send "name=data" (with a timestamp header) to every subscriber of *name*."""
        if name not in self.s:
            return
        for replyTo in self.s[name]:
            controller.client.send(replyTo,
                                   # fix: original referenced undefined name 'dat'
                                   "{0}={1}".format(name, data),
                                   extra_headers={'timestamp': timestamp})
-
class StoreController:
    """Routes values into channels, fans them out to subscribers, and runs processors."""

    def __init__(self, client, channels, processors):
        self.client = client          # messaging client used to reach subscribers
        self.channels = channels      # name -> IChannel
        self.processors = processors  # name -> list of IProcessor
        self.subscriptions = SubscriptionManager()

    def put(self, name, timestamp, data):
        """Store one sample, notify subscribers, then run any processors for *name*."""
        assert name in self.channels
        self.channels[name].add(timestamp, data, self)
        self.subscriptions.notify(name, timestamp, data, self)
        for processor in self.processors.get(name, ()):
            processor.process(name, timestamp, data, self)

    def get(self, name, query):
        """Answer *query* against the channel called *name*."""
        assert name in self.channels
        return self.channels[name].get(query)

    def subscribe(self, name, replyTo):
        """Subscribe *replyTo* to channel *name* (which must exist)."""
        # fix: original asserted on undefined 'frame.name' instead of 'name'
        assert name in self.channels
        self.subscriptions.subscribe(name, replyTo)

    def unsubscribe(self, name, replyTo):
        """Unsubscribe *replyTo* from channel *name* (which must exist)."""
        # fix: original asserted on undefined 'frame.name' instead of 'name'
        assert name in self.channels
        self.subscriptions.unsubscribe(name, replyTo)
-
class MQStoreController(StoreController):
    """StoreController driven by message-queue frames (put/get/subscribe callbacks).

    Each callback swallows and prints exceptions so a malformed frame cannot
    kill the consumer loop (deliberate best-effort behavior).
    """

    def cb_put(self, frame):
        """Parse "name=value" lines from the frame body and store each pair.

        A missing 'timestamp' header defaults to the current wall-clock time.
        """
        try:
            if "timestamp" not in frame.headers:
                frame.headers['timestamp'] = str(time.time())
            # fix: original read undefined 'frame.timestamp'; the default above
            # is written into frame.headers, so read it back from there.
            assert len(frame.headers['timestamp']) > 0
            values = [kv.split("=") for kv in frame.body.split("\n")]
            values = [v for v in values if len(v) == 2]  # drop non-pairs from empty lines
            for (name, data) in values:
                if name not in self.channels:
                    print("No channel named {0}".format(name))
                    continue
                print(name, "=", data)
                self.put(name, frame.headers['timestamp'], data)
        except Exception:
            traceback.print_exc()

    def cb_get(self, frame):
        """Answer a query frame by sending the result to its 'reply-to' destination."""
        try:
            assert len(frame.headers['reply-to']) > 0
            r = self.get(frame.name, frame.body)
            print("send", frame.name, r)
            self.client.send(frame.headers['reply-to'],
                             body=r)
        except Exception:
            traceback.print_exc()

    def cb_subscribe(self, frame):
        """Handle a subscribe/unsubscribe frame for the channel named frame.name."""
        try:
            assert len(frame.headers['reply-to']) > 0
            assert frame.action in ["subscribe", "unsubscribe"]

            if frame.action == "subscribe":
                self.subscribe(frame.name, frame.headers['reply-to'])
            elif frame.action == "unsubscribe":
                self.unsubscribe(frame.name, frame.headers['reply-to'])
        except Exception:
            traceback.print_exc()
diff --git a/datastore/store/processor.py b/datastore/store/processor.py
deleted file mode 100644
index 9d39d9a..0000000
--- a/datastore/store/processor.py
+++ /dev/null
@@ -1,16 +0,0 @@
class IProcessor:
    """Interface for a hook that reacts to every value stored in a channel."""

    def __init__(self):
        # Abstract base: concrete processors must define their own constructor.
        raise NotImplementedError()

    def process(self, name, timestamp, data, controller):
        """Handle one (name, timestamp, data) sample; may call back into *controller*."""
        pass
-
class CountingProcessor(IProcessor):
    """Counts processed samples by incrementing a counter kept in *destination*."""

    def __init__(self, destination):
        # Channel name that holds the running count.
        self.destination = destination

    def process(self, name, timestamp, data, controller):
        """Read the current count (defaulting to 0) and write back count + 1."""
        stored = controller.get(self.destination, None)
        count = int(stored) if stored else 0
        controller.put(self.destination, timestamp, count + 1)
-
diff --git a/datastore/store/timeseries.py b/datastore/store/timeseries.py
deleted file mode 100644
index 8e30e4e..0000000
--- a/datastore/store/timeseries.py
+++ /dev/null
@@ -1,264 +0,0 @@
-#!/usr/bin/env python
-from collections import namedtuple
-import struct
-import time
-import glob
-import os
-
-import logging
-
class Constants:
    """Tunable limits and time-interval lengths (milliseconds) for the timeseries DB."""

    DB_MAX_NAME_LENGTH = 10   # longest allowed database name (fits the segment header)
    DB_WRITEBUF_LENGTH = 10   # flush the write buffer once it exceeds this many entries
    DBDAY_FORMAT_DAY = "%d%m%Y"
    DB_OPEN_FILES = 5

    # Interval lengths, all expressed in milliseconds.
    INT_MILLI = 1
    INT_SEC = 1000 * INT_MILLI
    INT_MIN = 60 * INT_SEC
    INT_HOUR = 60 * INT_MIN
    INT_DAY = 24 * INT_HOUR
    INT_WEEK = 7 * INT_DAY
    INT_MONTH = 31 * INT_DAY
    INT_YEAR = 365 * INT_DAY

# Short module-level alias used throughout this file.
C = Constants
-
def time_ms():
    """Current wall-clock time as whole milliseconds since the Unix epoch (truncated)."""
    now_seconds = time.time()
    return int(now_seconds * 1000)
-
class DB(object):
    """Keyed store mapping millisecond timestamps to fixed-size records.

    Keys are bucketed twice: a segment key selects the on-disk file (one
    file per *fileinterval* ms) and a slot key selects the record inside it
    (one slot per *interval* ms). Writes pass through a small buffer that is
    flushed to segments in sorted key order.

    Subclasses implement get()/set() (value <-> raw bytes codecs) and
    item_size() (the fixed record width in bytes).
    """

    def __init__(self, interval, fileinterval, directory, name):
        assert len(name) <= Constants.DB_MAX_NAME_LENGTH, \
            "name must be smaller than {0} (is {1})".format(Constants.DB_MAX_NAME_LENGTH, len(name))
        self.interval = interval            # ms per record slot
        self.fileinterval = fileinterval    # ms per segment file
        self.directory = directory
        self.name = name
        self.isize = self.item_size()       # fixed record width in bytes
        self._files = {}                    # segment key -> DBSegment
        self._write_buf = {}                # (segment key, slot key) -> raw record
        self.last_access = []
        # Re-attach any segment files left behind by a previous run.
        for path in glob.glob(self.directory + os.path.sep + name + "*"):
            dbday = DBSegment.fopen(path, self)
            self._files[dbday.filekey] = dbday

    def __getitem__(self, key):
        return self.get(key)

    def get(self, key):
        """Decode and return the record at timestamp *key* (subclass hook)."""
        raise NotImplementedError()

    def _get(self, key):
        """Fetch the raw record bytes for timestamp *key*, checking the write buffer first."""
        if not isinstance(key, int):
            raise TypeError("DB keys must be millisecond timestamps (int)")

        logging.debug("get -> key %s fileinterval %s interval %s", key, self.fileinterval, self.interval)
        filekey = self._calc_segment_key(key)
        key = self._calc_key(key)
        logging.debug("get => filekey %s key %s", filekey, key)

        # Serve unflushed writes straight from the buffer.
        if (filekey, key) in self._write_buf:
            logging.debug("Write-Cache hit %s", key)
            return self._write_buf[(filekey, key)]

        db_segment = self.get_segment(filekey)
        return db_segment[key]

    def __setitem__(self, key, value):
        return self.set(key, value)

    def set(self, key, value):
        """Encode and store *value* at timestamp *key* (subclass hook)."""
        raise NotImplementedError()

    def _set(self, key, value):
        """Buffer raw record bytes for timestamp *key*; flush once the buffer fills."""
        if not isinstance(key, int):
            raise TypeError("DB keys must be millisecond timestamps (int)")

        # fix: original logged "get" here (copy-paste from _get)
        logging.debug("set -> key %s fileinterval %s interval %s", key, self.fileinterval, self.interval)
        filekey = self._calc_segment_key(key)
        key = self._calc_key(key)
        logging.debug("set => filekey %s key %s", filekey, key)

        self._write_buf[(filekey, key)] = value
        if len(self._write_buf) > Constants.DB_WRITEBUF_LENGTH:
            self._flush_buf()

        return value

    def get_segment(self, filekey):
        """Return the DBSegment for *filekey*, creating a new segment file if needed."""
        if filekey in self._files:
            return self._files[filekey]
        header = DBSegment.header(filekey=filekey,
                                  isize=self.isize, interval=self.interval,
                                  fileinterval=self.fileinterval, dbname=self.name)
        path = self.directory + os.path.sep + self.name + str(filekey)
        db_segment = DBSegment(self, path, header)
        logging.info("New File %s", db_segment.path)
        self._files[filekey] = db_segment
        return db_segment

    def _calc_key(self, key):
        """Slot index inside a segment for millisecond timestamp *key*."""
        return int((key % self.fileinterval) / self.interval)

    def _calc_segment_key(self, key):
        """Segment (file) index for millisecond timestamp *key*."""
        return int(key / self.fileinterval)

    def _flush_buf(self):
        """Write all buffered records to their segments in sorted key order."""
        logging.debug("flush")
        # fix: original did keys = d.keys(); keys.sort() which fails on
        # Python 3 dict views; sorted() works on both.
        for (filekey, key) in sorted(self._write_buf):
            db_segment = self.get_segment(filekey)
            db_segment[key] = self._write_buf[(filekey, key)]
        self._write_buf.clear()

    def close(self):
        """Flush pending writes; segments manage their own file handles."""
        self._flush_buf()

    def item_size(self):
        """Fixed record width in bytes (subclass hook)."""
        raise NotImplementedError()
-
class InvalidSegmentError(Exception):
    """Raised when a segment file's stored header disagrees with the DB opening it."""

    def __init__(self, header, *args):
        super().__init__(*args)
        # The offending header, kept for diagnostics.
        self.header = header
-
-class _LazyEval(object):
- def __init__(self, func, *args, **kwargs):
- self.func, self.args, self.kwargs = func, args, kwargs
- def __str__(self):
- return self.func(*self.args, **self.kwargs)
-
def _to_hexstring(input_string):
    """Lazily format *input_string* as space-separated upper-case hex byte values.

    Returns a _LazyEval so the formatting cost is only paid if the value is
    actually rendered (e.g. by an enabled debug log handler).
    """
    def _format(data):
        # fix: iterating bytes yields ints on Python 3, so ord() would raise
        # TypeError there; accept both bytes and str inputs.
        if isinstance(data, bytes):
            codes = data
        else:
            codes = (ord(ch) for ch in data)
        return " ".join("%02X" % code for code in codes)
    return _LazyEval(_format, input_string)
-
class DBSegment(object):
    """One on-disk segment file: a packed header followed by fixed-size record slots."""

    # Header layout: filekey, isize, interval, fileinterval (four native ints)
    # plus the db name as a length-prefixed ("p") string of at most
    # DB_MAX_NAME_LENGTH bytes.
    header = namedtuple('Header', 'filekey isize interval fileinterval dbname')
    header_format = "iiii%dp"%Constants.DB_MAX_NAME_LENGTH
    header_size = struct.calcsize( "iiii%dp"%Constants.DB_MAX_NAME_LENGTH )

    @staticmethod
    def fopen(path, db):
        """Open an existing segment file for *db*, validating its stored header.

        The segment key is recovered from the filename suffix (everything after
        the db name). Raises InvalidSegmentError when the stored header does
        not match the opening DB's parameters.
        """
        filekey = int( os.path.basename(path)[len(db.name):] )
        # NOTE(review): file() is the Python 2 built-in; open() under Python 3.
        with file(path, "rb") as fh:
            header = DBSegment.header._make( struct.unpack( DBSegment.header_format, fh.read( DBSegment.header_size ) ) )
            if not (header.filekey == filekey
                    and header.interval == db.interval
                    and header.fileinterval == db.fileinterval
                    and header.dbname == db.name):
                raise InvalidSegmentError(header)
        f = DBSegment(db, path, header)
        return f

    def __init__(self, db, path, header):
        self.db = db
        self.path = path
        self.header = header

        self.filekey = header.filekey
        if not os.path.exists(self.path):
            # New segment: write the header, then pre-extend the file to its
            # full size by seeking to the end and writing a single byte.
            logging.info("Write header in new file %s", self.path)
            self.fh = file(self.path, "wb")
            self.fh.write( struct.pack( DBSegment.header_format, *list(header) ) )
            end = ( DBSegment.header_size +
                ( self.db.fileinterval / self.db.interval ) * self.db.isize)
            self.fh.seek( end )
            self.fh.write( "\0" )
            self.fh.close()
            self.fh = None
        # NOTE(review): when the file already exists, self.fh was never
        # assigned above, yet open() reads self.fh — looks like a potential
        # AttributeError on that path; confirm against the original layout.
        self.open()

    def __setitem__(self, key, data):
        """Write one record (truncated to isize bytes) into slot *key*."""
        logging.debug("Set %s = %s", key, _to_hexstring(data))
        self._seek(key)
        self.fh.write( data[:self.db.isize] )
        # Remember the handle position so a sequential next access skips the seek.
        self.lastpos = self.fh.tell()

    def __getitem__(self, key):
        """Read and return the isize-byte record stored in slot *key*."""
        self._seek(key)
        data = self.fh.read( self.db.isize )
        # Remember the handle position so a sequential next access skips the seek.
        self.lastpos = self.fh.tell()
        logging.debug("Get %s => %s", key, _to_hexstring(data))
        return data

    def _seek(self, key):
        """Position the file handle at slot *key*.

        Skips the seek entirely when the previous read/write already left the
        handle at the target offset (the common sequential-access case).
        """
        offset = DBSegment.header_size + \
            key * self.db.isize

        if self.lastpos == offset:
            logging.debug("continue at %s", self.lastpos)
        else:
            logging.debug("seek to %s in %s instance %s (lastpos = %s)", offset, self.path, id(self), self.lastpos)
            # Reopen lazily if the segment was closed in the meantime.
            if not self.fh:
                self.open()
            self.fh.seek( offset )

    def open(self):
        """(Re)open the segment file read/write and record the handle position."""
        if not self.fh:
            logging.debug("Open %s",self.path)
            self.fh = file(self.path, "r+b")
            self.lastpos = self.fh.tell()

    def close(self):
        """Close the file handle; the segment can be reopened later via open()."""
        logging.info("Close %s instance %s", self.path, id(self))
        if self.fh:
            self.fh.close()
            self.fh = None
-
class DBInt(DB):
    """DB whose records are (x, y, z) triples of native signed 32-bit ints."""

    payload = namedtuple("payload", "x y z")
    payload_format = "iii"  # three native ints per slot

    def get(self, key):
        """Return the triple stored at *key* as a payload namedtuple."""
        raw = self._get(key)
        return DBInt.payload._make(struct.unpack(DBInt.payload_format, raw))

    def set(self, key, value):
        """Pack the 3-tuple *value* and store it at *key*."""
        packed = struct.pack(DBInt.payload_format, *list(value))
        return self._set(key, packed)

    def item_size(self):
        """Record width in bytes: the packed size of the x/y/z triple."""
        return struct.calcsize(DBInt.payload_format)
-
class DBSimpleInt(DB):
    """DB whose records are single native signed 32-bit ints."""

    def get(self, key):
        """Return the int stored at *key*."""
        (value,) = struct.unpack("i", self._get(key))
        return value

    def set(self, key, value):
        """Store *value* (must fit a signed 32-bit int) at *key*."""
        return self._set(key, struct.pack("i", value))

    def item_size(self):
        """Record width in bytes: one packed native int."""
        return struct.calcsize("i")
-
import random

if __name__ == "__main__":
    # Self-test: write one sample per second over two days of timestamps,
    # verifying each value on write and again on a full reverse read-back.
    #logging.basicConfig(level=logging.DEBUG)
    logging.basicConfig(level=logging.INFO)

    start = time_ms()
    stop = time_ms() + Constants.INT_DAY * 2

    # fix: original instantiated undefined name 'DBSimple'; the class defined
    # above is DBSimpleInt.
    db = DBSimpleInt(1000 * Constants.INT_MILLI, Constants.INT_DAY, ".", "testdb")
    rand = 123456
    for ts in range(start, stop, 1000 * Constants.INT_MILLI):
        db[ts] = -ts % rand
        # fix: assert message showed the raw timestamp instead of the expected value
        assert db[ts] == -ts % rand, "%s != %s" % (db[ts], -ts % rand)

    # Read back in reverse order to exercise seeks across segment boundaries.
    # fix: range(...).reverse() is a Python 2 list idiom; reversed() works on
    # both and avoids mutating in place.
    for ts in reversed(range(start, stop, 1000 * Constants.INT_MILLI)):
        assert db[ts] == -ts % rand, "%s != %s" % (db[ts], -ts % rand)

    db.close()
-