index : static-web-server.git

ascending towards madness

author Jose Quintana <1700322+joseluisq@users.noreply.github.com> 2024-02-28 17:53:00.0 +00:00:00
committer GitHub <noreply@github.com> 2024-02-28 17:53:00.0 +00:00:00
commit
afd6a87389ecb0bc06094f7045c6830901cff9be [patch]
tree
afe6b4265fcb567c6d629323c91db41333023a02
parent
b6444f471fa432ea6fd3039e04a9fdc2b2c6dac6
download
afd6a87389ecb0bc06094f7045c6830901cff9be.tar.gz

feat: cancellation ability for `server::Server::run_server_on_rt` and `server::Server::run_standalone` functions (#319)

It enhances the `server::Server::run_server_on_rt` and `server::Server::run_standalone` functions to support cancellation to shut down the server gracefully on demand as a complement to the termination signals handling in Linux/Unix systems.

The `Server::run_standalone` function now accepts an optional
`cancel` parameter to shut down the server gracefully on demand as
a complement to the termination signals handling in Linux/Unix systems.

Functions updated:

```rs
/// NOTE:
/// Now it will also handle server cancellation via the `cancel_recv` param in Linux/Unix systems
/// similar to what Windows does.
pub fn run_server_on_rt<F>(self, cancel_recv: Option<Receiver<()>>, cancel_fn: F) -> Result

/// NOTE:
/// Now it also accepts a `cancel` param to shut down the server in Linux/Unix systems
/// similar to what Windows does.
pub fn run_standalone(self, cancel: Option<Receiver<()>>) -> Result
```

Diff

 src/bin/server.rs |  2 +-
 src/server.rs     | 67 ++++++++++++++++++++++++++++++++++++++------------------
 src/signals.rs    | 52 +++++++++++++++++++++++++++++--------------
 3 files changed, 83 insertions(+), 38 deletions(-)

diff --git a/src/bin/server.rs b/src/bin/server.rs
index b1cc092..354ff66 100644
--- a/src/bin/server.rs
+++ b/src/bin/server.rs
@@ -37,7 +37,7 @@ fn main() -> Result {
    }

    // Run the server by default
    static_web_server::Server::new(opts)?.run_standalone()?;
    static_web_server::Server::new(opts)?.run_standalone(None)?;

    Ok(())
}
diff --git a/src/server.rs b/src/server.rs
index 333c8c9..a6b9674 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -3,14 +3,14 @@
// See https://static-web-server.net/ for more information
// Copyright (C) 2019-present Jose Quintana <joseluisq.net>

//! Server module intended to construct a multi-thread HTTP or HTTP/2 web server.
//! Server module intended to construct a multi-threaded HTTP or HTTP/2 web server.
//!

use hyper::server::Server as HyperServer;
use listenfd::ListenFd;
use std::net::{IpAddr, SocketAddr, TcpListener};
use std::sync::Arc;
use tokio::sync::watch::Receiver;
use tokio::sync::{watch::Receiver, Mutex};

