summaryrefslogtreecommitdiff
path: root/ebus-racket/inserter.rkt
blob: 7adbe887400ac00e4ca99f23c5838bac85f62689 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
#! /usr/bin/env racket
#lang racket/base
(require racket/cmdline
	 racket/tcp
	 racket/stream
	 racket/string
	 data/queue
	 net/url
	 "ebus/layer7.rkt"
	 "util/tcp-repl.rkt"
	 "util/json.rkt")

;; Logger for this program; provides the log-inserter-<level> forms used below.
(define-logger inserter)

;; Runtime configuration, held in parameters.
;; Host and port of the TCP server to read packets from. null means "not set";
;; main then keeps reading from the current input port.
(define connect-host? (make-parameter null))
(define connect-port? (make-parameter null))
;; While #f (the default) handle-packet inserts nothing; main sets it for --insert.
(define insert? (make-parameter #f))
;; Base URL used by insert-field; the sensor name is appended as "/<sensor-name>".
(define baseurl? (make-parameter "http://localhost:8080/api/value"))
;; URL that insert-influxdb posts queued points to.
;; NOTE(review): the path segment looks like an access token committed to the
;; source, and main offers no command-line option to override this URL -- confirm.
(define influxurl? (make-parameter "http://db.2.localnet.cc:8001/write/ZQYZHSLWEIWCMXIYQSKAOCFCFWEVFK?db=ebus"))

;; Points waiting to be posted by insert-influxdb.
(define influx-queue (make-queue))
;; insert-influxdb flushes once the queue holds more than this many points;
;; with the default of 0 every point is posted as soon as it is enqueued.
(define influx-queue-size? (make-parameter 0))


;; Send one field value to the HTTP database server with a PUT request.
;;
;; sensor-name : appended to (baseurl?) as "/<sensor-name>" to form the URL
;; datatype    : symbol that selects how `value` is rendered into the body
;; value       : the field value; values for which layer7-ersatzwert? is true
;;               are skipped (only a debug message is logged)
;;
;; Raises exn:fail when the datatype is unknown, or when the first line of the
;; server response is missing, malformed, or carries a status other than 200.
(define (insert-field sensor-name datatype value)
  (if (layer7-ersatzwert? value)
      (log-inserter-debug "Skipping Ersatzwert for ~a/~a" sensor-name datatype)
      (let ()
        (define url (format "~a/~a" (baseurl?) sensor-name))
        (define raw-value
          (cond ((member datatype '(data1c data2b data2c)) (real->decimal-string value))
                ((member datatype '(bit byte data1b word bcd)) (format "~s" value))
                ;; passed through unchanged -- presumably already a string,
                ;; since it is handed to string->bytes/utf-8 below; TODO confirm
                ((member datatype '(byteEnum)) value)
                (else (error 'invalid-datatype))))
        (define input-port (put-impure-port (string->url url) (string->bytes/utf-8 raw-value)))
        ;; Close the response port even when reading the status line raises.
        (define server-response
          (dynamic-wind
            void
            (lambda () (read-line input-port))
            (lambda () (close-input-port input-port))))
        ;; read-line yields eof when the server sent nothing; treat that, and a
        ;; status line with fewer than two tokens, as a server error instead of
        ;; failing inside string-split / list-ref with an unrelated message.
        (define status-fields
          (if (string? server-response)
              (string-split server-response)
              '()))
        (cond ((and (>= (length status-fields) 2)
                    (string=? (list-ref status-fields 1) "200"))
               (log-inserter-info "OK: ~a : ~a" url raw-value))
              (else
               (error "server-error" url raw-value server-response))))))

;; Queue one field value as an InfluxDB point and post the queue when it has
;; grown past (influx-queue-size?).
;;
;; sensor-name : used as the first component of the point line
;; datatype    : symbol, written as the `type` tag
;; value       : the field value; values for which layer7-ersatzwert? is true
;;               are skipped (only a debug message is logged)
;;
;; The queue is emptied before the request is made, so points of a failed
;; request are not retried.
(define (insert-influxdb sensor-name datatype value)
  (if (layer7-ersatzwert? value)
      (log-inserter-debug "Skipping Ersatzwert for ~a/~a" sensor-name datatype)
      (let ([point (format "~a,type=~a value=~a" sensor-name (symbol->string datatype) value)])
        (enqueue! influx-queue point)
        (log-inserter-debug (format "influxdb: ~a~n" (jsexpr->string point)))
        (when (> (queue-length influx-queue) (influx-queue-size?))
          (let ([points (queue->list influx-queue)])
            (log-inserter-info "Make bulk insert")
            ;; empty the queue: one dequeue per point in the snapshot taken above
            (for-each (lambda (e) (dequeue! influx-queue)) points)
            (define input-port
              (post-impure-port (string->url (influxurl?))
                                (string->bytes/utf-8 (string-join points (format "~n")))))
            ;; Read the status line outside of the log form: the arguments of
            ;; log-inserter-info are only evaluated when an info-level receiver
            ;; exists, so the response would otherwise be read (and waited for)
            ;; only with info logging enabled. dynamic-wind closes the port
            ;; even when the read raises.
            (define server-response
              (dynamic-wind
                void
                (lambda () (read-line input-port))
                (lambda () (close-input-port input-port))))
            (log-inserter-info "Server Response: ~a~nData: ~a" server-response (string-join points "|")))))))

;; Insert every field of a packet into both backends.
;;
;; packet : a sequence of fields; each field is used as the argument list of
;;          insert-influxdb and insert-field
;;
;; Nothing is inserted unless the insert? parameter is true. A failure while
;; handling one field is logged and the remaining fields are still processed.
;; Both inserts share one handler scope, so a failure in insert-influxdb also
;; skips insert-field for that field.
(define (handle-packet packet)
  (for ([field packet])
    (when (insert?)
      ;; with-handlers uses the first handler whose predicate accepts the
      ;; exception, so the specific exn:fail subtypes must be listed before the
      ;; exn:fail? catch-all; otherwise they can never be selected.
      (with-handlers ([exn:fail:read? (lambda (exn)
                                        (log-inserter-error "TCP Read exception ~a" exn))]
                      [exn:fail:network? (lambda (exn)
                                           (log-inserter-error "TCP Exception ~a" exn))]
                      [exn:fail? (lambda (exn)
                                   (log-inserter-error "Failed to insert ~a: ~a" field exn))])
        (apply insert-influxdb field)
        (apply insert-field field)))))

;; Namespace anchor for this module; main converts it to a namespace and hands
;; it to tcp-repl-run when --tcp-repl is given.
(define-namespace-anchor repl-ns-anchor)

;; Program entry point: parse the command line, choose the input source and
;; process packets until the input ends.
;;
;; Options:
;;   -c / --connect <host> <port>  read packets from a TCP connection
;;   --tcp-repl <port>             start the TCP REPL via tcp-repl-run
;;   --insert                      enable inserting (sets insert?)
;;   --baseurl <url>               override baseurl?
;;
;; Calls (exit 1) when the input reaches EOF. Raises a user error when a port
;; argument is not a valid port number.
(define (main)
  ;; Convert a command-line port argument to a number. string->number yields
  ;; #f (or a non-integer) for bad input, which would otherwise only fail
  ;; later, e.g. as a contract error inside tcp-connect.
  (define (parse-port flag str lowest)
    (define n (string->number str))
    (unless (and (exact-integer? n) (<= lowest n 65535))
      (raise-user-error 'inserter
                        "~a: expected a port number in ~a..65535, got ~s"
                        flag lowest str))
    n)

  ;; Parse commandline
  (command-line
   #:once-each
   [("-c" "--connect") host port "Connect to server <host> <port>"
    (connect-host? host)
    (connect-port? (parse-port "--connect" port 1))]
   ["--tcp-repl" port "Open REPL on TCP <port>"
    ;; 0 stays allowed here -- presumably tcp-repl-run listens on the port and
    ;; 0 would pick a free one; TODO confirm against util/tcp-repl.rkt
    (tcp-repl-run (namespace-anchor->namespace repl-ns-anchor)
                  (parse-port "--tcp-repl" port 0))]
   ["--insert" "Do Insert into Database"
    (insert? #t)]
   ["--baseurl" url "Database server http url"
    (baseurl? url)])

  ;; Connect, replacing input with tcp connection
  (if (or (null? (connect-host?)) (null? (connect-port?)))
      (log-inserter-info "Using stdin")
      (let-values ([(cin cout) (tcp-connect (connect-host?) (connect-port?))])
        (log-inserter-info "Connected to ~s ~s ~n" (connect-host?) (connect-port?))
        (current-input-port cin)))

  ;; Process Ebus Packets. make-stream yields (void) for a packet that failed
  ;; to parse; those are skipped.
  (for ([packet (make-stream (current-input-port))])
    (cond [(eof-object? packet) (exit 1)]
          [(void? packet) (void)]
          [else (handle-packet packet)])))

;; Build an unbounded lazy stream of the values layer7-read-ebus reads from
;; `port`. An element whose read raises exn:fail is logged and becomes (void).
(define (make-stream port)
  ;; Read a single element, turning a read failure into (void).
  (define (read-one)
    (with-handlers ([exn:fail? (lambda (exn)
                                 (log-inserter-error "Failed to parse paket: ~a" exn)
                                 (void))])
      (layer7-read-ebus port)))
  ;; stream-cons delays both expressions, so nothing is read from the port
  ;; until the element is actually requested.
  (stream-cons (read-one)
               (make-stream port)))

;; Script entry: run main and pass its result to exit.
(exit (main))