This commit is contained in:
Alex 2020-04-16 19:28:02 +02:00
parent 2832be4396
commit 2f3b1a072f
6 changed files with 129 additions and 37 deletions

View file

@ -111,6 +111,19 @@ where
Ok(wr)
}
pub fn debug_serialize<T: Serialize>(x: T) -> String {
match serde_json::to_string(&x) {
Ok(ss) => {
if ss.len() > 100 {
ss[..100].to_string()
} else {
ss
}
}
Err(e) => format!("<JSON serialization error: {}>", e),
}
}
// Network management
#[derive(Clone, Debug, Serialize, Deserialize)]

View file

@ -131,11 +131,11 @@ impl Ring {
self.ring = new_ring;
self.n_datacenters = datacenters.len();
eprintln!("RING: --");
for e in self.ring.iter() {
eprintln!("{:?}", e);
}
eprintln!("END --");
// eprintln!("RING: --");
// for e in self.ring.iter() {
// eprintln!("{:?}", e);
// }
// eprintln!("END --");
}
pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec<UUID> {

View file

@ -84,7 +84,7 @@ pub async fn rpc_call(
let status = sys.status.borrow().clone();
match status.nodes.get(to) {
Some(status) => status.addr.clone(),
None => return Err(Error::Message(format!("Peer ID not found"))),
None => return Err(Error::Message(format!("Peer ID not found: {:?}", to))),
}
};
sys.rpc_client.call(&addr, msg, timeout).await

View file

@ -8,29 +8,16 @@ use futures_util::stream::*;
use hyper::server::conn::AddrStream;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Server, StatusCode};
use serde::Serialize;
use tokio::net::{TcpListener, TcpStream};
use tokio_rustls::server::TlsStream;
use tokio_rustls::TlsAcceptor;
use crate::data::rmp_to_vec_all_named;
use crate::data::{rmp_to_vec_all_named, debug_serialize};
use crate::error::Error;
use crate::proto::Message;
use crate::server::Garage;
use crate::tls_util;
fn debug_serialize<T: Serialize>(x: T) -> String {
match serde_json::to_string(&x) {
Ok(ss) => {
if ss.len() > 100 {
ss[..100].to_string()
} else {
ss
}
}
Err(e) => format!("<JSON serialization error: {}>", e),
}
}
fn err_to_msg(x: Result<Message, Error>) -> Message {
match x {
@ -53,12 +40,12 @@ async fn handler(
let whole_body = hyper::body::to_bytes(req.into_body()).await?;
let msg = rmp_serde::decode::from_read::<_, Message>(whole_body.into_buf())?;
eprintln!(
"RPC from {}: {} ({} bytes)",
addr,
debug_serialize(&msg),
whole_body.len()
);
// eprintln!(
// "RPC from {}: {} ({} bytes)",
// addr,
// debug_serialize(&msg),
// whole_body.len()
// );
let sys = garage.system.clone();
let resp = err_to_msg(match msg {
@ -99,7 +86,7 @@ async fn handler(
_ => Ok(Message::Error(format!("Unexpected message: {:?}", msg))),
});
eprintln!("reply to {}: {}", addr, debug_serialize(&resp));
// eprintln!("reply to {}: {}", addr, debug_serialize(&resp));
Ok(Response::new(Body::from(rmp_to_vec_all_named(&resp)?)))
}

View file

@ -306,7 +306,20 @@ impl<F: TableSchema + 'static> Table<F> {
Ok(resps_vals)
}
async fn handle(&self, msg: TableRPC<F>) -> Result<TableRPC<F>, Error> {
pub async fn rpc_call(&self, who: &UUID, rpc: &TableRPC<F>) -> Result<TableRPC<F>, Error> {
let rpc_bytes = rmp_to_vec_all_named(rpc)?;
let rpc_msg = Message::TableRPC(self.name.to_string(), rpc_bytes);
let resp = rpc_call(self.system.clone(), who, &rpc_msg, self.param.timeout).await?;
if let Message::TableRPC(tbl, rep_by) = &resp {
if *tbl == self.name {
return Ok(rmp_serde::decode::from_read_ref(&rep_by)?);
}
}
Err(Error::Message(format!("Invalid reply to TableRPC: {:?}", resp)))
}
async fn handle(self: &Arc<Self>, msg: TableRPC<F>) -> Result<TableRPC<F>, Error> {
match msg {
TableRPC::ReadEntry(key, sort_key) => {
let value = self.handle_read_entry(&key, &sort_key)?;
@ -334,7 +347,7 @@ impl<F: TableSchema + 'static> Table<F> {
}
}
async fn handle_update(&self, mut entries: Vec<Arc<ByteBuf>>) -> Result<(), Error> {
async fn handle_update(self: &Arc<Self>, mut entries: Vec<Arc<ByteBuf>>) -> Result<(), Error> {
for update_bytes in entries.drain(..) {
let update = rmp_serde::decode::from_read_ref::<_, F::E>(update_bytes.as_slice())?;
@ -360,7 +373,12 @@ impl<F: TableSchema + 'static> Table<F> {
Ok((old_entry, new_entry))
})?;
self.instance.updated(old_entry, new_entry).await;
if old_entry.as_ref() != Some(&new_entry) {
self.instance.updated(old_entry, new_entry).await;
let syncer = self.syncer.read().await.as_ref().unwrap().clone();
self.system.background.spawn(syncer.invalidate(tree_key));
}
}
Ok(())
}

View file

@ -1,5 +1,5 @@
use rand::Rng;
use std::collections::{BTreeSet, HashMap, VecDeque};
use std::collections::{BTreeSet, BTreeMap, VecDeque};
use std::sync::Arc;
use std::time::{Duration, Instant};
@ -10,19 +10,21 @@ use futures_util::future::*;
use tokio::sync::watch;
use tokio::sync::Mutex;
use serde::{Serialize, Deserialize};
use serde_bytes::ByteBuf;
use crate::data::*;
use crate::error::Error;
use crate::membership::Ring;
use crate::table::*;
const MAX_DEPTH: usize = 16;
const SCAN_INTERVAL: Duration = Duration::from_secs(3600);
const CHECKSUM_CACHE_TIMEOUT: Duration = Duration::from_secs(1800);
pub struct TableSyncer<F: TableSchema> {
pub table: Arc<Table<F>>,
pub todo: Mutex<SyncTodo>,
pub cache: Vec<Mutex<HashMap<SyncRange, RangeChecksum>>>,
pub cache: Vec<Mutex<BTreeMap<SyncRange, RangeChecksum>>>,
}
pub struct SyncTodo {
@ -43,6 +45,17 @@ pub struct SyncRange {
pub level: usize,
}
impl std::cmp::PartialOrd for SyncRange {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl std::cmp::Ord for SyncRange {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.begin.cmp(&other.begin)
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RangeChecksum {
pub bounds: SyncRange,
@ -59,7 +72,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
let syncer = Arc::new(TableSyncer {
table: table.clone(),
todo: Mutex::new(todo),
cache: (0..32).map(|_| Mutex::new(HashMap::new())).collect::<Vec<_>>(),
cache: (0..MAX_DEPTH).map(|_| Mutex::new(BTreeMap::new())).collect::<Vec<_>>(),
});
let s1 = syncer.clone();
@ -83,12 +96,14 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
self: Arc<Self>,
mut must_exit: watch::Receiver<bool>,
) -> Result<(), Error> {
tokio::time::delay_for(Duration::from_secs(10));
self.todo.lock().await.add_full_scan(&self.table);
let mut next_full_scan = tokio::time::delay_for(SCAN_INTERVAL).fuse();
let mut prev_ring: Arc<Ring> = self.table.system.ring.borrow().clone();
let mut ring_recv: watch::Receiver<Arc<Ring>> = self.table.system.ring.clone();
loop {
while !*must_exit.borrow() {
let s_ring_recv = ring_recv.recv().fuse();
let s_must_exit = must_exit.recv().fuse();
pin_mut!(s_ring_recv, s_must_exit);
@ -96,21 +111,24 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
select! {
_ = next_full_scan => {
next_full_scan = tokio::time::delay_for(SCAN_INTERVAL).fuse();
eprintln!("Adding full scan to syncer todo list");
self.todo.lock().await.add_full_scan(&self.table);
}
new_ring_r = s_ring_recv => {
if let Some(new_ring) = new_ring_r {
eprintln!("Adding ring difference to syncer todo list");
self.todo.lock().await.add_ring_difference(&self.table, &prev_ring, &new_ring);
prev_ring = new_ring;
}
}
must_exit_v = s_must_exit => {
if must_exit_v.unwrap_or(false) {
return Ok(())
break;
}
}
}
}
Ok(())
}
async fn syncer_task(
@ -131,6 +149,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
}
async fn sync_partition(self: Arc<Self>, partition: &Partition, must_exit: &mut watch::Receiver<bool>) -> Result<(), Error> {
eprintln!("Calculating root checksum for {:?}...", partition);
let root_cks = self.root_checksum(&partition.begin, &partition.end, must_exit).await?;
eprintln!("Root checksum for {:?}: {:?}", partition, root_cks);
@ -152,7 +171,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
}
async fn root_checksum(self: &Arc<Self>, begin: &Hash, end: &Hash, must_exit: &mut watch::Receiver<bool>) -> Result<RangeChecksum, Error> {
for i in 1..32 {
for i in 1..MAX_DEPTH {
let rc = self.range_checksum(&SyncRange{
begin: begin.to_vec(),
end: end.to_vec(),
@ -262,13 +281,49 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
todo.push_back(root_ck);
while !todo.is_empty() && !*must_exit.borrow() {
eprintln!("Sync with {:?}: {} remaining", who, todo.len());
let end = std::cmp::min(16, todo.len());
let step = todo.drain(..end).collect::<Vec<_>>();
unimplemented!()
let rpc_resp = self.table.rpc_call(&who, &TableRPC::<F>::SyncChecksums(step)).await?;
if let TableRPC::<F>::SyncDifferentSet(mut s) = rpc_resp {
let mut items = vec![];
for differing in s.drain(..) {
if differing.level == 0 {
items.push(differing.begin);
} else {
let checksum = self.range_checksum(&differing, &mut must_exit).await?;
todo.push_back(checksum);
}
}
if items.len() > 0 {
self.table.system.background.spawn(self.clone().send_items(who.clone(), items));
}
} else {
return Err(Error::Message(format!("Unexpected response to RPC SyncChecksums: {}", debug_serialize(&rpc_resp))));
}
}
Ok(())
}
async fn send_items(self: Arc<Self>, who: UUID, item_list: Vec<Vec<u8>>) -> Result<(), Error> {
eprintln!("Sending {} items to {:?}", item_list.len(), who);
let mut values = vec![];
for item in item_list.iter() {
if let Some(v) = self.table.store.get(&item[..])? {
values.push(Arc::new(ByteBuf::from(v.as_ref())));
}
}
let rpc_resp = self.table.rpc_call(&who, &TableRPC::<F>::Update(values)).await?;
if let TableRPC::<F>::Ok = rpc_resp {
Ok(())
} else {
Err(Error::Message(format!("Unexpected response to RPC Update: {}", debug_serialize(&rpc_resp))))
}
}
pub async fn handle_checksum_rpc(self: &Arc<Self>, checksums: &[RangeChecksum], mut must_exit: watch::Receiver<bool>) -> Result<Vec<SyncRange>, Error> {
let mut ret = vec![];
for ckr in checksums.iter() {
@ -288,6 +343,25 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
}
Ok(ret)
}
pub async fn invalidate(self: Arc<Self>, item_key: Vec<u8>) -> Result<(), Error> {
for i in 1..MAX_DEPTH {
let needle = SyncRange{
begin: item_key.to_vec(),
end: vec![],
level: i,
};
let mut cache = self.cache[i].lock().await;
if let Some(cache_entry) = cache.range(..=needle).rev().next() {
if cache_entry.0.begin <= item_key && cache_entry.0.end > item_key {
let index = cache_entry.0.clone();
drop(cache_entry);
cache.remove(&index);
}
}
}
Ok(())
}
}
impl SyncTodo {