diff --git a/src/block/layout.rs b/src/block/layout.rs new file mode 100644 index 00000000..cbc326d8 --- /dev/null +++ b/src/block/layout.rs @@ -0,0 +1,57 @@ +use std::path::PathBuf; + +use serde::{Deserialize, Serialize}; + +use garage_util::config::DataDirEnum; +use garage_util::data::Hash; +use garage_util::migrate::*; + +pub const DRIVE_NPART: usize = 1024; + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub(crate) struct DataLayout { + pub(crate) data_dirs: Vec, + pub(crate) partitions: Vec, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub(crate) struct DataDir { + pub(crate) path: PathBuf, + pub(crate) state: DataDirState, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub(crate) enum DataDirState { + Active { capacity: u64 }, + ReadOnly, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub(crate) struct Partition { + pub(crate) prim: usize, + pub(crate) sec: Vec, +} + +impl DataLayout { + pub(crate) fn initialize(dirs: &DataDirEnum) -> Self { + todo!() + } + + pub(crate) fn update(&mut self, dirs: &DataDirEnum) -> Self { + todo!() + } + + pub(crate) fn data_dir(&self, hash: &Hash) -> PathBuf { + todo!() + /* + let mut path = self.data_dir.clone(); + path.push(hex::encode(&hash.as_slice()[0..1])); + path.push(hex::encode(&hash.as_slice()[1..2])); + path + */ + } +} + +impl InitialFormat for DataLayout { + const VERSION_MARKER: &'static [u8] = b"G09bmdl"; +} diff --git a/src/block/lib.rs b/src/block/lib.rs index d2814f77..c9ff2845 100644 --- a/src/block/lib.rs +++ b/src/block/lib.rs @@ -6,5 +6,6 @@ pub mod repair; pub mod resync; mod block; +mod layout; mod metrics; mod rc; diff --git a/src/block/manager.rs b/src/block/manager.rs index c7e4cd03..18a2686e 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -25,10 +25,11 @@ use garage_rpc::rpc_helper::netapp::stream::{stream_asyncread, ByteStream}; use garage_db as db; use garage_util::background::{vars, BackgroundRunner}; +use garage_util::config::DataDirEnum; use garage_util::data::*; use garage_util::error::*; use garage_util::metrics::RecordDuration; -use garage_util::persister::PersisterShared; +use garage_util::persister::{Persister, PersisterShared}; use garage_util::time::msec_to_rfc3339; use garage_rpc::rpc_helper::OrderTag; @@ -38,6 +39,7 @@ use garage_rpc::*; use garage_table::replication::{TableReplication, TableShardedReplication}; use crate::block::*; +use crate::layout::*; use crate::metrics::*; use crate::rc::*; use crate::repair::*; @@ -77,8 +79,11 @@ impl Rpc for BlockRpc { pub struct BlockManager { /// Replication strategy, allowing to find on which node blocks should be located pub replication: TableShardedReplication, - /// Directory in which block are stored - pub data_dir: PathBuf, + + /// Directory/ies in which block are stored + pub data_dir: DataDirEnum, + /// Data layout + pub(crate) data_layout: DataLayout, data_fsync: bool, compression_level: Option, @@ -114,12 +119,22 @@ struct BlockManagerLocked(); impl BlockManager { pub fn new( db: &db::Db, - data_dir: PathBuf, + data_dir: DataDirEnum, data_fsync: bool, compression_level: Option, replication: TableShardedReplication, system: Arc, ) -> Arc { + let layout_persister: Persister = + Persister::new(&system.metadata_dir, "data_layout"); + let data_layout = match layout_persister.load() { + Ok(mut layout) => { + layout.update(&data_dir); + layout + } + Err(_) => DataLayout::initialize(&data_dir), + }; + let rc = db .open_tree("block_local_rc") .expect("Unable to open block_local_rc tree"); @@ -143,6 +158,7 @@ impl BlockManager { let block_manager = Arc::new(Self { replication, data_dir, + data_layout, data_fsync, compression_level, mutation_lock: [(); 256].map(|_| Mutex::new(BlockManagerLocked())), @@ -586,10 +602,7 @@ impl BlockManager { /// Utility: gives the path of the directory in which a block should be found fn block_dir(&self, hash: &Hash) -> PathBuf { - let mut path = self.data_dir.clone(); - path.push(hex::encode(&hash.as_slice()[0..1])); - path.push(hex::encode(&hash.as_slice()[1..2])); - path + self.data_layout.data_dir(hash) } /// Utility: give the full path where a block should be found, minus extension if block is diff --git a/src/block/repair.rs b/src/block/repair.rs index 71093d69..d5e2e168 100644 --- a/src/block/repair.rs +++ b/src/block/repair.rs @@ -17,6 +17,7 @@ use garage_util::persister::PersisterShared; use garage_util::time::*; use garage_util::tranquilizer::Tranquilizer; +use crate::layout::*; use crate::manager::*; // Full scrub every 25 days with a random element of 10 days mixed in below @@ -136,7 +137,7 @@ impl Worker for RepairWorker { // Lists all blocks on disk and adds them to the resync queue. // This allows us to find blocks we are storing but don't actually need, // so that we can offload them if necessary and then delete them locally. - if let Some(hash) = bi.next().await? { + if let Some((_path, hash)) = bi.next().await? { self.manager .resync .put_to_resync(&hash, Duration::from_secs(0))?; @@ -376,7 +377,7 @@ impl Worker for ScrubWorker { match &mut self.work { ScrubWorkerState::Running(bsi) => { self.tranquilizer.reset(); - if let Some(hash) = bsi.next().await? { + if let Some((_path, hash)) = bsi.next().await? { match self.manager.read_block(&hash).await { Err(Error::CorruptData(_)) => { error!("Found corrupt data block during scrub: {:?}", hash); @@ -447,100 +448,166 @@ impl Worker for ScrubWorker { // UTILITY FOR ENUMERATING THE BLOCK STORE // ---- ---- ---- +const PROGRESS_FP: u64 = 1_000_000_000; + struct BlockStoreIterator { - path: Vec, + todo: Vec, } -enum ReadingDir { - Pending(PathBuf), - Read { - subpaths: Vec, - pos: usize, +enum BsiTodo { + Directory { + path: PathBuf, + progress_min: u64, + progress_max: u64, + }, + File { + path: PathBuf, + filename: String, + progress: u64, }, } impl BlockStoreIterator { fn new(manager: &BlockManager) -> Self { - let root_dir = manager.data_dir.clone(); - Self { - path: vec![ReadingDir::Pending(root_dir)], + let min_cap = manager + .data_layout + .data_dirs + .iter() + .filter_map(|x| match x.state { + DataDirState::Active { capacity } => Some(capacity), + _ => None, + }) + .min() + .unwrap_or(0); + + let sum_cap = manager + .data_layout + .data_dirs + .iter() + .map(|x| match x.state { + DataDirState::Active { capacity } => capacity, + _ => min_cap, // approximation + }) + .sum::() as u128; + + let mut cum_cap = 0; + let mut todo = vec![]; + for dir in manager.data_layout.data_dirs.iter() { + let cap = match dir.state { + DataDirState::Active { capacity } => capacity, + _ => min_cap, + }; + + let progress_min = ((cum_cap as u128 * PROGRESS_FP as u128) / (sum_cap as u128)) as u64; + let progress_max = + (((cum_cap + cap) as u128 * PROGRESS_FP as u128) / (sum_cap as u128)) as u64; + cum_cap += cap; + + todo.push(BsiTodo::Directory { + path: dir.path.clone(), + progress_min, + progress_max, + }); } + // entries are processed back-to-front (because of .pop()), + // so reverse entries to process them in increasing progress bounds + todo.reverse(); + + let ret = Self { todo }; + debug_assert!(ret.progress_invariant()); + + ret } /// Returns progress done, between 0 and 1 fn progress(&self) -> f32 { - if self.path.is_empty() { - 1.0 - } else { - let mut ret = 0.0; - let mut next_div = 1; - for p in self.path.iter() { - match p { - ReadingDir::Pending(_) => break, - ReadingDir::Read { subpaths, pos } => { - next_div *= subpaths.len(); - ret += ((*pos - 1) as f32) / (next_div as f32); + self.todo + .last() + .map(|x| match x { + BsiTodo::Directory { progress_min, .. } => *progress_min, + BsiTodo::File { progress, .. } => *progress, + }) + .map(|x| x as f32 / PROGRESS_FP as f32) + .unwrap_or(1.0) + } + + async fn next(&mut self) -> Result, Error> { + loop { + match self.todo.pop() { + None => return Ok(None), + Some(BsiTodo::Directory { + path, + progress_min, + progress_max, + }) => { + let istart = self.todo.len(); + + let mut reader = fs::read_dir(&path).await?; + while let Some(ent) = reader.next_entry().await? { + let name = if let Ok(n) = ent.file_name().into_string() { + n + } else { + continue; + }; + let ft = ent.file_type().await?; + if ft.is_dir() && hex::decode(&name).is_ok() { + self.todo.push(BsiTodo::Directory { + path: ent.path(), + progress_min: 0, + progress_max: 0, + }); + } else if ft.is_file() { + self.todo.push(BsiTodo::File { + path: ent.path(), + filename: name, + progress: 0, + }); + } + } + + let count = self.todo.len() - istart; + for (i, ent) in self.todo[istart..].iter_mut().enumerate() { + let p1 = progress_min + + ((progress_max - progress_min) * i as u64) / count as u64; + let p2 = progress_min + + ((progress_max - progress_min) * (i + 1) as u64) / count as u64; + match ent { + BsiTodo::Directory { + progress_min, + progress_max, + .. + } => { + *progress_min = p1; + *progress_max = p2; + } + BsiTodo::File { progress, .. } => { + *progress = p1; + } + } + } + self.todo[istart..].reverse(); + debug_assert!(self.progress_invariant()); + } + Some(BsiTodo::File { path, filename, .. }) => { + let filename = filename.strip_suffix(".zst").unwrap_or(&filename); + if filename.len() == 64 { + if let Ok(h) = hex::decode(filename) { + let mut hash = [0u8; 32]; + hash.copy_from_slice(&h); + return Ok(Some((path, hash.into()))); + } } } } - ret } } - async fn next(&mut self) -> Result, Error> { - loop { - let last_path = match self.path.last_mut() { - None => return Ok(None), - Some(lp) => lp, - }; - - if let ReadingDir::Pending(path) = last_path { - let mut reader = fs::read_dir(&path).await?; - let mut subpaths = vec![]; - while let Some(ent) = reader.next_entry().await? { - subpaths.push(ent); - } - *last_path = ReadingDir::Read { subpaths, pos: 0 }; - } - - let (subpaths, pos) = match *last_path { - ReadingDir::Read { - ref subpaths, - ref mut pos, - } => (subpaths, pos), - ReadingDir::Pending(_) => unreachable!(), - }; - - let data_dir_ent = match subpaths.get(*pos) { - None => { - self.path.pop(); - continue; - } - Some(ent) => { - *pos += 1; - ent - } - }; - - let name = data_dir_ent.file_name(); - let name = if let Ok(n) = name.into_string() { - n - } else { - continue; - }; - let ent_type = data_dir_ent.file_type().await?; - - let name = name.strip_suffix(".zst").unwrap_or(&name); - if name.len() == 2 && hex::decode(name).is_ok() && ent_type.is_dir() { - let path = data_dir_ent.path(); - self.path.push(ReadingDir::Pending(path)); - } else if name.len() == 64 { - if let Ok(h) = hex::decode(name) { - let mut hash = [0u8; 32]; - hash.copy_from_slice(&h); - return Ok(Some(hash.into())); - } - } - } + fn progress_invariant(&self) -> bool { + let iter = self.todo.iter().map(|x| match x { + BsiTodo::Directory { progress_min, .. } => progress_min, + BsiTodo::File { progress, .. } => progress, + }); + let iter_1 = iter.clone().skip(1); + iter.zip(iter_1).all(|(prev, next)| prev >= next) } } diff --git a/src/model/garage.rs b/src/model/garage.rs index 981430fb..d6eebfb0 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -92,8 +92,22 @@ impl Garage { // Create meta dir and data dir if they don't exist already std::fs::create_dir_all(&config.metadata_dir) .ok_or_message("Unable to create Garage metadata directory")?; - std::fs::create_dir_all(&config.data_dir) - .ok_or_message("Unable to create Garage data directory")?; + match &config.data_dir { + DataDirEnum::Single(data_dir) => { + std::fs::create_dir_all(data_dir).ok_or_message(format!( + "Unable to create Garage data directory: {}", + data_dir.to_string_lossy() + ))?; + } + DataDirEnum::Multiple(data_dirs) => { + for dir in data_dirs { + std::fs::create_dir_all(&dir.path).ok_or_message(format!( + "Unable to create Garage data directory: {}", + dir.path.to_string_lossy() + ))?; + } + } + } info!("Opening database..."); let mut db_path = config.metadata_dir.clone(); diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 1675e70e..c5751d5d 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -22,9 +22,9 @@ use netapp::peering::fullmesh::FullMeshPeeringStrategy; use netapp::util::parse_and_resolve_peer_addr_async; use netapp::{NetApp, NetworkKey, NodeID, NodeKey}; -use garage_util::config::Config; #[cfg(feature = "kubernetes-discovery")] use garage_util::config::KubernetesDiscoveryConfig; +use garage_util::config::{Config, DataDirEnum}; use garage_util::data::*; use garage_util::error::*; use garage_util::persister::Persister; @@ -119,7 +119,7 @@ pub struct System { /// Path to metadata directory pub metadata_dir: PathBuf, /// Path to data directory - pub data_dir: PathBuf, + pub data_dir: DataDirEnum, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -890,7 +890,12 @@ impl NodeStatus { } } - fn update_disk_usage(&mut self, meta_dir: &Path, data_dir: &Path, metrics: &SystemMetrics) { + fn update_disk_usage( + &mut self, + meta_dir: &Path, + data_dir: &DataDirEnum, + metrics: &SystemMetrics, + ) { use systemstat::{Platform, System}; let mounts = System::new().mounts().unwrap_or_default(); @@ -903,7 +908,17 @@ impl NodeStatus { }; self.meta_disk_avail = mount_avail(meta_dir); - self.data_disk_avail = mount_avail(data_dir); + self.data_disk_avail = match data_dir { + DataDirEnum::Single(dir) => mount_avail(dir), + DataDirEnum::Multiple(dirs) => { + dirs.iter() + .map(|d| mount_avail(&d.path)) + .fold(Some((0, 0)), |acc, cur| match (acc, cur) { + (Some((x, y)), Some((a, b))) => Some((x + a, y + b)), + _ => None, + }) + } + }; if let Some((avail, total)) = self.meta_disk_avail { metrics diff --git a/src/util/config.rs b/src/util/config.rs index eeb17e0e..9d00fe82 100644 --- a/src/util/config.rs +++ b/src/util/config.rs @@ -13,7 +13,7 @@ pub struct Config { /// Path where to store metadata. Should be fast, but low volume pub metadata_dir: PathBuf, /// Path where to store data. Can be slower, but need higher volume - pub data_dir: PathBuf, + pub data_dir: DataDirEnum, /// Whether to fsync after all metadata transactions (disabled by default) #[serde(default)] @@ -94,6 +94,26 @@ pub struct Config { pub admin: AdminConfig, } +/// Value for data_dir: either a single directory or a list of dirs with attributes +#[derive(Deserialize, Debug, Clone)] +#[serde(untagged)] +pub enum DataDirEnum { + Single(PathBuf), + Multiple(Vec), +} + +#[derive(Deserialize, Debug, Clone)] +pub struct DataDir { + /// Path to the data directory + pub path: PathBuf, + /// Capacity of the drive (required if read_only is false) + #[serde(default)] + pub capacity: Option, + /// Whether this is a legacy read-only path (capacity should be None) + #[serde(default)] + pub read_only: bool, +} + /// Configuration for S3 api #[derive(Deserialize, Debug, Clone)] pub struct S3ApiConfig {