From b94fe72c9c26962e00382fa21f46d4774b56d781 Mon Sep 17 00:00:00 2001 From: Jose Quintana Date: Sat, 23 Jan 2021 00:48:25 +0100 Subject: [PATCH] refactor: core modules --- src/bin/server.rs | 198 ++---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- src/cache.rs | 45 +++++++++++++++++++++++++++++++++++++++++++++ src/config.rs | 63 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/core/cache.rs | 45 --------------------------------------------- src/core/config.rs | 63 --------------------------------------------------------------- src/core/cors.rs | 36 ------------------------------------ src/core/helpers.rs | 37 ------------------------------------- src/core/logger.rs | 17 ----------------- src/core/mod.rs | 12 ------------ src/core/rejection.rs | 38 -------------------------------------- src/core/result.rs | 4 ---- src/core/signals.rs | 51 --------------------------------------------------- src/cors.rs | 36 ++++++++++++++++++++++++++++++++++++ src/error.rs | 10 ++++++++++ src/helpers.rs | 37 +++++++++++++++++++++++++++++++++++++ src/lib.rs | 14 +++++++++++++- src/logger.rs | 17 +++++++++++++++++ src/rejection.rs | 40 ++++++++++++++++++++++++++++++++++++++++ src/server.rs | 216 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/signals.rs | 51 +++++++++++++++++++++++++++++++++++++++++++++++++++ 20 files changed, 530 insertions(+), 500 deletions(-) create mode 100644 src/cache.rs create mode 100644 src/config.rs delete mode 100644 src/core/cache.rs delete mode 100644 src/core/config.rs delete mode 100644 src/core/cors.rs delete mode 100644 src/core/helpers.rs delete mode 100644 src/core/logger.rs delete mode 100644 src/core/mod.rs delete mode 100644 src/core/rejection.rs delete mode 100644 src/core/result.rs delete mode 100644 src/core/signals.rs create mode 100644 src/cors.rs create mode 100644 src/error.rs create mode 100644 src/helpers.rs create mode 100644 src/logger.rs create mode 100644 src/rejection.rs create mode 100644 src/server.rs create mode 100644 src/signals.rs diff --git a/src/bin/server.rs b/src/bin/server.rs index 7c3aa44..65997fc 100644 --- a/src/bin/server.rs +++ b/src/bin/server.rs @@ -6,205 +6,11 @@ static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc; extern crate static_web_server; +use self::static_web_server::*; use structopt::StructOpt; -use warp::Filter; - -use self::static_web_server::core::*; - -/// It creates a new server instance with given options. -async fn server(opts: config::Options) -> Result { - logger::init(&opts.log_level)?; - - let host = opts.host.parse::()?; - let port = opts.port; - - // Check a valid root directory - let root_dir = helpers::get_valid_dirpath(opts.root)?; - - // Custom error pages content - let page404 = helpers::read_file_content(opts.page404.as_ref()); - let page50x = helpers::read_file_content(opts.page50x.as_ref()); - let page404_a = page404.clone(); - let page50x_a = page50x.clone(); - - // CORS support - let (cors_filter, cors_allowed_origins) = - cors::get_opt_cors_filter(opts.cors_allow_origins.as_ref()); - - // Base fs directory filter - let base_dir_filter = warp::fs::dir(root_dir.clone()) - .map(cache::control_headers) - .with(warp::trace::request()) - .recover(move |rej| { - let page404_a = page404_a.clone(); - let page50x_a = page50x_a.clone(); - async move { rejection::handle_rejection(page404_a, page50x_a, rej).await } - }); - - // Public HEAD endpoint - let public_head = warp::head().and(base_dir_filter.clone()); - - // Public GET endpoint (default) - let public_get_default = warp::get().and(base_dir_filter.clone()); - - // Public GET/HEAD endpoints with compression (deflate, gzip, brotli, none) - match opts.compression.as_ref() { - "brotli" => tokio::task::spawn(async move { - let with_dir = warp::fs::dir(root_dir) - .map(cache::control_headers) - .with(warp::trace::request()) - .with(warp::compression::brotli(true)) - .recover(move |rej| { - let page404 = page404.clone(); - let page50x = page50x.clone(); - async move { rejection::handle_rejection(page404, page50x, rej).await } - }); - - if let Some(cors_filter) = cors_filter { - tracing::info!( - cors_enabled = ?true, - allowed_origins = ?cors_allowed_origins - ); - warp::serve( - public_head.with(cors_filter.clone()).or(warp::get() - .and(cache::has_accept_encoding("br")) - .and(with_dir) - .with(cors_filter.clone()) - .or(public_get_default.with(cors_filter))), - ) - .run((host, port)) - .await - } else { - warp::serve( - public_head.or(warp::get() - .and(cache::has_accept_encoding("br")) - .and(with_dir) - .or(public_get_default)), - ) - .run((host, port)) - .await - } - }), - "deflate" => tokio::task::spawn(async move { - let with_dir = warp::fs::dir(root_dir) - .map(cache::control_headers) - .with(warp::trace::request()) - .with(warp::compression::deflate(true)) - .recover(move |rej| { - let page404 = page404.clone(); - let page50x = page50x.clone(); - async move { rejection::handle_rejection(page404, page50x, rej).await } - }); - - if let Some(cors_filter) = cors_filter { - tracing::info!( - cors_enabled = ?true, - allowed_origins = ?cors_allowed_origins - ); - warp::serve( - public_head.with(cors_filter.clone()).or(warp::get() - .and(cache::has_accept_encoding("deflate")) - .and(with_dir) - .with(cors_filter.clone()) - .or(public_get_default.with(cors_filter))), - ) - .run((host, port)) - .await - } else { - warp::serve( - public_head.or(warp::get() - .and(cache::has_accept_encoding("deflate")) - .and(with_dir) - .or(public_get_default)), - ) - .run((host, port)) - .await - } - }), - "gzip" => tokio::task::spawn(async move { - let with_dir = warp::fs::dir(root_dir) - .map(cache::control_headers) - .with(warp::trace::request()) - .with(warp::compression::gzip(true)) - .recover(move |rej| { - let page404 = page404.clone(); - let page50x = page50x.clone(); - async move { rejection::handle_rejection(page404, page50x, rej).await } - }); - - if let Some(cors_filter) = cors_filter { - tracing::info!( - cors_enabled = ?true, - allowed_origins = ?cors_allowed_origins - ); - warp::serve( - public_head.with(cors_filter.clone()).or(warp::get() - .and(cache::has_accept_encoding("gzip")) - .and(with_dir) - .with(cors_filter.clone()) - .or(public_get_default.with(cors_filter))), - ) - .run((host, port)) - .await - } else { - warp::serve( - public_head.or(warp::get() - .and(cache::has_accept_encoding("gzip")) - .and(with_dir) - .or(public_get_default)), - ) - .run((host, port)) - .await - } - }), - _ => tokio::task::spawn(async move { - if let Some(cors_filter) = cors_filter { - tracing::info!( - cors_enabled = ?true, - allowed_origins = ?cors_allowed_origins - ); - let public_get_default = warp::get() - .and(base_dir_filter.clone()) - .with(cors_filter.clone()); - warp::serve(public_head.or(public_get_default.with(cors_filter))) - .run((host, port)) - .await - } else { - warp::serve(public_head.or(public_get_default)) - .run((host, port)) - .await - } - }), - }; - - signals::wait(|sig: signals::Signal| { - let code = signals::as_int(sig); - tracing::warn!("Signal {} caught. Server execution exited.", code); - std::process::exit(code) - }); - - Ok(()) -} fn main() -> Result { - let opts = config::Options::from_args(); - let n = if opts.threads_multiplier == 0 { - 1 - } else { - opts.threads_multiplier - }; - let threads = num_cpus::get() * n; - - tokio::runtime::Builder::new_multi_thread() - .worker_threads(threads) - .enable_all() - .build()? - .block_on(async { - let r = server(opts).await; - if r.is_err() { - panic!("Server error: {:?}", r.unwrap_err()) - } - }); + server::Server::new(config::Config::from_args()).run()?; Ok(()) } diff --git a/src/cache.rs b/src/cache.rs new file mode 100644 index 0000000..a9f93dd --- /dev/null +++ b/src/cache.rs @@ -0,0 +1,45 @@ +const CACHE_EXT_ONE_HOUR: [&str; 4] = ["atom", "json", "rss", "xml"]; +const CACHE_EXT_ONE_YEAR: [&str; 30] = [ + "bmp", "bz2", "css", "map", "doc", "gif", "gz", "htc", "ico", "jpg", "mp3", "mp4", "ogg", + "ogv", "pdf", "png", "rar", "tar", "tgz", "wav", "weba", "webm", "webp", "woff", "zip", "jpeg", + "js", "mjs", "rtf", "woff2", +]; + +/// It applies the corresponding Cache-Control headers based on a set of file types. +pub fn control_headers(res: warp::fs::File) -> warp::reply::WithHeader { + // Default max-age value in seconds (one day) + let mut max_age = 60 * 60 * 24_u64; + + if let Some(ext) = res.path().extension() { + if let Some(ext) = ext.to_str() { + if CACHE_EXT_ONE_HOUR.iter().any(|x| *x == ext) { + max_age = 60 * 60; + } else if CACHE_EXT_ONE_YEAR.iter().any(|x| *x == ext) { + max_age = 60 * 60 * 24 * 365; + } + } + } + + // HTML file types and others + warp::reply::with_header( + res, + "cache-control", + [ + "public, max-age=".to_string(), + duration(max_age).to_string(), + ] + .concat(), + ) +} + +/// It caps a duration value at ~136 years. +fn duration(n: u64) -> u32 { + std::cmp::min(n, u32::MAX as u64) as u32 +} + +/// Warp filter in order to check for an `Accept-Encoding` header value. +pub fn has_accept_encoding( + val: &'static str, +) -> impl warp::Filter + Copy { + warp::header::contains("accept-encoding", val) +} diff --git a/src/config.rs b/src/config.rs new file mode 100644 index 0000000..7631c96 --- /dev/null +++ b/src/config.rs @@ -0,0 +1,63 @@ +use structopt::StructOpt; + +/// Static Web Server +#[derive(Debug, StructOpt)] +pub struct Config { + #[structopt(long, short = "a", default_value = "::", env = "SERVER_HOST")] + /// Host address (E.g 127.0.0.1 or ::1) + pub host: String, + + #[structopt(long, short = "p", default_value = "80", env = "SERVER_PORT")] + /// Host port + pub port: u16, + + #[structopt( + long, + short = "n", + default_value = "8", + env = "SERVER_THREADS_MULTIPLIER" + )] + /// Number of worker threads multiplier that'll be multiplied by the number of system CPUs + /// using the formula: `worker threads = number of CPUs * n` where `n` is the value that changes here. + /// When multiplier value is 0 or 1 then the `number of CPUs` is used. + /// Number of worker threads result should be a number between 1 and 32,768 though it is advised to keep this value on the smaller side. + pub threads_multiplier: usize, + + #[structopt(long, short = "d", default_value = "./public", env = "SERVER_ROOT")] + /// Root directory path of static files + pub root: String, + + #[structopt( + long, + default_value = "./public/50x.html", + env = "SERVER_ERROR_PAGE_50X" + )] + /// HTML file path for 50x errors. If path is not specified or simply don't exists then server will use a generic HTML error message. + pub page50x: String, + + #[structopt( + long, + default_value = "./public/404.html", + env = "SERVER_ERROR_PAGE_404" + )] + /// HTML file path for 404 errors. If path is not specified or simply don't exists then server will use a generic HTML error message. + pub page404: String, + + #[structopt(long, short = "x", default_value = "gzip", env = "SERVER_COMPRESSION")] + /// Compression body support for web text-based file types. Values: "gzip", "deflate" or "brotli". + /// Use an empty value to skip compression. + pub compression: String, + + #[structopt(long, short = "g", default_value = "error", env = "SERVER_LOG_LEVEL")] + /// Specify a logging level in lower case. + pub log_level: String, + + #[structopt( + long, + short = "c", + default_value = "", + env = "SERVER_CORS_ALLOW_ORIGINS" + )] + /// Specify a optional CORS list of allowed origin hosts separated by comas. Host ports or protocols aren't being checked. Use an asterisk (*) to allow any host. + pub cors_allow_origins: String, +} diff --git a/src/core/cache.rs b/src/core/cache.rs deleted file mode 100644 index a9f93dd..0000000 --- a/src/core/cache.rs +++ /dev/null @@ -1,45 +0,0 @@ -const CACHE_EXT_ONE_HOUR: [&str; 4] = ["atom", "json", "rss", "xml"]; -const CACHE_EXT_ONE_YEAR: [&str; 30] = [ - "bmp", "bz2", "css", "map", "doc", "gif", "gz", "htc", "ico", "jpg", "mp3", "mp4", "ogg", - "ogv", "pdf", "png", "rar", "tar", "tgz", "wav", "weba", "webm", "webp", "woff", "zip", "jpeg", - "js", "mjs", "rtf", "woff2", -]; - -/// It applies the corresponding Cache-Control headers based on a set of file types. -pub fn control_headers(res: warp::fs::File) -> warp::reply::WithHeader { - // Default max-age value in seconds (one day) - let mut max_age = 60 * 60 * 24_u64; - - if let Some(ext) = res.path().extension() { - if let Some(ext) = ext.to_str() { - if CACHE_EXT_ONE_HOUR.iter().any(|x| *x == ext) { - max_age = 60 * 60; - } else if CACHE_EXT_ONE_YEAR.iter().any(|x| *x == ext) { - max_age = 60 * 60 * 24 * 365; - } - } - } - - // HTML file types and others - warp::reply::with_header( - res, - "cache-control", - [ - "public, max-age=".to_string(), - duration(max_age).to_string(), - ] - .concat(), - ) -} - -/// It caps a duration value at ~136 years. -fn duration(n: u64) -> u32 { - std::cmp::min(n, u32::MAX as u64) as u32 -} - -/// Warp filter in order to check for an `Accept-Encoding` header value. -pub fn has_accept_encoding( - val: &'static str, -) -> impl warp::Filter + Copy { - warp::header::contains("accept-encoding", val) -} diff --git a/src/core/config.rs b/src/core/config.rs deleted file mode 100644 index d8dfe96..0000000 --- a/src/core/config.rs +++ /dev/null @@ -1,63 +0,0 @@ -use structopt::StructOpt; - -/// Static Web Server -#[derive(Debug, StructOpt)] -pub struct Options { - #[structopt(long, short = "a", default_value = "::", env = "SERVER_HOST")] - /// Host address (E.g 127.0.0.1 or ::1) - pub host: String, - - #[structopt(long, short = "p", default_value = "80", env = "SERVER_PORT")] - /// Host port - pub port: u16, - - #[structopt( - long, - short = "n", - default_value = "8", - env = "SERVER_THREADS_MULTIPLIER" - )] - /// Number of worker threads multiplier that'll be multiplied by the number of system CPUs - /// using the formula: `worker threads = number of CPUs * n` where `n` is the value that changes here. - /// When multiplier value is 0 or 1 then the `number of CPUs` is used. - /// Number of worker threads result should be a number between 1 and 32,768 though it is advised to keep this value on the smaller side. - pub threads_multiplier: usize, - - #[structopt(long, short = "d", default_value = "./public", env = "SERVER_ROOT")] - /// Root directory path of static files - pub root: String, - - #[structopt( - long, - default_value = "./public/50x.html", - env = "SERVER_ERROR_PAGE_50X" - )] - /// HTML file path for 50x errors. If path is not specified or simply don't exists then server will use a generic HTML error message. - pub page50x: String, - - #[structopt( - long, - default_value = "./public/404.html", - env = "SERVER_ERROR_PAGE_404" - )] - /// HTML file path for 404 errors. If path is not specified or simply don't exists then server will use a generic HTML error message. - pub page404: String, - - #[structopt(long, short = "x", default_value = "gzip", env = "SERVER_COMPRESSION")] - /// Compression body support for web text-based file types. Values: "gzip", "deflate" or "brotli". - /// Use an empty value to skip compression. - pub compression: String, - - #[structopt(long, short = "g", default_value = "error", env = "SERVER_LOG_LEVEL")] - /// Specify a logging level in lower case. - pub log_level: String, - - #[structopt( - long, - short = "c", - default_value = "", - env = "SERVER_CORS_ALLOW_ORIGINS" - )] - /// Specify a optional CORS list of allowed origin hosts separated by comas. Host ports or protocols aren't being checked. Use an asterisk (*) to allow any host. - pub cors_allow_origins: String, -} diff --git a/src/core/cors.rs b/src/core/cors.rs deleted file mode 100644 index 9482ba9..0000000 --- a/src/core/cors.rs +++ /dev/null @@ -1,36 +0,0 @@ -use std::collections::HashSet; -use warp::filters::cors::Builder; - -/// Warp filter which provides an optional CORS if its supported. -pub fn get_opt_cors_filter(origins: &str) -> (Option, String) { - let mut cors_allowed_hosts = String::new(); - let cors_filter = if origins.is_empty() { - None - } else if origins == "*" { - cors_allowed_hosts = origins.into(); - Some( - warp::cors() - .allow_any_origin() - .allow_methods(vec!["GET", "HEAD", "OPTIONS"]), - ) - } else { - cors_allowed_hosts = origins.into(); - let hosts = cors_allowed_hosts - .split(',') - .map(|s| s.trim().as_ref()) - .collect::>(); - - if hosts.is_empty() { - cors_allowed_hosts = hosts.into_iter().collect::>().join(", "); - None - } else { - Some( - warp::cors() - .allow_origins(hosts) - .allow_methods(vec!["GET", "HEAD", "OPTIONS"]), - ) - } - }; - - (cors_filter, cors_allowed_hosts) -} diff --git a/src/core/helpers.rs b/src/core/helpers.rs deleted file mode 100644 index c9a04cb..0000000 --- a/src/core/helpers.rs +++ /dev/null @@ -1,37 +0,0 @@ -use std::fs; -use std::path::{Path, PathBuf}; - -use super::Result; - -/// Validate and return a directory path. -pub fn get_valid_dirpath>(path: P) -> Result -where - PathBuf: From

