use std::collections::{HashMap, VecDeque};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use log::{trace, warn};

use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
use futures::Stream;
use futures::{AsyncReadExt, AsyncWriteExt, FutureExt, StreamExt};
use kuska_handshake::async_std::BoxStreamWrite;

use tokio::sync::mpsc;

use async_trait::async_trait;

use crate::error::*;
use crate::util::AssociatedStream;

/// Priority of a request (click to read more about priorities).
///
/// This priority value is used to prioritize messages
/// in the send queue of the client, and their responses in the send queue of the
/// server. Lower values mean higher priority.
///
/// This mechanism is useful for messages bigger than the maximum chunk size
/// (set at `0x4000` bytes), such as large file transfers.
/// In such a case, all of the messages in the send queue with the highest priority
/// will take turns to send individual chunks, in a round-robin fashion.
/// Once all highest priority messages are sent successfully, the messages with
/// the next highest priority will begin being sent in the same way.
///
/// The same priority value is given to a request and to its associated response.
pub type RequestPriority = u8; /// Priority class: high pub const PRIO_HIGH: RequestPriority = 0x20; /// Priority class: normal pub const PRIO_NORMAL: RequestPriority = 0x40; /// Priority class: background pub const PRIO_BACKGROUND: RequestPriority = 0x80; /// Priority: primary among given class pub const PRIO_PRIMARY: RequestPriority = 0x00; /// Priority: secondary among given class (ex: `PRIO_HIGH | PRIO_SECONDARY`) pub const PRIO_SECONDARY: RequestPriority = 0x01; // Messages are sent by chunks // Chunk format: // - u32 BE: request id (same for request and response) // - u16 BE: chunk length, possibly with CHUNK_HAS_CONTINUATION flag // when this is not the last chunk of the message // - [u8; chunk_length] chunk data pub(crate) type RequestID = u32; type ChunkLength = u16; pub(crate) const MAX_CHUNK_LENGTH: ChunkLength = 0x4000; const CHUNK_HAS_CONTINUATION: ChunkLength = 0x8000; struct SendQueueItem { id: RequestID, prio: RequestPriority, data: DataReader, } pub(crate) enum Data { Full(Vec), Streaming(AssociatedStream), } #[pin_project::pin_project(project = DataReaderProj)] enum DataReader { Full { #[pin] data: Vec, pos: usize, }, Streaming { #[pin] reader: AssociatedStream, }, } impl From for DataReader { fn from(data: Data) -> DataReader { match data { Data::Full(data) => DataReader::Full { data, pos: 0 }, Data::Streaming(reader) => DataReader::Streaming { reader }, } } } impl Stream for DataReader { type Item = ([u8; MAX_CHUNK_LENGTH as usize], usize); fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match self.project() { DataReaderProj::Full { data, pos } => { let len = std::cmp::min(MAX_CHUNK_LENGTH as usize, data.len() - *pos); let end = *pos + len; if len == 0 { Poll::Ready(None) } else { let mut body = [0; MAX_CHUNK_LENGTH as usize]; body[..len].copy_from_slice(&data[*pos..end]); *pos = end; Poll::Ready(Some((body, len))) } } DataReaderProj::Streaming { reader } => { reader.poll_next(cx).map(|opt| { opt.map(|v| { let mut body = [0; 
MAX_CHUNK_LENGTH as usize]; let len = std::cmp::min(MAX_CHUNK_LENGTH as usize, v.len()); // TODO this can throw away long vec, they should be splited instead body[..len].copy_from_slice(&v[..len]); (body, len) }) }) } } } } struct SendQueue { items: VecDeque<(u8, VecDeque)>, } impl SendQueue { fn new() -> Self { Self { items: VecDeque::with_capacity(64), } } fn push(&mut self, item: SendQueueItem) { let prio = item.prio; let pos_prio = match self.items.binary_search_by(|(p, _)| p.cmp(&prio)) { Ok(i) => i, Err(i) => { self.items.insert(i, (prio, VecDeque::new())); i } }; self.items[pos_prio].1.push_back(item); } fn pop(&mut self) -> Option { match self.items.pop_front() { None => None, Some((prio, mut items_at_prio)) => { let ret = items_at_prio.pop_front(); if !items_at_prio.is_empty() { self.items.push_front((prio, items_at_prio)); } ret.or_else(|| self.pop()) } } } fn is_empty(&self) -> bool { self.items.iter().all(|(_k, v)| v.is_empty()) } } /// The SendLoop trait, which is implemented both by the client and the server /// connection objects (ServerConna and ClientConn) adds a method `.send_loop()` /// that takes a channel of messages to send and an asynchronous writer, /// and sends messages from the channel to the async writer, putting them in a queue /// before being sent and doing the round-robin sending strategy. /// /// The `.send_loop()` exits when the sending end of the channel is closed, /// or if there is an error at any time writing to the async writer. 
#[async_trait] pub(crate) trait SendLoop: Sync { async fn send_loop( self: Arc, mut msg_recv: mpsc::UnboundedReceiver<(RequestID, RequestPriority, Data)>, mut write: BoxStreamWrite, ) -> Result<(), Error> where W: AsyncWriteExt + Unpin + Send + Sync, { let mut sending = SendQueue::new(); let mut should_exit = false; while !should_exit || !sending.is_empty() { if let Ok((id, prio, data)) = msg_recv.try_recv() { match &data { Data::Full(data) => { trace!("send_loop: got {}, {} bytes", id, data.len()); } Data::Streaming(_) => { trace!("send_loop: got {}, unknown size", id); } } sending.push(SendQueueItem { id, prio, data: data.into(), }); } else if let Some(mut item) = sending.pop() { trace!( "send_loop: sending bytes for {}", item.id, ); let data = futures::select! { data = item.data.next().fuse() => data, default => { // nothing to send yet; re-schedule and find something else to do sending.push(item); continue; // TODO if every SendQueueItem is waiting on data, use select_all to await // something to do // TODO find some way to not require sending empty last chunk } }; let header_id = RequestID::to_be_bytes(item.id); write.write_all(&header_id[..]).await?; let data = match data.as_ref() { Some((data, len)) => &data[..*len], None => &[], }; if !data.is_empty() { let size_header = ChunkLength::to_be_bytes(data.len() as u16 | CHUNK_HAS_CONTINUATION); write.write_all(&size_header[..]).await?; write.write_all(data).await?; sending.push(item); } else { // this is always zero for now, but may be more when above TODO get fixed let size_header = ChunkLength::to_be_bytes(data.len() as u16); write.write_all(&size_header[..]).await?; write.write_all(data).await?; } write.flush().await?; } else { let sth = msg_recv.recv().await; if let Some((id, prio, data)) = sth { match &data { Data::Full(data) => { trace!("send_loop: got {}, {} bytes", id, data.len()); } Data::Streaming(_) => { trace!("send_loop: got {}, unknown size", id); } } sending.push(SendQueueItem { id, prio, data: 
data.into(), }); } else { should_exit = true; } } } let _ = write.goodbye().await; Ok(()) } } struct ChannelPair { receiver: Option>>, sender: Option>>, } impl ChannelPair { fn take_receiver(&mut self) -> Option>> { self.receiver.take() } fn take_sender(&mut self) -> Option>> { self.sender.take() } fn ref_sender(&mut self) -> Option<&UnboundedSender>> { self.sender.as_ref().take() } fn insert_into(self, map: &mut HashMap, index: RequestID) { if self.receiver.is_some() || self.sender.is_some() { map.insert(index, self); } } } impl Default for ChannelPair { fn default() -> Self { let (send, recv) = unbounded(); ChannelPair { receiver: Some(recv), sender: Some(send), } } } /// The RecvLoop trait, which is implemented both by the client and the server /// connection objects (ServerConn and ClientConn) adds a method `.recv_loop()` /// and a prototype of a handler for received messages `.recv_handler()` that /// must be filled by implementors. `.recv_loop()` receives messages in a loop /// according to the protocol defined above: chunks of message in progress of being /// received are stored in a buffer, and when the last chunk of a message is received, /// the full message is passed to the receive handler. 
#[async_trait] pub(crate) trait RecvLoop: Sync + 'static { fn recv_handler(self: &Arc, id: RequestID, msg: Vec, stream: AssociatedStream); async fn recv_loop(self: Arc, mut read: R) -> Result<(), Error> where R: AsyncReadExt + Unpin + Send + Sync, { let mut receiving: HashMap> = HashMap::new(); let mut streams: HashMap< RequestID, ChannelPair, > = HashMap::new(); loop { trace!("recv_loop: reading packet"); let mut header_id = [0u8; RequestID::BITS as usize / 8]; match read.read_exact(&mut header_id[..]).await { Ok(_) => (), Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break, Err(e) => return Err(e.into()), }; let id = RequestID::from_be_bytes(header_id); trace!("recv_loop: got header id: {:04x}", id); let mut header_size = [0u8; ChunkLength::BITS as usize / 8]; read.read_exact(&mut header_size[..]).await?; let size = ChunkLength::from_be_bytes(header_size); trace!("recv_loop: got header size: {:04x}", size); let has_cont = (size & CHUNK_HAS_CONTINUATION) != 0; let size = size & !CHUNK_HAS_CONTINUATION; let mut next_slice = vec![0; size as usize]; read.read_exact(&mut next_slice[..]).await?; trace!("recv_loop: read {} bytes", next_slice.len()); if id & 1 == 0 { // main stream let mut msg_bytes = receiving.remove(&id).unwrap_or_default(); msg_bytes.extend_from_slice(&next_slice[..]); if has_cont { receiving.insert(id, msg_bytes); } else { let mut channel_pair = streams.remove(&(id | 1)).unwrap_or_default(); if let Some(receiver) = channel_pair.take_receiver() { self.recv_handler(id, msg_bytes, Box::pin(receiver)); } else { warn!("Couldn't take receiver part of stream") } channel_pair.insert_into(&mut streams, id | 1); } } else { // associated stream let mut channel_pair = streams.remove(&(id)).unwrap_or_default(); // if we get an error, the receiving end is disconnected. 
We still need to // reach eos before dropping this sender if !next_slice.is_empty() { if let Some(sender) = channel_pair.ref_sender() { let _ = sender.unbounded_send(next_slice); } else { warn!("Couldn't take sending part of stream") } } if !has_cont { channel_pair.take_sender(); } channel_pair.insert_into(&mut streams, id); } } Ok(()) } } #[cfg(test)] mod test { use super::*; #[test] fn test_priority_queue() { let i1 = SendQueueItem { id: 1, prio: PRIO_NORMAL, data: DataReader::Full { data: vec![], pos: 0, }, }; let i2 = SendQueueItem { id: 2, prio: PRIO_HIGH, data: DataReader::Full { data: vec![], pos: 0, }, }; let i2bis = SendQueueItem { id: 20, prio: PRIO_HIGH, data: DataReader::Full { data: vec![], pos: 0, }, }; let i3 = SendQueueItem { id: 3, prio: PRIO_HIGH | PRIO_SECONDARY, data: DataReader::Full { data: vec![], pos: 0, }, }; let i4 = SendQueueItem { id: 4, prio: PRIO_BACKGROUND | PRIO_SECONDARY, data: DataReader::Full { data: vec![], pos: 0, }, }; let i5 = SendQueueItem { id: 5, prio: PRIO_BACKGROUND | PRIO_PRIMARY, data: DataReader::Full { data: vec![], pos: 0, }, }; let mut q = SendQueue::new(); q.push(i1); // 1 let a = q.pop().unwrap(); // empty -> 1 assert_eq!(a.id, 1); assert!(q.pop().is_none()); q.push(a); // 1 q.push(i2); // 2 1 q.push(i2bis); // [2 20] 1 let a = q.pop().unwrap(); // 20 1 -> 2 assert_eq!(a.id, 2); let b = q.pop().unwrap(); // 1 -> 20 assert_eq!(b.id, 20); let c = q.pop().unwrap(); // empty -> 1 assert_eq!(c.id, 1); assert!(q.pop().is_none()); q.push(a); // 2 q.push(b); // [2 20] q.push(c); // [2 20] 1 q.push(i3); // [2 20] 3 1 q.push(i4); // [2 20] 3 1 4 q.push(i5); // [2 20] 3 1 5 4 let a = q.pop().unwrap(); // 20 3 1 5 4 -> 2 assert_eq!(a.id, 2); q.push(a); // [20 2] 3 1 5 4 let a = q.pop().unwrap(); // 2 3 1 5 4 -> 20 assert_eq!(a.id, 20); let b = q.pop().unwrap(); // 3 1 5 4 -> 2 assert_eq!(b.id, 2); q.push(b); // 2 3 1 5 4 let b = q.pop().unwrap(); // 3 1 5 4 -> 2 assert_eq!(b.id, 2); let c = q.pop().unwrap(); // 1 5 4 -> 3 
assert_eq!(c.id, 3); q.push(b); // 2 1 5 4 let b = q.pop().unwrap(); // 1 5 4 -> 2 assert_eq!(b.id, 2); let e = q.pop().unwrap(); // 5 4 -> 1 assert_eq!(e.id, 1); let f = q.pop().unwrap(); // 4 -> 5 assert_eq!(f.id, 5); let g = q.pop().unwrap(); // empty -> 4 assert_eq!(g.id, 4); assert!(q.pop().is_none()); } }