diff --git a/src/block/block.rs b/src/block/block.rs index 6d79fb6c..20f57aa5 100644 --- a/src/block/block.rs +++ b/src/block/block.rs @@ -21,6 +21,7 @@ pub enum DataBlock { Compressed(Bytes), } +#[derive(Debug)] pub enum DataBlockPath { /// Uncompressed data fail Plain(PathBuf), diff --git a/src/block/manager.rs b/src/block/manager.rs index e0fbfe74..ea70b19c 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -645,6 +645,19 @@ impl BlockManager { None } + /// Rewrite a block at the primary location for its path and delete the old path. + /// Returns the number of bytes read/written + pub(crate) async fn fix_block_location( + &self, + hash: &Hash, + wrong_path: DataBlockPath, + ) -> Result { + self.lock_mutate(hash) + .await + .fix_block_location(hash, wrong_path, self) + .await + } + async fn lock_mutate(&self, hash: &Hash) -> MutexGuard<'_, BlockManagerLocked> { let tracer = opentelemetry::global::tracer("garage"); let ilock = u16::from_be_bytes([hash.as_slice()[0], hash.as_slice()[1]]) as usize @@ -682,6 +695,17 @@ impl BlockManagerLocked { hash: &Hash, data: &DataBlock, mgr: &BlockManager, + ) -> Result<(), Error> { + let existing_path = mgr.find_block(hash).await; + self.write_block_inner(hash, data, mgr, existing_path).await + } + + async fn write_block_inner( + &self, + hash: &Hash, + data: &DataBlock, + mgr: &BlockManager, + existing_path: Option, ) -> Result<(), Error> { let compressed = data.is_compressed(); let data = data.inner_buffer(); @@ -694,7 +718,7 @@ impl BlockManagerLocked { tgt_path.set_extension("zst"); } - let to_delete = match (mgr.find_block(hash).await, compressed) { + let to_delete = match (existing_path, compressed) { // If the block is stored in the wrong directory, // write it again at the correct path and delete the old path (Some(DataBlockPath::Plain(p)), false) if p != tgt_path => Some(p), @@ -716,6 +740,7 @@ impl BlockManagerLocked { // If the block isn't stored already, just store what is given to us (None, _) => None, }; + assert!(to_delete.as_ref() != Some(&tgt_path)); let mut path_tmp = tgt_path.clone(); let tmp_extension = format!("tmp{}", hex::encode(thread_rng().gen::<[u8; 4]>())); @@ -792,6 +817,18 @@ impl BlockManagerLocked { } Ok(()) } + + async fn fix_block_location( + &self, + hash: &Hash, + wrong_path: DataBlockPath, + mgr: &BlockManager, + ) -> Result { + let data = mgr.read_block_from(hash, &wrong_path).await?; + self.write_block_inner(hash, &data, mgr, Some(wrong_path)) + .await?; + Ok(data.inner_buffer().len()) + } } async fn read_stream_to_end(mut stream: ByteStream) -> Result { diff --git a/src/block/repair.rs b/src/block/repair.rs index e18eeaeb..bd14085f 100644 --- a/src/block/repair.rs +++ b/src/block/repair.rs @@ -558,17 +558,21 @@ impl Worker for RebalanceWorker { fn status(&self) -> WorkerStatus { let t_cur = self.t_finished.unwrap_or_else(|| now_msec()); let rate = self.moved_bytes / std::cmp::max(1, (t_cur - self.t_started) / 1000); + let mut freeform = vec![ + format!("Blocks moved: {}", self.moved), + format!( + "Bytes moved: {} ({}/s)", + bytesize::ByteSize::b(self.moved_bytes), + bytesize::ByteSize::b(rate) + ), + format!("Started: {}", msec_to_rfc3339(self.t_started)), + ]; + if let Some(t_fin) = self.t_finished { + freeform.push(format!("Finished: {}", msec_to_rfc3339(t_fin))) + } WorkerStatus { progress: Some(format!("{:.2}%", self.block_iter.progress() * 100.)), - freeform: vec![ - format!("Blocks moved: {}", self.moved), - format!( - "Bytes moved: {} ({}/s)", - bytesize::ByteSize::b(self.moved_bytes), - bytesize::ByteSize::b(rate) - ), - format!("Started: {}", msec_to_rfc3339(self.t_started)), - ], + freeform, ..Default::default() } } @@ -576,10 +580,10 @@ impl Worker for RebalanceWorker { async fn work(&mut self, _must_exit: &mut watch::Receiver) -> Result { if let Some((path, hash)) = self.block_iter.next().await? { let prim_loc = self.manager.data_layout.load().primary_block_dir(&hash); - if path.parent().expect("no parent?") != prim_loc { - let path = match path.extension() { - None => DataBlockPath::Plain(path), - Some(x) if x.to_str() == Some("zst") => DataBlockPath::Compressed(path), + if path.ancestors().all(|x| x != prim_loc) { + let block_path = match path.extension() { + None => DataBlockPath::Plain(path.clone()), + Some(x) if x.to_str() == Some("zst") => DataBlockPath::Compressed(path.clone()), _ => { warn!("not rebalancing file: {}", path.to_string_lossy()); return Ok(WorkerState::Busy); @@ -587,11 +591,10 @@ impl Worker for RebalanceWorker { }; // block is not in its primary location, // move it there (reading and re-writing does the trick) - debug!("rebalance: moving block {:?}", hash); - let data = self.manager.read_block_from(&hash, &path).await?; - self.manager.write_block(&hash, &data).await?; + debug!("rebalance: moving block {:?} => {:?}", block_path, prim_loc); + let block_len = self.manager.fix_block_location(&hash, block_path).await?; self.moved += 1; - self.moved_bytes += data.inner_buffer().len() as u64; + self.moved_bytes += block_len as u64; } Ok(WorkerState::Busy) } else {