Improve table sync

This commit is contained in:
Alex 2020-04-21 16:05:55 +00:00
parent 0226561035
commit 2a84d965ab
2 changed files with 173 additions and 87 deletions

View file

@ -194,6 +194,37 @@ impl AdminRpcHandler {
} }
async fn repair_worker(self: Arc<Self>, must_exit: watch::Receiver<bool>) -> Result<(), Error> { async fn repair_worker(self: Arc<Self>, must_exit: watch::Receiver<bool>) -> Result<(), Error> {
self.garage
.bucket_table
.syncer
.load_full()
.unwrap()
.add_full_scan()
.await;
self.garage
.object_table
.syncer
.load_full()
.unwrap()
.add_full_scan()
.await;
self.garage
.version_table
.syncer
.load_full()
.unwrap()
.add_full_scan()
.await;
self.garage
.block_ref_table
.syncer
.load_full()
.unwrap()
.add_full_scan()
.await;
// TODO: wait for full sync to finish before proceeding to the rest?
self.repair_versions(&must_exit).await?; self.repair_versions(&must_exit).await?;
self.repair_block_ref(&must_exit).await?; self.repair_block_ref(&must_exit).await?;
self.repair_rc(&must_exit).await?; self.repair_rc(&must_exit).await?;
@ -297,8 +328,9 @@ impl AdminRpcHandler {
Ok(()) Ok(())
} }
async fn repair_rc(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> { async fn repair_rc(&self, _must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
// TODO // TODO
warn!("repair_rc: not implemented");
Ok(()) Ok(())
} }
} }

View file

