forked from Deuxfleurs/garage
WIP
This commit is contained in:
parent
e8d750175d
commit
2832be4396
3 changed files with 208 additions and 31 deletions
|
@ -14,7 +14,7 @@ type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>;
|
||||||
|
|
||||||
pub struct BackgroundRunner {
|
pub struct BackgroundRunner {
|
||||||
n_runners: usize,
|
n_runners: usize,
|
||||||
stop_signal: watch::Receiver<bool>,
|
pub stop_signal: watch::Receiver<bool>,
|
||||||
|
|
||||||
queue_in: mpsc::UnboundedSender<(Job, bool)>,
|
queue_in: mpsc::UnboundedSender<(Job, bool)>,
|
||||||
queue_out: Mutex<mpsc::UnboundedReceiver<(Job, bool)>>,
|
queue_out: Mutex<mpsc::UnboundedReceiver<(Job, bool)>>,
|
||||||
|
|
21
src/table.rs
21
src/table.rs
|
@ -4,6 +4,7 @@ use std::time::Duration;
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use futures::stream::*;
|
use futures::stream::*;
|
||||||
|
use tokio::sync::RwLock;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use serde_bytes::ByteBuf;
|
use serde_bytes::ByteBuf;
|
||||||
|
|
||||||
|
@ -12,7 +13,7 @@ use crate::error::Error;
|
||||||
use crate::membership::System;
|
use crate::membership::System;
|
||||||
use crate::proto::*;
|
use crate::proto::*;
|
||||||
use crate::rpc_client::*;
|
use crate::rpc_client::*;
|
||||||
use crate::table_sync::TableSyncer;
|
use crate::table_sync::*;
|
||||||
|
|
||||||
pub struct Table<F: TableSchema> {
|
pub struct Table<F: TableSchema> {
|
||||||
pub instance: F,
|
pub instance: F,
|
||||||
|
@ -21,6 +22,7 @@ pub struct Table<F: TableSchema> {
|
||||||
|
|
||||||
pub system: Arc<System>,
|
pub system: Arc<System>,
|
||||||
pub store: sled::Tree,
|
pub store: sled::Tree,
|
||||||
|
pub syncer: RwLock<Option<Arc<TableSyncer<F>>>>,
|
||||||
|
|
||||||
pub param: TableReplicationParams,
|
pub param: TableReplicationParams,
|
||||||
}
|
}
|
||||||
|
@ -59,6 +61,9 @@ pub enum TableRPC<F: TableSchema> {
|
||||||
ReadEntryResponse(Option<ByteBuf>),
|
ReadEntryResponse(Option<ByteBuf>),
|
||||||
|
|
||||||
Update(Vec<Arc<ByteBuf>>),
|
Update(Vec<Arc<ByteBuf>>),
|
||||||
|
|
||||||
|
SyncChecksums(Vec<RangeChecksum>),
|
||||||
|
SyncDifferentSet(Vec<SyncRange>),
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait PartitionKey {
|
pub trait PartitionKey {
|
||||||
|
@ -132,8 +137,10 @@ impl<F: TableSchema + 'static> Table<F> {
|
||||||
system,
|
system,
|
||||||
store,
|
store,
|
||||||
param,
|
param,
|
||||||
|
syncer: RwLock::new(None),
|
||||||
});
|
});
|
||||||
TableSyncer::launch(table.clone()).await;
|
let syncer = TableSyncer::launch(table.clone()).await;
|
||||||
|
*table.syncer.write().await = Some(syncer);
|
||||||
table
|
table
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -309,6 +316,11 @@ impl<F: TableSchema + 'static> Table<F> {
|
||||||
self.handle_update(pairs).await?;
|
self.handle_update(pairs).await?;
|
||||||
Ok(TableRPC::Ok)
|
Ok(TableRPC::Ok)
|
||||||
}
|
}
|
||||||
|
TableRPC::SyncChecksums(checksums) => {
|
||||||
|
let syncer = self.syncer.read().await.as_ref().unwrap().clone();
|
||||||
|
let differing = syncer.handle_checksum_rpc(&checksums[..], self.system.background.stop_signal.clone()).await?;
|
||||||
|
Ok(TableRPC::SyncDifferentSet(differing))
|
||||||
|
}
|
||||||
_ => Err(Error::RPCError(format!("Unexpected table RPC"))),
|
_ => Err(Error::RPCError(format!("Unexpected table RPC"))),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -353,6 +365,11 @@ impl<F: TableSchema + 'static> Table<F> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn delete_range(&self, begin: &Hash, end: &Hash) -> Result<(), Error> {
|
||||||
|
// TODO
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> {
|
fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> {
|
||||||
let mut ret = p.hash().to_vec();
|
let mut ret = p.hash().to_vec();
|
||||||
ret.extend(s.sort_key());
|
ret.extend(s.sort_key());
|
||||||
|
|
|
@ -1,12 +1,15 @@
|
||||||
use rand::Rng;
|
use rand::Rng;
|
||||||
use std::collections::BTreeSet;
|
use std::collections::{BTreeSet, HashMap, VecDeque};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
use futures::{pin_mut, select};
|
use futures::{pin_mut, select};
|
||||||
|
use futures::future::BoxFuture;
|
||||||
|
use futures_util::stream::*;
|
||||||
use futures_util::future::*;
|
use futures_util::future::*;
|
||||||
use tokio::sync::watch;
|
use tokio::sync::watch;
|
||||||
use tokio::sync::Mutex;
|
use tokio::sync::Mutex;
|
||||||
|
use serde::{Serialize, Deserialize};
|
||||||
|
|
||||||
use crate::data::*;
|
use crate::data::*;
|
||||||
use crate::error::Error;
|
use crate::error::Error;
|
||||||
|
@ -14,11 +17,12 @@ use crate::membership::Ring;
|
||||||
use crate::table::*;
|
use crate::table::*;
|
||||||
|
|
||||||
const SCAN_INTERVAL: Duration = Duration::from_secs(3600);
|
const SCAN_INTERVAL: Duration = Duration::from_secs(3600);
|
||||||
|
const CHECKSUM_CACHE_TIMEOUT: Duration = Duration::from_secs(1800);
|
||||||
|
|
||||||
pub struct TableSyncer<F: TableSchema> {
|
pub struct TableSyncer<F: TableSchema> {
|
||||||
pub table: Arc<Table<F>>,
|
pub table: Arc<Table<F>>,
|
||||||
|
|
||||||
pub todo: Mutex<SyncTodo>,
|
pub todo: Mutex<SyncTodo>,
|
||||||
|
pub cache: Vec<Mutex<HashMap<SyncRange, RangeChecksum>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct SyncTodo {
|
pub struct SyncTodo {
|
||||||
|
@ -32,12 +36,30 @@ pub struct Partition {
|
||||||
pub retain: bool,
|
pub retain: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Hash, PartialEq, Eq, Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct SyncRange {
|
||||||
|
pub begin: Vec<u8>,
|
||||||
|
pub end: Vec<u8>,
|
||||||
|
pub level: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct RangeChecksum {
|
||||||
|
pub bounds: SyncRange,
|
||||||
|
pub children: Vec<(SyncRange, Hash)>,
|
||||||
|
pub found_limit: Option<Vec<u8>>,
|
||||||
|
|
||||||
|
#[serde(skip, default="std::time::Instant::now")]
|
||||||
|
pub time: Instant,
|
||||||
|
}
|
||||||
|
|
||||||
impl<F: TableSchema + 'static> TableSyncer<F> {
|
impl<F: TableSchema + 'static> TableSyncer<F> {
|
||||||
pub async fn launch(table: Arc<Table<F>>) -> Arc<Self> {
|
pub async fn launch(table: Arc<Table<F>>) -> Arc<Self> {
|
||||||
let todo = SyncTodo { todo: Vec::new() };
|
let todo = SyncTodo { todo: Vec::new() };
|
||||||
let syncer = Arc::new(TableSyncer {
|
let syncer = Arc::new(TableSyncer {
|
||||||
table: table.clone(),
|
table: table.clone(),
|
||||||
todo: Mutex::new(todo),
|
todo: Mutex::new(todo),
|
||||||
|
cache: (0..32).map(|_| Mutex::new(HashMap::new())).collect::<Vec<_>>(),
|
||||||
});
|
});
|
||||||
|
|
||||||
let s1 = syncer.clone();
|
let s1 = syncer.clone();
|
||||||
|
@ -95,38 +117,176 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
|
||||||
self: Arc<Self>,
|
self: Arc<Self>,
|
||||||
mut must_exit: watch::Receiver<bool>,
|
mut must_exit: watch::Receiver<bool>,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
loop {
|
while !*must_exit.borrow() {
|
||||||
let s_pop_task = self.pop_task().fuse();
|
if let Some(partition) = self.todo.lock().await.pop_task() {
|
||||||
let s_must_exit = must_exit.recv().fuse();
|
let res = self.clone().sync_partition(&partition, &mut must_exit).await;
|
||||||
pin_mut!(s_must_exit, s_pop_task);
|
if let Err(e) = res {
|
||||||
|
eprintln!("Error while syncing {:?}: {}", partition, e);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
tokio::time::delay_for(Duration::from_secs(1)).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
select! {
|
async fn sync_partition(self: Arc<Self>, partition: &Partition, must_exit: &mut watch::Receiver<bool>) -> Result<(), Error> {
|
||||||
task = s_pop_task => {
|
let root_cks = self.root_checksum(&partition.begin, &partition.end, must_exit).await?;
|
||||||
if let Some(partition) = task {
|
eprintln!("Root checksum for {:?}: {:?}", partition, root_cks);
|
||||||
let res = self.sync_partition(&partition).await;
|
|
||||||
if let Err(e) = res {
|
let nodes = self.table.system.ring.borrow().clone().walk_ring(&partition.begin, self.table.param.replication_factor);
|
||||||
eprintln!("Error while syncing {:?}: {}", partition, e);
|
let mut sync_futures = nodes.iter()
|
||||||
}
|
.map(|node| self.clone().do_sync_with(root_cks.clone(), node.clone(), must_exit.clone()))
|
||||||
} else {
|
.collect::<FuturesUnordered<_>>();
|
||||||
tokio::time::delay_for(Duration::from_secs(1)).await;
|
|
||||||
|
while let Some(r) = sync_futures.next().await {
|
||||||
|
if let Err(e) = r {
|
||||||
|
eprintln!("Sync error: {}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !partition.retain {
|
||||||
|
self.table.delete_range(&partition.begin, &partition.end).await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn root_checksum(self: &Arc<Self>, begin: &Hash, end: &Hash, must_exit: &mut watch::Receiver<bool>) -> Result<RangeChecksum, Error> {
|
||||||
|
for i in 1..32 {
|
||||||
|
let rc = self.range_checksum(&SyncRange{
|
||||||
|
begin: begin.to_vec(),
|
||||||
|
end: end.to_vec(),
|
||||||
|
level: i,
|
||||||
|
}, must_exit).await?;
|
||||||
|
if rc.found_limit.is_none() {
|
||||||
|
return Ok(rc);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(Error::Message(format!("Unable to compute root checksum (this should never happen")))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn range_checksum<'a>(self: &'a Arc<Self>, range: &'a SyncRange, must_exit: &'a mut watch::Receiver<bool>) -> BoxFuture<'a, Result<RangeChecksum, Error>> {
|
||||||
|
async move {
|
||||||
|
let mut cache = self.cache[range.level].lock().await;
|
||||||
|
if let Some(v) = cache.get(&range) {
|
||||||
|
if Instant::now() - v.time < CHECKSUM_CACHE_TIMEOUT {
|
||||||
|
return Ok(v.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
cache.remove(&range);
|
||||||
|
drop(cache);
|
||||||
|
|
||||||
|
let v = self.range_checksum_inner(&range, must_exit).await?;
|
||||||
|
|
||||||
|
let mut cache = self.cache[range.level].lock().await;
|
||||||
|
eprintln!("Checksum for {:?}: {:?}", range, v);
|
||||||
|
cache.insert(range.clone(), v.clone());
|
||||||
|
Ok(v)
|
||||||
|
}.boxed()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn range_checksum_inner(self: &Arc<Self>, range: &SyncRange, must_exit: &mut watch::Receiver<bool>) -> Result<RangeChecksum, Error> {
|
||||||
|
if range.level == 1 {
|
||||||
|
let mut children = vec![];
|
||||||
|
for item in self.table.store.range(range.begin.clone()..range.end.clone()) {
|
||||||
|
let (key, value) = item?;
|
||||||
|
let key_hash = hash(&key[..]);
|
||||||
|
if key != range.begin && key_hash.as_slice()[0..range.level].iter().all(|x| *x == 0) {
|
||||||
|
return Ok(RangeChecksum{
|
||||||
|
bounds: range.clone(),
|
||||||
|
children,
|
||||||
|
found_limit: Some(key.to_vec()),
|
||||||
|
time: Instant::now(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
let item_range = SyncRange{
|
||||||
|
begin: key.to_vec(),
|
||||||
|
end: vec![],
|
||||||
|
level: 0,
|
||||||
|
};
|
||||||
|
children.push((item_range, hash(&value[..])));
|
||||||
|
}
|
||||||
|
Ok(RangeChecksum{
|
||||||
|
bounds: range.clone(),
|
||||||
|
children,
|
||||||
|
found_limit: None,
|
||||||
|
time: Instant::now(),
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
let mut children = vec![];
|
||||||
|
let mut sub_range = SyncRange{
|
||||||
|
begin: range.begin.clone(),
|
||||||
|
end: range.end.clone(),
|
||||||
|
level: range.level - 1,
|
||||||
|
};
|
||||||
|
let mut time = Instant::now();
|
||||||
|
while !*must_exit.borrow() {
|
||||||
|
let sub_ck = self.range_checksum(&sub_range, must_exit).await?;
|
||||||
|
|
||||||
|
if sub_ck.children.len() > 0 {
|
||||||
|
let sub_ck_hash = hash(&rmp_to_vec_all_named(&sub_ck)?[..]);
|
||||||
|
children.push((sub_range.clone(), sub_ck_hash));
|
||||||
|
if sub_ck.time < time {
|
||||||
|
time = sub_ck.time;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
must_exit_v = s_must_exit => {
|
|
||||||
if must_exit_v.unwrap_or(false) {
|
if sub_ck.found_limit.is_none() || sub_ck.children.len() == 0 {
|
||||||
return Ok(())
|
return Ok(RangeChecksum{
|
||||||
|
bounds: range.clone(),
|
||||||
|
children,
|
||||||
|
found_limit: None,
|
||||||
|
time,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
let found_limit = sub_ck.found_limit.unwrap();
|
||||||
|
|
||||||
|
let actual_limit_hash = hash(&found_limit[..]);
|
||||||
|
if actual_limit_hash.as_slice()[0..range.level].iter().all(|x| *x == 0) {
|
||||||
|
return Ok(RangeChecksum{
|
||||||
|
bounds: range.clone(),
|
||||||
|
children,
|
||||||
|
found_limit: Some(found_limit.clone()),
|
||||||
|
time,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
sub_range.begin = found_limit;
|
||||||
|
}
|
||||||
|
Err(Error::Message(format!("Exiting.")))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn do_sync_with(self: Arc<Self>, root_ck: RangeChecksum, who: UUID, mut must_exit: watch::Receiver<bool>) -> Result<(), Error> {
|
||||||
|
let mut todo = VecDeque::new();
|
||||||
|
todo.push_back(root_ck);
|
||||||
|
|
||||||
|
while !todo.is_empty() && !*must_exit.borrow() {
|
||||||
|
let end = std::cmp::min(16, todo.len());
|
||||||
|
let step = todo.drain(..end).collect::<Vec<_>>();
|
||||||
|
unimplemented!()
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn handle_checksum_rpc(self: &Arc<Self>, checksums: &[RangeChecksum], mut must_exit: watch::Receiver<bool>) -> Result<Vec<SyncRange>, Error> {
|
||||||
|
let mut ret = vec![];
|
||||||
|
for ckr in checksums.iter() {
|
||||||
|
let our_ckr = self.range_checksum(&ckr.bounds, &mut must_exit).await?;
|
||||||
|
for (range, hash) in ckr.children.iter() {
|
||||||
|
match our_ckr.children.binary_search_by(|(our_range, _)| our_range.begin.cmp(&range.begin)) {
|
||||||
|
Err(_) => {
|
||||||
|
ret.push(range.clone());
|
||||||
|
}
|
||||||
|
Ok(i) => {
|
||||||
|
if our_ckr.children[i].1 != *hash {
|
||||||
|
ret.push(range.clone());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
Ok(ret)
|
||||||
|
|
||||||
async fn pop_task(&self) -> Option<Partition> {
|
|
||||||
self.todo.lock().await.pop_task()
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn sync_partition(self: &Arc<Self>, partition: &Partition) -> Result<(), Error> {
|
|
||||||
eprintln!("NOT IMPLEMENTED: SYNC PARTITION {:?}", partition);
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue