attempt at documenting table crate

This commit is contained in:
Trinity Pointard 2021-03-26 19:41:46 +01:00
parent b476b702c8
commit 30bec0758b
16 changed files with 65 additions and 32 deletions

View file

@ -18,7 +18,7 @@ use garage_rpc::membership::System;
use garage_rpc::rpc_client::*; use garage_rpc::rpc_client::*;
use garage_rpc::rpc_server::*; use garage_rpc::rpc_server::*;
use garage_table::replication::{sharded::TableShardedReplication, TableReplication}; use garage_table::replication::{TableReplication, TableShardedReplication};
use crate::block_ref_table::*; use crate::block_ref_table::*;

View file

@ -100,6 +100,10 @@ impl TableSchema for BucketTable {
type E = Bucket; type E = Bucket;
type Filter = DeletedFilter; type Filter = DeletedFilter;
fn updated(&self, _old: Option<Self::E>, _new: Option<Self::E>) {
// nothing to do when updated
}
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
filter.apply(entry.is_deleted()) filter.apply(entry.is_deleted())
} }

View file

@ -7,8 +7,8 @@ use garage_rpc::membership::System;
use garage_rpc::rpc_client::RpcHttpClient; use garage_rpc::rpc_client::RpcHttpClient;
use garage_rpc::rpc_server::RpcServer; use garage_rpc::rpc_server::RpcServer;
use garage_table::replication::fullcopy::*; use garage_table::replication::TableFullReplication;
use garage_table::replication::sharded::*; use garage_table::replication::TableShardedReplication;
use garage_table::*; use garage_table::*;
use crate::block::*; use crate::block::*;

View file

