index : static-web-server.git

ascending towards madness

author Jose Quintana <1700322+joseluisq@users.noreply.github.com> 2022-10-16 22:14:57.0 +00:00:00
committer GitHub <noreply@github.com> 2022-10-16 22:14:57.0 +00:00:00
commit
d1b72fdd1dc31271e96f5c89a5234f40297e91f1 [patch]
tree
f84cc718217edb50c10622903fef3b5d7c9913cb
parent
abef7857798a8a6bfc394b07e8bbbd61ebc4ace2
download
d1b72fdd1dc31271e96f5c89a5234f40297e91f1.tar.gz

refactor: make static file content/metadata read operations sync (#153)

we know that the throughput is critical for file
reading (content/metadata) operations of our static file module.
however, those operations were performed async, causing significant
overhead due to the added extra costs of using the async runtime.

we have extremely little waiting (we essentially block on one task)
for those file reading operations.
so we turned them into their sync variants, which resulted in a
~58% performance increase and ~10% less CPU / ~52% less RAM utilization.

below are two 1-minute tests using the sws defaults:

$ wrk --latency -t4 -c100 -d1m http://localhost

before (v2.13.0):

Running 1m test @ http://localhost
  4 threads and 100 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency     5.47ms    1.15ms  33.78ms   75.84%
    Req/Sec     4.58k   276.47     5.74k    74.96%
  Latency Distribution
     50%    5.47ms
     75%    6.06ms
     90%    6.73ms
     99%    8.49ms
  1094168 requests in 1.00m, 747.13MB read
Requests/sec:  18222.47
Transfer/sec:     12.44MB

after:

Running 1m test @ http://localhost
  4 threads and 100 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency     2.29ms    0.91ms  19.52ms   71.84%
    Req/Sec    10.97k   628.29    13.89k    86.83%
  Latency Distribution
     50%    2.25ms
     75%    2.82ms
     90%    3.40ms
     99%    4.73ms
  2620459 requests in 1.00m, 1.75GB read
Requests/sec:  43648.84
Transfer/sec:     29.80MB

resolves #146

Diff

 src/compression_static.rs |   2 +-
 src/static_files.rs       | 202 ++++++++++++++++-------------------------------
 tests/static_files.rs     |  45 +++++++++-
 3 files changed, 118 insertions(+), 131 deletions(-)

diff --git a/src/compression_static.rs b/src/compression_static.rs
index 041ac4e..1fd92c6 100644
--- a/src/compression_static.rs
+++ b/src/compression_static.rs
@@ -40,7 +40,7 @@ pub async fn precompressed_variant(
            filepath_precomp.display()
        );

        if let Ok((meta, _)) = file_metadata(&filepath_precomp).await {
        if let Ok((meta, _)) = file_metadata(&filepath_precomp) {
            tracing::trace!("pre-compressed file variant found, serving it directly");

            let encoding = if ext == "gz" { "gzip" } else { ext };
diff --git a/src/static_files.rs b/src/static_files.rs
index 80f126d..6c0736f 100644
--- a/src/static_files.rs
+++ b/src/static_files.rs
@@ -1,9 +1,11 @@
// Static File handler
// -> Most of the file is borrowed from https://github.com/seanmonstar/warp/blob/master/src/filters/fs.rs
//! Static File handler
//!
//! Part of the file is borrowed and adapted for convenience from
//! https://github.com/seanmonstar/warp/blob/master/src/filters/fs.rs

use bytes::{Bytes, BytesMut};
use futures_util::future::Either;
use futures_util::{future, ready, stream, FutureExt, Stream, StreamExt};
use futures_util::future::{Either, Future};
use futures_util::{future, Stream};
use headers::{
    AcceptRanges, ContentLength, ContentRange, ContentType, HeaderMap, HeaderMapExt, HeaderValue,
    IfModifiedSince, IfRange, IfUnmodifiedSince, LastModified, Range,
@@ -11,17 +13,13 @@ use headers::{
use http::header::CONTENT_LENGTH;
use hyper::{header::CONTENT_ENCODING, Body, Method, Response, StatusCode};
use percent_encoding::percent_decode_str;
use std::fs::Metadata;
use std::future::Future;
use std::io;
use std::fs::{File, Metadata};
use std::io::{self, BufReader, Read, Seek, SeekFrom};
use std::ops::Bound;
use std::path::{Component, PathBuf};
use std::path::{Component, Path, PathBuf};
use std::pin::Pin;
use std::task::Poll;
use std::{cmp, path::Path};
use tokio::fs::File as TkFile;
use tokio::io::AsyncSeekExt;
use tokio_util::io::poll_read_buf;
use std::sync::Mutex;
use std::task::{Context, Poll};

use crate::directory_listing::DirListFmt;
use crate::{compression_static, directory_listing, Result};
@@ -157,7 +155,7 @@ async fn composed_file_metadata<'a>(

    tracing::trace!("getting metadata for file {}", file_path.display());

    match file_metadata(file_path.as_ref()).await {
    match file_metadata(file_path.as_ref()) {
        Ok((mut meta, is_dir)) => {
            if is_dir {
                // Append a HTML index page by default if it's a directory path (`autoindex`)
@@ -166,7 +164,7 @@ async fn composed_file_metadata<'a>(

                // If file exists then overwrite the `meta`
                // Also noting that it's still a directory request
                if let Ok(meta_res) = file_metadata(file_path.as_ref()).await {
                if let Ok(meta_res) = file_metadata(file_path.as_ref()) {
                    (meta, _) = meta_res
                }
            }
@@ -189,8 +187,8 @@ async fn composed_file_metadata<'a>(
}

/// Try to find the file system metadata for the given file path.
pub async fn file_metadata(file_path: &Path) -> Result<(Metadata, bool), StatusCode> {
    match tokio::fs::metadata(file_path).await {
pub fn file_metadata(file_path: &Path) -> Result<(Metadata, bool), StatusCode> {
    match std::fs::metadata(file_path) {
        Ok(meta) => {
            let is_dir = meta.is_dir();
            tracing::trace!("file found: {:?}", file_path);
@@ -219,7 +217,7 @@ fn file_reply<'a>(

    let file_path = path_precompressed.unwrap_or_else(|| path.clone());

    TkFile::open(file_path).then(move |res| match res {
    match File::open(file_path) {
        Ok(file) => Either::Left(response_body(file, path, meta, conditionals)),
        Err(err) => {
            let status = match err.kind() {
@@ -238,7 +236,7 @@ fn file_reply<'a>(
            };
            Either::Right(future::err(status))
        }
    })
    }
}

fn get_conditional_headers(header_list: &HeaderMap<HeaderValue>) -> Conditionals {
@@ -356,24 +354,71 @@ impl Conditionals {
    }
}

/// Size in bytes of the zeroed buffer that `FileStream::poll_next` allocates
/// for each read (Unix targets).
#[cfg(unix)]
const READ_BUF_SIZE: usize = 4_096;

/// Size in bytes of the zeroed buffer that `FileStream::poll_next` allocates
/// for each read (non-Unix targets).
#[cfg(not(unix))]
const READ_BUF_SIZE: usize = 8_192;

/// Stream adapter that yields the content of a reader as `Bytes` chunks.
///
/// The reader is kept behind a `Mutex`, which `poll_next` locks for every read.
#[derive(Debug)]
pub struct FileStream<T> {
    /// Source the chunks are read from.
    reader: Mutex<T>,
}

impl<T: Read> Stream for FileStream<T> {
    type Item = Result<Bytes>;

    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut reader = match self.reader.lock() {
            Ok(reader) => reader,
            Err(err) => {
                tracing::error!("file stream error: {:?}", err);
                return Poll::Ready(Some(Err(anyhow::anyhow!("failed to read stream file"))));
            }
        };

        let mut buf = BytesMut::zeroed(READ_BUF_SIZE);
        match reader.read(&mut buf[..]) {
            Ok(n) => {
                if n == 0 {
                    Poll::Ready(None)
                } else {
                    buf.truncate(n);
                    Poll::Ready(Some(Ok(buf.freeze())))
                }
            }
            Err(err) => Poll::Ready(Some(Err(anyhow::Error::from(err)))),
        }
    }
}

async fn response_body(
    file: TkFile,
    mut file: File,
    path: &PathBuf,
    meta: &Metadata,
    conditionals: Conditionals,
) -> Result<Response<Body>, StatusCode> {
    let mut len = meta.len();
    let modified = meta.modified().ok().map(LastModified::from);
    let resp = match conditionals.check(modified) {
        Cond::NoBody(resp) => resp,

    match conditionals.check(modified) {
        Cond::NoBody(resp) => Ok(resp),
        Cond::WithBody(range) => {
            bytes_range(range, len)
                .map(|(start, end)| {
                    match file.seek(SeekFrom::Start(start)) {
                        Ok(_) => (),
                        Err(err) => {
                            tracing::error!("seek file from start error: {:?}", err);
                            return Err(StatusCode::INTERNAL_SERVER_ERROR);
                        }
                    };

                    let sub_len = end - start;
                    let buf_size = optimal_buf_size(meta);
                    let stream = file_stream(file, buf_size, (start, end));
                    let body = Body::wrap_stream(stream);
                    let reader = Mutex::new(BufReader::new(file).take(sub_len));
                    let stream = FileStream { reader };

                    let body = Body::wrap_stream(stream);
                    let mut resp = Response::new(body);

                    if sub_len != len {
@@ -395,7 +440,7 @@ async fn response_body(
                        resp.headers_mut().typed_insert(last_modified);
                    }

                    resp
                    Ok(resp)
                })
                .unwrap_or_else(|BadRange| {
                    // bad byte range
@@ -403,12 +448,10 @@ async fn response_body(
                    *resp.status_mut() = StatusCode::RANGE_NOT_SATISFIABLE;
                    resp.headers_mut()
                        .typed_insert(ContentRange::unsatisfied_bytes(len));
                    resp
                    Ok(resp)
                })
        }
    };

    Ok(resp)
    }
}

struct BadRange;
@@ -451,95 +494,9 @@ fn bytes_range(range: Option<Range>, max_len: u64) -> Result<(u64, u64), BadRang
    res
}

/// Builds a stream of `Bytes` chunks covering the byte range `start..end` of `file`.
///
/// The file is first positioned at `start` (the seek is skipped when `start`
/// is zero). If that seek fails, the stream yields the seek error as its only
/// item. Otherwise chunks are read into a reused `BytesMut` with at least
/// `buf_size` bytes of spare capacity, until `end - start` bytes have been
/// produced or the file reports EOF.
fn file_stream(
    mut file: TkFile,
    buf_size: usize,
    (start, end): (u64, u64),
) -> impl Stream<Item = Result<Bytes, io::Error>> + Send {
    // Future that resolves to the file once it is positioned at `start`.
    let seek = async move {
        if start != 0 {
            file.seek(io::SeekFrom::Start(start)).await?;
        }
        Ok(file)
    };

    seek.into_stream()
        .map(move |result| {
            let mut buf = BytesMut::new();
            // Number of bytes still to be emitted.
            let mut len = end - start;
            let mut f = match result {
                Ok(f) => f,
                // Seek failed: emit the error once and finish.
                Err(f) => return Either::Left(stream::once(future::err(f))),
            };

            Either::Right(stream::poll_fn(move |cx| {
                // The whole requested range has been delivered.
                if len == 0 {
                    return Poll::Ready(None);
                }
                reserve_at_least(&mut buf, buf_size);

                let n = match ready!(poll_read_buf(Pin::new(&mut f), cx, &mut buf)) {
                    Ok(n) => n as u64,
                    Err(err) => {
                        tracing::debug!("file read error: {}", err);
                        return Poll::Ready(Some(Err(err)));
                    }
                };

                // EOF before `len` bytes were read ends the stream without an error.
                if n == 0 {
                    tracing::debug!("file read found EOF before expected length");
                    return Poll::Ready(None);
                }

                let mut chunk = buf.split().freeze();
                if n > len {
                    // Read past the end of the range: keep only the first `len` bytes.
                    chunk = chunk.split_to(len as usize);
                    len = 0;
                } else {
                    len -= n;
                }

                Poll::Ready(Some(Ok(chunk)))
            }))
        })
        .flatten()
}

/// Ensures `buf` has room for at least `cap` more bytes beyond its current length.
///
/// Nothing is reserved when the spare capacity is already `cap` or more.
fn reserve_at_least(buf: &mut BytesMut, cap: usize) {
    let spare = buf.capacity() - buf.len();
    if spare >= cap {
        return;
    }
    buf.reserve(cap);
}

const DEFAULT_READ_BUF_SIZE: usize = 8_192;

fn optimal_buf_size(metadata: &Metadata) -> usize {
    let block_size = get_block_size(metadata);

    // If file length is smaller than block size, don't waste space
    // reserving a bigger-than-needed buffer.
    cmp::min(block_size as u64, metadata.len()) as usize
}

#[cfg(unix)]
fn get_block_size(metadata: &Metadata) -> usize {
    use std::os::unix::fs::MetadataExt;
    //TODO: blksize() returns u64, should handle bad cast...
    //(really, a block size bigger than 4gb?)

    // Use device blocksize unless it's really small.
    cmp::max(metadata.blksize() as usize, DEFAULT_READ_BUF_SIZE)
}

#[cfg(not(unix))]
fn get_block_size(_metadata: &Metadata) -> usize {
    DEFAULT_READ_BUF_SIZE
}

#[cfg(test)]
mod tests {
    use super::sanitize_path;
    use bytes::BytesMut;
    use std::path::PathBuf;

    fn root_dir() -> PathBuf {
@@ -570,17 +527,4 @@ mod tests {
            expected_path
        );
    }

    #[test]
    fn test_reserve_at_least() {
        let mut buf = BytesMut::new();
        let cap = 8_192;

        assert_eq!(buf.len(), 0);
        assert_eq!(buf.capacity(), 0);

        super::reserve_at_least(&mut buf, cap);
        assert_eq!(buf.len(), 0);
        assert_eq!(buf.capacity(), cap);
    }
}
diff --git a/tests/static_files.rs b/tests/static_files.rs
index 725f9ee..81c623f 100644
--- a/tests/static_files.rs
+++ b/tests/static_files.rs
@@ -634,7 +634,50 @@ mod tests {
    }

    #[tokio::test]
    async fn handle_byte_ranges() {
    async fn handle_byte_ranges_single() {
        let mut headers = HeaderMap::new();
        headers.insert("range", "bytes=0-0".parse().unwrap());

        let buf = fs::read(root_dir().join("index.html"))
            .expect("unexpected error during index.html reading");
        let buf = Bytes::from(buf);

        for method in [Method::HEAD, Method::GET] {
            match static_files::handle(&HandleOpts {
                method: &method,
                headers: &headers,
                base_path: &root_dir(),
                uri_path: "index.html",
                uri_query: None,
                dir_listing: false,
                dir_listing_order: 6,
                dir_listing_format: &DirListFmt::Html,
                redirect_trailing_slash: true,
                compression_static: false,
            })
            .await
            {
                Ok((mut res, _)) => {
                    assert_eq!(res.status(), 206);
                    assert_eq!(
                        res.headers()["content-range"],
                        format!("bytes 0-0/{}", buf.len())
                    );
                    assert_eq!(res.headers()["content-length"], "1");
                    let body = hyper::body::to_bytes(res.body_mut())
                        .await
                        .expect("unexpected bytes error during `body` conversion");
                    assert_eq!(body, &buf[..=0]);
                }
                Err(_) => {
                    panic!("expected a normal response rather than a status error")
                }
            }
        }
    }

    #[tokio::test]
    async fn handle_byte_ranges_multiple() {
        let mut headers = HeaderMap::new();
        headers.insert("range", "bytes=100-200".parse().unwrap());