forked from lx/netapp
Change call() to take a ref to the message to be sent
Handlers also receive a ref
This commit is contained in:
parent
fba49cf93d
commit
8a0bfa0ff6
6 changed files with 18 additions and 18 deletions
|
@ -145,7 +145,7 @@ impl Example {
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
match self2
|
match self2
|
||||||
.example_endpoint
|
.example_endpoint
|
||||||
.call(&p, ExampleMessage { example_field: 42 }, PRIO_NORMAL)
|
.call(&p, &ExampleMessage { example_field: 42 }, PRIO_NORMAL)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Ok(resp) => debug!("Got example response: {:?}", resp),
|
Ok(resp) => debug!("Got example response: {:?}", resp),
|
||||||
|
@ -159,7 +159,7 @@ impl Example {
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl EndpointHandler<ExampleMessage> for Example {
|
impl EndpointHandler<ExampleMessage> for Example {
|
||||||
async fn handle(self: &Arc<Self>, msg: ExampleMessage, _from: NodeID) -> ExampleResponse {
|
async fn handle(self: &Arc<Self>, msg: &ExampleMessage, _from: NodeID) -> ExampleResponse {
|
||||||
debug!("Got example message: {:?}, sending example response", msg);
|
debug!("Got example message: {:?}, sending example response", msg);
|
||||||
ExampleResponse {
|
ExampleResponse {
|
||||||
example_field: false,
|
example_field: false,
|
||||||
|
|
|
@ -112,7 +112,7 @@ impl ClientConn {
|
||||||
|
|
||||||
pub(crate) async fn call<T>(
|
pub(crate) async fn call<T>(
|
||||||
self: Arc<Self>,
|
self: Arc<Self>,
|
||||||
rq: T,
|
rq: &T,
|
||||||
path: &str,
|
path: &str,
|
||||||
prio: RequestPriority,
|
prio: RequestPriority,
|
||||||
) -> Result<<T as Message>::Response, Error>
|
) -> Result<<T as Message>::Response, Error>
|
||||||
|
@ -127,7 +127,7 @@ impl ClientConn {
|
||||||
|
|
||||||
let mut bytes = vec![prio, path.as_bytes().len() as u8];
|
let mut bytes = vec![prio, path.as_bytes().len() as u8];
|
||||||
bytes.extend_from_slice(path.as_bytes());
|
bytes.extend_from_slice(path.as_bytes());
|
||||||
bytes.extend_from_slice(&rmp_to_vec_all_named(&rq)?[..]);
|
bytes.extend_from_slice(&rmp_to_vec_all_named(rq)?[..]);
|
||||||
|
|
||||||
let (resp_send, resp_recv) = oneshot::channel();
|
let (resp_send, resp_recv) = oneshot::channel();
|
||||||
let old = self.inflight.lock().unwrap().insert(id, resp_send);
|
let old = self.inflight.lock().unwrap().insert(id, resp_send);
|
||||||
|
|
|
@ -26,7 +26,7 @@ pub trait EndpointHandler<M>: Send + Sync
|
||||||
where
|
where
|
||||||
M: Message,
|
M: Message,
|
||||||
{
|
{
|
||||||
async fn handle(self: &Arc<Self>, m: M, from: NodeID) -> M::Response;
|
async fn handle(self: &Arc<Self>, m: &M, from: NodeID) -> M::Response;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// If one simply wants to use an endpoint in a client fashion,
|
/// If one simply wants to use an endpoint in a client fashion,
|
||||||
|
@ -35,7 +35,7 @@ where
|
||||||
/// it will panic if it is ever made to handle request.
|
/// it will panic if it is ever made to handle request.
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl<M: Message + 'static> EndpointHandler<M> for () {
|
impl<M: Message + 'static> EndpointHandler<M> for () {
|
||||||
async fn handle(self: &Arc<()>, _m: M, _from: NodeID) -> M::Response {
|
async fn handle(self: &Arc<()>, _m: &M, _from: NodeID) -> M::Response {
|
||||||
panic!("This endpoint should not have a local handler.");
|
panic!("This endpoint should not have a local handler.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -86,7 +86,7 @@ where
|
||||||
pub async fn call(
|
pub async fn call(
|
||||||
&self,
|
&self,
|
||||||
target: &NodeID,
|
target: &NodeID,
|
||||||
req: M,
|
req: &M,
|
||||||
prio: RequestPriority,
|
prio: RequestPriority,
|
||||||
) -> Result<<M as Message>::Response, Error> {
|
) -> Result<<M as Message>::Response, Error> {
|
||||||
if *target == self.netapp.id {
|
if *target == self.netapp.id {
|
||||||
|
@ -141,7 +141,7 @@ where
|
||||||
None => Err(Error::NoHandler),
|
None => Err(Error::NoHandler),
|
||||||
Some(h) => {
|
Some(h) => {
|
||||||
let req = rmp_serde::decode::from_read_ref::<_, M>(buf)?;
|
let req = rmp_serde::decode::from_read_ref::<_, M>(buf)?;
|
||||||
let res = h.handle(req, from).await;
|
let res = h.handle(&req, from).await;
|
||||||
let res_bytes = rmp_to_vec_all_named(&res)?;
|
let res_bytes = rmp_to_vec_all_named(&res)?;
|
||||||
Ok(res_bytes)
|
Ok(res_bytes)
|
||||||
}
|
}
|
||||||
|
|
|
@ -366,7 +366,7 @@ impl NetApp {
|
||||||
hello_endpoint
|
hello_endpoint
|
||||||
.call(
|
.call(
|
||||||
&conn.peer_id,
|
&conn.peer_id,
|
||||||
HelloMessage {
|
&HelloMessage {
|
||||||
server_addr,
|
server_addr,
|
||||||
server_port,
|
server_port,
|
||||||
},
|
},
|
||||||
|
@ -401,7 +401,7 @@ impl NetApp {
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl EndpointHandler<HelloMessage> for NetApp {
|
impl EndpointHandler<HelloMessage> for NetApp {
|
||||||
async fn handle(self: &Arc<Self>, msg: HelloMessage, from: NodeID) {
|
async fn handle(self: &Arc<Self>, msg: &HelloMessage, from: NodeID) {
|
||||||
if let Some(h) = self.on_connected_handler.load().as_ref() {
|
if let Some(h) = self.on_connected_handler.load().as_ref() {
|
||||||
if let Some(c) = self.server_conns.read().unwrap().get(&from) {
|
if let Some(c) = self.server_conns.read().unwrap().get(&from) {
|
||||||
let remote_ip = msg.server_addr.unwrap_or_else(|| c.remote_addr.ip());
|
let remote_ip = msg.server_addr.unwrap_or_else(|| c.remote_addr.ip());
|
||||||
|
|
|
@ -331,7 +331,7 @@ impl Basalt {
|
||||||
async fn do_pull(self: Arc<Self>, peer: NodeID) {
|
async fn do_pull(self: Arc<Self>, peer: NodeID) {
|
||||||
match self
|
match self
|
||||||
.pull_endpoint
|
.pull_endpoint
|
||||||
.call(&peer, PullMessage {}, PRIO_NORMAL)
|
.call(&peer, &PullMessage {}, PRIO_NORMAL)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Ok(resp) => {
|
Ok(resp) => {
|
||||||
|
@ -346,7 +346,7 @@ impl Basalt {
|
||||||
|
|
||||||
async fn do_push(self: Arc<Self>, peer: NodeID) {
|
async fn do_push(self: Arc<Self>, peer: NodeID) {
|
||||||
let push_msg = self.make_push_message();
|
let push_msg = self.make_push_message();
|
||||||
match self.push_endpoint.call(&peer, push_msg, PRIO_NORMAL).await {
|
match self.push_endpoint.call(&peer, &push_msg, PRIO_NORMAL).await {
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
trace!("KYEV PEXo {}", hex::encode(peer));
|
trace!("KYEV PEXo {}", hex::encode(peer));
|
||||||
}
|
}
|
||||||
|
@ -468,14 +468,14 @@ impl Basalt {
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl EndpointHandler<PullMessage> for Basalt {
|
impl EndpointHandler<PullMessage> for Basalt {
|
||||||
async fn handle(self: &Arc<Self>, _pullmsg: PullMessage, _from: NodeID) -> PushMessage {
|
async fn handle(self: &Arc<Self>, _pullmsg: &PullMessage, _from: NodeID) -> PushMessage {
|
||||||
self.make_push_message()
|
self.make_push_message()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl EndpointHandler<PushMessage> for Basalt {
|
impl EndpointHandler<PushMessage> for Basalt {
|
||||||
async fn handle(self: &Arc<Self>, pushmsg: PushMessage, _from: NodeID) {
|
async fn handle(self: &Arc<Self>, pushmsg: &PushMessage, _from: NodeID) {
|
||||||
self.handle_peer_list(&pushmsg.peers[..]);
|
self.handle_peer_list(&pushmsg.peers[..]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -329,7 +329,7 @@ impl FullMeshPeeringStrategy {
|
||||||
hex::encode(id),
|
hex::encode(id),
|
||||||
ping_time
|
ping_time
|
||||||
);
|
);
|
||||||
match self.ping_endpoint.call(&id, ping_msg, PRIO_HIGH).await {
|
match self.ping_endpoint.call(&id, &ping_msg, PRIO_HIGH).await {
|
||||||
Err(e) => warn!("Error pinging {}: {}", hex::encode(id), e),
|
Err(e) => warn!("Error pinging {}: {}", hex::encode(id), e),
|
||||||
Ok(ping_resp) => {
|
Ok(ping_resp) => {
|
||||||
let resp_time = Instant::now();
|
let resp_time = Instant::now();
|
||||||
|
@ -361,7 +361,7 @@ impl FullMeshPeeringStrategy {
|
||||||
let pex_message = PeerListMessage { list: peer_list };
|
let pex_message = PeerListMessage { list: peer_list };
|
||||||
match self
|
match self
|
||||||
.peer_list_endpoint
|
.peer_list_endpoint
|
||||||
.call(id, pex_message, PRIO_BACKGROUND)
|
.call(id, &pex_message, PRIO_BACKGROUND)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Err(e) => warn!("Error doing peer exchange: {}", e),
|
Err(e) => warn!("Error doing peer exchange: {}", e),
|
||||||
|
@ -451,7 +451,7 @@ impl FullMeshPeeringStrategy {
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl EndpointHandler<PingMessage> for FullMeshPeeringStrategy {
|
impl EndpointHandler<PingMessage> for FullMeshPeeringStrategy {
|
||||||
async fn handle(self: &Arc<Self>, ping: PingMessage, from: NodeID) -> PingMessage {
|
async fn handle(self: &Arc<Self>, ping: &PingMessage, from: NodeID) -> PingMessage {
|
||||||
let ping_resp = PingMessage {
|
let ping_resp = PingMessage {
|
||||||
id: ping.id,
|
id: ping.id,
|
||||||
peer_list_hash: self.known_hosts.read().unwrap().hash,
|
peer_list_hash: self.known_hosts.read().unwrap().hash,
|
||||||
|
@ -465,7 +465,7 @@ impl EndpointHandler<PingMessage> for FullMeshPeeringStrategy {
|
||||||
impl EndpointHandler<PeerListMessage> for FullMeshPeeringStrategy {
|
impl EndpointHandler<PeerListMessage> for FullMeshPeeringStrategy {
|
||||||
async fn handle(
|
async fn handle(
|
||||||
self: &Arc<Self>,
|
self: &Arc<Self>,
|
||||||
peer_list: PeerListMessage,
|
peer_list: &PeerListMessage,
|
||||||
_from: NodeID,
|
_from: NodeID,
|
||||||
) -> PeerListMessage {
|
) -> PeerListMessage {
|
||||||
self.handle_peer_list(&peer_list.list[..]);
|
self.handle_peer_list(&peer_list.list[..]);
|
||||||
|
|
Loading…
Reference in a new issue