forked from Deuxfleurs/garage
Merge pull request 'Fixes to garage_net peering manager' (#786) from net-fixes into next-0.10
Reviewed-on: Deuxfleurs/garage#786
This commit is contained in:
commit
7e0107c47d
5 changed files with 118 additions and 120 deletions
|
@ -27,7 +27,7 @@ pub async fn handle_get_cluster_status(garage: &Arc<Garage>) -> Result<Response<
|
||||||
i.id,
|
i.id,
|
||||||
NodeResp {
|
NodeResp {
|
||||||
id: hex::encode(i.id),
|
id: hex::encode(i.id),
|
||||||
addr: Some(i.addr),
|
addr: i.addr,
|
||||||
hostname: i.status.hostname,
|
hostname: i.status.hostname,
|
||||||
is_up: i.is_up,
|
is_up: i.is_up,
|
||||||
last_seen_secs_ago: i.last_seen_secs_ago,
|
last_seen_secs_ago: i.last_seen_secs_ago,
|
||||||
|
|
|
@ -57,6 +57,10 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) ->
|
||||||
vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity\tDataAvail".to_string()];
|
vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity\tDataAvail".to_string()];
|
||||||
for adv in status.iter().filter(|adv| adv.is_up) {
|
for adv in status.iter().filter(|adv| adv.is_up) {
|
||||||
let host = adv.status.hostname.as_deref().unwrap_or("?");
|
let host = adv.status.hostname.as_deref().unwrap_or("?");
|
||||||
|
let addr = match adv.addr {
|
||||||
|
Some(addr) => addr.to_string(),
|
||||||
|
None => "N/A".to_string(),
|
||||||
|
};
|
||||||
if let Some(NodeRoleV(Some(cfg))) = layout.current().roles.get(&adv.id) {
|
if let Some(NodeRoleV(Some(cfg))) = layout.current().roles.get(&adv.id) {
|
||||||
let data_avail = match &adv.status.data_disk_avail {
|
let data_avail = match &adv.status.data_disk_avail {
|
||||||
_ if cfg.capacity.is_none() => "N/A".into(),
|
_ if cfg.capacity.is_none() => "N/A".into(),
|
||||||
|
@ -71,7 +75,7 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) ->
|
||||||
"{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\t{capacity}\t{data_avail}",
|
"{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\t{capacity}\t{data_avail}",
|
||||||
id = adv.id,
|
id = adv.id,
|
||||||
host = host,
|
host = host,
|
||||||
addr = adv.addr,
|
addr = addr,
|
||||||
tags = cfg.tags.join(","),
|
tags = cfg.tags.join(","),
|
||||||
zone = cfg.zone,
|
zone = cfg.zone,
|
||||||
capacity = cfg.capacity_string(),
|
capacity = cfg.capacity_string(),
|
||||||
|
@ -91,7 +95,7 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) ->
|
||||||
"{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\tdraining metadata...",
|
"{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\tdraining metadata...",
|
||||||
id = adv.id,
|
id = adv.id,
|
||||||
host = host,
|
host = host,
|
||||||
addr = adv.addr,
|
addr = addr,
|
||||||
tags = cfg.tags.join(","),
|
tags = cfg.tags.join(","),
|
||||||
zone = cfg.zone,
|
zone = cfg.zone,
|
||||||
));
|
));
|
||||||
|
@ -104,7 +108,7 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) ->
|
||||||
"{id:?}\t{h}\t{addr}\t\t\t{new_role}",
|
"{id:?}\t{h}\t{addr}\t\t\t{new_role}",
|
||||||
id = adv.id,
|
id = adv.id,
|
||||||
h = host,
|
h = host,
|
||||||
addr = adv.addr,
|
addr = addr,
|
||||||
new_role = new_role,
|
new_role = new_role,
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
@ -120,8 +124,7 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) ->
|
||||||
|
|
||||||
let tf = timeago::Formatter::new();
|
let tf = timeago::Formatter::new();
|
||||||
let mut drain_msg = false;
|
let mut drain_msg = false;
|
||||||
let mut failed_nodes =
|
let mut failed_nodes = vec!["ID\tHostname\tTags\tZone\tCapacity\tLast seen".to_string()];
|
||||||
vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity\tLast seen".to_string()];
|
|
||||||
let mut listed = HashSet::new();
|
let mut listed = HashSet::new();
|
||||||
for ver in layout.versions.iter().rev() {
|
for ver in layout.versions.iter().rev() {
|
||||||
for (node, _, role) in ver.roles.items().iter() {
|
for (node, _, role) in ver.roles.items().iter() {
|
||||||
|
@ -142,15 +145,14 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) ->
|
||||||
|
|
||||||
// Node is in a layout version, is not a gateway node, and is not up:
|
// Node is in a layout version, is not a gateway node, and is not up:
|
||||||
// it is in a failed state, add proper line to the output
|
// it is in a failed state, add proper line to the output
|
||||||
let (host, addr, last_seen) = match adv {
|
let (host, last_seen) = match adv {
|
||||||
Some(adv) => (
|
Some(adv) => (
|
||||||
adv.status.hostname.as_deref().unwrap_or("?"),
|
adv.status.hostname.as_deref().unwrap_or("?"),
|
||||||
adv.addr.to_string(),
|
|
||||||
adv.last_seen_secs_ago
|
adv.last_seen_secs_ago
|
||||||
.map(|s| tf.convert(Duration::from_secs(s)))
|
.map(|s| tf.convert(Duration::from_secs(s)))
|
||||||
.unwrap_or_else(|| "never seen".into()),
|
.unwrap_or_else(|| "never seen".into()),
|
||||||
),
|
),
|
||||||
None => ("??", "??".into(), "never seen".into()),
|
None => ("??", "never seen".into()),
|
||||||
};
|
};
|
||||||
let capacity = if ver.version == layout.current().version {
|
let capacity = if ver.version == layout.current().version {
|
||||||
cfg.capacity_string()
|
cfg.capacity_string()
|
||||||
|
@ -159,10 +161,9 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) ->
|
||||||
"draining metadata...".to_string()
|
"draining metadata...".to_string()
|
||||||
};
|
};
|
||||||
failed_nodes.push(format!(
|
failed_nodes.push(format!(
|
||||||
"{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\t{capacity}\t{last_seen}",
|
"{id:?}\t{host}\t[{tags}]\t{zone}\t{capacity}\t{last_seen}",
|
||||||
id = node,
|
id = node,
|
||||||
host = host,
|
host = host,
|
||||||
addr = addr,
|
|
||||||
tags = cfg.tags.join(","),
|
tags = cfg.tags.join(","),
|
||||||
zone = cfg.zone,
|
zone = cfg.zone,
|
||||||
capacity = capacity,
|
capacity = capacity,
|
||||||
|
|
|
@ -292,13 +292,7 @@ impl NetApp {
|
||||||
/// the other node with `Netapp::request`
|
/// the other node with `Netapp::request`
|
||||||
pub async fn try_connect(self: Arc<Self>, ip: SocketAddr, id: NodeID) -> Result<(), Error> {
|
pub async fn try_connect(self: Arc<Self>, ip: SocketAddr, id: NodeID) -> Result<(), Error> {
|
||||||
// Don't connect to ourself, we don't care
|
// Don't connect to ourself, we don't care
|
||||||
// but pretend we did
|
|
||||||
if id == self.id {
|
if id == self.id {
|
||||||
tokio::spawn(async move {
|
|
||||||
if let Some(h) = self.on_connected_handler.load().as_ref() {
|
|
||||||
h(id, ip, false);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -327,31 +321,32 @@ impl NetApp {
|
||||||
/// Close the outgoing connection we have to a node specified by its public key,
|
/// Close the outgoing connection we have to a node specified by its public key,
|
||||||
/// if such a connection is currently open.
|
/// if such a connection is currently open.
|
||||||
pub fn disconnect(self: &Arc<Self>, id: &NodeID) {
|
pub fn disconnect(self: &Arc<Self>, id: &NodeID) {
|
||||||
|
let conn = self.client_conns.write().unwrap().remove(id);
|
||||||
|
|
||||||
// If id is ourself, we're not supposed to have a connection open
|
// If id is ourself, we're not supposed to have a connection open
|
||||||
if *id != self.id {
|
if *id == self.id {
|
||||||
let conn = self.client_conns.write().unwrap().remove(id);
|
// sanity check
|
||||||
if let Some(c) = conn {
|
assert!(conn.is_none(), "had a connection to local node");
|
||||||
debug!(
|
return;
|
||||||
"Closing connection to {} ({})",
|
|
||||||
hex::encode(&c.peer_id[..8]),
|
|
||||||
c.remote_addr
|
|
||||||
);
|
|
||||||
c.close();
|
|
||||||
} else {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// call on_disconnected_handler immediately, since the connection
|
if let Some(c) = conn {
|
||||||
// was removed
|
debug!(
|
||||||
// (if id == self.id, we pretend we disconnected)
|
"Closing connection to {} ({})",
|
||||||
let id = *id;
|
hex::encode(&c.peer_id[..8]),
|
||||||
let self2 = self.clone();
|
c.remote_addr
|
||||||
tokio::spawn(async move {
|
);
|
||||||
if let Some(h) = self2.on_disconnected_handler.load().as_ref() {
|
c.close();
|
||||||
h(id, false);
|
|
||||||
}
|
// call on_disconnected_handler immediately, since the connection was removed
|
||||||
});
|
let id = *id;
|
||||||
|
let self2 = self.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
if let Some(h) = self2.on_disconnected_handler.load().as_ref() {
|
||||||
|
h(id, false);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Called from conn.rs when an incoming connection is successfully established
|
// Called from conn.rs when an incoming connection is successfully established
|
||||||
|
|
|
@ -54,12 +54,8 @@ impl Message for PeerListMessage {
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
struct PeerInfoInternal {
|
struct PeerInfoInternal {
|
||||||
// addr is the currently connected address,
|
// known_addrs contains all of the addresses everyone gave us
|
||||||
// or the last address we were connected to,
|
known_addrs: Vec<SocketAddr>,
|
||||||
// or an arbitrary address some other peer gave us
|
|
||||||
addr: SocketAddr,
|
|
||||||
// all_addrs contains all of the addresses everyone gave us
|
|
||||||
all_addrs: Vec<SocketAddr>,
|
|
||||||
|
|
||||||
state: PeerConnState,
|
state: PeerConnState,
|
||||||
last_send_ping: Option<Instant>,
|
last_send_ping: Option<Instant>,
|
||||||
|
@ -69,10 +65,9 @@ struct PeerInfoInternal {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PeerInfoInternal {
|
impl PeerInfoInternal {
|
||||||
fn new(addr: SocketAddr, state: PeerConnState) -> Self {
|
fn new(state: PeerConnState, known_addr: Option<SocketAddr>) -> Self {
|
||||||
Self {
|
Self {
|
||||||
addr,
|
known_addrs: known_addr.map(|x| vec![x]).unwrap_or_default(),
|
||||||
all_addrs: vec![addr],
|
|
||||||
state,
|
state,
|
||||||
last_send_ping: None,
|
last_send_ping: None,
|
||||||
last_seen: None,
|
last_seen: None,
|
||||||
|
@ -81,8 +76,8 @@ impl PeerInfoInternal {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
fn add_addr(&mut self, addr: SocketAddr) -> bool {
|
fn add_addr(&mut self, addr: SocketAddr) -> bool {
|
||||||
if !self.all_addrs.contains(&addr) {
|
if !self.known_addrs.contains(&addr) {
|
||||||
self.all_addrs.push(addr);
|
self.known_addrs.push(addr);
|
||||||
// If we are learning a new address for this node,
|
// If we are learning a new address for this node,
|
||||||
// we want to retry connecting
|
// we want to retry connecting
|
||||||
self.state = match self.state {
|
self.state = match self.state {
|
||||||
|
@ -90,7 +85,7 @@ impl PeerInfoInternal {
|
||||||
PeerConnState::Waiting(_, _) | PeerConnState::Abandonned => {
|
PeerConnState::Waiting(_, _) | PeerConnState::Abandonned => {
|
||||||
PeerConnState::Waiting(0, Instant::now())
|
PeerConnState::Waiting(0, Instant::now())
|
||||||
}
|
}
|
||||||
x @ (PeerConnState::Ourself | PeerConnState::Connected) => x,
|
x @ (PeerConnState::Ourself | PeerConnState::Connected { .. }) => x,
|
||||||
};
|
};
|
||||||
true
|
true
|
||||||
} else {
|
} else {
|
||||||
|
@ -104,8 +99,6 @@ impl PeerInfoInternal {
|
||||||
pub struct PeerInfo {
|
pub struct PeerInfo {
|
||||||
/// The node's identifier (its public key)
|
/// The node's identifier (its public key)
|
||||||
pub id: NodeID,
|
pub id: NodeID,
|
||||||
/// The node's network address
|
|
||||||
pub addr: SocketAddr,
|
|
||||||
/// The current status of our connection to this node
|
/// The current status of our connection to this node
|
||||||
pub state: PeerConnState,
|
pub state: PeerConnState,
|
||||||
/// The last time at which the node was seen
|
/// The last time at which the node was seen
|
||||||
|
@ -136,7 +129,7 @@ pub enum PeerConnState {
|
||||||
Ourself,
|
Ourself,
|
||||||
|
|
||||||
/// We currently have a connection to this peer
|
/// We currently have a connection to this peer
|
||||||
Connected,
|
Connected { addr: SocketAddr },
|
||||||
|
|
||||||
/// Our next connection tentative (the nth, where n is the first value of the tuple)
|
/// Our next connection tentative (the nth, where n is the first value of the tuple)
|
||||||
/// will be at given Instant
|
/// will be at given Instant
|
||||||
|
@ -152,7 +145,7 @@ pub enum PeerConnState {
|
||||||
impl PeerConnState {
|
impl PeerConnState {
|
||||||
/// Returns true if we can currently send requests to this peer
|
/// Returns true if we can currently send requests to this peer
|
||||||
pub fn is_up(&self) -> bool {
|
pub fn is_up(&self) -> bool {
|
||||||
matches!(self, Self::Ourself | Self::Connected)
|
matches!(self, Self::Ourself | Self::Connected { .. })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -164,29 +157,42 @@ struct KnownHosts {
|
||||||
impl KnownHosts {
|
impl KnownHosts {
|
||||||
fn new() -> Self {
|
fn new() -> Self {
|
||||||
let list = HashMap::new();
|
let list = HashMap::new();
|
||||||
let hash = Self::calculate_hash(vec![]);
|
let mut ret = Self {
|
||||||
Self { list, hash }
|
list,
|
||||||
|
hash: hash::Digest::from_slice(&[0u8; 64][..]).unwrap(),
|
||||||
|
};
|
||||||
|
ret.update_hash();
|
||||||
|
ret
|
||||||
}
|
}
|
||||||
fn update_hash(&mut self) {
|
fn update_hash(&mut self) {
|
||||||
self.hash = Self::calculate_hash(self.connected_peers_vec());
|
// The hash is a value that is exchanged between nodes when they ping one
|
||||||
}
|
// another. Nodes compare their known hosts hash to know if they are connected
|
||||||
fn connected_peers_vec(&self) -> Vec<(NodeID, SocketAddr)> {
|
// to the same set of nodes. If the hashes differ, they are connected to
|
||||||
let mut list = Vec::with_capacity(self.list.len());
|
// different nodes and they trigger an exchange of the full list of active
|
||||||
for (id, peer) in self.list.iter() {
|
// connections. The hash value only represents the set of node IDs and not
|
||||||
if peer.state.is_up() {
|
// their actual socket addresses, because nodes can be connected via different
|
||||||
list.push((*id, peer.addr));
|
// addresses and that shouldn't necessarily trigger a full peer exchange.
|
||||||
}
|
let mut list = self
|
||||||
}
|
.list
|
||||||
list
|
.iter()
|
||||||
}
|
.filter(|(_, peer)| peer.state.is_up())
|
||||||
fn calculate_hash(mut list: Vec<(NodeID, SocketAddr)>) -> hash::Digest {
|
.map(|(id, _)| *id)
|
||||||
|
.collect::<Vec<_>>();
|
||||||
list.sort();
|
list.sort();
|
||||||
let mut hash_state = hash::State::new();
|
let mut hash_state = hash::State::new();
|
||||||
for (id, addr) in list {
|
for id in list {
|
||||||
hash_state.update(&id[..]);
|
hash_state.update(&id[..]);
|
||||||
hash_state.update(&format!("{}\n", addr).into_bytes()[..]);
|
|
||||||
}
|
}
|
||||||
hash_state.finalize()
|
self.hash = hash_state.finalize();
|
||||||
|
}
|
||||||
|
fn connected_peers_vec(&self) -> Vec<(NodeID, SocketAddr)> {
|
||||||
|
self.list
|
||||||
|
.iter()
|
||||||
|
.filter_map(|(id, peer)| match peer.state {
|
||||||
|
PeerConnState::Connected { addr } => Some((*id, addr)),
|
||||||
|
_ => None,
|
||||||
|
})
|
||||||
|
.collect::<Vec<_>>()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -220,18 +226,16 @@ impl PeeringManager {
|
||||||
if id != netapp.id {
|
if id != netapp.id {
|
||||||
known_hosts.list.insert(
|
known_hosts.list.insert(
|
||||||
id,
|
id,
|
||||||
PeerInfoInternal::new(addr, PeerConnState::Waiting(0, Instant::now())),
|
PeerInfoInternal::new(PeerConnState::Waiting(0, Instant::now()), Some(addr)),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(addr) = our_addr {
|
known_hosts.list.insert(
|
||||||
known_hosts.list.insert(
|
netapp.id,
|
||||||
netapp.id,
|
PeerInfoInternal::new(PeerConnState::Ourself, our_addr),
|
||||||
PeerInfoInternal::new(addr, PeerConnState::Ourself),
|
);
|
||||||
);
|
known_hosts.update_hash();
|
||||||
known_hosts.update_hash();
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO for v0.10 / v1.0 : rename the endpoint (it will break compatibility)
|
// TODO for v0.10 / v1.0 : rename the endpoint (it will break compatibility)
|
||||||
let strat = Arc::new(Self {
|
let strat = Arc::new(Self {
|
||||||
|
@ -276,7 +280,7 @@ impl PeeringManager {
|
||||||
for (id, info) in known_hosts.list.iter() {
|
for (id, info) in known_hosts.list.iter() {
|
||||||
trace!("{}, {:?}", hex::encode(&id[..8]), info);
|
trace!("{}, {:?}", hex::encode(&id[..8]), info);
|
||||||
match info.state {
|
match info.state {
|
||||||
PeerConnState::Connected => {
|
PeerConnState::Connected { .. } => {
|
||||||
let must_ping = match info.last_send_ping {
|
let must_ping = match info.last_send_ping {
|
||||||
None => true,
|
None => true,
|
||||||
Some(t) => Instant::now() - t > PING_INTERVAL,
|
Some(t) => Instant::now() - t > PING_INTERVAL,
|
||||||
|
@ -319,7 +323,7 @@ impl PeeringManager {
|
||||||
info!(
|
info!(
|
||||||
"Retrying connection to {} at {} ({})",
|
"Retrying connection to {} at {} ({})",
|
||||||
hex::encode(&id[..8]),
|
hex::encode(&id[..8]),
|
||||||
h.all_addrs
|
h.known_addrs
|
||||||
.iter()
|
.iter()
|
||||||
.map(|x| format!("{}", x))
|
.map(|x| format!("{}", x))
|
||||||
.collect::<Vec<_>>()
|
.collect::<Vec<_>>()
|
||||||
|
@ -328,13 +332,8 @@ impl PeeringManager {
|
||||||
);
|
);
|
||||||
h.state = PeerConnState::Trying(i);
|
h.state = PeerConnState::Trying(i);
|
||||||
|
|
||||||
let alternate_addrs = h
|
let addresses = h.known_addrs.clone();
|
||||||
.all_addrs
|
tokio::spawn(self.clone().try_connect(id, addresses));
|
||||||
.iter()
|
|
||||||
.filter(|x| **x != h.addr)
|
|
||||||
.cloned()
|
|
||||||
.collect::<Vec<_>>();
|
|
||||||
tokio::spawn(self.clone().try_connect(id, h.addr, alternate_addrs));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -362,27 +361,24 @@ impl PeeringManager {
|
||||||
fn update_public_peer_list(&self, known_hosts: &KnownHosts) {
|
fn update_public_peer_list(&self, known_hosts: &KnownHosts) {
|
||||||
let mut pub_peer_list = Vec::with_capacity(known_hosts.list.len());
|
let mut pub_peer_list = Vec::with_capacity(known_hosts.list.len());
|
||||||
for (id, info) in known_hosts.list.iter() {
|
for (id, info) in known_hosts.list.iter() {
|
||||||
|
if *id == self.netapp.id {
|
||||||
|
// sanity check
|
||||||
|
assert!(matches!(info.state, PeerConnState::Ourself));
|
||||||
|
}
|
||||||
let mut pings = info.ping.iter().cloned().collect::<Vec<_>>();
|
let mut pings = info.ping.iter().cloned().collect::<Vec<_>>();
|
||||||
pings.sort();
|
pings.sort();
|
||||||
if !pings.is_empty() {
|
if !pings.is_empty() {
|
||||||
pub_peer_list.push(PeerInfo {
|
pub_peer_list.push(PeerInfo {
|
||||||
id: *id,
|
id: *id,
|
||||||
addr: info.addr,
|
|
||||||
state: info.state,
|
state: info.state,
|
||||||
last_seen: info.last_seen,
|
last_seen: info.last_seen,
|
||||||
avg_ping: Some(
|
avg_ping: Some(pings.iter().sum::<Duration>().div_f64(pings.len() as f64)),
|
||||||
pings
|
|
||||||
.iter()
|
|
||||||
.fold(Duration::from_secs(0), |x, y| x + *y)
|
|
||||||
.div_f64(pings.len() as f64),
|
|
||||||
),
|
|
||||||
max_ping: pings.last().cloned(),
|
max_ping: pings.last().cloned(),
|
||||||
med_ping: Some(pings[pings.len() / 2]),
|
med_ping: Some(pings[pings.len() / 2]),
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
pub_peer_list.push(PeerInfo {
|
pub_peer_list.push(PeerInfo {
|
||||||
id: *id,
|
id: *id,
|
||||||
addr: info.addr,
|
|
||||||
state: info.state,
|
state: info.state,
|
||||||
last_seen: info.last_seen,
|
last_seen: info.last_seen,
|
||||||
avg_ping: None,
|
avg_ping: None,
|
||||||
|
@ -495,15 +491,10 @@ impl PeeringManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn try_connect(
|
async fn try_connect(self: Arc<Self>, id: NodeID, addresses: Vec<SocketAddr>) {
|
||||||
self: Arc<Self>,
|
|
||||||
id: NodeID,
|
|
||||||
default_addr: SocketAddr,
|
|
||||||
alternate_addrs: Vec<SocketAddr>,
|
|
||||||
) {
|
|
||||||
let conn_addr = {
|
let conn_addr = {
|
||||||
let mut ret = None;
|
let mut ret = None;
|
||||||
for addr in [default_addr].iter().chain(alternate_addrs.iter()) {
|
for addr in addresses.iter() {
|
||||||
debug!("Trying address {} for peer {}", addr, hex::encode(&id[..8]));
|
debug!("Trying address {} for peer {}", addr, hex::encode(&id[..8]));
|
||||||
match self.netapp.clone().try_connect(*addr, id).await {
|
match self.netapp.clone().try_connect(*addr, id).await {
|
||||||
Ok(()) => {
|
Ok(()) => {
|
||||||
|
@ -529,7 +520,7 @@ impl PeeringManager {
|
||||||
warn!(
|
warn!(
|
||||||
"Could not connect to peer {} ({} addresses tried)",
|
"Could not connect to peer {} ({} addresses tried)",
|
||||||
hex::encode(&id[..8]),
|
hex::encode(&id[..8]),
|
||||||
1 + alternate_addrs.len()
|
addresses.len()
|
||||||
);
|
);
|
||||||
let mut known_hosts = self.known_hosts.write().unwrap();
|
let mut known_hosts = self.known_hosts.write().unwrap();
|
||||||
if let Some(host) = known_hosts.list.get_mut(&id) {
|
if let Some(host) = known_hosts.list.get_mut(&id) {
|
||||||
|
@ -549,6 +540,14 @@ impl PeeringManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn on_connected(self: &Arc<Self>, id: NodeID, addr: SocketAddr, is_incoming: bool) {
|
fn on_connected(self: &Arc<Self>, id: NodeID, addr: SocketAddr, is_incoming: bool) {
|
||||||
|
if id == self.netapp.id {
|
||||||
|
// sanity check
|
||||||
|
panic!(
|
||||||
|
"on_connected from local node, id={:?}, addr={}, incoming={}",
|
||||||
|
id, addr, is_incoming
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
let mut known_hosts = self.known_hosts.write().unwrap();
|
let mut known_hosts = self.known_hosts.write().unwrap();
|
||||||
if is_incoming {
|
if is_incoming {
|
||||||
if let Some(host) = known_hosts.list.get_mut(&id) {
|
if let Some(host) = known_hosts.list.get_mut(&id) {
|
||||||
|
@ -563,13 +562,13 @@ impl PeeringManager {
|
||||||
addr
|
addr
|
||||||
);
|
);
|
||||||
if let Some(host) = known_hosts.list.get_mut(&id) {
|
if let Some(host) = known_hosts.list.get_mut(&id) {
|
||||||
host.state = PeerConnState::Connected;
|
host.state = PeerConnState::Connected { addr };
|
||||||
host.addr = addr;
|
|
||||||
host.add_addr(addr);
|
host.add_addr(addr);
|
||||||
} else {
|
} else {
|
||||||
known_hosts
|
known_hosts.list.insert(
|
||||||
.list
|
id,
|
||||||
.insert(id, PeerInfoInternal::new(addr, PeerConnState::Connected));
|
PeerInfoInternal::new(PeerConnState::Connected { addr }, Some(addr)),
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
known_hosts.update_hash();
|
known_hosts.update_hash();
|
||||||
|
@ -589,12 +588,8 @@ impl PeeringManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn new_peer(&self, id: &NodeID, addr: SocketAddr) -> PeerInfoInternal {
|
fn new_peer(&self, id: &NodeID, addr: SocketAddr) -> PeerInfoInternal {
|
||||||
let state = if *id == self.netapp.id {
|
assert!(*id != self.netapp.id);
|
||||||
PeerConnState::Ourself
|
PeerInfoInternal::new(PeerConnState::Waiting(0, Instant::now()), Some(addr))
|
||||||
} else {
|
|
||||||
PeerConnState::Waiting(0, Instant::now())
|
|
||||||
};
|
|
||||||
PeerInfoInternal::new(addr, state)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -16,7 +16,7 @@ use tokio::sync::{watch, Notify};
|
||||||
|
|
||||||
use garage_net::endpoint::{Endpoint, EndpointHandler};
|
use garage_net::endpoint::{Endpoint, EndpointHandler};
|
||||||
use garage_net::message::*;
|
use garage_net::message::*;
|
||||||
use garage_net::peering::PeeringManager;
|
use garage_net::peering::{PeerConnState, PeeringManager};
|
||||||
use garage_net::util::parse_and_resolve_peer_addr_async;
|
use garage_net::util::parse_and_resolve_peer_addr_async;
|
||||||
use garage_net::{NetApp, NetworkKey, NodeID, NodeKey};
|
use garage_net::{NetApp, NetworkKey, NodeID, NodeKey};
|
||||||
|
|
||||||
|
@ -142,7 +142,7 @@ pub struct NodeStatus {
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
pub struct KnownNodeInfo {
|
pub struct KnownNodeInfo {
|
||||||
pub id: Uuid,
|
pub id: Uuid,
|
||||||
pub addr: SocketAddr,
|
pub addr: Option<SocketAddr>,
|
||||||
pub is_up: bool,
|
pub is_up: bool,
|
||||||
pub last_seen_secs_ago: Option<u64>,
|
pub last_seen_secs_ago: Option<u64>,
|
||||||
pub status: NodeStatus,
|
pub status: NodeStatus,
|
||||||
|
@ -381,7 +381,11 @@ impl System {
|
||||||
.iter()
|
.iter()
|
||||||
.map(|n| KnownNodeInfo {
|
.map(|n| KnownNodeInfo {
|
||||||
id: n.id.into(),
|
id: n.id.into(),
|
||||||
addr: n.addr,
|
addr: match n.state {
|
||||||
|
PeerConnState::Ourself => self.rpc_public_addr,
|
||||||
|
PeerConnState::Connected { addr } => Some(addr),
|
||||||
|
_ => None,
|
||||||
|
},
|
||||||
is_up: n.is_up(),
|
is_up: n.is_up(),
|
||||||
last_seen_secs_ago: n
|
last_seen_secs_ago: n
|
||||||
.last_seen
|
.last_seen
|
||||||
|
@ -722,7 +726,10 @@ impl System {
|
||||||
.peering
|
.peering
|
||||||
.get_peer_list()
|
.get_peer_list()
|
||||||
.iter()
|
.iter()
|
||||||
.map(|n| (n.id.into(), n.addr))
|
.filter_map(|n| match n.state {
|
||||||
|
PeerConnState::Connected { addr } => Some((n.id.into(), addr)),
|
||||||
|
_ => None,
|
||||||
|
})
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
// Before doing it, we read the current peer list file (if it exists)
|
// Before doing it, we read the current peer list file (if it exists)
|
||||||
|
|
Loading…
Reference in a new issue