Implement table gc, currently for block_ref and version only

This commit is contained in:
Alex 2021-03-12 19:57:37 +01:00
parent f4aad8fe6e
commit c475471e7a
14 changed files with 301 additions and 57 deletions

View file

@ -225,9 +225,7 @@ async fn read_and_put_blocks(
let data_md5sum = md5hasher.finalize(); let data_md5sum = md5hasher.finalize();
let data_sha256sum = sha256hasher.result(); let data_sha256sum = sha256hasher.result();
let mut hash = [0u8; 32]; let data_sha256sum = Hash::try_from(&data_sha256sum[..]).unwrap();
hash.copy_from_slice(&data_sha256sum[..]);
let data_sha256sum = Hash::from(hash);
Ok((total_size, data_md5sum, data_sha256sum)) Ok((total_size, data_md5sum, data_sha256sum))
} }

View file

@ -106,12 +106,8 @@ pub async fn check_signature(
} else { } else {
let bytes = hex::decode(authorization.content_sha256) let bytes = hex::decode(authorization.content_sha256)
.ok_or_bad_request("Invalid content sha256 hash")?; .ok_or_bad_request("Invalid content sha256 hash")?;
let mut hash = [0u8; 32]; Some(Hash::try_from(&bytes[..])
if bytes.len() != 32 { .ok_or(Error::BadRequest(format!("Invalid content sha256 hash")))?)
return Err(Error::BadRequest(format!("Invalid content sha256 hash")));
}
hash.copy_from_slice(&bytes[..]);
Some(Hash::from(hash))
}; };
Ok((key, content_sha256)) Ok((key, content_sha256))

View file

