use std::io::Write; use std::net; use std::sync::Arc; use std::time::Duration; use async_trait::async_trait; use bytes::Bytes; use futures::{stream, StreamExt}; use log::*; use serde::{Deserialize, Serialize}; use structopt::StructOpt; use tokio::sync::watch; use tokio_unix_tcp::NamedSocketAddr; use sodiumoxide::crypto::auth; use sodiumoxide::crypto::sign::{ed25519, PublicKey}; use netapp::endpoint::*; use netapp::message::*; use netapp::peering::fullmesh::*; use netapp::util::*; use netapp::{NetApp, NodeID}; #[derive(StructOpt, Debug)] #[structopt(name = "netapp")] pub struct Opt { #[structopt(long = "network-key", short = "n")] network_key: Option, #[structopt(long = "private-key", short = "p")] private_key: Option, #[structopt(long = "bootstrap-peer", short = "b")] bootstrap_peers: Vec, #[structopt(long = "listen-addr", short = "l", default_value = "127.0.0.1:1980")] listen_addr: String, #[structopt(long = "public-addr", short = "a")] public_addr: Option, } #[tokio::main] async fn main() { env_logger::Builder::new() .parse_env("RUST_LOG") .format(|buf, record| { writeln!( buf, "{} {} {} {}", chrono::Local::now().format("%s%.6f"), record.module_path().unwrap_or("_"), record.level(), record.args() ) }) .init(); let opt = Opt::from_args(); let netid = match &opt.network_key { Some(k) => auth::Key::from_slice(&hex::decode(k).unwrap()).unwrap(), None => auth::gen_key(), }; info!("Network key: {}", hex::encode(&netid)); let privkey = match &opt.private_key { Some(k) => ed25519::SecretKey::from_slice(&hex::decode(k).unwrap()).unwrap(), None => { let (_pk, sk) = ed25519::gen_keypair(); sk } }; info!("Node private key: {}", hex::encode(&privkey)); info!("Node public key: {}", hex::encode(privkey.public_key())); let public_addr = opt.public_addr.map(|x| x.parse().unwrap()); let listen_addr: net::SocketAddr = opt.listen_addr.parse().unwrap(); info!("Node public address: {:?}", public_addr); info!("Node listen address: {}", listen_addr); let netapp = NetApp::new(0u64, netid.clone(), privkey.clone()); let mut bootstrap_peers: Vec<(PublicKey, NamedSocketAddr)> = vec![]; for peer in opt.bootstrap_peers.iter() { bootstrap_peers.push( parse_peer_addr(peer) .map(|(node_id, socket_bind_addr)| (node_id, socket_bind_addr.into())) .expect("Invalid peer address"), ); } let peering = FullMeshPeeringStrategy::new( netapp.clone(), bootstrap_peers, public_addr.map(|a| net::SocketAddr::new(a, listen_addr.port()).into()), ); info!("Add more peers to this mesh by running: fullmesh -n {} -l 127.0.0.1:$((1000 + $RANDOM)) -b {}@{}", hex::encode(&netid), hex::encode(privkey.public_key()), listen_addr); let watch_cancel = netapp::util::watch_ctrl_c(); let example = Arc::new(Example { netapp: netapp.clone(), fullmesh: peering.clone(), example_endpoint: netapp.endpoint("__netapp/examples/fullmesh.rs/Example".into()), }); example.example_endpoint.set_handler(example.clone()); tokio::join!( example.exchange_loop(watch_cancel.clone()), netapp.listen(listen_addr.into(), public_addr, watch_cancel.clone()), peering.run(watch_cancel), ); } // ---- struct Example { netapp: Arc, fullmesh: Arc, example_endpoint: Arc>, } impl Example { async fn exchange_loop(self: Arc, must_exit: watch::Receiver) { let mut i = 12000; while !*must_exit.borrow() { tokio::time::sleep(Duration::from_secs(2)).await; let peers = self.fullmesh.get_peer_list(); for p in peers.iter() { let id = p.id; if id == self.netapp.id { continue; } i += 1; let example_field = i; let self2 = self.clone(); tokio::spawn(async move { info!( "Send example query {} to {}", example_field, hex::encode(id) ); // Fake data stream with some delays in item production let stream = Box::pin(stream::iter([100, 200, 300, 400]).then(|x| async move { tokio::time::sleep(Duration::from_millis(500)).await; Ok(Bytes::from(vec![(x % 256) as u8; 133 * x])) })); match self2 .example_endpoint .call_streaming( &id, Req::new(ExampleMessage { example_field }) .unwrap() .with_stream(stream), PRIO_NORMAL, ) .await { Ok(resp) => { let (resp, stream) = resp.into_parts(); info!( "Got example response to {} from {}: {:?}", example_field, hex::encode(id), resp ); let mut stream = stream.unwrap(); while let Some(x) = stream.next().await { info!("Response: stream got bytes {:?}", x.map(|b| b.len())); } } Err(e) => warn!("Error with example request: {}", e), } }); } } } } #[async_trait] impl StreamingEndpointHandler for Example { async fn handle( self: &Arc, mut msg: Req, _from: NodeID, ) -> Resp { info!( "Got example message: {:?}, sending example response", msg.msg() ); let source_stream = msg.take_stream().unwrap(); // Return same stream with 300ms delay let new_stream = Box::pin(source_stream.then(|x| async move { tokio::time::sleep(Duration::from_millis(300)).await; x })); Resp::new(ExampleResponse { example_field: false, }) .with_stream(new_stream) } } #[derive(Serialize, Deserialize, Debug)] struct ExampleMessage { example_field: usize, } #[derive(Serialize, Deserialize, Debug)] struct ExampleResponse { example_field: bool, } impl Message for ExampleMessage { type Response = ExampleResponse; }