use std::collections::{HashMap, VecDeque}; use std::fmt::Write; use std::sync::Arc; use log::trace; use futures::{AsyncReadExt, AsyncWriteExt}; use kuska_handshake::async_std::BoxStreamWrite; use tokio::sync::mpsc; use async_trait::async_trait; use crate::error::*;

/// Priority of a request (click to read more about priorities).
///
/// This priority value is used to prioritize messages
/// in the send queue of the client, and their responses in the send queue of the
/// server. Lower values mean higher priority.
///
/// This mechanism is useful for messages bigger than the maximum chunk size
/// (set at `0x4000` bytes), such as large file transfers.
/// In such a case, all of the messages in the send queue with the highest priority
/// will take turns to send individual chunks, in a round-robin fashion.
/// Once all highest priority messages are sent successfully, the messages with
/// the next highest priority will begin being sent in the same way.
///
/// The same priority value is given to a request and to its associated response.
pub type RequestPriority = u8;

/// Priority class: high
pub const PRIO_HIGH: RequestPriority = 0x20;
/// Priority class: normal
pub const PRIO_NORMAL: RequestPriority = 0x40;
/// Priority class: background
pub const PRIO_BACKGROUND: RequestPriority = 0x80;
/// Priority: primary among given class
pub const PRIO_PRIMARY: RequestPriority = 0x00;
/// Priority: secondary among given class (ex: `PRIO_HIGH | PRIO_SECONDARY`)
pub const PRIO_SECONDARY: RequestPriority = 0x01;

// Messages are sent by chunks
// Chunk format:
// - u32 BE: request id (same for request and response)
// - u16 BE: chunk length, possibly with CHUNK_HAS_CONTINUATION flag
//   when this is not the last chunk of the message
// - [u8; chunk_length] chunk data

pub(crate) type RequestID = u32;
type ChunkLength = u16;
const MAX_CHUNK_LENGTH: ChunkLength = 0x4000;
const CHUNK_HAS_CONTINUATION: ChunkLength = 0x8000;

/// A message waiting in the send queue, possibly already partially sent.
struct SendQueueItem {
    /// Request id written in the header of every chunk of this message.
    id: RequestID,
    /// Priority of the message; lower values are popped first.
    prio: RequestPriority,
    /// Full payload of the message.
    data: Vec<u8>,
    /// Offset in `data` of the first byte that has not been sent yet.
    cursor: usize,
}

/// Queue of pending messages, grouped in buckets of equal priority.
///
/// Buckets are kept sorted by ascending priority value, and each bucket is a
/// FIFO. Popping the front item of the first bucket and pushing it back (at
/// the end of its bucket) is what yields round-robin sending among messages
/// of the same priority.
struct SendQueue {
    items: VecDeque<(RequestPriority, VecDeque<SendQueueItem>)>,
}

impl SendQueue {
    /// Creates an empty queue.
    fn new() -> Self {
        Self {
            items: VecDeque::with_capacity(64),
        }
    }

    /// Appends `item` at the back of the bucket matching its priority,
    /// creating that bucket at its sorted position if it does not exist yet.
    fn push(&mut self, item: SendQueueItem) {
        let prio = item.prio;
        let bucket_idx = match self.items.binary_search_by(|(p, _)| p.cmp(&prio)) {
            Ok(idx) => idx,
            Err(idx) => {
                self.items.insert(idx, (prio, VecDeque::new()));
                idx
            }
        };
        self.items[bucket_idx].1.push_back(item);
    }

    /// Removes and returns the oldest item of the lowest-valued (i.e. most
    /// urgent) priority bucket, or `None` if the queue holds no item.
    ///
    /// Buckets that become (or already are) empty are dropped on the way.
    fn pop(&mut self) -> Option<SendQueueItem> {
        while let Some((prio, mut bucket)) = self.items.pop_front() {
            let next = bucket.pop_front();
            if !bucket.is_empty() {
                // Other messages of this priority remain: keep the bucket in front.
                self.items.push_front((prio, bucket));
            }
            if next.is_some() {
                return next;
            }
        }
        None
    }