@ -18,14 +18,14 @@ use crate::membership::Ring;
use crate::table::*; use crate::table::*;
const MAX_DEPTH: usize = 16; const MAX_DEPTH: usize = 16;
const SCAN_INTERVAL: Duration = Duration::from_secs(60); const SCAN_INTERVAL: Duration = Duration::from_secs(1800);
const CHECKSUM_CACHE_TIMEOUT: Duration = Duration::from_secs(1800); const CHECKSUM_CACHE_TIMEOUT: Duration = Duration::from_secs(1800);
const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(10); const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(30);
pub struct TableSyncer<F: TableSchema, R: TableReplication> { pub struct TableSyncer<F: TableSchema, R: TableReplication> {
table: Arc<Table<F, R>>, table: Arc<Table<F, R>>,
todo: Mutex<SyncTodo>, todo: Mutex<SyncTodo>,
cache: Vec<Mutex<BTreeMap<SyncRange, RangeChecksum>>>, cache: Vec<Mutex<BTreeMap<SyncRange, RangeChecksumCache>>>,
} }
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
@ -47,6 +47,15 @@ struct TodoPartition {
retain: bool, retain: bool,
} }
// A SyncRange defines a query on the dataset stored by a node, in the following way:
// - all items whose key are >= `begin`
// - stopping at the first item whose key hash has at least `level` leading zero bytes (excluded)
// - except if the first item of the range has such many leading zero bytes
// - and stopping at `end` (excluded) if such an item is not found
// The checksum itself does not store all of the items in the database, only the hashes of the "sub-ranges"
// i.e. of ranges of level `level-1` that cover the same range
// (ranges of level 0 do not exist and their hash is simply the hash of the first item >= begin)
// See RangeChecksum for the struct that stores this information.
#[derive(Hash, PartialEq, Eq, Debug, Clone, Serialize, Deserialize)] #[derive(Hash, PartialEq, Eq, Debug, Clone, Serialize, Deserialize)]
pub struct SyncRange { pub struct SyncRange {
begin: Vec<u8>, begin: Vec<u8>,
@ -61,7 +70,10 @@ impl std::cmp::PartialOrd for SyncRange {
} }
impl std::cmp::Ord for SyncRange { impl std::cmp::Ord for SyncRange {
fn cmp(&self, other: &Self) -> std::cmp::Ordering { fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.begin.cmp(&other.begin) self.begin
.cmp(&other.begin)
.then(self.level.cmp(&other.level))
.then(self.end.cmp(&other.end))
} }
} }
@ -75,6 +87,13 @@ pub struct RangeChecksum {
time: Instant, time: Instant,
} }
#[derive(Debug, Clone)]
pub struct RangeChecksumCache {
hash: Option<Hash>, // None if no children
found_limit: Option<Vec<u8>>,
time: Instant,
}
impl<F, R> TableSyncer<F, R> impl<F, R> TableSyncer<F, R>
where where
F: TableSchema + 'static, F: TableSchema + 'static,
@ -159,7 +178,7 @@ where
if nothing_to_do_since.map(|t| Instant::now() - t >= SCAN_INTERVAL).unwrap_or(false) { if nothing_to_do_since.map(|t| Instant::now() - t >= SCAN_INTERVAL).unwrap_or(false) {
nothing_to_do_since = None; nothing_to_do_since = None;
debug!("({}) Adding full scan to syncer todo list", self.table.name); debug!("({}) Adding full scan to syncer todo list", self.table.name);
self.todo.lock().await.add_full_scan(&self.table); self.add_full_scan().await;
} }
} }
} }
@ -167,6 +186,10 @@ where
Ok(()) Ok(())
} }
pub async fn add_full_scan(&self) {
self.todo.lock().await.add_full_scan(&self.table);
}
async fn syncer_task( async fn syncer_task(
self: Arc<Self>, self: Arc<Self>,
mut must_exit: watch::Receiver<bool>, mut must_exit: watch::Receiver<bool>,
@ -273,47 +296,17 @@ where
} }
} }
Err(Error::Message(format!( Err(Error::Message(format!(
"Unable to compute root checksum (this should never happen" "Unable to compute root checksum (this should never happen)"
))) )))
} }
fn range_checksum<'a>( async fn range_checksum(
self: &'a Arc<Self>,
range: &'a SyncRange,
must_exit: &'a mut watch::Receiver<bool>,
) -> BoxFuture<'a, Result<RangeChecksum, Error>> {
async move {
let mut cache = self.cache[range.level].lock().await;
if let Some(v) = cache.get(&range) {
if Instant::now() - v.time < CHECKSUM_CACHE_TIMEOUT {
return Ok(v.clone());
}
}
cache.remove(&range);
drop(cache);
let v = self.range_checksum_inner(&range, must_exit).await?;
trace!(
"({}) New checksum calculated for {}-{}/{}, {} children",
self.table.name,
hex::encode(&range.begin[..]),
hex::encode(&range.end[..]),
range.level,
v.children.len()
);
let mut cache = self.cache[range.level].lock().await;
cache.insert(range.clone(), v.clone());
Ok(v)
}
.boxed()
}
async fn range_checksum_inner(
self: &Arc<Self>, self: &Arc<Self>,
range: &SyncRange, range: &SyncRange,
must_exit: &mut watch::Receiver<bool>, must_exit: &mut watch::Receiver<bool>,
) -> Result<RangeChecksum, Error> { ) -> Result<RangeChecksum, Error> {
assert!(range.level != 0);
if range.level == 1 { if range.level == 1 {
let mut children = vec![]; let mut children = vec![];
for item in self for item in self
@ -323,7 +316,10 @@ where
{ {
let (key, value) = item?; let (key, value) = item?;
let key_hash = hash(&key[..]); let key_hash = hash(&key[..]);
if key != range.begin && key_hash.as_slice()[0..range.level].iter().all(|x| *x == 0) if children.len() > 0
&& key_hash.as_slice()[0..range.level]
.iter()
.all(|x| *x == 0u8)
{ {
return Ok(RangeChecksum { return Ok(RangeChecksum {
bounds: range.clone(), bounds: range.clone(),
@ -354,17 +350,18 @@ where
}; };
let mut time = Instant::now(); let mut time = Instant::now();
while !*must_exit.borrow() { while !*must_exit.borrow() {
let sub_ck = self.range_checksum(&sub_range, must_exit).await?; let sub_ck = self
.range_checksum_cached_hash(&sub_range, must_exit)
.await?;
if sub_ck.children.len() > 0 { if let Some(hash) = &sub_ck.hash {
let sub_ck_hash = hash(&rmp_to_vec_all_named(&sub_ck)?[..]); children.push((sub_range.clone(), hash.clone()));
children.push((sub_range.clone(), sub_ck_hash));
if sub_ck.time < time { if sub_ck.time < time {
time = sub_ck.time; time = sub_ck.time;
} }
} }
if sub_ck.found_limit.is_none() || sub_ck.children.len() == 0 { if sub_ck.found_limit.is_none() || sub_ck.hash.is_none() {
return Ok(RangeChecksum { return Ok(RangeChecksum {
bounds: range.clone(), bounds: range.clone(),
children, children,
@ -377,7 +374,7 @@ where
let actual_limit_hash = hash(&found_limit[..]); let actual_limit_hash = hash(&found_limit[..]);
if actual_limit_hash.as_slice()[0..range.level] if actual_limit_hash.as_slice()[0..range.level]
.iter() .iter()
.all(|x| *x == 0) .all(|x| *x == 0u8)
{ {
return Ok(RangeChecksum { return Ok(RangeChecksum {
bounds: range.clone(), bounds: range.clone(),
@ -393,6 +390,52 @@ where
} }
} }
fn range_checksum_cached_hash<'a>(
self: &'a Arc<Self>,
range: &'a SyncRange,
must_exit: &'a mut watch::Receiver<bool>,
) -> BoxFuture<'a, Result<RangeChecksumCache, Error>> {
async move {
let mut cache = self.cache[range.level].lock().await;
if let Some(v) = cache.get(&range) {
if Instant::now() - v.time < CHECKSUM_CACHE_TIMEOUT {
return Ok(v.clone());
}
}
cache.remove(&range);
drop(cache);
let v = self.range_checksum(&range, must_exit).await?;
trace!(
"({}) New checksum calculated for {}-{}/{}, {} children",
self.table.name,
hex::encode(&range.begin)
.chars()
.take(16)
.collect::<String>(),
hex::encode(&range.end).chars().take(16).collect::<String>(),
range.level,
v.children.len()
);
let hash = if v.children.len() > 0 {
Some(hash(&rmp_to_vec_all_named(&v)?[..]))
} else {
None
};
let cache_entry = RangeChecksumCache {
hash,
found_limit: v.found_limit,
time: v.time,
};
let mut cache = self.cache[range.level].lock().await;
cache.insert(range.clone(), cache_entry.clone());
Ok(cache_entry)
}
.boxed()
}
async fn do_sync_with( async fn do_sync_with(
self: Arc<Self>, self: Arc<Self>,
partition: TodoPartition, partition: TodoPartition,
@ -423,6 +466,11 @@ where
} else { } else {
todo.push_back(root_ck); todo.push_back(root_ck);
} }
} else {
return Err(Error::BadRequest(format!(
"Invalid respone to GetRootChecksumRange RPC: {}",
debug_serialize(root_cks_resp)
)));
} }
while !todo.is_empty() && !*must_exit.borrow() { while !todo.is_empty() && !*must_exit.borrow() {
@ -435,8 +483,8 @@ where
total_children total_children
); );
let end = std::cmp::min(16, todo.len()); let step_size = std::cmp::min(16, todo.len());
let step = todo.drain(..end).collect::<Vec<_>>(); let step = todo.drain(..step_size).collect::<Vec<_>>();
let rpc_resp = self let rpc_resp = self
.table .table
@ -472,10 +520,7 @@ where
self.table.handle_update(diff_items).await?; self.table.handle_update(diff_items).await?;
} }
if items_to_send.len() > 0 { if items_to_send.len() > 0 {
self.table self.send_items(who.clone(), items_to_send).await?;
.system
.background
.spawn(self.clone().send_items(who.clone(), items_to_send));
} }
} else { } else {
return Err(Error::BadRequest(format!( return Err(Error::BadRequest(format!(
@ -487,7 +532,7 @@ where
Ok(()) Ok(())
} }
async fn send_items(self: Arc<Self>, who: UUID, item_list: Vec<Vec<u8>>) -> Result<(), Error> { async fn send_items(&self, who: UUID, item_list: Vec<Vec<u8>>) -> Result<(), Error> {
info!( info!(
"({}) Sending {} items to {:?}", "({}) Sending {} items to {:?}",
self.table.name, self.table.name,
@ -542,56 +587,56 @@ where
) -> Result<SyncRPC, Error> { ) -> Result<SyncRPC, Error> {
let mut ret_ranges = vec![]; let mut ret_ranges = vec![];
let mut ret_items = vec![]; let mut ret_items = vec![];
for ckr in checksums.iter() {
let our_ckr = self.range_checksum(&ckr.bounds, must_exit).await?;
for (range, hash) in ckr.children.iter() {
// Only consider items that are in the intersection of the two ranges
// (other ranges will be exchanged at some point)
if our_ckr
.found_limit
.as_ref()
.map(|x| range.begin.as_slice() >= x.as_slice())
.unwrap_or(false)
{
break;
}
for their_ckr in checksums.iter() {
let our_ckr = self.range_checksum(&their_ckr.bounds, must_exit).await?;
for (their_range, their_hash) in their_ckr.children.iter() {
let differs = match our_ckr let differs = match our_ckr
.children .children
.binary_search_by(|(our_range, _)| our_range.begin.cmp(&range.begin)) .binary_search_by(|(our_range, _)| our_range.cmp(&their_range))
{ {
Err(_) => true, Err(_) => {
Ok(i) => our_ckr.children[i].1 != *hash, if their_range.level >= 1 {
let cached_hash = self
.range_checksum_cached_hash(&their_range, must_exit)
.await?;
cached_hash.hash.map(|h| h != *their_hash).unwrap_or(true)
} else {
true
}
}
Ok(i) => our_ckr.children[i].1 != *their_hash,
}; };
if differs { if differs {
ret_ranges.push(range.clone()); ret_ranges.push(their_range.clone());
if retain && range.level == 0 { if retain && their_range.level == 0 {
if let Some(item_bytes) = self.table.store.get(range.begin.as_slice())? { if let Some(item_bytes) =
self.table.store.get(their_range.begin.as_slice())?
{
ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec()))); ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec())));
} }
} }
} }
} }
for (range, _hash) in our_ckr.children.iter() { for (our_range, _hash) in our_ckr.children.iter() {
if ckr if let Some(their_found_limit) = &their_ckr.found_limit {
.found_limit if our_range.begin.as_slice() > their_found_limit.as_slice() {
.as_ref()
.map(|x| range.begin.as_slice() >= x.as_slice())
.unwrap_or(false)
{
break; break;
} }
}
let not_present = ckr let not_present = our_ckr
.children .children
.binary_search_by(|(their_range, _)| their_range.begin.cmp(&range.begin)) .binary_search_by(|(their_range, _)| their_range.cmp(&our_range))
.is_err(); .is_err();
if not_present { if not_present {
if range.level > 0 { if our_range.level > 0 {
ret_ranges.push(range.clone()); ret_ranges.push(our_range.clone());
} }
if retain && range.level == 0 { if retain && our_range.level == 0 {
if let Some(item_bytes) = self.table.store.get(range.begin.as_slice())? { if let Some(item_bytes) =
self.table.store.get(our_range.begin.as_slice())?
{
ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec()))); ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec())));
} }
} }
@ -673,6 +718,15 @@ impl SyncTodo {
) { ) {
let my_id = table.system.id.clone(); let my_id = table.system.id.clone();
// If it is us who are entering or leaving the system,
// initiate a full sync instead of incremental sync
if old_ring.config.members.contains_key(&my_id)
!= new_ring.config.members.contains_key(&my_id)
{
self.add_full_scan(table);
return;
}
let mut all_points = None let mut all_points = None
.into_iter() .into_iter()
.chain(table.replication.split_points(old_ring).drain(..)) .chain(table.replication.split_points(old_ring).drain(..))