Continue pinging nodes when they are down ; overall better handling of down nodes

This commit is contained in:
Alex 2020-04-23 16:05:43 +00:00
parent 2fe82be3bc
commit 82f4cd8719
5 changed files with 165 additions and 96 deletions

6
TODO
View file

@ -5,12 +5,6 @@ How are we going to test that our replication method works correctly?
We will have to introduce lots of dummy data and then add/remove nodes many times. We will have to introduce lots of dummy data and then add/remove nodes many times.
Improvements
------------
Membership: keep IP addresses of failed nodes and try to reping them regularly
Attaining S3 compatibility Attaining S3 compatibility
-------------------------- --------------------------

View file

@ -3,6 +3,7 @@ use hyper::StatusCode;
use std::io; use std::io;
use crate::data::Hash; use crate::data::Hash;
use crate::rpc_client::RPCError;
#[derive(Debug, Error)] #[derive(Debug, Error)]
pub enum Error { pub enum Error {
@ -33,7 +34,6 @@ pub enum Error {
RMPDecode(#[error(source)] rmp_serde::decode::Error), RMPDecode(#[error(source)] rmp_serde::decode::Error),
#[error(display = "JSON error: {}", _0)] #[error(display = "JSON error: {}", _0)]
JSON(#[error(source)] serde_json::error::Error), JSON(#[error(source)] serde_json::error::Error),
#[error(display = "TOML decode error: {}", _0)] #[error(display = "TOML decode error: {}", _0)]
TomlDecode(#[error(source)] toml::de::Error), TomlDecode(#[error(source)] toml::de::Error),
@ -43,8 +43,11 @@ pub enum Error {
#[error(display = "Tokio join error: {}", _0)] #[error(display = "Tokio join error: {}", _0)]
TokioJoin(#[error(source)] tokio::task::JoinError), TokioJoin(#[error(source)] tokio::task::JoinError),
#[error(display = "RPC error: {} (status code {})", _0, _1)] #[error(display = "RPC call error: {}", _0)]
RPCError(String, StatusCode), RPC(#[error(source)] RPCError),
#[error(display = "Remote error: {} (status code {})", _0, _1)]
RemoteError(String, StatusCode),
#[error(display = "Bad request: {}", _0)] #[error(display = "Bad request: {}", _0)]
BadRequest(String), BadRequest(String),

View file

@ -27,14 +27,16 @@ mod rpc_server;
mod server; mod server;
mod tls_util; mod tls_util;
use serde::{Deserialize, Serialize};
use std::collections::HashSet; use std::collections::HashSet;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use serde::{Deserialize, Serialize};
use structopt::StructOpt; use structopt::StructOpt;
use data::*;
use error::Error; use error::Error;
use membership::*; use membership::*;
use rpc_client::*; use rpc_client::*;
@ -107,14 +109,16 @@ pub struct ConfigureNodeOpt {
node_id: String, node_id: String,
/// Location (datacenter) of the node /// Location (datacenter) of the node
datacenter: String, #[structopt(short = "d", long = "datacenter")]
datacenter: Option<String>,
/// Number of tokens /// Number of tokens
n_tokens: u32, #[structopt(short = "n", long = "n-tokens")]
n_tokens: Option<u32>,
/// Optionnal node tag /// Optionnal node tag
#[structopt(long = "tag", default_value = "")] #[structopt(short = "t", long = "tag")]
tag: String, tag: Option<String>,
} }
#[derive(StructOpt, Debug)] #[derive(StructOpt, Debug)]
@ -276,54 +280,62 @@ async fn main() {
async fn cmd_status(rpc_cli: RpcAddrClient<Message>, rpc_host: SocketAddr) -> Result<(), Error> { async fn cmd_status(rpc_cli: RpcAddrClient<Message>, rpc_host: SocketAddr) -> Result<(), Error> {
let status = match rpc_cli let status = match rpc_cli
.call(&rpc_host, &Message::PullStatus, ADMIN_RPC_TIMEOUT) .call(&rpc_host, &Message::PullStatus, ADMIN_RPC_TIMEOUT)
.await? .await??
{ {
Message::AdvertiseNodesUp(nodes) => nodes, Message::AdvertiseNodesUp(nodes) => nodes,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
}; };
let config = match rpc_cli let config = match rpc_cli
.call(&rpc_host, &Message::PullConfig, ADMIN_RPC_TIMEOUT) .call(&rpc_host, &Message::PullConfig, ADMIN_RPC_TIMEOUT)
.await? .await??
{ {
Message::AdvertiseConfig(cfg) => cfg, Message::AdvertiseConfig(cfg) => cfg,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
}; };
println!("Healthy nodes:"); println!("Healthy nodes:");
for adv in status.iter() { for adv in status.iter().filter(|x| x.is_up) {
if let Some(cfg) = config.members.get(&adv.id) { if let Some(cfg) = config.members.get(&adv.id) {
println!( println!(
"{:?}\t{}\t{}\t{}\t{}\t{}", "{:?}\t{}\t{}\t[{}]\t{}\t{}",
adv.id, adv.state_info.hostname, adv.addr, cfg.tag, cfg.datacenter, cfg.n_tokens adv.id, adv.state_info.hostname, adv.addr, cfg.tag, cfg.datacenter, cfg.n_tokens
); );
} else {
println!(
"{:?}\t{}\t{}\tUNCONFIGURED/REMOVED",
adv.id, adv.state_info.hostname, adv.addr
);
} }
} }
let status_keys = status.iter().map(|x| x.id).collect::<HashSet<_>>(); let status_keys = status.iter().map(|x| x.id).collect::<HashSet<_>>();
if config let failure_case_1 = status.iter().any(|x| !x.is_up);
let failure_case_2 = config
.members .members
.iter() .iter()
.any(|(id, _)| !status_keys.contains(id)) .any(|(id, _)| !status_keys.contains(id));
{ if failure_case_1 || failure_case_2 {
println!("\nFailed nodes:"); println!("\nFailed nodes:");
for (id, cfg) in config.members.iter() { for adv in status.iter().filter(|x| !x.is_up) {
if !status.iter().any(|x| x.id == *id) { if let Some(cfg) = config.members.get(&adv.id) {
println!( println!(
"{:?}\t{}\t{}\t{}", "{:?}\t{}\t{}\t[{}]\t{}\t{}\tlast seen: {}s ago",
id, cfg.tag, cfg.datacenter, cfg.n_tokens adv.id,
adv.state_info.hostname,
adv.addr,
cfg.tag,
cfg.datacenter,
cfg.n_tokens,
(now_msec() - adv.last_seen)/1000,
); );
} }
} }
} for (id, cfg) in config.members.iter() {
if !status.iter().any(|x| x.id == *id) {
if status println!(
.iter() "{:?}\t{}\t{}\t{}\tnever seen",
.any(|adv| !config.members.contains_key(&adv.id)) id, cfg.tag, cfg.datacenter, cfg.n_tokens
{ );
println!("\nUnconfigured nodes:");
for adv in status.iter() {
if !config.members.contains_key(&adv.id) {
println!("{:?}\t{}\t{}", adv.id, adv.state_info.hostname, adv.addr);
} }
} }
} }
@ -338,7 +350,7 @@ async fn cmd_configure(
) -> Result<(), Error> { ) -> Result<(), Error> {
let status = match rpc_cli let status = match rpc_cli
.call(&rpc_host, &Message::PullStatus, ADMIN_RPC_TIMEOUT) .call(&rpc_host, &Message::PullStatus, ADMIN_RPC_TIMEOUT)
.await? .await??
{ {
Message::AdvertiseNodesUp(nodes) => nodes, Message::AdvertiseNodesUp(nodes) => nodes,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
@ -359,20 +371,30 @@ async fn cmd_configure(
let mut config = match rpc_cli let mut config = match rpc_cli
.call(&rpc_host, &Message::PullConfig, ADMIN_RPC_TIMEOUT) .call(&rpc_host, &Message::PullConfig, ADMIN_RPC_TIMEOUT)
.await? .await??
{ {
Message::AdvertiseConfig(cfg) => cfg, Message::AdvertiseConfig(cfg) => cfg,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
}; };
config.members.insert( let new_entry = match config.members.get(&candidates[0]) {
candidates[0].clone(), None => NetworkConfigEntry {
NetworkConfigEntry { datacenter: args
datacenter: args.datacenter, .datacenter
n_tokens: args.n_tokens, .expect("Please specifiy a datacenter with the -d flag"),
tag: args.tag, n_tokens: args
.n_tokens
.expect("Please specifiy a number of tokens with the -n flag"),
tag: args.tag.unwrap_or("".to_string()),
}, },
); Some(old) => NetworkConfigEntry {
datacenter: args.datacenter.unwrap_or(old.datacenter.to_string()),
n_tokens: args.n_tokens.unwrap_or(old.n_tokens),
tag: args.tag.unwrap_or(old.tag.to_string()),
},
};
config.members.insert(candidates[0].clone(), new_entry);
config.version += 1; config.version += 1;
rpc_cli rpc_cli
@ -381,7 +403,7 @@ async fn cmd_configure(
&Message::AdvertiseConfig(config), &Message::AdvertiseConfig(config),
ADMIN_RPC_TIMEOUT, ADMIN_RPC_TIMEOUT,
) )
.await?; .await??;
Ok(()) Ok(())
} }
@ -392,7 +414,7 @@ async fn cmd_remove(
) -> Result<(), Error> { ) -> Result<(), Error> {
let mut config = match rpc_cli let mut config = match rpc_cli
.call(&rpc_host, &Message::PullConfig, ADMIN_RPC_TIMEOUT) .call(&rpc_host, &Message::PullConfig, ADMIN_RPC_TIMEOUT)
.await? .await??
{ {
Message::AdvertiseConfig(cfg) => cfg, Message::AdvertiseConfig(cfg) => cfg,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
@ -427,7 +449,7 @@ async fn cmd_remove(
&Message::AdvertiseConfig(config), &Message::AdvertiseConfig(config),
ADMIN_RPC_TIMEOUT, ADMIN_RPC_TIMEOUT,
) )
.await?; .await??;
Ok(()) Ok(())
} }
@ -436,7 +458,7 @@ async fn cmd_admin(
rpc_host: SocketAddr, rpc_host: SocketAddr,
args: AdminRPC, args: AdminRPC,
) -> Result<(), Error> { ) -> Result<(), Error> {
match rpc_cli.call(&rpc_host, args, ADMIN_RPC_TIMEOUT).await? { match rpc_cli.call(&rpc_host, args, ADMIN_RPC_TIMEOUT).await?? {
AdminRPC::Ok(msg) => { AdminRPC::Ok(msg) => {
println!("{}", msg); println!("{}", msg);
} }

View file

@ -4,6 +4,7 @@ use std::hash::Hasher;
use std::io::Read; use std::io::Read;
use std::net::{IpAddr, SocketAddr}; use std::net::{IpAddr, SocketAddr};
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
@ -25,7 +26,7 @@ use crate::server::Config;
const PING_INTERVAL: Duration = Duration::from_secs(10); const PING_INTERVAL: Duration = Duration::from_secs(10);
const PING_TIMEOUT: Duration = Duration::from_secs(2); const PING_TIMEOUT: Duration = Duration::from_secs(2);
const MAX_FAILED_PINGS: usize = 3; const MAX_FAILURES_BEFORE_CONSIDERED_DOWN: usize = 5;
pub const MEMBERSHIP_RPC_PATH: &str = "_membership"; pub const MEMBERSHIP_RPC_PATH: &str = "_membership";
@ -56,6 +57,10 @@ pub struct PingMessage {
pub struct AdvertisedNode { pub struct AdvertisedNode {
pub id: UUID, pub id: UUID,
pub addr: SocketAddr, pub addr: SocketAddr,
pub is_up: bool,
pub last_seen: u64,
pub state_info: StateInfo, pub state_info: StateInfo,
} }
@ -91,17 +96,24 @@ pub struct System {
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct Status { pub struct Status {
pub nodes: HashMap<UUID, StatusEntry>, pub nodes: HashMap<UUID, Arc<StatusEntry>>,
pub hash: Hash, pub hash: Hash,
} }
#[derive(Debug, Clone)] #[derive(Debug)]
pub struct StatusEntry { pub struct StatusEntry {
pub addr: SocketAddr, pub addr: SocketAddr,
pub remaining_ping_attempts: usize, pub last_seen: u64,
pub num_failures: AtomicUsize,
pub state_info: StateInfo, pub state_info: StateInfo,
} }
impl StatusEntry {
pub fn is_up(&self) -> bool {
self.num_failures.load(Ordering::SeqCst) < MAX_FAILURES_BEFORE_CONSIDERED_DOWN
}
}
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StateInfo { pub struct StateInfo {
pub hostname: String, pub hostname: String,
@ -126,11 +138,12 @@ impl Status {
let addr = SocketAddr::new(ip, info.rpc_port); let addr = SocketAddr::new(ip, info.rpc_port);
let old_status = self.nodes.insert( let old_status = self.nodes.insert(
info.id, info.id,
StatusEntry { Arc::new(StatusEntry {
addr, addr,
remaining_ping_attempts: MAX_FAILED_PINGS, last_seen: now_msec(),
num_failures: AtomicUsize::from(0),
state_info: info.state_info.clone(), state_info: info.state_info.clone(),
}, }),
); );
match old_status { match old_status {
None => { None => {
@ -427,13 +440,15 @@ impl System {
let mut to_advertise = vec![]; let mut to_advertise = vec![];
for (id_option, addr, ping_resp) in ping_resps { for (id_option, addr, ping_resp) in ping_resps {
if let Ok(Message::Ping(info)) = ping_resp { if let Ok(Ok(Message::Ping(info))) = ping_resp {
let is_new = status.handle_ping(addr.ip(), &info); let is_new = status.handle_ping(addr.ip(), &info);
if is_new { if is_new {
has_changes = true; has_changes = true;
to_advertise.push(AdvertisedNode { to_advertise.push(AdvertisedNode {
id: info.id, id: info.id,
addr: *addr, addr: *addr,
is_up: true,
last_seen: now_msec(),
state_info: info.state_info.clone(), state_info: info.state_info.clone(),
}); });
} }
@ -446,21 +461,16 @@ impl System {
.spawn_cancellable(self.clone().pull_config(info.id).map(Ok)); .spawn_cancellable(self.clone().pull_config(info.id).map(Ok));
} }
} else if let Some(id) = id_option { } else if let Some(id) = id_option {
let remaining_attempts = status if let Some(st) = status.nodes.get_mut(id) {
.nodes st.num_failures.fetch_add(1, Ordering::SeqCst);
.get(id) if !st.is_up() {
.map(|x| x.remaining_ping_attempts) warn!("Node {:?} seems to be down.", id);
.unwrap_or(0); if !ring.config.members.contains_key(id) {
if remaining_attempts == 0 { info!("Removing node {:?} from status (not in config and not responding to pings anymore)", id);
warn!( drop(st);
"Removing node {} after too many failed pings", status.nodes.remove(&id);
hex::encode(&id) has_changes = true;
); }
status.nodes.remove(&id);
has_changes = true;
} else {
if let Some(st) = status.nodes.get_mut(id) {
st.remaining_ping_attempts = remaining_attempts - 1;
} }
} }
} }
@ -521,6 +531,8 @@ impl System {
mem.push(AdvertisedNode { mem.push(AdvertisedNode {
id: *node, id: *node,
addr: status.addr, addr: status.addr,
is_up: status.is_up(),
last_seen: status.last_seen,
state_info, state_info,
}); });
} }
@ -548,18 +560,27 @@ impl System {
let self_addr = SocketAddr::new(node.addr.ip(), self.config.rpc_bind_addr.port()); let self_addr = SocketAddr::new(node.addr.ip(), self.config.rpc_bind_addr.port());
let old_self = status.nodes.insert( let old_self = status.nodes.insert(
node.id, node.id,
StatusEntry { Arc::new(StatusEntry {
addr: self_addr, addr: self_addr,
remaining_ping_attempts: MAX_FAILED_PINGS, last_seen: now_msec(),
num_failures: AtomicUsize::from(0),
state_info: self.state_info.clone(), state_info: self.state_info.clone(),
}, }),
); );
has_changed = match old_self { has_changed = match old_self {
None => true, None => true,
Some(x) => x.addr != self_addr, Some(x) => x.addr != self_addr,
}; };
} else if !status.nodes.contains_key(&node.id) { } else {
to_ping.push((node.addr, Some(node.id))); let ping_them = match status.nodes.get(&node.id) {
// Case 1: new node
None => true,
// Case 2: the node might have changed address
Some(our_node) => node.is_up && !our_node.is_up() && our_node.addr != node.addr,
};
if ping_them {
to_ping.push((node.addr, Some(node.id)));
}
} }
} }
if has_changed { if has_changed {
@ -580,8 +601,8 @@ impl System {
self: Arc<Self>, self: Arc<Self>,
adv: &NetworkConfig, adv: &NetworkConfig,
) -> Result<Message, Error> { ) -> Result<Message, Error> {
let mut ring: Ring = self.ring.borrow().as_ref().clone();
let update_lock = self.update_lock.lock().await; let update_lock = self.update_lock.lock().await;
let mut ring: Ring = self.ring.borrow().as_ref().clone();
if adv.version > ring.config.version { if adv.version > ring.config.version {
ring.config = adv.clone(); ring.config = adv.clone();

View file

@ -2,11 +2,13 @@ use std::borrow::Borrow;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::pin::Pin; use std::pin::Pin;
use std::sync::atomic::Ordering;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use arc_swap::ArcSwapOption; use arc_swap::ArcSwapOption;
use bytes::IntoBuf; use bytes::IntoBuf;
use err_derive::Error;
use futures::future::Future; use futures::future::Future;
use futures::stream::futures_unordered::FuturesUnordered; use futures::stream::futures_unordered::FuturesUnordered;
use futures::stream::StreamExt; use futures::stream::StreamExt;
@ -25,6 +27,22 @@ use crate::tls_util;
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10); const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
#[derive(Debug, Error)]
pub enum RPCError {
#[error(display = "Node is down: {:?}.", _0)]
NodeDown(UUID),
#[error(display = "Timeout: {}", _0)]
Timeout(#[error(source)] tokio::time::Elapsed),
#[error(display = "HTTP error: {}", _0)]
HTTP(#[error(source)] http::Error),
#[error(display = "Hyper error: {}", _0)]
Hyper(#[error(source)] hyper::Error),
#[error(display = "Messagepack encode error: {}", _0)]
RMPEncode(#[error(source)] rmp_serde::encode::Error),
#[error(display = "Messagepack decode error: {}", _0)]
RMPDecode(#[error(source)] rmp_serde::decode::Error),
}
#[derive(Copy, Clone)] #[derive(Copy, Clone)]
pub struct RequestStrategy { pub struct RequestStrategy {
pub rs_timeout: Duration, pub rs_timeout: Duration,
@ -104,19 +122,34 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
return local_handler(msg).await; return local_handler(msg).await;
} }
} }
let addr = { let status = self.status.borrow().clone();
let status = self.status.borrow().clone(); let node_status = match status.nodes.get(&to) {
match status.nodes.get(to.borrow()) { Some(node_status) => {
Some(status) => status.addr, if node_status.is_up() {
None => { node_status
return Err(Error::Message(format!( } else {
"Peer ID not found: {:?}", return Err(Error::from(RPCError::NodeDown(to)));
to.borrow()
)))
} }
} }
None => {
return Err(Error::Message(format!(
"Peer ID not found: {:?}",
to.borrow()
)))
}
}; };
self.rpc_addr_client.call(&addr, msg, timeout).await match self
.rpc_addr_client
.call(&node_status.addr, msg, timeout)
.await
{
Err(rpc_error) => {
node_status.num_failures.fetch_add(1, Ordering::SeqCst);
// TODO: Save failure info somewhere
Err(Error::from(rpc_error))
}
Ok(x) => x,
}
} }
pub async fn call_many(&self, to: &[UUID], msg: M, timeout: Duration) -> Vec<Result<M, Error>> { pub async fn call_many(&self, to: &[UUID], msg: M, timeout: Duration) -> Vec<Result<M, Error>> {
@ -219,7 +252,7 @@ impl<M: RpcMessage> RpcAddrClient<M> {
to_addr: &SocketAddr, to_addr: &SocketAddr,
msg: MB, msg: MB,
timeout: Duration, timeout: Duration,
) -> Result<M, Error> ) -> Result<Result<M, Error>, RPCError>
where where
MB: Borrow<M>, MB: Borrow<M>,
{ {
@ -276,7 +309,7 @@ impl RpcHttpClient {
to_addr: &SocketAddr, to_addr: &SocketAddr,
msg: MB, msg: MB,
timeout: Duration, timeout: Duration,
) -> Result<M, Error> ) -> Result<Result<M, Error>, RPCError>
where where
MB: Borrow<M>, MB: Borrow<M>,
M: RpcMessage, M: RpcMessage,
@ -318,13 +351,9 @@ impl RpcHttpClient {
let status = resp.status(); let status = resp.status();
let body = hyper::body::to_bytes(resp.into_body()).await?; let body = hyper::body::to_bytes(resp.into_body()).await?;
match rmp_serde::decode::from_read::<_, Result<M, String>>(body.into_buf()) { match rmp_serde::decode::from_read::<_, Result<M, String>>(body.into_buf())? {
Err(e) => Err(Error::RPCError( Err(e) => Ok(Err(Error::RemoteError(e, status))),
format!("Invalid reply (deserialize error: {})", e), Ok(x) => Ok(Ok(x)),
status,
)),
Ok(Err(e)) => Err(Error::RPCError(e, status)),
Ok(Ok(x)) => Ok(x),
} }
} }
} }