improve internal item counter mechanisms and implement bucket quotas #326

Merged
lx merged 23 commits from counters into main 2022-06-15 18:20:31 +00:00
4 changed files with 13 additions and 12 deletions
Showing only changes of commit bc29d77ed3 - Show all commits

View file

@ -52,8 +52,8 @@ pub struct Garage {
/// Table containing S3 objects
pub object_table: Arc<Table<ObjectTable, TableShardedReplication>>,
/// Counting table containing object counters
pub object_counter_table: Arc<IndexCounter<Object>>,
/// Counting table containing object counters for each bucket
pub object_counter_table: Arc<IndexCounter<Object, TableFullReplication>>,
/// Table containing S3 object versions
pub version_table: Arc<Table<VersionTable, TableShardedReplication>>,
/// Table containing S3 block references (not blocks themselves)
@ -68,7 +68,7 @@ pub struct GarageK2V {
/// Table containing K2V items
pub item_table: Arc<Table<K2VItemTable, TableShardedReplication>>,
/// Indexing table containing K2V item counters
pub counter_table: Arc<IndexCounter<K2VItem>>,
pub counter_table: Arc<IndexCounter<K2VItem, TableShardedReplication>>,
/// K2V RPC handler
pub rpc: Arc<K2VRpcHandler>,
}
@ -176,7 +176,7 @@ impl Garage {
&db,
);
info!("Initialize key_table_table...");
let key_table = Table::new(KeyTable, control_rep_param, system.clone(), &db);
let key_table = Table::new(KeyTable, control_rep_param.clone(), system.clone(), &db);
// ---- S3 tables ----
info!("Initialize block_ref_table...");
@ -201,7 +201,7 @@ impl Garage {
);
info!("Initialize object counter table...");
let object_counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), &db);
let object_counter_table = IndexCounter::new(system.clone(), control_rep_param, &db);
info!("Initialize object_table...");
#[allow(clippy::redundant_clone)]

View file

@ -138,17 +138,17 @@ impl<T: CountedItem> TableSchema for CounterTable<T> {
// ----
pub struct IndexCounter<T: CountedItem> {
pub struct IndexCounter<T: CountedItem, R: TableReplication + 'static> {
this_node: Uuid,
local_counter: db::Tree,
propagate_tx: mpsc::UnboundedSender<(T::CP, T::CS, LocalCounterEntry<T>)>,
pub table: Arc<Table<CounterTable<T>, TableShardedReplication>>,
pub table: Arc<Table<CounterTable<T>, R>>,
}
impl<T: CountedItem> IndexCounter<T> {
impl<T: CountedItem, R: TableReplication + 'static> IndexCounter<T, R> {
pub fn new(
system: Arc<System>,
replication: TableShardedReplication,
replication: R,
db: &db::Db,
) -> Arc<Self> {
let background = system.background.clone();

View file

@ -6,6 +6,7 @@ use garage_db as db;
use garage_util::data::*;
use garage_table::crdt::*;
use garage_table::replication::*;
use garage_table::*;
use crate::index_counter::*;
@ -187,7 +188,7 @@ impl Entry<K2VItemPartition, String> for K2VItem {
}
pub struct K2VItemTable {
pub(crate) counter_table: Arc<IndexCounter<K2VItem>>,
pub(crate) counter_table: Arc<IndexCounter<K2VItem, TableShardedReplication>>,
pub(crate) subscriptions: Arc<SubscriptionManager>,
}

View file

@ -8,7 +8,7 @@ use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_table::crdt::*;
use garage_table::replication::TableShardedReplication;
use garage_table::replication::*;
use garage_table::*;
use crate::index_counter::*;
@ -223,7 +223,7 @@ impl Crdt for Object {
pub struct ObjectTable {
pub background: Arc<BackgroundRunner>,
pub version_table: Arc<Table<VersionTable, TableShardedReplication>>,
pub object_counter_table: Arc<IndexCounter<Object>>,
pub object_counter_table: Arc<IndexCounter<Object, TableFullReplication>>,
}
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]