use crate::handler::{RequestHandler, RequestHandlerOpts};
#[cfg(any(unix, windows))]
@@ -27,7 +27,7 @@ use {
use crate::{cors, helpers, Settings};
use crate::{service::RouterService, Context, Result};

/// Define a multi-thread HTTP or HTTP/2 web server.
/// Define a multi-threaded HTTP or HTTP/2 web server.
pub struct Server {
    opts: Settings,
    worker_threads: usize,
@@ -35,7 +35,7 @@ pub struct Server {
}

impl Server {
    /// Create new multi-thread server instance.
    /// Create a new multi-threaded server instance.
    pub fn new(opts: Settings) -> Result<Server> {
        // Configure number of worker threads
        let cpus = num_cpus::get();
@@ -52,14 +52,25 @@ impl Server {
        })
    }

    /// Run the multi-thread `Server` as standalone.
    /// It is a top-level function of [run_server_on_rt](#method.run_server_on_rt).
    pub fn run_standalone(self) -> Result {
        self.run_server_on_rt(None, || {})
    /// Run the multi-threaded `Server` as standalone.
    /// This is a top-level function of [run_server_on_rt](#method.run_server_on_rt).
    ///
    /// It accepts an optional [`cancel`] parameter to shut down the server
    /// gracefully on demand as a complement to the termination signals handling.
    ///
    /// [`cancel`]: <https://docs.rs/tokio/latest/tokio/sync/watch/struct.Receiver.html>
    pub fn run_standalone(self, cancel: Option<Receiver<()>>) -> Result {
        self.run_server_on_rt(cancel, || {})
    }

    /// Run the multi-thread `Server` which will be used by a Windows service.
    /// It is a top-level function of [run_server_on_rt](#method.run_server_on_rt).
    /// Run the multi-threaded `Server` which will be used by a Windows service.
    /// This is a top-level function of [run_server_on_rt](#method.run_server_on_rt).
    ///
    /// It accepts an optional [`cancel`] parameter to shut down the server
    /// gracefully on demand and an optional `cancel_fn` that will be executed
    /// right after the server is shut down.
    ///
    /// [`cancel`]: <https://docs.rs/tokio/latest/tokio/sync/watch/struct.Receiver.html>
    #[cfg(windows)]
    pub fn run_as_service<F>(self, cancel: Option<Receiver<()>>, cancel_fn: F) -> Result
    where
@@ -68,12 +79,12 @@ impl Server {
        self.run_server_on_rt(cancel, cancel_fn)
    }

    /// Build and run the multi-thread `Server` on the Tokio runtime.
    /// Build and run the multi-threaded `Server` on the Tokio runtime.
    pub fn run_server_on_rt<F>(self, cancel_recv: Option<Receiver<()>>, cancel_fn: F) -> Result
    where
        F: FnOnce(),
    {
        tracing::debug!(%self.worker_threads, "initializing tokio runtime with multi-thread scheduler");
        tracing::debug!(%self.worker_threads, "initializing tokio runtime with multi-threaded scheduler");

        tokio::runtime::Builder::new_multi_thread()
            .worker_threads(self.worker_threads)
@@ -431,16 +442,24 @@ impl Server {
                HyperServer::builder(TlsAcceptor::new(tls, incoming)).serve(router_service);

            #[cfg(unix)]
            let http2_server = http2_server
                .with_graceful_shutdown(signals::wait_for_signals(signals, grace_period));
            let http2_cancel_recv = Arc::new(Mutex::new(_cancel_recv));
            #[cfg(unix)]
            let redirect_cancel_recv = http2_cancel_recv.clone();

            #[cfg(unix)]
            let http2_server = http2_server.with_graceful_shutdown(signals::wait_for_signals(
                signals,
                grace_period,
                http2_cancel_recv,
            ));

            #[cfg(windows)]
            let http2_cancel_recv = Arc::new(tokio::sync::Mutex::new(_cancel_recv));
            let http2_cancel_recv = Arc::new(Mutex::new(_cancel_recv));
            #[cfg(windows)]
            let redirect_cancel_recv = http2_cancel_recv.clone();

            #[cfg(windows)]
            let http2_ctrlc_recv = Arc::new(tokio::sync::Mutex::new(Some(receiver)));
            let http2_ctrlc_recv = Arc::new(Mutex::new(Some(receiver)));
            #[cfg(windows)]
            let redirect_ctrlc_recv = http2_ctrlc_recv.clone();

@@ -530,7 +549,7 @@ impl Server {

                #[cfg(unix)]
                let server_redirect = server_redirect.with_graceful_shutdown(
                    signals::wait_for_signals(redirect_signals, grace_period),
                    signals::wait_for_signals(redirect_signals, grace_period, redirect_cancel_recv),
                );
                #[cfg(windows)]
                let server_redirect = server_redirect.with_graceful_shutdown(async move {
@@ -599,13 +618,19 @@ impl Server {
            .serve(router_service);

        #[cfg(unix)]
        let http1_server =
            http1_server.with_graceful_shutdown(signals::wait_for_signals(signals, grace_period));
        let http1_cancel_recv = Arc::new(Mutex::new(_cancel_recv));

        #[cfg(unix)]
        let http1_server = http1_server.with_graceful_shutdown(signals::wait_for_signals(
            signals,
            grace_period,
            http1_cancel_recv,
        ));

        #[cfg(windows)]
        let http1_cancel_recv = Arc::new(tokio::sync::Mutex::new(_cancel_recv));
        let http1_cancel_recv = Arc::new(Mutex::new(_cancel_recv));
        #[cfg(windows)]
        let http1_ctrlc_recv = Arc::new(tokio::sync::Mutex::new(Some(receiver)));
        let http1_ctrlc_recv = Arc::new(Mutex::new(Some(receiver)));

        #[cfg(windows)]
        let http1_server = http1_server.with_graceful_shutdown(async move {
diff --git a/src/signals.rs b/src/signals.rs
index a78d198..74610d1 100644
--- a/src/signals.rs
+++ b/src/signals.rs
@@ -6,6 +6,8 @@
//! The module provides signals support like `SIGTERM`, `SIGINT` and `SIGQUIT`.
//!

use std::sync::Arc;
use tokio::sync::{watch::Receiver, Mutex};
use tokio::time::{sleep, Duration};

#[cfg(unix)]
@@ -14,9 +16,6 @@ use {
    signal_hook_tokio::Signals,
};

#[cfg(windows)]
use {std::sync::Arc, tokio::sync::watch::Receiver, tokio::sync::Mutex};

#[cfg(unix)]
#[cfg_attr(docsrs, doc(cfg(unix)))]
/// It creates a common list of signals stream for `SIGTERM`, `SIGINT` and `SIGQUIT` to be observed.
@@ -26,21 +25,42 @@ pub fn create_signals() -> Result<Signals> {

#[cfg(unix)]
/// It waits for a specific type of incoming signals included `ctrl+c`.
pub async fn wait_for_signals(signals: Signals, grace_period_secs: u8) {
    let mut signals = signals.fuse();
    while let Some(signal) = signals.next().await {
        match signal {
            SIGHUP => {
                // NOTE: for now we don't do something for SIGHUPs
                tracing::debug!("SIGHUP caught, nothing to do about")
            }
            SIGTERM | SIGINT | SIGQUIT => {
                tracing::info!("SIGTERM, SIGINT or SIGQUIT signal caught");
                break;
pub async fn wait_for_signals(
    signals: Signals,
    grace_period_secs: u8,
    cancel_recv: Arc<Mutex<Option<Receiver<()>>>>,
) {
    let (first_tx, mut base_rx) = tokio::sync::mpsc::channel(1);
    let last_tx = first_tx.clone();

    tokio::spawn(async move {
        let mut signals = signals.fuse();
        while let Some(signal) = signals.next().await {
            match signal {
                SIGHUP => {
                    // NOTE: for now we don't do something for SIGHUPs
                    tracing::debug!("SIGHUP caught, nothing to do about")
                }
                SIGTERM | SIGINT | SIGQUIT => {
                    tracing::info!("SIGTERM, SIGINT or SIGQUIT signal caught");
                    first_tx.send(()).await.ok();
                    break;
                }
                _ => unreachable!(),
            }
            _ => unreachable!(),
        }
    }
    });

    tokio::spawn(async move {
        if let Some(recv) = &mut *cancel_recv.lock().await {
            recv.changed().await.ok();
            last_tx.send(()).await.ok();
            tracing::info!("signals interrupted manually by cancel_recv");
        }
    });

    base_rx.recv().await.take();

    // NOTE: once loop above is done then an upstream graceful shutdown should come next.
    delay_graceful_shutdown(grace_period_secs).await;
    tracing::info!("delegating server's graceful shutdown");