From 20242a8d3cc2e9a70812f34fcc50c170a654f6c6 Mon Sep 17 00:00:00 2001 From: Yves Fischer Date: Mon, 26 Nov 2018 11:43:49 +0100 Subject: Improve readability - Move tokio runtime to main - Use structopt instead of getopt for option parsing --- src/http_server.rs | 101 ++++++++++++++++++++++------------------------------- 1 file changed, 42 insertions(+), 59 deletions(-) (limited to 'src/http_server.rs') diff --git a/src/http_server.rs b/src/http_server.rs index 826163c..0e60e72 100644 --- a/src/http_server.rs +++ b/src/http_server.rs @@ -3,92 +3,75 @@ use std::io; use std::net::SocketAddr; use std::sync::Arc; use std::boxed::Box; -use bytes::Bytes; +use bytes::Bytes; +use bytes::BytesMut; use tokio; use tokio::net::TcpListener; use tokio::prelude::*; use tokio::codec::{Encoder, Decoder}; -use tokio_threadpool::Builder; -use tokio_executor::enter; -use bytes::BytesMut; use http::header::HeaderValue; use http::{Request, Response}; use thread_local::ThreadLocal; -use system; - pub trait HttpHandler { fn respond(&self, state: &T, req: Request) -> Response; } pub fn serve< T: 'static + Send + Clone + HttpHandler, - X: Send + Clone + 'static ->(addr: SocketAddr, state: X, handler: T) { + X: Send + Clone + 'static, +>(addr: SocketAddr, state: X, handler: T) + -> impl Future + Send +{ let listener = TcpListener::bind(&addr).expect("failed to bind"); info!("Listening on: {}", addr); let tl_handler: Arc> = Arc::new(ThreadLocal::new()); - let tl_state: Arc>= Arc::new(ThreadLocal::new()); + let tl_state: Arc> = Arc::new(ThreadLocal::new()); + + listener.incoming() + .map_err(|e| error!("failed to accept socket; error = {:?}", e)) + .for_each(move |socket| { + let peer_addr = match socket.peer_addr() { + Ok(addr) => format!("{}", addr), + Err(_) => "".to_string(), + }; - let program = - listener.incoming() - .map_err(|e| error!("failed to accept socket; error = {:?}", e)) - .for_each(move |socket| { - let peer_addr = match socket.peer_addr() { - Ok(addr) => format!("{}", addr), - Err(_) => "".to_string(), - }; + let (tx, rx) = + HttpFrame.framed(socket).split(); - let (tx, rx) = - HttpFrame.framed(socket).split(); + let tl_handler = tl_handler.clone(); + let handler = handler.clone(); - let tl_handler = tl_handler.clone(); - let handler = handler.clone(); + let tl_state = tl_state.clone(); + let state = state.clone(); - let tl_state = tl_state.clone(); + let rx_task = rx.and_then(move |req| { let state = state.clone(); - - let rx_task = rx.and_then(move |req| { - let state = state.clone(); - let state = tl_state.get_or(||{ - debug!("Clone state"); - Box::new(state.clone()) - }); - let handler = tl_handler.get_or(|| { - debug!("Clone handler"); - Box::new(handler.clone()) - }); - info!("{:?} {} {} {:?}", peer_addr, req.method(), req.uri(), req.version()); - let response = handler.respond(&state, req); - Box::new(future::ok(response)) + let state = tl_state.get_or(|| { + debug!("Clone state"); + Box::new(state.clone()) }); - let tx_task = tx.send_all(rx_task) - .then(|res| { - if let Err(e) = res { - error!("failed to process connection; error = {:?}", e); - } - Ok(()) - }); - - // Spawn the task that handles the connection. - tokio::spawn(tx_task); - Ok(()) + let handler = tl_handler.get_or(|| { + debug!("Clone handler"); + Box::new(handler.clone()) + }); + info!("{:?} {} {} {:?}", peer_addr, req.method(), req.uri(), req.version()); + let response = handler.respond(&state, req); + Box::new(future::ok(response)) }); + let tx_task = tx.send_all(rx_task) + .then(|res| { + if let Err(e) = res { + error!("failed to process connection; error = {:?}", e); + } + Ok(()) + }); - - let mut builder = Builder::new(); - let runtime = builder - .name_prefix("httpd-") - .after_start(|| { - debug!("Start new worker"); - system::initialize_rng_from_time(); + // Spawn the task that handles the connection. + tokio::spawn(tx_task); + Ok(()) }) - .build(); - runtime.spawn(program); - enter().expect("nested tokio::run") - .block_on(runtime.shutdown_on_idle()) - .unwrap(); } /// -- cgit v1.2.1