summaryrefslogtreecommitdiff
path: root/ebus-racket/inserter.rkt
blob: 9aaca780f63d7f7effe121421ef8745e9a0cddcf (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
#! /usr/bin/env racket
#lang racket/base
(require racket/cmdline
	 racket/tcp
	 racket/pretty
	 racket/match
	 "db.rkt"
	 "layer7.rkt"
	 "layer2.rkt")

;; Logger for messages emitted by this module; child of the logger that is
;; current when the module is instantiated.
(define logger
  (make-logger 'ebus-inserter (current-logger)))

;; Run-time options, filled in from the command line by `main`.
;; The empty list means "not given" for the two connection parameters.
(define connect-host? (make-parameter '()))
(define connect-port? (make-parameter '()))

;; When true, every parsed field is also handed to `db-insert-field`.
(define insert? (make-parameter #f))

;; Minimum level shown for the db and layer-2 loggers.
(define loglevel-db? (make-parameter 'info))
(define loglevel-layer2? (make-parameter 'info))

;; Log every element of FIELDS at level 'info and, while the `insert?`
;; parameter is true, apply `db-insert-field` to that element (each element
;; is used as the argument list of the call).
(define (handle-fields fields)
  (for ([entry fields])
    (define text (format "Field: ~a" entry))
    (log-message logger 'info text #t)
    (cond
      [(insert?) (apply db-insert-field entry)])))
  
;; Repeatedly call `layer7-read-ebus` on INPUT-PORT until it returns the
;; end-of-file object.  A void result is skipped; every other result is
;; passed to `handle-fields`.
;;
;; The data is read from INPUT-PORT itself.  Earlier revisions ignored the
;; argument and always read from `(current-input-port)`, which only worked
;; because the sole caller passes exactly that port.
(define (read-ebus-loop7 input-port)
  (let loop ()
    (define fields (layer7-read-ebus input-port))
    (unless (eof-object? fields)
      (unless (void? fields)
        (handle-fields fields))
      (loop))))

;; Start a background thread that watches all given log receivers and
;; prints every received event to the current output port as
;; "[<level>] <message>", flushing after each line.
;;
;; receiver1, receiverN ... : log receivers (at least one is required)
;; Returns void; the thread is not exposed to the caller.
(define (start-logger-thread receiver1 . receiverN)
  (define receivers (cons receiver1 receiverN))
  (void
   (thread
    (lambda ()
      (let loop ()
        ;; A log event is a vector whose first two slots are the level and
        ;; the message.  The fields are read positionally instead of
        ;; matching a fixed-length vector pattern: current Racket delivers
        ;; four slots (level, message, data, topic), and a three-slot
        ;; pattern would raise a match error and kill this thread on the
        ;; first event.
        (define event (apply sync receivers))
        (printf "[~s] ~a~n" (vector-ref event 0) (vector-ref event 1))
        (flush-output)
        (loop))))))

;; Program entry point: parse the command line, start log output, check the
;; database connection, select the input source (stdin or a TCP
;; connection), then process eBUS data until end-of-file.
(define (main)
  ;; Parse commandline
  (command-line
   #:once-each
   [("-c" "--connect") host port "Connect to server <host> <port>"
    (let ([port-number (string->number port)])
      ;; Reject a malformed port here with a readable message instead of
      ;; letting `tcp-connect` fail later with a contract error.
      (unless (and (exact-integer? port-number) (<= 1 port-number 65535))
        (raise-user-error 'inserter "invalid port number: ~a" port))
      (connect-host? host)
      (connect-port? port-number))]
   ["--debug-layer2" "Log level for Layer 2 Parser"
    (loglevel-layer2? 'debug)]
   ["--debug-db" "Log level for DB"
    (loglevel-db? 'debug)]
   ["--insert" "Do Insert into Database"
    (insert? #t)]
   ["--db-user" user "Database User"
    (db-con-user? user)]
   ["--db-password" password "Database password"
    (db-con-password? password)]
   ["--db-database" database "Database database-name"
    (db-con-database? database)]
   ["--db-server" server  "Database Address/Server"
    (db-con-server? server)])

  ;; Init Logging
  (start-logger-thread (make-log-receiver logger 'info)
                       (make-log-receiver db-logger (loglevel-db?))
                       (make-log-receiver layer2-logger (loglevel-layer2?))
                       (make-log-receiver layer7-logger 'info))

  ;; Test Database Connection
  (unless (db-pgc-test)
    (display "Failed to connect database")
    (exit 1))

  ;; Connect, replacing input with tcp connection
  (cond
    [(or (null? (connect-host?)) (null? (connect-port?)))
     ;; Log through this module's own logger: the receivers started above
     ;; listen on `logger`, not on `(current-logger)`, so a message sent
     ;; to the latter would never be printed.
     (log-message logger 'info "Using stdin" #t)]
    [else
     ;; Only the input side of the connection is used; `cout` stays unused.
     (let-values ([(cin cout) (tcp-connect (connect-host?) (connect-port?))])
       (printf "Connected to ~s ~s ~n" (connect-host?) (connect-port?))
       (current-input-port cin))])

  ;; Process Ebus Packets
  (read-ebus-loop7 (current-input-port)))

;; Run the program when this module is executed; once `main` returns, its
;; result is handed to `exit` to terminate the process.
(exit (main))