block manager: fix bugs
All checks were successful
continuous-integration/drone/pr Build is passing
continuous-integration/drone/push Build is passing

This commit is contained in:
Alex 2023-09-07 15:30:56 +02:00
parent eb972a8422
commit 2657b5c1b9
2 changed files with 26 additions and 5 deletions

View file

@ -200,7 +200,6 @@ impl DataLayout {
// it might have been removed and added again and might contain data, // it might have been removed and added again and might contain data,
// so add it as a secondary storage location for all partitions // so add it as a secondary storage location for all partitions
// to make sure existing files are not lost // to make sure existing files are not lost
let mut part_sec = vec![vec![]; DRIVE_NPART];
for (i, dd) in data_dirs.iter().enumerate() { for (i, dd) in data_dirs.iter().enumerate() {
if self.data_dirs.iter().any(|ed| ed.path == dd.path) { if self.data_dirs.iter().any(|ed| ed.path == dd.path) {
continue; continue;

View file

@ -17,6 +17,7 @@ use garage_util::persister::PersisterShared;
use garage_util::time::*; use garage_util::time::*;
use garage_util::tranquilizer::Tranquilizer; use garage_util::tranquilizer::Tranquilizer;
use crate::block::*;
use crate::layout::*; use crate::layout::*;
use crate::manager::*; use crate::manager::*;
@ -528,8 +529,10 @@ impl Worker for ScrubWorker {
pub struct RebalanceWorker { pub struct RebalanceWorker {
manager: Arc<BlockManager>, manager: Arc<BlockManager>,
block_iter: BlockStoreIterator, block_iter: BlockStoreIterator,
t_started: u64,
t_finished: Option<u64>,
moved: usize, moved: usize,
moved_bytes: usize, moved_bytes: u64,
} }
impl RebalanceWorker { impl RebalanceWorker {
@ -538,6 +541,8 @@ impl RebalanceWorker {
Self { Self {
manager, manager,
block_iter, block_iter,
t_started: now_msec(),
t_finished: None,
moved: 0, moved: 0,
moved_bytes: 0, moved_bytes: 0,
} }
@ -551,11 +556,18 @@ impl Worker for RebalanceWorker {
} }
fn status(&self) -> WorkerStatus { fn status(&self) -> WorkerStatus {
let t_cur = self.t_finished.unwrap_or_else(|| now_msec());
let rate = self.moved_bytes / std::cmp::max(1, (t_cur - self.t_started) / 1000);
WorkerStatus { WorkerStatus {
progress: Some(format!("{:.2}%", self.block_iter.progress() * 100.)), progress: Some(format!("{:.2}%", self.block_iter.progress() * 100.)),
freeform: vec![ freeform: vec![
format!("Blocks moved: {}", self.moved), format!("Blocks moved: {}", self.moved),
format!("Bytes moved: {}", self.moved_bytes), format!(
"Bytes moved: {} ({}/s)",
bytesize::ByteSize::b(self.moved_bytes),
bytesize::ByteSize::b(rate)
),
format!("Started: {}", msec_to_rfc3339(self.t_started)),
], ],
..Default::default() ..Default::default()
} }
@ -565,12 +577,21 @@ impl Worker for RebalanceWorker {
if let Some((path, hash)) = self.block_iter.next().await? { if let Some((path, hash)) = self.block_iter.next().await? {
let prim_loc = self.manager.data_layout.load().primary_block_dir(&hash); let prim_loc = self.manager.data_layout.load().primary_block_dir(&hash);
if path.parent().expect("no parent?") != prim_loc { if path.parent().expect("no parent?") != prim_loc {
let path = match path.extension() {
None => DataBlockPath::Plain(path),
Some(x) if x.to_str() == Some("zst") => DataBlockPath::Compressed(path),
_ => {
warn!("not rebalancing file: {}", path.to_string_lossy());
return Ok(WorkerState::Busy);
}
};
// block is not in its primary location, // block is not in its primary location,
// move it there (reading and re-writing does the trick) // move it there (reading and re-writing does the trick)
let data = self.manager.read_block(&hash).await?; debug!("rebalance: moving block {:?}", hash);
let data = self.manager.read_block_from(&hash, &path).await?;
self.manager.write_block(&hash, &data).await?; self.manager.write_block(&hash, &data).await?;
self.moved += 1; self.moved += 1;
self.moved_bytes += data.inner_buffer().len(); self.moved_bytes += data.inner_buffer().len() as u64;
} }
Ok(WorkerState::Busy) Ok(WorkerState::Busy)
} else { } else {
@ -589,6 +610,7 @@ impl Worker for RebalanceWorker {
.save_async(&new_layout) .save_async(&new_layout)
.await?; .await?;
self.manager.data_layout.store(Arc::new(new_layout)); self.manager.data_layout.store(Arc::new(new_layout));
self.t_finished = Some(now_msec());
Ok(WorkerState::Done) Ok(WorkerState::Done)
} }
} }