From 71c0188055e25aa1c00d0226f0ca99ce323310a6 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 4 Sep 2023 14:49:49 +0200 Subject: [PATCH 01/26] block manager: skeleton for multi-hdd support --- src/block/layout.rs | 57 +++++++++++ src/block/lib.rs | 1 + src/block/manager.rs | 29 ++++-- src/block/repair.rs | 223 ++++++++++++++++++++++++++++--------------- src/model/garage.rs | 18 +++- src/rpc/system.rs | 23 ++++- src/util/config.rs | 22 ++++- 7 files changed, 280 insertions(+), 93 deletions(-) create mode 100644 src/block/layout.rs diff --git a/src/block/layout.rs b/src/block/layout.rs new file mode 100644 index 000000000..cbc326d8f --- /dev/null +++ b/src/block/layout.rs @@ -0,0 +1,57 @@ +use std::path::PathBuf; + +use serde::{Deserialize, Serialize}; + +use garage_util::config::DataDirEnum; +use garage_util::data::Hash; +use garage_util::migrate::*; + +pub const DRIVE_NPART: usize = 1024; + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub(crate) struct DataLayout { + pub(crate) data_dirs: Vec, + pub(crate) partitions: Vec, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub(crate) struct DataDir { + pub(crate) path: PathBuf, + pub(crate) state: DataDirState, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub(crate) enum DataDirState { + Active { capacity: u64 }, + ReadOnly, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub(crate) struct Partition { + pub(crate) prim: usize, + pub(crate) sec: Vec, +} + +impl DataLayout { + pub(crate) fn initialize(dirs: &DataDirEnum) -> Self { + todo!() + } + + pub(crate) fn update(&mut self, dirs: &DataDirEnum) -> Self { + todo!() + } + + pub(crate) fn data_dir(&self, hash: &Hash) -> PathBuf { + todo!() + /* + let mut path = self.data_dir.clone(); + path.push(hex::encode(&hash.as_slice()[0..1])); + path.push(hex::encode(&hash.as_slice()[1..2])); + path + */ + } +} + +impl InitialFormat for DataLayout { + const VERSION_MARKER: &'static [u8] = b"G09bmdl"; +} diff --git a/src/block/lib.rs b/src/block/lib.rs index d2814f770..c9ff28457 100644 --- a/src/block/lib.rs +++ b/src/block/lib.rs @@ -6,5 +6,6 @@ pub mod repair; pub mod resync; mod block; +mod layout; mod metrics; mod rc; diff --git a/src/block/manager.rs b/src/block/manager.rs index c7e4cd03b..18a2686ef 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -25,10 +25,11 @@ use garage_rpc::rpc_helper::netapp::stream::{stream_asyncread, ByteStream}; use garage_db as db; use garage_util::background::{vars, BackgroundRunner}; +use garage_util::config::DataDirEnum; use garage_util::data::*; use garage_util::error::*; use garage_util::metrics::RecordDuration; -use garage_util::persister::PersisterShared; +use garage_util::persister::{Persister, PersisterShared}; use garage_util::time::msec_to_rfc3339; use garage_rpc::rpc_helper::OrderTag; @@ -38,6 +39,7 @@ use garage_rpc::*; use garage_table::replication::{TableReplication, TableShardedReplication}; use crate::block::*; +use crate::layout::*; use crate::metrics::*; use crate::rc::*; use crate::repair::*; @@ -77,8 +79,11 @@ impl Rpc for BlockRpc { pub struct BlockManager { /// Replication strategy, allowing to find on which node blocks should be located pub replication: TableShardedReplication, - /// Directory in which block are stored - pub data_dir: PathBuf, + + /// Directory/ies in which block are stored + pub data_dir: DataDirEnum, + /// Data layout + pub(crate) data_layout: DataLayout, data_fsync: bool, compression_level: Option, @@ -114,12 +119,22 @@ struct BlockManagerLocked(); impl BlockManager { pub fn new( db: &db::Db, - data_dir: PathBuf, + data_dir: DataDirEnum, data_fsync: bool, compression_level: Option, replication: TableShardedReplication, system: Arc, ) -> Arc { + let layout_persister: Persister = + Persister::new(&system.metadata_dir, "data_layout"); + let data_layout = match layout_persister.load() { + Ok(mut layout) => { + layout.update(&data_dir); + layout + } + Err(_) => DataLayout::initialize(&data_dir), + }; + let rc = db .open_tree("block_local_rc") .expect("Unable to open block_local_rc tree"); @@ -143,6 +158,7 @@ impl BlockManager { let block_manager = Arc::new(Self { replication, data_dir, + data_layout, data_fsync, compression_level, mutation_lock: [(); 256].map(|_| Mutex::new(BlockManagerLocked())), @@ -586,10 +602,7 @@ impl BlockManager { /// Utility: gives the path of the directory in which a block should be found fn block_dir(&self, hash: &Hash) -> PathBuf { - let mut path = self.data_dir.clone(); - path.push(hex::encode(&hash.as_slice()[0..1])); - path.push(hex::encode(&hash.as_slice()[1..2])); - path + self.data_layout.data_dir(hash) } /// Utility: give the full path where a block should be found, minus extension if block is diff --git a/src/block/repair.rs b/src/block/repair.rs index 71093d697..d5e2e1680 100644 --- a/src/block/repair.rs +++ b/src/block/repair.rs @@ -17,6 +17,7 @@ use garage_util::persister::PersisterShared; use garage_util::time::*; use garage_util::tranquilizer::Tranquilizer; +use crate::layout::*; use crate::manager::*; // Full scrub every 25 days with a random element of 10 days mixed in below @@ -136,7 +137,7 @@ impl Worker for RepairWorker { // Lists all blocks on disk and adds them to the resync queue. // This allows us to find blocks we are storing but don't actually need, // so that we can offload them if necessary and then delete them locally. - if let Some(hash) = bi.next().await? { + if let Some((_path, hash)) = bi.next().await? { self.manager .resync .put_to_resync(&hash, Duration::from_secs(0))?; @@ -376,7 +377,7 @@ impl Worker for ScrubWorker { match &mut self.work { ScrubWorkerState::Running(bsi) => { self.tranquilizer.reset(); - if let Some(hash) = bsi.next().await? { + if let Some((_path, hash)) = bsi.next().await? { match self.manager.read_block(&hash).await { Err(Error::CorruptData(_)) => { error!("Found corrupt data block during scrub: {:?}", hash); @@ -447,100 +448,166 @@ impl Worker for ScrubWorker { // UTILITY FOR ENUMERATING THE BLOCK STORE // ---- ---- ---- +const PROGRESS_FP: u64 = 1_000_000_000; + struct BlockStoreIterator { - path: Vec, + todo: Vec, } -enum ReadingDir { - Pending(PathBuf), - Read { - subpaths: Vec, - pos: usize, +enum BsiTodo { + Directory { + path: PathBuf, + progress_min: u64, + progress_max: u64, + }, + File { + path: PathBuf, + filename: String, + progress: u64, }, } impl BlockStoreIterator { fn new(manager: &BlockManager) -> Self { - let root_dir = manager.data_dir.clone(); - Self { - path: vec![ReadingDir::Pending(root_dir)], + let min_cap = manager + .data_layout + .data_dirs + .iter() + .filter_map(|x| match x.state { + DataDirState::Active { capacity } => Some(capacity), + _ => None, + }) + .min() + .unwrap_or(0); + + let sum_cap = manager + .data_layout + .data_dirs + .iter() + .map(|x| match x.state { + DataDirState::Active { capacity } => capacity, + _ => min_cap, // approximation + }) + .sum::() as u128; + + let mut cum_cap = 0; + let mut todo = vec![]; + for dir in manager.data_layout.data_dirs.iter() { + let cap = match dir.state { + DataDirState::Active { capacity } => capacity, + _ => min_cap, + }; + + let progress_min = ((cum_cap as u128 * PROGRESS_FP as u128) / (sum_cap as u128)) as u64; + let progress_max = + (((cum_cap + cap) as u128 * PROGRESS_FP as u128) / (sum_cap as u128)) as u64; + cum_cap += cap; + + todo.push(BsiTodo::Directory { + path: dir.path.clone(), + progress_min, + progress_max, + }); } + // entries are processed back-to-front (because of .pop()), + // so reverse entries to process them in increasing progress bounds + todo.reverse(); + + let ret = Self { todo }; + debug_assert!(ret.progress_invariant()); + + ret } /// Returns progress done, between 0 and 1 fn progress(&self) -> f32 { - if self.path.is_empty() { - 1.0 - } else { - let mut ret = 0.0; - let mut next_div = 1; - for p in self.path.iter() { - match p { - ReadingDir::Pending(_) => break, - ReadingDir::Read { subpaths, pos } => { - next_div *= subpaths.len(); - ret += ((*pos - 1) as f32) / (next_div as f32); + self.todo + .last() + .map(|x| match x { + BsiTodo::Directory { progress_min, .. } => *progress_min, + BsiTodo::File { progress, .. } => *progress, + }) + .map(|x| x as f32 / PROGRESS_FP as f32) + .unwrap_or(1.0) + } + + async fn next(&mut self) -> Result, Error> { + loop { + match self.todo.pop() { + None => return Ok(None), + Some(BsiTodo::Directory { + path, + progress_min, + progress_max, + }) => { + let istart = self.todo.len(); + + let mut reader = fs::read_dir(&path).await?; + while let Some(ent) = reader.next_entry().await? { + let name = if let Ok(n) = ent.file_name().into_string() { + n + } else { + continue; + }; + let ft = ent.file_type().await?; + if ft.is_dir() && hex::decode(&name).is_ok() { + self.todo.push(BsiTodo::Directory { + path: ent.path(), + progress_min: 0, + progress_max: 0, + }); + } else if ft.is_file() { + self.todo.push(BsiTodo::File { + path: ent.path(), + filename: name, + progress: 0, + }); + } + } + + let count = self.todo.len() - istart; + for (i, ent) in self.todo[istart..].iter_mut().enumerate() { + let p1 = progress_min + + ((progress_max - progress_min) * i as u64) / count as u64; + let p2 = progress_min + + ((progress_max - progress_min) * (i + 1) as u64) / count as u64; + match ent { + BsiTodo::Directory { + progress_min, + progress_max, + .. + } => { + *progress_min = p1; + *progress_max = p2; + } + BsiTodo::File { progress, .. } => { + *progress = p1; + } + } + } + self.todo[istart..].reverse(); + debug_assert!(self.progress_invariant()); + } + Some(BsiTodo::File { path, filename, .. }) => { + let filename = filename.strip_suffix(".zst").unwrap_or(&filename); + if filename.len() == 64 { + if let Ok(h) = hex::decode(filename) { + let mut hash = [0u8; 32]; + hash.copy_from_slice(&h); + return Ok(Some((path, hash.into()))); + } } } } - ret } } - async fn next(&mut self) -> Result, Error> { - loop { - let last_path = match self.path.last_mut() { - None => return Ok(None), - Some(lp) => lp, - }; - - if let ReadingDir::Pending(path) = last_path { - let mut reader = fs::read_dir(&path).await?; - let mut subpaths = vec![]; - while let Some(ent) = reader.next_entry().await? { - subpaths.push(ent); - } - *last_path = ReadingDir::Read { subpaths, pos: 0 }; - } - - let (subpaths, pos) = match *last_path { - ReadingDir::Read { - ref subpaths, - ref mut pos, - } => (subpaths, pos), - ReadingDir::Pending(_) => unreachable!(), - }; - - let data_dir_ent = match subpaths.get(*pos) { - None => { - self.path.pop(); - continue; - } - Some(ent) => { - *pos += 1; - ent - } - }; - - let name = data_dir_ent.file_name(); - let name = if let Ok(n) = name.into_string() { - n - } else { - continue; - }; - let ent_type = data_dir_ent.file_type().await?; - - let name = name.strip_suffix(".zst").unwrap_or(&name); - if name.len() == 2 && hex::decode(name).is_ok() && ent_type.is_dir() { - let path = data_dir_ent.path(); - self.path.push(ReadingDir::Pending(path)); - } else if name.len() == 64 { - if let Ok(h) = hex::decode(name) { - let mut hash = [0u8; 32]; - hash.copy_from_slice(&h); - return Ok(Some(hash.into())); - } - } - } + fn progress_invariant(&self) -> bool { + let iter = self.todo.iter().map(|x| match x { + BsiTodo::Directory { progress_min, .. } => progress_min, + BsiTodo::File { progress, .. } => progress, + }); + let iter_1 = iter.clone().skip(1); + iter.zip(iter_1).all(|(prev, next)| prev >= next) } } diff --git a/src/model/garage.rs b/src/model/garage.rs index 981430fbe..d6eebfb0d 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -92,8 +92,22 @@ impl Garage { // Create meta dir and data dir if they don't exist already std::fs::create_dir_all(&config.metadata_dir) .ok_or_message("Unable to create Garage metadata directory")?; - std::fs::create_dir_all(&config.data_dir) - .ok_or_message("Unable to create Garage data directory")?; + match &config.data_dir { + DataDirEnum::Single(data_dir) => { + std::fs::create_dir_all(data_dir).ok_or_message(format!( + "Unable to create Garage data directory: {}", + data_dir.to_string_lossy() + ))?; + } + DataDirEnum::Multiple(data_dirs) => { + for dir in data_dirs { + std::fs::create_dir_all(&dir.path).ok_or_message(format!( + "Unable to create Garage data directory: {}", + dir.path.to_string_lossy() + ))?; + } + } + } info!("Opening database..."); let mut db_path = config.metadata_dir.clone(); diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 1675e70e7..c5751d5de 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -22,9 +22,9 @@ use netapp::peering::fullmesh::FullMeshPeeringStrategy; use netapp::util::parse_and_resolve_peer_addr_async; use netapp::{NetApp, NetworkKey, NodeID, NodeKey}; -use garage_util::config::Config; #[cfg(feature = "kubernetes-discovery")] use garage_util::config::KubernetesDiscoveryConfig; +use garage_util::config::{Config, DataDirEnum}; use garage_util::data::*; use garage_util::error::*; use garage_util::persister::Persister; @@ -119,7 +119,7 @@ pub struct System { /// Path to metadata directory pub metadata_dir: PathBuf, /// Path to data directory - pub data_dir: PathBuf, + pub data_dir: DataDirEnum, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -890,7 +890,12 @@ impl NodeStatus { } } - fn update_disk_usage(&mut self, meta_dir: &Path, data_dir: &Path, metrics: &SystemMetrics) { + fn update_disk_usage( + &mut self, + meta_dir: &Path, + data_dir: &DataDirEnum, + metrics: &SystemMetrics, + ) { use systemstat::{Platform, System}; let mounts = System::new().mounts().unwrap_or_default(); @@ -903,7 +908,17 @@ impl NodeStatus { }; self.meta_disk_avail = mount_avail(meta_dir); - self.data_disk_avail = mount_avail(data_dir); + self.data_disk_avail = match data_dir { + DataDirEnum::Single(dir) => mount_avail(dir), + DataDirEnum::Multiple(dirs) => { + dirs.iter() + .map(|d| mount_avail(&d.path)) + .fold(Some((0, 0)), |acc, cur| match (acc, cur) { + (Some((x, y)), Some((a, b))) => Some((x + a, y + b)), + _ => None, + }) + } + }; if let Some((avail, total)) = self.meta_disk_avail { metrics diff --git a/src/util/config.rs b/src/util/config.rs index eeb17e0e6..9d00fe823 100644 --- a/src/util/config.rs +++ b/src/util/config.rs @@ -13,7 +13,7 @@ pub struct Config { /// Path where to store metadata. Should be fast, but low volume pub metadata_dir: PathBuf, /// Path where to store data. Can be slower, but need higher volume - pub data_dir: PathBuf, + pub data_dir: DataDirEnum, /// Whether to fsync after all metadata transactions (disabled by default) #[serde(default)] @@ -94,6 +94,26 @@ pub struct Config { pub admin: AdminConfig, } +/// Value for data_dir: either a single directory or a list of dirs with attributes +#[derive(Deserialize, Debug, Clone)] +#[serde(untagged)] +pub enum DataDirEnum { + Single(PathBuf), + Multiple(Vec), +} + +#[derive(Deserialize, Debug, Clone)] +pub struct DataDir { + /// Path to the data directory + pub path: PathBuf, + /// Capacity of the drive (required if read_only is false) + #[serde(default)] + pub capacity: Option, + /// Whether this is a legacy read-only path (capacity should be None) + #[serde(default)] + pub read_only: bool, +} + /// Configuration for S3 api #[derive(Deserialize, Debug, Clone)] pub struct S3ApiConfig { From 6c420c0880de742b2b6416da1178df828fd977bf Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 5 Sep 2023 13:43:38 +0200 Subject: [PATCH 02/26] block manager: multi-directory layout computation --- Cargo.lock | 1 + src/block/Cargo.toml | 1 + src/block/layout.rs | 264 +++++++++++++++++++++++++++++++++++++++---- src/block/manager.rs | 10 +- src/block/repair.rs | 10 +- 5 files changed, 256 insertions(+), 30 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 79b35191a..1ace7cc2f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1300,6 +1300,7 @@ dependencies = [ "async-compression", "async-trait", "bytes", + "bytesize", "futures", "futures-util", "garage_db", diff --git a/src/block/Cargo.toml b/src/block/Cargo.toml index 1057b699e..b77988d6c 100644 --- a/src/block/Cargo.toml +++ b/src/block/Cargo.toml @@ -24,6 +24,7 @@ opentelemetry = "0.17" arc-swap = "1.5" async-trait = "0.1.7" bytes = "1.0" +bytesize = "1.2" hex = "0.4" tracing = "0.1" rand = "0.8" diff --git a/src/block/layout.rs b/src/block/layout.rs index cbc326d8f..4a49b2870 100644 --- a/src/block/layout.rs +++ b/src/block/layout.rs @@ -4,14 +4,23 @@ use serde::{Deserialize, Serialize}; use garage_util::config::DataDirEnum; use garage_util::data::Hash; +use garage_util::error::{Error, OkOrMessage}; use garage_util::migrate::*; -pub const DRIVE_NPART: usize = 1024; +type Idx = u16; + +const DRIVE_NPART: usize = 1024; + +const DPART_BYTES: (usize, usize) = (2, 3); #[derive(Serialize, Deserialize, Debug, Clone)] pub(crate) struct DataLayout { pub(crate) data_dirs: Vec, - pub(crate) partitions: Vec, + + /// Primary storage location (index in data_dirs) for each partition + pub(crate) part_prim: Vec, + /// Secondary storage locations for each partition + pub(crate) part_sec: Vec>, } #[derive(Serialize, Deserialize, Debug, Clone)] @@ -20,38 +29,255 @@ pub(crate) struct DataDir { pub(crate) state: DataDirState, } -#[derive(Serialize, Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone, Copy)] pub(crate) enum DataDirState { Active { capacity: u64 }, ReadOnly, } -#[derive(Serialize, Deserialize, Debug, Clone)] -pub(crate) struct Partition { - pub(crate) prim: usize, - pub(crate) sec: Vec, -} - impl DataLayout { - pub(crate) fn initialize(dirs: &DataDirEnum) -> Self { - todo!() + pub(crate) fn initialize(dirs: &DataDirEnum) -> Result { + let data_dirs = make_data_dirs(dirs)?; + + // Split partitions proportionnally to capacity for all drives + // to affect primary storage location + let total_cap = data_dirs.iter().filter_map(|x| x.capacity()).sum::(); + + let mut part_prim = Vec::with_capacity(DRIVE_NPART); + let mut cum_cap = 0; + for (i, dd) in data_dirs.iter().enumerate() { + if let DataDirState::Active { capacity } = dd.state { + cum_cap += capacity; + let n_total = (cum_cap * DRIVE_NPART as u64) / total_cap; + part_prim.resize(n_total as usize, i as Idx); + } + } + assert_eq!(cum_cap, total_cap); + assert_eq!(part_prim.len(), DRIVE_NPART); + + // If any of the storage locations is non-empty, add it as a secondary + // storage location for all partitions + let mut part_sec = vec![vec![]; DRIVE_NPART]; + for (i, dd) in data_dirs.iter().enumerate() { + if dir_not_empty(&dd.path)? { + for (sec, prim) in part_sec.iter_mut().zip(part_prim.iter()) { + if *prim != i as Idx { + sec.push(i as Idx); + } + } + } + } + + Ok(Self { + data_dirs, + part_prim, + part_sec, + }) } - pub(crate) fn update(&mut self, dirs: &DataDirEnum) -> Self { - todo!() + pub(crate) fn update(&mut self, dirs: &DataDirEnum) -> Result { + // Compute list of new data directories and mapping of old indices + // to new indices + let data_dirs = make_data_dirs(dirs)?; + let old2new = self + .data_dirs + .iter() + .map(|x| { + data_dirs + .iter() + .position(|y| y.path == x.path) + .map(|x| x as Idx) + }) + .collect::>(); + + // Compute secondary location list for partitions based on existing + // folders, translating indices from old to new + let mut part_sec = self + .part_sec + .iter() + .map(|dl| { + dl.iter() + .filter_map(|old| old2new.get(*old as usize).copied().flatten()) + .collect::>() + }) + .collect::>(); + + // Compute a vector that, for each data dir, + // contains the list of partitions primarily stored on that drive + let mut dir_prim = vec![vec![]; data_dirs.len()]; + for (ipart, prim) in self.part_prim.iter().enumerate() { + if let Some(new) = old2new.get(*prim as usize).copied().flatten() { + dir_prim[new as usize].push(ipart); + } + } + + // Compute the target number of partitions per data directory + let total_cap = data_dirs.iter().filter_map(|x| x.capacity()).sum::(); + let mut cum_cap = 0; + let mut npart_per_dir = vec![]; + for dd in data_dirs.iter() { + if let DataDirState::Active { capacity } = dd.state { + let begin = (cum_cap * DRIVE_NPART as u64) / total_cap; + cum_cap += capacity; + let end = (cum_cap * DRIVE_NPART as u64) / total_cap; + npart_per_dir.push((end - begin) as usize); + } else { + npart_per_dir.push(0); + } + } + assert_eq!(cum_cap, total_cap); + assert_eq!(npart_per_dir.iter().sum::(), DRIVE_NPART); + + // For all directories that have too many primary partitions, + // move that partition to secondary + for (idir, (parts, tgt_npart)) in dir_prim.iter_mut().zip(npart_per_dir.iter()).enumerate() + { + while parts.len() > *tgt_npart { + let part = parts.pop().unwrap(); + if !part_sec[part].contains(&(idir as Idx)) { + part_sec[part].push(idir as Idx); + } + } + } + + // Calculate the vector of primary partition dir index + let mut part_prim = vec![None; DRIVE_NPART]; + for (idir, parts) in dir_prim.iter().enumerate() { + for part in parts.iter() { + assert!(part_prim[*part].is_none()); + part_prim[*part] = Some(idir as Idx) + } + } + + // Calculate a vector of unassigned partitions + let mut unassigned = part_prim + .iter() + .enumerate() + .filter(|(_, dir)| dir.is_none()) + .map(|(ipart, _)| ipart) + .collect::>(); + + // For all directories that don't have enough primary partitions, + // add partitions from unassigned + for (idir, (parts, tgt_npart)) in dir_prim.iter_mut().zip(npart_per_dir.iter()).enumerate() + { + assert!(unassigned.len() >= *tgt_npart - parts.len()); + for _ in parts.len()..*tgt_npart { + let new_part = unassigned.pop().unwrap(); + part_prim[new_part] = Some(idir as Idx); + part_sec[new_part].retain(|x| *x != idir as Idx); + } + } + + // Sanity checks + assert!(part_prim.iter().all(|x| x.is_some())); + assert!(unassigned.is_empty()); + + let part_prim = part_prim + .into_iter() + .map(|x| x.unwrap()) + .collect::>(); + assert!(part_prim.iter().all(|p| data_dirs + .get(*p as usize) + .and_then(|x| x.capacity()) + .unwrap_or(0) + > 0)); + + Ok(Self { + data_dirs, + part_prim, + part_sec, + }) } - pub(crate) fn data_dir(&self, hash: &Hash) -> PathBuf { - todo!() - /* - let mut path = self.data_dir.clone(); + pub(crate) fn primary_data_dir(&self, hash: &Hash) -> PathBuf { + let ipart = self.partition_from(hash); + let idir = self.part_prim[ipart] as usize; + self.data_dir_from(hash, &self.data_dirs[idir].path) + } + + pub(crate) fn secondary_data_dirs<'a>(&'a self, hash: &'a Hash) -> impl Iterator + 'a { + let ipart = self.partition_from(hash); + self.part_sec[ipart] + .iter() + .map(move |idir| self.data_dir_from(hash, &self.data_dirs[*idir as usize].path)) + } + + fn partition_from(&self, hash: &Hash) -> usize { + u16::from_be_bytes([ + hash.as_slice()[DPART_BYTES.0], + hash.as_slice()[DPART_BYTES.1] + ]) as usize % DRIVE_NPART + } + + fn data_dir_from(&self, hash: &Hash, dir: &PathBuf) -> PathBuf { + let mut path = dir.clone(); path.push(hex::encode(&hash.as_slice()[0..1])); path.push(hex::encode(&hash.as_slice()[1..2])); path - */ - } + } } impl InitialFormat for DataLayout { const VERSION_MARKER: &'static [u8] = b"G09bmdl"; } + +impl DataDir { + pub fn capacity(&self) -> Option { + match self.state { + DataDirState::Active { capacity } => Some(capacity), + _ => None, + } + } +} + +fn make_data_dirs(dirs: &DataDirEnum) -> Result, Error> { + let mut data_dirs = vec![]; + match dirs { + DataDirEnum::Single(path) => data_dirs.push(DataDir { + path: path.clone(), + state: DataDirState::Active { + capacity: 1_000_000_000, // whatever, doesn't matter + }, + }), + DataDirEnum::Multiple(dirs) => { + for dir in dirs.iter() { + let state = match &dir.capacity { + Some(cap) if dir.read_only == false => { + DataDirState::Active { + capacity: cap.parse::() + .ok_or_message("invalid capacity value")?.as_u64(), + } + } + None if dir.read_only == true => { + DataDirState::ReadOnly + } + _ => return Err(Error::Message(format!("data directories in data_dir should have a capacity value or be marked read_only, not the case for {}", dir.path.to_string_lossy()))), + }; + data_dirs.push(DataDir { + path: dir.path.clone(), + state, + }); + } + } + } + Ok(data_dirs) +} + +fn dir_not_empty(path: &PathBuf) -> Result { + for entry in std::fs::read_dir(&path)? { + let dir = entry?; + if dir.file_type()?.is_dir() + && dir + .file_name() + .into_string() + .ok() + .and_then(|hex| hex::decode(&hex).ok()) + .map(|bytes| (2..=4).contains(&bytes.len())) + .unwrap_or(false) + { + return Ok(true); + } + } + Ok(false) +} diff --git a/src/block/manager.rs b/src/block/manager.rs index 18a2686ef..45729a003 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -125,15 +125,19 @@ impl BlockManager { replication: TableShardedReplication, system: Arc, ) -> Arc { + // TODO don't panic, report error let layout_persister: Persister = Persister::new(&system.metadata_dir, "data_layout"); let data_layout = match layout_persister.load() { Ok(mut layout) => { - layout.update(&data_dir); + layout.update(&data_dir).expect("invalid data_dir config"); layout } - Err(_) => DataLayout::initialize(&data_dir), + Err(_) => DataLayout::initialize(&data_dir).expect("invalid data_dir config"), }; + layout_persister + .save(&data_layout) + .expect("cannot save data_layout"); let rc = db .open_tree("block_local_rc") @@ -602,7 +606,7 @@ impl BlockManager { /// Utility: gives the path of the directory in which a block should be found fn block_dir(&self, hash: &Hash) -> PathBuf { - self.data_layout.data_dir(hash) + self.data_layout.primary_data_dir(hash) } /// Utility: give the full path where a block should be found, minus extension if block is diff --git a/src/block/repair.rs b/src/block/repair.rs index d5e2e1680..0e7fe0df0 100644 --- a/src/block/repair.rs +++ b/src/block/repair.rs @@ -473,10 +473,7 @@ impl BlockStoreIterator { .data_layout .data_dirs .iter() - .filter_map(|x| match x.state { - DataDirState::Active { capacity } => Some(capacity), - _ => None, - }) + .filter_map(|x| x.capacity()) .min() .unwrap_or(0); @@ -484,10 +481,7 @@ impl BlockStoreIterator { .data_layout .data_dirs .iter() - .map(|x| match x.state { - DataDirState::Active { capacity } => capacity, - _ => min_cap, // approximation - }) + .map(|x| x.capacity().unwrap_or(min_cap /* approximation */)) .sum::() as u128; let mut cum_cap = 0; From 887b3233f45ade24def08b3faa2d6da5fe85a3a1 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 5 Sep 2023 14:27:39 +0200 Subject: [PATCH 03/26] block manager: use data paths from layout --- src/block/block.rs | 9 +++ src/block/layout.rs | 37 ++++++----- src/block/manager.rs | 149 ++++++++++++++++++++++--------------------- 3 files changed, 105 insertions(+), 90 deletions(-) diff --git a/src/block/block.rs b/src/block/block.rs index 935aa9002..6d79fb6c3 100644 --- a/src/block/block.rs +++ b/src/block/block.rs @@ -1,3 +1,5 @@ +use std::path::PathBuf; + use bytes::Bytes; use serde::{Deserialize, Serialize}; use zstd::stream::{decode_all as zstd_decode, Encoder}; @@ -19,6 +21,13 @@ pub enum DataBlock { Compressed(Bytes), } +pub enum DataBlockPath { + /// Uncompressed data fail + Plain(PathBuf), + /// Compressed data fail + Compressed(PathBuf), +} + impl DataBlock { /// Query whether this block is compressed pub fn is_compressed(&self) -> bool { diff --git a/src/block/layout.rs b/src/block/layout.rs index 4a49b2870..b119281b9 100644 --- a/src/block/layout.rs +++ b/src/block/layout.rs @@ -190,32 +190,35 @@ impl DataLayout { }) } - pub(crate) fn primary_data_dir(&self, hash: &Hash) -> PathBuf { - let ipart = self.partition_from(hash); - let idir = self.part_prim[ipart] as usize; - self.data_dir_from(hash, &self.data_dirs[idir].path) + pub(crate) fn primary_block_dir(&self, hash: &Hash) -> PathBuf { + let ipart = self.partition_from(hash); + let idir = self.part_prim[ipart] as usize; + self.block_dir_from(hash, &self.data_dirs[idir].path) } - pub(crate) fn secondary_data_dirs<'a>(&'a self, hash: &'a Hash) -> impl Iterator + 'a { - let ipart = self.partition_from(hash); - self.part_sec[ipart] - .iter() - .map(move |idir| self.data_dir_from(hash, &self.data_dirs[*idir as usize].path)) + pub(crate) fn secondary_block_dirs<'a>( + &'a self, + hash: &'a Hash, + ) -> impl Iterator + 'a { + let ipart = self.partition_from(hash); + self.part_sec[ipart] + .iter() + .map(move |idir| self.block_dir_from(hash, &self.data_dirs[*idir as usize].path)) } - fn partition_from(&self, hash: &Hash) -> usize { - u16::from_be_bytes([ - hash.as_slice()[DPART_BYTES.0], - hash.as_slice()[DPART_BYTES.1] - ]) as usize % DRIVE_NPART - } + fn partition_from(&self, hash: &Hash) -> usize { + u16::from_be_bytes([ + hash.as_slice()[DPART_BYTES.0], + hash.as_slice()[DPART_BYTES.1], + ]) as usize % DRIVE_NPART + } - fn data_dir_from(&self, hash: &Hash, dir: &PathBuf) -> PathBuf { + fn block_dir_from(&self, hash: &Hash, dir: &PathBuf) -> PathBuf { let mut path = dir.clone(); path.push(hex::encode(&hash.as_slice()[0..1])); path.push(hex::encode(&hash.as_slice()[1..2])); path - } + } } impl InitialFormat for DataLayout { diff --git a/src/block/manager.rs b/src/block/manager.rs index 45729a003..73fefa0c2 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -543,21 +543,25 @@ impl BlockManager { } async fn read_block_internal(&self, hash: &Hash) -> Result { - let mut path = self.block_path(hash); - let compressed = match self.is_block_compressed(hash).await { - Ok(c) => c, - Err(e) => { + let block_path = match self.find_block(hash).await { + Some(p) => p, + None => { // Not found but maybe we should have had it ?? self.resync .put_to_resync(hash, 2 * self.system.rpc.rpc_timeout())?; - return Err(Into::into(e)); + return Err(Error::Message(format!( + "block {:?} not found on node", + hash + ))); } }; - if compressed { - path.set_extension("zst"); - } - let mut f = fs::File::open(&path).await?; + let (path, compressed) = match &block_path { + DataBlockPath::Plain(p) => (p, false), + DataBlockPath::Compressed(p) => (p, true), + }; + + let mut f = fs::File::open(&path).await?; let mut data = vec![]; f.read_to_end(&mut data).await?; drop(f); @@ -571,11 +575,16 @@ impl BlockManager { if data.verify(*hash).is_err() { self.metrics.corruption_counter.add(1); + warn!( + "Block {:?} is corrupted. Renaming to .corrupted and resyncing.", + hash + ); self.lock_mutate(hash) .await - .move_block_to_corrupted(hash, self) + .move_block_to_corrupted(&block_path) .await?; self.resync.put_to_resync(hash, Duration::from_millis(0))?; + return Err(Error::CorruptData(*hash)); } @@ -604,56 +613,51 @@ impl BlockManager { .await } - /// Utility: gives the path of the directory in which a block should be found - fn block_dir(&self, hash: &Hash) -> PathBuf { - self.data_layout.primary_data_dir(hash) - } + /// Utility: check if block is stored compressed. + async fn find_block(&self, hash: &Hash) -> Option { + let dirs = Some(self.data_layout.primary_block_dir(hash)) + .into_iter() + .chain(self.data_layout.secondary_block_dirs(hash)); + let filename = hex::encode(hash.as_ref()); - /// Utility: give the full path where a block should be found, minus extension if block is - /// compressed - fn block_path(&self, hash: &Hash) -> PathBuf { - let mut path = self.block_dir(hash); - path.push(hex::encode(hash.as_ref())); - path - } + for dir in dirs { + let mut path = dir; + path.push(&filename); - /// Utility: check if block is stored compressed. Error if block is not stored - async fn is_block_compressed(&self, hash: &Hash) -> Result { - let mut path = self.block_path(hash); - - // If compression is disabled on node - check for the raw block - // first and then a compressed one (as compression may have been - // previously enabled). - match self.compression_level { - None => { + if self.compression_level.is_none() { + // If compression is disabled on node - check for the raw block + // first and then a compressed one (as compression may have been + // previously enabled). if fs::metadata(&path).await.is_ok() { - return Ok(false); + return Some(DataBlockPath::Plain(path)); } - path.set_extension("zst"); - - fs::metadata(&path).await.map(|_| true).map_err(Into::into) - } - _ => { - path.set_extension("zst"); - if fs::metadata(&path).await.is_ok() { - return Ok(true); + return Some(DataBlockPath::Compressed(path)); + } + } else { + path.set_extension("zst"); + if fs::metadata(&path).await.is_ok() { + return Some(DataBlockPath::Compressed(path)); } - path.set_extension(""); - - fs::metadata(&path).await.map(|_| false).map_err(Into::into) + if fs::metadata(&path).await.is_ok() { + return Some(DataBlockPath::Plain(path)); + } } } + + None } async fn lock_mutate(&self, hash: &Hash) -> MutexGuard<'_, BlockManagerLocked> { let tracer = opentelemetry::global::tracer("garage"); - self.mutation_lock[hash.as_slice()[0] as usize] + let ilock = u16::from_be_bytes([hash.as_slice()[0], hash.as_slice()[1]]) as usize + % self.mutation_lock.len(); + self.mutation_lock[ilock] .lock() .with_context(Context::current_with_span( - tracer.start("Acquire mutation_lock"), + tracer.start(format!("Acquire mutation_lock #{}", ilock)), )) .await } @@ -688,7 +692,7 @@ impl BlockManagerLocked { hash: &Hash, mgr: &BlockManager, ) -> Result { - let exists = mgr.is_block_compressed(hash).await.is_ok(); + let exists = mgr.find_block(hash).await.is_some(); let needed = mgr.rc.get_block_rc(hash)?; Ok(BlockStatus { exists, needed }) @@ -703,21 +707,17 @@ impl BlockManagerLocked { let compressed = data.is_compressed(); let data = data.inner_buffer(); - let mut path = mgr.block_dir(hash); + let mut path = mgr.data_layout.primary_block_dir(hash); let directory = path.clone(); path.push(hex::encode(hash)); fs::create_dir_all(&directory).await?; - let to_delete = match (mgr.is_block_compressed(hash).await, compressed) { - (Ok(true), _) => return Ok(()), - (Ok(false), false) => return Ok(()), - (Ok(false), true) => { - let path_to_delete = path.clone(); - path.set_extension("zst"); - Some(path_to_delete) - } - (Err(_), compressed) => { + let to_delete = match (mgr.find_block(hash).await, compressed) { + (Some(DataBlockPath::Compressed(_)), _) => return Ok(()), + (Some(DataBlockPath::Plain(_)), false) => return Ok(()), + (Some(DataBlockPath::Plain(plain_path)), true) => Some(plain_path), + (None, compressed) => { if compressed { path.set_extension("zst"); } @@ -766,19 +766,20 @@ impl BlockManagerLocked { Ok(()) } - async fn move_block_to_corrupted(&self, hash: &Hash, mgr: &BlockManager) -> Result<(), Error> { - warn!( - "Block {:?} is corrupted. Renaming to .corrupted and resyncing.", - hash - ); - let mut path = mgr.block_path(hash); - let mut path2 = path.clone(); - if mgr.is_block_compressed(hash).await? { - path.set_extension("zst"); - path2.set_extension("zst.corrupted"); - } else { - path2.set_extension("corrupted"); - } + async fn move_block_to_corrupted(&self, block_path: &DataBlockPath) -> Result<(), Error> { + let (path, path2) = match block_path { + DataBlockPath::Plain(p) => { + let mut p2 = p.clone(); + p2.set_extension("corrupted"); + (p, p2) + } + DataBlockPath::Compressed(p) => { + let mut p2 = p.clone(); + p2.set_extension("zst.corrupted"); + (p, p2) + } + }; + fs::rename(path, path2).await?; Ok(()) } @@ -787,12 +788,14 @@ impl BlockManagerLocked { let BlockStatus { exists, needed } = self.check_block_status(hash, mgr).await?; if exists && needed.is_deletable() { - let mut path = mgr.block_path(hash); - if mgr.is_block_compressed(hash).await? { - path.set_extension("zst"); + let path_opt = match mgr.find_block(hash).await { + Some(DataBlockPath::Plain(p)) | Some(DataBlockPath::Compressed(p)) => Some(p), + None => None, + }; + if let Some(path) = path_opt { + fs::remove_file(path).await?; + mgr.metrics.delete_counter.add(1); } - fs::remove_file(path).await?; - mgr.metrics.delete_counter.add(1); } Ok(()) } From a09f86729c1c28c6881b802b49d5574386ef1d0d Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 5 Sep 2023 14:37:10 +0200 Subject: [PATCH 04/26] block manager: move blocks in write_block if necessary --- src/block/manager.rs | 40 +++++++++++++++++++++++++++------------- 1 file changed, 27 insertions(+), 13 deletions(-) diff --git a/src/block/manager.rs b/src/block/manager.rs index 73fefa0c2..798cedf97 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -707,25 +707,39 @@ impl BlockManagerLocked { let compressed = data.is_compressed(); let data = data.inner_buffer(); - let mut path = mgr.data_layout.primary_block_dir(hash); - let directory = path.clone(); - path.push(hex::encode(hash)); + let mut tgt_path = mgr.data_layout.primary_block_dir(hash); + let directory = tgt_path.clone(); + tgt_path.push(hex::encode(hash)); + if compressed { + tgt_path.set_extension("zst"); + } fs::create_dir_all(&directory).await?; let to_delete = match (mgr.find_block(hash).await, compressed) { - (Some(DataBlockPath::Compressed(_)), _) => return Ok(()), - (Some(DataBlockPath::Plain(_)), false) => return Ok(()), + // If the block is stored in the wrong directory, + // write it again at the correct path and delete the old path + (Some(DataBlockPath::Plain(p)), false) if p != tgt_path => Some(p), + (Some(DataBlockPath::Compressed(p)), true) if p != tgt_path => Some(p), + + // If the block is already stored not compressed but we have a compressed + // copy, write the compressed copy and delete the uncompressed one (Some(DataBlockPath::Plain(plain_path)), true) => Some(plain_path), - (None, compressed) => { - if compressed { - path.set_extension("zst"); - } - None - } + + // If the block is already stored compressed, + // keep the stored copy, we have nothing to do + (Some(DataBlockPath::Compressed(_)), _) => return Ok(()), + + // If the block is already stored not compressed, + // and we don't have a compressed copy either, + // keep the stored copy, we have nothing to do + (Some(DataBlockPath::Plain(_)), false) => return Ok(()), + + // If the block isn't stored already, just store what is given to us + (None, _) => None, }; - let mut path_tmp = path.clone(); + let mut path_tmp = tgt_path.clone(); let tmp_extension = format!("tmp{}", hex::encode(thread_rng().gen::<[u8; 4]>())); path_tmp.set_extension(tmp_extension); @@ -740,7 +754,7 @@ impl BlockManagerLocked { drop(f); - fs::rename(path_tmp, path).await?; + fs::rename(path_tmp, tgt_path).await?; delete_on_drop.cancel(); From 3199cab4c89cac7351925303f8bb6408ffe56ff0 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 5 Sep 2023 14:45:26 +0200 Subject: [PATCH 05/26] update cargo.nix --- Cargo.nix | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Cargo.nix b/Cargo.nix index dc30c3553..b9bda61da 100644 --- a/Cargo.nix +++ b/Cargo.nix @@ -33,7 +33,7 @@ args@{ ignoreLockHash, }: let - nixifiedLockHash = "f5b86f9d75664ba528a26ae71f07a38e9c72c78fe331420b9b639e2a099d4dad"; + nixifiedLockHash = "685d51432f57c5ad2d5c80e725822b9c9bfd7cc632340f70aa1377c1d89117e4"; workspaceSrc = if args.workspaceSrc == null then ./. else args.workspaceSrc; currentLockHash = builtins.hashFile "sha256" (workspaceSrc + /Cargo.lock); lockHashIgnored = if ignoreLockHash @@ -1844,6 +1844,7 @@ in async_compression = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".async-compression."0.4.1" { inherit profileName; }).out; async_trait = (buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".async-trait."0.1.73" { profileName = "__noProfile"; }).out; bytes = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.4.0" { inherit profileName; }).out; + bytesize = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytesize."1.3.0" { inherit profileName; }).out; futures = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures."0.3.28" { inherit profileName; }).out; futures_util = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures-util."0.3.28" { inherit profileName; }).out; garage_db = (rustPackages."unknown".garage_db."0.8.3" { inherit profileName; }).out; From 1b8c265c14b2f788693aed6c15f65684c72d2d1c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 5 Sep 2023 15:04:59 +0200 Subject: [PATCH 06/26] block manager: get rid of check_block_status --- src/block/manager.rs | 84 ++++++++++++++++++-------------------------- src/block/resync.rs | 17 +++++---- 2 files changed, 45 insertions(+), 56 deletions(-) diff --git a/src/block/manager.rs b/src/block/manager.rs index 798cedf97..5bad34d40 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -543,8 +543,8 @@ impl BlockManager { } async fn read_block_internal(&self, hash: &Hash) -> Result { - let block_path = match self.find_block(hash).await { - Some(p) => p, + match self.find_block(hash).await { + Some(p) => self.read_block_from(hash, &p).await, None => { // Not found but maybe we should have had it ?? self.resync @@ -554,9 +554,15 @@ impl BlockManager { hash ))); } - }; + } + } - let (path, compressed) = match &block_path { + pub(crate) async fn read_block_from( + &self, + hash: &Hash, + block_path: &DataBlockPath, + ) -> Result { + let (path, compressed) = match block_path { DataBlockPath::Plain(p) => (p, false), DataBlockPath::Compressed(p) => (p, true), }; @@ -581,7 +587,7 @@ impl BlockManager { ); self.lock_mutate(hash) .await - .move_block_to_corrupted(&block_path) + .move_block_to_corrupted(block_path) .await?; self.resync.put_to_resync(hash, Duration::from_millis(0))?; @@ -591,18 +597,11 @@ impl BlockManager { Ok(data) } - /// Check if this node has a block and whether it needs it - pub(crate) async fn check_block_status(&self, hash: &Hash) -> Result { - self.lock_mutate(hash) - .await - .check_block_status(hash, self) - .await - } - /// Check if this node should have a block, but don't actually have it async fn need_block(&self, hash: &Hash) -> Result { - let BlockStatus { exists, needed } = self.check_block_status(hash).await?; - Ok(needed.is_nonzero() && !exists) + let rc = self.rc.get_block_rc(hash)?; + let exists = self.find_block(hash).await.is_some(); + Ok(rc.is_nonzero() && !exists) } /// Delete block if it is not needed anymore @@ -613,8 +612,8 @@ impl BlockManager { .await } - /// Utility: check if block is stored compressed. - async fn find_block(&self, hash: &Hash) -> Option { + /// Find the path where a block is currently stored + pub(crate) async fn find_block(&self, hash: &Hash) -> Option { let dirs = Some(self.data_layout.primary_block_dir(hash)) .into_iter() .chain(self.data_layout.secondary_block_dirs(hash)); @@ -687,17 +686,6 @@ pub(crate) struct BlockStatus { } impl BlockManagerLocked { - async fn check_block_status( - &self, - hash: &Hash, - mgr: &BlockManager, - ) -> Result { - let exists = mgr.find_block(hash).await.is_some(); - let needed = mgr.rc.get_block_rc(hash)?; - - Ok(BlockStatus { exists, needed }) - } - async fn write_block( &self, hash: &Hash, @@ -710,32 +698,32 @@ impl BlockManagerLocked { let mut tgt_path = mgr.data_layout.primary_block_dir(hash); let directory = tgt_path.clone(); tgt_path.push(hex::encode(hash)); - if compressed { - tgt_path.set_extension("zst"); - } + if compressed { + tgt_path.set_extension("zst"); + } fs::create_dir_all(&directory).await?; let to_delete = match (mgr.find_block(hash).await, compressed) { - // If the block is stored in the wrong directory, - // write it again at the correct path and delete the old path + // If the block is stored in the wrong directory, + // write it again at the correct path and delete the old path (Some(DataBlockPath::Plain(p)), false) if p != tgt_path => Some(p), (Some(DataBlockPath::Compressed(p)), true) if p != tgt_path => Some(p), - // If the block is already stored not compressed but we have a compressed - // copy, write the compressed copy and delete the uncompressed one + // If the block is already stored not compressed but we have a compressed + // copy, write the compressed copy and delete the uncompressed one (Some(DataBlockPath::Plain(plain_path)), true) => Some(plain_path), - // If the block is already stored compressed, - // keep the stored copy, we have nothing to do + // If the block is already stored compressed, + // keep the stored copy, we have nothing to do (Some(DataBlockPath::Compressed(_)), _) => return Ok(()), - // If the block is already stored not compressed, - // and we don't have a compressed copy either, - // keep the stored copy, we have nothing to do + // If the block is already stored not compressed, + // and we don't have a compressed copy either, + // keep the stored copy, we have nothing to do (Some(DataBlockPath::Plain(_)), false) => return Ok(()), - // If the block isn't stored already, just store what is given to us + // If the block isn't stored already, just store what is given to us (None, _) => None, }; @@ -799,14 +787,12 @@ impl BlockManagerLocked { } async fn delete_if_unneeded(&self, hash: &Hash, mgr: &BlockManager) -> Result<(), Error> { - let BlockStatus { exists, needed } = self.check_block_status(hash, mgr).await?; - - if exists && needed.is_deletable() { - let path_opt = match mgr.find_block(hash).await { - Some(DataBlockPath::Plain(p)) | Some(DataBlockPath::Compressed(p)) => Some(p), - None => None, - }; - if let Some(path) = path_opt { + let rc = mgr.rc.get_block_rc(hash)?; + if rc.is_deletable() { + while let Some(path) = mgr.find_block(hash).await { + let path = match path { + DataBlockPath::Plain(p) | DataBlockPath::Compressed(p) => p, + }; fs::remove_file(path).await?; mgr.metrics.delete_counter.add(1); } diff --git a/src/block/resync.rs b/src/block/resync.rs index ea280ad4b..bb43ad7e9 100644 --- a/src/block/resync.rs +++ b/src/block/resync.rs @@ -359,20 +359,23 @@ impl BlockResyncManager { } async fn resync_block(&self, manager: &BlockManager, hash: &Hash) -> Result<(), Error> { - let BlockStatus { exists, needed } = manager.check_block_status(hash).await?; + let existing_path = manager.find_block(hash).await; + let exists = existing_path.is_some(); + let rc = manager.rc.get_block_rc(hash)?; - if exists != needed.is_needed() || exists != needed.is_nonzero() { + if exists != rc.is_needed() || exists != rc.is_nonzero() { debug!( "Resync block {:?}: exists {}, nonzero rc {}, deletable {}", hash, exists, - needed.is_nonzero(), - needed.is_deletable(), + rc.is_nonzero(), + rc.is_deletable(), ); } - if exists && needed.is_deletable() { + if exists && rc.is_deletable() { info!("Resync block {:?}: offloading and deleting", hash); + let existing_path = existing_path.unwrap(); let mut who = manager.replication.write_nodes(hash); if who.len() < manager.replication.write_quorum() { @@ -419,7 +422,7 @@ impl BlockResyncManager { .add(1, &[KeyValue::new("to", format!("{:?}", node))]); } - let block = manager.read_block(hash).await?; + let block = manager.read_block_from(hash, &existing_path).await?; let (header, bytes) = block.into_parts(); let put_block_message = Req::new(BlockRpc::PutBlock { hash: *hash, @@ -451,7 +454,7 @@ impl BlockResyncManager { manager.rc.clear_deleted_block_rc(hash)?; } - if needed.is_nonzero() && !exists { + if rc.is_nonzero() && !exists { info!( "Resync block {:?}: fetching absent but needed block (refcount > 0)", hash From fd00a47ddc4acfe62428064847176a5938ed64a9 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 5 Sep 2023 15:07:29 +0200 Subject: [PATCH 07/26] table queue: increase batch size --- src/table/queue.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/table/queue.rs b/src/table/queue.rs index 0857209b9..096ac8b47 100644 --- a/src/table/queue.rs +++ b/src/table/queue.rs @@ -12,7 +12,7 @@ use crate::replication::*; use crate::schema::*; use crate::table::*; -const BATCH_SIZE: usize = 100; +const BATCH_SIZE: usize = 1024; pub(crate) struct InsertQueueWorker(pub(crate) Arc>) where From 93114a9747cefa441ebd274f206c09699d051b39 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 5 Sep 2023 15:39:21 +0200 Subject: [PATCH 08/26] block manager: refactoring --- src/block/manager.rs | 96 ++++++++++++++++++++------------------------ src/model/garage.rs | 2 +- 2 files changed, 45 insertions(+), 53 deletions(-) diff --git a/src/block/manager.rs b/src/block/manager.rs index 5bad34d40..d18d3f4cc 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -88,7 +88,7 @@ pub struct BlockManager { data_fsync: bool, compression_level: Option, - mutation_lock: [Mutex; 256], + mutation_lock: Vec>, pub(crate) rc: BlockRc, pub resync: BlockResyncManager, @@ -111,6 +111,9 @@ pub struct BlockResyncErrorInfo { pub next_try: u64, } +// The number of different mutexes used to parallelize write access to data blocks +const MUTEX_COUNT: usize = 256; + // This custom struct contains functions that must only be ran // when the lock is held. We ensure that it is the case by storing // it INSIDE a Mutex. @@ -124,21 +127,24 @@ impl BlockManager { compression_level: Option, replication: TableShardedReplication, system: Arc, - ) -> Arc { - // TODO don't panic, report error + ) -> Result, Error> { + // Load or compute layout, i.e. assignment of data blocks to the different data directories let layout_persister: Persister = Persister::new(&system.metadata_dir, "data_layout"); let data_layout = match layout_persister.load() { Ok(mut layout) => { - layout.update(&data_dir).expect("invalid data_dir config"); + layout + .update(&data_dir) + .ok_or_message("invalid data_dir config")?; layout } - Err(_) => DataLayout::initialize(&data_dir).expect("invalid data_dir config"), + Err(_) => DataLayout::initialize(&data_dir).ok_or_message("invalid data_dir config")?, }; layout_persister .save(&data_layout) .expect("cannot save data_layout"); + // Open metadata tables let rc = db .open_tree("block_local_rc") .expect("Unable to open block_local_rc tree"); @@ -165,7 +171,10 @@ impl BlockManager { data_layout, data_fsync, compression_level, - mutation_lock: [(); 256].map(|_| Mutex::new(BlockManagerLocked())), + mutation_lock: vec![(); MUTEX_COUNT] + .iter() + .map(|_| Mutex::new(BlockManagerLocked())) + .collect::>(), rc, resync, system, @@ -177,7 +186,7 @@ impl BlockManager { block_manager.endpoint.set_handler(block_manager.clone()); block_manager.scrub_persister.set_with(|_| ()).unwrap(); - block_manager + Ok(block_manager) } pub fn spawn_workers(self: &Arc, bg: &BackgroundRunner) { @@ -224,44 +233,10 @@ impl BlockManager { hash: &Hash, order_tag: Option, ) -> Result<(DataBlockHeader, ByteStream), Error> { - let who = self.replication.read_nodes(hash); - let who = self.system.rpc.request_order(&who); - - for node in who.iter() { - let node_id = NodeID::from(*node); - let rpc = self.endpoint.call_streaming( - &node_id, - BlockRpc::GetBlock(*hash, order_tag), - PRIO_NORMAL | PRIO_SECONDARY, - ); - tokio::select! { - res = rpc => { - let res = match res { - Ok(res) => res, - Err(e) => { - debug!("Node {:?} returned error: {}", node, e); - continue; - } - }; - let (header, stream) = match res.into_parts() { - (Ok(BlockRpc::PutBlock { hash: _, header }), Some(stream)) => (header, stream), - _ => { - debug!("Node {:?} returned a malformed response", node); - continue; - } - }; - return Ok((header, stream)); - } - _ = tokio::time::sleep(self.system.rpc.rpc_timeout()) => { - debug!("Node {:?} didn't return block in time, trying next.", node); - } - }; - } - - Err(Error::Message(format!( - "Unable to read block {:?}: no node returned a valid block", - hash - ))) + self.rpc_get_raw_block_internal(hash, order_tag, |header, stream| async move { + Ok((header, stream)) + }) + .await } /// Ask nodes that might have a (possibly compressed) block for it @@ -271,6 +246,24 @@ impl BlockManager { hash: &Hash, order_tag: Option, ) -> Result { + self.rpc_get_raw_block_internal(hash, order_tag, |header, stream| async move { + read_stream_to_end(stream) + .await + .map(|data| DataBlock::from_parts(header, data)) + }) + .await + } + + async fn rpc_get_raw_block_internal( + &self, + hash: &Hash, + order_tag: Option, + f: F, + ) -> Result + where + F: Fn(DataBlockHeader, ByteStream) -> Fut, + Fut: futures::Future>, + { let who = self.replication.read_nodes(hash); let who = self.system.rpc.request_order(&who); @@ -297,13 +290,17 @@ impl BlockManager { continue; } }; - match read_stream_to_end(stream).await { - Ok(bytes) => return Ok(DataBlock::from_parts(header, bytes)), + match f(header, stream).await { + Ok(ret) => return Ok(ret), Err(e) => { debug!("Error reading stream from node {:?}: {}", node, e); } } } + // TODO: sleep less long (fail early), initiate a second request earlier + // if the first one doesn't succeed rapidly + // TODO: keep first request running when initiating a new one and take the + // one that finishes earlier _ = tokio::time::sleep(self.system.rpc.rpc_timeout()) => { debug!("Node {:?} didn't return block in time, trying next.", node); } @@ -680,11 +677,6 @@ impl StreamingEndpointHandler for BlockManager { } } -pub(crate) struct BlockStatus { - pub(crate) exists: bool, - pub(crate) needed: RcEntry, -} - impl BlockManagerLocked { async fn write_block( &self, diff --git a/src/model/garage.rs b/src/model/garage.rs index d6eebfb0d..721d5e3ac 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -251,7 +251,7 @@ impl Garage { config.compression_level, data_rep_param, system.clone(), - ); + )?; block_manager.register_bg_vars(&mut bg_vars); // ---- admin tables ---- From 3a74844df02b5ecec0b96bfb8b2ff3bcdd33f7f4 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 5 Sep 2023 15:41:36 +0200 Subject: [PATCH 09/26] block manager: fix dir_not_empty --- src/block/layout.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/block/layout.rs b/src/block/layout.rs index b119281b9..3b529fc0f 100644 --- a/src/block/layout.rs +++ b/src/block/layout.rs @@ -276,8 +276,7 @@ fn dir_not_empty(path: &PathBuf) -> Result { .into_string() .ok() .and_then(|hex| hex::decode(&hex).ok()) - .map(|bytes| (2..=4).contains(&bytes.len())) - .unwrap_or(false) + .is_some() { return Ok(true); } From a44f4869312678e3c6eaac1a26a7beb4652f3e69 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 5 Sep 2023 15:57:25 +0200 Subject: [PATCH 10/26] block manager: refactoring & increase max worker count to 8 --- src/block/manager.rs | 26 +++++++++++++------------- src/block/resync.rs | 2 +- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/src/block/manager.rs b/src/block/manager.rs index d18d3f4cc..b42a9aa9c 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -279,21 +279,21 @@ impl BlockManager { let res = match res { Ok(res) => res, Err(e) => { - debug!("Node {:?} returned error: {}", node, e); + debug!("Get block {:?}: node {:?} returned error: {}", hash, node, e); continue; } }; let (header, stream) = match res.into_parts() { (Ok(BlockRpc::PutBlock { hash: _, header }), Some(stream)) => (header, stream), _ => { - debug!("Node {:?} returned a malformed response", node); + debug!("Get block {:?}: node {:?} returned a malformed response", hash, node); continue; } }; match f(header, stream).await { Ok(ret) => return Ok(ret), Err(e) => { - debug!("Error reading stream from node {:?}: {}", node, e); + debug!("Get block {:?}: error reading stream from node {:?}: {}", hash, node, e); } } } @@ -302,15 +302,14 @@ impl BlockManager { // TODO: keep first request running when initiating a new one and take the // one that finishes earlier _ = tokio::time::sleep(self.system.rpc.rpc_timeout()) => { - debug!("Node {:?} didn't return block in time, trying next.", node); + debug!("Get block {:?}: node {:?} didn't return block in time, trying next.", hash, node); } }; } - Err(Error::Message(format!( - "Unable to read block {:?}: no node returned a valid block", - hash - ))) + let msg = format!("Get block {:?}: no node returned a valid block", hash); + debug!("{}", msg); + Err(Error::Message(msg)) } // ---- Public interface ---- @@ -666,7 +665,7 @@ impl StreamingEndpointHandler for BlockManager { BlockRpc::PutBlock { hash, header } => Resp::new( self.handle_put_block(*hash, *header, message.take_stream()) .await - .map(|_| BlockRpc::Ok), + .map(|()| BlockRpc::Ok), ), BlockRpc::GetBlock(h, order_tag) => self.handle_get_block(h, *order_tag).await, BlockRpc::NeedBlockQuery(h) => { @@ -687,15 +686,14 @@ impl BlockManagerLocked { let compressed = data.is_compressed(); let data = data.inner_buffer(); - let mut tgt_path = mgr.data_layout.primary_block_dir(hash); - let directory = tgt_path.clone(); + let directory = mgr.data_layout.primary_block_dir(hash); + + let mut tgt_path = directory.clone(); tgt_path.push(hex::encode(hash)); if compressed { tgt_path.set_extension("zst"); } - fs::create_dir_all(&directory).await?; - let to_delete = match (mgr.find_block(hash).await, compressed) { // If the block is stored in the wrong directory, // write it again at the correct path and delete the old path @@ -723,6 +721,8 @@ impl BlockManagerLocked { let tmp_extension = format!("tmp{}", hex::encode(thread_rng().gen::<[u8; 4]>())); path_tmp.set_extension(tmp_extension); + fs::create_dir_all(&directory).await?; + let mut delete_on_drop = DeleteOnDrop(Some(path_tmp.clone())); let mut f = fs::File::create(&path_tmp).await?; diff --git a/src/block/resync.rs b/src/block/resync.rs index bb43ad7e9..9c1da4a77 100644 --- a/src/block/resync.rs +++ b/src/block/resync.rs @@ -41,7 +41,7 @@ pub(crate) const RESYNC_RETRY_DELAY: Duration = Duration::from_secs(60); pub(crate) const RESYNC_RETRY_DELAY_MAX_BACKOFF_POWER: u64 = 6; // No more than 4 resync workers can be running in the system -pub(crate) const MAX_RESYNC_WORKERS: usize = 4; +pub(crate) const MAX_RESYNC_WORKERS: usize = 8; // Resync tranquility is initially set to 2, but can be changed in the CLI // and the updated version is persisted over Garage restarts const INITIAL_RESYNC_TRANQUILITY: u32 = 2; From 55c514999eef17d7764040cde1b7b38ca111d24c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 5 Sep 2023 17:00:52 +0200 Subject: [PATCH 11/26] block manager: fixes in layout --- src/block/layout.rs | 56 ++++++++++++++++++++++++++------------------- 1 file changed, 33 insertions(+), 23 deletions(-) diff --git a/src/block/layout.rs b/src/block/layout.rs index 3b529fc0f..19b6fa177 100644 --- a/src/block/layout.rs +++ b/src/block/layout.rs @@ -11,25 +11,28 @@ type Idx = u16; const DRIVE_NPART: usize = 1024; -const DPART_BYTES: (usize, usize) = (2, 3); +const HASH_DRIVE_BYTES: (usize, usize) = (2, 3); #[derive(Serialize, Deserialize, Debug, Clone)] pub(crate) struct DataLayout { pub(crate) data_dirs: Vec, /// Primary storage location (index in data_dirs) for each partition + /// = the location where the data is supposed to be, blocks are always + /// written there (copies in other dirs may be deleted if they exist) pub(crate) part_prim: Vec, - /// Secondary storage locations for each partition + /// Secondary storage locations for each partition = locations + /// where data blocks might be, we check from these dirs when reading pub(crate) part_sec: Vec>, } -#[derive(Serialize, Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)] pub(crate) struct DataDir { pub(crate) path: PathBuf, pub(crate) state: DataDirState, } -#[derive(Serialize, Deserialize, Debug, Clone, Copy)] +#[derive(Serialize, Deserialize, Debug, Clone, Copy, Eq, PartialEq)] pub(crate) enum DataDirState { Active { capacity: u64 }, ReadOnly, @@ -55,8 +58,9 @@ impl DataLayout { assert_eq!(cum_cap, total_cap); assert_eq!(part_prim.len(), DRIVE_NPART); - // If any of the storage locations is non-empty, add it as a secondary - // storage location for all partitions + // If any of the storage locations is non-empty, it probably existed before + // this algorithm was added, so add it as a secondary storage location for all partitions + // to make sure existing files are not lost let mut part_sec = vec![vec![]; DRIVE_NPART]; for (i, dd) in data_dirs.iter().enumerate() { if dir_not_empty(&dd.path)? { @@ -75,10 +79,14 @@ impl DataLayout { }) } - pub(crate) fn update(&mut self, dirs: &DataDirEnum) -> Result { - // Compute list of new data directories and mapping of old indices - // to new indices + pub(crate) fn update(&mut self, dirs: &DataDirEnum) -> Result<(), Error> { + // Make list of new data directories, exit if nothing changed let data_dirs = make_data_dirs(dirs)?; + if data_dirs == self.data_dirs { + return Ok(()); + } + + // Compute mapping of old indices to new indices let old2new = self .data_dirs .iter() @@ -114,15 +122,13 @@ impl DataLayout { // Compute the target number of partitions per data directory let total_cap = data_dirs.iter().filter_map(|x| x.capacity()).sum::(); let mut cum_cap = 0; - let mut npart_per_dir = vec![]; - for dd in data_dirs.iter() { + let mut npart_per_dir = vec![0; data_dirs.len()]; + for (idir, dd) in data_dirs.iter().enumerate() { if let DataDirState::Active { capacity } = dd.state { let begin = (cum_cap * DRIVE_NPART as u64) / total_cap; cum_cap += capacity; let end = (cum_cap * DRIVE_NPART as u64) / total_cap; - npart_per_dir.push((end - begin) as usize); - } else { - npart_per_dir.push(0); + npart_per_dir[idir] = (end - begin) as usize; } } assert_eq!(cum_cap, total_cap); @@ -161,11 +167,14 @@ impl DataLayout { // add partitions from unassigned for (idir, (parts, tgt_npart)) in dir_prim.iter_mut().zip(npart_per_dir.iter()).enumerate() { - assert!(unassigned.len() >= *tgt_npart - parts.len()); - for _ in parts.len()..*tgt_npart { - let new_part = unassigned.pop().unwrap(); - part_prim[new_part] = Some(idir as Idx); - part_sec[new_part].retain(|x| *x != idir as Idx); + if parts.len() < *tgt_npart { + let required = *tgt_npart - parts.len(); + assert!(unassigned.len() >= required); + for _ in 0..required { + let new_part = unassigned.pop().unwrap(); + part_prim[new_part] = Some(idir as Idx); + part_sec[new_part].retain(|x| *x != idir as Idx); + } } } @@ -183,11 +192,12 @@ impl DataLayout { .unwrap_or(0) > 0)); - Ok(Self { + *self = Self { data_dirs, part_prim, part_sec, - }) + }; + Ok(()) } pub(crate) fn primary_block_dir(&self, hash: &Hash) -> PathBuf { @@ -208,8 +218,8 @@ impl DataLayout { fn partition_from(&self, hash: &Hash) -> usize { u16::from_be_bytes([ - hash.as_slice()[DPART_BYTES.0], - hash.as_slice()[DPART_BYTES.1], + hash.as_slice()[HASH_DRIVE_BYTES.0], + hash.as_slice()[HASH_DRIVE_BYTES.1], ]) as usize % DRIVE_NPART } From e30865984a5f23f046396ca192c1930314b50115 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 5 Sep 2023 17:37:45 +0200 Subject: [PATCH 12/26] block manager: scrub checkpointing --- src/block/repair.rs | 158 ++++++++++++++++++++++++++++++-------------- 1 file changed, 108 insertions(+), 50 deletions(-) diff --git a/src/block/repair.rs b/src/block/repair.rs index 0e7fe0df0..a7c90d4f9 100644 --- a/src/block/repair.rs +++ b/src/block/repair.rs @@ -176,7 +176,9 @@ mod v081 { } mod v082 { + use garage_util::data::Hash; use serde::{Deserialize, Serialize}; + use std::path::PathBuf; use super::v081; @@ -186,6 +188,27 @@ mod v082 { pub(crate) time_last_complete_scrub: u64, pub(crate) time_next_run_scrub: u64, pub(crate) corruptions_detected: u64, + #[serde(default)] + pub(crate) checkpoint: Option, + } + + #[derive(Serialize, Deserialize, Clone)] + pub struct BlockStoreIterator { + pub todo: Vec, + } + + #[derive(Serialize, Deserialize, Clone)] + pub enum BsiTodo { + Directory { + path: PathBuf, + progress_min: u64, + progress_max: u64, + }, + File { + path: PathBuf, + hash: Hash, + progress: u64, + }, } impl garage_util::migrate::Migrate for ScrubWorkerPersisted { @@ -200,6 +223,7 @@ mod v082 { time_last_complete_scrub: old.time_last_complete_scrub, time_next_run_scrub: randomize_next_scrub_run_time(old.time_last_complete_scrub), corruptions_detected: old.corruptions_detected, + checkpoint: None, } } } @@ -236,14 +260,23 @@ impl Default for ScrubWorkerPersisted { time_next_run_scrub: randomize_next_scrub_run_time(now_msec()), tranquility: INITIAL_SCRUB_TRANQUILITY, corruptions_detected: 0, + checkpoint: None, } } } #[derive(Default)] enum ScrubWorkerState { - Running(BlockStoreIterator), - Paused(BlockStoreIterator, u64), // u64 = time when to resume scrub + Running { + iterator: BlockStoreIterator, + // time of the last checkpoint + t_cp: u64, + }, + Paused { + iterator: BlockStoreIterator, + // time at which the scrub should be resumed + t_resume: u64, + }, #[default] Finished, } @@ -262,10 +295,17 @@ impl ScrubWorker { rx_cmd: mpsc::Receiver, persister: PersisterShared, ) -> Self { + let work = match persister.get_with(|x| x.checkpoint.clone()) { + None => ScrubWorkerState::Finished, + Some(iterator) => ScrubWorkerState::Running { + iterator, + t_cp: now_msec(), + }, + }; Self { manager, rx_cmd, - work: ScrubWorkerState::Finished, + work, tranquilizer: Tranquilizer::new(30), persister, } @@ -278,7 +318,16 @@ impl ScrubWorker { ScrubWorkerState::Finished => { info!("Scrub worker initializing, now performing datastore scrub"); let iterator = BlockStoreIterator::new(&self.manager); - ScrubWorkerState::Running(iterator) + if let Err(e) = self + .persister + .set_with(|x| x.checkpoint = Some(iterator.clone())) + { + error!("Could not save scrub checkpoint: {}", e); + } + ScrubWorkerState::Running { + iterator, + t_cp: now_msec(), + } } work => { error!("Cannot start scrub worker: already running!"); @@ -288,8 +337,18 @@ impl ScrubWorker { } ScrubWorkerCommand::Pause(dur) => { self.work = match std::mem::take(&mut self.work) { - ScrubWorkerState::Running(it) | ScrubWorkerState::Paused(it, _) => { - ScrubWorkerState::Paused(it, now_msec() + dur.as_millis() as u64) + ScrubWorkerState::Running { iterator, .. } + | ScrubWorkerState::Paused { iterator, .. } => { + if let Err(e) = self + .persister + .set_with(|x| x.checkpoint = Some(iterator.clone())) + { + error!("Could not save scrub checkpoint: {}", e); + } + ScrubWorkerState::Paused { + iterator, + t_resume: now_msec() + dur.as_millis() as u64, + } } work => { error!("Cannot pause scrub worker: not running!"); @@ -299,7 +358,10 @@ impl ScrubWorker { } ScrubWorkerCommand::Resume => { self.work = match std::mem::take(&mut self.work) { - ScrubWorkerState::Paused(it, _) => ScrubWorkerState::Running(it), + ScrubWorkerState::Paused { iterator, .. } => ScrubWorkerState::Running { + iterator, + t_cp: now_msec(), + }, work => { error!("Cannot resume scrub worker: not paused!"); work @@ -308,7 +370,7 @@ impl ScrubWorker { } ScrubWorkerCommand::Cancel => { self.work = match std::mem::take(&mut self.work) { - ScrubWorkerState::Running(_) | ScrubWorkerState::Paused(_, _) => { + ScrubWorkerState::Running { .. } | ScrubWorkerState::Paused { .. } => { ScrubWorkerState::Finished } work => { @@ -344,12 +406,15 @@ impl Worker for ScrubWorker { ..Default::default() }; match &self.work { - ScrubWorkerState::Running(bsi) => { - s.progress = Some(format!("{:.2}%", bsi.progress() * 100.)); + ScrubWorkerState::Running { iterator, .. } => { + s.progress = Some(format!("{:.2}%", iterator.progress() * 100.)); } - ScrubWorkerState::Paused(bsi, rt) => { - s.progress = Some(format!("{:.2}%", bsi.progress() * 100.)); - s.freeform = vec![format!("Scrub paused, resumes at {}", msec_to_rfc3339(*rt))]; + ScrubWorkerState::Paused { iterator, t_resume } => { + s.progress = Some(format!("{:.2}%", iterator.progress() * 100.)); + s.freeform = vec![format!( + "Scrub paused, resumes at {}", + msec_to_rfc3339(*t_resume) + )]; } ScrubWorkerState::Finished => { s.freeform = vec![ @@ -375,9 +440,11 @@ impl Worker for ScrubWorker { }; match &mut self.work { - ScrubWorkerState::Running(bsi) => { + ScrubWorkerState::Running { iterator, t_cp } => { self.tranquilizer.reset(); - if let Some((_path, hash)) = bsi.next().await? { + let now = now_msec(); + + if let Some((_path, hash)) = iterator.next().await? { match self.manager.read_block(&hash).await { Err(Error::CorruptData(_)) => { error!("Found corrupt data block during scrub: {:?}", hash); @@ -386,16 +453,23 @@ impl Worker for ScrubWorker { Err(e) => return Err(e), _ => (), }; + + if now - *t_cp > 60 * 1000 { + self.persister + .set_with(|p| p.checkpoint = Some(iterator.clone()))?; + *t_cp = now; + } + Ok(self .tranquilizer .tranquilize_worker(self.persister.get_with(|p| p.tranquility))) } else { - let now = now_msec(); let next_scrub_timestamp = randomize_next_scrub_run_time(now); self.persister.set_with(|p| { p.time_last_complete_scrub = now; p.time_next_run_scrub = next_scrub_timestamp; + p.checkpoint = None; })?; self.work = ScrubWorkerState::Finished; self.tranquilizer.clear(); @@ -414,8 +488,8 @@ impl Worker for ScrubWorker { async fn wait_for_work(&mut self) -> WorkerState { let (wait_until, command) = match &self.work { - ScrubWorkerState::Running(_) => return WorkerState::Busy, - ScrubWorkerState::Paused(_, resume_time) => (*resume_time, ScrubWorkerCommand::Resume), + ScrubWorkerState::Running { .. } => return WorkerState::Busy, + ScrubWorkerState::Paused { t_resume, .. } => (*t_resume, ScrubWorkerCommand::Resume), ScrubWorkerState::Finished => ( self.persister.get_with(|p| p.time_next_run_scrub), ScrubWorkerCommand::Start, @@ -438,7 +512,7 @@ impl Worker for ScrubWorker { } match &self.work { - ScrubWorkerState::Running(_) => WorkerState::Busy, + ScrubWorkerState::Running { .. } => WorkerState::Busy, _ => WorkerState::Idle, } } @@ -450,23 +524,6 @@ impl Worker for ScrubWorker { const PROGRESS_FP: u64 = 1_000_000_000; -struct BlockStoreIterator { - todo: Vec, -} - -enum BsiTodo { - Directory { - path: PathBuf, - progress_min: u64, - progress_max: u64, - }, - File { - path: PathBuf, - filename: String, - progress: u64, - }, -} - impl BlockStoreIterator { fn new(manager: &BlockManager) -> Self { let min_cap = manager @@ -551,11 +608,18 @@ impl BlockStoreIterator { progress_max: 0, }); } else if ft.is_file() { - self.todo.push(BsiTodo::File { - path: ent.path(), - filename: name, - progress: 0, - }); + let filename = name.split_once('.').map(|(f, _)| f).unwrap_or(&name); + if filename.len() == 64 { + if let Ok(h) = hex::decode(filename) { + let mut hash = [0u8; 32]; + hash.copy_from_slice(&h); + self.todo.push(BsiTodo::File { + path: ent.path(), + hash: hash.into(), + progress: 0, + }); + } + } } } @@ -582,20 +646,14 @@ impl BlockStoreIterator { self.todo[istart..].reverse(); debug_assert!(self.progress_invariant()); } - Some(BsiTodo::File { path, filename, .. }) => { - let filename = filename.strip_suffix(".zst").unwrap_or(&filename); - if filename.len() == 64 { - if let Ok(h) = hex::decode(filename) { - let mut hash = [0u8; 32]; - hash.copy_from_slice(&h); - return Ok(Some((path, hash.into()))); - } - } + Some(BsiTodo::File { path, hash, .. }) => { + return Ok(Some((path, hash))); } } } } + // for debug_assert! fn progress_invariant(&self) -> bool { let iter = self.todo.iter().map(|x| match x { BsiTodo::Directory { progress_min, .. } => progress_min, From f38a31b3304726aa7c890ba1a9f7a3e67b11bc60 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 6 Sep 2023 17:49:30 +0200 Subject: [PATCH 13/26] block manager: avoid incorrect data_dir configs and avoid losing files --- src/block/layout.rs | 41 ++++++++++++++++++++++++++++++++++++++--- src/block/manager.rs | 8 ++++++-- 2 files changed, 44 insertions(+), 5 deletions(-) diff --git a/src/block/layout.rs b/src/block/layout.rs index 19b6fa177..8098654f6 100644 --- a/src/block/layout.rs +++ b/src/block/layout.rs @@ -45,6 +45,7 @@ impl DataLayout { // Split partitions proportionnally to capacity for all drives // to affect primary storage location let total_cap = data_dirs.iter().filter_map(|x| x.capacity()).sum::(); + assert!(total_cap > 0); let mut part_prim = Vec::with_capacity(DRIVE_NPART); let mut cum_cap = 0; @@ -86,6 +87,9 @@ impl DataLayout { return Ok(()); } + let total_cap = data_dirs.iter().filter_map(|x| x.capacity()).sum::(); + assert!(total_cap > 0); + // Compute mapping of old indices to new indices let old2new = self .data_dirs @@ -120,7 +124,6 @@ impl DataLayout { } // Compute the target number of partitions per data directory - let total_cap = data_dirs.iter().filter_map(|x| x.capacity()).sum::(); let mut cum_cap = 0; let mut npart_per_dir = vec![0; data_dirs.len()]; for (idir, dd) in data_dirs.iter().enumerate() { @@ -182,6 +185,7 @@ impl DataLayout { assert!(part_prim.iter().all(|x| x.is_some())); assert!(unassigned.is_empty()); + // Transform part_prim from vec of Option to vec of Idx let part_prim = part_prim .into_iter() .map(|x| x.unwrap()) @@ -192,6 +196,25 @@ impl DataLayout { .unwrap_or(0) > 0)); + // If any of the newly added storage locations is non-empty, + // it might have been removed and added again and might contain data, + // so add it as a secondary storage location for all partitions + // to make sure existing files are not lost + let mut part_sec = vec![vec![]; DRIVE_NPART]; + for (i, dd) in data_dirs.iter().enumerate() { + if self.data_dirs.iter().any(|ed| ed.path == dd.path) { + continue; + } + if dir_not_empty(&dd.path)? { + for (sec, prim) in part_sec.iter_mut().zip(part_prim.iter()) { + if *prim != i as Idx && !sec.contains(&(i as Idx)) { + sec.push(i as Idx); + } + } + } + } + + // Apply newly generated config *self = Self { data_dirs, part_prim, @@ -254,12 +277,18 @@ fn make_data_dirs(dirs: &DataDirEnum) -> Result, Error> { }, }), DataDirEnum::Multiple(dirs) => { + let mut ok = false; for dir in dirs.iter() { let state = match &dir.capacity { Some(cap) if dir.read_only == false => { + let capacity = cap.parse::() + .ok_or_message("invalid capacity value")?.as_u64(); + if capacity == 0 { + return Err(Error::Message(format!("data directory {} should have non-zero capacity", dir.path.to_string_lossy()))); + } + ok = true; DataDirState::Active { - capacity: cap.parse::() - .ok_or_message("invalid capacity value")?.as_u64(), + capacity, } } None if dir.read_only == true => { @@ -272,6 +301,12 @@ fn make_data_dirs(dirs: &DataDirEnum) -> Result, Error> { state, }); } + if !ok { + return Err(Error::Message( + "incorrect data_dir configuration, no primary writable directory specified" + .into(), + )); + } } } Ok(data_dirs) diff --git a/src/block/manager.rs b/src/block/manager.rs index b42a9aa9c..eb498be0d 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -279,16 +279,20 @@ impl BlockManager { let res = match res { Ok(res) => res, Err(e) => { - debug!("Get block {:?}: node {:?} returned error: {}", hash, node, e); + debug!("Get block {:?}: node {:?} could not be contacted: {}", hash, node, e); continue; } }; let (header, stream) = match res.into_parts() { (Ok(BlockRpc::PutBlock { hash: _, header }), Some(stream)) => (header, stream), - _ => { + (Ok(_), _) => { debug!("Get block {:?}: node {:?} returned a malformed response", hash, node); continue; } + (Err(e), _) => { + debug!("Get block {:?}: node {:?} returned error: {}", hash, node, e); + continue; + } }; match f(header, stream).await { Ok(ret) => return Ok(ret), From 99ed18350f9572ebb1968107d3708d53682ee805 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 7 Sep 2023 12:41:36 +0200 Subject: [PATCH 14/26] block manager: refactor and fix monitoring/statistics --- src/block/manager.rs | 47 +++++++++++++++++++------------------------- 1 file changed, 20 insertions(+), 27 deletions(-) diff --git a/src/block/manager.rs b/src/block/manager.rs index eb498be0d..0081f46cf 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -491,8 +491,6 @@ impl BlockManager { pub(crate) async fn write_block(&self, hash: &Hash, data: &DataBlock) -> Result<(), Error> { let tracer = opentelemetry::global::tracer("garage"); - let write_size = data.inner_buffer().len() as u64; - self.lock_mutate(hash) .await .write_block(hash, data, self) @@ -502,8 +500,6 @@ impl BlockManager { )) .await?; - self.metrics.bytes_written.add(write_size); - Ok(()) } @@ -530,31 +526,26 @@ impl BlockManager { /// Read block from disk, verifying it's integrity pub(crate) async fn read_block(&self, hash: &Hash) -> Result { - let data = self - .read_block_internal(hash) - .bound_record_duration(&self.metrics.block_read_duration) - .await?; - - self.metrics - .bytes_read - .add(data.inner_buffer().len() as u64); - - Ok(data) - } - - async fn read_block_internal(&self, hash: &Hash) -> Result { - match self.find_block(hash).await { - Some(p) => self.read_block_from(hash, &p).await, - None => { - // Not found but maybe we should have had it ?? - self.resync - .put_to_resync(hash, 2 * self.system.rpc.rpc_timeout())?; - return Err(Error::Message(format!( - "block {:?} not found on node", - hash - ))); + let tracer = opentelemetry::global::tracer("garage"); + async { + match self.find_block(hash).await { + Some(p) => self.read_block_from(hash, &p).await, + None => { + // Not found but maybe we should have had it ?? + self.resync + .put_to_resync(hash, 2 * self.system.rpc.rpc_timeout())?; + return Err(Error::Message(format!( + "block {:?} not found on node", + hash + ))); + } } } + .bound_record_duration(&self.metrics.block_read_duration) + .with_context(Context::current_with_span( + tracer.start("BlockManager::read_block"), + )) + .await } pub(crate) async fn read_block_from( @@ -570,6 +561,7 @@ impl BlockManager { let mut f = fs::File::open(&path).await?; let mut data = vec![]; f.read_to_end(&mut data).await?; + self.metrics.bytes_read.add(data.len() as u64); drop(f); let data = if compressed { @@ -731,6 +723,7 @@ impl BlockManagerLocked { let mut f = fs::File::create(&path_tmp).await?; f.write_all(data).await?; + mgr.metrics.bytes_written.add(data.len() as u64); if mgr.data_fsync { f.sync_all().await?; From bca347a1e8e4bb74e744ec8e020b8144c6cafdf3 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 7 Sep 2023 12:52:44 +0200 Subject: [PATCH 15/26] doc: update page on upgradin clusters --- doc/book/operations/upgrading.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/book/operations/upgrading.md b/doc/book/operations/upgrading.md index e8919a190..9a7382822 100644 --- a/doc/book/operations/upgrading.md +++ b/doc/book/operations/upgrading.md @@ -80,6 +80,6 @@ The entire procedure would look something like this: 5. If any specific migration procedure is required, it is usually in one of the two cases: - It can be run on online nodes after the new version has started, during regular cluster operation. - - it has to be run offline + - it has to be run offline, in which case you will have to again take all nodes offline one after the other to run the repair For this last step, please refer to the specific documentation pertaining to the version upgrade you are doing. From 6595efd82fc849c97b964969b6ff935738e7d24a Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 7 Sep 2023 13:23:02 +0200 Subject: [PATCH 16/26] Document multi-hdd support --- doc/book/cookbook/real-world.md | 11 +-- doc/book/operations/multi-hdd.md | 100 +++++++++++++++++++++ doc/book/reference-manual/configuration.md | 13 +++ 3 files changed, 116 insertions(+), 8 deletions(-) create mode 100644 doc/book/operations/multi-hdd.md diff --git a/doc/book/cookbook/real-world.md b/doc/book/cookbook/real-world.md index 7061069f9..a8fbb3711 100644 --- a/doc/book/cookbook/real-world.md +++ b/doc/book/cookbook/real-world.md @@ -75,16 +75,11 @@ to store 2 TB of data in total. - For the metadata storage, Garage does not do checksumming and integrity verification on its own. If you are afraid of bitrot/data corruption, - put your metadata directory on a BTRFS partition. Otherwise, just use regular + put your metadata directory on a ZFS or BTRFS partition. Otherwise, just use regular EXT4 or XFS. -- Having a single server with several storage drives is currently not very well - supported in Garage ([#218](https://git.deuxfleurs.fr/Deuxfleurs/garage/issues/218)). - For an easy setup, just put all your drives in a RAID0 or a ZFS RAIDZ array. - If you're adventurous, you can try to format each of your disk as - a separate XFS partition, and then run one `garage` daemon per disk drive, - or use something like [`mergerfs`](https://github.com/trapexit/mergerfs) to merge - all your disks in a single union filesystem that spreads load over them. +- Servers with multiple HDDs are supported natively by Garage without resorting + to RAID, see [our dedicated documentation page](@/documentation/operations/multi-hdd.md). ## Get a Docker image diff --git a/doc/book/operations/multi-hdd.md b/doc/book/operations/multi-hdd.md new file mode 100644 index 000000000..5f9522623 --- /dev/null +++ b/doc/book/operations/multi-hdd.md @@ -0,0 +1,100 @@ ++++ +title = "Multi-HDD support" +weight = 15 ++++ + + +Since v0.9, Garage natively supports nodes that have several storage drives +for storing data blocks (not for metadata storage). + +## Initial setup + +To set up a new Garage storage node with multiple HDDs, +format and mount all your drives in different directories, +and use a Garage configuration as follows: + +```toml +data_dir = [ + { path = "/path/to/hdd1", capacity = "2T" }, + { path = "/path/to/hdd2", capacity = "4T" }, +] +``` + +Garage will automatically balance all blocks stored by the node +among the different specified directories, proportionnally to the +specified capacities. + +## Updating the list of storage locations + +If you add new storage locations to your `data_dir`, +Garage will not rebalance existing data between storage locations. +Newly written blocks will be balanced proportionnally to the specified capacities, +and existing data may be moved between drives to improve balancing, +but only opportunistically when a data block is re-written (e.g. an object +is re-uploaded, or an object with a duplicate block is uploaded). + +To understand precisely what is happening, we need to dive in to how Garage +splits data among the different storage locations. + +First of all, Garage divides the set of all possible block hashes +in a fixed number of slices (currently 1024), and assigns +to each slice a primary storage location among the specified data directories. +The number of slices having their primary location in each data directory +is proportionnal to the capacity specified in the config file. + +When Garage receives a block to write, it will always write it in the primary +directory of the slice that contains its hash. + +Now, to be able to not lose existing data blocks when storage locations +are added, Garage also keeps a list of secondary data directories +for all of the hash slices. Secondary data directories for a slice indicates +storage locations that once were primary directories for that slice, i.e. where +Garage knows that data blocks of that slice might be stored. +When Garage is requested to read a certain data block, +it will first look in the primary storage directory of its slice, +and if it doesn't find it there it goes through all of the secondary storage +locations until it finds it. This allows Garage to continue operating +normally when storage locations are added, without having to shuffle +files between drives to place them in the correct location. + +This relatively simple strategy works well but does not ensure that data +is correctly balanced among drives according to their capacity. +To rebalance data, two strategies can be used: + +- Lazy rebalancing: when a block is re-written (e.g. the object is re-uploaded), + Garage checks whether the existing copy is in the primary directory of the slice + or in a secondary directory. If the current copy is in a secondary directory, + Garage re-writes a copy in the primary directory and deletes the one from the + secondary directory. + +- Active rebalancing: an operator of a Garage node can explicitly launch a repair + procedure that rebalances the data directories, moving all blocks to their + primary location. Once done, all secondary locations for all hash slices are + removed so that they won't be checked anymore when looking for a data block. + +## Read-only storage locations + +If you would like to move all data blocks from an existing data directory to one +or several new data directories, mark the old directory as read-only: + +```toml +data_dir = [ + { path = "/path/to/old_data", read_only = true }, + { path = "/path/to/new_hdd1", capacity = "2T" }, + { path = "/path/to/new_hdd2", capacity = "4T" }, +] +``` + +Garage will be able to read requested blocks from the read-only directory. +Garage will also move data out of the read-only directory either progressively +(lazy rebalancing) or if requested explicitly (active rebalancing). + +Once an active rebalancing has finished, your read-only directory should be empty: +it might still contain subdirectories, but no data files. You can check that +it contains no files using: + +```bash +find -type f /path/to/old_data +``` + +at which point it can be removed from the `data_dir` list in your config file. diff --git a/doc/book/reference-manual/configuration.md b/doc/book/reference-manual/configuration.md index 3f6ec091f..df1251c21 100644 --- a/doc/book/reference-manual/configuration.md +++ b/doc/book/reference-manual/configuration.md @@ -91,6 +91,19 @@ This folder can be placed on an HDD. The space available for `data_dir` should be counted to determine a node's capacity when [adding it to the cluster layout](@/documentation/cookbook/real-world.md). +Since `v0.9.0`, Garage supports multiple data directories with the following syntax: + +```toml +data_dir = [ + { path = "/path/to/old_data", read_only = true }, + { path = "/path/to/new_hdd1", capacity = "2T" }, + { path = "/path/to/new_hdd2", capacity = "4T" }, +] +``` + +See [the dedicated documentation page](@/documentation/operations/multi-hdd.md) +on how to operate Garage in such a setup. + ### `db_engine` (since `v0.8.0`) By default, Garage uses the Sled embedded database library From 6b008b5bd3843bb236f94a1b4472de11f5755f04 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 7 Sep 2023 13:44:11 +0200 Subject: [PATCH 17/26] block manager: add rebalance operation to rebalance multi-hdd setups --- src/block/layout.rs | 8 ++++ src/block/manager.rs | 22 +++++---- src/block/repair.rs | 90 ++++++++++++++++++++++++++++++++++--- src/garage/cli/structs.rs | 3 ++ src/garage/repair/online.rs | 6 +++ 5 files changed, 115 insertions(+), 14 deletions(-) diff --git a/src/block/layout.rs b/src/block/layout.rs index 8098654f6..e32ef7855 100644 --- a/src/block/layout.rs +++ b/src/block/layout.rs @@ -252,6 +252,14 @@ impl DataLayout { path.push(hex::encode(&hash.as_slice()[1..2])); path } + + pub(crate) fn without_secondary_locations(&self) -> Self { + Self { + data_dirs: self.data_dirs.clone(), + part_prim: self.part_prim.clone(), + part_sec: self.part_sec.iter().map(|_| vec![]).collect::>(), + } + } } impl InitialFormat for DataLayout { diff --git a/src/block/manager.rs b/src/block/manager.rs index 0081f46cf..e0fbfe742 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -3,7 +3,7 @@ use std::pin::Pin; use std::sync::Arc; use std::time::Duration; -use arc_swap::ArcSwapOption; +use arc_swap::{ArcSwap, ArcSwapOption}; use async_trait::async_trait; use bytes::Bytes; use rand::prelude::*; @@ -83,7 +83,9 @@ pub struct BlockManager { /// Directory/ies in which block are stored pub data_dir: DataDirEnum, /// Data layout - pub(crate) data_layout: DataLayout, + pub(crate) data_layout: ArcSwap, + /// Data layout persister + pub(crate) data_layout_persister: Persister, data_fsync: bool, compression_level: Option, @@ -129,9 +131,9 @@ impl BlockManager { system: Arc, ) -> Result, Error> { // Load or compute layout, i.e. assignment of data blocks to the different data directories - let layout_persister: Persister = + let data_layout_persister: Persister = Persister::new(&system.metadata_dir, "data_layout"); - let data_layout = match layout_persister.load() { + let data_layout = match data_layout_persister.load() { Ok(mut layout) => { layout .update(&data_dir) @@ -140,7 +142,7 @@ impl BlockManager { } Err(_) => DataLayout::initialize(&data_dir).ok_or_message("invalid data_dir config")?, }; - layout_persister + data_layout_persister .save(&data_layout) .expect("cannot save data_layout"); @@ -168,7 +170,8 @@ impl BlockManager { let block_manager = Arc::new(Self { replication, data_dir, - data_layout, + data_layout: ArcSwap::new(Arc::new(data_layout)), + data_layout_persister, data_fsync, compression_level, mutation_lock: vec![(); MUTEX_COUNT] @@ -606,9 +609,10 @@ impl BlockManager { /// Find the path where a block is currently stored pub(crate) async fn find_block(&self, hash: &Hash) -> Option { - let dirs = Some(self.data_layout.primary_block_dir(hash)) + let data_layout = self.data_layout.load_full(); + let dirs = Some(data_layout.primary_block_dir(hash)) .into_iter() - .chain(self.data_layout.secondary_block_dirs(hash)); + .chain(data_layout.secondary_block_dirs(hash)); let filename = hex::encode(hash.as_ref()); for dir in dirs { @@ -682,7 +686,7 @@ impl BlockManagerLocked { let compressed = data.is_compressed(); let data = data.inner_buffer(); - let directory = mgr.data_layout.primary_block_dir(hash); + let directory = mgr.data_layout.load().primary_block_dir(hash); let mut tgt_path = directory.clone(); tgt_path.push(hex::encode(hash)); diff --git a/src/block/repair.rs b/src/block/repair.rs index a7c90d4f9..1bea9f09c 100644 --- a/src/block/repair.rs +++ b/src/block/repair.rs @@ -518,6 +518,86 @@ impl Worker for ScrubWorker { } } +// ---- ---- ---- +// THIRD KIND OF REPAIR: REBALANCING DATA BLOCKS +// between multiple storage locations. +// This is a one-shot repair operation that can be launched, +// checks everything, and then exits. +// ---- ---- ---- + +pub struct RebalanceWorker { + manager: Arc, + block_iter: BlockStoreIterator, + moved: usize, + moved_bytes: usize, +} + +impl RebalanceWorker { + pub fn new(manager: Arc) -> Self { + let block_iter = BlockStoreIterator::new(&manager); + Self { + manager, + block_iter, + moved: 0, + moved_bytes: 0, + } + } +} + +#[async_trait] +impl Worker for RebalanceWorker { + fn name(&self) -> String { + "Block rebalance worker".into() + } + + fn status(&self) -> WorkerStatus { + WorkerStatus { + progress: Some(format!("{:.2}%", self.block_iter.progress() * 100.)), + freeform: vec![ + format!("Blocks moved: {}", self.moved), + format!("Bytes moved: {}", self.moved_bytes), + ], + ..Default::default() + } + } + + async fn work(&mut self, _must_exit: &mut watch::Receiver) -> Result { + if let Some((path, hash)) = self.block_iter.next().await? { + let prim_loc = self.manager.data_layout.load().primary_block_dir(&hash); + if path.parent().expect("no parent?") != prim_loc { + // block is not in its primary location, + // move it there (reading and re-writing does the trick) + let data = self.manager.read_block(&hash).await?; + self.manager.write_block(&hash, &data).await?; + self.moved += 1; + self.moved_bytes += data.inner_buffer().len(); + } + Ok(WorkerState::Busy) + } else { + // all blocks are in their primary location: + // - the ones we moved now are + // - the ones written in the meantime always were, because we only + // write to primary locations + // so we can safely remove all secondary locations from the data layout + let new_layout = self + .manager + .data_layout + .load_full() + .without_secondary_locations(); + self.manager + .data_layout_persister + .save_async(&new_layout) + .await?; + self.manager.data_layout.store(Arc::new(new_layout)); + Ok(WorkerState::Done) + } + } + + async fn wait_for_work(&mut self) -> WorkerState { + unreachable!() + } +} + // ---- ---- ---- // UTILITY FOR ENUMERATING THE BLOCK STORE // ---- ---- ---- @@ -526,16 +606,16 @@ const PROGRESS_FP: u64 = 1_000_000_000; impl BlockStoreIterator { fn new(manager: &BlockManager) -> Self { - let min_cap = manager - .data_layout + let data_layout = manager.data_layout.load_full(); + + let min_cap = data_layout .data_dirs .iter() .filter_map(|x| x.capacity()) .min() .unwrap_or(0); - let sum_cap = manager - .data_layout + let sum_cap = data_layout .data_dirs .iter() .map(|x| x.capacity().unwrap_or(min_cap /* approximation */)) @@ -543,7 +623,7 @@ impl BlockStoreIterator { let mut cum_cap = 0; let mut todo = vec![]; - for dir in manager.data_layout.data_dirs.iter() { + for dir in data_layout.data_dirs.iter() { let cap = match dir.state { DataDirState::Active { capacity } => capacity, _ => min_cap, diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index 9ca4a059c..fd37a24e6 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -471,6 +471,9 @@ pub enum RepairWhat { #[structopt(subcommand)] cmd: ScrubCmd, }, + /// Rebalance data blocks among storage locations + #[structopt(name = "rebalance", version = garage_version())] + Rebalance, } #[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs index abfaf9f91..9e4de873c 100644 --- a/src/garage/repair/online.rs +++ b/src/garage/repair/online.rs @@ -70,6 +70,12 @@ pub async fn launch_online_repair( info!("Sending command to scrub worker: {:?}", cmd); garage.block_manager.send_scrub_command(cmd).await?; } + RepairWhat::Rebalance => { + info!("Rebalancing the stored blocks among storage locations"); + bg.spawn_worker(garage_block::repair::RebalanceWorker::new( + garage.block_manager.clone(), + )); + } } Ok(()) } From 6a067e30ee51d3ea9874e3ce18670c39edfd665b Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 7 Sep 2023 13:49:12 +0200 Subject: [PATCH 18/26] doc: documentation of rebalance repair --- doc/book/operations/durability-repairs.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/doc/book/operations/durability-repairs.md b/doc/book/operations/durability-repairs.md index 498c8fda1..331309528 100644 --- a/doc/book/operations/durability-repairs.md +++ b/doc/book/operations/durability-repairs.md @@ -91,6 +91,16 @@ is definitely lost, then there is no other choice than to declare your S3 object as unrecoverable, and to delete them properly from the data store. This can be done using the `garage block purge` command. +## Rebalancing data directories + +In [multi-HDD setups](@/documentation/operations/multi-hdd.md), to ensure that +data blocks are well balanced between storage locations, you may run a +rebalance operation using `garage repair rebalance`. This is usefull when +adding storage locations or when capacities of the storage locations have been +changed. Once this is finished, Garage will know for each block of a single +possible location where it can be, which can increase access speed. This +operation will also move out all data from locations marked as read-only. + # Metadata operations From 2f112ac6827d24f5e8c87915a31a86ec721ebf9e Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 7 Sep 2023 14:42:20 +0200 Subject: [PATCH 19/26] correct free data space accounting for multiple data dirs on same fs --- src/rpc/system.rs | 28 +++++++++++++++++++++++----- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/src/rpc/system.rs b/src/rpc/system.rs index c5751d5de..cf4805492 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -911,12 +911,30 @@ impl NodeStatus { self.data_disk_avail = match data_dir { DataDirEnum::Single(dir) => mount_avail(dir), DataDirEnum::Multiple(dirs) => { - dirs.iter() - .map(|d| mount_avail(&d.path)) - .fold(Some((0, 0)), |acc, cur| match (acc, cur) { - (Some((x, y)), Some((a, b))) => Some((x + a, y + b)), - _ => None, + // Take mounts corresponding to all specified data directories that + // can be used for writing data + let mounts = dirs + .iter() + .filter(|dir| dir.capacity.is_some()) + .map(|dir| { + mounts + .iter() + .filter(|mnt| dir.path.starts_with(&mnt.fs_mounted_on)) + .max_by_key(|mnt| mnt.fs_mounted_on.len()) }) + .collect::>(); + if mounts.iter().any(|x| x.is_none()) { + None // could not get info for at least one mount + } else { + // dedup mounts in case several data directories are on the same filesystem + let mut mounts = mounts.iter().map(|x| x.unwrap()).collect::>(); + mounts.sort_by(|x, y| x.fs_mounted_on.cmp(&y.fs_mounted_on)); + mounts.dedup_by(|x, y| x.fs_mounted_on == y.fs_mounted_on); + // calculate sum of available and total space + Some(mounts.iter().fold((0, 0), |(x, y), mnt| { + (x + mnt.avail.as_u64(), y + mnt.total.as_u64()) + })) + } } }; From eb972a8422e19e1eaf69281571f4e52f9c7794ff Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 7 Sep 2023 14:48:36 +0200 Subject: [PATCH 20/26] doc: update multi-hdd section --- doc/book/operations/durability-repairs.md | 1 - doc/book/operations/multi-hdd.md | 5 +++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/doc/book/operations/durability-repairs.md b/doc/book/operations/durability-repairs.md index 331309528..b0d2c78a9 100644 --- a/doc/book/operations/durability-repairs.md +++ b/doc/book/operations/durability-repairs.md @@ -124,4 +124,3 @@ in your cluster, you can run one of the following repair procedures: - `garage repair versions`: checks that all versions belong to a non-deleted object, and purges any orphan version - `garage repair block_refs`: checks that all block references belong to a non-deleted object version, and purges any orphan block reference (this will then allow the blocks to be garbage-collected) - diff --git a/doc/book/operations/multi-hdd.md b/doc/book/operations/multi-hdd.md index 5f9522623..36445b0ac 100644 --- a/doc/book/operations/multi-hdd.md +++ b/doc/book/operations/multi-hdd.md @@ -65,7 +65,8 @@ To rebalance data, two strategies can be used: Garage checks whether the existing copy is in the primary directory of the slice or in a secondary directory. If the current copy is in a secondary directory, Garage re-writes a copy in the primary directory and deletes the one from the - secondary directory. + secondary directory. This might never end up rebalancing everything if there + are data blocks that are only read and never written. - Active rebalancing: an operator of a Garage node can explicitly launch a repair procedure that rebalances the data directories, moving all blocks to their @@ -94,7 +95,7 @@ it might still contain subdirectories, but no data files. You can check that it contains no files using: ```bash -find -type f /path/to/old_data +find -type f /path/to/old_data # should not print anything ``` at which point it can be removed from the `data_dir` list in your config file. From 2657b5c1b911b7c5f2d97f8c564e60202ddf4124 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 7 Sep 2023 15:30:56 +0200 Subject: [PATCH 21/26] block manager: fix bugs --- src/block/layout.rs | 1 - src/block/repair.rs | 30 ++++++++++++++++++++++++++---- 2 files changed, 26 insertions(+), 5 deletions(-) diff --git a/src/block/layout.rs b/src/block/layout.rs index e32ef7855..1d8f4cda6 100644 --- a/src/block/layout.rs +++ b/src/block/layout.rs @@ -200,7 +200,6 @@ impl DataLayout { // it might have been removed and added again and might contain data, // so add it as a secondary storage location for all partitions // to make sure existing files are not lost - let mut part_sec = vec![vec![]; DRIVE_NPART]; for (i, dd) in data_dirs.iter().enumerate() { if self.data_dirs.iter().any(|ed| ed.path == dd.path) { continue; diff --git a/src/block/repair.rs b/src/block/repair.rs index 1bea9f09c..e18eeaeb4 100644 --- a/src/block/repair.rs +++ b/src/block/repair.rs @@ -17,6 +17,7 @@ use garage_util::persister::PersisterShared; use garage_util::time::*; use garage_util::tranquilizer::Tranquilizer; +use crate::block::*; use crate::layout::*; use crate::manager::*; @@ -528,8 +529,10 @@ impl Worker for ScrubWorker { pub struct RebalanceWorker { manager: Arc, block_iter: BlockStoreIterator, + t_started: u64, + t_finished: Option, moved: usize, - moved_bytes: usize, + moved_bytes: u64, } impl RebalanceWorker { @@ -538,6 +541,8 @@ impl RebalanceWorker { Self { manager, block_iter, + t_started: now_msec(), + t_finished: None, moved: 0, moved_bytes: 0, } @@ -551,11 +556,18 @@ impl Worker for RebalanceWorker { } fn status(&self) -> WorkerStatus { + let t_cur = self.t_finished.unwrap_or_else(|| now_msec()); + let rate = self.moved_bytes / std::cmp::max(1, (t_cur - self.t_started) / 1000); WorkerStatus { progress: Some(format!("{:.2}%", self.block_iter.progress() * 100.)), freeform: vec![ format!("Blocks moved: {}", self.moved), - format!("Bytes moved: {}", self.moved_bytes), + format!( + "Bytes moved: {} ({}/s)", + bytesize::ByteSize::b(self.moved_bytes), + bytesize::ByteSize::b(rate) + ), + format!("Started: {}", msec_to_rfc3339(self.t_started)), ], ..Default::default() } @@ -565,12 +577,21 @@ impl Worker for RebalanceWorker { if let Some((path, hash)) = self.block_iter.next().await? { let prim_loc = self.manager.data_layout.load().primary_block_dir(&hash); if path.parent().expect("no parent?") != prim_loc { + let path = match path.extension() { + None => DataBlockPath::Plain(path), + Some(x) if x.to_str() == Some("zst") => DataBlockPath::Compressed(path), + _ => { + warn!("not rebalancing file: {}", path.to_string_lossy()); + return Ok(WorkerState::Busy); + } + }; // block is not in its primary location, // move it there (reading and re-writing does the trick) - let data = self.manager.read_block(&hash).await?; + debug!("rebalance: moving block {:?}", hash); + let data = self.manager.read_block_from(&hash, &path).await?; self.manager.write_block(&hash, &data).await?; self.moved += 1; - self.moved_bytes += data.inner_buffer().len(); + self.moved_bytes += data.inner_buffer().len() as u64; } Ok(WorkerState::Busy) } else { @@ -589,6 +610,7 @@ impl Worker for RebalanceWorker { .save_async(&new_layout) .await?; self.manager.data_layout.store(Arc::new(new_layout)); + self.t_finished = Some(now_msec()); Ok(WorkerState::Done) } } From be91ef6294bcc699f075746fd3abb57a9b22e838 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 7 Sep 2023 16:04:03 +0200 Subject: [PATCH 22/26] block manager: fix bug where rebalance didn't delete old copies --- src/block/block.rs | 1 + src/block/manager.rs | 39 ++++++++++++++++++++++++++++++++++++++- src/block/repair.rs | 37 ++++++++++++++++++++----------------- 3 files changed, 59 insertions(+), 18 deletions(-) diff --git a/src/block/block.rs b/src/block/block.rs index 6d79fb6c3..20f57aa53 100644 --- a/src/block/block.rs +++ b/src/block/block.rs @@ -21,6 +21,7 @@ pub enum DataBlock { Compressed(Bytes), } +#[derive(Debug)] pub enum DataBlockPath { /// Uncompressed data fail Plain(PathBuf), diff --git a/src/block/manager.rs b/src/block/manager.rs index e0fbfe742..ea70b19cd 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -645,6 +645,19 @@ impl BlockManager { None } + /// Rewrite a block at the primary location for its path and delete the old path. + /// Returns the number of bytes read/written + pub(crate) async fn fix_block_location( + &self, + hash: &Hash, + wrong_path: DataBlockPath, + ) -> Result { + self.lock_mutate(hash) + .await + .fix_block_location(hash, wrong_path, self) + .await + } + async fn lock_mutate(&self, hash: &Hash) -> MutexGuard<'_, BlockManagerLocked> { let tracer = opentelemetry::global::tracer("garage"); let ilock = u16::from_be_bytes([hash.as_slice()[0], hash.as_slice()[1]]) as usize @@ -682,6 +695,17 @@ impl BlockManagerLocked { hash: &Hash, data: &DataBlock, mgr: &BlockManager, + ) -> Result<(), Error> { + let existing_path = mgr.find_block(hash).await; + self.write_block_inner(hash, data, mgr, existing_path).await + } + + async fn write_block_inner( + &self, + hash: &Hash, + data: &DataBlock, + mgr: &BlockManager, + existing_path: Option, ) -> Result<(), Error> { let compressed = data.is_compressed(); let data = data.inner_buffer(); @@ -694,7 +718,7 @@ impl BlockManagerLocked { tgt_path.set_extension("zst"); } - let to_delete = match (mgr.find_block(hash).await, compressed) { + let to_delete = match (existing_path, compressed) { // If the block is stored in the wrong directory, // write it again at the correct path and delete the old path (Some(DataBlockPath::Plain(p)), false) if p != tgt_path => Some(p), @@ -716,6 +740,7 @@ impl BlockManagerLocked { // If the block isn't stored already, just store what is given to us (None, _) => None, }; + assert!(to_delete.as_ref() != Some(&tgt_path)); let mut path_tmp = tgt_path.clone(); let tmp_extension = format!("tmp{}", hex::encode(thread_rng().gen::<[u8; 4]>())); @@ -792,6 +817,18 @@ impl BlockManagerLocked { } Ok(()) } + + async fn fix_block_location( + &self, + hash: &Hash, + wrong_path: DataBlockPath, + mgr: &BlockManager, + ) -> Result { + let data = mgr.read_block_from(hash, &wrong_path).await?; + self.write_block_inner(hash, &data, mgr, Some(wrong_path)) + .await?; + Ok(data.inner_buffer().len()) + } } async fn read_stream_to_end(mut stream: ByteStream) -> Result { diff --git a/src/block/repair.rs b/src/block/repair.rs index e18eeaeb4..bd14085f5 100644 --- a/src/block/repair.rs +++ b/src/block/repair.rs @@ -558,17 +558,21 @@ impl Worker for RebalanceWorker { fn status(&self) -> WorkerStatus { let t_cur = self.t_finished.unwrap_or_else(|| now_msec()); let rate = self.moved_bytes / std::cmp::max(1, (t_cur - self.t_started) / 1000); + let mut freeform = vec![ + format!("Blocks moved: {}", self.moved), + format!( + "Bytes moved: {} ({}/s)", + bytesize::ByteSize::b(self.moved_bytes), + bytesize::ByteSize::b(rate) + ), + format!("Started: {}", msec_to_rfc3339(self.t_started)), + ]; + if let Some(t_fin) = self.t_finished { + freeform.push(format!("Finished: {}", msec_to_rfc3339(t_fin))) + } WorkerStatus { progress: Some(format!("{:.2}%", self.block_iter.progress() * 100.)), - freeform: vec![ - format!("Blocks moved: {}", self.moved), - format!( - "Bytes moved: {} ({}/s)", - bytesize::ByteSize::b(self.moved_bytes), - bytesize::ByteSize::b(rate) - ), - format!("Started: {}", msec_to_rfc3339(self.t_started)), - ], + freeform, ..Default::default() } } @@ -576,10 +580,10 @@ impl Worker for RebalanceWorker { async fn work(&mut self, _must_exit: &mut watch::Receiver) -> Result { if let Some((path, hash)) = self.block_iter.next().await? { let prim_loc = self.manager.data_layout.load().primary_block_dir(&hash); - if path.parent().expect("no parent?") != prim_loc { - let path = match path.extension() { - None => DataBlockPath::Plain(path), - Some(x) if x.to_str() == Some("zst") => DataBlockPath::Compressed(path), + if path.ancestors().all(|x| x != prim_loc) { + let block_path = match path.extension() { + None => DataBlockPath::Plain(path.clone()), + Some(x) if x.to_str() == Some("zst") => DataBlockPath::Compressed(path.clone()), _ => { warn!("not rebalancing file: {}", path.to_string_lossy()); return Ok(WorkerState::Busy); @@ -587,11 +591,10 @@ impl Worker for RebalanceWorker { }; // block is not in its primary location, // move it there (reading and re-writing does the trick) - debug!("rebalance: moving block {:?}", hash); - let data = self.manager.read_block_from(&hash, &path).await?; - self.manager.write_block(&hash, &data).await?; + debug!("rebalance: moving block {:?} => {:?}", block_path, prim_loc); + let block_len = self.manager.fix_block_location(&hash, block_path).await?; self.moved += 1; - self.moved_bytes += data.inner_buffer().len() as u64; + self.moved_bytes += block_len as u64; } Ok(WorkerState::Busy) } else { From de5d7921813ad84038053c96004ce617bc144722 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 11 Sep 2023 11:52:57 +0200 Subject: [PATCH 23/26] block manager: fix indentation (why not detected by cargo fmt?) --- src/block/layout.rs | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/src/block/layout.rs b/src/block/layout.rs index 1d8f4cda6..e83394050 100644 --- a/src/block/layout.rs +++ b/src/block/layout.rs @@ -287,22 +287,22 @@ fn make_data_dirs(dirs: &DataDirEnum) -> Result, Error> { let mut ok = false; for dir in dirs.iter() { let state = match &dir.capacity { - Some(cap) if dir.read_only == false => { - let capacity = cap.parse::() - .ok_or_message("invalid capacity value")?.as_u64(); - if capacity == 0 { - return Err(Error::Message(format!("data directory {} should have non-zero capacity", dir.path.to_string_lossy()))); - } - ok = true; - DataDirState::Active { - capacity, - } - } - None if dir.read_only == true => { - DataDirState::ReadOnly - } - _ => return Err(Error::Message(format!("data directories in data_dir should have a capacity value or be marked read_only, not the case for {}", dir.path.to_string_lossy()))), - }; + Some(cap) if dir.read_only == false => { + let capacity = cap.parse::() + .ok_or_message("invalid capacity value")?.as_u64(); + if capacity == 0 { + return Err(Error::Message(format!("data directory {} should have non-zero capacity", dir.path.to_string_lossy()))); + } + ok = true; + DataDirState::Active { + capacity, + } + } + None if dir.read_only == true => { + DataDirState::ReadOnly + } + _ => return Err(Error::Message(format!("data directories in data_dir should have a capacity value or be marked read_only, not the case for {}", dir.path.to_string_lossy()))), + }; data_dirs.push(DataDir { path: dir.path.clone(), state, From 7f9ba49c7151a0c3c29fbe0b0208b4a1f1dfc1e8 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 11 Sep 2023 11:57:36 +0200 Subject: [PATCH 24/26] block manager: remove data_dir field --- src/block/manager.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/block/manager.rs b/src/block/manager.rs index ea70b19cd..2d1b5c678 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -80,8 +80,6 @@ pub struct BlockManager { /// Replication strategy, allowing to find on which node blocks should be located pub replication: TableShardedReplication, - /// Directory/ies in which block are stored - pub data_dir: DataDirEnum, /// Data layout pub(crate) data_layout: ArcSwap, /// Data layout persister @@ -169,7 +167,6 @@ impl BlockManager { let block_manager = Arc::new(Self { replication, - data_dir, data_layout: ArcSwap::new(Arc::new(data_layout)), data_layout_persister, data_fsync, From 9526328d386ab6261df416327c2efb0791369339 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 11 Sep 2023 12:10:48 +0200 Subject: [PATCH 25/26] scrub: clear saved checkpoint when canceling scrub --- src/block/repair.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/block/repair.rs b/src/block/repair.rs index bd14085f5..a464e2b67 100644 --- a/src/block/repair.rs +++ b/src/block/repair.rs @@ -372,6 +372,9 @@ impl ScrubWorker { ScrubWorkerCommand::Cancel => { self.work = match std::mem::take(&mut self.work) { ScrubWorkerState::Running { .. } | ScrubWorkerState::Paused { .. } => { + if let Err(e) = self.persister.set_with(|x| x.checkpoint = None) { + error!("Could not save scrub checkpoint: {}", e); + } ScrubWorkerState::Finished } work => { From ba7ac52c196c452e0b09fef63862264e0c4582bb Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 11 Sep 2023 12:28:29 +0200 Subject: [PATCH 26/26] block repair: simpler/more robust iterator progress calculation --- src/block/repair.rs | 37 ++++++++++++++----------------------- 1 file changed, 14 insertions(+), 23 deletions(-) diff --git a/src/block/repair.rs b/src/block/repair.rs index a464e2b67..77ee0d14c 100644 --- a/src/block/repair.rs +++ b/src/block/repair.rs @@ -18,7 +18,6 @@ use garage_util::time::*; use garage_util::tranquilizer::Tranquilizer; use crate::block::*; -use crate::layout::*; use crate::manager::*; // Full scrub every 25 days with a random element of 10 days mixed in below @@ -636,31 +635,23 @@ impl BlockStoreIterator { fn new(manager: &BlockManager) -> Self { let data_layout = manager.data_layout.load_full(); - let min_cap = data_layout - .data_dirs - .iter() - .filter_map(|x| x.capacity()) - .min() - .unwrap_or(0); - - let sum_cap = data_layout - .data_dirs - .iter() - .map(|x| x.capacity().unwrap_or(min_cap /* approximation */)) - .sum::() as u128; + let mut dir_cap = vec![0; data_layout.data_dirs.len()]; + for prim in data_layout.part_prim.iter() { + dir_cap[*prim as usize] += 1; + } + for sec_vec in data_layout.part_sec.iter() { + for sec in sec_vec.iter() { + dir_cap[*sec as usize] += 1; + } + } + let sum_cap = dir_cap.iter().sum::() as u64; let mut cum_cap = 0; let mut todo = vec![]; - for dir in data_layout.data_dirs.iter() { - let cap = match dir.state { - DataDirState::Active { capacity } => capacity, - _ => min_cap, - }; - - let progress_min = ((cum_cap as u128 * PROGRESS_FP as u128) / (sum_cap as u128)) as u64; - let progress_max = - (((cum_cap + cap) as u128 * PROGRESS_FP as u128) / (sum_cap as u128)) as u64; - cum_cap += cap; + for (dir, cap) in data_layout.data_dirs.iter().zip(dir_cap.into_iter()) { + let progress_min = (cum_cap * PROGRESS_FP) / sum_cap; + let progress_max = ((cum_cap + cap as u64) * PROGRESS_FP) / sum_cap; + cum_cap += cap as u64; todo.push(BsiTodo::Directory { path: dir.path.clone(),