blob: ab909117d390fb9a9b7d5d63cc47900591826a35 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
|
#! /usr/bin/env racket
#lang racket/base
(require racket/cmdline
racket/tcp
racket/match
"db.rkt"
"layer7.rkt"
"layer2.rkt"
"tcp-repl.rkt")
(define logger (make-logger 'ebus-inserter (current-logger)))
(define connect-host? (make-parameter null))
(define connect-port? (make-parameter null))
(define loglevel-layer2? (make-parameter 'info))
(define loglevel-db? (make-parameter 'warning))
(define insert? (make-parameter #f))
(define (handle-fields fields)
(for ([field fields])
(log-message logger 'info (format "Field: ~a" field) #t)
(when (insert?)
(with-handlers ([exn:fail? (lambda (exn)
(log-message logger 'error (format "Failed to insert ~a: ~a" field exn) #t))])
(apply db-insert-field field)))))
(define (make-ebus-loop7 input-port)
(lambda ()
(let loop ()
(with-handlers ([exn:fail? (lambda (exn)
(log-message logger 'error (format "Failed to parse paket: ~a" exn) #t))])
(let ([fields (layer7-read-ebus (current-input-port))])
(when (not (or (void? fields) (eof-object? fields)))
(handle-fields fields))
(when (not (eof-object? fields))
(loop)))))))
;; Start Thread that observe all given log-receivers
(define (start-logger-thread receiver1 . receiverN)
(define receivers (cons receiver1 receiverN))
(void
(thread
(lambda ()
(let loop ()
(match (apply sync receivers)
[(vector level msg data)
(printf "[~s] ~a~n" level msg)
(flush-output)])
(loop))))))
(define-namespace-anchor repl-ns-anchor)
(define (main)
;; Parse commandline
(command-line
#:once-each
[("-c" "--connect") host port "Connect to server <host> <port>"
(connect-host? host)
(connect-port? (string->number port))]
["--tcp-repl" port "Open REPL on TCP <port>"
(tcp-repl-run (namespace-anchor->namespace repl-ns-anchor) (string->number port))]
["--debug-layer2" "Log level for Layer 2 Parser"
(loglevel-layer2? 'debug)]
["--debug-db" "Log level for DB"
(loglevel-db? 'debug)]
["--insert" "Do Insert into Database"
(insert? #t)]
["--db-user" user "Datanase User"
(db-con-user? user)]
["--db-password" password "Database password"
(db-con-password? password)]
["--db-database" database "Database database-name"
(db-con-database? database)]
["--db-server" server "Database Address/Server"
(db-con-server? server)])
;; Init Logging
(start-logger-thread (make-log-receiver logger 'info)
(make-log-receiver db-logger (loglevel-db?))
(make-log-receiver layer2-logger (loglevel-layer2?))
(make-log-receiver layer7-logger 'info)
(make-log-receiver tcp-repl-logger 'info))
;; Test Database Connection
(when (not (db-pgc-test))
(log-message logger 'fatal "Failed to connect to database" #t)
(exit 1))
;; Connect, replacing input with tcp connection
(if (or (null? (connect-host?)) (null? (connect-port?)))
(log-message (current-logger) 'info "Using stdin" #t)
(let-values ([(cin cout) (tcp-connect (connect-host?) (connect-port?))])
(log-message logger 'info (format "Connected to ~s ~s ~n" (connect-host?) (connect-port?)) #t)
(current-input-port cin)))
;; Process Ebus Packets
((make-ebus-loop7 (current-input-port)) )
)
(exit (main))
|