summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRandom Hacker <random_hacker@xapek.org>2012-03-08 00:51:51 +0100
committerRandom Hacker <random_hacker@xapek.org>2012-03-08 00:51:51 +0100
commit98b7a9c7af864fdc4cfe2c5b00f918b52e63d826 (patch)
tree1cbd1f437c619b8e1c665b906264534eeab4a838
parentb79a5a8ab011e8b4ee3a9adb3b1085a6acd0f482 (diff)
downloadebus-alt-98b7a9c7af864fdc4cfe2c5b00f918b52e63d826.tar.gz
ebus-alt-98b7a9c7af864fdc4cfe2c5b00f918b52e63d826.zip
ebus-racket: -pq uses synx/libpq
-rw-r--r--ebus-racket/db-pq.rkt92
-rw-r--r--ebus-racket/db.rkt2
-rwxr-xr-xebus-racket/inserter-pq.rkt100
3 files changed, 194 insertions, 0 deletions
diff --git a/ebus-racket/db-pq.rkt b/ebus-racket/db-pq.rkt
new file mode 100644
index 0000000..03ef154
--- /dev/null
+++ b/ebus-racket/db-pq.rkt
@@ -0,0 +1,92 @@
+#lang racket/base
+;; Database Access with synx/libpq ffi bindings
+(require racket/class)
+(require (prefix-in pq: (planet synx/libpq:1:3)))
+
+(define logger (make-logger 'ebus-db (current-logger)))
+
+(define con-user? (make-parameter "ebus"))
+(define con-password? (make-parameter "ebus"))
+(define con-database? (make-parameter "ebus"))
+(define con-server? (make-parameter "localhost"))
+
+(define _con (void))
+(define (con)
+ (when (void? _con)
+ (log-message logger 'info (format "Connect using libpq to Database: user=~a database=~a server=~a"
+ (con-user?) (con-database?) (con-server?)) #t)
+ (set! _con
+ (pq:connect #:host (con-server?)
+ #:user (con-user?)
+ #:password (con-password?)
+ #:dbname (con-database?))))
+ _con)
+
+(define (query-matrix stmt)
+ (con)
+ (send _con exec stmt))
+
+;; Test Database Connection
+;; Returns #t on success, #f otherwise
+(define (pgc-test)
+ (with-handlers ([exn:fail? (lambda (exn) (display exn)(display "\n") #f)])
+ (query-matrix "SELECT 123")
+ #t))
+
+;; Query ID of sensor given by sensor-name
+;; return void if sensor is undefined
+(define (get-sensor-id sensor-name)
+ (define sql-stmt "SELECT id FROM sensor WHERE name = $1")
+ (with-handlers ([exn:fail? (lambda (exn) (void))])
+ (let* (
+ [result (send (con) exec sql-stmt sensor-name)]
+ [matrix (send result get-matrix)])
+ (caar matrix))))
+
+;; Create Sensor-ID with given name
+;; returns id
+(define (create-sensor-id sensor-name)
+ (log-message logger 'info (format "create sensor id for ~a" sensor-name) #t)
+ (define sql-stmt "INSERT INTO sensor(name) VALUES ($1)")
+ (send (con) exec sql-stmt sensor-name)
+ (get-sensor-id sensor-name))
+
+;; Get ID of sensor given by sensor-name
+;; define sensor if needed
+(define (get-or-create-sensor-id sensor-name)
+ (define id (get-sensor-id sensor-name))
+ (cond ((void? id) (create-sensor-id sensor-name))
+ (else id)))
+
+;; Insert Field in Database
+;; Decide Database-Datatype from Ebus-Datatype
+;; then calls 'insert`
+(define (insert-field sensor-name datatype offset value)
+ (cond ((member datatype (list "data1c" "data2b" "data2c"))
+ (insert sensor-name "float" value))
+ ((member datatype (list "bit" "byte" "data1b" "word" "bcd"))
+ (insert sensor-name "int" value))
+ ((member datatype (list "byteEnum"))
+ (insert sensor-name "string" value))
+ (else
+ (log-message logger 'error (format "Datatype ~a is not support by DB" datatype) #t))))
+
+(define (insert sensor-name type value)
+ (define sensor-id (get-or-create-sensor-id sensor-name))
+ (log-message logger 'info (string-append (format "sensor-id=~a type=~a value=~a"
+ sensor-id type value)) #t)
+ (cond ((string=? type "string")
+ (send (con) p-exec "INSERT INTO value (timestamp, sensor_id, type, value_string) VALUES (now(), $1, 'string', $2)" sensor-id value))
+ ((string=? type "float")
+ (send (con) p-exec "INSERT INTO value (timestamp, sensor_id, type, value_float) VALUES (now(), $1, 'float', $2)" sensor-id value))
+ ((string=? type "int")
+ (send (con) p-exec "INSERT INTO value (timestamp, sensor_id, type, value_int) VALUES (now(), $1, 'int', $2::integer)" sensor-id value))))
+
+(provide
+ (prefix-out db- logger)
+ (prefix-out db- con-user?)
+ (prefix-out db- con-password?)
+ (prefix-out db- con-database?)
+ (prefix-out db- con-server?)
+ (prefix-out db- pgc-test)
+ (prefix-out db- insert-field))
diff --git a/ebus-racket/db.rkt b/ebus-racket/db.rkt
index f203c5b..081cca7 100644
--- a/ebus-racket/db.rkt
+++ b/ebus-racket/db.rkt
@@ -1,4 +1,6 @@
#lang racket/base
+;; Database Acess with ryanc/db
+;; leaks memory somewhere in db.plt
(require (prefix-in db: (planet ryanc/db:1:5)))
(require (prefix-in db: (planet ryanc/db:1:5/util/connect)))
diff --git a/ebus-racket/inserter-pq.rkt b/ebus-racket/inserter-pq.rkt
new file mode 100755
index 0000000..7ed87d9
--- /dev/null
+++ b/ebus-racket/inserter-pq.rkt
@@ -0,0 +1,100 @@
+#! /usr/bin/env racket
+#lang racket/base
+(require racket/cmdline
+ racket/tcp
+ racket/match
+ "db-pq.rkt"
+ "layer7.rkt"
+ "layer2.rkt"
+ "tcp-repl.rkt")
+
+(define logger (make-logger 'ebus-inserter (current-logger)))
+
+(define connect-host? (make-parameter null))
+(define connect-port? (make-parameter null))
+(define loglevel-layer2? (make-parameter 'info))
+(define loglevel-db? (make-parameter 'warning))
+(define insert? (make-parameter #f))
+
+(define (handle-fields fields)
+ (for ([field fields])
+ (log-message logger 'info (format "Field: ~a" field) #t)
+ (when (insert?)
+ (with-handlers ([exn:fail? (lambda (exn)
+ (log-message logger 'error (format "Failed to insert ~a: ~a" field exn) #t))])
+ (apply db-insert-field field)))))
+
+(define (make-ebus-loop7 input-port)
+ (lambda ()
+ (let loop ()
+ (with-handlers ([exn:fail? (lambda (exn)
+ (log-message logger 'error (format "Failed to parse paket: ~a" exn) #t))])
+ (let ([fields (layer7-read-ebus (current-input-port))])
+ (when (not (or (void? fields) (eof-object? fields)))
+ (handle-fields fields))
+ (when (not (eof-object? fields))
+ (loop)))))))
+
+;; Start Thread that observe all given log-receivers
+(define (start-logger-thread receiver1 . receiverN)
+ (define receivers (cons receiver1 receiverN))
+ (void
+ (thread
+ (lambda ()
+ (let loop ()
+ (match (apply sync receivers)
+ [(vector level msg data)
+ (printf "[~s] ~a~n" level msg)
+ (flush-output)])
+ (loop))))))
+
+(define-namespace-anchor repl-ns-anchor)
+
+(define (main)
+ ;; Parse commandline
+ (command-line
+ #:once-each
+ [("-c" "--connect") host port "Connect to server <host> <port>"
+ (connect-host? host)
+ (connect-port? (string->number port))]
+ ["--tcp-repl" port "Open REPL on TCP <port>"
+ (tcp-repl-run (namespace-anchor->namespace repl-ns-anchor) (string->number port))]
+ ["--debug-layer2" "Log level for Layer 2 Parser"
+ (loglevel-layer2? 'debug)]
+ ["--debug-db" "Log level for DB"
+ (loglevel-db? 'debug)]
+ ["--insert" "Do Insert into Database"
+ (insert? #t)]
+ ["--db-user" user "Datanase User"
+ (db-con-user? user)]
+ ["--db-password" password "Database password"
+ (db-con-password? password)]
+ ["--db-database" database "Database database-name"
+ (db-con-database? database)]
+ ["--db-server" server "Database Address/Server"
+ (db-con-server? server)])
+
+ ;; Init Logging
+ (start-logger-thread (make-log-receiver logger 'info)
+ (make-log-receiver db-logger (loglevel-db?))
+ (make-log-receiver layer2-logger (loglevel-layer2?))
+ (make-log-receiver layer7-logger 'info)
+ (make-log-receiver tcp-repl-logger 'info))
+
+ ;; Test Database Connection
+ (when (not (db-pgc-test))
+ (log-message logger 'fatal "Failed to connect to database" #t)
+ (exit 1))
+
+ ;; Connect, replacing input with tcp connection
+ (if (or (null? (connect-host?)) (null? (connect-port?)))
+ (log-message (current-logger) 'info "Using stdin" #t)
+ (let-values ([(cin cout) (tcp-connect (connect-host?) (connect-port?))])
+ (log-message logger 'info (format "Connected to ~s ~s ~n" (connect-host?) (connect-port?)) #t)
+ (current-input-port cin)))
+
+ ;; Process Ebus Packets
+ ((make-ebus-loop7 (current-input-port)) )
+ )
+
+(exit (main))