Compare commits

No commits in common. "7e0107c47db71e8da13990c9111ebde8cbf60d8f" and "ce69dc302c6eaad4fe5268cca3511620fcca12f8" have entirely different histories.

5 changed files with 122 additions and 120 deletions

src/api/admin/cluster.rs

@@ -27,7 +27,7 @@ pub async fn handle_get_cluster_status(garage: &Arc<Garage>) -> Result<Response<
 				i.id,
 				NodeResp {
 					id: hex::encode(i.id),
-					addr: i.addr,
+					addr: Some(i.addr),
 					hostname: i.status.hostname,
 					is_up: i.is_up,
 					last_seen_secs_ago: i.last_seen_secs_ago,

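Note: on the ce69dc3 side, the address of a known node is always available (see `KnownNodeInfo` in src/rpc/system.rs below), while the admin API response type keeps `addr` as an `Option<SocketAddr>`, hence the added `Some(...)` wrapping. A minimal sketch of the conversion, with hypothetical stand-in types:

    use std::net::SocketAddr;

    struct KnownNodeInfo { addr: SocketAddr }    // internal view: address always known
    struct NodeResp { addr: Option<SocketAddr> } // API view: field stays optional

    fn to_resp(i: &KnownNodeInfo) -> NodeResp {
        // Wrapping in Some(..) keeps the JSON schema unchanged for API clients.
        NodeResp { addr: Some(i.addr) }
    }
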
src/garage/cli/cmd.rs

@@ -57,10 +57,6 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) ->
 		vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity\tDataAvail".to_string()];
 	for adv in status.iter().filter(|adv| adv.is_up) {
 		let host = adv.status.hostname.as_deref().unwrap_or("?");
-		let addr = match adv.addr {
-			Some(addr) => addr.to_string(),
-			None => "N/A".to_string(),
-		};
 		if let Some(NodeRoleV(Some(cfg))) = layout.current().roles.get(&adv.id) {
 			let data_avail = match &adv.status.data_disk_avail {
 				_ if cfg.capacity.is_none() => "N/A".into(),
@@ -75,7 +71,7 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) ->
 				"{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\t{capacity}\t{data_avail}",
 				id = adv.id,
 				host = host,
-				addr = addr,
+				addr = adv.addr,
 				tags = cfg.tags.join(","),
 				zone = cfg.zone,
 				capacity = cfg.capacity_string(),
@@ -95,7 +91,7 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) ->
 				"{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\tdraining metadata...",
 				id = adv.id,
 				host = host,
-				addr = addr,
+				addr = adv.addr,
 				tags = cfg.tags.join(","),
 				zone = cfg.zone,
 			));
@@ -108,7 +104,7 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) ->
 				"{id:?}\t{h}\t{addr}\t\t\t{new_role}",
 				id = adv.id,
 				h = host,
-				addr = addr,
+				addr = adv.addr,
 				new_role = new_role,
 			));
 		}
@@ -124,7 +120,8 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) ->
 	let tf = timeago::Formatter::new();
 	let mut drain_msg = false;
-	let mut failed_nodes = vec!["ID\tHostname\tTags\tZone\tCapacity\tLast seen".to_string()];
+	let mut failed_nodes =
+		vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity\tLast seen".to_string()];
 	let mut listed = HashSet::new();
 	for ver in layout.versions.iter().rev() {
 		for (node, _, role) in ver.roles.items().iter() {
@@ -145,14 +142,15 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) ->
 				// Node is in a layout version, is not a gateway node, and is not up:
 				// it is in a failed state, add proper line to the output
-				let (host, last_seen) = match adv {
+				let (host, addr, last_seen) = match adv {
 					Some(adv) => (
 						adv.status.hostname.as_deref().unwrap_or("?"),
+						adv.addr.to_string(),
 						adv.last_seen_secs_ago
 							.map(|s| tf.convert(Duration::from_secs(s)))
 							.unwrap_or_else(|| "never seen".into()),
 					),
-					None => ("??", "never seen".into()),
+					None => ("??", "??".into(), "never seen".into()),
 				};
 				let capacity = if ver.version == layout.current().version {
 					cfg.capacity_string()
@@ -161,9 +159,10 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) ->
 					"draining metadata...".to_string()
 				};
 				failed_nodes.push(format!(
-					"{id:?}\t{host}\t[{tags}]\t{zone}\t{capacity}\t{last_seen}",
+					"{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\t{capacity}\t{last_seen}",
 					id = node,
 					host = host,
+					addr = addr,
 					tags = cfg.tags.join(","),
 					zone = cfg.zone,
 					capacity = capacity,

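Note: since `adv.addr` is no longer an `Option` on the ce69dc3 side, the `"N/A"` fallback above is dropped and the address is passed straight to `format!`, relying on `SocketAddr`'s `Display` impl:

    fn main() {
        let addr: std::net::SocketAddr = "192.168.1.10:3901".parse().unwrap();
        println!("{addr}"); // prints: 192.168.1.10:3901
    }
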
src/net/netapp.rs

@@ -292,7 +292,13 @@ impl NetApp {
 	/// the other node with `Netapp::request`
 	pub async fn try_connect(self: Arc<Self>, ip: SocketAddr, id: NodeID) -> Result<(), Error> {
 		// Don't connect to ourself, we don't care
+		// but pretend we did
 		if id == self.id {
+			tokio::spawn(async move {
+				if let Some(h) = self.on_connected_handler.load().as_ref() {
+					h(id, ip, false);
+				}
+			});
 			return Ok(());
 		}
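
Note: even for the self-connection short-circuit, the connected handler is now fired from a spawned task, so callers observe the same "connected" event whether the peer is remote or the node itself. The handler slot pattern used here looks like arc_swap's `ArcSwapOption`; a minimal sketch under that assumption, with simplified types:

    use arc_swap::ArcSwapOption;
    use std::sync::Arc;

    type Handler = Box<dyn Fn(u64) + Send + Sync>;

    fn main() {
        // The handler is set at runtime and loaded lock-free on every event.
        let slot: ArcSwapOption<Handler> = ArcSwapOption::from(None);
        let handler: Handler = Box::new(|id| println!("connected to {id}"));
        slot.store(Some(Arc::new(handler)));
        if let Some(h) = slot.load().as_ref() {
            h(42);
        }
    }
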
@@ -321,32 +327,31 @@ impl NetApp {
 	/// Close the outgoing connection we have to a node specified by its public key,
 	/// if such a connection is currently open.
 	pub fn disconnect(self: &Arc<Self>, id: &NodeID) {
-		let conn = self.client_conns.write().unwrap().remove(id);
-
 		// If id is ourself, we're not supposed to have a connection open
-		if *id == self.id {
-			// sanity check
-			assert!(conn.is_none(), "had a connection to local node");
-			return;
-		}
-
-		if let Some(c) = conn {
-			debug!(
-				"Closing connection to {} ({})",
-				hex::encode(&c.peer_id[..8]),
-				c.remote_addr
-			);
-			c.close();
-
-			// call on_disconnected_handler immediately, since the connection was removed
-			let id = *id;
-			let self2 = self.clone();
-			tokio::spawn(async move {
-				if let Some(h) = self2.on_disconnected_handler.load().as_ref() {
-					h(id, false);
-				}
-			});
-		}
+		if *id != self.id {
+			let conn = self.client_conns.write().unwrap().remove(id);
+			if let Some(c) = conn {
+				debug!(
+					"Closing connection to {} ({})",
+					hex::encode(&c.peer_id[..8]),
+					c.remote_addr
+				);
+				c.close();
+			} else {
+				return;
+			}
+		}
+
+		// call on_disconnected_handler immediately, since the connection
+		// was removed
+		// (if id == self.id, we pretend we disconnected)
+		let id = *id;
+		let self2 = self.clone();
+		tokio::spawn(async move {
+			if let Some(h) = self2.on_disconnected_handler.load().as_ref() {
+				h(id, false);
+			}
+		});
 	}

 	// Called from conn.rs when an incoming connection is successfully established

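Note: after this restructuring, `on_disconnected_handler` fires both when a real outgoing connection is closed and, as a pretend event, when `id == self.id`; the early return now only happens when a remote id had no open connection. A minimal sketch of the "remove, then notify outside the lock" shape, with toy types:

    use std::collections::HashMap;
    use std::sync::Mutex;

    fn disconnect(conns: &Mutex<HashMap<u64, String>>, id: u64, self_id: u64) {
        if id != self_id {
            // Take the connection out, holding the lock only briefly.
            match conns.lock().unwrap().remove(&id) {
                Some(c) => println!("closing {c}"),
                None => return, // nothing was open: no event
            }
        }
        // Notify after the lock is released (spawned in the real code), so the
        // handler can safely call back into the connection table.
        println!("on_disconnected({id})");
    }

    fn main() {
        let conns = Mutex::new(HashMap::from([(1u64, "peer-1".to_string())]));
        disconnect(&conns, 1, 0); // closes and notifies
        disconnect(&conns, 0, 0); // self: pretend-disconnect, map untouched
    }
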
src/net/peering.rs

@@ -54,8 +54,12 @@ impl Message for PeerListMessage {
 #[derive(Debug)]
 struct PeerInfoInternal {
-	// known_addrs contains all of the addresses everyone gave us
-	known_addrs: Vec<SocketAddr>,
+	// addr is the currently connected address,
+	// or the last address we were connected to,
+	// or an arbitrary address some other peer gave us
+	addr: SocketAddr,
+	// all_addrs contains all of the addresses everyone gave us
+	all_addrs: Vec<SocketAddr>,
 	state: PeerConnState,
 	last_send_ping: Option<Instant>,
@@ -65,9 +69,10 @@ struct PeerInfoInternal {
 }

 impl PeerInfoInternal {
-	fn new(state: PeerConnState, known_addr: Option<SocketAddr>) -> Self {
+	fn new(addr: SocketAddr, state: PeerConnState) -> Self {
 		Self {
-			known_addrs: known_addr.map(|x| vec![x]).unwrap_or_default(),
+			addr,
+			all_addrs: vec![addr],
 			state,
 			last_send_ping: None,
 			last_seen: None,
@@ -76,8 +81,8 @@ impl PeerInfoInternal {
 		}
 	}
 	fn add_addr(&mut self, addr: SocketAddr) -> bool {
-		if !self.known_addrs.contains(&addr) {
-			self.known_addrs.push(addr);
+		if !self.all_addrs.contains(&addr) {
+			self.all_addrs.push(addr);
 			// If we are learning a new address for this node,
 			// we want to retry connecting
 			self.state = match self.state {
@@ -85,7 +90,7 @@ impl PeerInfoInternal {
 				PeerConnState::Waiting(_, _) | PeerConnState::Abandonned => {
 					PeerConnState::Waiting(0, Instant::now())
 				}
-				x @ (PeerConnState::Ourself | PeerConnState::Connected { .. }) => x,
+				x @ (PeerConnState::Ourself | PeerConnState::Connected) => x,
 			};
 			true
 		} else {
@@ -99,6 +104,8 @@ impl PeerInfoInternal {
 pub struct PeerInfo {
 	/// The node's identifier (its public key)
 	pub id: NodeID,
+	/// The node's network address
+	pub addr: SocketAddr,
 	/// The current status of our connection to this node
 	pub state: PeerConnState,
 	/// The last time at which the node was seen
@@ -129,7 +136,7 @@ pub enum PeerConnState {
 	Ourself,

 	/// We currently have a connection to this peer
-	Connected { addr: SocketAddr },
+	Connected,

 	/// Our next connection tentative (the nth, where n is the first value of the tuple)
 	/// will be at given Instant
@@ -145,7 +152,7 @@ pub enum PeerConnState {
 impl PeerConnState {
 	/// Returns true if we can currently send requests to this peer
 	pub fn is_up(&self) -> bool {
-		matches!(self, Self::Ourself | Self::Connected { .. })
+		matches!(self, Self::Ourself | Self::Connected)
 	}
 }
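
Note: with the payload dropped from `Connected`, the address no longer lives in the state enum; on the ce69dc3 side it sits in the `addr` field next to `state`. For contrast, a sketch of how the 7e0107c side reads the address out of the state (simplified enum):

    use std::net::SocketAddr;

    #[derive(Clone, Copy, Debug, PartialEq)]
    enum PeerConnState {
        Ourself,
        Connected { addr: SocketAddr },
    }

    fn connected_addr(s: &PeerConnState) -> Option<SocketAddr> {
        match s {
            // The address travels with the state, so it is only readable
            // while the peer is actually connected.
            PeerConnState::Connected { addr } => Some(*addr),
            _ => None,
        }
    }

    fn main() {
        let addr: SocketAddr = "127.0.0.1:3901".parse().unwrap();
        assert_eq!(connected_addr(&PeerConnState::Connected { addr }), Some(addr));
        assert_eq!(connected_addr(&PeerConnState::Ourself), None);
    }
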
@@ -157,42 +164,29 @@ struct KnownHosts {
 impl KnownHosts {
 	fn new() -> Self {
 		let list = HashMap::new();
-		let mut ret = Self {
-			list,
-			hash: hash::Digest::from_slice(&[0u8; 64][..]).unwrap(),
-		};
-		ret.update_hash();
-		ret
+		let hash = Self::calculate_hash(vec![]);
+		Self { list, hash }
 	}

 	fn update_hash(&mut self) {
-		// The hash is a value that is exchanged between nodes when they ping one
-		// another. Nodes compare their known hosts hash to know if they are connected
-		// to the same set of nodes. If the hashes differ, they are connected to
-		// different nodes and they trigger an exchange of the full list of active
-		// connections. The hash value only represents the set of node IDs and not
-		// their actual socket addresses, because nodes can be connected via different
-		// addresses and that shouldn't necessarily trigger a full peer exchange.
-		let mut list = self
-			.list
-			.iter()
-			.filter(|(_, peer)| peer.state.is_up())
-			.map(|(id, _)| *id)
-			.collect::<Vec<_>>();
-		list.sort();
-		let mut hash_state = hash::State::new();
-		for id in list {
-			hash_state.update(&id[..]);
-		}
-		self.hash = hash_state.finalize();
+		self.hash = Self::calculate_hash(self.connected_peers_vec());
 	}

 	fn connected_peers_vec(&self) -> Vec<(NodeID, SocketAddr)> {
-		self.list
-			.iter()
-			.filter_map(|(id, peer)| match peer.state {
-				PeerConnState::Connected { addr } => Some((*id, addr)),
-				_ => None,
-			})
-			.collect::<Vec<_>>()
+		let mut list = Vec::with_capacity(self.list.len());
+		for (id, peer) in self.list.iter() {
+			if peer.state.is_up() {
+				list.push((*id, peer.addr));
+			}
+		}
+		list
+	}
+
+	fn calculate_hash(mut list: Vec<(NodeID, SocketAddr)>) -> hash::Digest {
+		list.sort();
+		let mut hash_state = hash::State::new();
+		for (id, addr) in list {
+			hash_state.update(&id[..]);
+			hash_state.update(&format!("{}\n", addr).into_bytes()[..]);
+		}
+		hash_state.finalize()
+	}
 }
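
Note: the two sides hash different inputs. The comment deleted above documents the 7e0107c choice: hash only the sorted node IDs, so that reaching a node at a different address does not by itself trigger a full peer-list exchange. The ce69dc3 side instead hashes `(id, addr)` pairs for every peer that is up. A standalone sketch of the latter, assuming sodiumoxide's SHA-512 `hash` module (as used in this file) and a 32-byte id type:

    use sodiumoxide::crypto::hash;
    use std::net::SocketAddr;

    fn calculate_hash(mut list: Vec<([u8; 32], SocketAddr)>) -> hash::Digest {
        // Sort first so the digest is independent of HashMap iteration order.
        list.sort();
        let mut state = hash::State::new();
        for (id, addr) in list {
            state.update(&id[..]);
            // The newline separator avoids ambiguity between adjacent fields.
            state.update(format!("{}\n", addr).as_bytes());
        }
        state.finalize()
    }
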
@@ -226,16 +220,18 @@ impl PeeringManager {
 			if id != netapp.id {
 				known_hosts.list.insert(
 					id,
-					PeerInfoInternal::new(PeerConnState::Waiting(0, Instant::now()), Some(addr)),
+					PeerInfoInternal::new(addr, PeerConnState::Waiting(0, Instant::now())),
 				);
 			}
 		}

-		known_hosts.list.insert(
-			netapp.id,
-			PeerInfoInternal::new(PeerConnState::Ourself, our_addr),
-		);
-		known_hosts.update_hash();
+		if let Some(addr) = our_addr {
+			known_hosts.list.insert(
+				netapp.id,
+				PeerInfoInternal::new(addr, PeerConnState::Ourself),
+			);
+			known_hosts.update_hash();
+		}

 		// TODO for v0.10 / v1.0 : rename the endpoint (it will break compatibility)
 		let strat = Arc::new(Self {
@@ -280,7 +276,7 @@ impl PeeringManager {
 		for (id, info) in known_hosts.list.iter() {
 			trace!("{}, {:?}", hex::encode(&id[..8]), info);
 			match info.state {
-				PeerConnState::Connected { .. } => {
+				PeerConnState::Connected => {
 					let must_ping = match info.last_send_ping {
 						None => true,
 						Some(t) => Instant::now() - t > PING_INTERVAL,
@@ -323,7 +319,7 @@ impl PeeringManager {
 					info!(
 						"Retrying connection to {} at {} ({})",
 						hex::encode(&id[..8]),
-						h.known_addrs
+						h.all_addrs
 							.iter()
 							.map(|x| format!("{}", x))
 							.collect::<Vec<_>>()
@@ -332,8 +328,13 @@ impl PeeringManager {
 					);
 					h.state = PeerConnState::Trying(i);
-					let addresses = h.known_addrs.clone();
-					tokio::spawn(self.clone().try_connect(id, addresses));
+					let alternate_addrs = h
+						.all_addrs
+						.iter()
+						.filter(|x| **x != h.addr)
+						.cloned()
+						.collect::<Vec<_>>();
+					tokio::spawn(self.clone().try_connect(id, h.addr, alternate_addrs));
 				}
 			}
 		}
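
Note: the retry path on the ce69dc3 side always tries the last known good address (`h.addr`) first and the other learned addresses only as fallbacks; the `[default_addr].iter().chain(alternate_addrs.iter())` expression in `try_connect` below preserves exactly that order. A tiny check:

    fn main() {
        let default_addr = "10.0.0.1:3901";
        let alternate_addrs = ["10.0.0.2:3901", "10.0.0.3:3901"];
        let order: Vec<&&str> = [default_addr].iter().chain(alternate_addrs.iter()).collect();
        assert_eq!(order, [&"10.0.0.1:3901", &"10.0.0.2:3901", &"10.0.0.3:3901"]);
    }
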
@@ -361,24 +362,27 @@ impl PeeringManager {
 	fn update_public_peer_list(&self, known_hosts: &KnownHosts) {
 		let mut pub_peer_list = Vec::with_capacity(known_hosts.list.len());
 		for (id, info) in known_hosts.list.iter() {
-			if *id == self.netapp.id {
-				// sanity check
-				assert!(matches!(info.state, PeerConnState::Ourself));
-			}
 			let mut pings = info.ping.iter().cloned().collect::<Vec<_>>();
 			pings.sort();
 			if !pings.is_empty() {
 				pub_peer_list.push(PeerInfo {
 					id: *id,
+					addr: info.addr,
 					state: info.state,
 					last_seen: info.last_seen,
-					avg_ping: Some(pings.iter().sum::<Duration>().div_f64(pings.len() as f64)),
+					avg_ping: Some(
+						pings
+							.iter()
+							.fold(Duration::from_secs(0), |x, y| x + *y)
+							.div_f64(pings.len() as f64),
+					),
 					max_ping: pings.last().cloned(),
 					med_ping: Some(pings[pings.len() / 2]),
 				});
 			} else {
 				pub_peer_list.push(PeerInfo {
 					id: *id,
+					addr: info.addr,
 					state: info.state,
 					last_seen: info.last_seen,
 					avg_ping: None,
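
Note: the explicit fold and `sum::<Duration>()` are equivalent, since `Duration` implements `Sum`; both compute the mean of the ping list, with the median and maximum read off the sorted values directly. A std-only check:

    use std::time::Duration;

    fn main() {
        let mut pings = vec![
            Duration::from_millis(30),
            Duration::from_millis(10),
            Duration::from_millis(20),
        ];
        pings.sort();
        let avg = pings
            .iter()
            .fold(Duration::from_secs(0), |x, y| x + *y)
            .div_f64(pings.len() as f64);
        assert_eq!(avg, pings.iter().sum::<Duration>().div_f64(pings.len() as f64));
        assert_eq!(avg, Duration::from_millis(20)); // mean
        assert_eq!(pings[pings.len() / 2], Duration::from_millis(20)); // median
        assert_eq!(pings.last(), Some(&Duration::from_millis(30))); // max
    }
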
@@ -491,10 +495,15 @@ impl PeeringManager {
 		}
 	}

-	async fn try_connect(self: Arc<Self>, id: NodeID, addresses: Vec<SocketAddr>) {
+	async fn try_connect(
+		self: Arc<Self>,
+		id: NodeID,
+		default_addr: SocketAddr,
+		alternate_addrs: Vec<SocketAddr>,
+	) {
 		let conn_addr = {
 			let mut ret = None;
-			for addr in addresses.iter() {
+			for addr in [default_addr].iter().chain(alternate_addrs.iter()) {
 				debug!("Trying address {} for peer {}", addr, hex::encode(&id[..8]));
 				match self.netapp.clone().try_connect(*addr, id).await {
 					Ok(()) => {
@@ -520,7 +529,7 @@ impl PeeringManager {
 			warn!(
 				"Could not connect to peer {} ({} addresses tried)",
 				hex::encode(&id[..8]),
-				addresses.len()
+				1 + alternate_addrs.len()
 			);
 			let mut known_hosts = self.known_hosts.write().unwrap();
 			if let Some(host) = known_hosts.list.get_mut(&id) {
@@ -540,14 +549,6 @@ impl PeeringManager {
 		}
 	}
 	fn on_connected(self: &Arc<Self>, id: NodeID, addr: SocketAddr, is_incoming: bool) {
-		if id == self.netapp.id {
-			// sanity check
-			panic!(
-				"on_connected from local node, id={:?}, addr={}, incoming={}",
-				id, addr, is_incoming
-			);
-		}
-
 		let mut known_hosts = self.known_hosts.write().unwrap();
 		if is_incoming {
 			if let Some(host) = known_hosts.list.get_mut(&id) {
@@ -562,13 +563,13 @@ impl PeeringManager {
 				addr
 			);
 			if let Some(host) = known_hosts.list.get_mut(&id) {
-				host.state = PeerConnState::Connected { addr };
+				host.state = PeerConnState::Connected;
+				host.addr = addr;
 				host.add_addr(addr);
 			} else {
-				known_hosts.list.insert(
-					id,
-					PeerInfoInternal::new(PeerConnState::Connected { addr }, Some(addr)),
-				);
+				known_hosts
+					.list
+					.insert(id, PeerInfoInternal::new(addr, PeerConnState::Connected));
 			}
 		}
 		known_hosts.update_hash();
@@ -588,8 +589,12 @@ impl PeeringManager {
 	}

 	fn new_peer(&self, id: &NodeID, addr: SocketAddr) -> PeerInfoInternal {
-		assert!(*id != self.netapp.id);
-		PeerInfoInternal::new(PeerConnState::Waiting(0, Instant::now()), Some(addr))
+		let state = if *id == self.netapp.id {
+			PeerConnState::Ourself
+		} else {
+			PeerConnState::Waiting(0, Instant::now())
+		};
+		PeerInfoInternal::new(addr, state)
 	}
 }

src/rpc/system.rs

@@ -16,7 +16,7 @@ use tokio::sync::{watch, Notify};
 use garage_net::endpoint::{Endpoint, EndpointHandler};
 use garage_net::message::*;
-use garage_net::peering::{PeerConnState, PeeringManager};
+use garage_net::peering::PeeringManager;
 use garage_net::util::parse_and_resolve_peer_addr_async;
 use garage_net::{NetApp, NetworkKey, NodeID, NodeKey};
@@ -142,7 +142,7 @@ pub struct NodeStatus {
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct KnownNodeInfo {
 	pub id: Uuid,
-	pub addr: Option<SocketAddr>,
+	pub addr: SocketAddr,
 	pub is_up: bool,
 	pub last_seen_secs_ago: Option<u64>,
 	pub status: NodeStatus,
@@ -381,11 +381,7 @@ impl System {
 			.iter()
 			.map(|n| KnownNodeInfo {
 				id: n.id.into(),
-				addr: match n.state {
-					PeerConnState::Ourself => self.rpc_public_addr,
-					PeerConnState::Connected { addr } => Some(addr),
-					_ => None,
-				},
+				addr: n.addr,
 				is_up: n.is_up(),
 				last_seen_secs_ago: n
 					.last_seen
@@ -726,10 +722,7 @@ impl System {
 			.peering
 			.get_peer_list()
 			.iter()
-			.filter_map(|n| match n.state {
-				PeerConnState::Connected { addr } => Some((n.id.into(), addr)),
-				_ => None,
-			})
+			.map(|n| (n.id.into(), n.addr))
 			.collect::<Vec<_>>();

 		// Before doing it, we read the current peer list file (if it exists)
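
Note: a consequence of this last hunk is what gets persisted: on the 7e0107c side only currently connected peers are written to the peer list file, while on the ce69dc3 side every known peer is saved along with its last known address, whether or not it is currently up.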