block manager: avoid incorrect data_dir configs and avoid losing files
This commit is contained in:
parent
e30865984a
commit
f38a31b330
2 changed files with 44 additions and 5 deletions
|
@ -45,6 +45,7 @@ impl DataLayout {
|
|||
// Split partitions proportionnally to capacity for all drives
|
||||
// to affect primary storage location
|
||||
let total_cap = data_dirs.iter().filter_map(|x| x.capacity()).sum::<u64>();
|
||||
assert!(total_cap > 0);
|
||||
|
||||
let mut part_prim = Vec::with_capacity(DRIVE_NPART);
|
||||
let mut cum_cap = 0;
|
||||
|
@ -86,6 +87,9 @@ impl DataLayout {
|
|||
return Ok(());
|
||||
}
|
||||
|
||||
let total_cap = data_dirs.iter().filter_map(|x| x.capacity()).sum::<u64>();
|
||||
assert!(total_cap > 0);
|
||||
|
||||
// Compute mapping of old indices to new indices
|
||||
let old2new = self
|
||||
.data_dirs
|
||||
|
@ -120,7 +124,6 @@ impl DataLayout {
|
|||
}
|
||||
|
||||
// Compute the target number of partitions per data directory
|
||||
let total_cap = data_dirs.iter().filter_map(|x| x.capacity()).sum::<u64>();
|
||||
let mut cum_cap = 0;
|
||||
let mut npart_per_dir = vec![0; data_dirs.len()];
|
||||
for (idir, dd) in data_dirs.iter().enumerate() {
|
||||
|
@ -182,6 +185,7 @@ impl DataLayout {
|
|||
assert!(part_prim.iter().all(|x| x.is_some()));
|
||||
assert!(unassigned.is_empty());
|
||||
|
||||
// Transform part_prim from vec of Option<Idx> to vec of Idx
|
||||
let part_prim = part_prim
|
||||
.into_iter()
|
||||
.map(|x| x.unwrap())
|
||||
|
@ -192,6 +196,25 @@ impl DataLayout {
|
|||
.unwrap_or(0)
|
||||
> 0));
|
||||
|
||||
// If any of the newly added storage locations is non-empty,
|
||||
// it might have been removed and added again and might contain data,
|
||||
// so add it as a secondary storage location for all partitions
|
||||
// to make sure existing files are not lost
|
||||
let mut part_sec = vec![vec![]; DRIVE_NPART];
|
||||
for (i, dd) in data_dirs.iter().enumerate() {
|
||||
if self.data_dirs.iter().any(|ed| ed.path == dd.path) {
|
||||
continue;
|
||||
}
|
||||
if dir_not_empty(&dd.path)? {
|
||||
for (sec, prim) in part_sec.iter_mut().zip(part_prim.iter()) {
|
||||
if *prim != i as Idx && !sec.contains(&(i as Idx)) {
|
||||
sec.push(i as Idx);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Apply newly generated config
|
||||
*self = Self {
|
||||
data_dirs,
|
||||
part_prim,
|
||||
|
@ -254,12 +277,18 @@ fn make_data_dirs(dirs: &DataDirEnum) -> Result<Vec<DataDir>, Error> {
|
|||
},
|
||||
}),
|
||||
DataDirEnum::Multiple(dirs) => {
|
||||
let mut ok = false;
|
||||
for dir in dirs.iter() {
|
||||
let state = match &dir.capacity {
|
||||
Some(cap) if dir.read_only == false => {
|
||||
let capacity = cap.parse::<bytesize::ByteSize>()
|
||||
.ok_or_message("invalid capacity value")?.as_u64();
|
||||
if capacity == 0 {
|
||||
return Err(Error::Message(format!("data directory {} should have non-zero capacity", dir.path.to_string_lossy())));
|
||||
}
|
||||
ok = true;
|
||||
DataDirState::Active {
|
||||
capacity: cap.parse::<bytesize::ByteSize>()
|
||||
.ok_or_message("invalid capacity value")?.as_u64(),
|
||||
capacity,
|
||||
}
|
||||
}
|
||||
None if dir.read_only == true => {
|
||||
|
@ -272,6 +301,12 @@ fn make_data_dirs(dirs: &DataDirEnum) -> Result<Vec<DataDir>, Error> {
|
|||
state,
|
||||
});
|
||||
}
|
||||
if !ok {
|
||||
return Err(Error::Message(
|
||||
"incorrect data_dir configuration, no primary writable directory specified"
|
||||
.into(),
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(data_dirs)
|
||||
|
|
|
@ -279,16 +279,20 @@ impl BlockManager {
|
|||
let res = match res {
|
||||
Ok(res) => res,
|
||||
Err(e) => {
|
||||
debug!("Get block {:?}: node {:?} returned error: {}", hash, node, e);
|
||||
debug!("Get block {:?}: node {:?} could not be contacted: {}", hash, node, e);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let (header, stream) = match res.into_parts() {
|
||||
(Ok(BlockRpc::PutBlock { hash: _, header }), Some(stream)) => (header, stream),
|
||||
_ => {
|
||||
(Ok(_), _) => {
|
||||
debug!("Get block {:?}: node {:?} returned a malformed response", hash, node);
|
||||
continue;
|
||||
}
|
||||
(Err(e), _) => {
|
||||
debug!("Get block {:?}: node {:?} returned error: {}", hash, node, e);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
match f(header, stream).await {
|
||||
Ok(ret) => return Ok(ret),
|
||||
|
|
Loading…
Reference in a new issue