Merge pull request 'multi-hdd support (fix #218)' (#625) from multihdd into next
Some checks failed
continuous-integration/drone/pr Build is failing
continuous-integration/drone/push Build is passing

Reviewed-on: #625
This commit is contained in:
Alex 2023-09-11 10:52:01 +00:00
commit 7228fbfd4f
20 changed files with 1135 additions and 322 deletions

1
Cargo.lock generated
View file

@ -1300,6 +1300,7 @@ dependencies = [
"async-compression",
"async-trait",
"bytes",
"bytesize",
"futures",
"futures-util",
"garage_db",

View file

@ -33,7 +33,7 @@ args@{
ignoreLockHash,
}:
let
nixifiedLockHash = "f5b86f9d75664ba528a26ae71f07a38e9c72c78fe331420b9b639e2a099d4dad";
nixifiedLockHash = "685d51432f57c5ad2d5c80e725822b9c9bfd7cc632340f70aa1377c1d89117e4";
workspaceSrc = if args.workspaceSrc == null then ./. else args.workspaceSrc;
currentLockHash = builtins.hashFile "sha256" (workspaceSrc + /Cargo.lock);
lockHashIgnored = if ignoreLockHash
@ -1844,6 +1844,7 @@ in
async_compression = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".async-compression."0.4.1" { inherit profileName; }).out;
async_trait = (buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".async-trait."0.1.73" { profileName = "__noProfile"; }).out;
bytes = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.4.0" { inherit profileName; }).out;
bytesize = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytesize."1.3.0" { inherit profileName; }).out;
futures = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures."0.3.28" { inherit profileName; }).out;
futures_util = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures-util."0.3.28" { inherit profileName; }).out;
garage_db = (rustPackages."unknown".garage_db."0.8.3" { inherit profileName; }).out;

View file

