diff --git a/Cargo.lock b/Cargo.lock index 52e67f5f..a1ae7289 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -304,6 +304,7 @@ dependencies = [ "futures-channel 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", "futures-core 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", "futures-util 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", + "gethostname 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "hex 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "http 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "hyper 0.13.4 (registry+https://github.com/rust-lang/crates.io-index)", @@ -332,6 +333,15 @@ dependencies = [ "typenum 1.11.2 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "gethostname" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "libc 0.2.68 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "getrandom" version = "0.1.14" @@ -1273,6 +1283,7 @@ dependencies = [ "checksum futures-util 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "22766cf25d64306bedf0384da004d05c9974ab104fcc4528f1236181c18004c5" "checksum fxhash 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c" "checksum generic-array 0.12.3 (registry+https://github.com/rust-lang/crates.io-index)" = "c68f0274ae0e023facc3c97b2e00f076be70e254bc851d972503b328db79b2ec" +"checksum gethostname 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "e692e296bfac1d2533ef168d0b60ff5897b8b70a4009276834014dd8924cc028" "checksum getrandom 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)" = "7abc8dd8451921606d809ba32e95b6111925cd2906060d2dcc29c070220503eb" "checksum h2 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)" = "377038bf3c89d18d6ca1431e7a5027194fbd724ca10592b9487ede5e8e144f42" "checksum heck 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "20564e78d53d2bb135c343b3f47714a56af2061f1c928fdb541dc7b9fdd94205" diff --git a/Cargo.toml b/Cargo.toml index a66a712f..72528b98 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,7 @@ async-trait = "0.1.30" reduce = "0.1.2" serde_json = "1.0" arc-swap = "0.4" +gethostname = "0.2" rustls = "0.17" tokio-rustls = "0.13" diff --git a/src/main.rs b/src/main.rs index 08f37dd5..0aab9e2a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -247,8 +247,8 @@ async fn cmd_status(rpc_cli: RpcAddrClient, rpc_host: SocketAddr) -> Re for adv in status.iter() { if let Some(cfg) = config.members.get(&adv.id) { println!( - "{:?}\t{}\t{}\t{}", - adv.id, cfg.datacenter, cfg.n_tokens, adv.addr + "{:?}\t{}\t{}\t{}\t{}", + adv.id, adv.state_info.hostname, adv.addr, cfg.datacenter, cfg.n_tokens ); } } @@ -274,7 +274,7 @@ async fn cmd_status(rpc_cli: RpcAddrClient, rpc_host: SocketAddr) -> Re println!("\nUnconfigured nodes:"); for adv in status.iter() { if !config.members.contains_key(&adv.id) { - println!("{:?}\t{}", adv.id, adv.addr); + println!("{:?}\t{}\t{}", adv.id, adv.state_info.hostname, adv.addr); } } } diff --git a/src/membership.rs b/src/membership.rs index 99b0388d..412a83f8 100644 --- a/src/membership.rs +++ b/src/membership.rs @@ -48,12 +48,15 @@ pub struct PingMessage { pub status_hash: Hash, pub config_version: u64, + + pub state_info: StateInfo, } #[derive(Clone, Debug, Serialize, Deserialize)] pub struct AdvertisedNode { pub id: UUID, pub addr: SocketAddr, + pub state_info: StateInfo, } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -72,6 +75,8 @@ pub struct System { pub config: Config, pub id: UUID, + pub state_info: StateInfo, + pub rpc_http_client: Arc, rpc_client: Arc>, @@ -85,14 +90,20 @@ pub struct System { #[derive(Debug, Clone)] pub struct Status { - pub nodes: HashMap, + pub nodes: HashMap, pub hash: Hash, } #[derive(Debug, Clone)] -pub struct NodeStatus { +pub struct StatusEntry { pub addr: SocketAddr, pub remaining_ping_attempts: usize, + pub state_info: StateInfo, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct StateInfo { + pub hostname: String, } #[derive(Clone)] @@ -114,9 +125,10 @@ impl Status { let addr = SocketAddr::new(ip, info.rpc_port); let old_status = self.nodes.insert( info.id.clone(), - NodeStatus { + StatusEntry { addr: addr.clone(), remaining_ping_attempts: MAX_FAILED_PINGS, + state_info: info.state_info.clone(), }, ); match old_status { @@ -268,6 +280,12 @@ impl System { status.recalculate_hash(); let (update_status, status) = watch::channel(Arc::new(status)); + let state_info = StateInfo { + hostname: gethostname::gethostname() + .into_string() + .unwrap_or("".to_string()), + }; + let mut ring = Ring { config: net_config, ring: Vec::new(), @@ -289,6 +307,7 @@ impl System { let sys = Arc::new(System { config, id, + state_info, rpc_http_client, rpc_client, status, @@ -346,6 +365,7 @@ impl System { rpc_port: self.config.rpc_port, status_hash: status.hash.clone(), config_version: ring.config.version, + state_info: self.state_info.clone(), }) } @@ -408,6 +428,7 @@ impl System { to_advertise.push(AdvertisedNode { id: info.id.clone(), addr: addr.clone(), + state_info: info.state_info.clone(), }); } if is_new || status.hash != info.status_hash { @@ -486,9 +507,15 @@ impl System { let status = self.status.borrow().clone(); let mut mem = vec![]; for (node, status) in status.nodes.iter() { + let state_info = if *node == self.id { + self.state_info.clone() + } else { + status.state_info.clone() + }; mem.push(AdvertisedNode { id: node.clone(), addr: status.addr.clone(), + state_info, }); } Ok(Message::AdvertiseNodesUp(mem)) @@ -515,9 +542,10 @@ impl System { let self_addr = SocketAddr::new(node.addr.ip(), self.config.rpc_port); let old_self = status.nodes.insert( node.id.clone(), - NodeStatus { + StatusEntry { addr: self_addr, remaining_ping_attempts: MAX_FAILED_PINGS, + state_info: self.state_info.clone(), }, ); has_changed = match old_self { diff --git a/src/rpc_client.rs b/src/rpc_client.rs index 35debb53..b2a0cf22 100644 --- a/src/rpc_client.rs +++ b/src/rpc_client.rs @@ -231,10 +231,11 @@ impl RpcHttpClient { let status = resp.status(); let body = hyper::body::to_bytes(resp.into_body()).await?; match rmp_serde::decode::from_read::<_, Result>(body.into_buf()) { - Err(e) => - Err(Error::RPCError(format!("Invalid reply"), status)), - Ok(Err(e)) => - Err(Error::RPCError(e, status)), + Err(e) => Err(Error::RPCError( + format!("Invalid reply (deserialize error: {})", e), + status, + )), + Ok(Err(e)) => Err(Error::RPCError(e, status)), Ok(Ok(x)) => Ok(x), } }