Garage v0.9 #473
5 changed files with 115 additions and 14 deletions
|
@ -252,6 +252,14 @@ impl DataLayout {
|
||||||
path.push(hex::encode(&hash.as_slice()[1..2]));
|
path.push(hex::encode(&hash.as_slice()[1..2]));
|
||||||
path
|
path
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) fn without_secondary_locations(&self) -> Self {
|
||||||
|
Self {
|
||||||
|
data_dirs: self.data_dirs.clone(),
|
||||||
|
part_prim: self.part_prim.clone(),
|
||||||
|
part_sec: self.part_sec.iter().map(|_| vec![]).collect::<Vec<_>>(),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl InitialFormat for DataLayout {
|
impl InitialFormat for DataLayout {
|
||||||
|
|
|
@ -3,7 +3,7 @@ use std::pin::Pin;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use arc_swap::ArcSwapOption;
|
use arc_swap::{ArcSwap, ArcSwapOption};
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use rand::prelude::*;
|
use rand::prelude::*;
|
||||||
|
@ -83,7 +83,9 @@ pub struct BlockManager {
|
||||||
/// Directory/ies in which block are stored
|
/// Directory/ies in which block are stored
|
||||||
pub data_dir: DataDirEnum,
|
pub data_dir: DataDirEnum,
|
||||||
/// Data layout
|
/// Data layout
|
||||||
pub(crate) data_layout: DataLayout,
|
pub(crate) data_layout: ArcSwap<DataLayout>,
|
||||||
|
/// Data layout persister
|
||||||
|
pub(crate) data_layout_persister: Persister<DataLayout>,
|
||||||
|
|
||||||
data_fsync: bool,
|
data_fsync: bool,
|
||||||
compression_level: Option<i32>,
|
compression_level: Option<i32>,
|
||||||
|
@ -129,9 +131,9 @@ impl BlockManager {
|
||||||
system: Arc<System>,
|
system: Arc<System>,
|
||||||
) -> Result<Arc<Self>, Error> {
|
) -> Result<Arc<Self>, Error> {
|
||||||
// Load or compute layout, i.e. assignment of data blocks to the different data directories
|
// Load or compute layout, i.e. assignment of data blocks to the different data directories
|
||||||
let layout_persister: Persister<DataLayout> =
|
let data_layout_persister: Persister<DataLayout> =
|
||||||
Persister::new(&system.metadata_dir, "data_layout");
|
Persister::new(&system.metadata_dir, "data_layout");
|
||||||
let data_layout = match layout_persister.load() {
|
let data_layout = match data_layout_persister.load() {
|
||||||
Ok(mut layout) => {
|
Ok(mut layout) => {
|
||||||
layout
|
layout
|
||||||
.update(&data_dir)
|
.update(&data_dir)
|
||||||
|
@ -140,7 +142,7 @@ impl BlockManager {
|
||||||
}
|
}
|
||||||
Err(_) => DataLayout::initialize(&data_dir).ok_or_message("invalid data_dir config")?,
|
Err(_) => DataLayout::initialize(&data_dir).ok_or_message("invalid data_dir config")?,
|
||||||
};
|
};
|
||||||
layout_persister
|
data_layout_persister
|
||||||
.save(&data_layout)
|
.save(&data_layout)
|
||||||
.expect("cannot save data_layout");
|
.expect("cannot save data_layout");
|
||||||
|
|
||||||
|
@ -168,7 +170,8 @@ impl BlockManager {
|
||||||
let block_manager = Arc::new(Self {
|
let block_manager = Arc::new(Self {
|
||||||
replication,
|
replication,
|
||||||
data_dir,
|
data_dir,
|
||||||
data_layout,
|
data_layout: ArcSwap::new(Arc::new(data_layout)),
|
||||||
|
data_layout_persister,
|
||||||
data_fsync,
|
data_fsync,
|
||||||
compression_level,
|
compression_level,
|
||||||
mutation_lock: vec![(); MUTEX_COUNT]
|
mutation_lock: vec![(); MUTEX_COUNT]
|
||||||
|
@ -606,9 +609,10 @@ impl BlockManager {
|
||||||
|
|
||||||
/// Find the path where a block is currently stored
|
/// Find the path where a block is currently stored
|
||||||
pub(crate) async fn find_block(&self, hash: &Hash) -> Option<DataBlockPath> {
|
pub(crate) async fn find_block(&self, hash: &Hash) -> Option<DataBlockPath> {
|
||||||
let dirs = Some(self.data_layout.primary_block_dir(hash))
|
let data_layout = self.data_layout.load_full();
|
||||||
|
let dirs = Some(data_layout.primary_block_dir(hash))
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.chain(self.data_layout.secondary_block_dirs(hash));
|
.chain(data_layout.secondary_block_dirs(hash));
|
||||||
let filename = hex::encode(hash.as_ref());
|
let filename = hex::encode(hash.as_ref());
|
||||||
|
|
||||||
for dir in dirs {
|
for dir in dirs {
|
||||||
|
@ -682,7 +686,7 @@ impl BlockManagerLocked {
|
||||||
let compressed = data.is_compressed();
|
let compressed = data.is_compressed();
|
||||||
let data = data.inner_buffer();
|
let data = data.inner_buffer();
|
||||||
|
|
||||||
let directory = mgr.data_layout.primary_block_dir(hash);
|
let directory = mgr.data_layout.load().primary_block_dir(hash);
|
||||||
|
|
||||||
let mut tgt_path = directory.clone();
|
let mut tgt_path = directory.clone();
|
||||||
tgt_path.push(hex::encode(hash));
|
tgt_path.push(hex::encode(hash));
|
||||||
|
|
|
@ -518,6 +518,86 @@ impl Worker for ScrubWorker {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ---- ---- ----
|
||||||
|
// THIRD KIND OF REPAIR: REBALANCING DATA BLOCKS
|
||||||
|
// between multiple storage locations.
|
||||||
|
// This is a one-shot repair operation that can be launched,
|
||||||
|
// checks everything, and then exits.
|
||||||
|
// ---- ---- ----
|
||||||
|
|
||||||
|
pub struct RebalanceWorker {
|
||||||
|
manager: Arc<BlockManager>,
|
||||||
|
block_iter: BlockStoreIterator,
|
||||||
|
moved: usize,
|
||||||
|
moved_bytes: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RebalanceWorker {
|
||||||
|
pub fn new(manager: Arc<BlockManager>) -> Self {
|
||||||
|
let block_iter = BlockStoreIterator::new(&manager);
|
||||||
|
Self {
|
||||||
|
manager,
|
||||||
|
block_iter,
|
||||||
|
moved: 0,
|
||||||
|
moved_bytes: 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl Worker for RebalanceWorker {
|
||||||
|
fn name(&self) -> String {
|
||||||
|
"Block rebalance worker".into()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn status(&self) -> WorkerStatus {
|
||||||
|
WorkerStatus {
|
||||||
|
progress: Some(format!("{:.2}%", self.block_iter.progress() * 100.)),
|
||||||
|
freeform: vec![
|
||||||
|
format!("Blocks moved: {}", self.moved),
|
||||||
|
format!("Bytes moved: {}", self.moved_bytes),
|
||||||
|
],
|
||||||
|
..Default::default()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
|
||||||
|
if let Some((path, hash)) = self.block_iter.next().await? {
|
||||||
|
let prim_loc = self.manager.data_layout.load().primary_block_dir(&hash);
|
||||||
|
if path.parent().expect("no parent?") != prim_loc {
|
||||||
|
// block is not in its primary location,
|
||||||
|
// move it there (reading and re-writing does the trick)
|
||||||
|
let data = self.manager.read_block(&hash).await?;
|
||||||
|
self.manager.write_block(&hash, &data).await?;
|
||||||
|
self.moved += 1;
|
||||||
|
self.moved_bytes += data.inner_buffer().len();
|
||||||
|
}
|
||||||
|
Ok(WorkerState::Busy)
|
||||||
|
} else {
|
||||||
|
// all blocks are in their primary location:
|
||||||
|
// - the ones we moved now are
|
||||||
|
// - the ones written in the meantime always were, because we only
|
||||||
|
// write to primary locations
|
||||||
|
// so we can safely remove all secondary locations from the data layout
|
||||||
|
let new_layout = self
|
||||||
|
.manager
|
||||||
|
.data_layout
|
||||||
|
.load_full()
|
||||||
|
.without_secondary_locations();
|
||||||
|
self.manager
|
||||||
|
.data_layout_persister
|
||||||
|
.save_async(&new_layout)
|
||||||
|
.await?;
|
||||||
|
self.manager.data_layout.store(Arc::new(new_layout));
|
||||||
|
Ok(WorkerState::Done)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn wait_for_work(&mut self) -> WorkerState {
|
||||||
|
unreachable!()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// ---- ---- ----
|
// ---- ---- ----
|
||||||
// UTILITY FOR ENUMERATING THE BLOCK STORE
|
// UTILITY FOR ENUMERATING THE BLOCK STORE
|
||||||
// ---- ---- ----
|
// ---- ---- ----
|
||||||
|
@ -526,16 +606,16 @@ const PROGRESS_FP: u64 = 1_000_000_000;
|
||||||
|
|
||||||
impl BlockStoreIterator {
|
impl BlockStoreIterator {
|
||||||
fn new(manager: &BlockManager) -> Self {
|
fn new(manager: &BlockManager) -> Self {
|
||||||
let min_cap = manager
|
let data_layout = manager.data_layout.load_full();
|
||||||
.data_layout
|
|
||||||
|
let min_cap = data_layout
|
||||||
.data_dirs
|
.data_dirs
|
||||||
.iter()
|
.iter()
|
||||||
.filter_map(|x| x.capacity())
|
.filter_map(|x| x.capacity())
|
||||||
.min()
|
.min()
|
||||||
.unwrap_or(0);
|
.unwrap_or(0);
|
||||||
|
|
||||||
let sum_cap = manager
|
let sum_cap = data_layout
|
||||||
.data_layout
|
|
||||||
.data_dirs
|
.data_dirs
|
||||||
.iter()
|
.iter()
|
||||||
.map(|x| x.capacity().unwrap_or(min_cap /* approximation */))
|
.map(|x| x.capacity().unwrap_or(min_cap /* approximation */))
|
||||||
|
@ -543,7 +623,7 @@ impl BlockStoreIterator {
|
||||||
|
|
||||||
let mut cum_cap = 0;
|
let mut cum_cap = 0;
|
||||||
let mut todo = vec![];
|
let mut todo = vec![];
|
||||||
for dir in manager.data_layout.data_dirs.iter() {
|
for dir in data_layout.data_dirs.iter() {
|
||||||
let cap = match dir.state {
|
let cap = match dir.state {
|
||||||
DataDirState::Active { capacity } => capacity,
|
DataDirState::Active { capacity } => capacity,
|
||||||
_ => min_cap,
|
_ => min_cap,
|
||||||
|
|
|
@ -471,6 +471,9 @@ pub enum RepairWhat {
|
||||||
#[structopt(subcommand)]
|
#[structopt(subcommand)]
|
||||||
cmd: ScrubCmd,
|
cmd: ScrubCmd,
|
||||||
},
|
},
|
||||||
|
/// Rebalance data blocks among storage locations
|
||||||
|
#[structopt(name = "rebalance", version = garage_version())]
|
||||||
|
Rebalance,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
|
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
|
||||||
|
|
|
@ -70,6 +70,12 @@ pub async fn launch_online_repair(
|
||||||
info!("Sending command to scrub worker: {:?}", cmd);
|
info!("Sending command to scrub worker: {:?}", cmd);
|
||||||
garage.block_manager.send_scrub_command(cmd).await?;
|
garage.block_manager.send_scrub_command(cmd).await?;
|
||||||
}
|
}
|
||||||
|
RepairWhat::Rebalance => {
|
||||||
|
info!("Rebalancing the stored blocks among storage locations");
|
||||||
|
bg.spawn_worker(garage_block::repair::RebalanceWorker::new(
|
||||||
|
garage.block_manager.clone(),
|
||||||
|
));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue