Make nodes aware of where they are and use that to priorize backends

This commit is contained in:
Alex 2022-01-13 11:31:08 +01:00
parent c030c47645
commit ced324bc87
No known key found for this signature in database
GPG Key ID: EDABF9711E244EB1
3 changed files with 56 additions and 11 deletions

View File

@ -23,6 +23,8 @@ pub struct ConsulNode {
pub node: String, pub node: String,
#[serde(rename = "Address")] #[serde(rename = "Address")]
pub address: String, pub address: String,
#[serde(rename = "Meta")]
pub meta: HashMap<String, String>,
} }
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
@ -117,14 +119,14 @@ impl Consul {
}) })
} }
pub async fn list_nodes(&self) -> Result<Vec<String>> { pub async fn list_nodes(&self) -> Result<Vec<ConsulNode>> {
debug!("list_nodes"); debug!("list_nodes");
let url = format!("{}/v1/catalog/nodes", self.url); let url = format!("{}/v1/catalog/nodes", self.url);
let http = self.client.get(&url).send().await?; let http = self.client.get(&url).send().await?;
let resp: Vec<ConsulNode> = http.json().await?; let resp: Vec<ConsulNode> = http.json().await?;
Ok(resp.into_iter().map(|n| n.node).collect::<Vec<_>>()) Ok(resp)
} }
pub async fn watch_node( pub async fn watch_node(

View File

@ -137,6 +137,8 @@ async fn handle(
.as_ref() .as_ref()
.map(|x| x.len() as i32) .map(|x| x.len() as i32)
.unwrap_or(0), .unwrap_or(0),
ent.same_node,
ent.same_site,
-ent.calls.load(Ordering::SeqCst), -ent.calls.load(Ordering::SeqCst),
) )
}); });

View File