@ -113,6 +113,10 @@ impl TableSchema for KeyTable {
type E = Key; type E = Key;
type Filter = KeyFilter; type Filter = KeyFilter;
fn updated(&self, _old: Option<Self::E>, _new: Option<Self::E>) {
// nothing to do when updated
}
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
match filter { match filter {
KeyFilter::Deleted(df) => df.apply(entry.deleted.get()), KeyFilter::Deleted(df) => df.apply(entry.deleted.get()),

View file

@ -6,7 +6,7 @@ use garage_util::background::BackgroundRunner;
use garage_util::data::*; use garage_util::data::*;
use garage_table::crdt::*; use garage_table::crdt::*;
use garage_table::replication::sharded::*; use garage_table::replication::TableShardedReplication;
use garage_table::*; use garage_table::*;
use crate::version_table::*; use crate::version_table::*;

View file

@ -5,7 +5,7 @@ use garage_util::background::BackgroundRunner;
use garage_util::data::*; use garage_util::data::*;
use garage_table::crdt::*; use garage_table::crdt::*;
use garage_table::replication::sharded::*; use garage_table::replication::TableShardedReplication;
use garage_table::*; use garage_table::*;
use crate::block_ref_table::*; use crate::block_ref_table::*;

View file

@ -34,7 +34,7 @@ use crate::crdt::crdt::*;
/// and may differ from what you observed with your atomic clock! /// and may differ from what you observed with your atomic clock!
/// ///
/// This scheme is used by AWS S3 or Soundcloud and often without knowing /// This scheme is used by AWS S3 or Soundcloud and often without knowing
/// in entreprise when reconciliating databases with ad-hoc scripts. /// in enterprise when reconciliating databases with ad-hoc scripts.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct LWW<T> { pub struct LWW<T> {
ts: u64, ts: u64,

View file

@ -37,6 +37,7 @@ where
Self { vals: vec![(k, v)] } Self { vals: vec![(k, v)] }
} }
/// Add a value to the map
pub fn put(&mut self, k: K, v: V) { pub fn put(&mut self, k: K, v: V) {
self.merge(&Self::put_mutator(k, v)); self.merge(&Self::put_mutator(k, v));
} }

View file

@ -74,7 +74,7 @@ where
while !*must_exit.borrow() { while !*must_exit.borrow() {
match self.gc_loop_iter().await { match self.gc_loop_iter().await {
Ok(true) => { Ok(true) => {
// Stuff was done, loop imediately // Stuff was done, loop immediately
continue; continue;
} }
Ok(false) => { Ok(false) => {

View file

@ -1,3 +1,4 @@
#![warn(missing_docs)]
#![recursion_limit = "1024"] #![recursion_limit = "1024"]
#[macro_use] #[macro_use]
@ -8,10 +9,10 @@ pub mod schema;
pub mod util; pub mod util;
pub mod data; pub mod data;
pub mod gc; mod gc;
pub mod merkle; mod merkle;
pub mod replication; pub mod replication;
pub mod sync; mod sync;
pub mod table; pub mod table;
pub use schema::*; pub use schema::*;

View file

@ -6,19 +6,19 @@ use garage_util::data::*;
use crate::replication::*; use crate::replication::*;
/// Full replication schema: all nodes store everything
/// Writes are disseminated in an epidemic manner in the network
/// Advantage: do all reads locally, extremely fast
/// Inconvenient: only suitable to reasonably small tables
#[derive(Clone)] #[derive(Clone)]
pub struct TableFullReplication { pub struct TableFullReplication {
/// The membership manager of this node
pub system: Arc<System>, pub system: Arc<System>,
/// Max number of faults allowed while replicating a record
pub max_faults: usize, pub max_faults: usize,
} }
impl TableReplication for TableFullReplication { impl TableReplication for TableFullReplication {
// Full replication schema: all nodes store everything
// Writes are disseminated in an epidemic manner in the network
// Advantage: do all reads locally, extremely fast
// Inconvenient: only suitable to reasonably small tables
fn read_nodes(&self, _hash: &Hash) -> Vec<UUID> { fn read_nodes(&self, _hash: &Hash) -> Vec<UUID> {
vec![self.system.id] vec![self.system.id]
} }

View file

@ -1,6 +1,8 @@
mod parameters; mod parameters;
pub mod fullcopy; mod fullcopy;
pub mod sharded; mod sharded;
pub use fullcopy::TableFullReplication;
pub use parameters::*; pub use parameters::*;
pub use sharded::TableShardedReplication;

View file

@ -2,20 +2,26 @@ use garage_rpc::ring::*;
use garage_util::data::*; use garage_util::data::*;
/// Trait to describe how a table shall be replicated
pub trait TableReplication: Send + Sync { pub trait TableReplication: Send + Sync {
// See examples in table_sharded.rs and table_fullcopy.rs // See examples in table_sharded.rs and table_fullcopy.rs
// To understand various replication methods // To understand various replication methods
// Which nodes to send reads from /// Which nodes to send read requests to
fn read_nodes(&self, hash: &Hash) -> Vec<UUID>; fn read_nodes(&self, hash: &Hash) -> Vec<UUID>;
/// Responses needed to consider a read succesfull
fn read_quorum(&self) -> usize; fn read_quorum(&self) -> usize;
// Which nodes to send writes to /// Which nodes to send writes to
fn write_nodes(&self, hash: &Hash) -> Vec<UUID>; fn write_nodes(&self, hash: &Hash) -> Vec<UUID>;
/// Responses needed to consider a write succesfull
fn write_quorum(&self) -> usize; fn write_quorum(&self) -> usize;
// this feels like its write_nodes().len() - write_quorum()
fn max_write_errors(&self) -> usize; fn max_write_errors(&self) -> usize;
// Accessing partitions, for Merkle tree & sync // Accessing partitions, for Merkle tree & sync
/// Get partition for data with given hash
fn partition_of(&self, hash: &Hash) -> Partition; fn partition_of(&self, hash: &Hash) -> Partition;
/// List of existing partitions
fn partitions(&self) -> Vec<(Partition, Hash)>; fn partitions(&self) -> Vec<(Partition, Hash)>;
} }

View file

@ -6,22 +6,25 @@ use garage_util::data::*;
use crate::replication::*; use crate::replication::*;
/// Sharded replication schema:
/// - based on the ring of nodes, a certain set of neighbors
/// store entries, given as a function of the position of the
/// entry's hash in the ring
/// - reads are done on all of the nodes that replicate the data
/// - writes as well
#[derive(Clone)] #[derive(Clone)]
pub struct TableShardedReplication { pub struct TableShardedReplication {
/// The membership manager of this node
pub system: Arc<System>, pub system: Arc<System>,
/// How many time each data should be replicated
pub replication_factor: usize, pub replication_factor: usize,
/// How many nodes to contact for a read, should be at most `replication_factor`
pub read_quorum: usize, pub read_quorum: usize,
/// How many nodes to contact for a write, should be at most `replication_factor`
pub write_quorum: usize, pub write_quorum: usize,
} }
impl TableReplication for TableShardedReplication { impl TableReplication for TableShardedReplication {
// Sharded replication schema:
// - based on the ring of nodes, a certain set of neighbors
// store entries, given as a function of the position of the
// entry's hash in the ring
// - reads are done on all of the nodes that replicate the data
// - writes as well
fn read_nodes(&self, hash: &Hash) -> Vec<UUID> { fn read_nodes(&self, hash: &Hash) -> Vec<UUID> {
let ring = self.system.ring.borrow().clone(); let ring = self.system.ring.borrow().clone();
ring.walk_ring(&hash, self.replication_factor) ring.walk_ring(&hash, self.replication_factor)

View file

@ -4,7 +4,9 @@ use garage_util::data::*;
use crate::crdt::CRDT; use crate::crdt::CRDT;
/// Trait for partitionnable data
pub trait PartitionKey { pub trait PartitionKey {
/// Get the key used to partition
fn hash(&self) -> Hash; fn hash(&self) -> Hash;
} }
@ -20,7 +22,9 @@ impl PartitionKey for Hash {
} }
} }
/// Trait for sortable data
pub trait SortKey { pub trait SortKey {
/// Get the key used to sort
fn sort_key(&self) -> &[u8]; fn sort_key(&self) -> &[u8];
} }
@ -36,25 +40,34 @@ impl SortKey for Hash {
} }
} }
/// Trait for an entry in a table. It must be sortable and partitionnable.
pub trait Entry<P: PartitionKey, S: SortKey>: pub trait Entry<P: PartitionKey, S: SortKey>:
CRDT + PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync CRDT + PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync
{ {
/// Get the key used to partition
fn partition_key(&self) -> &P; fn partition_key(&self) -> &P;
/// Get the key used to sort
fn sort_key(&self) -> &S; fn sort_key(&self) -> &S;
/// Is the entry a tombstone? Default implementation always return false
fn is_tombstone(&self) -> bool { fn is_tombstone(&self) -> bool {
false false
} }
} }
/// Trait for the schema used in a table
pub trait TableSchema: Send + Sync { pub trait TableSchema: Send + Sync {
/// The partition key used in that table
type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync; type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
/// The sort key used int that table
type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync; type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
/// They type for an entry in that table
type E: Entry<Self::P, Self::S>; type E: Entry<Self::P, Self::S>;
type Filter: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync; type Filter: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
// Action to take if not able to decode current version: // Action to take if not able to decode current version:
// try loading from an older version // try loading from an older version
/// Try migrating an entry from an older version
fn try_migrate(_bytes: &[u8]) -> Option<Self::E> { fn try_migrate(_bytes: &[u8]) -> Option<Self::E> {
None None
} }
@ -63,9 +76,7 @@ pub trait TableSchema: Send + Sync {
// as the update itself is an unchangeable fact that will never go back // as the update itself is an unchangeable fact that will never go back
// due to CRDT logic. Typically errors in propagation of info should be logged // due to CRDT logic. Typically errors in propagation of info should be logged
// to stderr. // to stderr.
fn updated(&self, _old: Option<Self::E>, _new: Option<Self::E>) {} fn updated(&self, old: Option<Self::E>, new: Option<Self::E>);
fn matches_filter(_entry: &Self::E, _filter: &Self::Filter) -> bool { fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool;
true
}
} }

View file

@ -1,6 +1,7 @@
#[macro_use] #[macro_use]
extern crate log; extern crate log;
pub mod error; mod error;
pub use error::Error;
pub mod web_server; pub mod web_server;