Garage v1.0 #683

Merged
lx merged 119 commits from next-0.10 into main 2024-04-10 15:23:13 +00:00
3 changed files with 49 additions and 21 deletions
Showing only changes of commit aa59059a91 - Show all commits

View file

@ -49,13 +49,7 @@ pub async fn cli_command_dispatch(
} }
pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) -> Result<(), Error> { pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) -> Result<(), Error> {
let status = match rpc_cli let status = fetch_status(rpc_cli, rpc_host).await?;
.call(&rpc_host, SystemRpc::GetKnownNodes, PRIO_NORMAL)
.await??
{
SystemRpc::ReturnKnownNodes(nodes) => nodes,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
};
let layout = fetch_layout(rpc_cli, rpc_host).await?; let layout = fetch_layout(rpc_cli, rpc_host).await?;
println!("==== HEALTHY NODES ===="); println!("==== HEALTHY NODES ====");
@ -268,3 +262,18 @@ pub async fn cmd_admin(
} }
Ok(()) Ok(())
} }
// ---- utility ----
pub async fn fetch_status(
rpc_cli: &Endpoint<SystemRpc, ()>,
rpc_host: NodeID,
) -> Result<Vec<KnownNodeInfo>, Error> {
match rpc_cli
.call(&rpc_host, SystemRpc::GetKnownNodes, PRIO_NORMAL)
.await??
{
SystemRpc::ReturnKnownNodes(nodes) => Ok(nodes),
resp => Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
}
}

View file

@ -33,8 +33,8 @@ pub async fn cli_layout_command_dispatch(
cmd_config_layout(system_rpc_endpoint, rpc_host, config_opt).await cmd_config_layout(system_rpc_endpoint, rpc_host, config_opt).await
} }
LayoutOperation::History => cmd_layout_history(system_rpc_endpoint, rpc_host).await, LayoutOperation::History => cmd_layout_history(system_rpc_endpoint, rpc_host).await,
LayoutOperation::AssumeSync(assume_sync_opt) => { LayoutOperation::SkipDeadNodes(assume_sync_opt) => {
cmd_layout_assume_sync(system_rpc_endpoint, rpc_host, assume_sync_opt).await cmd_layout_skip_dead_nodes(system_rpc_endpoint, rpc_host, assume_sync_opt).await
} }
} }
} }
@ -388,13 +388,21 @@ pub async fn cmd_layout_history(
Ok(()) Ok(())
} }
pub async fn cmd_layout_assume_sync( pub async fn cmd_layout_skip_dead_nodes(
rpc_cli: &Endpoint<SystemRpc, ()>, rpc_cli: &Endpoint<SystemRpc, ()>,
rpc_host: NodeID, rpc_host: NodeID,
opt: AssumeSyncOpt, opt: SkipDeadNodesOpt,
) -> Result<(), Error> { ) -> Result<(), Error> {
let status = fetch_status(rpc_cli, rpc_host).await?;
let mut layout = fetch_layout(rpc_cli, rpc_host).await?; let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
if layout.versions.len() == 1 {
return Err(Error::Message(
"This command cannot be called when there is only one live cluster layout version"
.into(),
));
}
let min_v = layout.min_stored(); let min_v = layout.min_stored();
if opt.version <= min_v || opt.version > layout.current().version { if opt.version <= min_v || opt.version > layout.current().version {
return Err(Error::Message(format!( return Err(Error::Message(format!(
@ -408,12 +416,19 @@ pub async fn cmd_layout_assume_sync(
let all_nodes = layout.get_all_nodes(); let all_nodes = layout.get_all_nodes();
for node in all_nodes.iter() { for node in all_nodes.iter() {
layout.update_trackers.ack_map.set_max(*node, opt.version); if status.iter().any(|x| x.id == *node && x.is_up) {
layout.update_trackers.sync_map.set_max(*node, opt.version); continue;
layout }
.update_trackers
.sync_ack_map if layout.update_trackers.ack_map.set_max(*node, opt.version) {
.set_max(*node, opt.version); println!("Increased the ACK tracker for node {:?}", node);
}
if opt.allow_missing_data {
if layout.update_trackers.sync_map.set_max(*node, opt.version) {
println!("Increased the SYNC tracker for node {:?}", node);
}
}
} }
send_layout(rpc_cli, rpc_host, layout).await?; send_layout(rpc_cli, rpc_host, layout).await?;

View file

@ -117,9 +117,9 @@ pub enum LayoutOperation {
#[structopt(name = "history", version = garage_version())] #[structopt(name = "history", version = garage_version())]
History, History,
/// Assume all nodes are synchronized up to a certain layout version /// Skip dead nodes when awaiting for a new layout version to be synchronized
#[structopt(name = "assume-sync", version = garage_version())] #[structopt(name = "skip-dead-nodes", version = garage_version())]
AssumeSync(AssumeSyncOpt), SkipDeadNodes(SkipDeadNodesOpt),
} }
#[derive(StructOpt, Debug)] #[derive(StructOpt, Debug)]
@ -178,11 +178,15 @@ pub struct RevertLayoutOpt {
} }
#[derive(StructOpt, Debug)] #[derive(StructOpt, Debug)]
pub struct AssumeSyncOpt { pub struct SkipDeadNodesOpt {
/// Version number of the layout to assume is currently up-to-date. /// Version number of the layout to assume is currently up-to-date.
/// This will generally be the current layout version. /// This will generally be the current layout version.
#[structopt(long = "version")] #[structopt(long = "version")]
pub(crate) version: u64, pub(crate) version: u64,
/// Allow the skip even if a quorum of ndoes could not be found for
/// the data among the remaining nodes
#[structopt(long = "allow-missing-data")]
pub(crate) allow_missing_data: bool,
} }
#[derive(Serialize, Deserialize, StructOpt, Debug)] #[derive(Serialize, Deserialize, StructOpt, Debug)]