diff options
-rw-r--r-- | Cargo.toml | 7 | ||||
-rw-r--r-- | src/http_server.rs | 101 | ||||
-rw-r--r-- | src/main.rs | 96 |
3 files changed, 97 insertions, 107 deletions
@@ -4,9 +4,7 @@ version = "0.1.0" authors = ["Yves Fischer <yvesf+git@xapek.org>"] [dependencies] -ascii = "0.7" time = "0.1" -getopts = "0.2" simple_logger = "0.4" log = "0.3" oath = "0.10.*" @@ -16,9 +14,12 @@ random = "0.12.*" tokio = "0.1.*" tokio-threadpool = "0.1.*" tokio-executor = "0.1.*" +tokio-signal = "0.2.*" +futures = "0.1.*" http = "0.1.*" bytes = "0.4.*" httparse = "1.3.*" thread_local = "0.3.*" cookie = "0.11.*" -url = "1.7.*"
\ No newline at end of file +url = "1.7.*" +structopt = "0.2.*"
\ No newline at end of file diff --git a/src/http_server.rs b/src/http_server.rs index 826163c..0e60e72 100644 --- a/src/http_server.rs +++ b/src/http_server.rs @@ -3,92 +3,75 @@ use std::io; use std::net::SocketAddr; use std::sync::Arc; use std::boxed::Box; -use bytes::Bytes; +use bytes::Bytes; +use bytes::BytesMut; use tokio; use tokio::net::TcpListener; use tokio::prelude::*; use tokio::codec::{Encoder, Decoder}; -use tokio_threadpool::Builder; -use tokio_executor::enter; -use bytes::BytesMut; use http::header::HeaderValue; use http::{Request, Response}; use thread_local::ThreadLocal; -use system; - pub trait HttpHandler<T> { fn respond(&self, state: &T, req: Request<Bytes>) -> Response<String>; } pub fn serve< T: 'static + Send + Clone + HttpHandler<X>, - X: Send + Clone + 'static ->(addr: SocketAddr, state: X, handler: T) { + X: Send + Clone + 'static, +>(addr: SocketAddr, state: X, handler: T) + -> impl Future<Item=(), Error=()> + Send +{ let listener = TcpListener::bind(&addr).expect("failed to bind"); info!("Listening on: {}", addr); let tl_handler: Arc<ThreadLocal<T>> = Arc::new(ThreadLocal::new()); - let tl_state: Arc<ThreadLocal<X>>= Arc::new(ThreadLocal::new()); + let tl_state: Arc<ThreadLocal<X>> = Arc::new(ThreadLocal::new()); + + listener.incoming() + .map_err(|e| error!("failed to accept socket; error = {:?}", e)) + .for_each(move |socket| { + let peer_addr = match socket.peer_addr() { + Ok(addr) => format!("{}", addr), + Err(_) => "<error>".to_string(), + }; - let program = - listener.incoming() - .map_err(|e| error!("failed to accept socket; error = {:?}", e)) - .for_each(move |socket| { - let peer_addr = match socket.peer_addr() { - Ok(addr) => format!("{}", addr), - Err(_) => "<error>".to_string(), - }; + let (tx, rx) = + HttpFrame.framed(socket).split(); - let (tx, rx) = - HttpFrame.framed(socket).split(); + let tl_handler = tl_handler.clone(); + let handler = handler.clone(); - let tl_handler = tl_handler.clone(); - let handler = handler.clone(); + let tl_state = tl_state.clone(); + let state = state.clone(); - let tl_state = tl_state.clone(); + let rx_task = rx.and_then(move |req| { let state = state.clone(); - - let rx_task = rx.and_then(move |req| { - let state = state.clone(); - let state = tl_state.get_or(||{ - debug!("Clone state"); - Box::new(state.clone()) - }); - let handler = tl_handler.get_or(|| { - debug!("Clone handler"); - Box::new(handler.clone()) - }); - info!("{:?} {} {} {:?}", peer_addr, req.method(), req.uri(), req.version()); - let response = handler.respond(&state, req); - Box::new(future::ok(response)) + let state = tl_state.get_or(|| { + debug!("Clone state"); + Box::new(state.clone()) }); - let tx_task = tx.send_all(rx_task) - .then(|res| { - if let Err(e) = res { - error!("failed to process connection; error = {:?}", e); - } - Ok(()) - }); - - // Spawn the task that handles the connection. - tokio::spawn(tx_task); - Ok(()) + let handler = tl_handler.get_or(|| { + debug!("Clone handler"); + Box::new(handler.clone()) + }); + info!("{:?} {} {} {:?}", peer_addr, req.method(), req.uri(), req.version()); + let response = handler.respond(&state, req); + Box::new(future::ok(response)) }); + let tx_task = tx.send_all(rx_task) + .then(|res| { + if let Err(e) = res { + error!("failed to process connection; error = {:?}", e); + } + Ok(()) + }); - - let mut builder = Builder::new(); - let runtime = builder - .name_prefix("httpd-") - .after_start(|| { - debug!("Start new worker"); - system::initialize_rng_from_time(); + // Spawn the task that handles the connection. + tokio::spawn(tx_task); + Ok(()) }) - .build(); - runtime.spawn(program); - enter().expect("nested tokio::run") - .block_on(runtime.shutdown_on_idle()) - .unwrap(); } /// diff --git a/src/main.rs b/src/main.rs index bb3c57e..2d5218f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,23 +1,18 @@ #![feature(test)] -#![feature(convert_id)] -#![feature(proc_macro_hygiene)] -#![feature(try_from)] #![feature(duration_as_u128)] -#![feature(libc)] -use std::env; use std::sync::Arc; use std::thread; use std::sync::atomic; use std::net::SocketAddr; -extern crate ascii; -extern crate getopts; #[macro_use] extern crate log; extern crate tokio; extern crate tokio_threadpool; extern crate tokio_executor; +extern crate tokio_signal; +extern crate futures; extern crate time; extern crate simple_logger; extern crate oath; @@ -32,10 +27,14 @@ extern crate bytes; extern crate thread_local; extern crate cookie; extern crate url; +extern crate structopt; -use getopts::Options; +use structopt::StructOpt; use log::LogLevel::{Debug, Warn}; use time::Duration; +use futures::{Future, Stream}; +use tokio_threadpool::Builder; +use tokio_executor::enter; mod auth_handler; mod cookie_store; @@ -44,8 +43,6 @@ mod router; mod system; mod totp; -extern crate libc; - use cookie_store::CookieStore; #[derive(Clone)] @@ -54,53 +51,62 @@ pub struct ApplicationState { cookie_max_age: Duration, } -fn print_usage(program: &str, opts: &Options) { - let brief = format!("Usage: {} [options]", program); - print!("{}", opts.usage(&brief)); +#[derive(Debug, StructOpt)] +#[structopt(name = "nginx-auth-totp")] +struct Opt { + #[structopt(short = "l", long = "port", default_value = "127.0.0.1:8080")] + addr: SocketAddr, + #[structopt(short = "d", long = "debug")] + debug: bool, } fn main() { - let args: Vec<String> = env::args().collect(); - let program = args[0].clone(); - let mut opts = Options::new(); - opts.optopt("l", "port", "Listen address", "LISTEN-ADDR"); - opts.optflag("d", "debug", "Use loglevel Debug instead of Warn"); - opts.optflag("h", "help", "print this help menu"); - let matches = opts.parse(&args[1..]).unwrap_or_else(|f| panic!(f.to_string())); - - if matches.opt_present("h") { - print_usage(&program, &opts); - return; - } - - simple_logger::init_with_level(if matches.opt_present("d") { Debug } else { Warn }) + let opt = Opt::from_args(); + simple_logger::init_with_level(if opt.debug { Debug } else { Warn }) .unwrap_or_else(|_| panic!("Failed to initialize logger")); - - let addr = matches.opt_str("l").unwrap_or_else(||"127.0.0.1:8080".to_string()); - let addr = addr.parse::<SocketAddr>() - .unwrap_or_else(|_| panic!("Failed to parse LISTEN-ADDRESS")); - - - // concurrent eventual consistent hashmap with <cookie-id, timeout> let state = ApplicationState { cookie_store: CookieStore::new(), cookie_max_age: Duration::days(1) }; let server_shutdown_condvar = Arc::new(atomic::AtomicBool::new(false)); - let cookie_clean_thread_condvar = server_shutdown_condvar.clone(); - let cookie_clean_state = state.clone(); - let cookie_clean_thread = thread::spawn(move || { - while !cookie_clean_thread_condvar.load(atomic::Ordering::Relaxed) { - thread::sleep(std::time::Duration::from_secs(60)); - debug!("Clean cookie cache"); - cookie_clean_state.cookie_store.clean_outdated_cookies(); - } - }); + let cookie_clean_thread = { + let server_shutdown_condvar = server_shutdown_condvar.clone(); + let state = state.clone(); + thread::spawn(move || { + thread::park_timeout(std::time::Duration::from_secs(10)); + while !server_shutdown_condvar.load(atomic::Ordering::Relaxed) { + info!("Clean cookie cache"); + state.cookie_store.clean_outdated_cookies(); + thread::park_timeout(std::time::Duration::from_secs(60)); + } + }) + }; let auth_handler = auth_handler::AuthHandler::make(); - http_server::serve(addr, state, auth_handler); + let runtime = Builder::new() + .name_prefix("httpd-") + .after_start(|| { + debug!("Start new worker: {}", thread::current().name().unwrap_or("-")); + system::initialize_rng_from_time(); + }) + .build(); + + let program = http_server::serve(opt.addr, state, auth_handler); + runtime.spawn(program); + + let ctrl_c_block = tokio_signal::ctrl_c() + .flatten_stream().take(1).for_each(|()| { + info!("ctrl-c received"); + Ok(()) + }); + + enter().expect("nested tokio::run") + .block_on(ctrl_c_block) + .unwrap(); + runtime.shutdown(); + info!("Waiting for cookie cleanup thread to stop"); server_shutdown_condvar.store(true, atomic::Ordering::Relaxed); - debug!("Waiting for cleanup thread to shutdown"); + cookie_clean_thread.thread().unpark(); cookie_clean_thread.join().unwrap(); } |