forked from Deuxfleurs/garage
Ununderstandable error
This commit is contained in:
parent
3c36b449a3
commit
87f2b4d2fc
4 changed files with 188 additions and 40 deletions
14
src/data.rs
14
src/data.rs
|
@ -1,9 +1,23 @@
|
||||||
|
use std::collections::HashMap;
|
||||||
use serde::{Serialize, Deserialize};
|
use serde::{Serialize, Deserialize};
|
||||||
|
|
||||||
pub type UUID = [u8; 32];
|
pub type UUID = [u8; 32];
|
||||||
pub type Hash = [u8; 32];
|
pub type Hash = [u8; 32];
|
||||||
|
|
||||||
|
|
||||||
|
// Network management
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||||
|
pub struct NetworkConfig {
|
||||||
|
pub members: HashMap<UUID, NetworkConfigEntry>,
|
||||||
|
pub version: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||||
|
pub struct NetworkConfigEntry {
|
||||||
|
pub n_tokens: u32,
|
||||||
|
}
|
||||||
|
|
||||||
// Data management
|
// Data management
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
|
|
|
@ -29,33 +29,41 @@ pub struct System {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct Members {
|
pub struct Members {
|
||||||
pub present: Vec<UUID>,
|
|
||||||
pub status: HashMap<UUID, NodeStatus>,
|
pub status: HashMap<UUID, NodeStatus>,
|
||||||
pub status_hash: Hash,
|
pub status_hash: Hash,
|
||||||
|
|
||||||
pub config: HashMap<UUID, NodeConfig>,
|
pub config: NetworkConfig,
|
||||||
pub config_version: u64,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Members {
|
impl Members {
|
||||||
fn handle_ping(&mut self, ip: IpAddr, info: &PingMessage) {
|
fn handle_ping(&mut self, ip: IpAddr, info: &PingMessage) -> bool {
|
||||||
match self.present.binary_search(&info.id) {
|
|
||||||
Ok(pos) => {}
|
|
||||||
Err(pos) => self.present.insert(pos, info.id.clone()),
|
|
||||||
}
|
|
||||||
self.status.insert(info.id.clone(),
|
self.status.insert(info.id.clone(),
|
||||||
NodeStatus{
|
NodeStatus{
|
||||||
addr: SocketAddr::new(ip, info.rpc_port),
|
addr: SocketAddr::new(ip, info.rpc_port),
|
||||||
remaining_ping_attempts: MAX_FAILED_PINGS,
|
remaining_ping_attempts: MAX_FAILED_PINGS,
|
||||||
});
|
}).is_none()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_advertise_node(&mut self, id: &UUID, addr: &SocketAddr) -> bool {
|
||||||
|
if !self.status.contains_key(id) {
|
||||||
|
self.status.insert(id.clone(),
|
||||||
|
NodeStatus{
|
||||||
|
addr: addr.clone(),
|
||||||
|
remaining_ping_attempts: MAX_FAILED_PINGS,
|
||||||
|
});
|
||||||
|
true
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn recalculate_status_hash(&mut self) {
|
fn recalculate_status_hash(&mut self) {
|
||||||
|
let mut nodes = self.status.iter().collect::<Vec<_>>();
|
||||||
|
nodes.sort_by_key(|(id, _status)| *id);
|
||||||
|
|
||||||
let mut hasher = Sha256::new();
|
let mut hasher = Sha256::new();
|
||||||
for node in self.present.iter() {
|
for (id, status) in nodes {
|
||||||
if let Some(status) = self.status.get(node) {
|
hasher.input(format!("{} {}\n", hex::encode(id), status.addr));
|
||||||
hasher.input(format!("{} {}\n", hex::encode(node), status.addr));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
self.status_hash.copy_from_slice(&hasher.result()[..]);
|
self.status_hash.copy_from_slice(&hasher.result()[..]);
|
||||||
}
|
}
|
||||||
|
@ -66,9 +74,6 @@ pub struct NodeStatus {
|
||||||
pub remaining_ping_attempts: usize,
|
pub remaining_ping_attempts: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct NodeConfig {
|
|
||||||
pub n_tokens: u32,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl System {
|
impl System {
|
||||||
pub fn new(config: Config, id: UUID) -> Self {
|
pub fn new(config: Config, id: UUID) -> Self {
|
||||||
|
@ -77,26 +82,31 @@ impl System {
|
||||||
id,
|
id,
|
||||||
rpc_client: Client::new(),
|
rpc_client: Client::new(),
|
||||||
members: RwLock::new(Members{
|
members: RwLock::new(Members{
|
||||||
present: Vec::new(),
|
|
||||||
status: HashMap::new(),
|
status: HashMap::new(),
|
||||||
status_hash: [0u8; 32],
|
status_hash: [0u8; 32],
|
||||||
config: HashMap::new(),
|
config: NetworkConfig{
|
||||||
config_version: 0,
|
members: HashMap::new(),
|
||||||
|
version: 0,
|
||||||
|
},
|
||||||
}),
|
}),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn make_ping(&self) -> Message {
|
pub async fn make_ping(&self) -> Message {
|
||||||
|
let members = self.members.read().await;
|
||||||
Message::Ping(PingMessage{
|
Message::Ping(PingMessage{
|
||||||
id: self.id,
|
id: self.id,
|
||||||
rpc_port: self.config.rpc_port,
|
rpc_port: self.config.rpc_port,
|
||||||
present_hash: self.members.read().await.status_hash.clone(),
|
status_hash: members.status_hash.clone(),
|
||||||
config_version: 0,
|
config_version: members.config.version,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn broadcast(&self) -> Vec<UUID> {
|
pub async fn broadcast(self: Arc<Self>, msg: Message, timeout: Duration) {
|
||||||
self.members.read().await.present.clone()
|
let members = self.members.read().await;
|
||||||
|
let to = members.status.keys().cloned().collect::<Vec<_>>();
|
||||||
|
drop(members);
|
||||||
|
rpc_call_many(self.clone(), &to[..], &msg, None, timeout).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn bootstrap(self: Arc<Self>) {
|
pub async fn bootstrap(self: Arc<Self>) {
|
||||||
|
@ -115,19 +125,12 @@ impl System {
|
||||||
for (addr, ping_resp) in ping_resps {
|
for (addr, ping_resp) in ping_resps {
|
||||||
if let Ok(Message::Ping(info)) = ping_resp {
|
if let Ok(Message::Ping(info)) = ping_resp {
|
||||||
members.handle_ping(addr.ip(), &info);
|
members.handle_ping(addr.ip(), &info);
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
members.recalculate_status_hash();
|
members.recalculate_status_hash();
|
||||||
drop(members);
|
drop(members);
|
||||||
|
|
||||||
let resps = rpc_call_many_addr(self.clone(),
|
tokio::spawn(self.ping_loop());
|
||||||
&self.config.bootstrap_peers,
|
|
||||||
&ping_msg,
|
|
||||||
None,
|
|
||||||
PING_TIMEOUT).await;
|
|
||||||
|
|
||||||
unimplemented!() //TODO
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn handle_ping(self: Arc<Self>,
|
pub async fn handle_ping(self: Arc<Self>,
|
||||||
|
@ -136,17 +139,141 @@ impl System {
|
||||||
-> Result<Message, Error>
|
-> Result<Message, Error>
|
||||||
{
|
{
|
||||||
let mut members = self.members.write().await;
|
let mut members = self.members.write().await;
|
||||||
members.handle_ping(from.ip(), ping);
|
let is_new = members.handle_ping(from.ip(), ping);
|
||||||
members.recalculate_status_hash();
|
members.recalculate_status_hash();
|
||||||
|
let status_hash = members.status_hash.clone();
|
||||||
|
let config_version = members.config.version;
|
||||||
drop(members);
|
drop(members);
|
||||||
|
|
||||||
|
if is_new || status_hash != ping.status_hash {
|
||||||
|
tokio::spawn(self.clone().pull_status(ping.id.clone()));
|
||||||
|
}
|
||||||
|
if is_new || config_version < ping.config_version {
|
||||||
|
tokio::spawn(self.clone().pull_config(ping.id.clone()));
|
||||||
|
}
|
||||||
|
|
||||||
Ok(self.make_ping().await)
|
Ok(self.make_ping().await)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn handle_advertise_node(self: Arc<Self>,
|
pub async fn handle_pull_status(&self) -> Result<Message, Error> {
|
||||||
ping: &AdvertiseNodeMessage)
|
let members = self.members.read().await;
|
||||||
|
let mut mem = vec![];
|
||||||
|
for (node, status) in members.status.iter() {
|
||||||
|
mem.push(AdvertisedNode{
|
||||||
|
id: node.clone(),
|
||||||
|
addr: status.addr.clone(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
Ok(Message::AdvertiseNodesUp(mem))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn handle_pull_config(&self) -> Result<Message, Error> {
|
||||||
|
let members = self.members.read().await;
|
||||||
|
Ok(Message::AdvertiseConfig(members.config.clone()))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn handle_advertise_nodes_up(self: Arc<Self>,
|
||||||
|
adv: &[AdvertisedNode])
|
||||||
-> Result<Message, Error>
|
-> Result<Message, Error>
|
||||||
{
|
{
|
||||||
unimplemented!() //TODO
|
let mut propagate = vec![];
|
||||||
|
|
||||||
|
let mut members = self.members.write().await;
|
||||||
|
for node in adv.iter() {
|
||||||
|
let is_new = members.handle_advertise_node(&node.id, &node.addr);
|
||||||
|
if is_new {
|
||||||
|
tokio::spawn(self.clone().pull_status(node.id.clone()));
|
||||||
|
tokio::spawn(self.clone().pull_config(node.id.clone()));
|
||||||
|
propagate.push(node.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if propagate.len() > 0 {
|
||||||
|
tokio::spawn(self.clone().broadcast(Message::AdvertiseNodesUp(propagate), PING_TIMEOUT));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(Message::Ok)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn handle_advertise_config(self: Arc<Self>,
|
||||||
|
adv: &NetworkConfig)
|
||||||
|
-> Result<Message, Error>
|
||||||
|
{
|
||||||
|
let mut members = self.members.write().await;
|
||||||
|
if adv.version > members.config.version {
|
||||||
|
members.config = adv.clone();
|
||||||
|
tokio::spawn(self.clone().broadcast(Message::AdvertiseConfig(adv.clone()), PING_TIMEOUT));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(Message::Ok)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn ping_loop(self: Arc<Self>) {
|
||||||
|
loop {
|
||||||
|
let restart_at = tokio::time::delay_for(PING_INTERVAL);
|
||||||
|
|
||||||
|
let members = self.members.read().await;
|
||||||
|
let ping_addrs = members.status.iter()
|
||||||
|
.map(|(id, status)| (id.clone(), status.addr.clone()))
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
drop(members);
|
||||||
|
|
||||||
|
let ping_msg = self.make_ping().await;
|
||||||
|
let ping_resps = join_all(
|
||||||
|
ping_addrs.iter()
|
||||||
|
.map(|(id, addr)| {
|
||||||
|
let sys = self.clone();
|
||||||
|
let ping_msg_ref = &ping_msg;
|
||||||
|
async move {
|
||||||
|
(id, addr.clone(), rpc_call_addr(sys, &addr, ping_msg_ref, PING_TIMEOUT).await)
|
||||||
|
}
|
||||||
|
})).await;
|
||||||
|
|
||||||
|
let mut members = self.members.write().await;
|
||||||
|
for (id, addr, ping_resp) in ping_resps {
|
||||||
|
if let Ok(Message::Ping(ping)) = ping_resp {
|
||||||
|
let is_new = members.handle_ping(addr.ip(), &ping);
|
||||||
|
if is_new || members.status_hash != ping.status_hash {
|
||||||
|
tokio::spawn(self.clone().pull_status(ping.id.clone()));
|
||||||
|
}
|
||||||
|
if is_new || members.config.version < ping.config_version {
|
||||||
|
tokio::spawn(self.clone().pull_config(ping.id.clone()));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
let remaining_attempts = members.status.get(id).map(|x| x.remaining_ping_attempts).unwrap_or(0);
|
||||||
|
if remaining_attempts == 0 {
|
||||||
|
eprintln!("Removing node {} after too many failed pings", hex::encode(id));
|
||||||
|
members.status.remove(id);
|
||||||
|
} else {
|
||||||
|
if let Some(st) = members.status.get_mut(id) {
|
||||||
|
st.remaining_ping_attempts = remaining_attempts - 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
drop(members);
|
||||||
|
|
||||||
|
restart_at.await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn pull_status(self: Arc<Self>, peer: UUID) {
|
||||||
|
let resp = rpc_call(self.clone(),
|
||||||
|
&peer,
|
||||||
|
&Message::PullStatus,
|
||||||
|
PING_TIMEOUT).await;
|
||||||
|
if let Ok(Message::AdvertiseNodesUp(nodes)) = resp {
|
||||||
|
self.handle_advertise_nodes_up(&nodes).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn pull_config(self: Arc<Self>, peer: UUID) {
|
||||||
|
let resp = rpc_call(self.clone(),
|
||||||
|
&peer,
|
||||||
|
&Message::PullConfig,
|
||||||
|
PING_TIMEOUT).await;
|
||||||
|
if let Ok(Message::AdvertiseConfig(config)) = resp {
|
||||||
|
self.handle_advertise_config(&config).await;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
11
src/proto.rs
11
src/proto.rs
|
@ -9,7 +9,10 @@ pub enum Message {
|
||||||
Error(String),
|
Error(String),
|
||||||
|
|
||||||
Ping(PingMessage),
|
Ping(PingMessage),
|
||||||
AdvertiseNode(AdvertiseNodeMessage),
|
PullStatus,
|
||||||
|
PullConfig,
|
||||||
|
AdvertiseNodesUp(Vec<AdvertisedNode>),
|
||||||
|
AdvertiseConfig(NetworkConfig),
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
|
@ -17,12 +20,12 @@ pub struct PingMessage {
|
||||||
pub id: UUID,
|
pub id: UUID,
|
||||||
pub rpc_port: u16,
|
pub rpc_port: u16,
|
||||||
|
|
||||||
pub present_hash: Hash,
|
pub status_hash: Hash,
|
||||||
pub config_version: u64,
|
pub config_version: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||||
pub struct AdvertiseNodeMessage {
|
pub struct AdvertisedNode {
|
||||||
pub id: UUID,
|
pub id: UUID,
|
||||||
pub addr: SocketAddr,
|
pub addr: SocketAddr,
|
||||||
}
|
}
|
||||||
|
|
|
@ -133,7 +133,11 @@ async fn handler(sys: Arc<System>, req: Request<Body>, addr: SocketAddr) -> Resu
|
||||||
|
|
||||||
let resp = err_to_msg(match &msg {
|
let resp = err_to_msg(match &msg {
|
||||||
Message::Ping(ping) => sys.handle_ping(&addr, ping).await,
|
Message::Ping(ping) => sys.handle_ping(&addr, ping).await,
|
||||||
Message::AdvertiseNode(adv) => sys.handle_advertise_node(adv).await,
|
Message::PullStatus => sys.handle_pull_status().await,
|
||||||
|
Message::PullConfig => sys.handle_pull_config().await,
|
||||||
|
Message::AdvertiseNodesUp(adv) => sys.handle_advertise_nodes_up(adv).await,
|
||||||
|
Message::AdvertiseConfig(adv) => sys.handle_advertise_config(adv).await,
|
||||||
|
|
||||||
_ => Ok(Message::Error(format!("Unexpected message: {:?}", msg))),
|
_ => Ok(Message::Error(format!("Unexpected message: {:?}", msg))),
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue