summaryrefslogtreecommitdiff
path: root/ebus-racket/inserter.rkt
blob: ab97ecf56a67afd5b8334ac2161e7391fb6c79b8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
#! /usr/bin/env racket
#lang racket/base
(require racket/cmdline
	 racket/tcp
	 racket/stream
	 net/url
	 "ebus/layer7.rkt"
	 "util/tcp-repl.rkt"
	 "util/json.rkt")

;; Logger for this program; the log-inserter-debug / -info / -error forms
;; used throughout the file come from this definition.
(define-logger inserter)

;; Run-time settings, filled in from the command line by main.
;; Host and port of the server to read packets from.  Both stay null
;; unless -c/--connect is given, in which case main reads from stdin.
(define connect-host? (make-parameter null))
(define connect-port? (make-parameter null))
;; When true (set by --insert), handle-packet sends every field to the
;; database server; otherwise fields are only logged.
(define insert? (make-parameter #f))
;; Base URL of the database server (overridable with --baseurl); the
;; sensor name is appended to it by insert-field.
(define baseurl? (make-parameter "http://localhost:8000/sensor"))

;; Send one field and its value to the database server and log the outcome.
;;
;; sensor-name : appended to (baseurl?) to form the request URL
;; datatype    : datatype name (string); selects the "float" / "int" /
;;               "string" type reported to the server
;; offset      : accepted so that a whole field can be applied to this
;;               function; not used here
;; value       : the field value; rendered with real->decimal-string when
;;               the type is "float", sent as-is otherwise
;;
;; Raises exn:fail when the datatype is unknown or when the server closes
;; the connection without sending a response line.  Errors from the
;; network layer and from the JSON parser propagate to the caller.
(define (insert-field sensor-name datatype offset value)
  (define type
    (cond [(member datatype '("data1c" "data2b" "data2c")) "float"]
          [(member datatype '("bit" "byte" "data1b" "word" "bcd")) "int"]
          [(member datatype '("byteEnum")) "string"]
          ;; Fail with a clear message instead of letting string=? below
          ;; choke on #<void>.
          [else (error 'insert-field "unknown datatype: ~s" datatype)]))
  (define formatted-value
    (if (string=? type "float")
        (real->decimal-string value)
        value))
  ;; NOTE(review): the request body is not form-urlencoded, so a value
  ;; containing `&`, `=` or spaces would be sent verbatim -- confirm what
  ;; the server expects before changing this.
  ;;
  ;; call/input-url closes the response port once read-line returns (or
  ;; raises), so the connection is no longer leaked.
  (define response
    (call/input-url
     (string->url (format "~a/~a" (baseurl?) sensor-name))
     (lambda (url)
       (put-pure-port
        url
        (string->bytes/utf-8
         (format "value=~a&type=~a" formatted-value type))))
     read-line))
  (when (eof-object? response)
    (error 'insert-field
           "no response from server: type=~a value=~a"
           type formatted-value))
  (define responseJson (string->jsexpr response))
  (cond [(eq? (json-null) (hash-ref responseJson 'error))
         (log-inserter-debug "Successful insert: type=~a value=~a"
                             type formatted-value)]
        [else
         (log-inserter-error "Error: type=~a value=~a ERROR:~a"
                             type formatted-value response)]))

;; Log every field of a packet and, when (insert?) is true, send it to the
;; database server through insert-field.
;;
;; packet : a sequence of fields; each field is a list that is applied to
;;          insert-field as its argument list.
;;
;; A failure while inserting one field is logged and does not prevent the
;; remaining fields from being processed.
(define (handle-packet packet)
  (for ([field packet])
    (log-inserter-info "Field: ~a" field)
    (when (insert?)
      ;; with-handlers tries its clauses in the order written, so the
      ;; specific predicates have to precede the catch-all exn:fail?;
      ;; otherwise they can never be selected.
      (with-handlers ([exn:fail:network?
                       (lambda (exn)
                         (log-inserter-error "TCP Exception ~a" exn))]
                      [exn:fail:read?
                       (lambda (exn)
                         (log-inserter-error "TCP Read exception ~a" exn))]
                      [exn:fail?
                       (lambda (exn)
                         (log-inserter-error "Failed to insert ~a: ~a" field exn))])
        (apply insert-field field)))))

;; Anchor for this module's namespace; main turns it into a namespace and
;; hands it to tcp-repl-run when --tcp-repl is given.
(define-namespace-anchor repl-ns-anchor)

;; Program entry point: parse the command line, pick the input source
;; (stdin, or a TCP connection when -c/--connect is given) and process
;; packets from it until end of input.
;;
;; Exits with status 1 when the input reaches EOF.  An invalid port
;; argument is reported as a user error instead of surfacing later as a
;; contract violation inside the TCP functions.
(define (main)
  ;; Convert a command-line port argument to a number.  string->number
  ;; yields #f for non-numeric text, which must not be stored as a port.
  (define (parse-port str)
    (define n (string->number str))
    (unless (and (exact-integer? n) (<= 0 n 65535))
      (raise-user-error 'inserter "invalid port number: ~a" str))
    n)

  ;; Parse commandline
  (command-line
   #:once-each
   [("-c" "--connect") host port "Connect to server <host> <port>"
    (connect-host? host)
    (connect-port? (parse-port port))]
   ["--tcp-repl" port "Open REPL on TCP <port>"
    (tcp-repl-run (namespace-anchor->namespace repl-ns-anchor)
                  (parse-port port))]
   ["--insert" "Do Insert into Database"
    (insert? #t)]
   ["--baseurl" url "Database server http url"
    (baseurl? url)])

  ;; Connect, replacing input with tcp connection
  (cond [(or (null? (connect-host?)) (null? (connect-port?)))
         (log-inserter-info "Using stdin")]
        [else
         ;; Only the input side is used; the output port is left open on
         ;; purpose, as closing it would shut down our half of the
         ;; connection.
         (let-values ([(cin cout) (tcp-connect (connect-host?) (connect-port?))])
           (log-inserter-info "Connected to ~s ~s ~n" (connect-host?) (connect-port?))
           (current-input-port cin))])

  ;; Process Ebus Packets
  (for ([packet (make-stream (current-input-port))])
    (cond [(eof-object? packet) (exit 1)]  ; input exhausted
          [(void? packet) (void)]          ; read failed; already logged by make-stream
          [else (handle-packet packet)])))

;; Build an endless lazy stream whose elements are the successive results
;; of layer7-read-ebus on port.  A read that raises exn:fail is logged and
;; contributes #<void> to the stream instead of aborting it.
(define (make-stream port)
  ;; Read one element, turning a failure into a logged #<void>.
  (define (read-one)
    (with-handlers ([exn:fail?
                     (lambda (failure)
                       (log-inserter-error "Failed to parse paket: ~a" failure)
                       (void))])
      (layer7-read-ebus port)))
  ;; stream-cons delays both expressions, so nothing is read until the
  ;; element is actually demanded.
  (stream-cons (read-one)
               (make-stream port)))

;; Run the program when the module is executed.  This has to stay at the
;; end of the file so that every definition main relies on (make-stream
;; in particular) has been evaluated before main starts.
(exit (main))