forked from lx/netapp
Implement Least Attained First scheduling of streams
This commit is contained in:
parent
5af23955af
commit
0f799a7768
1 changed files with 17 additions and 5 deletions
22
src/send.rs
22
src/send.rs
|
@ -59,6 +59,7 @@ struct SendQueueItem {
|
||||||
prio: RequestPriority,
|
prio: RequestPriority,
|
||||||
order_tag: Option<OrderTag>,
|
order_tag: Option<OrderTag>,
|
||||||
data: ByteStreamReader,
|
data: ByteStreamReader,
|
||||||
|
sent: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SendQueue {
|
impl SendQueue {
|
||||||
|
@ -106,7 +107,7 @@ impl SendQueuePriority {
|
||||||
let i = order_vec.iter().take_while(|o2| **o2 < order).count();
|
let i = order_vec.iter().take_while(|o2| **o2 < order).count();
|
||||||
order_vec.insert(i, order);
|
order_vec.insert(i, order);
|
||||||
}
|
}
|
||||||
self.items.push_back(item);
|
self.items.push_front(item);
|
||||||
}
|
}
|
||||||
fn remove(&mut self, id: RequestID) {
|
fn remove(&mut self, id: RequestID) {
|
||||||
if let Some(i) = self.items.iter().position(|x| x.id == id) {
|
if let Some(i) = self.items.iter().position(|x| x.id == id) {
|
||||||
|
@ -139,7 +140,11 @@ impl SendQueuePriority {
|
||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
});
|
});
|
||||||
|
|
||||||
if eos || packet.is_err() {
|
let is_err = packet.is_err();
|
||||||
|
let data_frame = DataFrame::from_packet(packet, !eos);
|
||||||
|
item.sent += data_frame.data().len();
|
||||||
|
|
||||||
|
if eos || is_err {
|
||||||
if let Some(OrderTag(stream, order)) = item.order_tag {
|
if let Some(OrderTag(stream, order)) = item.order_tag {
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
self.order.get_mut(&stream).unwrap().pop_front(),
|
self.order.get_mut(&stream).unwrap().pop_front(),
|
||||||
|
@ -147,10 +152,16 @@ impl SendQueuePriority {
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
self.items.remove(j);
|
self.items.remove(j);
|
||||||
|
} else {
|
||||||
|
for k in j..self.items.len() - 1 {
|
||||||
|
if self.items[k].sent >= self.items[k + 1].sent {
|
||||||
|
self.items.swap(k, k + 1);
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let data_frame = DataFrame::from_packet(packet, !eos);
|
|
||||||
|
|
||||||
return Poll::Ready((id, data_frame));
|
return Poll::Ready((id, data_frame));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -160,7 +171,7 @@ impl SendQueuePriority {
|
||||||
fn dump(&self, prio: u8) -> String {
|
fn dump(&self, prio: u8) -> String {
|
||||||
self.items
|
self.items
|
||||||
.iter()
|
.iter()
|
||||||
.map(|i| format!("[{} {} {:?}]", prio, i.id, i.order_tag))
|
.map(|i| format!("[{} {} {:?} @{}]", prio, i.id, i.order_tag, i.sent))
|
||||||
.collect::<Vec<_>>()
|
.collect::<Vec<_>>()
|
||||||
.join(" ")
|
.join(" ")
|
||||||
}
|
}
|
||||||
|
@ -294,6 +305,7 @@ pub(crate) trait SendLoop: Sync {
|
||||||
prio,
|
prio,
|
||||||
order_tag,
|
order_tag,
|
||||||
data: ByteStreamReader::new(data),
|
data: ByteStreamReader::new(data),
|
||||||
|
sent: 0,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
Some(SendItem::Cancel(id)) => {
|
Some(SendItem::Cancel(id)) => {
|
||||||
|
|
Loading…
Reference in a new issue