Change scheduling algo to deprioritize slow backends + refactoring
continuous-integration/drone/push Build was killed Details

This commit is contained in:
Alex 2022-12-06 14:02:32 +01:00
parent ba5bf133f6
commit 8d1162f206
Signed by: lx
GPG Key ID: 0E496D15096376BE
4 changed files with 88 additions and 68 deletions

View File

@ -1,4 +1,4 @@
FROM rust:1.58-buster as builder FROM rust:1.65-buster as builder
RUN apt-get update && \ RUN apt-get update && \
apt-get install -y libssl-dev pkg-config apt-get install -y libssl-dev pkg-config

View File

@ -24,7 +24,7 @@ use tokio_util::io::{ReaderStream, StreamReader};
use opentelemetry::{metrics, KeyValue}; use opentelemetry::{metrics, KeyValue};
use crate::cert_store::{CertStore, StoreResolver}; use crate::cert_store::{CertStore, StoreResolver};
use crate::proxy_config::ProxyConfig; use crate::proxy_config::{ProxyConfig, ProxyEntry};
use crate::reverse_proxy; use crate::reverse_proxy;
const MAX_CONNECTION_LIFETIME: Duration = Duration::from_secs(24 * 3600); const MAX_CONNECTION_LIFETIME: Duration = Duration::from_secs(24 * 3600);
@ -33,6 +33,7 @@ pub struct HttpsConfig {
pub bind_addr: SocketAddr, pub bind_addr: SocketAddr,
pub enable_compression: bool, pub enable_compression: bool,
pub compress_mime_types: Vec<String>, pub compress_mime_types: Vec<String>,
pub time_origin: Instant,
} }
struct HttpsMetrics { struct HttpsMetrics {
@ -110,7 +111,13 @@ pub async fn serve_https(
let proxy_config: Arc<ProxyConfig> = let proxy_config: Arc<ProxyConfig> =
rx_proxy_config.borrow().clone(); rx_proxy_config.borrow().clone();
let metrics = metrics.clone(); let metrics = metrics.clone();
handle_outer(remote_addr, req, https_config, proxy_config, metrics) handle_request(
remote_addr,
req,
https_config,
proxy_config,
metrics,
)
}), }),
) )
.with_upgrades(); .with_upgrades();
@ -145,7 +152,7 @@ pub async fn serve_https(
Ok(()) Ok(())
} }
async fn handle_outer( async fn handle_request(
remote_addr: SocketAddr, remote_addr: SocketAddr,
req: Request<Body>, req: Request<Body>,
https_config: Arc<HttpsConfig>, https_config: Arc<HttpsConfig>,
@ -169,25 +176,15 @@ async fn handle_outer(
]; ];
metrics.requests_received.add(1, &tags); metrics.requests_received.add(1, &tags);
let resp = match handle( let resp = select_target_and_proxy(
&https_config,
&proxy_config,
&metrics,
remote_addr, remote_addr,
req, req,
https_config,
proxy_config,
&mut tags, &mut tags,
&metrics,
) )
.await .await;
{
Err(e) => {
warn!("Handler error: {}", e);
Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::from(format!("{}", e)))
.unwrap()
}
Ok(r) => r,
};
tags.push(KeyValue::new( tags.push(KeyValue::new(
"status_code", "status_code",
@ -200,14 +197,14 @@ async fn handle_outer(
// Custom echo service, handling two different routes and a // Custom echo service, handling two different routes and a
// catch-all 404 responder. // catch-all 404 responder.
async fn handle( async fn select_target_and_proxy(
https_config: &HttpsConfig,
proxy_config: &ProxyConfig,
metrics: &HttpsMetrics,
remote_addr: SocketAddr, remote_addr: SocketAddr,
req: Request<Body>, req: Request<Body>,
https_config: Arc<HttpsConfig>,
proxy_config: Arc<ProxyConfig>,
tags: &mut Vec<KeyValue>, tags: &mut Vec<KeyValue>,
metrics: &HttpsMetrics, ) -> Response<Body> {
) -> Result<Response<Body>, anyhow::Error> {
let received_time = Instant::now(); let received_time = Instant::now();
let method = req.method().clone(); let method = req.method().clone();
@ -216,13 +213,17 @@ async fn handle(
let host = if let Some(auth) = req.uri().authority() { let host = if let Some(auth) = req.uri().authority() {
auth.as_str() auth.as_str()
} else { } else {
req.headers() match req.headers().get("host").map(|x| x.to_str().ok()).flatten() {
.get("host") Some(host) => host,
.ok_or_else(|| anyhow!("Missing host header"))? None => {
.to_str()? return Response::builder()
.status(StatusCode::BAD_REQUEST)
.body(Body::from("Missing Host header"))
.unwrap();
}
}
}; };
let path = req.uri().path(); let path = req.uri().path();
let accept_encoding = accept_encoding_fork::encodings(req.headers()).unwrap_or_else(|_| vec![]);
let best_match = proxy_config let best_match = proxy_config
.entries .entries
@ -244,7 +245,8 @@ async fn handle(
.unwrap_or(0), .unwrap_or(0),
ent.same_node, ent.same_node,
ent.same_site, ent.same_site,
-ent.calls.load(Ordering::SeqCst), -ent.calls_in_progress.load(Ordering::SeqCst),
-ent.last_call.load(Ordering::SeqCst),
) )
}); });
@ -257,59 +259,74 @@ async fn handle(
tags.push(KeyValue::new("same_node", proxy_to.same_node.to_string())); tags.push(KeyValue::new("same_node", proxy_to.same_node.to_string()));
tags.push(KeyValue::new("same_site", proxy_to.same_site.to_string())); tags.push(KeyValue::new("same_site", proxy_to.same_site.to_string()));
proxy_to.calls.fetch_add(1, Ordering::SeqCst); proxy_to.last_call.fetch_max(
(received_time - https_config.time_origin).as_millis() as i64,
Ordering::Relaxed,
);
proxy_to.calls_in_progress.fetch_add(1, Ordering::SeqCst);
debug!("{}{} -> {}", host, path, proxy_to); debug!("{}{} -> {}", host, path, proxy_to);
trace!("Request: {:?}", req); trace!("Request: {:?}", req);
let mut response = if proxy_to.https_target { let response = match do_proxy(&https_config, remote_addr, req, proxy_to).await {
let to_addr = format!("https://{}", proxy_to.target_addr); Ok(resp) => resp,
handle_error(reverse_proxy::call_https(remote_addr.ip(), &to_addr, req).await) Err(e) => Response::builder()
} else { .status(StatusCode::BAD_GATEWAY)
let to_addr = format!("http://{}", proxy_to.target_addr); .body(Body::from(format!("Proxy error: {}", e)))
handle_error(reverse_proxy::call(remote_addr.ip(), &to_addr, req).await) .unwrap(),
}; };
proxy_to.calls_in_progress.fetch_sub(1, Ordering::SeqCst);
metrics metrics
.request_proxy_duration .request_proxy_duration
.record(received_time.elapsed().as_secs_f64(), &tags); .record(received_time.elapsed().as_secs_f64(), &tags);
if response.status().is_success() {
// (TODO: maybe we want to add these headers even if it's not a success?)
for (header, value) in proxy_to.add_headers.iter() {
response.headers_mut().insert(
HeaderName::from_bytes(header.as_bytes())?,
HeaderValue::from_str(value)?,
);
}
}
if https_config.enable_compression {
response =
try_compress(response, method.clone(), accept_encoding, &https_config).await?
};
trace!("Final response: {:?}", response); trace!("Final response: {:?}", response);
info!("{} {} {}", method, response.status().as_u16(), uri); info!("{} {} {}", method, response.status().as_u16(), uri);
Ok(response) response
} else { } else {
debug!("{}{} -> NOT FOUND", host, path); debug!("{}{} -> NOT FOUND", host, path);
info!("{} 404 {}", method, uri); info!("{} 404 {}", method, uri);
Ok(Response::builder() Response::builder()
.status(StatusCode::NOT_FOUND) .status(StatusCode::NOT_FOUND)
.body(Body::from("No matching proxy entry"))?) .body(Body::from("No matching proxy entry"))
.unwrap()
} }
} }
fn handle_error(resp: Result<Response<Body>>) -> Response<Body> { async fn do_proxy(
match resp { https_config: &HttpsConfig,
Ok(resp) => resp, remote_addr: SocketAddr,
Err(e) => Response::builder() req: Request<Body>,
.status(StatusCode::BAD_GATEWAY) proxy_to: &ProxyEntry,
.body(Body::from(format!("Proxy error: {}", e))) ) -> Result<Response<Body>> {
.unwrap(), let method = req.method().clone();
let accept_encoding = accept_encoding_fork::encodings(req.headers()).unwrap_or_else(|_| vec![]);
let mut response = if proxy_to.https_target {
let to_addr = format!("https://{}", proxy_to.target_addr);
reverse_proxy::call_https(remote_addr.ip(), &to_addr, req).await?
} else {
let to_addr = format!("http://{}", proxy_to.target_addr);
reverse_proxy::call(remote_addr.ip(), &to_addr, req).await?
};
if response.status().is_success() {
// (TODO: maybe we want to add these headers even if it's not a success?)
for (header, value) in proxy_to.add_headers.iter() {
response.headers_mut().insert(
HeaderName::from_bytes(header.as_bytes())?,
HeaderValue::from_str(value)?,
);
}
} }
if https_config.enable_compression {
response = try_compress(response, method, accept_encoding, &https_config).await?
};
Ok(response)
} }
async fn try_compress( async fn try_compress(

View File

@ -3,6 +3,7 @@ extern crate anyhow;
use log::*; use log::*;
use std::sync::Arc; use std::sync::Arc;
use std::time::Instant;
use futures::{FutureExt, TryFutureExt}; use futures::{FutureExt, TryFutureExt};
use std::net::SocketAddr; use std::net::SocketAddr;
@ -175,6 +176,7 @@ async fn main() {
.split(',') .split(',')
.map(|x| x.to_string()) .map(|x| x.to_string())
.collect(), .collect(),
time_origin: Instant::now(),
}; };
let https_task = tokio::spawn( let https_task = tokio::spawn(

View File

@ -75,10 +75,10 @@ pub struct ProxyEntry {
/// when matching this rule /// when matching this rule
pub add_headers: Vec<(String, String)>, pub add_headers: Vec<(String, String)>,
// Counts the number of times this proxy server has been called to /// Number of calls in progress, used to deprioritize slow back-ends
// This implements a round-robin load balancer if there are multiple pub calls_in_progress: atomic::AtomicI64,
// entries for the same host and same path prefix. /// Time of last call, used for round-robin selection
pub calls: atomic::AtomicI64, pub last_call: atomic::AtomicI64,
} }
impl std::fmt::Display for ProxyEntry { impl std::fmt::Display for ProxyEntry {
@ -102,7 +102,7 @@ impl std::fmt::Display for ProxyEntry {
if !self.add_headers.is_empty() { if !self.add_headers.is_empty() {
write!(f, " +Headers: {:?}", self.add_headers)?; write!(f, " +Headers: {:?}", self.add_headers)?;
} }
write!(f, " ({})", self.calls.load(atomic::Ordering::Relaxed)) Ok(())
} }
} }
@ -167,7 +167,8 @@ fn parse_tricot_tag(
path_prefix, path_prefix,
priority, priority,
add_headers: add_headers.to_vec(), add_headers: add_headers.to_vec(),
calls: atomic::AtomicI64::from(0), last_call: atomic::AtomicI64::from(0),
calls_in_progress: atomic::AtomicI64::from(0),
}) })
} }