    /// Returns `true` when no bucket contains any item.
    fn is_empty(&self) -> bool {
        self.items.iter().all(|(_prio, bucket)| bucket.is_empty())
    }

    /// Formats the queue for trace logs, one ` [prio remaining_bytes (id)]`
    /// entry per queued item, in pop order.
    fn dump(&self) -> String {
        let mut ret = String::new();
        for (prio, bucket) in self.items.iter() {
            for item in bucket.iter() {
                write!(
                    &mut ret,
                    " [{} {} ({})]",
                    prio,
                    item.data.len() - item.cursor,
                    item.id
                )
                .expect("writing to a String cannot fail");
            }
        }
        ret
    }
}

/// The SendLoop trait, which is implemented both by the client and the
server /// connection objects (ServerConna and ClientConn) adds a method `.send_loop()` /// that takes a channel of messages to send and an asynchronous writer, /// and sends messages from the channel to the async writer, putting them in a queue /// before being sent and doing the round-robin sending strategy. /// /// The `.send_loop()` exits when the sending end of the channel is closed, /// or if there is an error at any time writing to the async writer. #[async_trait] pub(crate) trait SendLoop: Sync { async fn send_loop( self: Arc, mut msg_recv: mpsc::UnboundedReceiver<(RequestID, RequestPriority, Vec)>, mut write: BoxStreamWrite, debug_name: String, ) -> Result<(), Error> where W: AsyncWriteExt + Unpin + Send + Sync, { let mut sending = SendQueue::new(); let mut should_exit = false; while !should_exit || !sending.is_empty() { trace!("send_loop({}): queue = {}", debug_name, sending.dump()); if let Ok((id, prio, data)) = msg_recv.try_recv() { trace!( "send_loop({}): new message to send, id = {}, prio = {}, {} bytes", debug_name, id, prio, data.len() ); sending.push(SendQueueItem { id, prio, data, cursor: 0, }); } else if let Some(mut item) = sending.pop() { trace!( "send_loop({}): sending bytes for {} ({} bytes, {} already sent)", debug_name, item.id, item.data.len(), item.cursor ); let header_id = RequestID::to_be_bytes(item.id); write.write_all(&header_id[..]).await?; if item.data.len() - item.cursor > MAX_CHUNK_LENGTH as usize { let size_header = ChunkLength::to_be_bytes(MAX_CHUNK_LENGTH | CHUNK_HAS_CONTINUATION); write.write_all(&size_header[..]).await?; let new_cursor = item.cursor + MAX_CHUNK_LENGTH as usize; write.write_all(&item.data[item.cursor..new_cursor]).await?; item.cursor = new_cursor; sending.push(item); } else { let send_len = (item.data.len() - item.cursor) as ChunkLength; let size_header = ChunkLength::to_be_bytes(send_len); write.write_all(&size_header[..]).await?; write.write_all(&item.data[item.cursor..]).await?; } write.flush().await?; } 
else { let sth = msg_recv.recv().await; if let Some((id, prio, data)) = sth { trace!( "send_loop({}): new message to send, id = {}, prio = {}, {} bytes", debug_name, id, prio, data.len() ); sending.push(SendQueueItem { id, prio, data, cursor: 0, }); } else { should_exit = true; } } } let _ = write.goodbye().await; Ok(()) } } /// The RecvLoop trait, which is implemented both by the client and the server /// connection objects (ServerConn and ClientConn) adds a method `.recv_loop()` /// and a prototype of a handler for received messages `.recv_handler()` that /// must be filled by implementors. `.recv_loop()` receives messages in a loop /// according to the protocol defined above: chunks of message in progress of being /// received are stored in a buffer, and when the last chunk of a message is received, /// the full message is passed to the receive handler. #[async_trait] pub(crate) trait RecvLoop: Sync + 'static { fn recv_handler(self: &Arc, id: RequestID, msg: Vec); async fn recv_loop(self: Arc, mut read: R, debug_name: String) -> Result<(), Error> where R: AsyncReadExt + Unpin + Send + Sync, { let mut receiving = HashMap::new(); loop { let mut header_id = [0u8; RequestID::BITS as usize / 8]; match read.read_exact(&mut header_id[..]).await { Ok(_) => (), Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break, Err(e) => return Err(e.into()), }; let id = RequestID::from_be_bytes(header_id); let mut header_size = [0u8; ChunkLength::BITS as usize / 8]; read.read_exact(&mut header_size[..]).await?; let size = ChunkLength::from_be_bytes(header_size); trace!( "recv_loop({}): got header id = {}, size = 0x{:04x} ({} bytes)", debug_name, id, size, size & !CHUNK_HAS_CONTINUATION ); let has_cont = (size & CHUNK_HAS_CONTINUATION) != 0; let size = size & !CHUNK_HAS_CONTINUATION; let mut next_slice = vec![0; size as usize]; read.read_exact(&mut next_slice[..]).await?; trace!("recv_loop({}): read {} bytes", debug_name, next_slice.len()); let mut msg_bytes: Vec<_> = 
receiving.remove(&id).unwrap_or_default(); msg_bytes.extend_from_slice(&next_slice[..]); if has_cont { receiving.insert(id, msg_bytes); } else { self.recv_handler(id, msg_bytes); } } Ok(()) } } #[cfg(test)] mod test { use super::*; #[test] fn test_priority_queue() { let i1 = SendQueueItem { id: 1, prio: PRIO_NORMAL, data: vec![], cursor: 0, }; let i2 = SendQueueItem { id: 2, prio: PRIO_HIGH, data: vec![], cursor: 0, }; let i2bis = SendQueueItem { id: 20, prio: PRIO_HIGH, data: vec![], cursor: 0, }; let i3 = SendQueueItem { id: 3, prio: PRIO_HIGH | PRIO_SECONDARY, data: vec![], cursor: 0, }; let i4 = SendQueueItem { id: 4, prio: PRIO_BACKGROUND | PRIO_SECONDARY, data: vec![], cursor: 0, }; let i5 = SendQueueItem { id: 5, prio: PRIO_BACKGROUND | PRIO_PRIMARY, data: vec![], cursor: 0, }; let mut q = SendQueue::new(); q.push(i1); // 1 let a = q.pop().unwrap(); // empty -> 1 assert_eq!(a.id, 1); assert!(q.pop().is_none()); q.push(a); // 1 q.push(i2); // 2 1 q.push(i2bis); // [2 20] 1 let a = q.pop().unwrap(); // 20 1 -> 2 assert_eq!(a.id, 2); let b = q.pop().unwrap(); // 1 -> 20 assert_eq!(b.id, 20); let c = q.pop().unwrap(); // empty -> 1 assert_eq!(c.id, 1); assert!(q.pop().is_none()); q.push(a); // 2 q.push(b); // [2 20] q.push(c); // [2 20] 1 q.push(i3); // [2 20] 3 1 q.push(i4); // [2 20] 3 1 4 q.push(i5); // [2 20] 3 1 5 4 let a = q.pop().unwrap(); // 20 3 1 5 4 -> 2 assert_eq!(a.id, 2); q.push(a); // [20 2] 3 1 5 4 let a = q.pop().unwrap(); // 2 3 1 5 4 -> 20 assert_eq!(a.id, 20); let b = q.pop().unwrap(); // 3 1 5 4 -> 2 assert_eq!(b.id, 2); q.push(b); // 2 3 1 5 4 let b = q.pop().unwrap(); // 3 1 5 4 -> 2 assert_eq!(b.id, 2); let c = q.pop().unwrap(); // 1 5 4 -> 3 assert_eq!(c.id, 3); q.push(b); // 2 1 5 4 let b = q.pop().unwrap(); // 1 5 4 -> 2 assert_eq!(b.id, 2); let e = q.pop().unwrap(); // 5 4 -> 1 assert_eq!(e.id, 1); let f = q.pop().unwrap(); // 4 -> 5 assert_eq!(f.id, 5); let g = q.pop().unwrap(); // empty -> 4 assert_eq!(g.id, 4); 
assert!(q.pop().is_none()); } }