diff options
author | Ebus-at-dockstar <ebus@dockstar> | 2015-07-04 22:04:25 +0200 |
---|---|---|
committer | Ebus-at-dockstar <ebus@dockstar> | 2015-07-04 22:04:25 +0200 |
commit | 07d19f2c5a8b312c497df2fba6f6d9f247df1701 (patch) | |
tree | 6a90a85935fd47c858b378a2819adb1d66ae4747 /ebus-racket/inserter.rkt | |
parent | c7dc49bae1b0183389ad7ac40f96789d0a444a09 (diff) | |
download | ebus-alt-07d19f2c5a8b312c497df2fba6f6d9f247df1701.tar.gz ebus-alt-07d19f2c5a8b312c497df2fba6f6d9f247df1701.zip |
influx 0.9
Diffstat (limited to 'ebus-racket/inserter.rkt')
-rwxr-xr-x | ebus-racket/inserter.rkt | 23 |
1 files changed, 9 insertions, 14 deletions
diff --git a/ebus-racket/inserter.rkt b/ebus-racket/inserter.rkt index 029688e..7adbe88 100755 --- a/ebus-racket/inserter.rkt +++ b/ebus-racket/inserter.rkt @@ -16,10 +16,10 @@ (define connect-port? (make-parameter null)) (define insert? (make-parameter #f)) (define baseurl? (make-parameter "http://localhost:8080/api/value")) -(define influxurl? (make-parameter "http://db.2.localnet.cc:8001/write/ZQYZHSLWEIWCMXIYQSKAOCFCFWEVFK")) +(define influxurl? (make-parameter "http://db.2.localnet.cc:8001/write/ZQYZHSLWEIWCMXIYQSKAOCFCFWEVFK?db=ebus")) (define influx-queue (make-queue)) -(define influx-queue-size? (make-parameter 10)) +(define influx-queue-size? (make-parameter 0)) ;; Send field and value to database server @@ -44,22 +44,17 @@ (define (insert-influxdb sensor-name datatype value) (if (layer7-ersatzwert? value) (log-inserter-debug "Skipping Ersatzwert for ~a/~a" sensor-name datatype) - (let ([point (hash 'name sensor-name - 'precision "s" - 'tags (hash 'type (symbol->string datatype)) - 'fields (hash 'value value - 'type (symbol->string datatype)))]) + (let ([point (format "~a,type=~a value=~a" sensor-name (symbol->string datatype) value)]) (enqueue! influx-queue point) (log-inserter-debug (format "influxdb: ~a~n" (jsexpr->string point))) (when (> (queue-length influx-queue) (influx-queue-size?)) - (let ([data (hash 'database "ebus" - 'points (queue->list influx-queue))]) + (let ([points (queue->list influx-queue)]) (log-inserter-info "Make bulk insert") - (for-each (lambda (e) (dequeue! influx-queue)) (queue->list influx-queue)) - (define input-port (post-impure-port (string->url (influxurl?)) (string->bytes/utf-8 (jsexpr->string data)))) - (define server-response (read-line input-port)) - (close-input-port input-port) - (log-inserter-info "Server Response: ~a~n~a" server-response (jsexpr->string data))))))) + (for-each (lambda (e) (dequeue! influx-queue)) (queue->list influx-queue)) ;; empty the queue + (define input-port (post-impure-port (string->url (influxurl?)) + (string->bytes/utf-8 (string-join points (format "~n"))))) + (log-inserter-info "Server Response: ~a~nData: ~a" (read-line input-port) (string-join points "|")) + (close-input-port input-port)))))) (define (handle-packet packet) (for ([field packet]) |