diff --git a/src/api.rs b/src/api_server.rs similarity index 100% rename from src/api.rs rename to src/api_server.rs diff --git a/src/main.rs b/src/main.rs index 05b0a73a..922e873e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,119 +2,163 @@ mod error; mod data; mod proto; mod membership; -mod rpc; -mod api; +mod server; +mod rpc_server; +mod rpc_client; +mod api_server; -use std::io::{Read, Write}; -use std::sync::Arc; +use std::collections::HashSet; use std::net::SocketAddr; use std::path::PathBuf; use structopt::StructOpt; -use futures::channel::oneshot; -use serde::Deserialize; -use rand::Rng; -use data::UUID; use error::Error; -use membership::System; +use rpc_client::RpcClient; +use data::*; +use proto::*; #[derive(StructOpt, Debug)] #[structopt(name = "garage")] pub struct Opt { + /// RPC connect to this host to execute client operations + #[structopt(short = "h", long = "rpc-host", default_value = "127.0.0.1:3901")] + rpc_host: SocketAddr, + + #[structopt(subcommand)] + cmd: Command, +} + +#[derive(StructOpt, Debug)] +pub enum Command { + /// Run Garage server + #[structopt(name = "server")] + Server(ServerOpt), + + /// Get network status + #[structopt(name = "status")] + Status, + + /// Configure Garage node + #[structopt(name = "configure")] + Configure(ConfigureOpt), +} + +#[derive(StructOpt, Debug)] +pub struct ServerOpt { + /// Configuration file #[structopt(short = "c", long = "config", default_value = "./config.toml")] config_file: PathBuf, } -#[derive(Deserialize, Debug)] -pub struct Config { - datacenter: String, +#[derive(StructOpt, Debug)] +pub struct ConfigureOpt { + /// Node to configure (prefix of hexadecimal node id) + node_id: String, - metadata_dir: PathBuf, - data_dir: PathBuf, - - api_port: u16, - rpc_port: u16, - - bootstrap_peers: Vec, + /// Number of tokens + n_tokens: u32, } -fn read_config(config_file: PathBuf) -> Result { - let mut file = std::fs::OpenOptions::new() - .read(true) - .open(config_file.as_path())?; - - let mut config = String::new(); - file.read_to_string(&mut config)?; - - Ok(toml::from_str(&config)?) -} - -fn gen_node_id(metadata_dir: &PathBuf) -> Result { - let mut id_file = metadata_dir.clone(); - id_file.push("node_id"); - if id_file.as_path().exists() { - let mut f = std::fs::File::open(id_file.as_path())?; - let mut d = vec![]; - f.read_to_end(&mut d)?; - if d.len() != 32 { - return Err(Error::Message(format!("Corrupt node_id file"))) - } - - let mut id = [0u8; 32]; - id.copy_from_slice(&d[..]); - Ok(id) - } else { - let id = rand::thread_rng().gen::(); - - let mut f = std::fs::File::create(id_file.as_path())?; - f.write_all(&id[..])?; - Ok(id) - } -} - -async fn shutdown_signal(chans: Vec>) { - // Wait for the CTRL+C signal - tokio::signal::ctrl_c() - .await - .expect("failed to install CTRL+C signal handler"); - for ch in chans { - ch.send(()).unwrap(); - } -} - -async fn wait_from(chan: oneshot::Receiver<()>) -> () { - chan.await.unwrap() -} #[tokio::main] async fn main() { let opt = Opt::from_args(); - let config = read_config(opt.config_file) - .expect("Unable to read config file"); - let id = gen_node_id(&config.metadata_dir) - .expect("Unable to read or generate node ID"); - println!("Node ID: {}", hex::encode(id)); + let rpc_cli = RpcClient::new(); - let sys = Arc::new(System::new(config, id)); + let resp = match opt.cmd { + Command::Server(server_opt) => { + server::run_server(server_opt.config_file).await + } + Command::Status => { + cmd_status(rpc_cli, opt.rpc_host).await + } + Command::Configure(configure_opt) => { + cmd_configure(rpc_cli, opt.rpc_host, configure_opt).await + } + }; - let (tx1, rx1) = oneshot::channel(); - let (tx2, rx2) = oneshot::channel(); - - let rpc_server = rpc::run_rpc_server(sys.clone(), wait_from(rx1)); - let api_server = api::run_api_server(sys.clone(), wait_from(rx2)); - - tokio::spawn(shutdown_signal(vec![tx1, tx2])); - tokio::spawn(sys.bootstrap()); - - let (e1, e2) = futures::join![rpc_server, api_server]; - - if let Err(e) = e1 { - eprintln!("RPC server error: {}", e) - } - - if let Err(e) = e2 { - eprintln!("API server error: {}", e) + if let Err(e) = resp { + eprintln!("Error: {}", e); } } +async fn cmd_status(rpc_cli: RpcClient, rpc_host: SocketAddr) -> Result<(), Error> { + let status = match rpc_cli.call(&rpc_host, + &Message::PullStatus, + DEFAULT_TIMEOUT).await? { + Message::AdvertiseNodesUp(nodes) => nodes, + resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))) + }; + let config = match rpc_cli.call(&rpc_host, + &Message::PullConfig, + DEFAULT_TIMEOUT).await? { + Message::AdvertiseConfig(cfg) => cfg, + resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))) + }; + + println!("Healthy nodes:"); + for adv in status.iter() { + if let Some(cfg) = config.members.get(&adv.id) { + println!("{}\t{}\t{}\t{}", hex::encode(adv.id), adv.addr, adv.datacenter, cfg.n_tokens); + } + } + + let status_keys = status.iter().map(|x| x.id.clone()).collect::>(); + if config.members.iter().any(|(id, _)| !status_keys.contains(id)) { + println!("\nFailed nodes:"); + for (id, cfg) in config.members.iter() { + if !status.iter().any(|x| x.id == *id) { + println!("{}\t{}", hex::encode(id), cfg.n_tokens); + } + } + } + + if status.iter().any(|adv| !config.members.contains_key(&adv.id)) { + println!("\nUnconfigured nodes:"); + for adv in status.iter() { + if !config.members.contains_key(&adv.id) { + println!("{}\t{}\t{}", hex::encode(adv.id), adv.addr, adv.datacenter); + } + } + } + + Ok(()) +} + +async fn cmd_configure(rpc_cli: RpcClient, rpc_host: SocketAddr, args: ConfigureOpt) -> Result<(), Error> { + let status = match rpc_cli.call(&rpc_host, + &Message::PullStatus, + DEFAULT_TIMEOUT).await? { + Message::AdvertiseNodesUp(nodes) => nodes, + resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))) + }; + + let mut candidates = vec![]; + for adv in status.iter() { + if hex::encode(adv.id).starts_with(&args.node_id) { + candidates.push(adv.id.clone()); + } + } + if candidates.len() != 1 { + return Err(Error::Message(format!("{} matching nodes", candidates.len()))); + } + + let mut config = match rpc_cli.call(&rpc_host, + &Message::PullConfig, + DEFAULT_TIMEOUT).await? { + Message::AdvertiseConfig(cfg) => cfg, + resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))) + }; + + config.members.insert(candidates[0].clone(), + NetworkConfigEntry{ + n_tokens: args.n_tokens, + }); + config.version += 1; + + rpc_cli.call(&rpc_host, + &Message::AdvertiseConfig(config), + DEFAULT_TIMEOUT).await?; + Ok(()) +} diff --git a/src/membership.rs b/src/membership.rs index 7aaa0759..aa51e0fa 100644 --- a/src/membership.rs +++ b/src/membership.rs @@ -1,18 +1,20 @@ use std::sync::Arc; +use std::path::PathBuf; +use std::io::{Read}; use std::collections::HashMap; use std::time::Duration; use std::net::{IpAddr, SocketAddr}; +use tokio::prelude::*; use futures::future::join_all; -use hyper::client::Client; use tokio::sync::RwLock; use sha2::{Sha256, Digest}; -use crate::Config; +use crate::server::Config; use crate::error::Error; use crate::data::*; use crate::proto::*; -use crate::rpc::*; +use crate::rpc_client::*; const PING_INTERVAL: Duration = Duration::from_secs(10); const PING_TIMEOUT: Duration = Duration::from_secs(2); @@ -22,7 +24,7 @@ pub struct System { pub config: Config, pub id: UUID, - pub rpc_client: Client, + pub rpc_client: RpcClient, pub members: RwLock, } @@ -73,26 +75,62 @@ pub struct NodeStatus { pub remaining_ping_attempts: usize, } +fn read_network_config(metadata_dir: &PathBuf) -> Result { + let mut path = metadata_dir.clone(); + path.push("network_config"); + + let mut file = std::fs::OpenOptions::new() + .read(true) + .open(path.as_path())?; + + let mut net_config_bytes = vec![]; + file.read_to_end(&mut net_config_bytes) + .expect("Failure when reading network_config"); + + let net_config = rmp_serde::decode::from_read_ref(&net_config_bytes[..]) + .expect("Invalid or corrupt network_config file"); + + Ok(net_config) +} impl System { pub fn new(config: Config, id: UUID) -> Self { + let net_config = match read_network_config(&config.metadata_dir) { + Ok(x) => x, + Err(_) => NetworkConfig{ + members: HashMap::new(), + version: 0, + }, + }; let mut members = Members{ status: HashMap::new(), status_hash: [0u8; 32], - config: NetworkConfig{ - members: HashMap::new(), - version: 0, - }, - }; + config: net_config, + }; members.recalculate_status_hash(); System{ config, id, - rpc_client: Client::new(), + rpc_client: RpcClient::new(), members: RwLock::new(members), } } + pub async fn save_network_config(&self) { + let mut path = self.config.metadata_dir.clone(); + path.push("network_config"); + + let members = self.members.read().await; + let data = rmp_serde::encode::to_vec_named(&members.config) + .expect("Error while encoding network config"); + drop(members); + + let mut f = tokio::fs::File::create(path.as_path()).await + .expect("Could not create network_config"); + f.write_all(&data[..]).await + .expect("Could not write network_config"); + } + pub async fn make_ping(&self) -> Message { let members = self.members.read().await; Message::Ping(PingMessage{ @@ -129,7 +167,7 @@ impl System { let sys = self.clone(); let ping_msg_ref = &ping_msg; async move { - (id_option, addr.clone(), rpc_call_addr(sys, &addr, ping_msg_ref, PING_TIMEOUT).await) + (id_option, addr.clone(), sys.rpc_client.call(&addr, ping_msg_ref, PING_TIMEOUT).await) } })).await; @@ -146,6 +184,7 @@ impl System { to_advertise.push(AdvertisedNode{ id: info.id.clone(), addr: addr.clone(), + datacenter: info.datacenter.clone(), }); } if is_new || members.status_hash != info.status_hash { @@ -208,6 +247,7 @@ impl System { mem.push(AdvertisedNode{ id: node.clone(), addr: status.addr.clone(), + datacenter: status.datacenter.clone(), }); } Ok(Message::AdvertiseNodesUp(mem)) @@ -265,6 +305,7 @@ impl System { if adv.version > members.config.version { members.config = adv.clone(); tokio::spawn(self.clone().broadcast(Message::AdvertiseConfig(adv.clone()), PING_TIMEOUT)); + self.save_network_config().await; } Ok(Message::Ok) diff --git a/src/proto.rs b/src/proto.rs index 3a950c6c..3e679d35 100644 --- a/src/proto.rs +++ b/src/proto.rs @@ -1,8 +1,11 @@ +use std::time::Duration; use std::net::SocketAddr; use serde::{Serialize, Deserialize}; use crate::data::*; +pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(2); + #[derive(Debug, Serialize, Deserialize)] pub enum Message { Ok, @@ -29,4 +32,5 @@ pub struct PingMessage { pub struct AdvertisedNode { pub id: UUID, pub addr: SocketAddr, + pub datacenter: String, } diff --git a/src/rpc_client.rs b/src/rpc_client.rs new file mode 100644 index 00000000..057c19e8 --- /dev/null +++ b/src/rpc_client.rs @@ -0,0 +1,96 @@ +use std::net::SocketAddr; +use std::sync::Arc; +use std::time::Duration; + +use bytes::IntoBuf; +use hyper::{Body, Method, Request, StatusCode}; +use hyper::client::Client; +use futures::stream::futures_unordered::FuturesUnordered; +use futures::stream::StreamExt; + +use crate::data::*; +use crate::error::Error; +use crate::proto::Message; +use crate::membership::System; + +pub async fn rpc_call_many(sys: Arc, + to: &[UUID], + msg: &Message, + stop_after: Option, + timeout: Duration) + -> Vec> +{ + let mut resp_stream = to.iter() + .map(|to| rpc_call(sys.clone(), to, msg, timeout)) + .collect::>(); + + let mut results = vec![]; + let mut n_ok = 0; + while let Some(resp) = resp_stream.next().await { + if resp.is_ok() { + n_ok += 1 + } + results.push(resp); + if let Some(n) = stop_after { + if n_ok >= n { + break + } + } + } + results +} + +pub async fn rpc_call(sys: Arc, + to: &UUID, + msg: &Message, + timeout: Duration) + -> Result +{ + let addr = { + let members = sys.members.read().await; + match members.status.get(to) { + Some(status) => status.addr.clone(), + None => return Err(Error::Message(format!("Peer ID not found"))), + } + }; + sys.rpc_client.call(&addr, msg, timeout).await +} + +pub struct RpcClient { + pub client: Client, +} + +impl RpcClient { + pub fn new() -> Self { + RpcClient{ + client: Client::new(), + } + } + + pub async fn call(&self, + to_addr: &SocketAddr, + msg: &Message, + timeout: Duration) + -> Result + { + let uri = format!("http://{}/", to_addr); + let req = Request::builder() + .method(Method::POST) + .uri(uri) + .body(Body::from(rmp_serde::encode::to_vec_named(msg)?))?; + + let resp_fut = self.client.request(req); + let resp = tokio::time::timeout(timeout, resp_fut).await??; + + if resp.status() == StatusCode::OK { + let body = hyper::body::to_bytes(resp.into_body()).await?; + let msg = rmp_serde::decode::from_read::<_, Message>(body.into_buf())?; + match msg { + Message::Error(e) => Err(Error::RPCError(e)), + x => Ok(x) + } + } else { + Err(Error::RPCError(format!("Status code {}", resp.status()))) + } + } +} diff --git a/src/rpc.rs b/src/rpc_server.rs similarity index 52% rename from src/rpc.rs rename to src/rpc_server.rs index 5f25dafb..55c7482b 100644 --- a/src/rpc.rs +++ b/src/rpc_server.rs @@ -1,96 +1,16 @@ use std::net::SocketAddr; use std::sync::Arc; -use std::time::Duration; use bytes::IntoBuf; use hyper::service::{make_service_fn, service_fn}; use hyper::server::conn::AddrStream; use hyper::{Body, Method, Request, Response, Server, StatusCode}; use futures::future::Future; -use futures::stream::futures_unordered::FuturesUnordered; -use futures::stream::StreamExt; -use crate::data::*; use crate::error::Error; use crate::proto::Message; use crate::membership::System; -// ---- CLIENT PART ---- - -pub async fn rpc_call_many(sys: Arc, - to: &[UUID], - msg: &Message, - stop_after: Option, - timeout: Duration) - -> Vec> -{ - let mut resp_stream = to.iter() - .map(|to| rpc_call(sys.clone(), to, msg, timeout)) - .collect::>(); - - let mut results = vec![]; - let mut n_ok = 0; - while let Some(resp) = resp_stream.next().await { - if resp.is_ok() { - n_ok += 1 - } - results.push(resp); - if let Some(n) = stop_after { - if n_ok >= n { - break - } - } - } - results -} - -// ---- - -pub async fn rpc_call(sys: Arc, - to: &UUID, - msg: &Message, - timeout: Duration) - -> Result -{ - let addr = { - let members = sys.members.read().await; - match members.status.get(to) { - Some(status) => status.addr.clone(), - None => return Err(Error::Message(format!("Peer ID not found"))), - } - }; - rpc_call_addr(sys, &addr, msg, timeout).await -} - -pub async fn rpc_call_addr(sys: Arc, - to_addr: &SocketAddr, - msg: &Message, - timeout: Duration) - -> Result -{ - let uri = format!("http://{}/", to_addr); - let req = Request::builder() - .method(Method::POST) - .uri(uri) - .body(Body::from(rmp_serde::encode::to_vec_named(msg)?))?; - - let resp_fut = sys.rpc_client.request(req); - let resp = tokio::time::timeout(timeout, resp_fut).await??; - - if resp.status() == StatusCode::OK { - let body = hyper::body::to_bytes(resp.into_body()).await?; - let msg = rmp_serde::decode::from_read::<_, Message>(body.into_buf())?; - match msg { - Message::Error(e) => Err(Error::RPCError(e)), - x => Ok(x) - } - } else { - Err(Error::RPCError(format!("Status code {}", resp.status()))) - } -} - -// ---- SERVER PART ---- - fn err_to_msg(x: Result) -> Message { match x { Err(e) => Message::Error(format!("{}", e)), diff --git a/src/server.rs b/src/server.rs new file mode 100644 index 00000000..04f98c65 --- /dev/null +++ b/src/server.rs @@ -0,0 +1,98 @@ +use std::io::{Read, Write}; +use std::sync::Arc; +use std::net::SocketAddr; +use std::path::PathBuf; +use futures::channel::oneshot; +use serde::Deserialize; +use rand::Rng; + +use crate::data::UUID; +use crate::error::Error; +use crate::membership::System; +use crate::api_server; +use crate::rpc_server; + +#[derive(Deserialize, Debug)] +pub struct Config { + pub datacenter: String, + + pub metadata_dir: PathBuf, + pub data_dir: PathBuf, + + pub api_port: u16, + pub rpc_port: u16, + + pub bootstrap_peers: Vec, +} + +fn read_config(config_file: PathBuf) -> Result { + let mut file = std::fs::OpenOptions::new() + .read(true) + .open(config_file.as_path())?; + + let mut config = String::new(); + file.read_to_string(&mut config)?; + + Ok(toml::from_str(&config)?) +} + +fn gen_node_id(metadata_dir: &PathBuf) -> Result { + let mut id_file = metadata_dir.clone(); + id_file.push("node_id"); + if id_file.as_path().exists() { + let mut f = std::fs::File::open(id_file.as_path())?; + let mut d = vec![]; + f.read_to_end(&mut d)?; + if d.len() != 32 { + return Err(Error::Message(format!("Corrupt node_id file"))) + } + + let mut id = [0u8; 32]; + id.copy_from_slice(&d[..]); + Ok(id) + } else { + let id = rand::thread_rng().gen::(); + + let mut f = std::fs::File::create(id_file.as_path())?; + f.write_all(&id[..])?; + Ok(id) + } +} + +async fn shutdown_signal(chans: Vec>) { + // Wait for the CTRL+C signal + tokio::signal::ctrl_c() + .await + .expect("failed to install CTRL+C signal handler"); + println!("Received CTRL+C, shutting down."); + for ch in chans { + ch.send(()).unwrap(); + } +} + +async fn wait_from(chan: oneshot::Receiver<()>) -> () { + chan.await.unwrap() +} + +pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { + let config = read_config(config_file) + .expect("Unable to read config file"); + + let id = gen_node_id(&config.metadata_dir) + .expect("Unable to read or generate node ID"); + println!("Node ID: {}", hex::encode(id)); + + let sys = Arc::new(System::new(config, id)); + + let (tx1, rx1) = oneshot::channel(); + let (tx2, rx2) = oneshot::channel(); + + let rpc_server = rpc_server::run_rpc_server(sys.clone(), wait_from(rx1)); + let api_server = api_server::run_api_server(sys.clone(), wait_from(rx2)); + + tokio::spawn(shutdown_signal(vec![tx1, tx2])); + tokio::spawn(sys.bootstrap()); + + futures::try_join!(rpc_server, api_server)?; + Ok(()) +}