Merge pull request #38 from joseluisq/hyper_tokio
refactor: hyper + tokio
Diff
.drone.yml | 14 +-
.gitignore | 2 +-
Cargo.lock | 149 ++++----------
Cargo.toml | 14 +-
README.md | 32 +--
src/cache.rs | 38 +----
src/compression.rs | 157 ++++++++++++++-
src/config.rs | 13 +-
src/cors.rs | 36 +---
src/error_page.rs | 88 ++++++++-
src/handler.rs | 16 ++-
src/lib.rs | 10 +-
src/rejection.rs | 46 +----
src/server.rs | 217 +++++++++------------
src/static_files.rs | 458 ++++++++++++++++++++++++++++++++++++++++++++-
src/tls.rs | 414 ++++++++++++++++++++++++++++++++++++++++-
src/transport.rs | 56 +++++-
tests/tls/local.dev_cert.pem | 26 ++-
tests/tls/local.dev_key.pem | 28 +++-
19 files changed, 1435 insertions(+), 379 deletions(-)
@@ -19,7 +19,7 @@ steps:
- ./target
- name: test
image: joseluisq/rust-linux-darwin-builder:1.51.0
image: joseluisq/rust-linux-darwin-builder:1.52.0
commands:
- make test
@@ -62,7 +62,7 @@ platform:
steps:
- name: test
image: joseluisq/rust-linux-darwin-builder:1.51.0
image: joseluisq/rust-linux-darwin-builder:1.52.0
commands:
- make test
when:
@@ -70,7 +70,7 @@ steps:
- tag
- name: release
image: joseluisq/rust-linux-darwin-builder:1.51.0
image: joseluisq/rust-linux-darwin-builder:1.52.0
commands:
- make prod.release
@@ -173,7 +173,7 @@ platform:
steps:
- name: test
image: joseluisq/rust-linux-darwin-builder:1.51.0
image: joseluisq/rust-linux-darwin-builder:1.52.0
commands:
- make test
when:
@@ -181,7 +181,7 @@ steps:
- tag
- name: release
image: joseluisq/rust-linux-darwin-builder:1.51.0
image: joseluisq/rust-linux-darwin-builder:1.52.0
commands:
- make prod.release
@@ -274,12 +274,12 @@ platform:
steps:
- name: test
image: joseluisq/rust-linux-darwin-builder:1.51.0
image: joseluisq/rust-linux-darwin-builder:1.52.0
commands:
- make test
- name: release
image: joseluisq/rust-linux-darwin-builder:1.51.0
image: joseluisq/rust-linux-darwin-builder:1.52.0
commands:
- make prod.release
@@ -15,6 +15,8 @@
**/*.env
**/*.svg
**/*.data
**/*.zst
**/*.out*
release
.vscode
TODO
@@ -158,10 +158,13 @@ dependencies = [
]
[[package]]
name = "cpuid-bool"
version = "0.1.2"
name = "cpufeatures"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8aebca1129a03dc6dc2b127edd729435bbc4a37e1d5f4d7513165089ceb02634"
checksum = "dec1028182c380cc45a2e2c5ec841134f2dfd0f8f5f0a5bcd68004f81b5efdf4"
dependencies = [
"libc",
]
[[package]]
name = "crc32fast"
@@ -206,16 +209,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
[[package]]
name = "form_urlencoded"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5fc25a87fa4fd2094bffb06925852034d90a17f0d1e05197d4956d3555752191"
dependencies = [
"matches",
"percent-encoding",
]
[[package]]
name = "fs_extra"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -294,9 +287,9 @@ dependencies = [
[[package]]
name = "h2"
version = "0.3.2"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc018e188373e2777d0ef2467ebff62a08e66c3f5857b23c8fbec3018210dc00"
checksum = "825343c4eef0b63f541f8903f395dc5beb362a979b5799a84062527ef1e37726"
dependencies = [
"bytes",
"fnv",
@@ -503,16 +496,10 @@ dependencies = [
]
[[package]]
name = "matches"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08"
[[package]]
name = "memchr"
version = "2.3.4"
version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ee1c47aaa256ecabcaea351eae4a9b01ef39ed810004e298d2511ed284b1525"
checksum = "b16bd47d9e329435e309c58469fe0791c2d0d1ba96ec0954152a5ae2b04387dc"
[[package]]
name = "mime"
@@ -707,9 +694,9 @@ dependencies = [
[[package]]
name = "regex"
version = "1.4.6"
version = "1.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a26af418b574bd56588335b3a3659a65725d4e636eb1016c2f9e3b38c7cc759"
checksum = "d07a8629359eb56f1e2fb1652bb04212c072a87ba68546a04065d525673ac461"
dependencies = [
"regex-syntax",
]
@@ -726,9 +713,9 @@ dependencies = [
[[package]]
name = "regex-syntax"
version = "0.6.23"
version = "0.6.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24d5f089152e60f62d28b835fbff2cd2e8dc0baf1ac13343bef92ab7eed84548"
checksum = "f497285884f3fcff424ffc933e56d7cbca511def0c9831a7f9b5f6153e3cc89b"
[[package]]
name = "ring"
@@ -765,12 +752,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e"
[[package]]
name = "scoped-tls"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ea6a9290e3c9cf0f18145ef7ffa62d68ee0bf5fcd651017e586dc7fd5da448c2"
[[package]]
name = "sct"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -798,26 +779,14 @@ dependencies = [
]
[[package]]
name = "serde_urlencoded"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "edfa57a7f8d9c1d260a549e7224100f6c43d43f9103e06dd8b4095a9b2b43ce9"
dependencies = [
"form_urlencoded",
"itoa",
"ryu",
"serde",
]
[[package]]
name = "sha-1"
version = "0.9.4"
version = "0.9.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dfebf75d25bd900fd1e7d11501efab59bc846dbc76196839663e6637bba9f25f"
checksum = "b659df5fc3ce22274daac600ffb845300bd2125bcfaec047823075afdab81c00"
dependencies = [
"block-buffer",
"cfg-if 1.0.0",
"cpuid-bool",
"cpufeatures",
"digest",
"opaque-debug",
]
@@ -874,16 +843,26 @@ name = "static-web-server"
version = "2.0.0-beta.3"
dependencies = [
"anyhow",
"async-compression",
"bytes",
"futures",
"headers",
"http",
"hyper",
"jemallocator",
"mime_guess",
"nix",
"num_cpus",
"once_cell",
"percent-encoding",
"pin-project",
"signal",
"structopt",
"tokio",
"tokio-rustls",
"tokio-util",
"tracing",
"tracing-subscriber",
"warp",
]
[[package]]
@@ -912,9 +891,9 @@ dependencies = [
[[package]]
name = "syn"
version = "1.0.70"
version = "1.0.72"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9505f307c872bab8eb46f77ae357c8eba1fdacead58ee5a850116b1d7f82883"
checksum = "a1e8cdbefb79a9a5a65e0db8b47b723ee907b7c7f8496c76a1770b5c310bab82"
dependencies = [
"proc-macro2",
"quote",
@@ -963,28 +942,29 @@ dependencies = [
"mio",
"num_cpus",
"pin-project-lite",
"tokio-macros",
]
[[package]]
name = "tokio-rustls"
version = "0.22.0"
name = "tokio-macros"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc6844de72e57df1980054b38be3a9f4702aba4858be64dd700181a8a6d0e1b6"
checksum = "caf7b11a536f46a809a8a9f0bb4237020f70ecbf115b842360afb127ea2fda57"
dependencies = [
"rustls",
"tokio",
"webpki",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "tokio-stream"
version = "0.1.5"
name = "tokio-rustls"
version = "0.22.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e177a5d8c3bf36de9ebe6d58537d8879e964332f93fb3339e43f618c81361af0"
checksum = "bc6844de72e57df1980054b38be3a9f4702aba4858be64dd700181a8a6d0e1b6"
dependencies = [
"futures-core",
"pin-project-lite",
"rustls",
"tokio",
"webpki",
]
[[package]]
@@ -1009,12 +989,11 @@ checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6"
[[package]]
name = "tracing"
version = "0.1.25"
version = "0.1.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "01ebdc2bb4498ab1ab5f5b73c5803825e60199229ccba0698170e3be0e7f959f"
checksum = "09adeb8c97449311ccd28a427f96fb563e7fd31aabf994189879d9da2394b89d"
dependencies = [
"cfg-if 1.0.0",
"log",
"pin-project-lite",
"tracing-attributes",
"tracing-core",
@@ -1033,9 +1012,9 @@ dependencies = [
[[package]]
name = "tracing-core"
version = "0.1.17"
version = "0.1.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f50de3927f93d202783f4513cda820ab47ef17f624b03c096e86ef00c67e6b5f"
checksum = "a9ff14f98b1a4b289c6248a023c1c2fa1491062964e9fed67ab29c4e4da4a052"
dependencies = [
"lazy_static",
]
@@ -1063,9 +1042,9 @@ dependencies = [
[[package]]
name = "tracing-subscriber"
version = "0.2.17"
version = "0.2.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "705096c6f83bf68ea5d357a6aa01829ddbdac531b357b45abeca842938085baa"
checksum = "aa5553bf0883ba7c9cbe493b085c29926bd41b66afc31ff72cf17ff4fb60dcd5"
dependencies = [
"ansi_term",
"chrono",
@@ -1118,9 +1097,9 @@ checksum = "9337591893a19b88d8d87f2cec1e73fad5cdfd10e5a6f349f498ad6ea2ffb1e3"
[[package]]
name = "unicode-xid"
version = "0.2.1"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7fe0bb3479651439c9112f72b6c505038574c9fbb575ed1bf3b797fa39dd564"
checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3"
[[package]]
name = "untrusted"
@@ -1151,34 +1130,6 @@ dependencies = [
]
[[package]]
name = "warp"
version = "0.3.1"
source = "git+https://github.com/joseluisq/warp.git?branch=0.3.x#f638f8958addb953501a08d427aae64a4c4f5a21"
dependencies = [
"async-compression",
"bytes",
"futures",
"headers",
"http",
"hyper",
"log",
"mime",
"mime_guess",
"percent-encoding",
"pin-project",
"scoped-tls",
"serde",
"serde_json",
"serde_urlencoded",
"tokio",
"tokio-rustls",
"tokio-stream",
"tokio-util",
"tower-service",
"tracing",
]
[[package]]
name = "wasi"
version = "0.10.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -25,14 +25,24 @@ name = "static-web-server"
path = "src/bin/server.rs"
[dependencies]
tokio = { version = "1", features = ["rt-multi-thread"], default-features = false }
warp = { git = "https://github.com/joseluisq/warp.git", branch = "0.3.x", features = ["tls", "compression"], default-features = false }
hyper = { version = "0.14", features = ["stream", "http1", "http2", "tcp", "server"] }
tokio = { version = "1", features = ["rt-multi-thread", "macros", "fs", "io-util"], default-features = false }
futures = { version = "0.3", default-features = false }
async-compression = { version = "0.3", features = ["brotli", "deflate", "gzip", "tokio"] }
headers = { git = "https://github.com/joseluisq/hyper-headers.git", branch = "headers_encoding" }
http = "0.2"
tokio-util = { version = "0.6", features = ["io"] }
anyhow = "1.0"
tracing = "0.1"
tracing-subscriber = "0.2"
mime_guess = "2.0"
bytes = "1.0"
percent-encoding = "2.1"
structopt = { version = "0.3", default-features = false }
num_cpus = { version = "1.13" }
once_cell = "1.7"
pin-project = "1.0"
tokio-rustls = { version = "0.22" }
[target.'cfg(not(windows))'.dependencies.nix]
version = "0.14"
@@ -10,7 +10,7 @@
- Built with [Rust](https://rust-lang.org) which is focused on [safety, speed, and concurrency](https://kornel.ski/rust-c-speed).
- Memory safety and very reduced CPU and RAM overhead.
- Blazing fast static files-serving and asynchronous powered by [Warp](https://github.com/seanmonstar/warp/) `v0.3` ([Hyper](https://github.com/hyperium/hyper/) `v0.14`), [Tokio](https://github.com/tokio-rs/tokio) `v1` and a set of [awesome crates](./Cargo.toml).
- Blazing fast static files-serving and asynchronous powered by [Hyper](https://github.com/hyperium/hyper/) `v0.14`, [Tokio](https://github.com/tokio-rs/tokio) `v1` and a set of [awesome crates](./Cargo.toml).
- Suitable for lightweight [GNU/Linux Docker containers](https://hub.docker.com/r/joseluisq/static-web-server/tags). It's a fully __5MB__ static binary thanks to [Rust and Musl libc](https://doc.rust-lang.org/edition-guide/rust-2018/platform-and-target-support/musl-support-for-fully-static-binaries.html).
- GZip, Deflate or Brotli compression for text-based web files only.
- Compression on demand via [Accept-Encoding](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Accept-Encoding) header.
@@ -40,19 +40,19 @@ Server can be configured either via environment variables or their equivalent co
### Environment Variables
### Command-line arguments
@@ -99,9 +99,9 @@ OPTIONS:
-n, --threads-multiplier <threads-multiplier>
Number of worker threads multiplier that'll be multiplied by the number of system CPUs using the formula:
`worker threads = number of CPUs * n` where `n` is the value that changes here. When multiplier value is 0
or 1 then the `number of CPUs` is used. Number of worker threads result should be a number between 1 and
or 1 then one thread per core is used. Number of worker threads result should be a number between 1 and
32,768 though it is advised to keep this value on the smaller side [env: SERVER_THREADS_MULTIPLIER=]
[default: 8]
[default: 1]
```
## Docker stack
@@ -1,38 +0,0 @@
const CACHE_EXT_ONE_HOUR: [&str; 4] = ["atom", "json", "rss", "xml"];
const CACHE_EXT_ONE_YEAR: [&str; 30] = [
"bmp", "bz2", "css", "doc", "gif", "gz", "htc", "ico", "jpeg", "jpg", "js", "map", "mjs",
"mp3", "mp4", "ogg", "ogv", "pdf", "png", "rar", "rtf", "tar", "tgz", "wav", "weba", "webm",
"webp", "woff", "woff2", "zip",
];
pub fn control_headers(res: warp::fs::File) -> warp::reply::WithHeader<warp::fs::File> {
let mut max_age = 60 * 60 * 24_u64;
if let Some(ext) = res.path().extension() {
if let Some(ext) = ext.to_str() {
if CACHE_EXT_ONE_HOUR.iter().any(|x| *x == ext) {
max_age = 60 * 60;
} else if CACHE_EXT_ONE_YEAR.iter().any(|x| *x == ext) {
max_age = 60 * 60 * 24 * 365;
}
}
}
warp::reply::with_header(
res,
"cache-control",
[
"public, max-age=".to_string(),
duration(max_age).to_string(),
]
.concat(),
)
}
fn duration(n: u64) -> u32 {
std::cmp::min(n, u32::MAX as u64) as u32
}
@@ -1,4 +1,23 @@
use async_compression::tokio::bufread::{BrotliEncoder, DeflateEncoder, GzipEncoder};
use bytes::Bytes;
use futures::Stream;
use headers::{AcceptEncoding, ContentCoding, ContentType, HeaderMap, HeaderMapExt};
use hyper::{
header::{HeaderValue, CONTENT_ENCODING, CONTENT_LENGTH},
Body, Method, Response,
};
use pin_project::pin_project;
use std::convert::TryFrom;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio_util::io::{ReaderStream, StreamReader};
use crate::error::Result;
pub const TEXT_MIME_TYPES: [&str; 16] = [
"text/html",
"text/css",
@@ -17,3 +36,139 @@ pub const TEXT_MIME_TYPES: [&str; 16] = [
"application/vnd.ms-fontobject",
"image/svg+xml",
];
/// Applies an on-the-fly compression encoding to `resp` when the client's
/// `Accept-Encoding` header asks for one (gzip, deflate or brotli).
///
/// The response passes through untouched when the request method is `HEAD`
/// (no body to compress), when no acceptable encoding is offered, or when
/// the response declares a `Content-Type` that is not one of the text-like
/// `TEXT_MIME_TYPES`.
pub fn auto(
    method: &Method,
    headers: &HeaderMap<HeaderValue>,
    resp: Response<Body>,
) -> Result<Response<Body>> {
    // HEAD responses carry no body, so there is nothing to encode.
    if method == Method::HEAD {
        return Ok(resp);
    }

    // Guard clauses instead of nesting: bail out early whenever
    // compression does not apply.
    let accept_encoding = match headers.typed_get::<AcceptEncoding>() {
        Some(enc) => enc,
        None => return Ok(resp),
    };
    let encoding = match accept_encoding.prefered_encoding() {
        Some(encoding) => encoding,
        None => return Ok(resp),
    };

    // Only compress text-like payloads; other types gain little or are
    // already compressed. An absent Content-Type is still compressed,
    // matching the original behavior.
    if let Some(content_type) = resp.headers().typed_get::<ContentType>() {
        let content_type = &content_type.to_string();
        if !TEXT_MIME_TYPES.iter().any(|h| *h == content_type) {
            return Ok(resp);
        }
    }

    if encoding == ContentCoding::GZIP {
        let (head, body) = resp.into_parts();
        return Ok(gzip(head, body.into()));
    }
    if encoding == ContentCoding::DEFLATE {
        let (head, body) = resp.into_parts();
        return Ok(deflate(head, body.into()));
    }
    if encoding == ContentCoding::BROTLI {
        let (head, body) = resp.into_parts();
        return Ok(brotli(head, body.into()));
    }

    // Preferred encoding is one we do not implement (e.g. identity).
    Ok(resp)
}
/// Wraps the response body in a streaming gzip encoder and rewrites the
/// `Content-Encoding` / `Content-Length` headers accordingly.
pub fn gzip(
    mut head: http::response::Parts,
    body: CompressableBody<Body, hyper::Error>,
) -> Response<Body> {
    // The compressed size is unknown up front, so Content-Length is
    // dropped and the encoded bytes are streamed chunk by chunk.
    let prev_encoding = head.headers.remove(CONTENT_ENCODING);
    head.headers.remove(CONTENT_LENGTH);

    let encoder = GzipEncoder::new(StreamReader::new(body));
    let stream_body = Body::wrap_stream(ReaderStream::new(encoder));

    head.headers.append(
        CONTENT_ENCODING,
        create_encoding_header(prev_encoding, ContentCoding::GZIP),
    );
    Response::from_parts(head, stream_body)
}
/// Wraps the response body in a streaming DEFLATE encoder and rewrites the
/// `Content-Encoding` / `Content-Length` headers accordingly.
pub fn deflate(
    mut head: http::response::Parts,
    body: CompressableBody<Body, hyper::Error>,
) -> Response<Body> {
    // Compressed length is unknown in advance: drop Content-Length and
    // stream the encoded bytes instead.
    let prev_encoding = head.headers.remove(CONTENT_ENCODING);
    head.headers.remove(CONTENT_LENGTH);

    let encoder = DeflateEncoder::new(StreamReader::new(body));

    head.headers.append(
        CONTENT_ENCODING,
        create_encoding_header(prev_encoding, ContentCoding::DEFLATE),
    );
    Response::from_parts(head, Body::wrap_stream(ReaderStream::new(encoder)))
}
/// Wraps the response body in a streaming Brotli encoder and rewrites the
/// `Content-Encoding` / `Content-Length` headers accordingly.
pub fn brotli(
    mut head: http::response::Parts,
    body: CompressableBody<Body, hyper::Error>,
) -> Response<Body> {
    // As with gzip/deflate, the final size cannot be known ahead of
    // time, so the length header goes away and the body is streamed.
    let prev_encoding = head.headers.remove(CONTENT_ENCODING);
    head.headers.remove(CONTENT_LENGTH);

    let encoder = BrotliEncoder::new(StreamReader::new(body));

    head.headers.append(
        CONTENT_ENCODING,
        create_encoding_header(prev_encoding, ContentCoding::BROTLI),
    );
    Response::from_parts(head, Body::wrap_stream(ReaderStream::new(encoder)))
}
/// Builds the value for the `Content-Encoding` response header.
///
/// When the response already carried a readable `Content-Encoding`, the
/// new coding is appended to it, comma separated (per RFC 7231 §3.1.2.2);
/// otherwise the coding alone is used. Falls back to the bare coding when
/// the combined string is not a valid header value.
fn create_encoding_header(existing: Option<HeaderValue>, coding: ContentCoding) -> HeaderValue {
    if let Some(val) = existing {
        if let Ok(str_val) = val.to_str() {
            // Pass the String by value: `TryFrom<String>` is the
            // canonical HeaderValue conversion, avoiding a borrow of a
            // temporary.
            return HeaderValue::try_from(format!("{}, {}", str_val, coding.to_string()))
                .unwrap_or_else(|_| coding.into());
        }
    }
    coding.into()
}
/// Wrapper around a `Stream` of `Bytes` chunks (e.g. a `hyper::Body`)
/// that adapts its error type so the stream can be fed into the
/// `tokio_util` / `async-compression` reader pipeline (see the `Stream`
/// impl below).
#[pin_project]
#[derive(Debug)]
pub struct CompressableBody<S, E>
where
    S: Stream<Item = Result<Bytes, E>>,
    E: std::error::Error,
{
    // Underlying byte stream; pinned structurally via `pin_project`.
    #[pin]
    pub body: S,
}
impl<S, E> Stream for CompressableBody<S, E>
where
    S: Stream<Item = Result<Bytes, E>>,
    E: std::error::Error,
{
    // The compression readers require `std::io::Error` items, so the
    // inner stream's error type is converted here.
    type Item = std::io::Result<Bytes>;

    fn poll_next(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        use std::io::{Error, ErrorKind};

        let pin = self.project();
        // Delegate to the inner stream and map any upstream error to a
        // generic `InvalidData` I/O error; the original error value is
        // discarded.
        S::poll_next(pin.body, ctx)
            .map(|err| err.map(|res| res.map_err(|_| Error::from(ErrorKind::InvalidData))))
    }
}
// Allows a `hyper::Body` to be used directly wherever a
// `CompressableBody` is expected (`body.into()` at the call sites).
impl From<Body> for CompressableBody<Body, hyper::Error> {
    fn from(body: Body) -> Self {
        CompressableBody { body }
    }
}
@@ -1,8 +1,5 @@
use once_cell::sync::OnceCell;
use structopt::StructOpt;
pub static CONFIG: OnceCell<Config> = OnceCell::new();
#[derive(Debug, StructOpt)]
pub struct Config {
@@ -17,12 +14,12 @@ pub struct Config {
#[structopt(
long,
short = "n",
default_value = "8",
default_value = "1",
env = "SERVER_THREADS_MULTIPLIER"
)]
pub threads_multiplier: usize,
@@ -71,9 +68,3 @@ pub struct Config {
pub http2_tls_key: String,
}
impl Config {
pub fn global() -> &'static Config {
CONFIG.get().expect("Config is not initialized")
}
}
@@ -1,36 +0,0 @@
use std::collections::HashSet;
use warp::filters::cors::Builder;
pub fn get_opt_cors_filter(origins: &str) -> (Option<Builder>, String) {
let mut cors_allowed_hosts = String::new();
let cors_filter = if origins.is_empty() {
None
} else if origins == "*" {
cors_allowed_hosts = origins.into();
Some(
warp::cors()
.allow_any_origin()
.allow_methods(vec!["GET", "HEAD", "OPTIONS"]),
)
} else {
cors_allowed_hosts = origins.into();
let hosts = cors_allowed_hosts
.split(',')
.map(|s| s.trim().as_ref())
.collect::<HashSet<_>>();
if hosts.is_empty() {
cors_allowed_hosts = hosts.into_iter().collect::<Vec<&str>>().join(", ");
None
} else {
Some(
warp::cors()
.allow_origins(hosts)
.allow_methods(vec!["GET", "HEAD", "OPTIONS"]),
)
}
};
(cors_filter, cors_allowed_hosts)
}
@@ -0,0 +1,88 @@
use headers::{AcceptRanges, ContentLength, ContentType, HeaderMapExt};
use hyper::{Body, Method, Response, StatusCode};
use once_cell::sync::OnceCell;
use crate::error::Result;
pub static PAGE_404: OnceCell<String> = OnceCell::new();
pub static PAGE_50X: OnceCell<String> = OnceCell::new();
/// Builds an HTTP error response for the given status code.
///
/// Known client errors (4xx) use the custom 404 page content only for
/// `404 Not Found`; known server errors (5xx) all share the custom 50x
/// page. Any other status — or empty custom content — falls back to a
/// minimal inline HTML page. For `HEAD` requests the body is left empty
/// while `Content-Length` still reflects the page size.
///
/// Fix vs. original: "contant" → "constant" in both `expect` messages.
pub fn get_error_response(method: &Method, status_code: &StatusCode) -> Result<Response<Body>> {
    tracing::warn!(method = ?method, status = status_code.as_u16(), error = ?status_code.to_string());

    let mut error_page_content = String::new();

    let status_code = match status_code {
        // Recognized client errors: only 404 gets the custom page.
        &StatusCode::BAD_REQUEST
        | &StatusCode::UNAUTHORIZED
        | &StatusCode::PAYMENT_REQUIRED
        | &StatusCode::FORBIDDEN
        | &StatusCode::NOT_FOUND
        | &StatusCode::METHOD_NOT_ALLOWED
        | &StatusCode::NOT_ACCEPTABLE
        | &StatusCode::PROXY_AUTHENTICATION_REQUIRED
        | &StatusCode::REQUEST_TIMEOUT
        | &StatusCode::CONFLICT
        | &StatusCode::GONE
        | &StatusCode::LENGTH_REQUIRED
        | &StatusCode::PRECONDITION_FAILED
        | &StatusCode::PAYLOAD_TOO_LARGE
        | &StatusCode::URI_TOO_LONG
        | &StatusCode::UNSUPPORTED_MEDIA_TYPE
        | &StatusCode::RANGE_NOT_SATISFIABLE
        | &StatusCode::EXPECTATION_FAILED => {
            if status_code == &StatusCode::NOT_FOUND {
                error_page_content = PAGE_404
                    .get()
                    .expect("PAGE_404 constant is not initialized")
                    .to_string();
            }
            status_code
        }
        // Recognized server errors: all share the custom 50x page.
        &StatusCode::INTERNAL_SERVER_ERROR
        | &StatusCode::NOT_IMPLEMENTED
        | &StatusCode::BAD_GATEWAY
        | &StatusCode::SERVICE_UNAVAILABLE
        | &StatusCode::GATEWAY_TIMEOUT
        | &StatusCode::HTTP_VERSION_NOT_SUPPORTED
        | &StatusCode::VARIANT_ALSO_NEGOTIATES
        | &StatusCode::INSUFFICIENT_STORAGE
        | &StatusCode::LOOP_DETECTED => {
            error_page_content = PAGE_50X
                .get()
                .expect("PAGE_50X constant is not initialized")
                .to_string();
            status_code
        }
        _ => status_code,
    };

    // Fallback page when no custom content applies (or the custom page
    // file was empty).
    if error_page_content.is_empty() {
        error_page_content = format!(
            "<html><head><title>{}</title></head><body><center><h1>{}</h1></center></body></html>",
            status_code, status_code
        );
    }

    let mut body = Body::empty();
    // Length is captured before the content is (possibly) moved into the
    // body, so HEAD responses still advertise the correct size.
    let len = error_page_content.len() as u64;

    if method != Method::HEAD {
        body = Body::from(error_page_content)
    }

    let mut resp = Response::new(body);
    *resp.status_mut() = *status_code;

    resp.headers_mut().typed_insert(ContentLength(len));
    resp.headers_mut().typed_insert(ContentType::html());
    resp.headers_mut().typed_insert(AcceptRanges::bytes());

    Ok(resp)
}
@@ -0,0 +1,16 @@
use hyper::{Body, Request, Response};
use std::path::Path;
use crate::{compression, static_files};
use crate::{error::Result, error_page};
/// Entry point for a single HTTP request: serves a static file below
/// `base` and, on success, applies on-the-fly compression; on failure,
/// converts the returned status code into an error-page response.
pub async fn handle_request(base: &Path, req: &Request<Body>) -> Result<Response<Body>> {
    let headers = req.headers();
    let method = req.method();

    match static_files::handle_request(method, headers, base, req.uri().path()).await {
        Ok(resp) => compression::auto(method, headers, resp),
        Err(status) => error_page::get_error_response(method, &status),
    }
}
@@ -3,19 +3,21 @@
#[macro_use]
extern crate anyhow;
pub mod cache;
pub mod compression;
pub mod config;
pub mod cors;
pub mod error_page;
pub mod handler;
pub mod helpers;
pub mod logger;
pub mod rejection;
pub mod server;
pub mod signals;
pub mod static_files;
pub mod tls;
pub mod transport;
#[macro_use]
pub mod error;
pub use config::{Config, CONFIG};
pub use config::Config;
pub use error::*;
pub use server::Server;
@@ -1,46 +0,0 @@
use anyhow::Result;
use once_cell::sync::OnceCell;
use std::convert::Infallible;
use warp::http::StatusCode;
use warp::{Rejection, Reply};
pub static PAGE_404: OnceCell<String> = OnceCell::new();
pub static PAGE_50X: OnceCell<String> = OnceCell::new();
pub async fn handle_rejection(err: Rejection) -> Result<impl Reply, Infallible> {
let mut content = String::new();
let code = if err.is_not_found() {
content = PAGE_404
.get()
.expect("page 404 is not initialized")
.to_string();
StatusCode::NOT_FOUND
} else if err
.find::<warp::filters::body::BodyDeserializeError>()
.is_some()
{
StatusCode::BAD_REQUEST
} else if err.find::<warp::reject::MethodNotAllowed>().is_some() {
StatusCode::METHOD_NOT_ALLOWED
} else if err.find::<warp::filters::cors::CorsForbidden>().is_some() {
StatusCode::FORBIDDEN
} else if err.find::<warp::reject::UnsupportedMediaType>().is_some() {
StatusCode::UNSUPPORTED_MEDIA_TYPE
} else {
content = PAGE_50X
.get()
.expect("page 50x is not initialized")
.to_string();
StatusCode::INTERNAL_SERVER_ERROR
};
if content.is_empty() {
content = format!(
"<html><head><title>{}</title></head><body><center><h1>{}</h1></center></body></html>",
code, code
);
}
Ok(warp::reply::with_status(warp::reply::html(content), code))
}
@@ -1,42 +1,47 @@
use hyper::server::conn::AddrIncoming;
use hyper::server::Server as HyperServer;
use hyper::service::{make_service_fn, service_fn};
use std::net::{IpAddr, SocketAddr};
use std::path::PathBuf;
use std::sync::Arc;
use structopt::StructOpt;
use warp::Filter;
use crate::{cache, cors, helpers, logger, rejection, Result};
use crate::{
compression::TEXT_MIME_TYPES,
config::{Config, CONFIG},
};
use crate::config::Config;
use crate::static_files::ArcPath;
use crate::tls::{TlsAcceptor, TlsConfigBuilder};
use crate::Result;
use crate::{error, error_page, handler, helpers, logger};
pub struct Server {
opts: Config,
threads: usize,
}
impl Server {
pub fn new() -> Self {
CONFIG.set(Config::from_args()).unwrap();
let opts = Config::global();
pub fn new() -> Server {
let opts = Config::from_args();
let cpus = num_cpus::get();
let threads = match opts.threads_multiplier {
0 | 1 => 1,
_ => num_cpus::get() * opts.threads_multiplier,
0 | 1 => cpus,
n => cpus * n,
};
Self { threads }
Server { opts, threads }
}
pub fn run(self) -> Result {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.thread_name("static-web-server")
.worker_threads(self.threads)
.thread_name("static-web-server")
.enable_all()
.build()?
.block_on(async {
let r = self.run_server_with_config().await;
let r = self.start_server().await;
if r.is_err() {
panic!("Server error during start up: {:?}", r.unwrap_err())
}
@@ -45,9 +50,9 @@ impl Server {
Ok(())
}
async fn run_server_with_config(self) -> Result {
let opts = Config::global();
async fn start_server(self) -> Result {
let opts = &self.opts;
logger::init(&opts.log_level)?;
@@ -59,34 +64,85 @@ impl Server {
let root_dir = helpers::get_valid_dirpath(&opts.root)?;
let root_dir = ArcPath(Arc::new(root_dir));
rejection::PAGE_404
error_page::PAGE_404
.set(helpers::read_file_content(opts.page404.as_ref()))
.expect("page 404 is not initialized");
rejection::PAGE_50X
error_page::PAGE_50X
.set(helpers::read_file_content(opts.page50x.as_ref()))
.expect("page 50x is not initialized");
let (cors_filter_opt, cors_allowed_origins) =
cors::get_opt_cors_filter(opts.cors_allow_origins.as_ref());
let http2 = opts.http2;
let http2_tls_cert_path = &opts.http2_tls_cert;
let http2_tls_key_path = &opts.http2_tls_key;
tokio::task::spawn(run_server_with_options(
addr,
root_dir,
http2,
http2_tls_cert_path,
http2_tls_key_path,
cors_filter_opt,
cors_allowed_origins,
));
let threads = self.threads;
if opts.http2 {
let cert_path = opts.http2_tls_cert.clone();
let key_path = opts.http2_tls_key.clone();
tokio::task::spawn(async move {
let make_service = make_service_fn(move |_| {
let root_dir = root_dir.clone();
async move {
Ok::<_, error::Error>(service_fn(move |req| {
let root_dir = root_dir.clone();
async move { handler::handle_request(root_dir.as_ref(), &req).await }
}))
}
});
let mut incoming = AddrIncoming::bind(&addr)?;
incoming.set_nodelay(true);
let tls = TlsConfigBuilder::new()
.cert_path(cert_path)
.key_path(key_path)
.build()
.unwrap();
let server =
HyperServer::builder(TlsAcceptor::new(tls, incoming)).serve(make_service);
tracing::info!(
parent: tracing::info_span!("Server::start_server", ?addr, ?threads),
"listening on https://{}",
addr
);
server.await
});
} else {
tokio::task::spawn(async move {
let make_service = make_service_fn(move |_| {
let root_dir = root_dir.clone();
async move {
Ok::<_, error::Error>(service_fn(move |req| {
let root_dir = root_dir.clone();
async move { handler::handle_request(root_dir.as_ref(), &req).await }
}))
}
});
let server = HyperServer::bind(&addr)
.tcp_nodelay(true)
.serve(make_service);
tracing::info!(
parent: tracing::info_span!("Server::start_server", ?addr, ?threads),
"listening on http://{}",
addr
);
server.await
});
}
handle_signals();
@@ -100,83 +156,6 @@ impl Default for Server {
}
}
pub async fn run_server_with_options(
addr: SocketAddr,
root_dir: PathBuf,
http2: bool,
http2_tls_cert_path: &'static str,
http2_tls_key_path: &'static str,
cors_filter_opt: Option<warp::filters::cors::Builder>,
cors_allowed_origins: String,
) {
let base_fs_dir_filter = warp::fs::dir(root_dir.clone())
.map(cache::control_headers)
.with(warp::trace::request())
.recover(rejection::handle_rejection);
let public_head = warp::head().and(base_fs_dir_filter.clone());
let public_get_default = warp::get().and(base_fs_dir_filter);
let fs_dir_filter = warp::fs::dir(root_dir)
.map(cache::control_headers)
.with(warp::compression::auto(|headers| {
if let Some(content_type) = headers.get("content-type") {
!TEXT_MIME_TYPES.iter().any(|h| h == content_type)
} else {
false
}
}))
.with(warp::trace::request())
.recover(rejection::handle_rejection);
if let Some(cors_filter) = cors_filter_opt {
tracing::info!(
cors_enabled = ?true,
allowed_origins = ?cors_allowed_origins
);
let public_head = public_head.with(cors_filter.clone());
let public_get_default = public_get_default.with(cors_filter.clone());
let public_get = warp::get().and(fs_dir_filter).with(cors_filter.clone());
let server = warp::serve(public_head.or(public_get).or(public_get_default));
if http2 {
server
.tls()
.cert_path(http2_tls_cert_path)
.key_path(http2_tls_key_path)
.run(addr)
.await
} else {
server.run(addr).await
}
} else {
let public_get = warp::get().and(fs_dir_filter);
let server = warp::serve(public_head.or(public_get).or(public_get_default));
if http2 {
server
.tls()
.cert_path(http2_tls_cert_path)
.key_path(http2_tls_key_path)
.run(addr)
.await
} else {
server.run(addr).await
}
}
}
#[cfg(not(windows))]
fn handle_signals() {
@@ -0,0 +1,458 @@
use bytes::{Bytes, BytesMut};
use futures::future::Either;
use futures::{future, ready, stream, FutureExt, Stream, StreamExt, TryFutureExt};
use headers::{
AcceptRanges, ContentLength, ContentRange, ContentType, HeaderMap, HeaderMapExt, HeaderValue,
IfModifiedSince, IfRange, IfUnmodifiedSince, LastModified, Range,
};
use hyper::{Body, Method, Response, StatusCode};
use percent_encoding::percent_decode_str;
use std::fs::Metadata;
use std::future::Future;
use std::io;
use std::ops::Bound;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Poll;
use std::{cmp, path::Path};
use tokio::fs::File as TkFile;
use tokio::io::AsyncSeekExt;
use tokio_util::io::poll_read_buf;
/// A cheaply clonable path: shared ownership of a `PathBuf` behind an
/// `Arc`, so the server root can be moved into per-request futures
/// without copying the path data itself.
#[derive(Clone, Debug)]
pub struct ArcPath(pub Arc<PathBuf>);

impl AsRef<Path> for ArcPath {
    fn as_ref(&self) -> &Path {
        (*self.0).as_ref()
    }
}
/// Serves a static file below `base` for `GET`/`HEAD` requests.
///
/// Any other method is answered with `405 Method Not Allowed`; path
/// resolution or file-open failures surface as the corresponding
/// `StatusCode` for the caller to turn into an error page.
pub async fn handle_request(
    method: &Method,
    headers: &HeaderMap<HeaderValue>,
    base: &Path,
    uri_path: &str,
) -> Result<Response<Body>, StatusCode> {
    // Only safe, read-only methods are supported here.
    if method != Method::HEAD && method != Method::GET {
        return Err(StatusCode::METHOD_NOT_ALLOWED);
    }

    let root = Arc::new(base.into());
    let located = path_from_tail(root, uri_path).await?;
    file_reply(headers, located).await
}
/// Resolves the request tail against the base directory: sanitizes the
/// path, stats it, and appends `index.html` when it points at a
/// directory. Resolves to `(path, metadata, auto_index)` where
/// `auto_index` is true when `index.html` was appended; missing paths
/// resolve to `404 Not Found`.
///
/// NOTE(review): when `auto_index` is true, `meta` is the directory's
/// metadata, not `index.html`'s — presumably the downstream handler
/// re-stats when `auto_index` is set; confirm against `file_conditional`.
fn path_from_tail(
    base: Arc<PathBuf>,
    tail: &str,
) -> impl Future<Output = Result<(ArcPath, Metadata, bool), StatusCode>> + Send {
    // `sanitize_path` runs eagerly (before the returned future is ever
    // polled), so `tail` is not captured by the future.
    future::ready(sanitize_path(base.as_ref(), tail)).and_then(|mut buf| async {
        match tokio::fs::metadata(&buf).await {
            Ok(meta) => {
                let mut auto_index = false;
                if meta.is_dir() {
                    tracing::debug!("dir: appending index.html to directory path");
                    buf.push("index.html");
                    auto_index = true;
                }
                tracing::trace!("dir: {:?}", buf);
                Ok((ArcPath(Arc::new(buf)), meta, auto_index))
            }
            Err(err) => {
                tracing::debug!("file not found: {:?}", err);
                Err(StatusCode::NOT_FOUND)
            }
        }
    })
}
/// Opens the resolved file and produces the final response, mapping
/// open errors to HTTP status codes: not-found → 404, permission
/// denied → 403, anything else → 500.
fn file_reply(
    headers: &HeaderMap<HeaderValue>,
    res: (ArcPath, Metadata, bool),
) -> impl Future<Output = Result<Response<Body>, StatusCode>> + Send {
    let (path, meta, auto_index) = res;
    // Conditional headers are captured up front so the future does not
    // borrow the request's header map.
    let conditionals = get_conditional_headers(headers);

    TkFile::open(path.clone()).then(move |res| match res {
        // File opened: delegate to the conditional-request handler.
        Ok(f) => Either::Left(file_conditional(f, path, meta, auto_index, conditionals)),
        Err(err) => {
            let status = match err.kind() {
                io::ErrorKind::NotFound => {
                    tracing::debug!("file not found: {:?}", path.as_ref().display());
                    StatusCode::NOT_FOUND
                }
                io::ErrorKind::PermissionDenied => {
                    tracing::warn!("file permission denied: {:?}", path.as_ref().display());
                    StatusCode::FORBIDDEN
                }
                _ => {
                    tracing::error!(
                        "file open error (path={:?}): {} ",
                        path.as_ref().display(),
                        err
                    );
                    StatusCode::INTERNAL_SERVER_ERROR
                }
            };
            Either::Right(future::err(status))
        }
    })
}
/// Extracts the HTTP conditional-request headers (`If-Modified-Since`,
/// `If-Unmodified-Since`, `If-Range`, `Range`) from the request headers.
/// Absent or unparsable headers simply become `None`.
fn get_conditional_headers(header_list: &HeaderMap<HeaderValue>) -> Conditionals {
    Conditionals {
        if_modified_since: header_list.typed_get::<IfModifiedSince>(),
        if_unmodified_since: header_list.typed_get::<IfUnmodifiedSince>(),
        if_range: header_list.typed_get::<IfRange>(),
        range: header_list.typed_get::<Range>(),
    }
}
/// Joins a percent-decoded request path (`tail`) onto `base`, rejecting
/// path-traversal attempts.
///
/// Returns `404 Not Found` for any segment starting with `..` or
/// containing a backslash, and `415 Unsupported Media Type` when the
/// path is not valid percent-encoded UTF-8.
///
/// Fix vs. original: grammar in the backslash warning message
/// ("containing with" → "containing a").
fn sanitize_path(base: impl AsRef<Path>, tail: &str) -> Result<PathBuf, StatusCode> {
    let mut buf = PathBuf::from(base.as_ref());
    let p = match percent_decode_str(tail).decode_utf8() {
        Ok(p) => p,
        Err(err) => {
            tracing::debug!("dir: failed to decode route={:?}: {:?}", tail, err);
            return Err(StatusCode::UNSUPPORTED_MEDIA_TYPE);
        }
    };
    tracing::trace!("dir? base={:?}, route={:?}", base.as_ref(), p);
    for seg in p.split('/') {
        if seg.starts_with("..") {
            // Reject anything that could climb out of the base directory.
            tracing::warn!("dir: rejecting segment starting with '..'");
            return Err(StatusCode::NOT_FOUND);
        } else if seg.contains('\\') {
            tracing::warn!("dir: rejecting segment containing a backslash (\\)");
            return Err(StatusCode::NOT_FOUND);
        } else {
            // Empty segments (e.g. from a leading '/') are no-ops for
            // `PathBuf::push`.
            buf.push(seg);
        }
    }
    Ok(buf)
}
/// The conditional request headers relevant to a static-file response
/// (RFC 7232 validators plus the RFC 7233 `Range` header).
#[derive(Debug)]
struct Conditionals {
    // `If-Modified-Since`: only send the body if changed since this date.
    if_modified_since: Option<IfModifiedSince>,
    // `If-Unmodified-Since`: only send the body if NOT changed since this date.
    if_unmodified_since: Option<IfUnmodifiedSince>,
    // `If-Range`: honor `Range` only when the resource is unchanged.
    if_range: Option<IfRange>,
    // `Range`: the byte range(s) requested by the client.
    range: Option<Range>,
}
/// Outcome of evaluating the conditional headers against the file's
/// last-modified time.
enum Cond {
    // A precondition failed: respond immediately with this bodyless response.
    NoBody(Response<Body>),
    // Preconditions passed: serve the body, optionally limited to a range.
    WithBody(Option<Range>),
}
impl Conditionals {
    /// Evaluates the conditional headers against the file's last-modified
    /// time, in precedence order: `If-Unmodified-Since`, then
    /// `If-Modified-Since`, then `If-Range`.
    fn check(self, last_modified: Option<LastModified>) -> Cond {
        if let Some(since) = self.if_unmodified_since {
            // An unknown modification time counts as a failed precondition.
            let precondition = last_modified
                .map(|time| since.precondition_passes(time.into()))
                .unwrap_or(false);
            tracing::trace!(
                "if-unmodified-since? {:?} vs {:?} = {}",
                since,
                last_modified,
                precondition
            );
            if !precondition {
                let mut res = Response::new(Body::empty());
                *res.status_mut() = StatusCode::PRECONDITION_FAILED;
                return Cond::NoBody(res);
            }
        }
        if let Some(since) = self.if_modified_since {
            tracing::trace!(
                "if-modified-since? header = {:?}, file = {:?}",
                since,
                last_modified
            );
            // An unknown modification time is treated as modified, so the
            // body is served rather than replying 304.
            let unmodified = last_modified
                .map(|time| !since.is_modified(time.into()))
                .unwrap_or(false);
            if unmodified {
                let mut res = Response::new(Body::empty());
                *res.status_mut() = StatusCode::NOT_MODIFIED;
                return Cond::NoBody(res);
            }
        }
        if let Some(if_range) = self.if_range {
            tracing::trace!("if-range? {:?} vs {:?}", if_range, last_modified);
            // If the resource changed since the If-Range validator, ignore
            // the Range header and serve the full body instead.
            let can_range = !if_range.is_modified(None, last_modified.as_ref());
            if !can_range {
                return Cond::WithBody(None);
            }
        }
        Cond::WithBody(self.range)
    }
}
/// Builds the final response for an opened file, refreshing its metadata
/// first when the request path was rewritten to a directory index.
fn file_conditional(
    f: TkFile,
    path: ArcPath,
    meta: Metadata,
    auto_index: bool,
    conditionals: Conditionals,
) -> impl Future<Output = Result<Response<Body>, StatusCode>> + Send {
    async move {
        let (file, meta) = file_metadata(f, meta, auto_index).await?;
        Ok(response_body(file, meta, path, conditionals))
    }
}
/// Returns the file handle together with its metadata.
///
/// When `auto_index` is set, the path was rewritten to `index.html`, so
/// the directory metadata passed in is stale and must be re-read from
/// the opened file itself.
async fn file_metadata(
    f: TkFile,
    meta: Metadata,
    auto_index: bool,
) -> Result<(TkFile, Metadata), StatusCode> {
    if !auto_index {
        return Ok((f, meta));
    }
    let fresh = f.metadata().await.map_err(|err| {
        tracing::debug!("file metadata error: {}", err);
        StatusCode::INTERNAL_SERVER_ERROR
    })?;
    Ok((f, fresh))
}
/// Assembles the HTTP response for `file`, honoring the conditional and
/// range headers captured in `conditionals`.
fn response_body(
    file: TkFile,
    meta: Metadata,
    path: ArcPath,
    conditionals: Conditionals,
) -> Response<Body> {
    let mut len = meta.len();
    let modified = meta.modified().ok().map(LastModified::from);

    // A failed precondition short-circuits with a bodyless response.
    let range = match conditionals.check(modified) {
        Cond::NoBody(resp) => return resp,
        Cond::WithBody(range) => range,
    };

    match bytes_range(range, len) {
        Ok((start, end)) => {
            let sub_len = end - start;
            let buf_size = optimal_buf_size(&meta);
            let stream = file_stream(file, buf_size, (start, end));
            let mut resp = Response::new(Body::wrap_stream(stream));

            // Partial content: advertise the served slice and shrink the
            // announced length accordingly.
            if sub_len != len {
                *resp.status_mut() = StatusCode::PARTIAL_CONTENT;
                resp.headers_mut().typed_insert(
                    ContentRange::bytes(start..end, len).expect("valid ContentRange"),
                );
                len = sub_len;
            }

            let mime = mime_guess::from_path(path.as_ref()).first_or_octet_stream();
            resp.headers_mut().typed_insert(ContentLength(len));
            resp.headers_mut().typed_insert(ContentType::from(mime));
            resp.headers_mut().typed_insert(AcceptRanges::bytes());
            if let Some(last_modified) = modified {
                resp.headers_mut().typed_insert(last_modified);
            }
            resp
        }
        Err(BadRange) => {
            // Unsatisfiable range: reply 416 and advertise the full length.
            let mut resp = Response::new(Body::empty());
            *resp.status_mut() = StatusCode::RANGE_NOT_SATISFIABLE;
            resp.headers_mut()
                .typed_insert(ContentRange::unsatisfied_bytes(len));
            resp
        }
    }
}
struct BadRange;
/// Resolves an optional `Range` header into one concrete `(start, end)`
/// byte window within `0..max_len`.
///
/// Only the first range of a multi-range request is honored; no header
/// (or no ranges) means the whole file.
fn bytes_range(range: Option<Range>, max_len: u64) -> Result<(u64, u64), BadRange> {
    let range = match range {
        Some(range) => range,
        None => return Ok((0, max_len)),
    };
    range
        .iter()
        .map(|(start_bound, end_bound)| {
            let start = match start_bound {
                Bound::Unbounded => 0,
                Bound::Included(s) => s,
                Bound::Excluded(s) => s + 1,
            };
            // An inclusive end of exactly `max_len` is tolerated as-is
            // rather than bumped past the end of the file.
            let end = match end_bound {
                Bound::Unbounded => max_len,
                Bound::Included(s) if s == max_len => s,
                Bound::Included(s) => s + 1,
                Bound::Excluded(s) => s,
            };
            if start < end && end <= max_len {
                Ok((start, end))
            } else {
                tracing::trace!("unsatisfiable byte range: {}-{}/{}", start, end, max_len);
                Err(BadRange)
            }
        })
        .next()
        .unwrap_or(Ok((0, max_len)))
}
/// Streams the byte window `start..end` of `file` as `Bytes` chunks of
/// roughly `buf_size` each.
///
/// Seeks once (if `start != 0`), then polls reads until `end - start`
/// bytes have been emitted or EOF is hit early.
fn file_stream(
    mut file: TkFile,
    buf_size: usize,
    (start, end): (u64, u64),
) -> impl Stream<Item = Result<Bytes, io::Error>> + Send {
    // One-shot future that positions the file at the range start.
    let seek = async move {
        if start != 0 {
            file.seek(io::SeekFrom::Start(start)).await?;
        }
        Ok(file)
    };
    seek.into_stream()
        .map(move |result| {
            let mut buf = BytesMut::new();
            // Remaining bytes still owed to the client.
            let mut len = end - start;
            let mut f = match result {
                Ok(f) => f,
                // Seek failed: emit the error as a single-item stream.
                Err(f) => return Either::Left(stream::once(future::err(f))),
            };
            Either::Right(stream::poll_fn(move |cx| {
                if len == 0 {
                    return Poll::Ready(None);
                }
                // Make room before reading; `buf.split()` below hands the
                // filled portion off, so the buffer is reused across polls.
                reserve_at_least(&mut buf, buf_size);
                let n = match ready!(poll_read_buf(Pin::new(&mut f), cx, &mut buf)) {
                    Ok(n) => n as u64,
                    Err(err) => {
                        tracing::debug!("file read error: {}", err);
                        return Poll::Ready(Some(Err(err)));
                    }
                };
                if n == 0 {
                    // File shorter than the requested range: stop early.
                    tracing::debug!("file read found EOF before expected length");
                    return Poll::Ready(None);
                }
                let mut chunk = buf.split().freeze();
                if n > len {
                    // Trim the final chunk so we never exceed the range end.
                    chunk = chunk.split_to(len as usize);
                    len = 0;
                } else {
                    len -= n;
                }
                Poll::Ready(Some(Ok(chunk)))
            }))
        })
        .flatten()
}
/// Ensures `buf` has at least `cap` bytes of spare capacity for the next read.
fn reserve_at_least(buf: &mut BytesMut, cap: usize) {
    let spare = buf.capacity() - buf.len();
    if spare < cap {
        buf.reserve(cap);
    }
}
/// Fallback read-buffer size when the filesystem gives no (useful) block-size hint.
const DEFAULT_READ_BUF_SIZE: usize = 8_192;

/// Picks a read-buffer size: the filesystem's preferred block size, but
/// never larger than the file itself, so small files don't over-allocate.
fn optimal_buf_size(metadata: &Metadata) -> usize {
    let block_size = get_block_size(metadata) as u64;
    cmp::min(block_size, metadata.len()) as usize
}

/// Unix: the filesystem's preferred I/O block size, floored at the default.
#[cfg(unix)]
fn get_block_size(metadata: &Metadata) -> usize {
    use std::os::unix::fs::MetadataExt;
    cmp::max(metadata.blksize() as usize, DEFAULT_READ_BUF_SIZE)
}

/// Non-Unix: no block-size hint is available; use the default.
#[cfg(not(unix))]
fn get_block_size(_metadata: &Metadata) -> usize {
    DEFAULT_READ_BUF_SIZE
}
#[cfg(test)]
mod tests {
    use super::sanitize_path;
    use bytes::BytesMut;

    /// Normal routes join onto the base; traversal and backslash
    /// segments are rejected.
    #[test]
    fn test_sanitize_path() {
        fn as_path(s: &str) -> &::std::path::Path {
            s.as_ref()
        }
        let base = "/var/www";
        assert_eq!(
            sanitize_path(base, "/foo.html").unwrap(),
            as_path("/var/www/foo.html")
        );
        sanitize_path(base, "/../foo.html").expect_err("dot dot");
        sanitize_path(base, "/C:\\/foo.html").expect_err("C:\\");
    }

    /// An empty buffer grows to exactly the requested spare capacity.
    #[test]
    fn test_reserve_at_least() {
        let cap = 8_192;
        let mut buf = BytesMut::new();
        assert_eq!((buf.len(), buf.capacity()), (0, 0));
        super::reserve_at_least(&mut buf, cap);
        assert_eq!((buf.len(), buf.capacity()), (0, cap));
    }
}
@@ -0,0 +1,414 @@
use std::fs::File;
use std::future::Future;
use std::io::{self, BufReader, Cursor, Read};
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use futures::ready;
use hyper::server::accept::Accept;
use hyper::server::conn::{AddrIncoming, AddrStream};
use crate::transport::Transport;
use tokio_rustls::rustls::{
AllowAnyAnonymousOrAuthenticatedClient, AllowAnyAuthenticatedClient, NoClientAuth,
RootCertStore, ServerConfig, TLSError,
};
/// Errors that can occur while loading or parsing TLS certificates and keys.
#[derive(Debug)]
pub enum TlsConfigError {
    // I/O failure while reading a certificate or key source.
    Io(io::Error),
    // The certificate PEM could not be parsed.
    CertParseError,
    // The key PEM could not be parsed as PKCS#8.
    Pkcs8ParseError,
    // The key PEM could not be parsed as RSA.
    RsaParseError,
    // The key source contained no private key at all.
    EmptyKey,
    // rustls rejected the certificate/key pair.
    InvalidKey(TLSError),
}
impl std::fmt::Display for TlsConfigError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
TlsConfigError::Io(err) => err.fmt(f),
TlsConfigError::CertParseError => write!(f, "certificate parse error"),
TlsConfigError::Pkcs8ParseError => write!(f, "pkcs8 parse error"),
TlsConfigError::RsaParseError => write!(f, "rsa parse error"),
TlsConfigError::EmptyKey => write!(f, "key contains no private key"),
TlsConfigError::InvalidKey(err) => write!(f, "key contains an invalid key, {}", err),
}
}
}
impl std::error::Error for TlsConfigError {}
/// Client-certificate authentication policy, with the trust anchors to
/// verify client certs against (read lazily at `build` time).
pub enum TlsClientAuth {
    // No client authentication.
    Off,
    // Client certs are verified when presented, but not required.
    Optional(Box<dyn Read + Send + Sync>),
    // Client certs are mandatory.
    Required(Box<dyn Read + Send + Sync>),
}
/// Builder for a rustls `ServerConfig` from PEM cert/key sources.
pub struct TlsConfigBuilder {
    // PEM-encoded certificate chain source.
    cert: Box<dyn Read + Send + Sync>,
    // PEM-encoded private key source (PKCS#8 or RSA).
    key: Box<dyn Read + Send + Sync>,
    // Client-certificate authentication policy.
    client_auth: TlsClientAuth,
    // DER-encoded OCSP response to staple (empty = none).
    ocsp_resp: Vec<u8>,
}
impl std::fmt::Debug for TlsConfigBuilder {
    // The readers are opaque, so Debug shows only the struct name.
    fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
        f.debug_struct("TlsConfigBuilder").finish()
    }
}
impl TlsConfigBuilder {
    /// Creates a builder with empty cert/key sources, client
    /// authentication off and no OCSP response.
    pub fn new() -> TlsConfigBuilder {
        TlsConfigBuilder {
            key: Box::new(io::empty()),
            cert: Box::new(io::empty()),
            client_auth: TlsClientAuth::Off,
            ocsp_resp: Vec::new(),
        }
    }
    /// Sets the private key from a file path; the file is opened lazily
    /// when `build` reads it.
    pub fn key_path(mut self, path: impl AsRef<Path>) -> Self {
        self.key = Box::new(LazyFile {
            path: path.as_ref().into(),
            file: None,
        });
        self
    }
    /// Sets the private key from in-memory PEM bytes.
    pub fn key(mut self, key: &[u8]) -> Self {
        self.key = Box::new(Cursor::new(Vec::from(key)));
        self
    }
    /// Sets the certificate chain from a file path (opened lazily on `build`).
    pub fn cert_path(mut self, path: impl AsRef<Path>) -> Self {
        self.cert = Box::new(LazyFile {
            path: path.as_ref().into(),
            file: None,
        });
        self
    }
    /// Sets the certificate chain from in-memory PEM bytes.
    pub fn cert(mut self, cert: &[u8]) -> Self {
        self.cert = Box::new(Cursor::new(Vec::from(cert)));
        self
    }
    /// Enables optional client authentication with trust anchors from a file path.
    pub fn client_auth_optional_path(mut self, path: impl AsRef<Path>) -> Self {
        let file = Box::new(LazyFile {
            path: path.as_ref().into(),
            file: None,
        });
        self.client_auth = TlsClientAuth::Optional(file);
        self
    }
    /// Enables optional client authentication with in-memory trust-anchor PEM bytes.
    pub fn client_auth_optional(mut self, trust_anchor: &[u8]) -> Self {
        let cursor = Box::new(Cursor::new(Vec::from(trust_anchor)));
        self.client_auth = TlsClientAuth::Optional(cursor);
        self
    }
    /// Requires client authentication with trust anchors from a file path.
    pub fn client_auth_required_path(mut self, path: impl AsRef<Path>) -> Self {
        let file = Box::new(LazyFile {
            path: path.as_ref().into(),
            file: None,
        });
        self.client_auth = TlsClientAuth::Required(file);
        self
    }
    /// Requires client authentication with in-memory trust-anchor PEM bytes.
    pub fn client_auth_required(mut self, trust_anchor: &[u8]) -> Self {
        let cursor = Box::new(Cursor::new(Vec::from(trust_anchor)));
        self.client_auth = TlsClientAuth::Required(cursor);
        self
    }
    /// Sets a DER-encoded OCSP response to staple to the certificate.
    pub fn ocsp_resp(mut self, ocsp_resp: &[u8]) -> Self {
        self.ocsp_resp = Vec::from(ocsp_resp);
        self
    }
    /// Reads and parses the configured sources, producing a rustls
    /// `ServerConfig` that advertises HTTP/2 and HTTP/1.1 via ALPN.
    ///
    /// # Errors
    /// Fails when the certificate/key cannot be read or parsed, when the
    /// key source is empty, or when rustls rejects the cert/key pair.
    pub fn build(mut self) -> Result<ServerConfig, TlsConfigError> {
        let mut cert_rdr = BufReader::new(self.cert);
        let cert = tokio_rustls::rustls::internal::pemfile::certs(&mut cert_rdr)
            .map_err(|()| TlsConfigError::CertParseError)?;
        let key = {
            // Buffer the whole key source so it can be scanned twice:
            // first as PKCS#8, then falling back to RSA.
            let mut key_vec = Vec::new();
            self.key
                .read_to_end(&mut key_vec)
                .map_err(TlsConfigError::Io)?;
            if key_vec.is_empty() {
                return Err(TlsConfigError::EmptyKey);
            }
            let mut pkcs8 = tokio_rustls::rustls::internal::pemfile::pkcs8_private_keys(
                &mut key_vec.as_slice(),
            )
            .map_err(|()| TlsConfigError::Pkcs8ParseError)?;
            if !pkcs8.is_empty() {
                // Use the first PKCS#8 key found.
                pkcs8.remove(0)
            } else {
                let mut rsa = tokio_rustls::rustls::internal::pemfile::rsa_private_keys(
                    &mut key_vec.as_slice(),
                )
                .map_err(|()| TlsConfigError::RsaParseError)?;
                if !rsa.is_empty() {
                    // Fall back to the first RSA key found.
                    rsa.remove(0)
                } else {
                    return Err(TlsConfigError::EmptyKey);
                }
            }
        };
        // Loads client-auth trust anchors from a PEM reader into a root store.
        fn read_trust_anchor(
            trust_anchor: Box<dyn Read + Send + Sync>,
        ) -> Result<RootCertStore, TlsConfigError> {
            let mut reader = BufReader::new(trust_anchor);
            let mut store = RootCertStore::empty();
            // Zero certificates added — or a parse failure — is an error.
            if let Ok((0, _)) | Err(()) = store.add_pem_file(&mut reader) {
                Err(TlsConfigError::CertParseError)
            } else {
                Ok(store)
            }
        }
        let client_auth = match self.client_auth {
            TlsClientAuth::Off => NoClientAuth::new(),
            TlsClientAuth::Optional(trust_anchor) => {
                AllowAnyAnonymousOrAuthenticatedClient::new(read_trust_anchor(trust_anchor)?)
            }
            TlsClientAuth::Required(trust_anchor) => {
                AllowAnyAuthenticatedClient::new(read_trust_anchor(trust_anchor)?)
            }
        };
        let mut config = ServerConfig::new(client_auth);
        config
            .set_single_cert_with_ocsp_and_sct(cert, key, self.ocsp_resp, Vec::new())
            .map_err(TlsConfigError::InvalidKey)?;
        // ALPN protocol preference: HTTP/2 first, then HTTP/1.1.
        config.set_protocols(&["h2".into(), "http/1.1".into()]);
        Ok(config)
    }
}
impl Default for TlsConfigBuilder {
fn default() -> Self {
Self::new()
}
}
/// A `Read`er over a file that is only opened on the first read,
/// deferring I/O errors to `build` time.
struct LazyFile {
    // Path of the file to open.
    path: PathBuf,
    // Handle, populated lazily on first read.
    file: Option<File>,
}
impl LazyFile {
    /// Reads from the underlying file, opening it on first use.
    fn lazy_read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if self.file.is_none() {
            self.file = Some(File::open(&self.path)?);
        }
        // The handle was just populated above if it was missing.
        self.file
            .as_mut()
            .expect("file handle opened above")
            .read(buf)
    }
}
impl Read for LazyFile {
    /// Delegates to `lazy_read`, enriching any error with the file path.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        match self.lazy_read(buf) {
            Ok(n) => Ok(n),
            Err(err) => {
                let kind = err.kind();
                Err(io::Error::new(
                    kind,
                    format!("error reading file ({:?}): {}", self.path.display(), err),
                ))
            }
        }
    }
}
// A TLS connection wraps a TCP socket, so its peer address is always known.
impl Transport for TlsStream {
    fn remote_addr(&self) -> Option<SocketAddr> {
        Some(self.remote_addr)
    }
}
/// Connection phase: mid-handshake, or an established TLS session.
enum State {
    // Handshake still in progress; completed lazily on first read/write.
    Handshaking(tokio_rustls::Accept<AddrStream>),
    // Handshake done; reads/writes go straight to the TLS session.
    Streaming(tokio_rustls::server::TlsStream<AddrStream>),
}
/// A TLS connection over an accepted TCP socket, with the handshake
/// driven lazily by the first I/O operation.
pub struct TlsStream {
    // Current handshake/streaming phase.
    state: State,
    // Peer address, captured before the socket is consumed by the acceptor.
    remote_addr: SocketAddr,
}
impl TlsStream {
    /// Starts a lazy TLS handshake over the accepted socket, recording
    /// the peer address before the socket is consumed.
    fn new(stream: AddrStream, config: Arc<ServerConfig>) -> TlsStream {
        let remote_addr = stream.remote_addr();
        TlsStream {
            remote_addr,
            state: State::Handshaking(tokio_rustls::TlsAcceptor::from(config).accept(stream)),
        }
    }
}
impl AsyncRead for TlsStream {
    /// Drives the TLS handshake to completion (if still pending) before
    /// delegating the read to the established TLS session.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context,
        buf: &mut ReadBuf,
    ) -> Poll<io::Result<()>> {
        let pin = self.get_mut();
        match pin.state {
            State::Handshaking(ref mut accept) => match ready!(Pin::new(accept).poll(cx)) {
                Ok(mut stream) => {
                    // Handshake finished: do the read now, then store the
                    // session so later calls skip the handshake branch.
                    let result = Pin::new(&mut stream).poll_read(cx, buf);
                    pin.state = State::Streaming(stream);
                    result
                }
                Err(err) => Poll::Ready(Err(err)),
            },
            State::Streaming(ref mut stream) => Pin::new(stream).poll_read(cx, buf),
        }
    }
}
impl AsyncWrite for TlsStream {
    /// Drives the TLS handshake to completion (if still pending) before
    /// delegating the write to the established TLS session.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        let pin = self.get_mut();
        match pin.state {
            State::Handshaking(ref mut accept) => match ready!(Pin::new(accept).poll(cx)) {
                Ok(mut stream) => {
                    // Handshake finished: do the write now, then store the
                    // session so later calls skip the handshake branch.
                    let result = Pin::new(&mut stream).poll_write(cx, buf);
                    pin.state = State::Streaming(stream);
                    result
                }
                Err(err) => Poll::Ready(Err(err)),
            },
            State::Streaming(ref mut stream) => Pin::new(stream).poll_write(cx, buf),
        }
    }
    /// Flushing while still handshaking is a no-op success (nothing has
    /// been written yet).
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        match self.state {
            State::Handshaking(_) => Poll::Ready(Ok(())),
            State::Streaming(ref mut stream) => Pin::new(stream).poll_flush(cx),
        }
    }
    /// Shutting down while still handshaking is a no-op success.
    fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        match self.state {
            State::Handshaking(_) => Poll::Ready(Ok(())),
            State::Streaming(ref mut stream) => Pin::new(stream).poll_shutdown(cx),
        }
    }
}
/// A hyper `Accept`or that wraps every inbound TCP connection in a
/// (lazily-driven) TLS handshake.
pub struct TlsAcceptor {
    // Shared rustls server configuration applied to each connection.
    config: Arc<ServerConfig>,
    // The underlying TCP listener stream.
    incoming: AddrIncoming,
}
impl TlsAcceptor {
    /// Wraps `incoming` so each accepted socket is upgraded using `config`.
    pub fn new(config: ServerConfig, incoming: AddrIncoming) -> TlsAcceptor {
        TlsAcceptor {
            config: Arc::new(config),
            incoming,
        }
    }
}
impl Accept for TlsAcceptor {
    type Conn = TlsStream;
    type Error = io::Error;

    /// Accepts the next TCP connection and wraps it in a lazy TLS
    /// handshake; accept errors and end-of-stream pass through untouched.
    fn poll_accept(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Self::Conn, Self::Error>>> {
        let this = self.get_mut();
        let next = ready!(Pin::new(&mut this.incoming).poll_accept(cx));
        Poll::Ready(next.map(|res| res.map(|sock| TlsStream::new(sock, this.config.clone()))))
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // Builds a ServerConfig from cert/key file paths.
    #[test]
    fn file_cert_key() {
        TlsConfigBuilder::new()
            .cert_path("tests/tls/local.dev_cert.pem")
            .key_path("tests/tls/local.dev_key.pem")
            .build()
            .unwrap();
    }

    // Builds a ServerConfig from in-memory cert/key PEM bytes.
    #[test]
    fn bytes_cert_key() {
        let cert = include_str!("../tests/tls/local.dev_cert.pem");
        let key = include_str!("../tests/tls/local.dev_key.pem");
        TlsConfigBuilder::new()
            .key(key.as_bytes())
            .cert(cert.as_bytes())
            .build()
            .unwrap();
    }
}
@@ -0,0 +1,56 @@
use std::io;
use std::net::SocketAddr;
use std::pin::Pin;
use std::task::{Context, Poll};
use hyper::server::conn::AddrStream;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
/// An async byte-stream connection that may know its peer's socket address.
pub trait Transport: AsyncRead + AsyncWrite {
    /// Returns the remote address when the transport has one
    /// (TCP/TLS sockets do; arbitrary lifted I/O does not).
    fn remote_addr(&self) -> Option<SocketAddr>;
}
// Plain TCP connections always know their peer address.
impl Transport for AddrStream {
    fn remote_addr(&self) -> Option<SocketAddr> {
        Some(self.remote_addr())
    }
}
pub struct LiftIo<T>(pub T);
impl<T: AsyncRead + Unpin> AsyncRead for LiftIo<T> {
    /// Delegates the read to the wrapped I/O object.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        let inner = &mut self.get_mut().0;
        Pin::new(inner).poll_read(cx, buf)
    }
}
impl<T: AsyncWrite + Unpin> AsyncWrite for LiftIo<T> {
    /// Delegates the write to the wrapped I/O object.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        let inner = &mut self.get_mut().0;
        Pin::new(inner).poll_write(cx, buf)
    }

    /// Delegates the flush to the wrapped I/O object.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
        let inner = &mut self.get_mut().0;
        Pin::new(inner).poll_flush(cx)
    }

    /// Delegates the shutdown to the wrapped I/O object.
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
        let inner = &mut self.get_mut().0;
        Pin::new(inner).poll_shutdown(cx)
    }
}
// Arbitrary lifted I/O objects have no knowable peer address.
impl<T: AsyncRead + AsyncWrite + Unpin> Transport for LiftIo<T> {
    fn remote_addr(&self) -> Option<SocketAddr> {
        None
    }
}
@@ -0,0 +1,26 @@
-----BEGIN CERTIFICATE-----
MIIEUDCCArigAwIBAgIRAJ+wfoSa2gGM7bMna/sxtB4wDQYJKoZIhvcNAQELBQAw
YTEeMBwGA1UEChMVbWtjZXJ0IGRldmVsb3BtZW50IENBMRswGQYDVQQLDBJqb3Nl
bHVpc3FAcXVpbnRhbmExIjAgBgNVBAMMGW1rY2VydCBqb3NlbHVpc3FAcXVpbnRh
bmEwHhcNMTkwODI1MjIzNDM4WhcNMjkwODI1MjIzNDM4WjBGMScwJQYDVQQKEx5t
a2NlcnQgZGV2ZWxvcG1lbnQgY2VydGlmaWNhdGUxGzAZBgNVBAsMEmpvc2VsdWlz
cUBxdWludGFuYTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAL7Lg4jL
kuabbfC2Qbv2iU+fCPKMht9LUT16VBh0cOxqpd75aTj6qikaDmYQZYJcAJYD2Hfh
fgP6dsT6/VRw7oWWYD/h9f7cz9xKjLRl/jBN1ob7VMbzJTFiJ4ajMZI5g/Yy6azC
/HEAlFGkXWfwblJPQdZHoQLksTSaHS5NR7RnmFMkgYxyaqIpkXNqUtyc+f5nUW6t
1VRoVBfG6V+LFY4IRYXoYehI5q+uK6w6jNEDHnDUTLagFc+D2UgMXQtG7TtvHAQz
jjTzpmb4pwmemkdc1xJlRa/1UdsPYHffjE2vUm6xrVJ07zvcxkS9gLwXKLLzuHnU
I2brgY0DdzFx3s0CAwEAAaOBnTCBmjAOBgNVHQ8BAf8EBAMCBaAwEwYDVR0lBAww
CgYIKwYBBQUHAwEwDAYDVR0TAQH/BAIwADAfBgNVHSMEGDAWgBTmeQONv1LFhIi6
WK47Dmc46TuFBDBEBgNVHREEPTA7gglsb2NhbC5kZXaCCyoubG9jYWwuZGV2ggls
b2NhbGhvc3SHBH8AAAGHEAAAAAAAAAAAAAAAAAAAAAEwDQYJKoZIhvcNAQELBQAD
ggGBADlgyQy/bwIekxRITUXnArLO9//C+I9hDOVs4GnY6nZJ0XwUOfWHq90U92Yh
YmCcQOBstYclBL9KzVHAOLa0erTEqbh1+2ZRrY8vzAf7RGwaZsE4uj6bB3KdOa00
zvkyHNYJnvL1xdOJAWckbaMgnBJwEGQGA9Bk5urozDYhbwIZS5PKXGPcLeiHIvn5
taC4x0fsCk4QkkPhOk92NjUD5t70vGQ5ty69fD11p1GOrC0szHZjnEdeW7SfPtsY
5qES+U9ppbJFeaFK/hhlRSdXjqk4a/P/HdM52QDvkrujk3DJYmNSQGdCa3fxiAnK
ivEBoYVIyVKRrCKNhyw8D4uWEUrMbsoo9/joAJYFOPHeYhSmkxA9HN0GvGBQ1MH4
zPd9B+hw90f8YokfGOH3dQiHAvvUyb1//uYN1FOlp/a9cTx0Y8oXTZuTvRL/259+
NjAizN+fctVbGloPEvTlxPkqveLNmzzJBk1bbj+Gt6tPqXN+DecNQMsMRzJ3HFOk
4EcwBQ==
-----END CERTIFICATE-----
@@ -0,0 +1,28 @@
-----BEGIN PRIVATE KEY-----
MIIEwAIBADANBgkqhkiG9w0BAQEFAASCBKowggSmAgEAAoIBAQC+y4OIy5Lmm23w
tkG79olPnwjyjIbfS1E9elQYdHDsaqXe+Wk4+qopGg5mEGWCXACWA9h34X4D+nbE
+v1UcO6FlmA/4fX+3M/cSoy0Zf4wTdaG+1TG8yUxYieGozGSOYP2MumswvxxAJRR
pF1n8G5ST0HWR6EC5LE0mh0uTUe0Z5hTJIGMcmqiKZFzalLcnPn+Z1FurdVUaFQX
xulfixWOCEWF6GHoSOavriusOozRAx5w1Ey2oBXPg9lIDF0LRu07bxwEM44086Zm
+KcJnppHXNcSZUWv9VHbD2B334xNr1Jusa1SdO873MZEvYC8Fyiy87h51CNm64GN
A3cxcd7NAgMBAAECggEBALSchOR/CY3hvt4qOenMBMnpm5e3rYk9jCctYORRfgBf
KKv94Dy/FUuZTd4SUXVo0GkyNL2vKRJtC/eGPT+tNC4jXvO6XJspvl8j9zRihJCH
brgSvXsj+qZX62DJpYhth90M7yXK4xu51629cWqOMHEcdA97eRD7GkDYTx1grKs5
7ykYki3NNGQFDncSmQz/ZjHs/W44byVKdKVLUHWeexfkOFZ4tmr4gDcLG+M6f6m3
TTDOIdh9FvpNBOyg+GDWgJbn1nw6PYF3c2cOMQopRwAQKuHfVwpbF+zzxvtcCTkF
GmsprSdLTeXY4v2RT+kla9Hmgot1XIPY6iMvXUkkhwUCgYEA07BuPYWTxY0gfNo3
CrTNhhGyW4IA8wjwA57ao71Eg7vzhTub+sMZXCMMpFibIGD3pEcW8hG4ke5ghH3n
4jxNBCtFX0q3OHAbBtStX03iggsDoy8piYLxjHrRp+pxEDncwqFaIhWhR0S5wi/M
u2+hE0A9pWAhc/y+DWnoUZvFTL8CgYEA5rtyavAGMA0m8hN9uwMtmc7gSFp0oo8a
mm1pDFe8Z8Mv/SG16pYAuM3wUa+KqfdRXOf6vHvI4OmX1PuiqocECf6acOJu9lzg
bU0WTwoweusGISY5oYUzQ98lkbOVpGR5+1kslACVQmzvX8+EHqFIbdS2de29TGux
vj9drfYX23MCgYEAobKGwp+h/KiMRFI68QaiZuJlpthq+Tm+fEV/JMuR5j5PCVo7
DxSv7l0nbvHvrI/lGarjsAwxO+cl+o5h7cG54pFa8CsWQRoAyvrxY3cOqd7X7HI9
/Df1YiT+uJCvxIEuS80MGDUFeHbanaX9cL8X/qh3bjc71mkckwpu1sdxsekCgYEA
tYjnpeGBTM8cRDw3oSsH9srAxcx9leS3zqakjvR8pLr6h9O9GHu6x6woF2zg0Ydn
uYw/R4qw6tx+/DCbtEWUVPS/uG8/VJCQdw6+raNbr2o4oV4826s8QXtRSMidxQDU
xIBNxYiL5v5ke+J+lcbZgKhqgnBxjq3w47lhUFyeOqcCgYEAy3leJK+E9xtXcCGO
WszUz9LU6UqrTqtiFP29ZLmd+VG17/1bt8az6wChMqUJfHa7i8DuG1hANNmiJimE
lpTJY6rxzoq8wkaUi7MqZnkACeWLnLT9i5BZDPTwsNSxkKezYg7j3zoQVj4d86PN
A/DYsS6Gzzwo60cYfO/Kcfwb6vE=
-----END PRIVATE KEY-----