From ec7f9f07e21ca2771d348b84c6cb14eda64e501c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Sun, 19 Apr 2020 21:27:08 +0000 Subject: [PATCH] Implement repair object->version and version->block ref --- TODO | 2 - src/admin_rpc.rs | 112 ++++++++++++++++++++++++++++++++++++++++++++++- src/block.rs | 14 +----- 3 files changed, 111 insertions(+), 17 deletions(-) diff --git a/TODO b/TODO index 1372496a..1b5f466d 100644 --- a/TODO +++ b/TODO @@ -7,8 +7,6 @@ How are we going to test that our replication method works correctly? We will have to introduce lots of dummy data and then add/remove nodes many times. Repair: -- re-propagate object table version deletions to version table -- re-propagate version table deletion to block ref table - re-propagate block ref table to rc diff --git a/src/admin_rpc.rs b/src/admin_rpc.rs index 8e278dbe..0318e799 100644 --- a/src/admin_rpc.rs +++ b/src/admin_rpc.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use serde::{Deserialize, Serialize}; +use tokio::sync::watch; use crate::data::*; use crate::error::Error; @@ -10,7 +11,9 @@ use crate::server::Garage; use crate::table::*; use crate::*; +use crate::block_ref_table::*; use crate::bucket_table::*; +use crate::version_table::*; pub const ADMIN_RPC_TIMEOUT: Duration = Duration::from_secs(30); pub const ADMIN_RPC_PATH: &str = "_admin"; @@ -152,7 +155,7 @@ impl AdminRpcHandler { } } - async fn handle_launch_repair(&self, repair_all: bool) -> Result { + async fn handle_launch_repair(self: &Arc, repair_all: bool) -> Result { if repair_all { let mut failures = vec![]; let ring = self.garage.system.ring.borrow().clone(); @@ -175,11 +178,116 @@ impl AdminRpcHandler { ))) } } else { - self.garage.block_manager.launch_repair().await?; + let self2 = self.clone(); + self.garage + .system + .background + .spawn_worker(move |must_exit| async move { self2.repair_worker(must_exit).await }) + .await; Ok(AdminRPC::Ok(format!( "Repair launched on {:?}", self.garage.system.id ))) } } + + async fn repair_worker(self: Arc, must_exit: watch::Receiver) -> Result<(), Error> { + self.repair_versions(&must_exit).await?; + self.repair_block_ref(&must_exit).await?; + self.repair_rc(&must_exit).await?; + self.garage + .block_manager + .repair_data_store(&must_exit) + .await?; + Ok(()) + } + + async fn repair_versions(&self, must_exit: &watch::Receiver) -> Result<(), Error> { + let mut pos = vec![]; + + while let Some((item_key, item_bytes)) = self.garage.version_table.store.get_gt(&pos)? { + pos = item_key.to_vec(); + + let version = rmp_serde::decode::from_read_ref::<_, Version>(item_bytes.as_ref())?; + if version.deleted { + continue; + } + let object = self + .garage + .object_table + .get(&version.bucket, &version.key) + .await?; + let version_exists = match object { + Some(o) => o.versions.iter().any(|x| x.uuid == version.uuid), + None => { + eprintln!("No object entry found for version {:?}", version); + false + } + }; + if !version_exists { + eprintln!("Marking deleted version: {:?}", version); + self.garage + .version_table + .insert(&Version { + uuid: version.uuid, + deleted: true, + blocks: vec![], + bucket: version.bucket, + key: version.key, + }) + .await?; + } + + if *must_exit.borrow() { + break; + } + } + Ok(()) + } + + async fn repair_block_ref(&self, must_exit: &watch::Receiver) -> Result<(), Error> { + let mut pos = vec![]; + + while let Some((item_key, item_bytes)) = self.garage.block_ref_table.store.get_gt(&pos)? { + pos = item_key.to_vec(); + + let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(item_bytes.as_ref())?; + if block_ref.deleted { + continue; + } + let version = self + .garage + .version_table + .get(&block_ref.version, &EmptyKey) + .await?; + let ref_exists = match version { + Some(v) => !v.deleted, + None => { + eprintln!("No version found for block ref {:?}", block_ref); + false + } + }; + if !ref_exists { + eprintln!("Marking deleted block_ref: {:?}", block_ref); + self.garage + .block_ref_table + .insert(&BlockRef { + block: block_ref.block, + version: block_ref.version, + deleted: true, + }) + .await?; + } + + if *must_exit.borrow() { + break; + } + } + Ok(()) + } + + async fn repair_rc(&self, must_exit: &watch::Receiver) -> Result<(), Error> { + // TODO + Ok(()) + } } diff --git a/src/block.rs b/src/block.rs index 4ad74d76..11818fd8 100644 --- a/src/block.rs +++ b/src/block.rs @@ -358,19 +358,7 @@ impl BlockManager { Ok(()) } - pub async fn launch_repair(self: &Arc) -> Result<(), Error> { - let self2 = self.clone(); - self.system - .background - .spawn_worker(move |must_exit| async move { self2.repair_worker(must_exit).await }) - .await; - Ok(()) - } - - pub async fn repair_worker( - self: Arc, - must_exit: watch::Receiver, - ) -> Result<(), Error> { + pub async fn repair_data_store(&self, must_exit: &watch::Receiver) -> Result<(), Error> { // 1. Repair blocks from RC table let garage = self.garage.load_full().unwrap(); let mut last_hash = None;