From 98b7a9c7af864fdc4cfe2c5b00f918b52e63d826 Mon Sep 17 00:00:00 2001 From: Random Hacker Date: Thu, 8 Mar 2012 00:51:51 +0100 Subject: ebus-racket: -pq uses synx/libpq --- ebus-racket/db-pq.rkt | 92 ++++++++++++++++++++++++++++++++++++++++ ebus-racket/db.rkt | 2 + ebus-racket/inserter-pq.rkt | 100 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 194 insertions(+) create mode 100644 ebus-racket/db-pq.rkt create mode 100755 ebus-racket/inserter-pq.rkt diff --git a/ebus-racket/db-pq.rkt b/ebus-racket/db-pq.rkt new file mode 100644 index 0000000..03ef154 --- /dev/null +++ b/ebus-racket/db-pq.rkt @@ -0,0 +1,92 @@ +#lang racket/base +;; Database Access with synx/libpq ffi bindings +(require racket/class) +(require (prefix-in pq: (planet synx/libpq:1:3))) + +(define logger (make-logger 'ebus-db (current-logger))) + +(define con-user? (make-parameter "ebus")) +(define con-password? (make-parameter "ebus")) +(define con-database? (make-parameter "ebus")) +(define con-server? (make-parameter "localhost")) + +(define _con (void)) +(define (con) + (when (void? _con) + (log-message logger 'info (format "Connect using libpq to Database: user=~a database=~a server=~a" + (con-user?) (con-database?) (con-server?)) #t) + (set! _con + (pq:connect #:host (con-server?) + #:user (con-user?) + #:password (con-password?) + #:dbname (con-database?)))) + _con) + +(define (query-matrix stmt) + (con) + (send _con exec stmt)) + +;; Test Database Connection +;; Returns #t on success, #f otherwise +(define (pgc-test) + (with-handlers ([exn:fail? (lambda (exn) (display exn)(display "\n") #f)]) + (query-matrix "SELECT 123") + #t)) + +;; Query ID of sensor given by sensor-name +;; return void if sensor is undefined +(define (get-sensor-id sensor-name) + (define sql-stmt "SELECT id FROM sensor WHERE name = $1") + (with-handlers ([exn:fail? (lambda (exn) (void))]) + (let* ( + [result (send (con) exec sql-stmt sensor-name)] + [matrix (send result get-matrix)]) + (caar matrix)))) + +;; Create Sensor-ID with given name +;; returns id +(define (create-sensor-id sensor-name) + (log-message logger 'info (format "create sensor id for ~a" sensor-name) #t) + (define sql-stmt "INSERT INTO sensor(name) VALUES ($1)") + (send (con) exec sql-stmt sensor-name) + (get-sensor-id sensor-name)) + +;; Get ID of sensor given by sensor-name +;; define sensor if needed +(define (get-or-create-sensor-id sensor-name) + (define id (get-sensor-id sensor-name)) + (cond ((void? id) (create-sensor-id sensor-name)) + (else id))) + +;; Insert Field in Database +;; Decide Database-Datatype from Ebus-Datatype +;; then calls 'insert` +(define (insert-field sensor-name datatype offset value) + (cond ((member datatype (list "data1c" "data2b" "data2c")) + (insert sensor-name "float" value)) + ((member datatype (list "bit" "byte" "data1b" "word" "bcd")) + (insert sensor-name "int" value)) + ((member datatype (list "byteEnum")) + (insert sensor-name "string" value)) + (else + (log-message logger 'error (format "Datatype ~a is not support by DB" datatype) #t)))) + +(define (insert sensor-name type value) + (define sensor-id (get-or-create-sensor-id sensor-name)) + (log-message logger 'info (string-append (format "sensor-id=~a type=~a value=~a" + sensor-id type value)) #t) + (cond ((string=? type "string") + (send (con) p-exec "INSERT INTO value (timestamp, sensor_id, type, value_string) VALUES (now(), $1, 'string', $2)" sensor-id value)) + ((string=? type "float") + (send (con) p-exec "INSERT INTO value (timestamp, sensor_id, type, value_float) VALUES (now(), $1, 'float', $2)" sensor-id value)) + ((string=? type "int") + (send (con) p-exec "INSERT INTO value (timestamp, sensor_id, type, value_int) VALUES (now(), $1, 'int', $2::integer)" sensor-id value)))) + +(provide + (prefix-out db- logger) + (prefix-out db- con-user?) + (prefix-out db- con-password?) + (prefix-out db- con-database?) + (prefix-out db- con-server?) + (prefix-out db- pgc-test) + (prefix-out db- insert-field)) diff --git a/ebus-racket/db.rkt b/ebus-racket/db.rkt index f203c5b..081cca7 100644 --- a/ebus-racket/db.rkt +++ b/ebus-racket/db.rkt @@ -1,4 +1,6 @@ #lang racket/base +;; Database Acess with ryanc/db +;; leaks memory somewhere in db.plt (require (prefix-in db: (planet ryanc/db:1:5))) (require (prefix-in db: (planet ryanc/db:1:5/util/connect))) diff --git a/ebus-racket/inserter-pq.rkt b/ebus-racket/inserter-pq.rkt new file mode 100755 index 0000000..7ed87d9 --- /dev/null +++ b/ebus-racket/inserter-pq.rkt @@ -0,0 +1,100 @@ +#! /usr/bin/env racket +#lang racket/base +(require racket/cmdline + racket/tcp + racket/match + "db-pq.rkt" + "layer7.rkt" + "layer2.rkt" + "tcp-repl.rkt") + +(define logger (make-logger 'ebus-inserter (current-logger))) + +(define connect-host? (make-parameter null)) +(define connect-port? (make-parameter null)) +(define loglevel-layer2? (make-parameter 'info)) +(define loglevel-db? (make-parameter 'warning)) +(define insert? (make-parameter #f)) + +(define (handle-fields fields) + (for ([field fields]) + (log-message logger 'info (format "Field: ~a" field) #t) + (when (insert?) + (with-handlers ([exn:fail? (lambda (exn) + (log-message logger 'error (format "Failed to insert ~a: ~a" field exn) #t))]) + (apply db-insert-field field))))) + +(define (make-ebus-loop7 input-port) + (lambda () + (let loop () + (with-handlers ([exn:fail? (lambda (exn) + (log-message logger 'error (format "Failed to parse paket: ~a" exn) #t))]) + (let ([fields (layer7-read-ebus (current-input-port))]) + (when (not (or (void? fields) (eof-object? fields))) + (handle-fields fields)) + (when (not (eof-object? fields)) + (loop))))))) + +;; Start Thread that observe all given log-receivers +(define (start-logger-thread receiver1 . receiverN) + (define receivers (cons receiver1 receiverN)) + (void + (thread + (lambda () + (let loop () + (match (apply sync receivers) + [(vector level msg data) + (printf "[~s] ~a~n" level msg) + (flush-output)]) + (loop)))))) + +(define-namespace-anchor repl-ns-anchor) + +(define (main) + ;; Parse commandline + (command-line + #:once-each + [("-c" "--connect") host port "Connect to server " + (connect-host? host) + (connect-port? (string->number port))] + ["--tcp-repl" port "Open REPL on TCP " + (tcp-repl-run (namespace-anchor->namespace repl-ns-anchor) (string->number port))] + ["--debug-layer2" "Log level for Layer 2 Parser" + (loglevel-layer2? 'debug)] + ["--debug-db" "Log level for DB" + (loglevel-db? 'debug)] + ["--insert" "Do Insert into Database" + (insert? #t)] + ["--db-user" user "Datanase User" + (db-con-user? user)] + ["--db-password" password "Database password" + (db-con-password? password)] + ["--db-database" database "Database database-name" + (db-con-database? database)] + ["--db-server" server "Database Address/Server" + (db-con-server? server)]) + + ;; Init Logging + (start-logger-thread (make-log-receiver logger 'info) + (make-log-receiver db-logger (loglevel-db?)) + (make-log-receiver layer2-logger (loglevel-layer2?)) + (make-log-receiver layer7-logger 'info) + (make-log-receiver tcp-repl-logger 'info)) + + ;; Test Database Connection + (when (not (db-pgc-test)) + (log-message logger 'fatal "Failed to connect to database" #t) + (exit 1)) + + ;; Connect, replacing input with tcp connection + (if (or (null? (connect-host?)) (null? (connect-port?))) + (log-message (current-logger) 'info "Using stdin" #t) + (let-values ([(cin cout) (tcp-connect (connect-host?) (connect-port?))]) + (log-message logger 'info (format "Connected to ~s ~s ~n" (connect-host?) (connect-port?)) #t) + (current-input-port cin))) + + ;; Process Ebus Packets + ((make-ebus-loop7 (current-input-port)) ) + ) + +(exit (main)) -- cgit v1.2.1