diff --git a/Cargo.lock b/Cargo.lock index 0d4521c..52e67f5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -295,6 +295,7 @@ dependencies = [ name = "garage" version = "0.1.0" dependencies = [ + "arc-swap 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", "async-trait 0.1.30 (registry+https://github.com/rust-lang/crates.io-index)", "bincode 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/Cargo.toml b/Cargo.toml index 478aa1b..a66a712 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,7 @@ sha2 = "0.8" async-trait = "0.1.30" reduce = "0.1.2" serde_json = "1.0" +arc-swap = "0.4" rustls = "0.17" tokio-rustls = "0.13" diff --git a/src/block.rs b/src/block.rs index e898ad1..25a1091 100644 --- a/src/block.rs +++ b/src/block.rs @@ -6,6 +6,7 @@ use futures::stream::*; use tokio::fs; use tokio::prelude::*; use tokio::sync::{watch, Mutex}; +use arc_swap::ArcSwapOption; use crate::data; use crate::data::*; @@ -13,6 +14,7 @@ use crate::error::Error; use crate::membership::System; use crate::proto::*; use crate::rpc_client::*; +use crate::server::Garage; pub struct BlockManager { pub data_dir: PathBuf, @@ -20,10 +22,11 @@ pub struct BlockManager { pub resync_queue: sled::Tree, pub lock: Mutex<()>, pub system: Arc, + pub garage: ArcSwapOption, } impl BlockManager { - pub async fn new(db: &sled::Db, data_dir: PathBuf, system: Arc) -> Arc { + pub fn new(db: &sled::Db, data_dir: PathBuf, system: Arc) -> Arc { let rc = db .open_tree("block_local_rc") .expect("Unable to open block_local_rc tree"); @@ -33,20 +36,23 @@ impl BlockManager { .open_tree("block_local_resync_queue") .expect("Unable to open block_local_resync_queue tree"); - let block_manager = Arc::new(Self { + Arc::new(Self { rc, resync_queue, data_dir, lock: Mutex::new(()), system, - }); - let bm2 = block_manager.clone(); - block_manager + garage: ArcSwapOption::from(None), + }) + } + + pub async fn spawn_background_worker(self: Arc) { + let bm2 = self.clone(); + self .system .background .spawn_worker(move |must_exit| bm2.resync_loop(must_exit)) .await; - block_manager } pub async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result { @@ -80,7 +86,7 @@ impl BlockManager { let _lock = self.lock.lock().await; eprintln!("Block {:?} is corrupted. Deleting and resyncing.", hash); fs::remove_file(path).await?; - self.resync_queue.insert(hash.to_vec(), vec![1u8])?; + self.put_to_resync(&hash, 0)?; return Err(Error::CorruptData(hash.clone())); } @@ -98,38 +104,55 @@ impl BlockManager { } pub fn block_incref(&self, hash: &Hash) -> Result<(), Error> { - self.rc.merge(&hash, vec![1])?; + let new_rc = self.rc.merge(&hash, vec![1])?; + if new_rc.map(|x| u64_from_bytes(&x[..]) == 0).unwrap_or(true) { + self.put_to_resync(&hash, BLOCK_RW_TIMEOUT.as_millis() as u64)?; + } Ok(()) } pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> { - if self.rc.merge(&hash, vec![0])?.is_none() { - self.resync_queue.insert(hash.to_vec(), vec![1u8])?; + let new_rc = self.rc.merge(&hash, vec![0])?; + if new_rc.is_none() { + self.put_to_resync(&hash, 2 * BLOCK_RW_TIMEOUT.as_millis() as u64)?; } Ok(()) } + fn put_to_resync(&self, hash: &Hash, delay_millis: u64) -> Result<(), Error> { + let when = now_msec() + delay_millis; + eprintln!("Put resync_queue: {} {:?}", when, hash); + let mut key = u64::to_be_bytes(when).to_vec(); + key.extend(hash.as_ref()); + self.resync_queue.insert(key, hash.as_ref())?; + Ok(()) + } + async fn resync_loop(self: Arc, must_exit: watch::Receiver) -> Result<(), Error> { while !*must_exit.borrow() { - if let Some((hash_bytes, _v)) = self.resync_queue.get_gt(&[])? { - let mut hash = [0u8; 32]; - hash.copy_from_slice(hash_bytes.as_ref()); - let hash = Hash::from(hash); + if let Some((time_bytes, hash_bytes)) = self.resync_queue.get_gt(&[])? { + let time_msec = u64_from_bytes(&time_bytes[0..8]); + eprintln!("First in resync queue: {} (now = {})", time_msec, now_msec()); + if now_msec() >= time_msec { + let mut hash = [0u8; 32]; + hash.copy_from_slice(hash_bytes.as_ref()); + let hash = Hash::from(hash); - match self.resync_iter(&hash).await { - Ok(_) => { - self.resync_queue.remove(&hash_bytes)?; - } - Err(e) => { - eprintln!( - "Failed to resync hash {:?}, leaving it in queue: {}", - hash, e - ); + match self.resync_iter(&hash).await { + Ok(_) => { + self.resync_queue.remove(&hash_bytes)?; + } + Err(e) => { + eprintln!( + "Failed to resync hash {:?}, leaving it in queue: {}", + hash, e + ); + } } + continue; } - } else { - tokio::time::delay_for(Duration::from_secs(1)).await; } + tokio::time::delay_for(Duration::from_secs(1)).await; } Ok(()) } @@ -145,14 +168,23 @@ impl BlockManager { .map(|x| u64_from_bytes(x.as_ref()) > 0) .unwrap_or(false); + eprintln!("Resync block {:?}: exists {}, needed {}", hash, exists, needed); + if exists && !needed { - // TODO: verify that other nodes that might need it have it ? + let garage = self.garage.load_full().unwrap(); + let active_refs = garage.block_ref_table.get_range(&hash, &[0u8; 32].into(), Some(()), 1).await?; + let needed_by_others = !active_refs.is_empty(); + if needed_by_others { + // TODO check they have it and send it if not + } fs::remove_file(path).await?; self.resync_queue.remove(&hash)?; } if needed && !exists { // TODO find a way to not do this if they are sending it to us + // Let's suppose this isn't an issue for now with the BLOCK_RW_TIMEOUT delay + // between the RC being incremented and this part being called. let block_data = rpc_get_block(&self.system, &hash).await?; self.write_block(hash, &block_data[..]).await?; } @@ -190,11 +222,8 @@ fn rc_merge(_key: &[u8], old: Option<&[u8]>, new: &[u8]) -> Option> { } pub async fn rpc_get_block(system: &Arc, hash: &Hash) -> Result, Error> { - let who = system - .ring - .borrow() - .clone() - .walk_ring(&hash, system.config.data_replication_factor); + let ring = system.ring.borrow().clone(); + let who = ring.walk_ring(&hash, system.config.data_replication_factor); let msg = Message::GetBlock(hash.clone()); let mut resp_stream = who .iter() @@ -215,11 +244,8 @@ pub async fn rpc_get_block(system: &Arc, hash: &Hash) -> Result, } pub async fn rpc_put_block(system: &Arc, hash: Hash, data: Vec) -> Result<(), Error> { - let who = system - .ring - .borrow() - .clone() - .walk_ring(&hash, system.config.data_replication_factor); + let ring = system.ring.borrow().clone(); + let who = ring.walk_ring(&hash, system.config.data_replication_factor); rpc_try_call_many( system.clone(), &who[..], diff --git a/src/block_ref_table.rs b/src/block_ref_table.rs index cf24fea..5f6ce21 100644 --- a/src/block_ref_table.rs +++ b/src/block_ref_table.rs @@ -44,6 +44,7 @@ impl TableSchema for BlockRefTable { type P = Hash; type S = UUID; type E = BlockRef; + type Filter = (); async fn updated(&self, old: Option, new: Option) { let block = &old.as_ref().or(new.as_ref()).unwrap().block; @@ -60,4 +61,8 @@ impl TableSchema for BlockRefTable { } } } + + fn matches_filter(entry: &Self::E, _filter: &Self::Filter) -> bool { + !entry.deleted + } } diff --git a/src/object_table.rs b/src/object_table.rs index fbacf2d..880543e 100644 --- a/src/object_table.rs +++ b/src/object_table.rs @@ -96,6 +96,7 @@ impl TableSchema for ObjectTable { type P = String; type S = String; type E = Object; + type Filter = (); async fn updated(&self, old: Option, new: Option) { let version_table = self.version_table.clone(); @@ -122,4 +123,9 @@ impl TableSchema for ObjectTable { }); } } + + fn matches_filter(_entry: &Self::E, _filter: &Self::Filter) -> bool { + // TODO + true + } } diff --git a/src/server.rs b/src/server.rs index 287b438..591a7bf 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,10 +1,11 @@ -pub use futures_util::future::FutureExt; -use serde::Deserialize; use std::collections::HashMap; use std::io::{Read, Write}; use std::net::SocketAddr; use std::path::PathBuf; use std::sync::Arc; + +pub use futures_util::future::FutureExt; +use serde::Deserialize; use tokio::sync::watch; use crate::api_server; @@ -66,9 +67,11 @@ impl Garage { db: sled::Db, background: Arc, ) -> Arc { + println!("Initialize membership management system..."); let system = Arc::new(System::new(config.clone(), id, background.clone())); - let block_manager = BlockManager::new(&db, config.data_dir.clone(), system.clone()).await; + println!("Initialize block manager..."); + let block_manager = BlockManager::new(&db, config.data_dir.clone(), system.clone()); let data_rep_param = TableReplicationParams { replication_factor: system.config.data_replication_factor, @@ -84,6 +87,7 @@ impl Garage { timeout: DEFAULT_TIMEOUT, }; + println!("Initialize block_ref_table..."); let block_ref_table = Table::new( BlockRefTable { background: background.clone(), @@ -95,6 +99,8 @@ impl Garage { data_rep_param.clone(), ) .await; + + println!("Initialize version_table..."); let version_table = Table::new( VersionTable { background: background.clone(), @@ -106,6 +112,8 @@ impl Garage { meta_rep_param.clone(), ) .await; + + println!("Initialize object_table..."); let object_table = Table::new( ObjectTable { background: background.clone(), @@ -118,6 +126,7 @@ impl Garage { ) .await; + println!("Initialize Garage..."); let mut garage = Self { db, system: system.clone(), @@ -142,7 +151,13 @@ impl Garage { garage.block_ref_table.clone().rpc_handler(), ); - Arc::new(garage) + let garage = Arc::new(garage); + + println!("Start block manager background thread..."); + garage.block_manager.garage.swap(Some(garage.clone())); + garage.block_manager.clone().spawn_background_worker().await; + + garage } } @@ -206,20 +221,25 @@ async fn wait_from(mut chan: watch::Receiver) -> () { } pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { + println!("Loading configuration..."); let config = read_config(config_file).expect("Unable to read config file"); - let mut db_path = config.metadata_dir.clone(); - db_path.push("db"); - let db = sled::open(db_path).expect("Unable to open DB"); - let id = gen_node_id(&config.metadata_dir).expect("Unable to read or generate node ID"); println!("Node ID: {}", hex::encode(&id)); + println!("Opening database..."); + let mut db_path = config.metadata_dir.clone(); + db_path.push("db"); + let db = sled::open(db_path).expect("Unable to open DB"); + let (send_cancel, watch_cancel) = watch::channel(false); + println!("Initializing background runner..."); let background = BackgroundRunner::new(8, watch_cancel.clone()); + let garage = Garage::new(config, id, db, background.clone()).await; + println!("Initializing RPC and API servers..."); let rpc_server = rpc_server::run_rpc_server(garage.clone(), wait_from(watch_cancel.clone())); let api_server = api_server::run_api_server(garage.clone(), wait_from(watch_cancel.clone())); diff --git a/src/table.rs b/src/table.rs index 6892c9f..40114ae 100644 --- a/src/table.rs +++ b/src/table.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use std::collections::{HashMap, BTreeMap}; use std::sync::Arc; use std::time::Duration; @@ -60,10 +60,11 @@ pub enum TableRPC { ReadEntry(F::P, F::S), ReadEntryResponse(Option), + ReadRange(F::P, F::S, Option, usize), + Update(Vec>), - SyncChecksums(Vec), - SyncDifferentSet(Vec), + SyncRPC(SyncRPC), } pub trait PartitionKey { @@ -118,11 +119,15 @@ pub trait TableSchema: Send + Sync { type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync; type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync; type E: Entry; + type Filter: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync; async fn updated(&self, old: Option, new: Option); + fn matches_filter(_entry: &Self::E, _filter: &Self::Filter) -> bool { true } } impl Table { + // =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) =============== + pub async fn new( instance: F, system: Arc, @@ -144,18 +149,10 @@ impl Table { table } - pub fn rpc_handler(self: Arc) -> Box { - Box::new(TableRpcHandlerAdapter:: { table: self }) - } - pub async fn insert(&self, e: &F::E) -> Result<(), Error> { let hash = e.partition_key().hash(); - let who = self - .system - .ring - .borrow() - .clone() - .walk_ring(&hash, self.param.replication_factor); + let ring = self.system.ring.borrow().clone(); + let who = ring.walk_ring(&hash, self.param.replication_factor); eprintln!("insert who: {:?}", who); let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?)); @@ -171,12 +168,8 @@ impl Table { for entry in entries.iter() { let hash = entry.partition_key().hash(); - let who = self - .system - .ring - .borrow() - .clone() - .walk_ring(&hash, self.param.replication_factor); + let ring = self.system.ring.borrow().clone(); + let who = ring.walk_ring(&hash, self.param.replication_factor); let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?)); for node in who { if !call_list.contains_key(&node) { @@ -215,12 +208,8 @@ impl Table { sort_key: &F::S, ) -> Result, Error> { let hash = partition_key.hash(); - let who = self - .system - .ring - .borrow() - .clone() - .walk_ring(&hash, self.param.replication_factor); + let ring = self.system.ring.borrow().clone(); + let who = ring.walk_ring(&hash, self.param.replication_factor); eprintln!("get who: {:?}", who); let rpc = &TableRPC::::ReadEntry(partition_key.clone(), sort_key.clone()); @@ -251,15 +240,76 @@ impl Table { } if let Some(ret_entry) = &ret { if not_all_same { + let self2 = self.clone(); + let ent2 = ret_entry.clone(); self.system .background - .spawn(self.clone().repair_on_read(who, ret_entry.clone())); + .spawn(async move { + self2.repair_on_read(&who[..], ent2).await + }); } } Ok(ret) } - async fn repair_on_read(self: Arc, who: Vec, what: F::E) -> Result<(), Error> { + pub async fn get_range( + self: &Arc, + partition_key: &F::P, + begin_sort_key: &F::S, + filter: Option, + limit: usize, + ) -> Result, Error> { + let hash = partition_key.hash(); + let ring = self.system.ring.borrow().clone(); + let who = ring.walk_ring(&hash, self.param.replication_factor); + + let rpc = &TableRPC::::ReadRange(partition_key.clone(), begin_sort_key.clone(), filter, limit); + let resps = self + .rpc_try_call_many(&who[..], &rpc, self.param.read_quorum) + .await?; + + let mut ret = BTreeMap::new(); + let mut to_repair = BTreeMap::new(); + for resp in resps { + if let TableRPC::Update(entries) = resp { + for entry_bytes in entries.iter() { + let entry = rmp_serde::decode::from_read_ref::<_, F::E>(entry_bytes.as_slice())?; + let entry_key = self.tree_key(entry.partition_key(), entry.sort_key()); + match ret.remove(&entry_key) { + None => { + ret.insert(entry_key, Some(entry)); + } + Some(Some(mut prev)) => { + let must_repair = prev != entry; + prev.merge(&entry); + if must_repair { + to_repair.insert(entry_key.clone(), Some(prev.clone())); + } + ret.insert(entry_key, Some(prev)); + } + Some(None) => unreachable!(), + } + } + } + } + if !to_repair.is_empty() { + let self2 = self.clone(); + self.system + .background + .spawn(async move { + for (_, v) in to_repair.iter_mut() { + self2.repair_on_read(&who[..], v.take().unwrap()).await?; + } + Ok(()) + }); + } + let ret_vec = ret.iter_mut().take(limit).map(|(_k, v)| v.take().unwrap()).collect::>(); + Ok(ret_vec) + } + + // =============== UTILITY FUNCTION FOR CLIENT OPERATIONS =============== + + async fn repair_on_read(&self, who: &[UUID], what: F::E) -> Result<(), Error> { let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?)); self.rpc_try_call_many(&who[..], &TableRPC::::Update(vec![what_enc]), who.len()) .await?; @@ -322,6 +372,12 @@ impl Table { ))) } + // =============== HANDLERS FOR RPC OPERATIONS (SERVER SIDE) ============== + + pub fn rpc_handler(self: Arc) -> Box { + Box::new(TableRpcHandlerAdapter:: { table: self }) + } + async fn handle(self: &Arc, msg: TableRPC) -> Result, Error> { match msg { TableRPC::ReadEntry(key, sort_key) => { @@ -332,12 +388,12 @@ impl Table { self.handle_update(pairs).await?; Ok(TableRPC::Ok) } - TableRPC::SyncChecksums(checksums) => { + TableRPC::SyncRPC(rpc) => { let syncer = self.syncer.read().await.as_ref().unwrap().clone(); - let differing = syncer - .handle_checksum_rpc(&checksums[..], self.system.background.stop_signal.clone()) + let response = syncer + .handle_rpc(&rpc, self.system.background.stop_signal.clone()) .await?; - Ok(TableRPC::SyncDifferentSet(differing)) + Ok(TableRPC::SyncRPC(response)) } _ => Err(Error::RPCError(format!("Unexpected table RPC"))), } diff --git a/src/table_sync.rs b/src/table_sync.rs index 8eb0807..5ef13d6 100644 --- a/src/table_sync.rs +++ b/src/table_sync.rs @@ -27,6 +27,12 @@ pub struct TableSyncer { pub cache: Vec>>, } +#[derive(Serialize, Deserialize)] +pub enum SyncRPC { + Checksums(Vec), + DifferentSet(Vec), +} + pub struct SyncTodo { pub todo: Vec, } @@ -166,13 +172,8 @@ impl TableSyncer { .root_checksum(&partition.begin, &partition.end, must_exit) .await?; - let nodes = self - .table - .system - .ring - .borrow() - .clone() - .walk_ring(&partition.begin, self.table.param.replication_factor); + let ring = self.table.system.ring.borrow().clone(); + let nodes = ring.walk_ring(&partition.begin, self.table.param.replication_factor); let mut sync_futures = nodes .iter() .map(|node| { @@ -361,9 +362,9 @@ impl TableSyncer { let rpc_resp = self .table - .rpc_call(&who, &TableRPC::::SyncChecksums(step)) + .rpc_call(&who, &TableRPC::::SyncRPC(SyncRPC::Checksums(step))) .await?; - if let TableRPC::::SyncDifferentSet(mut s) = rpc_resp { + if let TableRPC::::SyncRPC(SyncRPC::DifferentSet(mut s)) = rpc_resp { let mut items = vec![]; for differing in s.drain(..) { if differing.level == 0 { @@ -381,7 +382,7 @@ impl TableSyncer { } } else { return Err(Error::Message(format!( - "Unexpected response to RPC SyncChecksums: {}", + "Unexpected response to sync RPC checksums: {}", debug_serialize(&rpc_resp) ))); } @@ -417,41 +418,44 @@ impl TableSyncer { } } - pub async fn handle_checksum_rpc( + pub async fn handle_rpc( self: &Arc, - checksums: &[RangeChecksum], + message: &SyncRPC, mut must_exit: watch::Receiver, - ) -> Result, Error> { - let mut ret = vec![]; - for ckr in checksums.iter() { - let our_ckr = self.range_checksum(&ckr.bounds, &mut must_exit).await?; - for (range, hash) in ckr.children.iter() { - match our_ckr - .children - .binary_search_by(|(our_range, _)| our_range.begin.cmp(&range.begin)) - { - Err(_) => { - ret.push(range.clone()); - } - Ok(i) => { - if our_ckr.children[i].1 != *hash { + ) -> Result { + if let SyncRPC::Checksums(checksums) = message { + let mut ret = vec![]; + for ckr in checksums.iter() { + let our_ckr = self.range_checksum(&ckr.bounds, &mut must_exit).await?; + for (range, hash) in ckr.children.iter() { + match our_ckr + .children + .binary_search_by(|(our_range, _)| our_range.begin.cmp(&range.begin)) + { + Err(_) => { ret.push(range.clone()); } + Ok(i) => { + if our_ckr.children[i].1 != *hash { + ret.push(range.clone()); + } + } } } } + let n_checksums = checksums + .iter() + .map(|x| x.children.len()) + .fold(0, |x, y| x + y); + eprintln!( + "({}) Checksum comparison RPC: {} different out of {}", + self.table.name, + ret.len(), + n_checksums + ); + return Ok(SyncRPC::DifferentSet(ret)); } - let n_checksums = checksums - .iter() - .map(|x| x.children.len()) - .fold(0, |x, y| x + y); - eprintln!( - "({}) Checksum comparison RPC: {} different out of {}", - self.table.name, - ret.len(), - n_checksums - ); - Ok(ret) + Err(Error::Message(format!("Unexpected sync RPC"))) } pub async fn invalidate(self: Arc, item_key: Vec) -> Result<(), Error> { diff --git a/src/version_table.rs b/src/version_table.rs index 77a7560..22290fd 100644 --- a/src/version_table.rs +++ b/src/version_table.rs @@ -62,6 +62,7 @@ impl TableSchema for VersionTable { type P = Hash; type S = EmptySortKey; type E = Version; + type Filter = (); async fn updated(&self, old: Option, new: Option) { let block_ref_table = self.block_ref_table.clone(); @@ -84,4 +85,8 @@ impl TableSchema for VersionTable { }); } } + + fn matches_filter(entry: &Self::E, _filter: &Self::Filter) -> bool { + !entry.deleted + } }