summaryrefslogtreecommitdiff
path: root/ebus-racket/inserter.rkt
blob: a5a7ac8c5e357148114970cbb09b72b37f3285a7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
#! /usr/bin/env racket
#lang racket/base
(require racket/cmdline
	 racket/stream
	 racket/tcp
	 net/url
	 "ebus/layer7.rkt"
	 "util/json.rkt")

;; Logger used by this program: topic 'ebus-inserter, with the logger that is
;; current at module start-up as its parent.
(define logger (make-logger 'ebus-inserter (current-logger)))

;; Runtime settings, stored in parameters and filled in from the command line
;; by `main`.
;; Host and port of the server to read from; null means "not given", in which
;; case `main` keeps reading from the existing current input port.
(define connect-host? (make-parameter null))
(define connect-port? (make-parameter null))
;; When true, `handle-packet` sends every field to the database server.
(define insert? (make-parameter #f))
;; Base URL that `insert-field` prefixes to "/value/<sensor-name>".
(define baseurl? (make-parameter "http://localhost:8080/api"))

;; Send one field value to the database server with an HTTP PUT to
;; "<baseurl>/value/<sensor-name>".
;;
;; sensor-name : name appended to the URL path
;; datatype    : datatype name (string) that selects how `value` is serialized
;; offset      : accepted so that a whole field list can be applied to this
;;               function; it is not used here
;; value       : a number for the float/int datatypes; must already be a
;;               string for "byteEnum"
;;
;; Raises exn:fail when `datatype` is not one of the names listed below, and
;; lets any exception from the HTTP request propagate to the caller.
(define (insert-field sensor-name datatype offset value)
  ;; `case` compares with equal?, so it can dispatch on string datatypes.
  (define payload
    (case datatype
      ;; floating point types: fixed decimal notation
      [("data1c" "data2b" "data2c") (real->decimal-string value)]
      ;; integer types
      [("bit" "byte" "data1b" "word" "bcd") (number->string value)]
      ;; enum values are sent unchanged
      [("byteEnum") value]
      ;; Fail with a clear message instead of an unrelated contract error
      ;; further down when the datatype is unknown.
      [else (error 'insert-field
                   "unsupported datatype ~s for sensor ~a"
                   datatype sensor-name)]))
  (define response
    (put-pure-port
     (string->url (format "~a/value/~a" (baseurl?) sensor-name))
     (string->bytes/utf-8 payload)))
  ;; put-pure-port returns an input port for the response; close it so that
  ;; every inserted field does not leave a connection open.
  (close-input-port response))

;; Log every field of a decoded packet and, when the `insert?` parameter is
;; true, send it to the database server via `insert-field`.
;;
;; packet : a sequence of fields; each field is a list that is applied to
;;          `insert-field` as its argument list
;;
;; Insert failures are logged and swallowed so that one bad field does not
;; stop processing of the remaining fields.
(define (handle-packet packet)
  (for ([field packet])
    (log-message logger 'info (format "Field: ~a" field) #t)
    (when (insert?)
      ;; with-handlers tries its predicates in order and exn:fail? matches
      ;; every exn:fail subtype, so the specific handlers must come first or
      ;; they can never run.
      (with-handlers ([exn:fail:read?
                       (lambda (exn)
                         (log-message logger 'error (format "TCP Read exception ~a" exn) #t))]
                      [exn:fail:network?
                       (lambda (exn)
                         (log-message logger 'error (format "TCP Exception ~a" exn) #t))]
                      ;; catch-all for any other failure
                      [exn:fail?
                       (lambda (exn)
                         (log-message logger 'error (format "Failed to insert ~a: ~a" field exn) #t))])
        (apply insert-field field)))))

;; Namespace anchor for this module. It is not referenced anywhere else in the
;; visible file -- presumably kept for REPL/eval use; TODO confirm it is needed.
(define-namespace-anchor repl-ns-anchor)

;; Program entry point: parse the command line, optionally replace the current
;; input port with a TCP connection, then read and handle packets until end of
;; input.
;;
;; Command line:
;;   -c / --connect <host> <port>  read packets from a TCP server
;;   --insert                      send decoded fields to the database server
;;   --baseurl <url>               base URL of the database server
;;
;; End of input terminates the process with exit status 1.
(define (main)
  ;; Parse commandline
  (command-line
   #:once-each
   [("-c" "--connect") host port "Connect to server <host> <port>"
    ;; string->number yields #f for non-numeric text; reject that (and
    ;; out-of-range values) here with a readable message instead of failing
    ;; later inside tcp-connect.
    (let ([port-number (string->number port)])
      (unless (and (exact-integer? port-number) (<= 1 port-number 65535))
        (raise-user-error 'inserter "invalid port number: ~a" port))
      (connect-host? host)
      (connect-port? port-number))]
   ["--insert" "Do Insert into Database"
    (insert? #t)]
   ["--baseurl" url "Database server http url"
    (baseurl? url)])

  ;; Connect, replacing input with tcp connection
  (if (or (null? (connect-host?)) (null? (connect-port?)))
      ;; use the program's own logger, like every other message in this file
      (log-message logger 'info "Using stdin" #t)
      (let-values ([(cin cout) (tcp-connect (connect-host?) (connect-port?))])
        (log-message logger 'info (format "Connected to ~s ~s ~n" (connect-host?) (connect-port?)) #t)
        (current-input-port cin)))

  ;; Process Ebus Packets
  (for ([packet (make-stream (current-input-port))])
    (cond
      ;; input exhausted: stop the program
      [(eof-object? packet) (exit 1)]
      ;; make-stream yields void for a packet that failed to parse; skip it
      [(void? packet) (void)]
      [else (handle-packet packet)])))

;; Build an endless lazy stream whose elements are the results of successive
;; `layer7-read-ebus` calls on `port`.
;;
;; An element is whatever `layer7-read-ebus` returns, or void when the read
;; raised exn:fail (the failure is logged and the stream continues).
(define (make-stream port)
  ;; Read one packet, turning a failure into a logged void value.
  (define (read-next)
    (with-handlers ([exn:fail?
                     (lambda (failure)
                       (log-message logger 'error (format "Failed to parse paket: ~a" failure) #t)
                       (void))])
      (layer7-read-ebus port)))
  ;; stream-cons delays both expressions, so nothing is read until the
  ;; element is actually requested.
  (stream-cons (read-next)
               (make-stream port)))

;; Script entry: run `main` and hand its result to `exit`. `main` itself calls
;; (exit 1) when the input ends.
(exit (main))