Add wrapper over sled tree to count items (used for big queues)

This commit is contained in:
Alex 2022-02-24 14:59:49 +01:00
parent 50096bc460
commit 34a557e6a6
Signed by: lx
GPG key ID: 0E496D15096376BE
8 changed files with 118 additions and 7 deletions

View file

@ -22,6 +22,7 @@ use opentelemetry::{
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::*; use garage_util::error::*;
use garage_util::metrics::RecordDuration; use garage_util::metrics::RecordDuration;
use garage_util::sled_counter::SledCountedTree;
use garage_util::time::*; use garage_util::time::*;
use garage_util::tranquilizer::Tranquilizer; use garage_util::tranquilizer::Tranquilizer;
@ -155,7 +156,7 @@ pub struct BlockManager {
rc: sled::Tree, rc: sled::Tree,
resync_queue: sled::Tree, resync_queue: SledCountedTree,
resync_notify: Notify, resync_notify: Notify,
system: Arc<System>, system: Arc<System>,
@ -184,6 +185,7 @@ impl BlockManager {
let resync_queue = db let resync_queue = db
.open_tree("block_local_resync_queue") .open_tree("block_local_resync_queue")
.expect("Unable to open block_local_resync_queue tree"); .expect("Unable to open block_local_resync_queue tree");
let resync_queue = SledCountedTree::new(resync_queue);
let endpoint = system let endpoint = system
.netapp .netapp

View file

@ -1,5 +1,7 @@
use opentelemetry::{global, metrics::*}; use opentelemetry::{global, metrics::*};
use garage_util::sled_counter::SledCountedTree;
/// TableMetrics reference all counter used for metrics /// TableMetrics reference all counter used for metrics
pub struct BlockManagerMetrics { pub struct BlockManagerMetrics {
pub(crate) _resync_queue_len: ValueObserver<u64>, pub(crate) _resync_queue_len: ValueObserver<u64>,
@ -20,7 +22,7 @@ pub struct BlockManagerMetrics {
} }
impl BlockManagerMetrics { impl BlockManagerMetrics {
pub fn new(resync_queue: sled::Tree) -> Self { pub fn new(resync_queue: SledCountedTree) -> Self {
let meter = global::meter("garage_model/block"); let meter = global::meter("garage_model/block");
Self { Self {
_resync_queue_len: meter _resync_queue_len: meter

View file

@ -134,7 +134,11 @@ impl RpcHelper {
M: Rpc<Response = Result<S, Error>>, M: Rpc<Response = Result<S, Error>>,
H: EndpointHandler<M>, H: EndpointHandler<M>,
{ {
let metric_tags = [KeyValue::new("rpc_endpoint", endpoint.path().to_string())]; let metric_tags = [
KeyValue::new("rpc_endpoint", endpoint.path().to_string()),
KeyValue::new("from", format!("{:?}", self.0.our_node_id)),
KeyValue::new("to", format!("{:?}", to)),
];
let msg_size = rmp_to_vec_all_named(&msg)?.len() as u32; let msg_size = rmp_to_vec_all_named(&msg)?.len() as u32;
let permit = self let permit = self
@ -245,6 +249,7 @@ impl RpcHelper {
) )
}; };
let mut span = tracer.start(span_name); let mut span = tracer.start(span_name);
span.set_attribute(KeyValue::new("from", format!("{:?}", self.0.our_node_id)));
span.set_attribute(KeyValue::new("to", format!("{:?}", to))); span.set_attribute(KeyValue::new("to", format!("{:?}", to)));
span.set_attribute(KeyValue::new("quorum", quorum as i64)); span.set_attribute(KeyValue::new("quorum", quorum as i64));
span.set_attribute(KeyValue::new( span.set_attribute(KeyValue::new(

View file

@ -7,6 +7,7 @@ use tokio::sync::Notify;
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::*; use garage_util::error::*;
use garage_util::sled_counter::SledCountedTree;
use garage_rpc::system::System; use garage_rpc::system::System;
@ -27,7 +28,7 @@ pub struct TableData<F: TableSchema, R: TableReplication> {
pub(crate) merkle_tree: sled::Tree, pub(crate) merkle_tree: sled::Tree,
pub(crate) merkle_todo: sled::Tree, pub(crate) merkle_todo: sled::Tree,
pub(crate) merkle_todo_notify: Notify, pub(crate) merkle_todo_notify: Notify,
pub(crate) gc_todo: sled::Tree, pub(crate) gc_todo: SledCountedTree,
pub(crate) metrics: TableMetrics, pub(crate) metrics: TableMetrics,
} }
@ -52,6 +53,7 @@ where
let gc_todo = db let gc_todo = db
.open_tree(&format!("{}:gc_todo_v2", F::TABLE_NAME)) .open_tree(&format!("{}:gc_todo_v2", F::TABLE_NAME))
.expect("Unable to open DB tree"); .expect("Unable to open DB tree");
let gc_todo = SledCountedTree::new(gc_todo);
let metrics = TableMetrics::new(F::TABLE_NAME, merkle_todo.clone(), gc_todo.clone()); let metrics = TableMetrics::new(F::TABLE_NAME, merkle_todo.clone(), gc_todo.clone());

View file

@ -14,6 +14,7 @@ use tokio::sync::watch;
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::*; use garage_util::error::*;
use garage_util::sled_counter::SledCountedTree;
use garage_util::time::*; use garage_util::time::*;
use garage_rpc::system::System; use garage_rpc::system::System;
@ -362,7 +363,7 @@ impl GcTodoEntry {
} }
/// Saves the GcTodoEntry in the gc_todo tree /// Saves the GcTodoEntry in the gc_todo tree
pub(crate) fn save(&self, gc_todo_tree: &sled::Tree) -> Result<(), Error> { pub(crate) fn save(&self, gc_todo_tree: &SledCountedTree) -> Result<(), Error> {
gc_todo_tree.insert(self.todo_table_key(), self.value_hash.as_slice())?; gc_todo_tree.insert(self.todo_table_key(), self.value_hash.as_slice())?;
Ok(()) Ok(())
} }
@ -372,7 +373,7 @@ impl GcTodoEntry {
/// This is usefull to remove a todo entry only under the condition /// This is usefull to remove a todo entry only under the condition
/// that it has not changed since the time it was read, i.e. /// that it has not changed since the time it was read, i.e.
/// what we have to do is still the same /// what we have to do is still the same
pub(crate) fn remove_if_equal(&self, gc_todo_tree: &sled::Tree) -> Result<(), Error> { pub(crate) fn remove_if_equal(&self, gc_todo_tree: &SledCountedTree) -> Result<(), Error> {
let _ = gc_todo_tree.compare_and_swap::<_, _, Vec<u8>>( let _ = gc_todo_tree.compare_and_swap::<_, _, Vec<u8>>(
&self.todo_table_key()[..], &self.todo_table_key()[..],
Some(self.value_hash), Some(self.value_hash),

View file

@ -1,5 +1,7 @@
use opentelemetry::{global, metrics::*, KeyValue}; use opentelemetry::{global, metrics::*, KeyValue};
use garage_util::sled_counter::SledCountedTree;
/// TableMetrics reference all counter used for metrics /// TableMetrics reference all counter used for metrics
pub struct TableMetrics { pub struct TableMetrics {
pub(crate) _merkle_todo_len: ValueObserver<u64>, pub(crate) _merkle_todo_len: ValueObserver<u64>,
@ -17,7 +19,11 @@ pub struct TableMetrics {
pub(crate) sync_items_received: Counter<u64>, pub(crate) sync_items_received: Counter<u64>,
} }
impl TableMetrics { impl TableMetrics {
pub fn new(table_name: &'static str, merkle_todo: sled::Tree, gc_todo: sled::Tree) -> Self { pub fn new(
table_name: &'static str,
merkle_todo: sled::Tree,
gc_todo: SledCountedTree,
) -> Self {
let meter = global::meter(table_name); let meter = global::meter(table_name);
TableMetrics { TableMetrics {
_merkle_todo_len: meter _merkle_todo_len: meter

View file

@ -10,6 +10,7 @@ pub mod data;
pub mod error; pub mod error;
pub mod metrics; pub mod metrics;
pub mod persister; pub mod persister;
pub mod sled_counter;
pub mod time; pub mod time;
pub mod token_bucket; pub mod token_bucket;
pub mod tranquilizer; pub mod tranquilizer;

92
src/util/sled_counter.rs Normal file
View file

@ -0,0 +1,92 @@
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use sled::{CompareAndSwapError, IVec, Iter, Result, Tree};
#[derive(Clone)]
pub struct SledCountedTree(Arc<SledCountedTreeInternal>);
struct SledCountedTreeInternal {
tree: Tree,
len: AtomicUsize,
}
impl SledCountedTree {
pub fn new(tree: Tree) -> Self {
let len = tree.len();
Self(Arc::new(SledCountedTreeInternal {
tree,
len: AtomicUsize::new(len),
}))
}
pub fn len(&self) -> usize {
self.0.len.load(Ordering::Relaxed)
}
pub fn is_empty(&self) -> bool {
self.0.tree.is_empty()
}
pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<IVec>> {
self.0.tree.get(key)
}
pub fn iter(&self) -> Iter {
self.0.tree.iter()
}
// ---- writing functions ----
pub fn insert<K, V>(&self, key: K, value: V) -> Result<Option<IVec>>
where
K: AsRef<[u8]>,
V: Into<IVec>,
{
let res = self.0.tree.insert(key, value);
if res == Ok(None) {
self.0.len.fetch_add(1, Ordering::Relaxed);
}
res
}
pub fn pop_min(&self) -> Result<Option<(IVec, IVec)>> {
let res = self.0.tree.pop_min();
if let Ok(Some(_)) = &res {
self.0.len.fetch_sub(1, Ordering::Relaxed);
};
res
}
pub fn compare_and_swap<K, OV, NV>(
&self,
key: K,
old: Option<OV>,
new: Option<NV>,
) -> Result<std::result::Result<(), CompareAndSwapError>>
where
K: AsRef<[u8]>,
OV: AsRef<[u8]>,
NV: Into<IVec>,
{
let old_some = old.is_some();
let new_some = new.is_some();
let res = self.0.tree.compare_and_swap(key, old, new);
if res == Ok(Ok(())) {
match (old_some, new_some) {
(false, true) => {
self.0.len.fetch_add(1, Ordering::Relaxed);
}
(true, false) => {
self.0.len.fetch_sub(1, Ordering::Relaxed);
}
_ => (),
}
}
res
}
}