Keep network status & ring in a tokio::sync::watch
advantages - reads don't prevent preparing writes - can be followed from other parts of the system by cloning the receiver
This commit is contained in:
parent
5dd59e437d
commit
9c931f5eda
9 changed files with 165 additions and 133 deletions
|
@ -212,9 +212,9 @@ async fn put_block_meta(
|
||||||
async fn put_block(garage: Arc<Garage>, hash: Hash, data: Vec<u8>) -> Result<(), Error> {
|
async fn put_block(garage: Arc<Garage>, hash: Hash, data: Vec<u8>) -> Result<(), Error> {
|
||||||
let who = garage
|
let who = garage
|
||||||
.system
|
.system
|
||||||
.members
|
.ring
|
||||||
.read()
|
.borrow()
|
||||||
.await
|
.clone()
|
||||||
.walk_ring(&hash, garage.system.config.meta_replication_factor);
|
.walk_ring(&hash, garage.system.config.meta_replication_factor);
|
||||||
rpc_try_call_many(
|
rpc_try_call_many(
|
||||||
garage.system.clone(),
|
garage.system.clone(),
|
||||||
|
@ -359,9 +359,9 @@ async fn handle_get(
|
||||||
async fn get_block(garage: Arc<Garage>, hash: &Hash) -> Result<Vec<u8>, Error> {
|
async fn get_block(garage: Arc<Garage>, hash: &Hash) -> Result<Vec<u8>, Error> {
|
||||||
let who = garage
|
let who = garage
|
||||||
.system
|
.system
|
||||||
.members
|
.ring
|
||||||
.read()
|
.borrow()
|
||||||
.await
|
.clone()
|
||||||
.walk_ring(&hash, garage.system.config.meta_replication_factor);
|
.walk_ring(&hash, garage.system.config.meta_replication_factor);
|
||||||
let resps = rpc_try_call_many(
|
let resps = rpc_try_call_many(
|
||||||
garage.system.clone(),
|
garage.system.clone(),
|
||||||
|
|
16
src/block.rs
16
src/block.rs
|
@ -1,15 +1,14 @@
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
|
|
||||||
|
use futures_util::future::*;
|
||||||
use tokio::fs;
|
use tokio::fs;
|
||||||
use tokio::prelude::*;
|
use tokio::prelude::*;
|
||||||
use tokio::sync::Mutex;
|
use tokio::sync::Mutex;
|
||||||
use futures_util::future::*;
|
|
||||||
|
|
||||||
|
use crate::background::*;
|
||||||
use crate::data::*;
|
use crate::data::*;
|
||||||
use crate::error::Error;
|
use crate::error::Error;
|
||||||
use crate::proto::*;
|
use crate::proto::*;
|
||||||
use crate::background::*;
|
|
||||||
|
|
||||||
|
|
||||||
pub struct BlockManager {
|
pub struct BlockManager {
|
||||||
pub data_dir: PathBuf,
|
pub data_dir: PathBuf,
|
||||||
|
@ -19,7 +18,8 @@ pub struct BlockManager {
|
||||||
|
|
||||||
impl BlockManager {
|
impl BlockManager {
|
||||||
pub fn new(db: &sled::Db, data_dir: PathBuf) -> Self {
|
pub fn new(db: &sled::Db, data_dir: PathBuf) -> Self {
|
||||||
let rc = db.open_tree("block_local_rc")
|
let rc = db
|
||||||
|
.open_tree("block_local_rc")
|
||||||
.expect("Unable to open block_local_rc tree");
|
.expect("Unable to open block_local_rc tree");
|
||||||
rc.set_merge_operator(rc_merge);
|
rc.set_merge_operator(rc_merge);
|
||||||
Self {
|
Self {
|
||||||
|
@ -81,18 +81,20 @@ impl BlockManager {
|
||||||
background.spawn(tokio::fs::remove_file(path).map_err(Into::into));
|
background.spawn(tokio::fs::remove_file(path).map_err(Into::into));
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
Some(_) => Ok(())
|
Some(_) => Ok(()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn rc_merge(_key: &[u8], old: Option<&[u8]>, new: &[u8]) -> Option<Vec<u8>> {
|
fn rc_merge(_key: &[u8], old: Option<&[u8]>, new: &[u8]) -> Option<Vec<u8>> {
|
||||||
let old = old.map(|x| {
|
let old = old
|
||||||
|
.map(|x| {
|
||||||
assert!(x.len() == 8);
|
assert!(x.len() == 8);
|
||||||
let mut x8 = [0u8; 8];
|
let mut x8 = [0u8; 8];
|
||||||
x8.copy_from_slice(x);
|
x8.copy_from_slice(x);
|
||||||
u64::from_be_bytes(x8)
|
u64::from_be_bytes(x8)
|
||||||
}).unwrap_or(0);
|
})
|
||||||
|
.unwrap_or(0);
|
||||||
assert!(new.len() == 1);
|
assert!(new.len() == 1);
|
||||||
let new = match new[0] {
|
let new = match new[0] {
|
||||||
0 => {
|
0 => {
|
||||||
|
|
|
@ -55,8 +55,11 @@ impl TableFormat for BlockRefTable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if was_before && !is_after {
|
if was_before && !is_after {
|
||||||
if let Err(e) = garage.block_manager.block_decref(&new.block, &garage.background) {
|
if let Err(e) = garage
|
||||||
eprintln!("Failed to decref or delete block {:?}: {}", &new.block, e);
|
.block_manager
|
||||||
|
.block_decref(&new.block, &garage.background)
|
||||||
|
{
|
||||||
|
eprintln!("Failed to decref block {:?}: {}", &new.block, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,7 +13,7 @@ use futures_util::future::*;
|
||||||
use sha2::{Digest, Sha256};
|
use sha2::{Digest, Sha256};
|
||||||
use tokio::prelude::*;
|
use tokio::prelude::*;
|
||||||
use tokio::sync::watch;
|
use tokio::sync::watch;
|
||||||
use tokio::sync::RwLock;
|
use tokio::sync::Mutex;
|
||||||
|
|
||||||
use crate::background::BackgroundRunner;
|
use crate::background::BackgroundRunner;
|
||||||
use crate::data::*;
|
use crate::data::*;
|
||||||
|
@ -32,36 +32,44 @@ pub struct System {
|
||||||
|
|
||||||
pub rpc_client: RpcClient,
|
pub rpc_client: RpcClient,
|
||||||
|
|
||||||
pub members: RwLock<Members>,
|
pub status: watch::Receiver<Arc<Status>>,
|
||||||
|
pub ring: watch::Receiver<Arc<Ring>>,
|
||||||
|
|
||||||
|
update_lock: Mutex<(watch::Sender<Arc<Status>>, watch::Sender<Arc<Ring>>)>,
|
||||||
|
|
||||||
pub background: Arc<BackgroundRunner>,
|
pub background: Arc<BackgroundRunner>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct Members {
|
#[derive(Debug, Clone)]
|
||||||
pub status: HashMap<UUID, NodeStatus>,
|
pub struct Status {
|
||||||
pub status_hash: Hash,
|
pub nodes: HashMap<UUID, NodeStatus>,
|
||||||
|
pub hash: Hash,
|
||||||
pub config: NetworkConfig,
|
|
||||||
pub ring: Vec<RingEntry>,
|
|
||||||
pub n_datacenters: usize,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
pub struct NodeStatus {
|
pub struct NodeStatus {
|
||||||
pub addr: SocketAddr,
|
pub addr: SocketAddr,
|
||||||
pub remaining_ping_attempts: usize,
|
pub remaining_ping_attempts: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Clone)]
|
||||||
|
pub struct Ring {
|
||||||
|
pub config: NetworkConfig,
|
||||||
|
pub ring: Vec<RingEntry>,
|
||||||
|
pub n_datacenters: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug)]
|
||||||
pub struct RingEntry {
|
pub struct RingEntry {
|
||||||
pub location: Hash,
|
pub location: Hash,
|
||||||
pub node: UUID,
|
pub node: UUID,
|
||||||
pub datacenter: u64,
|
pub datacenter: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Members {
|
impl Status {
|
||||||
fn handle_ping(&mut self, ip: IpAddr, info: &PingMessage) -> bool {
|
fn handle_ping(&mut self, ip: IpAddr, info: &PingMessage) -> bool {
|
||||||
let addr = SocketAddr::new(ip, info.rpc_port);
|
let addr = SocketAddr::new(ip, info.rpc_port);
|
||||||
let old_status = self.status.insert(
|
let old_status = self.nodes.insert(
|
||||||
info.id.clone(),
|
info.id.clone(),
|
||||||
NodeStatus {
|
NodeStatus {
|
||||||
addr: addr.clone(),
|
addr: addr.clone(),
|
||||||
|
@ -77,8 +85,8 @@ impl Members {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn recalculate_status_hash(&mut self) {
|
fn recalculate_hash(&mut self) {
|
||||||
let mut nodes = self.status.iter().collect::<Vec<_>>();
|
let mut nodes = self.nodes.iter().collect::<Vec<_>>();
|
||||||
nodes.sort_unstable_by_key(|(id, _status)| *id);
|
nodes.sort_unstable_by_key(|(id, _status)| *id);
|
||||||
|
|
||||||
let mut hasher = Sha256::new();
|
let mut hasher = Sha256::new();
|
||||||
|
@ -88,11 +96,13 @@ impl Members {
|
||||||
hasher.input(format!("{} {}\n", hex::encode(&id), status.addr));
|
hasher.input(format!("{} {}\n", hex::encode(&id), status.addr));
|
||||||
}
|
}
|
||||||
eprintln!("END --");
|
eprintln!("END --");
|
||||||
self.status_hash
|
self.hash
|
||||||
.as_slice_mut()
|
.as_slice_mut()
|
||||||
.copy_from_slice(&hasher.result()[..]);
|
.copy_from_slice(&hasher.result()[..]);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Ring {
|
||||||
fn rebuild_ring(&mut self) {
|
fn rebuild_ring(&mut self) {
|
||||||
let mut new_ring = vec![];
|
let mut new_ring = vec![];
|
||||||
let mut datacenters = vec![];
|
let mut datacenters = vec![];
|
||||||
|
@ -201,20 +211,28 @@ impl System {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
let mut members = Members {
|
let mut status = Status {
|
||||||
status: HashMap::new(),
|
nodes: HashMap::new(),
|
||||||
status_hash: Hash::default(),
|
hash: Hash::default(),
|
||||||
|
};
|
||||||
|
status.recalculate_hash();
|
||||||
|
let (update_status, status) = watch::channel(Arc::new(status));
|
||||||
|
|
||||||
|
let mut ring = Ring {
|
||||||
config: net_config,
|
config: net_config,
|
||||||
ring: Vec::new(),
|
ring: Vec::new(),
|
||||||
n_datacenters: 0,
|
n_datacenters: 0,
|
||||||
};
|
};
|
||||||
members.recalculate_status_hash();
|
ring.rebuild_ring();
|
||||||
members.rebuild_ring();
|
let (update_ring, ring) = watch::channel(Arc::new(ring));
|
||||||
|
|
||||||
System {
|
System {
|
||||||
config,
|
config,
|
||||||
id,
|
id,
|
||||||
rpc_client: RpcClient::new(),
|
rpc_client: RpcClient::new(),
|
||||||
members: RwLock::new(members),
|
status,
|
||||||
|
ring,
|
||||||
|
update_lock: Mutex::new((update_status, update_ring)),
|
||||||
background,
|
background,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -223,37 +241,33 @@ impl System {
|
||||||
let mut path = self.config.metadata_dir.clone();
|
let mut path = self.config.metadata_dir.clone();
|
||||||
path.push("network_config");
|
path.push("network_config");
|
||||||
|
|
||||||
let members = self.members.read().await;
|
let ring = self.ring.borrow().clone();
|
||||||
let data =
|
let data = rmp_to_vec_all_named(&ring.config)?;
|
||||||
rmp_to_vec_all_named(&members.config)?;
|
|
||||||
drop(members);
|
|
||||||
|
|
||||||
let mut f = tokio::fs::File::create(path.as_path())
|
let mut f = tokio::fs::File::create(path.as_path()).await?;
|
||||||
.await?;
|
f.write_all(&data[..]).await?;
|
||||||
f.write_all(&data[..])
|
|
||||||
.await?;
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn make_ping(&self) -> Message {
|
pub fn make_ping(&self) -> Message {
|
||||||
let members = self.members.read().await;
|
let status = self.status.borrow().clone();
|
||||||
|
let ring = self.ring.borrow().clone();
|
||||||
Message::Ping(PingMessage {
|
Message::Ping(PingMessage {
|
||||||
id: self.id.clone(),
|
id: self.id.clone(),
|
||||||
rpc_port: self.config.rpc_port,
|
rpc_port: self.config.rpc_port,
|
||||||
status_hash: members.status_hash.clone(),
|
status_hash: status.hash.clone(),
|
||||||
config_version: members.config.version,
|
config_version: ring.config.version,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn broadcast(self: Arc<Self>, msg: Message, timeout: Duration) {
|
pub async fn broadcast(self: Arc<Self>, msg: Message, timeout: Duration) {
|
||||||
let members = self.members.read().await;
|
let status = self.status.borrow().clone();
|
||||||
let to = members
|
let to = status
|
||||||
.status
|
.nodes
|
||||||
.keys()
|
.keys()
|
||||||
.filter(|x| **x != self.id)
|
.filter(|x| **x != self.id)
|
||||||
.cloned()
|
.cloned()
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
drop(members);
|
|
||||||
rpc_call_many(self.clone(), &to[..], &msg, timeout).await;
|
rpc_call_many(self.clone(), &to[..], &msg, timeout).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -273,7 +287,7 @@ impl System {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn ping_nodes(self: Arc<Self>, peers: Vec<(SocketAddr, Option<UUID>)>) {
|
pub async fn ping_nodes(self: Arc<Self>, peers: Vec<(SocketAddr, Option<UUID>)>) {
|
||||||
let ping_msg = self.make_ping().await;
|
let ping_msg = self.make_ping();
|
||||||
let ping_resps = join_all(peers.iter().map(|(addr, id_option)| {
|
let ping_resps = join_all(peers.iter().map(|(addr, id_option)| {
|
||||||
let sys = self.clone();
|
let sys = self.clone();
|
||||||
let ping_msg_ref = &ping_msg;
|
let ping_msg_ref = &ping_msg;
|
||||||
|
@ -287,14 +301,16 @@ impl System {
|
||||||
}))
|
}))
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
let mut members = self.members.write().await;
|
let update_locked = self.update_lock.lock().await;
|
||||||
|
let mut status: Status = self.status.borrow().as_ref().clone();
|
||||||
|
let ring = self.ring.borrow().clone();
|
||||||
|
|
||||||
let mut has_changes = false;
|
let mut has_changes = false;
|
||||||
let mut to_advertise = vec![];
|
let mut to_advertise = vec![];
|
||||||
|
|
||||||
for (id_option, addr, ping_resp) in ping_resps {
|
for (id_option, addr, ping_resp) in ping_resps {
|
||||||
if let Ok(Message::Ping(info)) = ping_resp {
|
if let Ok(Message::Ping(info)) = ping_resp {
|
||||||
let is_new = members.handle_ping(addr.ip(), &info);
|
let is_new = status.handle_ping(addr.ip(), &info);
|
||||||
if is_new {
|
if is_new {
|
||||||
has_changes = true;
|
has_changes = true;
|
||||||
to_advertise.push(AdvertisedNode {
|
to_advertise.push(AdvertisedNode {
|
||||||
|
@ -302,17 +318,17 @@ impl System {
|
||||||
addr: addr.clone(),
|
addr: addr.clone(),
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
if is_new || members.status_hash != info.status_hash {
|
if is_new || status.hash != info.status_hash {
|
||||||
self.background
|
self.background
|
||||||
.spawn_cancellable(self.clone().pull_status(info.id.clone()).map(Ok));
|
.spawn_cancellable(self.clone().pull_status(info.id.clone()).map(Ok));
|
||||||
}
|
}
|
||||||
if is_new || members.config.version < info.config_version {
|
if is_new || ring.config.version < info.config_version {
|
||||||
self.background
|
self.background
|
||||||
.spawn_cancellable(self.clone().pull_config(info.id.clone()).map(Ok));
|
.spawn_cancellable(self.clone().pull_config(info.id.clone()).map(Ok));
|
||||||
}
|
}
|
||||||
} else if let Some(id) = id_option {
|
} else if let Some(id) = id_option {
|
||||||
let remaining_attempts = members
|
let remaining_attempts = status
|
||||||
.status
|
.nodes
|
||||||
.get(id)
|
.get(id)
|
||||||
.map(|x| x.remaining_ping_attempts)
|
.map(|x| x.remaining_ping_attempts)
|
||||||
.unwrap_or(0);
|
.unwrap_or(0);
|
||||||
|
@ -321,19 +337,22 @@ impl System {
|
||||||
"Removing node {} after too many failed pings",
|
"Removing node {} after too many failed pings",
|
||||||
hex::encode(&id)
|
hex::encode(&id)
|
||||||
);
|
);
|
||||||
members.status.remove(&id);
|
status.nodes.remove(&id);
|
||||||
has_changes = true;
|
has_changes = true;
|
||||||
} else {
|
} else {
|
||||||
if let Some(st) = members.status.get_mut(id) {
|
if let Some(st) = status.nodes.get_mut(id) {
|
||||||
st.remaining_ping_attempts = remaining_attempts - 1;
|
st.remaining_ping_attempts = remaining_attempts - 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if has_changes {
|
if has_changes {
|
||||||
members.recalculate_status_hash();
|
status.recalculate_hash();
|
||||||
}
|
}
|
||||||
drop(members);
|
if let Err(e) = update_locked.0.broadcast(Arc::new(status)) {
|
||||||
|
eprintln!("In ping_nodes: could not save status update ({})", e);
|
||||||
|
}
|
||||||
|
drop(update_locked);
|
||||||
|
|
||||||
if to_advertise.len() > 0 {
|
if to_advertise.len() > 0 {
|
||||||
self.broadcast(Message::AdvertiseNodesUp(to_advertise), PING_TIMEOUT)
|
self.broadcast(Message::AdvertiseNodesUp(to_advertise), PING_TIMEOUT)
|
||||||
|
@ -346,29 +365,35 @@ impl System {
|
||||||
from: &SocketAddr,
|
from: &SocketAddr,
|
||||||
ping: &PingMessage,
|
ping: &PingMessage,
|
||||||
) -> Result<Message, Error> {
|
) -> Result<Message, Error> {
|
||||||
let mut members = self.members.write().await;
|
let update_locked = self.update_lock.lock().await;
|
||||||
let is_new = members.handle_ping(from.ip(), ping);
|
let mut status: Status = self.status.borrow().as_ref().clone();
|
||||||
|
|
||||||
|
let is_new = status.handle_ping(from.ip(), ping);
|
||||||
if is_new {
|
if is_new {
|
||||||
members.recalculate_status_hash();
|
status.recalculate_hash();
|
||||||
}
|
}
|
||||||
let status_hash = members.status_hash.clone();
|
let status_hash = status.hash.clone();
|
||||||
let config_version = members.config.version;
|
let config_version = self.ring.borrow().config.version;
|
||||||
drop(members);
|
|
||||||
|
update_locked.0.broadcast(Arc::new(status))?;
|
||||||
|
drop(update_locked);
|
||||||
|
|
||||||
if is_new || status_hash != ping.status_hash {
|
if is_new || status_hash != ping.status_hash {
|
||||||
self.background.spawn_cancellable(self.clone().pull_status(ping.id.clone()).map(Ok));
|
self.background
|
||||||
|
.spawn_cancellable(self.clone().pull_status(ping.id.clone()).map(Ok));
|
||||||
}
|
}
|
||||||
if is_new || config_version < ping.config_version {
|
if is_new || config_version < ping.config_version {
|
||||||
self.background.spawn_cancellable(self.clone().pull_config(ping.id.clone()).map(Ok));
|
self.background
|
||||||
|
.spawn_cancellable(self.clone().pull_config(ping.id.clone()).map(Ok));
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(self.make_ping().await)
|
Ok(self.make_ping())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn handle_pull_status(&self) -> Result<Message, Error> {
|
pub fn handle_pull_status(&self) -> Result<Message, Error> {
|
||||||
let members = self.members.read().await;
|
let status = self.status.borrow().clone();
|
||||||
let mut mem = vec![];
|
let mut mem = vec![];
|
||||||
for (node, status) in members.status.iter() {
|
for (node, status) in status.nodes.iter() {
|
||||||
mem.push(AdvertisedNode {
|
mem.push(AdvertisedNode {
|
||||||
id: node.clone(),
|
id: node.clone(),
|
||||||
addr: status.addr.clone(),
|
addr: status.addr.clone(),
|
||||||
|
@ -377,9 +402,9 @@ impl System {
|
||||||
Ok(Message::AdvertiseNodesUp(mem))
|
Ok(Message::AdvertiseNodesUp(mem))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn handle_pull_config(&self) -> Result<Message, Error> {
|
pub fn handle_pull_config(&self) -> Result<Message, Error> {
|
||||||
let members = self.members.read().await;
|
let ring = self.ring.borrow().clone();
|
||||||
Ok(Message::AdvertiseConfig(members.config.clone()))
|
Ok(Message::AdvertiseConfig(ring.config.clone()))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn handle_advertise_nodes_up(
|
pub async fn handle_advertise_nodes_up(
|
||||||
|
@ -388,14 +413,15 @@ impl System {
|
||||||
) -> Result<Message, Error> {
|
) -> Result<Message, Error> {
|
||||||
let mut to_ping = vec![];
|
let mut to_ping = vec![];
|
||||||
|
|
||||||
let mut members = self.members.write().await;
|
let update_lock = self.update_lock.lock().await;
|
||||||
|
let mut status: Status = self.status.borrow().as_ref().clone();
|
||||||
let mut has_changed = false;
|
let mut has_changed = false;
|
||||||
|
|
||||||
for node in adv.iter() {
|
for node in adv.iter() {
|
||||||
if node.id == self.id {
|
if node.id == self.id {
|
||||||
// learn our own ip address
|
// learn our own ip address
|
||||||
let self_addr = SocketAddr::new(node.addr.ip(), self.config.rpc_port);
|
let self_addr = SocketAddr::new(node.addr.ip(), self.config.rpc_port);
|
||||||
let old_self = members.status.insert(
|
let old_self = status.nodes.insert(
|
||||||
node.id.clone(),
|
node.id.clone(),
|
||||||
NodeStatus {
|
NodeStatus {
|
||||||
addr: self_addr,
|
addr: self_addr,
|
||||||
|
@ -406,17 +432,19 @@ impl System {
|
||||||
None => true,
|
None => true,
|
||||||
Some(x) => x.addr != self_addr,
|
Some(x) => x.addr != self_addr,
|
||||||
};
|
};
|
||||||
} else if !members.status.contains_key(&node.id) {
|
} else if !status.nodes.contains_key(&node.id) {
|
||||||
to_ping.push((node.addr.clone(), Some(node.id.clone())));
|
to_ping.push((node.addr.clone(), Some(node.id.clone())));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if has_changed {
|
if has_changed {
|
||||||
members.recalculate_status_hash();
|
status.recalculate_hash();
|
||||||
}
|
}
|
||||||
drop(members);
|
update_lock.0.broadcast(Arc::new(status))?;
|
||||||
|
drop(update_lock);
|
||||||
|
|
||||||
if to_ping.len() > 0 {
|
if to_ping.len() > 0 {
|
||||||
self.background.spawn_cancellable(self.clone().ping_nodes(to_ping).map(Ok));
|
self.background
|
||||||
|
.spawn_cancellable(self.clone().ping_nodes(to_ping).map(Ok));
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(Message::Ok)
|
Ok(Message::Ok)
|
||||||
|
@ -426,10 +454,14 @@ impl System {
|
||||||
self: Arc<Self>,
|
self: Arc<Self>,
|
||||||
adv: &NetworkConfig,
|
adv: &NetworkConfig,
|
||||||
) -> Result<Message, Error> {
|
) -> Result<Message, Error> {
|
||||||
let mut members = self.members.write().await;
|
let mut ring: Ring = self.ring.borrow().as_ref().clone();
|
||||||
if adv.version > members.config.version {
|
let update_lock = self.update_lock.lock().await;
|
||||||
members.config = adv.clone();
|
|
||||||
members.rebuild_ring();
|
if adv.version > ring.config.version {
|
||||||
|
ring.config = adv.clone();
|
||||||
|
ring.rebuild_ring();
|
||||||
|
update_lock.1.broadcast(Arc::new(ring))?;
|
||||||
|
drop(update_lock);
|
||||||
|
|
||||||
self.background.spawn_cancellable(
|
self.background.spawn_cancellable(
|
||||||
self.clone()
|
self.clone()
|
||||||
|
@ -446,14 +478,13 @@ impl System {
|
||||||
loop {
|
loop {
|
||||||
let restart_at = tokio::time::delay_for(PING_INTERVAL);
|
let restart_at = tokio::time::delay_for(PING_INTERVAL);
|
||||||
|
|
||||||
let members = self.members.read().await;
|
let status = self.status.borrow().clone();
|
||||||
let ping_addrs = members
|
let ping_addrs = status
|
||||||
.status
|
.nodes
|
||||||
.iter()
|
.iter()
|
||||||
.filter(|(id, _)| **id != self.id)
|
.filter(|(id, _)| **id != self.id)
|
||||||
.map(|(id, status)| (status.addr.clone(), Some(id.clone())))
|
.map(|(id, status)| (status.addr.clone(), Some(id.clone())))
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
drop(members);
|
|
||||||
|
|
||||||
self.clone().ping_nodes(ping_addrs).await;
|
self.clone().ping_nodes(ping_addrs).await;
|
||||||
|
|
||||||
|
|
|
@ -79,8 +79,8 @@ pub async fn rpc_call(
|
||||||
timeout: Duration,
|
timeout: Duration,
|
||||||
) -> Result<Message, Error> {
|
) -> Result<Message, Error> {
|
||||||
let addr = {
|
let addr = {
|
||||||
let members = sys.members.read().await;
|
let status = sys.status.borrow().clone();
|
||||||
match members.status.get(to) {
|
match status.nodes.get(to) {
|
||||||
Some(status) => status.addr.clone(),
|
Some(status) => status.addr.clone(),
|
||||||
None => return Err(Error::Message(format!("Peer ID not found"))),
|
None => return Err(Error::Message(format!("Peer ID not found"))),
|
||||||
}
|
}
|
||||||
|
|
|
@ -59,8 +59,8 @@ async fn handler(
|
||||||
let resp = err_to_msg(match &msg {
|
let resp = err_to_msg(match &msg {
|
||||||
Message::Ping(ping) => sys.handle_ping(&addr, ping).await,
|
Message::Ping(ping) => sys.handle_ping(&addr, ping).await,
|
||||||
|
|
||||||
Message::PullStatus => sys.handle_pull_status().await,
|
Message::PullStatus => sys.handle_pull_status(),
|
||||||
Message::PullConfig => sys.handle_pull_config().await,
|
Message::PullConfig => sys.handle_pull_config(),
|
||||||
Message::AdvertiseNodesUp(adv) => sys.handle_advertise_nodes_up(adv).await,
|
Message::AdvertiseNodesUp(adv) => sys.handle_advertise_nodes_up(adv).await,
|
||||||
Message::AdvertiseConfig(adv) => sys.handle_advertise_config(adv).await,
|
Message::AdvertiseConfig(adv) => sys.handle_advertise_config(adv).await,
|
||||||
|
|
||||||
|
|
|
@ -10,8 +10,8 @@ use tokio::sync::RwLock;
|
||||||
|
|
||||||
use crate::api_server;
|
use crate::api_server;
|
||||||
use crate::background::*;
|
use crate::background::*;
|
||||||
use crate::data::*;
|
|
||||||
use crate::block::*;
|
use crate::block::*;
|
||||||
|
use crate::data::*;
|
||||||
use crate::error::Error;
|
use crate::error::Error;
|
||||||
use crate::membership::System;
|
use crate::membership::System;
|
||||||
use crate::proto::*;
|
use crate::proto::*;
|
||||||
|
|
35
src/table.rs
35
src/table.rs
|
@ -3,9 +3,9 @@ use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
|
use futures::stream::*;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use serde_bytes::ByteBuf;
|
use serde_bytes::ByteBuf;
|
||||||
use futures::stream::*;
|
|
||||||
|
|
||||||
use crate::data::*;
|
use crate::data::*;
|
||||||
use crate::error::Error;
|
use crate::error::Error;
|
||||||
|
@ -150,9 +150,9 @@ impl<F: TableFormat + 'static> Table<F> {
|
||||||
let hash = e.partition_key().hash();
|
let hash = e.partition_key().hash();
|
||||||
let who = self
|
let who = self
|
||||||
.system
|
.system
|
||||||
.members
|
.ring
|
||||||
.read()
|
.borrow()
|
||||||
.await
|
.clone()
|
||||||
.walk_ring(&hash, self.param.replication_factor);
|
.walk_ring(&hash, self.param.replication_factor);
|
||||||
eprintln!("insert who: {:?}", who);
|
eprintln!("insert who: {:?}", who);
|
||||||
|
|
||||||
|
@ -171,9 +171,9 @@ impl<F: TableFormat + 'static> Table<F> {
|
||||||
let hash = entry.partition_key().hash();
|
let hash = entry.partition_key().hash();
|
||||||
let who = self
|
let who = self
|
||||||
.system
|
.system
|
||||||
.members
|
.ring
|
||||||
.read()
|
.borrow()
|
||||||
.await
|
.clone()
|
||||||
.walk_ring(&hash, self.param.replication_factor);
|
.walk_ring(&hash, self.param.replication_factor);
|
||||||
let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?));
|
let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?));
|
||||||
for node in who {
|
for node in who {
|
||||||
|
@ -184,18 +184,12 @@ impl<F: TableFormat + 'static> Table<F> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let call_futures = call_list.drain()
|
let call_futures = call_list.drain().map(|(node, entries)| async move {
|
||||||
.map(|(node, entries)| async move {
|
|
||||||
let rpc = TableRPC::<F>::Update(entries);
|
let rpc = TableRPC::<F>::Update(entries);
|
||||||
let rpc_bytes = rmp_to_vec_all_named(&rpc)?;
|
let rpc_bytes = rmp_to_vec_all_named(&rpc)?;
|
||||||
let message = Message::TableRPC(self.name.to_string(), rpc_bytes);
|
let message = Message::TableRPC(self.name.to_string(), rpc_bytes);
|
||||||
|
|
||||||
let resp = rpc_call(
|
let resp = rpc_call(self.system.clone(), &node, &message, self.param.timeout).await?;
|
||||||
self.system.clone(),
|
|
||||||
&node,
|
|
||||||
&message,
|
|
||||||
self.param.timeout
|
|
||||||
).await?;
|
|
||||||
Ok::<_, Error>((node, resp))
|
Ok::<_, Error>((node, resp))
|
||||||
});
|
});
|
||||||
let mut resps = call_futures.collect::<FuturesUnordered<_>>();
|
let mut resps = call_futures.collect::<FuturesUnordered<_>>();
|
||||||
|
@ -217,9 +211,9 @@ impl<F: TableFormat + 'static> Table<F> {
|
||||||
let hash = partition_key.hash();
|
let hash = partition_key.hash();
|
||||||
let who = self
|
let who = self
|
||||||
.system
|
.system
|
||||||
.members
|
.ring
|
||||||
.read()
|
.borrow()
|
||||||
.await
|
.clone()
|
||||||
.walk_ring(&hash, self.param.replication_factor);
|
.walk_ring(&hash, self.param.replication_factor);
|
||||||
eprintln!("get who: {:?}", who);
|
eprintln!("get who: {:?}", who);
|
||||||
|
|
||||||
|
@ -259,10 +253,7 @@ impl<F: TableFormat + 'static> Table<F> {
|
||||||
|
|
||||||
async fn repair_on_read(&self, who: &[UUID], what: &F::E) -> Result<(), Error> {
|
async fn repair_on_read(&self, who: &[UUID], what: &F::E) -> Result<(), Error> {
|
||||||
let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(what)?));
|
let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(what)?));
|
||||||
self.rpc_try_call_many(&who[..],
|
self.rpc_try_call_many(&who[..], &TableRPC::<F>::Update(vec![what_enc]), who.len())
|
||||||
&TableRPC::<F>::Update(vec![what_enc]),
|
|
||||||
who.len(),
|
|
||||||
)
|
|
||||||
.await
|
.await
|
||||||
.map(|_| ())
|
.map(|_| ())
|
||||||
}
|
}
|
||||||
|
|
|
@ -69,14 +69,19 @@ impl TableFormat for VersionTable {
|
||||||
// Propagate deletion of version blocks
|
// Propagate deletion of version blocks
|
||||||
if let Some(old_v) = old {
|
if let Some(old_v) = old {
|
||||||
if new.deleted && !old_v.deleted {
|
if new.deleted && !old_v.deleted {
|
||||||
let deleted_block_refs = old_v.blocks.iter()
|
let deleted_block_refs = old_v
|
||||||
|
.blocks
|
||||||
|
.iter()
|
||||||
.map(|vb| BlockRef {
|
.map(|vb| BlockRef {
|
||||||
block: vb.hash.clone(),
|
block: vb.hash.clone(),
|
||||||
version: old_v.uuid.clone(),
|
version: old_v.uuid.clone(),
|
||||||
deleted: true,
|
deleted: true,
|
||||||
})
|
})
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
garage.block_ref_table.insert_many(&deleted_block_refs[..]).await?;
|
garage
|
||||||
|
.block_ref_table
|
||||||
|
.insert_many(&deleted_block_refs[..])
|
||||||
|
.await?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|
Loading…
Reference in a new issue