Discovery via consul
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/pr Build is passing

This commit is contained in:
Alex 2021-10-21 00:29:50 +02:00
parent 6455347955
commit 5e986c443f
No known key found for this signature in database
GPG key ID: EDABF9711E244EB1
4 changed files with 125 additions and 10 deletions

2
Cargo.lock generated
View file

@ -874,7 +874,7 @@ dependencies = [
[[package]] [[package]]
name = "netapp" name = "netapp"
version = "0.3.0" version = "0.3.0"
source = "git+https://git.deuxfleurs.fr/lx/netapp#de981aace0e47a1fa65b38212ac21d91e52f7c15" source = "git+https://git.deuxfleurs.fr/lx/netapp#e9add586a5fd6304473b9138b920e325629346f5"
dependencies = [ dependencies = [
"arc-swap", "arc-swap",
"async-trait", "async-trait",

View file

@ -4,19 +4,21 @@ use std::net::{IpAddr, SocketAddr};
use hyper::client::Client; use hyper::client::Client;
use hyper::StatusCode; use hyper::StatusCode;
use hyper::{Body, Method, Request}; use hyper::{Body, Method, Request};
use serde::Deserialize; use serde::{Deserialize, Serialize};
use netapp::NodeID; use netapp::NodeID;
use garage_util::error::Error; use garage_util::error::Error;
// ---- READING FROM CONSUL CATALOG ----
#[derive(Deserialize, Clone, Debug)] #[derive(Deserialize, Clone, Debug)]
struct ConsulEntry { struct ConsulQueryEntry {
#[serde(alias = "Address")] #[serde(rename = "Address")]
address: String, address: String,
#[serde(alias = "ServicePort")] #[serde(rename = "ServicePort")]
service_port: u16, service_port: u16,
#[serde(alias = "NodeMeta")] #[serde(rename = "NodeMeta")]
node_meta: HashMap<String, String>, node_meta: HashMap<String, String>,
} }
@ -41,7 +43,7 @@ pub async fn get_consul_nodes(
} }
let body = hyper::body::to_bytes(resp.into_body()).await?; let body = hyper::body::to_bytes(resp.into_body()).await?;
let entries = serde_json::from_slice::<Vec<ConsulEntry>>(body.as_ref())?; let entries = serde_json::from_slice::<Vec<ConsulQueryEntry>>(body.as_ref())?;
let mut ret = vec![]; let mut ret = vec![];
for ent in entries { for ent in entries {
@ -66,3 +68,86 @@ pub async fn get_consul_nodes(
Ok(ret) Ok(ret)
} }
// ---- PUBLISHING TO CONSUL CATALOG ----
#[derive(Serialize, Clone, Debug)]
struct ConsulPublishEntry {
#[serde(rename = "Node")]
node: String,
#[serde(rename = "Address")]
address: IpAddr,
#[serde(rename = "NodeMeta")]
node_meta: HashMap<String, String>,
#[serde(rename = "Service")]
service: ConsulPublishService,
}
#[derive(Serialize, Clone, Debug)]
struct ConsulPublishService {
#[serde(rename = "ID")]
service_id: String,
#[serde(rename = "Service")]
service_name: String,
#[serde(rename = "Tags")]
tags: Vec<String>,
#[serde(rename = "Address")]
address: IpAddr,
#[serde(rename = "Port")]
port: u16,
}
pub async fn publish_consul_service(
consul_host: &str,
consul_service_name: &str,
node_id: NodeID,
hostname: &str,
rpc_public_addr: SocketAddr,
) -> Result<(), Error> {
let node = format!("garage:{}", hex::encode(&node_id[..8]));
let advertisment = ConsulPublishEntry {
node: node.clone(),
address: rpc_public_addr.ip(),
node_meta: [
("pubkey".to_string(), hex::encode(node_id)),
("hostname".to_string(), hostname.to_string()),
]
.iter()
.cloned()
.collect(),
service: ConsulPublishService {
service_id: node.clone(),
service_name: consul_service_name.to_string(),
tags: vec!["advertised-by-garage".into(), hostname.into()],
address: rpc_public_addr.ip(),
port: rpc_public_addr.port(),
},
};
let url = format!("http://{}/v1/catalog/register", consul_host);
let req_body = serde_json::to_string(&advertisment)?;
debug!("Request body for consul adv: {}", req_body);
let req = Request::builder()
.uri(url)
.method(Method::PUT)
.body(Body::from(req_body))?;
let client = Client::new();
let resp = client.request(req).await?;
debug!("Response of advertising to Consul: {:?}", resp);
let resp_code = resp.status();
debug!(
"{}",
std::str::from_utf8(&hyper::body::to_bytes(resp.into_body()).await?)
.unwrap_or("<invalid utf8>")
);
if resp_code != StatusCode::OK {
return Err(Error::Message(format!("HTTP error {}", resp_code)));
}
Ok(())
}

View file

@ -28,7 +28,7 @@ use garage_util::error::Error;
use garage_util::persister::Persister; use garage_util::persister::Persister;
use garage_util::time::*; use garage_util::time::*;
use crate::consul::get_consul_nodes; use crate::consul::*;
use crate::ring::*; use crate::ring::*;
use crate::rpc_helper::*; use crate::rpc_helper::*;
@ -80,6 +80,7 @@ pub struct System {
system_endpoint: Arc<Endpoint<SystemRpc, System>>, system_endpoint: Arc<Endpoint<SystemRpc, System>>,
rpc_listen_addr: SocketAddr, rpc_listen_addr: SocketAddr,
rpc_public_addr: Option<SocketAddr>,
bootstrap_peers: Vec<(NodeID, SocketAddr)>, bootstrap_peers: Vec<(NodeID, SocketAddr)>,
consul_host: Option<String>, consul_host: Option<String>,
consul_service_name: Option<String>, consul_service_name: Option<String>,
@ -199,6 +200,7 @@ impl System {
system_endpoint, system_endpoint,
replication_factor, replication_factor,
rpc_listen_addr: config.rpc_bind_addr, rpc_listen_addr: config.rpc_bind_addr,
rpc_public_addr: config.rpc_public_addr,
bootstrap_peers: config.bootstrap_peers.clone(), bootstrap_peers: config.bootstrap_peers.clone(),
consul_host: config.consul_host.clone(), consul_host: config.consul_host.clone(),
consul_service_name: config.consul_service_name.clone(), consul_service_name: config.consul_service_name.clone(),
@ -224,6 +226,32 @@ impl System {
// ---- INTERNALS ---- // ---- INTERNALS ----
async fn advertise_to_consul(self: Arc<Self>) -> Result<(), Error> {
let (consul_host, consul_service_name) =
match (&self.consul_host, &self.consul_service_name) {
(Some(ch), Some(csn)) => (ch, csn),
_ => return Ok(()),
};
let rpc_public_addr = match self.rpc_public_addr {
Some(addr) => addr,
None => {
warn!("Not advertising to Consul because rpc_public_addr is not defined in config file.");
return Ok(());
}
};
publish_consul_service(
consul_host,
consul_service_name,
self.netapp.id,
&self.local_status.load_full().hostname,
rpc_public_addr,
)
.await
.map_err(|e| Error::Message(format!("Error while publishing Consul service: {}", e)))
}
/// Save network configuration to disc /// Save network configuration to disc
async fn save_network_config(self: Arc<Self>) -> Result<(), Error> { async fn save_network_config(self: Arc<Self>) -> Result<(), Error> {
let ring: Arc<Ring> = self.ring.borrow().clone(); let ring: Arc<Ring> = self.ring.borrow().clone();
@ -375,7 +403,7 @@ impl System {
} }
} }
async fn discovery_loop(&self, mut stop_signal: watch::Receiver<bool>) { async fn discovery_loop(self: &Arc<Self>, mut stop_signal: watch::Receiver<bool>) {
let consul_config = match (&self.consul_host, &self.consul_service_name) { let consul_config = match (&self.consul_host, &self.consul_service_name) {
(Some(ch), Some(csn)) => Some((ch.clone(), csn.clone())), (Some(ch), Some(csn)) => Some((ch.clone(), csn.clone())),
_ => None, _ => None,
@ -419,6 +447,8 @@ impl System {
} }
} }
self.background.spawn(self.clone().advertise_to_consul());
let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL); let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL);
select! { select! {
_ = restart_at.fuse() => {}, _ = restart_at.fuse() => {},

View file

@ -41,7 +41,7 @@ pub struct Config {
/// Bootstrap peers RPC address /// Bootstrap peers RPC address
#[serde(deserialize_with = "deserialize_vec_addr")] #[serde(deserialize_with = "deserialize_vec_addr")]
pub bootstrap_peers: Vec<(NodeID, SocketAddr)>, pub bootstrap_peers: Vec<(NodeID, SocketAddr)>,
/// Consule host to connect to to discover more peers /// Consul host to connect to to discover more peers
pub consul_host: Option<String>, pub consul_host: Option<String>,
/// Consul service name to use /// Consul service name to use
pub consul_service_name: Option<String>, pub consul_service_name: Option<String>,