diff options
Diffstat (limited to 'ebus-racket')
-rw-r--r-- | ebus-racket/ebus/db.rkt | 88 | ||||
-rw-r--r-- | ebus-racket/ebus/layer2.rkt | 4 | ||||
-rw-r--r-- | ebus-racket/ebus/layer7.rkt | 19 | ||||
-rwxr-xr-x | ebus-racket/inserter.rkt | 100 | ||||
-rw-r--r-- | ebus-racket/util/json.rkt | 198 |
5 files changed, 253 insertions, 156 deletions
diff --git a/ebus-racket/ebus/db.rkt b/ebus-racket/ebus/db.rkt deleted file mode 100644 index 2c6be6c..0000000 --- a/ebus-racket/ebus/db.rkt +++ /dev/null @@ -1,88 +0,0 @@ -#lang racket/base -(require racket/path) -(require (prefix-in db: db)) - -(define db-logger (make-logger 'ebus-db (current-logger))) - -(define db-file? (make-parameter - (build-path (path-only (find-system-path 'run-file)) "database.sqlite3"))) - -(define pool - (db:virtual-connection - (db:connection-pool - (lambda () - (log-message db-logger 'info (format "Open database file ~a" (db-file?)) #t) - (with-handlers ([exn:fail? (lambda (exn) - (log-message db-logger 'error (format "Error opening database ~a" (db-file?)) #t) (raise exn))]) - (db:postgresql-connect #:user "ebus" - #:database "ebus" - #:password "ebus" - #:server "localhost"))) - ;; (db:sqlite3-connect #:database (db-file?)))) - ;; - #:max-connections 5 - #:max-idle-connections 2 - ))) - - -;; Test Database Connection -;; Returns #t on success, #f otherwise -(define (db-test) - (with-handlers ([exn:fail? (lambda (exn) (log-message db-logger 'error (format "Error: ~a" exn) #t) #f)]) - (= (db:query-value pool "SELECT 1") 1))) - -;; Query ID of sensor given by sensor-name -;; Returns null if sensor is undefined -(define (get-sensor-id sensor-name) - (define sql-stmt "SELECT id FROM sensor WHERE name = $1") - (with-handlers ([exn:fail? (lambda (exn) (void))]) - (db:query-value pool sql-stmt sensor-name))) - -;; Create Sensor-ID with given name -;; returns id -(define (create-sensor-id sensor-name) - (log-message db-logger 'info (format "create sensor id for ~a" sensor-name) #t) - (db:query-exec pool "INSERT INTO sensor(name) VALUES ($1)" sensor-name) - (get-sensor-id sensor-name)) - -;; Get ID of sensor given by sensor-name -;; define sensor if needed -(define (get-or-create-sensor-id sensor-name) - (define id (get-sensor-id sensor-name)) - (cond ((void? id) (create-sensor-id sensor-name)) - (else id))) - -;; Insert Field in Database -;; Decide Database-Datatype from Ebus-Datatype -;; then calls 'insert` -(define (db-insert-field sensor-name datatype offset value) - (cond ((member datatype (list "data1c" "data2b" "data2c")) - ;; float - (insert sensor-name value db:sql-null db:sql-null)) - ((member datatype (list "bit" "byte" "data1b" "word" "bcd")) - ;; int - (insert sensor-name db:sql-null value db:sql-null)) - ((member datatype (list "byteEnum")) - ;; string - (insert sensor-name db:sql-null db:sql-null value)) - (else (log-message db-logger 'error (format "Datatype ~a is not support by DB" datatype) #t)))) - -(define (insert sensor-name value-float value-int value-string) - (define sensor-id (get-or-create-sensor-id sensor-name)) - (define type (cond ((not (db:sql-null? value-string)) "string") - ((not (db:sql-null? value-float)) "float") - ((not (db:sql-null? value-int)) "int"))) - (define sql-stmt - (string-append "INSERT INTO value(timestamp, sensor_id, type, value_float, value_int, value_string) " - "VALUES (CURRENT_TIMESTAMP, $1, $2, $3, $4, $5)")) - (log-message db-logger 'info (string-append sql-stmt "\n\t\t" - (format - "sensor-id=~a type=~a value-float=~a value-int=~a value-string=~a" - sensor-id type value-float value-int value-string)) #t) - (db:query-exec pool sql-stmt sensor-id type value-float value-int value-string)) - -(provide - db-file? - db-logger - db-test - db-insert-field) diff --git a/ebus-racket/ebus/layer2.rkt b/ebus-racket/ebus/layer2.rkt index 3dd881c..3309c16 100644 --- a/ebus-racket/ebus/layer2.rkt +++ b/ebus-racket/ebus/layer2.rkt @@ -91,7 +91,7 @@ (else ;; skip one byte (let ([byte (read-byte input-port)]) - (log-message logger 'debug (format "drop ~s 0x~x ~n" byte byte) #t)) + (log-message logger 'debug (format "drop ~s 0x~x" byte byte) #t)) (read-ebus input-port)))) (provide @@ -101,4 +101,4 @@ (prefix-out layer2- (struct-out ebus-body-broadcast)) (prefix-out layer2- (struct-out ebus-body-mastermaster)) (prefix-out layer2- (struct-out ebus-body-masterslave)) - (prefix-out layer2- logger))
\ No newline at end of file + (prefix-out layer2- logger)) diff --git a/ebus-racket/ebus/layer7.rkt b/ebus-racket/ebus/layer7.rkt index 3bcad01..cb9da0b 100644 --- a/ebus-racket/ebus/layer7.rkt +++ b/ebus-racket/ebus/layer7.rkt @@ -1,7 +1,5 @@ #lang racket/base -(require racket/list - (planet lizorkin/sxml:2:1/sxml) - (planet lizorkin/ssax:2:0/ssax) +(require (planet clements/sxml2:1:=3) "layer2.rkt") (define logger (make-logger 'ebus-layer7 (current-logger))) @@ -24,8 +22,7 @@ ;; returns device-name in a list or empty-list (define (device-name address) - (first ((sxpath "@name/text()") (device address)))) - + (car ((sxpath "@name/text()") (device address)))) (define (paket-fields ebus-paket) (define paket-definition (paket ebus-paket)) @@ -34,7 +31,7 @@ ([paket-name (string-append (device-name (layer2-ebus-paket-source ebus-paket)) "." - (first ((sxpath "@name/text()") paket-definition)))] + (car ((sxpath "@name/text()") paket-definition)))] [fields ((sxpath "fields/*") paket-definition)] [values (for/list ([field fields]) (paket-fields-dispatch-decoder ebus-paket field paket-name))]) @@ -45,8 +42,8 @@ (define (paket-fields-dispatch-decoder ebus-paket field paket-name) (define datatype ((sxpath "name()") field)) - (define name (string-append paket-name "." (first ((sxpath "@name/text()") field)))) - (define offset (string->number (first ((sxpath "@offset/text()") field)))) + (define name (string-append paket-name "." (car ((sxpath "@name/text()") field)))) + (define offset (string->number (car ((sxpath "@offset/text()") field)))) (define payload (layer2-ebus-paket-payload ebus-paket)) (cond ((string=? "bit" datatype) (list name datatype offset @@ -132,11 +129,11 @@ (define (pred l) (= value (list-ref l 0))) (define all-options (for/list ([option ((sxpath "option") field-definition)]) - (list (string->number (first ((sxpath "@value/text()") option))) ;; value, name - (first ((sxpath "@name/text()") option))))) + (list (string->number (car ((sxpath "@value/text()") option))) ;; value, name + (car ((sxpath "@name/text()") option))))) (define options (filter pred all-options)) (cond ((= (length options) 1) - (list-ref (first options) 1)) + (list-ref (car options) 1)) (else "<undefined>"))) ;; type word diff --git a/ebus-racket/inserter.rkt b/ebus-racket/inserter.rkt index 657154f..cd05ad9 100755 --- a/ebus-racket/inserter.rkt +++ b/ebus-racket/inserter.rkt @@ -2,52 +2,48 @@ #lang racket/base (require racket/cmdline racket/tcp - racket/match - "ebus/db.rkt" + racket/stream + net/url "ebus/layer7.rkt" - "ebus/layer2.rkt" - "util/tcp-repl.rkt") + "util/tcp-repl.rkt" + "util/json.rkt") (define logger (make-logger 'ebus-inserter (current-logger))) (define connect-host? (make-parameter null)) (define connect-port? (make-parameter null)) -(define loglevel-layer2? (make-parameter 'info)) -(define loglevel-db? (make-parameter 'warning)) (define insert? (make-parameter #f)) +(define baseurl? (make-parameter "http://localhost:8000/sensor")) -(define (handle-fields fields) - (for ([field fields]) +;; Send field and value to database server +(define (insert-field sensor-name datatype offset value) + (define type (cond ((member datatype (list "data1c" "data2b" "data2c")) "float") + ((member datatype (list "bit" "byte" "data1b" "word" "bcd")) "int") + ((member datatype (list "byteEnum")) "string"))) + (set! value (cond ((string=? type "float") (real->decimal-string value)) + (else value))) + (define response + (read-line (put-pure-port + (string->url (format "~a/~a" (baseurl?) sensor-name)) + (string->bytes/utf-8 (format "value=~a&type=~a" value type))))) + (define responseJson (string->jsexpr response)) + (cond ((eq? (json-null) (hash-ref responseJson 'error)) + (log-message logger 'debug (format "Successful insert: type=~a value=~a" + type value) #t)) + (else (log-message logger 'error (format "Error: type=~a value=~a ERROR:~a" + type value response) #t)))) + +(define (handle-packet packet) + (for ([field packet]) (log-message logger 'info (format "Field: ~a" field) #t) (when (insert?) (with-handlers ([exn:fail? (lambda (exn) - (log-message logger 'error (format "Failed to insert ~a: ~a" field exn) #t))]) - (apply db-insert-field field))))) - -(define (make-ebus-loop7 input-port) - (lambda () - (let loop () - (with-handlers ([exn:fail? (lambda (exn) - (log-message logger 'error (format "Failed to parse paket: ~a" exn) #t) - (loop))]) - (let ([fields (layer7-read-ebus (current-input-port))]) - (when (not (or (void? fields) (eof-object? fields))) - (handle-fields fields)) - (when (not (eof-object? fields)) - (loop))))))) - -;; Start Thread that observe all given log-receivers -(define (start-logger-thread receiver1 . receiverN) - (define receivers (cons receiver1 receiverN)) - (void - (thread - (lambda () - (let loop () - (match (apply sync receivers) - [(vector level msg data) - (printf "[~s] ~a~n" level msg) - (flush-output)]) - (loop)))))) + (log-message logger 'error (format "Failed to insert ~a: ~a" field exn) #t))] + [exn:fail:read? (lambda (exn) + (log-message logger 'error (format "TCP Read exception ~a" exn) #t))] + [exn:fail:network? (lambda (exn) + (log-message logger 'error (format "TCP Exception ~a" exn) #t))]) + (apply insert-field field))))) (define-namespace-anchor repl-ns-anchor) @@ -60,26 +56,10 @@ (connect-port? (string->number port))] ["--tcp-repl" port "Open REPL on TCP <port>" (tcp-repl-run (namespace-anchor->namespace repl-ns-anchor) (string->number port))] - ["--debug-layer2" "Log level for Layer 2 Parser" - (loglevel-layer2? 'debug)] - ["--debug-db" "Log level for DB" - (loglevel-db? 'debug)] ["--insert" "Do Insert into Database" (insert? #t)] - ["--db-file" user "Database file" - (db-file? user)]) - - ;; Init Logging - (start-logger-thread (make-log-receiver logger 'info) - (make-log-receiver db-logger (loglevel-db?)) - (make-log-receiver layer2-logger (loglevel-layer2?)) - (make-log-receiver layer7-logger 'info) - (make-log-receiver tcp-repl-logger 'info)) - - ;; Test Database Connection - (when (not (db-test)) - (log-message logger 'fatal "Failed to connect to database" #t) - (exit 1)) + ["--baseurl" url "Database server http url" + (baseurl? url)]) ;; Connect, replacing input with tcp connection (if (or (null? (connect-host?)) (null? (connect-port?))) @@ -89,7 +69,17 @@ (current-input-port cin))) ;; Process Ebus Packets - ((make-ebus-loop7 (current-input-port)) ) - ) + (for ([packet (make-stream (current-input-port))]) + (when (not (or (void? packet) (eof-object? packet))) + (handle-packet packet)) + (when (eof-object? packet) + (exit 1))) ) + +(define (make-stream port) + (stream-cons (with-handlers ([exn:fail? (lambda (exn) + (log-message logger 'error (format "Failed to parse paket: ~a" exn) #t) + (void))]) + (layer7-read-ebus port)) + (make-stream port))) (exit (main)) diff --git a/ebus-racket/util/json.rkt b/ebus-racket/util/json.rkt new file mode 100644 index 0000000..39bef38 --- /dev/null +++ b/ebus-racket/util/json.rkt @@ -0,0 +1,198 @@ +#lang racket/base + +#| Roughly based on the PLaneT package by Dave Herman, + Originally released under MIT license. +|# + +;; ---------------------------------------------------------------------------- +;; Customization + +;; The default translation for a JSON `null' value +(provide json-null) +(define json-null (make-parameter 'null)) + +;; ---------------------------------------------------------------------------- +;; Predicate + +(provide jsexpr?) +(define (jsexpr? x #:null [jsnull (json-null)]) + (let loop ([x x]) + (or (exact-integer? x) + (inexact-real? x) + (boolean? x) + (string? x) + (eq? x jsnull) + (and (list? x) (andmap loop x)) + (and (hash? x) (for/and ([(k v) (in-hash x)]) + (and (symbol? k) (loop v))))))) + +;; ---------------------------------------------------------------------------- +;; Generation: Racket -> JSON + +(provide write-json) +(define (write-json x [o (current-output-port)] + #:null [jsnull (json-null)] #:encode [enc 'control]) + (define (escape m) + (define ch (string-ref m 0)) + (define r + (assoc ch '([#\backspace . "\\b"] [#\newline . "\\n"] [#\return . "\\r"] + [#\page . "\\f"] [#\tab . "\\t"] + [#\\ . "\\\\"] [#\" . "\\\""]))) + (define (u-esc n) + (define str (number->string n 16)) + (define pad (case (string-length str) + [(1) "000"] [(2) "00"] [(3) "0"] [else ""])) + (string-append "\\u" pad str)) + (if r + (cdr r) + (let ([n (char->integer ch)]) + (if (n . < . #x10000) + (u-esc n) + ;; use the (utf-16 surrogate pair) double \u-encoding + (let ([n (- n #x10000)]) + (string-append (u-esc (+ #xD800 (arithmetic-shift n -10))) + (u-esc (+ #xDC00 (bitwise-and n #x3FF))))))))) + (define rx-to-encode + (case enc + [(control) #rx"[\0-\37\\\"\177]"] + [(all) #rx"[\0-\37\\\"\177-\U10FFFF]"] + [else (raise-type-error 'write-json "encoding symbol" enc)])) + (define (write-json-string str) + (write-bytes #"\"" o) + (write-string (regexp-replace* rx-to-encode str escape) o) + (write-bytes #"\"" o)) + (let loop ([x x]) + (cond [(or (exact-integer? x) (inexact-real? x)) (write x o)] + [(eq? x #f) (write-bytes #"false" o)] + [(eq? x #t) (write-bytes #"true" o)] + [(eq? x jsnull) (write-bytes #"null" o)] + [(string? x) (write-json-string x)] + [(list? x) + (write-bytes #"[" o) + (when (pair? x) + (loop (car x)) + (for ([x (in-list (cdr x))]) (write-bytes #"," o) (loop x))) + (write-bytes #"]" o)] + [(hash? x) + (write-bytes #"{" o) + (define first? #t) + (for ([(k v) (in-hash x)]) + (unless (symbol? k) + (raise-type-error 'write-json "bad JSON key value" k)) + (if first? (set! first? #f) (write-bytes #"," o)) + (write (symbol->string k) o) ; no `printf' => proper escapes + (write-bytes #":" o) + (loop v)) + (write-bytes #"}" o)] + [else (raise-type-error 'write-json "bad JSON value" x)])) + (void)) + +;; ---------------------------------------------------------------------------- +;; Parsing: JSON -> Racket + +(require syntax/readerr) + +(provide read-json) +(define (read-json [i (current-input-port)] #:null [jsnull (json-null)]) + ;; Follows the specification (eg, at json.org) -- no extensions. + ;; + (define (err fmt . args) + (define-values [l c p] (port-next-location i)) + (raise-read-error (format "read-json: ~a" (apply format fmt args)) + (object-name i) l c p #f)) + (define (skip-whitespace) (regexp-match? #px#"^\\s*" i)) + ;; + ;; Reading a string *could* have been nearly trivial using the racket + ;; reader, except that it won't handle a "\/"... + (define (read-string) + (let loop ([l* '()]) + ;; note: use a string regexp to extract utf-8-able text + (define m (cdr (or (regexp-try-match #rx"^([^\"\\]*)(\"|\\\\(.))" i) + (err "unterminated string")))) + (define l (if ((bytes-length (car m)) . > . 0) (cons (car m) l*) l*)) + (define esc (caddr m)) + (cond + [(not esc) (bytes->string/utf-8 (apply bytes-append (reverse l)))] + [(assoc esc '([#"b" . #"\b"] [#"n" . #"\n"] [#"r" . #"\r"] + [#"f" . #"\f"] [#"t" . #"\t"] + [#"\\" . #"\\"] [#"\"" . #"\""] [#"/" . #"/"])) + => (λ (m) (loop (cons (cdr m) l)))] + [(equal? esc #"u") + (let* ([e (or (regexp-try-match #px#"^[a-fA-F0-9]{4}" i) + (err "bad string \\u escape"))] + [e (string->number (bytes->string/utf-8 (car e)) 16)]) + (define e* + (if (<= #xD800 e #xDFFF) + ;; it's the first part of a UTF-16 surrogate pair + (let* ([e2 (or (regexp-try-match #px#"^\\\\u([a-fA-F0-9]{4})" i) + (err "bad string \\u escape, ~a" + "missing second half of a UTF16 pair"))] + [e2 (string->number (bytes->string/utf-8 (cadr e2)) 16)]) + (if (<= #xDC00 e2 #xDFFF) + (+ (arithmetic-shift (- e #xD800) 10) (- e2 #xDC00) #x10000) + (err "bad string \\u escape, ~a" + "bad second half of a UTF16 pair"))) + e)) ; single \u escape + (loop (cons (string->bytes/utf-8 (string (integer->char e*))) l)))] + [else (err "bad string escape: \"~a\"" esc)]))) + ;; + (define (read-list what end-rx read-one) + (skip-whitespace) + (if (regexp-try-match end-rx i) + '() + (let loop ([l (list (read-one))]) + (skip-whitespace) + (cond [(regexp-try-match end-rx i) (reverse l)] + [(regexp-try-match #rx#"^," i) (loop (cons (read-one) l))] + [else (err "error while parsing a json ~a" what)])))) + ;; + (define (read-hash) + (define (read-pair) + (define k (read-json)) + (unless (string? k) (err "non-string value used for json object key")) + (skip-whitespace) + (unless (regexp-try-match #rx#"^:" i) + (err "error while parsing a json object pair")) + (list (string->symbol k) (read-json))) + (apply hasheq (apply append (read-list 'object #rx#"^}" read-pair)))) + ;; + (define (read-json [top? #f]) + (skip-whitespace) + (cond + [(and top? (eof-object? (peek-char i))) eof] + [(regexp-try-match #px#"^true\\b" i) #t] + [(regexp-try-match #px#"^false\\b" i) #f] + [(regexp-try-match #px#"^null\\b" i) jsnull] + [(regexp-try-match + #rx#"^-?(?:0|[1-9][0-9]*)(?:\\.[0-9]+)?(?:[eE][+-]?[0-9]+)?" i) + => (λ (bs) (string->number (bytes->string/utf-8 (car bs))))] + [(regexp-try-match #rx#"^[\"[{]" i) + => (λ (m) + (let ([m (car m)]) + (cond [(equal? m #"\"") (read-string)] + [(equal? m #"[") (read-list 'array #rx#"^\\]" read-json)] + [(equal? m #"{") (read-hash)])))] + [else (err "bad input")])) + ;; + (read-json #t)) + +;; ---------------------------------------------------------------------------- +;; Convenience functions + +(provide jsexpr->string jsexpr->bytes) +(define (jsexpr->string x #:null [jsnull (json-null)] #:encode [enc 'control]) + (define o (open-output-string)) + (write-json x o #:null jsnull #:encode enc) + (get-output-string o)) +(define (jsexpr->bytes x #:null [jsnull (json-null)] #:encode [enc 'control]) + (define o (open-output-bytes)) + (write-json x o #:null jsnull #:encode enc) + (get-output-bytes o)) + +(provide string->jsexpr bytes->jsexpr) +(define (string->jsexpr str #:null [jsnull (json-null)]) + (unless (string? str) (raise-type-error 'string->jsexpr "string" str)) + (read-json (open-input-string str) #:null jsnull)) +(define (bytes->jsexpr str #:null [jsnull (json-null)]) + (unless (bytes? str) (raise-type-error 'bytes->jsexpr "bytes" str)) + (read-json (open-input-bytes str) #:null jsnull)) |