, -{ - match PathBuf::from(path) { - v if !v.exists() => bail!("path \"{:?}\" was not found or inaccessible", &v), - v if !v.is_dir() => bail!("path \"{:?}\" is not a valid directory", &v), - v => Ok(v), - } -} - -/// Get the directory name of a valid directory path. -pub fn get_dirname>(path: P) -> Result -where - PathBuf: From

, -{ - let path = get_valid_dirpath(path)?; - match path.iter().last() { - Some(v) => Ok(v.to_str().unwrap().to_string()), - _ => bail!("directory name for path \"{:?}\" was not determined", path), - } -} - -// Read the entire contents of a file into a string if it's valid or empty otherwise. -pub fn read_file_content(p: &str) -> String { - if !p.is_empty() && Path::new(p).exists() { - return fs::read_to_string(p).unwrap_or_default(); - } - - String::new() -} diff --git a/src/core/logger.rs b/src/core/logger.rs deleted file mode 100644 index 759dd4f..0000000 --- a/src/core/logger.rs +++ /dev/null @@ -1,17 +0,0 @@ -use tracing::Level; -use tracing_subscriber::fmt::format::FmtSpan; - -use super::Result; - -/// Initialize logging builder with its levels. -pub fn init(level: &str) -> Result { - let level = level.parse::()?; - match tracing_subscriber::fmt() - .with_max_level(level) - .with_span_events(FmtSpan::CLOSE) - .try_init() - { - Err(err) => Err(anyhow!(err)), - _ => Ok(()), - } -} diff --git a/src/core/mod.rs b/src/core/mod.rs deleted file mode 100644 index 567cfc7..0000000 --- a/src/core/mod.rs +++ /dev/null @@ -1,12 +0,0 @@ -pub mod cache; -pub mod config; -pub mod cors; -pub mod helpers; -pub mod logger; -pub mod rejection; -pub mod signals; - -#[macro_use] -pub mod result; - -pub use result::*; diff --git a/src/core/rejection.rs b/src/core/rejection.rs deleted file mode 100644 index d2fdf61..0000000 --- a/src/core/rejection.rs +++ /dev/null @@ -1,38 +0,0 @@ -use anyhow::Result; -use std::convert::Infallible; -use warp::http::StatusCode; -use warp::{Rejection, Reply}; - -// It receives a `Rejection` and tries to return a HTML error reply. -pub async fn handle_rejection( - page_404: String, - page_50x: String, - err: Rejection, -) -> Result { - let mut content = String::new(); - let code = if err.is_not_found() { - content = page_404; - StatusCode::NOT_FOUND - } else if err - .find::() - .is_some() - { - StatusCode::BAD_REQUEST - } else if err.find::().is_some() { - StatusCode::METHOD_NOT_ALLOWED - } else if err.find::().is_some() { - StatusCode::UNSUPPORTED_MEDIA_TYPE - } else { - content = page_50x; - StatusCode::INTERNAL_SERVER_ERROR - }; - - if content.is_empty() { - content = format!( - "{}

