Improved bootstraping procedure #56

Merged
lx merged 11 commits from better_bootstrap into main 2021-04-05 21:50:36 +00:00
2 changed files with 31 additions and 5 deletions
Showing only changes of commit f11bd80d2a - Show all commits

View file

@ -582,7 +582,7 @@ impl System {
.map(|ip| (*ip, None)) .map(|ip| (*ip, None))
.collect::<Vec<_>>(); .collect::<Vec<_>>();
match self.persist_status.load() { match self.persist_status.load_async().await {
Ok(peers) => { Ok(peers) => {
bp2.extend(peers.iter().map(|x| (x.addr, Some(x.id)))); bp2.extend(peers.iter().map(|x| (x.addr, Some(x.id))));
} }
@ -653,9 +653,25 @@ impl System {
async fn update_status(self: &Arc<Self>, updaters: &Updaters, status: Status) { async fn update_status(self: &Arc<Self>, updaters: &Updaters, status: Status) {
if status.hash != self.status.borrow().hash { if status.hash != self.status.borrow().hash {
info!("Persisting new peer list"); info!("Persisting new peer list");
let serializable_status = status.to_serializable_membership(&self);
self.persist_status.save_async(&serializable_status).await let mut list = status.to_serializable_membership(&self);
.expect("Unable to persist peer list");
// Combine with old peer list to make sure no peer is lost
match self.persist_status.load_async().await {
Ok(old_list) => {
for pp in old_list {
if !list.iter().any(|np| pp.id == np.id) {
list.push(pp);
}
}
}
_ => (),
}
if list.len() > 0 {
self.persist_status.save_async(&list).await
.expect("Unable to persist peer list");
}
} }
let status = Arc::new(status); let status = Arc::new(status);

View file

@ -1,7 +1,7 @@
use std::io::{Read, Write}; use std::io::{Read, Write};
use std::path::PathBuf; use std::path::PathBuf;
use tokio::io::AsyncWriteExt; use tokio::io::{AsyncReadExt, AsyncWriteExt};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -51,6 +51,16 @@ where
Ok(()) Ok(())
} }
pub async fn load_async(&self) -> Result<T, Error> {
let mut file = tokio::fs::File::open(&self.path).await?;
let mut bytes = vec![];
file.read_to_end(&mut bytes).await?;
let value = rmp_serde::decode::from_read_ref(&bytes[..])?;
Ok(value)
}
pub async fn save_async(&self, t: &T) -> Result<(), Error> { pub async fn save_async(&self, t: &T) -> Result<(), Error> {
let bytes = rmp_to_vec_all_named(t)?; let bytes = rmp_to_vec_all_named(t)?;