metadata db snapshotting #775

Merged
lx merged 4 commits from db-snapshot into main 2024-03-15 13:17:54 +00:00
3 changed files with 58 additions and 0 deletions
Showing only changes of commit a68c37555d - Show all commits

View file

@ -46,6 +46,7 @@ pub enum AdminRpc {
Stats(StatsOpt), Stats(StatsOpt),
Worker(WorkerOperation), Worker(WorkerOperation),
BlockOperation(BlockOperation), BlockOperation(BlockOperation),
MetaOperation(MetaOperation),
// Replies // Replies
Ok(String), Ok(String),
@ -518,6 +519,44 @@ impl AdminRpcHandler {
)])) )]))
} }
} }
// ================ META DB COMMANDS ====================
async fn handle_meta_cmd(self: &Arc<Self>, mo: &MetaOperation) -> Result<AdminRpc, Error> {
match mo {
MetaOperation::Snapshot { all: true } => {
let ring = self.garage.system.ring.borrow().clone();
let to = ring.layout.node_ids().to_vec();
let resps = futures::future::join_all(to.iter().map(|to| async move {
let to = (*to).into();
self.endpoint
.call(
&to,
AdminRpc::MetaOperation(MetaOperation::Snapshot { all: false }),
PRIO_NORMAL,
)
.await
}))
.await;
let mut ret = vec![];
for (to, resp) in to.iter().zip(resps.iter()) {
let res_str = match resp {
Ok(_) => "ok".to_string(),
Err(e) => format!("error: {}", e),
};
ret.push(format!("{:?}\t{}", to, res_str));
}
Ok(AdminRpc::Ok(format_table_to_string(ret)))
}
MetaOperation::Snapshot { all: false } => {
garage_model::snapshot::async_snapshot_metadata(&self.garage).await?;
Ok(AdminRpc::Ok("Snapshot has been saved.".into()))
}
}
}
} }
#[async_trait] #[async_trait]
@ -535,6 +574,7 @@ impl EndpointHandler<AdminRpc> for AdminRpcHandler {
AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await, AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await,
AdminRpc::Worker(wo) => self.handle_worker_cmd(wo).await, AdminRpc::Worker(wo) => self.handle_worker_cmd(wo).await,
AdminRpc::BlockOperation(bo) => self.handle_block_cmd(bo).await, AdminRpc::BlockOperation(bo) => self.handle_block_cmd(bo).await,
AdminRpc::MetaOperation(mo) => self.handle_meta_cmd(mo).await,
m => Err(GarageError::unexpected_rpc_message(m).into()), m => Err(GarageError::unexpected_rpc_message(m).into()),
} }
} }

View file

@ -44,6 +44,9 @@ pub async fn cli_command_dispatch(
Command::Block(bo) => { Command::Block(bo) => {
cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::BlockOperation(bo)).await cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::BlockOperation(bo)).await
} }
Command::Meta(mo) => {
cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::MetaOperation(mo)).await
}
_ => unreachable!(), _ => unreachable!(),
} }
} }

View file

@ -57,6 +57,10 @@ pub enum Command {
#[structopt(name = "block", version = garage_version())] #[structopt(name = "block", version = garage_version())]
Block(BlockOperation), Block(BlockOperation),
/// Operations on the metadata db
#[structopt(name = "meta", version = garage_version())]
Meta(MetaOperation),
/// Convert metadata db between database engine formats /// Convert metadata db between database engine formats
#[structopt(name = "convert-db", version = garage_version())] #[structopt(name = "convert-db", version = garage_version())]
ConvertDb(convert_db::ConvertDbOpt), ConvertDb(convert_db::ConvertDbOpt),
@ -617,3 +621,14 @@ pub enum BlockOperation {
blocks: Vec<String>, blocks: Vec<String>,
}, },
} }
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone, Copy)]
pub enum MetaOperation {
/// Save a snapshot of the metadata db file
#[structopt(name = "snapshot", version = garage_version())]
Snapshot {
/// Run on all nodes instead of only local node
#[structopt(long = "all")]
all: bool,
},
}