#! /usr/bin/env racket #lang racket/base (require racket/cmdline racket/tcp racket/stream racket/string data/queue net/url "ebus/layer7.rkt" "util/tcp-repl.rkt" "util/json.rkt") (define-logger inserter) (define connect-host? (make-parameter null)) (define connect-port? (make-parameter null)) (define insert? (make-parameter #f)) (define baseurl? (make-parameter "http://localhost:8080/api/value")) (define influxurl? (make-parameter "http://db.2.localnet.cc:8001/write/ZQYZHSLWEIWCMXIYQSKAOCFCFWEVFK?db=ebus")) (define influx-queue (make-queue)) (define influx-queue-size? (make-parameter 0)) ;; Send field and value to database server (define (insert-field sensor-name datatype value) (if (layer7-ersatzwert? value) (log-inserter-debug "Skipping Ersatzwert for ~a/~a" sensor-name datatype) (let () (define url (format "~a/~a" (baseurl?) sensor-name)) (define raw-value (cond ((member datatype '(data1c data2b data2c)) (real->decimal-string value)) ((member datatype '(bit byte data1b word bcd)) (format "~s" value)) ((member datatype '(byteEnum)) value) (else (error 'invalid-datatype)))) (define input-port (put-impure-port (string->url url) (string->bytes/utf-8 raw-value))) (define server-response (read-line input-port)) (close-input-port input-port) (cond ((string=? (list-ref (string-split server-response) 1) "200") (log-inserter-info "OK: ~a : ~a" url raw-value)) (else (error "server-error" url raw-value server-response)))))) (define (insert-influxdb sensor-name datatype value) (if (layer7-ersatzwert? value) (log-inserter-debug "Skipping Ersatzwert for ~a/~a" sensor-name datatype) (let ([point (format "~a,type=~a value=~a" sensor-name (symbol->string datatype) value)]) (enqueue! influx-queue point) (log-inserter-debug (format "influxdb: ~a~n" (jsexpr->string point))) (when (> (queue-length influx-queue) (influx-queue-size?)) (let ([points (queue->list influx-queue)]) (log-inserter-info "Make bulk insert") (for-each (lambda (e) (dequeue! influx-queue)) (queue->list influx-queue)) ;; empty the queue (define input-port (post-impure-port (string->url (influxurl?)) (string->bytes/utf-8 (string-join points (format "~n"))))) (log-inserter-info "Server Response: ~a~nData: ~a" (read-line input-port) (string-join points "|")) (close-input-port input-port)))))) (define (handle-packet packet) (for ([field packet]) (when (insert?) (with-handlers ([exn:fail? (lambda (exn) (log-inserter-error "Failed to insert ~a: ~a" field exn))] [exn:fail:read? (lambda (exn) (log-inserter-error "TCP Read exception ~a" exn))] [exn:fail:network? (lambda (exn) (log-inserter-error "TCP Exception ~a" exn))]) (apply insert-influxdb field) (apply insert-field field))))) (define-namespace-anchor repl-ns-anchor) (define (main) ;; Parse commandline (command-line #:once-each [("-c" "--connect") host port "Connect to server " (connect-host? host) (connect-port? (string->number port))] ["--tcp-repl" port "Open REPL on TCP " (tcp-repl-run (namespace-anchor->namespace repl-ns-anchor) (string->number port))] ["--insert" "Do Insert into Database" (insert? #t)] ["--baseurl" url "Database server http url" (baseurl? url)]) ;; Connect, replacing input with tcp connection (if (or (null? (connect-host?)) (null? (connect-port?))) (log-inserter-info "Using stdin") (let-values ([(cin cout) (tcp-connect (connect-host?) (connect-port?))]) (log-inserter-info "Connected to ~s ~s ~n" (connect-host?) (connect-port?)) (current-input-port cin))) ;; Process Ebus Packets (for ([packet (make-stream (current-input-port))]) (when (not (or (void? packet) (eof-object? packet))) (handle-packet packet)) (when (eof-object? packet) (exit 1))) ) (define (make-stream port) (stream-cons (with-handlers ([exn:fail? (lambda (exn) (log-inserter-error "Failed to parse paket: ~a" exn) (void))]) (layer7-read-ebus port)) (make-stream port))) (exit (main))