Add wrapper over sled tree to count items (used for big queues)
This commit is contained in:
parent
203e8d2c34
commit
2377a92f6b
8 changed files with 118 additions and 7 deletions
|
@ -22,6 +22,7 @@ use opentelemetry::{
|
||||||
use garage_util::data::*;
|
use garage_util::data::*;
|
||||||
use garage_util::error::*;
|
use garage_util::error::*;
|
||||||
use garage_util::metrics::RecordDuration;
|
use garage_util::metrics::RecordDuration;
|
||||||
|
use garage_util::sled_counter::SledCountedTree;
|
||||||
use garage_util::time::*;
|
use garage_util::time::*;
|
||||||
use garage_util::tranquilizer::Tranquilizer;
|
use garage_util::tranquilizer::Tranquilizer;
|
||||||
|
|
||||||
|
@ -155,7 +156,7 @@ pub struct BlockManager {
|
||||||
|
|
||||||
rc: sled::Tree,
|
rc: sled::Tree,
|
||||||
|
|
||||||
resync_queue: sled::Tree,
|
resync_queue: SledCountedTree,
|
||||||
resync_notify: Notify,
|
resync_notify: Notify,
|
||||||
|
|
||||||
system: Arc<System>,
|
system: Arc<System>,
|
||||||
|
@ -184,6 +185,7 @@ impl BlockManager {
|
||||||
let resync_queue = db
|
let resync_queue = db
|
||||||
.open_tree("block_local_resync_queue")
|
.open_tree("block_local_resync_queue")
|
||||||
.expect("Unable to open block_local_resync_queue tree");
|
.expect("Unable to open block_local_resync_queue tree");
|
||||||
|
let resync_queue = SledCountedTree::new(resync_queue);
|
||||||
|
|
||||||
let endpoint = system
|
let endpoint = system
|
||||||
.netapp
|
.netapp
|
||||||
|
|
|
@ -1,5 +1,7 @@
|
||||||
use opentelemetry::{global, metrics::*};
|
use opentelemetry::{global, metrics::*};
|
||||||
|
|
||||||
|
use garage_util::sled_counter::SledCountedTree;
|
||||||
|
|
||||||
/// TableMetrics reference all counter used for metrics
|
/// TableMetrics reference all counter used for metrics
|
||||||
pub struct BlockManagerMetrics {
|
pub struct BlockManagerMetrics {
|
||||||
pub(crate) _resync_queue_len: ValueObserver<u64>,
|
pub(crate) _resync_queue_len: ValueObserver<u64>,
|
||||||
|
@ -20,7 +22,7 @@ pub struct BlockManagerMetrics {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BlockManagerMetrics {
|
impl BlockManagerMetrics {
|
||||||
pub fn new(resync_queue: sled::Tree) -> Self {
|
pub fn new(resync_queue: SledCountedTree) -> Self {
|
||||||
let meter = global::meter("garage_model/block");
|
let meter = global::meter("garage_model/block");
|
||||||
Self {
|
Self {
|
||||||
_resync_queue_len: meter
|
_resync_queue_len: meter
|
||||||
|
|
|
@ -134,7 +134,11 @@ impl RpcHelper {
|
||||||
M: Rpc<Response = Result<S, Error>>,
|
M: Rpc<Response = Result<S, Error>>,
|
||||||
H: EndpointHandler<M>,
|
H: EndpointHandler<M>,
|
||||||
{
|
{
|
||||||
let metric_tags = [KeyValue::new("rpc_endpoint", endpoint.path().to_string())];
|
let metric_tags = [
|
||||||
|
KeyValue::new("rpc_endpoint", endpoint.path().to_string()),
|
||||||
|
KeyValue::new("from", format!("{:?}", self.0.our_node_id)),
|
||||||
|
KeyValue::new("to", format!("{:?}", to)),
|
||||||
|
];
|
||||||
|
|
||||||
let msg_size = rmp_to_vec_all_named(&msg)?.len() as u32;
|
let msg_size = rmp_to_vec_all_named(&msg)?.len() as u32;
|
||||||
let permit = self
|
let permit = self
|
||||||
|
@ -245,6 +249,7 @@ impl RpcHelper {
|
||||||
)
|
)
|
||||||
};
|
};
|
||||||
let mut span = tracer.start(span_name);
|
let mut span = tracer.start(span_name);
|
||||||
|
span.set_attribute(KeyValue::new("from", format!("{:?}", self.0.our_node_id)));
|
||||||
span.set_attribute(KeyValue::new("to", format!("{:?}", to)));
|
span.set_attribute(KeyValue::new("to", format!("{:?}", to)));
|
||||||
span.set_attribute(KeyValue::new("quorum", quorum as i64));
|
span.set_attribute(KeyValue::new("quorum", quorum as i64));
|
||||||
span.set_attribute(KeyValue::new(
|
span.set_attribute(KeyValue::new(
|
||||||
|
|
|
@ -7,6 +7,7 @@ use tokio::sync::Notify;
|
||||||
|
|
||||||
use garage_util::data::*;
|
use garage_util::data::*;
|
||||||
use garage_util::error::*;
|
use garage_util::error::*;
|
||||||
|
use garage_util::sled_counter::SledCountedTree;
|
||||||
|
|
||||||
use garage_rpc::system::System;
|
use garage_rpc::system::System;
|
||||||
|
|
||||||
|
@ -27,7 +28,7 @@ pub struct TableData<F: TableSchema, R: TableReplication> {
|
||||||
pub(crate) merkle_tree: sled::Tree,
|
pub(crate) merkle_tree: sled::Tree,
|
||||||
pub(crate) merkle_todo: sled::Tree,
|
pub(crate) merkle_todo: sled::Tree,
|
||||||
pub(crate) merkle_todo_notify: Notify,
|
pub(crate) merkle_todo_notify: Notify,
|
||||||
pub(crate) gc_todo: sled::Tree,
|
pub(crate) gc_todo: SledCountedTree,
|
||||||
|
|
||||||
pub(crate) metrics: TableMetrics,
|
pub(crate) metrics: TableMetrics,
|
||||||
}
|
}
|
||||||
|
@ -52,6 +53,7 @@ where
|
||||||
let gc_todo = db
|
let gc_todo = db
|
||||||
.open_tree(&format!("{}:gc_todo_v2", F::TABLE_NAME))
|
.open_tree(&format!("{}:gc_todo_v2", F::TABLE_NAME))
|
||||||
.expect("Unable to open DB tree");
|
.expect("Unable to open DB tree");
|
||||||
|
let gc_todo = SledCountedTree::new(gc_todo);
|
||||||
|
|
||||||
let metrics = TableMetrics::new(F::TABLE_NAME, merkle_todo.clone(), gc_todo.clone());
|
let metrics = TableMetrics::new(F::TABLE_NAME, merkle_todo.clone(), gc_todo.clone());
|
||||||
|
|
||||||
|
|
|
@ -14,6 +14,7 @@ use tokio::sync::watch;
|
||||||
|
|
||||||
use garage_util::data::*;
|
use garage_util::data::*;
|
||||||
use garage_util::error::*;
|
use garage_util::error::*;
|
||||||
|
use garage_util::sled_counter::SledCountedTree;
|
||||||
use garage_util::time::*;
|
use garage_util::time::*;
|
||||||
|
|
||||||
use garage_rpc::system::System;
|
use garage_rpc::system::System;
|
||||||
|
@ -362,7 +363,7 @@ impl GcTodoEntry {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Saves the GcTodoEntry in the gc_todo tree
|
/// Saves the GcTodoEntry in the gc_todo tree
|
||||||
pub(crate) fn save(&self, gc_todo_tree: &sled::Tree) -> Result<(), Error> {
|
pub(crate) fn save(&self, gc_todo_tree: &SledCountedTree) -> Result<(), Error> {
|
||||||
gc_todo_tree.insert(self.todo_table_key(), self.value_hash.as_slice())?;
|
gc_todo_tree.insert(self.todo_table_key(), self.value_hash.as_slice())?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -372,7 +373,7 @@ impl GcTodoEntry {
|
||||||
/// This is usefull to remove a todo entry only under the condition
|
/// This is usefull to remove a todo entry only under the condition
|
||||||
/// that it has not changed since the time it was read, i.e.
|
/// that it has not changed since the time it was read, i.e.
|
||||||
/// what we have to do is still the same
|
/// what we have to do is still the same
|
||||||
pub(crate) fn remove_if_equal(&self, gc_todo_tree: &sled::Tree) -> Result<(), Error> {
|
pub(crate) fn remove_if_equal(&self, gc_todo_tree: &SledCountedTree) -> Result<(), Error> {
|
||||||
let _ = gc_todo_tree.compare_and_swap::<_, _, Vec<u8>>(
|
let _ = gc_todo_tree.compare_and_swap::<_, _, Vec<u8>>(
|
||||||
&self.todo_table_key()[..],
|
&self.todo_table_key()[..],
|
||||||
Some(self.value_hash),
|
Some(self.value_hash),
|
||||||
|
|
|
@ -1,5 +1,7 @@
|
||||||
use opentelemetry::{global, metrics::*, KeyValue};
|
use opentelemetry::{global, metrics::*, KeyValue};
|
||||||
|
|
||||||
|
use garage_util::sled_counter::SledCountedTree;
|
||||||
|
|
||||||
/// TableMetrics reference all counter used for metrics
|
/// TableMetrics reference all counter used for metrics
|
||||||
pub struct TableMetrics {
|
pub struct TableMetrics {
|
||||||
pub(crate) _merkle_todo_len: ValueObserver<u64>,
|
pub(crate) _merkle_todo_len: ValueObserver<u64>,
|
||||||
|
@ -17,7 +19,11 @@ pub struct TableMetrics {
|
||||||
pub(crate) sync_items_received: Counter<u64>,
|
pub(crate) sync_items_received: Counter<u64>,
|
||||||
}
|
}
|
||||||
impl TableMetrics {
|
impl TableMetrics {
|
||||||
pub fn new(table_name: &'static str, merkle_todo: sled::Tree, gc_todo: sled::Tree) -> Self {
|
pub fn new(
|
||||||
|
table_name: &'static str,
|
||||||
|
merkle_todo: sled::Tree,
|
||||||
|
gc_todo: SledCountedTree,
|
||||||
|
) -> Self {
|
||||||
let meter = global::meter(table_name);
|
let meter = global::meter(table_name);
|
||||||
TableMetrics {
|
TableMetrics {
|
||||||
_merkle_todo_len: meter
|
_merkle_todo_len: meter
|
||||||
|
|
|
@ -10,6 +10,7 @@ pub mod data;
|
||||||
pub mod error;
|
pub mod error;
|
||||||
pub mod metrics;
|
pub mod metrics;
|
||||||
pub mod persister;
|
pub mod persister;
|
||||||
|
pub mod sled_counter;
|
||||||
pub mod time;
|
pub mod time;
|
||||||
pub mod token_bucket;
|
pub mod token_bucket;
|
||||||
pub mod tranquilizer;
|
pub mod tranquilizer;
|
||||||
|
|
92
src/util/sled_counter.rs
Normal file
92
src/util/sled_counter.rs
Normal file
|
@ -0,0 +1,92 @@
|
||||||
|
use std::sync::{
|
||||||
|
atomic::{AtomicUsize, Ordering},
|
||||||
|
Arc,
|
||||||
|
};
|
||||||
|
|
||||||
|
use sled::{CompareAndSwapError, IVec, Iter, Result, Tree};
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct SledCountedTree(Arc<SledCountedTreeInternal>);
|
||||||
|
|
||||||
|
struct SledCountedTreeInternal {
|
||||||
|
tree: Tree,
|
||||||
|
len: AtomicUsize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SledCountedTree {
|
||||||
|
pub fn new(tree: Tree) -> Self {
|
||||||
|
let len = tree.len();
|
||||||
|
Self(Arc::new(SledCountedTreeInternal {
|
||||||
|
tree,
|
||||||
|
len: AtomicUsize::new(len),
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn len(&self) -> usize {
|
||||||
|
self.0.len.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn is_empty(&self) -> bool {
|
||||||
|
self.0.tree.is_empty()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<IVec>> {
|
||||||
|
self.0.tree.get(key)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn iter(&self) -> Iter {
|
||||||
|
self.0.tree.iter()
|
||||||
|
}
|
||||||
|
|
||||||
|
// ---- writing functions ----
|
||||||
|
|
||||||
|
pub fn insert<K, V>(&self, key: K, value: V) -> Result<Option<IVec>>
|
||||||
|
where
|
||||||
|
K: AsRef<[u8]>,
|
||||||
|
V: Into<IVec>,
|
||||||
|
{
|
||||||
|
let res = self.0.tree.insert(key, value);
|
||||||
|
if res == Ok(None) {
|
||||||
|
self.0.len.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
res
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn pop_min(&self) -> Result<Option<(IVec, IVec)>> {
|
||||||
|
let res = self.0.tree.pop_min();
|
||||||
|
if let Ok(Some(_)) = &res {
|
||||||
|
self.0.len.fetch_sub(1, Ordering::Relaxed);
|
||||||
|
};
|
||||||
|
res
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn compare_and_swap<K, OV, NV>(
|
||||||
|
&self,
|
||||||
|
key: K,
|
||||||
|
old: Option<OV>,
|
||||||
|
new: Option<NV>,
|
||||||
|
) -> Result<std::result::Result<(), CompareAndSwapError>>
|
||||||
|
where
|
||||||
|
K: AsRef<[u8]>,
|
||||||
|
OV: AsRef<[u8]>,
|
||||||
|
NV: Into<IVec>,
|
||||||
|
{
|
||||||
|
let old_some = old.is_some();
|
||||||
|
let new_some = new.is_some();
|
||||||
|
|
||||||
|
let res = self.0.tree.compare_and_swap(key, old, new);
|
||||||
|
|
||||||
|
if res == Ok(Ok(())) {
|
||||||
|
match (old_some, new_some) {
|
||||||
|
(false, true) => {
|
||||||
|
self.0.len.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
(true, false) => {
|
||||||
|
self.0.len.fetch_sub(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
_ => (),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
res
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in a new issue