From 4abfb75509f216f4d62bc8b18b22eb680eefe2d9 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 17 Apr 2020 19:16:08 +0200 Subject: [PATCH] Implement sending blocks to nodes that need them --- Makefile | 2 +- TODO | 5 +--- src/block.rs | 61 ++++++++++++++++++++++++++++++++++++++++++++++- src/proto.rs | 2 ++ src/rpc_server.rs | 5 ++++ 5 files changed, 69 insertions(+), 6 deletions(-) diff --git a/Makefile b/Makefile index edacf683..21d3e2a3 100644 --- a/Makefile +++ b/Makefile @@ -1,3 +1,3 @@ all: - cargo fmt + cargo fmt || true cargo build diff --git a/TODO b/TODO index 2209e5ab..2baa4f77 100644 --- a/TODO +++ b/TODO @@ -1,9 +1,7 @@ Replication ----------- -- for each interval of tokens, we know the list of nodes that are responsible -- every node watches the current ring and state of the network -- and thus determines the interval of tokens for which they are responsible +Finish the thing that sends blocks to other nodes if needed before deleting them locally. How are we going to test that our replication method works correctly? We will have to introduce lots of dummy data and then add/remove nodes many times. 
@@ -12,7 +10,6 @@ We will have to introduce lots of dummy data and then add/remove nodes many time To do list ---------- -- important: check block values on read and repare corrupted block contents - less a priority: hinted handoff - FIXME in rpc_server when garage shuts down and futures can be interrupted (tokio::spawn should be replaced by a new function background::spawn_joinable) diff --git a/src/block.rs b/src/block.rs index e209dab6..cd570bda 100644 --- a/src/block.rs +++ b/src/block.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use std::time::Duration; use arc_swap::ArcSwapOption; +use futures::future::*; use futures::stream::*; use tokio::fs; use tokio::prelude::*; @@ -16,6 +17,8 @@ use crate::proto::*; use crate::rpc_client::*; use crate::server::Garage; +const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5); + pub struct BlockManager { pub data_dir: PathBuf, pub rc: sled::Tree, @@ -102,6 +105,22 @@ impl BlockManager { })) } + pub async fn need_block(&self, hash: &Hash) -> Result<bool, Error> { + let needed = self + .rc + .get(hash.as_ref())? 
+ .map(|x| u64_from_bytes(x.as_ref()) > 0) + .unwrap_or(false); + if needed { + let mut path = self.data_dir.clone(); + path.push(hex::encode(hash.as_ref())); + let exists = fs::metadata(&path).await.is_ok(); + Ok(!exists) + } else { + Ok(false) + } + } + fn block_dir(&self, hash: &Hash) -> PathBuf { let mut path = self.data_dir.clone(); path.push(hex::encode(&hash.as_slice()[0..1])); @@ -191,7 +210,47 @@ impl BlockManager { .await?; let needed_by_others = !active_refs.is_empty(); if needed_by_others { - // TODO check they have it and send it if not + let ring = garage.system.ring.borrow().clone(); + let who = ring.walk_ring(&hash, garage.system.config.data_replication_factor); + let msg = Message::NeedBlockQuery(hash.clone()); + let who_needs_fut = who + .iter() + .map(|to| rpc_call(garage.system.clone(), to, &msg, NEED_BLOCK_QUERY_TIMEOUT)); + let who_needs = join_all(who_needs_fut).await; + + let mut need_nodes = vec![]; + let mut errors = 0; + for (node, needed) in who.into_iter().zip(who_needs.iter()) { + match needed { + Ok(Message::NeedBlockReply(true)) => { + need_nodes.push(node); + } + Err(_) => { + errors += 1; + } + _ => (), + } + } + + if errors > (garage.system.config.data_replication_factor - 1) / 2 { + return Err(Error::Message(format!( + "Should delete block, but not enough nodes confirm that they have it." 
+ ))); + } + + if need_nodes.len() > 0 { + let put_block_message = self.read_block(hash).await?; + for resp in rpc_call_many( + garage.system.clone(), + &need_nodes[..], + put_block_message, + BLOCK_RW_TIMEOUT, + ) + .await + { + resp?; + } + } } fs::remove_file(path).await?; self.resync_queue.remove(&hash)?; diff --git a/src/proto.rs b/src/proto.rs index 7d8d3899..cf7ed1cc 100644 --- a/src/proto.rs +++ b/src/proto.rs @@ -20,6 +20,8 @@ pub enum Message { GetBlock(Hash), PutBlock(PutBlockMessage), + NeedBlockQuery(Hash), + NeedBlockReply(bool), TableRPC(String, #[serde(with = "serde_bytes")] Vec<u8>), } diff --git a/src/rpc_server.rs b/src/rpc_server.rs index c473a32d..3410ab97 100644 --- a/src/rpc_server.rs +++ b/src/rpc_server.rs @@ -66,6 +66,11 @@ async fn handler( tokio::spawn(write_fut).await? } Message::GetBlock(h) => garage.block_manager.read_block(&h).await, + Message::NeedBlockQuery(h) => garage + .block_manager + .need_block(&h) + .await + .map(Message::NeedBlockReply), Message::TableRPC(table, msg) => { // Same trick for table RPCs than for PutBlock