Some more basic work
This commit is contained in:
parent
7102db1d54
commit
1a5e6e39af
10 changed files with 417 additions and 67 deletions
1
.gitignore
vendored
1
.gitignore
vendored
|
@ -1,2 +1,3 @@
|
|||
/target
|
||||
/tmp
|
||||
**/*.rs.bk
|
||||
|
|
73
Cargo.lock
generated
73
Cargo.lock
generated
|
@ -240,13 +240,26 @@ dependencies = [
|
|||
"futures-channel 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"futures-core 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"futures-util 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"hex 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"http 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"hyper 0.13.4 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"kv 0.20.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"rand 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"rmp-serde 0.14.3 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"serde 1.0.106 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"structopt 0.3.12 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"tokio 0.2.16 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"toml 0.5.6 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "getrandom"
|
||||
version = "0.1.14"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
dependencies = [
|
||||
"cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"libc 0.2.68 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"wasi 0.9.0+wasi-snapshot-preview1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -283,6 +296,11 @@ dependencies = [
|
|||
"libc 0.2.68 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hex"
|
||||
version = "0.3.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
|
||||
[[package]]
|
||||
name = "http"
|
||||
version = "0.2.1"
|
||||
|
@ -551,6 +569,11 @@ name = "pin-utils"
|
|||
version = "0.1.0-alpha.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
|
||||
[[package]]
|
||||
name = "ppv-lite86"
|
||||
version = "0.2.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
|
||||
[[package]]
|
||||
name = "proc-macro-error"
|
||||
version = "0.4.12"
|
||||
|
@ -601,6 +624,43 @@ dependencies = [
|
|||
"proc-macro2 1.0.10 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rand"
|
||||
version = "0.7.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
dependencies = [
|
||||
"getrandom 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"libc 0.2.68 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"rand_chacha 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"rand_core 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"rand_hc 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rand_chacha"
|
||||
version = "0.2.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
dependencies = [
|
||||
"ppv-lite86 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"rand_core 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rand_core"
|
||||
version = "0.5.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
dependencies = [
|
||||
"getrandom 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rand_hc"
|
||||
version = "0.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
dependencies = [
|
||||
"rand_core 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "redox_syscall"
|
||||
version = "0.1.56"
|
||||
|
@ -885,6 +945,11 @@ dependencies = [
|
|||
"try-lock 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "wasi"
|
||||
version = "0.9.0+wasi-snapshot-preview1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
|
||||
[[package]]
|
||||
name = "winapi"
|
||||
version = "0.2.8"
|
||||
|
@ -952,9 +1017,11 @@ dependencies = [
|
|||
"checksum futures-task 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "7b0a34e53cf6cdcd0178aa573aed466b646eb3db769570841fda0c7ede375a27"
|
||||
"checksum futures-util 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "22766cf25d64306bedf0384da004d05c9974ab104fcc4528f1236181c18004c5"
|
||||
"checksum fxhash 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c"
|
||||
"checksum getrandom 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)" = "7abc8dd8451921606d809ba32e95b6111925cd2906060d2dcc29c070220503eb"
|
||||
"checksum h2 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)" = "377038bf3c89d18d6ca1431e7a5027194fbd724ca10592b9487ede5e8e144f42"
|
||||
"checksum heck 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "20564e78d53d2bb135c343b3f47714a56af2061f1c928fdb541dc7b9fdd94205"
|
||||
"checksum hermit-abi 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "725cf19794cf90aa94e65050cb4191ff5d8fa87a498383774c47b332e3af952e"
|
||||
"checksum hex 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "805026a5d0141ffc30abb3be3173848ad46a1b1664fe632428479619a3644d77"
|
||||
"checksum http 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "28d569972648b2c512421b5f2a405ad6ac9666547189d0c5477a3f200f3e02f9"
|
||||
"checksum http-body 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "13d5ff830006f7646652e057693569bfe0d51760c0085a071769d142a205111b"
|
||||
"checksum httparse 1.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "cd179ae861f0c2e53da70d892f5f3029f9594be0c41dc5269cd371691b1dc2f9"
|
||||
|
@ -985,12 +1052,17 @@ dependencies = [
|
|||
"checksum pin-project-internal 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)" = "385322a45f2ecf3410c68d2a549a4a2685e8051d0f278e39743ff4e451cb9b3f"
|
||||
"checksum pin-project-lite 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "237844750cfbb86f67afe27eee600dfbbcb6188d734139b534cbfbf4f96792ae"
|
||||
"checksum pin-utils 0.1.0-alpha.4 (registry+https://github.com/rust-lang/crates.io-index)" = "5894c618ce612a3fa23881b152b608bafb8c56cfc22f434a3ba3120b40f7b587"
|
||||
"checksum ppv-lite86 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)" = "74490b50b9fbe561ac330df47c08f3f33073d2d00c150f719147d7c54522fa1b"
|
||||
"checksum proc-macro-error 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)" = "18f33027081eba0a6d8aba6d1b1c3a3be58cbb12106341c2d5759fcd9b5277e7"
|
||||
"checksum proc-macro-error-attr 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)" = "8a5b4b77fdb63c1eca72173d68d24501c54ab1269409f6b672c85deb18af69de"
|
||||
"checksum proc-macro-hack 0.5.15 (registry+https://github.com/rust-lang/crates.io-index)" = "0d659fe7c6d27f25e9d80a1a094c223f5246f6a6596453e09d7229bf42750b63"
|
||||
"checksum proc-macro-nested 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "8e946095f9d3ed29ec38de908c22f95d9ac008e424c7bcae54c75a79c527c694"
|
||||
"checksum proc-macro2 1.0.10 (registry+https://github.com/rust-lang/crates.io-index)" = "df246d292ff63439fea9bc8c0a270bed0e390d5ebd4db4ba15aba81111b5abe3"
|
||||
"checksum quote 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2bdc6c187c65bca4260c9011c9e3132efe4909da44726bad24cf7572ae338d7f"
|
||||
"checksum rand 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)" = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03"
|
||||
"checksum rand_chacha 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402"
|
||||
"checksum rand_core 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19"
|
||||
"checksum rand_hc 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c"
|
||||
"checksum redox_syscall 0.1.56 (registry+https://github.com/rust-lang/crates.io-index)" = "2439c63f3f6139d1b57529d16bc3b8bb855230c8efcc5d3a896c8bea7c3b1e84"
|
||||
"checksum rmp 0.8.9 (registry+https://github.com/rust-lang/crates.io-index)" = "0f10b46df14cf1ee1ac7baa4d2fbc2c52c0622a4b82fa8740e37bc452ac0184f"
|
||||
"checksum rmp-serde 0.14.3 (registry+https://github.com/rust-lang/crates.io-index)" = "4c1ee98f14fe8b8e9c5ea13d25da7b2a1796169202c57a09d7288de90d56222b"
|
||||
|
@ -1023,6 +1095,7 @@ dependencies = [
|
|||
"checksum unicode-xid 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "826e7639553986605ec5979c7dd957c7895e93eabed50ab2ffa7f6128a75097c"
|
||||
"checksum version_check 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)" = "078775d0255232fb988e6fccf26ddc9d1ac274299aaedcedce21c6f72cc533ce"
|
||||
"checksum want 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "1ce8a968cb1cd110d136ff8b819a556d6fb6d919363c61534f6860c7eb172ba0"
|
||||
"checksum wasi 0.9.0+wasi-snapshot-preview1 (registry+https://github.com/rust-lang/crates.io-index)" = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519"
|
||||
"checksum winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a"
|
||||
"checksum winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)" = "8093091eeb260906a183e6ae1abdba2ef5ef2257a21801128899c3fc699229c6"
|
||||
"checksum winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "2d315eee3b34aca4797b2da6b13ed88266e6d612562a0c46390af8299fc699bc"
|
||||
|
|
|
@ -9,8 +9,8 @@ edition = "2018"
|
|||
[dependencies]
|
||||
bytes = "0.4"
|
||||
http = "0.2"
|
||||
hyper = "0.13.4"
|
||||
kv = "0.20.2"
|
||||
hyper = "0.13"
|
||||
kv = "0.20"
|
||||
futures = "0.3"
|
||||
futures-core = "0.3"
|
||||
futures-channel = "0.3"
|
||||
|
@ -20,4 +20,7 @@ serde = { version = "1.0", features = ["derive"] }
|
|||
bincode = "1.2.1"
|
||||
err-derive = "0.2.3"
|
||||
rmp-serde = "0.14.3"
|
||||
toml = "0.5"
|
||||
structopt = { version = "0.3", default-features = false }
|
||||
rand = "0.7"
|
||||
hex = "0.3"
|
||||
|
|
23
src/api.rs
23
src/api.rs
|
@ -1,14 +1,16 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use futures_util::TryStreamExt;
|
||||
use hyper::service::{make_service_fn, service_fn};
|
||||
use hyper::{Body, Method, Request, Response, Server, StatusCode};
|
||||
use futures::future::Future;
|
||||
|
||||
use crate::error::Error;
|
||||
use crate::System;
|
||||
use crate::membership::System;
|
||||
|
||||
/// This is our service handler. It receives a Request, routes on its
|
||||
/// path, and returns a Future of a Response.
|
||||
async fn echo(req: Request<Body>) -> Result<Response<Body>, Error> {
|
||||
async fn handler(sys: Arc<System>, req: Request<Body>) -> Result<Response<Body>, Error> {
|
||||
match (req.method(), req.uri().path()) {
|
||||
// Serve some instructions at /
|
||||
(&Method::GET, "/") => Ok(Response::new(Body::from(
|
||||
|
@ -51,15 +53,24 @@ async fn echo(req: Request<Body>) -> Result<Response<Body>, Error> {
|
|||
}
|
||||
}
|
||||
|
||||
pub async fn run_api_server(sys: &System, api_port: u16, shutdown_signal: impl Future<Output=()>) -> Result<(), hyper::Error> {
|
||||
let addr = ([0, 0, 0, 0], api_port).into();
|
||||
pub async fn run_api_server(sys: Arc<System>, shutdown_signal: impl Future<Output=()>) -> Result<(), hyper::Error> {
|
||||
let addr = ([0, 0, 0, 0], sys.config.api_port).into();
|
||||
|
||||
let service = make_service_fn(|_| async { Ok::<_, Error>(service_fn(echo)) });
|
||||
let service = make_service_fn(|_| {
|
||||
let sys = sys.clone();
|
||||
async move {
|
||||
let sys = sys.clone();
|
||||
Ok::<_, Error>(service_fn(move |req: Request<Body>| {
|
||||
let sys = sys.clone();
|
||||
handler(sys, req)
|
||||
}))
|
||||
}
|
||||
});
|
||||
|
||||
let server = Server::bind(&addr).serve(service);
|
||||
|
||||
let graceful = server.with_graceful_shutdown(shutdown_signal);
|
||||
println!("Listening on http://{}", addr);
|
||||
println!("API server listening on http://{}", addr);
|
||||
|
||||
graceful.await
|
||||
}
|
||||
|
|
23
src/data.rs
23
src/data.rs
|
@ -1,31 +1,8 @@
|
|||
use std::net::SocketAddr;
|
||||
|
||||
use serde::{Serialize, Deserialize};
|
||||
|
||||
pub type UUID = [u8; 32];
|
||||
pub type Hash = [u8; 32];
|
||||
|
||||
// Membership management
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct NodeStatus {
|
||||
id: UUID,
|
||||
time: u64,
|
||||
addr: SocketAddr,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct NodeConfig {
|
||||
id: UUID,
|
||||
n_tokens: u32,
|
||||
}
|
||||
|
||||
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct NetworkMembers {
|
||||
pings: Vec<NodeStatus>,
|
||||
desired_state: Vec<NodeConfig>,
|
||||
desired_state_version: u64,
|
||||
}
|
||||
|
||||
// Data management
|
||||
|
||||
|
|
14
src/error.rs
14
src/error.rs
|
@ -9,11 +9,23 @@ pub enum Error {
|
|||
#[error(display = "Hyper error")]
|
||||
Hyper(#[error(source)] hyper::Error),
|
||||
|
||||
#[error(display = "HTTP error")]
|
||||
HTTP(#[error(source)] http::Error),
|
||||
|
||||
#[error(display = "Messagepack encode error")]
|
||||
RMPEncode(#[error(source)] rmp_serde::encode::Error),
|
||||
#[error(display = "Messagepack decode error")]
|
||||
RMPDecode(#[error(source)] rmp_serde::decode::Error),
|
||||
|
||||
#[error(display = "TOML decode error")]
|
||||
TomlDecode(#[error(source)] toml::de::Error),
|
||||
|
||||
#[error(display = "Timeout")]
|
||||
RPCTimeout(#[error(source)] tokio::time::Elapsed),
|
||||
|
||||
#[error(display = "RPC error")]
|
||||
RPCError(String),
|
||||
|
||||
#[error(display = "")]
|
||||
Msg(String),
|
||||
Message(String),
|
||||
}
|
||||
|
|
88
src/main.rs
88
src/main.rs
|
@ -1,33 +1,73 @@
|
|||
mod error;
|
||||
mod data;
|
||||
mod proto;
|
||||
mod membership;
|
||||
mod rpc;
|
||||
mod api;
|
||||
|
||||
use std::io::{Read, Write};
|
||||
use std::sync::Arc;
|
||||
use std::net::SocketAddr;
|
||||
use std::path::PathBuf;
|
||||
use structopt::StructOpt;
|
||||
use futures::channel::oneshot;
|
||||
use tokio::sync::Mutex;
|
||||
use hyper::client::Client;
|
||||
|
||||
use data::*;
|
||||
use serde::Deserialize;
|
||||
use rand::Rng;
|
||||
|
||||
use data::UUID;
|
||||
use error::Error;
|
||||
use membership::System;
|
||||
|
||||
#[derive(StructOpt, Debug)]
|
||||
#[structopt(name = "garage")]
|
||||
pub struct Opt {
|
||||
#[structopt(long = "api-port", default_value = "3900")]
|
||||
api_port: u16,
|
||||
|
||||
#[structopt(long = "rpc-port", default_value = "3901")]
|
||||
rpc_port: u16,
|
||||
#[structopt(short = "c", long = "config", default_value = "./config.toml")]
|
||||
config_file: PathBuf,
|
||||
}
|
||||
|
||||
pub struct System {
|
||||
pub opt: Opt,
|
||||
#[derive(Deserialize, Debug)]
|
||||
pub struct Config {
|
||||
metadata_dir: PathBuf,
|
||||
data_dir: PathBuf,
|
||||
|
||||
pub rpc_client: Client<hyper::client::HttpConnector, hyper::Body>,
|
||||
api_port: u16,
|
||||
rpc_port: u16,
|
||||
|
||||
pub network_members: Mutex<NetworkMembers>,
|
||||
bootstrap_peers: Vec<SocketAddr>,
|
||||
}
|
||||
|
||||
fn read_config(config_file: PathBuf) -> Result<Config, Error> {
|
||||
let mut file = std::fs::OpenOptions::new()
|
||||
.read(true)
|
||||
.open(config_file.as_path())?;
|
||||
|
||||
let mut config = String::new();
|
||||
file.read_to_string(&mut config)?;
|
||||
|
||||
Ok(toml::from_str(&config)?)
|
||||
}
|
||||
|
||||
fn gen_node_id(metadata_dir: &PathBuf) -> Result<UUID, Error> {
|
||||
let mut id_file = metadata_dir.clone();
|
||||
id_file.push("node_id");
|
||||
if id_file.as_path().exists() {
|
||||
let mut f = std::fs::File::open(id_file.as_path())?;
|
||||
let mut d = vec![];
|
||||
f.read_to_end(&mut d)?;
|
||||
if d.len() != 32 {
|
||||
return Err(Error::Message(format!("Corrupt node_id file")))
|
||||
}
|
||||
|
||||
let mut id = [0u8; 32];
|
||||
id.copy_from_slice(&d[..]);
|
||||
Ok(id)
|
||||
} else {
|
||||
let id = rand::thread_rng().gen::<UUID>();
|
||||
|
||||
let mut f = std::fs::File::create(id_file.as_path())?;
|
||||
f.write_all(&id[..])?;
|
||||
Ok(id)
|
||||
}
|
||||
}
|
||||
|
||||
async fn shutdown_signal(chans: Vec<oneshot::Sender<()>>) {
|
||||
|
@ -47,22 +87,23 @@ async fn wait_from(chan: oneshot::Receiver<()>) -> () {
|
|||
#[tokio::main]
|
||||
async fn main() {
|
||||
let opt = Opt::from_args();
|
||||
let rpc_port = opt.rpc_port;
|
||||
let api_port = opt.api_port;
|
||||
let config = read_config(opt.config_file)
|
||||
.expect("Unable to read config file");
|
||||
|
||||
let sys = System{
|
||||
opt,
|
||||
rpc_client: Client::new(),
|
||||
network_members: Mutex::new(NetworkMembers::default()),
|
||||
};
|
||||
let id = gen_node_id(&config.metadata_dir)
|
||||
.expect("Unable to read or generate node ID");
|
||||
println!("Node ID: {}", hex::encode(id));
|
||||
|
||||
let sys = Arc::new(System::new(config, id));
|
||||
|
||||
let (tx1, rx1) = oneshot::channel();
|
||||
let (tx2, rx2) = oneshot::channel();
|
||||
|
||||
tokio::spawn(shutdown_signal(vec![tx1, tx2]));
|
||||
let rpc_server = rpc::run_rpc_server(sys.clone(), wait_from(rx1));
|
||||
let api_server = api::run_api_server(sys.clone(), wait_from(rx2));
|
||||
|
||||
let rpc_server = rpc::run_rpc_server(&sys, rpc_port, wait_from(rx1));
|
||||
let api_server = api::run_api_server(&sys, api_port, wait_from(rx2));
|
||||
tokio::spawn(shutdown_signal(vec![tx1, tx2]));
|
||||
tokio::spawn(membership::bootstrap(sys));
|
||||
|
||||
let (e1, e2) = futures::join![rpc_server, api_server];
|
||||
|
||||
|
@ -74,3 +115,4 @@ async fn main() {
|
|||
eprintln!("API server error: {}", e)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
86
src/membership.rs
Normal file
86
src/membership.rs
Normal file
|
@ -0,0 +1,86 @@
|
|||
use std::sync::Arc;
|
||||
use std::collections::HashMap;
|
||||
use std::time::Duration;
|
||||
use std::net::SocketAddr;
|
||||
|
||||
use hyper::client::Client;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
use crate::Config;
|
||||
use crate::error::Error;
|
||||
use crate::data::*;
|
||||
use crate::proto::*;
|
||||
use crate::rpc::*;
|
||||
|
||||
const PING_INTERVAL: Duration = Duration::from_secs(10);
|
||||
const PING_TIMEOUT: Duration = Duration::from_secs(2);
|
||||
const MAX_FAILED_PINGS: usize = 3;
|
||||
|
||||
pub struct System {
|
||||
pub config: Config,
|
||||
pub id: UUID,
|
||||
|
||||
pub rpc_client: Client<hyper::client::HttpConnector, hyper::Body>,
|
||||
|
||||
pub members: RwLock<Members>,
|
||||
}
|
||||
|
||||
pub struct Members {
|
||||
pub present: Vec<UUID>,
|
||||
pub status: HashMap<UUID, NodeStatus>,
|
||||
|
||||
pub config: HashMap<UUID, NodeConfig>,
|
||||
pub config_version: u64,
|
||||
}
|
||||
|
||||
pub struct NodeStatus {
|
||||
pub addr: SocketAddr,
|
||||
remaining_ping_attempts: usize,
|
||||
}
|
||||
|
||||
pub struct NodeConfig {
|
||||
pub n_tokens: u32,
|
||||
}
|
||||
|
||||
impl System {
|
||||
pub fn new(config: Config, id: UUID) -> Self {
|
||||
System{
|
||||
config,
|
||||
id,
|
||||
rpc_client: Client::new(),
|
||||
members: RwLock::new(Members{
|
||||
present: Vec::new(),
|
||||
status: HashMap::new(),
|
||||
config: HashMap::new(),
|
||||
config_version: 0,
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn broadcast(&self) -> Vec<UUID> {
|
||||
self.members.read().await.present.clone()
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn bootstrap(system: Arc<System>) {
|
||||
rpc_call_many_addr(system.clone(),
|
||||
&system.config.bootstrap_peers,
|
||||
&Message::Ping(PingMessage{
|
||||
id: system.id,
|
||||
rpc_port: system.config.rpc_port,
|
||||
present_hash: [0u8; 32],
|
||||
config_version: 0,
|
||||
}),
|
||||
None,
|
||||
PING_TIMEOUT).await;
|
||||
|
||||
unimplemented!() //TODO
|
||||
}
|
||||
|
||||
pub async fn handle_ping(sys: Arc<System>, from: &SocketAddr, ping: &PingMessage) -> Result<Message, Error> {
|
||||
unimplemented!() //TODO
|
||||
}
|
||||
|
||||
pub async fn handle_advertise_node(sys: Arc<System>, ping: &AdvertiseNodeMessage) -> Result<Message, Error> {
|
||||
unimplemented!() //TODO
|
||||
}
|
20
src/proto.rs
20
src/proto.rs
|
@ -1,8 +1,28 @@
|
|||
use std::net::SocketAddr;
|
||||
use serde::{Serialize, Deserialize};
|
||||
|
||||
use crate::data::*;
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub enum Message {
|
||||
Ok,
|
||||
Error(String),
|
||||
|
||||
Ping(PingMessage),
|
||||
AdvertiseNode(AdvertiseNodeMessage),
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct PingMessage {
|
||||
pub id: UUID,
|
||||
pub rpc_port: u16,
|
||||
|
||||
pub present_hash: Hash,
|
||||
pub config_version: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct AdvertiseNodeMessage {
|
||||
pub id: UUID,
|
||||
pub addr: SocketAddr,
|
||||
}
|
||||
|
|
149
src/rpc.rs
149
src/rpc.rs
|
@ -1,42 +1,167 @@
|
|||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use bytes::IntoBuf;
|
||||
use hyper::service::{make_service_fn, service_fn};
|
||||
use hyper::server::conn::AddrStream;
|
||||
use hyper::{Body, Method, Request, Response, Server, StatusCode};
|
||||
use futures::future::Future;
|
||||
use futures::stream::futures_unordered::FuturesUnordered;
|
||||
use futures::stream::StreamExt;
|
||||
|
||||
use crate::data::*;
|
||||
use crate::error::Error;
|
||||
use crate::proto::Message;
|
||||
use crate::System;
|
||||
use crate::membership::System;
|
||||
use crate::membership;
|
||||
|
||||
// ---- CLIENT PART ----
|
||||
|
||||
/// This is our service handler. It receives a Request, routes on its
|
||||
/// path, and returns a Future of a Response.
|
||||
async fn echo(req: Request<Body>) -> Result<Response<Body>, Error> {
|
||||
pub async fn rpc_call_many(sys: Arc<System>,
|
||||
to: &[UUID],
|
||||
msg: &Message,
|
||||
stop_after: Option<usize>,
|
||||
timeout: Duration)
|
||||
-> Vec<Result<Message, Error>>
|
||||
{
|
||||
let resp_stream = to.iter()
|
||||
.map(|to| rpc_call(sys.clone(), to, msg, timeout))
|
||||
.collect::<FuturesUnordered<_>>();
|
||||
|
||||
collect_rpc_results(resp_stream, stop_after).await
|
||||
}
|
||||
|
||||
pub async fn rpc_call_many_addr(sys: Arc<System>,
|
||||
to: &[SocketAddr],
|
||||
msg: &Message,
|
||||
stop_after: Option<usize>,
|
||||
timeout: Duration)
|
||||
-> Vec<Result<Message, Error>>
|
||||
{
|
||||
let resp_stream = to.iter()
|
||||
.map(|to| rpc_call_addr(sys.clone(), to, msg, timeout))
|
||||
.collect::<FuturesUnordered<_>>();
|
||||
|
||||
collect_rpc_results(resp_stream, stop_after).await
|
||||
}
|
||||
|
||||
async fn collect_rpc_results(mut resp_stream: FuturesUnordered<impl Future<Output=Result<Message, Error>>>,
|
||||
stop_after: Option<usize>)
|
||||
-> Vec<Result<Message, Error>>
|
||||
{
|
||||
let mut results = vec![];
|
||||
let mut n_ok = 0;
|
||||
while let Some(resp) = resp_stream.next().await {
|
||||
if resp.is_ok() {
|
||||
n_ok += 1
|
||||
}
|
||||
results.push(resp);
|
||||
if let Some(n) = stop_after {
|
||||
if n_ok >= n {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
results
|
||||
}
|
||||
|
||||
// ----
|
||||
|
||||
pub async fn rpc_call(sys: Arc<System>,
|
||||
to: &UUID,
|
||||
msg: &Message,
|
||||
timeout: Duration)
|
||||
-> Result<Message, Error>
|
||||
{
|
||||
let addr = {
|
||||
let members = sys.members.read().await;
|
||||
match members.status.get(to) {
|
||||
Some(status) => status.addr.clone(),
|
||||
None => return Err(Error::Message(format!("Peer ID not found"))),
|
||||
}
|
||||
};
|
||||
rpc_call_addr(sys, &addr, msg, timeout).await
|
||||
}
|
||||
|
||||
pub async fn rpc_call_addr(sys: Arc<System>,
|
||||
to_addr: &SocketAddr,
|
||||
msg: &Message,
|
||||
timeout: Duration)
|
||||
-> Result<Message, Error>
|
||||
{
|
||||
let uri = format!("http://{}/", to_addr);
|
||||
let req = Request::builder()
|
||||
.method(Method::POST)
|
||||
.uri(uri)
|
||||
.body(Body::from(rmp_serde::encode::to_vec_named(msg)?))?;
|
||||
|
||||
let resp_fut = sys.rpc_client.request(req);
|
||||
let resp = tokio::time::timeout(timeout, resp_fut).await??;
|
||||
|
||||
if resp.status() == StatusCode::OK {
|
||||
let body = hyper::body::to_bytes(resp.into_body()).await?;
|
||||
let msg = rmp_serde::decode::from_read::<_, Message>(body.into_buf())?;
|
||||
match msg {
|
||||
Message::Error(e) => Err(Error::RPCError(e)),
|
||||
x => Ok(x)
|
||||
}
|
||||
} else {
|
||||
Err(Error::RPCError(format!("Status code {}", resp.status())))
|
||||
}
|
||||
}
|
||||
|
||||
// ---- SERVER PART ----
|
||||
|
||||
fn err_to_msg(x: Result<Message, Error>) -> Message {
|
||||
match x {
|
||||
Err(e) => Message::Error(format!("{}", e)),
|
||||
Ok(msg) => msg,
|
||||
}
|
||||
}
|
||||
|
||||
async fn handler(sys: Arc<System>, req: Request<Body>, addr: SocketAddr) -> Result<Response<Body>, Error> {
|
||||
if req.method() != &Method::POST {
|
||||
let mut bad_request = Response::default();
|
||||
*bad_request.status_mut() = StatusCode::BAD_REQUEST;
|
||||
return Ok(bad_request);
|
||||
}
|
||||
|
||||
|
||||
let whole_body = hyper::body::to_bytes(req.into_body()).await?;
|
||||
let msg = rmp_serde::decode::from_read::<_, Message>(whole_body.into_buf());
|
||||
let msg = rmp_serde::decode::from_read::<_, Message>(whole_body.into_buf())?;
|
||||
|
||||
eprintln!("RPC from {}: {:?}", addr, msg);
|
||||
|
||||
let resp = err_to_msg(match &msg {
|
||||
Message::Ping(ping) => membership::handle_ping(sys, &addr, ping).await,
|
||||
Message::AdvertiseNode(adv) => membership::handle_advertise_node(sys, adv).await,
|
||||
_ => Ok(Message::Error(format!("Unexpected message: {:?}", msg))),
|
||||
});
|
||||
|
||||
let resp = Message::Ok;
|
||||
Ok(Response::new(Body::from(
|
||||
rmp_serde::encode::to_vec_named(&resp)?
|
||||
)))
|
||||
}
|
||||
|
||||
|
||||
pub async fn run_rpc_server(sys: &System, rpc_port: u16, shutdown_signal: impl Future<Output=()>) -> Result<(), hyper::Error> {
|
||||
let addr = ([0, 0, 0, 0], rpc_port).into();
|
||||
pub async fn run_rpc_server(sys: Arc<System>, shutdown_signal: impl Future<Output=()>) -> Result<(), hyper::Error> {
|
||||
let bind_addr = ([0, 0, 0, 0], sys.config.rpc_port).into();
|
||||
|
||||
let service = make_service_fn(|_| async { Ok::<_, Error>(service_fn(echo)) });
|
||||
let service = make_service_fn(|conn: &AddrStream| {
|
||||
let client_addr = conn.remote_addr();
|
||||
let sys = sys.clone();
|
||||
async move {
|
||||
Ok::<_, Error>(service_fn(move |req: Request<Body>| {
|
||||
let sys = sys.clone();
|
||||
handler(sys, req, client_addr)
|
||||
}))
|
||||
}
|
||||
});
|
||||
|
||||
let server = Server::bind(&addr).serve(service);
|
||||
let server = Server::bind(&bind_addr).serve(service) ;
|
||||
|
||||
let graceful = server.with_graceful_shutdown(shutdown_signal);
|
||||
println!("Listening on http://{}", addr);
|
||||
println!("RPC server listening on http://{}", bind_addr);
|
||||
|
||||
graceful.await
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue