Better handle connection closing
Some checks reported errors
continuous-integration/drone/push Build was killed
Some checks reported errors
continuous-integration/drone/push Build was killed
This commit is contained in:
parent
9b64c27da6
commit
bb4ddf3b61
1 changed files with 9 additions and 2 deletions
11
src/proto.rs
11
src/proto.rs
|
@ -4,6 +4,7 @@ use std::sync::Arc;
|
||||||
use log::trace;
|
use log::trace;
|
||||||
|
|
||||||
use futures::{AsyncReadExt, AsyncWriteExt};
|
use futures::{AsyncReadExt, AsyncWriteExt};
|
||||||
|
use kuska_handshake::async_std::BoxStreamWrite;
|
||||||
|
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
|
|
||||||
|
@ -100,7 +101,7 @@ pub(crate) trait SendLoop: Sync {
|
||||||
async fn send_loop<W>(
|
async fn send_loop<W>(
|
||||||
self: Arc<Self>,
|
self: Arc<Self>,
|
||||||
mut msg_recv: mpsc::UnboundedReceiver<(RequestID, RequestPriority, Vec<u8>)>,
|
mut msg_recv: mpsc::UnboundedReceiver<(RequestID, RequestPriority, Vec<u8>)>,
|
||||||
mut write: W,
|
mut write: BoxStreamWrite<W>,
|
||||||
) -> Result<(), Error>
|
) -> Result<(), Error>
|
||||||
where
|
where
|
||||||
W: AsyncWriteExt + Unpin + Send + Sync,
|
W: AsyncWriteExt + Unpin + Send + Sync,
|
||||||
|
@ -160,6 +161,7 @@ pub(crate) trait SendLoop: Sync {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
write.goodbye().await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -177,7 +179,11 @@ pub(crate) trait RecvLoop: Sync + 'static {
|
||||||
loop {
|
loop {
|
||||||
trace!("recv_loop: reading packet");
|
trace!("recv_loop: reading packet");
|
||||||
let mut header_id = [0u8; RequestID::BITS as usize / 8];
|
let mut header_id = [0u8; RequestID::BITS as usize / 8];
|
||||||
read.read_exact(&mut header_id[..]).await?;
|
match read.read_exact(&mut header_id[..]).await {
|
||||||
|
Ok(_) => (),
|
||||||
|
Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
|
||||||
|
Err(e) => return Err(e.into()),
|
||||||
|
};
|
||||||
let id = RequestID::from_be_bytes(header_id);
|
let id = RequestID::from_be_bytes(header_id);
|
||||||
trace!("recv_loop: got header id: {:04x}", id);
|
trace!("recv_loop: got header id: {:04x}", id);
|
||||||
|
|
||||||
|
@ -202,6 +208,7 @@ pub(crate) trait RecvLoop: Sync + 'static {
|
||||||
self.recv_handler(id, msg_bytes);
|
self.recv_handler(id, msg_bytes);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue