From 1a5e6e39af19d572a9de7f54e66bef911bdbbf2f Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 6 Apr 2020 19:55:39 +0200 Subject: [PATCH] Some more basic work --- .gitignore | 1 + Cargo.lock | 73 +++++++++++++++++++++++ Cargo.toml | 7 ++- src/api.rs | 23 +++++-- src/data.rs | 23 ------- src/error.rs | 14 ++++- src/main.rs | 88 ++++++++++++++++++++------- src/membership.rs | 86 ++++++++++++++++++++++++++ src/proto.rs | 20 +++++++ src/rpc.rs | 149 ++++++++++++++++++++++++++++++++++++++++++---- 10 files changed, 417 insertions(+), 67 deletions(-) create mode 100644 src/membership.rs diff --git a/.gitignore b/.gitignore index 53eaa219..71111f3e 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ /target +/tmp **/*.rs.bk diff --git a/Cargo.lock b/Cargo.lock index 69e1f40f..de2ef8fd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -240,13 +240,26 @@ dependencies = [ "futures-channel 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", "futures-core 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", "futures-util 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", + "hex 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "http 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "hyper 0.13.4 (registry+https://github.com/rust-lang/crates.io-index)", "kv 0.20.2 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)", "rmp-serde 0.14.3 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.106 (registry+https://github.com/rust-lang/crates.io-index)", "structopt 0.3.12 (registry+https://github.com/rust-lang/crates.io-index)", "tokio 0.2.16 (registry+https://github.com/rust-lang/crates.io-index)", + "toml 0.5.6 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "getrandom" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.68 (registry+https://github.com/rust-lang/crates.io-index)", + "wasi 0.9.0+wasi-snapshot-preview1 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -283,6 +296,11 @@ dependencies = [ "libc 0.2.68 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "hex" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "http" version = "0.2.1" @@ -551,6 +569,11 @@ name = "pin-utils" version = "0.1.0-alpha.4" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "ppv-lite86" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "proc-macro-error" version = "0.4.12" @@ -601,6 +624,43 @@ dependencies = [ "proc-macro2 1.0.10 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "rand" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "getrandom 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.68 (registry+https://github.com/rust-lang/crates.io-index)", + "rand_chacha 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", + "rand_core 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", + "rand_hc 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "rand_chacha" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "ppv-lite86 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)", + "rand_core 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "rand_core" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "getrandom 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "rand_hc" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "rand_core 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "redox_syscall" version = "0.1.56" @@ -885,6 +945,11 @@ dependencies = [ "try-lock 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "wasi" +version = "0.9.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "winapi" version = "0.2.8" @@ -952,9 +1017,11 @@ dependencies = [ "checksum futures-task 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "7b0a34e53cf6cdcd0178aa573aed466b646eb3db769570841fda0c7ede375a27" "checksum futures-util 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "22766cf25d64306bedf0384da004d05c9974ab104fcc4528f1236181c18004c5" "checksum fxhash 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c" +"checksum getrandom 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)" = "7abc8dd8451921606d809ba32e95b6111925cd2906060d2dcc29c070220503eb" "checksum h2 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)" = "377038bf3c89d18d6ca1431e7a5027194fbd724ca10592b9487ede5e8e144f42" "checksum heck 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "20564e78d53d2bb135c343b3f47714a56af2061f1c928fdb541dc7b9fdd94205" "checksum hermit-abi 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "725cf19794cf90aa94e65050cb4191ff5d8fa87a498383774c47b332e3af952e" +"checksum hex 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "805026a5d0141ffc30abb3be3173848ad46a1b1664fe632428479619a3644d77" "checksum http 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "28d569972648b2c512421b5f2a405ad6ac9666547189d0c5477a3f200f3e02f9" "checksum http-body 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "13d5ff830006f7646652e057693569bfe0d51760c0085a071769d142a205111b" "checksum httparse 1.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "cd179ae861f0c2e53da70d892f5f3029f9594be0c41dc5269cd371691b1dc2f9" @@ -985,12 +1052,17 @@ dependencies = [ "checksum pin-project-internal 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)" = "385322a45f2ecf3410c68d2a549a4a2685e8051d0f278e39743ff4e451cb9b3f" "checksum pin-project-lite 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "237844750cfbb86f67afe27eee600dfbbcb6188d734139b534cbfbf4f96792ae" "checksum pin-utils 0.1.0-alpha.4 (registry+https://github.com/rust-lang/crates.io-index)" = "5894c618ce612a3fa23881b152b608bafb8c56cfc22f434a3ba3120b40f7b587" +"checksum ppv-lite86 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)" = "74490b50b9fbe561ac330df47c08f3f33073d2d00c150f719147d7c54522fa1b" "checksum proc-macro-error 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)" = "18f33027081eba0a6d8aba6d1b1c3a3be58cbb12106341c2d5759fcd9b5277e7" "checksum proc-macro-error-attr 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)" = "8a5b4b77fdb63c1eca72173d68d24501c54ab1269409f6b672c85deb18af69de" "checksum proc-macro-hack 0.5.15 (registry+https://github.com/rust-lang/crates.io-index)" = "0d659fe7c6d27f25e9d80a1a094c223f5246f6a6596453e09d7229bf42750b63" "checksum proc-macro-nested 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "8e946095f9d3ed29ec38de908c22f95d9ac008e424c7bcae54c75a79c527c694" "checksum proc-macro2 1.0.10 (registry+https://github.com/rust-lang/crates.io-index)" = "df246d292ff63439fea9bc8c0a270bed0e390d5ebd4db4ba15aba81111b5abe3" "checksum quote 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2bdc6c187c65bca4260c9011c9e3132efe4909da44726bad24cf7572ae338d7f" +"checksum rand 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)" = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03" +"checksum rand_chacha 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402" +"checksum rand_core 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19" +"checksum rand_hc 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c" "checksum redox_syscall 0.1.56 (registry+https://github.com/rust-lang/crates.io-index)" = "2439c63f3f6139d1b57529d16bc3b8bb855230c8efcc5d3a896c8bea7c3b1e84" "checksum rmp 0.8.9 (registry+https://github.com/rust-lang/crates.io-index)" = "0f10b46df14cf1ee1ac7baa4d2fbc2c52c0622a4b82fa8740e37bc452ac0184f" "checksum rmp-serde 0.14.3 (registry+https://github.com/rust-lang/crates.io-index)" = "4c1ee98f14fe8b8e9c5ea13d25da7b2a1796169202c57a09d7288de90d56222b" @@ -1023,6 +1095,7 @@ dependencies = [ "checksum unicode-xid 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "826e7639553986605ec5979c7dd957c7895e93eabed50ab2ffa7f6128a75097c" "checksum version_check 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)" = "078775d0255232fb988e6fccf26ddc9d1ac274299aaedcedce21c6f72cc533ce" "checksum want 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "1ce8a968cb1cd110d136ff8b819a556d6fb6d919363c61534f6860c7eb172ba0" +"checksum wasi 0.9.0+wasi-snapshot-preview1 (registry+https://github.com/rust-lang/crates.io-index)" = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519" "checksum winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a" "checksum winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)" = "8093091eeb260906a183e6ae1abdba2ef5ef2257a21801128899c3fc699229c6" "checksum winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "2d315eee3b34aca4797b2da6b13ed88266e6d612562a0c46390af8299fc699bc" diff --git a/Cargo.toml b/Cargo.toml index 8b170a3a..8381af3a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,8 +9,8 @@ edition = "2018" [dependencies] bytes = "0.4" http = "0.2" -hyper = "0.13.4" -kv = "0.20.2" +hyper = "0.13" +kv = "0.20" futures = "0.3" futures-core = "0.3" futures-channel = "0.3" @@ -20,4 +20,7 @@ serde = { version = "1.0", features = ["derive"] } bincode = "1.2.1" err-derive = "0.2.3" rmp-serde = "0.14.3" +toml = "0.5" structopt = { version = "0.3", default-features = false } +rand = "0.7" +hex = "0.3" diff --git a/src/api.rs b/src/api.rs index 9ee17047..cf70dbdf 100644 --- a/src/api.rs +++ b/src/api.rs @@ -1,14 +1,16 @@ +use std::sync::Arc; + use futures_util::TryStreamExt; use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Method, Request, Response, Server, StatusCode}; use futures::future::Future; use crate::error::Error; -use crate::System; +use crate::membership::System; /// This is our service handler. It receives a Request, routes on its /// path, and returns a Future of a Response. -async fn echo(req: Request) -> Result, Error> { +async fn handler(sys: Arc, req: Request) -> Result, Error> { match (req.method(), req.uri().path()) { // Serve some instructions at / (&Method::GET, "/") => Ok(Response::new(Body::from( @@ -51,15 +53,24 @@ async fn echo(req: Request) -> Result, Error> { } } -pub async fn run_api_server(sys: &System, api_port: u16, shutdown_signal: impl Future) -> Result<(), hyper::Error> { - let addr = ([0, 0, 0, 0], api_port).into(); +pub async fn run_api_server(sys: Arc, shutdown_signal: impl Future) -> Result<(), hyper::Error> { + let addr = ([0, 0, 0, 0], sys.config.api_port).into(); - let service = make_service_fn(|_| async { Ok::<_, Error>(service_fn(echo)) }); + let service = make_service_fn(|_| { + let sys = sys.clone(); + async move { + let sys = sys.clone(); + Ok::<_, Error>(service_fn(move |req: Request| { + let sys = sys.clone(); + handler(sys, req) + })) + } + }); let server = Server::bind(&addr).serve(service); let graceful = server.with_graceful_shutdown(shutdown_signal); - println!("Listening on http://{}", addr); + println!("API server listening on http://{}", addr); graceful.await } diff --git a/src/data.rs b/src/data.rs index 651a9d45..e26a38f0 100644 --- a/src/data.rs +++ b/src/data.rs @@ -1,31 +1,8 @@ -use std::net::SocketAddr; - use serde::{Serialize, Deserialize}; pub type UUID = [u8; 32]; pub type Hash = [u8; 32]; -// Membership management - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct NodeStatus { - id: UUID, - time: u64, - addr: SocketAddr, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct NodeConfig { - id: UUID, - n_tokens: u32, -} - -#[derive(Default, Debug, Clone, Serialize, Deserialize)] -pub struct NetworkMembers { - pings: Vec, - desired_state: Vec, - desired_state_version: u64, -} // Data management diff --git a/src/error.rs b/src/error.rs index 9929a896..1e611adb 100644 --- a/src/error.rs +++ b/src/error.rs @@ -9,11 +9,23 @@ pub enum Error { #[error(display = "Hyper error")] Hyper(#[error(source)] hyper::Error), + #[error(display = "HTTP error")] + HTTP(#[error(source)] http::Error), + #[error(display = "Messagepack encode error")] RMPEncode(#[error(source)] rmp_serde::encode::Error), #[error(display = "Messagepack decode error")] RMPDecode(#[error(source)] rmp_serde::decode::Error), + #[error(display = "TOML decode error")] + TomlDecode(#[error(source)] toml::de::Error), + + #[error(display = "Timeout")] + RPCTimeout(#[error(source)] tokio::time::Elapsed), + + #[error(display = "RPC error")] + RPCError(String), + #[error(display = "")] - Msg(String), + Message(String), } diff --git a/src/main.rs b/src/main.rs index 711717d7..4448d535 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,33 +1,73 @@ mod error; mod data; mod proto; +mod membership; mod rpc; mod api; +use std::io::{Read, Write}; +use std::sync::Arc; +use std::net::SocketAddr; +use std::path::PathBuf; use structopt::StructOpt; use futures::channel::oneshot; -use tokio::sync::Mutex; -use hyper::client::Client; - -use data::*; +use serde::Deserialize; +use rand::Rng; +use data::UUID; +use error::Error; +use membership::System; #[derive(StructOpt, Debug)] #[structopt(name = "garage")] pub struct Opt { - #[structopt(long = "api-port", default_value = "3900")] - api_port: u16, - - #[structopt(long = "rpc-port", default_value = "3901")] - rpc_port: u16, + #[structopt(short = "c", long = "config", default_value = "./config.toml")] + config_file: PathBuf, } -pub struct System { - pub opt: Opt, +#[derive(Deserialize, Debug)] +pub struct Config { + metadata_dir: PathBuf, + data_dir: PathBuf, - pub rpc_client: Client, + api_port: u16, + rpc_port: u16, - pub network_members: Mutex, + bootstrap_peers: Vec, +} + +fn read_config(config_file: PathBuf) -> Result { + let mut file = std::fs::OpenOptions::new() + .read(true) + .open(config_file.as_path())?; + + let mut config = String::new(); + file.read_to_string(&mut config)?; + + Ok(toml::from_str(&config)?) +} + +fn gen_node_id(metadata_dir: &PathBuf) -> Result { + let mut id_file = metadata_dir.clone(); + id_file.push("node_id"); + if id_file.as_path().exists() { + let mut f = std::fs::File::open(id_file.as_path())?; + let mut d = vec![]; + f.read_to_end(&mut d)?; + if d.len() != 32 { + return Err(Error::Message(format!("Corrupt node_id file"))) + } + + let mut id = [0u8; 32]; + id.copy_from_slice(&d[..]); + Ok(id) + } else { + let id = rand::thread_rng().gen::(); + + let mut f = std::fs::File::create(id_file.as_path())?; + f.write_all(&id[..])?; + Ok(id) + } } async fn shutdown_signal(chans: Vec>) { @@ -47,22 +87,23 @@ async fn wait_from(chan: oneshot::Receiver<()>) -> () { #[tokio::main] async fn main() { let opt = Opt::from_args(); - let rpc_port = opt.rpc_port; - let api_port = opt.api_port; + let config = read_config(opt.config_file) + .expect("Unable to read config file"); - let sys = System{ - opt, - rpc_client: Client::new(), - network_members: Mutex::new(NetworkMembers::default()), - }; + let id = gen_node_id(&config.metadata_dir) + .expect("Unable to read or generate node ID"); + println!("Node ID: {}", hex::encode(id)); + + let sys = Arc::new(System::new(config, id)); let (tx1, rx1) = oneshot::channel(); let (tx2, rx2) = oneshot::channel(); - tokio::spawn(shutdown_signal(vec![tx1, tx2])); + let rpc_server = rpc::run_rpc_server(sys.clone(), wait_from(rx1)); + let api_server = api::run_api_server(sys.clone(), wait_from(rx2)); - let rpc_server = rpc::run_rpc_server(&sys, rpc_port, wait_from(rx1)); - let api_server = api::run_api_server(&sys, api_port, wait_from(rx2)); + tokio::spawn(shutdown_signal(vec![tx1, tx2])); + tokio::spawn(membership::bootstrap(sys)); let (e1, e2) = futures::join![rpc_server, api_server]; @@ -74,3 +115,4 @@ async fn main() { eprintln!("API server error: {}", e) } } + diff --git a/src/membership.rs b/src/membership.rs new file mode 100644 index 00000000..c14d370b --- /dev/null +++ b/src/membership.rs @@ -0,0 +1,86 @@ +use std::sync::Arc; +use std::collections::HashMap; +use std::time::Duration; +use std::net::SocketAddr; + +use hyper::client::Client; +use tokio::sync::RwLock; + +use crate::Config; +use crate::error::Error; +use crate::data::*; +use crate::proto::*; +use crate::rpc::*; + +const PING_INTERVAL: Duration = Duration::from_secs(10); +const PING_TIMEOUT: Duration = Duration::from_secs(2); +const MAX_FAILED_PINGS: usize = 3; + +pub struct System { + pub config: Config, + pub id: UUID, + + pub rpc_client: Client, + + pub members: RwLock, +} + +pub struct Members { + pub present: Vec, + pub status: HashMap, + + pub config: HashMap, + pub config_version: u64, +} + +pub struct NodeStatus { + pub addr: SocketAddr, + remaining_ping_attempts: usize, +} + +pub struct NodeConfig { + pub n_tokens: u32, +} + +impl System { + pub fn new(config: Config, id: UUID) -> Self { + System{ + config, + id, + rpc_client: Client::new(), + members: RwLock::new(Members{ + present: Vec::new(), + status: HashMap::new(), + config: HashMap::new(), + config_version: 0, + }), + } + } + + pub async fn broadcast(&self) -> Vec { + self.members.read().await.present.clone() + } +} + +pub async fn bootstrap(system: Arc) { + rpc_call_many_addr(system.clone(), + &system.config.bootstrap_peers, + &Message::Ping(PingMessage{ + id: system.id, + rpc_port: system.config.rpc_port, + present_hash: [0u8; 32], + config_version: 0, + }), + None, + PING_TIMEOUT).await; + + unimplemented!() //TODO +} + +pub async fn handle_ping(sys: Arc, from: &SocketAddr, ping: &PingMessage) -> Result { + unimplemented!() //TODO +} + +pub async fn handle_advertise_node(sys: Arc, ping: &AdvertiseNodeMessage) -> Result { + unimplemented!() //TODO +} diff --git a/src/proto.rs b/src/proto.rs index 029a58df..6cb12598 100644 --- a/src/proto.rs +++ b/src/proto.rs @@ -1,8 +1,28 @@ +use std::net::SocketAddr; use serde::{Serialize, Deserialize}; +use crate::data::*; + #[derive(Debug, Serialize, Deserialize)] pub enum Message { Ok, Error(String), + Ping(PingMessage), + AdvertiseNode(AdvertiseNodeMessage), +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct PingMessage { + pub id: UUID, + pub rpc_port: u16, + + pub present_hash: Hash, + pub config_version: u64, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct AdvertiseNodeMessage { + pub id: UUID, + pub addr: SocketAddr, } diff --git a/src/rpc.rs b/src/rpc.rs index 314b64a9..4ba32c4c 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -1,42 +1,167 @@ +use std::net::SocketAddr; +use std::sync::Arc; +use std::time::Duration; + use bytes::IntoBuf; use hyper::service::{make_service_fn, service_fn}; +use hyper::server::conn::AddrStream; use hyper::{Body, Method, Request, Response, Server, StatusCode}; use futures::future::Future; +use futures::stream::futures_unordered::FuturesUnordered; +use futures::stream::StreamExt; +use crate::data::*; use crate::error::Error; use crate::proto::Message; -use crate::System; +use crate::membership::System; +use crate::membership; +// ---- CLIENT PART ---- -/// This is our service handler. It receives a Request, routes on its -/// path, and returns a Future of a Response. -async fn echo(req: Request) -> Result, Error> { +pub async fn rpc_call_many(sys: Arc, + to: &[UUID], + msg: &Message, + stop_after: Option, + timeout: Duration) + -> Vec> +{ + let resp_stream = to.iter() + .map(|to| rpc_call(sys.clone(), to, msg, timeout)) + .collect::>(); + + collect_rpc_results(resp_stream, stop_after).await +} + +pub async fn rpc_call_many_addr(sys: Arc, + to: &[SocketAddr], + msg: &Message, + stop_after: Option, + timeout: Duration) + -> Vec> +{ + let resp_stream = to.iter() + .map(|to| rpc_call_addr(sys.clone(), to, msg, timeout)) + .collect::>(); + + collect_rpc_results(resp_stream, stop_after).await +} + +async fn collect_rpc_results(mut resp_stream: FuturesUnordered>>, + stop_after: Option) + -> Vec> +{ + let mut results = vec![]; + let mut n_ok = 0; + while let Some(resp) = resp_stream.next().await { + if resp.is_ok() { + n_ok += 1 + } + results.push(resp); + if let Some(n) = stop_after { + if n_ok >= n { + break + } + } + } + results +} + +// ---- + +pub async fn rpc_call(sys: Arc, + to: &UUID, + msg: &Message, + timeout: Duration) + -> Result +{ + let addr = { + let members = sys.members.read().await; + match members.status.get(to) { + Some(status) => status.addr.clone(), + None => return Err(Error::Message(format!("Peer ID not found"))), + } + }; + rpc_call_addr(sys, &addr, msg, timeout).await +} + +pub async fn rpc_call_addr(sys: Arc, + to_addr: &SocketAddr, + msg: &Message, + timeout: Duration) + -> Result +{ + let uri = format!("http://{}/", to_addr); + let req = Request::builder() + .method(Method::POST) + .uri(uri) + .body(Body::from(rmp_serde::encode::to_vec_named(msg)?))?; + + let resp_fut = sys.rpc_client.request(req); + let resp = tokio::time::timeout(timeout, resp_fut).await??; + + if resp.status() == StatusCode::OK { + let body = hyper::body::to_bytes(resp.into_body()).await?; + let msg = rmp_serde::decode::from_read::<_, Message>(body.into_buf())?; + match msg { + Message::Error(e) => Err(Error::RPCError(e)), + x => Ok(x) + } + } else { + Err(Error::RPCError(format!("Status code {}", resp.status()))) + } +} + +// ---- SERVER PART ---- + +fn err_to_msg(x: Result) -> Message { + match x { + Err(e) => Message::Error(format!("{}", e)), + Ok(msg) => msg, + } +} + +async fn handler(sys: Arc, req: Request, addr: SocketAddr) -> Result, Error> { if req.method() != &Method::POST { let mut bad_request = Response::default(); *bad_request.status_mut() = StatusCode::BAD_REQUEST; return Ok(bad_request); } - let whole_body = hyper::body::to_bytes(req.into_body()).await?; - let msg = rmp_serde::decode::from_read::<_, Message>(whole_body.into_buf()); + let msg = rmp_serde::decode::from_read::<_, Message>(whole_body.into_buf())?; + + eprintln!("RPC from {}: {:?}", addr, msg); + + let resp = err_to_msg(match &msg { + Message::Ping(ping) => membership::handle_ping(sys, &addr, ping).await, + Message::AdvertiseNode(adv) => membership::handle_advertise_node(sys, adv).await, + _ => Ok(Message::Error(format!("Unexpected message: {:?}", msg))), + }); - let resp = Message::Ok; Ok(Response::new(Body::from( rmp_serde::encode::to_vec_named(&resp)? ))) } -pub async fn run_rpc_server(sys: &System, rpc_port: u16, shutdown_signal: impl Future) -> Result<(), hyper::Error> { - let addr = ([0, 0, 0, 0], rpc_port).into(); +pub async fn run_rpc_server(sys: Arc, shutdown_signal: impl Future) -> Result<(), hyper::Error> { + let bind_addr = ([0, 0, 0, 0], sys.config.rpc_port).into(); - let service = make_service_fn(|_| async { Ok::<_, Error>(service_fn(echo)) }); + let service = make_service_fn(|conn: &AddrStream| { + let client_addr = conn.remote_addr(); + let sys = sys.clone(); + async move { + Ok::<_, Error>(service_fn(move |req: Request| { + let sys = sys.clone(); + handler(sys, req, client_addr) + })) + } + }); - let server = Server::bind(&addr).serve(service); + let server = Server::bind(&bind_addr).serve(service) ; let graceful = server.with_graceful_shutdown(shutdown_signal); - println!("Listening on http://{}", addr); + println!("RPC server listening on http://{}", bind_addr); graceful.await }