block manager: refactoring

This commit is contained in:
Alex 2023-09-05 15:39:21 +02:00
parent fd00a47ddc
commit 93114a9747
2 changed files with 45 additions and 53 deletions

View file

@ -88,7 +88,7 @@ pub struct BlockManager {
data_fsync: bool, data_fsync: bool,
compression_level: Option<i32>, compression_level: Option<i32>,
mutation_lock: [Mutex<BlockManagerLocked>; 256], mutation_lock: Vec<Mutex<BlockManagerLocked>>,
pub(crate) rc: BlockRc, pub(crate) rc: BlockRc,
pub resync: BlockResyncManager, pub resync: BlockResyncManager,
@ -111,6 +111,9 @@ pub struct BlockResyncErrorInfo {
pub next_try: u64, pub next_try: u64,
} }
// The number of different mutexes used to parallelize write access to data blocks
const MUTEX_COUNT: usize = 256;
// This custom struct contains functions that must only be ran // This custom struct contains functions that must only be ran
// when the lock is held. We ensure that it is the case by storing // when the lock is held. We ensure that it is the case by storing
// it INSIDE a Mutex. // it INSIDE a Mutex.
@ -124,21 +127,24 @@ impl BlockManager {
compression_level: Option<i32>, compression_level: Option<i32>,
replication: TableShardedReplication, replication: TableShardedReplication,
system: Arc<System>, system: Arc<System>,
) -> Arc<Self> { ) -> Result<Arc<Self>, Error> {
// TODO don't panic, report error // Load or compute layout, i.e. assignment of data blocks to the different data directories
let layout_persister: Persister<DataLayout> = let layout_persister: Persister<DataLayout> =
Persister::new(&system.metadata_dir, "data_layout"); Persister::new(&system.metadata_dir, "data_layout");
let data_layout = match layout_persister.load() { let data_layout = match layout_persister.load() {
Ok(mut layout) => { Ok(mut layout) => {
layout.update(&data_dir).expect("invalid data_dir config"); layout
.update(&data_dir)
.ok_or_message("invalid data_dir config")?;
layout layout
} }
Err(_) => DataLayout::initialize(&data_dir).expect("invalid data_dir config"), Err(_) => DataLayout::initialize(&data_dir).ok_or_message("invalid data_dir config")?,
}; };
layout_persister layout_persister
.save(&data_layout) .save(&data_layout)
.expect("cannot save data_layout"); .expect("cannot save data_layout");
// Open metadata tables
let rc = db let rc = db
.open_tree("block_local_rc") .open_tree("block_local_rc")
.expect("Unable to open block_local_rc tree"); .expect("Unable to open block_local_rc tree");
@ -165,7 +171,10 @@ impl BlockManager {
data_layout, data_layout,
data_fsync, data_fsync,
compression_level, compression_level,
mutation_lock: [(); 256].map(|_| Mutex::new(BlockManagerLocked())), mutation_lock: vec![(); MUTEX_COUNT]
.iter()
.map(|_| Mutex::new(BlockManagerLocked()))
.collect::<Vec<_>>(),
rc, rc,
resync, resync,
system, system,
@ -177,7 +186,7 @@ impl BlockManager {
block_manager.endpoint.set_handler(block_manager.clone()); block_manager.endpoint.set_handler(block_manager.clone());
block_manager.scrub_persister.set_with(|_| ()).unwrap(); block_manager.scrub_persister.set_with(|_| ()).unwrap();
block_manager Ok(block_manager)
} }
pub fn spawn_workers(self: &Arc<Self>, bg: &BackgroundRunner) { pub fn spawn_workers(self: &Arc<Self>, bg: &BackgroundRunner) {
@ -224,44 +233,10 @@ impl BlockManager {
hash: &Hash, hash: &Hash,
order_tag: Option<OrderTag>, order_tag: Option<OrderTag>,
) -> Result<(DataBlockHeader, ByteStream), Error> { ) -> Result<(DataBlockHeader, ByteStream), Error> {
let who = self.replication.read_nodes(hash); self.rpc_get_raw_block_internal(hash, order_tag, |header, stream| async move {
let who = self.system.rpc.request_order(&who); Ok((header, stream))
})
for node in who.iter() { .await
let node_id = NodeID::from(*node);
let rpc = self.endpoint.call_streaming(
&node_id,
BlockRpc::GetBlock(*hash, order_tag),
PRIO_NORMAL | PRIO_SECONDARY,
);
tokio::select! {
res = rpc => {
let res = match res {
Ok(res) => res,
Err(e) => {
debug!("Node {:?} returned error: {}", node, e);
continue;
}
};
let (header, stream) = match res.into_parts() {
(Ok(BlockRpc::PutBlock { hash: _, header }), Some(stream)) => (header, stream),
_ => {
debug!("Node {:?} returned a malformed response", node);
continue;
}
};
return Ok((header, stream));
}
_ = tokio::time::sleep(self.system.rpc.rpc_timeout()) => {
debug!("Node {:?} didn't return block in time, trying next.", node);
}
};
}
Err(Error::Message(format!(
"Unable to read block {:?}: no node returned a valid block",
hash
)))
} }
/// Ask nodes that might have a (possibly compressed) block for it /// Ask nodes that might have a (possibly compressed) block for it
@ -271,6 +246,24 @@ impl BlockManager {
hash: &Hash, hash: &Hash,
order_tag: Option<OrderTag>, order_tag: Option<OrderTag>,
) -> Result<DataBlock, Error> { ) -> Result<DataBlock, Error> {
self.rpc_get_raw_block_internal(hash, order_tag, |header, stream| async move {
read_stream_to_end(stream)
.await
.map(|data| DataBlock::from_parts(header, data))
})
.await
}
async fn rpc_get_raw_block_internal<F, Fut, T>(
&self,
hash: &Hash,
order_tag: Option<OrderTag>,
f: F,
) -> Result<T, Error>
where
F: Fn(DataBlockHeader, ByteStream) -> Fut,
Fut: futures::Future<Output = Result<T, Error>>,
{
let who = self.replication.read_nodes(hash); let who = self.replication.read_nodes(hash);
let who = self.system.rpc.request_order(&who); let who = self.system.rpc.request_order(&who);
@ -297,13 +290,17 @@ impl BlockManager {
continue; continue;
} }
}; };
match read_stream_to_end(stream).await { match f(header, stream).await {
Ok(bytes) => return Ok(DataBlock::from_parts(header, bytes)), Ok(ret) => return Ok(ret),
Err(e) => { Err(e) => {
debug!("Error reading stream from node {:?}: {}", node, e); debug!("Error reading stream from node {:?}: {}", node, e);
} }
} }
} }
// TODO: sleep less long (fail early), initiate a second request earlier
// if the first one doesn't succeed rapidly
// TODO: keep first request running when initiating a new one and take the
// one that finishes earlier
_ = tokio::time::sleep(self.system.rpc.rpc_timeout()) => { _ = tokio::time::sleep(self.system.rpc.rpc_timeout()) => {
debug!("Node {:?} didn't return block in time, trying next.", node); debug!("Node {:?} didn't return block in time, trying next.", node);
} }
@ -680,11 +677,6 @@ impl StreamingEndpointHandler<BlockRpc> for BlockManager {
} }
} }
pub(crate) struct BlockStatus {
pub(crate) exists: bool,
pub(crate) needed: RcEntry,
}
impl BlockManagerLocked { impl BlockManagerLocked {
async fn write_block( async fn write_block(
&self, &self,

View file

@ -251,7 +251,7 @@ impl Garage {
config.compression_level, config.compression_level,
data_rep_param, data_rep_param,
system.clone(), system.clone(),
); )?;
block_manager.register_bg_vars(&mut bg_vars); block_manager.register_bg_vars(&mut bg_vars);
// ---- admin tables ---- // ---- admin tables ----