Garage v0.9 #473
3 changed files with 59 additions and 18 deletions
|
@ -21,6 +21,7 @@ pub enum DataBlock {
|
||||||
Compressed(Bytes),
|
Compressed(Bytes),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
pub enum DataBlockPath {
|
pub enum DataBlockPath {
|
||||||
/// Uncompressed data fail
|
/// Uncompressed data fail
|
||||||
Plain(PathBuf),
|
Plain(PathBuf),
|
||||||
|
|
|
@ -645,6 +645,19 @@ impl BlockManager {
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Rewrite a block at the primary location for its path and delete the old path.
|
||||||
|
/// Returns the number of bytes read/written
|
||||||
|
pub(crate) async fn fix_block_location(
|
||||||
|
&self,
|
||||||
|
hash: &Hash,
|
||||||
|
wrong_path: DataBlockPath,
|
||||||
|
) -> Result<usize, Error> {
|
||||||
|
self.lock_mutate(hash)
|
||||||
|
.await
|
||||||
|
.fix_block_location(hash, wrong_path, self)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
async fn lock_mutate(&self, hash: &Hash) -> MutexGuard<'_, BlockManagerLocked> {
|
async fn lock_mutate(&self, hash: &Hash) -> MutexGuard<'_, BlockManagerLocked> {
|
||||||
let tracer = opentelemetry::global::tracer("garage");
|
let tracer = opentelemetry::global::tracer("garage");
|
||||||
let ilock = u16::from_be_bytes([hash.as_slice()[0], hash.as_slice()[1]]) as usize
|
let ilock = u16::from_be_bytes([hash.as_slice()[0], hash.as_slice()[1]]) as usize
|
||||||
|
@ -682,6 +695,17 @@ impl BlockManagerLocked {
|
||||||
hash: &Hash,
|
hash: &Hash,
|
||||||
data: &DataBlock,
|
data: &DataBlock,
|
||||||
mgr: &BlockManager,
|
mgr: &BlockManager,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
let existing_path = mgr.find_block(hash).await;
|
||||||
|
self.write_block_inner(hash, data, mgr, existing_path).await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn write_block_inner(
|
||||||
|
&self,
|
||||||
|
hash: &Hash,
|
||||||
|
data: &DataBlock,
|
||||||
|
mgr: &BlockManager,
|
||||||
|
existing_path: Option<DataBlockPath>,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
let compressed = data.is_compressed();
|
let compressed = data.is_compressed();
|
||||||
let data = data.inner_buffer();
|
let data = data.inner_buffer();
|
||||||
|
@ -694,7 +718,7 @@ impl BlockManagerLocked {
|
||||||
tgt_path.set_extension("zst");
|
tgt_path.set_extension("zst");
|
||||||
}
|
}
|
||||||
|
|
||||||
let to_delete = match (mgr.find_block(hash).await, compressed) {
|
let to_delete = match (existing_path, compressed) {
|
||||||
// If the block is stored in the wrong directory,
|
// If the block is stored in the wrong directory,
|
||||||
// write it again at the correct path and delete the old path
|
// write it again at the correct path and delete the old path
|
||||||
(Some(DataBlockPath::Plain(p)), false) if p != tgt_path => Some(p),
|
(Some(DataBlockPath::Plain(p)), false) if p != tgt_path => Some(p),
|
||||||
|
@ -716,6 +740,7 @@ impl BlockManagerLocked {
|
||||||
// If the block isn't stored already, just store what is given to us
|
// If the block isn't stored already, just store what is given to us
|
||||||
(None, _) => None,
|
(None, _) => None,
|
||||||
};
|
};
|
||||||
|
assert!(to_delete.as_ref() != Some(&tgt_path));
|
||||||
|
|
||||||
let mut path_tmp = tgt_path.clone();
|
let mut path_tmp = tgt_path.clone();
|
||||||
let tmp_extension = format!("tmp{}", hex::encode(thread_rng().gen::<[u8; 4]>()));
|
let tmp_extension = format!("tmp{}", hex::encode(thread_rng().gen::<[u8; 4]>()));
|
||||||
|
@ -792,6 +817,18 @@ impl BlockManagerLocked {
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn fix_block_location(
|
||||||
|
&self,
|
||||||
|
hash: &Hash,
|
||||||
|
wrong_path: DataBlockPath,
|
||||||
|
mgr: &BlockManager,
|
||||||
|
) -> Result<usize, Error> {
|
||||||
|
let data = mgr.read_block_from(hash, &wrong_path).await?;
|
||||||
|
self.write_block_inner(hash, &data, mgr, Some(wrong_path))
|
||||||
|
.await?;
|
||||||
|
Ok(data.inner_buffer().len())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn read_stream_to_end(mut stream: ByteStream) -> Result<Bytes, Error> {
|
async fn read_stream_to_end(mut stream: ByteStream) -> Result<Bytes, Error> {
|
||||||
|
|
|
@ -558,9 +558,7 @@ impl Worker for RebalanceWorker {
|
||||||
fn status(&self) -> WorkerStatus {
|
fn status(&self) -> WorkerStatus {
|
||||||
let t_cur = self.t_finished.unwrap_or_else(|| now_msec());
|
let t_cur = self.t_finished.unwrap_or_else(|| now_msec());
|
||||||
let rate = self.moved_bytes / std::cmp::max(1, (t_cur - self.t_started) / 1000);
|
let rate = self.moved_bytes / std::cmp::max(1, (t_cur - self.t_started) / 1000);
|
||||||
WorkerStatus {
|
let mut freeform = vec![
|
||||||
progress: Some(format!("{:.2}%", self.block_iter.progress() * 100.)),
|
|
||||||
freeform: vec![
|
|
||||||
format!("Blocks moved: {}", self.moved),
|
format!("Blocks moved: {}", self.moved),
|
||||||
format!(
|
format!(
|
||||||
"Bytes moved: {} ({}/s)",
|
"Bytes moved: {} ({}/s)",
|
||||||
|
@ -568,7 +566,13 @@ impl Worker for RebalanceWorker {
|
||||||
bytesize::ByteSize::b(rate)
|
bytesize::ByteSize::b(rate)
|
||||||
),
|
),
|
||||||
format!("Started: {}", msec_to_rfc3339(self.t_started)),
|
format!("Started: {}", msec_to_rfc3339(self.t_started)),
|
||||||
],
|
];
|
||||||
|
if let Some(t_fin) = self.t_finished {
|
||||||
|
freeform.push(format!("Finished: {}", msec_to_rfc3339(t_fin)))
|
||||||
|
}
|
||||||
|
WorkerStatus {
|
||||||
|
progress: Some(format!("{:.2}%", self.block_iter.progress() * 100.)),
|
||||||
|
freeform,
|
||||||
..Default::default()
|
..Default::default()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -576,10 +580,10 @@ impl Worker for RebalanceWorker {
|
||||||
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
|
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
|
||||||
if let Some((path, hash)) = self.block_iter.next().await? {
|
if let Some((path, hash)) = self.block_iter.next().await? {
|
||||||
let prim_loc = self.manager.data_layout.load().primary_block_dir(&hash);
|
let prim_loc = self.manager.data_layout.load().primary_block_dir(&hash);
|
||||||
if path.parent().expect("no parent?") != prim_loc {
|
if path.ancestors().all(|x| x != prim_loc) {
|
||||||
let path = match path.extension() {
|
let block_path = match path.extension() {
|
||||||
None => DataBlockPath::Plain(path),
|
None => DataBlockPath::Plain(path.clone()),
|
||||||
Some(x) if x.to_str() == Some("zst") => DataBlockPath::Compressed(path),
|
Some(x) if x.to_str() == Some("zst") => DataBlockPath::Compressed(path.clone()),
|
||||||
_ => {
|
_ => {
|
||||||
warn!("not rebalancing file: {}", path.to_string_lossy());
|
warn!("not rebalancing file: {}", path.to_string_lossy());
|
||||||
return Ok(WorkerState::Busy);
|
return Ok(WorkerState::Busy);
|
||||||
|
@ -587,11 +591,10 @@ impl Worker for RebalanceWorker {
|
||||||
};
|
};
|
||||||
// block is not in its primary location,
|
// block is not in its primary location,
|
||||||
// move it there (reading and re-writing does the trick)
|
// move it there (reading and re-writing does the trick)
|
||||||
debug!("rebalance: moving block {:?}", hash);
|
debug!("rebalance: moving block {:?} => {:?}", block_path, prim_loc);
|
||||||
let data = self.manager.read_block_from(&hash, &path).await?;
|
let block_len = self.manager.fix_block_location(&hash, block_path).await?;
|
||||||
self.manager.write_block(&hash, &data).await?;
|
|
||||||
self.moved += 1;
|
self.moved += 1;
|
||||||
self.moved_bytes += data.inner_buffer().len() as u64;
|
self.moved_bytes += block_len as u64;
|
||||||
}
|
}
|
||||||
Ok(WorkerState::Busy)
|
Ok(WorkerState::Busy)
|
||||||
} else {
|
} else {
|
||||||
|
|
Loading…
Reference in a new issue