diff --git a/src/block.rs b/src/block.rs index 99a0121f..e78ddd78 100644 --- a/src/block.rs +++ b/src/block.rs @@ -1,49 +1,113 @@ use std::path::PathBuf; -use std::sync::Arc; use tokio::fs; use tokio::prelude::*; +use tokio::sync::Mutex; +use futures_util::future::*; use crate::data::*; use crate::error::Error; use crate::proto::*; -use crate::server::Garage; +use crate::background::*; -fn block_dir(garage: &Garage, hash: &Hash) -> PathBuf { - let mut path = garage.system.config.data_dir.clone(); - path.push(hex::encode(&hash.as_slice()[0..1])); - path.push(hex::encode(&hash.as_slice()[1..2])); - path + +pub struct BlockManager { + pub data_dir: PathBuf, + pub rc: sled::Tree, + pub lock: Mutex<()>, } -pub async fn write_block(garage: Arc, hash: &Hash, data: &[u8]) -> Result { - garage.fs_lock.lock().await; - - let mut path = block_dir(&garage, hash); - fs::create_dir_all(&path).await?; - - path.push(hex::encode(hash)); - if fs::metadata(&path).await.is_ok() { - return Ok(Message::Ok); +impl BlockManager { + pub fn new(db: &sled::Db, data_dir: PathBuf) -> Self { + let rc = db.open_tree("block_local_rc") + .expect("Unable to open block_local_rc tree"); + rc.set_merge_operator(rc_merge); + Self{ + rc, + data_dir, + lock: Mutex::new(()), + } } - let mut f = fs::File::create(path).await?; - f.write_all(data).await?; - drop(f); + pub async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result { + let _lock = self.lock.lock().await; - Ok(Message::Ok) + let mut path = self.block_dir(hash); + fs::create_dir_all(&path).await?; + + path.push(hex::encode(hash)); + if fs::metadata(&path).await.is_ok() { + return Ok(Message::Ok); + } + + let mut f = fs::File::create(path).await?; + f.write_all(data).await?; + drop(f); + + Ok(Message::Ok) + } + + pub async fn read_block(&self, hash: &Hash) -> Result { + let mut path = self.block_dir(hash); + path.push(hex::encode(hash)); + + let mut f = fs::File::open(path).await?; + let mut data = vec![]; + f.read_to_end(&mut data).await?; + + Ok(Message::PutBlock(PutBlockMessage { + hash: hash.clone(), + data, + })) + } + + fn block_dir(&self, hash: &Hash) -> PathBuf { + let mut path = self.data_dir.clone(); + path.push(hex::encode(&hash.as_slice()[0..1])); + path.push(hex::encode(&hash.as_slice()[1..2])); + path + } + + pub fn block_incref(&self, hash: &Hash) -> Result<(), Error> { + self.rc.merge(&hash, vec![1])?; + Ok(()) + } + + pub fn block_decref(&self, hash: &Hash, background: &BackgroundRunner) -> Result<(), Error> { + match self.rc.merge(&hash, vec![0])? { + None => { + let mut path = self.block_dir(hash); + path.push(hex::encode(hash)); + background.spawn(tokio::fs::remove_file(path).map_err(Into::into)); + Ok(()) + } + Some(_) => Ok(()) + } + } } -pub async fn read_block(garage: Arc, hash: &Hash) -> Result { - let mut path = block_dir(&garage, hash); - path.push(hex::encode(hash)); - - let mut f = fs::File::open(path).await?; - let mut data = vec![]; - f.read_to_end(&mut data).await?; - - Ok(Message::PutBlock(PutBlockMessage { - hash: hash.clone(), - data, - })) +fn rc_merge(_key: &[u8], old: Option<&[u8]>, new: &[u8]) -> Option> { + let old = old.map(|x| { + assert!(x.len() == 8); + let mut x8 = [0u8; 8]; + x8.copy_from_slice(x); + u64::from_be_bytes(x8) + }).unwrap_or(0); + assert!(new.len() == 1); + let new = match new[0] { + 0 => { + if old > 0 { + old - 1 + } else { + 0 + } + } + 1 => old + 1, + _ => unreachable!(), + }; + if new == 0 { + None + } else { + Some(u64::to_be_bytes(new).to_vec()) + } } diff --git a/src/block_ref_table.rs b/src/block_ref_table.rs index b4bff937..e2310f74 100644 --- a/src/block_ref_table.rs +++ b/src/block_ref_table.rs @@ -45,7 +45,19 @@ impl TableFormat for BlockRefTable { type E = BlockRef; async fn updated(&self, old: Option, new: Self::E) { - //unimplemented!() - // TODO + let garage = self.garage.read().await.as_ref().cloned().unwrap(); + + let was_before = old.map(|x| !x.deleted).unwrap_or(false); + let is_after = !new.deleted; + if is_after && !was_before { + if let Err(e) = garage.block_manager.block_incref(&new.block) { + eprintln!("Failed to incref block {:?}: {}", &new.block, e); + } + } + if was_before && !is_after { + if let Err(e) = garage.block_manager.block_decref(&new.block, &garage.background) { + eprintln!("Failed to decref or delete block {:?}: {}", &new.block, e); + } + } } } diff --git a/src/rpc_server.rs b/src/rpc_server.rs index 98798614..08fa909d 100644 --- a/src/rpc_server.rs +++ b/src/rpc_server.rs @@ -9,7 +9,6 @@ use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Method, Request, Response, Server, StatusCode}; use serde::Serialize; -use crate::block::*; use crate::data::rmp_to_vec_all_named; use crate::error::Error; use crate::proto::Message; @@ -65,8 +64,8 @@ async fn handler( Message::AdvertiseNodesUp(adv) => sys.handle_advertise_nodes_up(adv).await, Message::AdvertiseConfig(adv) => sys.handle_advertise_config(adv).await, - Message::PutBlock(m) => write_block(garage, &m.hash, &m.data).await, - Message::GetBlock(h) => read_block(garage, &h).await, + Message::PutBlock(m) => garage.block_manager.write_block(&m.hash, &m.data).await, + Message::GetBlock(h) => garage.block_manager.read_block(&h).await, Message::TableRPC(table, msg) => { if let Some(rpc_handler) = garage.table_rpc_handlers.get(table) { diff --git a/src/server.rs b/src/server.rs index e1f6dc80..8b2bd0c3 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,4 +1,4 @@ -use futures_util::future::FutureExt; +pub use futures_util::future::FutureExt; use serde::Deserialize; use std::collections::HashMap; use std::io::{Read, Write}; @@ -6,11 +6,12 @@ use std::net::SocketAddr; use std::path::PathBuf; use std::sync::Arc; use tokio::sync::watch; -use tokio::sync::{Mutex, RwLock}; +use tokio::sync::RwLock; use crate::api_server; use crate::background::*; use crate::data::*; +use crate::block::*; use crate::error::Error; use crate::membership::System; use crate::proto::*; @@ -40,7 +41,7 @@ pub struct Config { pub struct Garage { pub db: sled::Db, pub system: Arc, - pub fs_lock: Mutex<()>, + pub block_manager: BlockManager, pub background: Arc, pub table_rpc_handlers: HashMap>, @@ -57,6 +58,8 @@ impl Garage { db: sled::Db, background: Arc, ) -> Arc { + let block_manager = BlockManager::new(&db, config.data_dir.clone()); + let system = Arc::new(System::new(config, id, background.clone())); let meta_rep_param = TableReplicationParams { @@ -105,7 +108,7 @@ impl Garage { let mut garage = Self { db, system: system.clone(), - fs_lock: Mutex::new(()), + block_manager, background, table_rpc_handlers: HashMap::new(), object_table,