From 21ea26bbff86702b62de54392989a95b39347637 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 24 Jan 2022 19:38:13 +0100 Subject: [PATCH] Handle proxy timeouts --- src/https.rs | 37 +++++++++++++++++++++++++++++++++---- 1 file changed, 33 insertions(+), 4 deletions(-) diff --git a/src/https.rs b/src/https.rs index 6b1f5e7..6709d43 100644 --- a/src/https.rs +++ b/src/https.rs @@ -1,6 +1,7 @@ use std::convert::Infallible; use std::net::SocketAddr; use std::sync::{atomic::Ordering, Arc}; +use std::time::Duration; use anyhow::Result; use log::*; @@ -8,8 +9,7 @@ use log::*; use accept_encoding_fork::Encoding; use async_compression::tokio::bufread::*; use futures::stream::FuturesUnordered; -use futures::StreamExt; -use futures::TryStreamExt; +use futures::{Future, StreamExt, TryStreamExt}; use http::header::{HeaderName, HeaderValue}; use http::method::Method; use hyper::server::conn::Http; @@ -177,12 +177,18 @@ async fn handle( let mut response = if proxy_to.https_target { let to_addr = format!("https://{}", proxy_to.target_addr); - reverse_proxy::call_https(remote_addr.ip(), &to_addr, req).await? + handle_timeout_and_error(reverse_proxy::call_https(remote_addr.ip(), &to_addr, req)) + .await } else { let to_addr = format!("http://{}", proxy_to.target_addr); - reverse_proxy::call(remote_addr.ip(), &to_addr, req).await? + handle_timeout_and_error(reverse_proxy::call(remote_addr.ip(), &to_addr, req)).await }; + // Do further processing (compression, additionnal headers) only for 2xx responses + if !response.status().is_success() { + return Ok(response); + } + for (header, value) in proxy_to.add_headers.iter() { response.headers_mut().insert( HeaderName::from_bytes(header.as_bytes())?, @@ -207,6 +213,29 @@ async fn handle( } } +async fn handle_timeout_and_error( + fut: impl Future>>, +) -> Response { + select!( + resp = fut => { + match resp { + Ok(resp) => resp, + Err(e) => + Response::builder() + .status(StatusCode::BAD_GATEWAY) + .body(Body::from(format!("Proxy error: {}", e))) + .unwrap(), + } + } + _ = tokio::time::sleep(Duration::from_secs(60)) => { + Response::builder() + .status(StatusCode::BAD_GATEWAY) + .body(Body::from("Proxy timeout")) + .unwrap() + } + ) +} + async fn try_compress( response: Response, method: Method,