{}

", - code, code - ); - } - - Ok(warp::reply::with_status(warp::reply::html(content), code)) -} diff --git a/src/core/result.rs b/src/core/result.rs deleted file mode 100644 index a183171..0000000 --- a/src/core/result.rs +++ /dev/null @@ -1,4 +0,0 @@ -/// Convenient result return type alias of `anyhow::Result` -pub type Result = anyhow::Result; - -pub use anyhow::Context; diff --git a/src/core/signals.rs b/src/core/signals.rs deleted file mode 100644 index 6947b11..0000000 --- a/src/core/signals.rs +++ /dev/null @@ -1,51 +0,0 @@ -use nix::errno::Errno; -use nix::libc::c_int; -use nix::sys::signal::{SIGCHLD, SIGINT, SIGTERM}; -use nix::sys::wait::WaitStatus::{Exited, Signaled, StillAlive}; -use nix::sys::wait::{waitpid, WaitPidFlag}; -use nix::Error; - -pub use signal::Signal; - -/// It waits for an incoming Termination Signal like Ctrl+C (SIGINT), SIGTERM, etc -pub fn wait(func: F) -where - F: Fn(signal::Signal), -{ - let sig_trap = signal::trap::Trap::trap(&[SIGTERM, SIGINT, SIGCHLD]); - for sig in sig_trap { - match sig { - SIGCHLD => { - // Current std::process::Command ip does not have a way to find - // process id, so we just wait until we have no children - loop { - match waitpid(None, Some(WaitPidFlag::WNOHANG)) { - Ok(Exited(pid, status)) => { - println!("{} exited with status {}", pid, status); - continue; - } - Ok(Signaled(pid, sig, _)) => { - println!("{} killed by {}", pid, sig as c_int); - continue; - } - Ok(StillAlive) => break, - Ok(status) => { - println!("Temporary status {:?}", status); - continue; - } - Err(Error::Sys(Errno::ECHILD)) => return, - Err(e) => { - panic!("Error {:?}", e); - } - } - } - } - sig => func(sig), - } - } -} - -/// It casts a given `signal::Signal` to `i32`. -pub fn as_int(sig: signal::Signal) -> i32 { - sig as c_int -} diff --git a/src/cors.rs b/src/cors.rs new file mode 100644 index 0000000..9482ba9 --- /dev/null +++ b/src/cors.rs @@ -0,0 +1,36 @@ +use std::collections::HashSet; +use warp::filters::cors::Builder; + +/// Warp filter which provides an optional CORS if its supported. +pub fn get_opt_cors_filter(origins: &str) -> (Option, String) { + let mut cors_allowed_hosts = String::new(); + let cors_filter = if origins.is_empty() { + None + } else if origins == "*" { + cors_allowed_hosts = origins.into(); + Some( + warp::cors() + .allow_any_origin() + .allow_methods(vec!["GET", "HEAD", "OPTIONS"]), + ) + } else { + cors_allowed_hosts = origins.into(); + let hosts = cors_allowed_hosts + .split(',') + .map(|s| s.trim().as_ref()) + .collect::>(); + + if hosts.is_empty() { + cors_allowed_hosts = hosts.into_iter().collect::>().join(", "); + None + } else { + Some( + warp::cors() + .allow_origins(hosts) + .allow_methods(vec!["GET", "HEAD", "OPTIONS"]), + ) + } + }; + + (cors_filter, cors_allowed_hosts) +} diff --git a/src/error.rs b/src/error.rs new file mode 100644 index 0000000..83512fa --- /dev/null +++ b/src/error.rs @@ -0,0 +1,10 @@ +/// Just a `anyhow::Result` type alias. +pub type Result = anyhow::Result; + +/// Just an `anyhow::Error` type alias. +pub type Error = anyhow::Error; + +/// Just re-export some `anyhow` stuff. +pub use anyhow::anyhow; +pub use anyhow::bail; +pub use anyhow::Context; diff --git a/src/helpers.rs b/src/helpers.rs new file mode 100644 index 0000000..c9a04cb --- /dev/null +++ b/src/helpers.rs @@ -0,0 +1,37 @@ +use std::fs; +use std::path::{Path, PathBuf}; + +use super::Result; + +/// Validate and return a directory path. +pub fn get_valid_dirpath>(path: P) -> Result +where + PathBuf: From

