cargo fmt
All checks were successful: continuous-integration/drone/push, build is passing

Authored by Alex, 2021-03-12 21:52:19 +01:00
parent c475471e7a
commit 831eb35763
8 changed files with 121 additions and 74 deletions
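
This is a pure rustfmt pass: every hunk below only rewraps long call chains, splits wide signatures across several lines, and sorts use statements; behaviour is unchanged. As a minimal sketch (assuming the usual cargo workflow; the actual Drone pipeline step is not shown on this page, so the --check invocation is an assumption), such a commit is produced and gated roughly like this:

    # rewrite every crate in the workspace in place
    cargo fmt --all
    # exit non-zero if any file is still unformatted, as a CI formatting gate might do
    cargo fmt --all -- --check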

@@ -106,8 +106,10 @@ pub async fn check_signature(
     } else {
         let bytes = hex::decode(authorization.content_sha256)
             .ok_or_bad_request("Invalid content sha256 hash")?;
-        Some(Hash::try_from(&bytes[..])
-            .ok_or(Error::BadRequest(format!("Invalid content sha256 hash")))?)
+        Some(
+            Hash::try_from(&bytes[..])
+                .ok_or(Error::BadRequest(format!("Invalid content sha256 hash")))?,
+        )
     };

     Ok((key, content_sha256))

@@ -339,10 +339,14 @@ impl BlockManager {
         );
         let put_block_message = self.read_block(hash).await?;
-        self.rpc_client.try_call_many(
-            &need_nodes[..],
-            put_block_message,
-            RequestStrategy::with_quorum(need_nodes.len()).with_timeout(BLOCK_RW_TIMEOUT)).await?;
+        self.rpc_client
+            .try_call_many(
+                &need_nodes[..],
+                put_block_message,
+                RequestStrategy::with_quorum(need_nodes.len())
+                    .with_timeout(BLOCK_RW_TIMEOUT),
+            )
+            .await?;
     }

     trace!(
         "Deleting block {:?}, offload finished ({} / {})",

@@ -129,31 +129,32 @@ where
         let update = self.decode_entry(update_bytes)?;
         let tree_key = self.tree_key(update.partition_key(), update.sort_key());

-        let changed = (&self.store, &self.merkle_updater.todo).transaction(|(store, mkl_todo)| {
-            let (old_entry, new_entry) = match store.get(&tree_key)? {
-                Some(prev_bytes) => {
-                    let old_entry = self
-                        .decode_entry(&prev_bytes)
-                        .map_err(sled::transaction::ConflictableTransactionError::Abort)?;
-                    let mut new_entry = old_entry.clone();
-                    new_entry.merge(&update);
-                    (Some(old_entry), new_entry)
-                }
-                None => (None, update.clone()),
-            };
-
-            if Some(&new_entry) != old_entry.as_ref() {
-                let new_bytes = rmp_to_vec_all_named(&new_entry)
-                    .map_err(Error::RMPEncode)
-                    .map_err(sled::transaction::ConflictableTransactionError::Abort)?;
-                let new_bytes_hash = blake2sum(&new_bytes[..]);
-                mkl_todo.insert(tree_key.clone(), new_bytes_hash.as_slice())?;
-                store.insert(tree_key.clone(), new_bytes)?;
-                Ok(Some((old_entry, new_entry, new_bytes_hash)))
-            } else {
-                Ok(None)
-            }
-        })?;
+        let changed =
+            (&self.store, &self.merkle_updater.todo).transaction(|(store, mkl_todo)| {
+                let (old_entry, new_entry) = match store.get(&tree_key)? {
+                    Some(prev_bytes) => {
+                        let old_entry = self
+                            .decode_entry(&prev_bytes)
+                            .map_err(sled::transaction::ConflictableTransactionError::Abort)?;
+                        let mut new_entry = old_entry.clone();
+                        new_entry.merge(&update);
+                        (Some(old_entry), new_entry)
+                    }
+                    None => (None, update.clone()),
+                };
+
+                if Some(&new_entry) != old_entry.as_ref() {
+                    let new_bytes = rmp_to_vec_all_named(&new_entry)
+                        .map_err(Error::RMPEncode)
+                        .map_err(sled::transaction::ConflictableTransactionError::Abort)?;
+                    let new_bytes_hash = blake2sum(&new_bytes[..]);
+                    mkl_todo.insert(tree_key.clone(), new_bytes_hash.as_slice())?;
+                    store.insert(tree_key.clone(), new_bytes)?;
+                    Ok(Some((old_entry, new_entry, new_bytes_hash)))
+                } else {
+                    Ok(None)
+                }
+            })?;

         if let Some((old_entry, new_entry, new_bytes_hash)) = changed {
             let is_tombstone = new_entry.is_tombstone();
@@ -168,16 +169,17 @@ where
     }

     pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> {
-        let removed = (&self.store, &self.merkle_updater.todo).transaction(|(store, mkl_todo)| {
-            if let Some(cur_v) = store.get(k)? {
-                if cur_v == v {
-                    store.remove(k)?;
-                    mkl_todo.insert(k, vec![])?;
-                    return Ok(true);
-                }
-            }
-            Ok(false)
-        })?;
+        let removed =
+            (&self.store, &self.merkle_updater.todo).transaction(|(store, mkl_todo)| {
+                if let Some(cur_v) = store.get(k)? {
+                    if cur_v == v {
+                        store.remove(k)?;
+                        mkl_todo.insert(k, vec![])?;
+                        return Ok(true);
+                    }
+                }
+                Ok(false)
+            })?;

         if removed {
             let old_entry = self.decode_entry(v)?;
@@ -187,17 +189,22 @@ where
         Ok(removed)
     }

-    pub(crate) fn delete_if_equal_hash(self: &Arc<Self>, k: &[u8], vhash: Hash) -> Result<bool, Error> {
-        let removed = (&self.store, &self.merkle_updater.todo).transaction(|(store, mkl_todo)| {
-            if let Some(cur_v) = store.get(k)? {
-                if blake2sum(&cur_v[..]) == vhash {
-                    store.remove(k)?;
-                    mkl_todo.insert(k, vec![])?;
-                    return Ok(Some(cur_v));
-                }
-            }
-            Ok(None)
-        })?;
+    pub(crate) fn delete_if_equal_hash(
+        self: &Arc<Self>,
+        k: &[u8],
+        vhash: Hash,
+    ) -> Result<bool, Error> {
+        let removed =
+            (&self.store, &self.merkle_updater.todo).transaction(|(store, mkl_todo)| {
+                if let Some(cur_v) = store.get(k)? {
+                    if blake2sum(&cur_v[..]) == vhash {
+                        store.remove(k)?;
+                        mkl_todo.insert(k, vec![])?;
+                        return Ok(Some(cur_v));
+                    }
+                }
+                Ok(None)
+            })?;

         if let Some(old_v) = removed {
             let old_entry = self.decode_entry(&old_v[..])?;

@@ -1,6 +1,6 @@
+use std::collections::HashMap;
 use std::sync::Arc;
 use std::time::Duration;
-use std::collections::HashMap;

 use serde::{Deserialize, Serialize};
 use serde_bytes::ByteBuf;
@@ -17,9 +17,9 @@ use garage_rpc::rpc_client::*;
 use garage_rpc::rpc_server::*;

 use crate::data::*;
-use crate::table::*;
-use crate::schema::*;
 use crate::replication::*;
+use crate::schema::*;
+use crate::table::*;

 const TABLE_GC_BATCH_SIZE: usize = 1024;
 const TABLE_GC_RPC_TIMEOUT: Duration = Duration::from_secs(30);
@@ -99,7 +99,10 @@ where
             let vhash = Hash::try_from(&vhash[..]).unwrap();

-            let v_opt = self.data.store.get(&k[..])?
+            let v_opt = self
+                .data
+                .store
+                .get(&k[..])?
                 .filter(|v| blake2sum(&v[..]) == vhash);

             if let Some(v) = v_opt {
@@ -113,7 +116,10 @@ where
         }

         for (k, vhash) in excluded {
-            let _ = self.data.gc_todo.compare_and_swap::<_, _, Vec<u8>>(k, Some(vhash), None)?;
+            let _ = self
+                .data
+                .gc_todo
+                .compare_and_swap::<_, _, Vec<u8>>(k, Some(vhash), None)?;
         }

         if entries.len() == 0 {
@@ -136,18 +142,29 @@ where
             partitions.get_mut(&nodes).unwrap().push((k, vhash, v));
         }

-        let resps = join_all(partitions.into_iter()
-            .map(|(nodes, items)| self.try_send_and_delete(nodes, items))).await;
+        let resps = join_all(
+            partitions
+                .into_iter()
+                .map(|(nodes, items)| self.try_send_and_delete(nodes, items)),
+        )
+        .await;

         for resp in resps {
             if let Err(e) = resp {
-                warn!("({}) Unable to send and delete for GC: {}", self.data.name, e);
+                warn!(
+                    "({}) Unable to send and delete for GC: {}",
+                    self.data.name, e
+                );
             }
         }

         Ok(true)
     }

-    async fn try_send_and_delete(&self, nodes: Vec<UUID>, items: Vec<(ByteBuf, Hash, ByteBuf)>) -> Result<(), Error> {
+    async fn try_send_and_delete(
+        &self,
+        nodes: Vec<UUID>,
+        items: Vec<(ByteBuf, Hash, ByteBuf)>,
+    ) -> Result<(), Error> {
         let n_items = items.len();
         let mut updates = vec![];
@@ -157,21 +174,33 @@ where
             deletes.push((k, vhash));
         }

-        self.rpc_client.try_call_many(
-            &nodes[..],
-            GcRPC::Update(updates),
-            RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_GC_RPC_TIMEOUT)).await?;
+        self.rpc_client
+            .try_call_many(
+                &nodes[..],
+                GcRPC::Update(updates),
+                RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_GC_RPC_TIMEOUT),
+            )
+            .await?;

-        info!("({}) GC: {} items successfully pushed, will try to delete.", self.data.name, n_items);
+        info!(
+            "({}) GC: {} items successfully pushed, will try to delete.",
+            self.data.name, n_items
+        );

-        self.rpc_client.try_call_many(
-            &nodes[..],
-            GcRPC::DeleteIfEqualHash(deletes.clone()),
-            RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_GC_RPC_TIMEOUT)).await?;
+        self.rpc_client
+            .try_call_many(
+                &nodes[..],
+                GcRPC::DeleteIfEqualHash(deletes.clone()),
+                RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_GC_RPC_TIMEOUT),
+            )
+            .await?;

         for (k, vhash) in deletes {
             self.data.delete_if_equal_hash(&k[..], vhash)?;
-            let _ = self.data.gc_todo.compare_and_swap::<_, _, Vec<u8>>(k, Some(vhash), None)?;
+            let _ = self
+                .data
+                .gc_todo
+                .compare_and_swap::<_, _, Vec<u8>>(k, Some(vhash), None)?;
         }

         Ok(())

@@ -8,10 +8,10 @@ pub mod schema;
 pub mod util;

 pub mod data;
+pub mod gc;
 pub mod merkle;
 pub mod replication;
 pub mod sync;
-pub mod gc;
 pub mod table;

 pub use schema::*;

@@ -42,7 +42,9 @@ pub trait Entry<P: PartitionKey, S: SortKey>:
     fn partition_key(&self) -> &P;
     fn sort_key(&self) -> &S;

-    fn is_tombstone(&self) -> bool { false }
+    fn is_tombstone(&self) -> bool {
+        false
+    }
 }

 pub trait TableSchema: Send + Sync {

@@ -347,10 +347,13 @@ where
     ) -> Result<(), Error> {
         let values = items.iter().map(|(_k, v)| v.clone()).collect::<Vec<_>>();

-        self.rpc_client.try_call_many(
-            &nodes[..],
-            SyncRPC::Items(values),
-            RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_SYNC_RPC_TIMEOUT)).await?;
+        self.rpc_client
+            .try_call_many(
+                &nodes[..],
+                SyncRPC::Items(values),
+                RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_SYNC_RPC_TIMEOUT),
+            )
+            .await?;

         // All remote nodes have written those items, now we can delete them locally
         let mut not_removed = 0;

@@ -15,10 +15,10 @@ use garage_rpc::rpc_server::*;
 use crate::crdt::CRDT;
 use crate::data::*;
+use crate::gc::*;
 use crate::replication::*;
 use crate::schema::*;
 use crate::sync::*;
-use crate::gc::*;

 const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10);