From c0eeb0b0f32ed0a27cfdf9297d0e71e1b9948b73 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 27 Mar 2024 10:44:03 +0100 Subject: [PATCH] [next-0.10] fixes to k2v rpc + comment fixes --- src/model/k2v/rpc.rs | 13 ++++--------- src/model/s3/object_table.rs | 2 +- src/rpc/layout/helper.rs | 4 ++++ src/rpc/layout/history.rs | 6 ++++++ 4 files changed, 15 insertions(+), 10 deletions(-) diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs index e15f2df8..95ff2d18 100644 --- a/src/model/k2v/rpc.rs +++ b/src/model/k2v/rpc.rs @@ -219,12 +219,11 @@ impl K2VRpcHandler { }, sort_key, }; - // TODO figure this out with write sets, is it still appropriate??? let nodes = self .item_table .data .replication - .read_nodes(&poll_key.partition.hash()); + .storage_nodes(&poll_key.partition.hash()); let rpc = self.system.rpc_helper().try_call_many( &self.endpoint, @@ -239,8 +238,7 @@ impl K2VRpcHandler { .send_all_at_once(true) .without_timeout(), ); - let timeout_duration = - Duration::from_millis(timeout_msec) + self.system.rpc_helper().rpc_timeout(); + let timeout_duration = Duration::from_millis(timeout_msec); let resps = select! { r = rpc => r?, _ = tokio::time::sleep(timeout_duration) => return Ok(None), @@ -282,12 +280,11 @@ impl K2VRpcHandler { seen.restrict(&range); // Prepare PollRange RPC to send to the storage nodes responsible for the parititon - // TODO figure this out with write sets, does it still work???? let nodes = self .item_table .data .replication - .read_nodes(&range.partition.hash()); + .storage_nodes(&range.partition.hash()); let quorum = self.item_table.data.replication.read_quorum(); let msg = K2VRpc::PollRange { range, @@ -320,9 +317,7 @@ impl K2VRpcHandler { // kind: all items produced by that node until time ts have been returned, so we can // bump the entry in the global vector clock and possibly remove some item-specific // vector clocks) - let mut deadline = Instant::now() - + Duration::from_millis(timeout_msec) - + self.system.rpc_helper().rpc_timeout(); + let mut deadline = Instant::now() + Duration::from_millis(timeout_msec); let mut resps = vec![]; let mut errors = vec![]; loop { diff --git a/src/model/s3/object_table.rs b/src/model/s3/object_table.rs index b2f25803..5c721148 100644 --- a/src/model/s3/object_table.rs +++ b/src/model/s3/object_table.rs @@ -363,7 +363,7 @@ mod v010 { } } - // Since ObjectVersionHeaders can now be serialized independently, for the + // Since ObjectVersionMetaInner can now be serialized independently, for the // purpose of being encrypted, we need it to support migrations on its own // as well. impl garage_util::migrate::InitialFormat for ObjectVersionMetaInner { diff --git a/src/rpc/layout/helper.rs b/src/rpc/layout/helper.rs index 2835347a..e3096945 100644 --- a/src/rpc/layout/helper.rs +++ b/src/rpc/layout/helper.rs @@ -153,10 +153,14 @@ impl LayoutHelper { // ------------------ read helpers --------------- + /// Return all nodes that have a role (gateway or storage) + /// in one of the currently active layout versions pub fn all_nodes(&self) -> &[Uuid] { &self.all_nodes } + /// Return all nodes that are configured to store data + /// in one of the currently active layout versions pub fn all_nongateway_nodes(&self) -> &[Uuid] { &self.all_nongateway_nodes } diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index 290f058d..af2cbc63 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -27,14 +27,18 @@ impl LayoutHistory { // ------------------ who stores what now? --------------- + /// Returns the layout version with the highest number pub fn current(&self) -> &LayoutVersion { self.versions.last().as_ref().unwrap() } + /// Returns the version number of the oldest layout version still active pub fn min_stored(&self) -> u64 { self.versions.first().as_ref().unwrap().version } + /// Calculate the set of all nodes that have a role (gateway or storage) + /// in one of the currently active layout versions pub fn get_all_nodes(&self) -> Vec { if self.versions.len() == 1 { self.versions[0].all_nodes().to_vec() @@ -48,6 +52,8 @@ impl LayoutHistory { } } + /// Calculate the set of all nodes that are configured to store data + /// in one of the currently active layout versions pub(crate) fn get_all_nongateway_nodes(&self) -> Vec { if self.versions.len() == 1 { self.versions[0].nongateway_nodes().to_vec()