attempt at documenting model crate

This commit is contained in:
Trinity Pointard 2021-03-26 21:53:28 +01:00 committed by Alex Auvolat
parent b437610812
commit 67585a4ffa
No known key found for this signature in database
GPG key ID: EDABF9711E244EB1
8 changed files with 119 additions and 15 deletions

View file

@ -24,6 +24,7 @@ use crate::block_ref_table::*;
use crate::garage::Garage;
/// Size under which data will be stored inlined in database instead of as files
pub const INLINE_THRESHOLD: usize = 3072;
pub const BACKGROUND_WORKERS: u64 = 1;
@ -33,28 +34,41 @@ const BLOCK_GC_TIMEOUT: Duration = Duration::from_secs(60);
const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5);
const RESYNC_RETRY_TIMEOUT: Duration = Duration::from_secs(10);
/// RPC messages used to share blocks of data between nodes
#[derive(Debug, Serialize, Deserialize)]
pub enum Message {
Ok,
/// Message to ask for a block of data, by hash
GetBlock(Hash),
/// Message to send a block of data, either because requested, of for first delivery of new
/// block
PutBlock(PutBlockMessage),
/// Ask other node if they should have this block, but don't actually have it
NeedBlockQuery(Hash),
/// Response : whether the node do require that block
NeedBlockReply(bool),
}
/// Structure used to send a block
#[derive(Debug, Serialize, Deserialize)]
pub struct PutBlockMessage {
/// Hash of the block
pub hash: Hash,
/// Content of the block
#[serde(with = "serde_bytes")]
pub data: Vec<u8>,
}
impl RpcMessage for Message {}
/// The block manager, handling block exchange between nodes, and block storage on local node
pub struct BlockManager {
/// Replication strategy, allowing to find on which node blocks should be located
pub replication: TableShardedReplication,
/// Directory in which block are stored
pub data_dir: PathBuf,
/// Lock to prevent concurrent edition of the directory
pub data_dir_lock: Mutex<()>,
rc: sled::Tree,
@ -128,7 +142,8 @@ impl BlockManager {
}
pub fn spawn_background_worker(self: Arc<Self>) {
// Launch 2 simultaneous workers for background resync loop preprocessing
// Launch 2 simultaneous workers for background resync loop preprocessing <= TODO actually this
// launches only one worker with current value of BACKGROUND_WORKERS
for i in 0..BACKGROUND_WORKERS {
let bm2 = self.clone();
let background = self.system.background.clone();
@ -141,6 +156,7 @@ impl BlockManager {
}
}
/// Write a block to disk
pub async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<Message, Error> {
let _lock = self.data_dir_lock.lock().await;
@ -159,6 +175,7 @@ impl BlockManager {
Ok(Message::Ok)
}
/// Read block from disk, verifying it's integrity
pub async fn read_block(&self, hash: &Hash) -> Result<Message, Error> {
let path = self.block_path(hash);
@ -190,6 +207,7 @@ impl BlockManager {
Ok(Message::PutBlock(PutBlockMessage { hash: *hash, data }))
}
/// Check if this node should have a block, but don't actually have it
pub async fn need_block(&self, hash: &Hash) -> Result<bool, Error> {
let needed = self
.rc
@ -217,6 +235,8 @@ impl BlockManager {
path
}
/// Increment the number of time a block is used, putting it to resynchronization if it is
/// required, but not known
pub fn block_incref(&self, hash: &Hash) -> Result<(), Error> {
let old_rc = self.rc.fetch_and_update(&hash, |old| {
let old_v = old.map(u64_from_be_bytes).unwrap_or(0);
@ -229,6 +249,8 @@ impl BlockManager {
Ok(())
}
/// Decrement the number of time a block is used
// when counter reach 0, it seems not put to resync which I assume put it to gc?
pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> {
let new_rc = self.rc.update_and_fetch(&hash, |old| {
let old_v = old.map(u64_from_be_bytes).unwrap_or(0);
@ -388,6 +410,7 @@ impl BlockManager {
Ok(())
}
/// Ask nodes that might have a block for it
pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> {
let who = self.replication.read_nodes(&hash);
let resps = self
@ -412,6 +435,7 @@ impl BlockManager {
)))
}
/// Send block to nodes that should have it
pub async fn rpc_put_block(&self, hash: Hash, data: Vec<u8>) -> Result<(), Error> {
let who = self.replication.write_nodes(&hash);
self.rpc_client
@ -498,6 +522,7 @@ impl BlockManager {
.boxed()
}
/// Get lenght of resync queue
pub fn resync_queue_len(&self) -> usize {
self.resync_queue.len()
}

View file

@ -11,12 +11,15 @@ use crate::block::*;
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct BlockRef {
// Primary key
/// Hash of the block
pub block: Hash,
// Sort key
// why a version on a hashed (probably immutable) piece of data?
pub version: UUID,
// Keep track of deleted status
/// Is that block deleted
pub deleted: crdt::Bool,
}

View file

@ -12,15 +12,18 @@ use crate::key_table::PermissionSet;
/// - A bucket has 2 states, Present or Deleted and parameters make sense only if present.
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct Bucket {
// Primary key
/// Name of the bucket
pub name: String,
/// State, and configuration if not deleted, of the bucket
pub state: crdt::LWW<BucketState>,
}
/// State of a bucket
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub enum BucketState {
/// The bucket is deleted
Deleted,
/// The bucket exists
Present(BucketParams),
}
@ -37,9 +40,12 @@ impl CRDT for BucketState {
}
}
/// Configuration for a bucket
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct BucketParams {
/// Map of key with access to the bucket, and what kind of access they give
pub authorized_keys: crdt::LWWMap<String, PermissionSet>,
/// Is the bucket served as http
pub website: crdt::LWW<bool>,
}
@ -51,6 +57,7 @@ impl CRDT for BucketParams {
}
impl BucketParams {
/// Create a new default `BucketParams`
pub fn new() -> Self {
BucketParams {
authorized_keys: crdt::LWWMap::new(),
@ -60,15 +67,21 @@ impl BucketParams {
}
impl Bucket {
/// Create a new bucket
pub fn new(name: String) -> Self {
Bucket {
name,
state: crdt::LWW::new(BucketState::Present(BucketParams::new())),
}
}
/// Query if bucket is deleted
pub fn is_deleted(&self) -> bool {
*self.state.get() == BucketState::Deleted
}
/// Return the list of authorized keys, when each was updated, and the permission associated to
/// the key
pub fn authorized_keys(&self) -> &[(String, u64, PermissionSet)] {
match self.state.get() {
BucketState::Deleted => &[],

View file

@ -18,15 +18,23 @@ use crate::key_table::*;
use crate::object_table::*;
use crate::version_table::*;
/// An entire Garage full of data
pub struct Garage {
/// The parsed configuration Garage is running
pub config: Config,
/// The local database
pub db: sled::Db,
/// A background job runner
pub background: Arc<BackgroundRunner>,
/// The membership manager
pub system: Arc<System>,
/// The block manager
pub block_manager: Arc<BlockManager>,
/// Table containing informations about buckets
pub bucket_table: Arc<Table<BucketTable, TableFullReplication>>,
/// Table containing informations about api keys
pub key_table: Arc<Table<KeyTable, TableFullReplication>>,
pub object_table: Arc<Table<ObjectTable, TableShardedReplication>>,
@ -35,6 +43,7 @@ pub struct Garage {
}
impl Garage {
/// Create and run garage
pub fn new(
config: Config,
db: sled::Db,

View file

@ -3,26 +3,28 @@ use serde::{Deserialize, Serialize};
use garage_table::crdt::*;
use garage_table::*;
/// An api key
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct Key {
// Primary key
/// The id of the key (immutable)
pub key_id: String,
// Associated secret key (immutable)
/// The secret_key associated
// shouldn't it be hashed or something, so it's trully secret?
pub secret_key: String,
// Name
/// Name for the key
pub name: crdt::LWW<String>,
// Deletion
/// Is the key deleted
pub deleted: crdt::Bool,
// Authorized keys
/// Buckets in which the key is authorized. Empty if `Key` is deleted
pub authorized_buckets: crdt::LWWMap<String, PermissionSet>,
// CRDT interaction: deleted implies authorized_buckets is empty
}
impl Key {
/// Create a new key
pub fn new(name: String) -> Self {
let key_id = format!("GK{}", hex::encode(&rand::random::<[u8; 12]>()[..]));
let secret_key = hex::encode(&rand::random::<[u8; 32]>()[..]);
@ -34,6 +36,8 @@ impl Key {
authorized_buckets: crdt::LWWMap::new(),
}
}
/// Import a key from it's parts
pub fn import(key_id: &str, secret_key: &str, name: &str) -> Self {
Self {
key_id: key_id.to_string(),
@ -43,6 +47,8 @@ impl Key {
authorized_buckets: crdt::LWWMap::new(),
}
}
/// Create a new Key which can me merged to mark an existing key deleted
pub fn delete(key_id: String) -> Self {
Self {
key_id,
@ -52,13 +58,16 @@ impl Key {
authorized_buckets: crdt::LWWMap::new(),
}
}
/// Add an authorized bucket, only if it wasn't there before
/// Check if `Key` is allowed to read in bucket
pub fn allow_read(&self, bucket: &str) -> bool {
self.authorized_buckets
.get(&bucket.to_string())
.map(|x| x.allow_read)
.unwrap_or(false)
}
/// Check if `Key` is allowed to write in bucket
pub fn allow_write(&self, bucket: &str) -> bool {
self.authorized_buckets
.get(&bucket.to_string())
@ -67,9 +76,12 @@ impl Key {
}
}
/// Permission given to a key in a bucket
#[derive(PartialOrd, Ord, PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct PermissionSet {
/// The key can be used to read the bucket
pub allow_read: bool,
/// The key can be used to write in the bucket
pub allow_write: bool,
}

View file

@ -1,3 +1,4 @@
#![warn(missing_docs)]
#[macro_use]
extern crate log;

View file

@ -11,19 +11,21 @@ use garage_table::*;
use crate::version_table::*;
/// An object
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct Object {
// Primary key
/// The bucket in which the object is stored
pub bucket: String,
// Sort key
/// The key at which the object is stored in its bucket
pub key: String,
// Data
/// The list of known versions of the object
versions: Vec<ObjectVersion>,
}
impl Object {
/// Create an object from parts
pub fn new(bucket: String, key: String, versions: Vec<ObjectVersion>) -> Self {
let mut ret = Self {
bucket,
@ -36,6 +38,7 @@ impl Object {
}
ret
}
/// Adds a version if it wasn't already present
pub fn add_version(&mut self, new: ObjectVersion) -> Result<(), ()> {
match self
@ -49,23 +52,32 @@ impl Object {
Ok(_) => Err(()),
}
}
/// Get a list of all versions known for `Object`
pub fn versions(&self) -> &[ObjectVersion] {
&self.versions[..]
}
}
/// Informations about a version of an object
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct ObjectVersion {
/// Id of the version
pub uuid: UUID,
/// Timestamp of when the object was created
pub timestamp: u64,
/// State of the version
pub state: ObjectVersionState,
}
/// State of an object version
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub enum ObjectVersionState {
/// The version is being received
Uploading(ObjectVersionHeaders),
/// The version is fully received
Complete(ObjectVersionData),
/// The version was never fully received
Aborted,
}
@ -90,10 +102,15 @@ impl CRDT for ObjectVersionState {
}
}
/// Data about an object version
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub enum ObjectVersionData {
/// The version is deleted
DeleteMarker,
/// The object is short, it's stored inlined
Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec<u8>),
/// The object is not short, Hash of first block is stored here, next segments hashes are
/// stored in the version table
FirstBlock(ObjectVersionMeta, Hash),
}
@ -101,16 +118,23 @@ impl AutoCRDT for ObjectVersionData {
const WARN_IF_DIFFERENT: bool = true;
}
/// Metadata about the object version
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct ObjectVersionMeta {
/// Headers to send to the client
pub headers: ObjectVersionHeaders,
/// Size of the object
pub size: u64,
/// etag of the object
pub etag: String,
}
/// Additional headers for an object
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct ObjectVersionHeaders {
/// Content type of the object
pub content_type: String,
/// Any other http headers to send
pub other: BTreeMap<String, String>,
}
@ -118,18 +142,24 @@ impl ObjectVersion {
fn cmp_key(&self) -> (u64, UUID) {
(self.timestamp, self.uuid)
}
/// Is the object version currently being uploaded
pub fn is_uploading(&self) -> bool {
match self.state {
ObjectVersionState::Uploading(_) => true,
_ => false,
}
}
/// Is the object version completely received
pub fn is_complete(&self) -> bool {
match self.state {
ObjectVersionState::Complete(_) => true,
_ => false,
}
}
/// Is the object version available (received and not deleted)
pub fn is_data(&self) -> bool {
match self.state {
ObjectVersionState::Complete(ObjectVersionData::DeleteMarker) => false,

View file

@ -10,21 +10,27 @@ use garage_table::*;
use crate::block_ref_table::*;
/// A version of an object
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct Version {
// Primary key
/// UUID of the version
pub uuid: UUID,
// Actual data: the blocks for this version
// In the case of a multipart upload, also store the etags
// of individual parts and check them when doing CompleteMultipartUpload
/// Is this version deleted
pub deleted: crdt::Bool,
/// list of blocks of data composing the version
pub blocks: crdt::Map<VersionBlockKey, VersionBlock>,
/// Etag of each part in case of a multipart upload, empty otherwise
pub parts_etags: crdt::Map<u64, String>,
// Back link to bucket+key so that we can figure if
// this was deleted later on
/// Bucket in which the related object is stored
pub bucket: String,
/// Key in which the related object is stored
pub key: String,
}
@ -43,7 +49,9 @@ impl Version {
#[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)]
pub struct VersionBlockKey {
/// Number of the part, starting at 1
pub part_number: u64,
/// offset of the block in the file, starting at 0
pub offset: u64,
}
@ -61,9 +69,12 @@ impl PartialOrd for VersionBlockKey {
}
}
/// Informations about a single block
#[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)]
pub struct VersionBlock {
/// Hash of the block
pub hash: Hash,
/// Size of the block
pub size: u64,
}