add doc comments #53

Merged
lx merged 10 commits from trinity-1686a/garage:doc-comments into main 2021-04-08 13:01:22 +00:00
38 changed files with 393 additions and 79 deletions

View file

@ -20,6 +20,7 @@ use crate::s3_get::*;
use crate::s3_list::*;
use crate::s3_put::*;
/// Run the S3 API server
pub async fn run_api_server(
garage: Arc<Garage>,
shutdown_signal: impl Future<Output = ()>,

View file

@ -1,9 +1,13 @@
//! Module containing various helpers for encoding
/// Escape a `&str` for inclusion in XML text or in a double-quoted attribute value.
///
/// The characters `&`, `<`, `>` and `"` are replaced by the predefined XML
/// entities `&amp;`, `&lt;`, `&gt;` and `&quot;`. Every other character is
/// copied unchanged.
///
/// The input is treated as raw text: an `&` that is already part of an entity
/// in `s` is escaped again, so callers must pass unescaped data.
pub fn xml_escape(s: &str) -> String {
    // Escaping only grows the string, so the input length is a lower bound.
    let mut escaped = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            // `&` must be escaped too, otherwise the output is not well-formed XML.
            '&' => escaped.push_str("&amp;"),
            '<' => escaped.push_str("&lt;"),
            '>' => escaped.push_str("&gt;"),
            '"' => escaped.push_str("&quot;"),
            other => escaped.push(other),
        }
    }
    escaped
}
/// Encode &str for use in a URI
pub fn uri_encode(string: &str, encode_slash: bool) -> String {
let mut result = String::with_capacity(string.len() * 2);
for c in string.chars() {
@ -24,6 +28,7 @@ pub fn uri_encode(string: &str, encode_slash: bool) -> String {
result
}
/// Encode &str either as an uri, or a valid string for xml inclusion
pub fn xml_encode_key(k: &str, urlencode: bool) -> String {
if urlencode {
uri_encode(k, true)

View file

@ -3,44 +3,57 @@ use hyper::StatusCode;
use garage_util::error::Error as GarageError;
/// Errors of this crate
#[derive(Debug, Error)]
pub enum Error {
// Category: internal error
/// Error related to deeper parts of Garage
#[error(display = "Internal error: {}", _0)]
InternalError(#[error(source)] GarageError),
/// Error related to Hyper
#[error(display = "Internal error (Hyper error): {}", _0)]
Hyper(#[error(source)] hyper::Error),
/// Error related to HTTP
#[error(display = "Internal error (HTTP error): {}", _0)]
HTTP(#[error(source)] http::Error),
// Category: cannot process
/// No proper api key was used, or the signature was invalid
#[error(display = "Forbidden: {}", _0)]
Forbidden(String),
/// The requested object does not exist
#[error(display = "Not found")]
NotFound,
// Category: bad request
/// The request contained an invalid UTF-8 sequence in its path or in other parameters
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

the request contained an invalid UTF-8 sequence in its path or in other parameters would be more accurate

`the request contained an invalid UTF-8 sequence in its path or in other parameters` would be more accurate
#[error(display = "Invalid UTF-8: {}", _0)]
InvalidUTF8Str(#[error(source)] std::str::Utf8Error),
/// The request used an invalid path
#[error(display = "Invalid UTF-8: {}", _0)]
InvalidUTF8String(#[error(source)] std::string::FromUtf8Error),
/// Some base64 encoded data was badly encoded
#[error(display = "Invalid base64: {}", _0)]
InvalidBase64(#[error(source)] base64::DecodeError),
/// The client sent invalid XML data
#[error(display = "Invalid XML: {}", _0)]
InvalidXML(String),
/// The client sent a header with invalid value
#[error(display = "Invalid header value: {}", _0)]
InvalidHeader(#[error(source)] hyper::header::ToStrError),
/// The client sent a range header with invalid value
#[error(display = "Invalid HTTP range: {:?}", _0)]
InvalidRange(#[error(from)] http_range::HttpRangeParseError),
/// The client sent an invalid request
#[error(display = "Bad request: {}", _0)]
BadRequest(String),
}
@ -52,6 +65,7 @@ impl From<roxmltree::Error> for Error {
}
impl Error {
/// Get the HTTP status code that best represents the meaning of the error for the client
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

rather get the HTTP status code that best represents the meaning of the error for the client

rather `get the HTTP status code that best represents the meaning of the error for the client`
pub fn http_status_code(&self) -> StatusCode {
match self {
Error::NotFound => StatusCode::NOT_FOUND,
@ -65,6 +79,7 @@ impl Error {
}
}
/// Trait to map error to the Bad Request error code
pub trait OkOrBadRequest {
type S2;
fn ok_or_bad_request(self, reason: &'static str) -> Self::S2;
@ -93,6 +108,7 @@ impl<T> OkOrBadRequest for Option<T> {
}
}
/// Trait to map an error to an Internal Error code
pub trait OkOrInternalError {
type S2;
fn ok_or_internal_error(self, reason: &'static str) -> Self::S2;

View file

@ -1,15 +1,19 @@
//! Crate for serving a S3 compatible API
#[macro_use]
extern crate log;
pub mod error;
mod error;
pub use error::Error;
lx marked this conversation as resolved Outdated
Outdated
Review

Doesn't this prevent us from using OkOrBadRequest and OkOrInternalError? (haven't read what's below yet)

Doesn't this prevent us from using `OkOrBadRequest` and `OkOrInternalError`? (haven't read what's below yet)

It prevents use from other crates, but not from this one. It does not break anything, but does prevent using those traits in web in the future, for instance ????

It prevents use from other crates, but not from this one. It does not break anything, but does prevent using those traits in web in the future, for instance ????
Outdated
Review

Alright, we'll make things pub again if/when we need them.

Alright, we'll make things `pub` again if/when we need them.
pub mod encoding;
mod encoding;
pub mod api_server;
pub mod signature;
mod api_server;
pub use api_server::run_api_server;
pub mod s3_copy;
pub mod s3_delete;
mod signature;
mod s3_copy;
mod s3_delete;
pub mod s3_get;
pub mod s3_list;
pub mod s3_put;
mod s3_list;
mod s3_put;

View file

@ -1,3 +1,4 @@
//! Function related to GET and HEAD requests
use std::sync::Arc;
use std::time::{Duration, UNIX_EPOCH};
@ -79,6 +80,7 @@ fn try_answer_cached(
}
}
/// Handle HEAD request
pub async fn handle_head(
garage: Arc<Garage>,
req: &Request<Body>,
@ -118,6 +120,7 @@ pub async fn handle_head(
Ok(response)
}
/// Handle GET request
pub async fn handle_get(
garage: Arc<Garage>,
req: &Request<Body>,
@ -224,7 +227,7 @@ pub async fn handle_get(
}
}
pub async fn handle_get_range(
async fn handle_get_range(
garage: Arc<Garage>,
version: &ObjectVersion,
version_data: &ObjectVersionData,

View file

@ -80,7 +80,7 @@ pub struct ConfigureNodeOpt {
#[structopt(short = "c", long = "capacity")]
capacity: Option<u32>,
/// Optionnal node tag
/// Optional node tag
#[structopt(short = "t", long = "tag")]
tag: Option<String>,

View file

@ -1,4 +1,5 @@
#![recursion_limit = "1024"]
//! Garage CLI, used to interact with a running Garage instance, and to launch a Garage instance
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

"and to launch a Garage instance" ?

"and to launch a Garage **instance**" ?
#[macro_use]
extern crate log;
@ -25,7 +26,7 @@ use cli::*;
#[derive(StructOpt, Debug)]
#[structopt(name = "garage")]
pub struct Opt {
struct Opt {
/// RPC connect to this host to execute client operations
#[structopt(short = "h", long = "rpc-host", default_value = "127.0.0.1:3901")]
pub rpc_host: SocketAddr,

View file

@ -8,10 +8,10 @@ use garage_util::background::*;
use garage_util::config::*;
use garage_util::error::Error;
use garage_api::api_server;
use garage_api::run_api_server;
use garage_model::garage::Garage;
use garage_rpc::rpc_server::RpcServer;
use garage_web::web_server;
use garage_web::run_web_server;
use crate::admin_rpc::*;
@ -62,8 +62,8 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
info!("Initializing RPC and API servers...");
let run_rpc_server = Arc::new(rpc_server).run(wait_from(watch_cancel.clone()));
let api_server = api_server::run_api_server(garage.clone(), wait_from(watch_cancel.clone()));
let web_server = web_server::run_web_server(garage, wait_from(watch_cancel.clone()));
let api_server = run_api_server(garage.clone(), wait_from(watch_cancel.clone()));
let web_server = run_web_server(garage, wait_from(watch_cancel.clone()));
futures::try_join!(
bootstrap.map(|rv| {

View file

@ -18,12 +18,13 @@ use garage_rpc::membership::System;
use garage_rpc::rpc_client::*;
use garage_rpc::rpc_server::*;
use garage_table::replication::{sharded::TableShardedReplication, TableReplication};
use garage_table::replication::{TableReplication, TableShardedReplication};
use crate::block_ref_table::*;
use crate::garage::Garage;
/// Size under which data will be stored inlined in database instead of as files
pub const INLINE_THRESHOLD: usize = 3072;
pub const BACKGROUND_WORKERS: u64 = 1;
@ -33,28 +34,41 @@ const BLOCK_GC_TIMEOUT: Duration = Duration::from_secs(60);
const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5);
const RESYNC_RETRY_TIMEOUT: Duration = Duration::from_secs(10);
/// RPC messages used to share blocks of data between nodes
#[derive(Debug, Serialize, Deserialize)]
pub enum Message {
/// Acknowledgement carrying no payload
Ok,
/// Message to ask for a block of data, by hash
GetBlock(Hash),
/// Message to send a block of data, either because it was requested, or for the first
/// delivery of a new block
PutBlock(PutBlockMessage),
/// Ask another node whether it should have this block, but doesn't actually have it
NeedBlockQuery(Hash),
/// Response: whether the node requires that block
NeedBlockReply(bool),
}
/// Structure used to send a block
#[derive(Debug, Serialize, Deserialize)]
pub struct PutBlockMessage {
/// Hash of the block
pub hash: Hash,
/// Content of the block
#[serde(with = "serde_bytes")]
pub data: Vec<u8>,
}
impl RpcMessage for Message {}
/// The block manager, handling block exchange between nodes, and block storage on local node
pub struct BlockManager {
/// Replication strategy, allowing to find on which node blocks should be located
pub replication: TableShardedReplication,
/// Directory in which block are stored
pub data_dir: PathBuf,
/// Lock to prevent concurrent edition of the directory
pub data_dir_lock: Mutex<()>,
rc: sled::Tree,
@ -128,7 +142,8 @@ impl BlockManager {
}
pub fn spawn_background_worker(self: Arc<Self>) {
// Launch 2 simultaneous workers for background resync loop preprocessing
// Launch 2 simultaneous workers for background resync loop preprocessing <= TODO actually this
// launches only one worker with current value of BACKGROUND_WORKERS
for i in 0..BACKGROUND_WORKERS {
let bm2 = self.clone();
let background = self.system.background.clone();
@ -141,7 +156,8 @@ impl BlockManager {
}
}
pub async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<Message, Error> {
/// Write a block to disk
async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<Message, Error> {
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

should probably not be pub, same for read_block and need_block

should probably not be `pub`, same for `read_block` and `need_block`
let _lock = self.data_dir_lock.lock().await;
let mut path = self.block_dir(hash);
@ -159,7 +175,8 @@ impl BlockManager {
Ok(Message::Ok)
}
pub async fn read_block(&self, hash: &Hash) -> Result<Message, Error> {
/// Read block from disk, verifying its integrity
async fn read_block(&self, hash: &Hash) -> Result<Message, Error> {
let path = self.block_path(hash);
let mut f = match fs::File::open(&path).await {
@ -190,7 +207,8 @@ impl BlockManager {
Ok(Message::PutBlock(PutBlockMessage { hash: *hash, data }))
}
pub async fn need_block(&self, hash: &Hash) -> Result<bool, Error> {
/// Check if this node should have a block, but doesn't actually have it
async fn need_block(&self, hash: &Hash) -> Result<bool, Error> {
let needed = self
.rc
.get(hash.as_ref())?
@ -217,6 +235,8 @@ impl BlockManager {
path
}
/// Increment the number of times a block is used, putting it to resynchronization if it is
/// required, but not known
pub fn block_incref(&self, hash: &Hash) -> Result<(), Error> {
let old_rc = self.rc.fetch_and_update(&hash, |old| {
let old_v = old.map(u64_from_be_bytes).unwrap_or(0);
@ -229,6 +249,7 @@ impl BlockManager {
Ok(())
}
/// Decrement the number of times a block is used
pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> {
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

resync does both: if the rc is >0 and the block is not present, ask it to peers. on the contrary if rc=0 and block is present, delete it locally (after checking that no peer needs it)

resync does both: if the rc is >0 and the block is not present, ask it to peers. on the contrary if rc=0 and block is present, delete it locally (after checking that no peer needs it)

I understand resync both get missing blocks and remove unnecessary blocks. My problem is that if I understand the code correctly, when the counter go down from 1 to 0, the value for new_rc is Some(0) (actually a Vec of 8 0u8), which is not None, so the block is not put to resync, and don't get deleted, unless some background thread regularly scan the table, which sound inneficient

I understand resync both get missing blocks and remove unnecessary blocks. My problem is that if I understand the code correctly, when the counter go down from 1 to 0, the value for new_rc is Some(0) (actually a Vec of 8 0u8), which is not None, so the block is not put to resync, and don't get deleted, unless some background thread regularly scan the table, which sound inneficient
Outdated
Review

block_decref actually deletes the value in the rc when it reaches 0, so it is indeed None and not Some(0), unless I'm mistaken? Maybe you're not looking at the last version of the code?

`block_decref` actually deletes the value in the rc when it reaches 0, so it is indeed None and not Some(0), unless I'm mistaken? Maybe you're not looking at the last version of the code?

Nevermind, I somehow read old_v >= 1 instead of old_v > 1

Nevermind, I somehow read `old_v >= 1` instead of `old_v > 1`
let new_rc = self.rc.update_and_fetch(&hash, |old| {
let old_v = old.map(u64_from_be_bytes).unwrap_or(0);
@ -388,6 +409,7 @@ impl BlockManager {
Ok(())
}
/// Ask nodes that might have a block for it
pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> {
let who = self.replication.read_nodes(&hash);
let resps = self
@ -412,6 +434,7 @@ impl BlockManager {
)))
}
/// Send block to nodes that should have it
pub async fn rpc_put_block(&self, hash: Hash, data: Vec<u8>) -> Result<(), Error> {
let who = self.replication.write_nodes(&hash);
self.rpc_client
@ -498,6 +521,7 @@ impl BlockManager {
.boxed()
}
/// Get the length of the resync queue
pub fn resync_queue_len(&self) -> usize {
self.resync_queue.len()
}

View file

@ -10,13 +10,14 @@ use crate::block::*;
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct BlockRef {
// Primary key
/// Hash of the block, used as partition key
pub block: Hash,
// Sort key
/// Id of the Version for the object containing this block, used as sorting key
pub version: UUID,
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

version is the id of a Version object in the version table (which stores the different versions of objects in the buckets), not a version number for the BlockRef. i.e. BlockRef.version is the version id of an object that contains this block

version is the id of a Version object in the version table (which stores the different versions of objects in the buckets), not a version number for the BlockRef. i.e. `BlockRef.version` is the version id of an object that contains this block
// Keep track of deleted status
/// Is the Version that contains this block deleted
pub deleted: crdt::Bool,
}
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

-> is the Version (a version of an object in a bucket) that contains this block deleted

-> is the Version (a version of an object in a bucket) that contains this block deleted

View file

@ -12,15 +12,18 @@ use crate::key_table::PermissionSet;
/// - A bucket has 2 states, Present or Deleted and parameters make sense only if present.
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct Bucket {
// Primary key
/// Name of the bucket
pub name: String,
/// State, and configuration if not deleted, of the bucket
pub state: crdt::LWW<BucketState>,
}
/// State of a bucket
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub enum BucketState {
/// The bucket is deleted
Deleted,
/// The bucket exists
Present(BucketParams),
}
@ -37,9 +40,12 @@ impl CRDT for BucketState {
}
}
/// Configuration for a bucket
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct BucketParams {
/// Map of the keys with access to the bucket, and the permissions granted to each of them
pub authorized_keys: crdt::LWWMap<String, PermissionSet>,
/// Whether the bucket is served over HTTP (as a website)
pub website: crdt::LWW<bool>,
}
@ -51,6 +57,7 @@ impl CRDT for BucketParams {
}
impl BucketParams {
/// Initializes a new instance of the BucketParams struct
pub fn new() -> Self {
BucketParams {
authorized_keys: crdt::LWWMap::new(),
@ -60,15 +67,21 @@ impl BucketParams {
}
impl Bucket {
/// Initializes a new instance of the Bucket struct
Outdated
Review

This formulation is ambiguous/misleading, as the bucket is not really created until the Bucket is commited to the database

This formulation is ambiguous/misleading, as the bucket is not really created until the `Bucket` is commited to the database
Outdated
Review

Suggestion: Initializes a new instance of the Bucket struct

Suggestion: `Initializes a new instance of the Bucket struct`
pub fn new(name: String) -> Self {
Bucket {
name,
state: crdt::LWW::new(BucketState::Present(BucketParams::new())),
}
}
/// Returns true if this represents a deleted bucket
Outdated
Review

This formulation is ambiguous/misleading, as is_deleted does not do a database query, it just checks if this struct that was returned earlier from the db has the deleted flag

This formulation is ambiguous/misleading, as `is_deleted` does not do a database query, it just checks if this struct that was returned earlier from the db has the deleted flag
Outdated
Review

Suggestion: returns true if this represents a deleted bucket

Suggestion: `returns true if this represents a deleted bucket`
pub fn is_deleted(&self) -> bool {
    // Only inspects the state held by this struct value.
    matches!(self.state.get(), BucketState::Deleted)
}
/// Return the list of authorized keys, when each was updated, and the permission associated to
/// the key
pub fn authorized_keys(&self) -> &[(String, u64, PermissionSet)] {
match self.state.get() {
BucketState::Deleted => &[],

View file

@ -7,8 +7,8 @@ use garage_rpc::membership::System;
use garage_rpc::rpc_client::RpcHttpClient;
use garage_rpc::rpc_server::RpcServer;
use garage_table::replication::fullcopy::*;
use garage_table::replication::sharded::*;
use garage_table::replication::TableFullReplication;
use garage_table::replication::TableShardedReplication;
use garage_table::*;
use crate::block::*;
@ -18,15 +18,23 @@ use crate::key_table::*;
use crate::object_table::*;
use crate::version_table::*;
/// An entire Garage full of data
pub struct Garage {
/// The parsed configuration Garage is running
pub config: Config,
/// The local database
pub db: sled::Db,
/// A background job runner
pub background: Arc<BackgroundRunner>,
/// The membership manager
pub system: Arc<System>,
/// The block manager
pub block_manager: Arc<BlockManager>,
/// Table containing informations about buckets
pub bucket_table: Arc<Table<BucketTable, TableFullReplication>>,
/// Table containing informations about api keys
pub key_table: Arc<Table<KeyTable, TableFullReplication>>,
pub object_table: Arc<Table<ObjectTable, TableShardedReplication>>,
@ -35,6 +43,7 @@ pub struct Garage {
}
impl Garage {
/// Create and run garage
pub fn new(
config: Config,
db: sled::Db,

View file

@ -3,26 +3,28 @@ use serde::{Deserialize, Serialize};
use garage_table::crdt::*;
use garage_table::*;
/// An api key
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct Key {
// Primary key
/// The id of the key (immutable), used as partition key
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

please keep in the comment that this acts as the primary key

please keep in the comment that this acts as the primary key
pub key_id: String,
// Associated secret key (immutable)
/// The secret_key associated
pub secret_key: String,
// Name
/// Name for the key
pub name: crdt::LWW<String>,
// Deletion
/// Is the key deleted
pub deleted: crdt::Bool,
// Authorized keys
pub authorized_buckets: crdt::LWWMap<String, PermissionSet>,
/// Buckets in which the key is authorized. Empty if `Key` is deleted
// CRDT interaction: deleted implies authorized_buckets is empty
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

this comment needs to be kept somewhere

this comment needs to be kept somewhere
pub authorized_buckets: crdt::LWWMap<String, PermissionSet>,
}
impl Key {
/// Create a new key
pub fn new(name: String) -> Self {
let key_id = format!("GK{}", hex::encode(&rand::random::<[u8; 12]>()[..]));
let secret_key = hex::encode(&rand::random::<[u8; 32]>()[..]);
@ -34,6 +36,8 @@ impl Key {
authorized_buckets: crdt::LWWMap::new(),
}
}
/// Import a key from its parts
pub fn import(key_id: &str, secret_key: &str, name: &str) -> Self {
Self {
key_id: key_id.to_string(),
@ -43,6 +47,8 @@ impl Key {
authorized_buckets: crdt::LWWMap::new(),
}
}
/// Create a new Key which can be merged to mark an existing key deleted
pub fn delete(key_id: String) -> Self {
Self {
key_id,
@ -52,13 +58,16 @@ impl Key {
authorized_buckets: crdt::LWWMap::new(),
}
}
/// Add an authorized bucket, only if it wasn't there before
/// Tell whether this `Key` has read permission on the given bucket.
///
/// Returns `false` when the bucket has no entry in `authorized_buckets`.
pub fn allow_read(&self, bucket: &str) -> bool {
    let bucket_name = bucket.to_string();
    self.authorized_buckets
        .get(&bucket_name)
        .map_or(false, |permissions| permissions.allow_read)
}
/// Check if `Key` is allowed to write in bucket
pub fn allow_write(&self, bucket: &str) -> bool {
self.authorized_buckets
.get(&bucket.to_string())
@ -67,9 +76,12 @@ impl Key {
}
}
/// Permission given to a key in a bucket
#[derive(PartialOrd, Ord, PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct PermissionSet {
/// The key can be used to read the bucket
pub allow_read: bool,
/// The key can be used to write in the bucket
pub allow_write: bool,
}

View file

@ -6,24 +6,26 @@ use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_table::crdt::*;
use garage_table::replication::sharded::*;
use garage_table::replication::TableShardedReplication;
use garage_table::*;
use crate::version_table::*;
/// An object
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct Object {
// Primary key
/// The bucket in which the object is stored, used as partition key
pub bucket: String,
// Sort key
/// The key at which the object is stored in its bucket, used as sorting key
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

please keep in the comment that these two are used respectively as partition key (not primary key, my bad) and sort key; it's an important detail in understanding how Garage works internally

please keep in the comment that these two are used respectively as partition key (not primary key, my bad) and sort key; it's an important detail in understanding how Garage works internally
pub key: String,
// Data
/// The list of currently stored versions of the object
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

doesn't keep all known versions, the comment is misleading

doesn't keep all known versions, the comment is misleading

I'm not sure what it store then

I'm not sure what it store then
versions: Vec<ObjectVersion>,
}
impl Object {
/// Create an object from parts
pub fn new(bucket: String, key: String, versions: Vec<ObjectVersion>) -> Self {
let mut ret = Self {
bucket,
@ -36,6 +38,7 @@ impl Object {
}
ret
}
/// Adds a version if it wasn't already present
pub fn add_version(&mut self, new: ObjectVersion) -> Result<(), ()> {
match self
@ -49,23 +52,32 @@ impl Object {
Ok(_) => Err(()),
}
}
/// Get a list of currently stored versions of `Object`
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

s/known/currently stored

s/known/currently stored
pub fn versions(&self) -> &[ObjectVersion] {
    // Borrow the backing vector as a slice covering all of its elements.
    let stored: &[ObjectVersion] = &self.versions;
    stored
}
}
/// Information about a version of an object
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct ObjectVersion {
/// Id of the version
pub uuid: UUID,
/// Timestamp of when the object was created
pub timestamp: u64,
/// State of the version
pub state: ObjectVersionState,
}
/// State of an object version
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub enum ObjectVersionState {
/// The version is being received
Uploading(ObjectVersionHeaders),
/// The version is fully received
Complete(ObjectVersionData),
/// The version uploaded contained errors or the upload was explicitly aborted
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

it can never be received and stay in Uploading state forever if CompleteMultipartUpload/AbortMultipartUpload is never called (and it doesn't matter)

it can never be received and stay in Uploading state forever if CompleteMultipartUpload/AbortMultipartUpload is never called (and it doesn't matter)
Aborted,
}
@ -90,10 +102,15 @@ impl CRDT for ObjectVersionState {
}
}
/// Data about an object version
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub enum ObjectVersionData {
/// The object was deleted, this Version is a tombstone to mark it as such
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

the version represents a deletion of the object, i.e. at this version number the object doesn't exist (404)

the version represents a deletion of the object, i.e. at this version number the object doesn't exist (404)
DeleteMarker,
/// The object is short, it's stored inlined
Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec<u8>),
/// The object is not short, Hash of first block is stored here, next segments hashes are
/// stored in the version table
FirstBlock(ObjectVersionMeta, Hash),
}
@ -101,16 +118,23 @@ impl AutoCRDT for ObjectVersionData {
const WARN_IF_DIFFERENT: bool = true;
}
/// Metadata about the object version
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct ObjectVersionMeta {
/// Headers to send to the client
pub headers: ObjectVersionHeaders,
/// Size of the object
pub size: u64,
/// etag of the object
pub etag: String,
}
/// Additional headers for an object
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct ObjectVersionHeaders {
/// Content type of the object
pub content_type: String,
/// Any other http headers to send
pub other: BTreeMap<String, String>,
}
@ -118,18 +142,24 @@ impl ObjectVersion {
fn cmp_key(&self) -> (u64, UUID) {
    // Pair the timestamp with the UUID, timestamp first.
    let timestamp = self.timestamp;
    let uuid = self.uuid;
    (timestamp, uuid)
}
/// Returns `true` when the state of this version is `Uploading`
pub fn is_uploading(&self) -> bool {
    matches!(self.state, ObjectVersionState::Uploading(_))
}
/// Returns `true` when the state of this version is `Complete`
pub fn is_complete(&self) -> bool {
    matches!(self.state, ObjectVersionState::Complete(_))
}
/// Is the object version available (received and not a tombstone)
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

is the object version available as an existing object (received and not a delete marker)

is the object version available as an existing object (received and not a delete marker)
pub fn is_data(&self) -> bool {
match self.state {
ObjectVersionState::Complete(ObjectVersionData::DeleteMarker) => false,

View file

@ -5,26 +5,32 @@ use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_table::crdt::*;
use garage_table::replication::sharded::*;
use garage_table::replication::TableShardedReplication;
use garage_table::*;
use crate::block_ref_table::*;
/// A version of an object
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct Version {
// Primary key
/// UUID of the version, used as partition key
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

used as a partition key

used as a partition key
pub uuid: UUID,
// Actual data: the blocks for this version
// In the case of a multipart upload, also store the etags
// of individual parts and check them when doing CompleteMultipartUpload
/// Is this version deleted
pub deleted: crdt::Bool,
/// list of blocks of data composing the version
pub blocks: crdt::Map<VersionBlockKey, VersionBlock>,
/// Etag of each part in case of a multipart upload, empty otherwise
pub parts_etags: crdt::Map<u64, String>,
// Back link to bucket+key so that we can figure if
// this was deleted later on
/// Bucket in which the related object is stored
pub bucket: String,
/// Key in which the related object is stored
pub key: String,
}
@ -43,7 +49,9 @@ impl Version {
#[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)]
pub struct VersionBlockKey {
/// Number of the part
lx marked this conversation as resolved Outdated
Outdated
Review

not necessarily starting at 1, the client decides the part numbers and we don't care (for instance he can send parts 0, 1, 2, 14, 22, 37 and 42, and that's a fine object as long as these parts match the checklist that is sent in the CompleteMultipartUpload call)

not necessarily starting at 1, the client decides the part numbers and we don't care (for instance he can send parts 0, 1, 2, 14, 22, 37 and 42, and that's a fine object as long as these parts match the checklist that is sent in the CompleteMultipartUpload call)

Ok I've read s3 doc on multi-part upload and I hate it.
Next comment appear to be false too, it's offset in part, not in the whole file

Ok I've read s3 doc on multi-part upload and I hate it. Next comment appear to be false too, it's offset in part, not in the whole file
Outdated
Review

Yes, we don't know the offset in the whole file when a part is being uploaded because parts get uploaded independently from one another.

Yes, we don't know the offset in the whole file when a part is being uploaded because parts get uploaded independently from one another.
pub part_number: u64,
/// Offset of this sub-segment in its part
pub offset: u64,
}
@ -61,9 +69,12 @@ impl PartialOrd for VersionBlockKey {
}
}
/// Information about a single block
#[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)]
pub struct VersionBlock {
/// Hash of the block
pub hash: Hash,
/// Size of the block
pub size: u64,
}

View file

@ -1,7 +1,9 @@
//! Crate containing rpc related functions and types used in Garage
#[macro_use]
extern crate log;
pub mod consul;
mod consul;
pub(crate) mod tls_util;
pub mod membership;

View file

@ -1,3 +1,4 @@
//! Module containing structs related to membership management
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

//!

`//!`
use std::collections::HashMap;
use std::fmt::Write as FmtWrite;
use std::io::{Read, Write};
@ -30,20 +31,29 @@ const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60);
const PING_TIMEOUT: Duration = Duration::from_secs(2);
const MAX_FAILURES_BEFORE_CONSIDERED_DOWN: usize = 5;
/// RPC endpoint used for calls related to membership
pub const MEMBERSHIP_RPC_PATH: &str = "_membership";
/// RPC messages related to membership
#[derive(Debug, Serialize, Deserialize)]
pub enum Message {
/// Response to successful advertisements
Ok,
/// Message sent to detect other nodes' status
Ping(PingMessage),
/// Ask another node for the nodes it knows. Answered with AdvertiseNodesUp
PullStatus,
/// Ask another node for its config. Answered with AdvertiseConfig
PullConfig,
/// Advertisement of the nodes the host knows to be up. Sent spontaneously or in response to
/// PullStatus
AdvertiseNodesUp(Vec<AdvertisedNode>),
/// Advertisement of the node's config. Sent spontaneously or in response to PullConfig
AdvertiseConfig(NetworkConfig),
}
impl RpcMessage for Message {}
/// A ping, containing information about status and config
#[derive(Debug, Serialize, Deserialize)]
pub struct PingMessage {
id: UUID,
@ -55,18 +65,25 @@ pub struct PingMessage {
state_info: StateInfo,
}
/// A node advertisement
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AdvertisedNode {
/// Id of the node this advertisement relates to
pub id: UUID,
/// IP and port of the node
pub addr: SocketAddr,
/// Is the node considered up
pub is_up: bool,
/// When was the node last seen up, in milliseconds since UNIX epoch
pub last_seen: u64,
/// The `StateInfo` advertised for the node
pub state_info: StateInfo,
}
/// This node's membership manager
pub struct System {
/// The id of this node
pub id: UUID,
persist_config: Persister<NetworkConfig>,
@ -79,10 +96,12 @@ pub struct System {
rpc_client: Arc<RpcClient<Message>>,
pub(crate) status: watch::Receiver<Arc<Status>>,
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

not "viewed by this node", all nodes should have equal rings (if not it's a bug)

not "viewed by this node", all nodes should have equal rings (if not it's a bug)
/// The ring
pub ring: watch::Receiver<Arc<Ring>>,
update_lock: Mutex<Updaters>,
/// The job runner of this node
pub background: Arc<BackgroundRunner>,
}
@ -91,21 +110,29 @@ struct Updaters {
update_ring: watch::Sender<Arc<Ring>>,
}
lx marked this conversation as resolved Outdated
Outdated
Review

when is it sorted?

when is it sorted?

it's sorted in recalculate_hash. Actually the node list is short, so sorting it is pretty much free

it's sorted in recalculate_hash. Actually the node list is short, so sorting it is pretty much free
Outdated
Review

That's my reasoning as well.

That's my reasoning as well.
/// The status of each node, as viewed by this node
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

hash of the whole set of currently known nodes. This hash is sent in regular ping messages so nodes can detect when they have different views of the cluster. They then exchange their peer lists kind of in an anti-entropy process.

hash of the whole set of currently known nodes. This hash is sent in regular ping messages so nodes can detect when they have different views of the cluster. They then exchange their peer lists kind of in an anti-entropy process.
#[derive(Debug, Clone)]
pub struct Status {
/// Mapping of each node id to its known status
pub nodes: HashMap<UUID, Arc<StatusEntry>>,
/// Hash of `nodes`, used to detect when nodes have different views of the cluster
pub hash: Hash,
}
/// The status of a single node
#[derive(Debug)]
pub struct StatusEntry {
Outdated
Review

Not sure but this counts failed requests of any kind, and not just ping messages. It is incremented in rpc_client.rs

Not sure but this counts failed requests of any kind, and not just ping messages. It is incremented in rpc_client.rs

hum, it might double-count pings, as it count once in rpc_clients, and once in this file in ping_nodes

hum, it might double-count pings, as it count once in rpc_clients, and once in this file in ping_nodes
Outdated
Review

Thanks for noticing that, I'll have to check.

Thanks for noticing that, I'll have to check.
Outdated
Review

Ok so the cound in rpc_client does not happen when sending a ping message in ping_nodes in membership.rs because we are calling RpcAddrClient::call and not RpcClient::call (i.e. we are calling the node using its IP address and not its node identifier). The increment in rpc_client is done in RpcClient::call because it needs to know the node ID. So there is no redundancy between the two.

Ok so the cound in `rpc_client` does not happen when sending a ping message in `ping_nodes` in `membership.rs` because we are calling `RpcAddrClient::call` and not `RpcClient::call` (i.e. we are calling the node using its IP address and not its node identifier). The increment in `rpc_client` is done in `RpcClient::call` because it needs to know the node ID. So there is no redundancy between the two.
/// The IP and port used to connect to this node
pub addr: SocketAddr,
/// Last time this node was seen
pub last_seen: u64,
/// Number of consecutive failed requests to this node (ping messages as well as other RPC calls)
pub num_failures: AtomicUsize,
pub state_info: StateInfo,
}
impl StatusEntry {
/// is the node associated to this entry considered up
pub fn is_up(&self) -> bool {
self.num_failures.load(Ordering::SeqCst) < MAX_FAILURES_BEFORE_CONSIDERED_DOWN
}
@ -195,6 +222,7 @@ fn gen_node_id(metadata_dir: &PathBuf) -> Result<UUID, Error> {
}
impl System {
/// Create this node's membership manager
pub fn new(
metadata_dir: PathBuf,
rpc_http_client: Arc<RpcHttpClient>,
@ -279,6 +307,7 @@ impl System {
});
}
/// Get an RPC client
pub fn rpc_client<M: RpcMessage + 'static>(self: &Arc<Self>, path: &str) -> Arc<RpcClient<M>> {
RpcClient::new(
RpcAddrClient::new(self.rpc_http_client.clone(), path.to_string()),
@ -287,6 +316,7 @@ impl System {
)
}
/// Save network configuration to disc
async fn save_network_config(self: Arc<Self>) -> Result<(), Error> {
let ring = self.ring.borrow().clone();
self.persist_config
@ -319,6 +349,7 @@ impl System {
self.rpc_client.call_many(&to[..], msg, timeout).await;
}
/// Perform bootstrapping, starting the ping loop
pub async fn bootstrap(
self: Arc<Self>,
peers: Vec<SocketAddr>,
@ -386,6 +417,8 @@ impl System {
}
} else if let Some(id) = id_option {
if let Some(st) = status.nodes.get_mut(id) {
// we need to increment failure counter as call was done using by_addr so the
// counter was not auto-incremented
Outdated
Review

Actually no because the increment in rpc_client.rs is done by the RpcClient and not the RpcAddrClient, but here we are using the RpcAddrClient. (we could put this in a comment to make it clearer)

Actually no because the increment in `rpc_client.rs` is done by the `RpcClient` and not the `RpcAddrClient`, but here we are using the `RpcAddrClient`. (we could put this in a comment to make it clearer)
st.num_failures.fetch_add(1, Ordering::SeqCst);
if !st.is_up() {
warn!("Node {:?} seems to be down.", id);

View file

@ -1,3 +1,5 @@
//! Module containing types related to computing nodes which should receive a copy of data blocks
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

and metadata entries

and metadata entries
//! and metadata
use std::collections::{HashMap, HashSet};
use std::convert::TryInto;
@ -8,23 +10,30 @@ use garage_util::data::*;
// A partition number is encoded on 16 bits,
// i.e. we have up to 2**16 partitions.
// (in practice we have exactly 2**PARTITION_BITS partitions)
/// A partition id, stored on 16 bits
pub type Partition = u16;
// TODO: make this constant parametrizable in the config file
// For deployments with many nodes it might make sense to bump
// it up to 10.
// Maximum value : 16
/// How many bits from the hash are used to make partitions. Higher numbers mean more fairness in
/// the presence of numerous nodes, but an exponentially bigger ring. Max 16
pub const PARTITION_BITS: usize = 8;
const PARTITION_MASK_U16: u16 = ((1 << PARTITION_BITS) - 1) << (16 - PARTITION_BITS);
// TODO: make this constant parametrizable in the config file
// (most deployments use a replication factor of 3, so...)
/// The maximum number of times an object might get replicated
pub const MAX_REPLICATION: usize = 3;
Outdated
Review

Not really, a node can be known in that it answers ping messages, and not be configured if it is idle or draining pending removal by admins.

Not really, a node can be known in that it answers ping messages, and not be configured if it is idle or draining pending removal by admins.

would "available" be correct instead of known?

would "available" be correct instead of known?
Outdated
Review

Just put a TODO in the comment and I'll write it later

Just put a TODO in the comment and I'll write it later
Outdated
Review

Suggestion: The user-defined configuration of the cluster's nodes

Suggestion: `The user-defined configuration of the cluster's nodes`
/// The user-defined configuration of the cluster's nodes
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct NetworkConfig {
/// Map of each node's id to its configuration
pub members: HashMap<UUID, NetworkConfigEntry>,
/// Version of this config
pub version: u64,
}
@ -37,26 +46,40 @@ impl NetworkConfig {
}
}
/// The overall configuration of one (possibly remote) node
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct NetworkConfigEntry {
/// Datacenter to which this entry belongs. This information might be used to perform a better
/// geodistribution
pub datacenter: String,
/// The (relative) capacity of the node
pub capacity: u32,
/// A tag to recognize the entry, not used for other things than display
pub tag: String,
}
/// A ring fairly distributing objects to nodes
#[derive(Clone)]
pub struct Ring {
/// The network configuration used to generate this ring
pub config: NetworkConfig,
/// The list of entries in the ring
pub ring: Vec<RingEntry>,
}
/// An entry in the ring
#[derive(Clone, Debug)]
pub struct RingEntry {
/// The prefix of the hash of the objects which should use this entry
pub location: Hash,
/// The nodes in which a matching object should get stored
pub nodes: [UUID; MAX_REPLICATION],
}
impl Ring {
// TODO this function MUST be refactored, it's 100 lines long, with a 50-line loop, going up to 6
// levels of nesting. It is basically impossible to test, maintain, or understand for an
// outsider.
pub(crate) fn new(config: NetworkConfig) -> Self {
// Create a vector of partition indices (0 to 2**PARTITION_BITS-1)
let partitions_idx = (0usize..(1usize << PARTITION_BITS)).collect::<Vec<_>>();
@ -166,20 +189,16 @@ impl Ring {
})
.collect::<Vec<_>>();
// eprintln!("RING: --");
// for e in ring.iter() {
// eprintln!("{:?}", e);
// }
// eprintln!("END --");
Self { config, ring }
}
/// Get the partition in which data would fall
///
/// The partition is given by the top `PARTITION_BITS` bits of the first two bytes of `from`,
/// read as a big-endian `u16`.
pub fn partition_of(&self, from: &Hash) -> Partition {
let top = u16::from_be_bytes(from.as_slice()[0..2].try_into().unwrap());
top >> (16 - PARTITION_BITS)
}
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

and the first hash (of a partition key) that would fall in this partition

and the first hash (of a partition key) that would fall in this partition
/// Get the list of partitions and the first hash of a partition key that would fall in it
pub fn partitions(&self) -> Vec<(Partition, Hash)> {
let mut ret = vec![];
@ -193,6 +212,8 @@ impl Ring {
ret
}
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

TODO rename this function as it doesn't walk the ring anymore but just returns the nodes at the corresponding location. This is vocabulary from the previous implementation that used datacenter-aware ring walking and led to data imbalance.

TODO rename this function as it doesn't walk the ring anymore but just returns the nodes at the corresponding location. This is vocabulary from the previous implementation that used datacenter-aware ring walking and led to data imbalance.
// TODO rename this function as it no longer walks the ring
/// Walk the ring to find the n servers in which data should be replicated
pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec<UUID> {
if self.ring.len() != 1 << PARTITION_BITS {
warn!("Ring not yet ready, read/writes will be lost!");
@ -201,12 +222,15 @@ impl Ring {
let top = u16::from_be_bytes(from.as_slice()[0..2].try_into().unwrap());
lx marked this conversation as resolved Outdated
Outdated
Review

just to make sure I don't mess up. Should probably be factorized in a single location.

just to make sure I don't mess up. Should probably be factorized in a single location.
let partition_idx = (top >> (16 - PARTITION_BITS)) as usize;
// TODO why compute this twice in the same way and assert?
assert_eq!(partition_idx, self.partition_of(from) as usize);
let partition = &self.ring[partition_idx];
let partition_top =
u16::from_be_bytes(partition.location.as_slice()[0..2].try_into().unwrap());
// TODO is this an assertion on the validity of PARTITION_MASK_U16? If so, it should
// probably be a test more than a runtime assertion
assert_eq!(partition_top & PARTITION_MASK_U16, top & PARTITION_MASK_U16);
assert!(n <= partition.nodes.len());

View file

@ -1,3 +1,4 @@
//! Contains structs related to making RPCs
use std::borrow::Borrow;
use std::marker::PhantomData;
use std::net::SocketAddr;
@ -26,14 +27,19 @@ use crate::tls_util;
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
/// Strategy to apply when making RPC
#[derive(Copy, Clone)]
pub struct RequestStrategy {
/// Max time to wait for response
pub rs_timeout: Duration,
/// Min number of responses to consider the request successful
pub rs_quorum: usize,
/// Should requests be dropped after enough responses are received
pub rs_interrupt_after_quorum: bool,
}
impl RequestStrategy {
/// Create a RequestStrategy with default timeout and not interrupting when quorum reached
pub fn with_quorum(quorum: usize) -> Self {
RequestStrategy {
rs_timeout: DEFAULT_TIMEOUT,
@ -41,19 +47,25 @@ impl RequestStrategy {
rs_interrupt_after_quorum: false,
}
}
/// Set timeout of the strategy
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.rs_timeout = timeout;
self
}
/// Set if requests can be dropped after quorum has been reached
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

as a general rule: true for read requests, false for write requests

as a general rule: true for read requests, false for write requests
/// In general true for read requests, and false for write
pub fn interrupt_after_quorum(mut self, interrupt: bool) -> Self {
self.rs_interrupt_after_quorum = interrupt;
self
}
}
/// Shortcut for a boxed async function taking a message, and resolving to another message or an
/// error
pub type LocalHandlerFn<M> =
Box<dyn Fn(Arc<M>) -> Pin<Box<dyn Future<Output = Result<M, Error>> + Send>> + Send + Sync>;
/// Client used to send RPC
pub struct RpcClient<M: RpcMessage> {
status: watch::Receiver<Arc<Status>>,
background: Arc<BackgroundRunner>,
@ -64,6 +76,7 @@ pub struct RpcClient<M: RpcMessage> {
}
impl<M: RpcMessage + 'static> RpcClient<M> {
/// Create a new RpcClient from an address, a job runner, and the status of all RPC servers
pub fn new(
rac: RpcAddrClient<M>,
background: Arc<BackgroundRunner>,
@ -77,6 +90,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
})
}
/// Set the local handler, to process RPC to this node without network usage
pub fn set_local_handler<F, Fut>(&self, my_id: UUID, handler: F)
where
F: Fn(Arc<M>) -> Fut + Send + Sync + 'static,
@ -90,14 +104,17 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
self.local_handler.swap(Some(Arc::new((my_id, handler))));
}
/// Get a RPC client to make calls using node's SocketAddr instead of its ID
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

Please don't call this get_addr, it's not what it does ! It does not return a SocketAddr or something like that. It is used to get the underlying RpcAddrClient which allows for RPC by specifing the target node's SocketAddr instead of their node IDs. Also, the comment is wrong.

Please don't call this `get_addr`, it's not what it does ! It does not return a SocketAddr or something like that. It is used to get the underlying RpcAddrClient which allows for RPC by specifing the target node's SocketAddr instead of their node IDs. Also, the comment is wrong.
pub fn by_addr(&self) -> &RpcAddrClient<M> {
&self.rpc_addr_client
}
/// Make a RPC call
pub async fn call(&self, to: UUID, msg: M, timeout: Duration) -> Result<M, Error> {
self.call_arc(to, Arc::new(msg), timeout).await
}
/// Make a RPC call from a message stored in an Arc
pub async fn call_arc(&self, to: UUID, msg: Arc<M>, timeout: Duration) -> Result<M, Error> {
if let Some(lh) = self.local_handler.load_full() {
let (my_id, local_handler) = lh.as_ref();
@ -135,6 +152,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
}
}
/// Make a RPC call to multiple servers, returning a Vec containing each result
pub async fn call_many(&self, to: &[UUID], msg: M, timeout: Duration) -> Vec<Result<M, Error>> {
let msg = Arc::new(msg);
let mut resp_stream = to
@ -149,6 +167,8 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
results
}
/// Make a RPC call to multiple servers, returning either a Vec of responses, or an error if
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

too many*

too many*
/// strategy could not be respected due to too many errors
pub async fn try_call_many(
self: &Arc<Self>,
to: &[UUID],
@ -208,6 +228,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
}
}
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

No. This is an RPC client which contains only the necessary logic to call RPC's to nodes by specifying their SocketAddr, and not by specifying their node IDs. It is used as an underlying layer for RpcClient.

No. This is an RPC client which contains only the necessary logic to call RPC's to nodes by specifying their SocketAddr, and not by specifying their node IDs. It is used as an underlying layer for RpcClient.
/// Thin wrapper around an `RpcHttpClient` specifying the path of the request
pub struct RpcAddrClient<M: RpcMessage> {
phantom: PhantomData<M>,
@ -216,6 +237,7 @@ pub struct RpcAddrClient<M: RpcMessage> {
}
impl<M: RpcMessage> RpcAddrClient<M> {
/// Create an RpcAddrClient from an HTTP client and the endpoint to reach for RPCs
pub fn new(http_client: Arc<RpcHttpClient>, path: String) -> Self {
Self {
phantom: PhantomData::default(),
@ -224,6 +246,7 @@ impl<M: RpcMessage> RpcAddrClient<M> {
}
}
/// Make a RPC
pub async fn call<MB>(
&self,
to_addr: &SocketAddr,
@ -239,6 +262,7 @@ impl<M: RpcMessage> RpcAddrClient<M> {
}
}
/// HTTP client used to make RPCs
pub struct RpcHttpClient {
request_limiter: Semaphore,
method: ClientMethod,
@ -250,6 +274,7 @@ enum ClientMethod {
}
impl RpcHttpClient {
/// Create a new RpcHttpClient
pub fn new(
max_concurrent_requests: usize,
tls_config: &Option<TlsConfig>,
@ -280,6 +305,7 @@ impl RpcHttpClient {
})
}
/// Make a RPC
async fn call<M, MB>(
&self,
path: &str,

View file

@ -1,3 +1,4 @@
//! Contains structs related to receiving RPCs
use std::collections::HashMap;
use std::net::SocketAddr;
use std::pin::Pin;
@ -22,13 +23,17 @@ use garage_util::error::Error;
use crate::tls_util;
/// Trait for messages that can be sent as RPC
pub trait RpcMessage: Serialize + for<'de> Deserialize<'de> + Send + Sync {}
type ResponseFuture = Pin<Box<dyn Future<Output = Result<Response<Body>, Error>> + Send>>;
type Handler = Box<dyn Fn(Request<Body>, SocketAddr) -> ResponseFuture + Send + Sync>;
/// Structure handling RPCs
pub struct RpcServer {
/// The address the RpcServer will bind
pub bind_addr: SocketAddr,
/// The tls configuration used for RPC
pub tls_config: Option<TlsConfig>,
handlers: HashMap<String, Handler>,
@ -87,6 +92,7 @@ where
}
impl RpcServer {
/// Create a new RpcServer
pub fn new(bind_addr: SocketAddr, tls_config: Option<TlsConfig>) -> Self {
Self {
bind_addr,
@ -95,6 +101,7 @@ impl RpcServer {
}
}
/// Add handler handling request made to `name`
pub fn add_handler<M, F, Fut>(&mut self, name: String, handler: F)
where
M: RpcMessage + 'static,
@ -156,6 +163,7 @@ impl RpcServer {
}
}
/// Run the RpcServer
pub async fn run(
self: Arc<Self>,
shutdown_signal: impl Future<Output = ()>,

View file

@ -34,7 +34,7 @@ use crate::crdt::crdt::*;
/// and may differ from what you observed with your atomic clock!
///
/// This scheme is used by AWS S3 or Soundcloud and often without knowing
/// in entreprise when reconciliating databases with ad-hoc scripts.
/// in enterprise when reconciliating databases with ad-hoc scripts.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct LWW<T> {
ts: u64,

View file

@ -37,6 +37,7 @@ where
Self { vals: vec![(k, v)] }
}
/// Add a value to the map
pub fn put(&mut self, k: K, v: V) {
self.merge(&Self::put_mutator(k, v));
}

View file

@ -74,7 +74,7 @@ where
while !*must_exit.borrow() {
match self.gc_loop_iter().await {
Ok(true) => {
// Stuff was done, loop imediately
// Stuff was done, loop immediately
continue;
}
Ok(false) => {

View file

@ -8,10 +8,10 @@ pub mod schema;
pub mod util;
pub mod data;
pub mod gc;
pub mod merkle;
mod gc;
mod merkle;
pub mod replication;
pub mod sync;
mod sync;
pub mod table;
pub use schema::*;

View file

@ -6,19 +6,19 @@ use garage_util::data::*;
use crate::replication::*;
/// Full replication schema: all nodes store everything
/// Writes are disseminated in an epidemic manner in the network
/// Advantage: do all reads locally, extremely fast
/// Drawback: only suitable for reasonably small tables
#[derive(Clone)]
pub struct TableFullReplication {
/// The membership manager of this node
pub system: Arc<System>,
/// Max number of faults allowed while replicating a record
pub max_faults: usize,
}
impl TableReplication for TableFullReplication {
// Full replication schema: all nodes store everything
// Writes are disseminated in an epidemic manner in the network
// Advantage: do all reads locally, extremely fast
// Inconvenient: only suitable to reasonably small tables
fn read_nodes(&self, _hash: &Hash) -> Vec<UUID> {
vec![self.system.id]
}

View file

@ -1,6 +1,8 @@
mod parameters;
pub mod fullcopy;
pub mod sharded;
mod fullcopy;
mod sharded;
pub use fullcopy::TableFullReplication;
pub use parameters::*;
pub use sharded::TableShardedReplication;

View file

@ -2,20 +2,25 @@ use garage_rpc::ring::*;
use garage_util::data::*;
/// Trait to describe how a table shall be replicated
pub trait TableReplication: Send + Sync {
// See examples in table_sharded.rs and table_fullcopy.rs
// To understand various replication methods
// Which nodes to send reads from
/// Which nodes to send read requests to
fn read_nodes(&self, hash: &Hash) -> Vec<UUID>;
/// Responses needed to consider a read successful
fn read_quorum(&self) -> usize;
// Which nodes to send writes to
/// Which nodes to send writes to
fn write_nodes(&self, hash: &Hash) -> Vec<UUID>;
/// Responses needed to consider a write successful
fn write_quorum(&self) -> usize;
fn max_write_errors(&self) -> usize;
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

Most of the time it is, but not always for fully replicated tables

Most of the time it is, but not always for fully replicated tables
// Accessing partitions, for Merkle tree & sync
/// Get partition for data with given hash
fn partition_of(&self, hash: &Hash) -> Partition;
/// List of existing partitions
fn partitions(&self) -> Vec<(Partition, Hash)>;
}

View file

@ -6,22 +6,25 @@ use garage_util::data::*;
use crate::replication::*;
/// Sharded replication schema:
/// - based on the ring of nodes, a certain set of neighbors
/// store entries, given as a function of the position of the
/// entry's hash in the ring
/// - reads are done on all of the nodes that replicate the data
/// - writes as well
#[derive(Clone)]
pub struct TableShardedReplication {
/// The membership manager of this node
pub system: Arc<System>,
/// How many times each piece of data should be replicated
pub replication_factor: usize,
/// How many nodes to contact for a read, should be at most `replication_factor`
pub read_quorum: usize,
/// How many nodes to contact for a write, should be at most `replication_factor`
pub write_quorum: usize,
}
impl TableReplication for TableShardedReplication {
// Sharded replication schema:
// - based on the ring of nodes, a certain set of neighbors
// store entries, given as a function of the position of the
// entry's hash in the ring
// - reads are done on all of the nodes that replicate the data
// - writes as well
fn read_nodes(&self, hash: &Hash) -> Vec<UUID> {
let ring = self.system.ring.borrow().clone();
ring.walk_ring(&hash, self.replication_factor)

View file

@ -4,7 +4,9 @@ use garage_util::data::*;
use crate::crdt::CRDT;
/// Trait for field used to partition data
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

It's not a trait for the data, but only for a field of the data that is used to partition it.

It's not a trait for the data, but only for a field of the data that is used to partition it.
pub trait PartitionKey {
/// Get the key used to partition
fn hash(&self) -> Hash;
}
@ -20,7 +22,9 @@ impl PartitionKey for Hash {
}
}
/// Trait for field used to sort data
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

Same as above, it's only a trait of one of the fields of the data

Same as above, it's only a trait of one of the fields of the data
pub trait SortKey {
/// Get the key used to sort
fn sort_key(&self) -> &[u8];
}
@ -36,25 +40,34 @@ impl SortKey for Hash {
}
}
/// Trait for an entry in a table. It must be sortable and partitionable.
pub trait Entry<P: PartitionKey, S: SortKey>:
CRDT + PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync
{
/// Get the key used to partition
fn partition_key(&self) -> &P;
/// Get the key used to sort
fn sort_key(&self) -> &S;
/// Is the entry a tombstone? Default implementation always returns false
fn is_tombstone(&self) -> bool {
false
}
}
/// Trait for the schema used in a table
pub trait TableSchema: Send + Sync {
/// The partition key used in that table
type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
/// The sort key used in that table
type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
/// The type for an entry in that table
type E: Entry<Self::P, Self::S>;
type Filter: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
// Action to take if not able to decode current version:
// try loading from an older version
/// Try migrating an entry from an older version
fn try_migrate(_bytes: &[u8]) -> Option<Self::E> {
None
}
@ -65,7 +78,5 @@ pub trait TableSchema: Send + Sync {
// to stderr.
fn updated(&self, _old: Option<Self::E>, _new: Option<Self::E>) {}
Outdated
Review

Why remove empty implementation?

Why remove empty implementation?

As explained above, I feel like a default implementation should be what we want in most case. Of 5 implementations of this trait, only 2 actually use an empty updated, so being explicit about it seems to me like a better option.

As explained above, I feel like a default implementation should be what we want in most case. Of 5 implementations of this trait, only 2 actually use an empty `updated`, so being explicit about it seems to me like a better option.
Outdated
Review

Idk, to me the reasonning was that updated() is implemented when we need to add logic to do something when an entry changes, but if we don't need to add anything we shouldn't have to write an empty updated() handler. I don't see how having an empty default implementation can cause much confusion as it does literally nothing.

Idk, to me the reasonning was that `updated()` is implemented when we need to add logic to do something when an entry changes, but if we don't need to add anything we shouldn't have to write an empty `updated()` handler. I don't see how having an empty default implementation can cause much confusion as it does literally nothing.
fn matches_filter(_entry: &Self::E, _filter: &Self::Filter) -> bool {
true
}
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool;
}

View file

@ -1,3 +1,4 @@
//! Job runner for futures and async functions
use core::future::Future;
use std::pin::Pin;
use std::sync::Arc;
@ -12,14 +13,15 @@ use crate::error::Error;
type JobOutput = Result<(), Error>;
type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>;
/// Job runner for futures and async functions
pub struct BackgroundRunner {
pub stop_signal: watch::Receiver<bool>,
stop_signal: watch::Receiver<bool>,
queue_in: mpsc::UnboundedSender<(Job, bool)>,
worker_in: mpsc::UnboundedSender<tokio::task::JoinHandle<()>>,
}
impl BackgroundRunner {
/// Create a new BackgroundRunner
pub fn new(
n_runners: usize,
stop_signal: watch::Receiver<bool>,
@ -103,7 +105,7 @@ impl BackgroundRunner {
(bgrunner, await_all_done)
}
// Spawn a task to be run in background
/// Spawn a task to be run in background
pub fn spawn<T>(&self, job: T)
where
T: Future<Output = JobOutput> + Send + 'static,
@ -115,6 +117,8 @@ impl BackgroundRunner {
.unwrap();
}
/// Spawn a task to be run in background. It may get discarded before running if spawned while
/// the runner is stopping
pub fn spawn_cancellable<T>(&self, job: T)
where
T: Future<Output = JobOutput> + Send + 'static,

View file

@ -1,3 +1,4 @@
//! Contains types and functions related to Garage configuration file
use std::io::Read;
use std::net::SocketAddr;
use std::path::PathBuf;
@ -6,57 +7,82 @@ use serde::{de, Deserialize};
use crate::error::Error;
/// Represents the whole configuration
#[derive(Deserialize, Debug, Clone)]
pub struct Config {
/// Path where to store metadata. Should be fast, but low volume
pub metadata_dir: PathBuf,
/// Path where to store data. Can be slower, but needs higher volume
pub data_dir: PathBuf,
/// Address to bind for RPC
pub rpc_bind_addr: SocketAddr,
/// RPC addresses of the bootstrap peers
#[serde(deserialize_with = "deserialize_vec_addr")]
pub bootstrap_peers: Vec<SocketAddr>,
/// Consul host to connect to in order to discover more peers
pub consul_host: Option<String>,
/// Consul service name to use
pub consul_service_name: Option<String>,
/// Max number of concurrent RPC requests
#[serde(default = "default_max_concurrent_rpc_requests")]
pub max_concurrent_rpc_requests: usize,
/// Size of data blocks to save to disk
#[serde(default = "default_block_size")]
pub block_size: usize,
#[serde(default = "default_control_write_max_faults")]
pub control_write_max_faults: usize,
/// How many nodes should hold a copy of metadata
#[serde(default = "default_replication_factor")]
pub meta_replication_factor: usize,
/// How many nodes should hold a copy of data
#[serde(default = "default_replication_factor")]
pub data_replication_factor: usize,
/// Configuration for RPC TLS
pub rpc_tls: Option<TlsConfig>,
/// Configuration for S3 api
pub s3_api: ApiConfig,
/// Configuration for serving files as normal web server
pub s3_web: WebConfig,
}
/// Configuration for RPC TLS
#[derive(Deserialize, Debug, Clone)]
pub struct TlsConfig {
/// Path to certificate authority used for all nodes
pub ca_cert: String,
/// Path to public certificate for this node
pub node_cert: String,
/// Path to private key for this node
pub node_key: String,
}
/// Configuration for S3 api
#[derive(Deserialize, Debug, Clone)]
pub struct ApiConfig {
/// Address and port to bind for api serving
pub api_bind_addr: SocketAddr,
/// S3 region to use
pub s3_region: String,
}
/// Configuration for serving files as normal web server
#[derive(Deserialize, Debug, Clone)]
pub struct WebConfig {
/// Address and port to bind for web serving
pub bind_addr: SocketAddr,
/// Suffix to remove from domain name to find bucket
pub root_domain: String,
/// Suffix to add when the path requested by the user-agent ends with "/"
pub index: String,
}
@ -73,6 +99,7 @@ fn default_control_write_max_faults() -> usize {
1
}
/// Read and parse configuration
pub fn read_config(config_file: PathBuf) -> Result<Config, Error> {
let mut file = std::fs::OpenOptions::new()
.read(true)

View file

@ -1,8 +1,10 @@
//! Contains common types and functions related to serialization and integrity
use rand::Rng;
use serde::de::{self, Visitor};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::fmt;
/// An array of 32 bytes
#[derive(Default, PartialOrd, Ord, Clone, Hash, PartialEq, Copy)]
pub struct FixedBytes32([u8; 32]);
@ -61,15 +63,20 @@ impl Serialize for FixedBytes32 {
}
impl FixedBytes32 {
/// Access the content as a slice
pub fn as_slice(&self) -> &[u8] {
&self.0[..]
}
/// Access the content as a mutable slice
pub fn as_slice_mut(&mut self) -> &mut [u8] {
&mut self.0[..]
}
/// Copy the content into a new Vec
pub fn to_vec(&self) -> Vec<u8> {
self.0.to_vec()
}
/// Try building a FixedBytes32 from a slice
/// Return None if the slice is not 32 bytes long
pub fn try_from(by: &[u8]) -> Option<Self> {
if by.len() != 32 {
return None;
@ -80,9 +87,12 @@ impl FixedBytes32 {
}
}
/// A 32 bytes UUID
pub type UUID = FixedBytes32;
/// A 256 bit cryptographic hash, can be sha256 or blake2 depending on provenance
pub type Hash = FixedBytes32;
/// Compute the sha256 of a slice
pub fn sha256sum(data: &[u8]) -> Hash {
use sha2::{Digest, Sha256};
@ -93,6 +103,7 @@ pub fn sha256sum(data: &[u8]) -> Hash {
hash.into()
}
/// Compute the blake2 of a slice
pub fn blake2sum(data: &[u8]) -> Hash {
use blake2::{Blake2b, Digest};
@ -103,8 +114,10 @@ pub fn blake2sum(data: &[u8]) -> Hash {
hash.into()
}
/// A 64-bit non-cryptographic hash
pub type FastHash = u64;
/// Compute a (non cryptographic) hash of a slice
pub fn fasthash(data: &[u8]) -> FastHash {
use xxhash_rust::xxh3::Xxh3;
@ -113,12 +126,14 @@ pub fn fasthash(data: &[u8]) -> FastHash {
h.digest()
}
/// Generate a fresh UUID made of 32 random bytes
///
/// The bytes are drawn from `rand`'s thread-local generator.
pub fn gen_uuid() -> UUID {
    let random_bytes: [u8; 32] = rand::thread_rng().gen();
    random_bytes.into()
}
// RMP serialization with names of fields and variants
/// Serialize to MessagePack
pub fn rmp_to_vec_all_named<T>(val: &T) -> Result<Vec<u8>, rmp_serde::encode::Error>
where
T: Serialize + ?Sized,
@ -131,10 +146,13 @@ where
Ok(wr)
}
/// Serialize to JSON, truncating long result
pub fn debug_serialize<T: Serialize>(x: T) -> String {
match serde_json::to_string(&x) {
Ok(ss) => {
if ss.len() > 100 {
// TODO this can panic if byte index 100 is not a char boundary, i.e. if it
// falls inside a multi-byte (2 bytes or more) UTF-8 codepoint
ss[..100].to_string()
} else {
ss

View file

@ -1,9 +1,11 @@
//! Module containing error types used in Garage
use err_derive::Error;
use hyper::StatusCode;
use std::io;
use crate::data::*;
/// RPC related errors
#[derive(Debug, Error)]
pub enum RPCError {
#[error(display = "Node is down: {:?}.", _0)]
@ -28,6 +30,7 @@ pub enum RPCError {
TooManyErrors(Vec<String>),
}
/// Groups together all Garage errors
#[derive(Debug, Error)]
pub enum Error {
#[error(display = "IO error: {}", _0)]

View file

@ -1,3 +1,5 @@
//! Crate containing common functions and types used in Garage
#[macro_use]
extern crate log;

View file

@ -1,6 +1,8 @@
//! Module containing helper functions to manipulate time
use chrono::{SecondsFormat, TimeZone, Utc};
use std::time::{SystemTime, UNIX_EPOCH};
/// Returns milliseconds since UNIX Epoch
pub fn now_msec() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
@ -8,6 +10,8 @@ pub fn now_msec() -> u64 {
.as_millis() as u64
}
/// Convert a timestamp represented as milliseconds since UNIX Epoch to
/// its RFC3339 representation, such as "2021-01-01T12:30:00Z"
pub fn msec_to_rfc3339(msecs: u64) -> String {
let secs = msecs as i64 / 1000;
let nanos = (msecs as i64 % 1000) as u32 * 1_000_000;

View file

@ -3,30 +3,37 @@ use hyper::StatusCode;
use garage_util::error::Error as GarageError;
/// Errors of this crate
#[derive(Debug, Error)]
pub enum Error {
/// An error received from the API crate
#[error(display = "API error: {}", _0)]
ApiError(#[error(source)] garage_api::error::Error),
ApiError(#[error(source)] garage_api::Error),
// Category: internal error
/// Error internal to garage
#[error(display = "Internal error: {}", _0)]
InternalError(#[error(source)] GarageError),
/// The file does not exist
#[error(display = "Not found")]
NotFound,
// Category: bad request
/// The request contained an invalid UTF-8 sequence in its path or in other parameters
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

see garage_api/error.rs

see garage_api/error.rs
#[error(display = "Invalid UTF-8: {}", _0)]
InvalidUTF8(#[error(source)] std::str::Utf8Error),
/// The client sent a header with an invalid value
#[error(display = "Invalid header value: {}", _0)]
InvalidHeader(#[error(source)] hyper::header::ToStrError),
/// The client sent a request without host, or with unsupported method
#[error(display = "Bad request: {}", _0)]
BadRequest(String),
}
impl Error {
/// Transform errors into http status code
pub fn http_status_code(&self) -> StatusCode {
match self {
Error::NotFound => StatusCode::NOT_FOUND,

View file

@ -1,6 +1,9 @@
//! Crate for handling web serving of s3 bucket
#[macro_use]
extern crate log;
pub mod error;
mod error;
pub use error::Error;
pub mod web_server;
mod web_server;
pub use web_server::run_web_server;

View file

@ -18,6 +18,7 @@ use garage_model::garage::Garage;
use garage_table::*;
use garage_util::error::Error as GarageError;
/// Run a web server
pub async fn run_web_server(
garage: Arc<Garage>,
shutdown_signal: impl Future<Output = ()>,