diff options
author | Yves <yvesf-git@xapek.org> | 2010-08-15 17:08:36 +0200 |
---|---|---|
committer | Yves <yvesf-git@xapek.org> | 2010-08-15 17:08:36 +0200 |
commit | 3e70cb0892a703888e913ee5a1545f488a22a4a3 (patch) | |
tree | bc9bb09a016c5ac46f5427255d24becb2d8242df /ebus/__init__.py | |
parent | d17fc7f9184312cf9af7fcee18cd4d71849db242 (diff) | |
download | ebus-alt-3e70cb0892a703888e913ee5a1545f488a22a4a3.tar.gz ebus-alt-3e70cb0892a703888e913ee5a1545f488a22a4a3.zip |
foo
Diffstat (limited to 'ebus/__init__.py')
-rw-r--r-- | ebus/__init__.py | 189 |
1 files changed, 31 insertions, 158 deletions
diff --git a/ebus/__init__.py b/ebus/__init__.py index 23e6595..832e9ff 100644 --- a/ebus/__init__.py +++ b/ebus/__init__.py @@ -8,102 +8,7 @@ import os from lxml import objectify from lxml import etree -"""ebus specific data field formats""" -class fields: - class DataField(object): - def __init__(self, name, offset): - self.name = name - self.offset = offset - def value(self,data): - raise NotImplemented() - - class Data1b(DataField): - """ - Beispiel für die Berechnung: - if ((x & 80h) == 80h) // y negativ - y = - [dez(!x) + 1] - else - y = dez(x) - """ - def value(self,data): - x=ord(data[self.offset]) - if x & 0x80 == 0x80: - return (-1) * (0xff^x + 1) - else: - return x - - class Data1c(DataField): - def value(self,data): - return ord(data[self.offset])/2.0 - - class Data2c(DataField): - """ - Beispiel für die Berechnung: - if ((x & 8000h) == 8000h) // y negativ - y = - [dez(High_Byte(!x)) 16 + dez(High_Nibble (Low_Byte (!x))) - + (dez(Low_Nibble (Low_Byte (!x))) +1 ) / 16] - else // y positiv - y = dez(High_Byte(x)) 16 + dez(High_ Nibble (Low Byte (x))) - + dez(Low_ Nibble (Low Byte (x))) / 16 - """ - def value(self,data): - highByte = ord(data[self.offset+1]) - lowByte= ord(data[self.offset]) - if (0x8000 & (highByte<<8 | lowByte)) == 0x8000: - return (-1) * ( (0xff^highByte)*16 + (0xff^(lowByte>>4)) + (0x0ff^(0xf&lowByte) + 1)/16.0 ) - else: - return highByte*16 + (lowByte>>4) + (lowByte&0xf)/16.0 - - class Data2b(DataField): - """ - if ((x&8000h) == 8000h) // y negativ - y = - [dez(High_Byte(!x)) + (dez(Low_Byte(!x)) + 1) / 256] - else // y positiv - y = dez(High_Byte (x)) + dez(Low_Byte (x)) / 256 - """ - def value(self,data): - highByte = ord(data[self.offset+1]) - lowByte= ord(data[self.offset]) - if (0x8000 & (highByte<<8 | lowByte)) == 0x8000: - return (-1) * ((0xff^highByte) + (0xff^lowByte+1)/256.0) - else: - return highByte + lowByte/256.0 - - class Bit(DataField): - def value(self, data): - return ord(data[self.offset]) == 0x1 - - class Word(DataField): - def value(self, data): - lb, hb = data[self.offset], data[self.offset+1] - return ord(lb) + (ord(hb)<<8) - - class Bcd(DataField): - """ - y = dez(High_Nibble(x))*10 + dez(Low_Nibble(x)) - """ - def value(self, data): - byte = ord(data[self.offset]) - return (byte >> 4) * 10 + (byte & 0xf) - - class Byte(DataField): - def value(self, data): - return ord(data[self.offset]) - - - class ByteEnum(DataField): - def __init__(self, name, offset, values): - fields.DataField.__init__(self, name, offset) - self.values = values - - def value(self, data): - value = ord(data[self.offset]) - if self.values.has_key(value): - return self.values[value] - else: - return None - -class EbusXMLMixin(object): +class EbusPacket(object): EBUS_SPECIFICATION = os.path.join(os.path.dirname(__file__), "ebus_specification.xml") ebus_xml = objectify.parse(open(EBUS_SPECIFICATION)) @@ -113,83 +18,46 @@ class EbusXMLMixin(object): address="#x%.2x"%address)] return len(d)>0 and d[0] or None - def _get_packet(self): + def __init__(self, source, destination, primary_command, secondary_command): + self.source = source + self.destination = destination + self.primary_command = primary_command + self.secondary_command = secondary_command + + def __str__(self): + #XXX self.length only in subclasses + return "<%-18s name=\"%s\" source=\"0x%.2x\" destination=\"0x%.2x\" primary=0x%x secondary=0x%x length=0x%x %s>" % \ + (self.__class__.__name__, self.name(), self.source, self.destination, \ + self.primary_command, self.secondary_command, self.length, \ + " ".join(map(lambda name: "%s=%s" % (name, self.values()[name]),self.values()))) + + def get_packet_description(self): p=EbusXMLMixin.ebus_xml.xpath("/ebus/packets/packet[@primary=$primary and @secondary=$secondary]", primary="#x%.2x"%self.primary_command, secondary="#x%.2x"%self.secondary_command) if len(p)>0 and p[0] is not None: return p[0] else: - return None + raise "Unknown Packet, primary_command=#x%.2 secondary_command=#x%.2" % ( + self.primary_command, + self.secondardy_command) - def _get_source(self): + def _get_source_name(self): s=EbusXMLMixin.ebus_xml.xpath("/ebus/devices/device[@address=$address]", address="#x%.2x"%self.source) return len(s)>0 and s[0] or None - def _get_destination(self): + def _get_destination_name(self): d=EbusXMLMixin.ebus_xml.xpath("/ebus/devices/device[@address=$address]", address="#x%.2x"%self.destination) return len(d)>0 and d[0] or None def name(self): - return self._get_packet().get("name") + return self.get_packet_description().get("name") def description(self): - return self._get_packet().get("description") - - def format(self): - packet = self._get_packet() - if packet is not None: - for field in packet.fields.iterchildren(): - name = field.get("name") - offset = field.get("offset") - if not name: - continue - elif field.tag == "bit": - yield fields.Bit( name, int(offset) ) - elif field.tag == "byte": - yield fields.Byte( name, int(offset) ) - elif field.tag == "word": - yield fields.Word( name, int(offset) ) - elif field.tag == "data1b": - yield fields.Data1b( name, int(offset) ) - elif field.tag == "data1c": - yield fields.Data1c( name, int(offset) ) - elif field.tag == "data2b": - yield fields.Data2b( name, int(offset) ) - elif field.tag == "data2c": - yield fields.Data2c( name, int(offset) ) - elif field.tag == "bcd": - yield fields.Bcd( name, int(offset) ) - elif field.tag == "byteEnum": - options = dict(map(lambda opt: ( int(opt.get("value")[2:],16), opt.get("name")), - field.xpath("./option"))) - yield fields.ByteEnum( name, int(offset), options) - else: - print "error: %s in %s" % (name, self.name()) - - -def formatHex(data): - return " ".join(map(lambda byte: "%.2x"%ord(byte), data)) - -class EbusPacket(EbusXMLMixin): - def __init__(self, source, destination, primary_command, secondary_command): - self.source = source - self.destination = destination - self.primary_command = primary_command - self.secondary_command = secondary_command + return self.get_packet_description().get("description") - def __str__(self): - #XXX self.length only in subclasses - return "<%-18s name=\"%s\" source=\"0x%.2x\" destination=\"0x%.2x\" primary=0x%x secondary=0x%x length=0x%x %s>" % \ - (self.__class__.__name__, self.name(), self.source, self.destination, \ - self.primary_command, self.secondary_command, self.length, \ - " ".join(map(lambda name: "%s=%s" % (name, self.values()[name]),self.values()))) - - def values(self): - return dict( map(lambda format: (format.name, format.value(self.data) ), - self.format() )) class EbusMasterMaster(EbusPacket): def __init__(self, source, destination, primary_command, secondary_command, data): @@ -269,9 +137,15 @@ class EbusReader(asynchat.async_chat): payload = data[5:5+payloadLength] if self.debug: - print "\033[1;31m%.2x %.2x\033[1;m \033[1;33m%.2x %.2x\033[1;m \033[1;30m%.2x\033[1;m \033[1;45m%s\033[1;m" % (source,destination,primaryCommand,secondaryCommand,payloadLength,formatHex(payload)) + print "\033[1;31m%.2x %.2x\033[1;m \033[1;33m%.2x %.2x\033[1;m \033[1;30m%.2x\033[1;m \033[1;45m%s\033[1;m" % ( + source, + destination, + primaryCommand, + secondaryCommand, + payloadLength, + " ".join(map(lambda byte: "%.2x"%ord(byte), payload))) + - p = None if sourceType == 'master' and destinationType == 'master': p = EbusMasterMaster(source, destination, primaryCommand, secondaryCommand, payload) self.handle_ebus(p) @@ -282,8 +156,7 @@ class EbusReader(asynchat.async_chat): p = EbusBroadcast(source, destination, primaryCommand, secondaryCommand, payload) self.handle_ebus(p) else: - print >>sys.stderr, "KOMISCHES ZEUG" - print >>sys.stderr, "source=%s sourceType=%s destination=%s destType=%s" % \ + print >>sys.stderr, "SKIP source=%s sourceType=%s destination=%s destType=%s" % \ (source, sourceType, destination, destinationType) def handle_ebus(self,ebus_packet): |