summaryrefslogtreecommitdiff
path: root/ebus-racket/inserter-pq.rkt
blob: 8dc9fb83e0fa38f15cc617cbad77eacb2707231f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
#! /usr/bin/env racket
#lang racket/base
(require racket/cmdline
         racket/tcp
         racket/match
         "db-pq.rkt"
         "layer7.rkt"
         "layer2.rkt"
         "tcp-repl.rkt")

;; Logger for this program; created as a child of the logger that is current
;; at load time, with the topic 'ebus-inserter.
(define logger (make-logger 'ebus-inserter (current-logger)))

;; Runtime options, filled in by the command-line parser in `main`.
;; Host/port of the server to read from; while either is still `null`,
;; `get-input-port` falls back to stdin.
(define connect-host? (make-parameter null))
(define connect-port? (make-parameter null))
;; Level used for the layer2 log receiver (switched to 'debug by --debug-layer2).
(define loglevel-layer2? (make-parameter 'info))
;; Level used for the db log receiver (switched to 'debug by --debug-db).
(define loglevel-db? (make-parameter 'warning))
;; When true, `handle-fields` passes every field to `db-insert-field`.
(define insert? (make-parameter #f))

;; Select the port that eBUS data is read from.
;;
;; Returns (current-input-port) while either `connect-host?` or
;; `connect-port?` is still `null`; otherwise opens a TCP connection to that
;; host/port and returns the input side of it.
;; Raises whatever `tcp-connect` raises when the connection cannot be made.
(define (get-input-port)
  (if (or (null? (connect-host?)) (null? (connect-port?)))
      (begin
        ;; Log through this program's own logger so the message reaches the
        ;; receiver that `main` attaches to it.
        (log-message logger 'info "Using stdin" #t)
        (current-input-port))
      (let-values ([(cin cout) (tcp-connect (connect-host?) (connect-port?))])
        ;; Nothing is ever written to the server. Abandon the output side
        ;; (no close message is sent to the peer) so that closing `cin` later
        ;; is enough to release the whole connection.
        (tcp-abandon-port cout)
        (log-message logger 'info
                     (format "Connected to ~s ~s" (connect-host?) (connect-port?))
                     #t)
        cin)))

;; Log each element of `fields`; when the `insert?` parameter is true, also
;; apply `db-insert-field` to the element's contents.
(define (handle-fields fields)
  ;; Hand a single entry to the database layer. Any `exn:fail` is logged as an
  ;; error and swallowed so the remaining entries are still processed.
  (define (insert-entry entry)
    (with-handlers ([exn:fail?
                     (lambda (err)
                       (log-message logger
                                    'error
                                    (format "Failed to insert ~a: ~a" entry err)
                                    #t))])
      (apply db-insert-field entry)))

  (for ([entry fields])
    (log-message logger 'info (format "Field: ~a" entry) #t)
    (when (insert?)
      (insert-entry entry))))

;; Read from the input port via `layer7-read-ebus` in an endless loop and pass
;; every non-void result to `handle-fields`.
;;
;; Error handling per iteration:
;;  - EOF result from `layer7-read-ebus`: converted into `exn:fail:read:eof`,
;;    logged and re-raised, which terminates the loop (no reconnect).
;;  - `exn:fail:network`: logged, then the input port is re-opened.
;;  - any other `exn:fail`: logged, the loop continues with the next read.
(define (read-ebus-loop7)
  (define input-port (get-input-port))

  ;; Replace `input-port` with a fresh port from `get-input-port`, waiting
  ;; five seconds before each attempt. A network failure while reconnecting
  ;; is logged and retried instead of escaping and ending the read loop.
  (define (reconnect)
    ;; Drop the broken port; never close the process' own stdin.
    (unless (eq? input-port (current-input-port))
      (close-input-port input-port))
    (let retry ()
      (log-message logger 'warning "Reconnect - sleep 5sec" #t)
      (sleep 5)
      (log-message logger 'warning "Reconnect - now" #t)
      (with-handlers ([exn:fail:network?
                       (lambda (exn-network)
                         (log-message logger 'error
                                      (format "Network error: ~a" exn-network)
                                      #t)
                         (retry))])
        (set! input-port (get-input-port)))))

  (let loop ()
    (with-handlers ([exn:fail:read:eof?
                     (lambda (exn-eof)
                       ;; Log the caught exception itself, not the `exn` binding.
                       (log-message logger 'error (format "EOF read: ~a" exn-eof) #t)
                       (raise exn-eof))] ; re-raise to prevent reconnect
                    [exn:fail:network?
                     (lambda (exn-network)
                       (log-message logger 'error
                                    (format "Network error: ~a" exn-network)
                                    #t)
                       (reconnect))]
                    [exn:fail?
                     (lambda (exn)
                       (log-message logger 'error
                                    (format "Failed to parse paket: ~a" exn)
                                    #t))])
      (let ([fields (layer7-read-ebus input-port)])
        (cond
          [(eof-object? fields)
           (raise
            (make-exn:fail:read:eof "Read EOF from layer7"
                                    (current-continuation-marks)
                                    (list)))]
          ;; A void result carries no fields; skip it.
          [(not (void? fields))
           (handle-fields fields)])))
    (loop)))

;; Start a background thread that waits on all given log receivers and prints
;; every received event to the current output port as "[level] message".
;;
;; receiver1, receiverN ...: one or more log receivers (synchronizable events).
;; Returns void; the thread is not exposed to the caller.
(define (start-logger-thread receiver1 . receiverN)
  (define receivers (cons receiver1 receiverN))
  (void
   (thread
    (lambda ()
      (let loop ()
        ;; A log receiver's sync result is a vector whose first two slots are
        ;; the level and the message. Index into it instead of matching an
        ;; exact length, so additional slots (e.g. the topic) do not break the
        ;; logging thread.
        (let ([event (apply sync receivers)])
          (printf "[~s] ~a~n" (vector-ref event 0) (vector-ref event 1))
          (flush-output))
        (loop))))))

;; Namespace anchor for this module; `main` converts it to a namespace and
;; passes that to `tcp-repl-run` when --tcp-repl is given.
(define-namespace-anchor repl-ns-anchor)

;; Program entry point: parse the command line, start the logging thread,
;; check the database connection and then process eBUS packets.
;;
;; Exits with status 1 when `db-pgc-test` reports failure. Otherwise control
;; passes to `read-ebus-loop7`.
(define (main)
  ;; Convert the textual port argument of `flag` into a number, rejecting
  ;; anything that is not an exact integer in 0..65535 with a user-level error
  ;; instead of passing #f or a non-integer on to the TCP layer.
  (define (parse-port flag text)
    (define n (string->number text))
    (unless (and (exact-nonnegative-integer? n) (<= n 65535))
      (raise-user-error 'ebus-inserter "~a: invalid port number: ~a" flag text))
    n)

  ;; Parse commandline
  (command-line
   #:once-each
   [("-c" "--connect") host port "Connect to server <host> <port>"
    (connect-host? host)
    (connect-port? (parse-port "--connect" port))]
   ["--tcp-repl" port "Open REPL on TCP <port>"
    (tcp-repl-run (namespace-anchor->namespace repl-ns-anchor)
                  (parse-port "--tcp-repl" port))]
   ["--debug-layer2" "Log level for Layer 2 Parser"
    (loglevel-layer2? 'debug)]
   ["--debug-db" "Log level for DB"
    (loglevel-db? 'debug)]
   ["--insert" "Do Insert into Database"
    (insert? #t)]
   ["--db-user" user "Database User"
    (db-con-user? user)]
   ["--db-password" password "Database password"
    (db-con-password? password)]
   ["--db-database" database "Database database-name"
    (db-con-database? database)]
   ["--db-server" server "Database Address/Server"
    (db-con-server? server)])

  ;; Init Logging: one receiver per logger; the db and layer2 levels come from
  ;; the --debug-* switches parsed above.
  (start-logger-thread (make-log-receiver logger 'info)
                       (make-log-receiver db-logger (loglevel-db?))
                       (make-log-receiver layer2-logger (loglevel-layer2?))
                       (make-log-receiver layer7-logger 'info)
                       (make-log-receiver tcp-repl-logger 'info))

  ;; Test Database Connection
  ;; NOTE(review): the fatal message is printed asynchronously by the logger
  ;; thread, so it may not appear before `exit` ends the process - confirm.
  (unless (db-pgc-test)
    (log-message logger 'fatal "Failed to connect to database" #t)
    (exit 1))

  ;; Process Ebus Packets
  (read-ebus-loop7))

;; Run the program when the module is executed and exit with `main`'s result.
(exit (main))