index : static-web-server.git

ascending towards madness

author Jose Quintana <joseluisquintana20@gmail.com> 2022-05-20 22:27:51.0 +00:00:00
committer Jose Quintana <joseluisquintana20@gmail.com> 2022-05-20 22:29:40.0 +00:00:00
commit
eb34587c532032074f011c27caf3d9cce7579895 [patch]
tree
cbd0ce17b2c819d8f2982d1fd13b37b71452468a
parent
c84d2fef01dca387465f98e1ae6ddc8a79e89ce5
download
eb34587c532032074f011c27caf3d9cce7579895.tar.gz

refactor: handle windows service events properly



Diff

 Cargo.lock        |   2 +-
 Cargo.toml        |   2 +-
 src/bin/server.rs |  10 ++--
 src/server.rs     |  67 +++++++++++++++---------
 src/signals.rs    |  26 ++++-----
 src/winservice.rs | 152 ++++++++++++++++++++++++++++++++++++-------------------
 6 files changed, 164 insertions(+), 95 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 04a7256..c011330 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1401,7 +1401,7 @@ checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "windows-service"
version = "0.4.0"
source = "git+https://github.com/yescallop/windows-service-rs#5c9abd3d86a8a35d158807661f215c04661369dd"
source = "git+https://github.com/joseluisq/windows-service-rs#5c9abd3d86a8a35d158807661f215c04661369dd"
dependencies = [
 "bitflags",
 "err-derive",
diff --git a/Cargo.toml b/Cargo.toml
index 31a0cd2..98bf276 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -65,7 +65,7 @@ signal-hook = { version = "0.3", features = ["extended-siginfo"] }
signal-hook-tokio = { version = "0.3", features = ["futures-v0_3"], default-features = false }

[target.'cfg(windows)'.dependencies]
windows-service = { git = "https://github.com/yescallop/windows-service-rs" }
windows-service = { git = "https://github.com/joseluisq/windows-service-rs" }
windows-sys = { version = "0.36.1", features = [ "Win32_Foundation", "Win32_NetworkManagement_IpHelper", "Win32_Networking_WinSock", "Win32_System_Memory" ] }

[dev-dependencies]
diff --git a/src/bin/server.rs b/src/bin/server.rs
index 4c52285..833ae0a 100644
--- a/src/bin/server.rs
+++ b/src/bin/server.rs
@@ -20,17 +20,17 @@ fn main() -> Result {
        match commands {
            Commands::Install {} => {
                #[cfg(windows)]
                return static_web_server::winservice::install_service();
                return static_web_server::winservice::install_service(opts.general.config_file);

                #[cfg(unix)]
                println!("ignored: `install` command is only available for Windows");
                println!("ignored: the `install` command is only available for Windows");
            }
            Commands::Uninstall {} => {
                #[cfg(windows)]
                return static_web_server::winservice::uninstall_service();

                #[cfg(unix)]
                println!("ignored: `uninstall` command is only available for Windows");
                println!("ignored: the `uninstall` command is only available for Windows");
            }
        }
    } else if opts.general.as_windows_service {
@@ -38,11 +38,11 @@ fn main() -> Result {
        return static_web_server::winservice::run_server_as_service();

        #[cfg(unix)]
        println!("ignored: `--as-windows-service` option is only available for Windows");
        println!("ignored: the `--as-windows-service` option is only available for Windows");
    }

    // Run the server by default
    static_web_server::Server::new(None)?.run()?;
    static_web_server::Server::new()?.run_standalone()?;

    Ok(())
}
diff --git a/src/server.rs b/src/server.rs
index 904e385..77f469b 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -2,8 +2,8 @@ use hyper::server::conn::AddrIncoming;
use hyper::server::Server as HyperServer;
use listenfd::ListenFd;
use std::net::{IpAddr, SocketAddr, TcpListener};
use std::sync::mpsc::Receiver;
use std::sync::Arc;
use tokio::sync::oneshot::Receiver;

use crate::handler::{RequestHandler, RequestHandlerOpts};
use crate::tls::{TlsAcceptor, TlsConfigBuilder};
@@ -14,12 +14,11 @@ use crate::{service::RouterService, Context, Result};
pub struct Server {
    opts: Settings,
    threads: usize,
    cancel: Option<Receiver<()>>,
}

impl Server {
    /// Create new multi-thread server instance.
    pub fn new(cancel: Option<Receiver<()>>) -> Result<Server> {
    pub fn new() -> Result<Server> {
        // Get server config
        let opts = Settings::get()?;

@@ -30,20 +29,31 @@ impl Server {
            n => cpus * n,
        };

        Ok(Server {
            opts,
            threads,
            cancel,
        })
        Ok(Server { opts, threads })
    }

    /// Build and run the multi-thread `Server`.
    pub fn run(self) -> Result {
    /// Build and run the multi-thread `Server` as standalone.
    pub fn run_standalone(self) -> Result {
        // Logging system initialization
        if self.cancel.is_none() {
            logger::init(&self.opts.general.log_level)?;
        }
        logger::init(&self.opts.general.log_level)?;

        self.run_server_on_rt(None, || {})
    }

    /// Build and run the multi-thread `Server` which will be used by a Windows service.
    #[cfg(windows)]
    pub fn run_as_service<F>(self, cancel: Option<Receiver<()>>, cancel_fn: F) -> Result
    where
        F: FnOnce(),
    {
        self.run_server_on_rt(cancel, cancel_fn)
    }

    /// Build and run the multi-thread `Server` on Tokio runtime.
    fn run_server_on_rt<F>(self, cancel_recv: Option<Receiver<()>>, cancel_fn: F) -> Result
    where
        F: FnOnce(),
    {
        tracing::debug!("initializing tokio runtime with multi thread scheduler");

        tokio::runtime::Builder::new_multi_thread()
@@ -52,10 +62,8 @@ impl Server {
            .enable_all()
            .build()?
            .block_on(async {
                let r = self.start_server().await;
                if r.is_err() {
                    tracing::error!("server failed to start up: {:?}", r);
                    println!("server failed to start up: {:?}", r.unwrap_err());
                if let Err(err) = self.start_server(cancel_recv, cancel_fn).await {
                    tracing::error!("server failed to start up: {:?}", err);
                    std::process::exit(1)
                }
            });
@@ -65,7 +73,10 @@ impl Server {

    /// Run the inner Hyper `HyperServer` (HTTP1/HTTP2) forever on the current thread
    // using the given configuration.
    async fn start_server(self) -> Result {
    async fn start_server<F>(self, _cancel_recv: Option<Receiver<()>>, _cancel_fn: F) -> Result
    where
        F: FnOnce(),
    {
        // Config "general" options
        let general = self.opts.general;

@@ -116,7 +127,7 @@ impl Server {

        // Number of worker threads option
        let threads = self.threads;
        tracing::info!("runtime worker threads: {}", self.threads);
        tracing::info!("runtime worker threads: {}", threads);

        // Security Headers option
        let security_headers = general.security_headers;
@@ -218,8 +229,11 @@ impl Server {
            let server =
                server.with_graceful_shutdown(signals::wait_for_signals(signals, grace_period));
            #[cfg(windows)]
            let server =
                server.with_graceful_shutdown(signals::wait_for_ctrl_c(self.cancel, grace_period));
            let server = server.with_graceful_shutdown(signals::wait_for_ctrl_c(
                _cancel_recv,
                _cancel_fn,
                grace_period,
            ));

            tracing::info!(
                parent: tracing::info_span!("Server::start_server", ?addr_str, ?threads),
@@ -242,6 +256,10 @@ impl Server {
            #[cfg(unix)]
            let handle = signals.handle();

            tcp_listener
                .set_nonblocking(true)
                .with_context(|| "failed to set TCP non-blocking mode")?;

            let server = HyperServer::from_tcp(tcp_listener)
                .unwrap()
                .tcp_nodelay(true)
@@ -251,8 +269,11 @@ impl Server {
            let server =
                server.with_graceful_shutdown(signals::wait_for_signals(signals, grace_period));
            #[cfg(windows)]
            let server =
                server.with_graceful_shutdown(signals::wait_for_ctrl_c(self.cancel, grace_period));
            let server = server.with_graceful_shutdown(signals::wait_for_ctrl_c(
                _cancel_recv,
                _cancel_fn,
                grace_period,
            ));

            tracing::info!(
                parent: tracing::info_span!("Server::start_server", ?addr_str, ?threads),
diff --git a/src/signals.rs b/src/signals.rs
index a5f3d97..b5cf5d3 100644
--- a/src/signals.rs
+++ b/src/signals.rs
@@ -7,7 +7,7 @@ use {
};

#[cfg(windows)]
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use tokio::sync::oneshot::Receiver;

#[cfg(unix)]
/// It creates a common list of signals stream for `SIGTERM`, `SIGINT` and `SIGQUIT` to be observed.
@@ -51,20 +51,18 @@ async fn delay_graceful_shutdown(grace_period_secs: u8) {

#[cfg(windows)]
/// It waits for an incoming `ctrl+c` signal on Windows.
pub async fn wait_for_ctrl_c(cancel: Option<Receiver<()>>, grace_period_secs: u8) {
    if let Some(recv) = cancel {
        async {
            loop {
                match recv.recv_timeout(Duration::from_secs(60)) {
                    // Break the loop either upon stop or channel disconnect
                    Ok(_) | Err(RecvTimeoutError::Disconnected) => break,

                    // Continue work if no events were received within the timeout
                    Err(RecvTimeoutError::Timeout) => (),
                }
            }
pub async fn wait_for_ctrl_c<F>(
    cancel_recv: Option<Receiver<()>>,
    cancel_fn: F,
    grace_period_secs: u8,
) where
    F: FnOnce(),
{
    if let Some(recv) = cancel_recv {
        if let Err(err) = recv.await {
            tracing::error!("error during cancel recv: {:?}", err)
        }
        .await;
        cancel_fn()
    } else {
        tokio::signal::ctrl_c()
            .await
diff --git a/src/winservice.rs b/src/winservice.rs
index 3494b50..6e45e06 100644
--- a/src/winservice.rs
+++ b/src/winservice.rs
@@ -1,14 +1,13 @@
use std::env;
use std::ffi::OsString;
use std::thread;
use std::time::Duration;
use std::{env, path::PathBuf};

use windows_service::{
    define_windows_service,
    service::{
        ServiceAccess, ServiceControl, ServiceControlAccept, ServiceDependency,
        ServiceErrorControl, ServiceExitCode, ServiceInfo, ServiceStartType, ServiceState,
        ServiceStatus, ServiceType,
        ServiceAccess, ServiceControl, ServiceControlAccept, ServiceErrorControl, ServiceExitCode,
        ServiceInfo, ServiceStartType, ServiceState, ServiceStatus, ServiceType,
    },
    service_control_handler::{self, ServiceControlHandlerResult, ServiceStatusHandle},
    service_dispatcher,
@@ -23,16 +22,25 @@ const SERVICE_EXE: &str = "static-web-server.exe";
const SERVICE_DESC: &str = "A blazing fast and asynchronous web server for static files-serving";
const SERVICE_DISPLAY_NAME: &str = "Static Web Server";

// Generate the windows service boilerplate.
// Generate the Windows Service boilerplate.
// The boilerplate contains the low-level service entry function (ffi_service_main)
// that parses incoming service arguments into Vec<OsString> and passes them to
// user defined service entry (my_service_main).
define_windows_service!(ffi_service_main, my_service_main);
// user defined service entry (custom_service_main).
define_windows_service!(ffi_service_main, custom_service_main);

fn custom_service_main(_args: Vec<OsString>) {
    if let Err(err) = run_service() {
        tracing::info!("error starting the service: {:?}", err);
    }
}

/// Assigns a particular server state with its properties.
fn set_service_state(
    status_handle: &ServiceStatusHandle,
    current_state: ServiceState,
) -> Result<()> {
    checkpoint: u32,
    wait_hint: Duration,
) -> Result {
    let next_status = ServiceStatus {
        // Should match the one from system service registry
        service_type: SERVICE_TYPE,
@@ -43,32 +51,27 @@ fn set_service_state(
        // Used to report an error when starting or stopping only, otherwise must be zero
        exit_code: ServiceExitCode::Win32(0),
        // Only used for pending states, otherwise must be zero
        checkpoint: 0,
        checkpoint,
        // Only used for pending states, otherwise must be zero
        wait_hint: Duration::default(),
        wait_hint,
        // Unused for setting status
        process_id: None,
    };

    // Tell the system that the service is now running
    //  system about the service status
    Ok(status_handle.set_service_status(next_status)?)
}

fn my_service_main(_args: Vec<OsString>) {
    if let Err(err) = run_service() {
        println!("error starting the service: {:?}", err);
    }
}

fn run_service() -> Result<()> {
fn run_service() -> Result {
    let opts = Settings::get()?;

    logger::init(&opts.general.log_level)?;

    println!("sws service started");
    tracing::info!("windows service: starting setup");

    // Create a channel to be able to poll a stop event from the service worker loop.
    // let (shutdown_tx, shutdown_rx) = std::sync::mpsc::channel();
    let (shutdown_tx, shutdown_rx) = std::sync::mpsc::channel();
    let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
    let mut shutdown_tx = Some(shutdown_tx);

    // Define system service event handler that will be receiving service events.
    let event_handler = move |control_event| -> ServiceControlHandlerResult {
@@ -79,7 +82,11 @@ fn run_service() -> Result<()> {

            // Handle stop
            ServiceControl::Stop => {
                shutdown_tx.send(()).unwrap();
                tracing::debug!("windows service: handled 'ServiceControl::Stop' event");
                if let Some(sender) = shutdown_tx.take() {
                    tracing::debug!("windows service: delegated 'ServiceControl::Stop' event");
                    sender.send(()).unwrap();
                }
                ServiceControlHandlerResult::NoError
            }

@@ -90,39 +97,62 @@ fn run_service() -> Result<()> {
    // Register system service event handler.
    // The returned status handle should be used to report service status changes to the system.
    let status_handle = service_control_handler::register(SERVICE_NAME, event_handler)?;
    println!("registering sws service");

    // Service starts
    set_service_state(&status_handle, ServiceState::StartPending)?;
    println!("sws service start pending");
    tracing::info!("windows service: registering service");

    // Service is running
    set_service_state(
        &status_handle,
        ServiceState::Running,
        1,
        Duration::default(),
    )?;
    tracing::info!("windows service: set service 'Running' state");

    let stop_handler = || {
        // Service is stop pending
        match set_service_state(
            &status_handle,
            ServiceState::StopPending,
            2,
            Duration::from_secs(3),
        ) {
            Ok(()) => tracing::info!("windows service: set service 'StopPending' state"),
            Err(err) => tracing::error!(
                "windows service: error when setting 'StopPending' state: {:?}",
                err
            ),
        }
    };

    match Server::new(Some(shutdown_rx)) {
    // Starting web server
    match Server::new() {
        Ok(server) => {
            // Service is running
            set_service_state(&status_handle, ServiceState::Running).unwrap();
            println!("sws service running");

            let r = server.run();
            if r.is_err() {
                println!("error starting the server: {:?}", r.unwrap_err());
            if let Err(err) = server.run_as_service(Some(shutdown_rx), stop_handler) {
                tracing::error!(
                    "windows service: error after starting the server: {:?}",
                    err
                );
            }

            set_service_state(&status_handle, ServiceState::Stopped).unwrap();
            println!("sws service stopping");
        }
        Err(err) => {
            println!("error starting the server: {:?}", err);
            std::process::exit(1);
            tracing::info!("windows service: error starting the server: {:?}", err);
        }
    }

    set_service_state(&status_handle, ServiceState::Stopped)?;
    println!("sws service stopping");
    // Service is stopped
    set_service_state(
        &status_handle,
        ServiceState::Stopped,
        3,
        Duration::from_secs(3),
    )?;
    tracing::info!("windows service: set service 'Stopped' state");

    Ok(())
}

pub fn run_server_as_service() -> Result<()> {
/// Run web server as Windows Server
pub fn run_server_as_service() -> Result {
    // Set current directory to the same as the executable
    let mut path = env::current_exe().unwrap();
    path.pop();
@@ -135,15 +165,35 @@ pub fn run_server_as_service() -> Result<()> {
    Ok(())
}

fn adjust_canonicalization(p: PathBuf) -> String {
    const VERBATIM_PREFIX: &str = r#"\\?\"#;
    let p = p.to_str().unwrap_or_default();
    let p = if p.starts_with(VERBATIM_PREFIX) {
        p.strip_prefix(VERBATIM_PREFIX).unwrap_or_default()
    } else {
        p
    };
    p.to_owned()
}

/// Install a Windows Service for SWS.
pub fn install_service() -> Result {
pub fn install_service(config_file: Option<PathBuf>) -> Result {
    let manager_access = ServiceManagerAccess::CONNECT | ServiceManagerAccess::CREATE_SERVICE;
    let service_manager = ServiceManager::local_computer(None::<&str>, manager_access)?;

    // Set the executable path to point the current binary
    let service_binary_path = std::env::current_exe().unwrap().with_file_name(SERVICE_EXE);
    let mut service_binary_arguments = vec![OsString::from("--as-windows-service=true")];

    // Append `--config-file` path to binary arguments if present
    if let Some(f) = config_file {
        let f = adjust_canonicalization(f);
        if !f.is_empty() {
            service_binary_arguments.push(OsString::from(["--config-file=", &f].concat()));
        }
    }

    // Run the service as `System`
    // Run the current service as `System` type
    let service_info = ServiceInfo {
        name: OsString::from(SERVICE_NAME),
        display_name: OsString::from(SERVICE_DISPLAY_NAME),
@@ -151,8 +201,8 @@ pub fn install_service() -> Result {
        start_type: ServiceStartType::OnDemand,
        error_control: ServiceErrorControl::Normal,
        executable_path: service_binary_path,
        launch_arguments: vec![OsString::from("--as-windows-service=true")],
        dependencies: vec![ServiceDependency::from_system_identifier("+network")],
        launch_arguments: service_binary_arguments,
        dependencies: vec![],
        account_name: None, // run as System
        account_password: None,
    };
@@ -161,8 +211,8 @@ pub fn install_service() -> Result {
    service.set_description(SERVICE_DESC)?;

    println!(
        "Windows service ({}) is installed successfully!",
        SERVICE_DISPLAY_NAME
        "Windows Service ({}) is installed successfully!",
        SERVICE_NAME
    );

    Ok(())
@@ -186,8 +236,8 @@ pub fn uninstall_service() -> Result {
    service.delete()?;

    println!(
        "Windows service ({}) is uninstalled successfully!",
        SERVICE_DISPLAY_NAME
        "Windows Service ({}) uninstalled successfully!",
        SERVICE_NAME
    );

    Ok(())