forked from Deuxfleurs/garage
Local refcounting of blocks
This commit is contained in:
parent
dcf58499a4
commit
5dd59e437d
4 changed files with 119 additions and 41 deletions
128
src/block.rs
128
src/block.rs
|
@ -1,49 +1,113 @@
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::sync::Arc;
|
|
||||||
|
|
||||||
use tokio::fs;
|
use tokio::fs;
|
||||||
use tokio::prelude::*;
|
use tokio::prelude::*;
|
||||||
|
use tokio::sync::Mutex;
|
||||||
|
use futures_util::future::*;
|
||||||
|
|
||||||
use crate::data::*;
|
use crate::data::*;
|
||||||
use crate::error::Error;
|
use crate::error::Error;
|
||||||
use crate::proto::*;
|
use crate::proto::*;
|
||||||
use crate::server::Garage;
|
use crate::background::*;
|
||||||
|
|
||||||
fn block_dir(garage: &Garage, hash: &Hash) -> PathBuf {
|
|
||||||
let mut path = garage.system.config.data_dir.clone();
|
pub struct BlockManager {
|
||||||
path.push(hex::encode(&hash.as_slice()[0..1]));
|
pub data_dir: PathBuf,
|
||||||
path.push(hex::encode(&hash.as_slice()[1..2]));
|
pub rc: sled::Tree,
|
||||||
path
|
pub lock: Mutex<()>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn write_block(garage: Arc<Garage>, hash: &Hash, data: &[u8]) -> Result<Message, Error> {
|
impl BlockManager {
|
||||||
garage.fs_lock.lock().await;
|
pub fn new(db: &sled::Db, data_dir: PathBuf) -> Self {
|
||||||
|
let rc = db.open_tree("block_local_rc")
|
||||||
let mut path = block_dir(&garage, hash);
|
.expect("Unable to open block_local_rc tree");
|
||||||
fs::create_dir_all(&path).await?;
|
rc.set_merge_operator(rc_merge);
|
||||||
|
Self{
|
||||||
path.push(hex::encode(hash));
|
rc,
|
||||||
if fs::metadata(&path).await.is_ok() {
|
data_dir,
|
||||||
return Ok(Message::Ok);
|
lock: Mutex::new(()),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut f = fs::File::create(path).await?;
|
pub async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<Message, Error> {
|
||||||
f.write_all(data).await?;
|
let _lock = self.lock.lock().await;
|
||||||
drop(f);
|
|
||||||
|
|
||||||
Ok(Message::Ok)
|
let mut path = self.block_dir(hash);
|
||||||
|
fs::create_dir_all(&path).await?;
|
||||||
|
|
||||||
|
path.push(hex::encode(hash));
|
||||||
|
if fs::metadata(&path).await.is_ok() {
|
||||||
|
return Ok(Message::Ok);
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut f = fs::File::create(path).await?;
|
||||||
|
f.write_all(data).await?;
|
||||||
|
drop(f);
|
||||||
|
|
||||||
|
Ok(Message::Ok)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn read_block(&self, hash: &Hash) -> Result<Message, Error> {
|
||||||
|
let mut path = self.block_dir(hash);
|
||||||
|
path.push(hex::encode(hash));
|
||||||
|
|
||||||
|
let mut f = fs::File::open(path).await?;
|
||||||
|
let mut data = vec![];
|
||||||
|
f.read_to_end(&mut data).await?;
|
||||||
|
|
||||||
|
Ok(Message::PutBlock(PutBlockMessage {
|
||||||
|
hash: hash.clone(),
|
||||||
|
data,
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn block_dir(&self, hash: &Hash) -> PathBuf {
|
||||||
|
let mut path = self.data_dir.clone();
|
||||||
|
path.push(hex::encode(&hash.as_slice()[0..1]));
|
||||||
|
path.push(hex::encode(&hash.as_slice()[1..2]));
|
||||||
|
path
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn block_incref(&self, hash: &Hash) -> Result<(), Error> {
|
||||||
|
self.rc.merge(&hash, vec![1])?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn block_decref(&self, hash: &Hash, background: &BackgroundRunner) -> Result<(), Error> {
|
||||||
|
match self.rc.merge(&hash, vec![0])? {
|
||||||
|
None => {
|
||||||
|
let mut path = self.block_dir(hash);
|
||||||
|
path.push(hex::encode(hash));
|
||||||
|
background.spawn(tokio::fs::remove_file(path).map_err(Into::into));
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
Some(_) => Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn read_block(garage: Arc<Garage>, hash: &Hash) -> Result<Message, Error> {
|
fn rc_merge(_key: &[u8], old: Option<&[u8]>, new: &[u8]) -> Option<Vec<u8>> {
|
||||||
let mut path = block_dir(&garage, hash);
|
let old = old.map(|x| {
|
||||||
path.push(hex::encode(hash));
|
assert!(x.len() == 8);
|
||||||
|
let mut x8 = [0u8; 8];
|
||||||
let mut f = fs::File::open(path).await?;
|
x8.copy_from_slice(x);
|
||||||
let mut data = vec![];
|
u64::from_be_bytes(x8)
|
||||||
f.read_to_end(&mut data).await?;
|
}).unwrap_or(0);
|
||||||
|
assert!(new.len() == 1);
|
||||||
Ok(Message::PutBlock(PutBlockMessage {
|
let new = match new[0] {
|
||||||
hash: hash.clone(),
|
0 => {
|
||||||
data,
|
if old > 0 {
|
||||||
}))
|
old - 1
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
1 => old + 1,
|
||||||
|
_ => unreachable!(),
|
||||||
|
};
|
||||||
|
if new == 0 {
|
||||||
|
None
|
||||||
|
} else {
|
||||||
|
Some(u64::to_be_bytes(new).to_vec())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -45,7 +45,19 @@ impl TableFormat for BlockRefTable {
|
||||||
type E = BlockRef;
|
type E = BlockRef;
|
||||||
|
|
||||||
async fn updated(&self, old: Option<Self::E>, new: Self::E) {
|
async fn updated(&self, old: Option<Self::E>, new: Self::E) {
|
||||||
//unimplemented!()
|
let garage = self.garage.read().await.as_ref().cloned().unwrap();
|
||||||
// TODO
|
|
||||||
|
let was_before = old.map(|x| !x.deleted).unwrap_or(false);
|
||||||
|
let is_after = !new.deleted;
|
||||||
|
if is_after && !was_before {
|
||||||
|
if let Err(e) = garage.block_manager.block_incref(&new.block) {
|
||||||
|
eprintln!("Failed to incref block {:?}: {}", &new.block, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if was_before && !is_after {
|
||||||
|
if let Err(e) = garage.block_manager.block_decref(&new.block, &garage.background) {
|
||||||
|
eprintln!("Failed to decref or delete block {:?}: {}", &new.block, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,7 +9,6 @@ use hyper::service::{make_service_fn, service_fn};
|
||||||
use hyper::{Body, Method, Request, Response, Server, StatusCode};
|
use hyper::{Body, Method, Request, Response, Server, StatusCode};
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
|
|
||||||
use crate::block::*;
|
|
||||||
use crate::data::rmp_to_vec_all_named;
|
use crate::data::rmp_to_vec_all_named;
|
||||||
use crate::error::Error;
|
use crate::error::Error;
|
||||||
use crate::proto::Message;
|
use crate::proto::Message;
|
||||||
|
@ -65,8 +64,8 @@ async fn handler(
|
||||||
Message::AdvertiseNodesUp(adv) => sys.handle_advertise_nodes_up(adv).await,
|
Message::AdvertiseNodesUp(adv) => sys.handle_advertise_nodes_up(adv).await,
|
||||||
Message::AdvertiseConfig(adv) => sys.handle_advertise_config(adv).await,
|
Message::AdvertiseConfig(adv) => sys.handle_advertise_config(adv).await,
|
||||||
|
|
||||||
Message::PutBlock(m) => write_block(garage, &m.hash, &m.data).await,
|
Message::PutBlock(m) => garage.block_manager.write_block(&m.hash, &m.data).await,
|
||||||
Message::GetBlock(h) => read_block(garage, &h).await,
|
Message::GetBlock(h) => garage.block_manager.read_block(&h).await,
|
||||||
|
|
||||||
Message::TableRPC(table, msg) => {
|
Message::TableRPC(table, msg) => {
|
||||||
if let Some(rpc_handler) = garage.table_rpc_handlers.get(table) {
|
if let Some(rpc_handler) = garage.table_rpc_handlers.get(table) {
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
use futures_util::future::FutureExt;
|
pub use futures_util::future::FutureExt;
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::io::{Read, Write};
|
use std::io::{Read, Write};
|
||||||
|
@ -6,11 +6,12 @@ use std::net::SocketAddr;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::sync::watch;
|
use tokio::sync::watch;
|
||||||
use tokio::sync::{Mutex, RwLock};
|
use tokio::sync::RwLock;
|
||||||
|
|
||||||
use crate::api_server;
|
use crate::api_server;
|
||||||
use crate::background::*;
|
use crate::background::*;
|
||||||
use crate::data::*;
|
use crate::data::*;
|
||||||
|
use crate::block::*;
|
||||||
use crate::error::Error;
|
use crate::error::Error;
|
||||||
use crate::membership::System;
|
use crate::membership::System;
|
||||||
use crate::proto::*;
|
use crate::proto::*;
|
||||||
|
@ -40,7 +41,7 @@ pub struct Config {
|
||||||
pub struct Garage {
|
pub struct Garage {
|
||||||
pub db: sled::Db,
|
pub db: sled::Db,
|
||||||
pub system: Arc<System>,
|
pub system: Arc<System>,
|
||||||
pub fs_lock: Mutex<()>,
|
pub block_manager: BlockManager,
|
||||||
pub background: Arc<BackgroundRunner>,
|
pub background: Arc<BackgroundRunner>,
|
||||||
|
|
||||||
pub table_rpc_handlers: HashMap<String, Box<dyn TableRpcHandler + Sync + Send>>,
|
pub table_rpc_handlers: HashMap<String, Box<dyn TableRpcHandler + Sync + Send>>,
|
||||||
|
@ -57,6 +58,8 @@ impl Garage {
|
||||||
db: sled::Db,
|
db: sled::Db,
|
||||||
background: Arc<BackgroundRunner>,
|
background: Arc<BackgroundRunner>,
|
||||||
) -> Arc<Self> {
|
) -> Arc<Self> {
|
||||||
|
let block_manager = BlockManager::new(&db, config.data_dir.clone());
|
||||||
|
|
||||||
let system = Arc::new(System::new(config, id, background.clone()));
|
let system = Arc::new(System::new(config, id, background.clone()));
|
||||||
|
|
||||||
let meta_rep_param = TableReplicationParams {
|
let meta_rep_param = TableReplicationParams {
|
||||||
|
@ -105,7 +108,7 @@ impl Garage {
|
||||||
let mut garage = Self {
|
let mut garage = Self {
|
||||||
db,
|
db,
|
||||||
system: system.clone(),
|
system: system.clone(),
|
||||||
fs_lock: Mutex::new(()),
|
block_manager,
|
||||||
background,
|
background,
|
||||||
table_rpc_handlers: HashMap::new(),
|
table_rpc_handlers: HashMap::new(),
|
||||||
object_table,
|
object_table,
|
||||||
|
|
Loading…
Reference in a new issue