forked from Deuxfleurs/garage
[next-0.10] fixes to k2v rpc + comment fixes
parent 51d11b4b26
commit c0eeb0b0f3
4 changed files with 15 additions and 10 deletions
@@ -219,12 +219,11 @@ impl K2VRpcHandler {
 			},
 			sort_key,
 		};
-		// TODO figure this out with write sets, is it still appropriate???
 		let nodes = self
 			.item_table
 			.data
 			.replication
-			.read_nodes(&poll_key.partition.hash());
+			.storage_nodes(&poll_key.partition.hash());
 
 		let rpc = self.system.rpc_helper().try_call_many(
 			&self.endpoint,
@@ -239,8 +238,7 @@ impl K2VRpcHandler {
 				.send_all_at_once(true)
 				.without_timeout(),
 		);
-		let timeout_duration =
-			Duration::from_millis(timeout_msec) + self.system.rpc_helper().rpc_timeout();
+		let timeout_duration = Duration::from_millis(timeout_msec);
 		let resps = select! {
 			r = rpc => r?,
 			_ = tokio::time::sleep(timeout_duration) => return Ok(None),
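
Note: the dropped `+ self.system.rpc_helper().rpc_timeout()` term fits the surrounding code, since the RPC is issued with `.without_timeout()` and raced against a plain sleep in `select!`; after this change the poll gives up exactly `timeout_msec` after it starts, without extra padding. A minimal, self-contained sketch of that pattern (hypothetical `slow_operation`, not Garage code; assumes tokio = { version = "1", features = ["full"] }):

    use std::time::Duration;
    use tokio::select;

    // Hypothetical slow operation standing in for the poll RPC.
    async fn slow_operation() -> Result<u32, &'static str> {
        tokio::time::sleep(Duration::from_millis(50)).await;
        Ok(42)
    }

    // Mirrors the shape above: an un-timed-out future raced against a sleep,
    // returning Ok(None) instead of an error when the deadline passes.
    async fn poll_with_timeout(timeout_msec: u64) -> Result<Option<u32>, &'static str> {
        let timeout_duration = Duration::from_millis(timeout_msec);
        select! {
            r = slow_operation() => r.map(Some),
            _ = tokio::time::sleep(timeout_duration) => Ok(None),
        }
    }

    #[tokio::main]
    async fn main() {
        assert_eq!(poll_with_timeout(10).await, Ok(None));      // deadline first
        assert_eq!(poll_with_timeout(200).await, Ok(Some(42))); // response first
    }
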
@@ -282,12 +280,11 @@ impl K2VRpcHandler {
 		seen.restrict(&range);
 
 		// Prepare PollRange RPC to send to the storage nodes responsible for the parititon
-		// TODO figure this out with write sets, does it still work????
 		let nodes = self
 			.item_table
 			.data
 			.replication
-			.read_nodes(&range.partition.hash());
+			.storage_nodes(&range.partition.hash());
 		let quorum = self.item_table.data.replication.read_quorum();
 		let msg = K2VRpc::PollRange {
 			range,
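
Note: this hunk and the first one make the same substitution: polls are now sent to `storage_nodes` rather than `read_nodes`. With the multiple active layout versions of next-0.10, a partition's data may sit on any node of any still-active version, so a poll that only watched the current read set could miss updates written under another version. A rough sketch of the distinction, using stand-in types rather than Garage's actual replication API:

    use std::collections::BTreeSet;

    // Stand-ins for Garage's layout types; illustration only.
    type NodeId = u64;

    struct LayoutVersion {
        nodes_for_partition: Vec<NodeId>,
    }

    struct LayoutHistory {
        versions: Vec<LayoutVersion>, // oldest still-active first
    }

    impl LayoutHistory {
        // Read set of the newest version only: enough for quorum reads
        // under a single layout version.
        fn read_nodes(&self) -> Vec<NodeId> {
            self.versions.last().unwrap().nodes_for_partition.clone()
        }

        // Every node that may store the partition under any still-active
        // version: what a poll has to watch so no writer is missed.
        fn storage_nodes(&self) -> Vec<NodeId> {
            let set: BTreeSet<NodeId> = self
                .versions
                .iter()
                .flat_map(|v| v.nodes_for_partition.iter().copied())
                .collect();
            set.into_iter().collect()
        }
    }

    fn main() {
        let history = LayoutHistory {
            versions: vec![
                LayoutVersion { nodes_for_partition: vec![1, 2, 3] }, // draining
                LayoutVersion { nodes_for_partition: vec![2, 3, 4] }, // current
            ],
        };
        assert_eq!(history.read_nodes(), vec![2, 3, 4]);
        assert_eq!(history.storage_nodes(), vec![1, 2, 3, 4]); // union of both
    }
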
@@ -320,9 +317,7 @@ impl K2VRpcHandler {
 		// kind: all items produced by that node until time ts have been returned, so we can
 		// bump the entry in the global vector clock and possibly remove some item-specific
 		// vector clocks)
-		let mut deadline = Instant::now()
-			+ Duration::from_millis(timeout_msec)
-			+ self.system.rpc_helper().rpc_timeout();
+		let mut deadline = Instant::now() + Duration::from_millis(timeout_msec);
 		let mut resps = vec![];
 		let mut errors = vec![];
 		loop {
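
Note: unlike the single-shot poll above, `poll_range` uses an absolute deadline: it is computed once, and each iteration of the response-collecting loop races against the same instant. A small sketch of that shape (hypothetical `next_response`, not the actual RPC plumbing; same assumed tokio dependency):

    use std::time::Duration;
    use tokio::time::{sleep, sleep_until, Instant};

    // Hypothetical response source standing in for the PollRange RPCs.
    async fn next_response(i: u32) -> u32 {
        sleep(Duration::from_millis(20)).await;
        i
    }

    #[tokio::main]
    async fn main() {
        let timeout_msec = 70;
        // Computed once; every loop iteration races against the same
        // instant, as poll_range does with its `deadline`.
        let deadline = Instant::now() + Duration::from_millis(timeout_msec);
        let mut resps = vec![];
        let mut i = 0;
        loop {
            tokio::select! {
                r = next_response(i) => { resps.push(r); i += 1; }
                _ = sleep_until(deadline) => break,
            }
        }
        // Roughly three 20 ms responses fit before the 70 ms deadline.
        println!("collected: {:?}", resps);
    }
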
@@ -363,7 +363,7 @@ mod v010 {
 		}
 	}
 
-	// Since ObjectVersionHeaders can now be serialized independently, for the
+	// Since ObjectVersionMetaInner can now be serialized independently, for the
 	// purpose of being encrypted, we need it to support migrations on its own
 	// as well.
 	impl garage_util::migrate::InitialFormat for ObjectVersionMetaInner {
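
Note: the comment fix tracks a rename (`ObjectVersionHeaders` is now `ObjectVersionMetaInner`), and the underlying point stands: a struct that is serialized on its own so it can be encrypted also needs its own migration chain, independent of the table entry containing it. A hedged sketch of the general idea, with hypothetical field names and not Garage's actual `migrate` API (assumed deps: serde with derive, rmp-serde):

    use serde::{Deserialize, Serialize};

    #[derive(Serialize, Deserialize)]
    struct MetaInnerV0 {
        headers: Vec<(String, String)>,
    }

    #[derive(Serialize, Deserialize)]
    struct MetaInner {
        etag: String,
        headers: Vec<(String, String)>,
    }

    impl From<MetaInnerV0> for MetaInner {
        fn from(old: MetaInnerV0) -> Self {
            MetaInner { etag: String::new(), headers: old.headers }
        }
    }

    // Decode bytes under a version tag, migrating old encodings forward.
    fn decode_meta(tag: u8, bytes: &[u8]) -> Result<MetaInner, rmp_serde::decode::Error> {
        match tag {
            0 => Ok(rmp_serde::from_slice::<MetaInnerV0>(bytes)?.into()),
            _ => rmp_serde::from_slice::<MetaInner>(bytes),
        }
    }

    fn main() {
        let old = MetaInnerV0 { headers: vec![("k".into(), "v".into())] };
        let bytes = rmp_serde::to_vec(&old).unwrap();
        let migrated = decode_meta(0, &bytes).unwrap();
        assert_eq!(migrated.headers.len(), 1);
    }
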
@@ -153,10 +153,14 @@ impl LayoutHelper {
 
 	// ------------------ read helpers ---------------
 
+	/// Return all nodes that have a role (gateway or storage)
+	/// in one of the currently active layout versions
 	pub fn all_nodes(&self) -> &[Uuid] {
 		&self.all_nodes
 	}
 
+	/// Return all nodes that are configured to store data
+	/// in one of the currently active layout versions
 	pub fn all_nongateway_nodes(&self) -> &[Uuid] {
 		&self.all_nongateway_nodes
 	}
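
Note: both accessors return `&[Uuid]` straight from stored fields, so `LayoutHelper` evidently precomputes these lists from the layout history rather than recomputing them per call. A minimal sketch of that caching pattern, with a hypothetical constructor:

    // Hypothetical sketch of LayoutHelper's caching: node lists are derived
    // once from the layout history and then served from stored fields.
    type NodeId = u8;

    struct LayoutHelper {
        all_nodes: Vec<NodeId>,
        all_nongateway_nodes: Vec<NodeId>,
    }

    impl LayoutHelper {
        fn new(all_nodes: Vec<NodeId>, all_nongateway_nodes: Vec<NodeId>) -> Self {
            Self { all_nodes, all_nongateway_nodes }
        }

        /// All nodes with a role (gateway or storage) in an active version.
        fn all_nodes(&self) -> &[NodeId] {
            &self.all_nodes
        }

        /// All nodes configured to store data in an active version.
        fn all_nongateway_nodes(&self) -> &[NodeId] {
            &self.all_nongateway_nodes
        }
    }

    fn main() {
        let helper = LayoutHelper::new(vec![1, 2, 3], vec![2, 3]);
        assert_eq!(helper.all_nodes(), &[1, 2, 3]);
        assert_eq!(helper.all_nongateway_nodes(), &[2, 3]);
    }
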
@@ -27,14 +27,18 @@ impl LayoutHistory {
 
 	// ------------------ who stores what now? ---------------
 
+	/// Returns the layout version with the highest number
 	pub fn current(&self) -> &LayoutVersion {
 		self.versions.last().as_ref().unwrap()
 	}
 
+	/// Returns the version number of the oldest layout version still active
 	pub fn min_stored(&self) -> u64 {
 		self.versions.first().as_ref().unwrap().version
 	}
 
+	/// Calculate the set of all nodes that have a role (gateway or storage)
+	/// in one of the currently active layout versions
 	pub fn get_all_nodes(&self) -> Vec<Uuid> {
 		if self.versions.len() == 1 {
 			self.versions[0].all_nodes().to_vec()
@@ -48,6 +52,8 @@ impl LayoutHistory {
 		}
 	}
 
+	/// Calculate the set of all nodes that are configured to store data
+	/// in one of the currently active layout versions
 	pub(crate) fn get_all_nongateway_nodes(&self) -> Vec<Uuid> {
 		if self.versions.len() == 1 {
 			self.versions[0].nongateway_nodes().to_vec()
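
Note: taken together, the new doc comments pin down the `LayoutHistory` read helpers: `current` is the newest version, `min_stored` the oldest still-active version number, and the `get_all_*` methods union node sets across every active version, with a fast path when only one version is live. A simplified, runnable model of those helpers, using stand-in types rather than the actual Garage definitions:

    type NodeId = u8;

    struct LayoutVersion {
        version: u64,
        nodes: Vec<NodeId>,
    }

    impl LayoutVersion {
        fn all_nodes(&self) -> &[NodeId] {
            &self.nodes
        }
    }

    struct LayoutHistory {
        versions: Vec<LayoutVersion>, // sorted, oldest still-active first
    }

    impl LayoutHistory {
        /// Layout version with the highest number.
        fn current(&self) -> &LayoutVersion {
            self.versions.last().unwrap()
        }

        /// Version number of the oldest layout version still active.
        fn min_stored(&self) -> u64 {
            self.versions.first().unwrap().version
        }

        /// Union of node sets across all active versions, with the
        /// single-version fast path shown in the hunk above.
        fn get_all_nodes(&self) -> Vec<NodeId> {
            if self.versions.len() == 1 {
                self.versions[0].all_nodes().to_vec()
            } else {
                let mut nodes: Vec<NodeId> = self
                    .versions
                    .iter()
                    .flat_map(|v| v.all_nodes().iter().copied())
                    .collect();
                nodes.sort();
                nodes.dedup();
                nodes
            }
        }
    }

    fn main() {
        let h = LayoutHistory {
            versions: vec![
                LayoutVersion { version: 3, nodes: vec![1, 2, 3] },
                LayoutVersion { version: 4, nodes: vec![2, 3, 4] },
            ],
        };
        assert_eq!(h.current().version, 4);
        assert_eq!(h.min_stored(), 3);
        assert_eq!(h.get_all_nodes(), vec![1, 2, 3, 4]);
    }
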