use netapp streaming body #343

Merged
lx merged 31 commits from netapp-stream-body into main 2022-09-13 13:26:09 +00:00
4 changed files with 60 additions and 41 deletions
Showing only changes of commit e935861854 - Show all commits

2
Cargo.lock generated
View file

@ -2158,7 +2158,7 @@ dependencies = [
[[package]]
name = "netapp"
version = "0.5.0"
source = "git+https://git.deuxfleurs.fr/lx/netapp?branch=stream-body#fed0542313824df295a7e322a9aebe8ba62f97b9"
source = "git+https://git.deuxfleurs.fr/lx/netapp?branch=stream-body#74e57016f63b6052cf6d539812859c3a46138eee"
dependencies = [
"arc-swap",
"async-trait",

View file

@ -11,7 +11,7 @@ PATH="${GARAGE_DEBUG}:${GARAGE_RELEASE}:${NIX_RELEASE}:$PATH"
FANCYCOLORS=("41m" "42m" "44m" "45m" "100m" "104m")
export RUST_BACKTRACE=1
export RUST_LOG=garage=info,garage_api=debug
export RUST_LOG=garage=info,garage_api=debug,netapp=trace
MAIN_LABEL="\e[${FANCYCOLORS[0]}[main]\e[49m"
WHICH_GARAGE=$(which garage || exit 1)

View file

@ -185,6 +185,7 @@ impl BlockManager {
hash: &Hash,
) -> Result<(DataBlockHeader, ByteStream), Error> {
let who = self.replication.read_nodes(hash);
//let who = self.system.rpc.request_order(&who);
for node in who.iter() {
let node_id = NodeID::from(*node);
@ -225,6 +226,7 @@ impl BlockManager {
/// Return its entire body
async fn rpc_get_raw_block(&self, hash: &Hash) -> Result<DataBlock, Error> {
let who = self.replication.read_nodes(hash);
//let who = self.system.rpc.request_order(&who);
for node in who.iter() {
let node_id = NodeID::from(*node);

View file

@ -292,47 +292,19 @@ impl RpcHelper {
// to reach a quorum, priorizing nodes with the lowest latency.
// When there are errors, we start new requests to compensate.
// Retrieve some status variables that we will use to sort requests
let peer_list = self.0.fullmesh.get_peer_list();
let ring: Arc<Ring> = self.0.ring.borrow().clone();
let our_zone = match ring.layout.node_role(&self.0.our_node_id) {
Some(pc) => &pc.zone,
None => "",
};
// Augment requests with some information used to sort them.
// The tuples are as follows:
// (is another node?, is another zone?, latency, node ID, request future)
// We store all of these tuples in a vec that we can sort.
// By sorting this vec, we priorize ourself, then nodes in the same zone,
// and within a same zone we priorize nodes with the lowest latency.
let mut requests = requests
.map(|(to, fut)| {
let peer_zone = match ring.layout.node_role(&to) {
Some(pc) => &pc.zone,
None => "",
};
let peer_avg_ping = peer_list
.iter()
.find(|x| x.id.as_ref() == to.as_slice())
.and_then(|pi| pi.avg_ping)
.unwrap_or_else(|| Duration::from_secs(1));
(
to != self.0.our_node_id,
peer_zone != our_zone,
peer_avg_ping,
to,
fut,
)
})
// Reorder requests to priorize closeness / low latency
let request_order = self.request_order(to);
let mut ord_requests = vec![(); request_order.len()]
.into_iter()
.map(|_| None)
.collect::<Vec<_>>();
// Sort requests by (priorize ourself, priorize same zone, priorize low latency)
requests
.sort_by_key(|(diffnode, diffzone, ping, _to, _fut)| (*diffnode, *diffzone, *ping));
for (to, fut) in requests {
let i = request_order.iter().position(|x| *x == to).unwrap();
ord_requests[i] = Some((to, fut));
}
// Make an iterator to take requests in their sorted order
let mut requests = requests.into_iter();
let mut requests = ord_requests.into_iter().map(Option::unwrap);
// resp_stream will contain all of the requests that are currently in flight.
// (for the moment none, they will be added in the loop below)
@ -343,7 +315,7 @@ impl RpcHelper {
// If the current set of requests that are running is not enough to possibly
// reach quorum, start some new requests.
while successes.len() + resp_stream.len() < quorum {
if let Some((_, _, _, req_to, fut)) = requests.next() {
if let Some((req_to, fut)) = requests.next() {
let tracer = opentelemetry::global::tracer("garage");
let span = tracer.start(format!("RPC to {:?}", req_to));
resp_stream.push(tokio::spawn(
@ -413,4 +385,49 @@ impl RpcHelper {
Err(Error::Quorum(quorum, successes.len(), to.len(), errors))
}
}
pub fn request_order(&self, nodes: &[Uuid]) -> Vec<Uuid> {
// Retrieve some status variables that we will use to sort requests
let peer_list = self.0.fullmesh.get_peer_list();
let ring: Arc<Ring> = self.0.ring.borrow().clone();
let our_zone = match ring.layout.node_role(&self.0.our_node_id) {
Some(pc) => &pc.zone,
None => "",
};
// Augment requests with some information used to sort them.
// The tuples are as follows:
// (is another node?, is another zone?, latency, node ID, request future)
// We store all of these tuples in a vec that we can sort.
// By sorting this vec, we priorize ourself, then nodes in the same zone,
// and within a same zone we priorize nodes with the lowest latency.
let mut nodes = nodes
.iter()
.map(|to| {
let peer_zone = match ring.layout.node_role(&to) {
Some(pc) => &pc.zone,
None => "",
};
let peer_avg_ping = peer_list
.iter()
.find(|x| x.id.as_ref() == to.as_slice())
.and_then(|pi| pi.avg_ping)
.unwrap_or_else(|| Duration::from_secs(1));
(
*to != self.0.our_node_id,
peer_zone != our_zone,
peer_avg_ping,
*to,
)
})
.collect::<Vec<_>>();
// Sort requests by (priorize ourself, priorize same zone, priorize low latency)
nodes.sort_by_key(|(diffnode, diffzone, ping, _to)| (*diffnode, *diffzone, *ping));
nodes
.into_iter()
.map(|(_, _, _, to)| to)
.collect::<Vec<_>>()
}
}