#! /usr/bin/env racket #lang racket/base (require racket/cmdline racket/tcp racket/match "db-pq.rkt" "layer7.rkt" "layer2.rkt" "tcp-repl.rkt") (define logger (make-logger 'ebus-inserter (current-logger))) (define connect-host? (make-parameter null)) (define connect-port? (make-parameter null)) (define loglevel-layer2? (make-parameter 'info)) (define loglevel-db? (make-parameter 'warning)) (define insert? (make-parameter #f)) (define (get-input-port) (if (or (null? (connect-host?)) (null? (connect-port?))) (begin (log-message (current-logger) 'info "Using stdin" #t) (current-input-port)) (let-values ([(cin cout) (tcp-connect (connect-host?) (connect-port?))]) (log-message logger 'info (format "Connected to ~s ~s" (connect-host?) (connect-port?)) #t) cin))) (define (handle-fields fields) (for ([field fields]) (log-message logger 'info (format "Field: ~a" field) #t) (when (insert?) (with-handlers ([exn:fail? (lambda (exn) (log-message logger 'error (format "Failed to insert ~a: ~a" field exn) #t))]) (apply db-insert-field field))))) (define (read-ebus-loop7) (define input-port (get-input-port)) (define (reconnect) (log-message logger 'warning "Reconnect - sleep 5sec" #t) (sleep 5) (log-message logger 'warning "Reconnect - now" #t) (set! input-port (get-input-port))) (let loop () (with-handlers ([exn:fail:read:eof? (lambda (exn-eof) (log-message logger 'error (format "EOF read: ~a" exn) #t) (raise exn-eof))]; re-raise to prevent reconnect [exn:fail:network? (lambda (exn-network) (log-message logger 'error (format "Network error: ~a" exn) #t) (reconnect))] [exn:fail? (lambda (exn) (log-message logger 'error (format "Failed to parse paket: ~a" exn) #t))]) (let ([fields (layer7-read-ebus input-port)]) (when (eof-object? fields) (raise (make-exn:fail:read:eof "Read EOF from layer7" (current-continuation-marks) (list)))) (when (not (or (void? fields) (eof-object? fields))) (handle-fields fields)))) (loop))) ;; Start Thread that observe all given log-receivers (define (start-logger-thread receiver1 . receiverN) (define receivers (cons receiver1 receiverN)) (void (thread (lambda () (let loop () (match (apply sync receivers) [(vector level msg data) (printf "[~s] ~a~n" level msg) (flush-output)]) (loop)))))) ;; Marks Namespace where TCP-REPL starts (define-namespace-anchor repl-ns-anchor) (define (main) ;; Parse commandline (command-line #:once-each [("-c" "--connect") host port "Connect to server " (connect-host? host) (connect-port? (string->number port))] ["--tcp-repl" port "Open REPL on TCP " (tcp-repl-run (namespace-anchor->namespace repl-ns-anchor) (string->number port))] ["--debug-layer2" "Log level for Layer 2 Parser" (loglevel-layer2? 'debug)] ["--debug-db" "Log level for DB" (loglevel-db? 'debug)] ["--insert" "Do Insert into Database" (insert? #t)] ["--db-user" user "Datanase User" (db-con-user? user)] ["--db-password" password "Database password" (db-con-password? password)] ["--db-database" database "Database database-name" (db-con-database? database)] ["--db-server" server "Database Address/Server" (db-con-server? server)]) ;; Init Logging (start-logger-thread (make-log-receiver logger 'info) (make-log-receiver db-logger (loglevel-db?)) (make-log-receiver layer2-logger (loglevel-layer2?)) (make-log-receiver layer7-logger 'info) (make-log-receiver tcp-repl-logger 'info)) ;; Test Database Connection (when (not (db-pgc-test)) (log-message logger 'fatal "Failed to connect to database" #t) (exit 1)) ;; Process Ebus Packets (read-ebus-loop7)) (exit (main))