summaryrefslogtreecommitdiff
path: root/ebustest.py
diff options
context:
space:
mode:
Diffstat (limited to 'ebustest.py')
-rw-r--r--ebustest.py259
1 files changed, 0 insertions, 259 deletions
diff --git a/ebustest.py b/ebustest.py
deleted file mode 100644
index 30bce73..0000000
--- a/ebustest.py
+++ /dev/null
@@ -1,259 +0,0 @@
-# -*- coding:utf8 -*-
-import asynchat
-import asyncore
-import socket
-import sys
-
-deviceDescription = [
- {'address':0x03, 'type':'master', 'description':'Feuerungsautomat'},
- {'address':0x10, 'type':'master', 'description':'Heizungsregler #2'},
- {'address':0x30, 'type':'master', 'description':'Heizkreisregler 1'},
- {'address':0x70, 'type':'master', 'description':'Heizkreisregler 2'},
- {'address':0x71, 'type':'master', 'description':'Heizungsregler #9'},
- {'address':0xf1, 'type':'master', 'description':'Heizungsregler #10'},
- {'address':0x50, 'type':'slave', 'description':'Mischer 1'},
- {'address':0x51, 'type':'slave', 'description':'Mischer 2'},
- {'address':0x90, 'type':'slave', 'description':'Raumgeräte/Fernsteller 1'},
- {'address':0x91, 'type':'slave', 'description':'Raumgeräte/Fernsteller 2'},
- {'address':0xfe, 'type':'broadcast', 'description':'Broadcast'},
-]
-
-"""ebus specific data field formats"""
-class fields:
- class DataField(object):
- def __init__(self, offset):
- self.offset = offset
- def value(self,data):
- raise NotImplemented()
-
- class Data2c(DataField):
- """
- Beispiel für die Berechnung:
- if ((x & 8000h) == 8000h) // y negativ
- y = - [dez(High_Byte(!x)) 16 + dez(High_Nibble (Low_Byte (!x)))
- + (dez(Low_Nibble (Low_Byte (!x))) +1 ) / 16]
- else // y positiv
- y = dez(High_Byte(x)) 16 + dez(High_ Nibble (Low Byte (x)))
- + dez(Low_ Nibble (Low Byte (x))) / 16
- """
- def value(self,data):
- highByte = ord(data[self.offset+1])
- lowByte= ord(data[self.offset])
- if (0x8000 & (highByte<<8 | lowByte)) == 0x8000:
- return (-1) * ( (0xff^highByte)*16 + (0xf^(lowByte>>4)) + (0x0f^(0x0&flowByte) + 1)/16.0 )
- else:
- return highByte*16 + (lowByte>>4) + (lowByte&0xf)/16.0
-
- class Data2b(DataField):
- """
- if ((x&8000h) == 8000h) // y negativ
- y = - [dez(High_Byte(!x)) + (dez(Low_Byte(!x)) + 1) / 256]
- else // y positiv
- y = dez(High_Byte (x)) + dez(Low_Byte (x)) / 256
- """
- def value(self,data):
- highByte = ord(data[self.offset+1])
- lowByte= ord(data[self.offset])
- if (0x8000 & (highByte<<8 | lowByte)) == 0x8000:
- return (-1) * ((0xff^highByte) + (0xff^lowByte+1)/256.0)
- else:
- return highByte + lowByte/256.0
-
- class Bit(DataField):
- def value(self, data):
- return ord(data[self.offset]) == 0x1
-
- class ByteEnum(DataField):
- def __init__(self, offset, values):
- self.values = values
- fields.DataField.__init__(self, offset)
- def value(self, data):
- value = ord(data[self.offset])
- if self.values.has_key(value):
- return self.values[value]
- else:
- return None
-
-packetDescription = [
- # Service 0x05 (Brennersteuerbefehle)
- {'primary':0x5, 'secondary':0x3, 'name':'Betriebsdaten des Feuerungsautomaten an den Regler Block1'},
- {'primary':0x5, 'secondary':0x7, 'name':'Betriebsdaten des Reglers an den Feuerungsautomaten','format': {
- 'betriebszustand':fields.ByteEnum(0, {
- 0x00:'Brenner Abschalten',
- 0x01:'Keine Aktion',
- 0x55:'Brauchwassbereitung',
- 0xaa:'Heizbetrieb',
- 0xcc:'Emmissionskontroll',
- 0xdd:'TÜV-Funktion',
- 0xee:'Regler Stopp',
- 0x66:'Brauchwasserbereitung bei Reglerstoppfunktion',
- 0xbb:'Brauchwasserbereitung bei Heizbetrieb',
- 0x44:'Reglerstoppfunktion bei stufigem Betrieb'}),
- 'betriebszustand2':fields.ByteEnum(1, {
- 0x00:'Keine Aktion',
- 0x01:'Ausschalten Kesselpumpe',
- 0x02:'Einschalten Kesselpumpe',
- 0x03:'Ausschalten variable Verbraucher',
- 0x04:'Einschalten variabler Verbraucher'}),
- 'kesselSollwertTemperatur':fields.Data2c(2),
- 'kesselSollwertDruck':fields.Data2b(4),
- }},
- # Service 0x07 (Systemdatenbefehle)
- {'primary':0x7, 'secondary':0x0, 'name':'Datum/Zeit - Meldung eines eBUS Masters'},
- {'primary':0x7, 'secondary':0x4, 'name':'Identifikation'},
- # Service 0x08 (Reglerbefehle)
- {'primary':0x8, 'secondary':0x0, 'name':'Sollwertübertragung des Reglers an andere Regler'},
-
- # Response
- #p[0] = Einheit (1=>Liter, 2=>Kubik)
- #p[1] = 10^0
- #p[2] = 10^2
- #p[3] = 10^4
- #p[4] = 10^6
- {'primary':0x3, 'secondary':0x8, 'name':'Gesamtbrennstoffmengenzähle lesen'},
- {'primary':0x50, 'secondary':0x17, 'name':'Solar Daten', 'format':{
- 'solarPumpe':fields.Bit(0),
- 'tempKollektor':fields.Data2c(2),
- 'tempWarmwasserSolar':fields.Data2c(4)}},
-]
-
-def formatHex(data):
- return " ".join(map(lambda byte: "%.2x"%ord(byte), data))
-
-def getDsc(address):
- dev=filter(lambda dev: dev['address'] == address, deviceDescription)
- if len(dev)>0:
- return dev[0]['description']
- else:
- return None
-
-class EbusPacket(object):
- def __init__(self, source, destination, primary_command, secondary_command):
- self.source = source
- self.destination = destination
- self.primary_command = primary_command
- self.secondary_command = secondary_command
-
- def name(self):
- if self.description():
- return self.description()['name']
-
- def __str__(self):
- return "<%-18s name=\"%-15s\" source=\"%s\" destination=\"%s\" primary=0x%x secondary=0x%x length=0x%x %s>" % \
- (self.__class__.__name__, self.name(), getDsc(self.source), getDsc(self.destination), \
- self.primary_command, self.secondary_command, self.length, \
- " ".join(map(lambda name: "%s=%s" % (name, self.values()[name]),self.values())))
-
- def description(self):
- desc = filter(lambda p: p['primary'] == self.primary_command \
- and p['secondary'] == self.secondary_command, packetDescription)
- if len(desc) > 0:
- return desc[0]
- else:
- return None
-
- def values(self):
- desc = self.description()
- if not desc or not desc.has_key('format'):
- return dict()
- return dict( map(lambda name: (name, desc['format'][name].value(self.data) ), desc['format'].keys()) )
-
-class EbusMasterMaster(EbusPacket):
- def __init__(self, source, destination, primary_command, secondary_command, data):
- EbusPacket.__init__(self, source, destination, primary_command, secondary_command)
- self.length = len(data)
- self.data = data
-
-class EbusMasterSlave(EbusPacket):
- def __init__(self, source, destination, primary_command, secondary_command, request, response):
- EbusPacket.__init__(self, source, destination, primary_command, secondary_command)
- self.length = len(request)
- self.request = request
- self.response = response
-
-class EbusBroadcast(EbusPacket):
- def __init__(self, source, destination, primary_command, secondary_command, data):
- EbusPacket.__init__(self, source, destination, primary_command, secondary_command)
- self.length = len(data)
- self.data = data
-
-
-class EbusReader(asynchat.async_chat):
- def __init__(self):
- self.buffer = ""
-
- asynchat.async_chat.__init__(self)
- self.set_terminator("")
- self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
- self.connect(("10.2.2.200", 7970))
-
- self.packetIndex = 0
- self.buf = ""
-
- def collect_incoming_data(self,data):
- for it in range(len(data)):
- if data[it] == "\xaa":
- if it+1 < len(data) and data[it+1] != "\xaa":
- self._parse(self.buf)
- self.buf = ""
- self.packetIndex = 0
- else:
- self.buf += data[it]
- self.packetIndex = self.packetIndex + 1
- #print "%.2x [%d]" % (ord(data[it]),self.packetIndex)
-
- def _parse(self,data):
- if len(data) < 2:
- print "GAGA"
- return
- source = ord(data[0])
- destination = ord(data[1])
-
- sourceDevice = filter(lambda dev: dev['address'] == source, deviceDescription)
- destinationDevice = filter(lambda dev: dev['address'] == destination, deviceDescription)
- if len(sourceDevice) == 0 or len(destinationDevice) == 0:
- print "Unbekanntes Paket: source=%x destination=%x" % (source, destination)
- return
-
- if len(data) < 9:
- print "Unvollständige Daten"
- return
-
- primaryCommand = ord(data[2])
- secondaryCommand = ord(data[3])
- payloadLength = ord(data[4])
- payload = data[5:6+payloadLength]
- #delete 0x50 packets for devel
- #if (primaryCommand == ord("\x50")):
-# return
- #print "PR SC NN D0 D1 D2 D3 D4 D5 D6 D7 ..."
- #print "%.2x" % (primaryCommand,)
- #print "%.2x %.2x %.2x %s" % (primaryCommand,secondaryCommand,payloadLength,"bla")
- #print "%.2x %.2x %.2x %s" % (primaryCommand,secondaryCommand,payloadLength,formatHex(payload))
-
- p = None
- if sourceDevice[0]['type'] == 'master' and destinationDevice[0]['type'] == 'master':
- p = EbusMasterMaster(source, destination, primaryCommand, secondaryCommand, payload)
- self.handle_ebus(p)
- elif sourceDevice[0]['type'] == 'master' and destinationDevice[0]['type'] == 'slave':
- p = EbusMasterSlave(source, destination, primaryCommand, secondaryCommand, payload, None) #FIXME
- self.handle_ebus(p)
- elif sourceDevice[0]['type'] == 'master' and destinationDevice[0]['type'] == 'broadcast':
- p = EbusBroadcast(source, destination, primaryCommand, secondaryCommand, payload)
- self.handle_ebus(p)
- else:
- print "KOMISCHES ZEUG"
- return
-
- def handle_ebus(self,ebus_packet):
- print "unhandled ebus_packet"
-
-
-
-class MyEbusReader(EbusReader):
- def handle_ebus(self,ebus_packet):
- if ebus_packet.values() != dict():
- print ebus_packet
-
-MyEbusReader()
-asyncore.loop()