summaryrefslogtreecommitdiff
path: root/ebus-racket/reader.rkt
diff options
context:
space:
mode:
authorYves Fischer <yvesf-git@xapek.org>2016-08-14 19:25:26 +0200
committerYves Fischer <yvesf-git@xapek.org>2016-08-14 19:25:26 +0200
commitcaae83f445935c06cd6aef36f283a4688675278a (patch)
tree5e63cbfd2877195430a8657dcd75f42b6a4d7110 /ebus-racket/reader.rkt
downloadebus-caae83f445935c06cd6aef36f283a4688675278a.tar.gz
ebus-caae83f445935c06cd6aef36f283a4688675278a.zip
refactored ebus code
Diffstat (limited to 'ebus-racket/reader.rkt')
-rwxr-xr-xebus-racket/reader.rkt82
1 files changed, 82 insertions, 0 deletions
diff --git a/ebus-racket/reader.rkt b/ebus-racket/reader.rkt
new file mode 100755
index 0000000..438b807
--- /dev/null
+++ b/ebus-racket/reader.rkt
@@ -0,0 +1,82 @@
+#!/usr/bin/env racket
+#lang racket/base
+(require racket/cmdline
+ racket/stream
+ racket/string
+ data/queue
+ net/url
+ (prefix-in layer7- "ebus/layer7.rkt"))
+
+(define-logger inserter)
+
+(define insert? (make-parameter #f))
+(define influx-url? (make-parameter null))
+(define influx-queue (make-queue))
+(define influx-queue-size? (make-parameter 0))
+(define ebus-xml-path? (make-parameter "../ebus-xml/ebus.xml"))
+
+;; Send fields to database server
+(define (insert-influxdb sensor-name datatype value)
+ (if (layer7-ersatzwert? value)
+ (log-inserter-debug "Skipping Ersatzwert for ~a/~a" sensor-name datatype)
+ ;; Some basic formatting rules. This must satisfy the influxdb "Write Protocol"
+ ;; https://docs.influxdata.com/influxdb/v0.13/write_protocols/line/
+ (let* ([raw-value (cond ((member datatype '(data1c data2b data2c)) (real->decimal-string (exact->inexact value)))
+ ((member datatype '(bit byte data1b word bcd)) (format "~s" value))
+ ((member datatype '(byteEnum)) (format "\"~s\"" value)))]
+ [point (format "~a,type=~a value=~a" sensor-name (symbol->string datatype) raw-value)])
+ (enqueue! influx-queue point)
+ (log-inserter-debug (format "influxdb: ~a~n" point))
+ (when (> (queue-length influx-queue) (influx-queue-size?))
+ (let ([points (queue->list influx-queue)]) ;; empty the queue
+ (log-inserter-info "Make bulk insert to ~a" (influx-url?))
+ (for-each (lambda (e) (dequeue! influx-queue)) (queue->list influx-queue))
+ (define input-port (post-impure-port (string->url (influx-url?))
+ (string->bytes/utf-8 (string-join points (format "~n")))))
+ (log-inserter-info "Server Response: ~a~n" (read-line input-port))
+ (log-inserter-info "Data: ~a~n" (string-join points "|"))
+ (close-input-port input-port))))))
+
+(define (handle-packet packet)
+ (for ([field packet])
+ (when (insert?)
+ (with-handlers ([exn:fail? (lambda (exn)
+ (log-inserter-error "Failed to insert ~a: ~a" field exn))]
+ [exn:fail:read? (lambda (exn)
+ (log-inserter-error "TCP Read exception ~a" exn))]
+ [exn:fail:network? (lambda (exn)
+ (log-inserter-error "TCP Exception ~a" exn))])
+ (apply insert-influxdb field)))
+ (when (not (insert?))
+ (apply (lambda (sensor-name datatype value)
+ (printf "No Insert: (~a) ~a=~a~n" datatype sensor-name value))
+ field))))
+
+(define (make-stream port)
+ (stream-cons
+ (with-handlers ([exn:fail? (lambda (exn)
+ (log-inserter-error "Failed to parse packet: ~a" exn)
+ (void))])
+ (layer7-read-ebus port))
+ (make-stream port)))
+
+(define (main)
+ ;; Parse commandline
+ (command-line
+ #:once-each
+ ["--insert" "Do Insert into Database"
+ (insert? #t)]
+ ["--influx-url" url "Influx server http write url"
+ (influx-url? url)]
+ ["--ebus-xml" ebus-xml-path "Influx server http write url"
+ (ebus-xml-path? ebus-xml-path)])
+
+ (parameterize ([layer7-definition (layer7-read-ebus-xml (ebus-xml-path?))])
+ ;; process ebus packets from stdin
+ (for ([packet (make-stream (current-input-port))])
+ (when (not (or (void? packet) (eof-object? packet)))
+ (handle-packet packet))
+ (when (eof-object? packet)
+ (exit 1)))))
+
+(exit (main))