@ -75,16 +75,11 @@ to store 2 TB of data in total.
- For the metadata storage, Garage does not do checksumming and integrity
verification on its own. If you are afraid of bitrot/data corruption,
put your metadata directory on a BTRFS partition. Otherwise, just use regular
put your metadata directory on a ZFS or BTRFS partition. Otherwise, just use regular
EXT4 or XFS.
- Having a single server with several storage drives is currently not very well
supported in Garage ([#218](https://git.deuxfleurs.fr/Deuxfleurs/garage/issues/218)).
For an easy setup, just put all your drives in a RAID0 or a ZFS RAIDZ array.
If you're adventurous, you can try to format each of your disk as
a separate XFS partition, and then run one `garage` daemon per disk drive,
or use something like [`mergerfs`](https://github.com/trapexit/mergerfs) to merge
all your disks in a single union filesystem that spreads load over them.
- Servers with multiple HDDs are supported natively by Garage without resorting
to RAID, see [our dedicated documentation page](@/documentation/operations/multi-hdd.md).
## Get a Docker image

View file

@ -91,6 +91,16 @@ is definitely lost, then there is no other choice than to declare your S3 object
as unrecoverable, and to delete them properly from the data store. This can be done
using the `garage block purge` command.
## Rebalancing data directories
In [multi-HDD setups](@/documentation/operations/multi-hdd.md), to ensure that
data blocks are well balanced between storage locations, you may run a
rebalance operation using `garage repair rebalance`. This is usefull when
adding storage locations or when capacities of the storage locations have been
changed. Once this is finished, Garage will know for each block of a single
possible location where it can be, which can increase access speed. This
operation will also move out all data from locations marked as read-only.
# Metadata operations
@ -114,4 +124,3 @@ in your cluster, you can run one of the following repair procedures:
- `garage repair versions`: checks that all versions belong to a non-deleted object, and purges any orphan version
- `garage repair block_refs`: checks that all block references belong to a non-deleted object version, and purges any orphan block reference (this will then allow the blocks to be garbage-collected)

View file

@ -0,0 +1,101 @@
+++
title = "Multi-HDD support"
weight = 15
+++
Since v0.9, Garage natively supports nodes that have several storage drives
for storing data blocks (not for metadata storage).
## Initial setup
To set up a new Garage storage node with multiple HDDs,
format and mount all your drives in different directories,
and use a Garage configuration as follows:
```toml
data_dir = [
{ path = "/path/to/hdd1", capacity = "2T" },
{ path = "/path/to/hdd2", capacity = "4T" },
]
```
Garage will automatically balance all blocks stored by the node
among the different specified directories, proportionnally to the
specified capacities.
## Updating the list of storage locations
If you add new storage locations to your `data_dir`,
Garage will not rebalance existing data between storage locations.
Newly written blocks will be balanced proportionnally to the specified capacities,
and existing data may be moved between drives to improve balancing,
but only opportunistically when a data block is re-written (e.g. an object
is re-uploaded, or an object with a duplicate block is uploaded).
To understand precisely what is happening, we need to dive in to how Garage
splits data among the different storage locations.
First of all, Garage divides the set of all possible block hashes
in a fixed number of slices (currently 1024), and assigns
to each slice a primary storage location among the specified data directories.
The number of slices having their primary location in each data directory
is proportionnal to the capacity specified in the config file.
When Garage receives a block to write, it will always write it in the primary
directory of the slice that contains its hash.
Now, to be able to not lose existing data blocks when storage locations
are added, Garage also keeps a list of secondary data directories
for all of the hash slices. Secondary data directories for a slice indicates
storage locations that once were primary directories for that slice, i.e. where
Garage knows that data blocks of that slice might be stored.
When Garage is requested to read a certain data block,
it will first look in the primary storage directory of its slice,
and if it doesn't find it there it goes through all of the secondary storage
locations until it finds it. This allows Garage to continue operating
normally when storage locations are added, without having to shuffle
files between drives to place them in the correct location.
This relatively simple strategy works well but does not ensure that data
is correctly balanced among drives according to their capacity.
To rebalance data, two strategies can be used:
- Lazy rebalancing: when a block is re-written (e.g. the object is re-uploaded),
Garage checks whether the existing copy is in the primary directory of the slice
or in a secondary directory. If the current copy is in a secondary directory,
Garage re-writes a copy in the primary directory and deletes the one from the
secondary directory. This might never end up rebalancing everything if there
are data blocks that are only read and never written.
- Active rebalancing: an operator of a Garage node can explicitly launch a repair
procedure that rebalances the data directories, moving all blocks to their
primary location. Once done, all secondary locations for all hash slices are
removed so that they won't be checked anymore when looking for a data block.
## Read-only storage locations
If you would like to move all data blocks from an existing data directory to one
or several new data directories, mark the old directory as read-only:
```toml
data_dir = [
{ path = "/path/to/old_data", read_only = true },
{ path = "/path/to/new_hdd1", capacity = "2T" },
{ path = "/path/to/new_hdd2", capacity = "4T" },
]
```
Garage will be able to read requested blocks from the read-only directory.
Garage will also move data out of the read-only directory either progressively
(lazy rebalancing) or if requested explicitly (active rebalancing).
Once an active rebalancing has finished, your read-only directory should be empty:
it might still contain subdirectories, but no data files. You can check that
it contains no files using:
```bash
find -type f /path/to/old_data # should not print anything
```
at which point it can be removed from the `data_dir` list in your config file.

View file

@ -80,6 +80,6 @@ The entire procedure would look something like this:
5. If any specific migration procedure is required, it is usually in one of the two cases:
- It can be run on online nodes after the new version has started, during regular cluster operation.
- it has to be run offline
- it has to be run offline, in which case you will have to again take all nodes offline one after the other to run the repair
For this last step, please refer to the specific documentation pertaining to the version upgrade you are doing.

View file

@ -91,6 +91,19 @@ This folder can be placed on an HDD. The space available for `data_dir`
should be counted to determine a node's capacity
when [adding it to the cluster layout](@/documentation/cookbook/real-world.md).
Since `v0.9.0`, Garage supports multiple data directories with the following syntax:
```toml
data_dir = [
{ path = "/path/to/old_data", read_only = true },
{ path = "/path/to/new_hdd1", capacity = "2T" },
{ path = "/path/to/new_hdd2", capacity = "4T" },
]
```
See [the dedicated documentation page](@/documentation/operations/multi-hdd.md)
on how to operate Garage in such a setup.
### `db_engine` (since `v0.8.0`)
By default, Garage uses the Sled embedded database library

View file

@ -24,6 +24,7 @@ opentelemetry = "0.17"
arc-swap = "1.5"
async-trait = "0.1.7"
bytes = "1.0"
bytesize = "1.2"
hex = "0.4"
tracing = "0.1"
rand = "0.8"

View file

@ -1,3 +1,5 @@
use std::path::PathBuf;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use zstd::stream::{decode_all as zstd_decode, Encoder};
@ -19,6 +21,14 @@ pub enum DataBlock {
Compressed(Bytes),
}
#[derive(Debug)]
pub enum DataBlockPath {
/// Uncompressed data fail
Plain(PathBuf),
/// Compressed data fail
Compressed(PathBuf),
}
impl DataBlock {
/// Query whether this block is compressed
pub fn is_compressed(&self) -> bool {

337
src/block/layout.rs Normal file
View file

@ -0,0 +1,337 @@
use std::path::PathBuf;
use serde::{Deserialize, Serialize};
use garage_util::config::DataDirEnum;
use garage_util::data::Hash;
use garage_util::error::{Error, OkOrMessage};
use garage_util::migrate::*;
type Idx = u16;
const DRIVE_NPART: usize = 1024;
const HASH_DRIVE_BYTES: (usize, usize) = (2, 3);
#[derive(Serialize, Deserialize, Debug, Clone)]
pub(crate) struct DataLayout {
pub(crate) data_dirs: Vec<DataDir>,
/// Primary storage location (index in data_dirs) for each partition
/// = the location where the data is supposed to be, blocks are always
/// written there (copies in other dirs may be deleted if they exist)
pub(crate) part_prim: Vec<Idx>,
/// Secondary storage locations for each partition = locations
/// where data blocks might be, we check from these dirs when reading
pub(crate) part_sec: Vec<Vec<Idx>>,
}
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
pub(crate) struct DataDir {
pub(crate) path: PathBuf,
pub(crate) state: DataDirState,
}
#[derive(Serialize, Deserialize, Debug, Clone, Copy, Eq, PartialEq)]
pub(crate) enum DataDirState {
Active { capacity: u64 },
ReadOnly,
}
impl DataLayout {
pub(crate) fn initialize(dirs: &DataDirEnum) -> Result<Self, Error> {
let data_dirs = make_data_dirs(dirs)?;
// Split partitions proportionnally to capacity for all drives
// to affect primary storage location
let total_cap = data_dirs.iter().filter_map(|x| x.capacity()).sum::<u64>();
assert!(total_cap > 0);
let mut part_prim = Vec::with_capacity(DRIVE_NPART);
let mut cum_cap = 0;
for (i, dd) in data_dirs.iter().enumerate() {
if let DataDirState::Active { capacity } = dd.state {
cum_cap += capacity;
let n_total = (cum_cap * DRIVE_NPART as u64) / total_cap;
part_prim.resize(n_total as usize, i as Idx);
}
}
assert_eq!(cum_cap, total_cap);
assert_eq!(part_prim.len(), DRIVE_NPART);
// If any of the storage locations is non-empty, it probably existed before
// this algorithm was added, so add it as a secondary storage location for all partitions
// to make sure existing files are not lost
let mut part_sec = vec![vec![]; DRIVE_NPART];
for (i, dd) in data_dirs.iter().enumerate() {
if dir_not_empty(&dd.path)? {
for (sec, prim) in part_sec.iter_mut().zip(part_prim.iter()) {
if *prim != i as Idx {
sec.push(i as Idx);
}
}
}
}
Ok(Self {
data_dirs,
part_prim,
part_sec,
})
}
pub(crate) fn update(&mut self, dirs: &DataDirEnum) -> Result<(), Error> {
// Make list of new data directories, exit if nothing changed
let data_dirs = make_data_dirs(dirs)?;
if data_dirs == self.data_dirs {
return Ok(());
}
let total_cap = data_dirs.iter().filter_map(|x| x.capacity()).sum::<u64>();
assert!(total_cap > 0);
// Compute mapping of old indices to new indices
let old2new = self
.data_dirs
.iter()
.map(|x| {
data_dirs
.iter()
.position(|y| y.path == x.path)
.map(|x| x as Idx)
})
.collect::<Vec<_>>();
// Compute secondary location list for partitions based on existing
// folders, translating indices from old to new
let mut part_sec = self
.part_sec
.iter()
.map(|dl| {
dl.iter()
.filter_map(|old| old2new.get(*old as usize).copied().flatten())
.collect::<Vec<_>>()
})
.collect::<Vec<_>>();
// Compute a vector that, for each data dir,
// contains the list of partitions primarily stored on that drive
let mut dir_prim = vec![vec![]; data_dirs.len()];
for (ipart, prim) in self.part_prim.iter().enumerate() {
if let Some(new) = old2new.get(*prim as usize).copied().flatten() {
dir_prim[new as usize].push(ipart);
}
}
// Compute the target number of partitions per data directory
let mut cum_cap = 0;
let mut npart_per_dir = vec![0; data_dirs.len()];
for (idir, dd) in data_dirs.iter().enumerate() {
if let DataDirState::Active { capacity } = dd.state {
let begin = (cum_cap * DRIVE_NPART as u64) / total_cap;
cum_cap += capacity;
let end = (cum_cap * DRIVE_NPART as u64) / total_cap;
npart_per_dir[idir] = (end - begin) as usize;
}
}
assert_eq!(cum_cap, total_cap);
assert_eq!(npart_per_dir.iter().sum::<usize>(), DRIVE_NPART);
// For all directories that have too many primary partitions,
// move that partition to secondary
for (idir, (parts, tgt_npart)) in dir_prim.iter_mut().zip(npart_per_dir.iter()).enumerate()
{
while parts.len() > *tgt_npart {
let part = parts.pop().unwrap();
if !part_sec[part].contains(&(idir as Idx)) {
part_sec[part].push(idir as Idx);
}
}
}
// Calculate the vector of primary partition dir index
let mut part_prim = vec![None; DRIVE_NPART];
for (idir, parts) in dir_prim.iter().enumerate() {
for part in parts.iter() {
assert!(part_prim[*part].is_none());
part_prim[*part] = Some(idir as Idx)
}
}
// Calculate a vector of unassigned partitions
let mut unassigned = part_prim
.iter()
.enumerate()
.filter(|(_, dir)| dir.is_none())
.map(|(ipart, _)| ipart)
.collect::<Vec<_>>();
// For all directories that don't have enough primary partitions,
// add partitions from unassigned
for (idir, (parts, tgt_npart)) in dir_prim.iter_mut().zip(npart_per_dir.iter()).enumerate()
{
if parts.len() < *tgt_npart {
let required = *tgt_npart - parts.len();
assert!(unassigned.len() >= required);
for _ in 0..required {
let new_part = unassigned.pop().unwrap();
part_prim[new_part] = Some(idir as Idx);
part_sec[new_part].retain(|x| *x != idir as Idx);
}
}
}
// Sanity checks
assert!(part_prim.iter().all(|x| x.is_some()));
assert!(unassigned.is_empty());
// Transform part_prim from vec of Option<Idx> to vec of Idx
let part_prim = part_prim
.into_iter()
.map(|x| x.unwrap())
.collect::<Vec<_>>();
assert!(part_prim.iter().all(|p| data_dirs
.get(*p as usize)
.and_then(|x| x.capacity())
.unwrap_or(0)
> 0));
// If any of the newly added storage locations is non-empty,
// it might have been removed and added again and might contain data,
// so add it as a secondary storage location for all partitions
// to make sure existing files are not lost
for (i, dd) in data_dirs.iter().enumerate() {
if self.data_dirs.iter().any(|ed| ed.path == dd.path) {
continue;
}
if dir_not_empty(&dd.path)? {
for (sec, prim) in part_sec.iter_mut().zip(part_prim.iter()) {
if *prim != i as Idx && !sec.contains(&(i as Idx)) {
sec.push(i as Idx);
}
}
}
}
// Apply newly generated config
*self = Self {
data_dirs,
part_prim,
part_sec,
};
Ok(())
}
pub(crate) fn primary_block_dir(&self, hash: &Hash) -> PathBuf {
let ipart = self.partition_from(hash);
let idir = self.part_prim[ipart] as usize;
self.block_dir_from(hash, &self.data_dirs[idir].path)
}
pub(crate) fn secondary_block_dirs<'a>(
&'a self,
hash: &'a Hash,
) -> impl Iterator<Item = PathBuf> + 'a {
let ipart = self.partition_from(hash);
self.part_sec[ipart]
.iter()
.map(move |idir| self.block_dir_from(hash, &self.data_dirs[*idir as usize].path))
}
fn partition_from(&self, hash: &Hash) -> usize {
u16::from_be_bytes([
hash.as_slice()[HASH_DRIVE_BYTES.0],
hash.as_slice()[HASH_DRIVE_BYTES.1],
]) as usize % DRIVE_NPART
}
fn block_dir_from(&self, hash: &Hash, dir: &PathBuf) -> PathBuf {
let mut path = dir.clone();
path.push(hex::encode(&hash.as_slice()[0..1]));
path.push(hex::encode(&hash.as_slice()[1..2]));
path
}
pub(crate) fn without_secondary_locations(&self) -> Self {
Self {
data_dirs: self.data_dirs.clone(),
part_prim: self.part_prim.clone(),
part_sec: self.part_sec.iter().map(|_| vec![]).collect::<Vec<_>>(),
}
}
}
impl InitialFormat for DataLayout {
const VERSION_MARKER: &'static [u8] = b"G09bmdl";
}
impl DataDir {
pub fn capacity(&self) -> Option<u64> {
match self.state {
DataDirState::Active { capacity } => Some(capacity),
_ => None,
}
}
}
fn make_data_dirs(dirs: &DataDirEnum) -> Result<Vec<DataDir>, Error> {
let mut data_dirs = vec![];
match dirs {
DataDirEnum::Single(path) => data_dirs.push(DataDir {
path: path.clone(),
state: DataDirState::Active {
capacity: 1_000_000_000, // whatever, doesn't matter
},
}),
DataDirEnum::Multiple(dirs) => {
let mut ok = false;
for dir in dirs.iter() {
let state = match &dir.capacity {
Some(cap) if dir.read_only == false => {
let capacity = cap.parse::<bytesize::ByteSize>()
.ok_or_message("invalid capacity value")?.as_u64();
if capacity == 0 {
return Err(Error::Message(format!("data directory {} should have non-zero capacity", dir.path.to_string_lossy())));
}
ok = true;
DataDirState::Active {
capacity,
}
}
None if dir.read_only == true => {
DataDirState::ReadOnly
}
_ => return Err(Error::Message(format!("data directories in data_dir should have a capacity value or be marked read_only, not the case for {}", dir.path.to_string_lossy()))),
};
data_dirs.push(DataDir {
path: dir.path.clone(),
state,
});
}
if !ok {
return Err(Error::Message(
"incorrect data_dir configuration, no primary writable directory specified"
.into(),
));
}
}
}
Ok(data_dirs)
}
fn dir_not_empty(path: &PathBuf) -> Result<bool, Error> {
for entry in std::fs::read_dir(&path)? {
let dir = entry?;
if dir.file_type()?.is_dir()
&& dir
.file_name()
.into_string()
.ok()
.and_then(|hex| hex::decode(&hex).ok())
.is_some()
{
return Ok(true);
}
}
Ok(false)
}

View file

@ -6,5 +6,6 @@ pub mod repair;
pub mod resync;
mod block;
mod layout;
mod metrics;
mod rc;

View file

@ -3,7 +3,7 @@ use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use arc_swap::ArcSwapOption;
use arc_swap::{ArcSwap, ArcSwapOption};
use async_trait::async_trait;
use bytes::Bytes;
use rand::prelude::*;
@ -25,10 +25,11 @@ use garage_rpc::rpc_helper::netapp::stream::{stream_asyncread, ByteStream};
use garage_db as db;
use garage_util::background::{vars, BackgroundRunner};
use garage_util::config::DataDirEnum;
use garage_util::data::*;
use garage_util::error::*;
use garage_util::metrics::RecordDuration;
use garage_util::persister::PersisterShared;
use garage_util::persister::{Persister, PersisterShared};
use garage_util::time::msec_to_rfc3339;
use garage_rpc::rpc_helper::OrderTag;
@ -38,6 +39,7 @@ use garage_rpc::*;
use garage_table::replication::{TableReplication, TableShardedReplication};
use crate::block::*;
use crate::layout::*;
use crate::metrics::*;
use crate::rc::*;
use crate::repair::*;
@ -77,13 +79,16 @@ impl Rpc for BlockRpc {
pub struct BlockManager {
/// Replication strategy, allowing to find on which node blocks should be located
pub replication: TableShardedReplication,
/// Directory in which block are stored
pub data_dir: PathBuf,
/// Data layout
pub(crate) data_layout: ArcSwap<DataLayout>,
/// Data layout persister
pub(crate) data_layout_persister: Persister<DataLayout>,
data_fsync: bool,
compression_level: Option<i32>,
mutation_lock: [Mutex<BlockManagerLocked>; 256],
mutation_lock: Vec<Mutex<BlockManagerLocked>>,
pub(crate) rc: BlockRc,
pub resync: BlockResyncManager,
@ -106,6 +111,9 @@ pub struct BlockResyncErrorInfo {
pub next_try: u64,
}
// The number of different mutexes used to parallelize write access to data blocks
const MUTEX_COUNT: usize = 256;
// This custom struct contains functions that must only be ran
// when the lock is held. We ensure that it is the case by storing
// it INSIDE a Mutex.
@ -114,12 +122,29 @@ struct BlockManagerLocked();
impl BlockManager {
pub fn new(
db: &db::Db,
data_dir: PathBuf,
data_dir: DataDirEnum,
data_fsync: bool,
compression_level: Option<i32>,
replication: TableShardedReplication,
system: Arc<System>,
) -> Arc<Self> {
) -> Result<Arc<Self>, Error> {
// Load or compute layout, i.e. assignment of data blocks to the different data directories
let data_layout_persister: Persister<DataLayout> =
Persister::new(&system.metadata_dir, "data_layout");
let data_layout = match data_layout_persister.load() {
Ok(mut layout) => {
layout
.update(&data_dir)
.ok_or_message("invalid data_dir config")?;
layout
}
Err(_) => DataLayout::initialize(&data_dir).ok_or_message("invalid data_dir config")?,
};
data_layout_persister
.save(&data_layout)
.expect("cannot save data_layout");
// Open metadata tables
let rc = db
.open_tree("block_local_rc")
.expect("Unable to open block_local_rc tree");
@ -142,10 +167,14 @@ impl BlockManager {
let block_manager = Arc::new(Self {
replication,
data_dir,
data_layout: ArcSwap::new(Arc::new(data_layout)),
data_layout_persister,
data_fsync,
compression_level,
mutation_lock: [(); 256].map(|_| Mutex::new(BlockManagerLocked())),
mutation_lock: vec![(); MUTEX_COUNT]
.iter()
.map(|_| Mutex::new(BlockManagerLocked()))
.collect::<Vec<_>>(),
rc,
resync,
system,
@ -157,7 +186,7 @@ impl BlockManager {
block_manager.endpoint.set_handler(block_manager.clone());
block_manager.scrub_persister.set_with(|_| ()).unwrap();
block_manager
Ok(block_manager)
}
pub fn spawn_workers(self: &Arc<Self>, bg: &BackgroundRunner) {
@ -204,44 +233,10 @@ impl BlockManager {
hash: &Hash,
order_tag: Option<OrderTag>,
) -> Result<(DataBlockHeader, ByteStream), Error> {
let who = self.replication.read_nodes(hash);
let who = self.system.rpc.request_order(&who);
for node in who.iter() {
let node_id = NodeID::from(*node);
let rpc = self.endpoint.call_streaming(
&node_id,
BlockRpc::GetBlock(*hash, order_tag),
PRIO_NORMAL | PRIO_SECONDARY,
);
tokio::select! {
res = rpc => {
let res = match res {
Ok(res) => res,
Err(e) => {
debug!("Node {:?} returned error: {}", node, e);
continue;
}
};
let (header, stream) = match res.into_parts() {
(Ok(BlockRpc::PutBlock { hash: _, header }), Some(stream)) => (header, stream),
_ => {
debug!("Node {:?} returned a malformed response", node);
continue;
}
};
return Ok((header, stream));
}
_ = tokio::time::sleep(self.system.rpc.rpc_timeout()) => {
debug!("Node {:?} didn't return block in time, trying next.", node);
}
};
}
Err(Error::Message(format!(
"Unable to read block {:?}: no node returned a valid block",
hash
)))
self.rpc_get_raw_block_internal(hash, order_tag, |header, stream| async move {
Ok((header, stream))
})
.await
}
/// Ask nodes that might have a (possibly compressed) block for it
@ -251,6 +246,24 @@ impl BlockManager {
hash: &Hash,
order_tag: Option<OrderTag>,
) -> Result<DataBlock, Error> {
self.rpc_get_raw_block_internal(hash, order_tag, |header, stream| async move {
read_stream_to_end(stream)
.await
.map(|data| DataBlock::from_parts(header, data))
})
.await
}
async fn rpc_get_raw_block_internal<F, Fut, T>(
&self,
hash: &Hash,
order_tag: Option<OrderTag>,
f: F,
) -> Result<T, Error>
where
F: Fn(DataBlockHeader, ByteStream) -> Fut,
Fut: futures::Future<Output = Result<T, Error>>,
{
let who = self.replication.read_nodes(hash);
let who = self.system.rpc.request_order(&who);
@ -266,34 +279,41 @@ impl BlockManager {
let res = match res {
Ok(res) => res,
Err(e) => {
debug!("Node {:?} returned error: {}", node, e);
debug!("Get block {:?}: node {:?} could not be contacted: {}", hash, node, e);
continue;
}
};
let (header, stream) = match res.into_parts() {
(Ok(BlockRpc::PutBlock { hash: _, header }), Some(stream)) => (header, stream),
_ => {
debug!("Node {:?} returned a malformed response", node);
(Ok(_), _) => {
debug!("Get block {:?}: node {:?} returned a malformed response", hash, node);
continue;
}
(Err(e), _) => {
debug!("Get block {:?}: node {:?} returned error: {}", hash, node, e);
continue;
}
};
match read_stream_to_end(stream).await {
Ok(bytes) => return Ok(DataBlock::from_parts(header, bytes)),
match f(header, stream).await {
Ok(ret) => return Ok(ret),
Err(e) => {
debug!("Error reading stream from node {:?}: {}", node, e);
debug!("Get block {:?}: error reading stream from node {:?}: {}", hash, node, e);
}
}
}
// TODO: sleep less long (fail early), initiate a second request earlier
// if the first one doesn't succeed rapidly
// TODO: keep first request running when initiating a new one and take the
// one that finishes earlier
_ = tokio::time::sleep(self.system.rpc.rpc_timeout()) => {
debug!("Node {:?} didn't return block in time, trying next.", node);
debug!("Get block {:?}: node {:?} didn't return block in time, trying next.", hash, node);
}
};
}
Err(Error::Message(format!(
"Unable to read block {:?}: no node returned a valid block",
hash
)))
let msg = format!("Get block {:?}: no node returned a valid block", hash);
debug!("{}", msg);
Err(Error::Message(msg))
}
// ---- Public interface ----
@ -471,8 +491,6 @@ impl BlockManager {
pub(crate) async fn write_block(&self, hash: &Hash, data: &DataBlock) -> Result<(), Error> {
let tracer = opentelemetry::global::tracer("garage");
let write_size = data.inner_buffer().len() as u64;
self.lock_mutate(hash)
.await
.write_block(hash, data, self)
@ -482,8 +500,6 @@ impl BlockManager {
))
.await?;
self.metrics.bytes_written.add(write_size);
Ok(())
}
@ -510,36 +526,42 @@ impl BlockManager {
/// Read block from disk, verifying it's integrity
pub(crate) async fn read_block(&self, hash: &Hash) -> Result<DataBlock, Error> {
let data = self
.read_block_internal(hash)
.bound_record_duration(&self.metrics.block_read_duration)
.await?;
self.metrics
.bytes_read
.add(data.inner_buffer().len() as u64);
Ok(data)
let tracer = opentelemetry::global::tracer("garage");
async {
match self.find_block(hash).await {
Some(p) => self.read_block_from(hash, &p).await,
None => {
// Not found but maybe we should have had it ??
self.resync
.put_to_resync(hash, 2 * self.system.rpc.rpc_timeout())?;
return Err(Error::Message(format!(
"block {:?} not found on node",
hash
)));
}
}
}
.bound_record_duration(&self.metrics.block_read_duration)
.with_context(Context::current_with_span(
tracer.start("BlockManager::read_block"),
))
.await
}
async fn read_block_internal(&self, hash: &Hash) -> Result<DataBlock, Error> {
let mut path = self.block_path(hash);
let compressed = match self.is_block_compressed(hash).await {
Ok(c) => c,
Err(e) => {
// Not found but maybe we should have had it ??
self.resync
.put_to_resync(hash, 2 * self.system.rpc.rpc_timeout())?;
return Err(Into::into(e));
}
pub(crate) async fn read_block_from(
&self,
hash: &Hash,
block_path: &DataBlockPath,
) -> Result<DataBlock, Error> {
let (path, compressed) = match block_path {
DataBlockPath::Plain(p) => (p, false),
DataBlockPath::Compressed(p) => (p, true),
};
if compressed {
path.set_extension("zst");
}
let mut f = fs::File::open(&path).await?;
let mut f = fs::File::open(&path).await?;
let mut data = vec![];
f.read_to_end(&mut data).await?;
self.metrics.bytes_read.add(data.len() as u64);
drop(f);
let data = if compressed {
@ -551,29 +573,27 @@ impl BlockManager {
if data.verify(*hash).is_err() {
self.metrics.corruption_counter.add(1);
warn!(
"Block {:?} is corrupted. Renaming to .corrupted and resyncing.",
hash
);
self.lock_mutate(hash)
.await
.move_block_to_corrupted(hash, self)
.move_block_to_corrupted(block_path)
.await?;
self.resync.put_to_resync(hash, Duration::from_millis(0))?;
return Err(Error::CorruptData(*hash));
}
Ok(data)
}
/// Check if this node has a block and whether it needs it
pub(crate) async fn check_block_status(&self, hash: &Hash) -> Result<BlockStatus, Error> {
self.lock_mutate(hash)
.await
.check_block_status(hash, self)
.await
}
/// Check if this node should have a block, but don't actually have it
async fn need_block(&self, hash: &Hash) -> Result<bool, Error> {
let BlockStatus { exists, needed } = self.check_block_status(hash).await?;
Ok(needed.is_nonzero() && !exists)
let rc = self.rc.get_block_rc(hash)?;
let exists = self.find_block(hash).await.is_some();
Ok(rc.is_nonzero() && !exists)
}
/// Delete block if it is not needed anymore
@ -584,59 +604,65 @@ impl BlockManager {
.await
}
/// Utility: gives the path of the directory in which a block should be found
fn block_dir(&self, hash: &Hash) -> PathBuf {
let mut path = self.data_dir.clone();
path.push(hex::encode(&hash.as_slice()[0..1]));
path.push(hex::encode(&hash.as_slice()[1..2]));
path
}
/// Find the path where a block is currently stored
pub(crate) async fn find_block(&self, hash: &Hash) -> Option<DataBlockPath> {
let data_layout = self.data_layout.load_full();
let dirs = Some(data_layout.primary_block_dir(hash))
.into_iter()
.chain(data_layout.secondary_block_dirs(hash));
let filename = hex::encode(hash.as_ref());
/// Utility: give the full path where a block should be found, minus extension if block is
/// compressed
fn block_path(&self, hash: &Hash) -> PathBuf {
let mut path = self.block_dir(hash);
path.push(hex::encode(hash.as_ref()));
path
}
for dir in dirs {
let mut path = dir;
path.push(&filename);
/// Utility: check if block is stored compressed. Error if block is not stored
async fn is_block_compressed(&self, hash: &Hash) -> Result<bool, Error> {
let mut path = self.block_path(hash);
// If compression is disabled on node - check for the raw block
// first and then a compressed one (as compression may have been
// previously enabled).
match self.compression_level {
None => {
if self.compression_level.is_none() {
// If compression is disabled on node - check for the raw block
// first and then a compressed one (as compression may have been
// previously enabled).
if fs::metadata(&path).await.is_ok() {
return Ok(false);
return Some(DataBlockPath::Plain(path));
}
path.set_extension("zst");
fs::metadata(&path).await.map(|_| true).map_err(Into::into)
}
_ => {
path.set_extension("zst");
if fs::metadata(&path).await.is_ok() {
return Ok(true);
return Some(DataBlockPath::Compressed(path));
}
} else {
path.set_extension("zst");
if fs::metadata(&path).await.is_ok() {
return Some(DataBlockPath::Compressed(path));
}
path.set_extension("");
fs::metadata(&path).await.map(|_| false).map_err(Into::into)
if fs::metadata(&path).await.is_ok() {
return Some(DataBlockPath::Plain(path));
}
}
}
None
}
/// Rewrite a block at the primary location for its path and delete the old path.
/// Returns the number of bytes read/written
pub(crate) async fn fix_block_location(
&self,
hash: &Hash,
wrong_path: DataBlockPath,
) -> Result<usize, Error> {
self.lock_mutate(hash)
.await
.fix_block_location(hash, wrong_path, self)
.await
}
async fn lock_mutate(&self, hash: &Hash) -> MutexGuard<'_, BlockManagerLocked> {
let tracer = opentelemetry::global::tracer("garage");
self.mutation_lock[hash.as_slice()[0] as usize]
let ilock = u16::from_be_bytes([hash.as_slice()[0], hash.as_slice()[1]]) as usize
% self.mutation_lock.len();
self.mutation_lock[ilock]
.lock()
.with_context(Context::current_with_span(
tracer.start("Acquire mutation_lock"),
tracer.start(format!("Acquire mutation_lock #{}", ilock)),
))
.await
}
@ -649,7 +675,7 @@ impl StreamingEndpointHandler<BlockRpc> for BlockManager {
BlockRpc::PutBlock { hash, header } => Resp::new(
self.handle_put_block(*hash, *header, message.take_stream())
.await
.map(|_| BlockRpc::Ok),
.map(|()| BlockRpc::Ok),
),
BlockRpc::GetBlock(h, order_tag) => self.handle_get_block(h, *order_tag).await,
BlockRpc::NeedBlockQuery(h) => {
@ -660,62 +686,70 @@ impl StreamingEndpointHandler<BlockRpc> for BlockManager {
}
}
pub(crate) struct BlockStatus {
pub(crate) exists: bool,
pub(crate) needed: RcEntry,
}
impl BlockManagerLocked {
async fn check_block_status(
&self,
hash: &Hash,
mgr: &BlockManager,
) -> Result<BlockStatus, Error> {
let exists = mgr.is_block_compressed(hash).await.is_ok();
let needed = mgr.rc.get_block_rc(hash)?;
Ok(BlockStatus { exists, needed })
}
async fn write_block(
&self,
hash: &Hash,
data: &DataBlock,
mgr: &BlockManager,
) -> Result<(), Error> {
let existing_path = mgr.find_block(hash).await;
self.write_block_inner(hash, data, mgr, existing_path).await
}
async fn write_block_inner(
&self,
hash: &Hash,
data: &DataBlock,
mgr: &BlockManager,
existing_path: Option<DataBlockPath>,
) -> Result<(), Error> {
let compressed = data.is_compressed();
let data = data.inner_buffer();
let mut path = mgr.block_dir(hash);
let directory = path.clone();
path.push(hex::encode(hash));
let directory = mgr.data_layout.load().primary_block_dir(hash);
fs::create_dir_all(&directory).await?;
let mut tgt_path = directory.clone();
tgt_path.push(hex::encode(hash));
if compressed {
tgt_path.set_extension("zst");
}
let to_delete = match (mgr.is_block_compressed(hash).await, compressed) {
(Ok(true), _) => return Ok(()),
(Ok(false), false) => return Ok(()),
(Ok(false), true) => {
let path_to_delete = path.clone();
path.set_extension("zst");
Some(path_to_delete)
}
(Err(_), compressed) => {
if compressed {
path.set_extension("zst");
}
None
}
let to_delete = match (existing_path, compressed) {
// If the block is stored in the wrong directory,
// write it again at the correct path and delete the old path
(Some(DataBlockPath::Plain(p)), false) if p != tgt_path => Some(p),
(Some(DataBlockPath::Compressed(p)), true) if p != tgt_path => Some(p),
// If the block is already stored not compressed but we have a compressed
// copy, write the compressed copy and delete the uncompressed one
(Some(DataBlockPath::Plain(plain_path)), true) => Some(plain_path),
// If the block is already stored compressed,
// keep the stored copy, we have nothing to do
(Some(DataBlockPath::Compressed(_)), _) => return Ok(()),
// If the block is already stored not compressed,
// and we don't have a compressed copy either,
// keep the stored copy, we have nothing to do
(Some(DataBlockPath::Plain(_)), false) => return Ok(()),
// If the block isn't stored already, just store what is given to us
(None, _) => None,
};
assert!(to_delete.as_ref() != Some(&tgt_path));
let mut path_tmp = path.clone();
let mut path_tmp = tgt_path.clone();
let tmp_extension = format!("tmp{}", hex::encode(thread_rng().gen::<[u8; 4]>()));
path_tmp.set_extension(tmp_extension);
fs::create_dir_all(&directory).await?;
let mut delete_on_drop = DeleteOnDrop(Some(path_tmp.clone()));
let mut f = fs::File::create(&path_tmp).await?;
f.write_all(data).await?;
mgr.metrics.bytes_written.add(data.len() as u64);
if mgr.data_fsync {
f.sync_all().await?;
@ -723,7 +757,7 @@ impl BlockManagerLocked {
drop(f);
fs::rename(path_tmp, path).await?;
fs::rename(path_tmp, tgt_path).await?;
delete_on_drop.cancel();
@ -749,36 +783,49 @@ impl BlockManagerLocked {
Ok(())
}
async fn move_block_to_corrupted(&self, hash: &Hash, mgr: &BlockManager) -> Result<(), Error> {
warn!(
"Block {:?} is corrupted. Renaming to .corrupted and resyncing.",
hash
);
let mut path = mgr.block_path(hash);
let mut path2 = path.clone();
if mgr.is_block_compressed(hash).await? {
path.set_extension("zst");
path2.set_extension("zst.corrupted");
} else {
path2.set_extension("corrupted");
}
async fn move_block_to_corrupted(&self, block_path: &DataBlockPath) -> Result<(), Error> {
let (path, path2) = match block_path {
DataBlockPath::Plain(p) => {
let mut p2 = p.clone();
p2.set_extension("corrupted");
(p, p2)
}
DataBlockPath::Compressed(p) => {
let mut p2 = p.clone();
p2.set_extension("zst.corrupted");
(p, p2)
}
};
fs::rename(path, path2).await?;
Ok(())
}
async fn delete_if_unneeded(&self, hash: &Hash, mgr: &BlockManager) -> Result<(), Error> {
let BlockStatus { exists, needed } = self.check_block_status(hash, mgr).await?;
if exists && needed.is_deletable() {
let mut path = mgr.block_path(hash);
if mgr.is_block_compressed(hash).await? {
path.set_extension("zst");
let rc = mgr.rc.get_block_rc(hash)?;
if rc.is_deletable() {
while let Some(path) = mgr.find_block(hash).await {
let path = match path {
DataBlockPath::Plain(p) | DataBlockPath::Compressed(p) => p,
};
fs::remove_file(path).await?;
mgr.metrics.delete_counter.add(1);
}
fs::remove_file(path).await?;
mgr.metrics.delete_counter.add(1);
}
Ok(())
}
async fn fix_block_location(
&self,
hash: &Hash,
wrong_path: DataBlockPath,
mgr: &BlockManager,
) -> Result<usize, Error> {
let data = mgr.read_block_from(hash, &wrong_path).await?;
self.write_block_inner(hash, &data, mgr, Some(wrong_path))
.await?;
Ok(data.inner_buffer().len())
}
}
async fn read_stream_to_end(mut stream: ByteStream) -> Result<Bytes, Error> {

View file

@ -17,6 +17,7 @@ use garage_util::persister::PersisterShared;
use garage_util::time::*;
use garage_util::tranquilizer::Tranquilizer;
use crate::block::*;
use crate::manager::*;
// Full scrub every 25 days with a random element of 10 days mixed in below
@ -136,7 +137,7 @@ impl Worker for RepairWorker {
// Lists all blocks on disk and adds them to the resync queue.
// This allows us to find blocks we are storing but don't actually need,
// so that we can offload them if necessary and then delete them locally.
if let Some(hash) = bi.next().await? {
if let Some((_path, hash)) = bi.next().await? {
self.manager
.resync
.put_to_resync(&hash, Duration::from_secs(0))?;
@ -175,7 +176,9 @@ mod v081 {
}
mod v082 {
use garage_util::data::Hash;
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use super::v081;
@ -185,6 +188,27 @@ mod v082 {
pub(crate) time_last_complete_scrub: u64,
pub(crate) time_next_run_scrub: u64,
pub(crate) corruptions_detected: u64,
#[serde(default)]
pub(crate) checkpoint: Option<BlockStoreIterator>,
}
#[derive(Serialize, Deserialize, Clone)]
pub struct BlockStoreIterator {
pub todo: Vec<BsiTodo>,
}
#[derive(Serialize, Deserialize, Clone)]
pub enum BsiTodo {
Directory {
path: PathBuf,
progress_min: u64,
progress_max: u64,
},
File {
path: PathBuf,
hash: Hash,
progress: u64,
},
}
impl garage_util::migrate::Migrate for ScrubWorkerPersisted {
@ -199,6 +223,7 @@ mod v082 {
time_last_complete_scrub: old.time_last_complete_scrub,
time_next_run_scrub: randomize_next_scrub_run_time(old.time_last_complete_scrub),
corruptions_detected: old.corruptions_detected,
checkpoint: None,
}
}
}
@ -235,14 +260,23 @@ impl Default for ScrubWorkerPersisted {
time_next_run_scrub: randomize_next_scrub_run_time(now_msec()),
tranquility: INITIAL_SCRUB_TRANQUILITY,
corruptions_detected: 0,
checkpoint: None,
}
}
}
#[derive(Default)]
enum ScrubWorkerState {
Running(BlockStoreIterator),
Paused(BlockStoreIterator, u64), // u64 = time when to resume scrub
Running {
iterator: BlockStoreIterator,
// time of the last checkpoint
t_cp: u64,
},
Paused {
iterator: BlockStoreIterator,
// time at which the scrub should be resumed
t_resume: u64,
},
#[default]
Finished,
}
@ -261,10 +295,17 @@ impl ScrubWorker {
rx_cmd: mpsc::Receiver<ScrubWorkerCommand>,
persister: PersisterShared<ScrubWorkerPersisted>,
) -> Self {
let work = match persister.get_with(|x| x.checkpoint.clone()) {
None => ScrubWorkerState::Finished,
Some(iterator) => ScrubWorkerState::Running {
iterator,
t_cp: now_msec(),
},
};
Self {
manager,
rx_cmd,
work: ScrubWorkerState::Finished,
work,
tranquilizer: Tranquilizer::new(30),
persister,
}
@ -277,7 +318,16 @@ impl ScrubWorker {
ScrubWorkerState::Finished => {
info!("Scrub worker initializing, now performing datastore scrub");
let iterator = BlockStoreIterator::new(&self.manager);
ScrubWorkerState::Running(iterator)
if let Err(e) = self
.persister
.set_with(|x| x.checkpoint = Some(iterator.clone()))
{
error!("Could not save scrub checkpoint: {}", e);
}
ScrubWorkerState::Running {
iterator,
t_cp: now_msec(),
}
}
work => {
error!("Cannot start scrub worker: already running!");
@ -287,8 +337,18 @@ impl ScrubWorker {
}
ScrubWorkerCommand::Pause(dur) => {
self.work = match std::mem::take(&mut self.work) {
ScrubWorkerState::Running(it) | ScrubWorkerState::Paused(it, _) => {
ScrubWorkerState::Paused(it, now_msec() + dur.as_millis() as u64)
ScrubWorkerState::Running { iterator, .. }
| ScrubWorkerState::Paused { iterator, .. } => {
if let Err(e) = self
.persister
.set_with(|x| x.checkpoint = Some(iterator.clone()))
{
error!("Could not save scrub checkpoint: {}", e);
}
ScrubWorkerState::Paused {
iterator,
t_resume: now_msec() + dur.as_millis() as u64,
}
}
work => {
error!("Cannot pause scrub worker: not running!");
@ -298,7 +358,10 @@ impl ScrubWorker {
}
ScrubWorkerCommand::Resume => {
self.work = match std::mem::take(&mut self.work) {
ScrubWorkerState::Paused(it, _) => ScrubWorkerState::Running(it),
ScrubWorkerState::Paused { iterator, .. } => ScrubWorkerState::Running {
iterator,
t_cp: now_msec(),
},
work => {
error!("Cannot resume scrub worker: not paused!");
work
@ -307,7 +370,10 @@ impl ScrubWorker {
}
ScrubWorkerCommand::Cancel => {
self.work = match std::mem::take(&mut self.work) {
ScrubWorkerState::Running(_) | ScrubWorkerState::Paused(_, _) => {
ScrubWorkerState::Running { .. } | ScrubWorkerState::Paused { .. } => {
if let Err(e) = self.persister.set_with(|x| x.checkpoint = None) {
error!("Could not save scrub checkpoint: {}", e);
}
ScrubWorkerState::Finished
}
work => {
@ -343,12 +409,15 @@ impl Worker for ScrubWorker {
..Default::default()
};
match &self.work {
ScrubWorkerState::Running(bsi) => {
s.progress = Some(format!("{:.2}%", bsi.progress() * 100.));
ScrubWorkerState::Running { iterator, .. } => {
s.progress = Some(format!("{:.2}%", iterator.progress() * 100.));
}
ScrubWorkerState::Paused(bsi, rt) => {
s.progress = Some(format!("{:.2}%", bsi.progress() * 100.));
s.freeform = vec![format!("Scrub paused, resumes at {}", msec_to_rfc3339(*rt))];
ScrubWorkerState::Paused { iterator, t_resume } => {
s.progress = Some(format!("{:.2}%", iterator.progress() * 100.));
s.freeform = vec![format!(
"Scrub paused, resumes at {}",
msec_to_rfc3339(*t_resume)
)];
}
ScrubWorkerState::Finished => {
s.freeform = vec![
@ -374,9 +443,11 @@ impl Worker for ScrubWorker {
};
match &mut self.work {
ScrubWorkerState::Running(bsi) => {
ScrubWorkerState::Running { iterator, t_cp } => {
self.tranquilizer.reset();
if let Some(hash) = bsi.next().await? {
let now = now_msec();
if let Some((_path, hash)) = iterator.next().await? {
match self.manager.read_block(&hash).await {
Err(Error::CorruptData(_)) => {
error!("Found corrupt data block during scrub: {:?}", hash);
@ -385,16 +456,23 @@ impl Worker for ScrubWorker {
Err(e) => return Err(e),
_ => (),
};
if now - *t_cp > 60 * 1000 {
self.persister
.set_with(|p| p.checkpoint = Some(iterator.clone()))?;
*t_cp = now;
}
Ok(self
.tranquilizer
.tranquilize_worker(self.persister.get_with(|p| p.tranquility)))
} else {
let now = now_msec();
let next_scrub_timestamp = randomize_next_scrub_run_time(now);
self.persister.set_with(|p| {
p.time_last_complete_scrub = now;
p.time_next_run_scrub = next_scrub_timestamp;
p.checkpoint = None;
})?;
self.work = ScrubWorkerState::Finished;
self.tranquilizer.clear();
@ -413,8 +491,8 @@ impl Worker for ScrubWorker {
async fn wait_for_work(&mut self) -> WorkerState {
let (wait_until, command) = match &self.work {
ScrubWorkerState::Running(_) => return WorkerState::Busy,
ScrubWorkerState::Paused(_, resume_time) => (*resume_time, ScrubWorkerCommand::Resume),
ScrubWorkerState::Running { .. } => return WorkerState::Busy,
ScrubWorkerState::Paused { t_resume, .. } => (*t_resume, ScrubWorkerCommand::Resume),
ScrubWorkerState::Finished => (
self.persister.get_with(|p| p.time_next_run_scrub),
ScrubWorkerCommand::Start,
@ -437,110 +515,250 @@ impl Worker for ScrubWorker {
}
match &self.work {
ScrubWorkerState::Running(_) => WorkerState::Busy,
ScrubWorkerState::Running { .. } => WorkerState::Busy,
_ => WorkerState::Idle,
}
}
}
// ---- ---- ----
// THIRD KIND OF REPAIR: REBALANCING DATA BLOCKS
// between multiple storage locations.
// This is a one-shot repair operation that can be launched,
// checks everything, and then exits.
// ---- ---- ----
pub struct RebalanceWorker {
manager: Arc<BlockManager>,
block_iter: BlockStoreIterator,
t_started: u64,
t_finished: Option<u64>,
moved: usize,
moved_bytes: u64,
}
impl RebalanceWorker {
pub fn new(manager: Arc<BlockManager>) -> Self {
let block_iter = BlockStoreIterator::new(&manager);
Self {
manager,
block_iter,
t_started: now_msec(),
t_finished: None,
moved: 0,
moved_bytes: 0,
}
}
}
#[async_trait]
impl Worker for RebalanceWorker {
fn name(&self) -> String {
"Block rebalance worker".into()
}
fn status(&self) -> WorkerStatus {
let t_cur = self.t_finished.unwrap_or_else(|| now_msec());
let rate = self.moved_bytes / std::cmp::max(1, (t_cur - self.t_started) / 1000);
let mut freeform = vec![
format!("Blocks moved: {}", self.moved),
format!(
"Bytes moved: {} ({}/s)",
bytesize::ByteSize::b(self.moved_bytes),
bytesize::ByteSize::b(rate)
),
format!("Started: {}", msec_to_rfc3339(self.t_started)),
];
if let Some(t_fin) = self.t_finished {
freeform.push(format!("Finished: {}", msec_to_rfc3339(t_fin)))
}
WorkerStatus {
progress: Some(format!("{:.2}%", self.block_iter.progress() * 100.)),
freeform,
..Default::default()
}
}
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
if let Some((path, hash)) = self.block_iter.next().await? {
let prim_loc = self.manager.data_layout.load().primary_block_dir(&hash);
if path.ancestors().all(|x| x != prim_loc) {
let block_path = match path.extension() {
None => DataBlockPath::Plain(path.clone()),
Some(x) if x.to_str() == Some("zst") => DataBlockPath::Compressed(path.clone()),
_ => {
warn!("not rebalancing file: {}", path.to_string_lossy());
return Ok(WorkerState::Busy);
}
};
// block is not in its primary location,
// move it there (reading and re-writing does the trick)
debug!("rebalance: moving block {:?} => {:?}", block_path, prim_loc);
let block_len = self.manager.fix_block_location(&hash, block_path).await?;
self.moved += 1;
self.moved_bytes += block_len as u64;
}
Ok(WorkerState::Busy)
} else {
// all blocks are in their primary location:
// - the ones we moved now are
// - the ones written in the meantime always were, because we only
// write to primary locations
// so we can safely remove all secondary locations from the data layout
let new_layout = self
.manager
.data_layout
.load_full()
.without_secondary_locations();
self.manager
.data_layout_persister
.save_async(&new_layout)
.await?;
self.manager.data_layout.store(Arc::new(new_layout));
self.t_finished = Some(now_msec());
Ok(WorkerState::Done)
}
}
async fn wait_for_work(&mut self) -> WorkerState {
unreachable!()
}
}
// ---- ---- ----
// UTILITY FOR ENUMERATING THE BLOCK STORE
// ---- ---- ----
struct BlockStoreIterator {
path: Vec<ReadingDir>,
}
enum ReadingDir {
Pending(PathBuf),
Read {
subpaths: Vec<fs::DirEntry>,
pos: usize,
},
}
const PROGRESS_FP: u64 = 1_000_000_000;
impl BlockStoreIterator {
fn new(manager: &BlockManager) -> Self {
let root_dir = manager.data_dir.clone();
Self {
path: vec![ReadingDir::Pending(root_dir)],
let data_layout = manager.data_layout.load_full();
let mut dir_cap = vec![0; data_layout.data_dirs.len()];
for prim in data_layout.part_prim.iter() {
dir_cap[*prim as usize] += 1;
}
for sec_vec in data_layout.part_sec.iter() {
for sec in sec_vec.iter() {
dir_cap[*sec as usize] += 1;
}
}
let sum_cap = dir_cap.iter().sum::<usize>() as u64;
let mut cum_cap = 0;
let mut todo = vec![];
for (dir, cap) in data_layout.data_dirs.iter().zip(dir_cap.into_iter()) {
let progress_min = (cum_cap * PROGRESS_FP) / sum_cap;
let progress_max = ((cum_cap + cap as u64) * PROGRESS_FP) / sum_cap;
cum_cap += cap as u64;
todo.push(BsiTodo::Directory {
path: dir.path.clone(),
progress_min,
progress_max,
});
}
// entries are processed back-to-front (because of .pop()),
// so reverse entries to process them in increasing progress bounds
todo.reverse();
let ret = Self { todo };
debug_assert!(ret.progress_invariant());
ret
}
/// Returns progress done, between 0 and 1
fn progress(&self) -> f32 {
if self.path.is_empty() {
1.0
} else {
let mut ret = 0.0;
let mut next_div = 1;
for p in self.path.iter() {
match p {
ReadingDir::Pending(_) => break,
ReadingDir::Read { subpaths, pos } => {
next_div *= subpaths.len();
ret += ((*pos - 1) as f32) / (next_div as f32);
self.todo
.last()
.map(|x| match x {
BsiTodo::Directory { progress_min, .. } => *progress_min,
BsiTodo::File { progress, .. } => *progress,
})
.map(|x| x as f32 / PROGRESS_FP as f32)
.unwrap_or(1.0)
}
async fn next(&mut self) -> Result<Option<(PathBuf, Hash)>, Error> {
loop {
match self.todo.pop() {
None => return Ok(None),
Some(BsiTodo::Directory {
path,
progress_min,
progress_max,
}) => {
let istart = self.todo.len();
let mut reader = fs::read_dir(&path).await?;
while let Some(ent) = reader.next_entry().await? {
let name = if let Ok(n) = ent.file_name().into_string() {
n
} else {
continue;
};
let ft = ent.file_type().await?;
if ft.is_dir() && hex::decode(&name).is_ok() {
self.todo.push(BsiTodo::Directory {
path: ent.path(),
progress_min: 0,
progress_max: 0,
});
} else if ft.is_file() {
let filename = name.split_once('.').map(|(f, _)| f).unwrap_or(&name);
if filename.len() == 64 {
if let Ok(h) = hex::decode(filename) {
let mut hash = [0u8; 32];
hash.copy_from_slice(&h);
self.todo.push(BsiTodo::File {
path: ent.path(),
hash: hash.into(),
progress: 0,
});
}
}
}
}
let count = self.todo.len() - istart;
for (i, ent) in self.todo[istart..].iter_mut().enumerate() {
let p1 = progress_min
+ ((progress_max - progress_min) * i as u64) / count as u64;
let p2 = progress_min
+ ((progress_max - progress_min) * (i + 1) as u64) / count as u64;
match ent {
BsiTodo::Directory {
progress_min,
progress_max,
..
} => {
*progress_min = p1;
*progress_max = p2;
}
BsiTodo::File { progress, .. } => {
*progress = p1;
}
}
}
self.todo[istart..].reverse();
debug_assert!(self.progress_invariant());
}
Some(BsiTodo::File { path, hash, .. }) => {
return Ok(Some((path, hash)));
}
}
ret
}
}
async fn next(&mut self) -> Result<Option<Hash>, Error> {
loop {
let last_path = match self.path.last_mut() {
None => return Ok(None),
Some(lp) => lp,
};
if let ReadingDir::Pending(path) = last_path {
let mut reader = fs::read_dir(&path).await?;
let mut subpaths = vec![];
while let Some(ent) = reader.next_entry().await? {
subpaths.push(ent);
}
*last_path = ReadingDir::Read { subpaths, pos: 0 };
}
let (subpaths, pos) = match *last_path {
ReadingDir::Read {
ref subpaths,
ref mut pos,
} => (subpaths, pos),
ReadingDir::Pending(_) => unreachable!(),
};
let data_dir_ent = match subpaths.get(*pos) {
None => {
self.path.pop();
continue;
}
Some(ent) => {
*pos += 1;
ent
}
};
let name = data_dir_ent.file_name();
let name = if let Ok(n) = name.into_string() {
n
} else {
continue;
};
let ent_type = data_dir_ent.file_type().await?;
let name = name.strip_suffix(".zst").unwrap_or(&name);
if name.len() == 2 && hex::decode(name).is_ok() && ent_type.is_dir() {
let path = data_dir_ent.path();
self.path.push(ReadingDir::Pending(path));
} else if name.len() == 64 {
if let Ok(h) = hex::decode(name) {
let mut hash = [0u8; 32];
hash.copy_from_slice(&h);
return Ok(Some(hash.into()));
}
}
}
// for debug_assert!
fn progress_invariant(&self) -> bool {
let iter = self.todo.iter().map(|x| match x {
BsiTodo::Directory { progress_min, .. } => progress_min,
BsiTodo::File { progress, .. } => progress,
});
let iter_1 = iter.clone().skip(1);
iter.zip(iter_1).all(|(prev, next)| prev >= next)
}
}

View file

@ -41,7 +41,7 @@ pub(crate) const RESYNC_RETRY_DELAY: Duration = Duration::from_secs(60);
pub(crate) const RESYNC_RETRY_DELAY_MAX_BACKOFF_POWER: u64 = 6;
// No more than 4 resync workers can be running in the system
pub(crate) const MAX_RESYNC_WORKERS: usize = 4;
pub(crate) const MAX_RESYNC_WORKERS: usize = 8;
// Resync tranquility is initially set to 2, but can be changed in the CLI
// and the updated version is persisted over Garage restarts
const INITIAL_RESYNC_TRANQUILITY: u32 = 2;
@ -359,20 +359,23 @@ impl BlockResyncManager {
}
async fn resync_block(&self, manager: &BlockManager, hash: &Hash) -> Result<(), Error> {
let BlockStatus { exists, needed } = manager.check_block_status(hash).await?;
let existing_path = manager.find_block(hash).await;
let exists = existing_path.is_some();
let rc = manager.rc.get_block_rc(hash)?;
if exists != needed.is_needed() || exists != needed.is_nonzero() {
if exists != rc.is_needed() || exists != rc.is_nonzero() {
debug!(
"Resync block {:?}: exists {}, nonzero rc {}, deletable {}",
hash,
exists,
needed.is_nonzero(),
needed.is_deletable(),
rc.is_nonzero(),
rc.is_deletable(),
);
}
if exists && needed.is_deletable() {
if exists && rc.is_deletable() {
info!("Resync block {:?}: offloading and deleting", hash);
let existing_path = existing_path.unwrap();
let mut who = manager.replication.write_nodes(hash);
if who.len() < manager.replication.write_quorum() {
@ -419,7 +422,7 @@ impl BlockResyncManager {
.add(1, &[KeyValue::new("to", format!("{:?}", node))]);
}
let block = manager.read_block(hash).await?;
let block = manager.read_block_from(hash, &existing_path).await?;
let (header, bytes) = block.into_parts();
let put_block_message = Req::new(BlockRpc::PutBlock {
hash: *hash,
@ -451,7 +454,7 @@ impl BlockResyncManager {
manager.rc.clear_deleted_block_rc(hash)?;
}
if needed.is_nonzero() && !exists {
if rc.is_nonzero() && !exists {
info!(
"Resync block {:?}: fetching absent but needed block (refcount > 0)",
hash

View file

@ -471,6 +471,9 @@ pub enum RepairWhat {
#[structopt(subcommand)]
cmd: ScrubCmd,
},
/// Rebalance data blocks among storage locations
#[structopt(name = "rebalance", version = garage_version())]
Rebalance,
}
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]

View file

@ -70,6 +70,12 @@ pub async fn launch_online_repair(
info!("Sending command to scrub worker: {:?}", cmd);
garage.block_manager.send_scrub_command(cmd).await?;
}
RepairWhat::Rebalance => {
info!("Rebalancing the stored blocks among storage locations");
bg.spawn_worker(garage_block::repair::RebalanceWorker::new(
garage.block_manager.clone(),
));
}
}
Ok(())
}

View file

@ -92,8 +92,22 @@ impl Garage {
// Create meta dir and data dir if they don't exist already
std::fs::create_dir_all(&config.metadata_dir)
.ok_or_message("Unable to create Garage metadata directory")?;
std::fs::create_dir_all(&config.data_dir)
.ok_or_message("Unable to create Garage data directory")?;
match &config.data_dir {
DataDirEnum::Single(data_dir) => {
std::fs::create_dir_all(data_dir).ok_or_message(format!(
"Unable to create Garage data directory: {}",
data_dir.to_string_lossy()
))?;
}
DataDirEnum::Multiple(data_dirs) => {
for dir in data_dirs {
std::fs::create_dir_all(&dir.path).ok_or_message(format!(
"Unable to create Garage data directory: {}",
dir.path.to_string_lossy()
))?;
}
}
}
info!("Opening database...");
let mut db_path = config.metadata_dir.clone();
@ -237,7 +251,7 @@ impl Garage {
config.compression_level,
data_rep_param,
system.clone(),
);
)?;
block_manager.register_bg_vars(&mut bg_vars);
// ---- admin tables ----

View file

@ -22,9 +22,9 @@ use netapp::peering::fullmesh::FullMeshPeeringStrategy;
use netapp::util::parse_and_resolve_peer_addr_async;
use netapp::{NetApp, NetworkKey, NodeID, NodeKey};
use garage_util::config::Config;
#[cfg(feature = "kubernetes-discovery")]
use garage_util::config::KubernetesDiscoveryConfig;
use garage_util::config::{Config, DataDirEnum};
use garage_util::data::*;
use garage_util::error::*;
use garage_util::persister::Persister;
@ -119,7 +119,7 @@ pub struct System {
/// Path to metadata directory
pub metadata_dir: PathBuf,
/// Path to data directory
pub data_dir: PathBuf,
pub data_dir: DataDirEnum,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@ -890,7 +890,12 @@ impl NodeStatus {
}
}
fn update_disk_usage(&mut self, meta_dir: &Path, data_dir: &Path, metrics: &SystemMetrics) {
fn update_disk_usage(
&mut self,
meta_dir: &Path,
data_dir: &DataDirEnum,
metrics: &SystemMetrics,
) {
use systemstat::{Platform, System};
let mounts = System::new().mounts().unwrap_or_default();
@ -903,7 +908,35 @@ impl NodeStatus {
};
self.meta_disk_avail = mount_avail(meta_dir);
self.data_disk_avail = mount_avail(data_dir);
self.data_disk_avail = match data_dir {
DataDirEnum::Single(dir) => mount_avail(dir),
DataDirEnum::Multiple(dirs) => {
// Take mounts corresponding to all specified data directories that
// can be used for writing data
let mounts = dirs
.iter()
.filter(|dir| dir.capacity.is_some())
.map(|dir| {
mounts
.iter()
.filter(|mnt| dir.path.starts_with(&mnt.fs_mounted_on))
.max_by_key(|mnt| mnt.fs_mounted_on.len())
})
.collect::<Vec<_>>();
if mounts.iter().any(|x| x.is_none()) {
None // could not get info for at least one mount
} else {
// dedup mounts in case several data directories are on the same filesystem
let mut mounts = mounts.iter().map(|x| x.unwrap()).collect::<Vec<_>>();
mounts.sort_by(|x, y| x.fs_mounted_on.cmp(&y.fs_mounted_on));
mounts.dedup_by(|x, y| x.fs_mounted_on == y.fs_mounted_on);
// calculate sum of available and total space
Some(mounts.iter().fold((0, 0), |(x, y), mnt| {
(x + mnt.avail.as_u64(), y + mnt.total.as_u64())
}))
}
}
};
if let Some((avail, total)) = self.meta_disk_avail {
metrics

View file

@ -12,7 +12,7 @@ use crate::replication::*;
use crate::schema::*;
use crate::table::*;
const BATCH_SIZE: usize = 100;
const BATCH_SIZE: usize = 1024;
pub(crate) struct InsertQueueWorker<F, R>(pub(crate) Arc<Table<F, R>>)
where

View file

@ -13,7 +13,7 @@ pub struct Config {
/// Path where to store metadata. Should be fast, but low volume
pub metadata_dir: PathBuf,
/// Path where to store data. Can be slower, but need higher volume
pub data_dir: PathBuf,
pub data_dir: DataDirEnum,
/// Whether to fsync after all metadata transactions (disabled by default)
#[serde(default)]
@ -94,6 +94,26 @@ pub struct Config {
pub admin: AdminConfig,
}
/// Value for data_dir: either a single directory or a list of dirs with attributes
#[derive(Deserialize, Debug, Clone)]
#[serde(untagged)]
pub enum DataDirEnum {
Single(PathBuf),
Multiple(Vec<DataDir>),
}
#[derive(Deserialize, Debug, Clone)]
pub struct DataDir {
/// Path to the data directory
pub path: PathBuf,
/// Capacity of the drive (required if read_only is false)
#[serde(default)]
pub capacity: Option<String>,
/// Whether this is a legacy read-only path (capacity should be None)
#[serde(default)]
pub read_only: bool,
}
/// Configuration for S3 api
#[derive(Deserialize, Debug, Clone)]
pub struct S3ApiConfig {