summaryrefslogtreecommitdiff
path: root/ebus-racket
diff options
context:
space:
mode:
authorRandom Hacker <random_hacker@xapek.org>2012-03-09 12:57:09 +0100
committerRandom Hacker <random_hacker@xapek.org>2012-03-09 12:57:09 +0100
commitb5c5e835c64f0aa51387fca574b883a4228218e9 (patch)
treef5adc9a5c5dea195cd2b2f68cfa55bc83fc8df1e /ebus-racket
parent6dd26ef3f562d551216420b982e9f9dfc2340bd0 (diff)
downloadebus-alt-b5c5e835c64f0aa51387fca574b883a4228218e9.tar.gz
ebus-alt-b5c5e835c64f0aa51387fca574b883a4228218e9.zip
ebus-racket: inserter-pq: reconnect
Diffstat (limited to 'ebus-racket')
-rwxr-xr-xebus-racket/inserter-pq.rkt90
1 files changed, 55 insertions, 35 deletions
diff --git a/ebus-racket/inserter-pq.rkt b/ebus-racket/inserter-pq.rkt
index 85073f4..8dc9fb8 100755
--- a/ebus-racket/inserter-pq.rkt
+++ b/ebus-racket/inserter-pq.rkt
@@ -1,12 +1,12 @@
#! /usr/bin/env racket
#lang racket/base
(require racket/cmdline
- racket/tcp
- racket/match
- "db-pq.rkt"
- "layer7.rkt"
- "layer2.rkt"
- "tcp-repl.rkt")
+ racket/tcp
+ racket/match
+ "db-pq.rkt"
+ "layer7.rkt"
+ "layer2.rkt"
+ "tcp-repl.rkt")
(define logger (make-logger 'ebus-inserter (current-logger)))
@@ -16,6 +16,15 @@
(define loglevel-db? (make-parameter 'warning))
(define insert? (make-parameter #f))
+(define (get-input-port)
+ (if (or (null? (connect-host?)) (null? (connect-port?)))
+ (begin
+ (log-message (current-logger) 'info "Using stdin" #t)
+ (current-input-port))
+ (let-values ([(cin cout) (tcp-connect (connect-host?) (connect-port?))])
+ (log-message logger 'info (format "Connected to ~s ~s" (connect-host?) (connect-port?)) #t)
+ cin)))
+
(define (handle-fields fields)
(for ([field fields])
(log-message logger 'info (format "Field: ~a" field) #t)
@@ -24,17 +33,35 @@
(log-message logger 'error (format "Failed to insert ~a: ~a" field exn) #t))])
(apply db-insert-field field)))))
-(define (make-ebus-loop7 input-port)
- (lambda ()
- (let loop ()
- (with-handlers ([exn:fail? (lambda (exn)
- (log-message logger 'error (format "Failed to parse paket: ~a" exn) #t)
- (loop) )])
- (let ([fields (layer7-read-ebus (current-input-port))])
- (when (not (or (void? fields) (eof-object? fields)))
- (handle-fields fields))
- (when (not (eof-object? fields))
- (loop)))))))
+(define (read-ebus-loop7)
+ (define input-port (get-input-port))
+
+ (define (reconnect)
+ (log-message logger 'warning "Reconnect - sleep 5sec" #t)
+ (sleep 5)
+ (log-message logger 'warning "Reconnect - now" #t)
+ (set! input-port (get-input-port)))
+
+ (let loop ()
+ (with-handlers ([exn:fail:read:eof?
+ (lambda (exn-eof)
+ (log-message logger 'error (format "EOF read: ~a" exn) #t)
+ (raise exn-eof))]; re-raise to prevent reconnect
+ [exn:fail:network?
+ (lambda (exn-network)
+ (log-message logger 'error (format "Network error: ~a" exn) #t)
+ (reconnect))]
+ [exn:fail? (lambda (exn)
+ (log-message logger 'error (format "Failed to parse paket: ~a" exn) #t))])
+ (let ([fields (layer7-read-ebus input-port)])
+ (when (eof-object? fields)
+ (raise
+ (make-exn:fail:read:eof "Read EOF from layer7"
+ (current-continuation-marks)
+ (list))))
+ (when (not (or (void? fields) (eof-object? fields)))
+ (handle-fields fields))))
+ (loop)))
;; Start Thread that observe all given log-receivers
(define (start-logger-thread receiver1 . receiverN)
@@ -43,12 +70,13 @@
(thread
(lambda ()
(let loop ()
- (match (apply sync receivers)
- [(vector level msg data)
- (printf "[~s] ~a~n" level msg)
- (flush-output)])
- (loop))))))
+ (match (apply sync receivers)
+ [(vector level msg data)
+ (printf "[~s] ~a~n" level msg)
+ (flush-output)])
+ (loop))))))
+;; Marks Namespace where TCP-REPL starts
(define-namespace-anchor repl-ns-anchor)
(define (main)
@@ -77,25 +105,17 @@
;; Init Logging
(start-logger-thread (make-log-receiver logger 'info)
- (make-log-receiver db-logger (loglevel-db?))
- (make-log-receiver layer2-logger (loglevel-layer2?))
- (make-log-receiver layer7-logger 'info)
- (make-log-receiver tcp-repl-logger 'info))
+ (make-log-receiver db-logger (loglevel-db?))
+ (make-log-receiver layer2-logger (loglevel-layer2?))
+ (make-log-receiver layer7-logger 'info)
+ (make-log-receiver tcp-repl-logger 'info))
;; Test Database Connection
(when (not (db-pgc-test))
(log-message logger 'fatal "Failed to connect to database" #t)
(exit 1))
- ;; Connect, replacing input with tcp connection
- (if (or (null? (connect-host?)) (null? (connect-port?)))
- (log-message (current-logger) 'info "Using stdin" #t)
- (let-values ([(cin cout) (tcp-connect (connect-host?) (connect-port?))])
- (log-message logger 'info (format "Connected to ~s ~s ~n" (connect-host?) (connect-port?)) #t)
- (current-input-port cin)))
-
;; Process Ebus Packets
- ((make-ebus-loop7 (current-input-port)) )
- )
+ (read-ebus-loop7))
(exit (main))