summaryrefslogtreecommitdiff
path: root/ebus-racket/inserter.rkt
diff options
context:
space:
mode:
authorEbus-at-dockstar <ebus@dockstar>2015-05-10 21:47:10 +0200
committerEbus-at-dockstar <ebus@dockstar>2015-05-10 21:47:10 +0200
commitc7dc49bae1b0183389ad7ac40f96789d0a444a09 (patch)
treeb1212e827d8fcf3fdc25a0a05f615f184bb20ee9 /ebus-racket/inserter.rkt
parenta925e9b9438f2d6b9c00ab84c3c1a4089fef2d28 (diff)
downloadebus-alt-c7dc49bae1b0183389ad7ac40f96789d0a444a09.tar.gz
ebus-alt-c7dc49bae1b0183389ad7ac40f96789d0a444a09.zip
queue 10 pakets before sending to influxdb
Diffstat (limited to 'ebus-racket/inserter.rkt')
-rwxr-xr-xebus-racket/inserter.rkt33
1 files changed, 22 insertions, 11 deletions
diff --git a/ebus-racket/inserter.rkt b/ebus-racket/inserter.rkt
index c068e1c..029688e 100755
--- a/ebus-racket/inserter.rkt
+++ b/ebus-racket/inserter.rkt
@@ -4,6 +4,7 @@
racket/tcp
racket/stream
racket/string
+ data/queue
net/url
"ebus/layer7.rkt"
"util/tcp-repl.rkt"
@@ -17,11 +18,14 @@
(define baseurl? (make-parameter "http://localhost:8080/api/value"))
(define influxurl? (make-parameter "http://db.2.localnet.cc:8001/write/ZQYZHSLWEIWCMXIYQSKAOCFCFWEVFK"))
+(define influx-queue (make-queue))
+(define influx-queue-size? (make-parameter 10))
+
;; Send field and value to database server
(define (insert-field sensor-name datatype value)
(if (layer7-ersatzwert? value)
- (log-inserter-info "Skipping Ersatzwert for ~a/~a" sensor-name datatype)
+ (log-inserter-debug "Skipping Ersatzwert for ~a/~a" sensor-name datatype)
(let ()
(define url (format "~a/~a" (baseurl?) sensor-name))
(define raw-value
@@ -39,16 +43,23 @@
(define (insert-influxdb sensor-name datatype value)
(if (layer7-ersatzwert? value)
- (log-inserter-info "Skipping Ersatzwert for ~a/~a" sensor-name datatype)
- (let ([data (hash 'database "ebus"
- 'points (list (hash 'name sensor-name
- 'fields (hash 'value value
- 'type (symbol->string datatype)))))])
- (log-inserter-info (format "influxdb: ~a~n" (jsexpr->string data)))
- (define input-port (post-impure-port (string->url (influxurl?)) (string->bytes/utf-8 (jsexpr->string data))))
- (define server-response (read-line input-port))
- (close-input-port input-port)
- (log-inserter-info "Response: ~a" server-response))))
+ (log-inserter-debug "Skipping Ersatzwert for ~a/~a" sensor-name datatype)
+ (let ([point (hash 'name sensor-name
+ 'precision "s"
+ 'tags (hash 'type (symbol->string datatype))
+ 'fields (hash 'value value
+ 'type (symbol->string datatype)))])
+ (enqueue! influx-queue point)
+ (log-inserter-debug (format "influxdb: ~a~n" (jsexpr->string point)))
+ (when (> (queue-length influx-queue) (influx-queue-size?))
+ (let ([data (hash 'database "ebus"
+ 'points (queue->list influx-queue))])
+ (log-inserter-info "Make bulk insert")
+ (for-each (lambda (e) (dequeue! influx-queue)) (queue->list influx-queue))
+ (define input-port (post-impure-port (string->url (influxurl?)) (string->bytes/utf-8 (jsexpr->string data))))
+ (define server-response (read-line input-port))
+ (close-input-port input-port)
+ (log-inserter-info "Server Response: ~a~n~a" server-response (jsexpr->string data)))))))
(define (handle-packet packet)
(for ([field packet])