diff --git a/src/background.rs b/src/background.rs index de0f07af..772745a6 100644 --- a/src/background.rs +++ b/src/background.rs @@ -68,7 +68,7 @@ impl BackgroundRunner { let _: Result<_, _> = self.queue_in.clone().send((boxed, true)); } - pub async fn spawn_worker(self: Arc, worker: F) + pub async fn spawn_worker(&self, worker: F) where F: FnOnce(watch::Receiver) -> T + Send + 'static, T: Future + Send + 'static, diff --git a/src/main.rs b/src/main.rs index ea6124b5..8b124bff 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,6 +5,7 @@ mod proto; mod background; mod membership; mod table; +mod table_sync; mod block; mod block_ref_table; @@ -36,11 +37,11 @@ pub struct Opt { #[structopt(short = "h", long = "rpc-host", default_value = "127.0.0.1:3901")] rpc_host: SocketAddr, - #[structopt(long="ca-cert")] + #[structopt(long = "ca-cert")] ca_cert: Option, - #[structopt(long="client-cert")] + #[structopt(long = "client-cert")] client_cert: Option, - #[structopt(long="client-key")] + #[structopt(long = "client-key")] client_key: Option, #[structopt(subcommand)] @@ -86,13 +87,11 @@ async fn main() { let opt = Opt::from_args(); let tls_config = match (opt.ca_cert, opt.client_cert, opt.client_key) { - (Some(ca_cert), Some(client_cert), Some(client_key)) => { - Some(TlsConfig{ - ca_cert, - node_cert: client_cert, - node_key: client_key, - }) - } + (Some(ca_cert), Some(client_cert), Some(client_key)) => Some(TlsConfig { + ca_cert, + node_cert: client_cert, + node_key: client_key, + }), (None, None, None) => None, _ => { eprintln!("Missing one of: --ca-cert, --node-cert, --node-key. Not using TLS."); diff --git a/src/membership.rs b/src/membership.rs index 89550b67..368e9355 100644 --- a/src/membership.rs +++ b/src/membership.rs @@ -157,7 +157,7 @@ impl Ring { self.walk_ring_from_pos(start, n) } - fn walk_ring_from_pos(&self, start: usize, n: usize) -> Vec { + pub fn walk_ring_from_pos(&self, start: usize, n: usize) -> Vec { let mut ret = vec![]; let mut datacenters = vec![]; @@ -282,13 +282,13 @@ impl System { .collect::>(); self.clone().ping_nodes(bootstrap_peers).await; - self.background - .clone() + self.clone() + .background .spawn_worker(|stop_signal| self.ping_loop(stop_signal).map(Ok)) .await; } - pub async fn ping_nodes(self: Arc, peers: Vec<(SocketAddr, Option)>) { + async fn ping_nodes(self: Arc, peers: Vec<(SocketAddr, Option)>) { let ping_msg = self.make_ping(); let ping_resps = join_all(peers.iter().map(|(addr, id_option)| { let sys = self.clone(); diff --git a/src/rpc_client.rs b/src/rpc_client.rs index 6f897a90..bb0ca56c 100644 --- a/src/rpc_client.rs +++ b/src/rpc_client.rs @@ -142,7 +142,10 @@ impl RpcClient { let resp = tokio::time::timeout(timeout, resp_fut) .await? .map_err(|e| { - eprintln!("RPC HTTP client error when connecting to {}: {}", to_addr, e); + eprintln!( + "RPC HTTP client error when connecting to {}: {}", + to_addr, e + ); e })?; @@ -158,4 +161,3 @@ impl RpcClient { } } } - diff --git a/src/rpc_server.rs b/src/rpc_server.rs index b75d67fd..16ea0ca8 100644 --- a/src/rpc_server.rs +++ b/src/rpc_server.rs @@ -76,9 +76,7 @@ async fn handler( // and the request handler simply sits there waiting for the task to finish. // (if it's cancelled, that's not an issue) // (TODO FIXME except if garage happens to shut down at that point) - let write_fut = async move { - garage.block_manager.write_block(&m.hash, &m.data).await - }; + let write_fut = async move { garage.block_manager.write_block(&m.hash, &m.data).await }; tokio::spawn(write_fut).await? } Message::GetBlock(h) => garage.block_manager.read_block(&h).await, diff --git a/src/server.rs b/src/server.rs index 8b49f105..af58ded1 100644 --- a/src/server.rs +++ b/src/server.rs @@ -93,7 +93,7 @@ impl Garage { &db, "block_ref".to_string(), data_rep_param.clone(), - ); + ).await; let version_table = Table::new( VersionTable { background: background.clone(), @@ -103,7 +103,7 @@ impl Garage { &db, "version".to_string(), meta_rep_param.clone(), - ); + ).await; let object_table = Table::new( ObjectTable { background: background.clone(), @@ -113,7 +113,7 @@ impl Garage { &db, "object".to_string(), meta_rep_param.clone(), - ); + ).await; let mut garage = Self { db, diff --git a/src/table.rs b/src/table.rs index 69d818c2..533b4291 100644 --- a/src/table.rs +++ b/src/table.rs @@ -12,6 +12,7 @@ use crate::error::Error; use crate::membership::System; use crate::proto::*; use crate::rpc_client::*; +use crate::table_sync::TableSyncer; pub struct Table { pub instance: F, @@ -20,7 +21,6 @@ pub struct Table { pub system: Arc, pub store: sled::Tree, - pub partitions: Vec, pub param: TableReplicationParams, } @@ -61,12 +61,6 @@ pub enum TableRPC { Update(Vec>), } -pub struct Partition { - pub begin: Hash, - pub end: Hash, - pub other_nodes: Vec, -} - pub trait PartitionKey { fn hash(&self) -> Hash; } @@ -124,7 +118,7 @@ pub trait TableSchema: Send + Sync { } impl Table { - pub fn new( + pub async fn new( instance: F, system: Arc, db: &sled::Db, @@ -132,14 +126,15 @@ impl Table { param: TableReplicationParams, ) -> Arc { let store = db.open_tree(&name).expect("Unable to open DB tree"); - Arc::new(Self { + let table = Arc::new(Self { instance, name, system, store, - partitions: Vec::new(), param, - }) + }); + TableSyncer::launch(table.clone()).await; + table } pub fn rpc_handler(self: Arc) -> Box { @@ -207,7 +202,11 @@ impl Table { } } - pub async fn get(&self, partition_key: &F::P, sort_key: &F::S) -> Result, Error> { + pub async fn get( + self: &Arc, + partition_key: &F::P, + sort_key: &F::S, + ) -> Result, Error> { let hash = partition_key.hash(); let who = self .system @@ -245,17 +244,19 @@ impl Table { } if let Some(ret_entry) = &ret { if not_all_same { - let _: Result<_, _> = self.repair_on_read(&who[..], &ret_entry).await; + self.system + .background + .spawn(self.clone().repair_on_read(who, ret_entry.clone())); } } Ok(ret) } - async fn repair_on_read(&self, who: &[UUID], what: &F::E) -> Result<(), Error> { - let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(what)?)); + async fn repair_on_read(self: Arc, who: Vec, what: F::E) -> Result<(), Error> { + let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?)); self.rpc_try_call_many(&who[..], &TableRPC::::Update(vec![what_enc]), who.len()) - .await - .map(|_| ()) + .await?; + Ok(()) } async fn rpc_try_call_many( diff --git a/src/table_sync.rs b/src/table_sync.rs new file mode 100644 index 00000000..5097c1b0 --- /dev/null +++ b/src/table_sync.rs @@ -0,0 +1,204 @@ +use rand::Rng; +use std::sync::Arc; +use std::time::Duration; +use std::collections::BTreeSet; + +use futures::{pin_mut, select}; +use futures_util::future::*; +use tokio::sync::watch; +use tokio::sync::Mutex; + +use crate::data::*; +use crate::error::Error; +use crate::membership::{Ring, System}; +use crate::table::*; + +const SCAN_INTERVAL: Duration = Duration::from_secs(3600); + +pub struct TableSyncer { + pub table: Arc>, + + pub todo: Mutex, +} + +pub struct SyncTodo { + pub todo: Vec, +} + +#[derive(Debug, Clone)] +pub struct Partition { + pub begin: Hash, + pub end: Hash, +} + +impl TableSyncer { + pub async fn launch(table: Arc>) -> Arc { + let todo = SyncTodo { todo: Vec::new() }; + let syncer = Arc::new(TableSyncer { + table: table.clone(), + todo: Mutex::new(todo), + }); + + let s1 = syncer.clone(); + table + .system + .background + .spawn_worker(move |must_exit: watch::Receiver| s1.watcher_task(must_exit)) + .await; + + let s2 = syncer.clone(); + table + .system + .background + .spawn_worker(move |must_exit: watch::Receiver| s2.syncer_task(must_exit)) + .await; + + syncer + } + + async fn watcher_task( + self: Arc, + mut must_exit: watch::Receiver, + ) -> Result<(), Error> { + self.todo.lock().await.add_full_scan(&self.table); + let mut next_full_scan = tokio::time::delay_for(SCAN_INTERVAL).fuse(); + let mut prev_ring: Arc = self.table.system.ring.borrow().clone(); + let mut ring_recv: watch::Receiver> = self.table.system.ring.clone(); + + loop { + let s_ring_recv = ring_recv.recv().fuse(); + let s_must_exit = must_exit.recv().fuse(); + pin_mut!(s_ring_recv, s_must_exit); + + select! { + _ = next_full_scan => { + next_full_scan = tokio::time::delay_for(SCAN_INTERVAL).fuse(); + self.todo.lock().await.add_full_scan(&self.table); + } + new_ring_r = s_ring_recv => { + if let Some(new_ring) = new_ring_r { + self.todo.lock().await.add_ring_difference(&self.table, &prev_ring, &new_ring); + prev_ring = new_ring; + } + } + must_exit_v = s_must_exit => { + if must_exit_v.unwrap_or(false) { + return Ok(()) + } + } + } + } + } + + async fn syncer_task( + self: Arc, + mut must_exit: watch::Receiver, + ) -> Result<(), Error> { + loop { + let s_pop_task = self.pop_task().fuse(); + let s_must_exit = must_exit.recv().fuse(); + pin_mut!(s_must_exit, s_pop_task); + + select! { + task = s_pop_task => { + if let Some(partition) = task { + let res = self.sync_partition(&partition).await; + if let Err(e) = res { + eprintln!("Error while syncing {:?}: {}", partition, e); + } + } else { + tokio::time::delay_for(Duration::from_secs(1)).await; + } + } + must_exit_v = s_must_exit => { + if must_exit_v.unwrap_or(false) { + return Ok(()) + } + } + } + } + } + + async fn pop_task(&self) -> Option { + self.todo.lock().await.pop_task() + } + + async fn sync_partition(self: &Arc, partition: &Partition) -> Result<(), Error> { + unimplemented!() + } +} + +impl SyncTodo { + fn add_full_scan(&mut self, table: &Table) { + let my_id = table.system.id.clone(); + + self.todo.clear(); + + let ring: Arc = table.system.ring.borrow().clone(); + for i in 0..ring.ring.len() { + let nodes = ring.walk_ring_from_pos(i, table.param.replication_factor); + let begin = ring.ring[i].location.clone(); + + if i == ring.ring.len() - 1 { + let end = ring.ring[0].location.clone(); + self.add_full_scan_aux(table, begin, [0xffu8; 32].into(), &nodes[..], &my_id); + self.add_full_scan_aux(table, [0u8; 32].into(), end, &nodes[..], &my_id); + } else { + let end = ring.ring[i + 1].location.clone(); + self.add_full_scan_aux(table, begin, end, &nodes[..], &my_id); + } + } + } + + fn add_full_scan_aux( + &mut self, + table: &Table, + begin: Hash, + end: Hash, + nodes: &[UUID], + my_id: &UUID, + ) { + if !nodes.contains(my_id) { + // Check if we have some data to send, otherwise skip + if table + .store + .range(begin.clone()..end.clone()) + .next() + .is_none() + {} + } + + self.todo.push(Partition { begin, end }); + } + + fn add_ring_difference(&mut self, table: &Table, old: &Ring, new: &Ring) { + let old_ring = ring_points(old); + let new_ring = ring_points(new); + unimplemented!() + } + + fn pop_task(&mut self) -> Option { + if self.todo.is_empty() { + return None; + } + + let i = rand::thread_rng().gen_range::(0, self.todo.len()); + if i == self.todo.len() - 1 { + self.todo.pop() + } else { + let replacement = self.todo.pop().unwrap(); + let ret = std::mem::replace(&mut self.todo[i], replacement); + Some(ret) + } + } +} + +fn ring_points(ring: &Ring) -> BTreeSet { + let mut ret = BTreeSet::new(); + ret.insert([0u8; 32].into()); + ret.insert([0xFFu8; 32].into()); + for i in 0..ring.ring.len() { + ret.insert(ring.ring[i].location.clone()); + } + ret +} diff --git a/src/tls_util.rs b/src/tls_util.rs index dfc4e716..52c52110 100644 --- a/src/tls_util.rs +++ b/src/tls_util.rs @@ -1,17 +1,17 @@ -use std::{fs, io}; -use core::task::{Poll, Context}; +use core::future::Future; +use core::task::{Context, Poll}; use std::pin::Pin; use std::sync::Arc; -use core::future::Future; +use std::{fs, io}; use futures_util::future::*; -use tokio::io::{AsyncRead, AsyncWrite}; -use rustls::internal::pemfile; -use hyper::client::HttpConnector; use hyper::client::connect::Connection; +use hyper::client::HttpConnector; use hyper::service::Service; use hyper::Uri; use hyper_rustls::MaybeHttpsStream; +use rustls::internal::pemfile; +use tokio::io::{AsyncRead, AsyncWrite}; use tokio_rustls::TlsConnector; use webpki::DNSNameRef; @@ -58,7 +58,6 @@ pub fn load_private_key(filename: &str) -> Result { Ok(keys[0].clone()) } - // ---- AWFUL COPYPASTA FROM HYPER-RUSTLS connector.rs // ---- ALWAYS USE `garage` AS HOSTNAME FOR TLS VERIFICATION @@ -85,56 +84,56 @@ impl HttpsConnectorFixedDnsname { } impl Service for HttpsConnectorFixedDnsname - where +where T: Service, T::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin + 'static, T::Future: Send + 'static, T::Error: Into, { - type Response = MaybeHttpsStream; - type Error = BoxError; + type Response = MaybeHttpsStream; + type Error = BoxError; - #[allow(clippy::type_complexity)] - type Future = - Pin, BoxError>> + Send>>; + #[allow(clippy::type_complexity)] + type Future = + Pin, BoxError>> + Send>>; - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - match self.http.poll_ready(cx) { - Poll::Ready(Ok(())) => Poll::Ready(Ok(())), - Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())), - Poll::Pending => Poll::Pending, - } - } + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + match self.http.poll_ready(cx) { + Poll::Ready(Ok(())) => Poll::Ready(Ok(())), + Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())), + Poll::Pending => Poll::Pending, + } + } - fn call(&mut self, dst: Uri) -> Self::Future { - let is_https = dst.scheme_str() == Some("https"); + fn call(&mut self, dst: Uri) -> Self::Future { + let is_https = dst.scheme_str() == Some("https"); - if !is_https { - let connecting_future = self.http.call(dst); + if !is_https { + let connecting_future = self.http.call(dst); - let f = async move { - let tcp = connecting_future.await.map_err(Into::into)?; + let f = async move { + let tcp = connecting_future.await.map_err(Into::into)?; - Ok(MaybeHttpsStream::Http(tcp)) - }; - f.boxed() - } else { - let cfg = self.tls_config.clone(); - let connecting_future = self.http.call(dst); + Ok(MaybeHttpsStream::Http(tcp)) + }; + f.boxed() + } else { + let cfg = self.tls_config.clone(); + let connecting_future = self.http.call(dst); - let dnsname = DNSNameRef::try_from_ascii_str(self.fixed_dnsname) - .expect("Invalid fixed dnsname"); + let dnsname = + DNSNameRef::try_from_ascii_str(self.fixed_dnsname).expect("Invalid fixed dnsname"); - let f = async move { - let tcp = connecting_future.await.map_err(Into::into)?; - let connector = TlsConnector::from(cfg); - let tls = connector - .connect(dnsname, tcp) - .await - .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; - Ok(MaybeHttpsStream::Https(tls)) - }; - f.boxed() - } - } + let f = async move { + let tcp = connecting_future.await.map_err(Into::into)?; + let connector = TlsConnector::from(cfg); + let tls = connector + .connect(dnsname, tcp) + .await + .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; + Ok(MaybeHttpsStream::Https(tls)) + }; + f.boxed() + } + } }