refactor: make static file content/metadata read operations sync (#153)
we know that the throughput is critical for file
reading (content/metadata) operations of our static file module.
however, those operations were performed async, causing significant
overhead due to the added extra costs of using the async runtime.
we have extremely little waiting (we essentially block on one task)
for those file reading operations.
so we turned them into their sync variants which resulted into
~58% performance increase and ~10% CPU / ~52% RAM less utilization.
below two 1min tests using the sws defaults:
$ wrk --latency -t4 -c100 -d1m http://localhost
before (v2.13.0):
Running 1m test @ http://localhost
4 threads and 100 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 5.47ms 1.15ms 33.78ms 75.84%
Req/Sec 4.58k 276.47 5.74k 74.96%
Latency Distribution
50% 5.47ms
75% 6.06ms
90% 6.73ms
99% 8.49ms
1094168 requests in 1.00m, 747.13MB read
Requests/sec: 18222.47
Transfer/sec: 12.44MB
after:
Running 1m test @ http://localhost
4 threads and 100 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 2.29ms 0.91ms 19.52ms 71.84%
Req/Sec 10.97k 628.29 13.89k 86.83%
Latency Distribution
50% 2.25ms
75% 2.82ms
90% 3.40ms
99% 4.73ms
2620459 requests in 1.00m, 1.75GB read
Requests/sec: 43648.84
Transfer/sec: 29.80MB
resolves #146
Diff
src/compression_static.rs | 2 +-
src/static_files.rs | 202 ++++++++++++++++-------------------------------
tests/static_files.rs | 45 +++++++++-
3 files changed, 118 insertions(+), 131 deletions(-)
@@ -40,7 +40,7 @@ pub async fn precompressed_variant(
filepath_precomp.display()
);
if let Ok((meta, _)) = file_metadata(&filepath_precomp).await {
if let Ok((meta, _)) = file_metadata(&filepath_precomp) {
tracing::trace!("pre-compressed file variant found, serving it directly");
let encoding = if ext == "gz" { "gzip" } else { ext };
@@ -1,9 +1,11 @@
use bytes::{Bytes, BytesMut};
use futures_util::future::Either;
use futures_util::{future, ready, stream, FutureExt, Stream, StreamExt};
use futures_util::future::{Either, Future};
use futures_util::{future, Stream};
use headers::{
AcceptRanges, ContentLength, ContentRange, ContentType, HeaderMap, HeaderMapExt, HeaderValue,
IfModifiedSince, IfRange, IfUnmodifiedSince, LastModified, Range,
@@ -11,17 +13,13 @@ use headers::{
use http::header::CONTENT_LENGTH;
use hyper::{header::CONTENT_ENCODING, Body, Method, Response, StatusCode};
use percent_encoding::percent_decode_str;
use std::fs::Metadata;
use std::future::Future;
use std::io;
use std::fs::{File, Metadata};
use std::io::{self, BufReader, Read, Seek, SeekFrom};
use std::ops::Bound;
use std::path::{Component, PathBuf};
use std::path::{Component, Path, PathBuf};
use std::pin::Pin;
use std::task::Poll;
use std::{cmp, path::Path};
use tokio::fs::File as TkFile;
use tokio::io::AsyncSeekExt;
use tokio_util::io::poll_read_buf;
use std::sync::Mutex;
use std::task::{Context, Poll};
use crate::directory_listing::DirListFmt;
use crate::{compression_static, directory_listing, Result};
@@ -157,7 +155,7 @@ async fn composed_file_metadata<'a>(
tracing::trace!("getting metadata for file {}", file_path.display());
match file_metadata(file_path.as_ref()).await {
match file_metadata(file_path.as_ref()) {
Ok((mut meta, is_dir)) => {
if is_dir {
@@ -166,7 +164,7 @@ async fn composed_file_metadata<'a>(
if let Ok(meta_res) = file_metadata(file_path.as_ref()).await {
if let Ok(meta_res) = file_metadata(file_path.as_ref()) {
(meta, _) = meta_res
}
}
@@ -189,8 +187,8 @@ async fn composed_file_metadata<'a>(
}
pub async fn file_metadata(file_path: &Path) -> Result<(Metadata, bool), StatusCode> {
match tokio::fs::metadata(file_path).await {
pub fn file_metadata(file_path: &Path) -> Result<(Metadata, bool), StatusCode> {
match std::fs::metadata(file_path) {
Ok(meta) => {
let is_dir = meta.is_dir();
tracing::trace!("file found: {:?}", file_path);
@@ -219,7 +217,7 @@ fn file_reply<'a>(
let file_path = path_precompressed.unwrap_or_else(|| path.clone());
TkFile::open(file_path).then(move |res| match res {
match File::open(file_path) {
Ok(file) => Either::Left(response_body(file, path, meta, conditionals)),
Err(err) => {
let status = match err.kind() {
@@ -238,7 +236,7 @@ fn file_reply<'a>(
};
Either::Right(future::err(status))
}
})
}
}
fn get_conditional_headers(header_list: &HeaderMap<HeaderValue>) -> Conditionals {
@@ -356,24 +354,71 @@ impl Conditionals {
}
}
#[cfg(unix)]
const READ_BUF_SIZE: usize = 4_096;
#[cfg(not(unix))]
const READ_BUF_SIZE: usize = 8_192;
#[derive(Debug)]
pub struct FileStream<T> {
reader: Mutex<T>,
}
impl<T: Read> Stream for FileStream<T> {
type Item = Result<Bytes>;
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut reader = match self.reader.lock() {
Ok(reader) => reader,
Err(err) => {
tracing::error!("file stream error: {:?}", err);
return Poll::Ready(Some(Err(anyhow::anyhow!("failed to read stream file"))));
}
};
let mut buf = BytesMut::zeroed(READ_BUF_SIZE);
match reader.read(&mut buf[..]) {
Ok(n) => {
if n == 0 {
Poll::Ready(None)
} else {
buf.truncate(n);
Poll::Ready(Some(Ok(buf.freeze())))
}
}
Err(err) => Poll::Ready(Some(Err(anyhow::Error::from(err)))),
}
}
}
async fn response_body(
file: TkFile,
mut file: File,
path: &PathBuf,
meta: &Metadata,
conditionals: Conditionals,
) -> Result<Response<Body>, StatusCode> {
let mut len = meta.len();
let modified = meta.modified().ok().map(LastModified::from);
let resp = match conditionals.check(modified) {
Cond::NoBody(resp) => resp,
match conditionals.check(modified) {
Cond::NoBody(resp) => Ok(resp),
Cond::WithBody(range) => {
bytes_range(range, len)
.map(|(start, end)| {
match file.seek(SeekFrom::Start(start)) {
Ok(_) => (),
Err(err) => {
tracing::error!("seek file from start error: {:?}", err);
return Err(StatusCode::INTERNAL_SERVER_ERROR);
}
};
let sub_len = end - start;
let buf_size = optimal_buf_size(meta);
let stream = file_stream(file, buf_size, (start, end));
let body = Body::wrap_stream(stream);
let reader = Mutex::new(BufReader::new(file).take(sub_len));
let stream = FileStream { reader };
let body = Body::wrap_stream(stream);
let mut resp = Response::new(body);
if sub_len != len {
@@ -395,7 +440,7 @@ async fn response_body(
resp.headers_mut().typed_insert(last_modified);
}
resp
Ok(resp)
})
.unwrap_or_else(|BadRange| {
@@ -403,12 +448,10 @@ async fn response_body(
*resp.status_mut() = StatusCode::RANGE_NOT_SATISFIABLE;
resp.headers_mut()
.typed_insert(ContentRange::unsatisfied_bytes(len));
resp
Ok(resp)
})
}
};
Ok(resp)
}
}
struct BadRange;
@@ -451,95 +494,9 @@ fn bytes_range(range: Option<Range>, max_len: u64) -> Result<(u64, u64), BadRang
res
}
fn file_stream(
mut file: TkFile,
buf_size: usize,
(start, end): (u64, u64),
) -> impl Stream<Item = Result<Bytes, io::Error>> + Send {
let seek = async move {
if start != 0 {
file.seek(io::SeekFrom::Start(start)).await?;
}
Ok(file)
};
seek.into_stream()
.map(move |result| {
let mut buf = BytesMut::new();
let mut len = end - start;
let mut f = match result {
Ok(f) => f,
Err(f) => return Either::Left(stream::once(future::err(f))),
};
Either::Right(stream::poll_fn(move |cx| {
if len == 0 {
return Poll::Ready(None);
}
reserve_at_least(&mut buf, buf_size);
let n = match ready!(poll_read_buf(Pin::new(&mut f), cx, &mut buf)) {
Ok(n) => n as u64,
Err(err) => {
tracing::debug!("file read error: {}", err);
return Poll::Ready(Some(Err(err)));
}
};
if n == 0 {
tracing::debug!("file read found EOF before expected length");
return Poll::Ready(None);
}
let mut chunk = buf.split().freeze();
if n > len {
chunk = chunk.split_to(len as usize);
len = 0;
} else {
len -= n;
}
Poll::Ready(Some(Ok(chunk)))
}))
})
.flatten()
}
fn reserve_at_least(buf: &mut BytesMut, cap: usize) {
if buf.capacity() - buf.len() < cap {
buf.reserve(cap);
}
}
const DEFAULT_READ_BUF_SIZE: usize = 8_192;
fn optimal_buf_size(metadata: &Metadata) -> usize {
let block_size = get_block_size(metadata);
cmp::min(block_size as u64, metadata.len()) as usize
}
#[cfg(unix)]
fn get_block_size(metadata: &Metadata) -> usize {
use std::os::unix::fs::MetadataExt;
cmp::max(metadata.blksize() as usize, DEFAULT_READ_BUF_SIZE)
}
#[cfg(not(unix))]
fn get_block_size(_metadata: &Metadata) -> usize {
DEFAULT_READ_BUF_SIZE
}
#[cfg(test)]
mod tests {
use super::sanitize_path;
use bytes::BytesMut;
use std::path::PathBuf;
fn root_dir() -> PathBuf {
@@ -570,17 +527,4 @@ mod tests {
expected_path
);
}
#[test]
fn test_reserve_at_least() {
let mut buf = BytesMut::new();
let cap = 8_192;
assert_eq!(buf.len(), 0);
assert_eq!(buf.capacity(), 0);
super::reserve_at_least(&mut buf, cap);
assert_eq!(buf.len(), 0);
assert_eq!(buf.capacity(), cap);
}
}
@@ -634,7 +634,50 @@ mod tests {
}
#[tokio::test]
async fn handle_byte_ranges() {
async fn handle_byte_ranges_single() {
let mut headers = HeaderMap::new();
headers.insert("range", "bytes=0-0".parse().unwrap());
let buf = fs::read(root_dir().join("index.html"))
.expect("unexpected error during index.html reading");
let buf = Bytes::from(buf);
for method in [Method::HEAD, Method::GET] {
match static_files::handle(&HandleOpts {
method: &method,
headers: &headers,
base_path: &root_dir(),
uri_path: "index.html",
uri_query: None,
dir_listing: false,
dir_listing_order: 6,
dir_listing_format: &DirListFmt::Html,
redirect_trailing_slash: true,
compression_static: false,
})
.await
{
Ok((mut res, _)) => {
assert_eq!(res.status(), 206);
assert_eq!(
res.headers()["content-range"],
format!("bytes 0-0/{}", buf.len())
);
assert_eq!(res.headers()["content-length"], "1");
let body = hyper::body::to_bytes(res.body_mut())
.await
.expect("unexpected bytes error during `body` conversion");
assert_eq!(body, &buf[..=0]);
}
Err(_) => {
panic!("expected a normal response rather than a status error")
}
}
}
}
#[tokio::test]
async fn handle_byte_ranges_multiple() {
let mut headers = HeaderMap::new();
headers.insert("range", "bytes=100-200".parse().unwrap());