summaryrefslogtreecommitdiff
path: root/ebus/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'ebus/__init__.py')
-rw-r--r--ebus/__init__.py189
1 files changed, 31 insertions, 158 deletions
diff --git a/ebus/__init__.py b/ebus/__init__.py
index 23e6595..832e9ff 100644
--- a/ebus/__init__.py
+++ b/ebus/__init__.py
@@ -8,102 +8,7 @@ import os
from lxml import objectify
from lxml import etree
-"""ebus specific data field formats"""
-class fields:
- class DataField(object):
- def __init__(self, name, offset):
- self.name = name
- self.offset = offset
- def value(self,data):
- raise NotImplemented()
-
- class Data1b(DataField):
- """
- Beispiel für die Berechnung:
- if ((x & 80h) == 80h) // y negativ
- y = - [dez(!x) + 1]
- else
- y = dez(x)
- """
- def value(self,data):
- x=ord(data[self.offset])
- if x & 0x80 == 0x80:
- return (-1) * (0xff^x + 1)
- else:
- return x
-
- class Data1c(DataField):
- def value(self,data):
- return ord(data[self.offset])/2.0
-
- class Data2c(DataField):
- """
- Beispiel für die Berechnung:
- if ((x & 8000h) == 8000h) // y negativ
- y = - [dez(High_Byte(!x)) 16 + dez(High_Nibble (Low_Byte (!x)))
- + (dez(Low_Nibble (Low_Byte (!x))) +1 ) / 16]
- else // y positiv
- y = dez(High_Byte(x)) 16 + dez(High_ Nibble (Low Byte (x)))
- + dez(Low_ Nibble (Low Byte (x))) / 16
- """
- def value(self,data):
- highByte = ord(data[self.offset+1])
- lowByte= ord(data[self.offset])
- if (0x8000 & (highByte<<8 | lowByte)) == 0x8000:
- return (-1) * ( (0xff^highByte)*16 + (0xff^(lowByte>>4)) + (0x0ff^(0xf&lowByte) + 1)/16.0 )
- else:
- return highByte*16 + (lowByte>>4) + (lowByte&0xf)/16.0
-
- class Data2b(DataField):
- """
- if ((x&8000h) == 8000h) // y negativ
- y = - [dez(High_Byte(!x)) + (dez(Low_Byte(!x)) + 1) / 256]
- else // y positiv
- y = dez(High_Byte (x)) + dez(Low_Byte (x)) / 256
- """
- def value(self,data):
- highByte = ord(data[self.offset+1])
- lowByte= ord(data[self.offset])
- if (0x8000 & (highByte<<8 | lowByte)) == 0x8000:
- return (-1) * ((0xff^highByte) + (0xff^lowByte+1)/256.0)
- else:
- return highByte + lowByte/256.0
-
- class Bit(DataField):
- def value(self, data):
- return ord(data[self.offset]) == 0x1
-
- class Word(DataField):
- def value(self, data):
- lb, hb = data[self.offset], data[self.offset+1]
- return ord(lb) + (ord(hb)<<8)
-
- class Bcd(DataField):
- """
- y = dez(High_Nibble(x))*10 + dez(Low_Nibble(x))
- """
- def value(self, data):
- byte = ord(data[self.offset])
- return (byte >> 4) * 10 + (byte & 0xf)
-
- class Byte(DataField):
- def value(self, data):
- return ord(data[self.offset])
-
-
- class ByteEnum(DataField):
- def __init__(self, name, offset, values):
- fields.DataField.__init__(self, name, offset)
- self.values = values
-
- def value(self, data):
- value = ord(data[self.offset])
- if self.values.has_key(value):
- return self.values[value]
- else:
- return None
-
-class EbusXMLMixin(object):
+class EbusPacket(object):
EBUS_SPECIFICATION = os.path.join(os.path.dirname(__file__), "ebus_specification.xml")
ebus_xml = objectify.parse(open(EBUS_SPECIFICATION))
@@ -113,83 +18,46 @@ class EbusXMLMixin(object):
address="#x%.2x"%address)]
return len(d)>0 and d[0] or None
- def _get_packet(self):
+ def __init__(self, source, destination, primary_command, secondary_command):
+ self.source = source
+ self.destination = destination
+ self.primary_command = primary_command
+ self.secondary_command = secondary_command
+
+ def __str__(self):
+ #XXX self.length only in subclasses
+ return "<%-18s name=\"%s\" source=\"0x%.2x\" destination=\"0x%.2x\" primary=0x%x secondary=0x%x length=0x%x %s>" % \
+ (self.__class__.__name__, self.name(), self.source, self.destination, \
+ self.primary_command, self.secondary_command, self.length, \
+ " ".join(map(lambda name: "%s=%s" % (name, self.values()[name]),self.values())))
+
+ def get_packet_description(self):
p=EbusXMLMixin.ebus_xml.xpath("/ebus/packets/packet[@primary=$primary and @secondary=$secondary]",
primary="#x%.2x"%self.primary_command,
secondary="#x%.2x"%self.secondary_command)
if len(p)>0 and p[0] is not None:
return p[0]
else:
- return None
+ raise "Unknown Packet, primary_command=#x%.2 secondary_command=#x%.2" % (
+ self.primary_command,
+ self.secondardy_command)
- def _get_source(self):
+ def _get_source_name(self):
s=EbusXMLMixin.ebus_xml.xpath("/ebus/devices/device[@address=$address]",
address="#x%.2x"%self.source)
return len(s)>0 and s[0] or None
- def _get_destination(self):
+ def _get_destination_name(self):
d=EbusXMLMixin.ebus_xml.xpath("/ebus/devices/device[@address=$address]",
address="#x%.2x"%self.destination)
return len(d)>0 and d[0] or None
def name(self):
- return self._get_packet().get("name")
+ return self.get_packet_description().get("name")
def description(self):
- return self._get_packet().get("description")
-
- def format(self):
- packet = self._get_packet()
- if packet is not None:
- for field in packet.fields.iterchildren():
- name = field.get("name")
- offset = field.get("offset")
- if not name:
- continue
- elif field.tag == "bit":
- yield fields.Bit( name, int(offset) )
- elif field.tag == "byte":
- yield fields.Byte( name, int(offset) )
- elif field.tag == "word":
- yield fields.Word( name, int(offset) )
- elif field.tag == "data1b":
- yield fields.Data1b( name, int(offset) )
- elif field.tag == "data1c":
- yield fields.Data1c( name, int(offset) )
- elif field.tag == "data2b":
- yield fields.Data2b( name, int(offset) )
- elif field.tag == "data2c":
- yield fields.Data2c( name, int(offset) )
- elif field.tag == "bcd":
- yield fields.Bcd( name, int(offset) )
- elif field.tag == "byteEnum":
- options = dict(map(lambda opt: ( int(opt.get("value")[2:],16), opt.get("name")),
- field.xpath("./option")))
- yield fields.ByteEnum( name, int(offset), options)
- else:
- print "error: %s in %s" % (name, self.name())
-
-
-def formatHex(data):
- return " ".join(map(lambda byte: "%.2x"%ord(byte), data))
-
-class EbusPacket(EbusXMLMixin):
- def __init__(self, source, destination, primary_command, secondary_command):
- self.source = source
- self.destination = destination
- self.primary_command = primary_command
- self.secondary_command = secondary_command
+ return self.get_packet_description().get("description")
- def __str__(self):
- #XXX self.length only in subclasses
- return "<%-18s name=\"%s\" source=\"0x%.2x\" destination=\"0x%.2x\" primary=0x%x secondary=0x%x length=0x%x %s>" % \
- (self.__class__.__name__, self.name(), self.source, self.destination, \
- self.primary_command, self.secondary_command, self.length, \
- " ".join(map(lambda name: "%s=%s" % (name, self.values()[name]),self.values())))
-
- def values(self):
- return dict( map(lambda format: (format.name, format.value(self.data) ),
- self.format() ))
class EbusMasterMaster(EbusPacket):
def __init__(self, source, destination, primary_command, secondary_command, data):
@@ -269,9 +137,15 @@ class EbusReader(asynchat.async_chat):
payload = data[5:5+payloadLength]
if self.debug:
- print "\033[1;31m%.2x %.2x\033[1;m \033[1;33m%.2x %.2x\033[1;m \033[1;30m%.2x\033[1;m \033[1;45m%s\033[1;m" % (source,destination,primaryCommand,secondaryCommand,payloadLength,formatHex(payload))
+ print "\033[1;31m%.2x %.2x\033[1;m \033[1;33m%.2x %.2x\033[1;m \033[1;30m%.2x\033[1;m \033[1;45m%s\033[1;m" % (
+ source,
+ destination,
+ primaryCommand,
+ secondaryCommand,
+ payloadLength,
+ " ".join(map(lambda byte: "%.2x"%ord(byte), payload)))
+
- p = None
if sourceType == 'master' and destinationType == 'master':
p = EbusMasterMaster(source, destination, primaryCommand, secondaryCommand, payload)
self.handle_ebus(p)
@@ -282,8 +156,7 @@ class EbusReader(asynchat.async_chat):
p = EbusBroadcast(source, destination, primaryCommand, secondaryCommand, payload)
self.handle_ebus(p)
else:
- print >>sys.stderr, "KOMISCHES ZEUG"
- print >>sys.stderr, "source=%s sourceType=%s destination=%s destType=%s" % \
+ print >>sys.stderr, "SKIP source=%s sourceType=%s destination=%s destType=%s" % \
(source, sourceType, destination, destinationType)
def handle_ebus(self,ebus_packet):