forked from Deuxfleurs/garage
Implement repair command
This commit is contained in:
parent
a54f3158f1
commit
5ae32972ef
4 changed files with 152 additions and 11 deletions
|
@ -4,6 +4,7 @@ use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
use crate::data::*;
|
use crate::data::*;
|
||||||
use crate::error::Error;
|
use crate::error::Error;
|
||||||
|
use crate::rpc_client::*;
|
||||||
use crate::rpc_server::*;
|
use crate::rpc_server::*;
|
||||||
use crate::server::Garage;
|
use crate::server::Garage;
|
||||||
use crate::table::*;
|
use crate::table::*;
|
||||||
|
@ -11,11 +12,13 @@ use crate::*;
|
||||||
|
|
||||||
use crate::bucket_table::*;
|
use crate::bucket_table::*;
|
||||||
|
|
||||||
|
pub const ADMIN_RPC_TIMEOUT: Duration = Duration::from_secs(30);
|
||||||
pub const ADMIN_RPC_PATH: &str = "_admin";
|
pub const ADMIN_RPC_PATH: &str = "_admin";
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
pub enum AdminRPC {
|
pub enum AdminRPC {
|
||||||
BucketOperation(BucketOperation),
|
BucketOperation(BucketOperation),
|
||||||
|
LaunchRepair(bool),
|
||||||
|
|
||||||
// Replies
|
// Replies
|
||||||
Ok(String),
|
Ok(String),
|
||||||
|
@ -27,11 +30,13 @@ impl RpcMessage for AdminRPC {}
|
||||||
|
|
||||||
pub struct AdminRpcHandler {
|
pub struct AdminRpcHandler {
|
||||||
garage: Arc<Garage>,
|
garage: Arc<Garage>,
|
||||||
|
rpc_client: Arc<RpcClient<AdminRPC>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl AdminRpcHandler {
|
impl AdminRpcHandler {
|
||||||
pub fn new(garage: Arc<Garage>) -> Arc<Self> {
|
pub fn new(garage: Arc<Garage>) -> Arc<Self> {
|
||||||
Arc::new(Self { garage })
|
let rpc_client = garage.system.clone().rpc_client::<AdminRPC>(ADMIN_RPC_PATH);
|
||||||
|
Arc::new(Self { garage, rpc_client })
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn register_handler(self: Arc<Self>, rpc_server: &mut RpcServer) {
|
pub fn register_handler(self: Arc<Self>, rpc_server: &mut RpcServer) {
|
||||||
|
@ -40,6 +45,9 @@ impl AdminRpcHandler {
|
||||||
async move {
|
async move {
|
||||||
match msg {
|
match msg {
|
||||||
AdminRPC::BucketOperation(bo) => self2.handle_bucket_cmd(bo).await,
|
AdminRPC::BucketOperation(bo) => self2.handle_bucket_cmd(bo).await,
|
||||||
|
AdminRPC::LaunchRepair(repair_all) => {
|
||||||
|
self2.handle_launch_repair(repair_all).await
|
||||||
|
}
|
||||||
_ => Err(Error::Message(format!("Invalid RPC"))),
|
_ => Err(Error::Message(format!("Invalid RPC"))),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -143,4 +151,35 @@ impl AdminRpcHandler {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn handle_launch_repair(&self, repair_all: bool) -> Result<AdminRPC, Error> {
|
||||||
|
if repair_all {
|
||||||
|
let mut failures = vec![];
|
||||||
|
let ring = self.garage.system.ring.borrow().clone();
|
||||||
|
for node in ring.config.members.keys() {
|
||||||
|
if self
|
||||||
|
.rpc_client
|
||||||
|
.call(node, AdminRPC::LaunchRepair(false), ADMIN_RPC_TIMEOUT)
|
||||||
|
.await
|
||||||
|
.is_err()
|
||||||
|
{
|
||||||
|
failures.push(node.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if failures.is_empty() {
|
||||||
|
Ok(AdminRPC::Ok(format!("Repair launched on all nodes")))
|
||||||
|
} else {
|
||||||
|
Err(Error::Message(format!(
|
||||||
|
"Could not launch repair on nodes: {:?} (launched successfully on other nodes)",
|
||||||
|
failures
|
||||||
|
)))
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
self.garage.block_manager.launch_repair().await?;
|
||||||
|
Ok(AdminRPC::Ok(format!(
|
||||||
|
"Repair launched on {:?}",
|
||||||
|
self.garage.system.id
|
||||||
|
)))
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
84
src/block.rs
84
src/block.rs
|
@ -17,6 +17,7 @@ use crate::membership::System;
|
||||||
use crate::rpc_client::*;
|
use crate::rpc_client::*;
|
||||||
use crate::rpc_server::*;
|
use crate::rpc_server::*;
|
||||||
|
|
||||||
|
use crate::block_ref_table::*;
|
||||||
use crate::server::Garage;
|
use crate::server::Garage;
|
||||||
|
|
||||||
pub const INLINE_THRESHOLD: usize = 3072;
|
pub const INLINE_THRESHOLD: usize = 3072;
|
||||||
|
@ -356,6 +357,89 @@ impl BlockManager {
|
||||||
.await?;
|
.await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn launch_repair(self: &Arc<Self>) -> Result<(), Error> {
|
||||||
|
let self2 = self.clone();
|
||||||
|
self.system
|
||||||
|
.background
|
||||||
|
.spawn_worker(move |must_exit| async move { self2.repair_worker(must_exit).await })
|
||||||
|
.await;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn repair_worker(
|
||||||
|
self: Arc<Self>,
|
||||||
|
must_exit: watch::Receiver<bool>,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
// 1. Repair blocks from RC table
|
||||||
|
let garage = self.garage.load_full().unwrap();
|
||||||
|
let mut last_hash = None;
|
||||||
|
let mut i = 0usize;
|
||||||
|
for entry in garage.block_ref_table.store.iter() {
|
||||||
|
let (_k, v_bytes) = entry?;
|
||||||
|
let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(v_bytes.as_ref())?;
|
||||||
|
if Some(&block_ref.block) == last_hash.as_ref() {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if !block_ref.deleted {
|
||||||
|
last_hash = Some(block_ref.block.clone());
|
||||||
|
self.put_to_resync(&block_ref.block, 0)?;
|
||||||
|
}
|
||||||
|
i += 1;
|
||||||
|
if i & 0xFF == 0 && *must_exit.borrow() {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. Repair blocks actually on disk
|
||||||
|
let mut ls_data_dir = fs::read_dir(&self.data_dir).await?;
|
||||||
|
while let Some(data_dir_ent) = ls_data_dir.next().await {
|
||||||
|
let data_dir_ent = data_dir_ent?;
|
||||||
|
let dir_name = data_dir_ent.file_name();
|
||||||
|
let dir_name = match dir_name.into_string() {
|
||||||
|
Ok(x) => x,
|
||||||
|
Err(_) => continue,
|
||||||
|
};
|
||||||
|
if dir_name.len() != 2 || hex::decode(&dir_name).is_err() {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut ls_data_dir_2 = match fs::read_dir(data_dir_ent.path()).await {
|
||||||
|
Err(e) => {
|
||||||
|
eprintln!(
|
||||||
|
"Warning: could not list dir {:?}: {}",
|
||||||
|
data_dir_ent.path().to_str(),
|
||||||
|
e
|
||||||
|
);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
Ok(x) => x,
|
||||||
|
};
|
||||||
|
while let Some(file) = ls_data_dir_2.next().await {
|
||||||
|
let file = file?;
|
||||||
|
let file_name = file.file_name();
|
||||||
|
let file_name = match file_name.into_string() {
|
||||||
|
Ok(x) => x,
|
||||||
|
Err(_) => continue,
|
||||||
|
};
|
||||||
|
if file_name.len() != 64 {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
let hash_bytes = match hex::decode(&file_name) {
|
||||||
|
Ok(h) => h,
|
||||||
|
Err(_) => continue,
|
||||||
|
};
|
||||||
|
let mut hash = [0u8; 32];
|
||||||
|
hash.copy_from_slice(&hash_bytes[..]);
|
||||||
|
self.put_to_resync(&hash.into(), 0)?;
|
||||||
|
|
||||||
|
if *must_exit.borrow() {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn u64_from_bytes(bytes: &[u8]) -> u64 {
|
fn u64_from_bytes(bytes: &[u8]) -> u64 {
|
||||||
|
|
32
src/main.rs
32
src/main.rs
|
@ -39,8 +39,6 @@ use server::TlsConfig;
|
||||||
|
|
||||||
use admin_rpc::*;
|
use admin_rpc::*;
|
||||||
|
|
||||||
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
|
|
||||||
|
|
||||||
#[derive(StructOpt, Debug)]
|
#[derive(StructOpt, Debug)]
|
||||||
#[structopt(name = "garage")]
|
#[structopt(name = "garage")]
|
||||||
pub struct Opt {
|
pub struct Opt {
|
||||||
|
@ -76,6 +74,10 @@ pub enum Command {
|
||||||
/// Bucket operations
|
/// Bucket operations
|
||||||
#[structopt(name = "bucket")]
|
#[structopt(name = "bucket")]
|
||||||
Bucket(BucketOperation),
|
Bucket(BucketOperation),
|
||||||
|
|
||||||
|
/// Start repair of node data
|
||||||
|
#[structopt(name = "repair")]
|
||||||
|
Repair(RepairOpt),
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(StructOpt, Debug)]
|
#[derive(StructOpt, Debug)]
|
||||||
|
@ -179,6 +181,13 @@ pub struct PermBucketOpt {
|
||||||
pub bucket: String,
|
pub bucket: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, StructOpt, Debug)]
|
||||||
|
pub struct RepairOpt {
|
||||||
|
/// Launch repair operation on all nodes
|
||||||
|
#[structopt(long = "all")]
|
||||||
|
pub all: bool,
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() {
|
async fn main() {
|
||||||
let opt = Opt::from_args();
|
let opt = Opt::from_args();
|
||||||
|
@ -222,6 +231,9 @@ async fn main() {
|
||||||
Command::Bucket(bo) => {
|
Command::Bucket(bo) => {
|
||||||
cmd_admin(admin_rpc_cli, opt.rpc_host, AdminRPC::BucketOperation(bo)).await
|
cmd_admin(admin_rpc_cli, opt.rpc_host, AdminRPC::BucketOperation(bo)).await
|
||||||
}
|
}
|
||||||
|
Command::Repair(ro) => {
|
||||||
|
cmd_admin(admin_rpc_cli, opt.rpc_host, AdminRPC::LaunchRepair(ro.all)).await
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
if let Err(e) = resp {
|
if let Err(e) = resp {
|
||||||
|
@ -231,14 +243,14 @@ async fn main() {
|
||||||
|
|
||||||
async fn cmd_status(rpc_cli: RpcAddrClient<Message>, rpc_host: SocketAddr) -> Result<(), Error> {
|
async fn cmd_status(rpc_cli: RpcAddrClient<Message>, rpc_host: SocketAddr) -> Result<(), Error> {
|
||||||
let status = match rpc_cli
|
let status = match rpc_cli
|
||||||
.call(&rpc_host, &Message::PullStatus, DEFAULT_TIMEOUT)
|
.call(&rpc_host, &Message::PullStatus, ADMIN_RPC_TIMEOUT)
|
||||||
.await?
|
.await?
|
||||||
{
|
{
|
||||||
Message::AdvertiseNodesUp(nodes) => nodes,
|
Message::AdvertiseNodesUp(nodes) => nodes,
|
||||||
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
|
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
|
||||||
};
|
};
|
||||||
let config = match rpc_cli
|
let config = match rpc_cli
|
||||||
.call(&rpc_host, &Message::PullConfig, DEFAULT_TIMEOUT)
|
.call(&rpc_host, &Message::PullConfig, ADMIN_RPC_TIMEOUT)
|
||||||
.await?
|
.await?
|
||||||
{
|
{
|
||||||
Message::AdvertiseConfig(cfg) => cfg,
|
Message::AdvertiseConfig(cfg) => cfg,
|
||||||
|
@ -290,7 +302,7 @@ async fn cmd_configure(
|
||||||
args: ConfigureNodeOpt,
|
args: ConfigureNodeOpt,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
let status = match rpc_cli
|
let status = match rpc_cli
|
||||||
.call(&rpc_host, &Message::PullStatus, DEFAULT_TIMEOUT)
|
.call(&rpc_host, &Message::PullStatus, ADMIN_RPC_TIMEOUT)
|
||||||
.await?
|
.await?
|
||||||
{
|
{
|
||||||
Message::AdvertiseNodesUp(nodes) => nodes,
|
Message::AdvertiseNodesUp(nodes) => nodes,
|
||||||
|
@ -311,7 +323,7 @@ async fn cmd_configure(
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut config = match rpc_cli
|
let mut config = match rpc_cli
|
||||||
.call(&rpc_host, &Message::PullConfig, DEFAULT_TIMEOUT)
|
.call(&rpc_host, &Message::PullConfig, ADMIN_RPC_TIMEOUT)
|
||||||
.await?
|
.await?
|
||||||
{
|
{
|
||||||
Message::AdvertiseConfig(cfg) => cfg,
|
Message::AdvertiseConfig(cfg) => cfg,
|
||||||
|
@ -331,7 +343,7 @@ async fn cmd_configure(
|
||||||
.call(
|
.call(
|
||||||
&rpc_host,
|
&rpc_host,
|
||||||
&Message::AdvertiseConfig(config),
|
&Message::AdvertiseConfig(config),
|
||||||
DEFAULT_TIMEOUT,
|
ADMIN_RPC_TIMEOUT,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -343,7 +355,7 @@ async fn cmd_remove(
|
||||||
args: RemoveNodeOpt,
|
args: RemoveNodeOpt,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
let mut config = match rpc_cli
|
let mut config = match rpc_cli
|
||||||
.call(&rpc_host, &Message::PullConfig, DEFAULT_TIMEOUT)
|
.call(&rpc_host, &Message::PullConfig, ADMIN_RPC_TIMEOUT)
|
||||||
.await?
|
.await?
|
||||||
{
|
{
|
||||||
Message::AdvertiseConfig(cfg) => cfg,
|
Message::AdvertiseConfig(cfg) => cfg,
|
||||||
|
@ -377,7 +389,7 @@ async fn cmd_remove(
|
||||||
.call(
|
.call(
|
||||||
&rpc_host,
|
&rpc_host,
|
||||||
&Message::AdvertiseConfig(config),
|
&Message::AdvertiseConfig(config),
|
||||||
DEFAULT_TIMEOUT,
|
ADMIN_RPC_TIMEOUT,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -388,7 +400,7 @@ async fn cmd_admin(
|
||||||
rpc_host: SocketAddr,
|
rpc_host: SocketAddr,
|
||||||
args: AdminRPC,
|
args: AdminRPC,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
match rpc_cli.call(&rpc_host, args, DEFAULT_TIMEOUT).await? {
|
match rpc_cli.call(&rpc_host, args, ADMIN_RPC_TIMEOUT).await? {
|
||||||
AdminRPC::Ok(msg) => {
|
AdminRPC::Ok(msg) => {
|
||||||
println!("{}", msg);
|
println!("{}", msg);
|
||||||
}
|
}
|
||||||
|
|
|
@ -220,11 +220,17 @@ where
|
||||||
})
|
})
|
||||||
.collect::<FuturesUnordered<_>>();
|
.collect::<FuturesUnordered<_>>();
|
||||||
|
|
||||||
|
let mut n_errors = 0;
|
||||||
while let Some(r) = sync_futures.next().await {
|
while let Some(r) = sync_futures.next().await {
|
||||||
if let Err(e) = r {
|
if let Err(e) = r {
|
||||||
|
n_errors += 1;
|
||||||
eprintln!("({}) Sync error: {}", self.table.name, e);
|
eprintln!("({}) Sync error: {}", self.table.name, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if n_errors > self.table.replication.max_write_errors() {
|
||||||
|
return Err(Error::Message(format!("Sync failed with too many nodes.")));
|
||||||
|
}
|
||||||
|
|
||||||
if !partition.retain {
|
if !partition.retain {
|
||||||
self.table
|
self.table
|
||||||
.delete_range(&partition.begin, &partition.end)
|
.delete_range(&partition.begin, &partition.end)
|
||||||
|
|
Loading…
Reference in a new issue