add debug_name in proto to differenciate messages
continuous-integration/drone/push Build was killed Details

This commit is contained in:
Alex 2022-08-31 15:58:05 +02:00
parent 700f783956
commit 01db3c4319
Signed by: lx
GPG Key ID: 0E496D15096376BE
3 changed files with 25 additions and 13 deletions

View File

@ -102,13 +102,16 @@ impl ClientConn {
netapp.connected_as_client(peer_id, conn.clone()); netapp.connected_as_client(peer_id, conn.clone());
let debug_name = format!("CLI {}", hex::encode(&peer_id[..8]));
tokio::spawn(async move { tokio::spawn(async move {
let send_future = tokio::spawn(conn.clone().send_loop(query_recv, write)); let debug_name_2 = debug_name.clone();
let send_future = tokio::spawn(conn.clone().send_loop(query_recv, write, debug_name_2));
let conn2 = conn.clone(); let conn2 = conn.clone();
let recv_future = tokio::spawn(async move { let recv_future = tokio::spawn(async move {
select! { select! {
r = conn2.recv_loop(read) => r, r = conn2.recv_loop(read, debug_name) => r,
_ = await_exit(stop_recv_loop_recv) => Ok(()) _ = await_exit(stop_recv_loop_recv) => Ok(())
} }
}); });

View File

@ -120,6 +120,7 @@ pub(crate) trait SendLoop: Sync {
self: Arc<Self>, self: Arc<Self>,
mut msg_recv: mpsc::UnboundedReceiver<(RequestID, RequestPriority, Vec<u8>)>, mut msg_recv: mpsc::UnboundedReceiver<(RequestID, RequestPriority, Vec<u8>)>,
mut write: BoxStreamWrite<W>, mut write: BoxStreamWrite<W>,
debug_name: String,
) -> Result<(), Error> ) -> Result<(), Error>
where where
W: AsyncWriteExt + Unpin + Send + Sync, W: AsyncWriteExt + Unpin + Send + Sync,
@ -127,9 +128,9 @@ pub(crate) trait SendLoop: Sync {
let mut sending = SendQueue::new(); let mut sending = SendQueue::new();
let mut should_exit = false; let mut should_exit = false;
while !should_exit || !sending.is_empty() { while !should_exit || !sending.is_empty() {
trace!("send_loop: queue = {}", sending.dump()); trace!("send_loop({}): queue = {}", debug_name, sending.dump());
if let Ok((id, prio, data)) = msg_recv.try_recv() { if let Ok((id, prio, data)) = msg_recv.try_recv() {
trace!("send_loop: got {}, {} bytes", id, data.len()); trace!("send_loop({}): got {}, {} bytes", debug_name, id, data.len());
sending.push(SendQueueItem { sending.push(SendQueueItem {
id, id,
prio, prio,
@ -138,7 +139,8 @@ pub(crate) trait SendLoop: Sync {
}); });
} else if let Some(mut item) = sending.pop() { } else if let Some(mut item) = sending.pop() {
trace!( trace!(
"send_loop: sending bytes for {} ({} bytes, {} already sent)", "send_loop({}): sending bytes for {} ({} bytes, {} already sent)",
debug_name,
item.id, item.id,
item.data.len(), item.data.len(),
item.cursor item.cursor
@ -168,7 +170,7 @@ pub(crate) trait SendLoop: Sync {
} else { } else {
let sth = msg_recv.recv().await; let sth = msg_recv.recv().await;
if let Some((id, prio, data)) = sth { if let Some((id, prio, data)) = sth {
trace!("send_loop: got {}, {} bytes", id, data.len()); trace!("send_loop({}): got {}, {} bytes", debug_name, id, data.len());
sending.push(SendQueueItem { sending.push(SendQueueItem {
id, id,
prio, prio,
@ -197,13 +199,17 @@ pub(crate) trait SendLoop: Sync {
pub(crate) trait RecvLoop: Sync + 'static { pub(crate) trait RecvLoop: Sync + 'static {
fn recv_handler(self: &Arc<Self>, id: RequestID, msg: Vec<u8>); fn recv_handler(self: &Arc<Self>, id: RequestID, msg: Vec<u8>);
async fn recv_loop<R>(self: Arc<Self>, mut read: R) -> Result<(), Error> async fn recv_loop<R>(
self: Arc<Self>,
mut read: R,
debug_name: String,
) -> Result<(), Error>
where where
R: AsyncReadExt + Unpin + Send + Sync, R: AsyncReadExt + Unpin + Send + Sync,
{ {
let mut receiving = HashMap::new(); let mut receiving = HashMap::new();
loop { loop {
trace!("recv_loop: reading packet"); trace!("recv_loop({}): reading packet", debug_name);
let mut header_id = [0u8; RequestID::BITS as usize / 8]; let mut header_id = [0u8; RequestID::BITS as usize / 8];
match read.read_exact(&mut header_id[..]).await { match read.read_exact(&mut header_id[..]).await {
Ok(_) => (), Ok(_) => (),
@ -211,19 +217,19 @@ pub(crate) trait RecvLoop: Sync + 'static {
Err(e) => return Err(e.into()), Err(e) => return Err(e.into()),
}; };
let id = RequestID::from_be_bytes(header_id); let id = RequestID::from_be_bytes(header_id);
trace!("recv_loop: got header id: {:04x}", id); trace!("recv_loop({}): got header id: {:04x}", debug_name, id);
let mut header_size = [0u8; ChunkLength::BITS as usize / 8]; let mut header_size = [0u8; ChunkLength::BITS as usize / 8];
read.read_exact(&mut header_size[..]).await?; read.read_exact(&mut header_size[..]).await?;
let size = ChunkLength::from_be_bytes(header_size); let size = ChunkLength::from_be_bytes(header_size);
trace!("recv_loop: got header size: {:04x}", size); trace!("recv_loop({}): got header size: {:04x}", debug_name, size);
let has_cont = (size & CHUNK_HAS_CONTINUATION) != 0; let has_cont = (size & CHUNK_HAS_CONTINUATION) != 0;
let size = size & !CHUNK_HAS_CONTINUATION; let size = size & !CHUNK_HAS_CONTINUATION;
let mut next_slice = vec![0; size as usize]; let mut next_slice = vec![0; size as usize];
read.read_exact(&mut next_slice[..]).await?; read.read_exact(&mut next_slice[..]).await?;
trace!("recv_loop: read {} bytes", next_slice.len()); trace!("recv_loop({}): read {} bytes", debug_name, next_slice.len());
let mut msg_bytes: Vec<_> = receiving.remove(&id).unwrap_or_default(); let mut msg_bytes: Vec<_> = receiving.remove(&id).unwrap_or_default();
msg_bytes.extend_from_slice(&next_slice[..]); msg_bytes.extend_from_slice(&next_slice[..]);

View File

@ -105,14 +105,17 @@ impl ServerConn {
netapp.connected_as_server(peer_id, conn.clone()); netapp.connected_as_server(peer_id, conn.clone());
let debug_name = format!("SVR {}", hex::encode(&peer_id[..8]));
let debug_name_2 = debug_name.clone();
let conn2 = conn.clone(); let conn2 = conn.clone();
let recv_future = tokio::spawn(async move { let recv_future = tokio::spawn(async move {
select! { select! {
r = conn2.recv_loop(read) => r, r = conn2.recv_loop(read, debug_name_2) => r,
_ = await_exit(must_exit) => Ok(()) _ = await_exit(must_exit) => Ok(())
} }
}); });
let send_future = tokio::spawn(conn.clone().send_loop(resp_recv, write)); let send_future = tokio::spawn(conn.clone().send_loop(resp_recv, write, debug_name));
recv_future.await.log_err("ServerConn recv_loop"); recv_future.await.log_err("ServerConn recv_loop");
conn.resp_send.store(None); conn.resp_send.store(None);