summaryrefslogtreecommitdiff
path: root/ebus-racket/inserter.rkt
diff options
context:
space:
mode:
authorEbus-at-dockstar <ebus@dockstar>2015-07-04 22:04:25 +0200
committerEbus-at-dockstar <ebus@dockstar>2015-07-04 22:04:25 +0200
commit07d19f2c5a8b312c497df2fba6f6d9f247df1701 (patch)
tree6a90a85935fd47c858b378a2819adb1d66ae4747 /ebus-racket/inserter.rkt
parentc7dc49bae1b0183389ad7ac40f96789d0a444a09 (diff)
downloadebus-alt-07d19f2c5a8b312c497df2fba6f6d9f247df1701.tar.gz
ebus-alt-07d19f2c5a8b312c497df2fba6f6d9f247df1701.zip
influx 0.9
Diffstat (limited to 'ebus-racket/inserter.rkt')
-rwxr-xr-xebus-racket/inserter.rkt23
1 files changed, 9 insertions, 14 deletions
diff --git a/ebus-racket/inserter.rkt b/ebus-racket/inserter.rkt
index 029688e..7adbe88 100755
--- a/ebus-racket/inserter.rkt
+++ b/ebus-racket/inserter.rkt
@@ -16,10 +16,10 @@
(define connect-port? (make-parameter null))
(define insert? (make-parameter #f))
(define baseurl? (make-parameter "http://localhost:8080/api/value"))
-(define influxurl? (make-parameter "http://db.2.localnet.cc:8001/write/ZQYZHSLWEIWCMXIYQSKAOCFCFWEVFK"))
+(define influxurl? (make-parameter "http://db.2.localnet.cc:8001/write/ZQYZHSLWEIWCMXIYQSKAOCFCFWEVFK?db=ebus"))
(define influx-queue (make-queue))
-(define influx-queue-size? (make-parameter 10))
+(define influx-queue-size? (make-parameter 0))
;; Send field and value to database server
@@ -44,22 +44,17 @@
(define (insert-influxdb sensor-name datatype value)
(if (layer7-ersatzwert? value)
(log-inserter-debug "Skipping Ersatzwert for ~a/~a" sensor-name datatype)
- (let ([point (hash 'name sensor-name
- 'precision "s"
- 'tags (hash 'type (symbol->string datatype))
- 'fields (hash 'value value
- 'type (symbol->string datatype)))])
+ (let ([point (format "~a,type=~a value=~a" sensor-name (symbol->string datatype) value)])
(enqueue! influx-queue point)
(log-inserter-debug (format "influxdb: ~a~n" (jsexpr->string point)))
(when (> (queue-length influx-queue) (influx-queue-size?))
- (let ([data (hash 'database "ebus"
- 'points (queue->list influx-queue))])
+ (let ([points (queue->list influx-queue)])
(log-inserter-info "Make bulk insert")
- (for-each (lambda (e) (dequeue! influx-queue)) (queue->list influx-queue))
- (define input-port (post-impure-port (string->url (influxurl?)) (string->bytes/utf-8 (jsexpr->string data))))
- (define server-response (read-line input-port))
- (close-input-port input-port)
- (log-inserter-info "Server Response: ~a~n~a" server-response (jsexpr->string data)))))))
+ (for-each (lambda (e) (dequeue! influx-queue)) (queue->list influx-queue)) ;; empty the queue
+ (define input-port (post-impure-port (string->url (influxurl?))
+ (string->bytes/utf-8 (string-join points (format "~n")))))
+ (log-inserter-info "Server Response: ~a~nData: ~a" (read-line input-port) (string-join points "|"))
+ (close-input-port input-port))))))
(define (handle-packet packet)
(for ([field packet])