Fix the Sync issue. Details:

So the HTTP client future of Hyper is not Sync, thus the stream
that read blocks wasn't either. However Hyper's default Body type
requires a stream to be Sync for wrap_stream. Solution: reimplement
a custom HTTP body type.
This commit is contained in:
Alex 2020-04-10 22:01:48 +02:00
parent d66c0d6833
commit 3477864142
14 changed files with 663 additions and 432 deletions

1
rustfmt.toml Normal file
View file

@ -0,0 +1 @@
hard_tabs = true

View file

@ -1,26 +1,35 @@
use std::sync::Arc;
use std::net::SocketAddr;
use core::pin::Pin;
use core::task::{Context, Poll};
use std::collections::VecDeque;
use std::net::SocketAddr;
use std::sync::Arc;
use futures::stream::*;
use hyper::service::{make_service_fn, service_fn};
use hyper::server::conn::AddrStream;
use hyper::{Body, Method, Request, Response, Server, StatusCode};
use hyper::body::Bytes;
use futures::future::Future;
use futures::ready;
use futures::stream::*;
use hyper::body::{Bytes, HttpBody};
use hyper::server::conn::AddrStream;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Server, StatusCode};
use crate::error::Error;
use crate::data::*;
use crate::data;
use crate::data::*;
use crate::error::Error;
use crate::proto::*;
use crate::rpc_client::*;
use crate::server::Garage;
use crate::table::EmptySortKey;
pub async fn run_api_server(garage: Arc<Garage>, shutdown_signal: impl Future<Output=()>) -> Result<(), hyper::Error> {
let addr = ([0, 0, 0, 0], garage.system.config.api_port).into();
type BodyType = Box<dyn HttpBody<Data = Bytes, Error = Error> + Send + Unpin>;
let service = make_service_fn(|conn: &AddrStream| {
pub async fn run_api_server(
garage: Arc<Garage>,
shutdown_signal: impl Future<Output = ()>,
) -> Result<(), hyper::Error> {
let addr = ([0, 0, 0, 0], garage.system.config.api_port).into();
let service = make_service_fn(|conn: &AddrStream| {
let garage = garage.clone();
let client_addr = conn.remote_addr();
async move {
@ -31,59 +40,72 @@ pub async fn run_api_server(garage: Arc<Garage>, shutdown_signal: impl Future<Ou
}
});
let server = Server::bind(&addr).serve(service);
let server = Server::bind(&addr).serve(service);
let graceful = server.with_graceful_shutdown(shutdown_signal);
println!("API server listening on http://{}", addr);
println!("API server listening on http://{}", addr);
graceful.await
}
async fn handler(garage: Arc<Garage>, req: Request<Body>, addr: SocketAddr) -> Result<Response<Body>, Error> {
async fn handler(
garage: Arc<Garage>,
req: Request<Body>,
addr: SocketAddr,
) -> Result<Response<BodyType>, Error> {
match handler_inner(garage, req, addr).await {
Ok(x) => Ok(x),
Err(e) => {
let mut http_error = Response::new(Body::from(format!("{}\n", e)));
let body: BodyType = Box::new(BytesBody::from(format!("{}\n", e)));
let mut http_error = Response::new(body);
*http_error.status_mut() = e.http_status_code();
Ok(http_error)
}
}
}
async fn handler_inner(garage: Arc<Garage>, req: Request<Body>, addr: SocketAddr) -> Result<Response<Body>, Error> {
async fn handler_inner(
garage: Arc<Garage>,
req: Request<Body>,
addr: SocketAddr,
) -> Result<Response<BodyType>, Error> {
eprintln!("{} {} {}", addr, req.method(), req.uri());
let bucket = req.headers()
let bucket = req
.headers()
.get(hyper::header::HOST)
.map(|x| x.to_str().map_err(Error::from))
.unwrap_or(Err(Error::BadRequest(format!("Host: header missing"))))?
.to_lowercase();
let key = req.uri().path().to_string();
match req.method() {
&Method::GET => {
Ok(handle_get(garage, &bucket, &key).await?)
}
match req.method() {
&Method::GET => Ok(handle_get(garage, &bucket, &key).await?),
&Method::PUT => {
let mime_type = req.headers()
let mime_type = req
.headers()
.get(hyper::header::CONTENT_TYPE)
.map(|x| x.to_str())
.unwrap_or(Ok("blob"))?
.to_string();
let version_uuid = handle_put(garage, &mime_type, &bucket, &key, req.into_body()).await?;
Ok(Response::new(Body::from(
format!("{:?}\n", version_uuid),
)))
let version_uuid =
handle_put(garage, &mime_type, &bucket, &key, req.into_body()).await?;
Ok(Response::new(Box::new(BytesBody::from(format!(
"{:?}\n",
version_uuid
)))))
}
_ => Err(Error::BadRequest(format!("Invalid method"))),
}
_ => Err(Error::BadRequest(format!("Invalid method"))),
}
}
async fn handle_put(garage: Arc<Garage>,
mime_type: &str,
bucket: &str, key: &str, body: Body)
-> Result<UUID, Error>
{
async fn handle_put(
garage: Arc<Garage>,
mime_type: &str,
bucket: &str,
key: &str,
body: Body,
) -> Result<UUID, Error> {
let version_uuid = gen_uuid();
let mut chunker = BodyChunker::new(body, garage.system.config.block_size);
@ -97,7 +119,7 @@ async fn handle_put(garage: Arc<Garage>,
key: key.into(),
versions: Vec::new(),
};
object.versions.push(Box::new(ObjectVersion{
object.versions.push(Box::new(ObjectVersion {
uuid: version_uuid.clone(),
timestamp: now_msec(),
mime_type: mime_type.to_string(),
@ -110,7 +132,7 @@ async fn handle_put(garage: Arc<Garage>,
object.versions[0].data = ObjectVersionData::Inline(first_block);
object.versions[0].is_complete = true;
garage.object_table.insert(&object).await?;
return Ok(version_uuid)
return Ok(version_uuid);
}
let version = Version {
@ -126,15 +148,22 @@ async fn handle_put(garage: Arc<Garage>,
garage.object_table.insert(&object).await?;
let mut next_offset = first_block.len();
let mut put_curr_version_block = put_version_block(garage.clone(), &version, 0, first_block_hash.clone());
let mut put_curr_version_block =
put_version_block(garage.clone(), &version, 0, first_block_hash.clone());
let mut put_curr_block = put_block(garage.clone(), first_block_hash, first_block);
loop {
let (_, _, next_block) = futures::try_join!(put_curr_block, put_curr_version_block, chunker.next())?;
let (_, _, next_block) =
futures::try_join!(put_curr_block, put_curr_version_block, chunker.next())?;
if let Some(block) = next_block {
let block_hash = hash(&block[..]);
let block_len = block.len();
put_curr_version_block = put_version_block(garage.clone(), &version, next_offset as u64, block_hash.clone());
put_curr_version_block = put_version_block(
garage.clone(),
&version,
next_offset as u64,
block_hash.clone(),
);
put_curr_block = put_block(garage.clone(), block_hash, block);
next_offset += block_len;
} else {
@ -150,27 +179,33 @@ async fn handle_put(garage: Arc<Garage>,
Ok(version_uuid)
}
async fn put_version_block(garage: Arc<Garage>, version: &Version, offset: u64, hash: Hash) -> Result<(), Error> {
async fn put_version_block(
garage: Arc<Garage>,
version: &Version,
offset: u64,
hash: Hash,
) -> Result<(), Error> {
let mut version = version.clone();
version.blocks.push(VersionBlock{
offset,
hash,
});
version.blocks.push(VersionBlock { offset, hash });
garage.version_table.insert(&version).await?;
Ok(())
}
async fn put_block(garage: Arc<Garage>, hash: Hash, data: Vec<u8>) -> Result<(), Error> {
let who = garage.system.members.read().await
let who = garage
.system
.members
.read()
.await
.walk_ring(&hash, garage.system.config.meta_replication_factor);
rpc_try_call_many(garage.system.clone(),
&who[..],
&Message::PutBlock(PutBlockMessage{
hash,
data,
}),
(garage.system.config.meta_replication_factor+1)/2,
DEFAULT_TIMEOUT).await?;
rpc_try_call_many(
garage.system.clone(),
&who[..],
&Message::PutBlock(PutBlockMessage { hash, data }),
(garage.system.config.meta_replication_factor + 1) / 2,
DEFAULT_TIMEOUT,
)
.await?;
Ok(())
}
@ -183,7 +218,7 @@ struct BodyChunker {
impl BodyChunker {
fn new(body: Body, block_size: usize) -> Self {
Self{
Self {
body,
read_all: false,
block_size,
@ -203,26 +238,36 @@ impl BodyChunker {
if self.buf.len() == 0 {
Ok(None)
} else if self.buf.len() <= self.block_size {
let block = self.buf.drain(..)
.collect::<Vec<u8>>();
let block = self.buf.drain(..).collect::<Vec<u8>>();
Ok(Some(block))
} else {
let block = self.buf.drain(..self.block_size)
.collect::<Vec<u8>>();
let block = self.buf.drain(..self.block_size).collect::<Vec<u8>>();
Ok(Some(block))
}
}
}
async fn handle_get(garage: Arc<Garage>, bucket: &str, key: &str) -> Result<Response<Body>, Error> {
let mut object = match garage.object_table.get(&bucket.to_string(), &key.to_string()).await? {
async fn handle_get(
garage: Arc<Garage>,
bucket: &str,
key: &str,
) -> Result<Response<BodyType>, Error> {
let mut object = match garage
.object_table
.get(&bucket.to_string(), &key.to_string())
.await?
{
None => return Err(Error::NotFound),
Some(o) => o
Some(o) => o,
};
let last_v = match object.versions.drain(..)
.rev().filter(|v| v.is_complete)
.next() {
let last_v = match object
.versions
.drain(..)
.rev()
.filter(|v| v.is_complete)
.next()
{
Some(v) => v,
None => return Err(Error::NotFound),
};
@ -234,7 +279,8 @@ async fn handle_get(garage: Arc<Garage>, bucket: &str, key: &str) -> Result<Resp
match last_v.data {
ObjectVersionData::DeleteMarker => Err(Error::NotFound),
ObjectVersionData::Inline(bytes) => {
Ok(resp_builder.body(bytes.into())?)
let body: BodyType = Box::new(BytesBody::from(bytes));
Ok(resp_builder.body(body)?)
}
ObjectVersionData::FirstBlock(first_block_hash) => {
let read_first_block = get_block(garage.clone(), &first_block_hash);
@ -246,42 +292,119 @@ async fn handle_get(garage: Arc<Garage>, bucket: &str, key: &str) -> Result<Resp
None => return Err(Error::NotFound),
};
let mut blocks = version.blocks.iter()
let mut blocks = version
.blocks
.iter()
.map(|vb| (vb.hash.clone(), None))
.collect::<Vec<_>>();
blocks[0].1 = Some(first_block);
let block_futures = blocks.drain(..)
.map(move |(hash, data_opt)| async {
let body_stream = futures::stream::iter(blocks)
.map(move |(hash, data_opt)| {
let garage = garage.clone();
async move {
if let Some(data) = data_opt {
Ok(data)
Ok(Bytes::from(data))
} else {
get_block(garage.clone(), &hash).await
.map_err(|e| format!("{}", e))
get_block(garage.clone(), &hash).await.map(Bytes::from)
}
});
let body_stream = futures::stream::iter(block_futures).buffered(2);
let body = Body::wrap_stream(body_stream);
}
})
.buffered(2);
let body: BodyType = Box::new(NonSyncStreamBody {
stream: Box::pin(body_stream),
});
Ok(resp_builder.body(body)?)
}
}
}
async fn get_block(garage: Arc<Garage>, hash: &Hash) -> Result<Vec<u8>, Error> {
let who = garage.system.members.read().await
let who = garage
.system
.members
.read()
.await
.walk_ring(&hash, garage.system.config.meta_replication_factor);
let resps = rpc_try_call_many(garage.system.clone(),
&who[..],
&Message::GetBlock(hash.clone()),
1,
DEFAULT_TIMEOUT).await?;
let resps = rpc_try_call_many(
garage.system.clone(),
&who[..],
&Message::GetBlock(hash.clone()),
1,
DEFAULT_TIMEOUT,
)
.await?;
for resp in resps {
if let Message::PutBlock(pbm) = resp {
if data::hash(&pbm.data) == *hash {
return Ok(pbm.data)
return Ok(pbm.data);
}
}
}
Err(Error::Message(format!("No valid blocks returned")))
}
pub struct NonSyncStreamBody {
pub stream: Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>,
}
impl HttpBody for NonSyncStreamBody {
type Data = Bytes;
type Error = Error;
fn poll_data(
mut self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Option<Result<Bytes, Self::Error>>> {
match ready!(self.stream.as_mut().poll_next(cx)) {
Some(res) => Poll::Ready(Some(res)),
None => Poll::Ready(None),
}
}
fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Result<Option<hyper::HeaderMap<hyper::header::HeaderValue>>, Self::Error>> {
Poll::Ready(Ok(None))
}
}
pub struct BytesBody {
pub bytes: Option<Bytes>,
}
impl HttpBody for BytesBody {
type Data = Bytes;
type Error = Error;
fn poll_data(
mut self: Pin<&mut Self>,
_cx: &mut Context,
) -> Poll<Option<Result<Bytes, Self::Error>>> {
Poll::Ready(self.bytes.take().map(Ok))
}
fn poll_trailers(
self: Pin<&mut Self>,
_cx: &mut Context,
) -> Poll<Result<Option<hyper::HeaderMap<hyper::header::HeaderValue>>, Self::Error>> {
Poll::Ready(Ok(None))
}
}
impl From<String> for BytesBody {
fn from(x: String) -> BytesBody {
BytesBody {
bytes: Some(Bytes::from(x.into_bytes())),
}
}
}
impl From<Vec<u8>> for BytesBody {
fn from(x: Vec<u8>) -> BytesBody {
BytesBody {
bytes: Some(Bytes::from(x)),
}
}
}

View file

@ -1,13 +1,13 @@
use std::sync::Arc;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::fs;
use tokio::prelude::*;
use crate::error::Error;
use crate::server::Garage;
use crate::proto::*;
use crate::data::*;
use crate::error::Error;
use crate::proto::*;
use crate::server::Garage;
fn block_dir(garage: &Garage, hash: &Hash) -> PathBuf {
let mut path = garage.system.config.data_dir.clone();
@ -24,7 +24,7 @@ pub async fn write_block(garage: Arc<Garage>, hash: &Hash, data: &[u8]) -> Resul
path.push(hex::encode(hash));
if fs::metadata(&path).await.is_ok() {
return Ok(Message::Ok)
return Ok(Message::Ok);
}
let mut f = fs::File::create(path).await?;
@ -42,7 +42,7 @@ pub async fn read_block(garage: Arc<Garage>, hash: &Hash) -> Result<Message, Err
let mut data = vec![];
f.read_to_end(&mut data).await?;
Ok(Message::PutBlock(PutBlockMessage{
Ok(Message::PutBlock(PutBlockMessage {
hash: hash.clone(),
data,
}))

View file

@ -1,10 +1,10 @@
use std::time::{SystemTime, UNIX_EPOCH};
use std::fmt;
use std::collections::HashMap;
use serde::{Serializer, Deserializer, Serialize, Deserialize};
use serde::de::{self, Visitor};
use rand::Rng;
use sha2::{Sha256, Digest};
use serde::de::{self, Visitor};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use sha2::{Digest, Sha256};
use std::collections::HashMap;
use std::fmt;
use std::time::{SystemTime, UNIX_EPOCH};
#[derive(Default, PartialOrd, Ord, Clone, Hash, PartialEq)]
pub struct FixedBytes32([u8; 32]);
@ -43,7 +43,10 @@ impl<'de> Visitor<'de> for FixedBytes32Visitor {
res.copy_from_slice(value);
Ok(res.into())
} else {
Err(E::custom(format!("Invalid byte string length {}, expected 32", value.len())))
Err(E::custom(format!(
"Invalid byte string length {}, expected 32",
value.len()
)))
}
}
}
@ -88,7 +91,8 @@ pub fn gen_uuid() -> UUID {
}
pub fn now_msec() -> u64 {
SystemTime::now().duration_since(UNIX_EPOCH)
SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Fix your clock :o")
.as_millis() as u64
}
@ -96,7 +100,8 @@ pub fn now_msec() -> u64 {
// RMP serialization with names of fields and variants
pub fn rmp_to_vec_all_named<T>(val: &T) -> Result<Vec<u8>, rmp_serde::encode::Error>
where T: Serialize + ?Sized
where
T: Serialize + ?Sized,
{
let mut wr = Vec::with_capacity(128);
let mut se = rmp_serde::Serializer::new(&mut wr)
@ -104,7 +109,6 @@ where T: Serialize + ?Sized
.with_string_variants();
val.serialize(&mut se)?;
Ok(wr)
}
// Network management

View file

@ -1,6 +1,6 @@
use std::io;
use err_derive::Error;
use hyper::StatusCode;
use std::io;
#[derive(Debug, Error)]
pub enum Error {
@ -13,8 +13,8 @@ pub enum Error {
#[error(display = "HTTP error: {}", _0)]
HTTP(#[error(source)] http::Error),
#[error(display = "Invalid HTTP header value: {}", _0)]
HTTPHeader(#[error(source)] http::header::ToStrError),
#[error(display = "Invalid HTTP header value: {}", _0)]
HTTPHeader(#[error(source)] http::header::ToStrError),
#[error(display = "Sled error: {}", _0)]
Sled(#[error(source)] sled::Error),

View file

@ -1,28 +1,28 @@
mod error;
mod data;
mod error;
mod proto;
mod membership;
mod table;
mod block;
mod object_table;
mod version_table;
mod block;
mod server;
mod rpc_server;
mod rpc_client;
mod api_server;
mod rpc_client;
mod rpc_server;
mod server;
use std::collections::HashSet;
use std::net::SocketAddr;
use std::path::PathBuf;
use structopt::StructOpt;
use error::Error;
use rpc_client::RpcClient;
use data::*;
use error::Error;
use proto::*;
use rpc_client::RpcClient;
#[derive(StructOpt, Debug)]
#[structopt(name = "garage")]
@ -69,7 +69,6 @@ pub struct ConfigureOpt {
n_tokens: u32,
}
#[tokio::main]
async fn main() {
let opt = Opt::from_args();
@ -77,12 +76,8 @@ async fn main() {
let rpc_cli = RpcClient::new();
let resp = match opt.cmd {
Command::Server(server_opt) => {
server::run_server(server_opt.config_file).await
}
Command::Status => {
cmd_status(rpc_cli, opt.rpc_host).await
}
Command::Server(server_opt) => server::run_server(server_opt.config_file).await,
Command::Status => cmd_status(rpc_cli, opt.rpc_host).await,
Command::Configure(configure_opt) => {
cmd_configure(rpc_cli, opt.rpc_host, configure_opt).await
}
@ -94,28 +89,40 @@ async fn main() {
}
async fn cmd_status(rpc_cli: RpcClient, rpc_host: SocketAddr) -> Result<(), Error> {
let status = match rpc_cli.call(&rpc_host,
&Message::PullStatus,
DEFAULT_TIMEOUT).await? {
let status = match rpc_cli
.call(&rpc_host, &Message::PullStatus, DEFAULT_TIMEOUT)
.await?
{
Message::AdvertiseNodesUp(nodes) => nodes,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp)))
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
};
let config = match rpc_cli.call(&rpc_host,
&Message::PullConfig,
DEFAULT_TIMEOUT).await? {
let config = match rpc_cli
.call(&rpc_host, &Message::PullConfig, DEFAULT_TIMEOUT)
.await?
{
Message::AdvertiseConfig(cfg) => cfg,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp)))
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
};
println!("Healthy nodes:");
for adv in status.iter() {
if let Some(cfg) = config.members.get(&adv.id) {
println!("{}\t{}\t{}\t{}", hex::encode(&adv.id), cfg.datacenter, cfg.n_tokens, adv.addr);
println!(
"{}\t{}\t{}\t{}",
hex::encode(&adv.id),
cfg.datacenter,
cfg.n_tokens,
adv.addr
);
}
}
let status_keys = status.iter().map(|x| x.id.clone()).collect::<HashSet<_>>();
if config.members.iter().any(|(id, _)| !status_keys.contains(id)) {
if config
.members
.iter()
.any(|(id, _)| !status_keys.contains(id))
{
println!("\nFailed nodes:");
for (id, cfg) in config.members.iter() {
if !status.iter().any(|x| x.id == *id) {
@ -124,7 +131,10 @@ async fn cmd_status(rpc_cli: RpcClient, rpc_host: SocketAddr) -> Result<(), Erro
}
}
if status.iter().any(|adv| !config.members.contains_key(&adv.id)) {
if status
.iter()
.any(|adv| !config.members.contains_key(&adv.id))
{
println!("\nUnconfigured nodes:");
for adv in status.iter() {
if !config.members.contains_key(&adv.id) {
@ -136,12 +146,17 @@ async fn cmd_status(rpc_cli: RpcClient, rpc_host: SocketAddr) -> Result<(), Erro
Ok(())
}
async fn cmd_configure(rpc_cli: RpcClient, rpc_host: SocketAddr, args: ConfigureOpt) -> Result<(), Error> {
let status = match rpc_cli.call(&rpc_host,
&Message::PullStatus,
DEFAULT_TIMEOUT).await? {
async fn cmd_configure(
rpc_cli: RpcClient,
rpc_host: SocketAddr,
args: ConfigureOpt,
) -> Result<(), Error> {
let status = match rpc_cli
.call(&rpc_host, &Message::PullStatus, DEFAULT_TIMEOUT)
.await?
{
Message::AdvertiseNodesUp(nodes) => nodes,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp)))
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
};
let mut candidates = vec![];
@ -151,25 +166,35 @@ async fn cmd_configure(rpc_cli: RpcClient, rpc_host: SocketAddr, args: Configure
}
}
if candidates.len() != 1 {
return Err(Error::Message(format!("{} matching nodes", candidates.len())));
return Err(Error::Message(format!(
"{} matching nodes",
candidates.len()
)));
}
let mut config = match rpc_cli.call(&rpc_host,
&Message::PullConfig,
DEFAULT_TIMEOUT).await? {
let mut config = match rpc_cli
.call(&rpc_host, &Message::PullConfig, DEFAULT_TIMEOUT)
.await?
{
Message::AdvertiseConfig(cfg) => cfg,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp)))
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
};
config.members.insert(candidates[0].clone(),
NetworkConfigEntry{
datacenter: args.datacenter,
n_tokens: args.n_tokens,
});
config.members.insert(
candidates[0].clone(),
NetworkConfigEntry {
datacenter: args.datacenter,
n_tokens: args.n_tokens,
},
);
config.version += 1;
rpc_cli.call(&rpc_host,
&Message::AdvertiseConfig(config),
DEFAULT_TIMEOUT).await?;
rpc_cli
.call(
&rpc_host,
&Message::AdvertiseConfig(config),
DEFAULT_TIMEOUT,
)
.await?;
Ok(())
}

View file

@ -1,22 +1,22 @@
use std::sync::Arc;
use std::collections::HashMap;
use std::hash::Hash as StdHash;
use std::hash::Hasher;
use std::path::PathBuf;
use std::io::{Read};
use std::collections::HashMap;
use std::time::Duration;
use std::io::Read;
use std::net::{IpAddr, SocketAddr};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use sha2::{Sha256, Digest};
use tokio::prelude::*;
use futures::future::join_all;
use sha2::{Digest, Sha256};
use tokio::prelude::*;
use tokio::sync::RwLock;
use crate::server::Config;
use crate::error::Error;
use crate::data::*;
use crate::error::Error;
use crate::proto::*;
use crate::rpc_client::*;
use crate::server::Config;
const PING_INTERVAL: Duration = Duration::from_secs(10);
const PING_TIMEOUT: Duration = Duration::from_secs(2);
@ -36,8 +36,8 @@ pub struct Members {
pub status_hash: Hash,
pub config: NetworkConfig,
pub ring: Vec<RingEntry>,
pub n_datacenters: usize,
pub ring: Vec<RingEntry>,
pub n_datacenters: usize,
}
pub struct NodeStatus {
@ -47,19 +47,21 @@ pub struct NodeStatus {
#[derive(Debug)]
pub struct RingEntry {
pub location: Hash,
pub node: UUID,
pub datacenter: u64,
pub location: Hash,
pub node: UUID,
pub datacenter: u64,
}
impl Members {
fn handle_ping(&mut self, ip: IpAddr, info: &PingMessage) -> bool {
let addr = SocketAddr::new(ip, info.rpc_port);
let old_status = self.status.insert(info.id.clone(),
NodeStatus{
let old_status = self.status.insert(
info.id.clone(),
NodeStatus {
addr: addr.clone(),
remaining_ping_attempts: MAX_FAILED_PINGS,
});
},
);
match old_status {
None => {
eprintln!("Newly pingable node: {}", hex::encode(&info.id));
@ -80,122 +82,129 @@ impl Members {
hasher.input(format!("{} {}\n", hex::encode(&id), status.addr));
}
eprintln!("END --");
self.status_hash.as_slice_mut().copy_from_slice(&hasher.result()[..]);
self.status_hash
.as_slice_mut()
.copy_from_slice(&hasher.result()[..]);
}
fn rebuild_ring(&mut self) {
let mut new_ring = vec![];
let mut datacenters = vec![];
fn rebuild_ring(&mut self) {
let mut new_ring = vec![];
let mut datacenters = vec![];
for (id, config) in self.config.members.iter() {
let mut dc_hasher = std::collections::hash_map::DefaultHasher::new();
config.datacenter.hash(&mut dc_hasher);
let datacenter = dc_hasher.finish();
for (id, config) in self.config.members.iter() {
let mut dc_hasher = std::collections::hash_map::DefaultHasher::new();
config.datacenter.hash(&mut dc_hasher);
let datacenter = dc_hasher.finish();
if !datacenters.contains(&datacenter) {
datacenters.push(datacenter);
}
if !datacenters.contains(&datacenter) {
datacenters.push(datacenter);
}
for i in 0..config.n_tokens {
let location = hash(format!("{} {}", hex::encode(&id), i).as_bytes());
for i in 0..config.n_tokens {
let location = hash(format!("{} {}", hex::encode(&id), i).as_bytes());
new_ring.push(RingEntry{
location: location.into(),
node: id.clone(),
datacenter,
})
}
}
new_ring.push(RingEntry {
location: location.into(),
node: id.clone(),
datacenter,
})
}
}
new_ring.sort_unstable_by(|x, y| x.location.cmp(&y.location));
self.ring = new_ring;
self.n_datacenters = datacenters.len();
new_ring.sort_unstable_by(|x, y| x.location.cmp(&y.location));
self.ring = new_ring;
self.n_datacenters = datacenters.len();
eprintln!("RING: --");
for e in self.ring.iter() {
eprintln!("{:?}", e);
}
eprintln!("END --");
}
}
pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec<UUID> {
if n >= self.config.members.len() {
return self.config.members.keys().cloned().collect::<Vec<_>>();
}
pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec<UUID> {
if n >= self.config.members.len() {
return self.config.members.keys().cloned().collect::<Vec<_>>();
}
let start = match self.ring.binary_search_by(|x| x.location.cmp(from)) {
Ok(i) => i,
Err(i) => if i == 0 {
self.ring.len() - 1
} else {
i - 1
}
};
let start = match self.ring.binary_search_by(|x| x.location.cmp(from)) {
Ok(i) => i,
Err(i) => {
if i == 0 {
self.ring.len() - 1
} else {
i - 1
}
}
};
self.walk_ring_from_pos(start, n)
}
}
fn walk_ring_from_pos(&self, start: usize, n: usize) -> Vec<UUID> {
let mut ret = vec![];
let mut datacenters = vec![];
let mut ret = vec![];
let mut datacenters = vec![];
for delta in 0..self.ring.len() {
if ret.len() == n {
break;
}
for delta in 0..self.ring.len() {
if ret.len() == n {
break;
}
let i = (start + delta) % self.ring.len();
let i = (start + delta) % self.ring.len();
if datacenters.len() == self.n_datacenters && !ret.contains(&self.ring[i].node) {
ret.push(self.ring[i].node.clone());
} else if !datacenters.contains(&self.ring[i].datacenter) {
ret.push(self.ring[i].node.clone());
datacenters.push(self.ring[i].datacenter);
}
}
if datacenters.len() == self.n_datacenters && !ret.contains(&self.ring[i].node) {
ret.push(self.ring[i].node.clone());
} else if !datacenters.contains(&self.ring[i].datacenter) {
ret.push(self.ring[i].node.clone());
datacenters.push(self.ring[i].datacenter);
}
}
ret
ret
}
}
fn read_network_config(metadata_dir: &PathBuf) -> Result<NetworkConfig, Error> {
let mut path = metadata_dir.clone();
path.push("network_config");
let mut path = metadata_dir.clone();
path.push("network_config");
let mut file = std::fs::OpenOptions::new()
.read(true)
.open(path.as_path())?;
let mut net_config_bytes = vec![];
file.read_to_end(&mut net_config_bytes)?;
let net_config = rmp_serde::decode::from_read_ref(&net_config_bytes[..])?;
let net_config = rmp_serde::decode::from_read_ref(&net_config_bytes[..])?;
Ok(net_config)
}
impl System {
pub fn new(config: Config, id: UUID) -> Self {
let net_config = match read_network_config(&config.metadata_dir) {
Ok(x) => x,
Err(e) => {
println!("No valid previous network configuration stored ({}), starting fresh.", e);
NetworkConfig{
let net_config = match read_network_config(&config.metadata_dir) {
Ok(x) => x,
Err(e) => {
println!(
"No valid previous network configuration stored ({}), starting fresh.",
e
);
NetworkConfig {
members: HashMap::new(),
version: 0,
}
},
};
let mut members = Members{
status: HashMap::new(),
status_hash: Hash::default(),
config: net_config,
ring: Vec::new(),
n_datacenters: 0,
};
}
};
let mut members = Members {
status: HashMap::new(),
status_hash: Hash::default(),
config: net_config,
ring: Vec::new(),
n_datacenters: 0,
};
members.recalculate_status_hash();
members.rebuild_ring();
System{
members.rebuild_ring();
System {
config,
id,
rpc_client: RpcClient::new(),
@ -203,24 +212,26 @@ impl System {
}
}
async fn save_network_config(self: Arc<Self>) {
let mut path = self.config.metadata_dir.clone();
path.push("network_config");
async fn save_network_config(self: Arc<Self>) {
let mut path = self.config.metadata_dir.clone();
path.push("network_config");
let members = self.members.read().await;
let data = rmp_to_vec_all_named(&members.config)
.expect("Error while encoding network config");
drop(members);
let members = self.members.read().await;
let data =
rmp_to_vec_all_named(&members.config).expect("Error while encoding network config");
drop(members);
let mut f = tokio::fs::File::create(path.as_path()).await
.expect("Could not create network_config");
f.write_all(&data[..]).await
.expect("Could not write network_config");
}
let mut f = tokio::fs::File::create(path.as_path())
.await
.expect("Could not create network_config");
f.write_all(&data[..])
.await
.expect("Could not write network_config");
}
pub async fn make_ping(&self) -> Message {
let members = self.members.read().await;
Message::Ping(PingMessage{
Message::Ping(PingMessage {
id: self.id.clone(),
rpc_port: self.config.rpc_port,
status_hash: members.status_hash.clone(),
@ -230,13 +241,20 @@ impl System {
pub async fn broadcast(self: Arc<Self>, msg: Message, timeout: Duration) {
let members = self.members.read().await;
let to = members.status.keys().filter(|x| **x != self.id).cloned().collect::<Vec<_>>();
let to = members
.status
.keys()
.filter(|x| **x != self.id)
.cloned()
.collect::<Vec<_>>();
drop(members);
rpc_call_many(self.clone(), &to[..], &msg, timeout).await;
}
pub async fn bootstrap(self: Arc<Self>) {
let bootstrap_peers = self.config.bootstrap_peers
let bootstrap_peers = self
.config
.bootstrap_peers
.iter()
.map(|ip| (ip.clone(), None))
.collect::<Vec<_>>();
@ -247,16 +265,19 @@ impl System {
pub async fn ping_nodes(self: Arc<Self>, peers: Vec<(SocketAddr, Option<UUID>)>) {
let ping_msg = self.make_ping().await;
let ping_resps = join_all(
peers.iter()
.map(|(addr, id_option)| {
let sys = self.clone();
let ping_msg_ref = &ping_msg;
async move {
(id_option, addr.clone(), sys.rpc_client.call(&addr, ping_msg_ref, PING_TIMEOUT).await)
}
})).await;
let ping_resps = join_all(peers.iter().map(|(addr, id_option)| {
let sys = self.clone();
let ping_msg_ref = &ping_msg;
async move {
(
id_option,
addr.clone(),
sys.rpc_client.call(&addr, ping_msg_ref, PING_TIMEOUT).await,
)
}
}))
.await;
let mut members = self.members.write().await;
let mut has_changes = false;
@ -267,7 +288,7 @@ impl System {
let is_new = members.handle_ping(addr.ip(), &info);
if is_new {
has_changes = true;
to_advertise.push(AdvertisedNode{
to_advertise.push(AdvertisedNode {
id: info.id.clone(),
addr: addr.clone(),
});
@ -279,9 +300,16 @@ impl System {
tokio::spawn(self.clone().pull_config(info.id.clone()));
}
} else if let Some(id) = id_option {
let remaining_attempts = members.status.get(id).map(|x| x.remaining_ping_attempts).unwrap_or(0);
let remaining_attempts = members
.status
.get(id)
.map(|x| x.remaining_ping_attempts)
.unwrap_or(0);
if remaining_attempts == 0 {
eprintln!("Removing node {} after too many failed pings", hex::encode(&id));
eprintln!(
"Removing node {} after too many failed pings",
hex::encode(&id)
);
members.status.remove(&id);
has_changes = true;
} else {
@ -297,15 +325,16 @@ impl System {
drop(members);
if to_advertise.len() > 0 {
self.broadcast(Message::AdvertiseNodesUp(to_advertise), PING_TIMEOUT).await;
self.broadcast(Message::AdvertiseNodesUp(to_advertise), PING_TIMEOUT)
.await;
}
}
pub async fn handle_ping(self: Arc<Self>,
from: &SocketAddr,
ping: &PingMessage)
-> Result<Message, Error>
{
pub async fn handle_ping(
self: Arc<Self>,
from: &SocketAddr,
ping: &PingMessage,
) -> Result<Message, Error> {
let mut members = self.members.write().await;
let is_new = members.handle_ping(from.ip(), ping);
if is_new {
@ -329,7 +358,7 @@ impl System {
let members = self.members.read().await;
let mut mem = vec![];
for (node, status) in members.status.iter() {
mem.push(AdvertisedNode{
mem.push(AdvertisedNode {
id: node.clone(),
addr: status.addr.clone(),
});
@ -342,10 +371,10 @@ impl System {
Ok(Message::AdvertiseConfig(members.config.clone()))
}
pub async fn handle_advertise_nodes_up(self: Arc<Self>,
adv: &[AdvertisedNode])
-> Result<Message, Error>
{
pub async fn handle_advertise_nodes_up(
self: Arc<Self>,
adv: &[AdvertisedNode],
) -> Result<Message, Error> {
let mut to_ping = vec![];
let mut members = self.members.write().await;
@ -355,11 +384,13 @@ impl System {
if node.id == self.id {
// learn our own ip address
let self_addr = SocketAddr::new(node.addr.ip(), self.config.rpc_port);
let old_self = members.status.insert(node.id.clone(),
NodeStatus{
let old_self = members.status.insert(
node.id.clone(),
NodeStatus {
addr: self_addr,
remaining_ping_attempts: MAX_FAILED_PINGS,
});
},
);
has_changed = match old_self {
None => true,
Some(x) => x.addr != self_addr,
@ -380,18 +411,20 @@ impl System {
Ok(Message::Ok)
}
pub async fn handle_advertise_config(self: Arc<Self>,
adv: &NetworkConfig)
-> Result<Message, Error>
{
pub async fn handle_advertise_config(
self: Arc<Self>,
adv: &NetworkConfig,
) -> Result<Message, Error> {
let mut members = self.members.write().await;
if adv.version > members.config.version {
members.config = adv.clone();
members.rebuild_ring();
members.rebuild_ring();
tokio::spawn(self.clone().broadcast(Message::AdvertiseConfig(adv.clone()), PING_TIMEOUT));
tokio::spawn(self.clone().save_network_config());
tokio::spawn(
self.clone()
.broadcast(Message::AdvertiseConfig(adv.clone()), PING_TIMEOUT),
);
tokio::spawn(self.clone().save_network_config());
}
Ok(Message::Ok)
@ -400,12 +433,14 @@ impl System {
pub async fn ping_loop(self: Arc<Self>) {
loop {
let restart_at = tokio::time::delay_for(PING_INTERVAL);
let members = self.members.read().await;
let ping_addrs = members.status.iter()
.filter(|(id, _)| **id != self.id)
.map(|(id, status)| (status.addr.clone(), Some(id.clone())))
.collect::<Vec<_>>();
let ping_addrs = members
.status
.iter()
.filter(|(id, _)| **id != self.id)
.map(|(id, status)| (status.addr.clone(), Some(id.clone())))
.collect::<Vec<_>>();
drop(members);
self.clone().ping_nodes(ping_addrs).await;
@ -414,12 +449,12 @@ impl System {
}
}
pub fn pull_status(self: Arc<Self>, peer: UUID) -> impl futures::future::Future<Output=()> + Send + 'static {
pub fn pull_status(
self: Arc<Self>,
peer: UUID