Add stream example to fullmesh example
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/pr Build is passing

This commit is contained in:
Alex 2022-07-26 12:01:13 +02:00
parent 74e57016f6
commit bdf7d4731d
Signed by: lx
GPG key ID: 0E496D15096376BE

View file

@ -1,17 +1,24 @@
use std::io::Write;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use log::info;
use async_trait::async_trait;
use bytes::Bytes;
use futures::{stream, StreamExt};
use log::*;
use serde::{Deserialize, Serialize};
use structopt::StructOpt;
use tokio::sync::watch;
use sodiumoxide::crypto::auth;
use sodiumoxide::crypto::sign::ed25519;
use netapp::endpoint::*;
use netapp::message::*;
use netapp::peering::fullmesh::*;
use netapp::util::*;
use netapp::NetApp;
use netapp::{NetApp, NodeID};
#[derive(StructOpt, Debug)]
#[structopt(name = "netapp")]
@ -92,8 +99,126 @@ async fn main() {
let watch_cancel = netapp::util::watch_ctrl_c();
let example = Arc::new(Example {
netapp: netapp.clone(),
fullmesh: peering.clone(),
example_endpoint: netapp.endpoint("__netapp/examples/fullmesh.rs/Example".into()),
});
example.example_endpoint.set_handler(example.clone());
tokio::join!(
example.exchange_loop(watch_cancel.clone()),
netapp.listen(listen_addr, public_addr, watch_cancel.clone()),
peering.run(watch_cancel),
);
}
// ----
struct Example {
netapp: Arc<NetApp>,
fullmesh: Arc<FullMeshPeeringStrategy>,
example_endpoint: Arc<Endpoint<ExampleMessage, Self>>,
}
impl Example {
async fn exchange_loop(self: Arc<Self>, must_exit: watch::Receiver<bool>) {
let mut i = 12000;
while !*must_exit.borrow() {
tokio::time::sleep(Duration::from_secs(7)).await;
let peers = self.fullmesh.get_peer_list();
for p in peers.iter() {
let id = p.id;
if id == self.netapp.id {
continue;
}
i += 1;
let example_field = i;
let self2 = self.clone();
tokio::spawn(async move {
info!(
"Send example query {} to {}",
example_field,
hex::encode(id)
);
let stream =
Box::pin(stream::iter([100, 200, 300, 400]).then(|x| async move {
tokio::time::sleep(Duration::from_millis(100)).await;
Ok(Bytes::from(vec![(x % 256) as u8; 133 * x]))
}));
match self2
.example_endpoint
.call_streaming(
&id,
Req::new(ExampleMessage { example_field })
.unwrap()
.with_stream(stream),
PRIO_NORMAL,
)
.await
{
Ok(resp) => {
let (resp, stream) = resp.into_parts();
info!(
"Got example response to {} from {}: {:?}",
example_field,
hex::encode(id),
resp
);
let mut stream = stream.unwrap();
while let Some(x) = stream.next().await {
info!("Response: stream got bytes {:?}", x.map(|b| b.len()));
}
}
Err(e) => warn!("Error with example request: {}", e),
}
});
}
}
}
}
#[async_trait]
impl StreamingEndpointHandler<ExampleMessage> for Example {
async fn handle(
self: &Arc<Self>,
mut msg: Req<ExampleMessage>,
_from: NodeID,
) -> Resp<ExampleMessage> {
info!(
"Got example message: {:?}, sending example response",
msg.msg()
);
let source_stream = msg.take_stream().unwrap();
let new_stream = Box::pin(source_stream.then(|x| async move {
info!(
"Handler: stream got bytes {:?}",
x.as_ref().map(|b| b.len())
);
tokio::time::sleep(Duration::from_millis(100)).await;
Ok(Bytes::from(vec![
10u8;
x.map(|b| b.len()).unwrap_or(1422) * 2
]))
}));
Resp::new(ExampleResponse {
example_field: false,
})
.with_stream(new_stream)
}
}
#[derive(Serialize, Deserialize, Debug)]
struct ExampleMessage {
example_field: usize,
}
#[derive(Serialize, Deserialize, Debug)]
struct ExampleResponse {
example_field: bool,
}
impl Message for ExampleMessage {
type Response = ExampleResponse;
}