diff options
author | Ebus-at-dockstar <ebus@dockstar> | 2015-05-10 21:47:10 +0200 |
---|---|---|
committer | Ebus-at-dockstar <ebus@dockstar> | 2015-05-10 21:47:10 +0200 |
commit | c7dc49bae1b0183389ad7ac40f96789d0a444a09 (patch) | |
tree | b1212e827d8fcf3fdc25a0a05f615f184bb20ee9 /ebus-racket | |
parent | a925e9b9438f2d6b9c00ab84c3c1a4089fef2d28 (diff) | |
download | ebus-alt-c7dc49bae1b0183389ad7ac40f96789d0a444a09.tar.gz ebus-alt-c7dc49bae1b0183389ad7ac40f96789d0a444a09.zip |
queue 10 pakets before sending to influxdb
Diffstat (limited to 'ebus-racket')
-rwxr-xr-x | ebus-racket/inserter.rkt | 33 |
1 files changed, 22 insertions, 11 deletions
diff --git a/ebus-racket/inserter.rkt b/ebus-racket/inserter.rkt index c068e1c..029688e 100755 --- a/ebus-racket/inserter.rkt +++ b/ebus-racket/inserter.rkt @@ -4,6 +4,7 @@ racket/tcp racket/stream racket/string + data/queue net/url "ebus/layer7.rkt" "util/tcp-repl.rkt" @@ -17,11 +18,14 @@ (define baseurl? (make-parameter "http://localhost:8080/api/value")) (define influxurl? (make-parameter "http://db.2.localnet.cc:8001/write/ZQYZHSLWEIWCMXIYQSKAOCFCFWEVFK")) +(define influx-queue (make-queue)) +(define influx-queue-size? (make-parameter 10)) + ;; Send field and value to database server (define (insert-field sensor-name datatype value) (if (layer7-ersatzwert? value) - (log-inserter-info "Skipping Ersatzwert for ~a/~a" sensor-name datatype) + (log-inserter-debug "Skipping Ersatzwert for ~a/~a" sensor-name datatype) (let () (define url (format "~a/~a" (baseurl?) sensor-name)) (define raw-value @@ -39,16 +43,23 @@ (define (insert-influxdb sensor-name datatype value) (if (layer7-ersatzwert? value) - (log-inserter-info "Skipping Ersatzwert for ~a/~a" sensor-name datatype) - (let ([data (hash 'database "ebus" - 'points (list (hash 'name sensor-name - 'fields (hash 'value value - 'type (symbol->string datatype)))))]) - (log-inserter-info (format "influxdb: ~a~n" (jsexpr->string data))) - (define input-port (post-impure-port (string->url (influxurl?)) (string->bytes/utf-8 (jsexpr->string data)))) - (define server-response (read-line input-port)) - (close-input-port input-port) - (log-inserter-info "Response: ~a" server-response)))) + (log-inserter-debug "Skipping Ersatzwert for ~a/~a" sensor-name datatype) + (let ([point (hash 'name sensor-name + 'precision "s" + 'tags (hash 'type (symbol->string datatype)) + 'fields (hash 'value value + 'type (symbol->string datatype)))]) + (enqueue! influx-queue point) + (log-inserter-debug (format "influxdb: ~a~n" (jsexpr->string point))) + (when (> (queue-length influx-queue) (influx-queue-size?)) + (let ([data (hash 'database "ebus" + 'points (queue->list influx-queue))]) + (log-inserter-info "Make bulk insert") + (for-each (lambda (e) (dequeue! influx-queue)) (queue->list influx-queue)) + (define input-port (post-impure-port (string->url (influxurl?)) (string->bytes/utf-8 (jsexpr->string data)))) + (define server-response (read-line input-port)) + (close-input-port input-port) + (log-inserter-info "Server Response: ~a~n~a" server-response (jsexpr->string data))))))) (define (handle-packet packet) (for ([field packet]) |