Implement repair object->version and version->block ref

This commit is contained in:
Alex 2020-04-19 21:27:08 +00:00
parent 04acaea231
commit ec7f9f07e2
3 changed files with 111 additions and 17 deletions

2
TODO
View file

@ -7,8 +7,6 @@ How are we going to test that our replication method works correctly?
We will have to introduce lots of dummy data and then add/remove nodes many times. We will have to introduce lots of dummy data and then add/remove nodes many times.
Repair: Repair:
- re-propagate object table version deletions to version table
- re-propagate version table deletion to block ref table
- re-propagate block ref table to rc - re-propagate block ref table to rc

View file

@ -1,6 +1,7 @@
use std::sync::Arc; use std::sync::Arc;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::sync::watch;
use crate::data::*; use crate::data::*;
use crate::error::Error; use crate::error::Error;
@ -10,7 +11,9 @@ use crate::server::Garage;
use crate::table::*; use crate::table::*;
use crate::*; use crate::*;
use crate::block_ref_table::*;
use crate::bucket_table::*; use crate::bucket_table::*;
use crate::version_table::*;
pub const ADMIN_RPC_TIMEOUT: Duration = Duration::from_secs(30); pub const ADMIN_RPC_TIMEOUT: Duration = Duration::from_secs(30);
pub const ADMIN_RPC_PATH: &str = "_admin"; pub const ADMIN_RPC_PATH: &str = "_admin";
@ -152,7 +155,7 @@ impl AdminRpcHandler {
} }
} }
async fn handle_launch_repair(&self, repair_all: bool) -> Result<AdminRPC, Error> { async fn handle_launch_repair(self: &Arc<Self>, repair_all: bool) -> Result<AdminRPC, Error> {
if repair_all { if repair_all {
let mut failures = vec![]; let mut failures = vec![];
let ring = self.garage.system.ring.borrow().clone(); let ring = self.garage.system.ring.borrow().clone();
@ -175,11 +178,116 @@ impl AdminRpcHandler {
))) )))
} }
} else { } else {
self.garage.block_manager.launch_repair().await?; let self2 = self.clone();
self.garage
.system
.background
.spawn_worker(move |must_exit| async move { self2.repair_worker(must_exit).await })
.await;
Ok(AdminRPC::Ok(format!( Ok(AdminRPC::Ok(format!(
"Repair launched on {:?}", "Repair launched on {:?}",
self.garage.system.id self.garage.system.id
))) )))
} }
} }
async fn repair_worker(self: Arc<Self>, must_exit: watch::Receiver<bool>) -> Result<(), Error> {
self.repair_versions(&must_exit).await?;
self.repair_block_ref(&must_exit).await?;
self.repair_rc(&must_exit).await?;
self.garage
.block_manager
.repair_data_store(&must_exit)
.await?;
Ok(())
}
async fn repair_versions(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
let mut pos = vec![];
while let Some((item_key, item_bytes)) = self.garage.version_table.store.get_gt(&pos)? {
pos = item_key.to_vec();
let version = rmp_serde::decode::from_read_ref::<_, Version>(item_bytes.as_ref())?;
if version.deleted {
continue;
}
let object = self
.garage
.object_table
.get(&version.bucket, &version.key)
.await?;
let version_exists = match object {
Some(o) => o.versions.iter().any(|x| x.uuid == version.uuid),
None => {
eprintln!("No object entry found for version {:?}", version);
false
}
};
if !version_exists {
eprintln!("Marking deleted version: {:?}", version);
self.garage
.version_table
.insert(&Version {
uuid: version.uuid,
deleted: true,
blocks: vec![],
bucket: version.bucket,
key: version.key,
})
.await?;
}
if *must_exit.borrow() {
break;
}
}
Ok(())
}
async fn repair_block_ref(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
let mut pos = vec![];
while let Some((item_key, item_bytes)) = self.garage.block_ref_table.store.get_gt(&pos)? {
pos = item_key.to_vec();
let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(item_bytes.as_ref())?;
if block_ref.deleted {
continue;
}
let version = self
.garage
.version_table
.get(&block_ref.version, &EmptyKey)
.await?;
let ref_exists = match version {
Some(v) => !v.deleted,
None => {
eprintln!("No version found for block ref {:?}", block_ref);
false
}
};
if !ref_exists {
eprintln!("Marking deleted block_ref: {:?}", block_ref);
self.garage
.block_ref_table
.insert(&BlockRef {
block: block_ref.block,
version: block_ref.version,
deleted: true,
})
.await?;
}
if *must_exit.borrow() {
break;
}
}
Ok(())
}
async fn repair_rc(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
// TODO
Ok(())
}
} }

View file

@ -358,19 +358,7 @@ impl BlockManager {
Ok(()) Ok(())
} }
pub async fn launch_repair(self: &Arc<Self>) -> Result<(), Error> { pub async fn repair_data_store(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
let self2 = self.clone();
self.system
.background
.spawn_worker(move |must_exit| async move { self2.repair_worker(must_exit).await })
.await;
Ok(())
}
pub async fn repair_worker(
self: Arc<Self>,
must_exit: watch::Receiver<bool>,
) -> Result<(), Error> {
// 1. Repair blocks from RC table // 1. Repair blocks from RC table
let garage = self.garage.load_full().unwrap(); let garage = self.garage.load_full().unwrap();
let mut last_hash = None; let mut last_hash = None;