use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; use futures::future::join_all; use futures::select; use futures_util::future::*; use tokio::sync::watch; use garage_util::data::*; use garage_util::error::Error; use garage_rpc::rpc_client::*; use garage_rpc::rpc_server::*; use crate::data::*; use crate::replication::*; use crate::schema::*; use crate::table::*; const TABLE_GC_BATCH_SIZE: usize = 1024; const TABLE_GC_RPC_TIMEOUT: Duration = Duration::from_secs(30); pub struct TableGC { data: Arc>, aux: Arc>, rpc_client: Arc>, } #[derive(Serialize, Deserialize)] enum GcRPC { Update(Vec), DeleteIfEqualHash(Vec<(ByteBuf, Hash)>), Ok, } impl RpcMessage for GcRPC {} impl TableGC where F: TableSchema + 'static, R: TableReplication + 'static, { pub(crate) fn launch( data: Arc>, aux: Arc>, rpc_server: &mut RpcServer, ) -> Arc { let rpc_path = format!("table_{}/gc", data.name); let rpc_client = aux.system.rpc_client::(&rpc_path); let gc = Arc::new(Self { data: data.clone(), aux: aux.clone(), rpc_client, }); gc.register_handler(rpc_server, rpc_path); let gc1 = gc.clone(); aux.system.background.spawn_worker( format!("GC loop for {}", data.name), move |must_exit: watch::Receiver| gc1.gc_loop(must_exit), ); gc } async fn gc_loop(self: Arc, mut must_exit: watch::Receiver) -> Result<(), Error> { while !*must_exit.borrow() { match self.gc_loop_iter().await { Ok(true) => { // Stuff was done, loop imediately } Ok(false) => { select! { _ = tokio::time::delay_for(Duration::from_secs(10)).fuse() => (), _ = must_exit.recv().fuse() => (), } } Err(e) => { warn!("({}) Error doing GC: {}", self.data.name, e); } } } Ok(()) } async fn gc_loop_iter(&self) -> Result { let mut entries = vec![]; let mut excluded = vec![]; for item in self.data.gc_todo.iter() { let (k, vhash) = item?; let vhash = Hash::try_from(&vhash[..]).unwrap(); let v_opt = self .data .store .get(&k[..])? .filter(|v| blake2sum(&v[..]) == vhash); if let Some(v) = v_opt { entries.push((ByteBuf::from(k.to_vec()), vhash, ByteBuf::from(v.to_vec()))); if entries.len() >= TABLE_GC_BATCH_SIZE { break; } } else { excluded.push((k, vhash)); } } for (k, vhash) in excluded { self.todo_remove_if_equal(&k[..], vhash)?; } if entries.len() == 0 { // Nothing to do in this iteration return Ok(false); } debug!("({}) GC: doing {} items", self.data.name, entries.len()); let mut partitions = HashMap::new(); for (k, vhash, v) in entries { let pkh = Hash::try_from(&k[..32]).unwrap(); let mut nodes = self.aux.replication.write_nodes(&pkh, &self.aux.system); nodes.retain(|x| *x != self.aux.system.id); nodes.sort(); if !partitions.contains_key(&nodes) { partitions.insert(nodes.clone(), vec![]); } partitions.get_mut(&nodes).unwrap().push((k, vhash, v)); } let resps = join_all( partitions .into_iter() .map(|(nodes, items)| self.try_send_and_delete(nodes, items)), ) .await; for resp in resps { if let Err(e) = resp { warn!( "({}) Unable to send and delete for GC: {}", self.data.name, e ); } } Ok(true) } async fn try_send_and_delete( &self, nodes: Vec, items: Vec<(ByteBuf, Hash, ByteBuf)>, ) -> Result<(), Error> { let n_items = items.len(); let mut updates = vec![]; let mut deletes = vec![]; for (k, vhash, v) in items { updates.push(v); deletes.push((k, vhash)); } self.rpc_client .try_call_many( &nodes[..], GcRPC::Update(updates), RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_GC_RPC_TIMEOUT), ) .await?; info!( "({}) GC: {} items successfully pushed, will try to delete.", self.data.name, n_items ); self.rpc_client .try_call_many( &nodes[..], GcRPC::DeleteIfEqualHash(deletes.clone()), RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_GC_RPC_TIMEOUT), ) .await?; for (k, vhash) in deletes { self.data.delete_if_equal_hash(&k[..], vhash)?; self.todo_remove_if_equal(&k[..], vhash)?; } Ok(()) } fn todo_remove_if_equal(&self, key: &[u8], vhash: Hash) -> Result<(), Error> { let _ = self .data .gc_todo .compare_and_swap::<_, _, Vec>(key, Some(vhash), None)?; Ok(()) } // ---- RPC HANDLER ---- fn register_handler(self: &Arc, rpc_server: &mut RpcServer, path: String) { let self2 = self.clone(); rpc_server.add_handler::(path, move |msg, _addr| { let self2 = self2.clone(); async move { self2.handle_rpc(&msg).await } }); let self2 = self.clone(); self.rpc_client .set_local_handler(self.aux.system.id, move |msg| { let self2 = self2.clone(); async move { self2.handle_rpc(&msg).await } }); } async fn handle_rpc(self: &Arc, message: &GcRPC) -> Result { match message { GcRPC::Update(items) => { self.data.update_many(items)?; Ok(GcRPC::Ok) } GcRPC::DeleteIfEqualHash(items) => { for (key, vhash) in items.iter() { self.data.delete_if_equal_hash(&key[..], *vhash)?; self.todo_remove_if_equal(&key[..], *vhash)?; } Ok(GcRPC::Ok) } _ => Err(Error::Message(format!("Unexpected GC RPC"))), } } }