From b5c5e835c64f0aa51387fca574b883a4228218e9 Mon Sep 17 00:00:00 2001 From: Random Hacker Date: Fri, 9 Mar 2012 12:57:09 +0100 Subject: ebus-racket: inserter-pq: reconnect --- ebus-racket/inserter-pq.rkt | 90 +++++++++++++++++++++++++++------------------ 1 file changed, 55 insertions(+), 35 deletions(-) (limited to 'ebus-racket/inserter-pq.rkt') diff --git a/ebus-racket/inserter-pq.rkt b/ebus-racket/inserter-pq.rkt index 85073f4..8dc9fb8 100755 --- a/ebus-racket/inserter-pq.rkt +++ b/ebus-racket/inserter-pq.rkt @@ -1,12 +1,12 @@ #! /usr/bin/env racket #lang racket/base (require racket/cmdline - racket/tcp - racket/match - "db-pq.rkt" - "layer7.rkt" - "layer2.rkt" - "tcp-repl.rkt") + racket/tcp + racket/match + "db-pq.rkt" + "layer7.rkt" + "layer2.rkt" + "tcp-repl.rkt") (define logger (make-logger 'ebus-inserter (current-logger))) @@ -16,6 +16,15 @@ (define loglevel-db? (make-parameter 'warning)) (define insert? (make-parameter #f)) +(define (get-input-port) + (if (or (null? (connect-host?)) (null? (connect-port?))) + (begin + (log-message (current-logger) 'info "Using stdin" #t) + (current-input-port)) + (let-values ([(cin cout) (tcp-connect (connect-host?) (connect-port?))]) + (log-message logger 'info (format "Connected to ~s ~s" (connect-host?) (connect-port?)) #t) + cin))) + (define (handle-fields fields) (for ([field fields]) (log-message logger 'info (format "Field: ~a" field) #t) @@ -24,17 +33,35 @@ (log-message logger 'error (format "Failed to insert ~a: ~a" field exn) #t))]) (apply db-insert-field field))))) -(define (make-ebus-loop7 input-port) - (lambda () - (let loop () - (with-handlers ([exn:fail? (lambda (exn) - (log-message logger 'error (format "Failed to parse paket: ~a" exn) #t) - (loop) )]) - (let ([fields (layer7-read-ebus (current-input-port))]) - (when (not (or (void? fields) (eof-object? fields))) - (handle-fields fields)) - (when (not (eof-object? fields)) - (loop))))))) +(define (read-ebus-loop7) + (define input-port (get-input-port)) + + (define (reconnect) + (log-message logger 'warning "Reconnect - sleep 5sec" #t) + (sleep 5) + (log-message logger 'warning "Reconnect - now" #t) + (set! input-port (get-input-port))) + + (let loop () + (with-handlers ([exn:fail:read:eof? + (lambda (exn-eof) + (log-message logger 'error (format "EOF read: ~a" exn) #t) + (raise exn-eof))]; re-raise to prevent reconnect + [exn:fail:network? + (lambda (exn-network) + (log-message logger 'error (format "Network error: ~a" exn) #t) + (reconnect))] + [exn:fail? (lambda (exn) + (log-message logger 'error (format "Failed to parse paket: ~a" exn) #t))]) + (let ([fields (layer7-read-ebus input-port)]) + (when (eof-object? fields) + (raise + (make-exn:fail:read:eof "Read EOF from layer7" + (current-continuation-marks) + (list)))) + (when (not (or (void? fields) (eof-object? fields))) + (handle-fields fields)))) + (loop))) ;; Start Thread that observe all given log-receivers (define (start-logger-thread receiver1 . receiverN) @@ -43,12 +70,13 @@ (thread (lambda () (let loop () - (match (apply sync receivers) - [(vector level msg data) - (printf "[~s] ~a~n" level msg) - (flush-output)]) - (loop)))))) + (match (apply sync receivers) + [(vector level msg data) + (printf "[~s] ~a~n" level msg) + (flush-output)]) + (loop)))))) +;; Marks Namespace where TCP-REPL starts (define-namespace-anchor repl-ns-anchor) (define (main) @@ -77,25 +105,17 @@ ;; Init Logging (start-logger-thread (make-log-receiver logger 'info) - (make-log-receiver db-logger (loglevel-db?)) - (make-log-receiver layer2-logger (loglevel-layer2?)) - (make-log-receiver layer7-logger 'info) - (make-log-receiver tcp-repl-logger 'info)) + (make-log-receiver db-logger (loglevel-db?)) + (make-log-receiver layer2-logger (loglevel-layer2?)) + (make-log-receiver layer7-logger 'info) + (make-log-receiver tcp-repl-logger 'info)) ;; Test Database Connection (when (not (db-pgc-test)) (log-message logger 'fatal "Failed to connect to database" #t) (exit 1)) - ;; Connect, replacing input with tcp connection - (if (or (null? (connect-host?)) (null? (connect-port?))) - (log-message (current-logger) 'info "Using stdin" #t) - (let-values ([(cin cout) (tcp-connect (connect-host?) (connect-port?))]) - (log-message logger 'info (format "Connected to ~s ~s ~n" (connect-host?) (connect-port?)) #t) - (current-input-port cin))) - ;; Process Ebus Packets - ((make-ebus-loop7 (current-input-port)) ) - ) + (read-ebus-loop7)) (exit (main)) -- cgit v1.2.1