diff --git a/examples/fullmesh.rs b/examples/fullmesh.rs index 4ab8a8a..82e45c3 100644 --- a/examples/fullmesh.rs +++ b/examples/fullmesh.rs @@ -1,17 +1,24 @@ use std::io::Write; use std::net::SocketAddr; +use std::sync::Arc; +use std::time::Duration; -use log::info; - +use async_trait::async_trait; +use bytes::Bytes; +use futures::{stream, StreamExt}; +use log::*; +use serde::{Deserialize, Serialize}; use structopt::StructOpt; +use tokio::sync::watch; use sodiumoxide::crypto::auth; use sodiumoxide::crypto::sign::ed25519; +use netapp::endpoint::*; +use netapp::message::*; use netapp::peering::fullmesh::*; use netapp::util::*; - -use netapp::NetApp; +use netapp::{NetApp, NodeID}; #[derive(StructOpt, Debug)] #[structopt(name = "netapp")] @@ -92,8 +99,126 @@ async fn main() { let watch_cancel = netapp::util::watch_ctrl_c(); + let example = Arc::new(Example { + netapp: netapp.clone(), + fullmesh: peering.clone(), + example_endpoint: netapp.endpoint("__netapp/examples/fullmesh.rs/Example".into()), + }); + example.example_endpoint.set_handler(example.clone()); + tokio::join!( + example.exchange_loop(watch_cancel.clone()), netapp.listen(listen_addr, public_addr, watch_cancel.clone()), peering.run(watch_cancel), ); } + +// ---- + +struct Example { + netapp: Arc, + fullmesh: Arc, + example_endpoint: Arc>, +} + +impl Example { + async fn exchange_loop(self: Arc, must_exit: watch::Receiver) { + let mut i = 12000; + while !*must_exit.borrow() { + tokio::time::sleep(Duration::from_secs(7)).await; + + let peers = self.fullmesh.get_peer_list(); + for p in peers.iter() { + let id = p.id; + if id == self.netapp.id { + continue; + } + i += 1; + let example_field = i; + let self2 = self.clone(); + tokio::spawn(async move { + info!( + "Send example query {} to {}", + example_field, + hex::encode(id) + ); + let stream = + Box::pin(stream::iter([100, 200, 300, 400]).then(|x| async move { + tokio::time::sleep(Duration::from_millis(100)).await; + Ok(Bytes::from(vec![(x % 256) as u8; 133 * x])) + })); + match self2 + .example_endpoint + .call_streaming( + &id, + Req::new(ExampleMessage { example_field }) + .unwrap() + .with_stream(stream), + PRIO_NORMAL, + ) + .await + { + Ok(resp) => { + let (resp, stream) = resp.into_parts(); + info!( + "Got example response to {} from {}: {:?}", + example_field, + hex::encode(id), + resp + ); + let mut stream = stream.unwrap(); + while let Some(x) = stream.next().await { + info!("Response: stream got bytes {:?}", x.map(|b| b.len())); + } + } + Err(e) => warn!("Error with example request: {}", e), + } + }); + } + } + } +} + +#[async_trait] +impl StreamingEndpointHandler for Example { + async fn handle( + self: &Arc, + mut msg: Req, + _from: NodeID, + ) -> Resp { + info!( + "Got example message: {:?}, sending example response", + msg.msg() + ); + let source_stream = msg.take_stream().unwrap(); + let new_stream = Box::pin(source_stream.then(|x| async move { + info!( + "Handler: stream got bytes {:?}", + x.as_ref().map(|b| b.len()) + ); + tokio::time::sleep(Duration::from_millis(100)).await; + Ok(Bytes::from(vec![ + 10u8; + x.map(|b| b.len()).unwrap_or(1422) * 2 + ])) + })); + Resp::new(ExampleResponse { + example_field: false, + }) + .with_stream(new_stream) + } +} + +#[derive(Serialize, Deserialize, Debug)] +struct ExampleMessage { + example_field: usize, +} + +#[derive(Serialize, Deserialize, Debug)] +struct ExampleResponse { + example_field: bool, +} + +impl Message for ExampleMessage { + type Response = ExampleResponse; +}