summaryrefslogtreecommitdiff
path: root/ebus-racket/inserter.rkt
diff options
context:
space:
mode:
authorEbus-at-dockstar <ebus@dockstar>2013-03-07 14:18:37 +0100
committerEbus-at-dockstar <ebus@dockstar>2013-03-07 14:18:37 +0100
commit7f149ab501ab6121bddb82788e4156d21a1828c9 (patch)
tree041ddfc4d977d1b90ea76946196fa835fbf2b1eb /ebus-racket/inserter.rkt
parent13222c58142aa6b3bdf50525a250a81c4ab52d55 (diff)
downloadebus-alt-7f149ab501ab6121bddb82788e4156d21a1828c9.tar.gz
ebus-alt-7f149ab501ab6121bddb82788e4156d21a1828c9.zip
racket update to http database server.
Diffstat (limited to 'ebus-racket/inserter.rkt')
-rwxr-xr-xebus-racket/inserter.rkt100
1 files changed, 45 insertions, 55 deletions
diff --git a/ebus-racket/inserter.rkt b/ebus-racket/inserter.rkt
index 657154f..cd05ad9 100755
--- a/ebus-racket/inserter.rkt
+++ b/ebus-racket/inserter.rkt
@@ -2,52 +2,48 @@
#lang racket/base
(require racket/cmdline
racket/tcp
- racket/match
- "ebus/db.rkt"
+ racket/stream
+ net/url
"ebus/layer7.rkt"
- "ebus/layer2.rkt"
- "util/tcp-repl.rkt")
+ "util/tcp-repl.rkt"
+ "util/json.rkt")
(define logger (make-logger 'ebus-inserter (current-logger)))
(define connect-host? (make-parameter null))
(define connect-port? (make-parameter null))
-(define loglevel-layer2? (make-parameter 'info))
-(define loglevel-db? (make-parameter 'warning))
(define insert? (make-parameter #f))
+(define baseurl? (make-parameter "http://localhost:8000/sensor"))
-(define (handle-fields fields)
- (for ([field fields])
+;; Send field and value to database server
+(define (insert-field sensor-name datatype offset value)
+ (define type (cond ((member datatype (list "data1c" "data2b" "data2c")) "float")
+ ((member datatype (list "bit" "byte" "data1b" "word" "bcd")) "int")
+ ((member datatype (list "byteEnum")) "string")))
+ (set! value (cond ((string=? type "float") (real->decimal-string value))
+ (else value)))
+ (define response
+ (read-line (put-pure-port
+ (string->url (format "~a/~a" (baseurl?) sensor-name))
+ (string->bytes/utf-8 (format "value=~a&type=~a" value type)))))
+ (define responseJson (string->jsexpr response))
+ (cond ((eq? (json-null) (hash-ref responseJson 'error))
+ (log-message logger 'debug (format "Successful insert: type=~a value=~a"
+ type value) #t))
+ (else (log-message logger 'error (format "Error: type=~a value=~a ERROR:~a"
+ type value response) #t))))
+
+(define (handle-packet packet)
+ (for ([field packet])
(log-message logger 'info (format "Field: ~a" field) #t)
(when (insert?)
(with-handlers ([exn:fail? (lambda (exn)
- (log-message logger 'error (format "Failed to insert ~a: ~a" field exn) #t))])
- (apply db-insert-field field)))))
-
-(define (make-ebus-loop7 input-port)
- (lambda ()
- (let loop ()
- (with-handlers ([exn:fail? (lambda (exn)
- (log-message logger 'error (format "Failed to parse paket: ~a" exn) #t)
- (loop))])
- (let ([fields (layer7-read-ebus (current-input-port))])
- (when (not (or (void? fields) (eof-object? fields)))
- (handle-fields fields))
- (when (not (eof-object? fields))
- (loop)))))))
-
-;; Start Thread that observe all given log-receivers
-(define (start-logger-thread receiver1 . receiverN)
- (define receivers (cons receiver1 receiverN))
- (void
- (thread
- (lambda ()
- (let loop ()
- (match (apply sync receivers)
- [(vector level msg data)
- (printf "[~s] ~a~n" level msg)
- (flush-output)])
- (loop))))))
+ (log-message logger 'error (format "Failed to insert ~a: ~a" field exn) #t))]
+ [exn:fail:read? (lambda (exn)
+ (log-message logger 'error (format "TCP Read exception ~a" exn) #t))]
+ [exn:fail:network? (lambda (exn)
+ (log-message logger 'error (format "TCP Exception ~a" exn) #t))])
+ (apply insert-field field)))))
(define-namespace-anchor repl-ns-anchor)
@@ -60,26 +56,10 @@
(connect-port? (string->number port))]
["--tcp-repl" port "Open REPL on TCP <port>"
(tcp-repl-run (namespace-anchor->namespace repl-ns-anchor) (string->number port))]
- ["--debug-layer2" "Log level for Layer 2 Parser"
- (loglevel-layer2? 'debug)]
- ["--debug-db" "Log level for DB"
- (loglevel-db? 'debug)]
["--insert" "Do Insert into Database"
(insert? #t)]
- ["--db-file" user "Database file"
- (db-file? user)])
-
- ;; Init Logging
- (start-logger-thread (make-log-receiver logger 'info)
- (make-log-receiver db-logger (loglevel-db?))
- (make-log-receiver layer2-logger (loglevel-layer2?))
- (make-log-receiver layer7-logger 'info)
- (make-log-receiver tcp-repl-logger 'info))
-
- ;; Test Database Connection
- (when (not (db-test))
- (log-message logger 'fatal "Failed to connect to database" #t)
- (exit 1))
+ ["--baseurl" url "Database server http url"
+ (baseurl? url)])
;; Connect, replacing input with tcp connection
(if (or (null? (connect-host?)) (null? (connect-port?)))
@@ -89,7 +69,17 @@
(current-input-port cin)))
;; Process Ebus Packets
- ((make-ebus-loop7 (current-input-port)) )
- )
+ (for ([packet (make-stream (current-input-port))])
+ (when (not (or (void? packet) (eof-object? packet)))
+ (handle-packet packet))
+ (when (eof-object? packet)
+ (exit 1))) )
+
+(define (make-stream port)
+ (stream-cons (with-handlers ([exn:fail? (lambda (exn)
+ (log-message logger 'error (format "Failed to parse paket: ~a" exn) #t)
+ (void))])
+ (layer7-read-ebus port))
+ (make-stream port)))
(exit (main))