Some refactoring
This commit is contained in:
parent
3bcbbe1e31
commit
80892df8cc
8 changed files with 141 additions and 120 deletions
|
@ -21,6 +21,7 @@ use garage_util::data::*;
|
||||||
use garage_util::error::Error;
|
use garage_util::error::Error;
|
||||||
|
|
||||||
use garage_rpc::membership::*;
|
use garage_rpc::membership::*;
|
||||||
|
use garage_rpc::ring::*;
|
||||||
use garage_rpc::rpc_client::*;
|
use garage_rpc::rpc_client::*;
|
||||||
|
|
||||||
use admin_rpc::*;
|
use admin_rpc::*;
|
||||||
|
|
|
@ -2,7 +2,11 @@
|
||||||
extern crate log;
|
extern crate log;
|
||||||
|
|
||||||
pub mod consul;
|
pub mod consul;
|
||||||
|
pub(crate) mod tls_util;
|
||||||
|
|
||||||
|
pub mod ring;
|
||||||
pub mod membership;
|
pub mod membership;
|
||||||
|
|
||||||
pub mod rpc_client;
|
pub mod rpc_client;
|
||||||
pub mod rpc_server;
|
pub mod rpc_server;
|
||||||
pub(crate) mod tls_util;
|
|
||||||
|
|
|
@ -1,6 +1,4 @@
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::hash::Hash as StdHash;
|
|
||||||
use std::hash::Hasher;
|
|
||||||
use std::io::{Read, Write};
|
use std::io::{Read, Write};
|
||||||
use std::net::{IpAddr, SocketAddr};
|
use std::net::{IpAddr, SocketAddr};
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
|
@ -24,6 +22,7 @@ use garage_util::error::Error;
|
||||||
use crate::consul::get_consul_nodes;
|
use crate::consul::get_consul_nodes;
|
||||||
use crate::rpc_client::*;
|
use crate::rpc_client::*;
|
||||||
use crate::rpc_server::*;
|
use crate::rpc_server::*;
|
||||||
|
use crate::ring::*;
|
||||||
|
|
||||||
const PING_INTERVAL: Duration = Duration::from_secs(10);
|
const PING_INTERVAL: Duration = Duration::from_secs(10);
|
||||||
const CONSUL_INTERVAL: Duration = Duration::from_secs(60);
|
const CONSUL_INTERVAL: Duration = Duration::from_secs(60);
|
||||||
|
@ -66,19 +65,6 @@ pub struct AdvertisedNode {
|
||||||
pub state_info: StateInfo,
|
pub state_info: StateInfo,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
|
||||||
pub struct NetworkConfig {
|
|
||||||
pub members: HashMap<UUID, NetworkConfigEntry>,
|
|
||||||
pub version: u64,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
|
||||||
pub struct NetworkConfigEntry {
|
|
||||||
pub datacenter: String,
|
|
||||||
pub n_tokens: u32,
|
|
||||||
pub tag: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct System {
|
pub struct System {
|
||||||
pub id: UUID,
|
pub id: UUID,
|
||||||
|
|
||||||
|
@ -90,7 +76,7 @@ pub struct System {
|
||||||
rpc_http_client: Arc<RpcHttpClient>,
|
rpc_http_client: Arc<RpcHttpClient>,
|
||||||
rpc_client: Arc<RpcClient<Message>>,
|
rpc_client: Arc<RpcClient<Message>>,
|
||||||
|
|
||||||
pub status: watch::Receiver<Arc<Status>>,
|
pub(crate) status: watch::Receiver<Arc<Status>>,
|
||||||
pub ring: watch::Receiver<Arc<Ring>>,
|
pub ring: watch::Receiver<Arc<Ring>>,
|
||||||
|
|
||||||
update_lock: Mutex<(watch::Sender<Arc<Status>>, watch::Sender<Arc<Ring>>)>,
|
update_lock: Mutex<(watch::Sender<Arc<Status>>, watch::Sender<Arc<Ring>>)>,
|
||||||
|
@ -123,20 +109,6 @@ pub struct StateInfo {
|
||||||
pub hostname: String,
|
pub hostname: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct Ring {
|
|
||||||
pub config: NetworkConfig,
|
|
||||||
pub ring: Vec<RingEntry>,
|
|
||||||
pub n_datacenters: usize,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
|
||||||
pub struct RingEntry {
|
|
||||||
pub location: Hash,
|
|
||||||
pub node: UUID,
|
|
||||||
pub datacenter: u64,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Status {
|
impl Status {
|
||||||
fn handle_ping(&mut self, ip: IpAddr, info: &PingMessage) -> bool {
|
fn handle_ping(&mut self, ip: IpAddr, info: &PingMessage) -> bool {
|
||||||
let addr = SocketAddr::new(ip, info.rpc_port);
|
let addr = SocketAddr::new(ip, info.rpc_port);
|
||||||
|
@ -175,86 +147,6 @@ impl Status {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Ring {
|
|
||||||
fn rebuild_ring(&mut self) {
|
|
||||||
let mut new_ring = vec![];
|
|
||||||
let mut datacenters = vec![];
|
|
||||||
|
|
||||||
for (id, config) in self.config.members.iter() {
|
|
||||||
let mut dc_hasher = std::collections::hash_map::DefaultHasher::new();
|
|
||||||
config.datacenter.hash(&mut dc_hasher);
|
|
||||||
let datacenter = dc_hasher.finish();
|
|
||||||
|
|
||||||
if !datacenters.contains(&datacenter) {
|
|
||||||
datacenters.push(datacenter);
|
|
||||||
}
|
|
||||||
|
|
||||||
for i in 0..config.n_tokens {
|
|
||||||
let location = hash(format!("{} {}", hex::encode(&id), i).as_bytes());
|
|
||||||
|
|
||||||
new_ring.push(RingEntry {
|
|
||||||
location: location.into(),
|
|
||||||
node: *id,
|
|
||||||
datacenter,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
new_ring.sort_unstable_by(|x, y| x.location.cmp(&y.location));
|
|
||||||
self.ring = new_ring;
|
|
||||||
self.n_datacenters = datacenters.len();
|
|
||||||
|
|
||||||
// eprintln!("RING: --");
|
|
||||||
// for e in self.ring.iter() {
|
|
||||||
// eprintln!("{:?}", e);
|
|
||||||
// }
|
|
||||||
// eprintln!("END --");
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec<UUID> {
|
|
||||||
if n >= self.config.members.len() {
|
|
||||||
return self.config.members.keys().cloned().collect::<Vec<_>>();
|
|
||||||
}
|
|
||||||
|
|
||||||
let start = match self.ring.binary_search_by(|x| x.location.cmp(from)) {
|
|
||||||
Ok(i) => i,
|
|
||||||
Err(i) => {
|
|
||||||
if i == 0 {
|
|
||||||
self.ring.len() - 1
|
|
||||||
} else {
|
|
||||||
i - 1
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
self.walk_ring_from_pos(start, n)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn walk_ring_from_pos(&self, start: usize, n: usize) -> Vec<UUID> {
|
|
||||||
if n >= self.config.members.len() {
|
|
||||||
return self.config.members.keys().cloned().collect::<Vec<_>>();
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut ret = vec![];
|
|
||||||
let mut datacenters = vec![];
|
|
||||||
|
|
||||||
let mut delta = 0;
|
|
||||||
while ret.len() < n {
|
|
||||||
let i = (start + delta) % self.ring.len();
|
|
||||||
delta += 1;
|
|
||||||
|
|
||||||
if !datacenters.contains(&self.ring[i].datacenter) {
|
|
||||||
ret.push(self.ring[i].node);
|
|
||||||
datacenters.push(self.ring[i].datacenter);
|
|
||||||
} else if datacenters.len() == self.n_datacenters && !ret.contains(&self.ring[i].node) {
|
|
||||||
ret.push(self.ring[i].node);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
ret
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn gen_node_id(metadata_dir: &PathBuf) -> Result<UUID, Error> {
|
fn gen_node_id(metadata_dir: &PathBuf) -> Result<UUID, Error> {
|
||||||
let mut id_file = metadata_dir.clone();
|
let mut id_file = metadata_dir.clone();
|
||||||
id_file.push("node_id");
|
id_file.push("node_id");
|
||||||
|
@ -312,10 +204,7 @@ impl System {
|
||||||
"No valid previous network configuration stored ({}), starting fresh.",
|
"No valid previous network configuration stored ({}), starting fresh.",
|
||||||
e
|
e
|
||||||
);
|
);
|
||||||
NetworkConfig {
|
NetworkConfig::new()
|
||||||
members: HashMap::new(),
|
|
||||||
version: 0,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
let mut status = Status {
|
let mut status = Status {
|
||||||
|
@ -641,9 +530,11 @@ impl System {
|
||||||
adv: &NetworkConfig,
|
adv: &NetworkConfig,
|
||||||
) -> Result<Message, Error> {
|
) -> Result<Message, Error> {
|
||||||
let update_lock = self.update_lock.lock().await;
|
let update_lock = self.update_lock.lock().await;
|
||||||
let mut ring: Ring = self.ring.borrow().as_ref().clone();
|
let ring: Arc<Ring> = self.ring.borrow().clone();
|
||||||
|
|
||||||
if adv.version > ring.config.version {
|
if adv.version > ring.config.version {
|
||||||
|
let mut ring = ring.as_ref().clone();
|
||||||
|
|
||||||
ring.config = adv.clone();
|
ring.config = adv.clone();
|
||||||
ring.rebuild_ring();
|
ring.rebuild_ring();
|
||||||
update_lock.1.broadcast(Arc::new(ring))?;
|
update_lock.1.broadcast(Arc::new(ring))?;
|
||||||
|
|
122
src/rpc/ring.rs
Normal file
122
src/rpc/ring.rs
Normal file
|
@ -0,0 +1,122 @@
|
||||||
|
use std::collections::HashMap;
|
||||||
|
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
use garage_util::data::*;
|
||||||
|
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||||
|
pub struct NetworkConfig {
|
||||||
|
pub members: HashMap<UUID, NetworkConfigEntry>,
|
||||||
|
pub version: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl NetworkConfig {
|
||||||
|
pub(crate) fn new() -> Self {
|
||||||
|
Self{
|
||||||
|
members: HashMap::new(),
|
||||||
|
version: 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||||
|
pub struct NetworkConfigEntry {
|
||||||
|
pub datacenter: String,
|
||||||
|
pub n_tokens: u32,
|
||||||
|
pub tag: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct Ring {
|
||||||
|
pub config: NetworkConfig,
|
||||||
|
pub ring: Vec<RingEntry>,
|
||||||
|
pub n_datacenters: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug)]
|
||||||
|
pub struct RingEntry {
|
||||||
|
pub location: Hash,
|
||||||
|
pub node: UUID,
|
||||||
|
datacenter: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Ring {
|
||||||
|
pub(crate) fn rebuild_ring(&mut self) {
|
||||||
|
let mut new_ring = vec![];
|
||||||
|
let mut datacenters = vec![];
|
||||||
|
|
||||||
|
for (id, config) in self.config.members.iter() {
|
||||||
|
let datacenter = &config.datacenter;
|
||||||
|
|
||||||
|
if !datacenters.contains(datacenter) {
|
||||||
|
datacenters.push(datacenter.to_string());
|
||||||
|
}
|
||||||
|
let datacenter_idx = datacenters.iter().enumerate().find(|(_, dc)| *dc == datacenter).unwrap().0;
|
||||||
|
|
||||||
|
for i in 0..config.n_tokens {
|
||||||
|
let location = hash(format!("{} {}", hex::encode(&id), i).as_bytes());
|
||||||
|
|
||||||
|
new_ring.push(RingEntry {
|
||||||
|
location: location.into(),
|
||||||
|
node: *id,
|
||||||
|
datacenter: datacenter_idx,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
new_ring.sort_unstable_by(|x, y| x.location.cmp(&y.location));
|
||||||
|
self.ring = new_ring;
|
||||||
|
self.n_datacenters = datacenters.len();
|
||||||
|
|
||||||
|
// eprintln!("RING: --");
|
||||||
|
// for e in self.ring.iter() {
|
||||||
|
// eprintln!("{:?}", e);
|
||||||
|
// }
|
||||||
|
// eprintln!("END --");
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec<UUID> {
|
||||||
|
if n >= self.config.members.len() {
|
||||||
|
return self.config.members.keys().cloned().collect::<Vec<_>>();
|
||||||
|
}
|
||||||
|
|
||||||
|
let start = match self.ring.binary_search_by(|x| x.location.cmp(from)) {
|
||||||
|
Ok(i) => i,
|
||||||
|
Err(i) => {
|
||||||
|
if i == 0 {
|
||||||
|
self.ring.len() - 1
|
||||||
|
} else {
|
||||||
|
i - 1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
self.walk_ring_from_pos(start, n)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn walk_ring_from_pos(&self, start: usize, n: usize) -> Vec<UUID> {
|
||||||
|
if n >= self.config.members.len() {
|
||||||
|
return self.config.members.keys().cloned().collect::<Vec<_>>();
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut ret = vec![];
|
||||||
|
let mut datacenters = vec![];
|
||||||
|
|
||||||
|
let mut delta = 0;
|
||||||
|
while ret.len() < n {
|
||||||
|
let i = (start + delta) % self.ring.len();
|
||||||
|
delta += 1;
|
||||||
|
|
||||||
|
if !datacenters.contains(&self.ring[i].datacenter) {
|
||||||
|
ret.push(self.ring[i].node);
|
||||||
|
datacenters.push(self.ring[i].datacenter);
|
||||||
|
} else if datacenters.len() == self.n_datacenters && !ret.contains(&self.ring[i].node) {
|
||||||
|
ret.push(self.ring[i].node);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ret
|
||||||
|
}
|
||||||
|
}
|
|
@ -12,7 +12,8 @@ use serde_bytes::ByteBuf;
|
||||||
use garage_util::data::*;
|
use garage_util::data::*;
|
||||||
use garage_util::error::Error;
|
use garage_util::error::Error;
|
||||||
|
|
||||||
use garage_rpc::membership::{Ring, System};
|
use garage_rpc::membership::System;
|
||||||
|
use garage_rpc::ring::Ring;
|
||||||
use garage_rpc::rpc_client::*;
|
use garage_rpc::rpc_client::*;
|
||||||
use garage_rpc::rpc_server::*;
|
use garage_rpc::rpc_server::*;
|
||||||
|
|
||||||
|
|
|
@ -1,7 +1,8 @@
|
||||||
use arc_swap::ArcSwapOption;
|
use arc_swap::ArcSwapOption;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use garage_rpc::membership::{Ring, System};
|
use garage_rpc::membership::{System};
|
||||||
|
use garage_rpc::ring::Ring;
|
||||||
use garage_util::data::*;
|
use garage_util::data::*;
|
||||||
|
|
||||||
use crate::*;
|
use crate::*;
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
use garage_rpc::membership::{Ring, System};
|
use garage_rpc::membership::{System};
|
||||||
|
use garage_rpc::ring::Ring;
|
||||||
use garage_util::data::*;
|
use garage_util::data::*;
|
||||||
|
|
||||||
use crate::*;
|
use crate::*;
|
||||||
|
|
|
@ -12,7 +12,7 @@ use serde_bytes::ByteBuf;
|
||||||
use tokio::sync::Mutex;
|
use tokio::sync::Mutex;
|
||||||
use tokio::sync::{mpsc, watch};
|
use tokio::sync::{mpsc, watch};
|
||||||
|
|
||||||
use garage_rpc::membership::Ring;
|
use garage_rpc::ring::Ring;
|
||||||
use garage_util::data::*;
|
use garage_util::data::*;
|
||||||
use garage_util::error::Error;
|
use garage_util::error::Error;
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue