diff --git a/Cargo.lock b/Cargo.lock index 79b35191a..1ace7cc2f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1300,6 +1300,7 @@ dependencies = [ "async-compression", "async-trait", "bytes", + "bytesize", "futures", "futures-util", "garage_db", diff --git a/src/block/Cargo.toml b/src/block/Cargo.toml index 1057b699e..b77988d6c 100644 --- a/src/block/Cargo.toml +++ b/src/block/Cargo.toml @@ -24,6 +24,7 @@ opentelemetry = "0.17" arc-swap = "1.5" async-trait = "0.1.7" bytes = "1.0" +bytesize = "1.2" hex = "0.4" tracing = "0.1" rand = "0.8" diff --git a/src/block/layout.rs b/src/block/layout.rs index cbc326d8f..4a49b2870 100644 --- a/src/block/layout.rs +++ b/src/block/layout.rs @@ -4,14 +4,23 @@ use serde::{Deserialize, Serialize}; use garage_util::config::DataDirEnum; use garage_util::data::Hash; +use garage_util::error::{Error, OkOrMessage}; use garage_util::migrate::*; -pub const DRIVE_NPART: usize = 1024; +type Idx = u16; + +const DRIVE_NPART: usize = 1024; + +const DPART_BYTES: (usize, usize) = (2, 3); #[derive(Serialize, Deserialize, Debug, Clone)] pub(crate) struct DataLayout { pub(crate) data_dirs: Vec, - pub(crate) partitions: Vec, + + /// Primary storage location (index in data_dirs) for each partition + pub(crate) part_prim: Vec, + /// Secondary storage locations for each partition + pub(crate) part_sec: Vec>, } #[derive(Serialize, Deserialize, Debug, Clone)] @@ -20,38 +29,255 @@ pub(crate) struct DataDir { pub(crate) state: DataDirState, } -#[derive(Serialize, Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone, Copy)] pub(crate) enum DataDirState { Active { capacity: u64 }, ReadOnly, } -#[derive(Serialize, Deserialize, Debug, Clone)] -pub(crate) struct Partition { - pub(crate) prim: usize, - pub(crate) sec: Vec, -} - impl DataLayout { - pub(crate) fn initialize(dirs: &DataDirEnum) -> Self { - todo!() + pub(crate) fn initialize(dirs: &DataDirEnum) -> Result { + let data_dirs = make_data_dirs(dirs)?; + + // Split partitions proportionnally to capacity for all drives + // to affect primary storage location + let total_cap = data_dirs.iter().filter_map(|x| x.capacity()).sum::(); + + let mut part_prim = Vec::with_capacity(DRIVE_NPART); + let mut cum_cap = 0; + for (i, dd) in data_dirs.iter().enumerate() { + if let DataDirState::Active { capacity } = dd.state { + cum_cap += capacity; + let n_total = (cum_cap * DRIVE_NPART as u64) / total_cap; + part_prim.resize(n_total as usize, i as Idx); + } + } + assert_eq!(cum_cap, total_cap); + assert_eq!(part_prim.len(), DRIVE_NPART); + + // If any of the storage locations is non-empty, add it as a secondary + // storage location for all partitions + let mut part_sec = vec![vec![]; DRIVE_NPART]; + for (i, dd) in data_dirs.iter().enumerate() { + if dir_not_empty(&dd.path)? { + for (sec, prim) in part_sec.iter_mut().zip(part_prim.iter()) { + if *prim != i as Idx { + sec.push(i as Idx); + } + } + } + } + + Ok(Self { + data_dirs, + part_prim, + part_sec, + }) } - pub(crate) fn update(&mut self, dirs: &DataDirEnum) -> Self { - todo!() + pub(crate) fn update(&mut self, dirs: &DataDirEnum) -> Result { + // Compute list of new data directories and mapping of old indices + // to new indices + let data_dirs = make_data_dirs(dirs)?; + let old2new = self + .data_dirs + .iter() + .map(|x| { + data_dirs + .iter() + .position(|y| y.path == x.path) + .map(|x| x as Idx) + }) + .collect::>(); + + // Compute secondary location list for partitions based on existing + // folders, translating indices from old to new + let mut part_sec = self + .part_sec + .iter() + .map(|dl| { + dl.iter() + .filter_map(|old| old2new.get(*old as usize).copied().flatten()) + .collect::>() + }) + .collect::>(); + + // Compute a vector that, for each data dir, + // contains the list of partitions primarily stored on that drive + let mut dir_prim = vec![vec![]; data_dirs.len()]; + for (ipart, prim) in self.part_prim.iter().enumerate() { + if let Some(new) = old2new.get(*prim as usize).copied().flatten() { + dir_prim[new as usize].push(ipart); + } + } + + // Compute the target number of partitions per data directory + let total_cap = data_dirs.iter().filter_map(|x| x.capacity()).sum::(); + let mut cum_cap = 0; + let mut npart_per_dir = vec![]; + for dd in data_dirs.iter() { + if let DataDirState::Active { capacity } = dd.state { + let begin = (cum_cap * DRIVE_NPART as u64) / total_cap; + cum_cap += capacity; + let end = (cum_cap * DRIVE_NPART as u64) / total_cap; + npart_per_dir.push((end - begin) as usize); + } else { + npart_per_dir.push(0); + } + } + assert_eq!(cum_cap, total_cap); + assert_eq!(npart_per_dir.iter().sum::(), DRIVE_NPART); + + // For all directories that have too many primary partitions, + // move that partition to secondary + for (idir, (parts, tgt_npart)) in dir_prim.iter_mut().zip(npart_per_dir.iter()).enumerate() + { + while parts.len() > *tgt_npart { + let part = parts.pop().unwrap(); + if !part_sec[part].contains(&(idir as Idx)) { + part_sec[part].push(idir as Idx); + } + } + } + + // Calculate the vector of primary partition dir index + let mut part_prim = vec![None; DRIVE_NPART]; + for (idir, parts) in dir_prim.iter().enumerate() { + for part in parts.iter() { + assert!(part_prim[*part].is_none()); + part_prim[*part] = Some(idir as Idx) + } + } + + // Calculate a vector of unassigned partitions + let mut unassigned = part_prim + .iter() + .enumerate() + .filter(|(_, dir)| dir.is_none()) + .map(|(ipart, _)| ipart) + .collect::>(); + + // For all directories that don't have enough primary partitions, + // add partitions from unassigned + for (idir, (parts, tgt_npart)) in dir_prim.iter_mut().zip(npart_per_dir.iter()).enumerate() + { + assert!(unassigned.len() >= *tgt_npart - parts.len()); + for _ in parts.len()..*tgt_npart { + let new_part = unassigned.pop().unwrap(); + part_prim[new_part] = Some(idir as Idx); + part_sec[new_part].retain(|x| *x != idir as Idx); + } + } + + // Sanity checks + assert!(part_prim.iter().all(|x| x.is_some())); + assert!(unassigned.is_empty()); + + let part_prim = part_prim + .into_iter() + .map(|x| x.unwrap()) + .collect::>(); + assert!(part_prim.iter().all(|p| data_dirs + .get(*p as usize) + .and_then(|x| x.capacity()) + .unwrap_or(0) + > 0)); + + Ok(Self { + data_dirs, + part_prim, + part_sec, + }) } - pub(crate) fn data_dir(&self, hash: &Hash) -> PathBuf { - todo!() - /* - let mut path = self.data_dir.clone(); + pub(crate) fn primary_data_dir(&self, hash: &Hash) -> PathBuf { + let ipart = self.partition_from(hash); + let idir = self.part_prim[ipart] as usize; + self.data_dir_from(hash, &self.data_dirs[idir].path) + } + + pub(crate) fn secondary_data_dirs<'a>(&'a self, hash: &'a Hash) -> impl Iterator + 'a { + let ipart = self.partition_from(hash); + self.part_sec[ipart] + .iter() + .map(move |idir| self.data_dir_from(hash, &self.data_dirs[*idir as usize].path)) + } + + fn partition_from(&self, hash: &Hash) -> usize { + u16::from_be_bytes([ + hash.as_slice()[DPART_BYTES.0], + hash.as_slice()[DPART_BYTES.1] + ]) as usize % DRIVE_NPART + } + + fn data_dir_from(&self, hash: &Hash, dir: &PathBuf) -> PathBuf { + let mut path = dir.clone(); path.push(hex::encode(&hash.as_slice()[0..1])); path.push(hex::encode(&hash.as_slice()[1..2])); path - */ - } + } } impl InitialFormat for DataLayout { const VERSION_MARKER: &'static [u8] = b"G09bmdl"; } + +impl DataDir { + pub fn capacity(&self) -> Option { + match self.state { + DataDirState::Active { capacity } => Some(capacity), + _ => None, + } + } +} + +fn make_data_dirs(dirs: &DataDirEnum) -> Result, Error> { + let mut data_dirs = vec![]; + match dirs { + DataDirEnum::Single(path) => data_dirs.push(DataDir { + path: path.clone(), + state: DataDirState::Active { + capacity: 1_000_000_000, // whatever, doesn't matter + }, + }), + DataDirEnum::Multiple(dirs) => { + for dir in dirs.iter() { + let state = match &dir.capacity { + Some(cap) if dir.read_only == false => { + DataDirState::Active { + capacity: cap.parse::() + .ok_or_message("invalid capacity value")?.as_u64(), + } + } + None if dir.read_only == true => { + DataDirState::ReadOnly + } + _ => return Err(Error::Message(format!("data directories in data_dir should have a capacity value or be marked read_only, not the case for {}", dir.path.to_string_lossy()))), + }; + data_dirs.push(DataDir { + path: dir.path.clone(), + state, + }); + } + } + } + Ok(data_dirs) +} + +fn dir_not_empty(path: &PathBuf) -> Result { + for entry in std::fs::read_dir(&path)? { + let dir = entry?; + if dir.file_type()?.is_dir() + && dir + .file_name() + .into_string() + .ok() + .and_then(|hex| hex::decode(&hex).ok()) + .map(|bytes| (2..=4).contains(&bytes.len())) + .unwrap_or(false) + { + return Ok(true); + } + } + Ok(false) +} diff --git a/src/block/manager.rs b/src/block/manager.rs index 18a2686ef..45729a003 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -125,15 +125,19 @@ impl BlockManager { replication: TableShardedReplication, system: Arc, ) -> Arc { + // TODO don't panic, report error let layout_persister: Persister = Persister::new(&system.metadata_dir, "data_layout"); let data_layout = match layout_persister.load() { Ok(mut layout) => { - layout.update(&data_dir); + layout.update(&data_dir).expect("invalid data_dir config"); layout } - Err(_) => DataLayout::initialize(&data_dir), + Err(_) => DataLayout::initialize(&data_dir).expect("invalid data_dir config"), }; + layout_persister + .save(&data_layout) + .expect("cannot save data_layout"); let rc = db .open_tree("block_local_rc") @@ -602,7 +606,7 @@ impl BlockManager { /// Utility: gives the path of the directory in which a block should be found fn block_dir(&self, hash: &Hash) -> PathBuf { - self.data_layout.data_dir(hash) + self.data_layout.primary_data_dir(hash) } /// Utility: give the full path where a block should be found, minus extension if block is diff --git a/src/block/repair.rs b/src/block/repair.rs index d5e2e1680..0e7fe0df0 100644 --- a/src/block/repair.rs +++ b/src/block/repair.rs @@ -473,10 +473,7 @@ impl BlockStoreIterator { .data_layout .data_dirs .iter() - .filter_map(|x| match x.state { - DataDirState::Active { capacity } => Some(capacity), - _ => None, - }) + .filter_map(|x| x.capacity()) .min() .unwrap_or(0); @@ -484,10 +481,7 @@ impl BlockStoreIterator { .data_layout .data_dirs .iter() - .map(|x| match x.state { - DataDirState::Active { capacity } => capacity, - _ => min_cap, // approximation - }) + .map(|x| x.capacity().unwrap_or(min_cap /* approximation */)) .sum::() as u128; let mut cum_cap = 0;