From f914db057a85e0fa70f319ee3af85998a551af96 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 5 Feb 2025 15:36:47 +0100 Subject: [PATCH] cli_v2: implement LaunchRepairOperation and remove old stuff --- Cargo.lock | 2 +- src/api/admin/Cargo.toml | 1 + src/api/admin/api.rs | 34 ++++ src/api/admin/lib.rs | 1 + .../repair/online.rs => api/admin/repair.rs} | 171 ++++++++++-------- src/api/admin/router_v2.rs | 1 + src/garage/Cargo.toml | 2 - src/garage/admin/mod.rs | 108 ----------- src/garage/cli/cmd.rs | 21 --- src/garage/cli/layout.rs | 2 +- src/garage/cli/mod.rs | 9 +- .../{repair/offline.rs => cli/repair.rs} | 0 src/garage/cli/structs.rs | 64 +++---- src/garage/cli_v2/mod.rs | 14 +- src/garage/cli_v2/node.rs | 48 ++++- src/garage/main.rs | 13 +- src/garage/repair/mod.rs | 2 - src/garage/server.rs | 4 - 18 files changed, 214 insertions(+), 283 deletions(-) rename src/{garage/repair/online.rs => api/admin/repair.rs} (69%) delete mode 100644 src/garage/admin/mod.rs delete mode 100644 src/garage/cli/cmd.rs rename src/garage/{repair/offline.rs => cli/repair.rs} (100%) delete mode 100644 src/garage/repair/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 9ba0d553..0b86147b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1258,7 +1258,6 @@ dependencies = [ "opentelemetry-otlp", "opentelemetry-prometheus", "parse_duration", - "serde", "serde_json", "sha1", "sha2", @@ -1282,6 +1281,7 @@ dependencies = [ "format_table", "futures", "garage_api_common", + "garage_block", "garage_model", "garage_rpc", "garage_table", diff --git a/src/api/admin/Cargo.toml b/src/api/admin/Cargo.toml index 7b1ad2f0..9ac099e8 100644 --- a/src/api/admin/Cargo.toml +++ b/src/api/admin/Cargo.toml @@ -16,6 +16,7 @@ path = "lib.rs" [dependencies] format_table.workspace = true garage_model.workspace = true +garage_block.workspace = true garage_table.workspace = true garage_util.workspace = true garage_rpc.workspace = true diff --git a/src/api/admin/api.rs b/src/api/admin/api.rs index 4caae02c..48c9ee0b 100644 --- a/src/api/admin/api.rs +++ b/src/api/admin/api.rs @@ -81,6 +81,7 @@ admin_endpoints![ CreateMetadataSnapshot, GetNodeStatistics, GetClusterStatistics, + LaunchRepairOperation, // Worker operations ListWorkers, @@ -99,6 +100,7 @@ local_admin_endpoints![ // Node operations CreateMetadataSnapshot, GetNodeStatistics, + LaunchRepairOperation, // Background workers ListWorkers, GetWorkerInfo, @@ -663,6 +665,38 @@ pub struct GetClusterStatisticsResponse { pub freeform: String, } +// ---- LaunchRepairOperation ---- + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LocalLaunchRepairOperationRequest { + pub repair_type: RepairType, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum RepairType { + Tables, + Blocks, + Versions, + MultipartUploads, + BlockRefs, + BlockRc, + Rebalance, + Scrub(ScrubCommand), +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum ScrubCommand { + Start, + Pause, + Resume, + Cancel, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LocalLaunchRepairOperationResponse; + // ********************************************** // Worker operations // ********************************************** diff --git a/src/api/admin/lib.rs b/src/api/admin/lib.rs index cc673eef..fe4b0598 100644 --- a/src/api/admin/lib.rs +++ b/src/api/admin/lib.rs @@ -17,6 +17,7 @@ mod special; mod block; mod node; +mod repair; mod worker; use std::sync::Arc; diff --git a/src/garage/repair/online.rs b/src/api/admin/repair.rs similarity index 69% rename from src/garage/repair/online.rs rename to src/api/admin/repair.rs index 2c5227d2..19bb4d51 100644 --- a/src/garage/repair/online.rs +++ b/src/api/admin/repair.rs @@ -4,6 +4,14 @@ use std::time::Duration; use async_trait::async_trait; use tokio::sync::watch; +use garage_util::background::*; +use garage_util::data::*; +use garage_util::error::{Error as GarageError, OkOrMessage}; +use garage_util::migrate::Migrate; + +use garage_table::replication::*; +use garage_table::*; + use garage_block::manager::BlockManager; use garage_block::repair::ScrubWorkerCommand; @@ -13,82 +21,77 @@ use garage_model::s3::mpu_table::*; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; -use garage_table::replication::*; -use garage_table::*; - -use garage_util::background::*; -use garage_util::data::*; -use garage_util::error::Error; -use garage_util::migrate::Migrate; - -use crate::*; +use crate::api::*; +use crate::error::Error; +use crate::{Admin, RequestHandler}; const RC_REPAIR_ITER_COUNT: usize = 64; -pub async fn launch_online_repair( - garage: &Arc, - bg: &BackgroundRunner, - opt: RepairOpt, -) -> Result<(), Error> { - match opt.what { - RepairWhat::Tables => { - info!("Launching a full sync of tables"); - garage.bucket_table.syncer.add_full_sync()?; - garage.object_table.syncer.add_full_sync()?; - garage.version_table.syncer.add_full_sync()?; - garage.block_ref_table.syncer.add_full_sync()?; - garage.key_table.syncer.add_full_sync()?; - } - RepairWhat::Versions => { - info!("Repairing the versions table"); - bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairVersions)); - } - RepairWhat::MultipartUploads => { - info!("Repairing the multipart uploads table"); - bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairMpu)); - } - RepairWhat::BlockRefs => { - info!("Repairing the block refs table"); - bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairBlockRefs)); - } - RepairWhat::BlockRc => { - info!("Repairing the block reference counters"); - bg.spawn_worker(BlockRcRepair::new( - garage.block_manager.clone(), - garage.block_ref_table.clone(), - )); - } - RepairWhat::Blocks => { - info!("Repairing the stored blocks"); - bg.spawn_worker(garage_block::repair::RepairWorker::new( - garage.block_manager.clone(), - )); - } - RepairWhat::Scrub { cmd } => { - let cmd = match cmd { - ScrubCmd::Start => ScrubWorkerCommand::Start, - ScrubCmd::Pause => ScrubWorkerCommand::Pause(Duration::from_secs(3600 * 24)), - ScrubCmd::Resume => ScrubWorkerCommand::Resume, - ScrubCmd::Cancel => ScrubWorkerCommand::Cancel, - ScrubCmd::SetTranquility { tranquility } => { - garage - .block_manager - .scrub_persister - .set_with(|x| x.tranquility = tranquility)?; - return Ok(()); - } - }; - info!("Sending command to scrub worker: {:?}", cmd); - garage.block_manager.send_scrub_command(cmd).await?; - } - RepairWhat::Rebalance => { - info!("Rebalancing the stored blocks among storage locations"); - bg.spawn_worker(garage_block::repair::RebalanceWorker::new( - garage.block_manager.clone(), - )); +#[async_trait] +impl RequestHandler for LocalLaunchRepairOperationRequest { + type Response = LocalLaunchRepairOperationResponse; + + async fn handle( + self, + garage: &Arc, + admin: &Admin, + ) -> Result { + let bg = &admin.background; + match self.repair_type { + RepairType::Tables => { + info!("Launching a full sync of tables"); + garage.bucket_table.syncer.add_full_sync()?; + garage.object_table.syncer.add_full_sync()?; + garage.version_table.syncer.add_full_sync()?; + garage.block_ref_table.syncer.add_full_sync()?; + garage.key_table.syncer.add_full_sync()?; + } + RepairType::Versions => { + info!("Repairing the versions table"); + bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairVersions)); + } + RepairType::MultipartUploads => { + info!("Repairing the multipart uploads table"); + bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairMpu)); + } + RepairType::BlockRefs => { + info!("Repairing the block refs table"); + bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairBlockRefs)); + } + RepairType::BlockRc => { + info!("Repairing the block reference counters"); + bg.spawn_worker(BlockRcRepair::new( + garage.block_manager.clone(), + garage.block_ref_table.clone(), + )); + } + RepairType::Blocks => { + info!("Repairing the stored blocks"); + bg.spawn_worker(garage_block::repair::RepairWorker::new( + garage.block_manager.clone(), + )); + } + RepairType::Scrub(cmd) => { + let cmd = match cmd { + ScrubCommand::Start => ScrubWorkerCommand::Start, + ScrubCommand::Pause => { + ScrubWorkerCommand::Pause(Duration::from_secs(3600 * 24)) + } + ScrubCommand::Resume => ScrubWorkerCommand::Resume, + ScrubCommand::Cancel => ScrubWorkerCommand::Cancel, + }; + info!("Sending command to scrub worker: {:?}", cmd); + garage.block_manager.send_scrub_command(cmd).await?; + } + RepairType::Rebalance => { + info!("Rebalancing the stored blocks among storage locations"); + bg.spawn_worker(garage_block::repair::RebalanceWorker::new( + garage.block_manager.clone(), + )); + } } + Ok(LocalLaunchRepairOperationResponse) } - Ok(()) } // ---- @@ -103,7 +106,7 @@ trait TableRepair: Send + Sync + 'static { &mut self, garage: &Garage, entry: <::T as TableSchema>::E, - ) -> Result; + ) -> Result; } struct TableRepairWorker { @@ -139,7 +142,10 @@ impl Worker for TableRepairWorker { } } - async fn work(&mut self, _must_exit: &mut watch::Receiver) -> Result { + async fn work( + &mut self, + _must_exit: &mut watch::Receiver, + ) -> Result { let (item_bytes, next_pos) = match R::table(&self.garage).data.store.get_gt(&self.pos)? { Some((k, v)) => (v, k), None => { @@ -182,7 +188,7 @@ impl TableRepair for RepairVersions { &garage.version_table } - async fn process(&mut self, garage: &Garage, version: Version) -> Result { + async fn process(&mut self, garage: &Garage, version: Version) -> Result { if !version.deleted.get() { let ref_exists = match &version.backlink { VersionBacklink::Object { bucket_id, key } => garage @@ -229,7 +235,11 @@ impl TableRepair for RepairBlockRefs { &garage.block_ref_table } - async fn process(&mut self, garage: &Garage, mut block_ref: BlockRef) -> Result { + async fn process( + &mut self, + garage: &Garage, + mut block_ref: BlockRef, + ) -> Result { if !block_ref.deleted.get() { let ref_exists = garage .version_table @@ -265,7 +275,11 @@ impl TableRepair for RepairMpu { &garage.mpu_table } - async fn process(&mut self, garage: &Garage, mut mpu: MultipartUpload) -> Result { + async fn process( + &mut self, + garage: &Garage, + mut mpu: MultipartUpload, + ) -> Result { if !mpu.deleted.get() { let ref_exists = garage .object_table @@ -332,7 +346,10 @@ impl Worker for BlockRcRepair { } } - async fn work(&mut self, _must_exit: &mut watch::Receiver) -> Result { + async fn work( + &mut self, + _must_exit: &mut watch::Receiver, + ) -> Result { for _i in 0..RC_REPAIR_ITER_COUNT { let next1 = self .block_manager diff --git a/src/api/admin/router_v2.rs b/src/api/admin/router_v2.rs index a0f415c2..4d5c015e 100644 --- a/src/api/admin/router_v2.rs +++ b/src/api/admin/router_v2.rs @@ -63,6 +63,7 @@ impl AdminApiRequest { POST CreateMetadataSnapshot (default::body, query::node), GET GetNodeStatistics (default::body, query::node), GET GetClusterStatistics (), + POST LaunchRepairOperation (body_field, query::node), // Worker APIs POST ListWorkers (body_field, query::node), POST GetWorkerInfo (body_field, query::node), diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml index 4f823fc6..c566c3e0 100644 --- a/src/garage/Cargo.toml +++ b/src/garage/Cargo.toml @@ -49,8 +49,6 @@ sodiumoxide.workspace = true structopt.workspace = true git-version.workspace = true -serde.workspace = true - futures.workspace = true tokio.workspace = true diff --git a/src/garage/admin/mod.rs b/src/garage/admin/mod.rs deleted file mode 100644 index c4ab2810..00000000 --- a/src/garage/admin/mod.rs +++ /dev/null @@ -1,108 +0,0 @@ -use std::sync::Arc; - -use async_trait::async_trait; -use serde::{Deserialize, Serialize}; - -use garage_util::background::BackgroundRunner; -use garage_util::error::Error as GarageError; - -use garage_rpc::*; - -use garage_model::garage::Garage; -use garage_model::helper::error::Error; - -use crate::cli::*; -use crate::repair::online::launch_online_repair; - -pub const ADMIN_RPC_PATH: &str = "garage/admin_rpc.rs/Rpc"; - -#[derive(Debug, Serialize, Deserialize)] -#[allow(clippy::large_enum_variant)] -pub enum AdminRpc { - LaunchRepair(RepairOpt), - - // Replies - Ok(String), -} - -impl Rpc for AdminRpc { - type Response = Result; -} - -pub struct AdminRpcHandler { - garage: Arc, - background: Arc, - endpoint: Arc>, -} - -impl AdminRpcHandler { - pub fn new(garage: Arc, background: Arc) -> Arc { - let endpoint = garage.system.netapp.endpoint(ADMIN_RPC_PATH.into()); - let admin = Arc::new(Self { - garage, - background, - endpoint, - }); - admin.endpoint.set_handler(admin.clone()); - admin - } - - // ================ REPAIR COMMANDS ==================== - - async fn handle_launch_repair(self: &Arc, opt: RepairOpt) -> Result { - if !opt.yes { - return Err(Error::BadRequest( - "Please provide the --yes flag to initiate repair operations.".to_string(), - )); - } - if opt.all_nodes { - let mut opt_to_send = opt.clone(); - opt_to_send.all_nodes = false; - - let mut failures = vec![]; - let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec(); - for node in all_nodes.iter() { - let node = (*node).into(); - let resp = self - .endpoint - .call( - &node, - AdminRpc::LaunchRepair(opt_to_send.clone()), - PRIO_NORMAL, - ) - .await; - if !matches!(resp, Ok(Ok(_))) { - failures.push(node); - } - } - if failures.is_empty() { - Ok(AdminRpc::Ok("Repair launched on all nodes".to_string())) - } else { - Err(Error::BadRequest(format!( - "Could not launch repair on nodes: {:?} (launched successfully on other nodes)", - failures - ))) - } - } else { - launch_online_repair(&self.garage, &self.background, opt).await?; - Ok(AdminRpc::Ok(format!( - "Repair launched on {:?}", - self.garage.system.id - ))) - } - } -} - -#[async_trait] -impl EndpointHandler for AdminRpcHandler { - async fn handle( - self: &Arc, - message: &AdminRpc, - _from: NodeID, - ) -> Result { - match message { - AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await, - m => Err(GarageError::unexpected_rpc_message(m).into()), - } - } -} diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs deleted file mode 100644 index 1a9c7841..00000000 --- a/src/garage/cli/cmd.rs +++ /dev/null @@ -1,21 +0,0 @@ -use garage_rpc::*; - -use garage_model::helper::error::Error as HelperError; - -use crate::admin::*; - -pub async fn cmd_admin( - rpc_cli: &Endpoint, - rpc_host: NodeID, - args: AdminRpc, -) -> Result<(), HelperError> { - match rpc_cli.call(&rpc_host, args, PRIO_NORMAL).await?? { - AdminRpc::Ok(msg) => { - println!("{}", msg); - } - r => { - error!("Unexpected response: {:?}", r); - } - } - Ok(()) -} diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs index 15040aaa..bb77cc2a 100644 --- a/src/garage/cli/layout.rs +++ b/src/garage/cli/layout.rs @@ -7,7 +7,7 @@ use garage_rpc::layout::*; use garage_rpc::system::*; use garage_rpc::*; -use crate::cli::*; +use crate::cli::structs::*; pub async fn cmd_show_layout( rpc_cli: &Endpoint, diff --git a/src/garage/cli/mod.rs b/src/garage/cli/mod.rs index c15afda1..e007808b 100644 --- a/src/garage/cli/mod.rs +++ b/src/garage/cli/mod.rs @@ -1,10 +1,7 @@ -pub(crate) mod cmd; -pub(crate) mod init; -pub(crate) mod layout; pub(crate) mod structs; pub(crate) mod convert_db; +pub(crate) mod init; +pub(crate) mod repair; -pub(crate) use cmd::*; -pub(crate) use init::*; -pub(crate) use structs::*; +pub(crate) mod layout; diff --git a/src/garage/repair/offline.rs b/src/garage/cli/repair.rs similarity index 100% rename from src/garage/repair/offline.rs rename to src/garage/cli/repair.rs diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index 4ec35e68..c6471515 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -1,4 +1,3 @@ -use serde::{Deserialize, Serialize}; use structopt::StructOpt; use garage_util::version::garage_version; @@ -190,7 +189,7 @@ pub struct SkipDeadNodesOpt { pub(crate) allow_missing_data: bool, } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(StructOpt, Debug)] pub enum BucketOperation { /// List buckets #[structopt(name = "list", version = garage_version())] @@ -237,7 +236,7 @@ pub enum BucketOperation { CleanupIncompleteUploads(CleanupIncompleteUploadsOpt), } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(StructOpt, Debug)] pub struct WebsiteOpt { /// Create #[structopt(long = "allow")] @@ -259,13 +258,13 @@ pub struct WebsiteOpt { pub error_document: Option, } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(StructOpt, Debug)] pub struct BucketOpt { /// Bucket name pub name: String, } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(StructOpt, Debug)] pub struct DeleteBucketOpt { /// Bucket name pub name: String, @@ -275,7 +274,7 @@ pub struct DeleteBucketOpt { pub yes: bool, } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(StructOpt, Debug)] pub struct AliasBucketOpt { /// Existing bucket name (its alias in global namespace or its full hex uuid) pub existing_bucket: String, @@ -288,7 +287,7 @@ pub struct AliasBucketOpt { pub local: Option, } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(StructOpt, Debug)] pub struct UnaliasBucketOpt { /// Bucket name pub name: String, @@ -298,7 +297,7 @@ pub struct UnaliasBucketOpt { pub local: Option, } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(StructOpt, Debug)] pub struct PermBucketOpt { /// Access key name or ID #[structopt(long = "key")] @@ -321,7 +320,7 @@ pub struct PermBucketOpt { pub bucket: String, } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(StructOpt, Debug)] pub struct SetQuotasOpt { /// Bucket name pub bucket: String, @@ -336,7 +335,7 @@ pub struct SetQuotasOpt { pub max_objects: Option, } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(StructOpt, Debug)] pub struct CleanupIncompleteUploadsOpt { /// Abort multipart uploads older than this value #[structopt(long = "older-than", default_value = "1d")] @@ -347,7 +346,7 @@ pub struct CleanupIncompleteUploadsOpt { pub buckets: Vec, } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(StructOpt, Debug)] pub enum KeyOperation { /// List keys #[structopt(name = "list", version = garage_version())] @@ -382,7 +381,7 @@ pub enum KeyOperation { Import(KeyImportOpt), } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(StructOpt, Debug)] pub struct KeyInfoOpt { /// ID or name of the key pub key_pattern: String, @@ -391,14 +390,14 @@ pub struct KeyInfoOpt { pub show_secret: bool, } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(StructOpt, Debug)] pub struct KeyNewOpt { /// Name of the key #[structopt(default_value = "Unnamed key")] pub name: String, } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(StructOpt, Debug)] pub struct KeyRenameOpt { /// ID or name of the key pub key_pattern: String, @@ -407,7 +406,7 @@ pub struct KeyRenameOpt { pub new_name: String, } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(StructOpt, Debug)] pub struct KeyDeleteOpt { /// ID or name of the key pub key_pattern: String, @@ -417,7 +416,7 @@ pub struct KeyDeleteOpt { pub yes: bool, } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(StructOpt, Debug)] pub struct KeyPermOpt { /// ID or name of the key pub key_pattern: String, @@ -427,7 +426,7 @@ pub struct KeyPermOpt { pub create_bucket: bool, } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(StructOpt, Debug)] pub struct KeyImportOpt { /// Access key ID pub key_id: String, @@ -444,7 +443,7 @@ pub struct KeyImportOpt { pub yes: bool, } -#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)] +#[derive(StructOpt, Debug, Clone)] pub struct RepairOpt { /// Launch repair operation on all nodes #[structopt(short = "a", long = "all-nodes")] @@ -458,7 +457,7 @@ pub struct RepairOpt { pub what: RepairWhat, } -#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] +#[derive(StructOpt, Debug, Eq, PartialEq, Clone)] pub enum RepairWhat { /// Do a full sync of metadata tables #[structopt(name = "tables", version = garage_version())] @@ -489,7 +488,7 @@ pub enum RepairWhat { Rebalance, } -#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] +#[derive(StructOpt, Debug, Eq, PartialEq, Clone)] pub enum ScrubCmd { /// Start scrub #[structopt(name = "start", version = garage_version())] @@ -503,15 +502,9 @@ pub enum ScrubCmd { /// Cancel scrub in progress #[structopt(name = "cancel", version = garage_version())] Cancel, - /// Set tranquility level for in-progress and future scrubs - #[structopt(name = "set-tranquility", version = garage_version())] - SetTranquility { - #[structopt()] - tranquility: u32, - }, } -#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)] +#[derive(StructOpt, Debug, Clone)] pub struct OfflineRepairOpt { /// Confirm the launch of the repair operation #[structopt(long = "yes")] @@ -521,7 +514,7 @@ pub struct OfflineRepairOpt { pub what: OfflineRepairWhat, } -#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] +#[derive(StructOpt, Debug, Eq, PartialEq, Clone)] pub enum OfflineRepairWhat { /// Repair K2V item counters #[cfg(feature = "k2v")] @@ -532,19 +525,14 @@ pub enum OfflineRepairWhat { ObjectCounters, } -#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)] +#[derive(StructOpt, Debug, Clone)] pub struct StatsOpt { /// Gather statistics from all nodes #[structopt(short = "a", long = "all-nodes")] pub all_nodes: bool, - - /// Don't show global cluster stats (internal use in RPC) - #[structopt(skip)] - #[serde(default)] - pub skip_global: bool, } -#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] +#[derive(StructOpt, Debug, Eq, PartialEq, Clone)] pub enum WorkerOperation { /// List all workers on Garage node #[structopt(name = "list", version = garage_version())] @@ -577,7 +565,7 @@ pub enum WorkerOperation { }, } -#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone, Copy)] +#[derive(StructOpt, Debug, Eq, PartialEq, Clone, Copy)] pub struct WorkerListOpt { /// Show only busy workers #[structopt(short = "b", long = "busy")] @@ -587,7 +575,7 @@ pub struct WorkerListOpt { pub errors: bool, } -#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] +#[derive(StructOpt, Debug, Eq, PartialEq, Clone)] pub enum BlockOperation { /// List all blocks that currently have a resync error #[structopt(name = "list-errors", version = garage_version())] @@ -619,7 +607,7 @@ pub enum BlockOperation { }, } -#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone, Copy)] +#[derive(StructOpt, Debug, Eq, PartialEq, Clone, Copy)] pub enum MetaOperation { /// Save a snapshot of the metadata db file #[structopt(name = "snapshot", version = garage_version())] diff --git a/src/garage/cli_v2/mod.rs b/src/garage/cli_v2/mod.rs index dccdc295..28c7c824 100644 --- a/src/garage/cli_v2/mod.rs +++ b/src/garage/cli_v2/mod.rs @@ -20,14 +20,10 @@ use garage_api_admin::api::*; use garage_api_admin::api_server::{AdminRpc as ProxyRpc, AdminRpcResponse as ProxyRpcResponse}; use garage_api_admin::RequestHandler; -use crate::admin::*; -use crate::cli as cli_v1; use crate::cli::structs::*; -use crate::cli::Command; pub struct Cli { pub system_rpc_endpoint: Arc>, - pub admin_rpc_endpoint: Arc>, pub proxy_rpc_endpoint: Arc>, pub rpc_host: NodeID, } @@ -46,15 +42,7 @@ impl Cli { Command::Block(bo) => self.cmd_block(bo).await, Command::Meta(mo) => self.cmd_meta(mo).await, Command::Stats(so) => self.cmd_stats(so).await, - - // TODO - Command::Repair(ro) => cli_v1::cmd_admin( - &self.admin_rpc_endpoint, - self.rpc_host, - AdminRpc::LaunchRepair(ro), - ) - .await - .ok_or_message("cli_v1"), + Command::Repair(ro) => self.cmd_repair(ro).await, _ => unreachable!(), } diff --git a/src/garage/cli_v2/node.rs b/src/garage/cli_v2/node.rs index b1915dc4..c5d0cdea 100644 --- a/src/garage/cli_v2/node.rs +++ b/src/garage/cli_v2/node.rs @@ -27,7 +27,7 @@ impl Cli { table.push(format!("{:.16}\tError: {}", node, err)); } for (node, _) in res.success.iter() { - table.push(format!("{:.16}\tOk", node)); + table.push(format!("{:.16}\tSnapshot created", node)); } format_table(table); @@ -64,4 +64,50 @@ impl Cli { Ok(()) } + + pub async fn cmd_repair(&self, cmd: RepairOpt) -> Result<(), Error> { + if !cmd.yes { + return Err(Error::Message( + "Please add --yes to start the repair operation".into(), + )); + } + + let repair_type = match cmd.what { + RepairWhat::Tables => RepairType::Tables, + RepairWhat::Blocks => RepairType::Blocks, + RepairWhat::Versions => RepairType::Versions, + RepairWhat::MultipartUploads => RepairType::MultipartUploads, + RepairWhat::BlockRefs => RepairType::BlockRefs, + RepairWhat::BlockRc => RepairType::BlockRc, + RepairWhat::Rebalance => RepairType::Rebalance, + RepairWhat::Scrub { cmd } => RepairType::Scrub(match cmd { + ScrubCmd::Start => ScrubCommand::Start, + ScrubCmd::Cancel => ScrubCommand::Cancel, + ScrubCmd::Pause => ScrubCommand::Pause, + ScrubCmd::Resume => ScrubCommand::Resume, + }), + }; + + let res = self + .api_request(LaunchRepairOperationRequest { + node: if cmd.all_nodes { + "*".to_string() + } else { + hex::encode(self.rpc_host) + }, + body: LocalLaunchRepairOperationRequest { repair_type }, + }) + .await?; + + let mut table = vec![]; + for (node, err) in res.error.iter() { + table.push(format!("{:.16}\tError: {}", node, err)); + } + for (node, _) in res.success.iter() { + table.push(format!("{:.16}\tRepair launched", node)); + } + format_table(table); + + Ok(()) + } } diff --git a/src/garage/main.rs b/src/garage/main.rs index 022841f5..2a88d760 100644 --- a/src/garage/main.rs +++ b/src/garage/main.rs @@ -4,10 +4,8 @@ #[macro_use] extern crate tracing; -mod admin; mod cli; mod cli_v2; -mod repair; mod secrets; mod server; #[cfg(feature = "telemetry-otlp")] @@ -37,8 +35,7 @@ use garage_rpc::*; use garage_api_admin::api_server::{AdminRpc as ProxyRpc, ADMIN_RPC_PATH as PROXY_RPC_PATH}; -use admin::*; -use cli::*; +use cli::structs::*; use secrets::Secrets; #[derive(StructOpt, Debug)] @@ -146,13 +143,13 @@ async fn main() { let res = match opt.cmd { Command::Server => server::run_server(opt.config_file, opt.secrets).await, Command::OfflineRepair(repair_opt) => { - repair::offline::offline_repair(opt.config_file, opt.secrets, repair_opt).await + cli::repair::offline_repair(opt.config_file, opt.secrets, repair_opt).await } Command::ConvertDb(conv_opt) => { cli::convert_db::do_conversion(conv_opt).map_err(From::from) } Command::Node(NodeOperation::NodeId(node_id_opt)) => { - node_id_command(opt.config_file, node_id_opt.quiet) + cli::init::node_id_command(opt.config_file, node_id_opt.quiet) } _ => cli_command(opt).await, }; @@ -253,7 +250,7 @@ async fn cli_command(opt: Opt) -> Result<(), Error> { (id, addrs[0], false) } else { let node_id = garage_rpc::system::read_node_id(&config.as_ref().unwrap().metadata_dir) - .err_context(READ_KEY_ERROR)?; + .err_context(cli::init::READ_KEY_ERROR)?; if let Some(a) = config.as_ref().and_then(|c| c.rpc_public_addr.as_ref()) { use std::net::ToSocketAddrs; let a = a @@ -283,12 +280,10 @@ async fn cli_command(opt: Opt) -> Result<(), Error> { } let system_rpc_endpoint = netapp.endpoint::(SYSTEM_RPC_PATH.into()); - let admin_rpc_endpoint = netapp.endpoint::(ADMIN_RPC_PATH.into()); let proxy_rpc_endpoint = netapp.endpoint::(PROXY_RPC_PATH.into()); let cli = cli_v2::Cli { system_rpc_endpoint, - admin_rpc_endpoint, proxy_rpc_endpoint, rpc_host: id, }; diff --git a/src/garage/repair/mod.rs b/src/garage/repair/mod.rs deleted file mode 100644 index 4699ace5..00000000 --- a/src/garage/repair/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub mod offline; -pub mod online; diff --git a/src/garage/server.rs b/src/garage/server.rs index e629041c..131cc8aa 100644 --- a/src/garage/server.rs +++ b/src/garage/server.rs @@ -14,7 +14,6 @@ use garage_web::WebServer; #[cfg(feature = "k2v")] use garage_api_k2v::api_server::K2VApiServer; -use crate::admin::*; use crate::secrets::{fill_secrets, Secrets}; #[cfg(feature = "telemetry-otlp")] use crate::tracing_setup::*; @@ -74,9 +73,6 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er info!("Launching internal Garage cluster communications..."); let run_system = tokio::spawn(garage.system.clone().run(watch_cancel.clone())); - info!("Create admin RPC handler..."); - AdminRpcHandler::new(garage.clone(), background.clone()); - // ---- Launch public-facing API servers ---- let mut servers = vec![];