add doc comments #53

Merged
lx merged 10 commits from trinity-1686a/garage:doc-comments into main 2021-04-08 13:01:22 +00:00
38 changed files with 393 additions and 79 deletions

View file

@ -20,6 +20,7 @@ use crate::s3_get::*;
use crate::s3_list::*; use crate::s3_list::*;
use crate::s3_put::*; use crate::s3_put::*;
/// Run the S3 API server
pub async fn run_api_server( pub async fn run_api_server(
garage: Arc<Garage>, garage: Arc<Garage>,
shutdown_signal: impl Future<Output = ()>, shutdown_signal: impl Future<Output = ()>,

View file

@ -1,9 +1,13 @@
//! Module containing various helpers for encoding
/// Escape &str for xml inclusion
pub fn xml_escape(s: &str) -> String { pub fn xml_escape(s: &str) -> String {
s.replace("<", "&lt;") s.replace("<", "&lt;")
.replace(">", "&gt;") .replace(">", "&gt;")
.replace("\"", "&quot;") .replace("\"", "&quot;")
} }
/// Encode &str for use in a URI
pub fn uri_encode(string: &str, encode_slash: bool) -> String { pub fn uri_encode(string: &str, encode_slash: bool) -> String {
let mut result = String::with_capacity(string.len() * 2); let mut result = String::with_capacity(string.len() * 2);
for c in string.chars() { for c in string.chars() {
@ -24,6 +28,7 @@ pub fn uri_encode(string: &str, encode_slash: bool) -> String {
result result
} }
/// Encode &str either as an uri, or a valid string for xml inclusion
pub fn xml_encode_key(k: &str, urlencode: bool) -> String { pub fn xml_encode_key(k: &str, urlencode: bool) -> String {
if urlencode { if urlencode {
uri_encode(k, true) uri_encode(k, true)

View file

@ -3,44 +3,57 @@ use hyper::StatusCode;
use garage_util::error::Error as GarageError; use garage_util::error::Error as GarageError;
/// Errors of this crate
#[derive(Debug, Error)] #[derive(Debug, Error)]
pub enum Error { pub enum Error {
// Category: internal error // Category: internal error
/// Error related to deeper parts of Garage
#[error(display = "Internal error: {}", _0)] #[error(display = "Internal error: {}", _0)]
InternalError(#[error(source)] GarageError), InternalError(#[error(source)] GarageError),
/// Error related to Hyper
#[error(display = "Internal error (Hyper error): {}", _0)] #[error(display = "Internal error (Hyper error): {}", _0)]
Hyper(#[error(source)] hyper::Error), Hyper(#[error(source)] hyper::Error),
/// Error related to HTTP
#[error(display = "Internal error (HTTP error): {}", _0)] #[error(display = "Internal error (HTTP error): {}", _0)]
HTTP(#[error(source)] http::Error), HTTP(#[error(source)] http::Error),
// Category: cannot process // Category: cannot process
/// No proper api key was used, or the signature was invalid
#[error(display = "Forbidden: {}", _0)] #[error(display = "Forbidden: {}", _0)]
Forbidden(String), Forbidden(String),
/// The object requested don't exists
#[error(display = "Not found")] #[error(display = "Not found")]
NotFound, NotFound,
// Category: bad request // Category: bad request
/// The request contained an invalid UTF-8 sequence in its path or in other parameters
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

the request contained an invalid UTF-8 sequence in its path or in other parameters would be more accurate

`the request contained an invalid UTF-8 sequence in its path or in other parameters` would be more accurate
#[error(display = "Invalid UTF-8: {}", _0)] #[error(display = "Invalid UTF-8: {}", _0)]
InvalidUTF8Str(#[error(source)] std::str::Utf8Error), InvalidUTF8Str(#[error(source)] std::str::Utf8Error),
/// The request used an invalid path
#[error(display = "Invalid UTF-8: {}", _0)] #[error(display = "Invalid UTF-8: {}", _0)]
InvalidUTF8String(#[error(source)] std::string::FromUtf8Error), InvalidUTF8String(#[error(source)] std::string::FromUtf8Error),
/// Some base64 encoded data was badly encoded
#[error(display = "Invalid base64: {}", _0)] #[error(display = "Invalid base64: {}", _0)]
InvalidBase64(#[error(source)] base64::DecodeError), InvalidBase64(#[error(source)] base64::DecodeError),
/// The client sent invalid XML data
#[error(display = "Invalid XML: {}", _0)] #[error(display = "Invalid XML: {}", _0)]
InvalidXML(String), InvalidXML(String),
/// The client sent a header with invalid value
#[error(display = "Invalid header value: {}", _0)] #[error(display = "Invalid header value: {}", _0)]
InvalidHeader(#[error(source)] hyper::header::ToStrError), InvalidHeader(#[error(source)] hyper::header::ToStrError),
/// The client sent a range header with invalid value
#[error(display = "Invalid HTTP range: {:?}", _0)] #[error(display = "Invalid HTTP range: {:?}", _0)]
InvalidRange(#[error(from)] http_range::HttpRangeParseError), InvalidRange(#[error(from)] http_range::HttpRangeParseError),
/// The client sent an invalid request
#[error(display = "Bad request: {}", _0)] #[error(display = "Bad request: {}", _0)]
BadRequest(String), BadRequest(String),
} }
@ -52,6 +65,7 @@ impl From<roxmltree::Error> for Error {
} }
impl Error { impl Error {
/// Get the HTTP status code that best represents the meaning of the error for the client
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

rather get the HTTP status code that best represents the meaning of the error for the client

rather `get the HTTP status code that best represents the meaning of the error for the client`
pub fn http_status_code(&self) -> StatusCode { pub fn http_status_code(&self) -> StatusCode {
match self { match self {
Error::NotFound => StatusCode::NOT_FOUND, Error::NotFound => StatusCode::NOT_FOUND,
@ -65,6 +79,7 @@ impl Error {
} }
} }
/// Trait to map error to the Bad Request error code
pub trait OkOrBadRequest { pub trait OkOrBadRequest {
type S2; type S2;
fn ok_or_bad_request(self, reason: &'static str) -> Self::S2; fn ok_or_bad_request(self, reason: &'static str) -> Self::S2;
@ -93,6 +108,7 @@ impl<T> OkOrBadRequest for Option<T> {
} }
} }
/// Trait to map an error to an Internal Error code
pub trait OkOrInternalError { pub trait OkOrInternalError {
type S2; type S2;
fn ok_or_internal_error(self, reason: &'static str) -> Self::S2; fn ok_or_internal_error(self, reason: &'static str) -> Self::S2;

View file

@ -1,15 +1,19 @@
//! Crate for serving a S3 compatible API
#[macro_use] #[macro_use]
extern crate log; extern crate log;
pub mod error; mod error;
pub use error::Error;
lx marked this conversation as resolved Outdated
Outdated
Review

Doesn't this prevent us from using OkOrBadRequest and OkOrInternalError? (haven't read what's below yet)

Doesn't this prevent us from using `OkOrBadRequest` and `OkOrInternalError`? (haven't read what's below yet)

It prevents use from other crates, but not from this one. It does not break anything, but does prevent using those traits in web in the future, for instance ????

It prevents use from other crates, but not from this one. It does not break anything, but does prevent using those traits in web in the future, for instance ????
Outdated
Review

Alright, we'll make things pub again if/when we need them.

Alright, we'll make things `pub` again if/when we need them.
pub mod encoding; mod encoding;
pub mod api_server; mod api_server;
pub mod signature; pub use api_server::run_api_server;
pub mod s3_copy; mod signature;
pub mod s3_delete;
mod s3_copy;
mod s3_delete;
pub mod s3_get; pub mod s3_get;
pub mod s3_list; mod s3_list;
pub mod s3_put; mod s3_put;

View file

@ -1,3 +1,4 @@
//! Function related to GET and HEAD requests
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, UNIX_EPOCH}; use std::time::{Duration, UNIX_EPOCH};
@ -79,6 +80,7 @@ fn try_answer_cached(
} }
} }
/// Handle HEAD request
pub async fn handle_head( pub async fn handle_head(
garage: Arc<Garage>, garage: Arc<Garage>,
req: &Request<Body>, req: &Request<Body>,
@ -118,6 +120,7 @@ pub async fn handle_head(
Ok(response) Ok(response)
} }
/// Handle GET request
pub async fn handle_get( pub async fn handle_get(
garage: Arc<Garage>, garage: Arc<Garage>,
req: &Request<Body>, req: &Request<Body>,
@ -224,7 +227,7 @@ pub async fn handle_get(
} }
} }
pub async fn handle_get_range( async fn handle_get_range(
garage: Arc<Garage>, garage: Arc<Garage>,
version: &ObjectVersion, version: &ObjectVersion,
version_data: &ObjectVersionData, version_data: &ObjectVersionData,

View file

@ -80,7 +80,7 @@ pub struct ConfigureNodeOpt {
#[structopt(short = "c", long = "capacity")] #[structopt(short = "c", long = "capacity")]
capacity: Option<u32>, capacity: Option<u32>,
/// Optionnal node tag /// Optional node tag
#[structopt(short = "t", long = "tag")] #[structopt(short = "t", long = "tag")]
tag: Option<String>, tag: Option<String>,

View file

@ -1,4 +1,5 @@
#![recursion_limit = "1024"] #![recursion_limit = "1024"]
//! Garage CLI, used to interact with a running Garage instance, and to launch a Garage instance
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

"and to launch a Garage instance" ?

"and to launch a Garage **instance**" ?
#[macro_use] #[macro_use]
extern crate log; extern crate log;
@ -25,7 +26,7 @@ use cli::*;
#[derive(StructOpt, Debug)] #[derive(StructOpt, Debug)]
#[structopt(name = "garage")] #[structopt(name = "garage")]
pub struct Opt { struct Opt {
/// RPC connect to this host to execute client operations /// RPC connect to this host to execute client operations
#[structopt(short = "h", long = "rpc-host", default_value = "127.0.0.1:3901")] #[structopt(short = "h", long = "rpc-host", default_value = "127.0.0.1:3901")]
pub rpc_host: SocketAddr, pub rpc_host: SocketAddr,

View file

@ -8,10 +8,10 @@ use garage_util::background::*;
use garage_util::config::*; use garage_util::config::*;
use garage_util::error::Error; use garage_util::error::Error;
use garage_api::api_server; use garage_api::run_api_server;
use garage_model::garage::Garage; use garage_model::garage::Garage;
use garage_rpc::rpc_server::RpcServer; use garage_rpc::rpc_server::RpcServer;
use garage_web::web_server; use garage_web::run_web_server;
use crate::admin_rpc::*; use crate::admin_rpc::*;
@ -62,8 +62,8 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
info!("Initializing RPC and API servers..."); info!("Initializing RPC and API servers...");
let run_rpc_server = Arc::new(rpc_server).run(wait_from(watch_cancel.clone())); let run_rpc_server = Arc::new(rpc_server).run(wait_from(watch_cancel.clone()));
let api_server = api_server::run_api_server(garage.clone(), wait_from(watch_cancel.clone())); let api_server = run_api_server(garage.clone(), wait_from(watch_cancel.clone()));
let web_server = web_server::run_web_server(garage, wait_from(watch_cancel.clone())); let web_server = run_web_server(garage, wait_from(watch_cancel.clone()));
futures::try_join!( futures::try_join!(
bootstrap.map(|rv| { bootstrap.map(|rv| {

View file

@ -18,12 +18,13 @@ use garage_rpc::membership::System;
use garage_rpc::rpc_client::*; use garage_rpc::rpc_client::*;
use garage_rpc::rpc_server::*; use garage_rpc::rpc_server::*;
use garage_table::replication::{sharded::TableShardedReplication, TableReplication}; use garage_table::replication::{TableReplication, TableShardedReplication};
use crate::block_ref_table::*; use crate::block_ref_table::*;
use crate::garage::Garage; use crate::garage::Garage;
/// Size under which data will be stored inlined in database instead of as files
pub const INLINE_THRESHOLD: usize = 3072; pub const INLINE_THRESHOLD: usize = 3072;
pub const BACKGROUND_WORKERS: u64 = 1; pub const BACKGROUND_WORKERS: u64 = 1;
@ -33,28 +34,41 @@ const BLOCK_GC_TIMEOUT: Duration = Duration::from_secs(60);
const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5); const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5);
const RESYNC_RETRY_TIMEOUT: Duration = Duration::from_secs(10); const RESYNC_RETRY_TIMEOUT: Duration = Duration::from_secs(10);
/// RPC messages used to share blocks of data between nodes
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub enum Message { pub enum Message {
Ok, Ok,
/// Message to ask for a block of data, by hash
GetBlock(Hash), GetBlock(Hash),
/// Message to send a block of data, either because requested, of for first delivery of new
/// block
PutBlock(PutBlockMessage), PutBlock(PutBlockMessage),
/// Ask other node if they should have this block, but don't actually have it
NeedBlockQuery(Hash), NeedBlockQuery(Hash),
/// Response : whether the node do require that block
NeedBlockReply(bool), NeedBlockReply(bool),
} }
/// Structure used to send a block
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub struct PutBlockMessage { pub struct PutBlockMessage {
/// Hash of the block
pub hash: Hash, pub hash: Hash,
/// Content of the block
#[serde(with = "serde_bytes")] #[serde(with = "serde_bytes")]
pub data: Vec<u8>, pub data: Vec<u8>,
} }
impl RpcMessage for Message {} impl RpcMessage for Message {}
/// The block manager, handling block exchange between nodes, and block storage on local node
pub struct BlockManager { pub struct BlockManager {
/// Replication strategy, allowing to find on which node blocks should be located
pub replication: TableShardedReplication, pub replication: TableShardedReplication,
/// Directory in which block are stored
pub data_dir: PathBuf, pub data_dir: PathBuf,
/// Lock to prevent concurrent edition of the directory
pub data_dir_lock: Mutex<()>, pub data_dir_lock: Mutex<()>,
rc: sled::Tree, rc: sled::Tree,
@ -128,7 +142,8 @@ impl BlockManager {
} }
pub fn spawn_background_worker(self: Arc<Self>) { pub fn spawn_background_worker(self: Arc<Self>) {
// Launch 2 simultaneous workers for background resync loop preprocessing // Launch 2 simultaneous workers for background resync loop preprocessing <= TODO actually this
// launches only one worker with current value of BACKGROUND_WORKERS
for i in 0..BACKGROUND_WORKERS { for i in 0..BACKGROUND_WORKERS {
let bm2 = self.clone(); let bm2 = self.clone();
let background = self.system.background.clone(); let background = self.system.background.clone();
@ -141,7 +156,8 @@ impl BlockManager {
} }
} }
pub async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<Message, Error> { /// Write a block to disk
async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<Message, Error> {
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

should probably not be pub, same for read_block and need_block

should probably not be `pub`, same for `read_block` and `need_block`
let _lock = self.data_dir_lock.lock().await; let _lock = self.data_dir_lock.lock().await;
let mut path = self.block_dir(hash); let mut path = self.block_dir(hash);
@ -159,7 +175,8 @@ impl BlockManager {
Ok(Message::Ok) Ok(Message::Ok)
} }
pub async fn read_block(&self, hash: &Hash) -> Result<Message, Error> { /// Read block from disk, verifying it's integrity
async fn read_block(&self, hash: &Hash) -> Result<Message, Error> {
let path = self.block_path(hash); let path = self.block_path(hash);
let mut f = match fs::File::open(&path).await { let mut f = match fs::File::open(&path).await {
@ -190,7 +207,8 @@ impl BlockManager {
Ok(Message::PutBlock(PutBlockMessage { hash: *hash, data })) Ok(Message::PutBlock(PutBlockMessage { hash: *hash, data }))
} }
pub async fn need_block(&self, hash: &Hash) -> Result<bool, Error> { /// Check if this node should have a block, but don't actually have it
async fn need_block(&self, hash: &Hash) -> Result<bool, Error> {
let needed = self let needed = self
.rc .rc
.get(hash.as_ref())? .get(hash.as_ref())?
@ -217,6 +235,8 @@ impl BlockManager {
path path
} }
/// Increment the number of time a block is used, putting it to resynchronization if it is
/// required, but not known
pub fn block_incref(&self, hash: &Hash) -> Result<(), Error> { pub fn block_incref(&self, hash: &Hash) -> Result<(), Error> {
let old_rc = self.rc.fetch_and_update(&hash, |old| { let old_rc = self.rc.fetch_and_update(&hash, |old| {
let old_v = old.map(u64_from_be_bytes).unwrap_or(0); let old_v = old.map(u64_from_be_bytes).unwrap_or(0);
@ -229,6 +249,7 @@ impl BlockManager {
Ok(()) Ok(())
} }
/// Decrement the number of time a block is used
pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> { pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> {
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

resync does both: if the rc is >0 and the block is not present, ask it to peers. on the contrary if rc=0 and block is present, delete it locally (after checking that no peer needs it)

resync does both: if the rc is >0 and the block is not present, ask it to peers. on the contrary if rc=0 and block is present, delete it locally (after checking that no peer needs it)

I understand resync both get missing blocks and remove unnecessary blocks. My problem is that if I understand the code correctly, when the counter go down from 1 to 0, the value for new_rc is Some(0) (actually a Vec of 8 0u8), which is not None, so the block is not put to resync, and don't get deleted, unless some background thread regularly scan the table, which sound inneficient

I understand resync both get missing blocks and remove unnecessary blocks. My problem is that if I understand the code correctly, when the counter go down from 1 to 0, the value for new_rc is Some(0) (actually a Vec of 8 0u8), which is not None, so the block is not put to resync, and don't get deleted, unless some background thread regularly scan the table, which sound inneficient
Outdated
Review

block_decref actually deletes the value in the rc when it reaches 0, so it is indeed None and not Some(0), unless I'm mistaken? Maybe you're not looking at the last version of the code?

`block_decref` actually deletes the value in the rc when it reaches 0, so it is indeed None and not Some(0), unless I'm mistaken? Maybe you're not looking at the last version of the code?

Nevermind, I somehow read old_v >= 1 instead of old_v > 1

Nevermind, I somehow read `old_v >= 1` instead of `old_v > 1`
let new_rc = self.rc.update_and_fetch(&hash, |old| { let new_rc = self.rc.update_and_fetch(&hash, |old| {
let old_v = old.map(u64_from_be_bytes).unwrap_or(0); let old_v = old.map(u64_from_be_bytes).unwrap_or(0);
@ -388,6 +409,7 @@ impl BlockManager {
Ok(()) Ok(())
} }
/// Ask nodes that might have a block for it
pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> { pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> {
let who = self.replication.read_nodes(&hash); let who = self.replication.read_nodes(&hash);
let resps = self let resps = self
@ -412,6 +434,7 @@ impl BlockManager {
))) )))
} }
/// Send block to nodes that should have it
pub async fn rpc_put_block(&self, hash: Hash, data: Vec<u8>) -> Result<(), Error> { pub async fn rpc_put_block(&self, hash: Hash, data: Vec<u8>) -> Result<(), Error> {
let who = self.replication.write_nodes(&hash); let who = self.replication.write_nodes(&hash);
self.rpc_client self.rpc_client
@ -498,6 +521,7 @@ impl BlockManager {
.boxed() .boxed()
} }
/// Get lenght of resync queue
pub fn resync_queue_len(&self) -> usize { pub fn resync_queue_len(&self) -> usize {
self.resync_queue.len() self.resync_queue.len()
} }

View file

@ -10,13 +10,14 @@ use crate::block::*;
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct BlockRef { pub struct BlockRef {
// Primary key /// Hash of the block, used as partition key
pub block: Hash, pub block: Hash,
// Sort key /// Id of the Version for the object containing this block, used as sorting key
pub version: UUID, pub version: UUID,
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

version is the id of a Version object in the version table (which stores the different versions of objects in the buckets), not a version number for the BlockRef. i.e. BlockRef.version is the version id of an object that contains this block

version is the id of a Version object in the version table (which stores the different versions of objects in the buckets), not a version number for the BlockRef. i.e. `BlockRef.version` is the version id of an object that contains this block
// Keep track of deleted status // Keep track of deleted status
/// Is the Version that contains this block deleted
pub deleted: crdt::Bool, pub deleted: crdt::Bool,
} }
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

-> is the Version (a version of an object in a bucket) that contains this block deleted

-> is the Version (a version of an object in a bucket) that contains this block deleted

View file

@ -12,15 +12,18 @@ use crate::key_table::PermissionSet;
/// - A bucket has 2 states, Present or Deleted and parameters make sense only if present. /// - A bucket has 2 states, Present or Deleted and parameters make sense only if present.
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct Bucket { pub struct Bucket {
// Primary key /// Name of the bucket
pub name: String, pub name: String,
/// State, and configuration if not deleted, of the bucket
pub state: crdt::LWW<BucketState>, pub state: crdt::LWW<BucketState>,
} }
/// State of a bucket
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub enum BucketState { pub enum BucketState {
/// The bucket is deleted
Deleted, Deleted,
/// The bucket exists
Present(BucketParams), Present(BucketParams),
} }
@ -37,9 +40,12 @@ impl CRDT for BucketState {
} }
} }
/// Configuration for a bucket
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct BucketParams { pub struct BucketParams {
/// Map of key with access to the bucket, and what kind of access they give
pub authorized_keys: crdt::LWWMap<String, PermissionSet>, pub authorized_keys: crdt::LWWMap<String, PermissionSet>,
/// Is the bucket served as http
pub website: crdt::LWW<bool>, pub website: crdt::LWW<bool>,
} }
@ -51,6 +57,7 @@ impl CRDT for BucketParams {
} }
impl BucketParams { impl BucketParams {
/// Initializes a new instance of the Bucket struct
pub fn new() -> Self { pub fn new() -> Self {
BucketParams { BucketParams {
authorized_keys: crdt::LWWMap::new(), authorized_keys: crdt::LWWMap::new(),
@ -60,15 +67,21 @@ impl BucketParams {
} }
impl Bucket { impl Bucket {
/// Create a new bucket
Outdated
Review

This formulation is ambiguous/misleading, as the bucket is not really created until the Bucket is commited to the database

This formulation is ambiguous/misleading, as the bucket is not really created until the `Bucket` is commited to the database
Outdated
Review

Suggestion: Initializes a new instance of the Bucket struct

Suggestion: `Initializes a new instance of the Bucket struct`
pub fn new(name: String) -> Self { pub fn new(name: String) -> Self {
Bucket { Bucket {
name, name,
state: crdt::LWW::new(BucketState::Present(BucketParams::new())), state: crdt::LWW::new(BucketState::Present(BucketParams::new())),
} }
} }
/// Returns true if this represents a deleted bucket
Outdated
Review

This formulation is ambiguous/misleading, as is_deleted does not do a database query, it just checks if this struct that was returned earlier from the db has the deleted flag

This formulation is ambiguous/misleading, as `is_deleted` does not do a database query, it just checks if this struct that was returned earlier from the db has the deleted flag
Outdated
Review

Suggestion: returns true if this represents a deleted bucket

Suggestion: `returns true if this represents a deleted bucket`
pub fn is_deleted(&self) -> bool { pub fn is_deleted(&self) -> bool {
*self.state.get() == BucketState::Deleted *self.state.get() == BucketState::Deleted
} }
/// Return the list of authorized keys, when each was updated, and the permission associated to
/// the key
pub fn authorized_keys(&self) -> &[(String, u64, PermissionSet)] { pub fn authorized_keys(&self) -> &[(String, u64, PermissionSet)] {
match self.state.get() { match self.state.get() {
BucketState::Deleted => &[], BucketState::Deleted => &[],

View file

@ -7,8 +7,8 @@ use garage_rpc::membership::System;
use garage_rpc::rpc_client::RpcHttpClient; use garage_rpc::rpc_client::RpcHttpClient;
use garage_rpc::rpc_server::RpcServer; use garage_rpc::rpc_server::RpcServer;
use garage_table::replication::fullcopy::*; use garage_table::replication::TableFullReplication;
use garage_table::replication::sharded::*; use garage_table::replication::TableShardedReplication;
use garage_table::*; use garage_table::*;
use crate::block::*; use crate::block::*;
@ -18,15 +18,23 @@ use crate::key_table::*;
use crate::object_table::*; use crate::object_table::*;
use crate::version_table::*; use crate::version_table::*;
/// An entire Garage full of data
pub struct Garage { pub struct Garage {
/// The parsed configuration Garage is running
pub config: Config, pub config: Config,
/// The local database
pub db: sled::Db, pub db: sled::Db,
/// A background job runner
pub background: Arc<BackgroundRunner>, pub background: Arc<BackgroundRunner>,
/// The membership manager
pub system: Arc<System>, pub system: Arc<System>,
/// The block manager
pub block_manager: Arc<BlockManager>, pub block_manager: Arc<BlockManager>,
/// Table containing informations about buckets
pub bucket_table: Arc<Table<BucketTable, TableFullReplication>>, pub bucket_table: Arc<Table<BucketTable, TableFullReplication>>,
/// Table containing informations about api keys
pub key_table: Arc<Table<KeyTable, TableFullReplication>>, pub key_table: Arc<Table<KeyTable, TableFullReplication>>,
pub object_table: Arc<Table<ObjectTable, TableShardedReplication>>, pub object_table: Arc<Table<ObjectTable, TableShardedReplication>>,
@ -35,6 +43,7 @@ pub struct Garage {
} }
impl Garage { impl Garage {
/// Create and run garage
pub fn new( pub fn new(
config: Config, config: Config,
db: sled::Db, db: sled::Db,

View file

@ -3,26 +3,28 @@ use serde::{Deserialize, Serialize};
use garage_table::crdt::*; use garage_table::crdt::*;
use garage_table::*; use garage_table::*;
/// An api key
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct Key { pub struct Key {
// Primary key /// The id of the key (immutable), used as partition key
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

please keep in the comment that this acts as the primary key

please keep in the comment that this acts as the primary key
pub key_id: String, pub key_id: String,
// Associated secret key (immutable) /// The secret_key associated
pub secret_key: String, pub secret_key: String,
// Name /// Name for the key
pub name: crdt::LWW<String>, pub name: crdt::LWW<String>,
// Deletion /// Is the key deleted
pub deleted: crdt::Bool, pub deleted: crdt::Bool,
// Authorized keys /// Buckets in which the key is authorized. Empty if `Key` is deleted
pub authorized_buckets: crdt::LWWMap<String, PermissionSet>,
// CRDT interaction: deleted implies authorized_buckets is empty // CRDT interaction: deleted implies authorized_buckets is empty
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

this comment needs to be kept somewhere

this comment needs to be kept somewhere
pub authorized_buckets: crdt::LWWMap<String, PermissionSet>,
} }
impl Key { impl Key {
/// Create a new key
pub fn new(name: String) -> Self { pub fn new(name: String) -> Self {
let key_id = format!("GK{}", hex::encode(&rand::random::<[u8; 12]>()[..])); let key_id = format!("GK{}", hex::encode(&rand::random::<[u8; 12]>()[..]));
let secret_key = hex::encode(&rand::random::<[u8; 32]>()[..]); let secret_key = hex::encode(&rand::random::<[u8; 32]>()[..]);
@ -34,6 +36,8 @@ impl Key {
authorized_buckets: crdt::LWWMap::new(), authorized_buckets: crdt::LWWMap::new(),
} }
} }
/// Import a key from it's parts
pub fn import(key_id: &str, secret_key: &str, name: &str) -> Self { pub fn import(key_id: &str, secret_key: &str, name: &str) -> Self {
Self { Self {
key_id: key_id.to_string(), key_id: key_id.to_string(),
@ -43,6 +47,8 @@ impl Key {
authorized_buckets: crdt::LWWMap::new(), authorized_buckets: crdt::LWWMap::new(),
} }
} }
/// Create a new Key which can me merged to mark an existing key deleted
pub fn delete(key_id: String) -> Self { pub fn delete(key_id: String) -> Self {
Self { Self {
key_id, key_id,
@ -52,13 +58,16 @@ impl Key {
authorized_buckets: crdt::LWWMap::new(), authorized_buckets: crdt::LWWMap::new(),
} }
} }
/// Add an authorized bucket, only if it wasn't there before
/// Check if `Key` is allowed to read in bucket
pub fn allow_read(&self, bucket: &str) -> bool { pub fn allow_read(&self, bucket: &str) -> bool {
self.authorized_buckets self.authorized_buckets
.get(&bucket.to_string()) .get(&bucket.to_string())
.map(|x| x.allow_read) .map(|x| x.allow_read)
.unwrap_or(false) .unwrap_or(false)
} }
/// Check if `Key` is allowed to write in bucket
pub fn allow_write(&self, bucket: &str) -> bool { pub fn allow_write(&self, bucket: &str) -> bool {
self.authorized_buckets self.authorized_buckets
.get(&bucket.to_string()) .get(&bucket.to_string())
@ -67,9 +76,12 @@ impl Key {
} }
} }
/// Permission given to a key in a bucket
#[derive(PartialOrd, Ord, PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialOrd, Ord, PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct PermissionSet { pub struct PermissionSet {
/// The key can be used to read the bucket
pub allow_read: bool, pub allow_read: bool,
/// The key can be used to write in the bucket
pub allow_write: bool, pub allow_write: bool,
} }

View file

@ -6,24 +6,26 @@ use garage_util::background::BackgroundRunner;
use garage_util::data::*; use garage_util::data::*;
use garage_table::crdt::*; use garage_table::crdt::*;
use garage_table::replication::sharded::*; use garage_table::replication::TableShardedReplication;
use garage_table::*; use garage_table::*;
use crate::version_table::*; use crate::version_table::*;
/// An object
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct Object { pub struct Object {
// Primary key /// The bucket in which the object is stored, used as partition key
pub bucket: String, pub bucket: String,
// Sort key /// The key at which the object is stored in its bucket, used as sorting key
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

please keep in the comment that these two are used respectively as partition key (not primary key, my bad) and sort key, it's an import detail in understanding how Garage works internally

please keep in the comment that these two are used respectively as partition key (not primary key, my bad) and sort key, it's an import detail in understanding how Garage works internally
pub key: String, pub key: String,
// Data /// The list of currenty stored versions of the object
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

doesn't keep all known versions, the comment is misleading

doesn't keep all known versions, the comment is misleading

I'm not sure what it store then

I'm not sure what it store then
versions: Vec<ObjectVersion>, versions: Vec<ObjectVersion>,
} }
impl Object { impl Object {
/// Create an object from parts
pub fn new(bucket: String, key: String, versions: Vec<ObjectVersion>) -> Self { pub fn new(bucket: String, key: String, versions: Vec<ObjectVersion>) -> Self {
let mut ret = Self { let mut ret = Self {
bucket, bucket,
@ -36,6 +38,7 @@ impl Object {
} }
ret ret
} }
/// Adds a version if it wasn't already present /// Adds a version if it wasn't already present
pub fn add_version(&mut self, new: ObjectVersion) -> Result<(), ()> { pub fn add_version(&mut self, new: ObjectVersion) -> Result<(), ()> {
match self match self
@ -49,23 +52,32 @@ impl Object {
Ok(_) => Err(()), Ok(_) => Err(()),
} }
} }
/// Get a list of currently stored versions of `Object`
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

s/known/currently stored

s/known/currently stored
pub fn versions(&self) -> &[ObjectVersion] { pub fn versions(&self) -> &[ObjectVersion] {
&self.versions[..] &self.versions[..]
} }
} }
/// Informations about a version of an object
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct ObjectVersion { pub struct ObjectVersion {
/// Id of the version
pub uuid: UUID, pub uuid: UUID,
/// Timestamp of when the object was created
pub timestamp: u64, pub timestamp: u64,
/// State of the version
pub state: ObjectVersionState, pub state: ObjectVersionState,
} }
/// State of an object version
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub enum ObjectVersionState { pub enum ObjectVersionState {
/// The version is being received
Uploading(ObjectVersionHeaders), Uploading(ObjectVersionHeaders),
/// The version is fully received
Complete(ObjectVersionData), Complete(ObjectVersionData),
/// The version uploaded containded errors or the upload was explicitly aborted
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

it can never be received and stay in Uploading state forever if CompleteMultipartUpload/AbortMultipartUpload is never called (and it doesn't matter)

it can never be received and stay in Uploading state forever if CompleteMultipartUpload/AbortMultipartUpload is never called (and it doesn't matter)
Aborted, Aborted,
} }
@ -90,10 +102,15 @@ impl CRDT for ObjectVersionState {
} }
} }
/// Data about an object version
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub enum ObjectVersionData { pub enum ObjectVersionData {
/// The object was deleted, this Version is a tombstone to mark it as such
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

the version represents a deletion of the object, i.e. at this version number the object doesn't exist (404)

the version represents a deletion of the object, i.e. at this version number the object doesn't exist (404)
DeleteMarker, DeleteMarker,
/// The object is short, it's stored inlined
Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec<u8>), Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec<u8>),
/// The object is not short, Hash of first block is stored here, next segments hashes are
/// stored in the version table
FirstBlock(ObjectVersionMeta, Hash), FirstBlock(ObjectVersionMeta, Hash),
} }
@ -101,16 +118,23 @@ impl AutoCRDT for ObjectVersionData {
const WARN_IF_DIFFERENT: bool = true; const WARN_IF_DIFFERENT: bool = true;
} }
/// Metadata about the object version
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct ObjectVersionMeta { pub struct ObjectVersionMeta {
/// Headers to send to the client
pub headers: ObjectVersionHeaders, pub headers: ObjectVersionHeaders,
/// Size of the object
pub size: u64, pub size: u64,
/// etag of the object
pub etag: String, pub etag: String,
} }
/// Additional headers for an object
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct ObjectVersionHeaders { pub struct ObjectVersionHeaders {
/// Content type of the object
pub content_type: String, pub content_type: String,
/// Any other http headers to send
pub other: BTreeMap<String, String>, pub other: BTreeMap<String, String>,
} }
@ -118,18 +142,24 @@ impl ObjectVersion {
fn cmp_key(&self) -> (u64, UUID) { fn cmp_key(&self) -> (u64, UUID) {
(self.timestamp, self.uuid) (self.timestamp, self.uuid)
} }
/// Is the object version currently being uploaded
pub fn is_uploading(&self) -> bool { pub fn is_uploading(&self) -> bool {
match self.state { match self.state {
ObjectVersionState::Uploading(_) => true, ObjectVersionState::Uploading(_) => true,
_ => false, _ => false,
} }
} }
/// Is the object version completely received
pub fn is_complete(&self) -> bool { pub fn is_complete(&self) -> bool {
match self.state { match self.state {
ObjectVersionState::Complete(_) => true, ObjectVersionState::Complete(_) => true,
_ => false, _ => false,
} }
} }
/// Is the object version available (received and not a tombstone)
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

is the object version available as an existing object (received and not a delete marker)

is the object version available as an existing object (received and not a delete marker)
pub fn is_data(&self) -> bool { pub fn is_data(&self) -> bool {
match self.state { match self.state {
ObjectVersionState::Complete(ObjectVersionData::DeleteMarker) => false, ObjectVersionState::Complete(ObjectVersionData::DeleteMarker) => false,

View file

@ -5,26 +5,32 @@ use garage_util::background::BackgroundRunner;
use garage_util::data::*; use garage_util::data::*;
use garage_table::crdt::*; use garage_table::crdt::*;
use garage_table::replication::sharded::*; use garage_table::replication::TableShardedReplication;
use garage_table::*; use garage_table::*;
use crate::block_ref_table::*; use crate::block_ref_table::*;
/// A version of an object
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct Version { pub struct Version {
// Primary key /// UUID of the version, used as partition key
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

used as a partition key

used as a partition key
pub uuid: UUID, pub uuid: UUID,
// Actual data: the blocks for this version // Actual data: the blocks for this version
// In the case of a multipart upload, also store the etags // In the case of a multipart upload, also store the etags
// of individual parts and check them when doing CompleteMultipartUpload // of individual parts and check them when doing CompleteMultipartUpload
/// Is this version deleted
pub deleted: crdt::Bool, pub deleted: crdt::Bool,
/// list of blocks of data composing the version
pub blocks: crdt::Map<VersionBlockKey, VersionBlock>, pub blocks: crdt::Map<VersionBlockKey, VersionBlock>,
/// Etag of each part in case of a multipart upload, empty otherwise
pub parts_etags: crdt::Map<u64, String>, pub parts_etags: crdt::Map<u64, String>,
// Back link to bucket+key so that we can figure if // Back link to bucket+key so that we can figure if
// this was deleted later on // this was deleted later on
/// Bucket in which the related object is stored
pub bucket: String, pub bucket: String,
/// Key in which the related object is stored
pub key: String, pub key: String,
} }
@ -43,7 +49,9 @@ impl Version {
#[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)] #[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)]
pub struct VersionBlockKey { pub struct VersionBlockKey {
/// Number of the part
lx marked this conversation as resolved Outdated
Outdated
Review

not necessarily starting at 1, the client decides the part numbers and we don't care (for instance he can send parts 0, 1, 2, 14, 22, 37 and 42, and that's a fine object as long as these parts match the checklist that is sent in the CompleteMultipartUpload call)

not necessarily starting at 1, the client decides the part numbers and we don't care (for instance he can send parts 0, 1, 2, 14, 22, 37 and 42, and that's a fine object as long as these parts match the checklist that is sent in the CompleteMultipartUpload call)

Ok I've read s3 doc on multi-part upload and I hate it.
Next comment appear to be false too, it's offset in part, not in the whole file

Ok I've read s3 doc on multi-part upload and I hate it. Next comment appear to be false too, it's offset in part, not in the whole file
Outdated
Review

Yes, we don't know the offset in the whole file when a part is being uploaded because parts get uploaded independently from one another.

Yes, we don't know the offset in the whole file when a part is being uploaded because parts get uploaded independently from one another.
pub part_number: u64, pub part_number: u64,
/// Offset of this sub-segment in its part
pub offset: u64, pub offset: u64,
} }
@ -61,9 +69,12 @@ impl PartialOrd for VersionBlockKey {
} }
} }
/// Informations about a single block
#[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)] #[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)]
pub struct VersionBlock { pub struct VersionBlock {
/// Hash of the block
pub hash: Hash, pub hash: Hash,
/// Size of the block
pub size: u64, pub size: u64,
} }

View file

@ -1,7 +1,9 @@
//! Crate containing rpc related functions and types used in Garage
#[macro_use] #[macro_use]
extern crate log; extern crate log;
pub mod consul; mod consul;
pub(crate) mod tls_util; pub(crate) mod tls_util;
pub mod membership; pub mod membership;

View file

@ -1,3 +1,4 @@
//! Module containing structs related to membership management
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

//!

`//!`
use std::collections::HashMap; use std::collections::HashMap;
use std::fmt::Write as FmtWrite; use std::fmt::Write as FmtWrite;
use std::io::{Read, Write}; use std::io::{Read, Write};
@ -30,20 +31,29 @@ const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60);
const PING_TIMEOUT: Duration = Duration::from_secs(2); const PING_TIMEOUT: Duration = Duration::from_secs(2);
const MAX_FAILURES_BEFORE_CONSIDERED_DOWN: usize = 5; const MAX_FAILURES_BEFORE_CONSIDERED_DOWN: usize = 5;
/// RPC endpoint used for calls related to membership
pub const MEMBERSHIP_RPC_PATH: &str = "_membership"; pub const MEMBERSHIP_RPC_PATH: &str = "_membership";
/// RPC messages related to membership
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub enum Message { pub enum Message {
/// Response to successfull advertisements
Ok, Ok,
/// Message sent to detect other nodes status
Ping(PingMessage), Ping(PingMessage),
/// Ask other node for the nodes it knows. Answered with AdvertiseNodesUp
PullStatus, PullStatus,
/// Ask other node its config. Answered with AdvertiseConfig
PullConfig, PullConfig,
/// Advertisement of nodes the host knows up. Sent spontanously or in response to PullStatus
AdvertiseNodesUp(Vec<AdvertisedNode>), AdvertiseNodesUp(Vec<AdvertisedNode>),
/// Advertisement of nodes config. Sent spontanously or in response to PullConfig
AdvertiseConfig(NetworkConfig), AdvertiseConfig(NetworkConfig),
} }
impl RpcMessage for Message {} impl RpcMessage for Message {}
/// A ping, containing informations about status and config
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub struct PingMessage { pub struct PingMessage {
id: UUID, id: UUID,
@ -55,18 +65,25 @@ pub struct PingMessage {
state_info: StateInfo, state_info: StateInfo,
} }
/// A node advertisement
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AdvertisedNode { pub struct AdvertisedNode {
/// Id of the node this advertisement relates to
pub id: UUID, pub id: UUID,
/// IP and port of the node
pub addr: SocketAddr, pub addr: SocketAddr,
/// Is the node considered up
pub is_up: bool, pub is_up: bool,
/// When was the node last seen up, in milliseconds since UNIX epoch
pub last_seen: u64, pub last_seen: u64,
pub state_info: StateInfo, pub state_info: StateInfo,
} }
/// This node's membership manager
pub struct System { pub struct System {
/// The id of this node
pub id: UUID, pub id: UUID,
persist_config: Persister<NetworkConfig>, persist_config: Persister<NetworkConfig>,
@ -79,10 +96,12 @@ pub struct System {
rpc_client: Arc<RpcClient<Message>>, rpc_client: Arc<RpcClient<Message>>,
pub(crate) status: watch::Receiver<Arc<Status>>, pub(crate) status: watch::Receiver<Arc<Status>>,
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

not "viewed by this node", all nodes should have equal rings (if not it's a bug)

not "viewed by this node", all nodes should have equal rings (if not it's a bug)
/// The ring
pub ring: watch::Receiver<Arc<Ring>>, pub ring: watch::Receiver<Arc<Ring>>,
update_lock: Mutex<Updaters>, update_lock: Mutex<Updaters>,
/// The job runner of this node
pub background: Arc<BackgroundRunner>, pub background: Arc<BackgroundRunner>,
} }
@ -91,21 +110,29 @@ struct Updaters {
update_ring: watch::Sender<Arc<Ring>>, update_ring: watch::Sender<Arc<Ring>>,
} }
lx marked this conversation as resolved Outdated
Outdated
Review

when is it sorted?

when is it sorted?

it's sorted in recalculate_hash. Actually the node list is short, so sorting it is pretty much free

it's sorted in recalculate_hash. Actually the node list is short, so sorting it is pretty much free
Outdated
Review

That's my reasoning as well.

That's my reasoning as well.
/// The status of each nodes, viewed by this node
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

hash of the whole set of currently known nodes. This hash is sent in regular ping messages so nodes can detect when they have different views of the cluster. They then exchange their peer lists kind of in an anti-entropy process.

hash of the whole set of currently known nodes. This hash is sent in regular ping messages so nodes can detect when they have different views of the cluster. They then exchange their peer lists kind of in an anti-entropy process.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct Status { pub struct Status {
/// Mapping of each node id to its known status
pub nodes: HashMap<UUID, Arc<StatusEntry>>, pub nodes: HashMap<UUID, Arc<StatusEntry>>,
/// Hash of `nodes`, used to detect when nodes have different views of the cluster
pub hash: Hash, pub hash: Hash,
} }
/// The status of a single node
#[derive(Debug)] #[derive(Debug)]
pub struct StatusEntry { pub struct StatusEntry {
Outdated
Review

Not sure but this counts failed requests of any kind, and not just ping messages. It is incremented in rpc_client.rs

Not sure but this counts failed requests of any kind, and not just ping messages. It is incremented in rpc_client.rs

hum, it might double-count pings, as it count once in rpc_clients, and once in this file in ping_nodes

hum, it might double-count pings, as it count once in rpc_clients, and once in this file in ping_nodes
Outdated
Review

Thanks for noticing that, I'll have to check.

Thanks for noticing that, I'll have to check.
Outdated
Review

Ok so the cound in rpc_client does not happen when sending a ping message in ping_nodes in membership.rs because we are calling RpcAddrClient::call and not RpcClient::call (i.e. we are calling the node using its IP address and not its node identifier). The increment in rpc_client is done in RpcClient::call because it needs to know the node ID. So there is no redundancy between the two.

Ok so the cound in `rpc_client` does not happen when sending a ping message in `ping_nodes` in `membership.rs` because we are calling `RpcAddrClient::call` and not `RpcClient::call` (i.e. we are calling the node using its IP address and not its node identifier). The increment in `rpc_client` is done in `RpcClient::call` because it needs to know the node ID. So there is no redundancy between the two.
/// The IP and port used to connect to this node
pub addr: SocketAddr, pub addr: SocketAddr,
/// Last time this node was seen
pub last_seen: u64, pub last_seen: u64,
/// Number of consecutive pings sent without reply to this node
pub num_failures: AtomicUsize, pub num_failures: AtomicUsize,
pub state_info: StateInfo, pub state_info: StateInfo,
} }
impl StatusEntry { impl StatusEntry {
/// is the node associated to this entry considered up
pub fn is_up(&self) -> bool { pub fn is_up(&self) -> bool {
self.num_failures.load(Ordering::SeqCst) < MAX_FAILURES_BEFORE_CONSIDERED_DOWN self.num_failures.load(Ordering::SeqCst) < MAX_FAILURES_BEFORE_CONSIDERED_DOWN
} }
@ -195,6 +222,7 @@ fn gen_node_id(metadata_dir: &PathBuf) -> Result<UUID, Error> {
} }
impl System { impl System {
/// Create this node's membership manager
pub fn new( pub fn new(
metadata_dir: PathBuf, metadata_dir: PathBuf,
rpc_http_client: Arc<RpcHttpClient>, rpc_http_client: Arc<RpcHttpClient>,
@ -279,6 +307,7 @@ impl System {
}); });
} }
/// Get an RPC client
pub fn rpc_client<M: RpcMessage + 'static>(self: &Arc<Self>, path: &str) -> Arc<RpcClient<M>> { pub fn rpc_client<M: RpcMessage + 'static>(self: &Arc<Self>, path: &str) -> Arc<RpcClient<M>> {
RpcClient::new( RpcClient::new(
RpcAddrClient::new(self.rpc_http_client.clone(), path.to_string()), RpcAddrClient::new(self.rpc_http_client.clone(), path.to_string()),
@ -287,6 +316,7 @@ impl System {
) )
} }
/// Save network configuration to disc
async fn save_network_config(self: Arc<Self>) -> Result<(), Error> { async fn save_network_config(self: Arc<Self>) -> Result<(), Error> {
let ring = self.ring.borrow().clone(); let ring = self.ring.borrow().clone();
self.persist_config self.persist_config
@ -319,6 +349,7 @@ impl System {
self.rpc_client.call_many(&to[..], msg, timeout).await; self.rpc_client.call_many(&to[..], msg, timeout).await;
} }
/// Perform bootstraping, starting the ping loop
pub async fn bootstrap( pub async fn bootstrap(
self: Arc<Self>, self: Arc<Self>,
peers: Vec<SocketAddr>, peers: Vec<SocketAddr>,
@ -386,6 +417,8 @@ impl System {
} }
} else if let Some(id) = id_option { } else if let Some(id) = id_option {
if let Some(st) = status.nodes.get_mut(id) { if let Some(st) = status.nodes.get_mut(id) {
// we need to increment failure counter as call was done using by_addr so the
// counter was not auto-incremented
Outdated
Review

Actually no because the increment in rpc_client.rs is done by the RpcClient and not the RpcAddrClient, but here we are using the RpcAddrClient. (we could put this in a comment to make it clearer)

Actually no because the increment in `rpc_client.rs` is done by the `RpcClient` and not the `RpcAddrClient`, but here we are using the `RpcAddrClient`. (we could put this in a comment to make it clearer)
st.num_failures.fetch_add(1, Ordering::SeqCst); st.num_failures.fetch_add(1, Ordering::SeqCst);
if !st.is_up() { if !st.is_up() {
warn!("Node {:?} seems to be down.", id); warn!("Node {:?} seems to be down.", id);

View file

@ -1,3 +1,5 @@
//! Module containing types related to computing nodes which should receive a copy of data blocks
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

and metadata entries

and metadata entries
//! and metadata
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::convert::TryInto; use std::convert::TryInto;
@ -8,23 +10,30 @@ use garage_util::data::*;
// A partition number is encoded on 16 bits, // A partition number is encoded on 16 bits,
// i.e. we have up to 2**16 partitions. // i.e. we have up to 2**16 partitions.
// (in practice we have exactly 2**PARTITION_BITS partitions) // (in practice we have exactly 2**PARTITION_BITS partitions)
/// A partition id, stored on 16 bits
pub type Partition = u16; pub type Partition = u16;
// TODO: make this constant parametrizable in the config file // TODO: make this constant parametrizable in the config file
// For deployments with many nodes it might make sense to bump // For deployments with many nodes it might make sense to bump
// it up to 10. // it up to 10.
// Maximum value : 16 // Maximum value : 16
/// How many bits from the hash are used to make partitions. Higher numbers means more fairness in
/// presence of numerous nodes, but exponentially bigger ring. Max 16
pub const PARTITION_BITS: usize = 8; pub const PARTITION_BITS: usize = 8;
const PARTITION_MASK_U16: u16 = ((1 << PARTITION_BITS) - 1) << (16 - PARTITION_BITS); const PARTITION_MASK_U16: u16 = ((1 << PARTITION_BITS) - 1) << (16 - PARTITION_BITS);
// TODO: make this constant paraetrizable in the config file // TODO: make this constant paraetrizable in the config file
// (most deployments use a replication factor of 3, so...) // (most deployments use a replication factor of 3, so...)
/// The maximum number of time an object might get replicated
pub const MAX_REPLICATION: usize = 3; pub const MAX_REPLICATION: usize = 3;
Outdated
Review

Not really, a node can be known in that it answers ping messages, and not be configured if it is idle or draining pending removal by admins.

Not really, a node can be known in that it answers ping messages, and not be configured if it is idle or draining pending removal by admins.

would "available" be correct instead of known?

would "available" be correct instead of known?
Outdated
Review

Just put a TODO in the comment and I'll write it later

Just put a TODO in the comment and I'll write it later
Outdated
Review

Suggestion: The user-defined configuration of the cluster's nodes

Suggestion: `The user-defined configuration of the cluster's nodes`
/// The user-defined configuration of the cluster's nodes
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct NetworkConfig { pub struct NetworkConfig {
/// Map of each node's id to it's configuration
pub members: HashMap<UUID, NetworkConfigEntry>, pub members: HashMap<UUID, NetworkConfigEntry>,
/// Version of this config
pub version: u64, pub version: u64,
} }
@ -37,26 +46,40 @@ impl NetworkConfig {
} }
} }
/// The overall configuration of one (possibly remote) node
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct NetworkConfigEntry { pub struct NetworkConfigEntry {
/// Datacenter at which this entry belong. This infromation might be used to perform a better
/// geodistribution
pub datacenter: String, pub datacenter: String,
/// The (relative) capacity of the node
pub capacity: u32, pub capacity: u32,
/// A tag to recognize the entry, not used for other things than display
pub tag: String, pub tag: String,
} }
/// A ring distributing fairly objects to nodes
#[derive(Clone)] #[derive(Clone)]
pub struct Ring { pub struct Ring {
/// The network configuration used to generate this ring
pub config: NetworkConfig, pub config: NetworkConfig,
/// The list of entries in the ring
pub ring: Vec<RingEntry>, pub ring: Vec<RingEntry>,
} }
/// An entry in the ring
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct RingEntry { pub struct RingEntry {
/// The prefix of the Hash of object which should use this entry
pub location: Hash, pub location: Hash,
/// The nodes in which a matching object should get stored
pub nodes: [UUID; MAX_REPLICATION], pub nodes: [UUID; MAX_REPLICATION],
} }
impl Ring { impl Ring {
// TODO this function MUST be refactored, it's 100 lines long, with a 50 lines loop, going up to 6
// levels of imbrication. It is basically impossible to test, maintain, or understand for an
// outsider.
pub(crate) fn new(config: NetworkConfig) -> Self { pub(crate) fn new(config: NetworkConfig) -> Self {
// Create a vector of partition indices (0 to 2**PARTITION_BITS-1) // Create a vector of partition indices (0 to 2**PARTITION_BITS-1)
let partitions_idx = (0usize..(1usize << PARTITION_BITS)).collect::<Vec<_>>(); let partitions_idx = (0usize..(1usize << PARTITION_BITS)).collect::<Vec<_>>();
@ -166,20 +189,16 @@ impl Ring {
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
// eprintln!("RING: --");
// for e in ring.iter() {
// eprintln!("{:?}", e);
// }
// eprintln!("END --");
Self { config, ring } Self { config, ring }
} }
/// Get the partition in which data would fall on
pub fn partition_of(&self, from: &Hash) -> Partition { pub fn partition_of(&self, from: &Hash) -> Partition {
let top = u16::from_be_bytes(from.as_slice()[0..2].try_into().unwrap()); let top = u16::from_be_bytes(from.as_slice()[0..2].try_into().unwrap());
top >> (16 - PARTITION_BITS) top >> (16 - PARTITION_BITS)
} }
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

and the first hash (of a partition key) that would fall in this partition

and the first hash (of a partition key) that would fall in this partition
/// Get the list of partitions and the first hash of a partition key that would fall in it
pub fn partitions(&self) -> Vec<(Partition, Hash)> { pub fn partitions(&self) -> Vec<(Partition, Hash)> {
let mut ret = vec![]; let mut ret = vec![];
@ -193,6 +212,8 @@ impl Ring {
ret ret
} }
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

TODO rename this function as it doesn't walk the ring anymore but just returns the nodes at the corresponding location. This is vocabulary from the previous implementation that used datacenter-aware ring walking and led to data imbalance.

TODO rename this function as it doesn't walk the ring anymore but just returns the nodes at the corresponding location. This is vocabulary from the previous implementation that used datacenter-aware ring walking and led to data imbalance.
// TODO rename this function as it no longer walk the ring
/// Walk the ring to find the n servers in which data should be replicated
pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec<UUID> { pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec<UUID> {
if self.ring.len() != 1 << PARTITION_BITS { if self.ring.len() != 1 << PARTITION_BITS {
warn!("Ring not yet ready, read/writes will be lost!"); warn!("Ring not yet ready, read/writes will be lost!");
@ -201,12 +222,15 @@ impl Ring {
let top = u16::from_be_bytes(from.as_slice()[0..2].try_into().unwrap()); let top = u16::from_be_bytes(from.as_slice()[0..2].try_into().unwrap());
lx marked this conversation as resolved Outdated
Outdated
Review

just to make sure I don't mess up. Should probably be factorized in a single location.

just to make sure I don't mess up. Should probably be factorized in a single location.
let partition_idx = (top >> (16 - PARTITION_BITS)) as usize; let partition_idx = (top >> (16 - PARTITION_BITS)) as usize;
// TODO why computing two time in the same way and asserting?
assert_eq!(partition_idx, self.partition_of(from) as usize); assert_eq!(partition_idx, self.partition_of(from) as usize);
let partition = &self.ring[partition_idx]; let partition = &self.ring[partition_idx];
let partition_top = let partition_top =
u16::from_be_bytes(partition.location.as_slice()[0..2].try_into().unwrap()); u16::from_be_bytes(partition.location.as_slice()[0..2].try_into().unwrap());
// TODO is this an assertion on the validity of PARTITION_MASK_U16? If so, it should
// probably be a test more than a runtime assertion
assert_eq!(partition_top & PARTITION_MASK_U16, top & PARTITION_MASK_U16); assert_eq!(partition_top & PARTITION_MASK_U16, top & PARTITION_MASK_U16);
assert!(n <= partition.nodes.len()); assert!(n <= partition.nodes.len());

View file

@ -1,3 +1,4 @@
//! Contain structs related to making RPCs
use std::borrow::Borrow; use std::borrow::Borrow;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::net::SocketAddr; use std::net::SocketAddr;
@ -26,14 +27,19 @@ use crate::tls_util;
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10); const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
/// Strategy to apply when making RPC
#[derive(Copy, Clone)] #[derive(Copy, Clone)]
pub struct RequestStrategy { pub struct RequestStrategy {
/// Max time to wait for reponse
pub rs_timeout: Duration, pub rs_timeout: Duration,
/// Min number of response to consider the request successful
pub rs_quorum: usize, pub rs_quorum: usize,
/// Should requests be dropped after enough response are received
pub rs_interrupt_after_quorum: bool, pub rs_interrupt_after_quorum: bool,
} }
impl RequestStrategy { impl RequestStrategy {
/// Create a RequestStrategy with default timeout and not interrupting when quorum reached
pub fn with_quorum(quorum: usize) -> Self { pub fn with_quorum(quorum: usize) -> Self {
RequestStrategy { RequestStrategy {
rs_timeout: DEFAULT_TIMEOUT, rs_timeout: DEFAULT_TIMEOUT,
@ -41,19 +47,25 @@ impl RequestStrategy {
rs_interrupt_after_quorum: false, rs_interrupt_after_quorum: false,
} }
} }
/// Set timeout of the strategy
pub fn with_timeout(mut self, timeout: Duration) -> Self { pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.rs_timeout = timeout; self.rs_timeout = timeout;
self self
} }
/// Set if requests can be dropped after quorum has been reached
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

as a general rule: true for read requests, false for write requests

as a general rule: true for read requests, false for write requests
/// In general true for read requests, and false for write
pub fn interrupt_after_quorum(mut self, interrupt: bool) -> Self { pub fn interrupt_after_quorum(mut self, interrupt: bool) -> Self {
self.rs_interrupt_after_quorum = interrupt; self.rs_interrupt_after_quorum = interrupt;
self self
} }
} }
/// Shortcut for a boxed async function taking a message, and resolving to another message or an
/// error
pub type LocalHandlerFn<M> = pub type LocalHandlerFn<M> =
Box<dyn Fn(Arc<M>) -> Pin<Box<dyn Future<Output = Result<M, Error>> + Send>> + Send + Sync>; Box<dyn Fn(Arc<M>) -> Pin<Box<dyn Future<Output = Result<M, Error>> + Send>> + Send + Sync>;
/// Client used to send RPC
pub struct RpcClient<M: RpcMessage> { pub struct RpcClient<M: RpcMessage> {
status: watch::Receiver<Arc<Status>>, status: watch::Receiver<Arc<Status>>,
background: Arc<BackgroundRunner>, background: Arc<BackgroundRunner>,
@ -64,6 +76,7 @@ pub struct RpcClient<M: RpcMessage> {
} }
impl<M: RpcMessage + 'static> RpcClient<M> { impl<M: RpcMessage + 'static> RpcClient<M> {
/// Create a new RpcClient from an address, a job runner, and the status of all RPC servers
pub fn new( pub fn new(
rac: RpcAddrClient<M>, rac: RpcAddrClient<M>,
background: Arc<BackgroundRunner>, background: Arc<BackgroundRunner>,
@ -77,6 +90,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
}) })
} }
/// Set the local handler, to process RPC to this node without network usage
pub fn set_local_handler<F, Fut>(&self, my_id: UUID, handler: F) pub fn set_local_handler<F, Fut>(&self, my_id: UUID, handler: F)
where where
F: Fn(Arc<M>) -> Fut + Send + Sync + 'static, F: Fn(Arc<M>) -> Fut + Send + Sync + 'static,
@ -90,14 +104,17 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
self.local_handler.swap(Some(Arc::new((my_id, handler)))); self.local_handler.swap(Some(Arc::new((my_id, handler))));
} }
/// Get a RPC client to make calls using node's SocketAddr instead of its ID
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

Please don't call this get_addr, it's not what it does ! It does not return a SocketAddr or something like that. It is used to get the underlying RpcAddrClient which allows for RPC by specifing the target node's SocketAddr instead of their node IDs. Also, the comment is wrong.

Please don't call this `get_addr`, it's not what it does ! It does not return a SocketAddr or something like that. It is used to get the underlying RpcAddrClient which allows for RPC by specifing the target node's SocketAddr instead of their node IDs. Also, the comment is wrong.
pub fn by_addr(&self) -> &RpcAddrClient<M> { pub fn by_addr(&self) -> &RpcAddrClient<M> {
&self.rpc_addr_client &self.rpc_addr_client
} }
/// Make a RPC call
pub async fn call(&self, to: UUID, msg: M, timeout: Duration) -> Result<M, Error> { pub async fn call(&self, to: UUID, msg: M, timeout: Duration) -> Result<M, Error> {
self.call_arc(to, Arc::new(msg), timeout).await self.call_arc(to, Arc::new(msg), timeout).await
} }
/// Make a RPC call from a message stored in an Arc
pub async fn call_arc(&self, to: UUID, msg: Arc<M>, timeout: Duration) -> Result<M, Error> { pub async fn call_arc(&self, to: UUID, msg: Arc<M>, timeout: Duration) -> Result<M, Error> {
if let Some(lh) = self.local_handler.load_full() { if let Some(lh) = self.local_handler.load_full() {
let (my_id, local_handler) = lh.as_ref(); let (my_id, local_handler) = lh.as_ref();
@ -135,6 +152,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
} }
} }
/// Make a RPC call to multiple servers, returning a Vec containing each result
pub async fn call_many(&self, to: &[UUID], msg: M, timeout: Duration) -> Vec<Result<M, Error>> { pub async fn call_many(&self, to: &[UUID], msg: M, timeout: Duration) -> Vec<Result<M, Error>> {
let msg = Arc::new(msg); let msg = Arc::new(msg);
let mut resp_stream = to let mut resp_stream = to
@ -149,6 +167,8 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
results results
} }
/// Make a RPC call to multiple servers, returning either a Vec of responses, or an error if
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

too many*

too many*
/// strategy could not be respected due to too many errors
pub async fn try_call_many( pub async fn try_call_many(
self: &Arc<Self>, self: &Arc<Self>,
to: &[UUID], to: &[UUID],
@ -208,6 +228,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
} }
} }
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

No. This is an RPC client which contains only the necessary logic to call RPC's to nodes by specifying their SocketAddr, and not by specifying their node IDs. It is used as an underlying layer for RpcClient.

No. This is an RPC client which contains only the necessary logic to call RPC's to nodes by specifying their SocketAddr, and not by specifying their node IDs. It is used as an underlying layer for RpcClient.
/// Thin wrapper arround an `RpcHttpClient` specifying the path of the request
pub struct RpcAddrClient<M: RpcMessage> { pub struct RpcAddrClient<M: RpcMessage> {
phantom: PhantomData<M>, phantom: PhantomData<M>,
@ -216,6 +237,7 @@ pub struct RpcAddrClient<M: RpcMessage> {
} }
impl<M: RpcMessage> RpcAddrClient<M> { impl<M: RpcMessage> RpcAddrClient<M> {
/// Create an RpcAddrClient from an HTTP client and the endpoint to reach for RPCs
pub fn new(http_client: Arc<RpcHttpClient>, path: String) -> Self { pub fn new(http_client: Arc<RpcHttpClient>, path: String) -> Self {
Self { Self {
phantom: PhantomData::default(), phantom: PhantomData::default(),
@ -224,6 +246,7 @@ impl<M: RpcMessage> RpcAddrClient<M> {
} }
} }
/// Make a RPC
pub async fn call<MB>( pub async fn call<MB>(
&self, &self,
to_addr: &SocketAddr, to_addr: &SocketAddr,
@ -239,6 +262,7 @@ impl<M: RpcMessage> RpcAddrClient<M> {
} }
} }
/// HTTP client used to make RPCs
pub struct RpcHttpClient { pub struct RpcHttpClient {
request_limiter: Semaphore, request_limiter: Semaphore,
method: ClientMethod, method: ClientMethod,
@ -250,6 +274,7 @@ enum ClientMethod {
} }
impl RpcHttpClient { impl RpcHttpClient {
/// Create a new RpcHttpClient
pub fn new( pub fn new(
max_concurrent_requests: usize, max_concurrent_requests: usize,
tls_config: &Option<TlsConfig>, tls_config: &Option<TlsConfig>,
@ -280,6 +305,7 @@ impl RpcHttpClient {
}) })
} }
/// Make a RPC
async fn call<M, MB>( async fn call<M, MB>(
&self, &self,
path: &str, path: &str,

View file

@ -1,3 +1,4 @@
//! Contains structs related to receiving RPCs
use std::collections::HashMap; use std::collections::HashMap;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::pin::Pin; use std::pin::Pin;
@ -22,13 +23,17 @@ use garage_util::error::Error;
use crate::tls_util; use crate::tls_util;
/// Trait for messages that can be sent as RPC
pub trait RpcMessage: Serialize + for<'de> Deserialize<'de> + Send + Sync {} pub trait RpcMessage: Serialize + for<'de> Deserialize<'de> + Send + Sync {}
type ResponseFuture = Pin<Box<dyn Future<Output = Result<Response<Body>, Error>> + Send>>; type ResponseFuture = Pin<Box<dyn Future<Output = Result<Response<Body>, Error>> + Send>>;
type Handler = Box<dyn Fn(Request<Body>, SocketAddr) -> ResponseFuture + Send + Sync>; type Handler = Box<dyn Fn(Request<Body>, SocketAddr) -> ResponseFuture + Send + Sync>;
/// Structure handling RPCs
pub struct RpcServer { pub struct RpcServer {
/// The address the RpcServer will bind
pub bind_addr: SocketAddr, pub bind_addr: SocketAddr,
/// The tls configuration used for RPC
pub tls_config: Option<TlsConfig>, pub tls_config: Option<TlsConfig>,
handlers: HashMap<String, Handler>, handlers: HashMap<String, Handler>,
@ -87,6 +92,7 @@ where
} }
impl RpcServer { impl RpcServer {
/// Create a new RpcServer
pub fn new(bind_addr: SocketAddr, tls_config: Option<TlsConfig>) -> Self { pub fn new(bind_addr: SocketAddr, tls_config: Option<TlsConfig>) -> Self {
Self { Self {
bind_addr, bind_addr,
@ -95,6 +101,7 @@ impl RpcServer {
} }
} }
/// Add handler handling request made to `name`
pub fn add_handler<M, F, Fut>(&mut self, name: String, handler: F) pub fn add_handler<M, F, Fut>(&mut self, name: String, handler: F)
where where
M: RpcMessage + 'static, M: RpcMessage + 'static,
@ -156,6 +163,7 @@ impl RpcServer {
} }
} }
/// Run the RpcServer
pub async fn run( pub async fn run(
self: Arc<Self>, self: Arc<Self>,
shutdown_signal: impl Future<Output = ()>, shutdown_signal: impl Future<Output = ()>,

View file

@ -34,7 +34,7 @@ use crate::crdt::crdt::*;
/// and may differ from what you observed with your atomic clock! /// and may differ from what you observed with your atomic clock!
/// ///
/// This scheme is used by AWS S3 or Soundcloud and often without knowing /// This scheme is used by AWS S3 or Soundcloud and often without knowing
/// in entreprise when reconciliating databases with ad-hoc scripts. /// in enterprise when reconciliating databases with ad-hoc scripts.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct LWW<T> { pub struct LWW<T> {
ts: u64, ts: u64,

View file

@ -37,6 +37,7 @@ where
Self { vals: vec![(k, v)] } Self { vals: vec![(k, v)] }
} }
/// Add a value to the map
pub fn put(&mut self, k: K, v: V) { pub fn put(&mut self, k: K, v: V) {
self.merge(&Self::put_mutator(k, v)); self.merge(&Self::put_mutator(k, v));
} }

View file

@ -74,7 +74,7 @@ where
while !*must_exit.borrow() { while !*must_exit.borrow() {
match self.gc_loop_iter().await { match self.gc_loop_iter().await {
Ok(true) => { Ok(true) => {
// Stuff was done, loop imediately // Stuff was done, loop immediately
continue; continue;
} }
Ok(false) => { Ok(false) => {

View file

@ -8,10 +8,10 @@ pub mod schema;
pub mod util; pub mod util;
pub mod data; pub mod data;
pub mod gc; mod gc;
pub mod merkle; mod merkle;
pub mod replication; pub mod replication;
pub mod sync; mod sync;
pub mod table; pub mod table;
pub use schema::*; pub use schema::*;

View file

@ -6,19 +6,19 @@ use garage_util::data::*;
use crate::replication::*; use crate::replication::*;
/// Full replication schema: all nodes store everything
/// Writes are disseminated in an epidemic manner in the network
/// Advantage: do all reads locally, extremely fast
/// Inconvenient: only suitable to reasonably small tables
#[derive(Clone)] #[derive(Clone)]
pub struct TableFullReplication { pub struct TableFullReplication {
/// The membership manager of this node
pub system: Arc<System>, pub system: Arc<System>,
/// Max number of faults allowed while replicating a record
pub max_faults: usize, pub max_faults: usize,
} }
impl TableReplication for TableFullReplication { impl TableReplication for TableFullReplication {
// Full replication schema: all nodes store everything
// Writes are disseminated in an epidemic manner in the network
// Advantage: do all reads locally, extremely fast
// Inconvenient: only suitable to reasonably small tables
fn read_nodes(&self, _hash: &Hash) -> Vec<UUID> { fn read_nodes(&self, _hash: &Hash) -> Vec<UUID> {
vec![self.system.id] vec![self.system.id]
} }

View file

@ -1,6 +1,8 @@
mod parameters; mod parameters;
pub mod fullcopy; mod fullcopy;
pub mod sharded; mod sharded;
pub use fullcopy::TableFullReplication;
pub use parameters::*; pub use parameters::*;
pub use sharded::TableShardedReplication;

View file

@ -2,20 +2,25 @@ use garage_rpc::ring::*;
use garage_util::data::*; use garage_util::data::*;
/// Trait to describe how a table shall be replicated
pub trait TableReplication: Send + Sync { pub trait TableReplication: Send + Sync {
// See examples in table_sharded.rs and table_fullcopy.rs // See examples in table_sharded.rs and table_fullcopy.rs
// To understand various replication methods // To understand various replication methods
// Which nodes to send reads from /// Which nodes to send read requests to
fn read_nodes(&self, hash: &Hash) -> Vec<UUID>; fn read_nodes(&self, hash: &Hash) -> Vec<UUID>;
/// Responses needed to consider a read succesfull
fn read_quorum(&self) -> usize; fn read_quorum(&self) -> usize;
// Which nodes to send writes to /// Which nodes to send writes to
fn write_nodes(&self, hash: &Hash) -> Vec<UUID>; fn write_nodes(&self, hash: &Hash) -> Vec<UUID>;
/// Responses needed to consider a write succesfull
fn write_quorum(&self) -> usize; fn write_quorum(&self) -> usize;
fn max_write_errors(&self) -> usize; fn max_write_errors(&self) -> usize;
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

Most of the time it is, but not always for fully replicated tables

Most of the time it is, but not always for fully replicated tables
// Accessing partitions, for Merkle tree & sync // Accessing partitions, for Merkle tree & sync
/// Get partition for data with given hash
fn partition_of(&self, hash: &Hash) -> Partition; fn partition_of(&self, hash: &Hash) -> Partition;
/// List of existing partitions
fn partitions(&self) -> Vec<(Partition, Hash)>; fn partitions(&self) -> Vec<(Partition, Hash)>;
} }

View file

@ -6,22 +6,25 @@ use garage_util::data::*;
use crate::replication::*; use crate::replication::*;
/// Sharded replication schema:
/// - based on the ring of nodes, a certain set of neighbors
/// store entries, given as a function of the position of the
/// entry's hash in the ring
/// - reads are done on all of the nodes that replicate the data
/// - writes as well
#[derive(Clone)] #[derive(Clone)]
pub struct TableShardedReplication { pub struct TableShardedReplication {
/// The membership manager of this node
pub system: Arc<System>, pub system: Arc<System>,
/// How many time each data should be replicated
pub replication_factor: usize, pub replication_factor: usize,
/// How many nodes to contact for a read, should be at most `replication_factor`
pub read_quorum: usize, pub read_quorum: usize,
/// How many nodes to contact for a write, should be at most `replication_factor`
pub write_quorum: usize, pub write_quorum: usize,
} }
impl TableReplication for TableShardedReplication { impl TableReplication for TableShardedReplication {
// Sharded replication schema:
// - based on the ring of nodes, a certain set of neighbors
// store entries, given as a function of the position of the
// entry's hash in the ring
// - reads are done on all of the nodes that replicate the data
// - writes as well
fn read_nodes(&self, hash: &Hash) -> Vec<UUID> { fn read_nodes(&self, hash: &Hash) -> Vec<UUID> {
let ring = self.system.ring.borrow().clone(); let ring = self.system.ring.borrow().clone();
ring.walk_ring(&hash, self.replication_factor) ring.walk_ring(&hash, self.replication_factor)

View file

@ -4,7 +4,9 @@ use garage_util::data::*;
use crate::crdt::CRDT; use crate::crdt::CRDT;
/// Trait for field used to partition data
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

It's not a trait for the data, but only for a field of the data that is used to partition it.

It's not a trait for the data, but only for a field of the data that is used to partition it.
pub trait PartitionKey { pub trait PartitionKey {
/// Get the key used to partition
fn hash(&self) -> Hash; fn hash(&self) -> Hash;
} }
@ -20,7 +22,9 @@ impl PartitionKey for Hash {
} }
} }
/// Trait for field used to sort data
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

Same as above, it's only a trait of one of the fields of the data

Same as above, it's only a trait of one of the fields of the data
pub trait SortKey { pub trait SortKey {
/// Get the key used to sort
fn sort_key(&self) -> &[u8]; fn sort_key(&self) -> &[u8];
} }
@ -36,25 +40,34 @@ impl SortKey for Hash {
} }
} }
/// Trait for an entry in a table. It must be sortable and partitionnable.
pub trait Entry<P: PartitionKey, S: SortKey>: pub trait Entry<P: PartitionKey, S: SortKey>:
CRDT + PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync CRDT + PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync
{ {
/// Get the key used to partition
fn partition_key(&self) -> &P; fn partition_key(&self) -> &P;
/// Get the key used to sort
fn sort_key(&self) -> &S; fn sort_key(&self) -> &S;
/// Is the entry a tombstone? Default implementation always return false
fn is_tombstone(&self) -> bool { fn is_tombstone(&self) -> bool {
false false
} }
} }
/// Trait for the schema used in a table
pub trait TableSchema: Send + Sync { pub trait TableSchema: Send + Sync {
/// The partition key used in that table
type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync; type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
/// The sort key used int that table
type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync; type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
/// They type for an entry in that table
type E: Entry<Self::P, Self::S>; type E: Entry<Self::P, Self::S>;
type Filter: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync; type Filter: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
// Action to take if not able to decode current version: // Action to take if not able to decode current version:
// try loading from an older version // try loading from an older version
/// Try migrating an entry from an older version
fn try_migrate(_bytes: &[u8]) -> Option<Self::E> { fn try_migrate(_bytes: &[u8]) -> Option<Self::E> {
None None
} }
@ -65,7 +78,5 @@ pub trait TableSchema: Send + Sync {
// to stderr. // to stderr.
fn updated(&self, _old: Option<Self::E>, _new: Option<Self::E>) {} fn updated(&self, _old: Option<Self::E>, _new: Option<Self::E>) {}
Outdated
Review

Why remove empty implementation?

Why remove empty implementation?

As explained above, I feel like a default implementation should be what we want in most case. Of 5 implementations of this trait, only 2 actually use an empty updated, so being explicit about it seems to me like a better option.

As explained above, I feel like a default implementation should be what we want in most case. Of 5 implementations of this trait, only 2 actually use an empty `updated`, so being explicit about it seems to me like a better option.
Outdated
Review

Idk, to me the reasonning was that updated() is implemented when we need to add logic to do something when an entry changes, but if we don't need to add anything we shouldn't have to write an empty updated() handler. I don't see how having an empty default implementation can cause much confusion as it does literally nothing.

Idk, to me the reasonning was that `updated()` is implemented when we need to add logic to do something when an entry changes, but if we don't need to add anything we shouldn't have to write an empty `updated()` handler. I don't see how having an empty default implementation can cause much confusion as it does literally nothing.
fn matches_filter(_entry: &Self::E, _filter: &Self::Filter) -> bool { fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool;
true
}
} }

View file

@ -1,3 +1,4 @@
//! Job runner for futures and async functions
use core::future::Future; use core::future::Future;
use std::pin::Pin; use std::pin::Pin;
use std::sync::Arc; use std::sync::Arc;
@ -12,14 +13,15 @@ use crate::error::Error;
type JobOutput = Result<(), Error>; type JobOutput = Result<(), Error>;
type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>; type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>;
/// Job runner for futures and async functions
pub struct BackgroundRunner { pub struct BackgroundRunner {
pub stop_signal: watch::Receiver<bool>, stop_signal: watch::Receiver<bool>,
queue_in: mpsc::UnboundedSender<(Job, bool)>, queue_in: mpsc::UnboundedSender<(Job, bool)>,
worker_in: mpsc::UnboundedSender<tokio::task::JoinHandle<()>>, worker_in: mpsc::UnboundedSender<tokio::task::JoinHandle<()>>,
} }
impl BackgroundRunner { impl BackgroundRunner {
/// Create a new BackgroundRunner
pub fn new( pub fn new(
n_runners: usize, n_runners: usize,
stop_signal: watch::Receiver<bool>, stop_signal: watch::Receiver<bool>,
@ -103,7 +105,7 @@ impl BackgroundRunner {
(bgrunner, await_all_done) (bgrunner, await_all_done)
} }
// Spawn a task to be run in background /// Spawn a task to be run in background
pub fn spawn<T>(&self, job: T) pub fn spawn<T>(&self, job: T)
where where
T: Future<Output = JobOutput> + Send + 'static, T: Future<Output = JobOutput> + Send + 'static,
@ -115,6 +117,8 @@ impl BackgroundRunner {
.unwrap(); .unwrap();
} }
/// Spawn a task to be run in background. It may get discarded before running if spawned while
/// the runner is stopping
pub fn spawn_cancellable<T>(&self, job: T) pub fn spawn_cancellable<T>(&self, job: T)
where where
T: Future<Output = JobOutput> + Send + 'static, T: Future<Output = JobOutput> + Send + 'static,

View file

@ -1,3 +1,4 @@
//! Contains type and functions related to Garage configuration file
use std::io::Read; use std::io::Read;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::path::PathBuf; use std::path::PathBuf;
@ -6,57 +7,82 @@ use serde::{de, Deserialize};
use crate::error::Error; use crate::error::Error;
/// Represent the whole configuration
#[derive(Deserialize, Debug, Clone)] #[derive(Deserialize, Debug, Clone)]
pub struct Config { pub struct Config {
/// Path where to store metadata. Should be fast, but low volume
pub metadata_dir: PathBuf, pub metadata_dir: PathBuf,
/// Path where to store data. Can be slower, but need higher volume
pub data_dir: PathBuf, pub data_dir: PathBuf,
/// Address to bind for RPC
pub rpc_bind_addr: SocketAddr, pub rpc_bind_addr: SocketAddr,
/// Bootstrap peers RPC address
#[serde(deserialize_with = "deserialize_vec_addr")] #[serde(deserialize_with = "deserialize_vec_addr")]
pub bootstrap_peers: Vec<SocketAddr>, pub bootstrap_peers: Vec<SocketAddr>,
/// Consule host to connect to to discover more peers
pub consul_host: Option<String>, pub consul_host: Option<String>,
/// Consul service name to use
pub consul_service_name: Option<String>, pub consul_service_name: Option<String>,
/// Max number of concurrent RPC request
#[serde(default = "default_max_concurrent_rpc_requests")] #[serde(default = "default_max_concurrent_rpc_requests")]
pub max_concurrent_rpc_requests: usize, pub max_concurrent_rpc_requests: usize,
/// Size of data blocks to save to disk
#[serde(default = "default_block_size")] #[serde(default = "default_block_size")]
pub block_size: usize, pub block_size: usize,
#[serde(default = "default_control_write_max_faults")] #[serde(default = "default_control_write_max_faults")]
pub control_write_max_faults: usize, pub control_write_max_faults: usize,
/// How many nodes should hold a copy of meta data
#[serde(default = "default_replication_factor")] #[serde(default = "default_replication_factor")]
pub meta_replication_factor: usize, pub meta_replication_factor: usize,
/// How many nodes should hold a copy of data
#[serde(default = "default_replication_factor")] #[serde(default = "default_replication_factor")]
pub data_replication_factor: usize, pub data_replication_factor: usize,
/// Configuration for RPC TLS
pub rpc_tls: Option<TlsConfig>, pub rpc_tls: Option<TlsConfig>,
/// Configuration for S3 api
pub s3_api: ApiConfig, pub s3_api: ApiConfig,
/// Configuration for serving files as normal web server
pub s3_web: WebConfig, pub s3_web: WebConfig,
} }
/// Configuration for RPC TLS
#[derive(Deserialize, Debug, Clone)] #[derive(Deserialize, Debug, Clone)]
pub struct TlsConfig { pub struct TlsConfig {
/// Path to certificate autority used for all nodes
pub ca_cert: String, pub ca_cert: String,
/// Path to public certificate for this node
pub node_cert: String, pub node_cert: String,
/// Path to private key for this node
pub node_key: String, pub node_key: String,
} }
/// Configuration for S3 api
#[derive(Deserialize, Debug, Clone)] #[derive(Deserialize, Debug, Clone)]
pub struct ApiConfig { pub struct ApiConfig {
/// Address and port to bind for api serving
pub api_bind_addr: SocketAddr, pub api_bind_addr: SocketAddr,
/// S3 region to use
pub s3_region: String, pub s3_region: String,
} }
/// Configuration for serving files as normal web server
#[derive(Deserialize, Debug, Clone)] #[derive(Deserialize, Debug, Clone)]
pub struct WebConfig { pub struct WebConfig {
/// Address and port to bind for web serving
pub bind_addr: SocketAddr, pub bind_addr: SocketAddr,
/// Suffix to remove from domain name to find bucket
pub root_domain: String, pub root_domain: String,
/// Suffix to add when user-agent request path end with "/"
pub index: String, pub index: String,
} }
@ -73,6 +99,7 @@ fn default_control_write_max_faults() -> usize {
1 1
} }
/// Read and parse configuration
pub fn read_config(config_file: PathBuf) -> Result<Config, Error> { pub fn read_config(config_file: PathBuf) -> Result<Config, Error> {
let mut file = std::fs::OpenOptions::new() let mut file = std::fs::OpenOptions::new()
.read(true) .read(true)

View file

@ -1,8 +1,10 @@
//! Contains common types and functions related to serialization and integrity
use rand::Rng; use rand::Rng;
use serde::de::{self, Visitor}; use serde::de::{self, Visitor};
use serde::{Deserialize, Deserializer, Serialize, Serializer}; use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::fmt; use std::fmt;
/// An array of 32 bytes
#[derive(Default, PartialOrd, Ord, Clone, Hash, PartialEq, Copy)] #[derive(Default, PartialOrd, Ord, Clone, Hash, PartialEq, Copy)]
pub struct FixedBytes32([u8; 32]); pub struct FixedBytes32([u8; 32]);
@ -61,15 +63,20 @@ impl Serialize for FixedBytes32 {
} }
impl FixedBytes32 { impl FixedBytes32 {
/// Access the content as a slice
pub fn as_slice(&self) -> &[u8] { pub fn as_slice(&self) -> &[u8] {
&self.0[..] &self.0[..]
} }
/// Access the content as a mutable slice
pub fn as_slice_mut(&mut self) -> &mut [u8] { pub fn as_slice_mut(&mut self) -> &mut [u8] {
&mut self.0[..] &mut self.0[..]
} }
/// Copy to a slice
pub fn to_vec(&self) -> Vec<u8> { pub fn to_vec(&self) -> Vec<u8> {
self.0.to_vec() self.0.to_vec()
} }
/// Try building a FixedBytes32 from a slice
/// Return None if the slice is not 32 bytes long
pub fn try_from(by: &[u8]) -> Option<Self> { pub fn try_from(by: &[u8]) -> Option<Self> {
if by.len() != 32 { if by.len() != 32 {
return None; return None;
@ -80,9 +87,12 @@ impl FixedBytes32 {
} }
} }
/// A 32 bytes UUID
pub type UUID = FixedBytes32; pub type UUID = FixedBytes32;
/// A 256 bit cryptographic hash, can be sha256 or blake2 depending on provenance
pub type Hash = FixedBytes32; pub type Hash = FixedBytes32;
/// Compute the sha256 of a slice
pub fn sha256sum(data: &[u8]) -> Hash { pub fn sha256sum(data: &[u8]) -> Hash {
use sha2::{Digest, Sha256}; use sha2::{Digest, Sha256};
@ -93,6 +103,7 @@ pub fn sha256sum(data: &[u8]) -> Hash {
hash.into() hash.into()
} }
/// Compute the blake2 of a slice
pub fn blake2sum(data: &[u8]) -> Hash { pub fn blake2sum(data: &[u8]) -> Hash {
use blake2::{Blake2b, Digest}; use blake2::{Blake2b, Digest};
@ -103,8 +114,10 @@ pub fn blake2sum(data: &[u8]) -> Hash {
hash.into() hash.into()
} }
/// A 64 bit non cryptographic hash
pub type FastHash = u64; pub type FastHash = u64;
/// Compute a (non cryptographic) of a slice
pub fn fasthash(data: &[u8]) -> FastHash { pub fn fasthash(data: &[u8]) -> FastHash {
use xxhash_rust::xxh3::Xxh3; use xxhash_rust::xxh3::Xxh3;
@ -113,12 +126,14 @@ pub fn fasthash(data: &[u8]) -> FastHash {
h.digest() h.digest()
} }
/// Generate a random 32 bytes UUID
pub fn gen_uuid() -> UUID { pub fn gen_uuid() -> UUID {
rand::thread_rng().gen::<[u8; 32]>().into() rand::thread_rng().gen::<[u8; 32]>().into()
} }
// RMP serialization with names of fields and variants // RMP serialization with names of fields and variants
/// Serialize to MessagePack
pub fn rmp_to_vec_all_named<T>(val: &T) -> Result<Vec<u8>, rmp_serde::encode::Error> pub fn rmp_to_vec_all_named<T>(val: &T) -> Result<Vec<u8>, rmp_serde::encode::Error>
where where
T: Serialize + ?Sized, T: Serialize + ?Sized,
@ -131,10 +146,13 @@ where
Ok(wr) Ok(wr)
} }
/// Serialize to JSON, truncating long result
pub fn debug_serialize<T: Serialize>(x: T) -> String { pub fn debug_serialize<T: Serialize>(x: T) -> String {
match serde_json::to_string(&x) { match serde_json::to_string(&x) {
Ok(ss) => { Ok(ss) => {
if ss.len() > 100 { if ss.len() > 100 {
// TODO this can panic if 100 is not a codepoint boundary, but inside a 2 Bytes
// (or more) codepoint
ss[..100].to_string() ss[..100].to_string()
} else { } else {
ss ss

View file

@ -1,9 +1,11 @@
//! Module containing error types used in Garage
use err_derive::Error; use err_derive::Error;
use hyper::StatusCode; use hyper::StatusCode;
use std::io; use std::io;
use crate::data::*; use crate::data::*;
/// RPC related errors
#[derive(Debug, Error)] #[derive(Debug, Error)]
pub enum RPCError { pub enum RPCError {
#[error(display = "Node is down: {:?}.", _0)] #[error(display = "Node is down: {:?}.", _0)]
@ -28,6 +30,7 @@ pub enum RPCError {
TooManyErrors(Vec<String>), TooManyErrors(Vec<String>),
} }
/// Regroup all Garage errors
#[derive(Debug, Error)] #[derive(Debug, Error)]
pub enum Error { pub enum Error {
#[error(display = "IO error: {}", _0)] #[error(display = "IO error: {}", _0)]

View file

@ -1,3 +1,5 @@
//! Crate containing common functions and types used in Garage
#[macro_use] #[macro_use]
extern crate log; extern crate log;

View file

@ -1,6 +1,8 @@
//! Module containing helper functions to manipulate time
use chrono::{SecondsFormat, TimeZone, Utc}; use chrono::{SecondsFormat, TimeZone, Utc};
use std::time::{SystemTime, UNIX_EPOCH}; use std::time::{SystemTime, UNIX_EPOCH};
/// Returns milliseconds since UNIX Epoch
pub fn now_msec() -> u64 { pub fn now_msec() -> u64 {
SystemTime::now() SystemTime::now()
.duration_since(UNIX_EPOCH) .duration_since(UNIX_EPOCH)
@ -8,6 +10,8 @@ pub fn now_msec() -> u64 {
.as_millis() as u64 .as_millis() as u64
} }
/// Convert a timestamp represented as milliseconds since UNIX Epoch to
/// its RFC3339 representation, such as "2021-01-01T12:30:00Z"
pub fn msec_to_rfc3339(msecs: u64) -> String { pub fn msec_to_rfc3339(msecs: u64) -> String {
let secs = msecs as i64 / 1000; let secs = msecs as i64 / 1000;
let nanos = (msecs as i64 % 1000) as u32 * 1_000_000; let nanos = (msecs as i64 % 1000) as u32 * 1_000_000;

View file

@ -3,30 +3,37 @@ use hyper::StatusCode;
use garage_util::error::Error as GarageError; use garage_util::error::Error as GarageError;
/// Errors of this crate
#[derive(Debug, Error)] #[derive(Debug, Error)]
pub enum Error { pub enum Error {
/// An error received from the API crate
#[error(display = "API error: {}", _0)] #[error(display = "API error: {}", _0)]
ApiError(#[error(source)] garage_api::error::Error), ApiError(#[error(source)] garage_api::Error),
// Category: internal error // Category: internal error
/// Error internal to garage
#[error(display = "Internal error: {}", _0)] #[error(display = "Internal error: {}", _0)]
InternalError(#[error(source)] GarageError), InternalError(#[error(source)] GarageError),
/// The file does not exist
#[error(display = "Not found")] #[error(display = "Not found")]
NotFound, NotFound,
// Category: bad request /// The request contained an invalid UTF-8 sequence in its path or in other parameters
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

see garage_api/error.rs

see garage_api/error.rs
#[error(display = "Invalid UTF-8: {}", _0)] #[error(display = "Invalid UTF-8: {}", _0)]
InvalidUTF8(#[error(source)] std::str::Utf8Error), InvalidUTF8(#[error(source)] std::str::Utf8Error),
/// The client send a header with invalid value
#[error(display = "Invalid header value: {}", _0)] #[error(display = "Invalid header value: {}", _0)]
InvalidHeader(#[error(source)] hyper::header::ToStrError), InvalidHeader(#[error(source)] hyper::header::ToStrError),
/// The client sent a request without host, or with unsupported method
#[error(display = "Bad request: {}", _0)] #[error(display = "Bad request: {}", _0)]
BadRequest(String), BadRequest(String),
} }
impl Error { impl Error {
/// Transform errors into http status code
pub fn http_status_code(&self) -> StatusCode { pub fn http_status_code(&self) -> StatusCode {
match self { match self {
Error::NotFound => StatusCode::NOT_FOUND, Error::NotFound => StatusCode::NOT_FOUND,

View file

@ -1,6 +1,9 @@
//! Crate for handling web serving of s3 bucket
#[macro_use] #[macro_use]
extern crate log; extern crate log;
pub mod error; mod error;
pub use error::Error;
pub mod web_server; mod web_server;
pub use web_server::run_web_server;

View file

@ -18,6 +18,7 @@ use garage_model::garage::Garage;
use garage_table::*; use garage_table::*;
use garage_util::error::Error as GarageError; use garage_util::error::Error as GarageError;
/// Run a web server
pub async fn run_web_server( pub async fn run_web_server(
garage: Arc<Garage>, garage: Arc<Garage>,
shutdown_signal: impl Future<Output = ()>, shutdown_signal: impl Future<Output = ()>,