, +{ + match PathBuf::from(path) { + v if !v.exists() => bail!("path \"{:?}\" was not found or inaccessible", &v), + v if !v.is_dir() => bail!("path \"{:?}\" is not a valid directory", &v), + v => Ok(v), + } +} + +/// Get the directory name of a valid directory path. +pub fn get_dirname>(path: P) -> Result +where + PathBuf: From

, +{ + let path = get_valid_dirpath(path)?; + match path.iter().last() { + Some(v) => Ok(v.to_str().unwrap().to_string()), + _ => bail!("directory name for path \"{:?}\" was not determined", path), + } +} + +// Read the entire contents of a file into a string if it's valid or empty otherwise. +pub fn read_file_content(p: &str) -> String { + if !p.is_empty() && Path::new(p).exists() { + return fs::read_to_string(p).unwrap_or_default(); + } + + String::new() +} diff --git a/src/lib.rs b/src/lib.rs index 2cf9f77..5e945d9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,16 @@ #[macro_use] extern crate anyhow; -pub mod core; +pub mod cache; +pub mod config; +pub mod cors; +pub mod helpers; +pub mod logger; +pub mod rejection; +pub mod server; +pub mod signals; + +#[macro_use] +pub mod error; + +pub use error::*; diff --git a/src/logger.rs b/src/logger.rs new file mode 100644 index 0000000..759dd4f --- /dev/null +++ b/src/logger.rs @@ -0,0 +1,17 @@ +use tracing::Level; +use tracing_subscriber::fmt::format::FmtSpan; + +use super::Result; + +/// Initialize logging builder with its levels. +pub fn init(level: &str) -> Result { + let level = level.parse::()?; + match tracing_subscriber::fmt() + .with_max_level(level) + .with_span_events(FmtSpan::CLOSE) + .try_init() + { + Err(err) => Err(anyhow!(err)), + _ => Ok(()), + } +} diff --git a/src/rejection.rs b/src/rejection.rs new file mode 100644 index 0000000..ece347c --- /dev/null +++ b/src/rejection.rs @@ -0,0 +1,40 @@ +use anyhow::Result; +use std::convert::Infallible; +use warp::http::StatusCode; +use warp::{Rejection, Reply}; + +// It receives a `Rejection` and tries to return a HTML error reply. +pub async fn handle_rejection( + page_404: String, + page_50x: String, + err: Rejection, +) -> Result { + let mut content = String::new(); + let code = if err.is_not_found() { + content = page_404; + StatusCode::NOT_FOUND + } else if err + .find::() + .is_some() + { + StatusCode::BAD_REQUEST + } else if err.find::().is_some() { + StatusCode::METHOD_NOT_ALLOWED + } else if err.find::().is_some() { + StatusCode::FORBIDDEN + } else if err.find::().is_some() { + StatusCode::UNSUPPORTED_MEDIA_TYPE + } else { + content = page_50x; + StatusCode::INTERNAL_SERVER_ERROR + }; + + if content.is_empty() { + content = format!( + "{}

