refactor: handle windows service events properly
Diff
Cargo.lock | 2 +-
Cargo.toml | 2 +-
src/bin/server.rs | 10 ++--
src/server.rs | 67 +++++++++++++++---------
src/signals.rs | 26 ++++-----
src/winservice.rs | 152 ++++++++++++++++++++++++++++++++++++-------------------
6 files changed, 164 insertions(+), 95 deletions(-)
@@ -1401,7 +1401,7 @@ checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "windows-service"
version = "0.4.0"
source = "git+https://github.com/yescallop/windows-service-rs#5c9abd3d86a8a35d158807661f215c04661369dd"
source = "git+https://github.com/joseluisq/windows-service-rs#5c9abd3d86a8a35d158807661f215c04661369dd"
dependencies = [
"bitflags",
"err-derive",
@@ -65,7 +65,7 @@ signal-hook = { version = "0.3", features = ["extended-siginfo"] }
signal-hook-tokio = { version = "0.3", features = ["futures-v0_3"], default-features = false }
[target.'cfg(windows)'.dependencies]
windows-service = { git = "https://github.com/yescallop/windows-service-rs" }
windows-service = { git = "https://github.com/joseluisq/windows-service-rs" }
windows-sys = { version = "0.36.1", features = [ "Win32_Foundation", "Win32_NetworkManagement_IpHelper", "Win32_Networking_WinSock", "Win32_System_Memory" ] }
[dev-dependencies]
@@ -20,17 +20,17 @@ fn main() -> Result {
match commands {
Commands::Install {} => {
#[cfg(windows)]
return static_web_server::winservice::install_service();
return static_web_server::winservice::install_service(opts.general.config_file);
#[cfg(unix)]
println!("ignored: `install` command is only available for Windows");
println!("ignored: the `install` command is only available for Windows");
}
Commands::Uninstall {} => {
#[cfg(windows)]
return static_web_server::winservice::uninstall_service();
#[cfg(unix)]
println!("ignored: `uninstall` command is only available for Windows");
println!("ignored: the `uninstall` command is only available for Windows");
}
}
} else if opts.general.as_windows_service {
@@ -38,11 +38,11 @@ fn main() -> Result {
return static_web_server::winservice::run_server_as_service();
#[cfg(unix)]
println!("ignored: `--as-windows-service` option is only available for Windows");
println!("ignored: the `--as-windows-service` option is only available for Windows");
}
static_web_server::Server::new(None)?.run()?;
static_web_server::Server::new()?.run_standalone()?;
Ok(())
}
@@ -2,8 +2,8 @@ use hyper::server::conn::AddrIncoming;
use hyper::server::Server as HyperServer;
use listenfd::ListenFd;
use std::net::{IpAddr, SocketAddr, TcpListener};
use std::sync::mpsc::Receiver;
use std::sync::Arc;
use tokio::sync::oneshot::Receiver;
use crate::handler::{RequestHandler, RequestHandlerOpts};
use crate::tls::{TlsAcceptor, TlsConfigBuilder};
@@ -14,12 +14,11 @@ use crate::{service::RouterService, Context, Result};
pub struct Server {
opts: Settings,
threads: usize,
cancel: Option<Receiver<()>>,
}
impl Server {
pub fn new(cancel: Option<Receiver<()>>) -> Result<Server> {
pub fn new() -> Result<Server> {
let opts = Settings::get()?;
@@ -30,20 +29,31 @@ impl Server {
n => cpus * n,
};
Ok(Server {
opts,
threads,
cancel,
})
Ok(Server { opts, threads })
}
pub fn run(self) -> Result {
pub fn run_standalone(self) -> Result {
if self.cancel.is_none() {
logger::init(&self.opts.general.log_level)?;
}
logger::init(&self.opts.general.log_level)?;
self.run_server_on_rt(None, || {})
}
#[cfg(windows)]
pub fn run_as_service<F>(self, cancel: Option<Receiver<()>>, cancel_fn: F) -> Result
where
F: FnOnce(),
{
self.run_server_on_rt(cancel, cancel_fn)
}
fn run_server_on_rt<F>(self, cancel_recv: Option<Receiver<()>>, cancel_fn: F) -> Result
where
F: FnOnce(),
{
tracing::debug!("initializing tokio runtime with multi thread scheduler");
tokio::runtime::Builder::new_multi_thread()
@@ -52,10 +62,8 @@ impl Server {
.enable_all()
.build()?
.block_on(async {
let r = self.start_server().await;
if r.is_err() {
tracing::error!("server failed to start up: {:?}", r);
println!("server failed to start up: {:?}", r.unwrap_err());
if let Err(err) = self.start_server(cancel_recv, cancel_fn).await {
tracing::error!("server failed to start up: {:?}", err);
std::process::exit(1)
}
});
@@ -65,7 +73,10 @@ impl Server {
async fn start_server(self) -> Result {
async fn start_server<F>(self, _cancel_recv: Option<Receiver<()>>, _cancel_fn: F) -> Result
where
F: FnOnce(),
{
let general = self.opts.general;
@@ -116,7 +127,7 @@ impl Server {
let threads = self.threads;
tracing::info!("runtime worker threads: {}", self.threads);
tracing::info!("runtime worker threads: {}", threads);
let security_headers = general.security_headers;
@@ -218,8 +229,11 @@ impl Server {
let server =
server.with_graceful_shutdown(signals::wait_for_signals(signals, grace_period));
#[cfg(windows)]
let server =
server.with_graceful_shutdown(signals::wait_for_ctrl_c(self.cancel, grace_period));
let server = server.with_graceful_shutdown(signals::wait_for_ctrl_c(
_cancel_recv,
_cancel_fn,
grace_period,
));
tracing::info!(
parent: tracing::info_span!("Server::start_server", ?addr_str, ?threads),
@@ -242,6 +256,10 @@ impl Server {
#[cfg(unix)]
let handle = signals.handle();
tcp_listener
.set_nonblocking(true)
.with_context(|| "failed to set TCP non-blocking mode")?;
let server = HyperServer::from_tcp(tcp_listener)
.unwrap()
.tcp_nodelay(true)
@@ -251,8 +269,11 @@ impl Server {
let server =
server.with_graceful_shutdown(signals::wait_for_signals(signals, grace_period));
#[cfg(windows)]
let server =
server.with_graceful_shutdown(signals::wait_for_ctrl_c(self.cancel, grace_period));
let server = server.with_graceful_shutdown(signals::wait_for_ctrl_c(
_cancel_recv,
_cancel_fn,
grace_period,
));
tracing::info!(
parent: tracing::info_span!("Server::start_server", ?addr_str, ?threads),
@@ -7,7 +7,7 @@ use {
};
#[cfg(windows)]
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use tokio::sync::oneshot::Receiver;
#[cfg(unix)]
@@ -51,20 +51,18 @@ async fn delay_graceful_shutdown(grace_period_secs: u8) {
#[cfg(windows)]
pub async fn wait_for_ctrl_c(cancel: Option<Receiver<()>>, grace_period_secs: u8) {
if let Some(recv) = cancel {
async {
loop {
match recv.recv_timeout(Duration::from_secs(60)) {
Ok(_) | Err(RecvTimeoutError::Disconnected) => break,
Err(RecvTimeoutError::Timeout) => (),
}
}
pub async fn wait_for_ctrl_c<F>(
cancel_recv: Option<Receiver<()>>,
cancel_fn: F,
grace_period_secs: u8,
) where
F: FnOnce(),
{
if let Some(recv) = cancel_recv {
if let Err(err) = recv.await {
tracing::error!("error during cancel recv: {:?}", err)
}
.await;
cancel_fn()
} else {
tokio::signal::ctrl_c()
.await
@@ -1,14 +1,13 @@
use std::env;
use std::ffi::OsString;
use std::thread;
use std::time::Duration;
use std::{env, path::PathBuf};
use windows_service::{
define_windows_service,
service::{
ServiceAccess, ServiceControl, ServiceControlAccept, ServiceDependency,
ServiceErrorControl, ServiceExitCode, ServiceInfo, ServiceStartType, ServiceState,
ServiceStatus, ServiceType,
ServiceAccess, ServiceControl, ServiceControlAccept, ServiceErrorControl, ServiceExitCode,
ServiceInfo, ServiceStartType, ServiceState, ServiceStatus, ServiceType,
},
service_control_handler::{self, ServiceControlHandlerResult, ServiceStatusHandle},
service_dispatcher,
@@ -23,16 +22,25 @@ const SERVICE_EXE: &str = "static-web-server.exe";
const SERVICE_DESC: &str = "A blazing fast and asynchronous web server for static files-serving";
const SERVICE_DISPLAY_NAME: &str = "Static Web Server";
define_windows_service!(ffi_service_main, my_service_main);
define_windows_service!(ffi_service_main, custom_service_main);
fn custom_service_main(_args: Vec<OsString>) {
if let Err(err) = run_service() {
tracing::info!("error starting the service: {:?}", err);
}
}
fn set_service_state(
status_handle: &ServiceStatusHandle,
current_state: ServiceState,
) -> Result<()> {
checkpoint: u32,
wait_hint: Duration,
) -> Result {
let next_status = ServiceStatus {
service_type: SERVICE_TYPE,
@@ -43,32 +51,27 @@ fn set_service_state(
exit_code: ServiceExitCode::Win32(0),
checkpoint: 0,
checkpoint,
wait_hint: Duration::default(),
wait_hint,
process_id: None,
};
Ok(status_handle.set_service_status(next_status)?)
}
fn my_service_main(_args: Vec<OsString>) {
if let Err(err) = run_service() {
println!("error starting the service: {:?}", err);
}
}
fn run_service() -> Result<()> {
fn run_service() -> Result {
let opts = Settings::get()?;
logger::init(&opts.general.log_level)?;
println!("sws service started");
tracing::info!("windows service: starting setup");
let (shutdown_tx, shutdown_rx) = std::sync::mpsc::channel();
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
let mut shutdown_tx = Some(shutdown_tx);
let event_handler = move |control_event| -> ServiceControlHandlerResult {
@@ -79,7 +82,11 @@ fn run_service() -> Result<()> {
ServiceControl::Stop => {
shutdown_tx.send(()).unwrap();
tracing::debug!("windows service: handled 'ServiceControl::Stop' event");
if let Some(sender) = shutdown_tx.take() {
tracing::debug!("windows service: delegated 'ServiceControl::Stop' event");
sender.send(()).unwrap();
}
ServiceControlHandlerResult::NoError
}
@@ -90,39 +97,62 @@ fn run_service() -> Result<()> {
let status_handle = service_control_handler::register(SERVICE_NAME, event_handler)?;
println!("registering sws service");
set_service_state(&status_handle, ServiceState::StartPending)?;
println!("sws service start pending");
tracing::info!("windows service: registering service");
set_service_state(
&status_handle,
ServiceState::Running,
1,
Duration::default(),
)?;
tracing::info!("windows service: set service 'Running' state");
let stop_handler = || {
match set_service_state(
&status_handle,
ServiceState::StopPending,
2,
Duration::from_secs(3),
) {
Ok(()) => tracing::info!("windows service: set service 'StopPending' state"),
Err(err) => tracing::error!(
"windows service: error when setting 'StopPending' state: {:?}",
err
),
}
};
match Server::new(Some(shutdown_rx)) {
match Server::new() {
Ok(server) => {
set_service_state(&status_handle, ServiceState::Running).unwrap();
println!("sws service running");
let r = server.run();
if r.is_err() {
println!("error starting the server: {:?}", r.unwrap_err());
if let Err(err) = server.run_as_service(Some(shutdown_rx), stop_handler) {
tracing::error!(
"windows service: error after starting the server: {:?}",
err
);
}
set_service_state(&status_handle, ServiceState::Stopped).unwrap();
println!("sws service stopping");
}
Err(err) => {
println!("error starting the server: {:?}", err);
std::process::exit(1);
tracing::info!("windows service: error starting the server: {:?}", err);
}
}
set_service_state(&status_handle, ServiceState::Stopped)?;
println!("sws service stopping");
set_service_state(
&status_handle,
ServiceState::Stopped,
3,
Duration::from_secs(3),
)?;
tracing::info!("windows service: set service 'Stopped' state");
Ok(())
}
pub fn run_server_as_service() -> Result<()> {
pub fn run_server_as_service() -> Result {
let mut path = env::current_exe().unwrap();
path.pop();
@@ -135,15 +165,35 @@ pub fn run_server_as_service() -> Result<()> {
Ok(())
}
fn adjust_canonicalization(p: PathBuf) -> String {
const VERBATIM_PREFIX: &str = r#"\\?\"#;
let p = p.to_str().unwrap_or_default();
let p = if p.starts_with(VERBATIM_PREFIX) {
p.strip_prefix(VERBATIM_PREFIX).unwrap_or_default()
} else {
p
};
p.to_owned()
}
pub fn install_service() -> Result {
pub fn install_service(config_file: Option<PathBuf>) -> Result {
let manager_access = ServiceManagerAccess::CONNECT | ServiceManagerAccess::CREATE_SERVICE;
let service_manager = ServiceManager::local_computer(None::<&str>, manager_access)?;
let service_binary_path = std::env::current_exe().unwrap().with_file_name(SERVICE_EXE);
let mut service_binary_arguments = vec![OsString::from("--as-windows-service=true")];
if let Some(f) = config_file {
let f = adjust_canonicalization(f);
if !f.is_empty() {
service_binary_arguments.push(OsString::from(["--config-file=", &f].concat()));
}
}
let service_info = ServiceInfo {
name: OsString::from(SERVICE_NAME),
display_name: OsString::from(SERVICE_DISPLAY_NAME),
@@ -151,8 +201,8 @@ pub fn install_service() -> Result {
start_type: ServiceStartType::OnDemand,
error_control: ServiceErrorControl::Normal,
executable_path: service_binary_path,
launch_arguments: vec![OsString::from("--as-windows-service=true")],
dependencies: vec![ServiceDependency::from_system_identifier("+network")],
launch_arguments: service_binary_arguments,
dependencies: vec![],
account_name: None, account_password: None,
};
@@ -161,8 +211,8 @@ pub fn install_service() -> Result {
service.set_description(SERVICE_DESC)?;
println!(
"Windows service ({}) is installed successfully!",
SERVICE_DISPLAY_NAME
"Windows Service ({}) is installed successfully!",
SERVICE_NAME
);
Ok(())
@@ -186,8 +236,8 @@ pub fn uninstall_service() -> Result {
service.delete()?;
println!(
"Windows service ({}) is uninstalled successfully!",
SERVICE_DISPLAY_NAME
"Windows Service ({}) uninstalled successfully!",
SERVICE_NAME
);
Ok(())