summaryrefslogtreecommitdiff
path: root/src/http_server.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/http_server.rs')
-rw-r--r--src/http_server.rs101
1 files changed, 42 insertions, 59 deletions
diff --git a/src/http_server.rs b/src/http_server.rs
index 826163c..0e60e72 100644
--- a/src/http_server.rs
+++ b/src/http_server.rs
@@ -3,92 +3,75 @@ use std::io;
use std::net::SocketAddr;
use std::sync::Arc;
use std::boxed::Box;
-use bytes::Bytes;
+use bytes::Bytes;
+use bytes::BytesMut;
use tokio;
use tokio::net::TcpListener;
use tokio::prelude::*;
use tokio::codec::{Encoder, Decoder};
-use tokio_threadpool::Builder;
-use tokio_executor::enter;
-use bytes::BytesMut;
use http::header::HeaderValue;
use http::{Request, Response};
use thread_local::ThreadLocal;
-use system;
-
pub trait HttpHandler<T> {
fn respond(&self, state: &T, req: Request<Bytes>) -> Response<String>;
}
pub fn serve<
T: 'static + Send + Clone + HttpHandler<X>,
- X: Send + Clone + 'static
->(addr: SocketAddr, state: X, handler: T) {
+ X: Send + Clone + 'static,
+>(addr: SocketAddr, state: X, handler: T)
+ -> impl Future<Item=(), Error=()> + Send
+{
let listener = TcpListener::bind(&addr).expect("failed to bind");
info!("Listening on: {}", addr);
let tl_handler: Arc<ThreadLocal<T>> = Arc::new(ThreadLocal::new());
- let tl_state: Arc<ThreadLocal<X>>= Arc::new(ThreadLocal::new());
+ let tl_state: Arc<ThreadLocal<X>> = Arc::new(ThreadLocal::new());
+
+ listener.incoming()
+ .map_err(|e| error!("failed to accept socket; error = {:?}", e))
+ .for_each(move |socket| {
+ let peer_addr = match socket.peer_addr() {
+ Ok(addr) => format!("{}", addr),
+ Err(_) => "<error>".to_string(),
+ };
- let program =
- listener.incoming()
- .map_err(|e| error!("failed to accept socket; error = {:?}", e))
- .for_each(move |socket| {
- let peer_addr = match socket.peer_addr() {
- Ok(addr) => format!("{}", addr),
- Err(_) => "<error>".to_string(),
- };
+ let (tx, rx) =
+ HttpFrame.framed(socket).split();
- let (tx, rx) =
- HttpFrame.framed(socket).split();
+ let tl_handler = tl_handler.clone();
+ let handler = handler.clone();
- let tl_handler = tl_handler.clone();
- let handler = handler.clone();
+ let tl_state = tl_state.clone();
+ let state = state.clone();
- let tl_state = tl_state.clone();
+ let rx_task = rx.and_then(move |req| {
let state = state.clone();
-
- let rx_task = rx.and_then(move |req| {
- let state = state.clone();
- let state = tl_state.get_or(||{
- debug!("Clone state");
- Box::new(state.clone())
- });
- let handler = tl_handler.get_or(|| {
- debug!("Clone handler");
- Box::new(handler.clone())
- });
- info!("{:?} {} {} {:?}", peer_addr, req.method(), req.uri(), req.version());
- let response = handler.respond(&state, req);
- Box::new(future::ok(response))
+ let state = tl_state.get_or(|| {
+ debug!("Clone state");
+ Box::new(state.clone())
});
- let tx_task = tx.send_all(rx_task)
- .then(|res| {
- if let Err(e) = res {
- error!("failed to process connection; error = {:?}", e);
- }
- Ok(())
- });
-
- // Spawn the task that handles the connection.
- tokio::spawn(tx_task);
- Ok(())
+ let handler = tl_handler.get_or(|| {
+ debug!("Clone handler");
+ Box::new(handler.clone())
+ });
+ info!("{:?} {} {} {:?}", peer_addr, req.method(), req.uri(), req.version());
+ let response = handler.respond(&state, req);
+ Box::new(future::ok(response))
});
+ let tx_task = tx.send_all(rx_task)
+ .then(|res| {
+ if let Err(e) = res {
+ error!("failed to process connection; error = {:?}", e);
+ }
+ Ok(())
+ });
-
- let mut builder = Builder::new();
- let runtime = builder
- .name_prefix("httpd-")
- .after_start(|| {
- debug!("Start new worker");
- system::initialize_rng_from_time();
+ // Spawn the task that handles the connection.
+ tokio::spawn(tx_task);
+ Ok(())
})
- .build();
- runtime.spawn(program);
- enter().expect("nested tokio::run")
- .block_on(runtime.shutdown_on_idle())
- .unwrap();
}
///