This commit is contained in:
Alex 2023-01-03 15:08:37 +01:00
parent a81200d345
commit 426d8784da
Signed by: lx
GPG key ID: 0E496D15096376BE
13 changed files with 54 additions and 108 deletions

View file

@ -145,6 +145,7 @@ macro_rules! generateQueryParameters {
) => { ) => {
#[derive(Debug)] #[derive(Debug)]
#[allow(non_camel_case_types)] #[allow(non_camel_case_types)]
#[allow(clippy::upper_case_acronyms)]
enum Keyword { enum Keyword {
EMPTY, EMPTY,
$( $kw_name, )* $( $kw_name, )*

View file

@ -1,12 +1,12 @@
use serde::{Deserialize, Serialize};
use garage_util::data::*; use garage_util::data::*;
use garage_table::crdt::*; use garage_table::crdt::*;
use garage_table::*; use garage_table::*;
mod v08 { mod v08 {
use super::*; use garage_util::crdt;
use garage_util::data::Uuid;
use serde::{Deserialize, Serialize};
/// The bucket alias table holds the names given to buckets /// The bucket alias table holds the names given to buckets
/// in the global namespace. /// in the global namespace.

View file

@ -1,5 +1,3 @@
use serde::{Deserialize, Serialize};
use garage_table::crdt::*; use garage_table::crdt::*;
use garage_table::*; use garage_table::*;
use garage_util::data::*; use garage_util::data::*;
@ -8,7 +6,10 @@ use garage_util::time::*;
use crate::permission::BucketKeyPerm; use crate::permission::BucketKeyPerm;
mod v08 { mod v08 {
use super::*; use crate::permission::BucketKeyPerm;
use garage_util::crdt;
use garage_util::data::Uuid;
use serde::{Deserialize, Serialize};
/// A bucket is a collection of objects /// A bucket is a collection of objects
/// ///

View file

@ -31,7 +31,12 @@ pub trait CountedItem: Clone + PartialEq + Send + Sync + 'static {
} }
mod v08 { mod v08 {
use super::*; use super::CountedItem;
use garage_util::data::Uuid;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
// ---- Global part (the table everyone queries) ----
/// A counter entry in the global table /// A counter entry in the global table
#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)] #[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
@ -48,6 +53,17 @@ mod v08 {
} }
impl<T: CountedItem> garage_util::migrate::InitialFormat for CounterEntry<T> {} impl<T: CountedItem> garage_util::migrate::InitialFormat for CounterEntry<T> {}
// ---- Local part (the counter we maintain transactionnaly on each node) ----
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub(super) struct LocalCounterEntry<T: CountedItem> {
pub(super) pk: T::CP,
pub(super) sk: T::CS,
pub(super) values: BTreeMap<String, (u64, i64)>,
}
impl<T: CountedItem> garage_util::migrate::InitialFormat for LocalCounterEntry<T> {}
} }
pub use v08::*; pub use v08::*;
@ -358,15 +374,6 @@ impl<T: CountedItem> IndexCounter<T> {
// ---- // ----
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
struct LocalCounterEntry<T: CountedItem> {
pk: T::CP,
sk: T::CS,
values: BTreeMap<String, (u64, i64)>,
}
impl<T: CountedItem> garage_util::migrate::InitialFormat for LocalCounterEntry<T> {}
impl<T: CountedItem> LocalCounterEntry<T> { impl<T: CountedItem> LocalCounterEntry<T> {
fn into_counter_entry(self, this_node: Uuid) -> CounterEntry<T> { fn into_counter_entry(self, this_node: Uuid) -> CounterEntry<T> {
CounterEntry { CounterEntry {

View file

@ -380,6 +380,3 @@ impl CountedItem for Object {
] ]
} }
} }
// vvvvvvvv migration code, stupid stuff vvvvvvvvvvvv
// (we just want to change bucket into bucket_id by hashing it)

View file

@ -41,11 +41,7 @@ pub struct TableData<F: TableSchema, R: TableReplication> {
pub(crate) metrics: TableMetrics, pub(crate) metrics: TableMetrics,
} }
impl<F, R> TableData<F, R> impl<F: TableSchema, R: TableReplication> TableData<F, R> {
where
F: TableSchema,
R: TableReplication,
{
pub fn new(system: Arc<System>, instance: F, replication: R, db: &db::Db) -> Arc<Self> { pub fn new(system: Arc<System>, instance: F, replication: R, db: &db::Db) -> Arc<Self> {
let store = db let store = db
.open_tree(&format!("{}:table", F::TABLE_NAME)) .open_tree(&format!("{}:table", F::TABLE_NAME))

View file

@ -31,7 +31,7 @@ const TABLE_GC_BATCH_SIZE: usize = 1024;
// and the moment the garbage collection actually happens) // and the moment the garbage collection actually happens)
const TABLE_GC_DELAY: Duration = Duration::from_secs(24 * 3600); const TABLE_GC_DELAY: Duration = Duration::from_secs(24 * 3600);
pub(crate) struct TableGc<F: TableSchema + 'static, R: TableReplication + 'static> { pub(crate) struct TableGc<F: TableSchema, R: TableReplication> {
system: Arc<System>, system: Arc<System>,
data: Arc<TableData<F, R>>, data: Arc<TableData<F, R>>,
@ -49,11 +49,7 @@ impl Rpc for GcRpc {
type Response = Result<GcRpc, Error>; type Response = Result<GcRpc, Error>;
} }
impl<F, R> TableGc<F, R> impl<F: TableSchema, R: TableReplication> TableGc<F, R> {
where
F: TableSchema + 'static,
R: TableReplication + 'static,
{
pub(crate) fn new(system: Arc<System>, data: Arc<TableData<F, R>>) -> Arc<Self> { pub(crate) fn new(system: Arc<System>, data: Arc<TableData<F, R>>) -> Arc<Self> {
let endpoint = system let endpoint = system
.netapp .netapp
@ -277,11 +273,7 @@ where
} }
#[async_trait] #[async_trait]
impl<F, R> EndpointHandler<GcRpc> for TableGc<F, R> impl<F: TableSchema, R: TableReplication> EndpointHandler<GcRpc> for TableGc<F, R> {
where
F: TableSchema + 'static,
R: TableReplication + 'static,
{
async fn handle(self: &Arc<Self>, message: &GcRpc, _from: NodeID) -> Result<GcRpc, Error> { async fn handle(self: &Arc<Self>, message: &GcRpc, _from: NodeID) -> Result<GcRpc, Error> {
match message { match message {
GcRpc::Update(items) => { GcRpc::Update(items) => {
@ -299,20 +291,12 @@ where
} }
} }
struct GcWorker<F, R> struct GcWorker<F: TableSchema, R: TableReplication> {
where
F: TableSchema + 'static,
R: TableReplication + 'static,
{
gc: Arc<TableGc<F, R>>, gc: Arc<TableGc<F, R>>,
wait_delay: Duration, wait_delay: Duration,
} }
impl<F, R> GcWorker<F, R> impl<F: TableSchema, R: TableReplication> GcWorker<F, R> {
where
F: TableSchema + 'static,
R: TableReplication + 'static,
{
fn new(gc: Arc<TableGc<F, R>>) -> Self { fn new(gc: Arc<TableGc<F, R>>) -> Self {
Self { Self {
gc, gc,
@ -322,11 +306,7 @@ where
} }
#[async_trait] #[async_trait]
impl<F, R> Worker for GcWorker<F, R> impl<F: TableSchema, R: TableReplication> Worker for GcWorker<F, R> {
where
F: TableSchema + 'static,
R: TableReplication + 'static,
{
fn name(&self) -> String { fn name(&self) -> String {
format!("{} GC", F::TABLE_NAME) format!("{} GC", F::TABLE_NAME)
} }

View file

@ -65,11 +65,7 @@ pub enum MerkleNode {
Leaf(Vec<u8>, Hash), Leaf(Vec<u8>, Hash),
} }
impl<F, R> MerkleUpdater<F, R> impl<F: TableSchema, R: TableReplication> MerkleUpdater<F, R> {
where
F: TableSchema + 'static,
R: TableReplication + 'static,
{
pub(crate) fn new(data: Arc<TableData<F, R>>) -> Arc<Self> { pub(crate) fn new(data: Arc<TableData<F, R>>) -> Arc<Self> {
let empty_node_hash = blake2sum(&rmp_to_vec_all_named(&MerkleNode::Empty).unwrap()[..]); let empty_node_hash = blake2sum(&rmp_to_vec_all_named(&MerkleNode::Empty).unwrap()[..]);
@ -303,17 +299,10 @@ where
} }
} }
struct MerkleWorker<F, R>(Arc<MerkleUpdater<F, R>>) struct MerkleWorker<F: TableSchema, R: TableReplication>(Arc<MerkleUpdater<F, R>>);
where
F: TableSchema + 'static,
R: TableReplication + 'static;
#[async_trait] #[async_trait]
impl<F, R> Worker for MerkleWorker<F, R> impl<F: TableSchema, R: TableReplication> Worker for MerkleWorker<F, R> {
where
F: TableSchema + 'static,
R: TableReplication + 'static,
{
fn name(&self) -> String { fn name(&self) -> String {
format!("{} Merkle", F::TABLE_NAME) format!("{} Merkle", F::TABLE_NAME)
} }

View file

@ -16,15 +16,11 @@ const BATCH_SIZE: usize = 100;
pub(crate) struct InsertQueueWorker<F, R>(pub(crate) Arc<Table<F, R>>) pub(crate) struct InsertQueueWorker<F, R>(pub(crate) Arc<Table<F, R>>)
where where
F: TableSchema + 'static, F: TableSchema,
R: TableReplication + 'static; R: TableReplication;
#[async_trait] #[async_trait]
impl<F, R> Worker for InsertQueueWorker<F, R> impl<F: TableSchema, R: TableReplication> Worker for InsertQueueWorker<F, R> {
where
F: TableSchema + 'static,
R: TableReplication + 'static,
{
fn name(&self) -> String { fn name(&self) -> String {
format!("{} queue", F::TABLE_NAME) format!("{} queue", F::TABLE_NAME)
} }

View file

@ -2,7 +2,7 @@ use garage_rpc::ring::*;
use garage_util::data::*; use garage_util::data::*;
/// Trait to describe how a table shall be replicated /// Trait to describe how a table shall be replicated
pub trait TableReplication: Send + Sync { pub trait TableReplication: Send + Sync + 'static {
// See examples in table_sharded.rs and table_fullcopy.rs // See examples in table_sharded.rs and table_fullcopy.rs
// To understand various replication methods // To understand various replication methods

View file

@ -7,7 +7,9 @@ use garage_util::migrate::Migrate;
use crate::crdt::Crdt; use crate::crdt::Crdt;
/// Trait for field used to partition data /// Trait for field used to partition data
pub trait PartitionKey { pub trait PartitionKey:
Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static
{
/// Get the key used to partition /// Get the key used to partition
fn hash(&self) -> Hash; fn hash(&self) -> Hash;
} }
@ -28,7 +30,7 @@ impl PartitionKey for FixedBytes32 {
} }
/// Trait for field used to sort data /// Trait for field used to sort data
pub trait SortKey { pub trait SortKey: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static {
/// Get the key used to sort /// Get the key used to sort
fn sort_key(&self) -> &[u8]; fn sort_key(&self) -> &[u8];
} }
@ -66,16 +68,9 @@ pub trait TableSchema: Send + Sync + 'static {
const TABLE_NAME: &'static str; const TABLE_NAME: &'static str;
/// The partition key used in that table /// The partition key used in that table
type P: PartitionKey type P: PartitionKey;
+ Clone
+ PartialEq
+ Serialize
+ for<'de> Deserialize<'de>
+ Send
+ Sync
+ 'static;
/// The sort key used int that table /// The sort key used int that table
type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static; type S: SortKey;
/// They type for an entry in that table /// They type for an entry in that table
type E: Entry<Self::P, Self::S>; type E: Entry<Self::P, Self::S>;

View file

@ -28,7 +28,7 @@ use crate::*;
// Do anti-entropy every 10 minutes // Do anti-entropy every 10 minutes
const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(10 * 60); const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(10 * 60);
pub struct TableSyncer<F: TableSchema + 'static, R: TableReplication + 'static> { pub struct TableSyncer<F: TableSchema, R: TableReplication> {
system: Arc<System>, system: Arc<System>,
data: Arc<TableData<F, R>>, data: Arc<TableData<F, R>>,
merkle: Arc<MerkleUpdater<F, R>>, merkle: Arc<MerkleUpdater<F, R>>,
@ -61,11 +61,7 @@ struct TodoPartition {
retain: bool, retain: bool,
} }
impl<F, R> TableSyncer<F, R> impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
where
F: TableSchema + 'static,
R: TableReplication + 'static,
{
pub(crate) fn new( pub(crate) fn new(
system: Arc<System>, system: Arc<System>,
data: Arc<TableData<F, R>>, data: Arc<TableData<F, R>>,
@ -459,11 +455,7 @@ where
// ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ====== // ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ======
#[async_trait] #[async_trait]
impl<F, R> EndpointHandler<SyncRpc> for TableSyncer<F, R> impl<F: TableSchema, R: TableReplication> EndpointHandler<SyncRpc> for TableSyncer<F, R> {
where
F: TableSchema + 'static,
R: TableReplication + 'static,
{
async fn handle(self: &Arc<Self>, message: &SyncRpc, from: NodeID) -> Result<SyncRpc, Error> { async fn handle(self: &Arc<Self>, message: &SyncRpc, from: NodeID) -> Result<SyncRpc, Error> {
match message { match message {
SyncRpc::RootCkHash(range, h) => { SyncRpc::RootCkHash(range, h) => {
@ -497,7 +489,7 @@ where
// -------- Sync Worker --------- // -------- Sync Worker ---------
struct SyncWorker<F: TableSchema + 'static, R: TableReplication + 'static> { struct SyncWorker<F: TableSchema, R: TableReplication> {
syncer: Arc<TableSyncer<F, R>>, syncer: Arc<TableSyncer<F, R>>,
ring_recv: watch::Receiver<Arc<Ring>>, ring_recv: watch::Receiver<Arc<Ring>>,
ring: Arc<Ring>, ring: Arc<Ring>,
@ -506,7 +498,7 @@ struct SyncWorker<F: TableSchema + 'static, R: TableReplication + 'static> {
next_full_sync: Instant, next_full_sync: Instant,
} }
impl<F: TableSchema + 'static, R: TableReplication + 'static> SyncWorker<F, R> { impl<F: TableSchema, R: TableReplication> SyncWorker<F, R> {
fn add_full_sync(&mut self) { fn add_full_sync(&mut self) {
let system = &self.syncer.system; let system = &self.syncer.system;
let data = &self.syncer.data; let data = &self.syncer.data;
@ -572,7 +564,7 @@ impl<F: TableSchema + 'static, R: TableReplication + 'static> SyncWorker<F, R> {
} }
#[async_trait] #[async_trait]
impl<F: TableSchema + 'static, R: TableReplication + 'static> Worker for SyncWorker<F, R> { impl<F: TableSchema, R: TableReplication> Worker for SyncWorker<F, R> {
fn name(&self) -> String { fn name(&self) -> String {
format!("{} sync", F::TABLE_NAME) format!("{} sync", F::TABLE_NAME)
} }

View file

@ -33,7 +33,7 @@ use crate::schema::*;
use crate::sync::*; use crate::sync::*;
use crate::util::*; use crate::util::*;
pub struct Table<F: TableSchema + 'static, R: TableReplication + 'static> { pub struct Table<F: TableSchema, R: TableReplication> {
pub system: Arc<System>, pub system: Arc<System>,
pub data: Arc<TableData<F, R>>, pub data: Arc<TableData<F, R>>,
pub merkle_updater: Arc<MerkleUpdater<F, R>>, pub merkle_updater: Arc<MerkleUpdater<F, R>>,
@ -65,11 +65,7 @@ impl<F: TableSchema> Rpc for TableRpc<F> {
type Response = Result<TableRpc<F>, Error>; type Response = Result<TableRpc<F>, Error>;
} }
impl<F, R> Table<F, R> impl<F: TableSchema, R: TableReplication> Table<F, R> {
where
F: TableSchema + 'static,
R: TableReplication + 'static,
{
// =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) =============== // =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) ===============
pub fn new(instance: F, replication: R, system: Arc<System>, db: &db::Db) -> Arc<Self> { pub fn new(instance: F, replication: R, system: Arc<System>, db: &db::Db) -> Arc<Self> {
@ -428,11 +424,7 @@ where
} }
#[async_trait] #[async_trait]
impl<F, R> EndpointHandler<TableRpc<F>> for Table<F, R> impl<F: TableSchema, R: TableReplication> EndpointHandler<TableRpc<F>> for Table<F, R> {
where
F: TableSchema + 'static,
R: TableReplication + 'static,
{
async fn handle( async fn handle(
self: &Arc<Self>, self: &Arc<Self>,
msg: &TableRpc<F>, msg: &TableRpc<F>,