blob: 8dc9fb83e0fa38f15cc617cbad77eacb2707231f (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
|
#! /usr/bin/env racket
#lang racket/base
(require racket/cmdline
racket/tcp
racket/match
"db-pq.rkt"
"layer7.rkt"
"layer2.rkt"
"tcp-repl.rkt")
;; Logger for this program: topic 'ebus-inserter, child of the logger that is
;; current when this module is instantiated.
(define logger (make-logger 'ebus-inserter (current-logger)))
;; Host and port of the TCP server to read from. Both start out as null;
;; `main` sets them from the -c/--connect flag. While either one is still null,
;; `get-input-port` falls back to the current input port.
;; NOTE(review): despite the trailing `?`, these are parameters, not predicates.
(define connect-host? (make-parameter null))
(define connect-port? (make-parameter null))
;; Level used for the layer2-logger receiver ('debug with --debug-layer2).
(define loglevel-layer2? (make-parameter 'info))
;; Level used for the db-logger receiver ('debug with --debug-db).
(define loglevel-db? (make-parameter 'warning))
;; When true (set by --insert), `handle-fields` calls db-insert-field for each
;; field; otherwise fields are only logged.
(define insert? (make-parameter #f))
;; get-input-port : -> input-port?
;; Returns the port that eBUS data is read from.
;;  - If no host or no port has been configured (either parameter is still
;;    null), the current input port is returned.
;;  - Otherwise a TCP connection to the configured host and port is opened and
;;    its input port is returned.
;; Any exception raised by tcp-connect propagates to the caller.
(define (get-input-port)
  (cond
    [(or (null? (connect-host?)) (null? (connect-port?)))
     ;; Report through this program's `logger`, not `(current-logger)`:
     ;; `logger` is a child of the current logger, and events reported to a
     ;; parent are not delivered to receivers attached to the child.
     (log-message logger 'info "Using stdin" #t)
     (current-input-port)]
    [else
     ;; Only the input side is used; the output port `cout` is left open and
     ;; is not returned.
     (let-values ([(cin cout) (tcp-connect (connect-host?) (connect-port?))])
       (log-message logger
                    'info
                    (format "Connected to ~s ~s" (connect-host?) (connect-port?))
                    #t)
       cin)]))
;; handle-fields : sequence -> void
;; Logs every element of `fields`. When the `insert?` parameter is true, each
;; element is additionally applied as the argument list of db-insert-field.
;; A failing insert is logged at 'error level and does not stop the remaining
;; fields from being processed.
(define (handle-fields fields)
  ;; Insert one field, turning any exn:fail into an error log entry.
  (define (store! entry)
    (with-handlers ([exn:fail?
                     (lambda (err)
                       (log-message logger
                                    'error
                                    (format "Failed to insert ~a: ~a" entry err)
                                    #t))])
      (apply db-insert-field entry)))
  (for ([entry fields])
    (log-message logger 'info (format "Field: ~a" entry) #t)
    (when (insert?)
      (store! entry))))
;; read-ebus-loop7 : -> (does not return normally)
;; Reads results from layer7-read-ebus in an endless loop and hands them to
;; `handle-fields`.
;;  - An eof result is turned into exn:fail:read:eof, logged and re-raised, so
;;    the loop ends with that exception.
;;  - A network error is logged and followed by a reconnect.
;;  - Any other exn:fail is logged and the loop continues with the next read.
;;  - A void result is skipped.
(define (read-ebus-loop7)
  (define input-port (get-input-port))

  ;; Wait five seconds and open a fresh input port. Retries until it succeeds:
  ;; this runs inside an exception handler, so a failure escaping from here
  ;; would not be caught by the loop's handlers and would end the program.
  ;; NOTE(review): the previous port is not closed here.
  (define (reconnect)
    (let retry ()
      (log-message logger 'warning "Reconnect - sleep 5sec" #t)
      (sleep 5)
      (log-message logger 'warning "Reconnect - now" #t)
      (define new-port
        (with-handlers ([exn:fail:network?
                         (lambda (exn-network)
                           (log-message logger
                                        'error
                                        (format "Reconnect failed: ~a" exn-network)
                                        #t)
                           #f)])
          (get-input-port)))
      (if new-port
          (set! input-port new-port)
          (retry))))

  (let loop ()
    ;; Each handler formats the exception it actually received. The bare name
    ;; `exn` would refer to the exn structure constructor, not the caught value.
    (with-handlers ([exn:fail:read:eof?
                     (lambda (exn-eof)
                       (log-message logger 'error (format "EOF read: ~a" exn-eof) #t)
                       (raise exn-eof))] ; re-raise to prevent reconnect
                    [exn:fail:network?
                     (lambda (exn-network)
                       (log-message logger
                                    'error
                                    (format "Network error: ~a" exn-network)
                                    #t)
                       (reconnect))]
                    [exn:fail?
                     (lambda (exn-other)
                       (log-message logger
                                    'error
                                    (format "Failed to parse paket: ~a" exn-other)
                                    #t))])
      (let ([fields (layer7-read-ebus input-port)])
        (when (eof-object? fields)
          (raise
           (make-exn:fail:read:eof "Read EOF from layer7"
                                   (current-continuation-marks)
                                   (list))))
        ;; eof has already been raised above, so only void needs to be skipped.
        (unless (void? fields)
          (handle-fields fields))))
    (loop)))
;; start-logger-thread : log-receiver? log-receiver? ... -> void
;; Starts a background thread that waits on all given log receivers and prints
;; every received event to the current output port as "[level] message",
;; flushing after each line.
(define (start-logger-thread receiver1 . receiverN)
  (define receivers (cons receiver1 receiverN))
  (void
   (thread
    (lambda ()
      (let loop ()
        ;; A receiver's sync result is a vector that starts with the level and
        ;; the message. Read those two slots by position rather than matching
        ;; an exact vector length: current Racket delivers four elements
        ;; (level, message, data, topic), and a fixed three-element pattern
        ;; would fail to match and kill this thread on the first event.
        (define event (apply sync receivers))
        (printf "[~s] ~a~n" (vector-ref event 0) (vector-ref event 1))
        (flush-output)
        (loop))))))
;; Namespace anchor for this module. `main` converts it with
;; namespace-anchor->namespace and hands the resulting namespace to
;; tcp-repl-run when --tcp-repl is given.
(define-namespace-anchor repl-ns-anchor)
;; main : -> (does not return normally)
;; Program entry point:
;;   1. parses the command line into the module's parameters,
;;   2. starts the thread that prints log events,
;;   3. checks the database connection and exits with status 1 if the check
;;      fails,
;;   4. runs the eBUS read loop.
(define (main)
  ;; Convert a port argument to a number. Anything that is not an exact integer
  ;; in [lowest, 65535] is rejected with a user error, instead of letting #f or
  ;; an out-of-range value fail later with a contract violation.
  (define (parse-port flag text lowest)
    (define n (string->number text))
    (unless (and (exact-integer? n) (<= lowest n 65535))
      (raise-user-error 'ebus-inserter "~a: invalid port number: ~a" flag text))
    n)

  ;; Parse commandline
  (command-line
   #:once-each
   [("-c" "--connect") host port "Connect to server <host> <port>"
    (connect-host? host)
    (connect-port? (parse-port "--connect" port 1))]
   ;; The REPL is started right here, while the command line is being parsed.
   ["--tcp-repl" port "Open REPL on TCP <port>"
    (tcp-repl-run (namespace-anchor->namespace repl-ns-anchor)
                  (parse-port "--tcp-repl" port 0))]
   ["--debug-layer2" "Log level for Layer 2 Parser"
    (loglevel-layer2? 'debug)]
   ["--debug-db" "Log level for DB"
    (loglevel-db? 'debug)]
   ["--insert" "Do Insert into Database"
    (insert? #t)]
   ["--db-user" user "Database User"
    (db-con-user? user)]
   ["--db-password" password "Database password"
    (db-con-password? password)]
   ["--db-database" database "Database database-name"
    (db-con-database? database)]
   ["--db-server" server "Database Address/Server"
    (db-con-server? server)])

  ;; Init Logging (after parsing, so the --debug-* flags take effect)
  (start-logger-thread (make-log-receiver logger 'info)
                       (make-log-receiver db-logger (loglevel-db?))
                       (make-log-receiver layer2-logger (loglevel-layer2?))
                       (make-log-receiver layer7-logger 'info)
                       (make-log-receiver tcp-repl-logger 'info))

  ;; Test Database Connection
  (unless (db-pgc-test)
    (log-message logger 'fatal "Failed to connect to database" #t)
    (exit 1))

  ;; Process Ebus Packets
  (read-ebus-loop7))

(exit (main))
|