|
|
|
@ -22,7 +22,7 @@ use garage_net::stream::{read_stream_to_end, stream_asyncread, ByteStream};
|
|
|
|
|
use garage_db as db;
|
|
|
|
|
|
|
|
|
|
use garage_util::background::{vars, BackgroundRunner};
|
|
|
|
|
use garage_util::config::DataDirEnum;
|
|
|
|
|
use garage_util::config::Config;
|
|
|
|
|
use garage_util::data::*;
|
|
|
|
|
use garage_util::error::*;
|
|
|
|
|
use garage_util::metrics::RecordDuration;
|
|
|
|
@ -84,6 +84,7 @@ pub struct BlockManager {
|
|
|
|
|
|
|
|
|
|
data_fsync: bool,
|
|
|
|
|
compression_level: Option<i32>,
|
|
|
|
|
disable_scrub: bool,
|
|
|
|
|
|
|
|
|
|
mutation_lock: Vec<Mutex<BlockManagerLocked>>,
|
|
|
|
|
|
|
|
|
@ -119,9 +120,7 @@ struct BlockManagerLocked();
|
|
|
|
|
impl BlockManager {
|
|
|
|
|
pub fn new(
|
|
|
|
|
db: &db::Db,
|
|
|
|
|
data_dir: DataDirEnum,
|
|
|
|
|
data_fsync: bool,
|
|
|
|
|
compression_level: Option<i32>,
|
|
|
|
|
config: &Config,
|
|
|
|
|
replication: TableShardedReplication,
|
|
|
|
|
system: Arc<System>,
|
|
|
|
|
) -> Result<Arc<Self>, Error> {
|
|
|
|
@ -131,11 +130,13 @@ impl BlockManager {
|
|
|
|
|
let data_layout = match data_layout_persister.load() {
|
|
|
|
|
Ok(mut layout) => {
|
|
|
|
|
layout
|
|
|
|
|
.update(&data_dir)
|
|
|
|
|
.update(&config.data_dir)
|
|
|
|
|
.ok_or_message("invalid data_dir config")?;
|
|
|
|
|
layout
|
|
|
|
|
}
|
|
|
|
|
Err(_) => DataLayout::initialize(&data_dir).ok_or_message("invalid data_dir config")?,
|
|
|
|
|
Err(_) => {
|
|
|
|
|
DataLayout::initialize(&config.data_dir).ok_or_message("invalid data_dir config")?
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
data_layout_persister
|
|
|
|
|
.save(&data_layout)
|
|
|
|
@ -154,7 +155,7 @@ impl BlockManager {
|
|
|
|
|
.endpoint("garage_block/manager.rs/Rpc".to_string());
|
|
|
|
|
|
|
|
|
|
let metrics = BlockManagerMetrics::new(
|
|
|
|
|
compression_level,
|
|
|
|
|
config.compression_level,
|
|
|
|
|
rc.rc.clone(),
|
|
|
|
|
resync.queue.clone(),
|
|
|
|
|
resync.errors.clone(),
|
|
|
|
@ -166,8 +167,9 @@ impl BlockManager {
|
|
|
|
|
replication,
|
|
|
|
|
data_layout: ArcSwap::new(Arc::new(data_layout)),
|
|
|
|
|
data_layout_persister,
|
|
|
|
|
data_fsync,
|
|
|
|
|
compression_level,
|
|
|
|
|
data_fsync: config.data_fsync,
|
|
|
|
|
disable_scrub: config.disable_scrub,
|
|
|
|
|
compression_level: config.compression_level,
|
|
|
|
|
mutation_lock: vec![(); MUTEX_COUNT]
|
|
|
|
|
.iter()
|
|
|
|
|
.map(|_| Mutex::new(BlockManagerLocked()))
|
|
|
|
@ -194,33 +196,37 @@ impl BlockManager {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Spawn scrub worker
|
|
|
|
|
let (scrub_tx, scrub_rx) = mpsc::channel(1);
|
|
|
|
|
self.tx_scrub_command.store(Some(Arc::new(scrub_tx)));
|
|
|
|
|
bg.spawn_worker(ScrubWorker::new(
|
|
|
|
|
self.clone(),
|
|
|
|
|
scrub_rx,
|
|
|
|
|
self.scrub_persister.clone(),
|
|
|
|
|
));
|
|
|
|
|
if !self.disable_scrub {
|
|
|
|
|
let (scrub_tx, scrub_rx) = mpsc::channel(1);
|
|
|
|
|
self.tx_scrub_command.store(Some(Arc::new(scrub_tx)));
|
|
|
|
|
bg.spawn_worker(ScrubWorker::new(
|
|
|
|
|
self.clone(),
|
|
|
|
|
scrub_rx,
|
|
|
|
|
self.scrub_persister.clone(),
|
|
|
|
|
));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub fn register_bg_vars(&self, vars: &mut vars::BgVars) {
|
|
|
|
|
self.resync.register_bg_vars(vars);
|
|
|
|
|
|
|
|
|
|
vars.register_rw(
|
|
|
|
|
&self.scrub_persister,
|
|
|
|
|
"scrub-tranquility",
|
|
|
|
|
|p| p.get_with(|x| x.tranquility),
|
|
|
|
|
|p, tranquility| p.set_with(|x| x.tranquility = tranquility),
|
|
|
|
|
);
|
|
|
|
|
vars.register_ro(&self.scrub_persister, "scrub-last-completed", |p| {
|
|
|
|
|
p.get_with(|x| msec_to_rfc3339(x.time_last_complete_scrub))
|
|
|
|
|
});
|
|
|
|
|
vars.register_ro(&self.scrub_persister, "scrub-next-run", |p| {
|
|
|
|
|
p.get_with(|x| msec_to_rfc3339(x.time_next_run_scrub))
|
|
|
|
|
});
|
|
|
|
|
vars.register_ro(&self.scrub_persister, "scrub-corruptions_detected", |p| {
|
|
|
|
|
p.get_with(|x| x.corruptions_detected)
|
|
|
|
|
});
|
|
|
|
|
if !self.disable_scrub {
|
|
|
|
|
vars.register_rw(
|
|
|
|
|
&self.scrub_persister,
|
|
|
|
|
"scrub-tranquility",
|
|
|
|
|
|p| p.get_with(|x| x.tranquility),
|
|
|
|
|
|p, tranquility| p.set_with(|x| x.tranquility = tranquility),
|
|
|
|
|
);
|
|
|
|
|
vars.register_ro(&self.scrub_persister, "scrub-last-completed", |p| {
|
|
|
|
|
p.get_with(|x| msec_to_rfc3339(x.time_last_complete_scrub))
|
|
|
|
|
});
|
|
|
|
|
vars.register_ro(&self.scrub_persister, "scrub-next-run", |p| {
|
|
|
|
|
p.get_with(|x| msec_to_rfc3339(x.time_next_run_scrub))
|
|
|
|
|
});
|
|
|
|
|
vars.register_ro(&self.scrub_persister, "scrub-corruptions_detected", |p| {
|
|
|
|
|
p.get_with(|x| x.corruptions_detected)
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Ask nodes that might have a (possibly compressed) block for it
|
|
|
|
|