summaryrefslogtreecommitdiff
path: root/ebus-racket/inserter-pq.rkt
diff options
context:
space:
mode:
Diffstat (limited to 'ebus-racket/inserter-pq.rkt')
-rwxr-xr-xebus-racket/inserter-pq.rkt121
1 files changed, 0 insertions, 121 deletions
diff --git a/ebus-racket/inserter-pq.rkt b/ebus-racket/inserter-pq.rkt
deleted file mode 100755
index 8dc9fb8..0000000
--- a/ebus-racket/inserter-pq.rkt
+++ /dev/null
@@ -1,121 +0,0 @@
-#! /usr/bin/env racket
-#lang racket/base
-(require racket/cmdline
- racket/tcp
- racket/match
- "db-pq.rkt"
- "layer7.rkt"
- "layer2.rkt"
- "tcp-repl.rkt")
-
-(define logger (make-logger 'ebus-inserter (current-logger)))
-
-(define connect-host? (make-parameter null))
-(define connect-port? (make-parameter null))
-(define loglevel-layer2? (make-parameter 'info))
-(define loglevel-db? (make-parameter 'warning))
-(define insert? (make-parameter #f))
-
-(define (get-input-port)
- (if (or (null? (connect-host?)) (null? (connect-port?)))
- (begin
- (log-message (current-logger) 'info "Using stdin" #t)
- (current-input-port))
- (let-values ([(cin cout) (tcp-connect (connect-host?) (connect-port?))])
- (log-message logger 'info (format "Connected to ~s ~s" (connect-host?) (connect-port?)) #t)
- cin)))
-
-(define (handle-fields fields)
- (for ([field fields])
- (log-message logger 'info (format "Field: ~a" field) #t)
- (when (insert?)
- (with-handlers ([exn:fail? (lambda (exn)
- (log-message logger 'error (format "Failed to insert ~a: ~a" field exn) #t))])
- (apply db-insert-field field)))))
-
-(define (read-ebus-loop7)
- (define input-port (get-input-port))
-
- (define (reconnect)
- (log-message logger 'warning "Reconnect - sleep 5sec" #t)
- (sleep 5)
- (log-message logger 'warning "Reconnect - now" #t)
- (set! input-port (get-input-port)))
-
- (let loop ()
- (with-handlers ([exn:fail:read:eof?
- (lambda (exn-eof)
- (log-message logger 'error (format "EOF read: ~a" exn) #t)
- (raise exn-eof))]; re-raise to prevent reconnect
- [exn:fail:network?
- (lambda (exn-network)
- (log-message logger 'error (format "Network error: ~a" exn) #t)
- (reconnect))]
- [exn:fail? (lambda (exn)
- (log-message logger 'error (format "Failed to parse paket: ~a" exn) #t))])
- (let ([fields (layer7-read-ebus input-port)])
- (when (eof-object? fields)
- (raise
- (make-exn:fail:read:eof "Read EOF from layer7"
- (current-continuation-marks)
- (list))))
- (when (not (or (void? fields) (eof-object? fields)))
- (handle-fields fields))))
- (loop)))
-
-;; Start Thread that observe all given log-receivers
-(define (start-logger-thread receiver1 . receiverN)
- (define receivers (cons receiver1 receiverN))
- (void
- (thread
- (lambda ()
- (let loop ()
- (match (apply sync receivers)
- [(vector level msg data)
- (printf "[~s] ~a~n" level msg)
- (flush-output)])
- (loop))))))
-
-;; Marks Namespace where TCP-REPL starts
-(define-namespace-anchor repl-ns-anchor)
-
-(define (main)
- ;; Parse commandline
- (command-line
- #:once-each
- [("-c" "--connect") host port "Connect to server <host> <port>"
- (connect-host? host)
- (connect-port? (string->number port))]
- ["--tcp-repl" port "Open REPL on TCP <port>"
- (tcp-repl-run (namespace-anchor->namespace repl-ns-anchor) (string->number port))]
- ["--debug-layer2" "Log level for Layer 2 Parser"
- (loglevel-layer2? 'debug)]
- ["--debug-db" "Log level for DB"
- (loglevel-db? 'debug)]
- ["--insert" "Do Insert into Database"
- (insert? #t)]
- ["--db-user" user "Datanase User"
- (db-con-user? user)]
- ["--db-password" password "Database password"
- (db-con-password? password)]
- ["--db-database" database "Database database-name"
- (db-con-database? database)]
- ["--db-server" server "Database Address/Server"
- (db-con-server? server)])
-
- ;; Init Logging
- (start-logger-thread (make-log-receiver logger 'info)
- (make-log-receiver db-logger (loglevel-db?))
- (make-log-receiver layer2-logger (loglevel-layer2?))
- (make-log-receiver layer7-logger 'info)
- (make-log-receiver tcp-repl-logger 'info))
-
- ;; Test Database Connection
- (when (not (db-pgc-test))
- (log-message logger 'fatal "Failed to connect to database" #t)
- (exit 1))
-
- ;; Process Ebus Packets
- (read-ebus-loop7))
-
-(exit (main))