feat: cancellation ability for `server::Server::run_server_on_rt` and `server::Server::run_standalone` functions (#319)
It enhances the `server::Server::run_server_on_rt` and `server::Server::run_standalone` functions to support cancellation to shut down the server gracefully on demand as a complement to the termination signals handling in Linux/Unix systems.
The `Server::run_standalone` function now accepts an optional
`cancel` parameter to shut down the server gracefully on demand as
a complement to the termination signals handling in Linux/Unix systems.
Functions updated:
```rs
/// NOTE:
/// Now it will also handle server cancellation via the `cancel_recv` param in Linux/Unix systems
/// similar to what Windows does.
pub fn run_server_on_rt<F>(self, cancel_recv: Option<Receiver<()>>, cancel_fn: F) -> Result
/// NOTE:
/// Now it also accepts a `cancel` param to shut down the server in Linux/Unix systems
/// similar to what Windows does.
pub fn run_standalone(self, cancel: Option<Receiver<()>>) -> Result
```
Diff
src/bin/server.rs | 2 +-
src/server.rs | 67 ++++++++++++++++++++++++++++++++++++++------------------
src/signals.rs | 52 +++++++++++++++++++++++++++++--------------
3 files changed, 83 insertions(+), 38 deletions(-)
@@ -37,7 +37,7 @@ fn main() -> Result {
}
static_web_server::Server::new(opts)?.run_standalone()?;
static_web_server::Server::new(opts)?.run_standalone(None)?;
Ok(())
}
@@ -3,14 +3,14 @@
use hyper::server::Server as HyperServer;
use listenfd::ListenFd;
use std::net::{IpAddr, SocketAddr, TcpListener};
use std::sync::Arc;
use tokio::sync::watch::Receiver;
use tokio::sync::{watch::Receiver, Mutex};
use crate::handler::{RequestHandler, RequestHandlerOpts};
#[cfg(any(unix, windows))]
@@ -27,7 +27,7 @@ use {
use crate::{cors, helpers, Settings};
use crate::{service::RouterService, Context, Result};
pub struct Server {
opts: Settings,
worker_threads: usize,
@@ -35,7 +35,7 @@ pub struct Server {
}
impl Server {
pub fn new(opts: Settings) -> Result<Server> {
let cpus = num_cpus::get();
@@ -52,14 +52,25 @@ impl Server {
})
}
pub fn run_standalone(self) -> Result {
self.run_server_on_rt(None, || {})
pub fn run_standalone(self, cancel: Option<Receiver<()>>) -> Result {
self.run_server_on_rt(cancel, || {})
}
#[cfg(windows)]
pub fn run_as_service<F>(self, cancel: Option<Receiver<()>>, cancel_fn: F) -> Result
where
@@ -68,12 +79,12 @@ impl Server {
self.run_server_on_rt(cancel, cancel_fn)
}
pub fn run_server_on_rt<F>(self, cancel_recv: Option<Receiver<()>>, cancel_fn: F) -> Result
where
F: FnOnce(),
{
tracing::debug!(%self.worker_threads, "initializing tokio runtime with multi-thread scheduler");
tracing::debug!(%self.worker_threads, "initializing tokio runtime with multi-threaded scheduler");
tokio::runtime::Builder::new_multi_thread()
.worker_threads(self.worker_threads)
@@ -431,16 +442,24 @@ impl Server {
HyperServer::builder(TlsAcceptor::new(tls, incoming)).serve(router_service);
#[cfg(unix)]
let http2_server = http2_server
.with_graceful_shutdown(signals::wait_for_signals(signals, grace_period));
let http2_cancel_recv = Arc::new(Mutex::new(_cancel_recv));
#[cfg(unix)]
let redirect_cancel_recv = http2_cancel_recv.clone();
#[cfg(unix)]
let http2_server = http2_server.with_graceful_shutdown(signals::wait_for_signals(
signals,
grace_period,
http2_cancel_recv,
));
#[cfg(windows)]
let http2_cancel_recv = Arc::new(tokio::sync::Mutex::new(_cancel_recv));
let http2_cancel_recv = Arc::new(Mutex::new(_cancel_recv));
#[cfg(windows)]
let redirect_cancel_recv = http2_cancel_recv.clone();
#[cfg(windows)]
let http2_ctrlc_recv = Arc::new(tokio::sync::Mutex::new(Some(receiver)));
let http2_ctrlc_recv = Arc::new(Mutex::new(Some(receiver)));
#[cfg(windows)]
let redirect_ctrlc_recv = http2_ctrlc_recv.clone();
@@ -530,7 +549,7 @@ impl Server {
#[cfg(unix)]
let server_redirect = server_redirect.with_graceful_shutdown(
signals::wait_for_signals(redirect_signals, grace_period),
signals::wait_for_signals(redirect_signals, grace_period, redirect_cancel_recv),
);
#[cfg(windows)]
let server_redirect = server_redirect.with_graceful_shutdown(async move {
@@ -599,13 +618,19 @@ impl Server {
.serve(router_service);
#[cfg(unix)]
let http1_server =
http1_server.with_graceful_shutdown(signals::wait_for_signals(signals, grace_period));
let http1_cancel_recv = Arc::new(Mutex::new(_cancel_recv));
#[cfg(unix)]
let http1_server = http1_server.with_graceful_shutdown(signals::wait_for_signals(
signals,
grace_period,
http1_cancel_recv,
));
#[cfg(windows)]
let http1_cancel_recv = Arc::new(tokio::sync::Mutex::new(_cancel_recv));
let http1_cancel_recv = Arc::new(Mutex::new(_cancel_recv));
#[cfg(windows)]
let http1_ctrlc_recv = Arc::new(tokio::sync::Mutex::new(Some(receiver)));
let http1_ctrlc_recv = Arc::new(Mutex::new(Some(receiver)));
#[cfg(windows)]
let http1_server = http1_server.with_graceful_shutdown(async move {
@@ -6,6 +6,8 @@
use std::sync::Arc;
use tokio::sync::{watch::Receiver, Mutex};
use tokio::time::{sleep, Duration};
#[cfg(unix)]
@@ -14,9 +16,6 @@ use {
signal_hook_tokio::Signals,
};
#[cfg(windows)]
use {std::sync::Arc, tokio::sync::watch::Receiver, tokio::sync::Mutex};
#[cfg(unix)]
#[cfg_attr(docsrs, doc(cfg(unix)))]
@@ -26,21 +25,42 @@ pub fn create_signals() -> Result<Signals> {
#[cfg(unix)]
pub async fn wait_for_signals(signals: Signals, grace_period_secs: u8) {
let mut signals = signals.fuse();
while let Some(signal) = signals.next().await {
match signal {
SIGHUP => {
tracing::debug!("SIGHUP caught, nothing to do about")
}
SIGTERM | SIGINT | SIGQUIT => {
tracing::info!("SIGTERM, SIGINT or SIGQUIT signal caught");
break;
pub async fn wait_for_signals(
signals: Signals,
grace_period_secs: u8,
cancel_recv: Arc<Mutex<Option<Receiver<()>>>>,
) {
let (first_tx, mut base_rx) = tokio::sync::mpsc::channel(1);
let last_tx = first_tx.clone();
tokio::spawn(async move {
let mut signals = signals.fuse();
while let Some(signal) = signals.next().await {
match signal {
SIGHUP => {
tracing::debug!("SIGHUP caught, nothing to do about")
}
SIGTERM | SIGINT | SIGQUIT => {
tracing::info!("SIGTERM, SIGINT or SIGQUIT signal caught");
first_tx.send(()).await.ok();
break;
}
_ => unreachable!(),
}
_ => unreachable!(),
}
}
});
tokio::spawn(async move {
if let Some(recv) = &mut *cancel_recv.lock().await {
recv.changed().await.ok();
last_tx.send(()).await.ok();
tracing::info!("signals interrupted manually by cancel_recv");
}
});
base_rx.recv().await.take();
delay_graceful_shutdown(grace_period_secs).await;
tracing::info!("delegating server's graceful shutdown");