diff options
Diffstat (limited to 'ebus-racket/inserter-pq.rkt')
-rwxr-xr-x | ebus-racket/inserter-pq.rkt | 121 |
1 files changed, 0 insertions, 121 deletions
diff --git a/ebus-racket/inserter-pq.rkt b/ebus-racket/inserter-pq.rkt deleted file mode 100755 index 8dc9fb8..0000000 --- a/ebus-racket/inserter-pq.rkt +++ /dev/null @@ -1,121 +0,0 @@ -#! /usr/bin/env racket -#lang racket/base -(require racket/cmdline - racket/tcp - racket/match - "db-pq.rkt" - "layer7.rkt" - "layer2.rkt" - "tcp-repl.rkt") - -(define logger (make-logger 'ebus-inserter (current-logger))) - -(define connect-host? (make-parameter null)) -(define connect-port? (make-parameter null)) -(define loglevel-layer2? (make-parameter 'info)) -(define loglevel-db? (make-parameter 'warning)) -(define insert? (make-parameter #f)) - -(define (get-input-port) - (if (or (null? (connect-host?)) (null? (connect-port?))) - (begin - (log-message (current-logger) 'info "Using stdin" #t) - (current-input-port)) - (let-values ([(cin cout) (tcp-connect (connect-host?) (connect-port?))]) - (log-message logger 'info (format "Connected to ~s ~s" (connect-host?) (connect-port?)) #t) - cin))) - -(define (handle-fields fields) - (for ([field fields]) - (log-message logger 'info (format "Field: ~a" field) #t) - (when (insert?) - (with-handlers ([exn:fail? (lambda (exn) - (log-message logger 'error (format "Failed to insert ~a: ~a" field exn) #t))]) - (apply db-insert-field field))))) - -(define (read-ebus-loop7) - (define input-port (get-input-port)) - - (define (reconnect) - (log-message logger 'warning "Reconnect - sleep 5sec" #t) - (sleep 5) - (log-message logger 'warning "Reconnect - now" #t) - (set! input-port (get-input-port))) - - (let loop () - (with-handlers ([exn:fail:read:eof? - (lambda (exn-eof) - (log-message logger 'error (format "EOF read: ~a" exn) #t) - (raise exn-eof))]; re-raise to prevent reconnect - [exn:fail:network? - (lambda (exn-network) - (log-message logger 'error (format "Network error: ~a" exn) #t) - (reconnect))] - [exn:fail? (lambda (exn) - (log-message logger 'error (format "Failed to parse paket: ~a" exn) #t))]) - (let ([fields (layer7-read-ebus input-port)]) - (when (eof-object? fields) - (raise - (make-exn:fail:read:eof "Read EOF from layer7" - (current-continuation-marks) - (list)))) - (when (not (or (void? fields) (eof-object? fields))) - (handle-fields fields)))) - (loop))) - -;; Start Thread that observe all given log-receivers -(define (start-logger-thread receiver1 . receiverN) - (define receivers (cons receiver1 receiverN)) - (void - (thread - (lambda () - (let loop () - (match (apply sync receivers) - [(vector level msg data) - (printf "[~s] ~a~n" level msg) - (flush-output)]) - (loop)))))) - -;; Marks Namespace where TCP-REPL starts -(define-namespace-anchor repl-ns-anchor) - -(define (main) - ;; Parse commandline - (command-line - #:once-each - [("-c" "--connect") host port "Connect to server <host> <port>" - (connect-host? host) - (connect-port? (string->number port))] - ["--tcp-repl" port "Open REPL on TCP <port>" - (tcp-repl-run (namespace-anchor->namespace repl-ns-anchor) (string->number port))] - ["--debug-layer2" "Log level for Layer 2 Parser" - (loglevel-layer2? 'debug)] - ["--debug-db" "Log level for DB" - (loglevel-db? 'debug)] - ["--insert" "Do Insert into Database" - (insert? #t)] - ["--db-user" user "Datanase User" - (db-con-user? user)] - ["--db-password" password "Database password" - (db-con-password? password)] - ["--db-database" database "Database database-name" - (db-con-database? database)] - ["--db-server" server "Database Address/Server" - (db-con-server? server)]) - - ;; Init Logging - (start-logger-thread (make-log-receiver logger 'info) - (make-log-receiver db-logger (loglevel-db?)) - (make-log-receiver layer2-logger (loglevel-layer2?)) - (make-log-receiver layer7-logger 'info) - (make-log-receiver tcp-repl-logger 'info)) - - ;; Test Database Connection - (when (not (db-pgc-test)) - (log-message logger 'fatal "Failed to connect to database" #t) - (exit 1)) - - ;; Process Ebus Packets - (read-ebus-loop7)) - -(exit (main)) |