From 67585a4ffab14ba7c4b7c6dca530c177059a91d9 Mon Sep 17 00:00:00 2001
From: Trinity Pointard
Date: Fri, 26 Mar 2021 21:53:28 +0100
Subject: [PATCH] attempt at documenting model crate

---
 src/model/block.rs           | 27 ++++++++++++++++++++++++-
 src/model/block_ref_table.rs |  3 +++
 src/model/bucket_table.rs    | 17 ++++++++++++++--
 src/model/garage.rs          |  9 +++++++++
 src/model/key_table.rs       | 26 +++++++++++++++++-------
 src/model/lib.rs             |  1 +
 src/model/object_table.rs    | 38 ++++++++++++++++++++++++++++++++----
 src/model/version_table.rs   | 13 +++++++++++-
 8 files changed, 119 insertions(+), 15 deletions(-)

diff --git a/src/model/block.rs b/src/model/block.rs
index 2b145615..38b2325c 100644
--- a/src/model/block.rs
+++ b/src/model/block.rs
@@ -24,6 +24,7 @@ use crate::block_ref_table::*;
 
 use crate::garage::Garage;
 
+/// Size under which data will be stored inline in the database instead of as files
 pub const INLINE_THRESHOLD: usize = 3072;
 
 pub const BACKGROUND_WORKERS: u64 = 1;
@@ -33,28 +34,41 @@ const BLOCK_GC_TIMEOUT: Duration = Duration::from_secs(60);
 const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5);
 const RESYNC_RETRY_TIMEOUT: Duration = Duration::from_secs(10);
 
+/// RPC messages used to share blocks of data between nodes
 #[derive(Debug, Serialize, Deserialize)]
 pub enum Message {
 	Ok,
+	/// Message to ask for a block of data, by hash
 	GetBlock(Hash),
+	/// Message to send a block of data, either because it was requested, or for the first
+	/// delivery of a new block
 	PutBlock(PutBlockMessage),
+	/// Ask another node if it should have this block, but does not actually have it
 	NeedBlockQuery(Hash),
+	/// Response: whether the node does require that block
 	NeedBlockReply(bool),
 }
 
+/// Structure used to send a block
 #[derive(Debug, Serialize, Deserialize)]
 pub struct PutBlockMessage {
+	/// Hash of the block
 	pub hash: Hash,
 
+	/// Content of the block
 	#[serde(with = "serde_bytes")]
 	pub data: Vec<u8>,
 }
 
 impl RpcMessage for Message {}
 
+/// The block manager, handling block exchange between nodes, and block storage on the local node
 pub struct BlockManager {
+	/// Replication strategy, used to find on which node(s) a block should be located
 	pub replication: TableShardedReplication,
+	/// Directory in which blocks are stored
 	pub data_dir: PathBuf,
+	/// Lock to prevent concurrent edits of the directory
 	pub data_dir_lock: Mutex<()>,
 
 	rc: sled::Tree,
@@ -128,7 +142,8 @@ impl BlockManager {
 	}
 
 	pub fn spawn_background_worker(self: Arc<Self>) {
-		// Launch 2 simultaneous workers for background resync loop preprocessing
+		// Launch 2 simultaneous workers for background resync loop preprocessing <= TODO: this
+		// actually launches only one worker, given the current value of BACKGROUND_WORKERS
 		for i in 0..BACKGROUND_WORKERS {
 			let bm2 = self.clone();
 			let background = self.system.background.clone();
@@ -141,6 +156,7 @@ impl BlockManager {
 		}
 	}
 
+	/// Write a block to disk
 	pub async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<Message, Error> {
 		let _lock = self.data_dir_lock.lock().await;
 
@@ -159,6 +175,7 @@ impl BlockManager {
 		Ok(Message::Ok)
 	}
 
+	/// Read a block from disk, verifying its integrity
 	pub async fn read_block(&self, hash: &Hash) -> Result<Message, Error> {
 		let path = self.block_path(hash);
 
@@ -190,6 +207,7 @@ impl BlockManager {
 		Ok(Message::PutBlock(PutBlockMessage { hash: *hash, data }))
 	}
 
+	/// Check whether this node should have a block, but does not actually have it
 	pub async fn need_block(&self, hash: &Hash) -> Result<bool, Error> {
 		let needed = self
 			.rc
@@ -217,6 +235,8 @@ impl BlockManager {
 		path
 	}
 
+	/// Increment the number of times a block is used, putting it in the resync queue if it is
+	/// required but not currently stored
 	pub fn block_incref(&self, hash: &Hash) -> Result<(), Error> {
 		let old_rc = self.rc.fetch_and_update(&hash, |old| {
 			let old_v = old.map(u64_from_be_bytes).unwrap_or(0);
@@ -229,6 +249,8 @@ impl BlockManager {
 		Ok(())
 	}
 
+	/// Decrement the number of times a block is used
+	// when the counter reaches 0, the block seems not to be put in the resync queue, which I assume queues it for GC?
 	pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> {
 		let new_rc = self.rc.update_and_fetch(&hash, |old| {
 			let old_v = old.map(u64_from_be_bytes).unwrap_or(0);
@@ -388,6 +410,7 @@ impl BlockManager {
 		Ok(())
 	}
 
+	/// Ask nodes that might have a block for it
 	pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> {
 		let who = self.replication.read_nodes(&hash);
 		let resps = self
@@ -412,6 +435,7 @@ impl BlockManager {
 		)))
 	}
 
+	/// Send a block to nodes that should have it
 	pub async fn rpc_put_block(&self, hash: Hash, data: Vec<u8>) -> Result<(), Error> {
 		let who = self.replication.write_nodes(&hash);
 		self.rpc_client
@@ -498,6 +522,7 @@ impl BlockManager {
 			.boxed()
 	}
 
+	/// Get the length of the resync queue
 	pub fn resync_queue_len(&self) -> usize {
 		self.resync_queue.len()
 	}
diff --git a/src/model/block_ref_table.rs b/src/model/block_ref_table.rs
index e4372717..2c5f9bf9 100644
--- a/src/model/block_ref_table.rs
+++ b/src/model/block_ref_table.rs
@@ -11,12 +11,15 @@ use crate::block::*;
 
 #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
 pub struct BlockRef {
 	// Primary key
+	/// Hash of the block
 	pub block: Hash,
 
 	// Sort key
+	// why is there a version on a hashed (probably immutable) piece of data?
 	pub version: UUID,
 
 	// Keep track of deleted status
+	/// Is the block deleted
 	pub deleted: crdt::Bool,
 }
diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs
index 2ede4904..8198deb7 100644
--- a/src/model/bucket_table.rs
+++ b/src/model/bucket_table.rs
@@ -12,15 +12,18 @@ use crate::key_table::PermissionSet;
 /// - A bucket has 2 states, Present or Deleted and parameters make sense only if present.
 #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
 pub struct Bucket {
-	// Primary key
+	/// Name of the bucket
 	pub name: String,
-
+	/// State, and configuration if not deleted, of the bucket
 	pub state: crdt::LWW<BucketState>,
 }
 
+/// State of a bucket
 #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
 pub enum BucketState {
+	/// The bucket is deleted
 	Deleted,
+	/// The bucket exists
 	Present(BucketParams),
 }
 
@@ -37,9 +40,12 @@ impl CRDT for BucketState {
 	}
 }
 
+/// Configuration for a bucket
 #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
 pub struct BucketParams {
+	/// Map of keys with access to the bucket, and what kind of access each one has
 	pub authorized_keys: crdt::LWWMap<String, PermissionSet>,
+	/// Whether the bucket is served as a website over HTTP
 	pub website: crdt::LWW<bool>,
 }
 
@@ -51,6 +57,7 @@ impl CRDT for BucketParams {
 }
 
 impl BucketParams {
+	/// Create a new default `BucketParams`
 	pub fn new() -> Self {
 		BucketParams {
 			authorized_keys: crdt::LWWMap::new(),
@@ -60,15 +67,21 @@ impl BucketParams {
 }
 
 impl Bucket {
+	/// Create a new bucket
 	pub fn new(name: String) -> Self {
 		Bucket {
 			name,
 			state: crdt::LWW::new(BucketState::Present(BucketParams::new())),
 		}
 	}
+
+	/// Query whether the bucket is deleted
 	pub fn is_deleted(&self) -> bool {
 		*self.state.get() == BucketState::Deleted
 	}
+
+	/// Return the list of authorized keys, when each was updated, and the permission associated
+	/// with each key
 	pub fn authorized_keys(&self) -> &[(String, u64, PermissionSet)] {
 		match self.state.get() {
 			BucketState::Deleted => &[],
diff --git a/src/model/garage.rs b/src/model/garage.rs
index 3f51f9fe..797a91e5 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -18,15 +18,23 @@ use crate::key_table::*;
 use crate::object_table::*;
 use crate::version_table::*;
 
+/// An entire Garage full of data
 pub struct Garage {
+	/// The parsed configuration Garage is running with
 	pub config: Config,
+	/// The local database
 	pub db: sled::Db,
+	/// A background job runner
 	pub background: Arc<BackgroundRunner>,
+	/// The membership manager
 	pub system: Arc<System>,
+	/// The block manager
 	pub block_manager: Arc<BlockManager>,
 
+	/// Table containing information about buckets
 	pub bucket_table: Arc<Table<BucketTable, TableFullReplication>>,
+	/// Table containing information about API keys
 	pub key_table: Arc<Table<KeyTable, TableFullReplication>>,
 
 	pub object_table: Arc<Table<ObjectTable, TableShardedReplication>>,
@@ -35,6 +43,7 @@ pub struct Garage {
 }
 
 impl Garage {
+	/// Create and run garage
 	pub fn new(
 		config: Config,
 		db: sled::Db,
diff --git a/src/model/key_table.rs b/src/model/key_table.rs
index e6ebe8de..653a38e2 100644
--- a/src/model/key_table.rs
+++ b/src/model/key_table.rs
@@ -3,26 +3,28 @@ use serde::{Deserialize, Serialize};
 use garage_table::crdt::*;
 use garage_table::*;
 
+/// An API key
 #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
 pub struct Key {
-	// Primary key
+	/// The id of the key (immutable)
 	pub key_id: String,
 
-	// Associated secret key (immutable)
+	/// The secret key associated with the key
+	// shouldn't it be hashed or something, so it's truly secret?
 	pub secret_key: String,
 
-	// Name
+	/// Name for the key
 	pub name: crdt::LWW<String>,
 
-	// Deletion
+	/// Is the key deleted
 	pub deleted: crdt::Bool,
 
-	// Authorized keys
+	/// Buckets in which the key is authorized. Empty if `Key` is deleted
 	pub authorized_buckets: crdt::LWWMap<String, PermissionSet>,
-	// CRDT interaction: deleted implies authorized_buckets is empty
 }
 
 impl Key {
+	/// Create a new key
 	pub fn new(name: String) -> Self {
 		let key_id = format!("GK{}", hex::encode(&rand::random::<[u8; 12]>()[..]));
 		let secret_key = hex::encode(&rand::random::<[u8; 32]>()[..]);
@@ -34,6 +36,8 @@ impl Key {
 			authorized_buckets: crdt::LWWMap::new(),
 		}
 	}
+
+	/// Import a key from its parts
 	pub fn import(key_id: &str, secret_key: &str, name: &str) -> Self {
 		Self {
 			key_id: key_id.to_string(),
@@ -43,6 +47,8 @@ impl Key {
 			authorized_buckets: crdt::LWWMap::new(),
 		}
 	}
+
+	/// Create a new `Key` which can be merged to mark an existing key as deleted
 	pub fn delete(key_id: String) -> Self {
 		Self {
 			key_id,
@@ -52,13 +58,16 @@ impl Key {
 			authorized_buckets: crdt::LWWMap::new(),
 		}
 	}
-	/// Add an authorized bucket, only if it wasn't there before
+
+	/// Check whether the `Key` is allowed to read in the given bucket
 	pub fn allow_read(&self, bucket: &str) -> bool {
 		self.authorized_buckets
 			.get(&bucket.to_string())
 			.map(|x| x.allow_read)
 			.unwrap_or(false)
 	}
+
+	/// Check whether the `Key` is allowed to write in the given bucket
 	pub fn allow_write(&self, bucket: &str) -> bool {
 		self.authorized_buckets
 			.get(&bucket.to_string())
 			.map(|x| x.allow_write)
 			.unwrap_or(false)
 	}
 }
 
+/// Permission given to a key in a bucket
 #[derive(PartialOrd, Ord, PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
 pub struct PermissionSet {
+	/// The key can be used to read the bucket
 	pub allow_read: bool,
+	/// The key can be used to write in the bucket
 	pub allow_write: bool,
 }
diff --git a/src/model/lib.rs b/src/model/lib.rs
index b4a8ddb7..70d2e2ce 100644
--- a/src/model/lib.rs
+++ b/src/model/lib.rs
@@ -1,3 +1,4 @@
+#![warn(missing_docs)]
 #[macro_use]
 extern crate log;
 
diff --git a/src/model/object_table.rs b/src/model/object_table.rs
index d5be62e5..5b026ceb 100644
--- a/src/model/object_table.rs
+++ b/src/model/object_table.rs
@@ -11,19 +11,21 @@ use garage_table::*;
 
 use crate::version_table::*;
 
+/// An object
 #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
 pub struct Object {
-	// Primary key
+	/// The bucket in which the object is stored
 	pub bucket: String,
 
-	// Sort key
+	/// The key at which the object is stored in its bucket
 	pub key: String,
 
-	// Data
+	/// The list of known versions of the object
 	versions: Vec<ObjectVersion>,
 }
 
 impl Object {
+	/// Create an object from parts
 	pub fn new(bucket: String, key: String, versions: Vec<ObjectVersion>) -> Self {
 		let mut ret = Self {
 			bucket,
@@ -36,6 +38,7 @@ impl Object {
 		}
 		ret
 	}
 
+	/// Adds a version if it wasn't already present
 	pub fn add_version(&mut self, new: ObjectVersion) -> Result<(), ()> {
 		match self
@@ -49,23 +52,32 @@ impl Object {
 			Ok(_) => Err(()),
 		}
 	}
+
+	/// Get the list of all versions known for the `Object`
 	pub fn versions(&self) -> &[ObjectVersion] {
 		&self.versions[..]
 	}
 }
 
+/// Information about a version of an object
 #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
 pub struct ObjectVersion {
+	/// Id of the version
 	pub uuid: UUID,
+	/// Timestamp of when the object was created
 	pub timestamp: u64,
-
+	/// State of the version
 	pub state: ObjectVersionState,
 }
 
+/// State of an object version
 #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
 pub enum ObjectVersionState {
+	/// The version is being received
 	Uploading(ObjectVersionHeaders),
+	/// The version is fully received
 	Complete(ObjectVersionData),
+	/// The version was never fully received
 	Aborted,
 }
 
@@ -90,10 +102,15 @@ impl CRDT for ObjectVersionState {
 	}
 }
 
+/// Data about an object version
 #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
 pub enum ObjectVersionData {
+	/// The version is deleted
 	DeleteMarker,
+	/// The object is small enough to be stored inline
 	Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec<u8>),
+	/// The object is larger; the hash of its first block is stored here, and the hashes of the
+	/// next blocks are stored in the version table
 	FirstBlock(ObjectVersionMeta, Hash),
 }
 
@@ -101,16 +118,23 @@ impl AutoCRDT for ObjectVersionData {
 	const WARN_IF_DIFFERENT: bool = true;
 }
 
+/// Metadata about the object version
 #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
 pub struct ObjectVersionMeta {
+	/// Headers to send to the client
 	pub headers: ObjectVersionHeaders,
+	/// Size of the object
 	pub size: u64,
+	/// Etag of the object
 	pub etag: String,
 }
 
+/// Additional headers for an object
 #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
 pub struct ObjectVersionHeaders {
+	/// Content type of the object
 	pub content_type: String,
+	/// Any other HTTP headers to send
 	pub other: BTreeMap<String, String>,
 }
 
@@ -118,18 +142,24 @@ impl ObjectVersion {
 	fn cmp_key(&self) -> (u64, UUID) {
 		(self.timestamp, self.uuid)
 	}
+
+	/// Is the object version currently being uploaded
 	pub fn is_uploading(&self) -> bool {
 		match self.state {
 			ObjectVersionState::Uploading(_) => true,
 			_ => false,
 		}
 	}
+
+	/// Is the object version completely received
 	pub fn is_complete(&self) -> bool {
 		match self.state {
 			ObjectVersionState::Complete(_) => true,
 			_ => false,
 		}
 	}
+
+	/// Is the object version available (received and not deleted)
 	pub fn is_data(&self) -> bool {
 		match self.state {
 			ObjectVersionState::Complete(ObjectVersionData::DeleteMarker) => false,
diff --git a/src/model/version_table.rs b/src/model/version_table.rs
index fabd1fb1..428fac10 100644
--- a/src/model/version_table.rs
+++ b/src/model/version_table.rs
@@ -10,21 +10,27 @@ use garage_table::*;
 
 use crate::block_ref_table::*;
 
+/// A version of an object
 #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
 pub struct Version {
-	// Primary key
+	/// UUID of the version
 	pub uuid: UUID,
 
 	// Actual data: the blocks for this version
 	// In the case of a multipart upload, also store the etags
 	// of individual parts and check them when doing CompleteMultipartUpload
+	/// Is this version deleted
 	pub deleted: crdt::Bool,
+	/// List of blocks of data composing the version
 	pub blocks: crdt::Map<VersionBlockKey, VersionBlock>,
+	/// Etag of each part in case of a multipart upload, empty otherwise
 	pub parts_etags: crdt::Map<u64, String>,
 
 	// Back link to bucket+key so that we can figure if
 	// this was deleted later on
+	/// Bucket in which the related object is stored
 	pub bucket: String,
+	/// Key under which the related object is stored
 	pub key: String,
 }
 
@@ -43,7 +49,9 @@ impl Version {
 
 #[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)]
 pub struct VersionBlockKey {
+	/// Number of the part, starting at 1
 	pub part_number: u64,
+	/// Offset of the block in the file, starting at 0
 	pub offset: u64,
 }
 
@@ -61,9 +69,12 @@ impl PartialOrd for VersionBlockKey {
 	}
 }
 
+/// Information about a single block
 #[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)]
 pub struct VersionBlock {
+	/// Hash of the block
 	pub hash: Hash,
+	/// Size of the block
 	pub size: u64,
 }
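
For illustration only (not part of the patch): the comments on `Key` and `PermissionSet` describe a permission model in which a key holds a map from bucket name to read/write flags, and lookups for an unknown bucket default to no access. Below is a minimal standalone sketch of that behaviour; the types are simplified stand-ins for the `garage_table` CRDT types (a plain `BTreeMap` replaces `crdt::LWWMap`) so the sketch compiles on its own, and the names and values are illustrative, not the actual crate API.

use std::collections::BTreeMap;

/// Stand-in for garage's `PermissionSet`: same two flags as documented in the patch.
#[derive(Clone, Copy, Debug)]
struct PermissionSet {
	allow_read: bool,
	allow_write: bool,
}

/// Stand-in for `Key`: the real type keeps `authorized_buckets` in a `crdt::LWWMap`;
/// a plain `BTreeMap` is used here so the example is self-contained.
struct Key {
	key_id: String,
	authorized_buckets: BTreeMap<String, PermissionSet>,
}

impl Key {
	/// Mirrors the documented behaviour: an absent bucket means no access.
	fn allow_read(&self, bucket: &str) -> bool {
		self.authorized_buckets
			.get(bucket)
			.map(|p| p.allow_read)
			.unwrap_or(false)
	}

	fn allow_write(&self, bucket: &str) -> bool {
		self.authorized_buckets
			.get(bucket)
			.map(|p| p.allow_write)
			.unwrap_or(false)
	}
}

fn main() {
	// Hypothetical key id and bucket name, used only for this example.
	let mut key = Key {
		key_id: "GK-example".to_string(),
		authorized_buckets: BTreeMap::new(),
	};
	key.authorized_buckets.insert(
		"my-bucket".to_string(),
		PermissionSet { allow_read: true, allow_write: false },
	);

	assert!(key.allow_read("my-bucket"));
	assert!(!key.allow_write("my-bucket"));
	// Unknown bucket: both checks default to false.
	assert!(!key.allow_read("other-bucket"));
	println!("permission checks for {} behave as documented", key.key_id);
}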