WIP add content defined chunking #42

Closed
trinity-1686a wants to merge 42 commits from content-defined-chunking into master
6 changed files with 85 additions and 146 deletions
Showing only changes of commit 2a41b82384 - Show all commits

View file

@ -5,6 +5,11 @@ use serde::{Deserialize, Serialize};
use garage_util::data::*; use garage_util::data::*;
// A partition number is encoded on 16 bits,
// i.e. we have up to 2**16 partitions.
// (in practice we have exactly 2**PARTITION_BITS partitions)
pub type Partition = u16;
// TODO: make this constant parametrizable in the config file // TODO: make this constant parametrizable in the config file
// For deployments with many nodes it might make sense to bump // For deployments with many nodes it might make sense to bump
// it up to 10. // it up to 10.
@ -17,6 +22,7 @@ const PARTITION_MASK_U16: u16 = ((1 << PARTITION_BITS) - 1) << (16 - PARTITION_B
// (most deployments use a replication factor of 3, so...) // (most deployments use a replication factor of 3, so...)
pub const MAX_REPLICATION: usize = 3; pub const MAX_REPLICATION: usize = 3;
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct NetworkConfig { pub struct NetworkConfig {
pub members: HashMap<UUID, NetworkConfigEntry>, pub members: HashMap<UUID, NetworkConfigEntry>,
@ -170,11 +176,24 @@ impl Ring {
Self { config, ring } Self { config, ring }
} }
pub fn partition_of(&self, from: &Hash) -> u16 { pub fn partition_of(&self, from: &Hash) -> Partition {
let top = u16::from_be_bytes(from.as_slice()[0..2].try_into().unwrap()); let top = u16::from_be_bytes(from.as_slice()[0..2].try_into().unwrap());
top >> (16 - PARTITION_BITS) top >> (16 - PARTITION_BITS)
} }
pub fn partitions(&self) -> Vec<(Partition, Hash)> {
let mut ret = vec![];
for (i, entry) in self.ring.iter().enumerate() {
ret.push((i as u16, entry.location));
}
if ret.len() > 0 {
assert_eq!(ret[0].1, [0u8; 32].into());
}
ret
}
pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec<UUID> { pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec<UUID> {
if self.ring.len() != 1 << PARTITION_BITS { if self.ring.len() != 1 << PARTITION_BITS {
warn!("Ring not yet ready, read/writes will be lost!"); warn!("Ring not yet ready, read/writes will be lost!");

View file

@ -1,4 +1,3 @@
use std::convert::TryInto;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
@ -15,22 +14,12 @@ use garage_util::background::BackgroundRunner;
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::Error; use garage_util::error::Error;
use garage_rpc::ring::*;
use crate::data::*; use crate::data::*;
use crate::replication::*; use crate::replication::*;
use crate::schema::*; use crate::schema::*;
pub type MerklePartition = [u8; 2];
pub fn hash_of_merkle_partition(p: MerklePartition) -> Hash {
let mut partition_pos = [0u8; 32];
partition_pos[0..2].copy_from_slice(&p[..]);
partition_pos.into()
}
pub fn hash_of_merkle_partition_opt(p: Option<MerklePartition>) -> Hash {
p.map(hash_of_merkle_partition)
.unwrap_or([0xFFu8; 32].into())
}
// This modules partitions the data in 2**16 partitions, based on the top // This modules partitions the data in 2**16 partitions, based on the top
// 16 bits (two bytes) of item's partition keys' hashes. // 16 bits (two bytes) of item's partition keys' hashes.
@ -57,8 +46,8 @@ pub struct MerkleUpdater<F: TableSchema, R: TableReplication> {
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct MerkleNodeKey { pub struct MerkleNodeKey {
// partition: first 16 bits (two bytes) of the partition_key's hash // partition number
pub partition: [u8; 2], pub partition: Partition,
// prefix: a prefix for the hash of full keys, i.e. hash(hash(partition_key)+sort_key) // prefix: a prefix for the hash of full keys, i.e. hash(hash(partition_key)+sort_key)
#[serde(with = "serde_bytes")] #[serde(with = "serde_bytes")]
@ -143,7 +132,7 @@ where
}; };
let key = MerkleNodeKey { let key = MerkleNodeKey {
partition: k[0..2].try_into().unwrap(), partition: self.data.replication.partition_of(&Hash::try_from(&k[0..32]).unwrap()),
prefix: vec![], prefix: vec![],
}; };
self.data self.data
@ -325,7 +314,7 @@ where
impl MerkleNodeKey { impl MerkleNodeKey {
fn encode(&self) -> Vec<u8> { fn encode(&self) -> Vec<u8> {
let mut ret = Vec::with_capacity(2 + self.prefix.len()); let mut ret = Vec::with_capacity(2 + self.prefix.len());
ret.extend(&self.partition[..]); ret.extend(&u16::to_be_bytes(self.partition)[..]);
ret.extend(&self.prefix[..]); ret.extend(&self.prefix[..]);
ret ret
} }
@ -443,3 +432,9 @@ fn test_intermediate_aux() {
] ]
); );
} }
impl MerkleNode {
pub fn is_empty(&self) -> bool {
*self == MerkleNode::Empty
}
}

View file

@ -1,7 +1,7 @@
use std::sync::Arc; use std::sync::Arc;
use garage_rpc::membership::System; use garage_rpc::membership::System;
use garage_rpc::ring::Ring; use garage_rpc::ring::*;
use garage_util::data::*; use garage_util::data::*;
use crate::replication::*; use crate::replication::*;
@ -19,10 +19,6 @@ impl TableReplication for TableFullReplication {
// Advantage: do all reads locally, extremely fast // Advantage: do all reads locally, extremely fast
// Inconvenient: only suitable to reasonably small tables // Inconvenient: only suitable to reasonably small tables
fn partition_of(&self, _hash: &Hash) -> u16 {
0u16
}
fn read_nodes(&self, _hash: &Hash) -> Vec<UUID> { fn read_nodes(&self, _hash: &Hash) -> Vec<UUID> {
vec![self.system.id] vec![self.system.id]
} }
@ -46,9 +42,10 @@ impl TableReplication for TableFullReplication {
self.max_faults self.max_faults
} }
fn split_points(&self, _ring: &Ring) -> Vec<Hash> { fn partition_of(&self, _hash: &Hash) -> Partition {
let mut ret = vec![]; 0u16
ret.push([0u8; 32].into()); }
ret fn partitions(&self) -> Vec<(Partition, Hash)> {
vec![(0u16, [0u8; 32].into())]
} }
} }

View file

@ -1,4 +1,4 @@
use garage_rpc::ring::Ring; use garage_rpc::ring::*;
use garage_util::data::*; use garage_util::data::*;
@ -6,9 +6,6 @@ pub trait TableReplication: Send + Sync {
// See examples in table_sharded.rs and table_fullcopy.rs // See examples in table_sharded.rs and table_fullcopy.rs
// To understand various replication methods // To understand various replication methods
// Partition number of data item (for Merkle tree)
fn partition_of(&self, hash: &Hash) -> u16;
// Which nodes to send reads from // Which nodes to send reads from
fn read_nodes(&self, hash: &Hash) -> Vec<UUID>; fn read_nodes(&self, hash: &Hash) -> Vec<UUID>;
fn read_quorum(&self) -> usize; fn read_quorum(&self) -> usize;
@ -18,6 +15,7 @@ pub trait TableReplication: Send + Sync {
fn write_quorum(&self) -> usize; fn write_quorum(&self) -> usize;
fn max_write_errors(&self) -> usize; fn max_write_errors(&self) -> usize;
// Get partition boundaries // Accessing partitions, for Merkle tree & sync
fn split_points(&self, ring: &Ring) -> Vec<Hash>; fn partition_of(&self, hash: &Hash) -> Partition;
fn partitions(&self) -> Vec<(Partition, Hash)>;
} }

View file

@ -1,7 +1,7 @@
use std::sync::Arc; use std::sync::Arc;
use garage_rpc::membership::System; use garage_rpc::membership::System;
use garage_rpc::ring::Ring; use garage_rpc::ring::*;
use garage_util::data::*; use garage_util::data::*;
use crate::replication::*; use crate::replication::*;
@ -22,10 +22,6 @@ impl TableReplication for TableShardedReplication {
// - reads are done on all of the nodes that replicate the data // - reads are done on all of the nodes that replicate the data
// - writes as well // - writes as well
fn partition_of(&self, hash: &Hash) -> u16 {
self.system.ring.borrow().partition_of(hash)
}
fn read_nodes(&self, hash: &Hash) -> Vec<UUID> { fn read_nodes(&self, hash: &Hash) -> Vec<UUID> {
let ring = self.system.ring.borrow().clone(); let ring = self.system.ring.borrow().clone();
ring.walk_ring(&hash, self.replication_factor) ring.walk_ring(&hash, self.replication_factor)
@ -45,16 +41,10 @@ impl TableReplication for TableShardedReplication {
self.replication_factor - self.write_quorum self.replication_factor - self.write_quorum
} }
fn split_points(&self, ring: &Ring) -> Vec<Hash> { fn partition_of(&self, hash: &Hash) -> Partition {
let mut ret = vec![]; self.system.ring.borrow().partition_of(hash)
for entry in ring.ring.iter() {
ret.push(entry.location);
} }
if ret.len() > 0 { fn partitions(&self) -> Vec<(Partition, Hash)> {
assert_eq!(ret[0], [0u8; 32].into()); self.system.ring.borrow().partitions()
}
ret
} }
} }

View file

@ -1,5 +1,4 @@
use std::collections::VecDeque; use std::collections::VecDeque;
use std::convert::TryInto;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
@ -15,7 +14,7 @@ use garage_util::data::*;
use garage_util::error::Error; use garage_util::error::Error;
use garage_rpc::membership::System; use garage_rpc::membership::System;
use garage_rpc::ring::Ring; use garage_rpc::ring::*;
use garage_rpc::rpc_client::*; use garage_rpc::rpc_client::*;
use garage_rpc::rpc_server::*; use garage_rpc::rpc_server::*;
@ -38,20 +37,10 @@ pub struct TableSyncer<F: TableSchema, R: TableReplication> {
rpc_client: Arc<RpcClient<SyncRPC>>, rpc_client: Arc<RpcClient<SyncRPC>>,
} }
type RootCk = Vec<(MerklePartition, Hash)>;
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct PartitionRange {
begin: MerklePartition,
// if end is None, go all the way to partition 0xFFFF included
end: Option<MerklePartition>,
}
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
pub(crate) enum SyncRPC { pub(crate) enum SyncRPC {
RootCkHash(PartitionRange, Hash), RootCkHash(Partition, Hash),
RootCkList(PartitionRange, RootCk), RootCkDifferent(bool),
CkNoDifference,
GetNode(MerkleNodeKey), GetNode(MerkleNodeKey),
Node(MerkleNodeKey, MerkleNode), Node(MerkleNodeKey, MerkleNode),
Items(Vec<Arc<ByteBuf>>), Items(Vec<Arc<ByteBuf>>),
@ -66,7 +55,9 @@ struct SyncTodo {
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
struct TodoPartition { struct TodoPartition {
range: PartitionRange, partition: Partition,
begin: Hash,
end: Hash,
// Are we a node that stores this partition or not? // Are we a node that stores this partition or not?
retain: bool, retain: bool,
@ -222,7 +213,7 @@ where
let nodes = self let nodes = self
.data .data
.replication .replication
.write_nodes(&hash_of_merkle_partition(partition.range.begin)) .write_nodes(&partition.begin)
.into_iter() .into_iter()
.filter(|node| *node != my_id) .filter(|node| *node != my_id)
.collect::<Vec<_>>(); .collect::<Vec<_>>();
@ -254,8 +245,8 @@ where
} }
} else { } else {
self.offload_partition( self.offload_partition(
&hash_of_merkle_partition(partition.range.begin), &partition.begin,
&hash_of_merkle_partition_opt(partition.range.end), &partition.end,
must_exit, must_exit,
) )
.await?; .await?;
@ -364,30 +355,13 @@ where
// side to the other will happen when the other side syncs with us, // side to the other will happen when the other side syncs with us,
// which they also do regularly. // which they also do regularly.
fn get_root_ck(&self, range: PartitionRange) -> Result<RootCk, Error> { fn get_root_ck(&self, partition: Partition) -> Result<(MerkleNodeKey, MerkleNode), Error> {
let begin = u16::from_be_bytes(range.begin);
let range_iter = match range.end {
Some(end) => {
let end = u16::from_be_bytes(end);
begin..=(end - 1)
}
None => begin..=0xFFFF,
};
let mut ret = vec![];
for i in range_iter {
let key = MerkleNodeKey { let key = MerkleNodeKey {
partition: u16::to_be_bytes(i), partition,
prefix: vec![], prefix: vec![],
}; };
match self.merkle.read_node(&key)? { let node = self.merkle.read_node(&key)?;
MerkleNode::Empty => (), Ok((key, node))
x => {
ret.push((key.partition, hash_of(&x)?));
}
}
}
Ok(ret)
} }
async fn do_sync_with( async fn do_sync_with(
@ -396,7 +370,7 @@ where
who: UUID, who: UUID,
must_exit: watch::Receiver<bool>, must_exit: watch::Receiver<bool>,
) -> Result<(), Error> { ) -> Result<(), Error> {
let root_ck = self.get_root_ck(partition.range)?; let (root_ck_key, root_ck) = self.get_root_ck(partition.partition)?;
if root_ck.is_empty() { if root_ck.is_empty() {
debug!( debug!(
"({}) Sync {:?} with {:?}: partition is empty.", "({}) Sync {:?} with {:?}: partition is empty.",
@ -404,51 +378,29 @@ where
); );
return Ok(()); return Ok(());
} }
let root_ck_hash = hash_of::<MerkleNode>(&root_ck)?;
let root_ck_hash = hash_of(&root_ck)?; // Check if they have the same root checksum
// If so, do nothing.
// If their root checksum has level > than us, use that as a reference
let root_resp = self let root_resp = self
.rpc_client .rpc_client
.call( .call(
who, who,
SyncRPC::RootCkHash(partition.range, root_ck_hash), SyncRPC::RootCkHash(partition.partition, root_ck_hash),
TABLE_SYNC_RPC_TIMEOUT, TABLE_SYNC_RPC_TIMEOUT,
) )
.await?; .await?;
let mut todo = match root_resp { let mut todo = match root_resp {
SyncRPC::CkNoDifference => { SyncRPC::RootCkDifferent(false) => {
debug!( debug!(
"({}) Sync {:?} with {:?}: no difference", "({}) Sync {:?} with {:?}: no difference",
self.data.name, partition, who self.data.name, partition, who
); );
return Ok(()); return Ok(());
} }
SyncRPC::RootCkList(_, their_root_ck) => { SyncRPC::RootCkDifferent(true) => {
let join = join_ordered(&root_ck[..], &their_root_ck[..]); VecDeque::from(vec![root_ck_key])
let mut todo = VecDeque::new();
for (p, v1, v2) in join.iter() {
let diff = match (v1, v2) {
(Some(_), None) | (None, Some(_)) => true,
(Some(a), Some(b)) => a != b,
_ => false,
};
if diff {
todo.push_back(MerkleNodeKey {
partition: **p,
prefix: vec![],
});
}
}
debug!(
"({}) Sync {:?} with {:?}: todo.len() = {}",
self.data.name,
partition,
who,
todo.len()
);
todo
} }
x => { x => {
return Err(Error::Message(format!( return Err(Error::Message(format!(
@ -565,13 +517,9 @@ where
async fn handle_rpc(self: &Arc<Self>, message: &SyncRPC) -> Result<SyncRPC, Error> { async fn handle_rpc(self: &Arc<Self>, message: &SyncRPC) -> Result<SyncRPC, Error> {
match message { match message {
SyncRPC::RootCkHash(range, h) => { SyncRPC::RootCkHash(range, h) => {
let root_ck = self.get_root_ck(*range)?; let (_root_ck_key, root_ck) = self.get_root_ck(*range)?;
let hash = hash_of(&root_ck)?; let hash = hash_of::<MerkleNode>(&root_ck)?;
if hash == *h { Ok(SyncRPC::RootCkDifferent(hash != *h))
Ok(SyncRPC::CkNoDifference)
} else {
Ok(SyncRPC::RootCkList(*range, root_ck))
}
} }
SyncRPC::GetNode(k) => { SyncRPC::GetNode(k) => {
let node = self.merkle.read_node(&k)?; let node = self.merkle.read_node(&k)?;
@ -596,39 +544,31 @@ impl SyncTodo {
self.todo.clear(); self.todo.clear();
let ring = system.ring.borrow().clone(); let partitions = data.replication.partitions();
let split_points = data.replication.split_points(&ring);
for i in 0..split_points.len() { for i in 0..partitions.len() {
let begin: MerklePartition = { let begin = partitions[i].1;
let b = split_points[i];
assert_eq!(b.as_slice()[2..], [0u8; 30][..]);
b.as_slice()[..2].try_into().unwrap()
};
let end: Option<MerklePartition> = if i + 1 < split_points.len() { let end = if i + 1 < partitions.len() {
let e = split_points[i + 1]; partitions[i+1].1
assert_eq!(e.as_slice()[2..], [0u8; 30][..]);
Some(e.as_slice()[..2].try_into().unwrap())
} else { } else {
None [0xFFu8; 32].into()
}; };
let begin_hash = hash_of_merkle_partition(begin); let nodes = data.replication.write_nodes(&begin);
let end_hash = hash_of_merkle_partition_opt(end);
let nodes = data.replication.write_nodes(&begin_hash);
let retain = nodes.contains(&my_id); let retain = nodes.contains(&my_id);
if !retain { if !retain {
// Check if we have some data to send, otherwise skip // Check if we have some data to send, otherwise skip
if data.store.range(begin_hash..end_hash).next().is_none() { if data.store.range(begin..end).next().is_none() {
continue; continue;
} }
} }
self.todo.push(TodoPartition { self.todo.push(TodoPartition {
range: PartitionRange { begin, end }, partition: partitions[i].0,
begin,
end,
retain, retain,
}); });
} }