use std::collections::{HashMap, VecDeque};
use std::sync::Arc;

use log::{error, trace};

use futures::{AsyncReadExt, AsyncWriteExt};

use kuska_handshake::async_std::BoxStreamWrite;

use tokio::sync::mpsc;

use async_trait::async_trait;

use crate::error::*;

/// Tag which is exchanged between client and server upon connection establishment
/// to check that they are running compatible versions of Netapp
pub const VERSION_TAG: [u8; 8] = [b'n', b'e', b't', b'a', b'p', b'p', 0x00, 0x04];

/// Priority of a request (click to read more about priorities).
///
/// This priority value is used to prioritize messages
/// in the send queue of the client, and their responses in the send queue of the
/// server. Lower values mean higher priority.
///
/// This mechanism is useful for messages bigger than the maximum chunk size
/// (set at `0x4000` bytes), such as large file transfers.
/// In such case, all of the messages in the send queue with the highest priority
/// will take turns to send individual chunks, in a round-robin fashion.
/// Once all highest priority messages are sent successfully, the messages with
/// the next highest priority will begin being sent in the same way.
///
/// The same priority value is given to a request and to its associated response.
pub type RequestPriority = u8;

/// Priority class: high
pub const PRIO_HIGH: RequestPriority = 0x20;
/// Priority class: normal
pub const PRIO_NORMAL: RequestPriority = 0x40;
/// Priority class: background
pub const PRIO_BACKGROUND: RequestPriority = 0x80;
/// Priority: primary among given class
pub const PRIO_PRIMARY: RequestPriority = 0x00;
/// Priority: secondary among given class (ex: `PRIO_HIGH | PRIO_SECONDARY`)
pub const PRIO_SECONDARY: RequestPriority = 0x01;

// Messages are sent by chunks
// Chunk format:
// - u32 BE: request id (same for request and response)
// - u16 BE: chunk length, possibly with CHUNK_HAS_CONTINUATION flag
//   when this is not the last chunk of the message
// - [u8; chunk_length] chunk data

pub(crate) type RequestID = u32;
type ChunkLength = u16;
const MAX_CHUNK_LENGTH: ChunkLength = 0x4000;
const CHUNK_HAS_CONTINUATION: ChunkLength = 0x8000;

/// A message waiting in the send queue, possibly already partially sent.
struct SendQueueItem {
    /// Request id written in the header of every chunk of this message.
    id: RequestID,
    /// Priority of the message; selects the bucket it is queued in.
    prio: RequestPriority,
    /// Full payload of the message.
    data: Vec<u8>,
    /// Offset in `data` of the first byte that has not been sent yet.
    cursor: usize,
}

/// Queue of outgoing messages, grouped by priority.
///
/// `items` holds one bucket per priority value that currently has queued
/// messages, kept sorted by ascending priority value (lower value = served
/// first). Each bucket is a FIFO, so an item that is popped and pushed again
/// goes behind the other items of the same priority.
struct SendQueue {
    items: VecDeque<(RequestPriority, VecDeque<SendQueueItem>)>,
}

impl SendQueue {
    /// Creates an empty queue.
    fn new() -> Self {
        Self {
            items: VecDeque::with_capacity(64),
        }
    }

    /// Appends `item` at the back of the bucket matching its priority,
    /// creating that bucket at its sorted position if it does not exist yet.
    fn push(&mut self, item: SendQueueItem) {
        let prio = item.prio;
        let pos_prio = match self.items.binary_search_by_key(&prio, |(p, _)| *p) {
            Ok(i) => i,
            Err(i) => {
                self.items.insert(i, (prio, VecDeque::new()));
                i
            }
        };
        self.items[pos_prio].1.push_back(item);
    }

    /// Removes and returns the oldest item of the bucket with the lowest
    /// priority value, or `None` if nothing is queued.
    ///
    /// A bucket is removed as soon as it becomes empty.
    fn pop(&mut self) -> Option<SendQueueItem> {
        while let Some((_, bucket)) = self.items.front_mut() {
            if let Some(item) = bucket.pop_front() {
                if bucket.is_empty() {
                    self.items.pop_front();
                }
                return Some(item);
            }
            // Defensive: an empty bucket at the front carries nothing to
            // send, drop it and look at the next priority level.
            self.items.pop_front();
        }
        None
    }

