205 lines
4.9 KiB
Rust
205 lines
4.9 KiB
Rust
|
use rand::Rng;
|
||
|
use std::sync::Arc;
|
||
|
use std::time::Duration;
|
||
|
use std::collections::BTreeSet;
|
||
|
|
||
|
use futures::{pin_mut, select};
|
||
|
use futures_util::future::*;
|
||
|
use tokio::sync::watch;
|
||
|
use tokio::sync::Mutex;
|
||
|
|
||
|
use crate::data::*;
|
||
|
use crate::error::Error;
|
||
|
use crate::membership::{Ring, System};
|
||
|
use crate::table::*;
|
||
|
|
||
|
/// Delay between two periodic full scans scheduled by the watcher task (3600 s = one hour).
const SCAN_INTERVAL: Duration = Duration::from_secs(3600);
|
||
|
|
||
|
pub struct TableSyncer<F: TableSchema> {
|
||
|
pub table: Arc<Table<F>>,
|
||
|
|
||
|
pub todo: Mutex<SyncTodo>,
|
||
|
}
|
||
|
|
||
|
pub struct SyncTodo {
|
||
|
pub todo: Vec<Partition>,
|
||
|
}
|
||
|
|
||
|
#[derive(Debug, Clone)]
|
||
|
pub struct Partition {
|
||
|
pub begin: Hash,
|
||
|
pub end: Hash,
|
||
|
}
|
||
|
|
||
|
impl<F: TableSchema + 'static> TableSyncer<F> {
|
||
|
pub async fn launch(table: Arc<Table<F>>) -> Arc<Self> {
|
||
|
let todo = SyncTodo { todo: Vec::new() };
|
||
|
let syncer = Arc::new(TableSyncer {
|
||
|
table: table.clone(),
|
||
|
todo: Mutex::new(todo),
|
||
|
});
|
||
|
|
||
|
let s1 = syncer.clone();
|
||
|
table
|
||
|
.system
|
||
|
.background
|
||
|
.spawn_worker(move |must_exit: watch::Receiver<bool>| s1.watcher_task(must_exit))
|
||
|
.await;
|
||
|
|
||
|
let s2 = syncer.clone();
|
||
|
table
|
||
|
.system
|
||
|
.background
|
||
|
.spawn_worker(move |must_exit: watch::Receiver<bool>| s2.syncer_task(must_exit))
|
||
|
.await;
|
||
|
|
||
|
syncer
|
||
|
}
|
||
|
|
||
|
async fn watcher_task(
|
||
|
self: Arc<Self>,
|
||
|
mut must_exit: watch::Receiver<bool>,
|
||
|
) -> Result<(), Error> {
|
||
|
self.todo.lock().await.add_full_scan(&self.table);
|
||
|
let mut next_full_scan = tokio::time::delay_for(SCAN_INTERVAL).fuse();
|
||
|
let mut prev_ring: Arc<Ring> = self.table.system.ring.borrow().clone();
|
||
|
let mut ring_recv: watch::Receiver<Arc<Ring>> = self.table.system.ring.clone();
|
||
|
|
||
|
loop {
|
||
|
let s_ring_recv = ring_recv.recv().fuse();
|
||
|
let s_must_exit = must_exit.recv().fuse();
|
||
|
pin_mut!(s_ring_recv, s_must_exit);
|
||
|
|
||
|
select! {
|
||
|
_ = next_full_scan => {
|
||
|
next_full_scan = tokio::time::delay_for(SCAN_INTERVAL).fuse();
|
||
|
self.todo.lock().await.add_full_scan(&self.table);
|
||
|
}
|
||
|
new_ring_r = s_ring_recv => {
|
||
|
if let Some(new_ring) = new_ring_r {
|
||
|
self.todo.lock().await.add_ring_difference(&self.table, &prev_ring, &new_ring);
|
||
|
prev_ring = new_ring;
|
||
|
}
|
||
|
}
|
||
|
must_exit_v = s_must_exit => {
|
||
|
if must_exit_v.unwrap_or(false) {
|
||
|
return Ok(())
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
async fn syncer_task(
|
||
|
self: Arc<Self>,
|
||
|
mut must_exit: watch::Receiver<bool>,
|
||
|
) -> Result<(), Error> {
|
||
|
loop {
|
||
|
let s_pop_task = self.pop_task().fuse();
|
||
|
let s_must_exit = must_exit.recv().fuse();
|
||
|
pin_mut!(s_must_exit, s_pop_task);
|
||
|
|
||
|
select! {
|
||
|
task = s_pop_task => {
|
||
|
if let Some(partition) = task {
|
||
|
let res = self.sync_partition(&partition).await;
|
||
|
if let Err(e) = res {
|
||
|
eprintln!("Error while syncing {:?}: {}", partition, e);
|
||
|
}
|
||
|
} else {
|
||
|
tokio::time::delay_for(Duration::from_secs(1)).await;
|
||
|
}
|
||
|
}
|
||
|
must_exit_v = s_must_exit => {
|
||
|
if must_exit_v.unwrap_or(false) {
|
||
|
return Ok(())
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
async fn pop_task(&self) -> Option<Partition> {
|
||
|
self.todo.lock().await.pop_task()
|
||
|
}
|
||
|
|
||
|
async fn sync_partition(self: &Arc<Self>, partition: &Partition) -> Result<(), Error> {
|
||
|
unimplemented!()
|
||
|
}
|
||
|
}
|
||
|
|
||
|
impl SyncTodo {
|
||
|
fn add_full_scan<F: TableSchema>(&mut self, table: &Table<F>) {
|
||
|
let my_id = table.system.id.clone();
|
||
|
|
||
|
self.todo.clear();
|
||
|
|
||
|
let ring: Arc<Ring> = table.system.ring.borrow().clone();
|
||
|
for i in 0..ring.ring.len() {
|
||
|
let nodes = ring.walk_ring_from_pos(i, table.param.replication_factor);
|
||
|
let begin = ring.ring[i].location.clone();
|
||
|
|
||
|
if i == ring.ring.len() - 1 {
|
||
|
let end = ring.ring[0].location.clone();
|
||
|
self.add_full_scan_aux(table, begin, [0xffu8; 32].into(), &nodes[..], &my_id);
|
||
|
self.add_full_scan_aux(table, [0u8; 32].into(), end, &nodes[..], &my_id);
|
||
|
} else {
|
||
|
let end = ring.ring[i + 1].location.clone();
|
||
|
self.add_full_scan_aux(table, begin, end, &nodes[..], &my_id);
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
fn add_full_scan_aux<F: TableSchema>(
|
||
|
&mut self,
|
||
|
table: &Table<F>,
|
||
|
begin: Hash,
|
||
|
end: Hash,
|
||
|
nodes: &[UUID],
|
||
|
my_id: &UUID,
|
||
|
) {
|
||
|
if !nodes.contains(my_id) {
|
||
|
// Check if we have some data to send, otherwise skip
|
||
|
if table
|
||
|
.store
|
||
|
.range(begin.clone()..end.clone())
|
||
|
.next()
|
||
|
.is_none()
|
||
|
{}
|
||
|
}
|
||
|
|
||
|
self.todo.push(Partition { begin, end });
|
||
|
}
|
||
|
|
||
|
fn add_ring_difference<F: TableSchema>(&mut self, table: &Table<F>, old: &Ring, new: &Ring) {
|
||
|
let old_ring = ring_points(old);
|
||
|
let new_ring = ring_points(new);
|
||
|
unimplemented!()
|
||
|
}
|
||
|
|
||
|
fn pop_task(&mut self) -> Option<Partition> {
|
||
|
if self.todo.is_empty() {
|
||
|
return None;
|
||
|
}
|
||
|
|
||
|
let i = rand::thread_rng().gen_range::<usize, _, _>(0, self.todo.len());
|
||
|
if i == self.todo.len() - 1 {
|
||
|
self.todo.pop()
|
||
|
} else {
|
||
|
let replacement = self.todo.pop().unwrap();
|
||
|
let ret = std::mem::replace(&mut self.todo[i], replacement);
|
||
|
Some(ret)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
/// Collects the boundary points of `ring` into an ordered set.
///
/// The set always contains the two extreme hashes `[0; 32]` and `[0xFF; 32]`,
/// plus the `location` of every entry of `ring.ring`.
fn ring_points(ring: &Ring) -> BTreeSet<Hash> {
    let mut points: BTreeSet<Hash> = BTreeSet::new();
    points.insert([0u8; 32].into());
    points.insert([0xFFu8; 32].into());
    points.extend(ring.ring.iter().map(|entry| entry.location.clone()));
    points
}
|