Garage v0.9 #473
5 changed files with 256 additions and 30 deletions
1
Cargo.lock
generated
1
Cargo.lock
generated
|
@ -1300,6 +1300,7 @@ dependencies = [
|
||||||
"async-compression",
|
"async-compression",
|
||||||
"async-trait",
|
"async-trait",
|
||||||
"bytes",
|
"bytes",
|
||||||
|
"bytesize",
|
||||||
"futures",
|
"futures",
|
||||||
"futures-util",
|
"futures-util",
|
||||||
"garage_db",
|
"garage_db",
|
||||||
|
|
|
@ -24,6 +24,7 @@ opentelemetry = "0.17"
|
||||||
arc-swap = "1.5"
|
arc-swap = "1.5"
|
||||||
async-trait = "0.1.7"
|
async-trait = "0.1.7"
|
||||||
bytes = "1.0"
|
bytes = "1.0"
|
||||||
|
bytesize = "1.2"
|
||||||
hex = "0.4"
|
hex = "0.4"
|
||||||
tracing = "0.1"
|
tracing = "0.1"
|
||||||
rand = "0.8"
|
rand = "0.8"
|
||||||
|
|
|
@ -4,14 +4,23 @@ use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
use garage_util::config::DataDirEnum;
|
use garage_util::config::DataDirEnum;
|
||||||
use garage_util::data::Hash;
|
use garage_util::data::Hash;
|
||||||
|
use garage_util::error::{Error, OkOrMessage};
|
||||||
use garage_util::migrate::*;
|
use garage_util::migrate::*;
|
||||||
|
|
||||||
pub const DRIVE_NPART: usize = 1024;
|
type Idx = u16;
|
||||||
|
|
||||||
|
const DRIVE_NPART: usize = 1024;
|
||||||
|
|
||||||
|
const DPART_BYTES: (usize, usize) = (2, 3);
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||||
pub(crate) struct DataLayout {
|
pub(crate) struct DataLayout {
|
||||||
pub(crate) data_dirs: Vec<DataDir>,
|
pub(crate) data_dirs: Vec<DataDir>,
|
||||||
pub(crate) partitions: Vec<Partition>,
|
|
||||||
|
/// Primary storage location (index in data_dirs) for each partition
|
||||||
|
pub(crate) part_prim: Vec<Idx>,
|
||||||
|
/// Secondary storage locations for each partition
|
||||||
|
pub(crate) part_sec: Vec<Vec<Idx>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||||
|
@ -20,38 +29,255 @@ pub(crate) struct DataDir {
|
||||||
pub(crate) state: DataDirState,
|
pub(crate) state: DataDirState,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
|
||||||
pub(crate) enum DataDirState {
|
pub(crate) enum DataDirState {
|
||||||
Active { capacity: u64 },
|
Active { capacity: u64 },
|
||||||
ReadOnly,
|
ReadOnly,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
|
||||||
pub(crate) struct Partition {
|
|
||||||
pub(crate) prim: usize,
|
|
||||||
pub(crate) sec: Vec<usize>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl DataLayout {
|
impl DataLayout {
|
||||||
pub(crate) fn initialize(dirs: &DataDirEnum) -> Self {
|
pub(crate) fn initialize(dirs: &DataDirEnum) -> Result<Self, Error> {
|
||||||
todo!()
|
let data_dirs = make_data_dirs(dirs)?;
|
||||||
|
|
||||||
|
// Split partitions proportionnally to capacity for all drives
|
||||||
|
// to affect primary storage location
|
||||||
|
let total_cap = data_dirs.iter().filter_map(|x| x.capacity()).sum::<u64>();
|
||||||
|
|
||||||
|
let mut part_prim = Vec::with_capacity(DRIVE_NPART);
|
||||||
|
let mut cum_cap = 0;
|
||||||
|
for (i, dd) in data_dirs.iter().enumerate() {
|
||||||
|
if let DataDirState::Active { capacity } = dd.state {
|
||||||
|
cum_cap += capacity;
|
||||||
|
let n_total = (cum_cap * DRIVE_NPART as u64) / total_cap;
|
||||||
|
part_prim.resize(n_total as usize, i as Idx);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assert_eq!(cum_cap, total_cap);
|
||||||
|
assert_eq!(part_prim.len(), DRIVE_NPART);
|
||||||
|
|
||||||
|
// If any of the storage locations is non-empty, add it as a secondary
|
||||||
|
// storage location for all partitions
|
||||||
|
let mut part_sec = vec![vec![]; DRIVE_NPART];
|
||||||
|
for (i, dd) in data_dirs.iter().enumerate() {
|
||||||
|
if dir_not_empty(&dd.path)? {
|
||||||
|
for (sec, prim) in part_sec.iter_mut().zip(part_prim.iter()) {
|
||||||
|
if *prim != i as Idx {
|
||||||
|
sec.push(i as Idx);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(Self {
|
||||||
|
data_dirs,
|
||||||
|
part_prim,
|
||||||
|
part_sec,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn update(&mut self, dirs: &DataDirEnum) -> Self {
|
pub(crate) fn update(&mut self, dirs: &DataDirEnum) -> Result<Self, Error> {
|
||||||
todo!()
|
// Compute list of new data directories and mapping of old indices
|
||||||
|
// to new indices
|
||||||
|
let data_dirs = make_data_dirs(dirs)?;
|
||||||
|
let old2new = self
|
||||||
|
.data_dirs
|
||||||
|
.iter()
|
||||||
|
.map(|x| {
|
||||||
|
data_dirs
|
||||||
|
.iter()
|
||||||
|
.position(|y| y.path == x.path)
|
||||||
|
.map(|x| x as Idx)
|
||||||
|
})
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
// Compute secondary location list for partitions based on existing
|
||||||
|
// folders, translating indices from old to new
|
||||||
|
let mut part_sec = self
|
||||||
|
.part_sec
|
||||||
|
.iter()
|
||||||
|
.map(|dl| {
|
||||||
|
dl.iter()
|
||||||
|
.filter_map(|old| old2new.get(*old as usize).copied().flatten())
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
})
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
// Compute a vector that, for each data dir,
|
||||||
|
// contains the list of partitions primarily stored on that drive
|
||||||
|
let mut dir_prim = vec![vec![]; data_dirs.len()];
|
||||||
|
for (ipart, prim) in self.part_prim.iter().enumerate() {
|
||||||
|
if let Some(new) = old2new.get(*prim as usize).copied().flatten() {
|
||||||
|
dir_prim[new as usize].push(ipart);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Compute the target number of partitions per data directory
|
||||||
|
let total_cap = data_dirs.iter().filter_map(|x| x.capacity()).sum::<u64>();
|
||||||
|
let mut cum_cap = 0;
|
||||||
|
let mut npart_per_dir = vec![];
|
||||||
|
for dd in data_dirs.iter() {
|
||||||
|
if let DataDirState::Active { capacity } = dd.state {
|
||||||
|
let begin = (cum_cap * DRIVE_NPART as u64) / total_cap;
|
||||||
|
cum_cap += capacity;
|
||||||
|
let end = (cum_cap * DRIVE_NPART as u64) / total_cap;
|
||||||
|
npart_per_dir.push((end - begin) as usize);
|
||||||
|
} else {
|
||||||
|
npart_per_dir.push(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assert_eq!(cum_cap, total_cap);
|
||||||
|
assert_eq!(npart_per_dir.iter().sum::<usize>(), DRIVE_NPART);
|
||||||
|
|
||||||
|
// For all directories that have too many primary partitions,
|
||||||
|
// move that partition to secondary
|
||||||
|
for (idir, (parts, tgt_npart)) in dir_prim.iter_mut().zip(npart_per_dir.iter()).enumerate()
|
||||||
|
{
|
||||||
|
while parts.len() > *tgt_npart {
|
||||||
|
let part = parts.pop().unwrap();
|
||||||
|
if !part_sec[part].contains(&(idir as Idx)) {
|
||||||
|
part_sec[part].push(idir as Idx);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Calculate the vector of primary partition dir index
|
||||||
|
let mut part_prim = vec![None; DRIVE_NPART];
|
||||||
|
for (idir, parts) in dir_prim.iter().enumerate() {
|
||||||
|
for part in parts.iter() {
|
||||||
|
assert!(part_prim[*part].is_none());
|
||||||
|
part_prim[*part] = Some(idir as Idx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Calculate a vector of unassigned partitions
|
||||||
|
let mut unassigned = part_prim
|
||||||
|
.iter()
|
||||||
|
.enumerate()
|
||||||
|
.filter(|(_, dir)| dir.is_none())
|
||||||
|
.map(|(ipart, _)| ipart)
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
// For all directories that don't have enough primary partitions,
|
||||||
|
// add partitions from unassigned
|
||||||
|
for (idir, (parts, tgt_npart)) in dir_prim.iter_mut().zip(npart_per_dir.iter()).enumerate()
|
||||||
|
{
|
||||||
|
assert!(unassigned.len() >= *tgt_npart - parts.len());
|
||||||
|
for _ in parts.len()..*tgt_npart {
|
||||||
|
let new_part = unassigned.pop().unwrap();
|
||||||
|
part_prim[new_part] = Some(idir as Idx);
|
||||||
|
part_sec[new_part].retain(|x| *x != idir as Idx);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sanity checks
|
||||||
|
assert!(part_prim.iter().all(|x| x.is_some()));
|
||||||
|
assert!(unassigned.is_empty());
|
||||||
|
|
||||||
|
let part_prim = part_prim
|
||||||
|
.into_iter()
|
||||||
|
.map(|x| x.unwrap())
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
assert!(part_prim.iter().all(|p| data_dirs
|
||||||
|
.get(*p as usize)
|
||||||
|
.and_then(|x| x.capacity())
|
||||||
|
.unwrap_or(0)
|
||||||
|
> 0));
|
||||||
|
|
||||||
|
Ok(Self {
|
||||||
|
data_dirs,
|
||||||
|
part_prim,
|
||||||
|
part_sec,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn data_dir(&self, hash: &Hash) -> PathBuf {
|
pub(crate) fn primary_data_dir(&self, hash: &Hash) -> PathBuf {
|
||||||
todo!()
|
let ipart = self.partition_from(hash);
|
||||||
/*
|
let idir = self.part_prim[ipart] as usize;
|
||||||
let mut path = self.data_dir.clone();
|
self.data_dir_from(hash, &self.data_dirs[idir].path)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn secondary_data_dirs<'a>(&'a self, hash: &'a Hash) -> impl Iterator<Item=PathBuf> + 'a {
|
||||||
|
let ipart = self.partition_from(hash);
|
||||||
|
self.part_sec[ipart]
|
||||||
|
.iter()
|
||||||
|
.map(move |idir| self.data_dir_from(hash, &self.data_dirs[*idir as usize].path))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn partition_from(&self, hash: &Hash) -> usize {
|
||||||
|
u16::from_be_bytes([
|
||||||
|
hash.as_slice()[DPART_BYTES.0],
|
||||||
|
hash.as_slice()[DPART_BYTES.1]
|
||||||
|
]) as usize % DRIVE_NPART
|
||||||
|
}
|
||||||
|
|
||||||
|
fn data_dir_from(&self, hash: &Hash, dir: &PathBuf) -> PathBuf {
|
||||||
|
let mut path = dir.clone();
|
||||||
path.push(hex::encode(&hash.as_slice()[0..1]));
|
path.push(hex::encode(&hash.as_slice()[0..1]));
|
||||||
path.push(hex::encode(&hash.as_slice()[1..2]));
|
path.push(hex::encode(&hash.as_slice()[1..2]));
|
||||||
path
|
path
|
||||||
*/
|
}
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl InitialFormat for DataLayout {
|
impl InitialFormat for DataLayout {
|
||||||
const VERSION_MARKER: &'static [u8] = b"G09bmdl";
|
const VERSION_MARKER: &'static [u8] = b"G09bmdl";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl DataDir {
|
||||||
|
pub fn capacity(&self) -> Option<u64> {
|
||||||
|
match self.state {
|
||||||
|
DataDirState::Active { capacity } => Some(capacity),
|
||||||
|
_ => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn make_data_dirs(dirs: &DataDirEnum) -> Result<Vec<DataDir>, Error> {
|
||||||
|
let mut data_dirs = vec![];
|
||||||
|
match dirs {
|
||||||
|
DataDirEnum::Single(path) => data_dirs.push(DataDir {
|
||||||
|
path: path.clone(),
|
||||||
|
state: DataDirState::Active {
|
||||||
|
capacity: 1_000_000_000, // whatever, doesn't matter
|
||||||
|
},
|
||||||
|
}),
|
||||||
|
DataDirEnum::Multiple(dirs) => {
|
||||||
|
for dir in dirs.iter() {
|
||||||
|
let state = match &dir.capacity {
|
||||||
|
Some(cap) if dir.read_only == false => {
|
||||||
|
DataDirState::Active {
|
||||||
|
capacity: cap.parse::<bytesize::ByteSize>()
|
||||||
|
.ok_or_message("invalid capacity value")?.as_u64(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None if dir.read_only == true => {
|
||||||
|
DataDirState::ReadOnly
|
||||||
|
}
|
||||||
|
_ => return Err(Error::Message(format!("data directories in data_dir should have a capacity value or be marked read_only, not the case for {}", dir.path.to_string_lossy()))),
|
||||||
|
};
|
||||||
|
data_dirs.push(DataDir {
|
||||||
|
path: dir.path.clone(),
|
||||||
|
state,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(data_dirs)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn dir_not_empty(path: &PathBuf) -> Result<bool, Error> {
|
||||||
|
for entry in std::fs::read_dir(&path)? {
|
||||||
|
let dir = entry?;
|
||||||
|
if dir.file_type()?.is_dir()
|
||||||
|
&& dir
|
||||||
|
.file_name()
|
||||||
|
.into_string()
|
||||||
|
.ok()
|
||||||
|
.and_then(|hex| hex::decode(&hex).ok())
|
||||||
|
.map(|bytes| (2..=4).contains(&bytes.len()))
|
||||||
|
.unwrap_or(false)
|
||||||
|
{
|
||||||
|
return Ok(true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(false)
|
||||||
|
}
|
||||||
|
|
|
@ -125,15 +125,19 @@ impl BlockManager {
|
||||||
replication: TableShardedReplication,
|
replication: TableShardedReplication,
|
||||||
system: Arc<System>,
|
system: Arc<System>,
|
||||||
) -> Arc<Self> {
|
) -> Arc<Self> {
|
||||||
|
// TODO don't panic, report error
|
||||||
let layout_persister: Persister<DataLayout> =
|
let layout_persister: Persister<DataLayout> =
|
||||||
Persister::new(&system.metadata_dir, "data_layout");
|
Persister::new(&system.metadata_dir, "data_layout");
|
||||||
let data_layout = match layout_persister.load() {
|
let data_layout = match layout_persister.load() {
|
||||||
Ok(mut layout) => {
|
Ok(mut layout) => {
|
||||||
layout.update(&data_dir);
|
layout.update(&data_dir).expect("invalid data_dir config");
|
||||||
layout
|
layout
|
||||||
}
|
}
|
||||||
Err(_) => DataLayout::initialize(&data_dir),
|
Err(_) => DataLayout::initialize(&data_dir).expect("invalid data_dir config"),
|
||||||
};
|
};
|
||||||
|
layout_persister
|
||||||
|
.save(&data_layout)
|
||||||
|
.expect("cannot save data_layout");
|
||||||
|
|
||||||
let rc = db
|
let rc = db
|
||||||
.open_tree("block_local_rc")
|
.open_tree("block_local_rc")
|
||||||
|
@ -602,7 +606,7 @@ impl BlockManager {
|
||||||
|
|
||||||
/// Utility: gives the path of the directory in which a block should be found
|
/// Utility: gives the path of the directory in which a block should be found
|
||||||
fn block_dir(&self, hash: &Hash) -> PathBuf {
|
fn block_dir(&self, hash: &Hash) -> PathBuf {
|
||||||
self.data_layout.data_dir(hash)
|
self.data_layout.primary_data_dir(hash)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Utility: give the full path where a block should be found, minus extension if block is
|
/// Utility: give the full path where a block should be found, minus extension if block is
|
||||||
|
|
|
@ -473,10 +473,7 @@ impl BlockStoreIterator {
|
||||||
.data_layout
|
.data_layout
|
||||||
.data_dirs
|
.data_dirs
|
||||||
.iter()
|
.iter()
|
||||||
.filter_map(|x| match x.state {
|
.filter_map(|x| x.capacity())
|
||||||
DataDirState::Active { capacity } => Some(capacity),
|
|
||||||
_ => None,
|
|
||||||
})
|
|
||||||
.min()
|
.min()
|
||||||
.unwrap_or(0);
|
.unwrap_or(0);
|
||||||
|
|
||||||
|
@ -484,10 +481,7 @@ impl BlockStoreIterator {
|
||||||
.data_layout
|
.data_layout
|
||||||
.data_dirs
|
.data_dirs
|
||||||
.iter()
|
.iter()
|
||||||
.map(|x| match x.state {
|
.map(|x| x.capacity().unwrap_or(min_cap /* approximation */))
|
||||||
DataDirState::Active { capacity } => capacity,
|
|
||||||
_ => min_cap, // approximation
|
|
||||||
})
|
|
||||||
.sum::<u64>() as u128;
|
.sum::<u64>() as u128;
|
||||||
|
|
||||||
let mut cum_cap = 0;
|
let mut cum_cap = 0;
|
||||||
|
|
Loading…
Reference in a new issue