Handle proxy timeouts

This commit is contained in:
Alex 2022-01-24 19:38:13 +01:00
parent 5e5299a6d0
commit 21ea26bbff
No known key found for this signature in database
GPG Key ID: EDABF9711E244EB1
1 changed files with 33 additions and 4 deletions

View File

@ -1,6 +1,7 @@
use std::convert::Infallible; use std::convert::Infallible;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::{atomic::Ordering, Arc}; use std::sync::{atomic::Ordering, Arc};
use std::time::Duration;
use anyhow::Result; use anyhow::Result;
use log::*; use log::*;
@ -8,8 +9,7 @@ use log::*;
use accept_encoding_fork::Encoding; use accept_encoding_fork::Encoding;
use async_compression::tokio::bufread::*; use async_compression::tokio::bufread::*;
use futures::stream::FuturesUnordered; use futures::stream::FuturesUnordered;
use futures::StreamExt; use futures::{Future, StreamExt, TryStreamExt};
use futures::TryStreamExt;
use http::header::{HeaderName, HeaderValue}; use http::header::{HeaderName, HeaderValue};
use http::method::Method; use http::method::Method;
use hyper::server::conn::Http; use hyper::server::conn::Http;
@ -177,12 +177,18 @@ async fn handle(
let mut response = if proxy_to.https_target { let mut response = if proxy_to.https_target {
let to_addr = format!("https://{}", proxy_to.target_addr); let to_addr = format!("https://{}", proxy_to.target_addr);
reverse_proxy::call_https(remote_addr.ip(), &to_addr, req).await? handle_timeout_and_error(reverse_proxy::call_https(remote_addr.ip(), &to_addr, req))
.await
} else { } else {
let to_addr = format!("http://{}", proxy_to.target_addr); let to_addr = format!("http://{}", proxy_to.target_addr);
reverse_proxy::call(remote_addr.ip(), &to_addr, req).await? handle_timeout_and_error(reverse_proxy::call(remote_addr.ip(), &to_addr, req)).await
}; };
// Do further processing (compression, additionnal headers) only for 2xx responses
if !response.status().is_success() {
return Ok(response);
}
for (header, value) in proxy_to.add_headers.iter() { for (header, value) in proxy_to.add_headers.iter() {
response.headers_mut().insert( response.headers_mut().insert(
HeaderName::from_bytes(header.as_bytes())?, HeaderName::from_bytes(header.as_bytes())?,
@ -207,6 +213,29 @@ async fn handle(
} }
} }
async fn handle_timeout_and_error(
fut: impl Future<Output = Result<Response<Body>>>,
) -> Response<Body> {
select!(
resp = fut => {
match resp {
Ok(resp) => resp,
Err(e) =>
Response::builder()
.status(StatusCode::BAD_GATEWAY)
.body(Body::from(format!("Proxy error: {}", e)))
.unwrap(),
}
}
_ = tokio::time::sleep(Duration::from_secs(60)) => {
Response::builder()
.status(StatusCode::BAD_GATEWAY)
.body(Body::from("Proxy timeout"))
.unwrap()
}
)
}
async fn try_compress( async fn try_compress(
response: Response<Body>, response: Response<Body>,
method: Method, method: Method,