Add node tags in configuration

This commit is contained in:
Alex 2020-04-21 14:07:15 +00:00
parent cc4f2f1cfb
commit be0a2bae81
4 changed files with 44 additions and 15 deletions

View file

@ -111,6 +111,10 @@ pub struct ConfigureNodeOpt {
/// Number of tokens /// Number of tokens
n_tokens: u32, n_tokens: u32,
/// Optionnal node tag
#[structopt(long = "tag", default_value = "")]
tag: String,
} }
#[derive(StructOpt, Debug)] #[derive(StructOpt, Debug)]
@ -266,8 +270,8 @@ async fn cmd_status(rpc_cli: RpcAddrClient<Message>, rpc_host: SocketAddr) -> Re
for adv in status.iter() { for adv in status.iter() {
if let Some(cfg) = config.members.get(&adv.id) { if let Some(cfg) = config.members.get(&adv.id) {
println!( println!(
"{:?}\t{}\t{}\t{}\t{}", "{:?}\t{}\t{}\t{}\t{}\t{}",
adv.id, adv.state_info.hostname, adv.addr, cfg.datacenter, cfg.n_tokens adv.id, adv.state_info.hostname, adv.addr, cfg.tag, cfg.datacenter, cfg.n_tokens
); );
} }
} }
@ -281,7 +285,10 @@ async fn cmd_status(rpc_cli: RpcAddrClient<Message>, rpc_host: SocketAddr) -> Re
println!("\nFailed nodes:"); println!("\nFailed nodes:");
for (id, cfg) in config.members.iter() { for (id, cfg) in config.members.iter() {
if !status.iter().any(|x| x.id == *id) { if !status.iter().any(|x| x.id == *id) {
println!("{:?}\t{}\t{}", id, cfg.datacenter, cfg.n_tokens); println!(
"{:?}\t{}\t{}\t{}",
id, cfg.tag, cfg.datacenter, cfg.n_tokens
);
} }
} }
} }
@ -340,6 +347,7 @@ async fn cmd_configure(
NetworkConfigEntry { NetworkConfigEntry {
datacenter: args.datacenter, datacenter: args.datacenter,
n_tokens: args.n_tokens, n_tokens: args.n_tokens,
tag: args.tag,
}, },
); );
config.version += 1; config.version += 1;

View file

@ -69,6 +69,7 @@ pub struct NetworkConfig {
pub struct NetworkConfigEntry { pub struct NetworkConfigEntry {
pub datacenter: String, pub datacenter: String,
pub n_tokens: u32, pub n_tokens: u32,
pub tag: String,
} }
pub struct System { pub struct System {
@ -248,7 +249,8 @@ fn read_network_config(metadata_dir: &PathBuf) -> Result<NetworkConfig, Error> {
let mut net_config_bytes = vec![]; let mut net_config_bytes = vec![];
file.read_to_end(&mut net_config_bytes)?; file.read_to_end(&mut net_config_bytes)?;
let net_config = rmp_serde::decode::from_read_ref(&net_config_bytes[..])?; let net_config = rmp_serde::decode::from_read_ref(&net_config_bytes[..])
.expect("Unable to parse network configuration file (has version format changed?).");
Ok(net_config) Ok(net_config)
} }

View file

@ -1,8 +1,8 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::pin::Pin; use std::pin::Pin;
use std::time::Instant;
use std::sync::Arc; use std::sync::Arc;
use std::time::Instant;
use bytes::IntoBuf; use bytes::IntoBuf;
use futures::future::Future; use futures::future::Future;
@ -51,7 +51,11 @@ where
match handler(msg, sockaddr).await { match handler(msg, sockaddr).await {
Ok(resp) => { Ok(resp) => {
let resp_bytes = rmp_to_vec_all_named::<Result<M, String>>(&Ok(resp))?; let resp_bytes = rmp_to_vec_all_named::<Result<M, String>>(&Ok(resp))?;
trace!("]RPC:{},ok ({} ms)", name, (Instant::now()-begin_time).as_millis()); trace!(
"]RPC:{},ok ({} ms)",
name,
(Instant::now() - begin_time).as_millis()
);
Ok(Response::new(Body::from(resp_bytes))) Ok(Response::new(Body::from(resp_bytes)))
} }
Err(e) => { Err(e) => {
@ -59,7 +63,13 @@ where
let rep_bytes = rmp_to_vec_all_named::<Result<M, String>>(&Err(err_str))?; let rep_bytes = rmp_to_vec_all_named::<Result<M, String>>(&Err(err_str))?;
let mut err_response = Response::new(Body::from(rep_bytes)); let mut err_response = Response::new(Body::from(rep_bytes));
*err_response.status_mut() = e.http_status_code(); *err_response.status_mut() = e.http_status_code();
warn!("RPC error ({}): {} ({} ms), request: {}", name, e, (Instant::now()-begin_time).as_millis(), req_str); warn!(
"RPC error ({}): {} ({} ms), request: {}",
name,
e,
(Instant::now() - begin_time).as_millis(),
req_str
);
Ok(err_response) Ok(err_response)
} }
} }

View file

@ -198,19 +198,25 @@ where
partition: &TodoPartition, partition: &TodoPartition,
must_exit: &mut watch::Receiver<bool>, must_exit: &mut watch::Receiver<bool>,
) -> Result<(), Error> { ) -> Result<(), Error> {
debug!("({}) Preparing to sync {:?}...", self.table.name, partition);
let root_cks = self
.root_checksum(&partition.begin, &partition.end, must_exit)
.await?;
let my_id = self.table.system.id.clone(); let my_id = self.table.system.id.clone();
let nodes = self let nodes = self
.table .table
.replication .replication
.write_nodes(&partition.begin, &self.table.system); .write_nodes(&partition.begin, &self.table.system)
.into_iter()
.filter(|node| *node != my_id)
.collect::<Vec<_>>();
debug!(
"({}) Preparing to sync {:?} with {:?}...",
self.table.name, partition, nodes
);
let root_cks = self
.root_checksum(&partition.begin, &partition.end, must_exit)
.await?;
let mut sync_futures = nodes let mut sync_futures = nodes
.iter() .iter()
.filter(|node| **node != my_id)
.map(|node| { .map(|node| {
self.clone().do_sync_with( self.clone().do_sync_with(
partition.clone(), partition.clone(),
@ -230,7 +236,10 @@ where
} }
} }
if n_errors > self.table.replication.max_write_errors() { if n_errors > self.table.replication.max_write_errors() {
return Err(Error::Message(format!("Sync failed with too many nodes."))); return Err(Error::Message(format!(
"Sync failed with too many nodes (should have been: {:?}).",
nodes
)));
} }
if !partition.retain { if !partition.retain {