{}

", + code, code + ); + } + + Ok(warp::reply::with_status(warp::reply::html(content), code)) +} diff --git a/src/server.rs b/src/server.rs new file mode 100644 index 0000000..7d46685 --- /dev/null +++ b/src/server.rs @@ -0,0 +1,216 @@ +use std::net::{IpAddr, SocketAddr}; +use warp::Filter; + +use crate::{cache, config, cors, helpers, logger, rejection, signals, Result}; + +/// Define a multi-thread HTTP/HTTPS web server. +pub struct Server { + opts: config::Config, + threads: usize, +} + +impl Server { + /// Create new multi-thread server instance. + pub fn new(opts: config::Config) -> Self { + let n = if opts.threads_multiplier == 0 { + 1 + } else { + opts.threads_multiplier + }; + let threads = num_cpus::get() * n; + Self { opts, threads } + } + + /// Build and run the `Server` forever on the current thread. + pub fn run(self) -> Result { + tokio::runtime::Builder::new_multi_thread() + .worker_threads(self.threads) + .enable_all() + .build()? + .block_on(async { + let r = self.start_server().await; + if r.is_err() { + panic!("Server error: {:?}", r.unwrap_err()) + } + }); + + Ok(()) + } + + /// Run the inner `Warp` forever on the current thread. + async fn start_server(self) -> Result { + let opts = self.opts; + + logger::init(&opts.log_level)?; + + let ip = opts.host.parse::()?; + let addr = SocketAddr::from((ip, opts.port)); + + // Check a valid root directory + let root_dir = helpers::get_valid_dirpath(opts.root)?; + + // Custom error pages content + let page404 = helpers::read_file_content(opts.page404.as_ref()); + let page50x = helpers::read_file_content(opts.page50x.as_ref()); + let page404_a = page404.clone(); + let page50x_a = page50x.clone(); + + // CORS support + let (cors_filter, cors_allowed_origins) = + cors::get_opt_cors_filter(opts.cors_allow_origins.as_ref()); + + // Base fs directory filter + let base_dir_filter = warp::fs::dir(root_dir.clone()) + .map(cache::control_headers) + .with(warp::trace::request()) + .recover(move |rej| { + let page404_a = page404_a.clone(); + let page50x_a = page50x_a.clone(); + async move { rejection::handle_rejection(page404_a, page50x_a, rej).await } + }); + + // Public HEAD endpoint + let public_head = warp::head().and(base_dir_filter.clone()); + + // Public GET endpoint (default) + let public_get_default = warp::get().and(base_dir_filter.clone()); + + // Public GET/HEAD endpoints with compression (deflate, gzip, brotli, none) + match opts.compression.as_ref() { + "brotli" => tokio::task::spawn(async move { + let with_dir = warp::fs::dir(root_dir) + .map(cache::control_headers) + .with(warp::trace::request()) + .with(warp::compression::brotli(true)) + .recover(move |rej| { + let page404 = page404.clone(); + let page50x = page50x.clone(); + async move { rejection::handle_rejection(page404, page50x, rej).await } + }); + + if let Some(cors_filter) = cors_filter { + tracing::info!( + cors_enabled = ?true, + allowed_origins = ?cors_allowed_origins + ); + warp::serve( + public_head.with(cors_filter.clone()).or(warp::get() + .and(cache::has_accept_encoding("br")) + .and(with_dir) + .with(cors_filter.clone()) + .or(public_get_default.with(cors_filter))), + ) + .run(addr) + .await + } else { + warp::serve( + public_head.or(warp::get() + .and(cache::has_accept_encoding("br")) + .and(with_dir) + .or(public_get_default)), + ) + .run(addr) + .await + } + }), + "deflate" => tokio::task::spawn(async move { + let with_dir = warp::fs::dir(root_dir) + .map(cache::control_headers) + .with(warp::trace::request()) + .with(warp::compression::deflate(true)) + .recover(move |rej| { + let page404 = page404.clone(); + let page50x = page50x.clone(); + async move { rejection::handle_rejection(page404, page50x, rej).await } + }); + + if let Some(cors_filter) = cors_filter { + tracing::info!( + cors_enabled = ?true, + allowed_origins = ?cors_allowed_origins + ); + warp::serve( + public_head.with(cors_filter.clone()).or(warp::get() + .and(cache::has_accept_encoding("deflate")) + .and(with_dir) + .with(cors_filter.clone()) + .or(public_get_default.with(cors_filter))), + ) + .run(addr) + .await + } else { + warp::serve( + public_head.or(warp::get() + .and(cache::has_accept_encoding("deflate")) + .and(with_dir) + .or(public_get_default)), + ) + .run(addr) + .await + } + }), + "gzip" => tokio::task::spawn(async move { + let with_dir = warp::fs::dir(root_dir) + .map(cache::control_headers) + .with(warp::trace::request()) + .with(warp::compression::gzip(true)) + .recover(move |rej| { + let page404 = page404.clone(); + let page50x = page50x.clone(); + async move { rejection::handle_rejection(page404, page50x, rej).await } + }); + + if let Some(cors_filter) = cors_filter { + tracing::info!( + cors_enabled = ?true, + allowed_origins = ?cors_allowed_origins + ); + warp::serve( + public_head.with(cors_filter.clone()).or(warp::get() + .and(cache::has_accept_encoding("gzip")) + .and(with_dir) + .with(cors_filter.clone()) + .or(public_get_default.with(cors_filter))), + ) + .run(addr) + .await + } else { + warp::serve( + public_head.or(warp::get() + .and(cache::has_accept_encoding("gzip")) + .and(with_dir) + .or(public_get_default)), + ) + .run(addr) + .await + } + }), + _ => tokio::task::spawn(async move { + if let Some(cors_filter) = cors_filter { + tracing::info!( + cors_enabled = ?true, + allowed_origins = ?cors_allowed_origins + ); + let public_get_default = warp::get() + .and(base_dir_filter.clone()) + .with(cors_filter.clone()); + warp::serve(public_head.or(public_get_default.with(cors_filter))) + .run(addr) + .await + } else { + warp::serve(public_head.or(public_get_default)) + .run(addr) + .await + } + }), + }; + + signals::wait(|sig: signals::Signal| { + let code = signals::as_int(sig); + tracing::warn!("Signal {} caught. Server execution exited.", code); + std::process::exit(code) + }); + + Ok(()) + } +} diff --git a/src/signals.rs b/src/signals.rs new file mode 100644 index 0000000..6947b11 --- /dev/null +++ b/src/signals.rs @@ -0,0 +1,51 @@ +use nix::errno::Errno; +use nix::libc::c_int; +use nix::sys::signal::{SIGCHLD, SIGINT, SIGTERM}; +use nix::sys::wait::WaitStatus::{Exited, Signaled, StillAlive}; +use nix::sys::wait::{waitpid, WaitPidFlag}; +use nix::Error; + +pub use signal::Signal; + +/// It waits for an incoming Termination Signal like Ctrl+C (SIGINT), SIGTERM, etc +pub fn wait(func: F) +where + F: Fn(signal::Signal), +{ + let sig_trap = signal::trap::Trap::trap(&[SIGTERM, SIGINT, SIGCHLD]); + for sig in sig_trap { + match sig { + SIGCHLD => { + // Current std::process::Command ip does not have a way to find + // process id, so we just wait until we have no children + loop { + match waitpid(None, Some(WaitPidFlag::WNOHANG)) { + Ok(Exited(pid, status)) => { + println!("{} exited with status {}", pid, status); + continue; + } + Ok(Signaled(pid, sig, _)) => { + println!("{} killed by {}", pid, sig as c_int); + continue; + } + Ok(StillAlive) => break, + Ok(status) => { + println!("Temporary status {:?}", status); + continue; + } + Err(Error::Sys(Errno::ECHILD)) => return, + Err(e) => { + panic!("Error {:?}", e); + } + } + } + } + sig => func(sig), + } + } +} + +/// It casts a given `signal::Signal` to `i32`. +pub fn as_int(sig: signal::Signal) -> i32 { + sig as c_int +} -- libgit2 1.7.2