From afd6a87389ecb0bc06094f7045c6830901cff9be Mon Sep 17 00:00:00 2001 From: Jose Quintana <1700322+joseluisq@users.noreply.github.com> Date: Wed, 28 Feb 2024 18:53:00 +0100 Subject: [PATCH] feat: cancellation ability for `server::Server::run_server_on_rt` and `server::Server::run_standalone` functions (#319) It enhances the `server::Server::run_server_on_rt` and `server::Server::run_standalone` functions to support cancellation to shut down the server gracefully on demand as a complement to the termination signals handling in Linux/Unix systems. The `Server::run_standalone` function now accepts an optional `cancel` parameter to shut down the server gracefully on demand as a complement to the termination signals handling in Linux/Unix systems. Functions updated: ```rs /// NOTE: /// Now it will also handle server cancellation via the `cancel_recv` param in Linux/Unix systems /// similar to what Windows does. pub fn run_server_on_rt(self, cancel_recv: Option>, cancel_fn: F) -> Result /// NOTE: /// Now it also accepts a `cancel` param to shut down the server in Linux/Unix systems /// similar to what Windows does. pub fn run_standalone(self, cancel: Option>) -> Result ``` --- src/bin/server.rs | 2 +- src/server.rs | 67 ++++++++++++++++++++++++++++++++++++++++++++++--------------------- src/signals.rs | 52 ++++++++++++++++++++++++++++++++++++---------------- 3 files changed, 83 insertions(+), 38 deletions(-) diff --git a/src/bin/server.rs b/src/bin/server.rs index b1cc092..354ff66 100644 --- a/src/bin/server.rs +++ b/src/bin/server.rs @@ -37,7 +37,7 @@ fn main() -> Result { } // Run the server by default - static_web_server::Server::new(opts)?.run_standalone()?; + static_web_server::Server::new(opts)?.run_standalone(None)?; Ok(()) } diff --git a/src/server.rs b/src/server.rs index 333c8c9..a6b9674 100644 --- a/src/server.rs +++ b/src/server.rs @@ -3,14 +3,14 @@ // See https://static-web-server.net/ for more information // Copyright (C) 2019-present Jose Quintana -//! Server module intended to construct a multi-thread HTTP or HTTP/2 web server. +//! Server module intended to construct a multi-threaded HTTP or HTTP/2 web server. //! use hyper::server::Server as HyperServer; use listenfd::ListenFd; use std::net::{IpAddr, SocketAddr, TcpListener}; use std::sync::Arc; -use tokio::sync::watch::Receiver; +use tokio::sync::{watch::Receiver, Mutex}; use crate::handler::{RequestHandler, RequestHandlerOpts}; #[cfg(any(unix, windows))] @@ -27,7 +27,7 @@ use { use crate::{cors, helpers, Settings}; use crate::{service::RouterService, Context, Result}; -/// Define a multi-thread HTTP or HTTP/2 web server. +/// Define a multi-threaded HTTP or HTTP/2 web server. pub struct Server { opts: Settings, worker_threads: usize, @@ -35,7 +35,7 @@ pub struct Server { } impl Server { - /// Create new multi-thread server instance. + /// Create a new multi-threaded server instance. pub fn new(opts: Settings) -> Result { // Configure number of worker threads let cpus = num_cpus::get(); @@ -52,14 +52,25 @@ impl Server { }) } - /// Run the multi-thread `Server` as standalone. - /// It is a top-level function of [run_server_on_rt](#method.run_server_on_rt). - pub fn run_standalone(self) -> Result { - self.run_server_on_rt(None, || {}) + /// Run the multi-threaded `Server` as standalone. + /// This is a top-level function of [run_server_on_rt](#method.run_server_on_rt). + /// + /// It accepts an optional [`cancel`] parameter to shut down the server + /// gracefully on demand as a complement to the termination signals handling. + /// + /// [`cancel`]: + pub fn run_standalone(self, cancel: Option>) -> Result { + self.run_server_on_rt(cancel, || {}) } - /// Run the multi-thread `Server` which will be used by a Windows service. - /// It is a top-level function of [run_server_on_rt](#method.run_server_on_rt). + /// Run the multi-threaded `Server` which will be used by a Windows service. + /// This is a top-level function of [run_server_on_rt](#method.run_server_on_rt). + /// + /// It accepts an optional [`cancel`] parameter to shut down the server + /// gracefully on demand and an optional `cancel_fn` that will be executed + /// right after the server is shut down. + /// + /// [`cancel`]: #[cfg(windows)] pub fn run_as_service(self, cancel: Option>, cancel_fn: F) -> Result where @@ -68,12 +79,12 @@ impl Server { self.run_server_on_rt(cancel, cancel_fn) } - /// Build and run the multi-thread `Server` on the Tokio runtime. + /// Build and run the multi-threaded `Server` on the Tokio runtime. pub fn run_server_on_rt(self, cancel_recv: Option>, cancel_fn: F) -> Result where F: FnOnce(), { - tracing::debug!(%self.worker_threads, "initializing tokio runtime with multi-thread scheduler"); + tracing::debug!(%self.worker_threads, "initializing tokio runtime with multi-threaded scheduler"); tokio::runtime::Builder::new_multi_thread() .worker_threads(self.worker_threads) @@ -431,16 +442,24 @@ impl Server { HyperServer::builder(TlsAcceptor::new(tls, incoming)).serve(router_service); #[cfg(unix)] - let http2_server = http2_server - .with_graceful_shutdown(signals::wait_for_signals(signals, grace_period)); + let http2_cancel_recv = Arc::new(Mutex::new(_cancel_recv)); + #[cfg(unix)] + let redirect_cancel_recv = http2_cancel_recv.clone(); + + #[cfg(unix)] + let http2_server = http2_server.with_graceful_shutdown(signals::wait_for_signals( + signals, + grace_period, + http2_cancel_recv, + )); #[cfg(windows)] - let http2_cancel_recv = Arc::new(tokio::sync::Mutex::new(_cancel_recv)); + let http2_cancel_recv = Arc::new(Mutex::new(_cancel_recv)); #[cfg(windows)] let redirect_cancel_recv = http2_cancel_recv.clone(); #[cfg(windows)] - let http2_ctrlc_recv = Arc::new(tokio::sync::Mutex::new(Some(receiver))); + let http2_ctrlc_recv = Arc::new(Mutex::new(Some(receiver))); #[cfg(windows)] let redirect_ctrlc_recv = http2_ctrlc_recv.clone(); @@ -530,7 +549,7 @@ impl Server { #[cfg(unix)] let server_redirect = server_redirect.with_graceful_shutdown( - signals::wait_for_signals(redirect_signals, grace_period), + signals::wait_for_signals(redirect_signals, grace_period, redirect_cancel_recv), ); #[cfg(windows)] let server_redirect = server_redirect.with_graceful_shutdown(async move { @@ -599,13 +618,19 @@ impl Server { .serve(router_service); #[cfg(unix)] - let http1_server = - http1_server.with_graceful_shutdown(signals::wait_for_signals(signals, grace_period)); + let http1_cancel_recv = Arc::new(Mutex::new(_cancel_recv)); + + #[cfg(unix)] + let http1_server = http1_server.with_graceful_shutdown(signals::wait_for_signals( + signals, + grace_period, + http1_cancel_recv, + )); #[cfg(windows)] - let http1_cancel_recv = Arc::new(tokio::sync::Mutex::new(_cancel_recv)); + let http1_cancel_recv = Arc::new(Mutex::new(_cancel_recv)); #[cfg(windows)] - let http1_ctrlc_recv = Arc::new(tokio::sync::Mutex::new(Some(receiver))); + let http1_ctrlc_recv = Arc::new(Mutex::new(Some(receiver))); #[cfg(windows)] let http1_server = http1_server.with_graceful_shutdown(async move { diff --git a/src/signals.rs b/src/signals.rs index a78d198..74610d1 100644 --- a/src/signals.rs +++ b/src/signals.rs @@ -6,6 +6,8 @@ //! The module provides signals support like `SIGTERM`, `SIGINT` and `SIGQUIT`. //! +use std::sync::Arc; +use tokio::sync::{watch::Receiver, Mutex}; use tokio::time::{sleep, Duration}; #[cfg(unix)] @@ -14,9 +16,6 @@ use { signal_hook_tokio::Signals, }; -#[cfg(windows)] -use {std::sync::Arc, tokio::sync::watch::Receiver, tokio::sync::Mutex}; - #[cfg(unix)] #[cfg_attr(docsrs, doc(cfg(unix)))] /// It creates a common list of signals stream for `SIGTERM`, `SIGINT` and `SIGQUIT` to be observed. @@ -26,21 +25,42 @@ pub fn create_signals() -> Result { #[cfg(unix)] /// It waits for a specific type of incoming signals included `ctrl+c`. -pub async fn wait_for_signals(signals: Signals, grace_period_secs: u8) { - let mut signals = signals.fuse(); - while let Some(signal) = signals.next().await { - match signal { - SIGHUP => { - // NOTE: for now we don't do something for SIGHUPs - tracing::debug!("SIGHUP caught, nothing to do about") - } - SIGTERM | SIGINT | SIGQUIT => { - tracing::info!("SIGTERM, SIGINT or SIGQUIT signal caught"); - break; +pub async fn wait_for_signals( + signals: Signals, + grace_period_secs: u8, + cancel_recv: Arc>>>, +) { + let (first_tx, mut base_rx) = tokio::sync::mpsc::channel(1); + let last_tx = first_tx.clone(); + + tokio::spawn(async move { + let mut signals = signals.fuse(); + while let Some(signal) = signals.next().await { + match signal { + SIGHUP => { + // NOTE: for now we don't do something for SIGHUPs + tracing::debug!("SIGHUP caught, nothing to do about") + } + SIGTERM | SIGINT | SIGQUIT => { + tracing::info!("SIGTERM, SIGINT or SIGQUIT signal caught"); + first_tx.send(()).await.ok(); + break; + } + _ => unreachable!(), } - _ => unreachable!(), } - } + }); + + tokio::spawn(async move { + if let Some(recv) = &mut *cancel_recv.lock().await { + recv.changed().await.ok(); + last_tx.send(()).await.ok(); + tracing::info!("signals interrupted manually by cancel_recv"); + } + }); + + base_rx.recv().await.take(); + // NOTE: once loop above is done then an upstream graceful shutdown should come next. delay_graceful_shutdown(grace_period_secs).await; tracing::info!("delegating server's graceful shutdown"); -- libgit2 1.7.2