index : static-web-server.git

ascending towards madness

author Jose Quintana <joseluisquintana20@gmail.com> 2021-04-27 10:27:19.0 +00:00:00
committer Jose Quintana <joseluisquintana20@gmail.com> 2021-04-27 10:27:19.0 +00:00:00
commit
a1969362f9b90a88b24e7a78fc6262a0b79e881a [patch]
tree
8bc6cdf8249ccc4ff028032fa0382a6d31d62e85
parent
6822d94359b0d62d96496d9a8fc61525fb5e10dc
download
a1969362f9b90a88b24e7a78fc6262a0b79e881a.tar.gz

refactor: tokio server tasks independently



Diff

 src/server.rs | 433 +++++++++++++++++++++++++++++++++++------------------------
 1 file changed, 264 insertions(+), 169 deletions(-)

diff --git a/src/server.rs b/src/server.rs
index 768478f..57dcbaf 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -1,11 +1,12 @@
use std::net::{IpAddr, SocketAddr};
use std::path::PathBuf;
use structopt::StructOpt;
use warp::Filter;

use crate::config::{Config, CONFIG};
use crate::{cache, cors, filters, helpers, logger, rejection, signals, Result};

/// Define a multi-thread HTTP/HTTPS web server.
/// Define a multi-thread HTTP or HTTP/2 web server.
pub struct Server {
    threads: usize,
}
@@ -24,7 +25,7 @@ impl Server {
        Self { threads }
    }

    /// Build and run the `Server` forever on the current thread.
    /// Build and run the multi-thread `Server` spawning a new Tokio asynchronous task for it.
    pub fn run(self) -> Result {
        tokio::runtime::Builder::new_multi_thread()
            .enable_all()
@@ -32,7 +33,7 @@ impl Server {
            .worker_threads(self.threads)
            .build()?
            .block_on(async {
                let r = self.start_server().await;
                let r = self.run_server_with_config().await;
                if r.is_err() {
                    panic!("Server error during start up: {:?}", r.unwrap_err())
                }
@@ -41,8 +42,8 @@ impl Server {
        Ok(())
    }

    /// Run the inner `Warp` server forever on the current thread.
    async fn start_server(self) -> Result {
    /// Create and run the `Warp` server spawning a new Tokio asynchronous task with the given configuration.
    async fn run_server_with_config(self) -> Result {
        let opts = Config::global();

        logger::init(&opts.log_level)?;
@@ -68,176 +69,59 @@ impl Server {
        let (cors_filter_opt, cors_allowed_origins) =
            cors::get_opt_cors_filter(opts.cors_allow_origins.as_ref());

        // Base fs directory filter
        let base_fs_dir_filter = warp::fs::dir(root_dir.clone())
            .map(cache::control_headers)
            .with(warp::trace::request())
            .recover(rejection::handle_rejection);

        // Public HEAD endpoint
        let public_head = warp::head().and(base_fs_dir_filter.clone());

        // Public GET endpoint (default)
        let public_get_default = warp::get().and(base_fs_dir_filter);

        // HTTP/2 + TLS
        let http2 = opts.http2;
        let http2_tls_cert_path = &opts.http2_tls_cert;
        let http2_tls_key_path = &opts.http2_tls_key;

        // Public GET/HEAD endpoints with compression (gzip, brotli or none)
        match opts.compression.as_ref() {
            "brotli" => tokio::task::spawn(async move {
                let fs_dir_filter = warp::fs::dir(root_dir)
                    .map(cache::control_headers)
                    .with(warp::compression::brotli(true))
                    .with(warp::trace::request())
                    .recover(rejection::handle_rejection);

                match cors_filter_opt {
                    Some(cors_filter) => {
                        tracing::info!(
                            cors_enabled = ?true,
                            allowed_origins = ?cors_allowed_origins
                        );

                        let public_head = public_head.with(cors_filter.clone());
                        let public_get_default = public_get_default.with(cors_filter.clone());

                        let public_get = warp::get()
                            .and(filters::has_accept_encoding("br"))
                            .and(fs_dir_filter)
                            .with(cors_filter.clone());

                        let server = warp::serve(public_head.or(public_get).or(public_get_default));

                        if http2 {
                            server
                                .tls()
                                .cert_path(http2_tls_cert_path)
                                .key_path(http2_tls_key_path)
                                .run(addr)
                                .await;
                        } else {
                            server.run(addr).await
                        }
                    }
                    None => {
                        let public_get = warp::get()
                            .and(filters::has_accept_encoding("br"))
                            .and(fs_dir_filter);

                        let server = warp::serve(public_head.or(public_get).or(public_get_default));

                        if http2 {
                            server
                                .tls()
                                .cert_path(http2_tls_cert_path)
                                .key_path(http2_tls_key_path)
                                .run(addr)
                                .await;
                        } else {
                            server.run(addr).await
                        }
                    }
                }
            }),
            "gzip" => tokio::task::spawn(async move {
                let fs_dir_filter = warp::fs::dir(root_dir)
                    .map(cache::control_headers)
                    .with(warp::compression::gzip(true))
                    .with(warp::trace::request())
                    .recover(rejection::handle_rejection);

                match cors_filter_opt {
                    Some(cors_filter) => {
                        tracing::info!(
                            cors_enabled = ?true,
                            allowed_origins = ?cors_allowed_origins
                        );

                        let public_head = public_head.with(cors_filter.clone());
                        let public_get_default = public_get_default.with(cors_filter.clone());

                        let public_get = warp::get()
                            .and(filters::has_accept_encoding("gzip"))
                            .and(fs_dir_filter)
                            .with(cors_filter.clone());

                        let server = warp::serve(public_head.or(public_get).or(public_get_default));

                        if http2 {
                            server
                                .tls()
                                .cert_path(http2_tls_cert_path)
                                .key_path(http2_tls_key_path)
                                .run(addr)
                                .await;
                        } else {
                            server.run(addr).await
                        }
                    }
                    None => {
                        let public_get = warp::get()
                            .and(filters::has_accept_encoding("gzip"))
                            .and(fs_dir_filter);

                        let server = warp::serve(public_head.or(public_get).or(public_get_default));

                        if http2 {
                            server
                                .tls()
                                .cert_path(http2_tls_cert_path)
                                .key_path(http2_tls_key_path)
                                .run(addr)
                                .await;
                        } else {
                            server.run(addr).await
                        }
                    }
                }
            }),
            _ => tokio::task::spawn(async move {
                match cors_filter_opt {
                    Some(cors_filter) => {
                        tracing::info!(
                            cors_enabled = ?true,
                            allowed_origins = ?cors_allowed_origins
                        );

                        let public_get = public_get_default.with(cors_filter.clone());

                        let server = warp::serve(public_head.or(public_get));

                        if http2 {
                            server
                                .tls()
                                .cert_path(http2_tls_cert_path)
                                .key_path(http2_tls_key_path)
                                .run(addr)
                                .await;
                        } else {
                            server.run(addr).await
                        }
                    }
                    None => {
                        let server = warp::serve(public_head.or(public_get_default));

                        if http2 {
                            server
                                .tls()
                                .cert_path(http2_tls_cert_path)
                                .key_path(http2_tls_key_path)
                                .run(addr)
                                .await;
                        } else {
                            server.run(addr).await
                        }
                    }
                }
            }),
        };
        // Spawn a new Tokio asynchronous server task determined by the given compression type (gzip, brotli or none)
        // TODO: this can be simplified by replicating something similar to `warp::compression::auto()` but skipping `deflate
        // see Warp PR #513 https://github.com/seanmonstar/warp/pull/513
        tokio::task::spawn(async move {
            if opts.compression == "brotli" {
                run_server_with_brotli_compression(
                    addr,
                    root_dir,
                    http2,
                    http2_tls_cert_path,
                    http2_tls_key_path,
                    cors_filter_opt,
                    cors_allowed_origins,
                )
                .await;
                return;
            }

            if opts.compression == "gzip" {
                run_server_with_gzip_compression(
                    addr,
                    root_dir,
                    http2,
                    http2_tls_cert_path,
                    http2_tls_key_path,
                    cors_filter_opt,
                    cors_allowed_origins,
                )
                .await;
                return;
            }

            // Fallback HTTP or HTTP/2 server with no compression
            run_server_with_no_compression(
                addr,
                root_dir,
                http2,
                http2_tls_cert_path,
                http2_tls_key_path,
                cors_filter_opt,
                cors_allowed_origins,
            )
            .await
        });

        // Handle incoming signals for Unix-like OS's only
        // TODO: make this more explicit like v1
        // see https://github.com/joseluisq/static-web-server/blob/1.x/src/server.rs#L96
        signals::wait(|sig: signals::Signal| {
            let code = signals::as_int(sig);
            tracing::warn!("Signal {} caught. Server execution exited.", code);
@@ -248,6 +132,217 @@ impl Server {
    }
}

/// It creates and starts a Warp HTTP or HTTP/2 server with Brotli compression.
pub async fn run_server_with_brotli_compression(
    addr: SocketAddr,
    root_dir: PathBuf,
    http2: bool,
    http2_tls_cert_path: &'static str,
    http2_tls_key_path: &'static str,
    cors_filter_opt: Option<warp::filters::cors::Builder>,
    cors_allowed_origins: String,
) {
    // Base fs directory filter
    let base_fs_dir_filter = warp::fs::dir(root_dir.clone())
        .map(cache::control_headers)
        .with(warp::trace::request())
        .recover(rejection::handle_rejection);

    // Public HEAD endpoint
    let public_head = warp::head().and(base_fs_dir_filter.clone());

    // Public GET endpoint (default)
    let public_get_default = warp::get().and(base_fs_dir_filter);

    // Current fs directory filter
    let fs_dir_filter = warp::fs::dir(root_dir)
        .map(cache::control_headers)
        .with(warp::compression::brotli(true))
        .with(warp::trace::request())
        .recover(rejection::handle_rejection);

    // Determine CORS filter
    if let Some(cors_filter) = cors_filter_opt {
        tracing::info!(
            cors_enabled = ?true,
            allowed_origins = ?cors_allowed_origins
        );

        let public_head = public_head.with(cors_filter.clone());
        let public_get_default = public_get_default.with(cors_filter.clone());

        let public_get = warp::get()
            .and(filters::has_accept_encoding("br"))
            .and(fs_dir_filter)
            .with(cors_filter.clone());

        let server = warp::serve(public_head.or(public_get).or(public_get_default));

        if http2 {
            server
                .tls()
                .cert_path(http2_tls_cert_path)
                .key_path(http2_tls_key_path)
                .run(addr)
                .await
        } else {
            server.run(addr).await
        }
    } else {
        let public_get = warp::get()
            .and(filters::has_accept_encoding("br"))
            .and(fs_dir_filter);

        let server = warp::serve(public_head.or(public_get).or(public_get_default));

        if http2 {
            server
                .tls()
                .cert_path(http2_tls_cert_path)
                .key_path(http2_tls_key_path)
                .run(addr)
                .await
        } else {
            server.run(addr).await
        }
    }
}

/// It creates and starts a Warp HTTP or HTTP/2 server with GZIP compression.
pub async fn run_server_with_gzip_compression(
    addr: SocketAddr,
    root_dir: PathBuf,
    http2: bool,
    http2_tls_cert_path: &'static str,
    http2_tls_key_path: &'static str,
    cors_filter_opt: Option<warp::filters::cors::Builder>,
    cors_allowed_origins: String,
) {
    // Base fs directory filter
    let base_fs_dir_filter = warp::fs::dir(root_dir.clone())
        .map(cache::control_headers)
        .with(warp::trace::request())
        .recover(rejection::handle_rejection);

    // Public HEAD endpoint
    let public_head = warp::head().and(base_fs_dir_filter.clone());

    // Public GET endpoint (default)
    let public_get_default = warp::get().and(base_fs_dir_filter);

    // Current fs directory filter
    let fs_dir_filter = warp::fs::dir(root_dir)
        .map(cache::control_headers)
        .with(warp::compression::gzip(true))
        .with(warp::trace::request())
        .recover(rejection::handle_rejection);

    // Determine CORS filter
    if let Some(cors_filter) = cors_filter_opt {
        tracing::info!(
            cors_enabled = ?true,
            allowed_origins = ?cors_allowed_origins
        );

        let public_head = public_head.with(cors_filter.clone());
        let public_get_default = public_get_default.with(cors_filter.clone());

        let public_get = warp::get()
            .and(filters::has_accept_encoding("gzip"))
            .and(fs_dir_filter)
            .with(cors_filter.clone());

        let server = warp::serve(public_head.or(public_get).or(public_get_default));

        if http2 {
            server
                .tls()
                .cert_path(http2_tls_cert_path)
                .key_path(http2_tls_key_path)
                .run(addr)
                .await
        } else {
            server.run(addr).await
        }
    } else {
        let public_get = warp::get()
            .and(filters::has_accept_encoding("gzip"))
            .and(fs_dir_filter);

        let server = warp::serve(public_head.or(public_get).or(public_get_default));

        if http2 {
            server
                .tls()
                .cert_path(http2_tls_cert_path)
                .key_path(http2_tls_key_path)
                .run(addr)
                .await
        } else {
            server.run(addr).await
        }
    }
}

/// It creates and starts a Warp HTTP or HTTP/2 server with no compression.
pub async fn run_server_with_no_compression(
    addr: SocketAddr,
    root_dir: PathBuf,
    http2: bool,
    http2_tls_cert_path: &'static str,
    http2_tls_key_path: &'static str,
    cors_filter_opt: Option<warp::filters::cors::Builder>,
    cors_allowed_origins: String,
) {
    // Base fs directory filter
    let base_fs_dir_filter = warp::fs::dir(root_dir.clone())
        .map(cache::control_headers)
        .with(warp::trace::request())
        .recover(rejection::handle_rejection);

    // Public HEAD endpoint
    let public_head = warp::head().and(base_fs_dir_filter.clone());

    // Public GET endpoint (default)
    let public_get_default = warp::get().and(base_fs_dir_filter);

    // Determine CORS filter
    if let Some(cors_filter) = cors_filter_opt {
        tracing::info!(
            cors_enabled = ?true,
            allowed_origins = ?cors_allowed_origins
        );

        let public_get = public_get_default.with(cors_filter.clone());

        let server = warp::serve(public_head.or(public_get));

        if http2 {
            server
                .tls()
                .cert_path(http2_tls_cert_path)
                .key_path(http2_tls_key_path)
                .run(addr)
                .await
        } else {
            server.run(addr).await
        }
    } else {
        let server = warp::serve(public_head.or(public_get_default));

        if http2 {
            server
                .tls()
                .cert_path(http2_tls_cert_path)
                .key_path(http2_tls_key_path)
                .run(addr)
                .await
        } else {
            server.run(addr).await
        }
    }
}

impl Default for Server {
    fn default() -> Self {
        Self::new()