@ -40,12 +40,27 @@ impl HostDescription {
#[derive(Debug)] #[derive(Debug)]
pub struct ProxyEntry { pub struct ProxyEntry {
/// Publicly exposed TLS hostnames for matching this rule
pub host: HostDescription,
/// Path prefix for matching this rule
pub path_prefix: Option<String>,
/// Priority with which this rule is considered (highest first)
pub priority: u32,
/// Node address (ip+port) to handle requests that match this entry
pub target_addr: SocketAddr, pub target_addr: SocketAddr,
/// Is the target serving HTTPS instead of HTTP?
pub https_target: bool, pub https_target: bool,
pub host: HostDescription, /// Is the target the same node as we are running on?
pub path_prefix: Option<String>, /// (if yes priorize it over other matching targets)
pub priority: u32, pub same_node: bool,
/// Is the target the same site as this node?
/// (if yes priorize it over other matching targets)
pub same_site: bool,
/// Add the following headers to all responses returned
/// when matching this rule
pub add_headers: Vec<(String, String)>, pub add_headers: Vec<(String, String)>,
// Counts the number of times this proxy server has been called to // Counts the number of times this proxy server has been called to
@ -70,6 +85,11 @@ impl std::fmt::Display for ProxyEntry {
self.path_prefix.as_ref().unwrap_or(&String::new()), self.path_prefix.as_ref().unwrap_or(&String::new()),
self.priority self.priority
)?; )?;
if self.same_node {
write!(f, " OURSELF")?;
} else if self.same_site {
write!(f, " SAME_SITE")?;
}
if !self.add_headers.is_empty() { if !self.add_headers.is_empty() {
write!(f, " +Headers: {:?}", self.add_headers)?; write!(f, " +Headers: {:?}", self.add_headers)?;
} }
@ -96,6 +116,8 @@ fn parse_tricot_tag(
tag: &str, tag: &str,
target_addr: SocketAddr, target_addr: SocketAddr,
add_headers: &[(String, String)], add_headers: &[(String, String)],
same_node: bool,
same_site: bool,
) -> Option<ProxyEntry> { ) -> Option<ProxyEntry> {
let splits = tag.split(' ').collect::<Vec<_>>(); let splits = tag.split(' ').collect::<Vec<_>>();
if (splits.len() != 2 && splits.len() != 3) if (splits.len() != 2 && splits.len() != 3)
@ -129,6 +151,8 @@ fn parse_tricot_tag(
target_addr, target_addr,
https_target: (splits[0] == "tricot-https"), https_target: (splits[0] == "tricot-https"),
host, host,
same_node,
same_site,
path_prefix, path_prefix,
priority, priority,
add_headers: add_headers.to_vec(), add_headers: add_headers.to_vec(),
@ -145,7 +169,11 @@ fn parse_tricot_add_header_tag(tag: &str) -> Option<(String, String)> {
} }
} }
fn parse_consul_catalog(catalog: &ConsulNodeCatalog) -> Vec<ProxyEntry> { fn parse_consul_catalog(
catalog: &ConsulNodeCatalog,
same_node: bool,
same_site: bool,
) -> Vec<ProxyEntry> {
trace!("Parsing node catalog: {:#?}", catalog); trace!("Parsing node catalog: {:#?}", catalog);
let mut entries = vec![]; let mut entries = vec![];
@ -174,7 +202,7 @@ fn parse_consul_catalog(catalog: &ConsulNodeCatalog) -> Vec<ProxyEntry> {
} }
for tag in svc.tags.iter() { for tag in svc.tags.iter() {
if let Some(ent) = parse_tricot_tag(tag, addr, &add_headers[..]) { if let Some(ent) = parse_tricot_tag(tag, addr, &add_headers[..], same_node, same_site) {
entries.push(ent); entries.push(ent);
} }
} }
@ -206,12 +234,15 @@ pub fn spawn_proxy_config_task(consul: Consul) -> watch::Receiver<Arc<ProxyConfi
let mut nodes = HashMap::new(); let mut nodes = HashMap::new();
let mut watches = FuturesUnordered::<BoxFuture<'static, (String, Result<_>)>>::new(); let mut watches = FuturesUnordered::<BoxFuture<'static, (String, Result<_>)>>::new();
let mut node_site = HashMap::new();
loop { loop {
match consul.list_nodes().await { match consul.list_nodes().await {
Ok(consul_nodes) => { Ok(consul_nodes) => {
info!("Watched consul nodes: {:?}", consul_nodes); info!("Watched consul nodes: {:?}", consul_nodes);
for node in consul_nodes { for consul_node in consul_nodes {
if !nodes.contains_key(&node) { let node = &consul_node.node;
if !nodes.contains_key(node) {
nodes.insert(node.clone(), NodeWatchState::default()); nodes.insert(node.clone(), NodeWatchState::default());
let node = node.to_string(); let node = node.to_string();
@ -222,6 +253,9 @@ pub fn spawn_proxy_config_task(consul: Consul) -> watch::Receiver<Arc<ProxyConfi
(node, res) (node, res)
})); }));
} }
if let Some(site) = consul_node.meta.get("site") {
node_site.insert(node.clone(), site.clone());
}
} }
} }
Err(e) => { Err(e) => {
@ -277,9 +311,16 @@ pub fn spawn_proxy_config_task(consul: Consul) -> watch::Receiver<Arc<ProxyConfi
} }
let mut entries = vec![]; let mut entries = vec![];
for (_, watch_state) in nodes.iter() { for (node_name, watch_state) in nodes.iter() {
if let Some(catalog) = &watch_state.last_catalog { if let Some(catalog) = &watch_state.last_catalog {
entries.extend(parse_consul_catalog(catalog)); let same_node = *node_name == consul.local_node;
let same_site =
match (node_site.get(node_name), node_site.get(&consul.local_node)) {
(Some(s1), Some(s2)) => s1 == s2,
_ => false,
};
entries.extend(parse_consul_catalog(catalog, same_node, same_site));
} }
} }
let config = ProxyConfig { entries }; let config = ProxyConfig { entries };