@ -248,9 +248,7 @@ impl BlockManager {
let time_msec = u64_from_bytes(&time_bytes[0..8]); let time_msec = u64_from_bytes(&time_bytes[0..8]);
let now = now_msec(); let now = now_msec();
if now >= time_msec { if now >= time_msec {
let mut hash = [0u8; 32]; let hash = Hash::try_from(&hash_bytes[..]).unwrap();
hash.copy_from_slice(hash_bytes.as_ref());
let hash = Hash::from(hash);
if let Err(e) = self.resync_iter(&hash).await { if let Err(e) = self.resync_iter(&hash).await {
warn!("Failed to resync block {:?}, retrying later: {}", hash, e); warn!("Failed to resync block {:?}, retrying later: {}", hash, e);
@ -340,15 +338,11 @@ impl BlockManager {
need_nodes.len() need_nodes.len()
); );
let put_block_message = Arc::new(self.read_block(hash).await?); let put_block_message = self.read_block(hash).await?;
let put_resps = join_all(need_nodes.iter().map(|to| { self.rpc_client.try_call_many(
self.rpc_client &need_nodes[..],
.call_arc(*to, put_block_message.clone(), BLOCK_RW_TIMEOUT) put_block_message,
})) RequestStrategy::with_quorum(need_nodes.len()).with_timeout(BLOCK_RW_TIMEOUT)).await?;
.await;
for resp in put_resps {
resp?;
}
} }
trace!( trace!(
"Deleting block {:?}, offload finished ({} / {})", "Deleting block {:?}, offload finished ({} / {})",

View file

@ -27,6 +27,9 @@ impl Entry<Hash, UUID> for BlockRef {
fn sort_key(&self) -> &UUID { fn sort_key(&self) -> &UUID {
&self.version &self.version
} }
fn is_tombstone(&self) -> bool {
self.deleted.get()
}
} }
impl CRDT for BlockRef { impl CRDT for BlockRef {

View file

@ -78,6 +78,9 @@ impl Entry<Hash, EmptyKey> for Version {
fn sort_key(&self) -> &EmptyKey { fn sort_key(&self) -> &EmptyKey {
&EmptyKey &EmptyKey
} }
fn is_tombstone(&self) -> bool {
self.deleted.get()
}
} }
impl CRDT for Version { impl CRDT for Version {

View file

@ -197,11 +197,8 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
if !strategy.rs_interrupt_after_quorum { if !strategy.rs_interrupt_after_quorum {
let wait_finished_fut = tokio::spawn(async move { let wait_finished_fut = tokio::spawn(async move {
resp_stream.collect::<Vec<_>>().await; resp_stream.collect::<Vec<_>>().await;
Ok(())
}); });
self.background.spawn(wait_finished_fut.map(|x| { self.background.spawn(wait_finished_fut.map(|_| Ok(())));
x.unwrap_or_else(|e| Err(Error::Message(format!("Await failed: {}", e))))
}));
} }
Ok(results) Ok(results)

View file

@ -1,3 +1,4 @@
use core::borrow::Borrow;
use std::sync::Arc; use std::sync::Arc;
use log::warn; use log::warn;
@ -17,6 +18,7 @@ pub struct TableData<F: TableSchema> {
pub instance: F, pub instance: F,
pub store: sled::Tree, pub store: sled::Tree,
pub gc_todo: sled::Tree,
pub merkle_updater: Arc<MerkleUpdater>, pub merkle_updater: Arc<MerkleUpdater>,
} }
@ -41,6 +43,10 @@ where
.open_tree(&format!("{}:merkle_tree", name)) .open_tree(&format!("{}:merkle_tree", name))
.expect("Unable to open DB Merkle tree tree"); .expect("Unable to open DB Merkle tree tree");
let gc_todo = db
.open_tree(&format!("{}:gc_todo", name))
.expect("Unable to open DB tree");
let merkle_updater = MerkleUpdater::launch( let merkle_updater = MerkleUpdater::launch(
name.clone(), name.clone(),
background, background,
@ -52,6 +58,7 @@ where
name, name,
instance, instance,
store, store,
gc_todo,
merkle_updater, merkle_updater,
}) })
} }
@ -103,10 +110,17 @@ where
} }
// Mutation functions // Mutation functions
// When changing this code, take care of propagating modifications correctly:
// - When an entry is modified or deleted, call the updated() function
// on the table instance
// - When an entry is modified or deleted, add it to the merkle updater's todo list.
// This has to be done atomically with the modification for the merkle updater
// to maintain consistency. The merkle updater must then be notified with todo_notify.
// - When an entry is updated to be a tombstone, add it to the gc_todo tree
pub(crate) fn update_many(&self, entries: &[Arc<ByteBuf>]) -> Result<(), Error> { pub(crate) fn update_many<T: Borrow<ByteBuf>>(&self, entries: &[T]) -> Result<(), Error> {
for update_bytes in entries.iter() { for update_bytes in entries.iter() {
self.update_entry(update_bytes.as_slice())?; self.update_entry(update_bytes.borrow().as_slice())?;
} }
Ok(()) Ok(())
} }
@ -115,8 +129,8 @@ where
let update = self.decode_entry(update_bytes)?; let update = self.decode_entry(update_bytes)?;
let tree_key = self.tree_key(update.partition_key(), update.sort_key()); let tree_key = self.tree_key(update.partition_key(), update.sort_key());
let changed = (&self.store, &self.merkle_updater.todo).transaction(|(db, mkl_todo)| { let changed = (&self.store, &self.merkle_updater.todo).transaction(|(store, mkl_todo)| {
let (old_entry, new_entry) = match db.get(&tree_key)? { let (old_entry, new_entry) = match store.get(&tree_key)? {
Some(prev_bytes) => { Some(prev_bytes) => {
let old_entry = self let old_entry = self
.decode_entry(&prev_bytes) .decode_entry(&prev_bytes)
@ -132,27 +146,32 @@ where
let new_bytes = rmp_to_vec_all_named(&new_entry) let new_bytes = rmp_to_vec_all_named(&new_entry)
.map_err(Error::RMPEncode) .map_err(Error::RMPEncode)
.map_err(sled::transaction::ConflictableTransactionError::Abort)?; .map_err(sled::transaction::ConflictableTransactionError::Abort)?;
mkl_todo.insert(tree_key.clone(), blake2sum(&new_bytes[..]).to_vec())?; let new_bytes_hash = blake2sum(&new_bytes[..]);
db.insert(tree_key.clone(), new_bytes)?; mkl_todo.insert(tree_key.clone(), new_bytes_hash.as_slice())?;
Ok(Some((old_entry, new_entry))) store.insert(tree_key.clone(), new_bytes)?;
Ok(Some((old_entry, new_entry, new_bytes_hash)))
} else { } else {
Ok(None) Ok(None)
} }
})?; })?;
if let Some((old_entry, new_entry)) = changed { if let Some((old_entry, new_entry, new_bytes_hash)) = changed {
let is_tombstone = new_entry.is_tombstone();
self.instance.updated(old_entry, Some(new_entry)); self.instance.updated(old_entry, Some(new_entry));
self.merkle_updater.todo_notify.notify(); self.merkle_updater.todo_notify.notify();
if is_tombstone {
self.gc_todo.insert(&tree_key, new_bytes_hash.as_slice())?;
}
} }
Ok(()) Ok(())
} }
pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> { pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> {
let removed = (&self.store, &self.merkle_updater.todo).transaction(|(txn, mkl_todo)| { let removed = (&self.store, &self.merkle_updater.todo).transaction(|(store, mkl_todo)| {
if let Some(cur_v) = txn.get(k)? { if let Some(cur_v) = store.get(k)? {
if cur_v == v { if cur_v == v {
txn.remove(k)?; store.remove(k)?;
mkl_todo.insert(k, vec![])?; mkl_todo.insert(k, vec![])?;
return Ok(true); return Ok(true);
} }
@ -168,6 +187,30 @@ where
Ok(removed) Ok(removed)
} }
pub(crate) fn delete_if_equal_hash(self: &Arc<Self>, k: &[u8], vhash: Hash) -> Result<bool, Error> {
let removed = (&self.store, &self.merkle_updater.todo).transaction(|(store, mkl_todo)| {
if let Some(cur_v) = store.get(k)? {
if blake2sum(&cur_v[..]) == vhash {
store.remove(k)?;
mkl_todo.insert(k, vec![])?;
return Ok(Some(cur_v));
}
}
Ok(None)
})?;
if let Some(old_v) = removed {
let old_entry = self.decode_entry(&old_v[..])?;
self.instance.updated(Some(old_entry), None);
self.merkle_updater.todo_notify.notify();
Ok(true)
} else {
Ok(false)
}
}
// ---- Utility functions ----
pub(crate) fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> { pub(crate) fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> {
let mut ret = p.hash().to_vec(); let mut ret = p.hash().to_vec();
ret.extend(s.sort_key()); ret.extend(s.sort_key());

212
src/table/gc.rs Normal file
View file

@ -0,0 +1,212 @@
use std::sync::Arc;
use std::time::Duration;
use std::collections::HashMap;
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
use futures::future::join_all;
use futures::select;
use futures_util::future::*;
use tokio::sync::watch;
use garage_util::data::*;
use garage_util::error::Error;
use garage_rpc::rpc_client::*;
use garage_rpc::rpc_server::*;
use crate::data::*;
use crate::table::*;
use crate::schema::*;
use crate::replication::*;
const TABLE_GC_BATCH_SIZE: usize = 1024;
const TABLE_GC_RPC_TIMEOUT: Duration = Duration::from_secs(30);
pub struct TableGC<F: TableSchema, R: TableReplication> {
data: Arc<TableData<F>>,
aux: Arc<TableAux<R>>,
rpc_client: Arc<RpcClient<GcRPC>>,
}
#[derive(Serialize, Deserialize)]
enum GcRPC {
Update(Vec<ByteBuf>),
DeleteIfEqualHash(Vec<(ByteBuf, Hash)>),
Ok,
}
impl RpcMessage for GcRPC {}
impl<F, R> TableGC<F, R>
where
F: TableSchema + 'static,
R: TableReplication + 'static,
{
pub(crate) fn launch(
data: Arc<TableData<F>>,
aux: Arc<TableAux<R>>,
rpc_server: &mut RpcServer,
) -> Arc<Self> {
let rpc_path = format!("table_{}/gc", data.name);
let rpc_client = aux.system.rpc_client::<GcRPC>(&rpc_path);
let gc = Arc::new(Self {
data: data.clone(),
aux: aux.clone(),
rpc_client,
});
gc.register_handler(rpc_server, rpc_path);
let gc1 = gc.clone();
aux.system.background.spawn_worker(
format!("GC loop for {}", data.name),
move |must_exit: watch::Receiver<bool>| gc1.gc_loop(must_exit),
);
gc
}
async fn gc_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) -> Result<(), Error> {
while !*must_exit.borrow() {
match self.gc_loop_iter().await {
Ok(true) => {
// Stuff was done, loop imediately
}
Ok(false) => {
select! {
_ = tokio::time::delay_for(Duration::from_secs(10)).fuse() => (),
_ = must_exit.recv().fuse() => (),
}
}
Err(e) => {
warn!("({}) Error doing GC: {}", self.data.name, e);
}
}
}
Ok(())
}
async fn gc_loop_iter(&self) -> Result<bool, Error> {
let mut entries = vec![];
let mut excluded = vec![];
for item in self.data.gc_todo.iter() {
let (k, vhash) = item?;
let vhash = Hash::try_from(&vhash[..]).unwrap();
let v_opt = self.data.store.get(&k[..])?
.filter(|v| blake2sum(&v[..]) == vhash);
if let Some(v) = v_opt {
entries.push((ByteBuf::from(k.to_vec()), vhash, ByteBuf::from(v.to_vec())));
if entries.len() >= TABLE_GC_BATCH_SIZE {
break;
}
} else {
excluded.push((k, vhash));
}
}
for (k, vhash) in excluded {
let _ = self.data.gc_todo.compare_and_swap::<_, _, Vec<u8>>(k, Some(vhash), None)?;
}
if entries.len() == 0 {
// Nothing to do in this iteration
return Ok(false);
}
debug!("({}) GC: doing {} items", self.data.name, entries.len());
let mut partitions = HashMap::new();
for (k, vhash, v) in entries {
let pkh = Hash::try_from(&k[..32]).unwrap();
let mut nodes = self.aux.replication.write_nodes(&pkh, &self.aux.system);
nodes.retain(|x| *x != self.aux.system.id);
nodes.sort();
if !partitions.contains_key(&nodes) {
partitions.insert(nodes.clone(), vec![]);
}
partitions.get_mut(&nodes).unwrap().push((k, vhash, v));
}
let resps = join_all(partitions.into_iter()
.map(|(nodes, items)| self.try_send_and_delete(nodes, items))).await;
for resp in resps {
if let Err(e) = resp {
warn!("({}) Unable to send and delete for GC: {}", self.data.name, e);
}
}
Ok(true)
}
async fn try_send_and_delete(&self, nodes: Vec<UUID>, items: Vec<(ByteBuf, Hash, ByteBuf)>) -> Result<(), Error> {
let n_items = items.len();
let mut updates = vec![];
let mut deletes = vec![];
for (k, vhash, v) in items {
updates.push(v);
deletes.push((k, vhash));
}
self.rpc_client.try_call_many(
&nodes[..],
GcRPC::Update(updates),
RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_GC_RPC_TIMEOUT)).await?;
info!("({}) GC: {} items successfully pushed, will try to delete.", self.data.name, n_items);
self.rpc_client.try_call_many(
&nodes[..],
GcRPC::DeleteIfEqualHash(deletes.clone()),
RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_GC_RPC_TIMEOUT)).await?;
for (k, vhash) in deletes {
self.data.delete_if_equal_hash(&k[..], vhash)?;
let _ = self.data.gc_todo.compare_and_swap::<_, _, Vec<u8>>(k, Some(vhash), None)?;
}
Ok(())
}
// ---- RPC HANDLER ----
fn register_handler(self: &Arc<Self>, rpc_server: &mut RpcServer, path: String) {
let self2 = self.clone();
rpc_server.add_handler::<GcRPC, _, _>(path, move |msg, _addr| {
let self2 = self2.clone();
async move { self2.handle_rpc(&msg).await }
});
let self2 = self.clone();
self.rpc_client
.set_local_handler(self.aux.system.id, move |msg| {
let self2 = self2.clone();
async move { self2.handle_rpc(&msg).await }
});
}
async fn handle_rpc(self: &Arc<Self>, message: &GcRPC) -> Result<GcRPC, Error> {
match message {
GcRPC::Update(items) => {
self.data.update_many(items)?;
Ok(GcRPC::Ok)
}
GcRPC::DeleteIfEqualHash(items) => {
for (key, vhash) in items.iter() {
self.data.delete_if_equal_hash(&key[..], *vhash)?;
}
Ok(GcRPC::Ok)
}
_ => Err(Error::Message(format!("Unexpected GC RPC"))),
}
}
}

View file

@ -11,6 +11,7 @@ pub mod data;
pub mod merkle; pub mod merkle;
pub mod replication; pub mod replication;
pub mod sync; pub mod sync;
pub mod gc;
pub mod table; pub mod table;
pub use schema::*; pub use schema::*;

View file

@ -139,10 +139,7 @@ impl MerkleUpdater {
let new_vhash = if vhash_by.len() == 0 { let new_vhash = if vhash_by.len() == 0 {
None None
} else { } else {
let vhash_by: [u8; 32] = vhash_by Some(Hash::try_from(&vhash_by[..]).unwrap())
.try_into()
.map_err(|_| Error::Message(format!("Invalid value in Merkle todo table")))?;
Some(Hash::from(vhash_by))
}; };
let key = MerkleNodeKey { let key = MerkleNodeKey {

View file

@ -41,6 +41,8 @@ pub trait Entry<P: PartitionKey, S: SortKey>:
{ {
fn partition_key(&self) -> &P; fn partition_key(&self) -> &P;
fn sort_key(&self) -> &S; fn sort_key(&self) -> &S;
fn is_tombstone(&self) -> bool { false }
} }
pub trait TableSchema: Send + Sync { pub trait TableSchema: Send + Sync {

View file

@ -3,7 +3,6 @@ use std::convert::TryInto;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use futures::future::join_all;
use futures::{pin_mut, select}; use futures::{pin_mut, select};
use futures_util::future::*; use futures_util::future::*;
use futures_util::stream::*; use futures_util::stream::*;
@ -347,16 +346,11 @@ where
nodes: &[UUID], nodes: &[UUID],
) -> Result<(), Error> { ) -> Result<(), Error> {
let values = items.iter().map(|(_k, v)| v.clone()).collect::<Vec<_>>(); let values = items.iter().map(|(_k, v)| v.clone()).collect::<Vec<_>>();
let update_msg = Arc::new(SyncRPC::Items(values));
for res in join_all(nodes.iter().map(|to| { self.rpc_client.try_call_many(
self.rpc_client &nodes[..],
.call_arc(*to, update_msg.clone(), TABLE_SYNC_RPC_TIMEOUT) SyncRPC::Items(values),
})) RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_SYNC_RPC_TIMEOUT)).await?;
.await
{
res?;
}
// All remote nodes have written those items, now we can delete them locally // All remote nodes have written those items, now we can delete them locally
let mut not_removed = 0; let mut not_removed = 0;
@ -577,7 +571,7 @@ where
// ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ====== // ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ======
pub(crate) async fn handle_rpc(self: &Arc<Self>, message: &SyncRPC) -> Result<SyncRPC, Error> { async fn handle_rpc(self: &Arc<Self>, message: &SyncRPC) -> Result<SyncRPC, Error> {
match message { match message {
SyncRPC::RootCkHash(range, h) => { SyncRPC::RootCkHash(range, h) => {
let root_ck = self.get_root_ck(*range)?; let root_ck = self.get_root_ck(*range)?;

View file

@ -18,6 +18,7 @@ use crate::data::*;
use crate::replication::*; use crate::replication::*;
use crate::schema::*; use crate::schema::*;
use crate::sync::*; use crate::sync::*;
use crate::gc::*;
const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10); const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10);
@ -44,8 +45,6 @@ pub(crate) enum TableRPC<F: TableSchema> {
ReadRange(F::P, Option<F::S>, Option<F::Filter>, usize), ReadRange(F::P, Option<F::S>, Option<F::Filter>, usize),
Update(Vec<Arc<ByteBuf>>), Update(Vec<Arc<ByteBuf>>),
SyncRPC(SyncRPC),
} }
impl<F: TableSchema> RpcMessage for TableRPC<F> {} impl<F: TableSchema> RpcMessage for TableRPC<F> {}
@ -76,6 +75,7 @@ where
}); });
let syncer = TableSyncer::launch(data.clone(), aux.clone(), rpc_server); let syncer = TableSyncer::launch(data.clone(), aux.clone(), rpc_server);
TableGC::launch(data.clone(), aux.clone(), rpc_server);
let table = Arc::new(Self { let table = Arc::new(Self {
data, data,
@ -308,10 +308,6 @@ where
self.data.update_many(pairs)?; self.data.update_many(pairs)?;
Ok(TableRPC::Ok) Ok(TableRPC::Ok)
} }
TableRPC::SyncRPC(rpc) => {
let response = self.syncer.handle_rpc(rpc).await?;
Ok(TableRPC::SyncRPC(response))
}
_ => Err(Error::BadRPC(format!("Unexpected table RPC"))), _ => Err(Error::BadRPC(format!("Unexpected table RPC"))),
} }
} }

View file

@ -71,6 +71,14 @@ impl FixedBytes32 {
pub fn to_vec(&self) -> Vec<u8> { pub fn to_vec(&self) -> Vec<u8> {
self.0.to_vec() self.0.to_vec()
} }
pub fn try_from(by: &[u8]) -> Option<Self> {
if by.len() != 32 {
return None;
}
let mut ret = [0u8; 32];
ret.copy_from_slice(by);
Some(Self(ret))
}
} }
pub type UUID = FixedBytes32; pub type UUID = FixedBytes32;