    /// Returns `true` when no item is queued at any priority.
    fn is_empty(&self) -> bool {
        self.items.iter().all(|(_k, v)| v.is_empty())
    }
}

/// The SendLoop trait, which is implemented both by the client and the server
/// connection objects (ServerConn and ClientConn) adds a method `.send_loop()`
/// that takes a channel of messages to send and an asynchronous writer,
/// and sends messages from the channel to the async
writer, putting them in a queue /// before being sent and doing the round-robin sending strategy. /// /// The `.send_loop()` exits when the sending end of the channel is closed, /// or if there is an error at any time writing to the async writer. #[async_trait] pub(crate) trait SendLoop: Sync { async fn send_loop( self: Arc, mut msg_recv: mpsc::UnboundedReceiver<(RequestID, RequestPriority, Vec)>, mut write: BoxStreamWrite, ) -> Result<(), Error> where W: AsyncWriteExt + Unpin + Send + Sync, { // Before anything, send version tag, which is checked in recv_loop write.write_all(&VERSION_TAG[..]).await?; write.flush().await?; let mut sending = SendQueue::new(); let mut should_exit = false; while !should_exit || !sending.is_empty() { if let Ok((id, prio, data)) = msg_recv.try_recv() { trace!("send_loop: got {}, {} bytes", id, data.len()); sending.push(SendQueueItem { id, prio, data, cursor: 0, }); } else if let Some(mut item) = sending.pop() { trace!( "send_loop: sending bytes for {} ({} bytes, {} already sent)", item.id, item.data.len(), item.cursor ); let header_id = RequestID::to_be_bytes(item.id); write.write_all(&header_id[..]).await?; if item.data.len() - item.cursor > MAX_CHUNK_LENGTH as usize { let size_header = ChunkLength::to_be_bytes(MAX_CHUNK_LENGTH | CHUNK_HAS_CONTINUATION); write.write_all(&size_header[..]).await?; let new_cursor = item.cursor + MAX_CHUNK_LENGTH as usize; write.write_all(&item.data[item.cursor..new_cursor]).await?; item.cursor = new_cursor; sending.push(item); } else { let send_len = (item.data.len() - item.cursor) as ChunkLength; let size_header = ChunkLength::to_be_bytes(send_len); write.write_all(&size_header[..]).await?; write.write_all(&item.data[item.cursor..]).await?; } write.flush().await?; } else { let sth = msg_recv.recv().await; if let Some((id, prio, data)) = sth { trace!("send_loop: got {}, {} bytes", id, data.len()); sending.push(SendQueueItem { id, prio, data, cursor: 0, }); } else { should_exit = true; } } } let _ = 
write.goodbye().await; Ok(()) } } /// The RecvLoop trait, which is implemented both by the client and the server /// connection objects (ServerConn and ClientConn) adds a method `.recv_loop()` /// and a prototype of a handler for received messages `.recv_handler()` that /// must be filled by implementors. `.recv_loop()` receives messages in a loop /// according to the protocol defined above: chunks of message in progress of being /// received are stored in a buffer, and when the last chunk of a message is received, /// the full message is passed to the receive handler. #[async_trait] pub(crate) trait RecvLoop: Sync + 'static { fn recv_handler(self: &Arc, id: RequestID, msg: Vec); async fn recv_loop(self: Arc, mut read: R) -> Result<(), Error> where R: AsyncReadExt + Unpin + Send + Sync, { let mut their_version_tag = [0u8; 8]; read.read_exact(&mut their_version_tag[..]).await?; if their_version_tag != VERSION_TAG { let msg = format!( "Different netapp versions: {:?} (theirs) vs. {:?} (ours)", their_version_tag, VERSION_TAG ); error!("{}", msg); return Err(Error::VersionMismatch(msg)); } let mut receiving = HashMap::new(); loop { trace!("recv_loop: reading packet"); let mut header_id = [0u8; RequestID::BITS as usize / 8]; match read.read_exact(&mut header_id[..]).await { Ok(_) => (), Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break, Err(e) => return Err(e.into()), }; let id = RequestID::from_be_bytes(header_id); trace!("recv_loop: got header id: {:04x}", id); let mut header_size = [0u8; ChunkLength::BITS as usize / 8]; read.read_exact(&mut header_size[..]).await?; let size = ChunkLength::from_be_bytes(header_size); trace!("recv_loop: got header size: {:04x}", size); let has_cont = (size & CHUNK_HAS_CONTINUATION) != 0; let size = size & !CHUNK_HAS_CONTINUATION; let mut next_slice = vec![0; size as usize]; read.read_exact(&mut next_slice[..]).await?; trace!("recv_loop: read {} bytes", next_slice.len()); let mut msg_bytes: Vec<_> = 
receiving.remove(&id).unwrap_or_default(); msg_bytes.extend_from_slice(&next_slice[..]); if has_cont { receiving.insert(id, msg_bytes); } else { self.recv_handler(id, msg_bytes); } } Ok(()) } } #[cfg(test)] mod test { use super::*; #[test] fn test_priority_queue() { let i1 = SendQueueItem { id: 1, prio: PRIO_NORMAL, data: vec![], cursor: 0, }; let i2 = SendQueueItem { id: 2, prio: PRIO_HIGH, data: vec![], cursor: 0, }; let i2bis = SendQueueItem { id: 20, prio: PRIO_HIGH, data: vec![], cursor: 0, }; let i3 = SendQueueItem { id: 3, prio: PRIO_HIGH | PRIO_SECONDARY, data: vec![], cursor: 0, }; let i4 = SendQueueItem { id: 4, prio: PRIO_BACKGROUND | PRIO_SECONDARY, data: vec![], cursor: 0, }; let i5 = SendQueueItem { id: 5, prio: PRIO_BACKGROUND | PRIO_PRIMARY, data: vec![], cursor: 0, }; let mut q = SendQueue::new(); q.push(i1); // 1 let a = q.pop().unwrap(); // empty -> 1 assert_eq!(a.id, 1); assert!(q.pop().is_none()); q.push(a); // 1 q.push(i2); // 2 1 q.push(i2bis); // [2 20] 1 let a = q.pop().unwrap(); // 20 1 -> 2 assert_eq!(a.id, 2); let b = q.pop().unwrap(); // 1 -> 20 assert_eq!(b.id, 20); let c = q.pop().unwrap(); // empty -> 1 assert_eq!(c.id, 1); assert!(q.pop().is_none()); q.push(a); // 2 q.push(b); // [2 20] q.push(c); // [2 20] 1 q.push(i3); // [2 20] 3 1 q.push(i4); // [2 20] 3 1 4 q.push(i5); // [2 20] 3 1 5 4 let a = q.pop().unwrap(); // 20 3 1 5 4 -> 2 assert_eq!(a.id, 2); q.push(a); // [20 2] 3 1 5 4 let a = q.pop().unwrap(); // 2 3 1 5 4 -> 20 assert_eq!(a.id, 20); let b = q.pop().unwrap(); // 3 1 5 4 -> 2 assert_eq!(b.id, 2); q.push(b); // 2 3 1 5 4 let b = q.pop().unwrap(); // 3 1 5 4 -> 2 assert_eq!(b.id, 2); let c = q.pop().unwrap(); // 1 5 4 -> 3 assert_eq!(c.id, 3); q.push(b); // 2 1 5 4 let b = q.pop().unwrap(); // 1 5 4 -> 2 assert_eq!(b.id, 2); let e = q.pop().unwrap(); // 5 4 -> 1 assert_eq!(e.id, 1); let f = q.pop().unwrap(); // 4 -> 5 assert_eq!(f.id, 5); let g = q.pop().unwrap(); // empty -> 4 assert_eq!(g.id, 4); 
assert!(q.pop().is_none()); } }