From 7f149ab501ab6121bddb82788e4156d21a1828c9 Mon Sep 17 00:00:00 2001 From: Ebus-at-dockstar Date: Thu, 7 Mar 2013 14:18:37 +0100 Subject: racket update to http database server. --- ebus-racket/inserter.rkt | 100 +++++++++++++++++++++-------------------------- 1 file changed, 45 insertions(+), 55 deletions(-) (limited to 'ebus-racket/inserter.rkt') diff --git a/ebus-racket/inserter.rkt b/ebus-racket/inserter.rkt index 657154f..cd05ad9 100755 --- a/ebus-racket/inserter.rkt +++ b/ebus-racket/inserter.rkt @@ -2,52 +2,48 @@ #lang racket/base (require racket/cmdline racket/tcp - racket/match - "ebus/db.rkt" + racket/stream + net/url "ebus/layer7.rkt" - "ebus/layer2.rkt" - "util/tcp-repl.rkt") + "util/tcp-repl.rkt" + "util/json.rkt") (define logger (make-logger 'ebus-inserter (current-logger))) (define connect-host? (make-parameter null)) (define connect-port? (make-parameter null)) -(define loglevel-layer2? (make-parameter 'info)) -(define loglevel-db? (make-parameter 'warning)) (define insert? (make-parameter #f)) +(define baseurl? (make-parameter "http://localhost:8000/sensor")) -(define (handle-fields fields) - (for ([field fields]) +;; Send field and value to database server +(define (insert-field sensor-name datatype offset value) + (define type (cond ((member datatype (list "data1c" "data2b" "data2c")) "float") + ((member datatype (list "bit" "byte" "data1b" "word" "bcd")) "int") + ((member datatype (list "byteEnum")) "string"))) + (set! value (cond ((string=? type "float") (real->decimal-string value)) + (else value))) + (define response + (read-line (put-pure-port + (string->url (format "~a/~a" (baseurl?) sensor-name)) + (string->bytes/utf-8 (format "value=~a&type=~a" value type))))) + (define responseJson (string->jsexpr response)) + (cond ((eq? (json-null) (hash-ref responseJson 'error)) + (log-message logger 'debug (format "Successful insert: type=~a value=~a" + type value) #t)) + (else (log-message logger 'error (format "Error: type=~a value=~a ERROR:~a" + type value response) #t)))) + +(define (handle-packet packet) + (for ([field packet]) (log-message logger 'info (format "Field: ~a" field) #t) (when (insert?) (with-handlers ([exn:fail? (lambda (exn) - (log-message logger 'error (format "Failed to insert ~a: ~a" field exn) #t))]) - (apply db-insert-field field))))) - -(define (make-ebus-loop7 input-port) - (lambda () - (let loop () - (with-handlers ([exn:fail? (lambda (exn) - (log-message logger 'error (format "Failed to parse paket: ~a" exn) #t) - (loop))]) - (let ([fields (layer7-read-ebus (current-input-port))]) - (when (not (or (void? fields) (eof-object? fields))) - (handle-fields fields)) - (when (not (eof-object? fields)) - (loop))))))) - -;; Start Thread that observe all given log-receivers -(define (start-logger-thread receiver1 . receiverN) - (define receivers (cons receiver1 receiverN)) - (void - (thread - (lambda () - (let loop () - (match (apply sync receivers) - [(vector level msg data) - (printf "[~s] ~a~n" level msg) - (flush-output)]) - (loop)))))) + (log-message logger 'error (format "Failed to insert ~a: ~a" field exn) #t))] + [exn:fail:read? (lambda (exn) + (log-message logger 'error (format "TCP Read exception ~a" exn) #t))] + [exn:fail:network? (lambda (exn) + (log-message logger 'error (format "TCP Exception ~a" exn) #t))]) + (apply insert-field field))))) (define-namespace-anchor repl-ns-anchor) @@ -60,26 +56,10 @@ (connect-port? (string->number port))] ["--tcp-repl" port "Open REPL on TCP " (tcp-repl-run (namespace-anchor->namespace repl-ns-anchor) (string->number port))] - ["--debug-layer2" "Log level for Layer 2 Parser" - (loglevel-layer2? 'debug)] - ["--debug-db" "Log level for DB" - (loglevel-db? 'debug)] ["--insert" "Do Insert into Database" (insert? #t)] - ["--db-file" user "Database file" - (db-file? user)]) - - ;; Init Logging - (start-logger-thread (make-log-receiver logger 'info) - (make-log-receiver db-logger (loglevel-db?)) - (make-log-receiver layer2-logger (loglevel-layer2?)) - (make-log-receiver layer7-logger 'info) - (make-log-receiver tcp-repl-logger 'info)) - - ;; Test Database Connection - (when (not (db-test)) - (log-message logger 'fatal "Failed to connect to database" #t) - (exit 1)) + ["--baseurl" url "Database server http url" + (baseurl? url)]) ;; Connect, replacing input with tcp connection (if (or (null? (connect-host?)) (null? (connect-port?))) @@ -89,7 +69,17 @@ (current-input-port cin))) ;; Process Ebus Packets - ((make-ebus-loop7 (current-input-port)) ) - ) + (for ([packet (make-stream (current-input-port))]) + (when (not (or (void? packet) (eof-object? packet))) + (handle-packet packet)) + (when (eof-object? packet) + (exit 1))) ) + +(define (make-stream port) + (stream-cons (with-handlers ([exn:fail? (lambda (exn) + (log-message logger 'error (format "Failed to parse paket: ~a" exn) #t) + (void))]) + (layer7-read-ebus port)) + (make-stream port))) (exit (main)) -- cgit v1.2.1