From 78908906a5ebedba5762cb6ac392bfed651c7c7c Mon Sep 17 00:00:00 2001 From: Yves Fischer Date: Sun, 4 Mar 2012 22:22:51 +0100 Subject: ebus-racket: inserter.rkt , overall improvements --- ebus-racket/db.rkt | 79 +++++++++++++++++++++++++++++++++---------- ebus-racket/dumper.rkt | 3 +- ebus-racket/inserter.rkt | 87 ++++++++++++++++++++++++++++++++++++++++++++++++ ebus-racket/layer2.rkt | 11 +++--- ebus-racket/layer7.rkt | 19 ++++++----- 5 files changed, 168 insertions(+), 31 deletions(-) create mode 100755 ebus-racket/inserter.rkt diff --git a/ebus-racket/db.rkt b/ebus-racket/db.rkt index 9ebdfa9..54de8f7 100644 --- a/ebus-racket/db.rkt +++ b/ebus-racket/db.rkt @@ -1,40 +1,85 @@ #lang racket/base (require (prefix-in db: (planet ryanc/db:1:5))) +(require (prefix-in db: (planet ryanc/db:1:5/util/connect))) + +(define logger (make-logger 'ebus-db (current-logger))) + +(define con-user? (make-parameter "ebus")) +(define con-password? (make-parameter "ebus")) +(define con-database? (make-parameter "ebus")) +(define con-server? (make-parameter "localhost")) (define pgc - (db:postgresql-connect #:user "ebus" - #:database "ebus" - #:password "ebus" - #:server "10.2.2.26")) + (db:virtual-connection + (lambda () + (log-message logger 'info (format "Connect to Database: user=~a database=~a server=~a" + (con-user?) (con-database?) (con-server?)) #t) + (db:postgresql-connect #:user (con-user?) + #:database (con-database?) + #:password (con-password?) + #:server (con-server?))))) + +;; Test Database Connection +;; Returns #t on success, #f otherwise +(define (pgc-test) + (with-handlers ([exn:fail? (lambda (exn) #f)]) + (= (db:query-value pgc "SELECT 1") 1))) ;; Query ID of sensor given by sensor-name ;; Returns null if sensor is undefined (define (get-sensor-id sensor-name) + (define sql-stmt "SELECT id FROM sensor WHERE name = $1") (with-handlers ([exn:fail? (lambda (exn) (void))]) - (db:query-value pgc "SELECT id FROM sensor WHERE name = $1" sensor-name))) + (db:query-value pgc sql-stmt sensor-name))) ;; Create Sensor-ID with given name ;; returns id (define (create-sensor-id sensor-name) - (db:query-exec "INSERT INTO sensor(name) VALUES ($1)" sensor-name) + (log-message logger 'info (format "create sensor id for ~a" sensor-name) #t) + (db:query-exec pgc "INSERT INTO sensor(name) VALUES ($1)" sensor-name) (get-sensor-id sensor-name)) ;; Get ID of sensor given by sensor-name ;; define sensor if needed -(define (sensor-id sensor-name) +(define (get-or-create-sensor-id sensor-name) (define id (get-sensor-id sensor-name)) - (cond ((void? id) (create-sensor-id sensor-id)) + (cond ((void? id) (create-sensor-id sensor-name)) (else id))) +;; Insert Field in Database +;; Decide Database-Datatype from Ebus-Datatype +;; then calls 'insert` +(define (insert-field sensor-name datatype offset value) + (cond ((member datatype (list "data1c" "data2b" "data2c")) + ;; float + (insert sensor-name value db:sql-null db:sql-null)) + ((member datatype (list "bit" "byte" "data1b" "word" "bcd")) + ;; int + (insert sensor-name db:sql-null value db:sql-null)) + ((member datatype (list "byteEnum")) + ;; string + (insert sensor-name db:sql-null db:sql-null value)) + (else (log-message logger 'error (format "Datatype ~a is not support by DB" datatype) #t)))) + (define (insert sensor-name value-float value-int value-string) - (define sensor-id (sensor-id sensor-name)) - (define type (cond ((not (void? value-string)) "string") - ((not (void? value-float)) "float") - ((not (void? value-int)) "int"))) - (db:query-exec (string-append "INSERT INTO value(timestamp, sensor_id, type, value_float, value_int, value_string) " - "VALUES (now(), $1, $2, $3, $4, $5)") - sensor-id type value-float value-int value-string)) + (define sensor-id (get-or-create-sensor-id sensor-name)) + (define type (cond ((not (db:sql-null? value-string)) "string") + ((not (db:sql-null? value-float)) "float") + ((not (db:sql-null? value-int)) "int"))) + (define sql-stmt + (string-append "INSERT INTO value(timestamp, sensor_id, type, value_float, value_int, value_string) " + "VALUES (now(), $1, $2, $3, $4, $5)")) + (log-message logger 'info (string-append sql-stmt "\n\t\t" + (format + "sensor-id=~a type=~a value-float=~a value-int=~a value-string=~a" + sensor-id type value-float value-int value-string)) #t) + (db:query-exec sql-stmt sensor-id type value-float value-int value-string)) (provide - (prefix-out db- insert)) - + (prefix-out db- logger) + (prefix-out db- con-user?) + (prefix-out db- con-password?) + (prefix-out db- con-database?) + (prefix-out db- con-server?) + (prefix-out db- pgc-test) + (prefix-out db- insert-field)) diff --git a/ebus-racket/dumper.rkt b/ebus-racket/dumper.rkt index 7ffd556..cc31a16 100755 --- a/ebus-racket/dumper.rkt +++ b/ebus-racket/dumper.rkt @@ -23,13 +23,14 @@ (connect-port? (string->number port)) ])) -; Connect +;; Connect, replacing input with tcp connection (if (or (null? (connect-host?)) (null? (connect-port?))) (display "Using stdin\n") (let-values ([(cin cout) (tcp-connect (connect-host?) (connect-port?))]) (display (format "Connected to ~s ~s ~n" (connect-host?) (connect-port?))) (current-input-port cin))) + (define (read-ebus-loop2 input-port) (let ([paket (layer2-read-ebus (current-input-port))]) (pretty-print paket) diff --git a/ebus-racket/inserter.rkt b/ebus-racket/inserter.rkt new file mode 100755 index 0000000..c340019 --- /dev/null +++ b/ebus-racket/inserter.rkt @@ -0,0 +1,87 @@ +#! /usr/bin/env racket +#lang racket/base +(require racket/cmdline + racket/tcp + racket/pretty + racket/match + "db.rkt" + "layer7.rkt" + "layer2.rkt") + +(define connect-host? (make-parameter null)) +(define connect-port? (make-parameter null)) +(define loglevel-layer2? (make-parameter 'info)) +(define loglevel-db? (make-parameter 'info)) +(define insert? (make-parameter #f)) + +(define (handle-fields fields) + (for ([field fields]) + (when (insert?) + (apply db-insert-field field)) + (when (not (insert?)) + (display (format "Field: ~a~n" field))))) + +(define (read-ebus-loop7 input-port) + (let ([fields (layer7-read-ebus (current-input-port))]) + (when (not (or (void? fields) (eof-object? fields))) + (handle-fields fields)) + (cond ((not (eof-object? fields)) (read-ebus-loop7 input-port))))) + +;; Start Thread that observe all given log-receivers +(define (start-logger-thread receiver1 . receiverN) + (define receivers (cons receiver1 receiverN)) + (void + (thread + (lambda () + (let loop () + (match (apply sync receivers) + [(vector level msg data) + (printf "[~s] ~a~n" level msg) + (flush-output)]) + (loop)))))) + +(define (main) + ;; Parse commandline + (command-line + #:once-each + [("-c" "--connect") host port "Connect to server " + (connect-host? host) + (connect-port? port)] + ["--debug-layer2" "Log level for Layer 2 Parser" + (loglevel-layer2? 'debug)] + ["--debug-db" "Log level for DB" + (loglevel-db? 'debug)] + ["--insert" "Do Insert into Database" + (insert? #t)] + ["--db-user" user "Datanase User" + (db-con-user? user)] + ["--db-password" password "Database password" + (db-con-password? password)] + ["--db-database" database "Database database-name" + (db-con-database? database)] + ["--db-server" server "Database Address/Server" + (db-con-server? server)]) + + ;; Init Logging + (start-logger-thread (make-log-receiver (current-logger) 'info) + (make-log-receiver db-logger (loglevel-db?)) + (make-log-receiver layer2-logger (loglevel-layer2?)) + (make-log-receiver layer7-logger 'info)) + + ;; Test Database Connection + (when (not (db-pgc-test)) + (display "Failed to connect database") + (exit 1)) + + ;; Connect, replacing input with tcp connection + (if (or (null? (connect-host?)) (null? (connect-port?))) + (log-message (current-logger) 'info "Using stdin" #t) + (let-values ([(cin cout) (tcp-connect (connect-host?) (connect-port?))]) + (display (format "Connected to ~s ~s ~n" (connect-host?) (connect-port?))) + (current-input-port cin))) + + ;; Process Ebus Packets + (read-ebus-loop7 (current-input-port)) + ) + +(exit (main)) diff --git a/ebus-racket/layer2.rkt b/ebus-racket/layer2.rkt index e0f9fbd..9ed2118 100644 --- a/ebus-racket/layer2.rkt +++ b/ebus-racket/layer2.rkt @@ -2,6 +2,7 @@ (require racket/bool (planet bzlib/parseq:1:3)) +(define logger (make-logger 'ebus-layer2 (current-logger))) ;; Ebus SYN (define ebus-const-syn #xaa) ;; Ebus Escape-Sequence Start @@ -83,16 +84,15 @@ (define syn ((make-reader ebus-sync #:sof? #f #:eof? #f) input-port)) (define paket ((make-reader parse-ebus-paket #:sof? #f #:eof? #f) input-port)) (cond ((not (false? syn)) - (display (format "drop ~s x SYN (~s) ~n" syn ebus-const-syn)))) + (log-message logger 'debug (format "drop ~s x SYN (~s)" syn ebus-const-syn) #t))) (cond ((not (false? paket)) paket) ((eof-object? (peek-byte input-port)) eof) (else - (let ([byte (read-byte input-port)]) - (display (format "drop ~s 0x~x ~n" byte byte))) ;; skip one byte - (read-byte input-port) + (let ([byte (read-byte input-port)]) + (log-message logger 'debug (format "drop ~s 0x~x ~n" byte byte) #t)) (read-ebus input-port)))) (provide @@ -101,4 +101,5 @@ (prefix-out layer2- (struct-out ebus-paket)) (prefix-out layer2- (struct-out ebus-body-broadcast)) (prefix-out layer2- (struct-out ebus-body-mastermaster)) - (prefix-out layer2- (struct-out ebus-body-masterslave))) + (prefix-out layer2- (struct-out ebus-body-masterslave)) + (prefix-out layer2- logger)) \ No newline at end of file diff --git a/ebus-racket/layer7.rkt b/ebus-racket/layer7.rkt index 95739e6..eb37e91 100644 --- a/ebus-racket/layer7.rkt +++ b/ebus-racket/layer7.rkt @@ -4,11 +4,12 @@ (planet lizorkin/ssax:2:0/ssax) "layer2.rkt") +(define logger (make-logger 'ebus-layer7 (current-logger))) + (define definition (ssax:xml->sxml (open-input-file "../ebus-xml/ebus.xml") '[(#f . "http://xapek.org/ebus/0.1")])) - (define (paket ebus-paket) (define primaryCommand (layer2-ebus-paket-primaryCommand ebus-paket)) (define secondaryCommand (layer2-ebus-paket-secondaryCommand ebus-paket)) @@ -23,22 +24,24 @@ ;; returns device-name in a list or empty-list (define (device-name address) - ((sxpath "@name/text()") (device address))) + (first ((sxpath "@name/text()") (device address)))) (define (paket-fields ebus-paket) (define paket-definition (paket ebus-paket)) (cond ((> (length paket-definition) 0) (let* - ([paket-name (first ((sxpath "@name/text()") paket-definition))] + ([paket-name (string-append + (device-name (layer2-ebus-paket-source ebus-paket)) + "." + (first ((sxpath "@name/text()") paket-definition)))] [fields ((sxpath "fields/*") paket-definition)] [values (for/list ([field fields]) (paket-fields-dispatch-decoder ebus-paket field paket-name))]) ;; filter invalid values (for/list ([value values] #:when (not (void? value))) value))) - (else (display (format "Unknown Paket: ~s~n" ebus-paket)) - (void)))) + (else (void (log-message logger 'warning (format "Unknown Paket: ~s" ebus-paket) #t))))) (define (paket-fields-dispatch-decoder ebus-paket field paket-name) (define datatype ((sxpath "name()") field)) @@ -74,8 +77,7 @@ ((string=? "byteEnum" datatype) (list name datatype offset (field-decoder-byteEnum (list-ref payload offset) field))) - (else (display (string-append "unknown datatype: " datatype "\n")) - (void)))) + (else (void (log-message logger 'error (format "unknown datatype: ~a" datatype) #t))))) ;; type bit (define (field-decoder-bit value) @@ -150,4 +152,5 @@ (prefix-out layer7- device) (prefix-out layer7- device-name) ;; read ebus from port an return fields from next paket - (prefix-out layer7- read-ebus)) \ No newline at end of file + (prefix-out layer7- read-ebus) + (prefix-out layer7- logger)) \ No newline at end of file -- cgit v1.2.1