Fix unbounded buffering when one node has slower network #792
5 changed files with 80 additions and 45 deletions
|
@ -238,10 +238,16 @@ impl BlockManager {
|
||||||
async fn rpc_get_raw_block_streaming(
|
async fn rpc_get_raw_block_streaming(
|
||||||
&self,
|
&self,
|
||||||
hash: &Hash,
|
hash: &Hash,
|
||||||
|
priority: RequestPriority,
|
||||||
order_tag: Option<OrderTag>,
|
order_tag: Option<OrderTag>,
|
||||||
) -> Result<DataBlockStream, Error> {
|
) -> Result<DataBlockStream, Error> {
|
||||||
self.rpc_get_raw_block_internal(hash, order_tag, |stream| async move { Ok(stream) })
|
self.rpc_get_raw_block_internal(
|
||||||
.await
|
hash,
|
||||||
|
priority,
|
||||||
|
order_tag,
|
||||||
|
|stream| async move { Ok(stream) },
|
||||||
|
)
|
||||||
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Ask nodes that might have a (possibly compressed) block for it
|
/// Ask nodes that might have a (possibly compressed) block for it
|
||||||
|
@ -249,9 +255,10 @@ impl BlockManager {
|
||||||
pub(crate) async fn rpc_get_raw_block(
|
pub(crate) async fn rpc_get_raw_block(
|
||||||
&self,
|
&self,
|
||||||
hash: &Hash,
|
hash: &Hash,
|
||||||
|
priority: RequestPriority,
|
||||||
order_tag: Option<OrderTag>,
|
order_tag: Option<OrderTag>,
|
||||||
) -> Result<DataBlock, Error> {
|
) -> Result<DataBlock, Error> {
|
||||||
self.rpc_get_raw_block_internal(hash, order_tag, |block_stream| async move {
|
self.rpc_get_raw_block_internal(hash, priority, order_tag, |block_stream| async move {
|
||||||
let (header, stream) = block_stream.into_parts();
|
let (header, stream) = block_stream.into_parts();
|
||||||
read_stream_to_end(stream)
|
read_stream_to_end(stream)
|
||||||
.await
|
.await
|
||||||
|
@ -264,6 +271,7 @@ impl BlockManager {
|
||||||
async fn rpc_get_raw_block_internal<F, Fut, T>(
|
async fn rpc_get_raw_block_internal<F, Fut, T>(
|
||||||
&self,
|
&self,
|
||||||
hash: &Hash,
|
hash: &Hash,
|
||||||
|
priority: RequestPriority,
|
||||||
order_tag: Option<OrderTag>,
|
order_tag: Option<OrderTag>,
|
||||||
f: F,
|
f: F,
|
||||||
) -> Result<T, Error>
|
) -> Result<T, Error>
|
||||||
|
@ -279,7 +287,7 @@ impl BlockManager {
|
||||||
let rpc = self.endpoint.call_streaming(
|
let rpc = self.endpoint.call_streaming(
|
||||||
&node_id,
|
&node_id,
|
||||||
BlockRpc::GetBlock(*hash, order_tag),
|
BlockRpc::GetBlock(*hash, order_tag),
|
||||||
PRIO_NORMAL | PRIO_SECONDARY,
|
priority,
|
||||||
);
|
);
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
res = rpc => {
|
res = rpc => {
|
||||||
|
@ -331,7 +339,9 @@ impl BlockManager {
|
||||||
hash: &Hash,
|
hash: &Hash,
|
||||||
order_tag: Option<OrderTag>,
|
order_tag: Option<OrderTag>,
|
||||||
) -> Result<ByteStream, Error> {
|
) -> Result<ByteStream, Error> {
|
||||||
let block_stream = self.rpc_get_raw_block_streaming(hash, order_tag).await?;
|
let block_stream = self
|
||||||
|
.rpc_get_raw_block_streaming(hash, PRIO_NORMAL | PRIO_SECONDARY, order_tag)
|
||||||
|
.await?;
|
||||||
let (header, stream) = block_stream.into_parts();
|
let (header, stream) = block_stream.into_parts();
|
||||||
match header {
|
match header {
|
||||||
DataBlockHeader::Plain => Ok(stream),
|
DataBlockHeader::Plain => Ok(stream),
|
||||||
|
|
|
@ -436,7 +436,7 @@ impl BlockResyncManager {
|
||||||
&manager.endpoint,
|
&manager.endpoint,
|
||||||
&need_nodes[..],
|
&need_nodes[..],
|
||||||
put_block_message,
|
put_block_message,
|
||||||
RequestStrategy::with_priority(PRIO_BACKGROUND)
|
RequestStrategy::with_priority(PRIO_BACKGROUND | PRIO_SECONDARY)
|
||||||
.with_quorum(need_nodes.len()),
|
.with_quorum(need_nodes.len()),
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
|
@ -460,7 +460,9 @@ impl BlockResyncManager {
|
||||||
hash
|
hash
|
||||||
);
|
);
|
||||||
|
|
||||||
let block_data = manager.rpc_get_raw_block(hash, None).await?;
|
let block_data = manager
|
||||||
|
.rpc_get_raw_block(hash, PRIO_BACKGROUND | PRIO_SECONDARY, None)
|
||||||
|
.await?;
|
||||||
|
|
||||||
manager.metrics.resync_recv_counter.add(1);
|
manager.metrics.resync_recv_counter.add(1);
|
||||||
|
|
||||||
|
|
|
@ -28,12 +28,30 @@ use crate::util::*;
|
||||||
/// The same priority value is given to a request and to its associated response.
|
/// The same priority value is given to a request and to its associated response.
|
||||||
pub type RequestPriority = u8;
|
pub type RequestPriority = u8;
|
||||||
|
|
||||||
|
// Usage of priority levels in Garage:
|
||||||
|
//
|
||||||
|
// PRIO_HIGH
|
||||||
|
// for liveness check events such as pings and important
|
||||||
|
// reconfiguration events such as layout changes
|
||||||
|
//
|
||||||
|
// PRIO_NORMAL
|
||||||
|
// for standard interactive requests to exchange metadata
|
||||||
|
//
|
||||||
|
// PRIO_NORMAL | PRIO_SECONDARY
|
||||||
|
// for standard interactive requests to exchange block data
|
||||||
|
//
|
||||||
|
// PRIO_BACKGROUND
|
||||||
|
// for background resync requests to exchange metadata
|
||||||
|
// PRIO_BACKGROUND | PRIO_SECONDARY
|
||||||
|
// for background resync requests to exchange block data
|
||||||
|
|
||||||
/// Priority class: high
|
/// Priority class: high
|
||||||
pub const PRIO_HIGH: RequestPriority = 0x20;
|
pub const PRIO_HIGH: RequestPriority = 0x20;
|
||||||
/// Priority class: normal
|
/// Priority class: normal
|
||||||
pub const PRIO_NORMAL: RequestPriority = 0x40;
|
pub const PRIO_NORMAL: RequestPriority = 0x40;
|
||||||
/// Priority class: background
|
/// Priority class: background
|
||||||
pub const PRIO_BACKGROUND: RequestPriority = 0x80;
|
pub const PRIO_BACKGROUND: RequestPriority = 0x80;
|
||||||
|
|
||||||
/// Priority: primary among given class
|
/// Priority: primary among given class
|
||||||
pub const PRIO_PRIMARY: RequestPriority = 0x00;
|
pub const PRIO_PRIMARY: RequestPriority = 0x00;
|
||||||
/// Priority: secondary among given class (ex: `PRIO_HIGH | PRIO_SECONDARY`)
|
/// Priority: secondary among given class (ex: `PRIO_HIGH | PRIO_SECONDARY`)
|
||||||
|
|
|
@ -109,7 +109,7 @@ impl SendQueuePriority {
|
||||||
let i = order_vec.iter().take_while(|o2| **o2 < order).count();
|
let i = order_vec.iter().take_while(|o2| **o2 < order).count();
|
||||||
order_vec.insert(i, order);
|
order_vec.insert(i, order);
|
||||||
}
|
}
|
||||||
self.items.push_front(item);
|
self.items.push_back(item);
|
||||||
}
|
}
|
||||||
fn remove(&mut self, id: RequestID) {
|
fn remove(&mut self, id: RequestID) {
|
||||||
if let Some(i) = self.items.iter().position(|x| x.id == id) {
|
if let Some(i) = self.items.iter().position(|x| x.id == id) {
|
||||||
|
@ -128,51 +128,56 @@ impl SendQueuePriority {
|
||||||
self.items.is_empty()
|
self.items.is_empty()
|
||||||
}
|
}
|
||||||
fn poll_next_ready(&mut self, ctx: &mut Context<'_>) -> Poll<(RequestID, DataFrame)> {
|
fn poll_next_ready(&mut self, ctx: &mut Context<'_>) -> Poll<(RequestID, DataFrame)> {
|
||||||
for (j, item) in self.items.iter_mut().enumerate() {
|
// In step 0: poll only streams that have sent 0 bytes; we want to send them first,
|
||||||
if let Some(OrderTag(stream, order)) = item.order_tag {
|
// as they most likely represent small requests.
|
||||||
if order > *self.order.get(&stream).unwrap().front().unwrap() {
|
// In step 1: poll all streams.
|
||||||
|
for step in 0..2 {
|
||||||
|
for (j, item) in self.items.iter_mut().enumerate() {
|
||||||
|
if let Some(OrderTag(stream, order)) = item.order_tag {
|
||||||
|
if order > *self.order.get(&stream).unwrap().front().unwrap() {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if step == 0 && item.sent > 0 {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
let mut item_reader = item.data.read_exact_or_eos(MAX_CHUNK_LENGTH as usize);
|
let mut item_reader = item.data.read_exact_or_eos(MAX_CHUNK_LENGTH as usize);
|
||||||
if let Poll::Ready(bytes_or_err) = Pin::new(&mut item_reader).poll(ctx) {
|
if let Poll::Ready(bytes_or_err) = Pin::new(&mut item_reader).poll(ctx) {
|
||||||
let id = item.id;
|
let id = item.id;
|
||||||
let eos = item.data.eos();
|
let eos = item.data.eos();
|
||||||
|
|
||||||
let packet = bytes_or_err.map_err(|e| match e {
|
let packet = bytes_or_err.map_err(|e| match e {
|
||||||
ReadExactError::Stream(err) => err,
|
ReadExactError::Stream(err) => err,
|
||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
});
|
});
|
||||||
|
|
||||||
let is_err = packet.is_err();
|
let is_err = packet.is_err();
|
||||||
let data_frame = DataFrame::from_packet(packet, !eos);
|
let data_frame = DataFrame::from_packet(packet, !eos);
|
||||||
item.sent += data_frame.data().len();
|
item.sent += data_frame.data().len();
|
||||||
|
|
||||||
if eos || is_err {
|
if eos || is_err {
|
||||||
// If item had an order tag, remove it from the corresponding ordering list
|
// If item had an order tag, remove it from the corresponding ordering list
|
||||||
if let Some(OrderTag(stream, order)) = item.order_tag {
|
if let Some(OrderTag(stream, order)) = item.order_tag {
|
||||||
let order_stream = self.order.get_mut(&stream).unwrap();
|
let order_stream = self.order.get_mut(&stream).unwrap();
|
||||||
assert_eq!(order_stream.pop_front(), Some(order));
|
assert_eq!(order_stream.pop_front(), Some(order));
|
||||||
if order_stream.is_empty() {
|
if order_stream.is_empty() {
|
||||||
self.order.remove(&stream);
|
self.order.remove(&stream);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
// Remove item from sending queue
|
|
||||||
self.items.remove(j);
|
|
||||||
} else {
|
|
||||||
// Move item later in send queue to implement LAS scheduling
|
|
||||||
// (LAS = Least Attained Service)
|
|
||||||
for k in j..self.items.len() - 1 {
|
|
||||||
if self.items[k].sent >= self.items[k + 1].sent {
|
|
||||||
self.items.swap(k, k + 1);
|
|
||||||
} else {
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
|
// Remove item from sending queue
|
||||||
|
self.items.remove(j);
|
||||||
|
} else if step == 0 {
|
||||||
|
// Step 0 means that this stream had not sent any bytes yet.
|
||||||
|
// Now that it has, and it was not an EOS, we know that it is bigger
|
||||||
|
// than one chunk, so move it to the end of the queue.
|
||||||
|
let item = self.items.remove(j).unwrap();
|
||||||
|
self.items.push_back(item);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return Poll::Ready((id, data_frame));
|
||||||
}
|
}
|
||||||
|
|
||||||
return Poll::Ready((id, data_frame));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -190,7 +190,7 @@ impl RecvLoop for ServerConn {
|
||||||
|
|
||||||
let (prio, resp_enc_result) = match ReqEnc::decode(stream).await {
|
let (prio, resp_enc_result) = match ReqEnc::decode(stream).await {
|
||||||
Ok(req_enc) => (req_enc.prio, self2.recv_handler_aux(req_enc).await),
|
Ok(req_enc) => (req_enc.prio, self2.recv_handler_aux(req_enc).await),
|
||||||
Err(e) => (PRIO_HIGH, Err(e)),
|
Err(e) => (PRIO_NORMAL, Err(e)),
|
||||||
};
|
};
|
||||||
|
|
||||||
debug!("server: sending response to {}", id);
|
debug!("server: sending response to {}", id);
|
||||||
|
|
Loading…
Reference in a new issue