From d1b72fdd1dc31271e96f5c89a5234f40297e91f1 Mon Sep 17 00:00:00 2001 From: Jose Quintana <1700322+joseluisq@users.noreply.github.com> Date: Mon, 17 Oct 2022 00:14:57 +0200 Subject: [PATCH] refactor: make static file content/metadata read operations sync (#153) we know that the throughput is critical for file reading (content/metadata) operations of our static file module. however, those operations were performed async, causing significant overhead due to the added extra costs of using the async runtime. we have extremely little waiting (we essentially block on one task) for those file reading operations. so we turned them into their sync variants which resulted in a ~58% performance increase and ~10% less CPU / ~52% less RAM utilization. below are two 1min tests using the sws defaults: $ wrk --latency -t4 -c100 -d1m http://localhost before (v2.13.0): Running 1m test @ http://localhost 4 threads and 100 connections Thread Stats Avg Stdev Max +/- Stdev Latency 5.47ms 1.15ms 33.78ms 75.84% Req/Sec 4.58k 276.47 5.74k 74.96% Latency Distribution 50% 5.47ms 75% 6.06ms 90% 6.73ms 99% 8.49ms 1094168 requests in 1.00m, 747.13MB read Requests/sec: 18222.47 Transfer/sec: 12.44MB after: Running 1m test @ http://localhost 4 threads and 100 connections Thread Stats Avg Stdev Max +/- Stdev Latency 2.29ms 0.91ms 19.52ms 71.84% Req/Sec 10.97k 628.29 13.89k 86.83% Latency Distribution 50% 2.25ms 75% 2.82ms 90% 3.40ms 99% 4.73ms 2620459 requests in 1.00m, 1.75GB read Requests/sec: 43648.84 Transfer/sec: 29.80MB resolves #146 --- src/compression_static.rs | 2 +- src/static_files.rs | 202 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--------------------------------------------------------------------------------------------------------------------------------- tests/static_files.rs | 45 ++++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 118 insertions(+), 131 deletions(-) diff --git a/src/compression_static.rs b/src/compression_static.rs index 
041ac4e..1fd92c6 100644 --- a/src/compression_static.rs +++ b/src/compression_static.rs @@ -40,7 +40,7 @@ pub async fn precompressed_variant( filepath_precomp.display() ); - if let Ok((meta, _)) = file_metadata(&filepath_precomp).await { + if let Ok((meta, _)) = file_metadata(&filepath_precomp) { tracing::trace!("pre-compressed file variant found, serving it directly"); let encoding = if ext == "gz" { "gzip" } else { ext }; diff --git a/src/static_files.rs b/src/static_files.rs index 80f126d..6c0736f 100644 --- a/src/static_files.rs +++ b/src/static_files.rs @@ -1,9 +1,11 @@ -// Static File handler -// -> Most of the file is borrowed from https://github.com/seanmonstar/warp/blob/master/src/filters/fs.rs +//! Static File handler +//! +//! Part of the file is borrowed and adapted at a convenience from +//! https://github.com/seanmonstar/warp/blob/master/src/filters/fs.rs use bytes::{Bytes, BytesMut}; -use futures_util::future::Either; -use futures_util::{future, ready, stream, FutureExt, Stream, StreamExt}; +use futures_util::future::{Either, Future}; +use futures_util::{future, Stream}; use headers::{ AcceptRanges, ContentLength, ContentRange, ContentType, HeaderMap, HeaderMapExt, HeaderValue, IfModifiedSince, IfRange, IfUnmodifiedSince, LastModified, Range, @@ -11,17 +13,13 @@ use headers::{ use http::header::CONTENT_LENGTH; use hyper::{header::CONTENT_ENCODING, Body, Method, Response, StatusCode}; use percent_encoding::percent_decode_str; -use std::fs::Metadata; -use std::future::Future; -use std::io; +use std::fs::{File, Metadata}; +use std::io::{self, BufReader, Read, Seek, SeekFrom}; use std::ops::Bound; -use std::path::{Component, PathBuf}; +use std::path::{Component, Path, PathBuf}; use std::pin::Pin; -use std::task::Poll; -use std::{cmp, path::Path}; -use tokio::fs::File as TkFile; -use tokio::io::AsyncSeekExt; -use tokio_util::io::poll_read_buf; +use std::sync::Mutex; +use std::task::{Context, Poll}; use crate::directory_listing::DirListFmt; use 
crate::{compression_static, directory_listing, Result}; @@ -157,7 +155,7 @@ async fn composed_file_metadata<'a>( tracing::trace!("getting metadata for file {}", file_path.display()); - match file_metadata(file_path.as_ref()).await { + match file_metadata(file_path.as_ref()) { Ok((mut meta, is_dir)) => { if is_dir { // Append a HTML index page by default if it's a directory path (`autoindex`) @@ -166,7 +164,7 @@ async fn composed_file_metadata<'a>( // If file exists then overwrite the `meta` // Also noting that it's still a directory request - if let Ok(meta_res) = file_metadata(file_path.as_ref()).await { + if let Ok(meta_res) = file_metadata(file_path.as_ref()) { (meta, _) = meta_res } } @@ -189,8 +187,8 @@ async fn composed_file_metadata<'a>( } /// Try to find the file system metadata for the given file path. -pub async fn file_metadata(file_path: &Path) -> Result<(Metadata, bool), StatusCode> { - match tokio::fs::metadata(file_path).await { +pub fn file_metadata(file_path: &Path) -> Result<(Metadata, bool), StatusCode> { + match std::fs::metadata(file_path) { Ok(meta) => { let is_dir = meta.is_dir(); tracing::trace!("file found: {:?}", file_path); @@ -219,7 +217,7 @@ fn file_reply<'a>( let file_path = path_precompressed.unwrap_or_else(|| path.clone()); - TkFile::open(file_path).then(move |res| match res { + match File::open(file_path) { Ok(file) => Either::Left(response_body(file, path, meta, conditionals)), Err(err) => { let status = match err.kind() { @@ -238,7 +236,7 @@ fn file_reply<'a>( }; Either::Right(future::err(status)) } - }) + } } fn get_conditional_headers(header_list: &HeaderMap) -> Conditionals { @@ -356,24 +354,71 @@ impl Conditionals { } } +#[cfg(unix)] +const READ_BUF_SIZE: usize = 4_096; + +#[cfg(not(unix))] +const READ_BUF_SIZE: usize = 8_192; + +#[derive(Debug)] +pub struct FileStream { + reader: Mutex, +} + +impl Stream for FileStream { + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + let mut 
reader = match self.reader.lock() { + Ok(reader) => reader, + Err(err) => { + tracing::error!("file stream error: {:?}", err); + return Poll::Ready(Some(Err(anyhow::anyhow!("failed to read stream file")))); + } + }; + + let mut buf = BytesMut::zeroed(READ_BUF_SIZE); + match reader.read(&mut buf[..]) { + Ok(n) => { + if n == 0 { + Poll::Ready(None) + } else { + buf.truncate(n); + Poll::Ready(Some(Ok(buf.freeze()))) + } + } + Err(err) => Poll::Ready(Some(Err(anyhow::Error::from(err)))), + } + } +} + async fn response_body( - file: TkFile, + mut file: File, path: &PathBuf, meta: &Metadata, conditionals: Conditionals, ) -> Result, StatusCode> { let mut len = meta.len(); let modified = meta.modified().ok().map(LastModified::from); - let resp = match conditionals.check(modified) { - Cond::NoBody(resp) => resp, + + match conditionals.check(modified) { + Cond::NoBody(resp) => Ok(resp), Cond::WithBody(range) => { bytes_range(range, len) .map(|(start, end)| { + match file.seek(SeekFrom::Start(start)) { + Ok(_) => (), + Err(err) => { + tracing::error!("seek file from start error: {:?}", err); + return Err(StatusCode::INTERNAL_SERVER_ERROR); + } + }; + let sub_len = end - start; - let buf_size = optimal_buf_size(meta); - let stream = file_stream(file, buf_size, (start, end)); - let body = Body::wrap_stream(stream); + let reader = Mutex::new(BufReader::new(file).take(sub_len)); + let stream = FileStream { reader }; + let body = Body::wrap_stream(stream); let mut resp = Response::new(body); if sub_len != len { @@ -395,7 +440,7 @@ async fn response_body( resp.headers_mut().typed_insert(last_modified); } - resp + Ok(resp) }) .unwrap_or_else(|BadRange| { // bad byte range @@ -403,12 +448,10 @@ async fn response_body( *resp.status_mut() = StatusCode::RANGE_NOT_SATISFIABLE; resp.headers_mut() .typed_insert(ContentRange::unsatisfied_bytes(len)); - resp + Ok(resp) }) } - }; - - Ok(resp) + } } struct BadRange; @@ -451,95 +494,9 @@ fn bytes_range(range: Option, max_len: u64) -> 
Result<(u64, u64), BadRang res } -fn file_stream( - mut file: TkFile, - buf_size: usize, - (start, end): (u64, u64), -) -> impl Stream> + Send { - let seek = async move { - if start != 0 { - file.seek(io::SeekFrom::Start(start)).await?; - } - Ok(file) - }; - - seek.into_stream() - .map(move |result| { - let mut buf = BytesMut::new(); - let mut len = end - start; - let mut f = match result { - Ok(f) => f, - Err(f) => return Either::Left(stream::once(future::err(f))), - }; - - Either::Right(stream::poll_fn(move |cx| { - if len == 0 { - return Poll::Ready(None); - } - reserve_at_least(&mut buf, buf_size); - - let n = match ready!(poll_read_buf(Pin::new(&mut f), cx, &mut buf)) { - Ok(n) => n as u64, - Err(err) => { - tracing::debug!("file read error: {}", err); - return Poll::Ready(Some(Err(err))); - } - }; - - if n == 0 { - tracing::debug!("file read found EOF before expected length"); - return Poll::Ready(None); - } - - let mut chunk = buf.split().freeze(); - if n > len { - chunk = chunk.split_to(len as usize); - len = 0; - } else { - len -= n; - } - - Poll::Ready(Some(Ok(chunk))) - })) - }) - .flatten() -} - -fn reserve_at_least(buf: &mut BytesMut, cap: usize) { - if buf.capacity() - buf.len() < cap { - buf.reserve(cap); - } -} - -const DEFAULT_READ_BUF_SIZE: usize = 8_192; - -fn optimal_buf_size(metadata: &Metadata) -> usize { - let block_size = get_block_size(metadata); - - // If file length is smaller than block size, don't waste space - // reserving a bigger-than-needed buffer. - cmp::min(block_size as u64, metadata.len()) as usize -} - -#[cfg(unix)] -fn get_block_size(metadata: &Metadata) -> usize { - use std::os::unix::fs::MetadataExt; - //TODO: blksize() returns u64, should handle bad cast... - //(really, a block size bigger than 4gb?) - - // Use device blocksize unless it's really small. 
- cmp::max(metadata.blksize() as usize, DEFAULT_READ_BUF_SIZE) -} - -#[cfg(not(unix))] -fn get_block_size(_metadata: &Metadata) -> usize { - DEFAULT_READ_BUF_SIZE -} - #[cfg(test)] mod tests { use super::sanitize_path; - use bytes::BytesMut; use std::path::PathBuf; fn root_dir() -> PathBuf { @@ -570,17 +527,4 @@ mod tests { expected_path ); } - - #[test] - fn test_reserve_at_least() { - let mut buf = BytesMut::new(); - let cap = 8_192; - - assert_eq!(buf.len(), 0); - assert_eq!(buf.capacity(), 0); - - super::reserve_at_least(&mut buf, cap); - assert_eq!(buf.len(), 0); - assert_eq!(buf.capacity(), cap); - } } diff --git a/tests/static_files.rs b/tests/static_files.rs index 725f9ee..81c623f 100644 --- a/tests/static_files.rs +++ b/tests/static_files.rs @@ -634,7 +634,50 @@ mod tests { } #[tokio::test] - async fn handle_byte_ranges() { + async fn handle_byte_ranges_single() { + let mut headers = HeaderMap::new(); + headers.insert("range", "bytes=0-0".parse().unwrap()); + + let buf = fs::read(root_dir().join("index.html")) + .expect("unexpected error during index.html reading"); + let buf = Bytes::from(buf); + + for method in [Method::HEAD, Method::GET] { + match static_files::handle(&HandleOpts { + method: &method, + headers: &headers, + base_path: &root_dir(), + uri_path: "index.html", + uri_query: None, + dir_listing: false, + dir_listing_order: 6, + dir_listing_format: &DirListFmt::Html, + redirect_trailing_slash: true, + compression_static: false, + }) + .await + { + Ok((mut res, _)) => { + assert_eq!(res.status(), 206); + assert_eq!( + res.headers()["content-range"], + format!("bytes 0-0/{}", buf.len()) + ); + assert_eq!(res.headers()["content-length"], "1"); + let body = hyper::body::to_bytes(res.body_mut()) + .await + .expect("unexpected bytes error during `body` conversion"); + assert_eq!(body, &buf[..=0]); + } + Err(_) => { + panic!("expected a normal response rather than a status error") + } + } + } + } + + #[tokio::test] + async fn 
handle_byte_ranges_multiple() { let mut headers = HeaderMap::new(); headers.insert("range", "bytes=100-200".parse().unwrap()); -- libgit2 1.7.2