NLnet task 3 #667

Merged
lx merged 60 commits from nlnet-task3 into next-0.10 2024-01-11 10:58:08 +00:00
4 changed files with 147 additions and 3 deletions
Showing only changes of commit 11e6fef93c - Show all commits

View file

@ -135,13 +135,14 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) ->
}
format_table(healthy_nodes);
// Determine which nodes are unhealthy and print that to stdout
// Determine which nodes are unhealthy and print that to stdout
let status_map = status
.iter()
.map(|adv| (adv.id, adv))
.collect::<HashMap<_, _>>();
let tf = timeago::Formatter::new();
let mut drain_msg = false;
let mut failed_nodes =
vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity\tLast seen".to_string()];
let mut listed = HashSet::new();
@ -163,7 +164,7 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) ->
}
// Node is in a layout version, is not a gateway node, and is not up:
// it is in a failed state, add proper line to the output
// it is in a failed state, add proper line to the output
let (host, addr, last_seen) = match adv {
Some(adv) => (
adv.status.hostname.as_str(),
@ -177,6 +178,7 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) ->
let capacity = if ver.version == layout.current().version {
cfg.capacity_string()
} else {
drain_msg = true;
"draining metadata...".to_string()
};
failed_nodes.push(format!(
@ -195,6 +197,14 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) ->
if failed_nodes.len() > 1 {
println!("\n==== FAILED NODES ====");
format_table(failed_nodes);
if drain_msg {
println!();
println!("Your cluster is expecting to drain data from nodes that are currently unavailable.");
println!("If these nodes are definitely dead, please review the layout history with");
println!(
"`garage layout history` and use `garage layout assume-sync` to force progress."
);
}
}
if print_staging_role_changes(&layout) {

View file

@ -32,6 +32,10 @@ pub async fn cli_layout_command_dispatch(
LayoutOperation::Config(config_opt) => {
cmd_config_layout(system_rpc_endpoint, rpc_host, config_opt).await
}
LayoutOperation::History => cmd_layout_history(system_rpc_endpoint, rpc_host).await,
LayoutOperation::AssumeSync(assume_sync_opt) => {
cmd_layout_assume_sync(system_rpc_endpoint, rpc_host, assume_sync_opt).await
}
}
}
@ -311,6 +315,113 @@ pub async fn cmd_config_layout(
Ok(())
}
pub async fn cmd_layout_history(
rpc_cli: &Endpoint<SystemRpc, ()>,
rpc_host: NodeID,
) -> Result<(), Error> {
let layout = fetch_layout(rpc_cli, rpc_host).await?;
let min_stored = layout.min_stored();
println!("==== LAYOUT HISTORY ====");
let mut table = vec!["Version\tStatus\tStorage nodes\tGateway nodes".to_string()];
for ver in layout
.versions
.iter()
.rev()
.chain(layout.old_versions.iter().rev())
{
let status = if ver.version == layout.current().version {
"current"
} else if ver.version >= min_stored {
"draining"
} else {
"historical"
};
table.push(format!(
"#{}\t{}\t{}\t{}",
ver.version,
status,
ver.roles
.items()
.iter()
.filter(|(_, _, x)| matches!(x, NodeRoleV(Some(c)) if c.capacity.is_some()))
.count(),
ver.roles
.items()
.iter()
.filter(|(_, _, x)| matches!(x, NodeRoleV(Some(c)) if c.capacity.is_none()))
.count(),
));
}
format_table(table);
println!();
println!("==== UPDATE TRACKERS ====");
println!("This is the internal data that Garage stores to know which nodes have what data.");
println!();
let mut table = vec!["Node\tAck\tSync\tSync_ack".to_string()];
let all_nodes = layout.get_all_nodes();
for node in all_nodes.iter() {
table.push(format!(
"{:?}\t#{}\t#{}\t#{}",
node,
layout.update_trackers.ack_map.get(node),
layout.update_trackers.sync_map.get(node),
layout.update_trackers.sync_ack_map.get(node),
));
}
table[1..].sort();
format_table(table);
if layout.versions.len() > 1 {
println!();
println!(
"If some nodes are not catching up to the latest layout version in the update tracker,"
);
println!("it might be because they are offline or unable to complete a sync successfully.");
println!(
"You may force progress using `garage layout assume-sync --version {}`",
layout.current().version
);
}
Ok(())
}
pub async fn cmd_layout_assume_sync(
rpc_cli: &Endpoint<SystemRpc, ()>,
rpc_host: NodeID,
opt: AssumeSyncOpt,
) -> Result<(), Error> {
let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
let min_v = layout.min_stored();
if opt.version <= min_v || opt.version > layout.current().version {
return Err(Error::Message(format!(
"Invalid version, you may use the following version numbers: {}",
(min_v + 1..=layout.current().version)
.map(|x| x.to_string())
.collect::<Vec<_>>()
.join(" ")
)));
}
let all_nodes = layout.get_all_nodes();
for node in all_nodes.iter() {
layout.update_trackers.ack_map.set_max(*node, opt.version);
layout.update_trackers.sync_map.set_max(*node, opt.version);
layout
.update_trackers
.sync_ack_map
.set_max(*node, opt.version);
}
send_layout(rpc_cli, rpc_host, layout).await?;
println!("Success.");
Ok(())
}
// --- utility ---
pub async fn fetch_layout(

View file

@ -112,6 +112,14 @@ pub enum LayoutOperation {
/// Revert staged changes to cluster layout
#[structopt(name = "revert", version = garage_version())]
Revert(RevertLayoutOpt),
/// View the history of layouts in the cluster
#[structopt(name = "history", version = garage_version())]
History,
/// Assume all nodes are synchronized up to a certain layout version
#[structopt(name = "assume-sync", version = garage_version())]
AssumeSync(AssumeSyncOpt),
}
#[derive(StructOpt, Debug)]
@ -169,6 +177,14 @@ pub struct RevertLayoutOpt {
pub(crate) yes: bool,
}
#[derive(StructOpt, Debug)]
pub struct AssumeSyncOpt {
/// Version number of the layout to assume is currently up-to-date.
/// This will generally be the current layout version.
#[structopt(long = "version")]
pub(crate) version: u64,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
pub enum BucketOperation {
/// List buckets

View file

@ -391,7 +391,10 @@ impl UpdateTracker {
changed
}
pub(crate) fn set_max(&mut self, peer: Uuid, value: u64) -> bool {
/// This bumps the update tracker for a given node up to the specified value.
/// This has potential impacts on the correctness of Garage and should only
/// be used in very specific circumstances.
pub fn set_max(&mut self, peer: Uuid, value: u64) -> bool {
match self.0.get_mut(&peer) {
Some(e) if *e < value => {
*e = value;
@ -412,6 +415,10 @@ impl UpdateTracker {
.min()
.unwrap_or(min_version)
}
pub fn get(&self, node: &Uuid) -> u64 {
self.0.get(node).copied().unwrap_or(0)
}
}
impl UpdateTrackers {