From a1969362f9b90a88b24e7a78fc6262a0b79e881a Mon Sep 17 00:00:00 2001 From: Jose Quintana Date: Tue, 27 Apr 2021 12:27:19 +0200 Subject: [PATCH] refactor: tokio server tasks independently --- src/server.rs | 433 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------------------------------------------------------------------------------------------------------------------------------------------------------------------------- 1 file changed, 264 insertions(+), 169 deletions(-) diff --git a/src/server.rs b/src/server.rs index 768478f..57dcbaf 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,11 +1,12 @@ use std::net::{IpAddr, SocketAddr}; +use std::path::PathBuf; use structopt::StructOpt; use warp::Filter; use crate::config::{Config, CONFIG}; use crate::{cache, cors, filters, helpers, logger, rejection, signals, Result}; -/// Define a multi-thread HTTP/HTTPS web server. +/// Define a multi-thread HTTP or HTTP/2 web server. pub struct Server { threads: usize, } @@ -24,7 +25,7 @@ impl Server { Self { threads } } - /// Build and run the `Server` forever on the current thread. + /// Build and run the multi-thread `Server` spawning a new Tokio asynchronous task for it. pub fn run(self) -> Result { tokio::runtime::Builder::new_multi_thread() .enable_all() @@ -32,7 +33,7 @@ impl Server { .worker_threads(self.threads) .build()? .block_on(async { - let r = self.start_server().await; + let r = self.run_server_with_config().await; if r.is_err() { panic!("Server error during start up: {:?}", r.unwrap_err()) } @@ -41,8 +42,8 @@ impl Server { Ok(()) } - /// Run the inner `Warp` server forever on the current thread. - async fn start_server(self) -> Result { + /// Create and run the `Warp` server spawning a new Tokio asynchronous task with the given configuration. + async fn run_server_with_config(self) -> Result { let opts = Config::global(); logger::init(&opts.log_level)?; @@ -68,176 +69,59 @@ impl Server { let (cors_filter_opt, cors_allowed_origins) = cors::get_opt_cors_filter(opts.cors_allow_origins.as_ref()); - // Base fs directory filter - let base_fs_dir_filter = warp::fs::dir(root_dir.clone()) - .map(cache::control_headers) - .with(warp::trace::request()) - .recover(rejection::handle_rejection); - - // Public HEAD endpoint - let public_head = warp::head().and(base_fs_dir_filter.clone()); - - // Public GET endpoint (default) - let public_get_default = warp::get().and(base_fs_dir_filter); - // HTTP/2 + TLS let http2 = opts.http2; let http2_tls_cert_path = &opts.http2_tls_cert; let http2_tls_key_path = &opts.http2_tls_key; - // Public GET/HEAD endpoints with compression (gzip, brotli or none) - match opts.compression.as_ref() { - "brotli" => tokio::task::spawn(async move { - let fs_dir_filter = warp::fs::dir(root_dir) - .map(cache::control_headers) - .with(warp::compression::brotli(true)) - .with(warp::trace::request()) - .recover(rejection::handle_rejection); - - match cors_filter_opt { - Some(cors_filter) => { - tracing::info!( - cors_enabled = ?true, - allowed_origins = ?cors_allowed_origins - ); - - let public_head = public_head.with(cors_filter.clone()); - let public_get_default = public_get_default.with(cors_filter.clone()); - - let public_get = warp::get() - .and(filters::has_accept_encoding("br")) - .and(fs_dir_filter) - .with(cors_filter.clone()); - - let server = warp::serve(public_head.or(public_get).or(public_get_default)); - - if http2 { - server - .tls() - .cert_path(http2_tls_cert_path) - .key_path(http2_tls_key_path) - .run(addr) - .await; - } else { - server.run(addr).await - } - } - None => { - let public_get = warp::get() - .and(filters::has_accept_encoding("br")) - .and(fs_dir_filter); - - let server = warp::serve(public_head.or(public_get).or(public_get_default)); - - if http2 { - server - .tls() - .cert_path(http2_tls_cert_path) - .key_path(http2_tls_key_path) - .run(addr) - .await; - } else { - server.run(addr).await - } - } - } - }), - "gzip" => tokio::task::spawn(async move { - let fs_dir_filter = warp::fs::dir(root_dir) - .map(cache::control_headers) - .with(warp::compression::gzip(true)) - .with(warp::trace::request()) - .recover(rejection::handle_rejection); - - match cors_filter_opt { - Some(cors_filter) => { - tracing::info!( - cors_enabled = ?true, - allowed_origins = ?cors_allowed_origins - ); - - let public_head = public_head.with(cors_filter.clone()); - let public_get_default = public_get_default.with(cors_filter.clone()); - - let public_get = warp::get() - .and(filters::has_accept_encoding("gzip")) - .and(fs_dir_filter) - .with(cors_filter.clone()); - - let server = warp::serve(public_head.or(public_get).or(public_get_default)); - - if http2 { - server - .tls() - .cert_path(http2_tls_cert_path) - .key_path(http2_tls_key_path) - .run(addr) - .await; - } else { - server.run(addr).await - } - } - None => { - let public_get = warp::get() - .and(filters::has_accept_encoding("gzip")) - .and(fs_dir_filter); - - let server = warp::serve(public_head.or(public_get).or(public_get_default)); - - if http2 { - server - .tls() - .cert_path(http2_tls_cert_path) - .key_path(http2_tls_key_path) - .run(addr) - .await; - } else { - server.run(addr).await - } - } - } - }), - _ => tokio::task::spawn(async move { - match cors_filter_opt { - Some(cors_filter) => { - tracing::info!( - cors_enabled = ?true, - allowed_origins = ?cors_allowed_origins - ); - - let public_get = public_get_default.with(cors_filter.clone()); - - let server = warp::serve(public_head.or(public_get)); - - if http2 { - server - .tls() - .cert_path(http2_tls_cert_path) - .key_path(http2_tls_key_path) - .run(addr) - .await; - } else { - server.run(addr).await - } - } - None => { - let server = warp::serve(public_head.or(public_get_default)); - - if http2 { - server - .tls() - .cert_path(http2_tls_cert_path) - .key_path(http2_tls_key_path) - .run(addr) - .await; - } else { - server.run(addr).await - } - } - } - }), - }; + // Spawn a new Tokio asynchronous server task determined by the given compression type (gzip, brotli or none) + // TODO: this can be simplified by replicating something similar to `warp::compression::auto()` but skipping `deflate + // see Warp PR #513 https://github.com/seanmonstar/warp/pull/513 + tokio::task::spawn(async move { + if opts.compression == "brotli" { + run_server_with_brotli_compression( + addr, + root_dir, + http2, + http2_tls_cert_path, + http2_tls_key_path, + cors_filter_opt, + cors_allowed_origins, + ) + .await; + return; + } + + if opts.compression == "gzip" { + run_server_with_gzip_compression( + addr, + root_dir, + http2, + http2_tls_cert_path, + http2_tls_key_path, + cors_filter_opt, + cors_allowed_origins, + ) + .await; + return; + } + + // Fallback HTTP or HTTP/2 server with no compression + run_server_with_no_compression( + addr, + root_dir, + http2, + http2_tls_cert_path, + http2_tls_key_path, + cors_filter_opt, + cors_allowed_origins, + ) + .await + }); + // Handle incoming signals for Unix-like OS's only + // TODO: make this more explicit like v1 + // see https://github.com/joseluisq/static-web-server/blob/1.x/src/server.rs#L96 signals::wait(|sig: signals::Signal| { let code = signals::as_int(sig); tracing::warn!("Signal {} caught. Server execution exited.", code); @@ -248,6 +132,217 @@ impl Server { } } +/// It creates and starts a Warp HTTP or HTTP/2 server with Brotli compression. +pub async fn run_server_with_brotli_compression( + addr: SocketAddr, + root_dir: PathBuf, + http2: bool, + http2_tls_cert_path: &'static str, + http2_tls_key_path: &'static str, + cors_filter_opt: Option, + cors_allowed_origins: String, +) { + // Base fs directory filter + let base_fs_dir_filter = warp::fs::dir(root_dir.clone()) + .map(cache::control_headers) + .with(warp::trace::request()) + .recover(rejection::handle_rejection); + + // Public HEAD endpoint + let public_head = warp::head().and(base_fs_dir_filter.clone()); + + // Public GET endpoint (default) + let public_get_default = warp::get().and(base_fs_dir_filter); + + // Current fs directory filter + let fs_dir_filter = warp::fs::dir(root_dir) + .map(cache::control_headers) + .with(warp::compression::brotli(true)) + .with(warp::trace::request()) + .recover(rejection::handle_rejection); + + // Determine CORS filter + if let Some(cors_filter) = cors_filter_opt { + tracing::info!( + cors_enabled = ?true, + allowed_origins = ?cors_allowed_origins + ); + + let public_head = public_head.with(cors_filter.clone()); + let public_get_default = public_get_default.with(cors_filter.clone()); + + let public_get = warp::get() + .and(filters::has_accept_encoding("br")) + .and(fs_dir_filter) + .with(cors_filter.clone()); + + let server = warp::serve(public_head.or(public_get).or(public_get_default)); + + if http2 { + server + .tls() + .cert_path(http2_tls_cert_path) + .key_path(http2_tls_key_path) + .run(addr) + .await + } else { + server.run(addr).await + } + } else { + let public_get = warp::get() + .and(filters::has_accept_encoding("br")) + .and(fs_dir_filter); + + let server = warp::serve(public_head.or(public_get).or(public_get_default)); + + if http2 { + server + .tls() + .cert_path(http2_tls_cert_path) + .key_path(http2_tls_key_path) + .run(addr) + .await + } else { + server.run(addr).await + } + } +} + +/// It creates and starts a Warp HTTP or HTTP/2 server with GZIP compression. +pub async fn run_server_with_gzip_compression( + addr: SocketAddr, + root_dir: PathBuf, + http2: bool, + http2_tls_cert_path: &'static str, + http2_tls_key_path: &'static str, + cors_filter_opt: Option, + cors_allowed_origins: String, +) { + // Base fs directory filter + let base_fs_dir_filter = warp::fs::dir(root_dir.clone()) + .map(cache::control_headers) + .with(warp::trace::request()) + .recover(rejection::handle_rejection); + + // Public HEAD endpoint + let public_head = warp::head().and(base_fs_dir_filter.clone()); + + // Public GET endpoint (default) + let public_get_default = warp::get().and(base_fs_dir_filter); + + // Current fs directory filter + let fs_dir_filter = warp::fs::dir(root_dir) + .map(cache::control_headers) + .with(warp::compression::gzip(true)) + .with(warp::trace::request()) + .recover(rejection::handle_rejection); + + // Determine CORS filter + if let Some(cors_filter) = cors_filter_opt { + tracing::info!( + cors_enabled = ?true, + allowed_origins = ?cors_allowed_origins + ); + + let public_head = public_head.with(cors_filter.clone()); + let public_get_default = public_get_default.with(cors_filter.clone()); + + let public_get = warp::get() + .and(filters::has_accept_encoding("gzip")) + .and(fs_dir_filter) + .with(cors_filter.clone()); + + let server = warp::serve(public_head.or(public_get).or(public_get_default)); + + if http2 { + server + .tls() + .cert_path(http2_tls_cert_path) + .key_path(http2_tls_key_path) + .run(addr) + .await + } else { + server.run(addr).await + } + } else { + let public_get = warp::get() + .and(filters::has_accept_encoding("gzip")) + .and(fs_dir_filter); + + let server = warp::serve(public_head.or(public_get).or(public_get_default)); + + if http2 { + server + .tls() + .cert_path(http2_tls_cert_path) + .key_path(http2_tls_key_path) + .run(addr) + .await + } else { + server.run(addr).await + } + } +} + +/// It creates and starts a Warp HTTP or HTTP/2 server with no compression. +pub async fn run_server_with_no_compression( + addr: SocketAddr, + root_dir: PathBuf, + http2: bool, + http2_tls_cert_path: &'static str, + http2_tls_key_path: &'static str, + cors_filter_opt: Option, + cors_allowed_origins: String, +) { + // Base fs directory filter + let base_fs_dir_filter = warp::fs::dir(root_dir.clone()) + .map(cache::control_headers) + .with(warp::trace::request()) + .recover(rejection::handle_rejection); + + // Public HEAD endpoint + let public_head = warp::head().and(base_fs_dir_filter.clone()); + + // Public GET endpoint (default) + let public_get_default = warp::get().and(base_fs_dir_filter); + + // Determine CORS filter + if let Some(cors_filter) = cors_filter_opt { + tracing::info!( + cors_enabled = ?true, + allowed_origins = ?cors_allowed_origins + ); + + let public_get = public_get_default.with(cors_filter.clone()); + + let server = warp::serve(public_head.or(public_get)); + + if http2 { + server + .tls() + .cert_path(http2_tls_cert_path) + .key_path(http2_tls_key_path) + .run(addr) + .await + } else { + server.run(addr).await + } + } else { + let server = warp::serve(public_head.or(public_get_default)); + + if http2 { + server + .tls() + .cert_path(http2_tls_cert_path) + .key_path(http2_tls_key_path) + .run(addr) + .await + } else { + server.run(addr).await + } + } +} + impl Default for Server { fn default() -> Self { Self::new() -- libgit2 1.7.2