add doc comments #53
|
@ -20,6 +20,7 @@ use crate::s3_get::*;
|
|||
use crate::s3_list::*;
|
||||
use crate::s3_put::*;
|
||||
|
||||
/// Run the S3 API server
|
||||
pub async fn run_api_server(
|
||||
garage: Arc<Garage>,
|
||||
shutdown_signal: impl Future<Output = ()>,
|
||||
|
|
|
@ -1,9 +1,13 @@
|
|||
//! Module containing various helpers for encoding
|
||||
|
||||
/// Escape &str for xml inclusion
|
||||
pub fn xml_escape(s: &str) -> String {
|
||||
s.replace("<", "<")
|
||||
.replace(">", ">")
|
||||
.replace("\"", """)
|
||||
}
|
||||
|
||||
/// Encode &str for use in a URI
|
||||
pub fn uri_encode(string: &str, encode_slash: bool) -> String {
|
||||
let mut result = String::with_capacity(string.len() * 2);
|
||||
for c in string.chars() {
|
||||
|
@ -24,6 +28,7 @@ pub fn uri_encode(string: &str, encode_slash: bool) -> String {
|
|||
result
|
||||
}
|
||||
|
||||
/// Encode &str either as an uri, or a valid string for xml inclusion
|
||||
pub fn xml_encode_key(k: &str, urlencode: bool) -> String {
|
||||
if urlencode {
|
||||
uri_encode(k, true)
|
||||
|
|
|
@ -3,44 +3,57 @@ use hyper::StatusCode;
|
|||
|
||||
use garage_util::error::Error as GarageError;
|
||||
|
||||
/// Errors of this crate
|
||||
#[derive(Debug, Error)]
|
||||
pub enum Error {
|
||||
// Category: internal error
|
||||
/// Error related to deeper parts of Garage
|
||||
#[error(display = "Internal error: {}", _0)]
|
||||
InternalError(#[error(source)] GarageError),
|
||||
|
||||
/// Error related to Hyper
|
||||
#[error(display = "Internal error (Hyper error): {}", _0)]
|
||||
Hyper(#[error(source)] hyper::Error),
|
||||
|
||||
/// Error related to HTTP
|
||||
#[error(display = "Internal error (HTTP error): {}", _0)]
|
||||
HTTP(#[error(source)] http::Error),
|
||||
|
||||
// Category: cannot process
|
||||
/// No proper api key was used, or the signature was invalid
|
||||
#[error(display = "Forbidden: {}", _0)]
|
||||
Forbidden(String),
|
||||
|
||||
/// The object requested don't exists
|
||||
#[error(display = "Not found")]
|
||||
NotFound,
|
||||
|
||||
// Category: bad request
|
||||
/// The request contained an invalid UTF-8 sequence in its path or in other parameters
|
||||
trinity-1686a marked this conversation as resolved
Outdated
|
||||
#[error(display = "Invalid UTF-8: {}", _0)]
|
||||
InvalidUTF8Str(#[error(source)] std::str::Utf8Error),
|
||||
|
||||
/// The request used an invalid path
|
||||
#[error(display = "Invalid UTF-8: {}", _0)]
|
||||
InvalidUTF8String(#[error(source)] std::string::FromUtf8Error),
|
||||
|
||||
/// Some base64 encoded data was badly encoded
|
||||
#[error(display = "Invalid base64: {}", _0)]
|
||||
InvalidBase64(#[error(source)] base64::DecodeError),
|
||||
|
||||
/// The client sent invalid XML data
|
||||
#[error(display = "Invalid XML: {}", _0)]
|
||||
InvalidXML(String),
|
||||
|
||||
/// The client sent a header with invalid value
|
||||
#[error(display = "Invalid header value: {}", _0)]
|
||||
InvalidHeader(#[error(source)] hyper::header::ToStrError),
|
||||
|
||||
/// The client sent a range header with invalid value
|
||||
#[error(display = "Invalid HTTP range: {:?}", _0)]
|
||||
InvalidRange(#[error(from)] http_range::HttpRangeParseError),
|
||||
|
||||
/// The client sent an invalid request
|
||||
#[error(display = "Bad request: {}", _0)]
|
||||
BadRequest(String),
|
||||
}
|
||||
|
@ -52,6 +65,7 @@ impl From<roxmltree::Error> for Error {
|
|||
}
|
||||
|
||||
impl Error {
|
||||
/// Get the HTTP status code that best represents the meaning of the error for the client
|
||||
trinity-1686a marked this conversation as resolved
Outdated
lx
commented
rather rather `get the HTTP status code that best represents the meaning of the error for the client`
|
||||
pub fn http_status_code(&self) -> StatusCode {
|
||||
match self {
|
||||
Error::NotFound => StatusCode::NOT_FOUND,
|
||||
|
@ -65,6 +79,7 @@ impl Error {
|
|||
}
|
||||
}
|
||||
|
||||
/// Trait to map error to the Bad Request error code
|
||||
pub trait OkOrBadRequest {
|
||||
type S2;
|
||||
fn ok_or_bad_request(self, reason: &'static str) -> Self::S2;
|
||||
|
@ -93,6 +108,7 @@ impl<T> OkOrBadRequest for Option<T> {
|
|||
}
|
||||
}
|
||||
|
||||
/// Trait to map an error to an Internal Error code
|
||||
pub trait OkOrInternalError {
|
||||
type S2;
|
||||
fn ok_or_internal_error(self, reason: &'static str) -> Self::S2;
|
||||
|
|
|
@ -1,15 +1,19 @@
|
|||
//! Crate for serving a S3 compatible API
|
||||
#[macro_use]
|
||||
extern crate log;
|
||||
|
||||
pub mod error;
|
||||
mod error;
|
||||
pub use error::Error;
|
||||
|
||||
lx marked this conversation as resolved
Outdated
lx
commented
Doesn't this prevent us from using Doesn't this prevent us from using `OkOrBadRequest` and `OkOrInternalError`? (haven't read what's below yet)
trinity-1686a
commented
It prevents use from other crates, but not from this one. It does not break anything, but does prevent using those traits in web in the future, for instance ???? It prevents use from other crates, but not from this one. It does not break anything, but does prevent using those traits in web in the future, for instance ????
lx
commented
Alright, we'll make things Alright, we'll make things `pub` again if/when we need them.
|
||||
pub mod encoding;
|
||||
mod encoding;
|
||||
|
||||
pub mod api_server;
|
||||
pub mod signature;
|
||||
mod api_server;
|
||||
pub use api_server::run_api_server;
|
||||
|
||||
pub mod s3_copy;
|
||||
pub mod s3_delete;
|
||||
mod signature;
|
||||
|
||||
mod s3_copy;
|
||||
mod s3_delete;
|
||||
pub mod s3_get;
|
||||
pub mod s3_list;
|
||||
pub mod s3_put;
|
||||
mod s3_list;
|
||||
mod s3_put;
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
//! Function related to GET and HEAD requests
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, UNIX_EPOCH};
|
||||
|
||||
|
@ -79,6 +80,7 @@ fn try_answer_cached(
|
|||
}
|
||||
}
|
||||
|
||||
/// Handle HEAD request
|
||||
pub async fn handle_head(
|
||||
garage: Arc<Garage>,
|
||||
req: &Request<Body>,
|
||||
|
@ -118,6 +120,7 @@ pub async fn handle_head(
|
|||
Ok(response)
|
||||
}
|
||||
|
||||
/// Handle GET request
|
||||
pub async fn handle_get(
|
||||
garage: Arc<Garage>,
|
||||
req: &Request<Body>,
|
||||
|
@ -224,7 +227,7 @@ pub async fn handle_get(
|
|||
}
|
||||
}
|
||||
|
||||
pub async fn handle_get_range(
|
||||
async fn handle_get_range(
|
||||
garage: Arc<Garage>,
|
||||
version: &ObjectVersion,
|
||||
version_data: &ObjectVersionData,
|
||||
|
|
|
@ -80,7 +80,7 @@ pub struct ConfigureNodeOpt {
|
|||
#[structopt(short = "c", long = "capacity")]
|
||||
capacity: Option<u32>,
|
||||
|
||||
/// Optionnal node tag
|
||||
/// Optional node tag
|
||||
#[structopt(short = "t", long = "tag")]
|
||||
tag: Option<String>,
|
||||
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
#![recursion_limit = "1024"]
|
||||
//! Garage CLI, used to interact with a running Garage instance, and to launch a Garage instance
|
||||
|
||||
trinity-1686a marked this conversation as resolved
Outdated
lx
commented
"and to launch a Garage instance" ? "and to launch a Garage **instance**" ?
|
||||
#[macro_use]
|
||||
extern crate log;
|
||||
|
@ -25,7 +26,7 @@ use cli::*;
|
|||
|
||||
#[derive(StructOpt, Debug)]
|
||||
#[structopt(name = "garage")]
|
||||
pub struct Opt {
|
||||
struct Opt {
|
||||
/// RPC connect to this host to execute client operations
|
||||
#[structopt(short = "h", long = "rpc-host", default_value = "127.0.0.1:3901")]
|
||||
pub rpc_host: SocketAddr,
|
||||
|
|
|
@ -8,10 +8,10 @@ use garage_util::background::*;
|
|||
use garage_util::config::*;
|
||||
use garage_util::error::Error;
|
||||
|
||||
use garage_api::api_server;
|
||||
use garage_api::run_api_server;
|
||||
use garage_model::garage::Garage;
|
||||
use garage_rpc::rpc_server::RpcServer;
|
||||
use garage_web::web_server;
|
||||
use garage_web::run_web_server;
|
||||
|
||||
use crate::admin_rpc::*;
|
||||
|
||||
|
@ -62,8 +62,8 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
|
|||
|
||||
info!("Initializing RPC and API servers...");
|
||||
let run_rpc_server = Arc::new(rpc_server).run(wait_from(watch_cancel.clone()));
|
||||
let api_server = api_server::run_api_server(garage.clone(), wait_from(watch_cancel.clone()));
|
||||
let web_server = web_server::run_web_server(garage, wait_from(watch_cancel.clone()));
|
||||
let api_server = run_api_server(garage.clone(), wait_from(watch_cancel.clone()));
|
||||
let web_server = run_web_server(garage, wait_from(watch_cancel.clone()));
|
||||
|
||||
futures::try_join!(
|
||||
bootstrap.map(|rv| {
|
||||
|
|
|
@ -18,12 +18,13 @@ use garage_rpc::membership::System;
|
|||
use garage_rpc::rpc_client::*;
|
||||
use garage_rpc::rpc_server::*;
|
||||
|
||||
use garage_table::replication::{sharded::TableShardedReplication, TableReplication};
|
||||
use garage_table::replication::{TableReplication, TableShardedReplication};
|
||||
|
||||
use crate::block_ref_table::*;
|
||||
|
||||
use crate::garage::Garage;
|
||||
|
||||
/// Size under which data will be stored inlined in database instead of as files
|
||||
pub const INLINE_THRESHOLD: usize = 3072;
|
||||
|
||||
pub const BACKGROUND_WORKERS: u64 = 1;
|
||||
|
@ -33,28 +34,41 @@ const BLOCK_GC_TIMEOUT: Duration = Duration::from_secs(60);
|
|||
const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
const RESYNC_RETRY_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
|
||||
/// RPC messages used to share blocks of data between nodes
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub enum Message {
|
||||
Ok,
|
||||
/// Message to ask for a block of data, by hash
|
||||
GetBlock(Hash),
|
||||
/// Message to send a block of data, either because requested, of for first delivery of new
|
||||
/// block
|
||||
PutBlock(PutBlockMessage),
|
||||
/// Ask other node if they should have this block, but don't actually have it
|
||||
NeedBlockQuery(Hash),
|
||||
/// Response : whether the node do require that block
|
||||
NeedBlockReply(bool),
|
||||
}
|
||||
|
||||
/// Structure used to send a block
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct PutBlockMessage {
|
||||
/// Hash of the block
|
||||
pub hash: Hash,
|
||||
|
||||
/// Content of the block
|
||||
#[serde(with = "serde_bytes")]
|
||||
pub data: Vec<u8>,
|
||||
}
|
||||
|
||||
impl RpcMessage for Message {}
|
||||
|
||||
/// The block manager, handling block exchange between nodes, and block storage on local node
|
||||
pub struct BlockManager {
|
||||
/// Replication strategy, allowing to find on which node blocks should be located
|
||||
pub replication: TableShardedReplication,
|
||||
/// Directory in which block are stored
|
||||
pub data_dir: PathBuf,
|
||||
/// Lock to prevent concurrent edition of the directory
|
||||
pub data_dir_lock: Mutex<()>,
|
||||
|
||||
rc: sled::Tree,
|
||||
|
@ -128,7 +142,8 @@ impl BlockManager {
|
|||
}
|
||||
|
||||
pub fn spawn_background_worker(self: Arc<Self>) {
|
||||
// Launch 2 simultaneous workers for background resync loop preprocessing
|
||||
// Launch 2 simultaneous workers for background resync loop preprocessing <= TODO actually this
|
||||
// launches only one worker with current value of BACKGROUND_WORKERS
|
||||
for i in 0..BACKGROUND_WORKERS {
|
||||
let bm2 = self.clone();
|
||||
let background = self.system.background.clone();
|
||||
|
@ -141,7 +156,8 @@ impl BlockManager {
|
|||
}
|
||||
}
|
||||
|
||||
pub async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<Message, Error> {
|
||||
/// Write a block to disk
|
||||
async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<Message, Error> {
|
||||
trinity-1686a marked this conversation as resolved
Outdated
lx
commented
should probably not be should probably not be `pub`, same for `read_block` and `need_block`
|
||||
let _lock = self.data_dir_lock.lock().await;
|
||||
|
||||
let mut path = self.block_dir(hash);
|
||||
|
@ -159,7 +175,8 @@ impl BlockManager {
|
|||
Ok(Message::Ok)
|
||||
}
|
||||
|
||||
pub async fn read_block(&self, hash: &Hash) -> Result<Message, Error> {
|
||||
/// Read block from disk, verifying it's integrity
|
||||
async fn read_block(&self, hash: &Hash) -> Result<Message, Error> {
|
||||
let path = self.block_path(hash);
|
||||
|
||||
let mut f = match fs::File::open(&path).await {
|
||||
|
@ -190,7 +207,8 @@ impl BlockManager {
|
|||
Ok(Message::PutBlock(PutBlockMessage { hash: *hash, data }))
|
||||
}
|
||||
|
||||
pub async fn need_block(&self, hash: &Hash) -> Result<bool, Error> {
|
||||
/// Check if this node should have a block, but don't actually have it
|
||||
async fn need_block(&self, hash: &Hash) -> Result<bool, Error> {
|
||||
let needed = self
|
||||
.rc
|
||||
.get(hash.as_ref())?
|
||||
|
@ -217,6 +235,8 @@ impl BlockManager {
|
|||
path
|
||||
}
|
||||
|
||||
/// Increment the number of time a block is used, putting it to resynchronization if it is
|
||||
/// required, but not known
|
||||
pub fn block_incref(&self, hash: &Hash) -> Result<(), Error> {
|
||||
let old_rc = self.rc.fetch_and_update(&hash, |old| {
|
||||
let old_v = old.map(u64_from_be_bytes).unwrap_or(0);
|
||||
|
@ -229,6 +249,7 @@ impl BlockManager {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
/// Decrement the number of time a block is used
|
||||
pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> {
|
||||
trinity-1686a marked this conversation as resolved
Outdated
lx
commented
resync does both: if the rc is >0 and the block is not present, ask it to peers. on the contrary if rc=0 and block is present, delete it locally (after checking that no peer needs it) resync does both: if the rc is >0 and the block is not present, ask it to peers. on the contrary if rc=0 and block is present, delete it locally (after checking that no peer needs it)
trinity-1686a
commented
I understand resync both get missing blocks and remove unnecessary blocks. My problem is that if I understand the code correctly, when the counter go down from 1 to 0, the value for new_rc is Some(0) (actually a Vec of 8 0u8), which is not None, so the block is not put to resync, and don't get deleted, unless some background thread regularly scan the table, which sound inneficient I understand resync both get missing blocks and remove unnecessary blocks. My problem is that if I understand the code correctly, when the counter go down from 1 to 0, the value for new_rc is Some(0) (actually a Vec of 8 0u8), which is not None, so the block is not put to resync, and don't get deleted, unless some background thread regularly scan the table, which sound inneficient
lx
commented
`block_decref` actually deletes the value in the rc when it reaches 0, so it is indeed None and not Some(0), unless I'm mistaken? Maybe you're not looking at the last version of the code?
trinity-1686a
commented
Nevermind, I somehow read Nevermind, I somehow read `old_v >= 1` instead of `old_v > 1`
|
||||
let new_rc = self.rc.update_and_fetch(&hash, |old| {
|
||||
let old_v = old.map(u64_from_be_bytes).unwrap_or(0);
|
||||
|
@ -388,6 +409,7 @@ impl BlockManager {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
/// Ask nodes that might have a block for it
|
||||
pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> {
|
||||
let who = self.replication.read_nodes(&hash);
|
||||
let resps = self
|
||||
|
@ -412,6 +434,7 @@ impl BlockManager {
|
|||
)))
|
||||
}
|
||||
|
||||
/// Send block to nodes that should have it
|
||||
pub async fn rpc_put_block(&self, hash: Hash, data: Vec<u8>) -> Result<(), Error> {
|
||||
let who = self.replication.write_nodes(&hash);
|
||||
self.rpc_client
|
||||
|
@ -498,6 +521,7 @@ impl BlockManager {
|
|||
.boxed()
|
||||
}
|
||||
|
||||
/// Get lenght of resync queue
|
||||
pub fn resync_queue_len(&self) -> usize {
|
||||
self.resync_queue.len()
|
||||
}
|
||||
|
|
|
@ -10,13 +10,14 @@ use crate::block::*;
|
|||
|
||||
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct BlockRef {
|
||||
// Primary key
|
||||
/// Hash of the block, used as partition key
|
||||
pub block: Hash,
|
||||
|
||||
// Sort key
|
||||
/// Id of the Version for the object containing this block, used as sorting key
|
||||
pub version: UUID,
|
||||
|
||||
trinity-1686a marked this conversation as resolved
Outdated
lx
commented
version is the id of a Version object in the version table (which stores the different versions of objects in the buckets), not a version number for the BlockRef. i.e. version is the id of a Version object in the version table (which stores the different versions of objects in the buckets), not a version number for the BlockRef. i.e. `BlockRef.version` is the version id of an object that contains this block
|
||||
// Keep track of deleted status
|
||||
/// Is the Version that contains this block deleted
|
||||
pub deleted: crdt::Bool,
|
||||
}
|
||||
trinity-1686a marked this conversation as resolved
Outdated
lx
commented
-> is the Version (a version of an object in a bucket) that contains this block deleted -> is the Version (a version of an object in a bucket) that contains this block deleted
|
||||
|
||||
|
|
|
@ -12,15 +12,18 @@ use crate::key_table::PermissionSet;
|
|||
/// - A bucket has 2 states, Present or Deleted and parameters make sense only if present.
|
||||
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct Bucket {
|
||||
// Primary key
|
||||
/// Name of the bucket
|
||||
pub name: String,
|
||||
|
||||
/// State, and configuration if not deleted, of the bucket
|
||||
pub state: crdt::LWW<BucketState>,
|
||||
}
|
||||
|
||||
/// State of a bucket
|
||||
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
|
||||
pub enum BucketState {
|
||||
/// The bucket is deleted
|
||||
Deleted,
|
||||
/// The bucket exists
|
||||
Present(BucketParams),
|
||||
}
|
||||
|
||||
|
@ -37,9 +40,12 @@ impl CRDT for BucketState {
|
|||
}
|
||||
}
|
||||
|
||||
/// Configuration for a bucket
|
||||
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct BucketParams {
|
||||
/// Map of key with access to the bucket, and what kind of access they give
|
||||
pub authorized_keys: crdt::LWWMap<String, PermissionSet>,
|
||||
/// Is the bucket served as http
|
||||
pub website: crdt::LWW<bool>,
|
||||
}
|
||||
|
||||
|
@ -51,6 +57,7 @@ impl CRDT for BucketParams {
|
|||
}
|
||||
|
||||
impl BucketParams {
|
||||
/// Initializes a new instance of the Bucket struct
|
||||
pub fn new() -> Self {
|
||||
BucketParams {
|
||||
authorized_keys: crdt::LWWMap::new(),
|
||||
|
@ -60,15 +67,21 @@ impl BucketParams {
|
|||
}
|
||||
|
||||
impl Bucket {
|
||||
/// Create a new bucket
|
||||
lx
commented
This formulation is ambiguous/misleading, as the bucket is not really created until the This formulation is ambiguous/misleading, as the bucket is not really created until the `Bucket` is commited to the database
lx
commented
Suggestion: Suggestion: `Initializes a new instance of the Bucket struct`
|
||||
pub fn new(name: String) -> Self {
|
||||
Bucket {
|
||||
name,
|
||||
state: crdt::LWW::new(BucketState::Present(BucketParams::new())),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns true if this represents a deleted bucket
|
||||
lx
commented
This formulation is ambiguous/misleading, as This formulation is ambiguous/misleading, as `is_deleted` does not do a database query, it just checks if this struct that was returned earlier from the db has the deleted flag
lx
commented
Suggestion: Suggestion: `returns true if this represents a deleted bucket`
|
||||
pub fn is_deleted(&self) -> bool {
|
||||
*self.state.get() == BucketState::Deleted
|
||||
}
|
||||
|
||||
/// Return the list of authorized keys, when each was updated, and the permission associated to
|
||||
/// the key
|
||||
pub fn authorized_keys(&self) -> &[(String, u64, PermissionSet)] {
|
||||
match self.state.get() {
|
||||
BucketState::Deleted => &[],
|
||||
|
|
|
@ -7,8 +7,8 @@ use garage_rpc::membership::System;
|
|||
use garage_rpc::rpc_client::RpcHttpClient;
|
||||
use garage_rpc::rpc_server::RpcServer;
|
||||
|
||||
use garage_table::replication::fullcopy::*;
|
||||
use garage_table::replication::sharded::*;
|
||||
use garage_table::replication::TableFullReplication;
|
||||
use garage_table::replication::TableShardedReplication;
|
||||
use garage_table::*;
|
||||
|
||||
use crate::block::*;
|
||||
|
@ -18,15 +18,23 @@ use crate::key_table::*;
|
|||
use crate::object_table::*;
|
||||
use crate::version_table::*;
|
||||
|
||||
/// An entire Garage full of data
|
||||
pub struct Garage {
|
||||
/// The parsed configuration Garage is running
|
||||
pub config: Config,
|
||||
|
||||
/// The local database
|
||||
pub db: sled::Db,
|
||||
/// A background job runner
|
||||
pub background: Arc<BackgroundRunner>,
|
||||
/// The membership manager
|
||||
pub system: Arc<System>,
|
||||
/// The block manager
|
||||
pub block_manager: Arc<BlockManager>,
|
||||
|
||||
/// Table containing informations about buckets
|
||||
pub bucket_table: Arc<Table<BucketTable, TableFullReplication>>,
|
||||
/// Table containing informations about api keys
|
||||
pub key_table: Arc<Table<KeyTable, TableFullReplication>>,
|
||||
|
||||
pub object_table: Arc<Table<ObjectTable, TableShardedReplication>>,
|
||||
|
@ -35,6 +43,7 @@ pub struct Garage {
|
|||
}
|
||||
|
||||
impl Garage {
|
||||
/// Create and run garage
|
||||
pub fn new(
|
||||
config: Config,
|
||||
db: sled::Db,
|
||||
|
|
|
@ -3,26 +3,28 @@ use serde::{Deserialize, Serialize};
|
|||
use garage_table::crdt::*;
|
||||
use garage_table::*;
|
||||
|
||||
/// An api key
|
||||
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct Key {
|
||||
// Primary key
|
||||
/// The id of the key (immutable), used as partition key
|
||||
trinity-1686a marked this conversation as resolved
Outdated
lx
commented
please keep in the comment that this acts as the primary key please keep in the comment that this acts as the primary key
|
||||
pub key_id: String,
|
||||
|
||||
// Associated secret key (immutable)
|
||||
/// The secret_key associated
|
||||
pub secret_key: String,
|
||||
|
||||
// Name
|
||||
/// Name for the key
|
||||
pub name: crdt::LWW<String>,
|
||||
|
||||
// Deletion
|
||||
/// Is the key deleted
|
||||
pub deleted: crdt::Bool,
|
||||
|
||||
// Authorized keys
|
||||
pub authorized_buckets: crdt::LWWMap<String, PermissionSet>,
|
||||
/// Buckets in which the key is authorized. Empty if `Key` is deleted
|
||||
// CRDT interaction: deleted implies authorized_buckets is empty
|
||||
trinity-1686a marked this conversation as resolved
Outdated
lx
commented
this comment needs to be kept somewhere this comment needs to be kept somewhere
|
||||
pub authorized_buckets: crdt::LWWMap<String, PermissionSet>,
|
||||
}
|
||||
|
||||
impl Key {
|
||||
/// Create a new key
|
||||
pub fn new(name: String) -> Self {
|
||||
let key_id = format!("GK{}", hex::encode(&rand::random::<[u8; 12]>()[..]));
|
||||
let secret_key = hex::encode(&rand::random::<[u8; 32]>()[..]);
|
||||
|
@ -34,6 +36,8 @@ impl Key {
|
|||
authorized_buckets: crdt::LWWMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Import a key from it's parts
|
||||
pub fn import(key_id: &str, secret_key: &str, name: &str) -> Self {
|
||||
Self {
|
||||
key_id: key_id.to_string(),
|
||||
|
@ -43,6 +47,8 @@ impl Key {
|
|||
authorized_buckets: crdt::LWWMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a new Key which can me merged to mark an existing key deleted
|
||||
pub fn delete(key_id: String) -> Self {
|
||||
Self {
|
||||
key_id,
|
||||
|
@ -52,13 +58,16 @@ impl Key {
|
|||
authorized_buckets: crdt::LWWMap::new(),
|
||||
}
|
||||
}
|
||||
/// Add an authorized bucket, only if it wasn't there before
|
||||
|
||||
/// Check if `Key` is allowed to read in bucket
|
||||
pub fn allow_read(&self, bucket: &str) -> bool {
|
||||
self.authorized_buckets
|
||||
.get(&bucket.to_string())
|
||||
.map(|x| x.allow_read)
|
||||
.unwrap_or(false)
|
||||
}
|
||||
|
||||
/// Check if `Key` is allowed to write in bucket
|
||||
pub fn allow_write(&self, bucket: &str) -> bool {
|
||||
self.authorized_buckets
|
||||
.get(&bucket.to_string())
|
||||
|
@ -67,9 +76,12 @@ impl Key {
|
|||
}
|
||||
}
|
||||
|
||||
/// Permission given to a key in a bucket
|
||||
#[derive(PartialOrd, Ord, PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct PermissionSet {
|
||||
/// The key can be used to read the bucket
|
||||
pub allow_read: bool,
|
||||
/// The key can be used to write in the bucket
|
||||
pub allow_write: bool,
|
||||
}
|
||||
|
||||
|
|
|
@ -6,24 +6,26 @@ use garage_util::background::BackgroundRunner;
|
|||
use garage_util::data::*;
|
||||
|
||||
use garage_table::crdt::*;
|
||||
use garage_table::replication::sharded::*;
|
||||
use garage_table::replication::TableShardedReplication;
|
||||
use garage_table::*;
|
||||
|
||||
use crate::version_table::*;
|
||||
|
||||
/// An object
|
||||
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct Object {
|
||||
// Primary key
|
||||
/// The bucket in which the object is stored, used as partition key
|
||||
pub bucket: String,
|
||||
|
||||
// Sort key
|
||||
/// The key at which the object is stored in its bucket, used as sorting key
|
||||
trinity-1686a marked this conversation as resolved
Outdated
lx
commented
please keep in the comment that these two are used respectively as partition key (not primary key, my bad) and sort key, it's an import detail in understanding how Garage works internally please keep in the comment that these two are used respectively as partition key (not primary key, my bad) and sort key, it's an import detail in understanding how Garage works internally
|
||||
pub key: String,
|
||||
|
||||
// Data
|
||||
/// The list of currenty stored versions of the object
|
||||
trinity-1686a marked this conversation as resolved
Outdated
lx
commented
doesn't keep all known versions, the comment is misleading doesn't keep all known versions, the comment is misleading
trinity-1686a
commented
I'm not sure what it store then I'm not sure what it store then
|
||||
versions: Vec<ObjectVersion>,
|
||||
}
|
||||
|
||||
impl Object {
|
||||
/// Create an object from parts
|
||||
pub fn new(bucket: String, key: String, versions: Vec<ObjectVersion>) -> Self {
|
||||
let mut ret = Self {
|
||||
bucket,
|
||||
|
@ -36,6 +38,7 @@ impl Object {
|
|||
}
|
||||
ret
|
||||
}
|
||||
|
||||
/// Adds a version if it wasn't already present
|
||||
pub fn add_version(&mut self, new: ObjectVersion) -> Result<(), ()> {
|
||||
match self
|
||||
|
@ -49,23 +52,32 @@ impl Object {
|
|||
Ok(_) => Err(()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get a list of currently stored versions of `Object`
|
||||
trinity-1686a marked this conversation as resolved
Outdated
lx
commented
s/known/currently stored s/known/currently stored
|
||||
pub fn versions(&self) -> &[ObjectVersion] {
|
||||
&self.versions[..]
|
||||
}
|
||||
}
|
||||
|
||||
/// Informations about a version of an object
|
||||
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct ObjectVersion {
|
||||
/// Id of the version
|
||||
pub uuid: UUID,
|
||||
/// Timestamp of when the object was created
|
||||
pub timestamp: u64,
|
||||
|
||||
/// State of the version
|
||||
pub state: ObjectVersionState,
|
||||
}
|
||||
|
||||
/// State of an object version
|
||||
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
|
||||
pub enum ObjectVersionState {
|
||||
/// The version is being received
|
||||
Uploading(ObjectVersionHeaders),
|
||||
/// The version is fully received
|
||||
Complete(ObjectVersionData),
|
||||
/// The version uploaded containded errors or the upload was explicitly aborted
|
||||
trinity-1686a marked this conversation as resolved
Outdated
lx
commented
it can never be received and stay in Uploading state forever if CompleteMultipartUpload/AbortMultipartUpload is never called (and it doesn't matter) it can never be received and stay in Uploading state forever if CompleteMultipartUpload/AbortMultipartUpload is never called (and it doesn't matter)
|
||||
Aborted,
|
||||
}
|
||||
|
||||
|
@ -90,10 +102,15 @@ impl CRDT for ObjectVersionState {
|
|||
}
|
||||
}
|
||||
|
||||
/// Data about an object version
|
||||
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
|
||||
pub enum ObjectVersionData {
|
||||
/// The object was deleted, this Version is a tombstone to mark it as such
|
||||
trinity-1686a marked this conversation as resolved
Outdated
lx
commented
the version represents a deletion of the object, i.e. at this version number the object doesn't exist (404) the version represents a deletion of the object, i.e. at this version number the object doesn't exist (404)
|
||||
DeleteMarker,
|
||||
/// The object is short, it's stored inlined
|
||||
Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec<u8>),
|
||||
/// The object is not short, Hash of first block is stored here, next segments hashes are
|
||||
/// stored in the version table
|
||||
FirstBlock(ObjectVersionMeta, Hash),
|
||||
}
|
||||
|
||||
|
@ -101,16 +118,23 @@ impl AutoCRDT for ObjectVersionData {
|
|||
const WARN_IF_DIFFERENT: bool = true;
|
||||
}
|
||||
|
||||
/// Metadata about the object version
|
||||
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct ObjectVersionMeta {
|
||||
/// Headers to send to the client
|
||||
pub headers: ObjectVersionHeaders,
|
||||
/// Size of the object
|
||||
pub size: u64,
|
||||
/// etag of the object
|
||||
pub etag: String,
|
||||
}
|
||||
|
||||
/// Additional headers for an object
|
||||
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct ObjectVersionHeaders {
|
||||
/// Content type of the object
|
||||
pub content_type: String,
|
||||
/// Any other http headers to send
|
||||
pub other: BTreeMap<String, String>,
|
||||
}
|
||||
|
||||
|
@ -118,18 +142,24 @@ impl ObjectVersion {
|
|||
fn cmp_key(&self) -> (u64, UUID) {
|
||||
(self.timestamp, self.uuid)
|
||||
}
|
||||
|
||||
/// Is the object version currently being uploaded
|
||||
pub fn is_uploading(&self) -> bool {
|
||||
match self.state {
|
||||
ObjectVersionState::Uploading(_) => true,
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Is the object version completely received
|
||||
pub fn is_complete(&self) -> bool {
|
||||
match self.state {
|
||||
ObjectVersionState::Complete(_) => true,
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Is the object version available (received and not a tombstone)
|
||||
trinity-1686a marked this conversation as resolved
Outdated
lx
commented
is the object version available as an existing object (received and not a delete marker) is the object version available as an existing object (received and not a delete marker)
|
||||
pub fn is_data(&self) -> bool {
|
||||
match self.state {
|
||||
ObjectVersionState::Complete(ObjectVersionData::DeleteMarker) => false,
|
||||
|
|
|
@ -5,26 +5,32 @@ use garage_util::background::BackgroundRunner;
|
|||
use garage_util::data::*;
|
||||
|
||||
use garage_table::crdt::*;
|
||||
use garage_table::replication::sharded::*;
|
||||
use garage_table::replication::TableShardedReplication;
|
||||
use garage_table::*;
|
||||
|
||||
use crate::block_ref_table::*;
|
||||
|
||||
/// A version of an object
|
||||
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct Version {
|
||||
// Primary key
|
||||
/// UUID of the version, used as partition key
|
||||
trinity-1686a marked this conversation as resolved
Outdated
lx
commented
used as a partition key used as a partition key
|
||||
pub uuid: UUID,
|
||||
|
||||
// Actual data: the blocks for this version
|
||||
// In the case of a multipart upload, also store the etags
|
||||
// of individual parts and check them when doing CompleteMultipartUpload
|
||||
/// Is this version deleted
|
||||
pub deleted: crdt::Bool,
|
||||
/// list of blocks of data composing the version
|
||||
pub blocks: crdt::Map<VersionBlockKey, VersionBlock>,
|
||||
/// Etag of each part in case of a multipart upload, empty otherwise
|
||||
pub parts_etags: crdt::Map<u64, String>,
|
||||
|
||||
// Back link to bucket+key so that we can figure if
|
||||
// this was deleted later on
|
||||
/// Bucket in which the related object is stored
|
||||
pub bucket: String,
|
||||
/// Key in which the related object is stored
|
||||
pub key: String,
|
||||
}
|
||||
|
||||
|
@ -43,7 +49,9 @@ impl Version {
|
|||
|
||||
#[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)]
|
||||
pub struct VersionBlockKey {
|
||||
/// Number of the part
|
||||
lx marked this conversation as resolved
Outdated
lx
commented
not necessarily starting at 1, the client decides the part numbers and we don't care (for instance he can send parts 0, 1, 2, 14, 22, 37 and 42, and that's a fine object as long as these parts match the checklist that is sent in the CompleteMultipartUpload call) not necessarily starting at 1, the client decides the part numbers and we don't care (for instance he can send parts 0, 1, 2, 14, 22, 37 and 42, and that's a fine object as long as these parts match the checklist that is sent in the CompleteMultipartUpload call)
trinity-1686a
commented
Ok I've read s3 doc on multi-part upload and I hate it. Ok I've read s3 doc on multi-part upload and I hate it.
Next comment appear to be false too, it's offset in part, not in the whole file
lx
commented
Yes, we don't know the offset in the whole file when a part is being uploaded because parts get uploaded independently from one another. Yes, we don't know the offset in the whole file when a part is being uploaded because parts get uploaded independently from one another.
|
||||
pub part_number: u64,
|
||||
/// Offset of this sub-segment in its part
|
||||
pub offset: u64,
|
||||
}
|
||||
|
||||
|
@ -61,9 +69,12 @@ impl PartialOrd for VersionBlockKey {
|
|||
}
|
||||
}
|
||||
|
||||
/// Informations about a single block
|
||||
#[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)]
|
||||
pub struct VersionBlock {
|
||||
/// Hash of the block
|
||||
pub hash: Hash,
|
||||
/// Size of the block
|
||||
pub size: u64,
|
||||
}
|
||||
|
||||
|
|
|
@ -1,7 +1,9 @@
|
|||
//! Crate containing rpc related functions and types used in Garage
|
||||
|
||||
#[macro_use]
|
||||
extern crate log;
|
||||
|
||||
pub mod consul;
|
||||
mod consul;
|
||||
pub(crate) mod tls_util;
|
||||
|
||||
pub mod membership;
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
//! Module containing structs related to membership management
|
||||
trinity-1686a marked this conversation as resolved
Outdated
lx
commented
`//!`
|
||||
use std::collections::HashMap;
|
||||
use std::fmt::Write as FmtWrite;
|
||||
use std::io::{Read, Write};
|
||||
|
@ -30,20 +31,29 @@ const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60);
|
|||
const PING_TIMEOUT: Duration = Duration::from_secs(2);
|
||||
const MAX_FAILURES_BEFORE_CONSIDERED_DOWN: usize = 5;
|
||||
|
||||
/// RPC endpoint used for calls related to membership
|
||||
pub const MEMBERSHIP_RPC_PATH: &str = "_membership";
|
||||
|
||||
/// RPC messages related to membership
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub enum Message {
|
||||
/// Response to successfull advertisements
|
||||
Ok,
|
||||
/// Message sent to detect other nodes status
|
||||
Ping(PingMessage),
|
||||
/// Ask other node for the nodes it knows. Answered with AdvertiseNodesUp
|
||||
PullStatus,
|
||||
/// Ask other node its config. Answered with AdvertiseConfig
|
||||
PullConfig,
|
||||
/// Advertisement of nodes the host knows up. Sent spontanously or in response to PullStatus
|
||||
AdvertiseNodesUp(Vec<AdvertisedNode>),
|
||||
/// Advertisement of nodes config. Sent spontanously or in response to PullConfig
|
||||
AdvertiseConfig(NetworkConfig),
|
||||
}
|
||||
|
||||
impl RpcMessage for Message {}
|
||||
|
||||
/// A ping, containing informations about status and config
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct PingMessage {
|
||||
id: UUID,
|
||||
|
@ -55,18 +65,25 @@ pub struct PingMessage {
|
|||
state_info: StateInfo,
|
||||
}
|
||||
|
||||
/// A node advertisement
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct AdvertisedNode {
|
||||
/// Id of the node this advertisement relates to
|
||||
pub id: UUID,
|
||||
/// IP and port of the node
|
||||
pub addr: SocketAddr,
|
||||
|
||||
/// Is the node considered up
|
||||
pub is_up: bool,
|
||||
/// When was the node last seen up, in milliseconds since UNIX epoch
|
||||
pub last_seen: u64,
|
||||
|
||||
pub state_info: StateInfo,
|
||||
}
|
||||
|
||||
/// This node's membership manager
|
||||
pub struct System {
|
||||
/// The id of this node
|
||||
pub id: UUID,
|
||||
|
||||
persist_config: Persister<NetworkConfig>,
|
||||
|
@ -79,10 +96,12 @@ pub struct System {
|
|||
rpc_client: Arc<RpcClient<Message>>,
|
||||
|
||||
pub(crate) status: watch::Receiver<Arc<Status>>,
|
||||
trinity-1686a marked this conversation as resolved
Outdated
lx
commented
not "viewed by this node", all nodes should have equal rings (if not it's a bug) not "viewed by this node", all nodes should have equal rings (if not it's a bug)
|
||||
/// The ring
|
||||
pub ring: watch::Receiver<Arc<Ring>>,
|
||||
|
||||
update_lock: Mutex<Updaters>,
|
||||
|
||||
/// The job runner of this node
|
||||
pub background: Arc<BackgroundRunner>,
|
||||
}
|
||||
|
||||
|
@ -91,21 +110,29 @@ struct Updaters {
|
|||
update_ring: watch::Sender<Arc<Ring>>,
|
||||
}
|
||||
lx marked this conversation as resolved
Outdated
lx
commented
when is it sorted? when is it sorted?
trinity-1686a
commented
it's sorted in recalculate_hash. Actually the node list is short, so sorting it is pretty much free it's sorted in recalculate_hash. Actually the node list is short, so sorting it is pretty much free
lx
commented
That's my reasoning as well. That's my reasoning as well.
|
||||
|
||||
/// The status of each nodes, viewed by this node
|
||||
trinity-1686a marked this conversation as resolved
Outdated
lx
commented
hash of the whole set of currently known nodes. This hash is sent in regular ping messages so nodes can detect when they have different views of the cluster. They then exchange their peer lists kind of in an anti-entropy process. hash of the whole set of currently known nodes. This hash is sent in regular ping messages so nodes can detect when they have different views of the cluster. They then exchange their peer lists kind of in an anti-entropy process.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Status {
|
||||
/// Mapping of each node id to its known status
|
||||
pub nodes: HashMap<UUID, Arc<StatusEntry>>,
|
||||
/// Hash of `nodes`, used to detect when nodes have different views of the cluster
|
||||
pub hash: Hash,
|
||||
}
|
||||
|
||||
/// The status of a single node
|
||||
#[derive(Debug)]
|
||||
pub struct StatusEntry {
|
||||
lx
commented
Not sure but this counts failed requests of any kind, and not just ping messages. It is incremented in rpc_client.rs Not sure but this counts failed requests of any kind, and not just ping messages. It is incremented in rpc_client.rs
trinity-1686a
commented
hum, it might double-count pings, as it count once in rpc_clients, and once in this file in ping_nodes hum, it might double-count pings, as it count once in rpc_clients, and once in this file in ping_nodes
lx
commented
Thanks for noticing that, I'll have to check. Thanks for noticing that, I'll have to check.
lx
commented
Ok so the cound in Ok so the cound in `rpc_client` does not happen when sending a ping message in `ping_nodes` in `membership.rs` because we are calling `RpcAddrClient::call` and not `RpcClient::call` (i.e. we are calling the node using its IP address and not its node identifier). The increment in `rpc_client` is done in `RpcClient::call` because it needs to know the node ID. So there is no redundancy between the two.
|
||||
/// The IP and port used to connect to this node
|
||||
pub addr: SocketAddr,
|
||||
/// Last time this node was seen
|
||||
pub last_seen: u64,
|
||||
/// Number of consecutive pings sent without reply to this node
|
||||
pub num_failures: AtomicUsize,
|
||||
pub state_info: StateInfo,
|
||||
}
|
||||
|
||||
impl StatusEntry {
|
||||
/// is the node associated to this entry considered up
|
||||
pub fn is_up(&self) -> bool {
|
||||
self.num_failures.load(Ordering::SeqCst) < MAX_FAILURES_BEFORE_CONSIDERED_DOWN
|
||||
}
|
||||
|
@ -195,6 +222,7 @@ fn gen_node_id(metadata_dir: &PathBuf) -> Result<UUID, Error> {
|
|||
}
|
||||
|
||||
impl System {
|
||||
/// Create this node's membership manager
|
||||
pub fn new(
|
||||
metadata_dir: PathBuf,
|
||||
rpc_http_client: Arc<RpcHttpClient>,
|
||||
|
@ -279,6 +307,7 @@ impl System {
|
|||
});
|
||||
}
|
||||
|
||||
/// Get an RPC client
|
||||
pub fn rpc_client<M: RpcMessage + 'static>(self: &Arc<Self>, path: &str) -> Arc<RpcClient<M>> {
|
||||
RpcClient::new(
|
||||
RpcAddrClient::new(self.rpc_http_client.clone(), path.to_string()),
|
||||
|
@ -287,6 +316,7 @@ impl System {
|
|||
)
|
||||
}
|
||||
|
||||
/// Save network configuration to disc
|
||||
async fn save_network_config(self: Arc<Self>) -> Result<(), Error> {
|
||||
let ring = self.ring.borrow().clone();
|
||||
self.persist_config
|
||||
|
@ -319,6 +349,7 @@ impl System {
|
|||
self.rpc_client.call_many(&to[..], msg, timeout).await;
|
||||
}
|
||||
|
||||
/// Perform bootstraping, starting the ping loop
|
||||
pub async fn bootstrap(
|
||||
self: Arc<Self>,
|
||||
peers: Vec<SocketAddr>,
|
||||
|
@ -386,6 +417,8 @@ impl System {
|
|||
}
|
||||
} else if let Some(id) = id_option {
|
||||
if let Some(st) = status.nodes.get_mut(id) {
|
||||
// we need to increment failure counter as call was done using by_addr so the
|
||||
// counter was not auto-incremented
|
||||
lx
commented
Actually no because the increment in Actually no because the increment in `rpc_client.rs` is done by the `RpcClient` and not the `RpcAddrClient`, but here we are using the `RpcAddrClient`. (we could put this in a comment to make it clearer)
|
||||
st.num_failures.fetch_add(1, Ordering::SeqCst);
|
||||
if !st.is_up() {
|
||||
warn!("Node {:?} seems to be down.", id);
|
||||
|
|
|
@ -1,3 +1,5 @@
|
|||
//! Module containing types related to computing nodes which should receive a copy of data blocks
|
||||
trinity-1686a marked this conversation as resolved
Outdated
lx
commented
and metadata entries and metadata entries
|
||||
//! and metadata
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::convert::TryInto;
|
||||
|
||||
|
@ -8,23 +10,30 @@ use garage_util::data::*;
|
|||
// A partition number is encoded on 16 bits,
|
||||
// i.e. we have up to 2**16 partitions.
|
||||
// (in practice we have exactly 2**PARTITION_BITS partitions)
|
||||
/// A partition id, stored on 16 bits
|
||||
pub type Partition = u16;
|
||||
|
||||
// TODO: make this constant parametrizable in the config file
|
||||
// For deployments with many nodes it might make sense to bump
|
||||
// it up to 10.
|
||||
// Maximum value : 16
|
||||
/// How many bits from the hash are used to make partitions. Higher numbers means more fairness in
|
||||
/// presence of numerous nodes, but exponentially bigger ring. Max 16
|
||||
pub const PARTITION_BITS: usize = 8;
|
||||
|
||||
const PARTITION_MASK_U16: u16 = ((1 << PARTITION_BITS) - 1) << (16 - PARTITION_BITS);
|
||||
|
||||
// TODO: make this constant paraetrizable in the config file
|
||||
// (most deployments use a replication factor of 3, so...)
|
||||
/// The maximum number of time an object might get replicated
|
||||
pub const MAX_REPLICATION: usize = 3;
|
||||
|
||||
lx
commented
Not really, a node can be known in that it answers ping messages, and not be configured if it is idle or draining pending removal by admins. Not really, a node can be known in that it answers ping messages, and not be configured if it is idle or draining pending removal by admins.
trinity-1686a
commented
would "available" be correct instead of known? would "available" be correct instead of known?
lx
commented
Just put a TODO in the comment and I'll write it later Just put a TODO in the comment and I'll write it later
lx
commented
Suggestion: Suggestion: `The user-defined configuration of the cluster's nodes`
|
||||
/// The user-defined configuration of the cluster's nodes
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct NetworkConfig {
|
||||
/// Map of each node's id to it's configuration
|
||||
pub members: HashMap<UUID, NetworkConfigEntry>,
|
||||
/// Version of this config
|
||||
pub version: u64,
|
||||
}
|
||||
|
||||
|
@ -37,26 +46,40 @@ impl NetworkConfig {
|
|||
}
|
||||
}
|
||||
|
||||
/// The overall configuration of one (possibly remote) node
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct NetworkConfigEntry {
|
||||
/// Datacenter at which this entry belong. This infromation might be used to perform a better
|
||||
/// geodistribution
|
||||
pub datacenter: String,
|
||||
/// The (relative) capacity of the node
|
||||
pub capacity: u32,
|
||||
/// A tag to recognize the entry, not used for other things than display
|
||||
pub tag: String,
|
||||
}
|
||||
|
||||
/// A ring distributing fairly objects to nodes
|
||||
#[derive(Clone)]
|
||||
pub struct Ring {
|
||||
/// The network configuration used to generate this ring
|
||||
pub config: NetworkConfig,
|
||||
/// The list of entries in the ring
|
||||
pub ring: Vec<RingEntry>,
|
||||
}
|
||||
|
||||
/// An entry in the ring
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct RingEntry {
|
||||
/// The prefix of the Hash of object which should use this entry
|
||||
pub location: Hash,
|
||||
/// The nodes in which a matching object should get stored
|
||||
pub nodes: [UUID; MAX_REPLICATION],
|
||||
}
|
||||
|
||||
impl Ring {
|
||||
// TODO this function MUST be refactored, it's 100 lines long, with a 50 lines loop, going up to 6
|
||||
// levels of imbrication. It is basically impossible to test, maintain, or understand for an
|
||||
// outsider.
|
||||
pub(crate) fn new(config: NetworkConfig) -> Self {
|
||||
// Create a vector of partition indices (0 to 2**PARTITION_BITS-1)
|
||||
let partitions_idx = (0usize..(1usize << PARTITION_BITS)).collect::<Vec<_>>();
|
||||
|
@ -166,20 +189,16 @@ impl Ring {
|
|||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// eprintln!("RING: --");
|
||||
// for e in ring.iter() {
|
||||
// eprintln!("{:?}", e);
|
||||
// }
|
||||
// eprintln!("END --");
|
||||
|
||||
Self { config, ring }
|
||||
}
|
||||
|
||||
/// Get the partition in which data would fall on
|
||||
pub fn partition_of(&self, from: &Hash) -> Partition {
|
||||
let top = u16::from_be_bytes(from.as_slice()[0..2].try_into().unwrap());
|
||||
top >> (16 - PARTITION_BITS)
|
||||
}
|
||||
|
||||
trinity-1686a marked this conversation as resolved
Outdated
lx
commented
and the first hash (of a partition key) that would fall in this partition and the first hash (of a partition key) that would fall in this partition
|
||||
/// Get the list of partitions and the first hash of a partition key that would fall in it
|
||||
pub fn partitions(&self) -> Vec<(Partition, Hash)> {
|
||||
let mut ret = vec![];
|
||||
|
||||
|
@ -193,6 +212,8 @@ impl Ring {
|
|||
ret
|
||||
}
|
||||
|
||||
trinity-1686a marked this conversation as resolved
Outdated
lx
commented
TODO rename this function as it doesn't walk the ring anymore but just returns the nodes at the corresponding location. This is vocabulary from the previous implementation that used datacenter-aware ring walking and led to data imbalance. TODO rename this function as it doesn't walk the ring anymore but just returns the nodes at the corresponding location. This is vocabulary from the previous implementation that used datacenter-aware ring walking and led to data imbalance.
|
||||
// TODO rename this function as it no longer walk the ring
|
||||
/// Walk the ring to find the n servers in which data should be replicated
|
||||
pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec<UUID> {
|
||||
if self.ring.len() != 1 << PARTITION_BITS {
|
||||
warn!("Ring not yet ready, read/writes will be lost!");
|
||||
|
@ -201,12 +222,15 @@ impl Ring {
|
|||
|
||||
let top = u16::from_be_bytes(from.as_slice()[0..2].try_into().unwrap());
|
||||
lx marked this conversation as resolved
Outdated
lx
commented
just to make sure I don't mess up. Should probably be factorized in a single location. just to make sure I don't mess up. Should probably be factorized in a single location.
|
||||
let partition_idx = (top >> (16 - PARTITION_BITS)) as usize;
|
||||
// TODO why computing two time in the same way and asserting?
|
||||
assert_eq!(partition_idx, self.partition_of(from) as usize);
|
||||
|
||||
let partition = &self.ring[partition_idx];
|
||||
|
||||
let partition_top =
|
||||
u16::from_be_bytes(partition.location.as_slice()[0..2].try_into().unwrap());
|
||||
// TODO is this an assertion on the validity of PARTITION_MASK_U16? If so, it should
|
||||
// probably be a test more than a runtime assertion
|
||||
assert_eq!(partition_top & PARTITION_MASK_U16, top & PARTITION_MASK_U16);
|
||||
|
||||
assert!(n <= partition.nodes.len());
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
//! Contain structs related to making RPCs
|
||||
use std::borrow::Borrow;
|
||||
use std::marker::PhantomData;
|
||||
use std::net::SocketAddr;
|
||||
|
@ -26,14 +27,19 @@ use crate::tls_util;
|
|||
|
||||
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
|
||||
/// Strategy to apply when making RPC
|
||||
#[derive(Copy, Clone)]
|
||||
pub struct RequestStrategy {
|
||||
/// Max time to wait for reponse
|
||||
pub rs_timeout: Duration,
|
||||
/// Min number of response to consider the request successful
|
||||
pub rs_quorum: usize,
|
||||
/// Should requests be dropped after enough response are received
|
||||
pub rs_interrupt_after_quorum: bool,
|
||||
}
|
||||
|
||||
impl RequestStrategy {
|
||||
/// Create a RequestStrategy with default timeout and not interrupting when quorum reached
|
||||
pub fn with_quorum(quorum: usize) -> Self {
|
||||
RequestStrategy {
|
||||
rs_timeout: DEFAULT_TIMEOUT,
|
||||
|
@ -41,19 +47,25 @@ impl RequestStrategy {
|
|||
rs_interrupt_after_quorum: false,
|
||||
}
|
||||
}
|
||||
/// Set timeout of the strategy
|
||||
pub fn with_timeout(mut self, timeout: Duration) -> Self {
|
||||
self.rs_timeout = timeout;
|
||||
self
|
||||
}
|
||||
/// Set if requests can be dropped after quorum has been reached
|
||||
trinity-1686a marked this conversation as resolved
Outdated
lx
commented
as a general rule: true for read requests, false for write requests as a general rule: true for read requests, false for write requests
|
||||
/// In general true for read requests, and false for write
|
||||
pub fn interrupt_after_quorum(mut self, interrupt: bool) -> Self {
|
||||
self.rs_interrupt_after_quorum = interrupt;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
/// Shortcut for a boxed async function taking a message, and resolving to another message or an
|
||||
/// error
|
||||
pub type LocalHandlerFn<M> =
|
||||
Box<dyn Fn(Arc<M>) -> Pin<Box<dyn Future<Output = Result<M, Error>> + Send>> + Send + Sync>;
|
||||
|
||||
/// Client used to send RPC
|
||||
pub struct RpcClient<M: RpcMessage> {
|
||||
status: watch::Receiver<Arc<Status>>,
|
||||
background: Arc<BackgroundRunner>,
|
||||
|
@ -64,6 +76,7 @@ pub struct RpcClient<M: RpcMessage> {
|
|||
}
|
||||
|
||||
impl<M: RpcMessage + 'static> RpcClient<M> {
|
||||
/// Create a new RpcClient from an address, a job runner, and the status of all RPC servers
|
||||
pub fn new(
|
||||
rac: RpcAddrClient<M>,
|
||||
background: Arc<BackgroundRunner>,
|
||||
|
@ -77,6 +90,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
|
|||
})
|
||||
}
|
||||
|
||||
/// Set the local handler, to process RPC to this node without network usage
|
||||
pub fn set_local_handler<F, Fut>(&self, my_id: UUID, handler: F)
|
||||
where
|
||||
F: Fn(Arc<M>) -> Fut + Send + Sync + 'static,
|
||||
|
@ -90,14 +104,17 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
|
|||
self.local_handler.swap(Some(Arc::new((my_id, handler))));
|
||||
}
|
||||
|
||||
/// Get a RPC client to make calls using node's SocketAddr instead of its ID
|
||||
trinity-1686a marked this conversation as resolved
Outdated
lx
commented
Please don't call this Please don't call this `get_addr`, it's not what it does ! It does not return a SocketAddr or something like that. It is used to get the underlying RpcAddrClient which allows for RPC by specifing the target node's SocketAddr instead of their node IDs. Also, the comment is wrong.
|
||||
pub fn by_addr(&self) -> &RpcAddrClient<M> {
|
||||
&self.rpc_addr_client
|
||||
}
|
||||
|
||||
/// Make a RPC call
|
||||
pub async fn call(&self, to: UUID, msg: M, timeout: Duration) -> Result<M, Error> {
|
||||
self.call_arc(to, Arc::new(msg), timeout).await
|
||||
}
|
||||
|
||||
/// Make a RPC call from a message stored in an Arc
|
||||
pub async fn call_arc(&self, to: UUID, msg: Arc<M>, timeout: Duration) -> Result<M, Error> {
|
||||
if let Some(lh) = self.local_handler.load_full() {
|
||||
let (my_id, local_handler) = lh.as_ref();
|
||||
|
@ -135,6 +152,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
|
|||
}
|
||||
}
|
||||
|
||||
/// Make a RPC call to multiple servers, returning a Vec containing each result
|
||||
pub async fn call_many(&self, to: &[UUID], msg: M, timeout: Duration) -> Vec<Result<M, Error>> {
|
||||
let msg = Arc::new(msg);
|
||||
let mut resp_stream = to
|
||||
|
@ -149,6 +167,8 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
|
|||
results
|
||||
}
|
||||
|
||||
/// Make a RPC call to multiple servers, returning either a Vec of responses, or an error if
|
||||
trinity-1686a marked this conversation as resolved
Outdated
lx
commented
too many* too many*
|
||||
/// strategy could not be respected due to too many errors
|
||||
pub async fn try_call_many(
|
||||
self: &Arc<Self>,
|
||||
to: &[UUID],
|
||||
|
@ -208,6 +228,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
|
|||
}
|
||||
}
|
||||
|
||||
trinity-1686a marked this conversation as resolved
Outdated
lx
commented
No. This is an RPC client which contains only the necessary logic to call RPC's to nodes by specifying their SocketAddr, and not by specifying their node IDs. It is used as an underlying layer for RpcClient. No. This is an RPC client which contains only the necessary logic to call RPC's to nodes by specifying their SocketAddr, and not by specifying their node IDs. It is used as an underlying layer for RpcClient.
|
||||
/// Thin wrapper arround an `RpcHttpClient` specifying the path of the request
|
||||
pub struct RpcAddrClient<M: RpcMessage> {
|
||||
phantom: PhantomData<M>,
|
||||
|
||||
|
@ -216,6 +237,7 @@ pub struct RpcAddrClient<M: RpcMessage> {
|
|||
}
|
||||
|
||||
impl<M: RpcMessage> RpcAddrClient<M> {
|
||||
/// Create an RpcAddrClient from an HTTP client and the endpoint to reach for RPCs
|
||||
pub fn new(http_client: Arc<RpcHttpClient>, path: String) -> Self {
|
||||
Self {
|
||||
phantom: PhantomData::default(),
|
||||
|
@ -224,6 +246,7 @@ impl<M: RpcMessage> RpcAddrClient<M> {
|
|||
}
|
||||
}
|
||||
|
||||
/// Make a RPC
|
||||
pub async fn call<MB>(
|
||||
&self,
|
||||
to_addr: &SocketAddr,
|
||||
|
@ -239,6 +262,7 @@ impl<M: RpcMessage> RpcAddrClient<M> {
|
|||
}
|
||||
}
|
||||
|
||||
/// HTTP client used to make RPCs
|
||||
pub struct RpcHttpClient {
|
||||
request_limiter: Semaphore,
|
||||
method: ClientMethod,
|
||||
|
@ -250,6 +274,7 @@ enum ClientMethod {
|
|||
}
|
||||
|
||||
impl RpcHttpClient {
|
||||
/// Create a new RpcHttpClient
|
||||
pub fn new(
|
||||
max_concurrent_requests: usize,
|
||||
tls_config: &Option<TlsConfig>,
|
||||
|
@ -280,6 +305,7 @@ impl RpcHttpClient {
|
|||
})
|
||||
}
|
||||
|
||||
/// Make a RPC
|
||||
async fn call<M, MB>(
|
||||
&self,
|
||||
path: &str,
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
//! Contains structs related to receiving RPCs
|
||||
use std::collections::HashMap;
|
||||
use std::net::SocketAddr;
|
||||
use std::pin::Pin;
|
||||
|
@ -22,13 +23,17 @@ use garage_util::error::Error;
|
|||
|
||||
use crate::tls_util;
|
||||
|
||||
/// Trait for messages that can be sent as RPC
|
||||
pub trait RpcMessage: Serialize + for<'de> Deserialize<'de> + Send + Sync {}
|
||||
|
||||
type ResponseFuture = Pin<Box<dyn Future<Output = Result<Response<Body>, Error>> + Send>>;
|
||||
type Handler = Box<dyn Fn(Request<Body>, SocketAddr) -> ResponseFuture + Send + Sync>;
|
||||
|
||||
/// Structure handling RPCs
|
||||
pub struct RpcServer {
|
||||
/// The address the RpcServer will bind
|
||||
pub bind_addr: SocketAddr,
|
||||
/// The tls configuration used for RPC
|
||||
pub tls_config: Option<TlsConfig>,
|
||||
|
||||
handlers: HashMap<String, Handler>,
|
||||
|
@ -87,6 +92,7 @@ where
|
|||
}
|
||||
|
||||
impl RpcServer {
|
||||
/// Create a new RpcServer
|
||||
pub fn new(bind_addr: SocketAddr, tls_config: Option<TlsConfig>) -> Self {
|
||||
Self {
|
||||
bind_addr,
|
||||
|
@ -95,6 +101,7 @@ impl RpcServer {
|
|||
}
|
||||
}
|
||||
|
||||
/// Add handler handling request made to `name`
|
||||
pub fn add_handler<M, F, Fut>(&mut self, name: String, handler: F)
|
||||
where
|
||||
M: RpcMessage + 'static,
|
||||
|
@ -156,6 +163,7 @@ impl RpcServer {
|
|||
}
|
||||
}
|
||||
|
||||
/// Run the RpcServer
|
||||
pub async fn run(
|
||||
self: Arc<Self>,
|
||||
shutdown_signal: impl Future<Output = ()>,
|
||||
|
|
|
@ -34,7 +34,7 @@ use crate::crdt::crdt::*;
|
|||
/// and may differ from what you observed with your atomic clock!
|
||||
///
|
||||
/// This scheme is used by AWS S3 or Soundcloud and often without knowing
|
||||
/// in entreprise when reconciliating databases with ad-hoc scripts.
|
||||
/// in enterprise when reconciliating databases with ad-hoc scripts.
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
|
||||
pub struct LWW<T> {
|
||||
ts: u64,
|
||||
|
|
|
@ -37,6 +37,7 @@ where
|
|||
Self { vals: vec![(k, v)] }
|
||||
}
|
||||
|
||||
/// Add a value to the map
|
||||
pub fn put(&mut self, k: K, v: V) {
|
||||
self.merge(&Self::put_mutator(k, v));
|
||||
}
|
||||
|
|
|
@ -74,7 +74,7 @@ where
|
|||
while !*must_exit.borrow() {
|
||||
match self.gc_loop_iter().await {
|
||||
Ok(true) => {
|
||||
// Stuff was done, loop imediately
|
||||
// Stuff was done, loop immediately
|
||||
continue;
|
||||
}
|
||||
Ok(false) => {
|
||||
|
|
|
@ -8,10 +8,10 @@ pub mod schema;
|
|||
pub mod util;
|
||||
|
||||
pub mod data;
|
||||
pub mod gc;
|
||||
pub mod merkle;
|
||||
mod gc;
|
||||
mod merkle;
|
||||
pub mod replication;
|
||||
pub mod sync;
|
||||
mod sync;
|
||||
pub mod table;
|
||||
|
||||
pub use schema::*;
|
||||
|
|
|
@ -6,19 +6,19 @@ use garage_util::data::*;
|
|||
|
||||
use crate::replication::*;
|
||||
|
||||
/// Full replication schema: all nodes store everything
|
||||
/// Writes are disseminated in an epidemic manner in the network
|
||||
/// Advantage: do all reads locally, extremely fast
|
||||
/// Inconvenient: only suitable to reasonably small tables
|
||||
#[derive(Clone)]
|
||||
pub struct TableFullReplication {
|
||||
/// The membership manager of this node
|
||||
pub system: Arc<System>,
|
||||
/// Max number of faults allowed while replicating a record
|
||||
pub max_faults: usize,
|
||||
}
|
||||
|
||||
impl TableReplication for TableFullReplication {
|
||||
// Full replication schema: all nodes store everything
|
||||
// Writes are disseminated in an epidemic manner in the network
|
||||
|
||||
// Advantage: do all reads locally, extremely fast
|
||||
// Inconvenient: only suitable to reasonably small tables
|
||||
|
||||
fn read_nodes(&self, _hash: &Hash) -> Vec<UUID> {
|
||||
vec![self.system.id]
|
||||
}
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
mod parameters;
|
||||
|
||||
pub mod fullcopy;
|
||||
pub mod sharded;
|
||||
mod fullcopy;
|
||||
mod sharded;
|
||||
|
||||
pub use fullcopy::TableFullReplication;
|
||||
pub use parameters::*;
|
||||
pub use sharded::TableShardedReplication;
|
||||
|
|
|
@ -2,20 +2,25 @@ use garage_rpc::ring::*;
|
|||
|
||||
use garage_util::data::*;
|
||||
|
||||
/// Trait to describe how a table shall be replicated
|
||||
pub trait TableReplication: Send + Sync {
|
||||
// See examples in table_sharded.rs and table_fullcopy.rs
|
||||
// To understand various replication methods
|
||||
|
||||
// Which nodes to send reads from
|
||||
/// Which nodes to send read requests to
|
||||
fn read_nodes(&self, hash: &Hash) -> Vec<UUID>;
|
||||
/// Responses needed to consider a read succesfull
|
||||
fn read_quorum(&self) -> usize;
|
||||
|
||||
// Which nodes to send writes to
|
||||
/// Which nodes to send writes to
|
||||
fn write_nodes(&self, hash: &Hash) -> Vec<UUID>;
|
||||
/// Responses needed to consider a write succesfull
|
||||
fn write_quorum(&self) -> usize;
|
||||
fn max_write_errors(&self) -> usize;
|
||||
trinity-1686a marked this conversation as resolved
Outdated
lx
commented
Most of the time it is, but not always for fully replicated tables Most of the time it is, but not always for fully replicated tables
|
||||
|
||||
// Accessing partitions, for Merkle tree & sync
|
||||
/// Get partition for data with given hash
|
||||
fn partition_of(&self, hash: &Hash) -> Partition;
|
||||
/// List of existing partitions
|
||||
fn partitions(&self) -> Vec<(Partition, Hash)>;
|
||||
}
|
||||
|
|
|
@ -6,22 +6,25 @@ use garage_util::data::*;
|
|||
|
||||
use crate::replication::*;
|
||||
|
||||
/// Sharded replication schema:
|
||||
/// - based on the ring of nodes, a certain set of neighbors
|
||||
/// store entries, given as a function of the position of the
|
||||
/// entry's hash in the ring
|
||||
/// - reads are done on all of the nodes that replicate the data
|
||||
/// - writes as well
|
||||
#[derive(Clone)]
|
||||
pub struct TableShardedReplication {
|
||||
/// The membership manager of this node
|
||||
pub system: Arc<System>,
|
||||
/// How many time each data should be replicated
|
||||
pub replication_factor: usize,
|
||||
/// How many nodes to contact for a read, should be at most `replication_factor`
|
||||
pub read_quorum: usize,
|
||||
/// How many nodes to contact for a write, should be at most `replication_factor`
|
||||
pub write_quorum: usize,
|
||||
}
|
||||
|
||||
impl TableReplication for TableShardedReplication {
|
||||
// Sharded replication schema:
|
||||
// - based on the ring of nodes, a certain set of neighbors
|
||||
// store entries, given as a function of the position of the
|
||||
// entry's hash in the ring
|
||||
// - reads are done on all of the nodes that replicate the data
|
||||
// - writes as well
|
||||
|
||||
fn read_nodes(&self, hash: &Hash) -> Vec<UUID> {
|
||||
let ring = self.system.ring.borrow().clone();
|
||||
ring.walk_ring(&hash, self.replication_factor)
|
||||
|
|
|
@ -4,7 +4,9 @@ use garage_util::data::*;
|
|||
|
||||
use crate::crdt::CRDT;
|
||||
|
||||
/// Trait for field used to partition data
|
||||
trinity-1686a marked this conversation as resolved
Outdated
lx
commented
It's not a trait for the data, but only for a field of the data that is used to partition it. It's not a trait for the data, but only for a field of the data that is used to partition it.
|
||||
pub trait PartitionKey {
|
||||
/// Get the key used to partition
|
||||
fn hash(&self) -> Hash;
|
||||
}
|
||||
|
||||
|
@ -20,7 +22,9 @@ impl PartitionKey for Hash {
|
|||
}
|
||||
}
|
||||
|
||||
/// Trait for field used to sort data
|
||||
trinity-1686a marked this conversation as resolved
Outdated
lx
commented
Same as above, it's only a trait of one of the fields of the data Same as above, it's only a trait of one of the fields of the data
|
||||
pub trait SortKey {
|
||||
/// Get the key used to sort
|
||||
fn sort_key(&self) -> &[u8];
|
||||
}
|
||||
|
||||
|
@ -36,25 +40,34 @@ impl SortKey for Hash {
|
|||
}
|
||||
}
|
||||
|
||||
/// Trait for an entry in a table. It must be sortable and partitionnable.
|
||||
pub trait Entry<P: PartitionKey, S: SortKey>:
|
||||
CRDT + PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync
|
||||
{
|
||||
/// Get the key used to partition
|
||||
fn partition_key(&self) -> &P;
|
||||
/// Get the key used to sort
|
||||
fn sort_key(&self) -> &S;
|
||||
|
||||
/// Is the entry a tombstone? Default implementation always return false
|
||||
fn is_tombstone(&self) -> bool {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
/// Trait for the schema used in a table
|
||||
pub trait TableSchema: Send + Sync {
|
||||
/// The partition key used in that table
|
||||
type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
|
||||
/// The sort key used int that table
|
||||
type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
|
||||
/// They type for an entry in that table
|
||||
type E: Entry<Self::P, Self::S>;
|
||||
type Filter: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
|
||||
|
||||
// Action to take if not able to decode current version:
|
||||
// try loading from an older version
|
||||
/// Try migrating an entry from an older version
|
||||
fn try_migrate(_bytes: &[u8]) -> Option<Self::E> {
|
||||
None
|
||||
}
|
||||
|
@ -65,7 +78,5 @@ pub trait TableSchema: Send + Sync {
|
|||
// to stderr.
|
||||
fn updated(&self, _old: Option<Self::E>, _new: Option<Self::E>) {}
|
||||
lx
commented
Why remove empty implementation? Why remove empty implementation?
trinity-1686a
commented
As explained above, I feel like a default implementation should be what we want in most case. Of 5 implementations of this trait, only 2 actually use an empty As explained above, I feel like a default implementation should be what we want in most case. Of 5 implementations of this trait, only 2 actually use an empty `updated`, so being explicit about it seems to me like a better option.
lx
commented
Idk, to me the reasonning was that Idk, to me the reasonning was that `updated()` is implemented when we need to add logic to do something when an entry changes, but if we don't need to add anything we shouldn't have to write an empty `updated()` handler. I don't see how having an empty default implementation can cause much confusion as it does literally nothing.
|
||||
|
||||
fn matches_filter(_entry: &Self::E, _filter: &Self::Filter) -> bool {
|
||||
true
|
||||
}
|
||||
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool;
|
||||
}
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
//! Job runner for futures and async functions
|
||||
use core::future::Future;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
|
@ -12,14 +13,15 @@ use crate::error::Error;
|
|||
type JobOutput = Result<(), Error>;
|
||||
type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>;
|
||||
|
||||
/// Job runner for futures and async functions
|
||||
pub struct BackgroundRunner {
|
||||
pub stop_signal: watch::Receiver<bool>,
|
||||
|
||||
stop_signal: watch::Receiver<bool>,
|
||||
queue_in: mpsc::UnboundedSender<(Job, bool)>,
|
||||
worker_in: mpsc::UnboundedSender<tokio::task::JoinHandle<()>>,
|
||||
}
|
||||
|
||||
impl BackgroundRunner {
|
||||
/// Create a new BackgroundRunner
|
||||
pub fn new(
|
||||
n_runners: usize,
|
||||
stop_signal: watch::Receiver<bool>,
|
||||
|
@ -103,7 +105,7 @@ impl BackgroundRunner {
|
|||
(bgrunner, await_all_done)
|
||||
}
|
||||
|
||||
// Spawn a task to be run in background
|
||||
/// Spawn a task to be run in background
|
||||
pub fn spawn<T>(&self, job: T)
|
||||
where
|
||||
T: Future<Output = JobOutput> + Send + 'static,
|
||||
|
@ -115,6 +117,8 @@ impl BackgroundRunner {
|
|||
.unwrap();
|
||||
}
|
||||
|
||||
/// Spawn a task to be run in background. It may get discarded before running if spawned while
|
||||
/// the runner is stopping
|
||||
pub fn spawn_cancellable<T>(&self, job: T)
|
||||
where
|
||||
T: Future<Output = JobOutput> + Send + 'static,
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
//! Contains type and functions related to Garage configuration file
|
||||
use std::io::Read;
|
||||
use std::net::SocketAddr;
|
||||
use std::path::PathBuf;
|
||||
|
@ -6,57 +7,82 @@ use serde::{de, Deserialize};
|
|||
|
||||
use crate::error::Error;
|
||||
|
||||
/// Represent the whole configuration
|
||||
#[derive(Deserialize, Debug, Clone)]
|
||||
pub struct Config {
|
||||
/// Path where to store metadata. Should be fast, but low volume
|
||||
pub metadata_dir: PathBuf,
|
||||
/// Path where to store data. Can be slower, but need higher volume
|
||||
pub data_dir: PathBuf,
|
||||
|
||||
/// Address to bind for RPC
|
||||
pub rpc_bind_addr: SocketAddr,
|
||||
|
||||
/// Bootstrap peers RPC address
|
||||
#[serde(deserialize_with = "deserialize_vec_addr")]
|
||||
pub bootstrap_peers: Vec<SocketAddr>,
|
||||
/// Consule host to connect to to discover more peers
|
||||
pub consul_host: Option<String>,
|
||||
/// Consul service name to use
|
||||
pub consul_service_name: Option<String>,
|
||||
|
||||
/// Max number of concurrent RPC request
|
||||
#[serde(default = "default_max_concurrent_rpc_requests")]
|
||||
pub max_concurrent_rpc_requests: usize,
|
||||
|
||||
/// Size of data blocks to save to disk
|
||||
#[serde(default = "default_block_size")]
|
||||
pub block_size: usize,
|
||||
|
||||
#[serde(default = "default_control_write_max_faults")]
|
||||
pub control_write_max_faults: usize,
|
||||
|
||||
/// How many nodes should hold a copy of meta data
|
||||
#[serde(default = "default_replication_factor")]
|
||||
pub meta_replication_factor: usize,
|
||||
|
||||
/// How many nodes should hold a copy of data
|
||||
#[serde(default = "default_replication_factor")]
|
||||
pub data_replication_factor: usize,
|
||||
|
||||
/// Configuration for RPC TLS
|
||||
pub rpc_tls: Option<TlsConfig>,
|
||||
|
||||
/// Configuration for S3 api
|
||||
pub s3_api: ApiConfig,
|
||||
|
||||
/// Configuration for serving files as normal web server
|
||||
pub s3_web: WebConfig,
|
||||
}
|
||||
|
||||
/// Configuration for RPC TLS
|
||||
#[derive(Deserialize, Debug, Clone)]
|
||||
pub struct TlsConfig {
|
||||
/// Path to certificate autority used for all nodes
|
||||
pub ca_cert: String,
|
||||
/// Path to public certificate for this node
|
||||
pub node_cert: String,
|
||||
/// Path to private key for this node
|
||||
pub node_key: String,
|
||||
}
|
||||
|
||||
/// Configuration for S3 api
|
||||
#[derive(Deserialize, Debug, Clone)]
|
||||
pub struct ApiConfig {
|
||||
/// Address and port to bind for api serving
|
||||
pub api_bind_addr: SocketAddr,
|
||||
/// S3 region to use
|
||||
pub s3_region: String,
|
||||
}
|
||||
|
||||
/// Configuration for serving files as normal web server
|
||||
#[derive(Deserialize, Debug, Clone)]
|
||||
pub struct WebConfig {
|
||||
/// Address and port to bind for web serving
|
||||
pub bind_addr: SocketAddr,
|
||||
/// Suffix to remove from domain name to find bucket
|
||||
pub root_domain: String,
|
||||
/// Suffix to add when user-agent request path end with "/"
|
||||
pub index: String,
|
||||
}
|
||||
|
||||
|
@ -73,6 +99,7 @@ fn default_control_write_max_faults() -> usize {
|
|||
1
|
||||
}
|
||||
|
||||
/// Read and parse configuration
|
||||
pub fn read_config(config_file: PathBuf) -> Result<Config, Error> {
|
||||
let mut file = std::fs::OpenOptions::new()
|
||||
.read(true)
|
||||
|
|
|
@ -1,8 +1,10 @@
|
|||
//! Contains common types and functions related to serialization and integrity
|
||||
use rand::Rng;
|
||||
use serde::de::{self, Visitor};
|
||||
use serde::{Deserialize, Deserializer, Serialize, Serializer};
|
||||
use std::fmt;
|
||||
|
||||
/// An array of 32 bytes
|
||||
#[derive(Default, PartialOrd, Ord, Clone, Hash, PartialEq, Copy)]
|
||||
pub struct FixedBytes32([u8; 32]);
|
||||
|
||||
|
@ -61,15 +63,20 @@ impl Serialize for FixedBytes32 {
|
|||
}
|
||||
|
||||
impl FixedBytes32 {
|
||||
/// Access the content as a slice
|
||||
pub fn as_slice(&self) -> &[u8] {
|
||||
&self.0[..]
|
||||
}
|
||||
/// Access the content as a mutable slice
|
||||
pub fn as_slice_mut(&mut self) -> &mut [u8] {
|
||||
&mut self.0[..]
|
||||
}
|
||||
/// Copy to a slice
|
||||
pub fn to_vec(&self) -> Vec<u8> {
|
||||
self.0.to_vec()
|
||||
}
|
||||
/// Try building a FixedBytes32 from a slice
|
||||
/// Return None if the slice is not 32 bytes long
|
||||
pub fn try_from(by: &[u8]) -> Option<Self> {
|
||||
if by.len() != 32 {
|
||||
return None;
|
||||
|
@ -80,9 +87,12 @@ impl FixedBytes32 {
|
|||
}
|
||||
}
|
||||
|
||||
/// A 32 bytes UUID
|
||||
pub type UUID = FixedBytes32;
|
||||
/// A 256 bit cryptographic hash, can be sha256 or blake2 depending on provenance
|
||||
pub type Hash = FixedBytes32;
|
||||
|
||||
/// Compute the sha256 of a slice
|
||||
pub fn sha256sum(data: &[u8]) -> Hash {
|
||||
use sha2::{Digest, Sha256};
|
||||
|
||||
|
@ -93,6 +103,7 @@ pub fn sha256sum(data: &[u8]) -> Hash {
|
|||
hash.into()
|
||||
}
|
||||
|
||||
/// Compute the blake2 of a slice
|
||||
pub fn blake2sum(data: &[u8]) -> Hash {
|
||||
use blake2::{Blake2b, Digest};
|
||||
|
||||
|
@ -103,8 +114,10 @@ pub fn blake2sum(data: &[u8]) -> Hash {
|
|||
hash.into()
|
||||
}
|
||||
|
||||
/// A 64 bit non cryptographic hash
|
||||
pub type FastHash = u64;
|
||||
|
||||
/// Compute a (non cryptographic) of a slice
|
||||
pub fn fasthash(data: &[u8]) -> FastHash {
|
||||
use xxhash_rust::xxh3::Xxh3;
|
||||
|
||||
|
@ -113,12 +126,14 @@ pub fn fasthash(data: &[u8]) -> FastHash {
|
|||
h.digest()
|
||||
}
|
||||
|
||||
/// Generate a random 32 bytes UUID
|
||||
pub fn gen_uuid() -> UUID {
|
||||
rand::thread_rng().gen::<[u8; 32]>().into()
|
||||
}
|
||||
|
||||
// RMP serialization with names of fields and variants
|
||||
|
||||
/// Serialize to MessagePack
|
||||
pub fn rmp_to_vec_all_named<T>(val: &T) -> Result<Vec<u8>, rmp_serde::encode::Error>
|
||||
where
|
||||
T: Serialize + ?Sized,
|
||||
|
@ -131,10 +146,13 @@ where
|
|||
Ok(wr)
|
||||
}
|
||||
|
||||
/// Serialize to JSON, truncating long result
|
||||
pub fn debug_serialize<T: Serialize>(x: T) -> String {
|
||||
match serde_json::to_string(&x) {
|
||||
Ok(ss) => {
|
||||
if ss.len() > 100 {
|
||||
// TODO this can panic if 100 is not a codepoint boundary, but inside a 2 Bytes
|
||||
// (or more) codepoint
|
||||
ss[..100].to_string()
|
||||
} else {
|
||||
ss
|
||||
|
|
|
@ -1,9 +1,11 @@
|
|||
//! Module containing error types used in Garage
|
||||
use err_derive::Error;
|
||||
use hyper::StatusCode;
|
||||
use std::io;
|
||||
|
||||
use crate::data::*;
|
||||
|
||||
/// RPC related errors
|
||||
#[derive(Debug, Error)]
|
||||
pub enum RPCError {
|
||||
#[error(display = "Node is down: {:?}.", _0)]
|
||||
|
@ -28,6 +30,7 @@ pub enum RPCError {
|
|||
TooManyErrors(Vec<String>),
|
||||
}
|
||||
|
||||
/// Regroup all Garage errors
|
||||
#[derive(Debug, Error)]
|
||||
pub enum Error {
|
||||
#[error(display = "IO error: {}", _0)]
|
||||
|
|
|
@ -1,3 +1,5 @@
|
|||
//! Crate containing common functions and types used in Garage
|
||||
|
||||
#[macro_use]
|
||||
extern crate log;
|
||||
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
//! Module containing helper functions to manipulate time
|
||||
use chrono::{SecondsFormat, TimeZone, Utc};
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
|
||||
/// Returns milliseconds since UNIX Epoch
|
||||
pub fn now_msec() -> u64 {
|
||||
SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
|
@ -8,6 +10,8 @@ pub fn now_msec() -> u64 {
|
|||
.as_millis() as u64
|
||||
}
|
||||
|
||||
/// Convert a timestamp represented as milliseconds since UNIX Epoch to
|
||||
/// its RFC3339 representation, such as "2021-01-01T12:30:00Z"
|
||||
pub fn msec_to_rfc3339(msecs: u64) -> String {
|
||||
let secs = msecs as i64 / 1000;
|
||||
let nanos = (msecs as i64 % 1000) as u32 * 1_000_000;
|
||||
|
|
|
@ -3,30 +3,37 @@ use hyper::StatusCode;
|
|||
|
||||
use garage_util::error::Error as GarageError;
|
||||
|
||||
/// Errors of this crate
|
||||
#[derive(Debug, Error)]
|
||||
pub enum Error {
|
||||
/// An error received from the API crate
|
||||
#[error(display = "API error: {}", _0)]
|
||||
ApiError(#[error(source)] garage_api::error::Error),
|
||||
ApiError(#[error(source)] garage_api::Error),
|
||||
|
||||
// Category: internal error
|
||||
/// Error internal to garage
|
||||
#[error(display = "Internal error: {}", _0)]
|
||||
InternalError(#[error(source)] GarageError),
|
||||
|
||||
/// The file does not exist
|
||||
#[error(display = "Not found")]
|
||||
NotFound,
|
||||
|
||||
// Category: bad request
|
||||
/// The request contained an invalid UTF-8 sequence in its path or in other parameters
|
||||
trinity-1686a marked this conversation as resolved
Outdated
lx
commented
see garage_api/error.rs see garage_api/error.rs
|
||||
#[error(display = "Invalid UTF-8: {}", _0)]
|
||||
InvalidUTF8(#[error(source)] std::str::Utf8Error),
|
||||
|
||||
/// The client send a header with invalid value
|
||||
#[error(display = "Invalid header value: {}", _0)]
|
||||
InvalidHeader(#[error(source)] hyper::header::ToStrError),
|
||||
|
||||
/// The client sent a request without host, or with unsupported method
|
||||
#[error(display = "Bad request: {}", _0)]
|
||||
BadRequest(String),
|
||||
}
|
||||
|
||||
impl Error {
|
||||
/// Transform errors into http status code
|
||||
pub fn http_status_code(&self) -> StatusCode {
|
||||
match self {
|
||||
Error::NotFound => StatusCode::NOT_FOUND,
|
||||
|
|
|
@ -1,6 +1,9 @@
|
|||
//! Crate for handling web serving of s3 bucket
|
||||
#[macro_use]
|
||||
extern crate log;
|
||||
|
||||
pub mod error;
|
||||
mod error;
|
||||
pub use error::Error;
|
||||
|
||||
pub mod web_server;
|
||||
mod web_server;
|
||||
pub use web_server::run_web_server;
|
||||
|
|
|
@ -18,6 +18,7 @@ use garage_model::garage::Garage;
|
|||
use garage_table::*;
|
||||
use garage_util::error::Error as GarageError;
|
||||
|
||||
/// Run a web server
|
||||
pub async fn run_web_server(
|
||||
garage: Arc<Garage>,
|
||||
shutdown_signal: impl Future<Output = ()>,
|
||||
|
|
the request contained an invalid UTF-8 sequence in its path or in other parameters
would be more accurate