K2V PollRange, version 2 #471
2 changed files with 61 additions and 21 deletions
@@ -8,7 +8,7 @@
 use std::collections::{BTreeMap, HashMap};
 use std::convert::TryInto;
 use std::sync::{Arc, Mutex, MutexGuard};
-use std::time::Duration;
+use std::time::{Duration, Instant};
 
 use async_trait::async_trait;
 use futures::stream::FuturesUnordered;
@@ -35,6 +35,8 @@ use crate::k2v::item_table::*;
 use crate::k2v::seen::*;
 use crate::k2v::sub::*;
 
+const POLL_RANGE_EXTRA_DELAY: Duration = Duration::from_millis(200);
+
 const TIMESTAMP_KEY: &'static [u8] = b"timestamp";
 
 /// RPC messages for K2V
@@ -271,6 +273,8 @@ impl K2VRpcHandler {
 	) -> Result<Option<(BTreeMap<String, K2VItem>, String)>, HelperError> {
 		let has_seen_marker = seen_str.is_some();
 
+		// Parse seen marker, we will use it below. This is also the first check
+		// that it is valid, which returns a bad request error if not.
 		let mut seen = seen_str
 			.as_deref()
 			.map(RangeSeenMarker::decode_helper)
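Aside: the new comment describes a parse-and-validate idiom worth spelling out. Below is a standalone sketch, not Garage code: `decode_helper` is a stand-in for `RangeSeenMarker::decode_helper`, and the `.transpose()?` step is an assumption filling the lines elided between this hunk and the next (which resumes at `.unwrap_or_default()`). Mapping an `Option<&str>` through a fallible decoder and transposing it rejects an invalid marker early, while an absent marker falls back to the default.

```rust
/// Hypothetical stand-in for RangeSeenMarker::decode_helper: any fallible
/// decoder from &str works for the illustration.
fn decode_helper(s: &str) -> Result<u64, String> {
	s.parse::<u64>()
		.map_err(|e| format!("invalid seen marker: {}", e))
}

fn parse_marker(seen_str: Option<String>) -> Result<u64, String> {
	Ok(seen_str
		.as_deref()
		.map(decode_helper) // Option<&str> -> Option<Result<u64, _>>
		.transpose()? // first validity check: bail out on a bad marker
		.unwrap_or_default()) // no marker supplied: start from the default
}

fn main() {
	assert_eq!(parse_marker(None), Ok(0));
	assert_eq!(parse_marker(Some("42".into())), Ok(42));
	assert!(parse_marker(Some("not-a-marker".into())).is_err());
}
```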
@@ -278,30 +282,67 @@
 			.unwrap_or_default();
 		seen.restrict(&range);
 
+		// Prepare PollRange RPC to send to the storage nodes responsible for the partition
 		let nodes = self
 			.item_table
 			.data
 			.replication
 			.write_nodes(&range.partition.hash());
-		let rpc = self.system.rpc.try_call_many(
-			&self.endpoint,
-			&nodes[..],
-			K2VRpc::PollRange {
-				range,
-				seen_str,
-				timeout_msec,
-			},
-			RequestStrategy::with_priority(PRIO_NORMAL)
-				.with_quorum(self.item_table.data.replication.read_quorum())
-				.without_timeout(),
-		);
-		let timeout_duration = Duration::from_millis(timeout_msec) + self.system.rpc.rpc_timeout();
-		let resps = select! {
-			r = rpc => r?,
-			_ = tokio::time::sleep(timeout_duration) => return Ok(None),
-		};
+		let quorum = self.item_table.data.replication.read_quorum();
+		let msg = K2VRpc::PollRange {
+			range,
+			seen_str,
+			timeout_msec,
+		};
+
+		// Send the request to all nodes, use FuturesUnordered to get the responses in any order
+		let msg = msg.into_req().map_err(netapp::error::Error::from)?;
+		let rs = RequestStrategy::with_priority(PRIO_NORMAL).without_timeout();
+		let mut requests = nodes
+			.iter()
+			.map(|node| self.system.rpc.call(&self.endpoint, *node, msg.clone(), rs))
+			.collect::<FuturesUnordered<_>>();
+
+		// Fetch responses. This procedure stops fetching responses when any of the following
+		// conditions arise:
+		// - we have a response to all requests
+		// - we have a response to a read quorum of requests (e.g. 2/3), and an extra delay
+		//   has passed since the quorum was achieved
+		// - a global RPC timeout expired
+		// The extra delay after a quorum is received is useful in case the last response
+		// arrives during this short interval: it allows us to consider all the data seen
+		// by that last node in the response we produce, which will likely reduce the
+		// size of the seen marker that we return (because we then have an info of the
+		// kind: all items produced by that node until time ts have been returned, so we can
+		// bump the entry in the global vector clock and possibly remove some item-specific
+		// vector clocks)
+		let mut deadline =
+			Instant::now() + Duration::from_millis(timeout_msec) + self.system.rpc.rpc_timeout();
+		let mut resps = vec![];
+		let mut errors = vec![];
+		loop {
+			select! {
+				_ = tokio::time::sleep_until(deadline.into()) => {
+					break;
+				}
+				res = requests.next() => match res {
+					None => break,
+					Some(Err(e)) => errors.push(e),
+					Some(Ok(r)) => {
+						resps.push(r);
+						if resps.len() >= quorum {
+							deadline = std::cmp::min(deadline, Instant::now() + POLL_RANGE_EXTRA_DELAY);
+						}
+					}
+				}
+			}
+		}
+		if errors.len() > nodes.len() - quorum {
+			let errors = errors.iter().map(|e| format!("{}", e)).collect::<Vec<_>>();
+			return Err(Error::Quorum(quorum, resps.len(), nodes.len(), errors).into());
+		}
 
+		// Take all returned items into account to produce the response.
 		let mut new_items = BTreeMap::<String, K2VItem>::new();
 		for v in resps {
 			if let K2VRpc::PollRangeResponse(node, items) = v {
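Aside: the collection loop above is a reusable pattern, and a minimal self-contained sketch of it follows, under stated assumptions (tokio with the `time` and `macros` features plus the `futures` crate; `fake_request`, the node list, and the 10-second outer deadline are hypothetical stand-ins for Garage's RPCs). Requests fan out through `FuturesUnordered`, responses are taken in completion order, and once a read quorum is in, the deadline is pulled forward so stragglers only get a short grace period. It also mirrors the quorum arithmetic: with `n` nodes and quorum `q`, more than `n - q` failures make a quorum impossible, which is exactly the `errors.len() > nodes.len() - quorum` check in the PR.

```rust
use std::time::{Duration, Instant};

use futures::stream::{FuturesUnordered, StreamExt};
use tokio::select;

/// Grace period once a quorum of answers is in (mirrors POLL_RANGE_EXTRA_DELAY).
const EXTRA_DELAY: Duration = Duration::from_millis(200);

/// Hypothetical stand-in for one PollRange RPC to a storage node.
async fn fake_request(node: u64) -> Result<u64, String> {
	tokio::time::sleep(Duration::from_millis(node * 100)).await;
	Ok(node)
}

#[tokio::main]
async fn main() {
	let nodes = [1u64, 2, 3];
	let quorum = 2; // read quorum, e.g. 2 out of 3 replicas

	// Fan out one request per node; FuturesUnordered yields results in
	// whatever order the nodes answer.
	let mut requests = nodes
		.iter()
		.map(|n| fake_request(*n))
		.collect::<FuturesUnordered<_>>();

	// Global deadline, pulled forward once the quorum is reached.
	let mut deadline = Instant::now() + Duration::from_secs(10);
	let mut resps = vec![];
	let mut errors: Vec<String> = vec![];

	loop {
		select! {
			_ = tokio::time::sleep_until(deadline.into()) => break,
			res = requests.next() => match res {
				None => break, // every node has answered
				Some(Err(e)) => errors.push(e),
				Some(Ok(r)) => {
					resps.push(r);
					if resps.len() >= quorum {
						// Quorum reached: leave stragglers only EXTRA_DELAY.
						deadline = deadline.min(Instant::now() + EXTRA_DELAY);
					}
				}
			}
		}
	}

	// With n nodes and quorum q, more than n - q failures make a quorum
	// impossible: the same arithmetic as the PR's error check.
	if errors.len() > nodes.len() - quorum {
		eprintln!("quorum {} unreachable: {:?}", quorum, errors);
		return;
	}
	println!("got {}/{} responses within the deadline", resps.len(), nodes.len());
}
```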
@@ -15,10 +15,9 @@ use opentelemetry::{
 };
 
 pub use netapp::endpoint::{Endpoint, EndpointHandler, StreamingEndpointHandler};
-use netapp::message::IntoReq;
 pub use netapp::message::{
-	Message as Rpc, OrderTag, Req, RequestPriority, Resp, PRIO_BACKGROUND, PRIO_HIGH, PRIO_NORMAL,
-	PRIO_SECONDARY,
+	IntoReq, Message as Rpc, OrderTag, Req, RequestPriority, Resp, PRIO_BACKGROUND, PRIO_HIGH,
+	PRIO_NORMAL, PRIO_SECONDARY,
 };
 use netapp::peering::fullmesh::FullMeshPeeringStrategy;
 pub use netapp::{self, NetApp, NodeID};
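The second file's change is purely about visibility: `IntoReq` moves from a private `use` into the `pub use netapp::message::{...}` list, presumably because the new code path calls `msg.into_req()` from a crate that only sees netapp through these re-exports, and a trait method is only callable when the trait is in scope. A minimal illustration of that mechanism, with toy types rather than the netapp API:

```rust
mod inner {
	// Plays the role of netapp: defines the trait.
	pub trait IntoReq {
		fn into_req(self) -> String;
	}
	impl IntoReq for &str {
		fn into_req(self) -> String {
			format!("Req({})", self)
		}
	}
}

mod facade {
	// Plays the role of this file: before the PR this was a private `use`;
	// re-exporting it lets users of the facade bring the trait into scope.
	pub use crate::inner::IntoReq;
}

fn main() {
	// Without this import, .into_req() below would not resolve.
	use crate::facade::IntoReq;
	println!("{}", "poll_range".into_req());
}
```

In garage_rpc the same effect is what lets `msg.into_req().map_err(netapp::error::Error::from)?` in the first file compile for code that only depends on garage_rpc's re-exports.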
|
|