cli_v2: implement LaunchRepairOperation and remove old stuff

Alex 2025-02-05 15:36:47 +01:00
parent 406b6da163
commit f914db057a
18 changed files with 214 additions and 283 deletions

Cargo.lock generated
View file

@@ -1258,7 +1258,6 @@ dependencies = [
"opentelemetry-otlp",
"opentelemetry-prometheus",
"parse_duration",
"serde",
"serde_json",
"sha1",
"sha2",
@@ -1282,6 +1281,7 @@ dependencies = [
"format_table",
"futures",
"garage_api_common",
"garage_block",
"garage_model",
"garage_rpc",
"garage_table",

View file

@@ -16,6 +16,7 @@ path = "lib.rs"
[dependencies]
format_table.workspace = true
garage_model.workspace = true
garage_block.workspace = true
garage_table.workspace = true
garage_util.workspace = true
garage_rpc.workspace = true

View file

@@ -81,6 +81,7 @@ admin_endpoints![
CreateMetadataSnapshot,
GetNodeStatistics,
GetClusterStatistics,
LaunchRepairOperation,
// Worker operations
ListWorkers,
@@ -99,6 +100,7 @@ local_admin_endpoints![
// Node operations
CreateMetadataSnapshot,
GetNodeStatistics,
LaunchRepairOperation,
// Background workers
ListWorkers,
GetWorkerInfo,
@@ -663,6 +665,38 @@ pub struct GetClusterStatisticsResponse {
pub freeform: String,
}
// ---- LaunchRepairOperation ----
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LocalLaunchRepairOperationRequest {
pub repair_type: RepairType,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum RepairType {
Tables,
Blocks,
Versions,
MultipartUploads,
BlockRefs,
BlockRc,
Rebalance,
Scrub(ScrubCommand),
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum ScrubCommand {
Start,
Pause,
Resume,
Cancel,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LocalLaunchRepairOperationResponse;
// **********************************************
// Worker operations
// **********************************************
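A note on the wire format (not part of the diff): LocalLaunchRepairOperationRequest carries no rename_all attribute, so its field serializes as repair_type, while the RepairType and ScrubCommand enums above are camelCase and externally tagged. Under default serde_json behavior, request bodies should therefore look like:

{ "repair_type": "blocks" }
{ "repair_type": "multipartUploads" }
{ "repair_type": { "scrub": "start" } }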

View file

@@ -17,6 +17,7 @@ mod special;
mod block;
mod node;
mod repair;
mod worker;
use std::sync::Arc;

View file

@@ -4,6 +4,14 @@ use std::time::Duration;
use async_trait::async_trait;
use tokio::sync::watch;
use garage_util::background::*;
use garage_util::data::*;
use garage_util::error::{Error as GarageError, OkOrMessage};
use garage_util::migrate::Migrate;
use garage_table::replication::*;
use garage_table::*;
use garage_block::manager::BlockManager;
use garage_block::repair::ScrubWorkerCommand;
@@ -13,82 +21,77 @@ use garage_model::s3::mpu_table::*;
use garage_model::s3::object_table::*;
use garage_model::s3::version_table::*;
use garage_table::replication::*;
use garage_table::*;
use garage_util::background::*;
use garage_util::data::*;
use garage_util::error::Error;
use garage_util::migrate::Migrate;
use crate::*;
use crate::api::*;
use crate::error::Error;
use crate::{Admin, RequestHandler};
const RC_REPAIR_ITER_COUNT: usize = 64;
pub async fn launch_online_repair(
garage: &Arc<Garage>,
bg: &BackgroundRunner,
opt: RepairOpt,
) -> Result<(), Error> {
match opt.what {
RepairWhat::Tables => {
info!("Launching a full sync of tables");
garage.bucket_table.syncer.add_full_sync()?;
garage.object_table.syncer.add_full_sync()?;
garage.version_table.syncer.add_full_sync()?;
garage.block_ref_table.syncer.add_full_sync()?;
garage.key_table.syncer.add_full_sync()?;
}
RepairWhat::Versions => {
info!("Repairing the versions table");
bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairVersions));
}
RepairWhat::MultipartUploads => {
info!("Repairing the multipart uploads table");
bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairMpu));
}
RepairWhat::BlockRefs => {
info!("Repairing the block refs table");
bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairBlockRefs));
}
RepairWhat::BlockRc => {
info!("Repairing the block reference counters");
bg.spawn_worker(BlockRcRepair::new(
garage.block_manager.clone(),
garage.block_ref_table.clone(),
));
}
RepairWhat::Blocks => {
info!("Repairing the stored blocks");
bg.spawn_worker(garage_block::repair::RepairWorker::new(
garage.block_manager.clone(),
));
}
RepairWhat::Scrub { cmd } => {
let cmd = match cmd {
ScrubCmd::Start => ScrubWorkerCommand::Start,
ScrubCmd::Pause => ScrubWorkerCommand::Pause(Duration::from_secs(3600 * 24)),
ScrubCmd::Resume => ScrubWorkerCommand::Resume,
ScrubCmd::Cancel => ScrubWorkerCommand::Cancel,
ScrubCmd::SetTranquility { tranquility } => {
garage
.block_manager
.scrub_persister
.set_with(|x| x.tranquility = tranquility)?;
return Ok(());
}
};
info!("Sending command to scrub worker: {:?}", cmd);
garage.block_manager.send_scrub_command(cmd).await?;
}
RepairWhat::Rebalance => {
info!("Rebalancing the stored blocks among storage locations");
bg.spawn_worker(garage_block::repair::RebalanceWorker::new(
garage.block_manager.clone(),
));
#[async_trait]
impl RequestHandler for LocalLaunchRepairOperationRequest {
type Response = LocalLaunchRepairOperationResponse;
async fn handle(
self,
garage: &Arc<Garage>,
admin: &Admin,
) -> Result<LocalLaunchRepairOperationResponse, Error> {
let bg = &admin.background;
match self.repair_type {
RepairType::Tables => {
info!("Launching a full sync of tables");
garage.bucket_table.syncer.add_full_sync()?;
garage.object_table.syncer.add_full_sync()?;
garage.version_table.syncer.add_full_sync()?;
garage.block_ref_table.syncer.add_full_sync()?;
garage.key_table.syncer.add_full_sync()?;
}
RepairType::Versions => {
info!("Repairing the versions table");
bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairVersions));
}
RepairType::MultipartUploads => {
info!("Repairing the multipart uploads table");
bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairMpu));
}
RepairType::BlockRefs => {
info!("Repairing the block refs table");
bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairBlockRefs));
}
RepairType::BlockRc => {
info!("Repairing the block reference counters");
bg.spawn_worker(BlockRcRepair::new(
garage.block_manager.clone(),
garage.block_ref_table.clone(),
));
}
RepairType::Blocks => {
info!("Repairing the stored blocks");
bg.spawn_worker(garage_block::repair::RepairWorker::new(
garage.block_manager.clone(),
));
}
RepairType::Scrub(cmd) => {
let cmd = match cmd {
ScrubCommand::Start => ScrubWorkerCommand::Start,
ScrubCommand::Pause => {
ScrubWorkerCommand::Pause(Duration::from_secs(3600 * 24))
}
ScrubCommand::Resume => ScrubWorkerCommand::Resume,
ScrubCommand::Cancel => ScrubWorkerCommand::Cancel,
};
info!("Sending command to scrub worker: {:?}", cmd);
garage.block_manager.send_scrub_command(cmd).await?;
}
RepairType::Rebalance => {
info!("Rebalancing the stored blocks among storage locations");
bg.spawn_worker(garage_block::repair::RebalanceWorker::new(
garage.block_manager.clone(),
));
}
}
Ok(LocalLaunchRepairOperationResponse)
}
Ok(())
}
// ----
@@ -103,7 +106,7 @@ trait TableRepair: Send + Sync + 'static {
&mut self,
garage: &Garage,
entry: <<Self as TableRepair>::T as TableSchema>::E,
) -> Result<bool, Error>;
) -> Result<bool, GarageError>;
}
struct TableRepairWorker<T: TableRepair> {
@@ -139,7 +142,10 @@ impl<R: TableRepair> Worker for TableRepairWorker<R> {
}
}
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
async fn work(
&mut self,
_must_exit: &mut watch::Receiver<bool>,
) -> Result<WorkerState, GarageError> {
let (item_bytes, next_pos) = match R::table(&self.garage).data.store.get_gt(&self.pos)? {
Some((k, v)) => (v, k),
None => {
@@ -182,7 +188,7 @@ impl TableRepair for RepairVersions {
&garage.version_table
}
async fn process(&mut self, garage: &Garage, version: Version) -> Result<bool, Error> {
async fn process(&mut self, garage: &Garage, version: Version) -> Result<bool, GarageError> {
if !version.deleted.get() {
let ref_exists = match &version.backlink {
VersionBacklink::Object { bucket_id, key } => garage
@@ -229,7 +235,11 @@ impl TableRepair for RepairBlockRefs {
&garage.block_ref_table
}
async fn process(&mut self, garage: &Garage, mut block_ref: BlockRef) -> Result<bool, Error> {
async fn process(
&mut self,
garage: &Garage,
mut block_ref: BlockRef,
) -> Result<bool, GarageError> {
if !block_ref.deleted.get() {
let ref_exists = garage
.version_table
@@ -265,7 +275,11 @@ impl TableRepair for RepairMpu {
&garage.mpu_table
}
async fn process(&mut self, garage: &Garage, mut mpu: MultipartUpload) -> Result<bool, Error> {
async fn process(
&mut self,
garage: &Garage,
mut mpu: MultipartUpload,
) -> Result<bool, GarageError> {
if !mpu.deleted.get() {
let ref_exists = garage
.object_table
@@ -332,7 +346,10 @@ impl Worker for BlockRcRepair {
}
}
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
async fn work(
&mut self,
_must_exit: &mut watch::Receiver<bool>,
) -> Result<WorkerState, GarageError> {
for _i in 0..RC_REPAIR_ITER_COUNT {
let next1 = self
.block_manager
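For orientation, the hunks above also touch the pre-existing TableRepair machinery: a unit struct names a table and a per-entry fixup, and TableRepairWorker drives it over the whole table. A minimal sketch of an implementor, assuming the trait shape implied by the process signature and R::table usage above (RepairWidgets, WidgetTable and widget_table are hypothetical names; the real implementors are RepairVersions, RepairBlockRefs and RepairMpu):

struct RepairWidgets;

#[async_trait]
impl TableRepair for RepairWidgets {
    type T = WidgetTable; // hypothetical table schema, for illustration only

    fn table(garage: &Garage) -> &Table<Self::T, TableShardedReplication> {
        &garage.widget_table // hypothetical field on Garage
    }

    async fn process(&mut self, _garage: &Garage, entry: Widget) -> Result<bool, GarageError> {
        // Return Ok(true) if `entry` was modified and must be written back;
        // TableRepairWorker::work() iterates the table and feeds each entry
        // through here, counting the repairs it performs.
        Ok(false)
    }
}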

View file

@@ -63,6 +63,7 @@ impl AdminApiRequest {
POST CreateMetadataSnapshot (default::body, query::node),
GET GetNodeStatistics (default::body, query::node),
GET GetClusterStatistics (),
POST LaunchRepairOperation (body_field, query::node),
// Worker APIs
POST ListWorkers (body_field, query::node),
POST GetWorkerInfo (body_field, query::node),
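Combined with this registration (body_field, query::node), launching a repair over HTTP could look like the following sketch; the /v2/ path prefix, the default admin port 3903 and bearer-token auth are assumptions about the admin API setup rather than part of this diff, and node=* is the fan-out-to-all-nodes form used by cmd_repair further down:

curl -X POST \
    -H "Authorization: Bearer $ADMIN_TOKEN" \
    "http://localhost:3903/v2/LaunchRepairOperation?node=*" \
    -d '{ "repair_type": "tables" }'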

View file

@@ -49,8 +49,6 @@ sodiumoxide.workspace = true
structopt.workspace = true
git-version.workspace = true
serde.workspace = true
futures.workspace = true
tokio.workspace = true

View file

@@ -1,108 +0,0 @@
use std::sync::Arc;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use garage_util::background::BackgroundRunner;
use garage_util::error::Error as GarageError;
use garage_rpc::*;
use garage_model::garage::Garage;
use garage_model::helper::error::Error;
use crate::cli::*;
use crate::repair::online::launch_online_repair;
pub const ADMIN_RPC_PATH: &str = "garage/admin_rpc.rs/Rpc";
#[derive(Debug, Serialize, Deserialize)]
#[allow(clippy::large_enum_variant)]
pub enum AdminRpc {
LaunchRepair(RepairOpt),
// Replies
Ok(String),
}
impl Rpc for AdminRpc {
type Response = Result<AdminRpc, Error>;
}
pub struct AdminRpcHandler {
garage: Arc<Garage>,
background: Arc<BackgroundRunner>,
endpoint: Arc<Endpoint<AdminRpc, Self>>,
}
impl AdminRpcHandler {
pub fn new(garage: Arc<Garage>, background: Arc<BackgroundRunner>) -> Arc<Self> {
let endpoint = garage.system.netapp.endpoint(ADMIN_RPC_PATH.into());
let admin = Arc::new(Self {
garage,
background,
endpoint,
});
admin.endpoint.set_handler(admin.clone());
admin
}
// ================ REPAIR COMMANDS ====================
async fn handle_launch_repair(self: &Arc<Self>, opt: RepairOpt) -> Result<AdminRpc, Error> {
if !opt.yes {
return Err(Error::BadRequest(
"Please provide the --yes flag to initiate repair operations.".to_string(),
));
}
if opt.all_nodes {
let mut opt_to_send = opt.clone();
opt_to_send.all_nodes = false;
let mut failures = vec![];
let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec();
for node in all_nodes.iter() {
let node = (*node).into();
let resp = self
.endpoint
.call(
&node,
AdminRpc::LaunchRepair(opt_to_send.clone()),
PRIO_NORMAL,
)
.await;
if !matches!(resp, Ok(Ok(_))) {
failures.push(node);
}
}
if failures.is_empty() {
Ok(AdminRpc::Ok("Repair launched on all nodes".to_string()))
} else {
Err(Error::BadRequest(format!(
"Could not launch repair on nodes: {:?} (launched successfully on other nodes)",
failures
)))
}
} else {
launch_online_repair(&self.garage, &self.background, opt).await?;
Ok(AdminRpc::Ok(format!(
"Repair launched on {:?}",
self.garage.system.id
)))
}
}
}
#[async_trait]
impl EndpointHandler<AdminRpc> for AdminRpcHandler {
async fn handle(
self: &Arc<Self>,
message: &AdminRpc,
_from: NodeID,
) -> Result<AdminRpc, Error> {
match message {
AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await,
m => Err(GarageError::unexpected_rpc_message(m).into()),
}
}
}

View file

@@ -1,21 +0,0 @@
use garage_rpc::*;
use garage_model::helper::error::Error as HelperError;
use crate::admin::*;
pub async fn cmd_admin(
rpc_cli: &Endpoint<AdminRpc, ()>,
rpc_host: NodeID,
args: AdminRpc,
) -> Result<(), HelperError> {
match rpc_cli.call(&rpc_host, args, PRIO_NORMAL).await?? {
AdminRpc::Ok(msg) => {
println!("{}", msg);
}
r => {
error!("Unexpected response: {:?}", r);
}
}
Ok(())
}

View file

@@ -7,7 +7,7 @@ use garage_rpc::layout::*;
use garage_rpc::system::*;
use garage_rpc::*;
use crate::cli::*;
use crate::cli::structs::*;
pub async fn cmd_show_layout(
rpc_cli: &Endpoint<SystemRpc, ()>,

View file

@@ -1,10 +1,7 @@
pub(crate) mod cmd;
pub(crate) mod init;
pub(crate) mod layout;
pub(crate) mod structs;
pub(crate) mod convert_db;
pub(crate) mod init;
pub(crate) mod repair;
pub(crate) use cmd::*;
pub(crate) use init::*;
pub(crate) use structs::*;
pub(crate) mod layout;

View file

@@ -1,4 +1,3 @@
use serde::{Deserialize, Serialize};
use structopt::StructOpt;
use garage_util::version::garage_version;
@@ -190,7 +189,7 @@ pub struct SkipDeadNodesOpt {
pub(crate) allow_missing_data: bool,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
#[derive(StructOpt, Debug)]
pub enum BucketOperation {
/// List buckets
#[structopt(name = "list", version = garage_version())]
@@ -237,7 +236,7 @@ pub enum BucketOperation {
CleanupIncompleteUploads(CleanupIncompleteUploadsOpt),
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
#[derive(StructOpt, Debug)]
pub struct WebsiteOpt {
/// Create
#[structopt(long = "allow")]
@@ -259,13 +258,13 @@ pub struct WebsiteOpt {
pub error_document: Option<String>,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
#[derive(StructOpt, Debug)]
pub struct BucketOpt {
/// Bucket name
pub name: String,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
#[derive(StructOpt, Debug)]
pub struct DeleteBucketOpt {
/// Bucket name
pub name: String,
@@ -275,7 +274,7 @@ pub struct DeleteBucketOpt {
pub yes: bool,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
#[derive(StructOpt, Debug)]
pub struct AliasBucketOpt {
/// Existing bucket name (its alias in global namespace or its full hex uuid)
pub existing_bucket: String,
@@ -288,7 +287,7 @@ pub struct AliasBucketOpt {
pub local: Option<String>,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
#[derive(StructOpt, Debug)]
pub struct UnaliasBucketOpt {
/// Bucket name
pub name: String,
@@ -298,7 +297,7 @@ pub struct UnaliasBucketOpt {
pub local: Option<String>,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
#[derive(StructOpt, Debug)]
pub struct PermBucketOpt {
/// Access key name or ID
#[structopt(long = "key")]
@@ -321,7 +320,7 @@ pub struct PermBucketOpt {
pub bucket: String,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
#[derive(StructOpt, Debug)]
pub struct SetQuotasOpt {
/// Bucket name
pub bucket: String,
@@ -336,7 +335,7 @@ pub struct SetQuotasOpt {
pub max_objects: Option<String>,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
#[derive(StructOpt, Debug)]
pub struct CleanupIncompleteUploadsOpt {
/// Abort multipart uploads older than this value
#[structopt(long = "older-than", default_value = "1d")]
@@ -347,7 +346,7 @@ pub struct CleanupIncompleteUploadsOpt {
pub buckets: Vec<String>,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
#[derive(StructOpt, Debug)]
pub enum KeyOperation {
/// List keys
#[structopt(name = "list", version = garage_version())]
@@ -382,7 +381,7 @@ pub enum KeyOperation {
Import(KeyImportOpt),
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
#[derive(StructOpt, Debug)]
pub struct KeyInfoOpt {
/// ID or name of the key
pub key_pattern: String,
@@ -391,14 +390,14 @@ pub struct KeyInfoOpt {
pub show_secret: bool,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
#[derive(StructOpt, Debug)]
pub struct KeyNewOpt {
/// Name of the key
#[structopt(default_value = "Unnamed key")]
pub name: String,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
#[derive(StructOpt, Debug)]
pub struct KeyRenameOpt {
/// ID or name of the key
pub key_pattern: String,
@@ -407,7 +406,7 @@ pub struct KeyRenameOpt {
pub new_name: String,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
#[derive(StructOpt, Debug)]
pub struct KeyDeleteOpt {
/// ID or name of the key
pub key_pattern: String,
@@ -417,7 +416,7 @@ pub struct KeyDeleteOpt {
pub yes: bool,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
#[derive(StructOpt, Debug)]
pub struct KeyPermOpt {
/// ID or name of the key
pub key_pattern: String,
@@ -427,7 +426,7 @@ pub struct KeyPermOpt {
pub create_bucket: bool,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
#[derive(StructOpt, Debug)]
pub struct KeyImportOpt {
/// Access key ID
pub key_id: String,
@@ -444,7 +443,7 @@ pub struct KeyImportOpt {
pub yes: bool,
}
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
#[derive(StructOpt, Debug, Clone)]
pub struct RepairOpt {
/// Launch repair operation on all nodes
#[structopt(short = "a", long = "all-nodes")]
@@ -458,7 +457,7 @@ pub struct RepairOpt {
pub what: RepairWhat,
}
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
#[derive(StructOpt, Debug, Eq, PartialEq, Clone)]
pub enum RepairWhat {
/// Do a full sync of metadata tables
#[structopt(name = "tables", version = garage_version())]
@@ -489,7 +488,7 @@ pub enum RepairWhat {
Rebalance,
}
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
#[derive(StructOpt, Debug, Eq, PartialEq, Clone)]
pub enum ScrubCmd {
/// Start scrub
#[structopt(name = "start", version = garage_version())]
@@ -503,15 +502,9 @@ pub enum ScrubCmd {
/// Cancel scrub in progress
#[structopt(name = "cancel", version = garage_version())]
Cancel,
/// Set tranquility level for in-progress and future scrubs
#[structopt(name = "set-tranquility", version = garage_version())]
SetTranquility {
#[structopt()]
tranquility: u32,
},
}
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
#[derive(StructOpt, Debug, Clone)]
pub struct OfflineRepairOpt {
/// Confirm the launch of the repair operation
#[structopt(long = "yes")]
@@ -521,7 +514,7 @@ pub struct OfflineRepairOpt {
pub what: OfflineRepairWhat,
}
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
#[derive(StructOpt, Debug, Eq, PartialEq, Clone)]
pub enum OfflineRepairWhat {
/// Repair K2V item counters
#[cfg(feature = "k2v")]
@@ -532,19 +525,14 @@ pub enum OfflineRepairWhat {
ObjectCounters,
}
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
#[derive(StructOpt, Debug, Clone)]
pub struct StatsOpt {
/// Gather statistics from all nodes
#[structopt(short = "a", long = "all-nodes")]
pub all_nodes: bool,
/// Don't show global cluster stats (internal use in RPC)
#[structopt(skip)]
#[serde(default)]
pub skip_global: bool,
}
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
#[derive(StructOpt, Debug, Eq, PartialEq, Clone)]
pub enum WorkerOperation {
/// List all workers on Garage node
#[structopt(name = "list", version = garage_version())]
@@ -577,7 +565,7 @@ pub enum WorkerOperation {
},
}
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone, Copy)]
#[derive(StructOpt, Debug, Eq, PartialEq, Clone, Copy)]
pub struct WorkerListOpt {
/// Show only busy workers
#[structopt(short = "b", long = "busy")]
@@ -587,7 +575,7 @@ pub struct WorkerListOpt {
pub errors: bool,
}
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
#[derive(StructOpt, Debug, Eq, PartialEq, Clone)]
pub enum BlockOperation {
/// List all blocks that currently have a resync error
#[structopt(name = "list-errors", version = garage_version())]
@@ -619,7 +607,7 @@ pub enum BlockOperation {
},
}
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone, Copy)]
#[derive(StructOpt, Debug, Eq, PartialEq, Clone, Copy)]
pub enum MetaOperation {
/// Save a snapshot of the metadata db file
#[structopt(name = "snapshot", version = garage_version())]

View file

@@ -20,14 +20,10 @@ use garage_api_admin::api::*;
use garage_api_admin::api_server::{AdminRpc as ProxyRpc, AdminRpcResponse as ProxyRpcResponse};
use garage_api_admin::RequestHandler;
use crate::admin::*;
use crate::cli as cli_v1;
use crate::cli::structs::*;
use crate::cli::Command;
pub struct Cli {
pub system_rpc_endpoint: Arc<Endpoint<SystemRpc, ()>>,
pub admin_rpc_endpoint: Arc<Endpoint<AdminRpc, ()>>,
pub proxy_rpc_endpoint: Arc<Endpoint<ProxyRpc, ()>>,
pub rpc_host: NodeID,
}
@@ -46,15 +42,7 @@ impl Cli {
Command::Block(bo) => self.cmd_block(bo).await,
Command::Meta(mo) => self.cmd_meta(mo).await,
Command::Stats(so) => self.cmd_stats(so).await,
// TODO
Command::Repair(ro) => cli_v1::cmd_admin(
&self.admin_rpc_endpoint,
self.rpc_host,
AdminRpc::LaunchRepair(ro),
)
.await
.ok_or_message("cli_v1"),
Command::Repair(ro) => self.cmd_repair(ro).await,
_ => unreachable!(),
}

View file

@@ -27,7 +27,7 @@ impl Cli {
table.push(format!("{:.16}\tError: {}", node, err));
}
for (node, _) in res.success.iter() {
table.push(format!("{:.16}\tOk", node));
table.push(format!("{:.16}\tSnapshot created", node));
}
format_table(table);
@@ -64,4 +64,50 @@ impl Cli {
Ok(())
}
pub async fn cmd_repair(&self, cmd: RepairOpt) -> Result<(), Error> {
if !cmd.yes {
return Err(Error::Message(
"Please add --yes to start the repair operation".into(),
));
}
let repair_type = match cmd.what {
RepairWhat::Tables => RepairType::Tables,
RepairWhat::Blocks => RepairType::Blocks,
RepairWhat::Versions => RepairType::Versions,
RepairWhat::MultipartUploads => RepairType::MultipartUploads,
RepairWhat::BlockRefs => RepairType::BlockRefs,
RepairWhat::BlockRc => RepairType::BlockRc,
RepairWhat::Rebalance => RepairType::Rebalance,
RepairWhat::Scrub { cmd } => RepairType::Scrub(match cmd {
ScrubCmd::Start => ScrubCommand::Start,
ScrubCmd::Cancel => ScrubCommand::Cancel,
ScrubCmd::Pause => ScrubCommand::Pause,
ScrubCmd::Resume => ScrubCommand::Resume,
}),
};
let res = self
.api_request(LaunchRepairOperationRequest {
node: if cmd.all_nodes {
"*".to_string()
} else {
hex::encode(self.rpc_host)
},
body: LocalLaunchRepairOperationRequest { repair_type },
})
.await?;
let mut table = vec![];
for (node, err) in res.error.iter() {
table.push(format!("{:.16}\tError: {}", node, err));
}
for (node, _) in res.success.iter() {
table.push(format!("{:.16}\tRepair launched", node));
}
format_table(table);
Ok(())
}
}
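With this in place, repairs go through the admin API like the other cli_v2 commands, and the user-facing invocations keep their v1 shape (flag and subcommand names as declared in RepairOpt and RepairWhat):

garage repair --yes blocks
garage repair --all-nodes --yes scrub start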

View file

@@ -4,10 +4,8 @@
#[macro_use]
extern crate tracing;
mod admin;
mod cli;
mod cli_v2;
mod repair;
mod secrets;
mod server;
#[cfg(feature = "telemetry-otlp")]
@@ -37,8 +35,7 @@ use garage_rpc::*;
use garage_api_admin::api_server::{AdminRpc as ProxyRpc, ADMIN_RPC_PATH as PROXY_RPC_PATH};
use admin::*;
use cli::*;
use cli::structs::*;
use secrets::Secrets;
#[derive(StructOpt, Debug)]
@@ -146,13 +143,13 @@ async fn main() {
let res = match opt.cmd {
Command::Server => server::run_server(opt.config_file, opt.secrets).await,
Command::OfflineRepair(repair_opt) => {
repair::offline::offline_repair(opt.config_file, opt.secrets, repair_opt).await
cli::repair::offline_repair(opt.config_file, opt.secrets, repair_opt).await
}
Command::ConvertDb(conv_opt) => {
cli::convert_db::do_conversion(conv_opt).map_err(From::from)
}
Command::Node(NodeOperation::NodeId(node_id_opt)) => {
node_id_command(opt.config_file, node_id_opt.quiet)
cli::init::node_id_command(opt.config_file, node_id_opt.quiet)
}
_ => cli_command(opt).await,
};
@@ -253,7 +250,7 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
(id, addrs[0], false)
} else {
let node_id = garage_rpc::system::read_node_id(&config.as_ref().unwrap().metadata_dir)
.err_context(READ_KEY_ERROR)?;
.err_context(cli::init::READ_KEY_ERROR)?;
if let Some(a) = config.as_ref().and_then(|c| c.rpc_public_addr.as_ref()) {
use std::net::ToSocketAddrs;
let a = a
@@ -283,12 +280,10 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
}
let system_rpc_endpoint = netapp.endpoint::<SystemRpc, ()>(SYSTEM_RPC_PATH.into());
let admin_rpc_endpoint = netapp.endpoint::<AdminRpc, ()>(ADMIN_RPC_PATH.into());
let proxy_rpc_endpoint = netapp.endpoint::<ProxyRpc, ()>(PROXY_RPC_PATH.into());
let cli = cli_v2::Cli {
system_rpc_endpoint,
admin_rpc_endpoint,
proxy_rpc_endpoint,
rpc_host: id,
};

View file

@@ -1,2 +0,0 @@
pub mod offline;
pub mod online;

View file

@@ -14,7 +14,6 @@ use garage_web::WebServer;
#[cfg(feature = "k2v")]
use garage_api_k2v::api_server::K2VApiServer;
use crate::admin::*;
use crate::secrets::{fill_secrets, Secrets};
#[cfg(feature = "telemetry-otlp")]
use crate::tracing_setup::*;
@@ -74,9 +73,6 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er
info!("Launching internal Garage cluster communications...");
let run_system = tokio::spawn(garage.system.clone().run(watch_cancel.clone()));
info!("Create admin RPC handler...");
AdminRpcHandler::new(garage.clone(), background.clone());
// ---- Launch public-facing API servers ----
let mut servers = vec![];