From 426d8784dac0e39879af52d980887d3692fc907c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 3 Jan 2023 15:08:37 +0100 Subject: [PATCH] cleanup --- src/api/router_macros.rs | 1 + src/model/bucket_alias_table.rs | 6 +++--- src/model/bucket_table.rs | 7 ++++--- src/model/index_counter.rs | 27 +++++++++++++++--------- src/model/s3/object_table.rs | 3 --- src/table/data.rs | 6 +----- src/table/gc.rs | 32 ++++++----------------------- src/table/merkle.rs | 17 +++------------ src/table/queue.rs | 10 +++------ src/table/replication/parameters.rs | 2 +- src/table/schema.rs | 17 ++++++--------- src/table/sync.rs | 20 ++++++------------ src/table/table.rs | 14 +++---------- 13 files changed, 54 insertions(+), 108 deletions(-) diff --git a/src/api/router_macros.rs b/src/api/router_macros.rs index 959e69a3..07b5570c 100644 --- a/src/api/router_macros.rs +++ b/src/api/router_macros.rs @@ -145,6 +145,7 @@ macro_rules! generateQueryParameters { ) => { #[derive(Debug)] #[allow(non_camel_case_types)] + #[allow(clippy::upper_case_acronyms)] enum Keyword { EMPTY, $( $kw_name, )* diff --git a/src/model/bucket_alias_table.rs b/src/model/bucket_alias_table.rs index d07394f6..54d7fbad 100644 --- a/src/model/bucket_alias_table.rs +++ b/src/model/bucket_alias_table.rs @@ -1,12 +1,12 @@ -use serde::{Deserialize, Serialize}; - use garage_util::data::*; use garage_table::crdt::*; use garage_table::*; mod v08 { - use super::*; + use garage_util::crdt; + use garage_util::data::Uuid; + use serde::{Deserialize, Serialize}; /// The bucket alias table holds the names given to buckets /// in the global namespace. diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs index 38ed88ee..ac163736 100644 --- a/src/model/bucket_table.rs +++ b/src/model/bucket_table.rs @@ -1,5 +1,3 @@ -use serde::{Deserialize, Serialize}; - use garage_table::crdt::*; use garage_table::*; use garage_util::data::*; @@ -8,7 +6,10 @@ use garage_util::time::*; use crate::permission::BucketKeyPerm; mod v08 { - use super::*; + use crate::permission::BucketKeyPerm; + use garage_util::crdt; + use garage_util::data::Uuid; + use serde::{Deserialize, Serialize}; /// A bucket is a collection of objects /// diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs index c3ed29c7..3cd3083a 100644 --- a/src/model/index_counter.rs +++ b/src/model/index_counter.rs @@ -31,7 +31,12 @@ pub trait CountedItem: Clone + PartialEq + Send + Sync + 'static { } mod v08 { - use super::*; + use super::CountedItem; + use garage_util::data::Uuid; + use serde::{Deserialize, Serialize}; + use std::collections::BTreeMap; + + // ---- Global part (the table everyone queries) ---- /// A counter entry in the global table #[derive(Clone, PartialEq, Debug, Serialize, Deserialize)] @@ -48,6 +53,17 @@ mod v08 { } impl garage_util::migrate::InitialFormat for CounterEntry {} + + // ---- Local part (the counter we maintain transactionnaly on each node) ---- + + #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] + pub(super) struct LocalCounterEntry { + pub(super) pk: T::CP, + pub(super) sk: T::CS, + pub(super) values: BTreeMap, + } + + impl garage_util::migrate::InitialFormat for LocalCounterEntry {} } pub use v08::*; @@ -358,15 +374,6 @@ impl IndexCounter { // ---- -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] -struct LocalCounterEntry { - pk: T::CP, - sk: T::CS, - values: BTreeMap, -} - -impl garage_util::migrate::InitialFormat for LocalCounterEntry {} - impl LocalCounterEntry { fn into_counter_entry(self, this_node: Uuid) -> CounterEntry { CounterEntry { diff --git a/src/model/s3/object_table.rs b/src/model/s3/object_table.rs index 616e0d35..518acc95 100644 --- a/src/model/s3/object_table.rs +++ b/src/model/s3/object_table.rs @@ -380,6 +380,3 @@ impl CountedItem for Object { ] } } - -// vvvvvvvv migration code, stupid stuff vvvvvvvvvvvv -// (we just want to change bucket into bucket_id by hashing it) diff --git a/src/table/data.rs b/src/table/data.rs index f93ed00d..5c792f1f 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -41,11 +41,7 @@ pub struct TableData { pub(crate) metrics: TableMetrics, } -impl TableData -where - F: TableSchema, - R: TableReplication, -{ +impl TableData { pub fn new(system: Arc, instance: F, replication: R, db: &db::Db) -> Arc { let store = db .open_tree(&format!("{}:table", F::TABLE_NAME)) diff --git a/src/table/gc.rs b/src/table/gc.rs index 90594fba..5b9124a7 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -31,7 +31,7 @@ const TABLE_GC_BATCH_SIZE: usize = 1024; // and the moment the garbage collection actually happens) const TABLE_GC_DELAY: Duration = Duration::from_secs(24 * 3600); -pub(crate) struct TableGc { +pub(crate) struct TableGc { system: Arc, data: Arc>, @@ -49,11 +49,7 @@ impl Rpc for GcRpc { type Response = Result; } -impl TableGc -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl TableGc { pub(crate) fn new(system: Arc, data: Arc>) -> Arc { let endpoint = system .netapp @@ -277,11 +273,7 @@ where } #[async_trait] -impl EndpointHandler for TableGc -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl EndpointHandler for TableGc { async fn handle(self: &Arc, message: &GcRpc, _from: NodeID) -> Result { match message { GcRpc::Update(items) => { @@ -299,20 +291,12 @@ where } } -struct GcWorker -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +struct GcWorker { gc: Arc>, wait_delay: Duration, } -impl GcWorker -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl GcWorker { fn new(gc: Arc>) -> Self { Self { gc, @@ -322,11 +306,7 @@ where } #[async_trait] -impl Worker for GcWorker -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl Worker for GcWorker { fn name(&self) -> String { format!("{} GC", F::TABLE_NAME) } diff --git a/src/table/merkle.rs b/src/table/merkle.rs index 736354fa..2d593e6d 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -65,11 +65,7 @@ pub enum MerkleNode { Leaf(Vec, Hash), } -impl MerkleUpdater -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl MerkleUpdater { pub(crate) fn new(data: Arc>) -> Arc { let empty_node_hash = blake2sum(&rmp_to_vec_all_named(&MerkleNode::Empty).unwrap()[..]); @@ -303,17 +299,10 @@ where } } -struct MerkleWorker(Arc>) -where - F: TableSchema + 'static, - R: TableReplication + 'static; +struct MerkleWorker(Arc>); #[async_trait] -impl Worker for MerkleWorker -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl Worker for MerkleWorker { fn name(&self) -> String { format!("{} Merkle", F::TABLE_NAME) } diff --git a/src/table/queue.rs b/src/table/queue.rs index 860f20d3..0857209b 100644 --- a/src/table/queue.rs +++ b/src/table/queue.rs @@ -16,15 +16,11 @@ const BATCH_SIZE: usize = 100; pub(crate) struct InsertQueueWorker(pub(crate) Arc>) where - F: TableSchema + 'static, - R: TableReplication + 'static; + F: TableSchema, + R: TableReplication; #[async_trait] -impl Worker for InsertQueueWorker -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl Worker for InsertQueueWorker { fn name(&self) -> String { format!("{} queue", F::TABLE_NAME) } diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs index 3740d947..f00815a2 100644 --- a/src/table/replication/parameters.rs +++ b/src/table/replication/parameters.rs @@ -2,7 +2,7 @@ use garage_rpc::ring::*; use garage_util::data::*; /// Trait to describe how a table shall be replicated -pub trait TableReplication: Send + Sync { +pub trait TableReplication: Send + Sync + 'static { // See examples in table_sharded.rs and table_fullcopy.rs // To understand various replication methods diff --git a/src/table/schema.rs b/src/table/schema.rs index 6538a32f..5cbf6c95 100644 --- a/src/table/schema.rs +++ b/src/table/schema.rs @@ -7,7 +7,9 @@ use garage_util::migrate::Migrate; use crate::crdt::Crdt; /// Trait for field used to partition data -pub trait PartitionKey { +pub trait PartitionKey: + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static +{ /// Get the key used to partition fn hash(&self) -> Hash; } @@ -28,7 +30,7 @@ impl PartitionKey for FixedBytes32 { } /// Trait for field used to sort data -pub trait SortKey { +pub trait SortKey: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static { /// Get the key used to sort fn sort_key(&self) -> &[u8]; } @@ -66,16 +68,9 @@ pub trait TableSchema: Send + Sync + 'static { const TABLE_NAME: &'static str; /// The partition key used in that table - type P: PartitionKey - + Clone - + PartialEq - + Serialize - + for<'de> Deserialize<'de> - + Send - + Sync - + 'static; + type P: PartitionKey; /// The sort key used int that table - type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static; + type S: SortKey; /// They type for an entry in that table type E: Entry; diff --git a/src/table/sync.rs b/src/table/sync.rs index abc034f8..29e7aa89 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -28,7 +28,7 @@ use crate::*; // Do anti-entropy every 10 minutes const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(10 * 60); -pub struct TableSyncer { +pub struct TableSyncer { system: Arc, data: Arc>, merkle: Arc>, @@ -61,11 +61,7 @@ struct TodoPartition { retain: bool, } -impl TableSyncer -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl TableSyncer { pub(crate) fn new( system: Arc, data: Arc>, @@ -459,11 +455,7 @@ where // ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ====== #[async_trait] -impl EndpointHandler for TableSyncer -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl EndpointHandler for TableSyncer { async fn handle(self: &Arc, message: &SyncRpc, from: NodeID) -> Result { match message { SyncRpc::RootCkHash(range, h) => { @@ -497,7 +489,7 @@ where // -------- Sync Worker --------- -struct SyncWorker { +struct SyncWorker { syncer: Arc>, ring_recv: watch::Receiver>, ring: Arc, @@ -506,7 +498,7 @@ struct SyncWorker { next_full_sync: Instant, } -impl SyncWorker { +impl SyncWorker { fn add_full_sync(&mut self) { let system = &self.syncer.system; let data = &self.syncer.data; @@ -572,7 +564,7 @@ impl SyncWorker { } #[async_trait] -impl Worker for SyncWorker { +impl Worker for SyncWorker { fn name(&self) -> String { format!("{} sync", F::TABLE_NAME) } diff --git a/src/table/table.rs b/src/table/table.rs index 7f158314..7ad79677 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -33,7 +33,7 @@ use crate::schema::*; use crate::sync::*; use crate::util::*; -pub struct Table { +pub struct Table { pub system: Arc, pub data: Arc>, pub merkle_updater: Arc>, @@ -65,11 +65,7 @@ impl Rpc for TableRpc { type Response = Result, Error>; } -impl Table -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl Table { // =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) =============== pub fn new(instance: F, replication: R, system: Arc, db: &db::Db) -> Arc { @@ -428,11 +424,7 @@ where } #[async_trait] -impl EndpointHandler> for Table -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl EndpointHandler> for Table { async fn handle( self: &Arc, msg: &TableRpc,