Add block ref table

This commit is contained in:
Alex 2020-04-10 23:11:52 +02:00
parent ff4fb97568
commit cf8fd948fc
7 changed files with 93 additions and 23 deletions

View file

@ -133,7 +133,7 @@ async fn handle_put(
} }
let version = Version { let version = Version {
version: version_uuid.clone(), uuid: version_uuid.clone(),
deleted: false, deleted: false,
blocks: Vec::new(), blocks: Vec::new(),
bucket: bucket.into(), bucket: bucket.into(),
@ -146,7 +146,7 @@ async fn handle_put(
let mut next_offset = first_block.len(); let mut next_offset = first_block.len();
let mut put_curr_version_block = let mut put_curr_version_block =
put_version_block(garage.clone(), &version, 0, first_block_hash.clone()); put_block_meta(garage.clone(), &version, 0, first_block_hash.clone());
let mut put_curr_block = put_block(garage.clone(), first_block_hash, first_block); let mut put_curr_block = put_block(garage.clone(), first_block_hash, first_block);
loop { loop {
@ -155,7 +155,7 @@ async fn handle_put(
if let Some(block) = next_block { if let Some(block) = next_block {
let block_hash = hash(&block[..]); let block_hash = hash(&block[..]);
let block_len = block.len(); let block_len = block.len();
put_curr_version_block = put_version_block( put_curr_version_block = put_block_meta(
garage.clone(), garage.clone(),
&version, &version,
next_offset as u64, next_offset as u64,
@ -176,15 +176,28 @@ async fn handle_put(
Ok(version_uuid) Ok(version_uuid)
} }
async fn put_version_block( async fn put_block_meta(
garage: Arc<Garage>, garage: Arc<Garage>,
version: &Version, version: &Version,
offset: u64, offset: u64,
hash: Hash, hash: Hash,
) -> Result<(), Error> { ) -> Result<(), Error> {
let mut version = version.clone(); let mut version = version.clone();
version.blocks.push(VersionBlock { offset, hash }); version.blocks.push(VersionBlock {
garage.version_table.insert(&version).await?; offset,
hash: hash.clone(),
});
let block_ref = BlockRef {
block: hash,
version: version.uuid.clone(),
deleted: false,
};
futures::try_join!(
garage.version_table.insert(&version),
garage.block_ref_table.insert(&block_ref),
)?;
Ok(()) Ok(())
} }
@ -308,9 +321,7 @@ async fn handle_get(
} }
}) })
.buffered(2); .buffered(2);
let body: BodyType = Box::new(StreamBody::new( let body: BodyType = Box::new(StreamBody::new(Box::pin(body_stream)));
Box::pin(body_stream),
));
Ok(resp_builder.body(body)?) Ok(resp_builder.body(body)?)
} }
} }

51
src/block_ref_table.rs Normal file
View file

@ -0,0 +1,51 @@
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::sync::RwLock;
use crate::data::*;
use crate::server::Garage;
use crate::table::*;
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct BlockRef {
// Primary key
pub block: Hash,
// Sort key
pub version: UUID,
// Keep track of deleted status
pub deleted: bool,
}
impl Entry<Hash, UUID> for BlockRef {
fn partition_key(&self) -> &Hash {
&self.block
}
fn sort_key(&self) -> &UUID {
&self.version
}
fn merge(&mut self, other: &Self) {
if other.deleted {
self.deleted = true;
}
}
}
pub struct BlockRefTable {
pub garage: RwLock<Option<Arc<Garage>>>,
}
#[async_trait]
impl TableFormat for BlockRefTable {
type P = Hash;
type S = UUID;
type E = BlockRef;
async fn updated(&self, old: Option<&Self::E>, new: &Self::E) {
//unimplemented!()
// TODO
}
}

View file

@ -129,15 +129,6 @@ pub struct NetworkConfigEntry {
pub const INLINE_THRESHOLD: usize = 3072; pub const INLINE_THRESHOLD: usize = 3072;
#[derive(Debug, Serialize, Deserialize)] pub use crate::block_ref_table::*;
pub struct SplitpointMeta {
pub bucket: String,
pub key: String,
pub timestamp: u64,
pub uuid: UUID,
pub deleted: bool,
}
pub use crate::object_table::*; pub use crate::object_table::*;
pub use crate::version_table::*; pub use crate::version_table::*;

View file

@ -6,6 +6,7 @@ mod membership;
mod table; mod table;
mod block; mod block;
mod block_ref_table;
mod object_table; mod object_table;
mod version_table; mod version_table;

View file

@ -24,6 +24,7 @@ pub struct Garage {
pub object_table: Arc<Table<ObjectTable>>, pub object_table: Arc<Table<ObjectTable>>,
pub version_table: Arc<Table<VersionTable>>, pub version_table: Arc<Table<VersionTable>>,
pub block_ref_table: Arc<Table<BlockRefTable>>,
} }
impl Garage { impl Garage {
@ -55,6 +56,15 @@ impl Garage {
"version".to_string(), "version".to_string(),
meta_rep_param.clone(), meta_rep_param.clone(),
)); ));
let block_ref_table = Arc::new(Table::new(
BlockRefTable {
garage: RwLock::new(None),
},
system.clone(),
&db,
"block_ref".to_string(),
meta_rep_param.clone(),
));
let mut garage = Self { let mut garage = Self {
db, db,
@ -63,6 +73,7 @@ impl Garage {
table_rpc_handlers: HashMap::new(), table_rpc_handlers: HashMap::new(),
object_table, object_table,
version_table, version_table,
block_ref_table,
}; };
garage.table_rpc_handlers.insert( garage.table_rpc_handlers.insert(
@ -73,11 +84,16 @@ impl Garage {
garage.version_table.name.clone(), garage.version_table.name.clone(),
garage.version_table.clone().rpc_handler(), garage.version_table.clone().rpc_handler(),
); );
garage.table_rpc_handlers.insert(
garage.block_ref_table.name.clone(),
garage.block_ref_table.clone().rpc_handler(),
);
let garage = Arc::new(garage); let garage = Arc::new(garage);
*garage.object_table.instance.garage.write().await = Some(garage.clone()); *garage.object_table.instance.garage.write().await = Some(garage.clone());
*garage.version_table.instance.garage.write().await = Some(garage.clone()); *garage.version_table.instance.garage.write().await = Some(garage.clone());
*garage.block_ref_table.instance.garage.write().await = Some(garage.clone());
garage garage
} }

View file

@ -10,7 +10,7 @@ use crate::table::*;
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct Version { pub struct Version {
// Primary key // Primary key
pub version: UUID, pub uuid: UUID,
// Actual data: the blocks for this version // Actual data: the blocks for this version
pub deleted: bool, pub deleted: bool,
@ -30,7 +30,7 @@ pub struct VersionBlock {
impl Entry<Hash, EmptySortKey> for Version { impl Entry<Hash, EmptySortKey> for Version {
fn partition_key(&self) -> &Hash { fn partition_key(&self) -> &Hash {
&self.version &self.uuid
} }
fn sort_key(&self) -> &EmptySortKey { fn sort_key(&self) -> &EmptySortKey {
&EmptySortKey &EmptySortKey