diff options
Diffstat (limited to 'ebus-rust/src/main.rs')
-rw-r--r-- | ebus-rust/src/main.rs | 403 |
1 files changed, 403 insertions, 0 deletions
diff --git a/ebus-rust/src/main.rs b/ebus-rust/src/main.rs new file mode 100644 index 0000000..9046ce6 --- /dev/null +++ b/ebus-rust/src/main.rs @@ -0,0 +1,403 @@ +extern crate bytes; +extern crate env_logger; +extern crate log; +extern crate nom; +extern crate quick_xml; +extern crate serde; +extern crate structopt; + +mod layer2; +mod layer7; +mod model; + +use bytes::{BufMut, BytesMut}; +use std::fs::File; +use std::io; +use std::io::Read; +use std::path::Path; +use std::thread; +use std::time; +use structopt::StructOpt; + +#[derive(Debug, StructOpt)] +struct Opts { + #[structopt( + short, + long, + default_value = "", + help = "if not set internal copy will be used" + )] + config: String, + #[structopt(skip)] + ebus_xml: Vec<u8>, + #[structopt(short, long, help = "Use the adapter.ebusd.eu enhanced protocol")] + enhanced: bool, + #[structopt(subcommand)] + subcmd: Command, +} + +#[derive(Debug, StructOpt)] +enum Command { + Dump(Dump), + DumpL2(DumpL2), + ParseL2(ParseL2), + ParseL7(ParseL7), + Influxdb(Influxdb), +} + +impl Command { + fn run(&self, opts: &Opts) { + match self { + Command::Dump(..) => dump(opts), + Command::DumpL2(..) => dump_l2(opts), + Command::ParseL2(..) => parse_l2(opts, &|p| println!("{:?}", p)), + Command::ParseL7(..) => parse_l7(opts, &|_, _, _, _| ()), + Command::Influxdb(cfg) => influxdb(opts, cfg), + } + } +} + +#[derive(Debug, StructOpt)] +#[structopt(about = "Dump the configuration from XML to stdout")] +struct Dump {} + +#[derive(Debug, StructOpt)] +#[structopt(about = "Dump raw data")] +struct DumpL2 {} + +#[derive(Debug, StructOpt)] +#[structopt(about = "Parse and dump L2 from stdin")] +struct ParseL2 {} + +#[derive(Debug, StructOpt)] +#[structopt(about = "Parse and dump L7 from stdin")] +struct ParseL7 {} + +#[derive(Debug, StructOpt)] +#[structopt(about = "Parse from stdin and write to influxdb")] +struct Influxdb { + #[structopt(short, long, default_value = "http://localhost:8086")] + url: String, + #[structopt(short, long, default_value = "ebus")] + db: String, + #[structopt(short, long, default_value = "ebus")] + measurement: String, +} + +fn main() { + env_logger::init(); + let mut opts: Opts = Opts::from_args(); + + log::info!("Enhanced protocol is status: {}", opts.enhanced); + + if opts.config.is_empty() { + opts.ebus_xml = Vec::from(*include_bytes!("../../ebus-xml/ebus.xml")); + } else { + let path = Path::new(&opts.config); + let mut file = File::open(&path) + .map_err(|e| format!("could not open file: {}", e)) + .unwrap(); + let mut xml = String::new(); + file.read_to_string(&mut xml) + .map_err(|e| format!("could not read: {}", e)) + .unwrap(); + opts.ebus_xml = Vec::from(xml.as_bytes()); + } + + opts.subcmd.run(&opts); +} + +fn dump(opts: &Opts) { + let conf = model::read_config(&opts.ebus_xml).unwrap(); + for dev in conf.devices.devices { + println!("device: address={} name={}", dev.address, dev.name); + for d in dev.descriptions { + println!(" {}: {}", d.lang, d.text) + } + } + for p in conf.packets.packets { + println!( + "packet: primary={:02} secondary={:02} name={}", + p.primary, p.secondary, p.name + ); + for d in p.descriptions { + println!(" {}: {}", d.lang, d.text); + } + for f in p.fields.fields.unwrap() { + match f { + model::PacketField::Byte { offset, name } => { + println!(" [{:02}] Byte: {}", offset, name); + } + model::PacketField::Data1b { offset, name } => { + println!(" [{:02}] Data1b: {}", offset, name) + } + model::PacketField::Data1c { offset, name } => { + println!(" [{:02}] Data1c: {}", offset, name) + } + model::PacketField::Data2b { offset, name } => { + println!(" [{:02}] Data2b: {}", offset, name) + } + model::PacketField::Data2c { offset, name } => { + println!(" [{:02}] Data1c: {}", offset, name) + } + model::PacketField::ByteEnum { + offset, + name, + options, + } => { + println!(" [{:02}] ByteEnum: {}", offset, name); + for o in options.0 { + println!(" [{:02}] {}", o.value, o.name); + } + } + model::PacketField::Bcd { offset, name } => { + println!(" [{:02}] Bcd: {}", offset, name) + } + model::PacketField::Bit { offset, name } => { + println!(" [{:02}] Bit: {}", offset, name) + } + model::PacketField::Word { offset, name } => { + println!(" [{:02}] Word: {}", offset, name) + } + model::PacketField::String { + offset, + name, + length, + } => { + println!(" [{:02}] String: {} / {}", offset, name, length) + } + } + } + } +} + +fn dump_l2(opts: &Opts) { + let mut buf = BytesMut::with_capacity(1024); + let mut other: u8 = 0; + let mut sync = false; + + for b in io::stdin().bytes() { + let mut b = b.unwrap(); + if opts.enhanced && b >= 0x80 { + if other == 0 { + if b & 0xc0 == 0xc0 { + other = b; + } + continue; + } else { + // byte-1 byte-2 + // 76543210 76543210 + // 11ccccdd 10dddddd + // c: command (0x01 = receive) + // d: data (here b) + b = (other & 0x03) << 6 | b & 0x3f; + other = 0; + } + } + if !sync { + if b == 0xaa { + sync = true; + } + continue; + } + buf.put_u8(b); + + if b == 0xaa { + if buf.len() > 1 { + println!( + "{:#02x?}", + buf.iter() + .map(|v| format!("0x{:02x}", v)) + .collect::<Vec<String>>() + .join(", ") + ); + } + buf.clear() + } + } +} + +fn parse_l2(opts: &Opts, cb: &dyn Fn(layer2::Packet)) { + let mut buf = BytesMut::with_capacity(1024); + let mut other: u8 = 0; + let mut sync = false; + for b in io::stdin().bytes() { + let mut b = b.unwrap(); + if opts.enhanced && b >= 0x80 { + if other == 0 { + if b & 0xc0 == 0xc0 { + other = b; + } + continue; + } else { + // byte-1 byte-2 + // 76543210 76543210 + // 11ccccdd 10dddddd + // c: command (0x01 = receive) + // d: data (here b) + b = (other & 0x03) << 6 | b & 0x3f; + other = 0; + } + } + if !sync { + if b == 0xaa { + sync = true; + } + continue; + } + if b == 0xaa { + if buf.len() > 1 { + buf.put_u8(0xaa); + let l2p = layer2::parse(&buf); + match l2p.ok() { + Some(p) => { + if p.calc_crc() == p.crc { + log::debug!( + "{:#02x?} from {:#02x?}", + p, + buf.iter() + .map(|v| format!("0x{:02x}", v)) + .collect::<Vec<String>>() + .join(", ") + ); + } else { + log::info!( + "CRC-Fail: [{}] should be crc={:#02x}", + buf.iter() + .map(|v| format!("0x{:02x}", v)) + .collect::<Vec<String>>() + .join(", "), + p.calc_crc() + ); + } + cb(p); + } + None => { + if buf.len() > 2 { + log::info!( + "Discard: [{}]", + buf.iter() + .map(|v| format!("0x{:02x}", v)) + .collect::<Vec<String>>() + .join(", ") + ); + } + } + } + buf.clear(); + } + continue; + } + buf.put_u8(b); + } +} + +fn parse_l7(opts: &Opts, cb: &dyn Fn(String, String, String, &Vec<layer7::DecodedField>)) { + let conf = model::read_config(&opts.ebus_xml).unwrap(); + + parse_l2(opts, &|p| { + let pack = conf.packets.get(p.primary, p.secondary); + if pack.is_none() { + log::info!("No definition: {:?}", p); + return; + } + let pack = pack.unwrap(); + let values = pack + .fields + .fields + .as_ref() + .map(|fields| layer7::decode_fields(&p, fields)); + + let source = conf + .devices + .devices + .iter() + .find(|d| d.address == p.source) + .map(|d| d.name.clone()) + .unwrap_or_else(|| format!("{:#02x}", p.source)); + let destination = conf + .devices + .devices + .iter() + .find(|d| d.address == p.destination) + .map(|d| d.name.clone()) + .unwrap_or_else(|| format!("{:#02x}", p.destination)); + + log::info!( + "{} -> {}: {} {}", + source, + destination, + pack.name, + values + .as_ref() + .unwrap_or(&Vec::new()) + .iter() + .map(|field| format!( + "{}={}", + field.name.clone(), + field + .v + .as_ref() + .map(|v2| v2.convert()) + .unwrap_or_else(|| String::from("<>")) + )) + .collect::<Vec<String>>() + .join(", ") + ); + if let Some(values) = values { + cb(source, destination, pack.name.clone(), &values); + } + }) +} + +impl From<&layer7::types::Value> for rinfluxdb::line_protocol::FieldValue { + fn from(v: &layer7::types::Value) -> Self { + match v { + layer7::types::Value::Bool(v) => rinfluxdb::line_protocol::FieldValue::from(*v), + layer7::types::Value::I8(v) => rinfluxdb::line_protocol::FieldValue::from(*v as i64), + layer7::types::Value::U8(v) => rinfluxdb::line_protocol::FieldValue::from(*v as u64), + layer7::types::Value::U16(v) => rinfluxdb::line_protocol::FieldValue::from(*v as u64), + layer7::types::Value::F32(v) => rinfluxdb::line_protocol::FieldValue::from(*v as f64), + layer7::types::Value::String(v) => { + rinfluxdb::line_protocol::FieldValue::from(v.clone()) + } + } + } +} + +fn influxdb(opts: &Opts, cfg: &Influxdb) { + use rinfluxdb::line_protocol; + use rinfluxdb::line_protocol::blocking::Client; + use rinfluxdb::line_protocol::LineBuilder; + use url::Url; + + let url = Url::parse(&cfg.url).unwrap(); + let client = Client::new(url, Some(("", ""))).unwrap(); + + parse_l7(opts, &|source, destination, name, values| { + let lines = values + .iter() + .filter(|df| df.v.is_some()) // filter ersatzwert values + .map(|df| { + let l = LineBuilder::new(cfg.measurement.clone()) + .insert_tag("source", source.clone()) + .insert_tag("destination", destination.clone()) + .insert_field( + format!("{}.{}", name.clone(), df.name.clone()), + df.v.as_ref().unwrap(), + ) + .build(); + log::debug!("{:?}", l); + l + }) + .collect::<Vec<line_protocol::Line>>(); + + match client.send(&cfg.db, &lines) { + Ok(_) => (), + Err(err) => { + log::error!("Sleep 1s after error sending values to influxdb: {}", err); + thread::sleep(time::Duration::from_millis(1000)); + } + }; + }); +} |