add streaming body to requests and responses #3
1 changed files with 12 additions and 4 deletions
16
src/send.rs
16
src/send.rs
|
@ -118,6 +118,9 @@ impl SendQueuePriority {
|
||||||
let order_vec = self.order.get_mut(&stream).unwrap();
|
let order_vec = self.order.get_mut(&stream).unwrap();
|
||||||
let j = order_vec.iter().position(|x| *x == order).unwrap();
|
let j = order_vec.iter().position(|x| *x == order).unwrap();
|
||||||
order_vec.remove(j).unwrap();
|
order_vec.remove(j).unwrap();
|
||||||
|
if order_vec.is_empty() {
|
||||||
|
self.order.remove(&stream);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -147,14 +150,19 @@ impl SendQueuePriority {
|
||||||
item.sent += data_frame.data().len();
|
item.sent += data_frame.data().len();
|
||||||
|
|
||||||
if eos || is_err {
|
if eos || is_err {
|
||||||
|
// If item had an order tag, remove it from the corresponding ordering list
|
||||||
if let Some(OrderTag(stream, order)) = item.order_tag {
|
if let Some(OrderTag(stream, order)) = item.order_tag {
|
||||||
assert_eq!(
|
let order_stream = self.order.get_mut(&stream).unwrap();
|
||||||
self.order.get_mut(&stream).unwrap().pop_front(),
|
assert_eq!(order_stream.pop_front(), Some(order));
|
||||||
Some(order)
|
if order_stream.is_empty() {
|
||||||
)
|
self.order.remove(&stream);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
// Remove item from sending queue
|
||||||
self.items.remove(j);
|
self.items.remove(j);
|
||||||
} else {
|
} else {
|
||||||
|
// Move item later in send queue to implement LAS scheduling
|
||||||
|
// (LAS = Least Attained Service)
|
||||||
for k in j..self.items.len() - 1 {
|
for k in j..self.items.len() - 1 {
|
||||||
if self.items[k].sent >= self.items[k + 1].sent {
|
if self.items[k].sent >= self.items[k + 1].sent {
|
||||||
self.items.swap(k, k + 1);
|
self.items.swap(k, k + 1);
|
||||||
|
|
Loading…
Reference in a new issue