Fix ping timeout and interval #4
1 changed files with 31 additions and 12 deletions
43
src/proto.rs
43
src/proto.rs
|
@ -1,6 +1,6 @@
|
||||||
use std::collections::{HashMap, VecDeque};
|
use std::collections::{HashMap, VecDeque};
|
||||||
use std::sync::Arc;
|
|
||||||
use std::fmt::Write;
|
use std::fmt::Write;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
use log::trace;
|
use log::trace;
|
||||||
|
|
||||||
|
@ -99,7 +99,14 @@ impl SendQueue {
|
||||||
let mut ret = String::new();
|
let mut ret = String::new();
|
||||||
for (prio, q) in self.items.iter() {
|
for (prio, q) in self.items.iter() {
|
||||||
for item in q.iter() {
|
for item in q.iter() {
|
||||||
write!(&mut ret, " [{} {} ({})]", prio, item.data.len() - item.cursor, item.id).unwrap();
|
write!(
|
||||||
|
&mut ret,
|
||||||
|
" [{} {} ({})]",
|
||||||
|
prio,
|
||||||
|
item.data.len() - item.cursor,
|
||||||
|
item.id
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ret
|
ret
|
||||||
|
@ -130,7 +137,13 @@ pub(crate) trait SendLoop: Sync {
|
||||||
while !should_exit || !sending.is_empty() {
|
while !should_exit || !sending.is_empty() {
|
||||||
trace!("send_loop({}): queue = {}", debug_name, sending.dump());
|
trace!("send_loop({}): queue = {}", debug_name, sending.dump());
|
||||||
if let Ok((id, prio, data)) = msg_recv.try_recv() {
|
if let Ok((id, prio, data)) = msg_recv.try_recv() {
|
||||||
trace!("send_loop({}): got {}, {} bytes", debug_name, id, data.len());
|
trace!(
|
||||||
|
"send_loop({}): new message to send, id = {}, prio = {}, {} bytes",
|
||||||
|
debug_name,
|
||||||
|
id,
|
||||||
|
prio,
|
||||||
|
data.len()
|
||||||
|
);
|
||||||
sending.push(SendQueueItem {
|
sending.push(SendQueueItem {
|
||||||
id,
|
id,
|
||||||
prio,
|
prio,
|
||||||
|
@ -170,7 +183,13 @@ pub(crate) trait SendLoop: Sync {
|
||||||
} else {
|
} else {
|
||||||
let sth = msg_recv.recv().await;
|
let sth = msg_recv.recv().await;
|
||||||
if let Some((id, prio, data)) = sth {
|
if let Some((id, prio, data)) = sth {
|
||||||
trace!("send_loop({}): got {}, {} bytes", debug_name, id, data.len());
|
trace!(
|
||||||
|
"send_loop({}): new message to send, id = {}, prio = {}, {} bytes",
|
||||||
|
debug_name,
|
||||||
|
id,
|
||||||
|
prio,
|
||||||
|
data.len()
|
||||||
|
);
|
||||||
sending.push(SendQueueItem {
|
sending.push(SendQueueItem {
|
||||||
id,
|
id,
|
||||||
prio,
|
prio,
|
||||||
|
@ -199,17 +218,12 @@ pub(crate) trait SendLoop: Sync {
|
||||||
pub(crate) trait RecvLoop: Sync + 'static {
|
pub(crate) trait RecvLoop: Sync + 'static {
|
||||||
fn recv_handler(self: &Arc<Self>, id: RequestID, msg: Vec<u8>);
|
fn recv_handler(self: &Arc<Self>, id: RequestID, msg: Vec<u8>);
|
||||||
|
|
||||||
async fn recv_loop<R>(
|
async fn recv_loop<R>(self: Arc<Self>, mut read: R, debug_name: String) -> Result<(), Error>
|
||||||
self: Arc<Self>,
|
|
||||||
mut read: R,
|
|
||||||
debug_name: String,
|
|
||||||
) -> Result<(), Error>
|
|
||||||
where
|
where
|
||||||
R: AsyncReadExt + Unpin + Send + Sync,
|
R: AsyncReadExt + Unpin + Send + Sync,
|
||||||
{
|
{
|
||||||
let mut receiving = HashMap::new();
|
let mut receiving = HashMap::new();
|
||||||
loop {
|
loop {
|
||||||
trace!("recv_loop({}): reading packet", debug_name);
|
|
||||||
let mut header_id = [0u8; RequestID::BITS as usize / 8];
|
let mut header_id = [0u8; RequestID::BITS as usize / 8];
|
||||||
match read.read_exact(&mut header_id[..]).await {
|
match read.read_exact(&mut header_id[..]).await {
|
||||||
Ok(_) => (),
|
Ok(_) => (),
|
||||||
|
@ -217,12 +231,17 @@ pub(crate) trait RecvLoop: Sync + 'static {
|
||||||
Err(e) => return Err(e.into()),
|
Err(e) => return Err(e.into()),
|
||||||
};
|
};
|
||||||
let id = RequestID::from_be_bytes(header_id);
|
let id = RequestID::from_be_bytes(header_id);
|
||||||
trace!("recv_loop({}): got header id: {:04x}", debug_name, id);
|
|
||||||
|
|
||||||
let mut header_size = [0u8; ChunkLength::BITS as usize / 8];
|
let mut header_size = [0u8; ChunkLength::BITS as usize / 8];
|
||||||
read.read_exact(&mut header_size[..]).await?;
|
read.read_exact(&mut header_size[..]).await?;
|
||||||
let size = ChunkLength::from_be_bytes(header_size);
|
let size = ChunkLength::from_be_bytes(header_size);
|
||||||
trace!("recv_loop({}): got header size: {:04x}", debug_name, size);
|
trace!(
|
||||||
|
"recv_loop({}): got header id = {}, size = 0x{:04x} ({} bytes)",
|
||||||
|
debug_name,
|
||||||
|
id,
|
||||||
|
size,
|
||||||
|
size & !CHUNK_HAS_CONTINUATION
|
||||||
|
);
|
||||||
|
|
||||||
let has_cont = (size & CHUNK_HAS_CONTINUATION) != 0;
|
let has_cont = (size & CHUNK_HAS_CONTINUATION) != 0;
|
||||||
let size = size & !CHUNK_HAS_CONTINUATION;
|
let size = size & !CHUNK_HAS_CONTINUATION;
|
||||||
|
|
Loading…
Reference in a new issue