forked from lx/netapp
Maybe fix something? Idk
This commit is contained in:
parent
5a9ae8615e
commit
32a0fbcbd9
2 changed files with 8 additions and 4 deletions
|
@ -112,6 +112,8 @@ impl SendLoop for ServerConn {}
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl RecvLoop for ServerConn {
|
impl RecvLoop for ServerConn {
|
||||||
async fn recv_handler(self: Arc<Self>, id: u16, bytes: Vec<u8>) {
|
async fn recv_handler(self: Arc<Self>, id: u16, bytes: Vec<u8>) {
|
||||||
|
debug!("ServerConn recv_handler {} ({} bytes)", id, bytes.len());
|
||||||
|
|
||||||
let bytes: Bytes = bytes.into();
|
let bytes: Bytes = bytes.into();
|
||||||
|
|
||||||
let prio = bytes[0];
|
let prio = bytes[0];
|
||||||
|
@ -265,6 +267,8 @@ impl SendLoop for ClientConn {}
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl RecvLoop for ClientConn {
|
impl RecvLoop for ClientConn {
|
||||||
async fn recv_handler(self: Arc<Self>, id: RequestID, msg: Vec<u8>) {
|
async fn recv_handler(self: Arc<Self>, id: RequestID, msg: Vec<u8>) {
|
||||||
|
debug!("ClientConn recv_handler {} ({} bytes)", id, msg.len());
|
||||||
|
|
||||||
let mut inflight = self.inflight.lock().unwrap();
|
let mut inflight = self.inflight.lock().unwrap();
|
||||||
if let Some(ch) = inflight.remove(&id) {
|
if let Some(ch) = inflight.remove(&id) {
|
||||||
if ch.send(msg).is_err() {
|
if ch.send(msg).is_err() {
|
||||||
|
|
|
@ -81,7 +81,7 @@ impl SendQueue {
|
||||||
if !items_at_prio.is_empty() {
|
if !items_at_prio.is_empty() {
|
||||||
self.items.insert(prio, items_at_prio);
|
self.items.insert(prio, items_at_prio);
|
||||||
}
|
}
|
||||||
ret
|
ret.or_else(|| self.pop())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -139,7 +139,7 @@ pub(crate) trait SendLoop: Sync {
|
||||||
|
|
||||||
write.write_all(&item.data[item.cursor..]).await?;
|
write.write_all(&item.data[item.cursor..]).await?;
|
||||||
}
|
}
|
||||||
write.flush().await.log_err("Could not flush in send_loop");
|
write.flush().await?;
|
||||||
} else {
|
} else {
|
||||||
let sth = msg_recv
|
let sth = msg_recv
|
||||||
.recv()
|
.recv()
|
||||||
|
@ -182,14 +182,14 @@ pub(crate) trait RecvLoop: Sync + 'static {
|
||||||
let mut header_size = [0u8; 2];
|
let mut header_size = [0u8; 2];
|
||||||
read.read_exact(&mut header_size[..]).await?;
|
read.read_exact(&mut header_size[..]).await?;
|
||||||
let size = RequestID::from_be_bytes(header_size);
|
let size = RequestID::from_be_bytes(header_size);
|
||||||
trace!("recv_loop: got header size: {:04x}", id);
|
trace!("recv_loop: got header size: {:04x}", size);
|
||||||
|
|
||||||
let has_cont = (size & 0x8000) != 0;
|
let has_cont = (size & 0x8000) != 0;
|
||||||
let size = size & !0x8000;
|
let size = size & !0x8000;
|
||||||
|
|
||||||
let mut next_slice = vec![0; size as usize];
|
let mut next_slice = vec![0; size as usize];
|
||||||
read.read_exact(&mut next_slice[..]).await?;
|
read.read_exact(&mut next_slice[..]).await?;
|
||||||
trace!("recv_loop: read {} bytes", size);
|
trace!("recv_loop: read {} bytes", next_slice.len());
|
||||||
|
|
||||||
let mut msg_bytes = receiving.remove(&id).unwrap_or(vec![]);
|
let mut msg_bytes = receiving.remove(&id).unwrap_or(vec![]);
|
||||||
msg_bytes.extend_from_slice(&next_slice[..]);
|
msg_bytes.extend_from_slice(&next_slice[..]);
|
||||||
|
|
Loading…
Reference in a new issue