summaryrefslogtreecommitdiff
path: root/ebus-racket/inserter-pq.rkt
blob: 85073f42ac38b2339460ebf66333ea4f3d77b561 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
#! /usr/bin/env racket
#lang racket/base
(require racket/cmdline
	 racket/tcp
	 racket/match
	 "db-pq.rkt"
	 "layer7.rkt"
	 "layer2.rkt"
	 "tcp-repl.rkt")

;; Logger for this program's own messages (topic 'ebus-inserter), created as
;; a child of whatever logger is current when this module is loaded.
(define logger (make-logger 'ebus-inserter (current-logger)))

;; Command-line settings, stored in parameters that `main` fills in.
;; TCP host/port to read from; null means "not given", in which case `main`
;; keeps reading from stdin.
(define connect-host? (make-parameter null))
(define connect-port? (make-parameter null))
;; Minimum levels handed to the log receivers for layer2-logger and db-logger.
(define loglevel-layer2? (make-parameter 'info))
(define loglevel-db? (make-parameter 'warning))
;; When true, `handle-fields` passes every field to db-insert-field;
;; otherwise fields are only logged.
(define insert? (make-parameter #f))

;; Log every field of a decoded packet and, when the `insert?` parameter is
;; set, hand it to `db-insert-field`.
;;
;; fields : a sequence whose elements are argument lists for
;;          `db-insert-field` (each element is spread with `apply`).
;;
;; A failing insert is logged as an error and does not stop the remaining
;; fields from being processed.
(define (handle-fields fields)
  ;; Insert a single field, turning any exn:fail into an error log entry.
  (define (try-insert field)
    (with-handlers ([exn:fail?
                     (lambda (err)
                       (log-message logger
                                    'error
                                    (format "Failed to insert ~a: ~a" field err)
                                    #t))])
      (apply db-insert-field field)))
  (for ([field fields])
    (log-message logger 'info (format "Field: ~a" field) #t)
    (when (insert?)
      (try-insert field))))

;; Build a thunk that reads eBUS layer-7 packets from `input-port` until
;; end-of-file and passes the decoded fields of each packet to
;; `handle-fields`.
;;
;; input-port : the port handed to `layer7-read-ebus`.
;;
;; Results of `layer7-read-ebus` that are void are skipped; an eof object
;; ends the loop. Any exn:fail raised while reading or handling a packet is
;; logged and the loop continues with the next packet.
(define (make-ebus-loop7 input-port)
  (lambda ()
    (let loop ()
      ;; Only the work for ONE packet runs inside `with-handlers`. The
      ;; recursive call below is made after the handler has been removed, so
      ;; it is a real tail call and a long-running process does not pile up
      ;; one handler/continuation frame per packet.
      (define fields
        (with-handlers ([exn:fail?
                         (lambda (exn)
                           (log-message logger 'error (format "Failed to parse paket: ~a" exn) #t)
                           ;; Not eof, so the loop keeps going.
                           (void))])
          (let ([fields (layer7-read-ebus input-port)])
            (unless (or (void? fields) (eof-object? fields))
              (handle-fields fields))
            fields)))
      (unless (eof-object? fields)
        (loop)))))

;; Start a background thread that observes all given log receivers and
;; prints each event as "[level] message" on the output port that is current
;; when this function is called.
;;
;; receiver1, receiverN ... : one or more log receivers (or other events
;;                            whose sync result is a log-event vector).
;;
;; Returns void; the thread runs until the program exits.
(define (start-logger-thread receiver1 . receiverN)
  (define receivers (cons receiver1 receiverN))
  (void
   (thread
    (lambda ()
      (let loop ()
        ;; A log event is a vector that starts with the level and the
        ;; message. Current Racket versions append further slots (data and
        ;; topic), so read the slots by index instead of matching a vector
        ;; of one exact length, which would raise and kill this thread.
        (define event (apply sync receivers))
        (printf "[~s] ~a~n" (vector-ref event 0) (vector-ref event 1))
        (flush-output)
        (loop))))))

;; Anchor for this module's namespace; `main` turns it into the namespace
;; passed to `tcp-repl-run` for the --tcp-repl option.
(define-namespace-anchor repl-ns-anchor)

;; Program entry point.
;;
;; 1. Parses the command line into the parameters defined at the top of the
;;    file and the db-con-* parameters.
;; 2. Starts the thread that prints log events.
;; 3. Tests the database connection and exits with status 1 on failure.
;; 4. Reads packets from stdin, or from a TCP connection when -c/--connect
;;    was given, until end-of-file.
;;
;; Returns whatever the packet loop returns.
(define (main)
  ;; Parse commandline
  (command-line
   #:once-each
   [("-c" "--connect") host port "Connect to server <host> <port>"
    (connect-host? host)
    (connect-port? (string->number port))]
   ["--tcp-repl" port "Open REPL on TCP <port>"
    (tcp-repl-run (namespace-anchor->namespace repl-ns-anchor) (string->number port))]
   ["--debug-layer2" "Log level for Layer 2 Parser"
    (loglevel-layer2? 'debug)]
   ["--debug-db" "Log level for DB"
    (loglevel-db? 'debug)]
   ["--insert" "Do Insert into Database"
    (insert? #t)]
   ["--db-user" user "Database User"
    (db-con-user? user)]
   ["--db-password" password "Database password"
    (db-con-password? password)]
   ["--db-database" database "Database database-name"
    (db-con-database? database)]
   ["--db-server" server  "Database Address/Server"
    (db-con-server? server)])

  ;; Init Logging
  (start-logger-thread (make-log-receiver logger 'info)
                       (make-log-receiver db-logger (loglevel-db?))
                       (make-log-receiver layer2-logger (loglevel-layer2?))
                       (make-log-receiver layer7-logger 'info)
                       (make-log-receiver tcp-repl-logger 'info))

  ;; Test Database Connection
  (unless (db-pgc-test)
    (log-message logger 'fatal "Failed to connect to database" #t)
    (exit 1))

  ;; Connect, replacing input with tcp connection
  (if (or (null? (connect-host?)) (null? (connect-port?)))
      ;; Log through `logger`, like every other message here: the receivers
      ;; above listen on `logger`, not on its parent (current-logger), so a
      ;; message sent to the parent would never be printed by them.
      (log-message logger 'info "Using stdin" #t)
      (let-values ([(cin cout) (tcp-connect (connect-host?) (connect-port?))])
        (log-message logger 'info (format "Connected to ~s ~s ~n" (connect-host?) (connect-port?)) #t)
        (current-input-port cin)))

  ;; Process Ebus Packets
  ((make-ebus-loop7 (current-input-port))))

;; Run the program when this module is loaded and pass main's result to
;; `exit` once the packet loop has finished.
(exit (main))