refactor: core modules
Diff
src/bin/server.rs | 198 +-----------------------------------------------
src/cache.rs | 45 +++++++++++-
src/config.rs | 63 +++++++++++++++-
src/core/cache.rs | 45 +-----------
src/core/config.rs | 63 +---------------
src/core/cors.rs | 36 +---------
src/core/helpers.rs | 37 +---------
src/core/logger.rs | 17 +----
src/core/mod.rs | 12 +---
src/core/rejection.rs | 38 +---------
src/core/result.rs | 4 +-
src/core/signals.rs | 51 +------------
src/cors.rs | 36 +++++++++-
src/error.rs | 10 ++-
src/helpers.rs | 37 +++++++++-
src/lib.rs | 14 ++-
src/logger.rs | 17 ++++-
src/rejection.rs | 40 +++++++++-
src/server.rs | 216 +++++++++++++++++++++++++++++++++++++++++++++++++++-
src/signals.rs | 51 ++++++++++++-
20 files changed, 530 insertions(+), 500 deletions(-)
@@ -6,205 +6,11 @@ static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;
extern crate static_web_server;
use self::static_web_server::*;
use structopt::StructOpt;
use warp::Filter;
use self::static_web_server::core::*;
async fn server(opts: config::Options) -> Result {
logger::init(&opts.log_level)?;
let host = opts.host.parse::<std::net::IpAddr>()?;
let port = opts.port;
let root_dir = helpers::get_valid_dirpath(opts.root)?;
let page404 = helpers::read_file_content(opts.page404.as_ref());
let page50x = helpers::read_file_content(opts.page50x.as_ref());
let page404_a = page404.clone();
let page50x_a = page50x.clone();
let (cors_filter, cors_allowed_origins) =
cors::get_opt_cors_filter(opts.cors_allow_origins.as_ref());
let base_dir_filter = warp::fs::dir(root_dir.clone())
.map(cache::control_headers)
.with(warp::trace::request())
.recover(move |rej| {
let page404_a = page404_a.clone();
let page50x_a = page50x_a.clone();
async move { rejection::handle_rejection(page404_a, page50x_a, rej).await }
});
let public_head = warp::head().and(base_dir_filter.clone());
let public_get_default = warp::get().and(base_dir_filter.clone());
match opts.compression.as_ref() {
"brotli" => tokio::task::spawn(async move {
let with_dir = warp::fs::dir(root_dir)
.map(cache::control_headers)
.with(warp::trace::request())
.with(warp::compression::brotli(true))
.recover(move |rej| {
let page404 = page404.clone();
let page50x = page50x.clone();
async move { rejection::handle_rejection(page404, page50x, rej).await }
});
if let Some(cors_filter) = cors_filter {
tracing::info!(
cors_enabled = ?true,
allowed_origins = ?cors_allowed_origins
);
warp::serve(
public_head.with(cors_filter.clone()).or(warp::get()
.and(cache::has_accept_encoding("br"))
.and(with_dir)
.with(cors_filter.clone())
.or(public_get_default.with(cors_filter))),
)
.run((host, port))
.await
} else {
warp::serve(
public_head.or(warp::get()
.and(cache::has_accept_encoding("br"))
.and(with_dir)
.or(public_get_default)),
)
.run((host, port))
.await
}
}),
"deflate" => tokio::task::spawn(async move {
let with_dir = warp::fs::dir(root_dir)
.map(cache::control_headers)
.with(warp::trace::request())
.with(warp::compression::deflate(true))
.recover(move |rej| {
let page404 = page404.clone();
let page50x = page50x.clone();
async move { rejection::handle_rejection(page404, page50x, rej).await }
});
if let Some(cors_filter) = cors_filter {
tracing::info!(
cors_enabled = ?true,
allowed_origins = ?cors_allowed_origins
);
warp::serve(
public_head.with(cors_filter.clone()).or(warp::get()
.and(cache::has_accept_encoding("deflate"))
.and(with_dir)
.with(cors_filter.clone())
.or(public_get_default.with(cors_filter))),
)
.run((host, port))
.await
} else {
warp::serve(
public_head.or(warp::get()
.and(cache::has_accept_encoding("deflate"))
.and(with_dir)
.or(public_get_default)),
)
.run((host, port))
.await
}
}),
"gzip" => tokio::task::spawn(async move {
let with_dir = warp::fs::dir(root_dir)
.map(cache::control_headers)
.with(warp::trace::request())
.with(warp::compression::gzip(true))
.recover(move |rej| {
let page404 = page404.clone();
let page50x = page50x.clone();
async move { rejection::handle_rejection(page404, page50x, rej).await }
});
if let Some(cors_filter) = cors_filter {
tracing::info!(
cors_enabled = ?true,
allowed_origins = ?cors_allowed_origins
);
warp::serve(
public_head.with(cors_filter.clone()).or(warp::get()
.and(cache::has_accept_encoding("gzip"))
.and(with_dir)
.with(cors_filter.clone())
.or(public_get_default.with(cors_filter))),
)
.run((host, port))
.await
} else {
warp::serve(
public_head.or(warp::get()
.and(cache::has_accept_encoding("gzip"))
.and(with_dir)
.or(public_get_default)),
)
.run((host, port))
.await
}
}),
_ => tokio::task::spawn(async move {
if let Some(cors_filter) = cors_filter {
tracing::info!(
cors_enabled = ?true,
allowed_origins = ?cors_allowed_origins
);
let public_get_default = warp::get()
.and(base_dir_filter.clone())
.with(cors_filter.clone());
warp::serve(public_head.or(public_get_default.with(cors_filter)))
.run((host, port))
.await
} else {
warp::serve(public_head.or(public_get_default))
.run((host, port))
.await
}
}),
};
signals::wait(|sig: signals::Signal| {
let code = signals::as_int(sig);
tracing::warn!("Signal {} caught. Server execution exited.", code);
std::process::exit(code)
});
Ok(())
}
fn main() -> Result {
let opts = config::Options::from_args();
let n = if opts.threads_multiplier == 0 {
1
} else {
opts.threads_multiplier
};
let threads = num_cpus::get() * n;
tokio::runtime::Builder::new_multi_thread()
.worker_threads(threads)
.enable_all()
.build()?
.block_on(async {
let r = server(opts).await;
if r.is_err() {
panic!("Server error: {:?}", r.unwrap_err())
}
});
server::Server::new(config::Config::from_args()).run()?;
Ok(())
}
@@ -0,0 +1,45 @@
const CACHE_EXT_ONE_HOUR: [&str; 4] = ["atom", "json", "rss", "xml"];
const CACHE_EXT_ONE_YEAR: [&str; 30] = [
"bmp", "bz2", "css", "map", "doc", "gif", "gz", "htc", "ico", "jpg", "mp3", "mp4", "ogg",
"ogv", "pdf", "png", "rar", "tar", "tgz", "wav", "weba", "webm", "webp", "woff", "zip", "jpeg",
"js", "mjs", "rtf", "woff2",
];
pub fn control_headers(res: warp::fs::File) -> warp::reply::WithHeader<warp::fs::File> {
let mut max_age = 60 * 60 * 24_u64;
if let Some(ext) = res.path().extension() {
if let Some(ext) = ext.to_str() {
if CACHE_EXT_ONE_HOUR.iter().any(|x| *x == ext) {
max_age = 60 * 60;
} else if CACHE_EXT_ONE_YEAR.iter().any(|x| *x == ext) {
max_age = 60 * 60 * 24 * 365;
}
}
}
warp::reply::with_header(
res,
"cache-control",
[
"public, max-age=".to_string(),
duration(max_age).to_string(),
]
.concat(),
)
}
fn duration(n: u64) -> u32 {
std::cmp::min(n, u32::MAX as u64) as u32
}
pub fn has_accept_encoding(
val: &'static str,
) -> impl warp::Filter<Extract = (), Error = warp::Rejection> + Copy {
warp::header::contains("accept-encoding", val)
}
@@ -0,0 +1,63 @@
use structopt::StructOpt;
#[derive(Debug, StructOpt)]
pub struct Config {
#[structopt(long, short = "a", default_value = "::", env = "SERVER_HOST")]
pub host: String,
#[structopt(long, short = "p", default_value = "80", env = "SERVER_PORT")]
pub port: u16,
#[structopt(
long,
short = "n",
default_value = "8",
env = "SERVER_THREADS_MULTIPLIER"
)]
pub threads_multiplier: usize,
#[structopt(long, short = "d", default_value = "./public", env = "SERVER_ROOT")]
pub root: String,
#[structopt(
long,
default_value = "./public/50x.html",
env = "SERVER_ERROR_PAGE_50X"
)]
pub page50x: String,
#[structopt(
long,
default_value = "./public/404.html",
env = "SERVER_ERROR_PAGE_404"
)]
pub page404: String,
#[structopt(long, short = "x", default_value = "gzip", env = "SERVER_COMPRESSION")]
pub compression: String,
#[structopt(long, short = "g", default_value = "error", env = "SERVER_LOG_LEVEL")]
pub log_level: String,
#[structopt(
long,
short = "c",
default_value = "",
env = "SERVER_CORS_ALLOW_ORIGINS"
)]
pub cors_allow_origins: String,
}
@@ -1,45 +0,0 @@
const CACHE_EXT_ONE_HOUR: [&str; 4] = ["atom", "json", "rss", "xml"];
const CACHE_EXT_ONE_YEAR: [&str; 30] = [
"bmp", "bz2", "css", "map", "doc", "gif", "gz", "htc", "ico", "jpg", "mp3", "mp4", "ogg",
"ogv", "pdf", "png", "rar", "tar", "tgz", "wav", "weba", "webm", "webp", "woff", "zip", "jpeg",
"js", "mjs", "rtf", "woff2",
];
pub fn control_headers(res: warp::fs::File) -> warp::reply::WithHeader<warp::fs::File> {
let mut max_age = 60 * 60 * 24_u64;
if let Some(ext) = res.path().extension() {
if let Some(ext) = ext.to_str() {
if CACHE_EXT_ONE_HOUR.iter().any(|x| *x == ext) {
max_age = 60 * 60;
} else if CACHE_EXT_ONE_YEAR.iter().any(|x| *x == ext) {
max_age = 60 * 60 * 24 * 365;
}
}
}
warp::reply::with_header(
res,
"cache-control",
[
"public, max-age=".to_string(),
duration(max_age).to_string(),
]
.concat(),
)
}
fn duration(n: u64) -> u32 {
std::cmp::min(n, u32::MAX as u64) as u32
}
pub fn has_accept_encoding(
val: &'static str,
) -> impl warp::Filter<Extract = (), Error = warp::Rejection> + Copy {
warp::header::contains("accept-encoding", val)
}
@@ -1,63 +0,0 @@
use structopt::StructOpt;
#[derive(Debug, StructOpt)]
pub struct Options {
#[structopt(long, short = "a", default_value = "::", env = "SERVER_HOST")]
pub host: String,
#[structopt(long, short = "p", default_value = "80", env = "SERVER_PORT")]
pub port: u16,
#[structopt(
long,
short = "n",
default_value = "8",
env = "SERVER_THREADS_MULTIPLIER"
)]
pub threads_multiplier: usize,
#[structopt(long, short = "d", default_value = "./public", env = "SERVER_ROOT")]
pub root: String,
#[structopt(
long,
default_value = "./public/50x.html",
env = "SERVER_ERROR_PAGE_50X"
)]
pub page50x: String,
#[structopt(
long,
default_value = "./public/404.html",
env = "SERVER_ERROR_PAGE_404"
)]
pub page404: String,
#[structopt(long, short = "x", default_value = "gzip", env = "SERVER_COMPRESSION")]
pub compression: String,
#[structopt(long, short = "g", default_value = "error", env = "SERVER_LOG_LEVEL")]
pub log_level: String,
#[structopt(
long,
short = "c",
default_value = "",
env = "SERVER_CORS_ALLOW_ORIGINS"
)]
pub cors_allow_origins: String,
}
@@ -1,36 +0,0 @@
use std::collections::HashSet;
use warp::filters::cors::Builder;
pub fn get_opt_cors_filter(origins: &str) -> (Option<Builder>, String) {
let mut cors_allowed_hosts = String::new();
let cors_filter = if origins.is_empty() {
None
} else if origins == "*" {
cors_allowed_hosts = origins.into();
Some(
warp::cors()
.allow_any_origin()
.allow_methods(vec!["GET", "HEAD", "OPTIONS"]),
)
} else {
cors_allowed_hosts = origins.into();
let hosts = cors_allowed_hosts
.split(',')
.map(|s| s.trim().as_ref())
.collect::<HashSet<_>>();
if hosts.is_empty() {
cors_allowed_hosts = hosts.into_iter().collect::<Vec<&str>>().join(", ");
None
} else {
Some(
warp::cors()
.allow_origins(hosts)
.allow_methods(vec!["GET", "HEAD", "OPTIONS"]),
)
}
};
(cors_filter, cors_allowed_hosts)
}
@@ -1,37 +0,0 @@
use std::fs;
use std::path::{Path, PathBuf};
use super::Result;
pub fn get_valid_dirpath<P: AsRef<Path>>(path: P) -> Result<PathBuf>
where
PathBuf: From<P>,
{
match PathBuf::from(path) {
v if !v.exists() => bail!("path \"{:?}\" was not found or inaccessible", &v),
v if !v.is_dir() => bail!("path \"{:?}\" is not a valid directory", &v),
v => Ok(v),
}
}
pub fn get_dirname<P: AsRef<Path>>(path: P) -> Result<String>
where
PathBuf: From<P>,
{
let path = get_valid_dirpath(path)?;
match path.iter().last() {
Some(v) => Ok(v.to_str().unwrap().to_string()),
_ => bail!("directory name for path \"{:?}\" was not determined", path),
}
}
pub fn read_file_content(p: &str) -> String {
if !p.is_empty() && Path::new(p).exists() {
return fs::read_to_string(p).unwrap_or_default();
}
String::new()
}
@@ -1,17 +0,0 @@
use tracing::Level;
use tracing_subscriber::fmt::format::FmtSpan;
use super::Result;
pub fn init(level: &str) -> Result {
let level = level.parse::<Level>()?;
match tracing_subscriber::fmt()
.with_max_level(level)
.with_span_events(FmtSpan::CLOSE)
.try_init()
{
Err(err) => Err(anyhow!(err)),
_ => Ok(()),
}
}
@@ -1,12 +0,0 @@
pub mod cache;
pub mod config;
pub mod cors;
pub mod helpers;
pub mod logger;
pub mod rejection;
pub mod signals;
#[macro_use]
pub mod result;
pub use result::*;
@@ -1,38 +0,0 @@
use anyhow::Result;
use std::convert::Infallible;
use warp::http::StatusCode;
use warp::{Rejection, Reply};
pub async fn handle_rejection(
page_404: String,
page_50x: String,
err: Rejection,
) -> Result<impl Reply, Infallible> {
let mut content = String::new();
let code = if err.is_not_found() {
content = page_404;
StatusCode::NOT_FOUND
} else if err
.find::<warp::filters::body::BodyDeserializeError>()
.is_some()
{
StatusCode::BAD_REQUEST
} else if err.find::<warp::reject::MethodNotAllowed>().is_some() {
StatusCode::METHOD_NOT_ALLOWED
} else if err.find::<warp::reject::UnsupportedMediaType>().is_some() {
StatusCode::UNSUPPORTED_MEDIA_TYPE
} else {
content = page_50x;
StatusCode::INTERNAL_SERVER_ERROR
};
if content.is_empty() {
content = format!(
"<html><head><title>{}</title></head><body><center><h1>{}</h1></center></body></html>",
code, code
);
}
Ok(warp::reply::with_status(warp::reply::html(content), code))
}
@@ -1,4 +0,0 @@
pub type Result<T = ()> = anyhow::Result<T, anyhow::Error>;
pub use anyhow::Context;
@@ -1,51 +0,0 @@
use nix::errno::Errno;
use nix::libc::c_int;
use nix::sys::signal::{SIGCHLD, SIGINT, SIGTERM};
use nix::sys::wait::WaitStatus::{Exited, Signaled, StillAlive};
use nix::sys::wait::{waitpid, WaitPidFlag};
use nix::Error;
pub use signal::Signal;
pub fn wait<F>(func: F)
where
F: Fn(signal::Signal),
{
let sig_trap = signal::trap::Trap::trap(&[SIGTERM, SIGINT, SIGCHLD]);
for sig in sig_trap {
match sig {
SIGCHLD => {
loop {
match waitpid(None, Some(WaitPidFlag::WNOHANG)) {
Ok(Exited(pid, status)) => {
println!("{} exited with status {}", pid, status);
continue;
}
Ok(Signaled(pid, sig, _)) => {
println!("{} killed by {}", pid, sig as c_int);
continue;
}
Ok(StillAlive) => break,
Ok(status) => {
println!("Temporary status {:?}", status);
continue;
}
Err(Error::Sys(Errno::ECHILD)) => return,
Err(e) => {
panic!("Error {:?}", e);
}
}
}
}
sig => func(sig),
}
}
}
pub fn as_int(sig: signal::Signal) -> i32 {
sig as c_int
}
@@ -0,0 +1,36 @@
use std::collections::HashSet;
use warp::filters::cors::Builder;
pub fn get_opt_cors_filter(origins: &str) -> (Option<Builder>, String) {
let mut cors_allowed_hosts = String::new();
let cors_filter = if origins.is_empty() {
None
} else if origins == "*" {
cors_allowed_hosts = origins.into();
Some(
warp::cors()
.allow_any_origin()
.allow_methods(vec!["GET", "HEAD", "OPTIONS"]),
)
} else {
cors_allowed_hosts = origins.into();
let hosts = cors_allowed_hosts
.split(',')
.map(|s| s.trim().as_ref())
.collect::<HashSet<_>>();
if hosts.is_empty() {
cors_allowed_hosts = hosts.into_iter().collect::<Vec<&str>>().join(", ");
None
} else {
Some(
warp::cors()
.allow_origins(hosts)
.allow_methods(vec!["GET", "HEAD", "OPTIONS"]),
)
}
};
(cors_filter, cors_allowed_hosts)
}
@@ -0,0 +1,10 @@
pub type Result<T = (), E = anyhow::Error> = anyhow::Result<T, E>;
pub type Error = anyhow::Error;
pub use anyhow::anyhow;
pub use anyhow::bail;
pub use anyhow::Context;
@@ -0,0 +1,37 @@
use std::fs;
use std::path::{Path, PathBuf};
use super::Result;
pub fn get_valid_dirpath<P: AsRef<Path>>(path: P) -> Result<PathBuf>
where
PathBuf: From<P>,
{
match PathBuf::from(path) {
v if !v.exists() => bail!("path \"{:?}\" was not found or inaccessible", &v),
v if !v.is_dir() => bail!("path \"{:?}\" is not a valid directory", &v),
v => Ok(v),
}
}
pub fn get_dirname<P: AsRef<Path>>(path: P) -> Result<String>
where
PathBuf: From<P>,
{
let path = get_valid_dirpath(path)?;
match path.iter().last() {
Some(v) => Ok(v.to_str().unwrap().to_string()),
_ => bail!("directory name for path \"{:?}\" was not determined", path),
}
}
pub fn read_file_content(p: &str) -> String {
if !p.is_empty() && Path::new(p).exists() {
return fs::read_to_string(p).unwrap_or_default();
}
String::new()
}
@@ -1,4 +1,16 @@
#[macro_use]
extern crate anyhow;
pub mod core;
pub mod cache;
pub mod config;
pub mod cors;
pub mod helpers;
pub mod logger;
pub mod rejection;
pub mod server;
pub mod signals;
#[macro_use]
pub mod error;
pub use error::*;
@@ -0,0 +1,17 @@
use tracing::Level;
use tracing_subscriber::fmt::format::FmtSpan;
use super::Result;
pub fn init(level: &str) -> Result {
let level = level.parse::<Level>()?;
match tracing_subscriber::fmt()
.with_max_level(level)
.with_span_events(FmtSpan::CLOSE)
.try_init()
{
Err(err) => Err(anyhow!(err)),
_ => Ok(()),
}
}
@@ -0,0 +1,40 @@
use anyhow::Result;
use std::convert::Infallible;
use warp::http::StatusCode;
use warp::{Rejection, Reply};
pub async fn handle_rejection(
page_404: String,
page_50x: String,
err: Rejection,
) -> Result<impl Reply, Infallible> {
let mut content = String::new();
let code = if err.is_not_found() {
content = page_404;
StatusCode::NOT_FOUND
} else if err
.find::<warp::filters::body::BodyDeserializeError>()
.is_some()
{
StatusCode::BAD_REQUEST
} else if err.find::<warp::reject::MethodNotAllowed>().is_some() {
StatusCode::METHOD_NOT_ALLOWED
} else if err.find::<warp::filters::cors::CorsForbidden>().is_some() {
StatusCode::FORBIDDEN
} else if err.find::<warp::reject::UnsupportedMediaType>().is_some() {
StatusCode::UNSUPPORTED_MEDIA_TYPE
} else {
content = page_50x;
StatusCode::INTERNAL_SERVER_ERROR
};
if content.is_empty() {
content = format!(
"<html><head><title>{}</title></head><body><center><h1>{}</h1></center></body></html>",
code, code
);
}
Ok(warp::reply::with_status(warp::reply::html(content), code))
}
@@ -0,0 +1,216 @@
use std::net::{IpAddr, SocketAddr};
use warp::Filter;
use crate::{cache, config, cors, helpers, logger, rejection, signals, Result};
pub struct Server {
opts: config::Config,
threads: usize,
}
impl Server {
pub fn new(opts: config::Config) -> Self {
let n = if opts.threads_multiplier == 0 {
1
} else {
opts.threads_multiplier
};
let threads = num_cpus::get() * n;
Self { opts, threads }
}
pub fn run(self) -> Result {
tokio::runtime::Builder::new_multi_thread()
.worker_threads(self.threads)
.enable_all()
.build()?
.block_on(async {
let r = self.start_server().await;
if r.is_err() {
panic!("Server error: {:?}", r.unwrap_err())
}
});
Ok(())
}
async fn start_server(self) -> Result {
let opts = self.opts;
logger::init(&opts.log_level)?;
let ip = opts.host.parse::<IpAddr>()?;
let addr = SocketAddr::from((ip, opts.port));
let root_dir = helpers::get_valid_dirpath(opts.root)?;
let page404 = helpers::read_file_content(opts.page404.as_ref());
let page50x = helpers::read_file_content(opts.page50x.as_ref());
let page404_a = page404.clone();
let page50x_a = page50x.clone();
let (cors_filter, cors_allowed_origins) =
cors::get_opt_cors_filter(opts.cors_allow_origins.as_ref());
let base_dir_filter = warp::fs::dir(root_dir.clone())
.map(cache::control_headers)
.with(warp::trace::request())
.recover(move |rej| {
let page404_a = page404_a.clone();
let page50x_a = page50x_a.clone();
async move { rejection::handle_rejection(page404_a, page50x_a, rej).await }
});
let public_head = warp::head().and(base_dir_filter.clone());
let public_get_default = warp::get().and(base_dir_filter.clone());
match opts.compression.as_ref() {
"brotli" => tokio::task::spawn(async move {
let with_dir = warp::fs::dir(root_dir)
.map(cache::control_headers)
.with(warp::trace::request())
.with(warp::compression::brotli(true))
.recover(move |rej| {
let page404 = page404.clone();
let page50x = page50x.clone();
async move { rejection::handle_rejection(page404, page50x, rej).await }
});
if let Some(cors_filter) = cors_filter {
tracing::info!(
cors_enabled = ?true,
allowed_origins = ?cors_allowed_origins
);
warp::serve(
public_head.with(cors_filter.clone()).or(warp::get()
.and(cache::has_accept_encoding("br"))
.and(with_dir)
.with(cors_filter.clone())
.or(public_get_default.with(cors_filter))),
)
.run(addr)
.await
} else {
warp::serve(
public_head.or(warp::get()
.and(cache::has_accept_encoding("br"))
.and(with_dir)
.or(public_get_default)),
)
.run(addr)
.await
}
}),
"deflate" => tokio::task::spawn(async move {
let with_dir = warp::fs::dir(root_dir)
.map(cache::control_headers)
.with(warp::trace::request())
.with(warp::compression::deflate(true))
.recover(move |rej| {
let page404 = page404.clone();
let page50x = page50x.clone();
async move { rejection::handle_rejection(page404, page50x, rej).await }
});
if let Some(cors_filter) = cors_filter {
tracing::info!(
cors_enabled = ?true,
allowed_origins = ?cors_allowed_origins
);
warp::serve(
public_head.with(cors_filter.clone()).or(warp::get()
.and(cache::has_accept_encoding("deflate"))
.and(with_dir)
.with(cors_filter.clone())
.or(public_get_default.with(cors_filter))),
)
.run(addr)
.await
} else {
warp::serve(
public_head.or(warp::get()
.and(cache::has_accept_encoding("deflate"))
.and(with_dir)
.or(public_get_default)),
)
.run(addr)
.await
}
}),
"gzip" => tokio::task::spawn(async move {
let with_dir = warp::fs::dir(root_dir)
.map(cache::control_headers)
.with(warp::trace::request())
.with(warp::compression::gzip(true))
.recover(move |rej| {
let page404 = page404.clone();
let page50x = page50x.clone();
async move { rejection::handle_rejection(page404, page50x, rej).await }
});
if let Some(cors_filter) = cors_filter {
tracing::info!(
cors_enabled = ?true,
allowed_origins = ?cors_allowed_origins
);
warp::serve(
public_head.with(cors_filter.clone()).or(warp::get()
.and(cache::has_accept_encoding("gzip"))
.and(with_dir)
.with(cors_filter.clone())
.or(public_get_default.with(cors_filter))),
)
.run(addr)
.await
} else {
warp::serve(
public_head.or(warp::get()
.and(cache::has_accept_encoding("gzip"))
.and(with_dir)
.or(public_get_default)),
)
.run(addr)
.await
}
}),
_ => tokio::task::spawn(async move {
if let Some(cors_filter) = cors_filter {
tracing::info!(
cors_enabled = ?true,
allowed_origins = ?cors_allowed_origins
);
let public_get_default = warp::get()
.and(base_dir_filter.clone())
.with(cors_filter.clone());
warp::serve(public_head.or(public_get_default.with(cors_filter)))
.run(addr)
.await
} else {
warp::serve(public_head.or(public_get_default))
.run(addr)
.await
}
}),
};
signals::wait(|sig: signals::Signal| {
let code = signals::as_int(sig);
tracing::warn!("Signal {} caught. Server execution exited.", code);
std::process::exit(code)
});
Ok(())
}
}
@@ -0,0 +1,51 @@
use nix::errno::Errno;
use nix::libc::c_int;
use nix::sys::signal::{SIGCHLD, SIGINT, SIGTERM};
use nix::sys::wait::WaitStatus::{Exited, Signaled, StillAlive};
use nix::sys::wait::{waitpid, WaitPidFlag};
use nix::Error;
pub use signal::Signal;
pub fn wait<F>(func: F)
where
F: Fn(signal::Signal),
{
let sig_trap = signal::trap::Trap::trap(&[SIGTERM, SIGINT, SIGCHLD]);
for sig in sig_trap {
match sig {
SIGCHLD => {
loop {
match waitpid(None, Some(WaitPidFlag::WNOHANG)) {
Ok(Exited(pid, status)) => {
println!("{} exited with status {}", pid, status);
continue;
}
Ok(Signaled(pid, sig, _)) => {
println!("{} killed by {}", pid, sig as c_int);
continue;
}
Ok(StillAlive) => break,
Ok(status) => {
println!("Temporary status {:?}", status);
continue;
}
Err(Error::Sys(Errno::ECHILD)) => return,
Err(e) => {
panic!("Error {:?}", e);
}
}
}
}
sig => func(sig),
}
}
}
pub fn as_int(sig: signal::Signal) -> i32 {
sig as c_int
}