diff --git a/Makefile b/Makefile index edacf6836..21d3e2a31 100644 --- a/Makefile +++ b/Makefile @@ -1,3 +1,3 @@ all: - cargo fmt + cargo fmt || true cargo build diff --git a/TODO b/TODO index 2209e5ab8..2baa4f774 100644 --- a/TODO +++ b/TODO @@ -1,9 +1,7 @@ Replication ----------- -- for each interval of tokens, we know the list of nodes that are responsible -- every node watches the current ring and state of the network -- and thus determines the interval of tokens for which they are responsible +Finish the thing that sends blocks to other nodes if needed before deleting them locally. How are we going to test that our replication method works correctly? We will have to introduce lots of dummy data and then add/remove nodes many times. @@ -12,7 +10,6 @@ We will have to introduce lots of dummy data and then add/remove nodes many time To do list ---------- -- important: check block values on read and repare corrupted block contents - less a priority: hinted handoff - FIXME in rpc_server when garage shuts down and futures can be interrupted (tokio::spawn should be replaced by a new function background::spawn_joinable) diff --git a/src/block.rs b/src/block.rs index e209dab6a..cd570bda5 100644 --- a/src/block.rs +++ b/src/block.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use std::time::Duration; use arc_swap::ArcSwapOption; +use futures::future::*; use futures::stream::*; use tokio::fs; use tokio::prelude::*; @@ -16,6 +17,8 @@ use crate::proto::*; use crate::rpc_client::*; use crate::server::Garage; +const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5); + pub struct BlockManager { pub data_dir: PathBuf, pub rc: sled::Tree, @@ -102,6 +105,22 @@ impl BlockManager { })) } + pub async fn need_block(&self, hash: &Hash) -> Result { + let needed = self + .rc + .get(hash.as_ref())? + .map(|x| u64_from_bytes(x.as_ref()) > 0) + .unwrap_or(false); + if needed { + let mut path = self.data_dir.clone(); + path.push(hex::encode(hash.as_ref())); + let exists = fs::metadata(&path).await.is_ok(); + Ok(!exists) + } else { + Ok(false) + } + } + fn block_dir(&self, hash: &Hash) -> PathBuf { let mut path = self.data_dir.clone(); path.push(hex::encode(&hash.as_slice()[0..1])); @@ -191,7 +210,47 @@ impl BlockManager { .await?; let needed_by_others = !active_refs.is_empty(); if needed_by_others { - // TODO check they have it and send it if not + let ring = garage.system.ring.borrow().clone(); + let who = ring.walk_ring(&hash, garage.system.config.data_replication_factor); + let msg = Message::NeedBlockQuery(hash.clone()); + let who_needs_fut = who + .iter() + .map(|to| rpc_call(garage.system.clone(), to, &msg, NEED_BLOCK_QUERY_TIMEOUT)); + let who_needs = join_all(who_needs_fut).await; + + let mut need_nodes = vec![]; + let mut errors = 0; + for (node, needed) in who.into_iter().zip(who_needs.iter()) { + match needed { + Ok(Message::NeedBlockReply(true)) => { + need_nodes.push(node); + } + Err(_) => { + errors += 1; + } + _ => (), + } + } + + if errors > (garage.system.config.data_replication_factor - 1) / 2 { + return Err(Error::Message(format!( + "Should delete block, but not enough nodes confirm that they have it." + ))); + } + + if need_nodes.len() > 0 { + let put_block_message = self.read_block(hash).await?; + for resp in rpc_call_many( + garage.system.clone(), + &need_nodes[..], + put_block_message, + BLOCK_RW_TIMEOUT, + ) + .await + { + resp?; + } + } } fs::remove_file(path).await?; self.resync_queue.remove(&hash)?; diff --git a/src/proto.rs b/src/proto.rs index 7d8d38997..cf7ed1cc5 100644 --- a/src/proto.rs +++ b/src/proto.rs @@ -20,6 +20,8 @@ pub enum Message { GetBlock(Hash), PutBlock(PutBlockMessage), + NeedBlockQuery(Hash), + NeedBlockReply(bool), TableRPC(String, #[serde(with = "serde_bytes")] Vec), } diff --git a/src/rpc_server.rs b/src/rpc_server.rs index c473a32dd..3410ab971 100644 --- a/src/rpc_server.rs +++ b/src/rpc_server.rs @@ -66,6 +66,11 @@ async fn handler( tokio::spawn(write_fut).await? } Message::GetBlock(h) => garage.block_manager.read_block(&h).await, + Message::NeedBlockQuery(h) => garage + .block_manager + .need_block(&h) + .await + .map(Message::NeedBlockReply), Message::TableRPC(table, msg) => { // Same trick for table RPCs than for PutBlock