refactor: tokio server tasks independently
Diff
src/server.rs | 433 +++++++++++++++++++++++++++++++++++------------------------
1 file changed, 264 insertions(+), 169 deletions(-)
@@ -1,11 +1,12 @@
use std::net::{IpAddr, SocketAddr};
use std::path::PathBuf;
use structopt::StructOpt;
use warp::Filter;
use crate::config::{Config, CONFIG};
use crate::{cache, cors, filters, helpers, logger, rejection, signals, Result};
pub struct Server {
threads: usize,
}
@@ -24,7 +25,7 @@ impl Server {
Self { threads }
}
pub fn run(self) -> Result {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
@@ -32,7 +33,7 @@ impl Server {
.worker_threads(self.threads)
.build()?
.block_on(async {
let r = self.start_server().await;
let r = self.run_server_with_config().await;
if r.is_err() {
panic!("Server error during start up: {:?}", r.unwrap_err())
}
@@ -41,8 +42,8 @@ impl Server {
Ok(())
}
async fn start_server(self) -> Result {
async fn run_server_with_config(self) -> Result {
let opts = Config::global();
logger::init(&opts.log_level)?;
@@ -68,176 +69,59 @@ impl Server {
let (cors_filter_opt, cors_allowed_origins) =
cors::get_opt_cors_filter(opts.cors_allow_origins.as_ref());
let base_fs_dir_filter = warp::fs::dir(root_dir.clone())
.map(cache::control_headers)
.with(warp::trace::request())
.recover(rejection::handle_rejection);
let public_head = warp::head().and(base_fs_dir_filter.clone());
let public_get_default = warp::get().and(base_fs_dir_filter);
let http2 = opts.http2;
let http2_tls_cert_path = &opts.http2_tls_cert;
let http2_tls_key_path = &opts.http2_tls_key;
match opts.compression.as_ref() {
"brotli" => tokio::task::spawn(async move {
let fs_dir_filter = warp::fs::dir(root_dir)
.map(cache::control_headers)
.with(warp::compression::brotli(true))
.with(warp::trace::request())
.recover(rejection::handle_rejection);
match cors_filter_opt {
Some(cors_filter) => {
tracing::info!(
cors_enabled = ?true,
allowed_origins = ?cors_allowed_origins
);
let public_head = public_head.with(cors_filter.clone());
let public_get_default = public_get_default.with(cors_filter.clone());
let public_get = warp::get()
.and(filters::has_accept_encoding("br"))
.and(fs_dir_filter)
.with(cors_filter.clone());
let server = warp::serve(public_head.or(public_get).or(public_get_default));
if http2 {
server
.tls()
.cert_path(http2_tls_cert_path)
.key_path(http2_tls_key_path)
.run(addr)
.await;
} else {
server.run(addr).await
}
}
None => {
let public_get = warp::get()
.and(filters::has_accept_encoding("br"))
.and(fs_dir_filter);
let server = warp::serve(public_head.or(public_get).or(public_get_default));
if http2 {
server
.tls()
.cert_path(http2_tls_cert_path)
.key_path(http2_tls_key_path)
.run(addr)
.await;
} else {
server.run(addr).await
}
}
}
}),
"gzip" => tokio::task::spawn(async move {
let fs_dir_filter = warp::fs::dir(root_dir)
.map(cache::control_headers)
.with(warp::compression::gzip(true))
.with(warp::trace::request())
.recover(rejection::handle_rejection);
match cors_filter_opt {
Some(cors_filter) => {
tracing::info!(
cors_enabled = ?true,
allowed_origins = ?cors_allowed_origins
);
let public_head = public_head.with(cors_filter.clone());
let public_get_default = public_get_default.with(cors_filter.clone());
let public_get = warp::get()
.and(filters::has_accept_encoding("gzip"))
.and(fs_dir_filter)
.with(cors_filter.clone());
let server = warp::serve(public_head.or(public_get).or(public_get_default));
if http2 {
server
.tls()
.cert_path(http2_tls_cert_path)
.key_path(http2_tls_key_path)
.run(addr)
.await;
} else {
server.run(addr).await
}
}
None => {
let public_get = warp::get()
.and(filters::has_accept_encoding("gzip"))
.and(fs_dir_filter);
let server = warp::serve(public_head.or(public_get).or(public_get_default));
if http2 {
server
.tls()
.cert_path(http2_tls_cert_path)
.key_path(http2_tls_key_path)
.run(addr)
.await;
} else {
server.run(addr).await
}
}
}
}),
_ => tokio::task::spawn(async move {
match cors_filter_opt {
Some(cors_filter) => {
tracing::info!(
cors_enabled = ?true,
allowed_origins = ?cors_allowed_origins
);
let public_get = public_get_default.with(cors_filter.clone());
let server = warp::serve(public_head.or(public_get));
if http2 {
server
.tls()
.cert_path(http2_tls_cert_path)
.key_path(http2_tls_key_path)
.run(addr)
.await;
} else {
server.run(addr).await
}
}
None => {
let server = warp::serve(public_head.or(public_get_default));
if http2 {
server
.tls()
.cert_path(http2_tls_cert_path)
.key_path(http2_tls_key_path)
.run(addr)
.await;
} else {
server.run(addr).await
}
}
}
}),
};
tokio::task::spawn(async move {
if opts.compression == "brotli" {
run_server_with_brotli_compression(
addr,
root_dir,
http2,
http2_tls_cert_path,
http2_tls_key_path,
cors_filter_opt,
cors_allowed_origins,
)
.await;
return;
}
if opts.compression == "gzip" {
run_server_with_gzip_compression(
addr,
root_dir,
http2,
http2_tls_cert_path,
http2_tls_key_path,
cors_filter_opt,
cors_allowed_origins,
)
.await;
return;
}
run_server_with_no_compression(
addr,
root_dir,
http2,
http2_tls_cert_path,
http2_tls_key_path,
cors_filter_opt,
cors_allowed_origins,
)
.await
});
signals::wait(|sig: signals::Signal| {
let code = signals::as_int(sig);
tracing::warn!("Signal {} caught. Server execution exited.", code);
@@ -248,6 +132,217 @@ impl Server {
}
}
pub async fn run_server_with_brotli_compression(
addr: SocketAddr,
root_dir: PathBuf,
http2: bool,
http2_tls_cert_path: &'static str,
http2_tls_key_path: &'static str,
cors_filter_opt: Option<warp::filters::cors::Builder>,
cors_allowed_origins: String,
) {
let base_fs_dir_filter = warp::fs::dir(root_dir.clone())
.map(cache::control_headers)
.with(warp::trace::request())
.recover(rejection::handle_rejection);
let public_head = warp::head().and(base_fs_dir_filter.clone());
let public_get_default = warp::get().and(base_fs_dir_filter);
let fs_dir_filter = warp::fs::dir(root_dir)
.map(cache::control_headers)
.with(warp::compression::brotli(true))
.with(warp::trace::request())
.recover(rejection::handle_rejection);
if let Some(cors_filter) = cors_filter_opt {
tracing::info!(
cors_enabled = ?true,
allowed_origins = ?cors_allowed_origins
);
let public_head = public_head.with(cors_filter.clone());
let public_get_default = public_get_default.with(cors_filter.clone());
let public_get = warp::get()
.and(filters::has_accept_encoding("br"))
.and(fs_dir_filter)
.with(cors_filter.clone());
let server = warp::serve(public_head.or(public_get).or(public_get_default));
if http2 {
server
.tls()
.cert_path(http2_tls_cert_path)
.key_path(http2_tls_key_path)
.run(addr)
.await
} else {
server.run(addr).await
}
} else {
let public_get = warp::get()
.and(filters::has_accept_encoding("br"))
.and(fs_dir_filter);
let server = warp::serve(public_head.or(public_get).or(public_get_default));
if http2 {
server
.tls()
.cert_path(http2_tls_cert_path)
.key_path(http2_tls_key_path)
.run(addr)
.await
} else {
server.run(addr).await
}
}
}
pub async fn run_server_with_gzip_compression(
addr: SocketAddr,
root_dir: PathBuf,
http2: bool,
http2_tls_cert_path: &'static str,
http2_tls_key_path: &'static str,
cors_filter_opt: Option<warp::filters::cors::Builder>,
cors_allowed_origins: String,
) {
let base_fs_dir_filter = warp::fs::dir(root_dir.clone())
.map(cache::control_headers)
.with(warp::trace::request())
.recover(rejection::handle_rejection);
let public_head = warp::head().and(base_fs_dir_filter.clone());
let public_get_default = warp::get().and(base_fs_dir_filter);
let fs_dir_filter = warp::fs::dir(root_dir)
.map(cache::control_headers)
.with(warp::compression::gzip(true))
.with(warp::trace::request())
.recover(rejection::handle_rejection);
if let Some(cors_filter) = cors_filter_opt {
tracing::info!(
cors_enabled = ?true,
allowed_origins = ?cors_allowed_origins
);
let public_head = public_head.with(cors_filter.clone());
let public_get_default = public_get_default.with(cors_filter.clone());
let public_get = warp::get()
.and(filters::has_accept_encoding("gzip"))
.and(fs_dir_filter)
.with(cors_filter.clone());
let server = warp::serve(public_head.or(public_get).or(public_get_default));
if http2 {
server
.tls()
.cert_path(http2_tls_cert_path)
.key_path(http2_tls_key_path)
.run(addr)
.await
} else {
server.run(addr).await
}
} else {
let public_get = warp::get()
.and(filters::has_accept_encoding("gzip"))
.and(fs_dir_filter);
let server = warp::serve(public_head.or(public_get).or(public_get_default));
if http2 {
server
.tls()
.cert_path(http2_tls_cert_path)
.key_path(http2_tls_key_path)
.run(addr)
.await
} else {
server.run(addr).await
}
}
}
pub async fn run_server_with_no_compression(
addr: SocketAddr,
root_dir: PathBuf,
http2: bool,
http2_tls_cert_path: &'static str,
http2_tls_key_path: &'static str,
cors_filter_opt: Option<warp::filters::cors::Builder>,
cors_allowed_origins: String,
) {
let base_fs_dir_filter = warp::fs::dir(root_dir.clone())
.map(cache::control_headers)
.with(warp::trace::request())
.recover(rejection::handle_rejection);
let public_head = warp::head().and(base_fs_dir_filter.clone());
let public_get_default = warp::get().and(base_fs_dir_filter);
if let Some(cors_filter) = cors_filter_opt {
tracing::info!(
cors_enabled = ?true,
allowed_origins = ?cors_allowed_origins
);
let public_get = public_get_default.with(cors_filter.clone());
let server = warp::serve(public_head.or(public_get));
if http2 {
server
.tls()
.cert_path(http2_tls_cert_path)
.key_path(http2_tls_key_path)
.run(addr)
.await
} else {
server.run(addr).await
}
} else {
let server = warp::serve(public_head.or(public_get_default));
if http2 {
server
.tls()
.cert_path(http2_tls_cert_path)
.key_path(http2_tls_key_path)
.run(addr)
.await
} else {
server.run(addr).await
}
}
}
impl Default for Server {
fn default() -> Self {
Self::new()