add doc comments #53
38 changed files with 393 additions and 79 deletions
|
@ -20,6 +20,7 @@ use crate::s3_get::*;
|
||||||
use crate::s3_list::*;
|
use crate::s3_list::*;
|
||||||
use crate::s3_put::*;
|
use crate::s3_put::*;
|
||||||
|
|
||||||
|
/// Run the S3 API server
|
||||||
pub async fn run_api_server(
|
pub async fn run_api_server(
|
||||||
garage: Arc<Garage>,
|
garage: Arc<Garage>,
|
||||||
shutdown_signal: impl Future<Output = ()>,
|
shutdown_signal: impl Future<Output = ()>,
|
||||||
|
|
|
@ -1,9 +1,13 @@
|
||||||
|
//! Module containing various helpers for encoding
|
||||||
|
|
||||||
|
/// Escape &str for xml inclusion
|
||||||
pub fn xml_escape(s: &str) -> String {
|
pub fn xml_escape(s: &str) -> String {
|
||||||
s.replace("<", "<")
|
s.replace("<", "<")
|
||||||
.replace(">", ">")
|
.replace(">", ">")
|
||||||
.replace("\"", """)
|
.replace("\"", """)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Encode &str for use in a URI
|
||||||
pub fn uri_encode(string: &str, encode_slash: bool) -> String {
|
pub fn uri_encode(string: &str, encode_slash: bool) -> String {
|
||||||
let mut result = String::with_capacity(string.len() * 2);
|
let mut result = String::with_capacity(string.len() * 2);
|
||||||
for c in string.chars() {
|
for c in string.chars() {
|
||||||
|
@ -24,6 +28,7 @@ pub fn uri_encode(string: &str, encode_slash: bool) -> String {
|
||||||
result
|
result
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Encode &str either as an uri, or a valid string for xml inclusion
|
||||||
pub fn xml_encode_key(k: &str, urlencode: bool) -> String {
|
pub fn xml_encode_key(k: &str, urlencode: bool) -> String {
|
||||||
if urlencode {
|
if urlencode {
|
||||||
uri_encode(k, true)
|
uri_encode(k, true)
|
||||||
|
|
|
@ -3,44 +3,57 @@ use hyper::StatusCode;
|
||||||
|
|
||||||
use garage_util::error::Error as GarageError;
|
use garage_util::error::Error as GarageError;
|
||||||
|
|
||||||
|
/// Errors of this crate
|
||||||
#[derive(Debug, Error)]
|
#[derive(Debug, Error)]
|
||||||
pub enum Error {
|
pub enum Error {
|
||||||
// Category: internal error
|
// Category: internal error
|
||||||
|
/// Error related to deeper parts of Garage
|
||||||
#[error(display = "Internal error: {}", _0)]
|
#[error(display = "Internal error: {}", _0)]
|
||||||
InternalError(#[error(source)] GarageError),
|
InternalError(#[error(source)] GarageError),
|
||||||
|
|
||||||
|
/// Error related to Hyper
|
||||||
#[error(display = "Internal error (Hyper error): {}", _0)]
|
#[error(display = "Internal error (Hyper error): {}", _0)]
|
||||||
Hyper(#[error(source)] hyper::Error),
|
Hyper(#[error(source)] hyper::Error),
|
||||||
|
|
||||||
|
/// Error related to HTTP
|
||||||
#[error(display = "Internal error (HTTP error): {}", _0)]
|
#[error(display = "Internal error (HTTP error): {}", _0)]
|
||||||
HTTP(#[error(source)] http::Error),
|
HTTP(#[error(source)] http::Error),
|
||||||
|
|
||||||
// Category: cannot process
|
// Category: cannot process
|
||||||
|
/// No proper api key was used, or the signature was invalid
|
||||||
#[error(display = "Forbidden: {}", _0)]
|
#[error(display = "Forbidden: {}", _0)]
|
||||||
Forbidden(String),
|
Forbidden(String),
|
||||||
|
|
||||||
|
/// The object requested don't exists
|
||||||
#[error(display = "Not found")]
|
#[error(display = "Not found")]
|
||||||
NotFound,
|
NotFound,
|
||||||
|
|
||||||
// Category: bad request
|
// Category: bad request
|
||||||
|
/// The request contained an invalid UTF-8 sequence in its path or in other parameters
|
||||||
#[error(display = "Invalid UTF-8: {}", _0)]
|
#[error(display = "Invalid UTF-8: {}", _0)]
|
||||||
InvalidUTF8Str(#[error(source)] std::str::Utf8Error),
|
InvalidUTF8Str(#[error(source)] std::str::Utf8Error),
|
||||||
|
|
||||||
|
/// The request used an invalid path
|
||||||
#[error(display = "Invalid UTF-8: {}", _0)]
|
#[error(display = "Invalid UTF-8: {}", _0)]
|
||||||
InvalidUTF8String(#[error(source)] std::string::FromUtf8Error),
|
InvalidUTF8String(#[error(source)] std::string::FromUtf8Error),
|
||||||
|
|
||||||
|
/// Some base64 encoded data was badly encoded
|
||||||
#[error(display = "Invalid base64: {}", _0)]
|
#[error(display = "Invalid base64: {}", _0)]
|
||||||
InvalidBase64(#[error(source)] base64::DecodeError),
|
InvalidBase64(#[error(source)] base64::DecodeError),
|
||||||
|
|
||||||
|
/// The client sent invalid XML data
|
||||||
#[error(display = "Invalid XML: {}", _0)]
|
#[error(display = "Invalid XML: {}", _0)]
|
||||||
InvalidXML(String),
|
InvalidXML(String),
|
||||||
|
|
||||||
|
/// The client sent a header with invalid value
|
||||||
#[error(display = "Invalid header value: {}", _0)]
|
#[error(display = "Invalid header value: {}", _0)]
|
||||||
InvalidHeader(#[error(source)] hyper::header::ToStrError),
|
InvalidHeader(#[error(source)] hyper::header::ToStrError),
|
||||||
|
|
||||||
|
/// The client sent a range header with invalid value
|
||||||
#[error(display = "Invalid HTTP range: {:?}", _0)]
|
#[error(display = "Invalid HTTP range: {:?}", _0)]
|
||||||
InvalidRange(#[error(from)] http_range::HttpRangeParseError),
|
InvalidRange(#[error(from)] http_range::HttpRangeParseError),
|
||||||
|
|
||||||
|
/// The client sent an invalid request
|
||||||
#[error(display = "Bad request: {}", _0)]
|
#[error(display = "Bad request: {}", _0)]
|
||||||
BadRequest(String),
|
BadRequest(String),
|
||||||
}
|
}
|
||||||
|
@ -52,6 +65,7 @@ impl From<roxmltree::Error> for Error {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Error {
|
impl Error {
|
||||||
|
/// Get the HTTP status code that best represents the meaning of the error for the client
|
||||||
pub fn http_status_code(&self) -> StatusCode {
|
pub fn http_status_code(&self) -> StatusCode {
|
||||||
match self {
|
match self {
|
||||||
Error::NotFound => StatusCode::NOT_FOUND,
|
Error::NotFound => StatusCode::NOT_FOUND,
|
||||||
|
@ -65,6 +79,7 @@ impl Error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Trait to map error to the Bad Request error code
|
||||||
pub trait OkOrBadRequest {
|
pub trait OkOrBadRequest {
|
||||||
type S2;
|
type S2;
|
||||||
fn ok_or_bad_request(self, reason: &'static str) -> Self::S2;
|
fn ok_or_bad_request(self, reason: &'static str) -> Self::S2;
|
||||||
|
@ -93,6 +108,7 @@ impl<T> OkOrBadRequest for Option<T> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Trait to map an error to an Internal Error code
|
||||||
pub trait OkOrInternalError {
|
pub trait OkOrInternalError {
|
||||||
type S2;
|
type S2;
|
||||||
fn ok_or_internal_error(self, reason: &'static str) -> Self::S2;
|
fn ok_or_internal_error(self, reason: &'static str) -> Self::S2;
|
||||||
|
|
|
@ -1,15 +1,19 @@
|
||||||
|
//! Crate for serving a S3 compatible API
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
extern crate log;
|
extern crate log;
|
||||||
|
|
||||||
pub mod error;
|
mod error;
|
||||||
|
pub use error::Error;
|
||||||
|
|
||||||
pub mod encoding;
|
mod encoding;
|
||||||
|
|
||||||
pub mod api_server;
|
mod api_server;
|
||||||
pub mod signature;
|
pub use api_server::run_api_server;
|
||||||
|
|
||||||
pub mod s3_copy;
|
mod signature;
|
||||||
pub mod s3_delete;
|
|
||||||
|
mod s3_copy;
|
||||||
|
mod s3_delete;
|
||||||
pub mod s3_get;
|
pub mod s3_get;
|
||||||
pub mod s3_list;
|
mod s3_list;
|
||||||
pub mod s3_put;
|
mod s3_put;
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
//! Function related to GET and HEAD requests
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::{Duration, UNIX_EPOCH};
|
use std::time::{Duration, UNIX_EPOCH};
|
||||||
|
|
||||||
|
@ -79,6 +80,7 @@ fn try_answer_cached(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Handle HEAD request
|
||||||
pub async fn handle_head(
|
pub async fn handle_head(
|
||||||
garage: Arc<Garage>,
|
garage: Arc<Garage>,
|
||||||
req: &Request<Body>,
|
req: &Request<Body>,
|
||||||
|
@ -118,6 +120,7 @@ pub async fn handle_head(
|
||||||
Ok(response)
|
Ok(response)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Handle GET request
|
||||||
pub async fn handle_get(
|
pub async fn handle_get(
|
||||||
garage: Arc<Garage>,
|
garage: Arc<Garage>,
|
||||||
req: &Request<Body>,
|
req: &Request<Body>,
|
||||||
|
@ -224,7 +227,7 @@ pub async fn handle_get(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn handle_get_range(
|
async fn handle_get_range(
|
||||||
garage: Arc<Garage>,
|
garage: Arc<Garage>,
|
||||||
version: &ObjectVersion,
|
version: &ObjectVersion,
|
||||||
version_data: &ObjectVersionData,
|
version_data: &ObjectVersionData,
|
||||||
|
|
|
@ -80,7 +80,7 @@ pub struct ConfigureNodeOpt {
|
||||||
#[structopt(short = "c", long = "capacity")]
|
#[structopt(short = "c", long = "capacity")]
|
||||||
capacity: Option<u32>,
|
capacity: Option<u32>,
|
||||||
|
|
||||||
/// Optionnal node tag
|
/// Optional node tag
|
||||||
#[structopt(short = "t", long = "tag")]
|
#[structopt(short = "t", long = "tag")]
|
||||||
tag: Option<String>,
|
tag: Option<String>,
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
#![recursion_limit = "1024"]
|
#![recursion_limit = "1024"]
|
||||||
|
//! Garage CLI, used to interact with a running Garage instance, and to launch a Garage instance
|
||||||
|
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
extern crate log;
|
extern crate log;
|
||||||
|
@ -25,7 +26,7 @@ use cli::*;
|
||||||
|
|
||||||
#[derive(StructOpt, Debug)]
|
#[derive(StructOpt, Debug)]
|
||||||
#[structopt(name = "garage")]
|
#[structopt(name = "garage")]
|
||||||
pub struct Opt {
|
struct Opt {
|
||||||
/// RPC connect to this host to execute client operations
|
/// RPC connect to this host to execute client operations
|
||||||
#[structopt(short = "h", long = "rpc-host", default_value = "127.0.0.1:3901")]
|
#[structopt(short = "h", long = "rpc-host", default_value = "127.0.0.1:3901")]
|
||||||
pub rpc_host: SocketAddr,
|
pub rpc_host: SocketAddr,
|
||||||
|
|
|
@ -8,10 +8,10 @@ use garage_util::background::*;
|
||||||
use garage_util::config::*;
|
use garage_util::config::*;
|
||||||
use garage_util::error::Error;
|
use garage_util::error::Error;
|
||||||
|
|
||||||
use garage_api::api_server;
|
use garage_api::run_api_server;
|
||||||
use garage_model::garage::Garage;
|
use garage_model::garage::Garage;
|
||||||
use garage_rpc::rpc_server::RpcServer;
|
use garage_rpc::rpc_server::RpcServer;
|
||||||
use garage_web::web_server;
|
use garage_web::run_web_server;
|
||||||
|
|
||||||
use crate::admin_rpc::*;
|
use crate::admin_rpc::*;
|
||||||
|
|
||||||
|
@ -62,8 +62,8 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
|
||||||
|
|
||||||
info!("Initializing RPC and API servers...");
|
info!("Initializing RPC and API servers...");
|
||||||
let run_rpc_server = Arc::new(rpc_server).run(wait_from(watch_cancel.clone()));
|
let run_rpc_server = Arc::new(rpc_server).run(wait_from(watch_cancel.clone()));
|
||||||
let api_server = api_server::run_api_server(garage.clone(), wait_from(watch_cancel.clone()));
|
let api_server = run_api_server(garage.clone(), wait_from(watch_cancel.clone()));
|
||||||
let web_server = web_server::run_web_server(garage, wait_from(watch_cancel.clone()));
|
let web_server = run_web_server(garage, wait_from(watch_cancel.clone()));
|
||||||
|
|
||||||
futures::try_join!(
|
futures::try_join!(
|
||||||
bootstrap.map(|rv| {
|
bootstrap.map(|rv| {
|
||||||
|
|
|
@ -18,12 +18,13 @@ use garage_rpc::membership::System;
|
||||||
use garage_rpc::rpc_client::*;
|
use garage_rpc::rpc_client::*;
|
||||||
use garage_rpc::rpc_server::*;
|
use garage_rpc::rpc_server::*;
|
||||||
|
|
||||||
use garage_table::replication::{sharded::TableShardedReplication, TableReplication};
|
use garage_table::replication::{TableReplication, TableShardedReplication};
|
||||||
|
|
||||||
use crate::block_ref_table::*;
|
use crate::block_ref_table::*;
|
||||||
|
|
||||||
use crate::garage::Garage;
|
use crate::garage::Garage;
|
||||||
|
|
||||||
|
/// Size under which data will be stored inlined in database instead of as files
|
||||||
pub const INLINE_THRESHOLD: usize = 3072;
|
pub const INLINE_THRESHOLD: usize = 3072;
|
||||||
|
|
||||||
pub const BACKGROUND_WORKERS: u64 = 1;
|
pub const BACKGROUND_WORKERS: u64 = 1;
|
||||||
|
@ -33,28 +34,41 @@ const BLOCK_GC_TIMEOUT: Duration = Duration::from_secs(60);
|
||||||
const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5);
|
const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5);
|
||||||
const RESYNC_RETRY_TIMEOUT: Duration = Duration::from_secs(10);
|
const RESYNC_RETRY_TIMEOUT: Duration = Duration::from_secs(10);
|
||||||
|
|
||||||
|
/// RPC messages used to share blocks of data between nodes
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
pub enum Message {
|
pub enum Message {
|
||||||
Ok,
|
Ok,
|
||||||
|
/// Message to ask for a block of data, by hash
|
||||||
GetBlock(Hash),
|
GetBlock(Hash),
|
||||||
|
/// Message to send a block of data, either because requested, of for first delivery of new
|
||||||
|
/// block
|
||||||
PutBlock(PutBlockMessage),
|
PutBlock(PutBlockMessage),
|
||||||
|
/// Ask other node if they should have this block, but don't actually have it
|
||||||
NeedBlockQuery(Hash),
|
NeedBlockQuery(Hash),
|
||||||
|
/// Response : whether the node do require that block
|
||||||
NeedBlockReply(bool),
|
NeedBlockReply(bool),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Structure used to send a block
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
pub struct PutBlockMessage {
|
pub struct PutBlockMessage {
|
||||||
|
/// Hash of the block
|
||||||
pub hash: Hash,
|
pub hash: Hash,
|
||||||
|
|
||||||
|
/// Content of the block
|
||||||
#[serde(with = "serde_bytes")]
|
#[serde(with = "serde_bytes")]
|
||||||
pub data: Vec<u8>,
|
pub data: Vec<u8>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl RpcMessage for Message {}
|
impl RpcMessage for Message {}
|
||||||
|
|
||||||
|
/// The block manager, handling block exchange between nodes, and block storage on local node
|
||||||
pub struct BlockManager {
|
pub struct BlockManager {
|
||||||
|
/// Replication strategy, allowing to find on which node blocks should be located
|
||||||
pub replication: TableShardedReplication,
|
pub replication: TableShardedReplication,
|
||||||
|
/// Directory in which block are stored
|
||||||
pub data_dir: PathBuf,
|
pub data_dir: PathBuf,
|
||||||
|
/// Lock to prevent concurrent edition of the directory
|
||||||
pub data_dir_lock: Mutex<()>,
|
pub data_dir_lock: Mutex<()>,
|
||||||
|
|
||||||
rc: sled::Tree,
|
rc: sled::Tree,
|
||||||
|
@ -128,7 +142,8 @@ impl BlockManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn spawn_background_worker(self: Arc<Self>) {
|
pub fn spawn_background_worker(self: Arc<Self>) {
|
||||||
// Launch 2 simultaneous workers for background resync loop preprocessing
|
// Launch 2 simultaneous workers for background resync loop preprocessing <= TODO actually this
|
||||||
|
// launches only one worker with current value of BACKGROUND_WORKERS
|
||||||
for i in 0..BACKGROUND_WORKERS {
|
for i in 0..BACKGROUND_WORKERS {
|
||||||
let bm2 = self.clone();
|
let bm2 = self.clone();
|
||||||
let background = self.system.background.clone();
|
let background = self.system.background.clone();
|
||||||
|
@ -141,7 +156,8 @@ impl BlockManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<Message, Error> {
|
/// Write a block to disk
|
||||||
|
async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<Message, Error> {
|
||||||
let _lock = self.data_dir_lock.lock().await;
|
let _lock = self.data_dir_lock.lock().await;
|
||||||
|
|
||||||
let mut path = self.block_dir(hash);
|
let mut path = self.block_dir(hash);
|
||||||
|
@ -159,7 +175,8 @@ impl BlockManager {
|
||||||
Ok(Message::Ok)
|
Ok(Message::Ok)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn read_block(&self, hash: &Hash) -> Result<Message, Error> {
|
/// Read block from disk, verifying it's integrity
|
||||||
|
async fn read_block(&self, hash: &Hash) -> Result<Message, Error> {
|
||||||
let path = self.block_path(hash);
|
let path = self.block_path(hash);
|
||||||
|
|
||||||
let mut f = match fs::File::open(&path).await {
|
let mut f = match fs::File::open(&path).await {
|
||||||
|
@ -190,7 +207,8 @@ impl BlockManager {
|
||||||
Ok(Message::PutBlock(PutBlockMessage { hash: *hash, data }))
|
Ok(Message::PutBlock(PutBlockMessage { hash: *hash, data }))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn need_block(&self, hash: &Hash) -> Result<bool, Error> {
|
/// Check if this node should have a block, but don't actually have it
|
||||||
|
async fn need_block(&self, hash: &Hash) -> Result<bool, Error> {
|
||||||
let needed = self
|
let needed = self
|
||||||
.rc
|
.rc
|
||||||
.get(hash.as_ref())?
|
.get(hash.as_ref())?
|
||||||
|
@ -217,6 +235,8 @@ impl BlockManager {
|
||||||
path
|
path
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Increment the number of time a block is used, putting it to resynchronization if it is
|
||||||
|
/// required, but not known
|
||||||
pub fn block_incref(&self, hash: &Hash) -> Result<(), Error> {
|
pub fn block_incref(&self, hash: &Hash) -> Result<(), Error> {
|
||||||
let old_rc = self.rc.fetch_and_update(&hash, |old| {
|
let old_rc = self.rc.fetch_and_update(&hash, |old| {
|
||||||
let old_v = old.map(u64_from_be_bytes).unwrap_or(0);
|
let old_v = old.map(u64_from_be_bytes).unwrap_or(0);
|
||||||
|
@ -229,6 +249,7 @@ impl BlockManager {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Decrement the number of time a block is used
|
||||||
pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> {
|
pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> {
|
||||||
let new_rc = self.rc.update_and_fetch(&hash, |old| {
|
let new_rc = self.rc.update_and_fetch(&hash, |old| {
|
||||||
let old_v = old.map(u64_from_be_bytes).unwrap_or(0);
|
let old_v = old.map(u64_from_be_bytes).unwrap_or(0);
|
||||||
|
@ -388,6 +409,7 @@ impl BlockManager {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Ask nodes that might have a block for it
|
||||||
pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> {
|
pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> {
|
||||||
let who = self.replication.read_nodes(&hash);
|
let who = self.replication.read_nodes(&hash);
|
||||||
let resps = self
|
let resps = self
|
||||||
|
@ -412,6 +434,7 @@ impl BlockManager {
|
||||||
)))
|
)))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Send block to nodes that should have it
|
||||||
pub async fn rpc_put_block(&self, hash: Hash, data: Vec<u8>) -> Result<(), Error> {
|
pub async fn rpc_put_block(&self, hash: Hash, data: Vec<u8>) -> Result<(), Error> {
|
||||||
let who = self.replication.write_nodes(&hash);
|
let who = self.replication.write_nodes(&hash);
|
||||||
self.rpc_client
|
self.rpc_client
|
||||||
|
@ -498,6 +521,7 @@ impl BlockManager {
|
||||||
.boxed()
|
.boxed()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get lenght of resync queue
|
||||||
pub fn resync_queue_len(&self) -> usize {
|
pub fn resync_queue_len(&self) -> usize {
|
||||||
self.resync_queue.len()
|
self.resync_queue.len()
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,13 +10,14 @@ use crate::block::*;
|
||||||
|
|
||||||
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
|
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
|
||||||
pub struct BlockRef {
|
pub struct BlockRef {
|
||||||
// Primary key
|
/// Hash of the block, used as partition key
|
||||||
pub block: Hash,
|
pub block: Hash,
|
||||||
|
|
||||||
// Sort key
|
/// Id of the Version for the object containing this block, used as sorting key
|
||||||
pub version: UUID,
|
pub version: UUID,
|
||||||
|
|
||||||
// Keep track of deleted status
|
// Keep track of deleted status
|
||||||
|
/// Is the Version that contains this block deleted
|
||||||
pub deleted: crdt::Bool,
|
pub deleted: crdt::Bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -12,15 +12,18 @@ use crate::key_table::PermissionSet;
|
||||||
/// - A bucket has 2 states, Present or Deleted and parameters make sense only if present.
|
/// - A bucket has 2 states, Present or Deleted and parameters make sense only if present.
|
||||||
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
|
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
|
||||||
pub struct Bucket {
|
pub struct Bucket {
|
||||||
// Primary key
|
/// Name of the bucket
|
||||||
pub name: String,
|
pub name: String,
|
||||||
|
/// State, and configuration if not deleted, of the bucket
|
||||||
pub state: crdt::LWW<BucketState>,
|
pub state: crdt::LWW<BucketState>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// State of a bucket
|
||||||
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
|
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
|
||||||
pub enum BucketState {
|
pub enum BucketState {
|
||||||
|
/// The bucket is deleted
|
||||||
Deleted,
|
Deleted,
|
||||||
|
/// The bucket exists
|
||||||
Present(BucketParams),
|
Present(BucketParams),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -37,9 +40,12 @@ impl CRDT for BucketState {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Configuration for a bucket
|
||||||
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
|
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
|
||||||
pub struct BucketParams {
|
pub struct BucketParams {
|
||||||
|
/// Map of key with access to the bucket, and what kind of access they give
|
||||||
pub authorized_keys: crdt::LWWMap<String, PermissionSet>,
|
pub authorized_keys: crdt::LWWMap<String, PermissionSet>,
|
||||||
|
/// Is the bucket served as http
|
||||||
pub website: crdt::LWW<bool>,
|
pub website: crdt::LWW<bool>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -51,6 +57,7 @@ impl CRDT for BucketParams {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BucketParams {
|
impl BucketParams {
|
||||||
|
/// Initializes a new instance of the Bucket struct
|
||||||
pub fn new() -> Self {
|
pub fn new() -> Self {
|
||||||
BucketParams {
|
BucketParams {
|
||||||
authorized_keys: crdt::LWWMap::new(),
|
authorized_keys: crdt::LWWMap::new(),
|
||||||
|
@ -60,15 +67,21 @@ impl BucketParams {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Bucket {
|
impl Bucket {
|
||||||
|
/// Create a new bucket
|
||||||
pub fn new(name: String) -> Self {
|
pub fn new(name: String) -> Self {
|
||||||
Bucket {
|
Bucket {
|
||||||
name,
|
name,
|
||||||
state: crdt::LWW::new(BucketState::Present(BucketParams::new())),
|
state: crdt::LWW::new(BucketState::Present(BucketParams::new())),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns true if this represents a deleted bucket
|
||||||
pub fn is_deleted(&self) -> bool {
|
pub fn is_deleted(&self) -> bool {
|
||||||
*self.state.get() == BucketState::Deleted
|
*self.state.get() == BucketState::Deleted
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Return the list of authorized keys, when each was updated, and the permission associated to
|
||||||
|
/// the key
|
||||||
pub fn authorized_keys(&self) -> &[(String, u64, PermissionSet)] {
|
pub fn authorized_keys(&self) -> &[(String, u64, PermissionSet)] {
|
||||||
match self.state.get() {
|
match self.state.get() {
|
||||||
BucketState::Deleted => &[],
|
BucketState::Deleted => &[],
|
||||||
|
|
|
@ -7,8 +7,8 @@ use garage_rpc::membership::System;
|
||||||
use garage_rpc::rpc_client::RpcHttpClient;
|
use garage_rpc::rpc_client::RpcHttpClient;
|
||||||
use garage_rpc::rpc_server::RpcServer;
|
use garage_rpc::rpc_server::RpcServer;
|
||||||
|
|
||||||
use garage_table::replication::fullcopy::*;
|
use garage_table::replication::TableFullReplication;
|
||||||
use garage_table::replication::sharded::*;
|
use garage_table::replication::TableShardedReplication;
|
||||||
use garage_table::*;
|
use garage_table::*;
|
||||||
|
|
||||||
use crate::block::*;
|
use crate::block::*;
|
||||||
|
@ -18,15 +18,23 @@ use crate::key_table::*;
|
||||||
use crate::object_table::*;
|
use crate::object_table::*;
|
||||||
use crate::version_table::*;
|
use crate::version_table::*;
|
||||||
|
|
||||||
|
/// An entire Garage full of data
|
||||||
pub struct Garage {
|
pub struct Garage {
|
||||||
|
/// The parsed configuration Garage is running
|
||||||
pub config: Config,
|
pub config: Config,
|
||||||
|
|
||||||
|
/// The local database
|
||||||
pub db: sled::Db,
|
pub db: sled::Db,
|
||||||
|
/// A background job runner
|
||||||
pub background: Arc<BackgroundRunner>,
|
pub background: Arc<BackgroundRunner>,
|
||||||
|
/// The membership manager
|
||||||
pub system: Arc<System>,
|
pub system: Arc<System>,
|
||||||
|
/// The block manager
|
||||||
pub block_manager: Arc<BlockManager>,
|
pub block_manager: Arc<BlockManager>,
|
||||||
|
|
||||||
|
/// Table containing informations about buckets
|
||||||
pub bucket_table: Arc<Table<BucketTable, TableFullReplication>>,
|
pub bucket_table: Arc<Table<BucketTable, TableFullReplication>>,
|
||||||
|
/// Table containing informations about api keys
|
||||||
pub key_table: Arc<Table<KeyTable, TableFullReplication>>,
|
pub key_table: Arc<Table<KeyTable, TableFullReplication>>,
|
||||||
|
|
||||||
pub object_table: Arc<Table<ObjectTable, TableShardedReplication>>,
|
pub object_table: Arc<Table<ObjectTable, TableShardedReplication>>,
|
||||||
|
@ -35,6 +43,7 @@ pub struct Garage {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Garage {
|
impl Garage {
|
||||||
|
/// Create and run garage
|
||||||
pub fn new(
|
pub fn new(
|
||||||
config: Config,
|
config: Config,
|
||||||
db: sled::Db,
|
db: sled::Db,
|
||||||
|
|
|
@ -3,26 +3,28 @@ use serde::{Deserialize, Serialize};
|
||||||
use garage_table::crdt::*;
|
use garage_table::crdt::*;
|
||||||
use garage_table::*;
|
use garage_table::*;
|
||||||
|
|
||||||
|
/// An api key
|
||||||
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
|
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
|
||||||
pub struct Key {
|
pub struct Key {
|
||||||
// Primary key
|
/// The id of the key (immutable), used as partition key
|
||||||
pub key_id: String,
|
pub key_id: String,
|
||||||
|
|
||||||
// Associated secret key (immutable)
|
/// The secret_key associated
|
||||||
pub secret_key: String,
|
pub secret_key: String,
|
||||||
|
|
||||||
// Name
|
/// Name for the key
|
||||||
pub name: crdt::LWW<String>,
|
pub name: crdt::LWW<String>,
|
||||||
|
|
||||||
// Deletion
|
/// Is the key deleted
|
||||||
pub deleted: crdt::Bool,
|
pub deleted: crdt::Bool,
|
||||||
|
|
||||||
// Authorized keys
|
/// Buckets in which the key is authorized. Empty if `Key` is deleted
|
||||||
pub authorized_buckets: crdt::LWWMap<String, PermissionSet>,
|
|
||||||
// CRDT interaction: deleted implies authorized_buckets is empty
|
// CRDT interaction: deleted implies authorized_buckets is empty
|
||||||
|
pub authorized_buckets: crdt::LWWMap<String, PermissionSet>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Key {
|
impl Key {
|
||||||
|
/// Create a new key
|
||||||
pub fn new(name: String) -> Self {
|
pub fn new(name: String) -> Self {
|
||||||
let key_id = format!("GK{}", hex::encode(&rand::random::<[u8; 12]>()[..]));
|
let key_id = format!("GK{}", hex::encode(&rand::random::<[u8; 12]>()[..]));
|
||||||
let secret_key = hex::encode(&rand::random::<[u8; 32]>()[..]);
|
let secret_key = hex::encode(&rand::random::<[u8; 32]>()[..]);
|
||||||
|
@ -34,6 +36,8 @@ impl Key {
|
||||||
authorized_buckets: crdt::LWWMap::new(),
|
authorized_buckets: crdt::LWWMap::new(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Import a key from it's parts
|
||||||
pub fn import(key_id: &str, secret_key: &str, name: &str) -> Self {
|
pub fn import(key_id: &str, secret_key: &str, name: &str) -> Self {
|
||||||
Self {
|
Self {
|
||||||
key_id: key_id.to_string(),
|
key_id: key_id.to_string(),
|
||||||
|
@ -43,6 +47,8 @@ impl Key {
|
||||||
authorized_buckets: crdt::LWWMap::new(),
|
authorized_buckets: crdt::LWWMap::new(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Create a new Key which can me merged to mark an existing key deleted
|
||||||
pub fn delete(key_id: String) -> Self {
|
pub fn delete(key_id: String) -> Self {
|
||||||
Self {
|
Self {
|
||||||
key_id,
|
key_id,
|
||||||
|
@ -52,13 +58,16 @@ impl Key {
|
||||||
authorized_buckets: crdt::LWWMap::new(),
|
authorized_buckets: crdt::LWWMap::new(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
/// Add an authorized bucket, only if it wasn't there before
|
|
||||||
|
/// Check if `Key` is allowed to read in bucket
|
||||||
pub fn allow_read(&self, bucket: &str) -> bool {
|
pub fn allow_read(&self, bucket: &str) -> bool {
|
||||||
self.authorized_buckets
|
self.authorized_buckets
|
||||||
.get(&bucket.to_string())
|
.get(&bucket.to_string())
|
||||||
.map(|x| x.allow_read)
|
.map(|x| x.allow_read)
|
||||||
.unwrap_or(false)
|
.unwrap_or(false)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Check if `Key` is allowed to write in bucket
|
||||||
pub fn allow_write(&self, bucket: &str) -> bool {
|
pub fn allow_write(&self, bucket: &str) -> bool {
|
||||||
self.authorized_buckets
|
self.authorized_buckets
|
||||||
.get(&bucket.to_string())
|
.get(&bucket.to_string())
|
||||||
|
@ -67,9 +76,12 @@ impl Key {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Permission given to a key in a bucket
|
||||||
#[derive(PartialOrd, Ord, PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
|
#[derive(PartialOrd, Ord, PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
|
||||||
pub struct PermissionSet {
|
pub struct PermissionSet {
|
||||||
|
/// The key can be used to read the bucket
|
||||||
pub allow_read: bool,
|
pub allow_read: bool,
|
||||||
|
/// The key can be used to write in the bucket
|
||||||
pub allow_write: bool,
|
pub allow_write: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -6,24 +6,26 @@ use garage_util::background::BackgroundRunner;
|
||||||
use garage_util::data::*;
|
use garage_util::data::*;
|
||||||
|
|
||||||
use garage_table::crdt::*;
|
use garage_table::crdt::*;
|
||||||
use garage_table::replication::sharded::*;
|
use garage_table::replication::TableShardedReplication;
|
||||||
use garage_table::*;
|
use garage_table::*;
|
||||||
|
|
||||||
use crate::version_table::*;
|
use crate::version_table::*;
|
||||||
|
|
||||||
|
/// An object
|
||||||
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
|
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
|
||||||
pub struct Object {
|
pub struct Object {
|
||||||
// Primary key
|
/// The bucket in which the object is stored, used as partition key
|
||||||
pub bucket: String,
|
pub bucket: String,
|
||||||
|
|
||||||
// Sort key
|
/// The key at which the object is stored in its bucket, used as sorting key
|
||||||
pub key: String,
|
pub key: String,
|
||||||
|
|
||||||
// Data
|
/// The list of currenty stored versions of the object
|
||||||
versions: Vec<ObjectVersion>,
|
versions: Vec<ObjectVersion>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Object {
|
impl Object {
|
||||||
|
/// Create an object from parts
|
||||||
pub fn new(bucket: String, key: String, versions: Vec<ObjectVersion>) -> Self {
|
pub fn new(bucket: String, key: String, versions: Vec<ObjectVersion>) -> Self {
|
||||||
let mut ret = Self {
|
let mut ret = Self {
|
||||||
bucket,
|
bucket,
|
||||||
|
@ -36,6 +38,7 @@ impl Object {
|
||||||
}
|
}
|
||||||
ret
|
ret
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Adds a version if it wasn't already present
|
/// Adds a version if it wasn't already present
|
||||||
pub fn add_version(&mut self, new: ObjectVersion) -> Result<(), ()> {
|
pub fn add_version(&mut self, new: ObjectVersion) -> Result<(), ()> {
|
||||||
match self
|
match self
|
||||||
|
@ -49,23 +52,32 @@ impl Object {
|
||||||
Ok(_) => Err(()),
|
Ok(_) => Err(()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get a list of currently stored versions of `Object`
|
||||||
pub fn versions(&self) -> &[ObjectVersion] {
|
pub fn versions(&self) -> &[ObjectVersion] {
|
||||||
&self.versions[..]
|
&self.versions[..]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Informations about a version of an object
|
||||||
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
|
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
|
||||||
pub struct ObjectVersion {
|
pub struct ObjectVersion {
|
||||||
|
/// Id of the version
|
||||||
pub uuid: UUID,
|
pub uuid: UUID,
|
||||||
|
/// Timestamp of when the object was created
|
||||||
pub timestamp: u64,
|
pub timestamp: u64,
|
||||||
|
/// State of the version
|
||||||
pub state: ObjectVersionState,
|
pub state: ObjectVersionState,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// State of an object version
|
||||||
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
|
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
|
||||||
pub enum ObjectVersionState {
|
pub enum ObjectVersionState {
|
||||||
|
/// The version is being received
|
||||||
Uploading(ObjectVersionHeaders),
|
Uploading(ObjectVersionHeaders),
|
||||||
|
/// The version is fully received
|
||||||
Complete(ObjectVersionData),
|
Complete(ObjectVersionData),
|
||||||
|
/// The version uploaded containded errors or the upload was explicitly aborted
|
||||||
Aborted,
|
Aborted,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -90,10 +102,15 @@ impl CRDT for ObjectVersionState {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Data about an object version
|
||||||
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
|
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
|
||||||
pub enum ObjectVersionData {
|
pub enum ObjectVersionData {
|
||||||
|
/// The object was deleted, this Version is a tombstone to mark it as such
|
||||||
DeleteMarker,
|
DeleteMarker,
|
||||||
|
/// The object is short, it's stored inlined
|
||||||
Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec<u8>),
|
Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec<u8>),
|
||||||
|
/// The object is not short, Hash of first block is stored here, next segments hashes are
|
||||||
|
/// stored in the version table
|
||||||
FirstBlock(ObjectVersionMeta, Hash),
|
FirstBlock(ObjectVersionMeta, Hash),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -101,16 +118,23 @@ impl AutoCRDT for ObjectVersionData {
|
||||||
const WARN_IF_DIFFERENT: bool = true;
|
const WARN_IF_DIFFERENT: bool = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Metadata about the object version
|
||||||
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
|
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
|
||||||
pub struct ObjectVersionMeta {
|
pub struct ObjectVersionMeta {
|
||||||
|
/// Headers to send to the client
|
||||||
pub headers: ObjectVersionHeaders,
|
pub headers: ObjectVersionHeaders,
|
||||||
|
/// Size of the object
|
||||||
pub size: u64,
|
pub size: u64,
|
||||||
|
/// etag of the object
|
||||||
pub etag: String,
|
pub etag: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Additional headers for an object
|
||||||
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
|
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
|
||||||
pub struct ObjectVersionHeaders {
|
pub struct ObjectVersionHeaders {
|
||||||
|
/// Content type of the object
|
||||||
pub content_type: String,
|
pub content_type: String,
|
||||||
|
/// Any other http headers to send
|
||||||
pub other: BTreeMap<String, String>,
|
pub other: BTreeMap<String, String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -118,18 +142,24 @@ impl ObjectVersion {
|
||||||
fn cmp_key(&self) -> (u64, UUID) {
|
fn cmp_key(&self) -> (u64, UUID) {
|
||||||
(self.timestamp, self.uuid)
|
(self.timestamp, self.uuid)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Is the object version currently being uploaded
|
||||||
pub fn is_uploading(&self) -> bool {
|
pub fn is_uploading(&self) -> bool {
|
||||||
match self.state {
|
match self.state {
|
||||||
ObjectVersionState::Uploading(_) => true,
|
ObjectVersionState::Uploading(_) => true,
|
||||||
_ => false,
|
_ => false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Is the object version completely received
|
||||||
pub fn is_complete(&self) -> bool {
|
pub fn is_complete(&self) -> bool {
|
||||||
match self.state {
|
match self.state {
|
||||||
ObjectVersionState::Complete(_) => true,
|
ObjectVersionState::Complete(_) => true,
|
||||||
_ => false,
|
_ => false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Is the object version available (received and not a tombstone)
|
||||||
pub fn is_data(&self) -> bool {
|
pub fn is_data(&self) -> bool {
|
||||||
match self.state {
|
match self.state {
|
||||||
ObjectVersionState::Complete(ObjectVersionData::DeleteMarker) => false,
|
ObjectVersionState::Complete(ObjectVersionData::DeleteMarker) => false,
|
||||||
|
|
|
@ -5,26 +5,32 @@ use garage_util::background::BackgroundRunner;
|
||||||
use garage_util::data::*;
|
use garage_util::data::*;
|
||||||
|
|
||||||
use garage_table::crdt::*;
|
use garage_table::crdt::*;
|
||||||
use garage_table::replication::sharded::*;
|
use garage_table::replication::TableShardedReplication;
|
||||||
use garage_table::*;
|
use garage_table::*;
|
||||||
|
|
||||||
use crate::block_ref_table::*;
|
use crate::block_ref_table::*;
|
||||||
|
|
||||||
|
/// A version of an object
|
||||||
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
|
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
|
||||||
pub struct Version {
|
pub struct Version {
|
||||||
// Primary key
|
/// UUID of the version, used as partition key
|
||||||
pub uuid: UUID,
|
pub uuid: UUID,
|
||||||
|
|
||||||
// Actual data: the blocks for this version
|
// Actual data: the blocks for this version
|
||||||
// In the case of a multipart upload, also store the etags
|
// In the case of a multipart upload, also store the etags
|
||||||
// of individual parts and check them when doing CompleteMultipartUpload
|
// of individual parts and check them when doing CompleteMultipartUpload
|
||||||
|
/// Is this version deleted
|
||||||
pub deleted: crdt::Bool,
|
pub deleted: crdt::Bool,
|
||||||
|
/// list of blocks of data composing the version
|
||||||
pub blocks: crdt::Map<VersionBlockKey, VersionBlock>,
|
pub blocks: crdt::Map<VersionBlockKey, VersionBlock>,
|
||||||
|
/// Etag of each part in case of a multipart upload, empty otherwise
|
||||||
pub parts_etags: crdt::Map<u64, String>,
|
pub parts_etags: crdt::Map<u64, String>,
|
||||||
|
|
||||||
// Back link to bucket+key so that we can figure if
|
// Back link to bucket+key so that we can figure if
|
||||||
// this was deleted later on
|
// this was deleted later on
|
||||||
|
/// Bucket in which the related object is stored
|
||||||
pub bucket: String,
|
pub bucket: String,
|
||||||
|
/// Key in which the related object is stored
|
||||||
pub key: String,
|
pub key: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -43,7 +49,9 @@ impl Version {
|
||||||
|
|
||||||
#[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)]
|
#[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)]
|
||||||
pub struct VersionBlockKey {
|
pub struct VersionBlockKey {
|
||||||
|
/// Number of the part
|
||||||
pub part_number: u64,
|
pub part_number: u64,
|
||||||
|
/// Offset of this sub-segment in its part
|
||||||
pub offset: u64,
|
pub offset: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -61,9 +69,12 @@ impl PartialOrd for VersionBlockKey {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Informations about a single block
|
||||||
#[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)]
|
#[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)]
|
||||||
pub struct VersionBlock {
|
pub struct VersionBlock {
|
||||||
|
/// Hash of the block
|
||||||
pub hash: Hash,
|
pub hash: Hash,
|
||||||
|
/// Size of the block
|
||||||
pub size: u64,
|
pub size: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,7 +1,9 @@
|
||||||
|
//! Crate containing rpc related functions and types used in Garage
|
||||||
|
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
extern crate log;
|
extern crate log;
|
||||||
|
|
||||||
pub mod consul;
|
mod consul;
|
||||||
pub(crate) mod tls_util;
|
pub(crate) mod tls_util;
|
||||||
|
|
||||||
pub mod membership;
|
pub mod membership;
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
//! Module containing structs related to membership management
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::fmt::Write as FmtWrite;
|
use std::fmt::Write as FmtWrite;
|
||||||
use std::io::{Read, Write};
|
use std::io::{Read, Write};
|
||||||
|
@ -30,20 +31,29 @@ const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60);
|
||||||
const PING_TIMEOUT: Duration = Duration::from_secs(2);
|
const PING_TIMEOUT: Duration = Duration::from_secs(2);
|
||||||
const MAX_FAILURES_BEFORE_CONSIDERED_DOWN: usize = 5;
|
const MAX_FAILURES_BEFORE_CONSIDERED_DOWN: usize = 5;
|
||||||
|
|
||||||
|
/// RPC endpoint used for calls related to membership
|
||||||
pub const MEMBERSHIP_RPC_PATH: &str = "_membership";
|
pub const MEMBERSHIP_RPC_PATH: &str = "_membership";
|
||||||
|
|
||||||
|
/// RPC messages related to membership
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
pub enum Message {
|
pub enum Message {
|
||||||
|
/// Response to successfull advertisements
|
||||||
Ok,
|
Ok,
|
||||||
|
/// Message sent to detect other nodes status
|
||||||
Ping(PingMessage),
|
Ping(PingMessage),
|
||||||
|
/// Ask other node for the nodes it knows. Answered with AdvertiseNodesUp
|
||||||
PullStatus,
|
PullStatus,
|
||||||
|
/// Ask other node its config. Answered with AdvertiseConfig
|
||||||
PullConfig,
|
PullConfig,
|
||||||
|
/// Advertisement of nodes the host knows up. Sent spontanously or in response to PullStatus
|
||||||
AdvertiseNodesUp(Vec<AdvertisedNode>),
|
AdvertiseNodesUp(Vec<AdvertisedNode>),
|
||||||
|
/// Advertisement of nodes config. Sent spontanously or in response to PullConfig
|
||||||
AdvertiseConfig(NetworkConfig),
|
AdvertiseConfig(NetworkConfig),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl RpcMessage for Message {}
|
impl RpcMessage for Message {}
|
||||||
|
|
||||||
|
/// A ping, containing informations about status and config
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
pub struct PingMessage {
|
pub struct PingMessage {
|
||||||
id: UUID,
|
id: UUID,
|
||||||
|
@ -55,18 +65,25 @@ pub struct PingMessage {
|
||||||
state_info: StateInfo,
|
state_info: StateInfo,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// A node advertisement
|
||||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||||
pub struct AdvertisedNode {
|
pub struct AdvertisedNode {
|
||||||
|
/// Id of the node this advertisement relates to
|
||||||
pub id: UUID,
|
pub id: UUID,
|
||||||
|
/// IP and port of the node
|
||||||
pub addr: SocketAddr,
|
pub addr: SocketAddr,
|
||||||
|
|
||||||
|
/// Is the node considered up
|
||||||
pub is_up: bool,
|
pub is_up: bool,
|
||||||
|
/// When was the node last seen up, in milliseconds since UNIX epoch
|
||||||
pub last_seen: u64,
|
pub last_seen: u64,
|
||||||
|
|
||||||
pub state_info: StateInfo,
|
pub state_info: StateInfo,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// This node's membership manager
|
||||||
pub struct System {
|
pub struct System {
|
||||||
|
/// The id of this node
|
||||||
pub id: UUID,
|
pub id: UUID,
|
||||||
|
|
||||||
persist_config: Persister<NetworkConfig>,
|
persist_config: Persister<NetworkConfig>,
|
||||||
|
@ -79,10 +96,12 @@ pub struct System {
|
||||||
rpc_client: Arc<RpcClient<Message>>,
|
rpc_client: Arc<RpcClient<Message>>,
|
||||||
|
|
||||||
pub(crate) status: watch::Receiver<Arc<Status>>,
|
pub(crate) status: watch::Receiver<Arc<Status>>,
|
||||||
|
/// The ring
|
||||||
pub ring: watch::Receiver<Arc<Ring>>,
|
pub ring: watch::Receiver<Arc<Ring>>,
|
||||||
|
|
||||||
update_lock: Mutex<Updaters>,
|
update_lock: Mutex<Updaters>,
|
||||||
|
|
||||||
|
/// The job runner of this node
|
||||||
pub background: Arc<BackgroundRunner>,
|
pub background: Arc<BackgroundRunner>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -91,21 +110,29 @@ struct Updaters {
|
||||||
update_ring: watch::Sender<Arc<Ring>>,
|
update_ring: watch::Sender<Arc<Ring>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// The status of each nodes, viewed by this node
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct Status {
|
pub struct Status {
|
||||||
|
/// Mapping of each node id to its known status
|
||||||
pub nodes: HashMap<UUID, Arc<StatusEntry>>,
|
pub nodes: HashMap<UUID, Arc<StatusEntry>>,
|
||||||
|
/// Hash of `nodes`, used to detect when nodes have different views of the cluster
|
||||||
pub hash: Hash,
|
pub hash: Hash,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// The status of a single node
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct StatusEntry {
|
pub struct StatusEntry {
|
||||||
|
/// The IP and port used to connect to this node
|
||||||
pub addr: SocketAddr,
|
pub addr: SocketAddr,
|
||||||
|
/// Last time this node was seen
|
||||||
pub last_seen: u64,
|
pub last_seen: u64,
|
||||||
|
/// Number of consecutive pings sent without reply to this node
|
||||||
pub num_failures: AtomicUsize,
|
pub num_failures: AtomicUsize,
|
||||||
pub state_info: StateInfo,
|
pub state_info: StateInfo,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl StatusEntry {
|
impl StatusEntry {
|
||||||
|
/// is the node associated to this entry considered up
|
||||||
pub fn is_up(&self) -> bool {
|
pub fn is_up(&self) -> bool {
|
||||||
self.num_failures.load(Ordering::SeqCst) < MAX_FAILURES_BEFORE_CONSIDERED_DOWN
|
self.num_failures.load(Ordering::SeqCst) < MAX_FAILURES_BEFORE_CONSIDERED_DOWN
|
||||||
}
|
}
|
||||||
|
@ -195,6 +222,7 @@ fn gen_node_id(metadata_dir: &PathBuf) -> Result<UUID, Error> {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl System {
|
impl System {
|
||||||
|
/// Create this node's membership manager
|
||||||
pub fn new(
|
pub fn new(
|
||||||
metadata_dir: PathBuf,
|
metadata_dir: PathBuf,
|
||||||
rpc_http_client: Arc<RpcHttpClient>,
|
rpc_http_client: Arc<RpcHttpClient>,
|
||||||
|
@ -279,6 +307,7 @@ impl System {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get an RPC client
|
||||||
pub fn rpc_client<M: RpcMessage + 'static>(self: &Arc<Self>, path: &str) -> Arc<RpcClient<M>> {
|
pub fn rpc_client<M: RpcMessage + 'static>(self: &Arc<Self>, path: &str) -> Arc<RpcClient<M>> {
|
||||||
RpcClient::new(
|
RpcClient::new(
|
||||||
RpcAddrClient::new(self.rpc_http_client.clone(), path.to_string()),
|
RpcAddrClient::new(self.rpc_http_client.clone(), path.to_string()),
|
||||||
|
@ -287,6 +316,7 @@ impl System {
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Save network configuration to disc
|
||||||
async fn save_network_config(self: Arc<Self>) -> Result<(), Error> {
|
async fn save_network_config(self: Arc<Self>) -> Result<(), Error> {
|
||||||
let ring = self.ring.borrow().clone();
|
let ring = self.ring.borrow().clone();
|
||||||
self.persist_config
|
self.persist_config
|
||||||
|
@ -319,6 +349,7 @@ impl System {
|
||||||
self.rpc_client.call_many(&to[..], msg, timeout).await;
|
self.rpc_client.call_many(&to[..], msg, timeout).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Perform bootstraping, starting the ping loop
|
||||||
pub async fn bootstrap(
|
pub async fn bootstrap(
|
||||||
self: Arc<Self>,
|
self: Arc<Self>,
|
||||||
peers: Vec<SocketAddr>,
|
peers: Vec<SocketAddr>,
|
||||||
|
@ -386,6 +417,8 @@ impl System {
|
||||||
}
|
}
|
||||||
} else if let Some(id) = id_option {
|
} else if let Some(id) = id_option {
|
||||||
if let Some(st) = status.nodes.get_mut(id) {
|
if let Some(st) = status.nodes.get_mut(id) {
|
||||||
|
// we need to increment failure counter as call was done using by_addr so the
|
||||||
|
// counter was not auto-incremented
|
||||||
st.num_failures.fetch_add(1, Ordering::SeqCst);
|
st.num_failures.fetch_add(1, Ordering::SeqCst);
|
||||||
if !st.is_up() {
|
if !st.is_up() {
|
||||||
warn!("Node {:?} seems to be down.", id);
|
warn!("Node {:?} seems to be down.", id);
|
||||||
|
|
|
@ -1,3 +1,5 @@
|
||||||
|
//! Module containing types related to computing nodes which should receive a copy of data blocks
|
||||||
|
//! and metadata
|
||||||
use std::collections::{HashMap, HashSet};
|
use std::collections::{HashMap, HashSet};
|
||||||
use std::convert::TryInto;
|
use std::convert::TryInto;
|
||||||
|
|
||||||
|
@ -8,23 +10,30 @@ use garage_util::data::*;
|
||||||
// A partition number is encoded on 16 bits,
|
// A partition number is encoded on 16 bits,
|
||||||
// i.e. we have up to 2**16 partitions.
|
// i.e. we have up to 2**16 partitions.
|
||||||
// (in practice we have exactly 2**PARTITION_BITS partitions)
|
// (in practice we have exactly 2**PARTITION_BITS partitions)
|
||||||
|
/// A partition id, stored on 16 bits
|
||||||
pub type Partition = u16;
|
pub type Partition = u16;
|
||||||
|
|
||||||
// TODO: make this constant parametrizable in the config file
|
// TODO: make this constant parametrizable in the config file
|
||||||
// For deployments with many nodes it might make sense to bump
|
// For deployments with many nodes it might make sense to bump
|
||||||
// it up to 10.
|
// it up to 10.
|
||||||
// Maximum value : 16
|
// Maximum value : 16
|
||||||
|
/// How many bits from the hash are used to make partitions. Higher numbers means more fairness in
|
||||||
|
/// presence of numerous nodes, but exponentially bigger ring. Max 16
|
||||||
pub const PARTITION_BITS: usize = 8;
|
pub const PARTITION_BITS: usize = 8;
|
||||||
|
|
||||||
const PARTITION_MASK_U16: u16 = ((1 << PARTITION_BITS) - 1) << (16 - PARTITION_BITS);
|
const PARTITION_MASK_U16: u16 = ((1 << PARTITION_BITS) - 1) << (16 - PARTITION_BITS);
|
||||||
|
|
||||||
// TODO: make this constant paraetrizable in the config file
|
// TODO: make this constant paraetrizable in the config file
|
||||||
// (most deployments use a replication factor of 3, so...)
|
// (most deployments use a replication factor of 3, so...)
|
||||||
|
/// The maximum number of time an object might get replicated
|
||||||
pub const MAX_REPLICATION: usize = 3;
|
pub const MAX_REPLICATION: usize = 3;
|
||||||
|
|
||||||
|
/// The user-defined configuration of the cluster's nodes
|
||||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||||
pub struct NetworkConfig {
|
pub struct NetworkConfig {
|
||||||
|
/// Map of each node's id to it's configuration
|
||||||
pub members: HashMap<UUID, NetworkConfigEntry>,
|
pub members: HashMap<UUID, NetworkConfigEntry>,
|
||||||
|
/// Version of this config
|
||||||
pub version: u64,
|
pub version: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -37,26 +46,40 @@ impl NetworkConfig {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// The overall configuration of one (possibly remote) node
|
||||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||||
pub struct NetworkConfigEntry {
|
pub struct NetworkConfigEntry {
|
||||||
|
/// Datacenter at which this entry belong. This infromation might be used to perform a better
|
||||||
|
/// geodistribution
|
||||||
pub datacenter: String,
|
pub datacenter: String,
|
||||||
|
/// The (relative) capacity of the node
|
||||||
pub capacity: u32,
|
pub capacity: u32,
|
||||||
|
/// A tag to recognize the entry, not used for other things than display
|
||||||
pub tag: String,
|
pub tag: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// A ring distributing fairly objects to nodes
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct Ring {
|
pub struct Ring {
|
||||||
|
/// The network configuration used to generate this ring
|
||||||
pub config: NetworkConfig,
|
pub config: NetworkConfig,
|
||||||
|
/// The list of entries in the ring
|
||||||
pub ring: Vec<RingEntry>,
|
pub ring: Vec<RingEntry>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// An entry in the ring
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct RingEntry {
|
pub struct RingEntry {
|
||||||
|
/// The prefix of the Hash of object which should use this entry
|
||||||
pub location: Hash,
|
pub location: Hash,
|
||||||
|
/// The nodes in which a matching object should get stored
|
||||||
pub nodes: [UUID; MAX_REPLICATION],
|
pub nodes: [UUID; MAX_REPLICATION],
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Ring {
|
impl Ring {
|
||||||
|
// TODO this function MUST be refactored, it's 100 lines long, with a 50 lines loop, going up to 6
|
||||||
|
// levels of imbrication. It is basically impossible to test, maintain, or understand for an
|
||||||
|
// outsider.
|
||||||
pub(crate) fn new(config: NetworkConfig) -> Self {
|
pub(crate) fn new(config: NetworkConfig) -> Self {
|
||||||
// Create a vector of partition indices (0 to 2**PARTITION_BITS-1)
|
// Create a vector of partition indices (0 to 2**PARTITION_BITS-1)
|
||||||
let partitions_idx = (0usize..(1usize << PARTITION_BITS)).collect::<Vec<_>>();
|
let partitions_idx = (0usize..(1usize << PARTITION_BITS)).collect::<Vec<_>>();
|
||||||
|
@ -166,20 +189,16 @@ impl Ring {
|
||||||
})
|
})
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
// eprintln!("RING: --");
|
|
||||||
// for e in ring.iter() {
|
|
||||||
// eprintln!("{:?}", e);
|
|
||||||
// }
|
|
||||||
// eprintln!("END --");
|
|
||||||
|
|
||||||
Self { config, ring }
|
Self { config, ring }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get the partition in which data would fall on
|
||||||
pub fn partition_of(&self, from: &Hash) -> Partition {
|
pub fn partition_of(&self, from: &Hash) -> Partition {
|
||||||
let top = u16::from_be_bytes(from.as_slice()[0..2].try_into().unwrap());
|
let top = u16::from_be_bytes(from.as_slice()[0..2].try_into().unwrap());
|
||||||
top >> (16 - PARTITION_BITS)
|
top >> (16 - PARTITION_BITS)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get the list of partitions and the first hash of a partition key that would fall in it
|
||||||
pub fn partitions(&self) -> Vec<(Partition, Hash)> {
|
pub fn partitions(&self) -> Vec<(Partition, Hash)> {
|
||||||
let mut ret = vec![];
|
let mut ret = vec![];
|
||||||
|
|
||||||
|
@ -193,6 +212,8 @@ impl Ring {
|
||||||
ret
|
ret
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO rename this function as it no longer walk the ring
|
||||||
|
/// Walk the ring to find the n servers in which data should be replicated
|
||||||
pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec<UUID> {
|
pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec<UUID> {
|
||||||
if self.ring.len() != 1 << PARTITION_BITS {
|
if self.ring.len() != 1 << PARTITION_BITS {
|
||||||
warn!("Ring not yet ready, read/writes will be lost!");
|
warn!("Ring not yet ready, read/writes will be lost!");
|
||||||
|
@ -201,12 +222,15 @@ impl Ring {
|
||||||
|
|
||||||
let top = u16::from_be_bytes(from.as_slice()[0..2].try_into().unwrap());
|
let top = u16::from_be_bytes(from.as_slice()[0..2].try_into().unwrap());
|
||||||
let partition_idx = (top >> (16 - PARTITION_BITS)) as usize;
|
let partition_idx = (top >> (16 - PARTITION_BITS)) as usize;
|
||||||
|
// TODO why computing two time in the same way and asserting?
|
||||||
assert_eq!(partition_idx, self.partition_of(from) as usize);
|
assert_eq!(partition_idx, self.partition_of(from) as usize);
|
||||||
|
|
||||||
let partition = &self.ring[partition_idx];
|
let partition = &self.ring[partition_idx];
|
||||||
|
|
||||||
let partition_top =
|
let partition_top =
|
||||||
u16::from_be_bytes(partition.location.as_slice()[0..2].try_into().unwrap());
|
u16::from_be_bytes(partition.location.as_slice()[0..2].try_into().unwrap());
|
||||||
|
// TODO is this an assertion on the validity of PARTITION_MASK_U16? If so, it should
|
||||||
|
// probably be a test more than a runtime assertion
|
||||||
assert_eq!(partition_top & PARTITION_MASK_U16, top & PARTITION_MASK_U16);
|
assert_eq!(partition_top & PARTITION_MASK_U16, top & PARTITION_MASK_U16);
|
||||||
|
|
||||||
assert!(n <= partition.nodes.len());
|
assert!(n <= partition.nodes.len());
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
//! Contain structs related to making RPCs
|
||||||
use std::borrow::Borrow;
|
use std::borrow::Borrow;
|
||||||
use std::marker::PhantomData;
|
use std::marker::PhantomData;
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
|
@ -26,14 +27,19 @@ use crate::tls_util;
|
||||||
|
|
||||||
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
|
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
|
||||||
|
|
||||||
|
/// Strategy to apply when making RPC
|
||||||
#[derive(Copy, Clone)]
|
#[derive(Copy, Clone)]
|
||||||
pub struct RequestStrategy {
|
pub struct RequestStrategy {
|
||||||
|
/// Max time to wait for reponse
|
||||||
pub rs_timeout: Duration,
|
pub rs_timeout: Duration,
|
||||||
|
/// Min number of response to consider the request successful
|
||||||
pub rs_quorum: usize,
|
pub rs_quorum: usize,
|
||||||
|
/// Should requests be dropped after enough response are received
|
||||||
pub rs_interrupt_after_quorum: bool,
|
pub rs_interrupt_after_quorum: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl RequestStrategy {
|
impl RequestStrategy {
|
||||||
|
/// Create a RequestStrategy with default timeout and not interrupting when quorum reached
|
||||||
pub fn with_quorum(quorum: usize) -> Self {
|
pub fn with_quorum(quorum: usize) -> Self {
|
||||||
RequestStrategy {
|
RequestStrategy {
|
||||||
rs_timeout: DEFAULT_TIMEOUT,
|
rs_timeout: DEFAULT_TIMEOUT,
|
||||||
|
@ -41,19 +47,25 @@ impl RequestStrategy {
|
||||||
rs_interrupt_after_quorum: false,
|
rs_interrupt_after_quorum: false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
/// Set timeout of the strategy
|
||||||
pub fn with_timeout(mut self, timeout: Duration) -> Self {
|
pub fn with_timeout(mut self, timeout: Duration) -> Self {
|
||||||
self.rs_timeout = timeout;
|
self.rs_timeout = timeout;
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
/// Set if requests can be dropped after quorum has been reached
|
||||||
|
/// In general true for read requests, and false for write
|
||||||
pub fn interrupt_after_quorum(mut self, interrupt: bool) -> Self {
|
pub fn interrupt_after_quorum(mut self, interrupt: bool) -> Self {
|
||||||
self.rs_interrupt_after_quorum = interrupt;
|
self.rs_interrupt_after_quorum = interrupt;
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Shortcut for a boxed async function taking a message, and resolving to another message or an
|
||||||
|
/// error
|
||||||
pub type LocalHandlerFn<M> =
|
pub type LocalHandlerFn<M> =
|
||||||
Box<dyn Fn(Arc<M>) -> Pin<Box<dyn Future<Output = Result<M, Error>> + Send>> + Send + Sync>;
|
Box<dyn Fn(Arc<M>) -> Pin<Box<dyn Future<Output = Result<M, Error>> + Send>> + Send + Sync>;
|
||||||
|
|
||||||
|
/// Client used to send RPC
|
||||||
pub struct RpcClient<M: RpcMessage> {
|
pub struct RpcClient<M: RpcMessage> {
|
||||||
status: watch::Receiver<Arc<Status>>,
|
status: watch::Receiver<Arc<Status>>,
|
||||||
background: Arc<BackgroundRunner>,
|
background: Arc<BackgroundRunner>,
|
||||||
|
@ -64,6 +76,7 @@ pub struct RpcClient<M: RpcMessage> {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<M: RpcMessage + 'static> RpcClient<M> {
|
impl<M: RpcMessage + 'static> RpcClient<M> {
|
||||||
|
/// Create a new RpcClient from an address, a job runner, and the status of all RPC servers
|
||||||
pub fn new(
|
pub fn new(
|
||||||
rac: RpcAddrClient<M>,
|
rac: RpcAddrClient<M>,
|
||||||
background: Arc<BackgroundRunner>,
|
background: Arc<BackgroundRunner>,
|
||||||
|
@ -77,6 +90,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Set the local handler, to process RPC to this node without network usage
|
||||||
pub fn set_local_handler<F, Fut>(&self, my_id: UUID, handler: F)
|
pub fn set_local_handler<F, Fut>(&self, my_id: UUID, handler: F)
|
||||||
where
|
where
|
||||||
F: Fn(Arc<M>) -> Fut + Send + Sync + 'static,
|
F: Fn(Arc<M>) -> Fut + Send + Sync + 'static,
|
||||||
|
@ -90,14 +104,17 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
|
||||||
self.local_handler.swap(Some(Arc::new((my_id, handler))));
|
self.local_handler.swap(Some(Arc::new((my_id, handler))));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get a RPC client to make calls using node's SocketAddr instead of its ID
|
||||||
pub fn by_addr(&self) -> &RpcAddrClient<M> {
|
pub fn by_addr(&self) -> &RpcAddrClient<M> {
|
||||||
&self.rpc_addr_client
|
&self.rpc_addr_client
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Make a RPC call
|
||||||
pub async fn call(&self, to: UUID, msg: M, timeout: Duration) -> Result<M, Error> {
|
pub async fn call(&self, to: UUID, msg: M, timeout: Duration) -> Result<M, Error> {
|
||||||
self.call_arc(to, Arc::new(msg), timeout).await
|
self.call_arc(to, Arc::new(msg), timeout).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Make a RPC call from a message stored in an Arc
|
||||||
pub async fn call_arc(&self, to: UUID, msg: Arc<M>, timeout: Duration) -> Result<M, Error> {
|
pub async fn call_arc(&self, to: UUID, msg: Arc<M>, timeout: Duration) -> Result<M, Error> {
|
||||||
if let Some(lh) = self.local_handler.load_full() {
|
if let Some(lh) = self.local_handler.load_full() {
|
||||||
let (my_id, local_handler) = lh.as_ref();
|
let (my_id, local_handler) = lh.as_ref();
|
||||||
|
@ -135,6 +152,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Make a RPC call to multiple servers, returning a Vec containing each result
|
||||||
pub async fn call_many(&self, to: &[UUID], msg: M, timeout: Duration) -> Vec<Result<M, Error>> {
|
pub async fn call_many(&self, to: &[UUID], msg: M, timeout: Duration) -> Vec<Result<M, Error>> {
|
||||||
let msg = Arc::new(msg);
|
let msg = Arc::new(msg);
|
||||||
let mut resp_stream = to
|
let mut resp_stream = to
|
||||||
|
@ -149,6 +167,8 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
|
||||||
results
|
results
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Make a RPC call to multiple servers, returning either a Vec of responses, or an error if
|
||||||
|
/// strategy could not be respected due to too many errors
|
||||||
pub async fn try_call_many(
|
pub async fn try_call_many(
|
||||||
self: &Arc<Self>,
|
self: &Arc<Self>,
|
||||||
to: &[UUID],
|
to: &[UUID],
|
||||||
|
@ -208,6 +228,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Thin wrapper arround an `RpcHttpClient` specifying the path of the request
|
||||||
pub struct RpcAddrClient<M: RpcMessage> {
|
pub struct RpcAddrClient<M: RpcMessage> {
|
||||||
phantom: PhantomData<M>,
|
phantom: PhantomData<M>,
|
||||||
|
|
||||||
|
@ -216,6 +237,7 @@ pub struct RpcAddrClient<M: RpcMessage> {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<M: RpcMessage> RpcAddrClient<M> {
|
impl<M: RpcMessage> RpcAddrClient<M> {
|
||||||
|
/// Create an RpcAddrClient from an HTTP client and the endpoint to reach for RPCs
|
||||||
pub fn new(http_client: Arc<RpcHttpClient>, path: String) -> Self {
|
pub fn new(http_client: Arc<RpcHttpClient>, path: String) -> Self {
|
||||||
Self {
|
Self {
|
||||||
phantom: PhantomData::default(),
|
phantom: PhantomData::default(),
|
||||||
|
@ -224,6 +246,7 @@ impl<M: RpcMessage> RpcAddrClient<M> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Make a RPC
|
||||||
pub async fn call<MB>(
|
pub async fn call<MB>(
|
||||||
&self,
|
&self,
|
||||||
to_addr: &SocketAddr,
|
to_addr: &SocketAddr,
|
||||||
|
@ -239,6 +262,7 @@ impl<M: RpcMessage> RpcAddrClient<M> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// HTTP client used to make RPCs
|
||||||
pub struct RpcHttpClient {
|
pub struct RpcHttpClient {
|
||||||
request_limiter: Semaphore,
|
request_limiter: Semaphore,
|
||||||
method: ClientMethod,
|
method: ClientMethod,
|
||||||
|
@ -250,6 +274,7 @@ enum ClientMethod {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl RpcHttpClient {
|
impl RpcHttpClient {
|
||||||
|
/// Create a new RpcHttpClient
|
||||||
pub fn new(
|
pub fn new(
|
||||||
max_concurrent_requests: usize,
|
max_concurrent_requests: usize,
|
||||||
tls_config: &Option<TlsConfig>,
|
tls_config: &Option<TlsConfig>,
|
||||||
|
@ -280,6 +305,7 @@ impl RpcHttpClient {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Make a RPC
|
||||||
async fn call<M, MB>(
|
async fn call<M, MB>(
|
||||||
&self,
|
&self,
|
||||||
path: &str,
|
path: &str,
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
//! Contains structs related to receiving RPCs
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
|
@ -22,13 +23,17 @@ use garage_util::error::Error;
|
||||||
|
|
||||||
use crate::tls_util;
|
use crate::tls_util;
|
||||||
|
|
||||||
|
/// Trait for messages that can be sent as RPC
|
||||||
pub trait RpcMessage: Serialize + for<'de> Deserialize<'de> + Send + Sync {}
|
pub trait RpcMessage: Serialize + for<'de> Deserialize<'de> + Send + Sync {}
|
||||||
|
|
||||||
type ResponseFuture = Pin<Box<dyn Future<Output = Result<Response<Body>, Error>> + Send>>;
|
type ResponseFuture = Pin<Box<dyn Future<Output = Result<Response<Body>, Error>> + Send>>;
|
||||||
type Handler = Box<dyn Fn(Request<Body>, SocketAddr) -> ResponseFuture + Send + Sync>;
|
type Handler = Box<dyn Fn(Request<Body>, SocketAddr) -> ResponseFuture + Send + Sync>;
|
||||||
|
|
||||||
|
/// Structure handling RPCs
|
||||||
pub struct RpcServer {
|
pub struct RpcServer {
|
||||||
|
/// The address the RpcServer will bind
|
||||||
pub bind_addr: SocketAddr,
|
pub bind_addr: SocketAddr,
|
||||||
|
/// The tls configuration used for RPC
|
||||||
pub tls_config: Option<TlsConfig>,
|
pub tls_config: Option<TlsConfig>,
|
||||||
|
|
||||||
handlers: HashMap<String, Handler>,
|
handlers: HashMap<String, Handler>,
|
||||||
|
@ -87,6 +92,7 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
impl RpcServer {
|
impl RpcServer {
|
||||||
|
/// Create a new RpcServer
|
||||||
pub fn new(bind_addr: SocketAddr, tls_config: Option<TlsConfig>) -> Self {
|
pub fn new(bind_addr: SocketAddr, tls_config: Option<TlsConfig>) -> Self {
|
||||||
Self {
|
Self {
|
||||||
bind_addr,
|
bind_addr,
|
||||||
|
@ -95,6 +101,7 @@ impl RpcServer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Add handler handling request made to `name`
|
||||||
pub fn add_handler<M, F, Fut>(&mut self, name: String, handler: F)
|
pub fn add_handler<M, F, Fut>(&mut self, name: String, handler: F)
|
||||||
where
|
where
|
||||||
M: RpcMessage + 'static,
|
M: RpcMessage + 'static,
|
||||||
|
@ -156,6 +163,7 @@ impl RpcServer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Run the RpcServer
|
||||||
pub async fn run(
|
pub async fn run(
|
||||||
self: Arc<Self>,
|
self: Arc<Self>,
|
||||||
shutdown_signal: impl Future<Output = ()>,
|
shutdown_signal: impl Future<Output = ()>,
|
||||||
|
|
|
@ -34,7 +34,7 @@ use crate::crdt::crdt::*;
|
||||||
/// and may differ from what you observed with your atomic clock!
|
/// and may differ from what you observed with your atomic clock!
|
||||||
///
|
///
|
||||||
/// This scheme is used by AWS S3 or Soundcloud and often without knowing
|
/// This scheme is used by AWS S3 or Soundcloud and often without knowing
|
||||||
/// in entreprise when reconciliating databases with ad-hoc scripts.
|
/// in enterprise when reconciliating databases with ad-hoc scripts.
|
||||||
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
|
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
|
||||||
pub struct LWW<T> {
|
pub struct LWW<T> {
|
||||||
ts: u64,
|
ts: u64,
|
||||||
|
|
|
@ -37,6 +37,7 @@ where
|
||||||
Self { vals: vec![(k, v)] }
|
Self { vals: vec![(k, v)] }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Add a value to the map
|
||||||
pub fn put(&mut self, k: K, v: V) {
|
pub fn put(&mut self, k: K, v: V) {
|
||||||
self.merge(&Self::put_mutator(k, v));
|
self.merge(&Self::put_mutator(k, v));
|
||||||
}
|
}
|
||||||
|
|
|
@ -74,7 +74,7 @@ where
|
||||||
while !*must_exit.borrow() {
|
while !*must_exit.borrow() {
|
||||||
match self.gc_loop_iter().await {
|
match self.gc_loop_iter().await {
|
||||||
Ok(true) => {
|
Ok(true) => {
|
||||||
// Stuff was done, loop imediately
|
// Stuff was done, loop immediately
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
Ok(false) => {
|
Ok(false) => {
|
||||||
|
|
|
@ -8,10 +8,10 @@ pub mod schema;
|
||||||
pub mod util;
|
pub mod util;
|
||||||
|
|
||||||
pub mod data;
|
pub mod data;
|
||||||
pub mod gc;
|
mod gc;
|
||||||
pub mod merkle;
|
mod merkle;
|
||||||
pub mod replication;
|
pub mod replication;
|
||||||
pub mod sync;
|
mod sync;
|
||||||
pub mod table;
|
pub mod table;
|
||||||
|
|
||||||
pub use schema::*;
|
pub use schema::*;
|
||||||
|
|
|
@ -6,19 +6,19 @@ use garage_util::data::*;
|
||||||
|
|
||||||
use crate::replication::*;
|
use crate::replication::*;
|
||||||
|
|
||||||
|
/// Full replication schema: all nodes store everything
|
||||||
|
/// Writes are disseminated in an epidemic manner in the network
|
||||||
|
/// Advantage: do all reads locally, extremely fast
|
||||||
|
/// Inconvenient: only suitable to reasonably small tables
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct TableFullReplication {
|
pub struct TableFullReplication {
|
||||||
|
/// The membership manager of this node
|
||||||
pub system: Arc<System>,
|
pub system: Arc<System>,
|
||||||
|
/// Max number of faults allowed while replicating a record
|
||||||
pub max_faults: usize,
|
pub max_faults: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TableReplication for TableFullReplication {
|
impl TableReplication for TableFullReplication {
|
||||||
// Full replication schema: all nodes store everything
|
|
||||||
// Writes are disseminated in an epidemic manner in the network
|
|
||||||
|
|
||||||
// Advantage: do all reads locally, extremely fast
|
|
||||||
// Inconvenient: only suitable to reasonably small tables
|
|
||||||
|
|
||||||
fn read_nodes(&self, _hash: &Hash) -> Vec<UUID> {
|
fn read_nodes(&self, _hash: &Hash) -> Vec<UUID> {
|
||||||
vec![self.system.id]
|
vec![self.system.id]
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,8 @@
|
||||||
mod parameters;
|
mod parameters;
|
||||||
|
|
||||||
pub mod fullcopy;
|
mod fullcopy;
|
||||||
pub mod sharded;
|
mod sharded;
|
||||||
|
|
||||||
|
pub use fullcopy::TableFullReplication;
|
||||||
pub use parameters::*;
|
pub use parameters::*;
|
||||||
|
pub use sharded::TableShardedReplication;
|
||||||
|
|
|
@ -2,20 +2,25 @@ use garage_rpc::ring::*;
|
||||||
|
|
||||||
use garage_util::data::*;
|
use garage_util::data::*;
|
||||||
|
|
||||||
|
/// Trait to describe how a table shall be replicated
|
||||||
pub trait TableReplication: Send + Sync {
|
pub trait TableReplication: Send + Sync {
|
||||||
// See examples in table_sharded.rs and table_fullcopy.rs
|
// See examples in table_sharded.rs and table_fullcopy.rs
|
||||||
// To understand various replication methods
|
// To understand various replication methods
|
||||||
|
|
||||||
// Which nodes to send reads from
|
/// Which nodes to send read requests to
|
||||||
fn read_nodes(&self, hash: &Hash) -> Vec<UUID>;
|
fn read_nodes(&self, hash: &Hash) -> Vec<UUID>;
|
||||||
|
/// Responses needed to consider a read succesfull
|
||||||
fn read_quorum(&self) -> usize;
|
fn read_quorum(&self) -> usize;
|
||||||
|
|
||||||
// Which nodes to send writes to
|
/// Which nodes to send writes to
|
||||||
fn write_nodes(&self, hash: &Hash) -> Vec<UUID>;
|
fn write_nodes(&self, hash: &Hash) -> Vec<UUID>;
|
||||||
|
/// Responses needed to consider a write succesfull
|
||||||
fn write_quorum(&self) -> usize;
|
fn write_quorum(&self) -> usize;
|
||||||
fn max_write_errors(&self) -> usize;
|
fn max_write_errors(&self) -> usize;
|
||||||
|
|
||||||
// Accessing partitions, for Merkle tree & sync
|
// Accessing partitions, for Merkle tree & sync
|
||||||
|
/// Get partition for data with given hash
|
||||||
fn partition_of(&self, hash: &Hash) -> Partition;
|
fn partition_of(&self, hash: &Hash) -> Partition;
|
||||||
|
/// List of existing partitions
|
||||||
fn partitions(&self) -> Vec<(Partition, Hash)>;
|
fn partitions(&self) -> Vec<(Partition, Hash)>;
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,22 +6,25 @@ use garage_util::data::*;
|
||||||
|
|
||||||
use crate::replication::*;
|
use crate::replication::*;
|
||||||
|
|
||||||
|
/// Sharded replication schema:
|
||||||
|
/// - based on the ring of nodes, a certain set of neighbors
|
||||||
|
/// store entries, given as a function of the position of the
|
||||||
|
/// entry's hash in the ring
|
||||||
|
/// - reads are done on all of the nodes that replicate the data
|
||||||
|
/// - writes as well
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct TableShardedReplication {
|
pub struct TableShardedReplication {
|
||||||
|
/// The membership manager of this node
|
||||||
pub system: Arc<System>,
|
pub system: Arc<System>,
|
||||||
|
/// How many time each data should be replicated
|
||||||
pub replication_factor: usize,
|
pub replication_factor: usize,
|
||||||
|
/// How many nodes to contact for a read, should be at most `replication_factor`
|
||||||
pub read_quorum: usize,
|
pub read_quorum: usize,
|
||||||
|
/// How many nodes to contact for a write, should be at most `replication_factor`
|
||||||
pub write_quorum: usize,
|
pub write_quorum: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TableReplication for TableShardedReplication {
|
impl TableReplication for TableShardedReplication {
|
||||||
// Sharded replication schema:
|
|
||||||
// - based on the ring of nodes, a certain set of neighbors
|
|
||||||
// store entries, given as a function of the position of the
|
|
||||||
// entry's hash in the ring
|
|
||||||
// - reads are done on all of the nodes that replicate the data
|
|
||||||
// - writes as well
|
|
||||||
|
|
||||||
fn read_nodes(&self, hash: &Hash) -> Vec<UUID> {
|
fn read_nodes(&self, hash: &Hash) -> Vec<UUID> {
|
||||||
let ring = self.system.ring.borrow().clone();
|
let ring = self.system.ring.borrow().clone();
|
||||||
ring.walk_ring(&hash, self.replication_factor)
|
ring.walk_ring(&hash, self.replication_factor)
|
||||||
|
|
|
@ -4,7 +4,9 @@ use garage_util::data::*;
|
||||||
|
|
||||||
use crate::crdt::CRDT;
|
use crate::crdt::CRDT;
|
||||||
|
|
||||||
|
/// Trait for field used to partition data
|
||||||
pub trait PartitionKey {
|
pub trait PartitionKey {
|
||||||
|
/// Get the key used to partition
|
||||||
fn hash(&self) -> Hash;
|
fn hash(&self) -> Hash;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -20,7 +22,9 @@ impl PartitionKey for Hash {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Trait for field used to sort data
|
||||||
pub trait SortKey {
|
pub trait SortKey {
|
||||||
|
/// Get the key used to sort
|
||||||
fn sort_key(&self) -> &[u8];
|
fn sort_key(&self) -> &[u8];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -36,25 +40,34 @@ impl SortKey for Hash {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Trait for an entry in a table. It must be sortable and partitionnable.
|
||||||
pub trait Entry<P: PartitionKey, S: SortKey>:
|
pub trait Entry<P: PartitionKey, S: SortKey>:
|
||||||
CRDT + PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync
|
CRDT + PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync
|
||||||
{
|
{
|
||||||
|
/// Get the key used to partition
|
||||||
fn partition_key(&self) -> &P;
|
fn partition_key(&self) -> &P;
|
||||||
|
/// Get the key used to sort
|
||||||
fn sort_key(&self) -> &S;
|
fn sort_key(&self) -> &S;
|
||||||
|
|
||||||
|
/// Is the entry a tombstone? Default implementation always return false
|
||||||
fn is_tombstone(&self) -> bool {
|
fn is_tombstone(&self) -> bool {
|
||||||
false
|
false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Trait for the schema used in a table
|
||||||
pub trait TableSchema: Send + Sync {
|
pub trait TableSchema: Send + Sync {
|
||||||
|
/// The partition key used in that table
|
||||||
type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
|
type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
|
||||||
|
/// The sort key used int that table
|
||||||
type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
|
type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
|
||||||
|
/// They type for an entry in that table
|
||||||
type E: Entry<Self::P, Self::S>;
|
type E: Entry<Self::P, Self::S>;
|
||||||
type Filter: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
|
type Filter: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
|
||||||
|
|
||||||
// Action to take if not able to decode current version:
|
// Action to take if not able to decode current version:
|
||||||
// try loading from an older version
|
// try loading from an older version
|
||||||
|
/// Try migrating an entry from an older version
|
||||||
fn try_migrate(_bytes: &[u8]) -> Option<Self::E> {
|
fn try_migrate(_bytes: &[u8]) -> Option<Self::E> {
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
|
@ -65,7 +78,5 @@ pub trait TableSchema: Send + Sync {
|
||||||
// to stderr.
|
// to stderr.
|
||||||
fn updated(&self, _old: Option<Self::E>, _new: Option<Self::E>) {}
|
fn updated(&self, _old: Option<Self::E>, _new: Option<Self::E>) {}
|
||||||
|
|
||||||
fn matches_filter(_entry: &Self::E, _filter: &Self::Filter) -> bool {
|
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool;
|
||||||
true
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
//! Job runner for futures and async functions
|
||||||
use core::future::Future;
|
use core::future::Future;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
@ -12,14 +13,15 @@ use crate::error::Error;
|
||||||
type JobOutput = Result<(), Error>;
|
type JobOutput = Result<(), Error>;
|
||||||
type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>;
|
type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>;
|
||||||
|
|
||||||
|
/// Job runner for futures and async functions
|
||||||
pub struct BackgroundRunner {
|
pub struct BackgroundRunner {
|
||||||
pub stop_signal: watch::Receiver<bool>,
|
stop_signal: watch::Receiver<bool>,
|
||||||
|
|
||||||
queue_in: mpsc::UnboundedSender<(Job, bool)>,
|
queue_in: mpsc::UnboundedSender<(Job, bool)>,
|
||||||
worker_in: mpsc::UnboundedSender<tokio::task::JoinHandle<()>>,
|
worker_in: mpsc::UnboundedSender<tokio::task::JoinHandle<()>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BackgroundRunner {
|
impl BackgroundRunner {
|
||||||
|
/// Create a new BackgroundRunner
|
||||||
pub fn new(
|
pub fn new(
|
||||||
n_runners: usize,
|
n_runners: usize,
|
||||||
stop_signal: watch::Receiver<bool>,
|
stop_signal: watch::Receiver<bool>,
|
||||||
|
@ -103,7 +105,7 @@ impl BackgroundRunner {
|
||||||
(bgrunner, await_all_done)
|
(bgrunner, await_all_done)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Spawn a task to be run in background
|
/// Spawn a task to be run in background
|
||||||
pub fn spawn<T>(&self, job: T)
|
pub fn spawn<T>(&self, job: T)
|
||||||
where
|
where
|
||||||
T: Future<Output = JobOutput> + Send + 'static,
|
T: Future<Output = JobOutput> + Send + 'static,
|
||||||
|
@ -115,6 +117,8 @@ impl BackgroundRunner {
|
||||||
.unwrap();
|
.unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Spawn a task to be run in background. It may get discarded before running if spawned while
|
||||||
|
/// the runner is stopping
|
||||||
pub fn spawn_cancellable<T>(&self, job: T)
|
pub fn spawn_cancellable<T>(&self, job: T)
|
||||||
where
|
where
|
||||||
T: Future<Output = JobOutput> + Send + 'static,
|
T: Future<Output = JobOutput> + Send + 'static,
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
//! Contains type and functions related to Garage configuration file
|
||||||
use std::io::Read;
|
use std::io::Read;
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
|
@ -6,57 +7,82 @@ use serde::{de, Deserialize};
|
||||||
|
|
||||||
use crate::error::Error;
|
use crate::error::Error;
|
||||||
|
|
||||||
|
/// Represent the whole configuration
|
||||||
#[derive(Deserialize, Debug, Clone)]
|
#[derive(Deserialize, Debug, Clone)]
|
||||||
pub struct Config {
|
pub struct Config {
|
||||||
|
/// Path where to store metadata. Should be fast, but low volume
|
||||||
pub metadata_dir: PathBuf,
|
pub metadata_dir: PathBuf,
|
||||||
|
/// Path where to store data. Can be slower, but need higher volume
|
||||||
pub data_dir: PathBuf,
|
pub data_dir: PathBuf,
|
||||||
|
|
||||||
|
/// Address to bind for RPC
|
||||||
pub rpc_bind_addr: SocketAddr,
|
pub rpc_bind_addr: SocketAddr,
|
||||||
|
|
||||||
|
/// Bootstrap peers RPC address
|
||||||
#[serde(deserialize_with = "deserialize_vec_addr")]
|
#[serde(deserialize_with = "deserialize_vec_addr")]
|
||||||
pub bootstrap_peers: Vec<SocketAddr>,
|
pub bootstrap_peers: Vec<SocketAddr>,
|
||||||
|
/// Consule host to connect to to discover more peers
|
||||||
pub consul_host: Option<String>,
|
pub consul_host: Option<String>,
|
||||||
|
/// Consul service name to use
|
||||||
pub consul_service_name: Option<String>,
|
pub consul_service_name: Option<String>,
|
||||||
|
|
||||||
|
/// Max number of concurrent RPC request
|
||||||
#[serde(default = "default_max_concurrent_rpc_requests")]
|
#[serde(default = "default_max_concurrent_rpc_requests")]
|
||||||
pub max_concurrent_rpc_requests: usize,
|
pub max_concurrent_rpc_requests: usize,
|
||||||
|
|
||||||
|
/// Size of data blocks to save to disk
|
||||||
#[serde(default = "default_block_size")]
|
#[serde(default = "default_block_size")]
|
||||||
pub block_size: usize,
|
pub block_size: usize,
|
||||||
|
|
||||||
#[serde(default = "default_control_write_max_faults")]
|
#[serde(default = "default_control_write_max_faults")]
|
||||||
pub control_write_max_faults: usize,
|
pub control_write_max_faults: usize,
|
||||||
|
|
||||||
|
/// How many nodes should hold a copy of meta data
|
||||||
#[serde(default = "default_replication_factor")]
|
#[serde(default = "default_replication_factor")]
|
||||||
pub meta_replication_factor: usize,
|
pub meta_replication_factor: usize,
|
||||||
|
|
||||||
|
/// How many nodes should hold a copy of data
|
||||||
#[serde(default = "default_replication_factor")]
|
#[serde(default = "default_replication_factor")]
|
||||||
pub data_replication_factor: usize,
|
pub data_replication_factor: usize,
|
||||||
|
|
||||||
|
/// Configuration for RPC TLS
|
||||||
pub rpc_tls: Option<TlsConfig>,
|
pub rpc_tls: Option<TlsConfig>,
|
||||||
|
|
||||||
|
/// Configuration for S3 api
|
||||||
pub s3_api: ApiConfig,
|
pub s3_api: ApiConfig,
|
||||||
|
|
||||||
|
/// Configuration for serving files as normal web server
|
||||||
pub s3_web: WebConfig,
|
pub s3_web: WebConfig,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Configuration for RPC TLS
|
||||||
#[derive(Deserialize, Debug, Clone)]
|
#[derive(Deserialize, Debug, Clone)]
|
||||||
pub struct TlsConfig {
|
pub struct TlsConfig {
|
||||||
|
/// Path to certificate autority used for all nodes
|
||||||
pub ca_cert: String,
|
pub ca_cert: String,
|
||||||
|
/// Path to public certificate for this node
|
||||||
pub node_cert: String,
|
pub node_cert: String,
|
||||||
|
/// Path to private key for this node
|
||||||
pub node_key: String,
|
pub node_key: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Configuration for S3 api
|
||||||
#[derive(Deserialize, Debug, Clone)]
|
#[derive(Deserialize, Debug, Clone)]
|
||||||
pub struct ApiConfig {
|
pub struct ApiConfig {
|
||||||
|
/// Address and port to bind for api serving
|
||||||
pub api_bind_addr: SocketAddr,
|
pub api_bind_addr: SocketAddr,
|
||||||
|
/// S3 region to use
|
||||||
pub s3_region: String,
|
pub s3_region: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Configuration for serving files as normal web server
|
||||||
#[derive(Deserialize, Debug, Clone)]
|
#[derive(Deserialize, Debug, Clone)]
|
||||||
pub struct WebConfig {
|
pub struct WebConfig {
|
||||||
|
/// Address and port to bind for web serving
|
||||||
pub bind_addr: SocketAddr,
|
pub bind_addr: SocketAddr,
|
||||||
|
/// Suffix to remove from domain name to find bucket
|
||||||
pub root_domain: String,
|
pub root_domain: String,
|
||||||
|
/// Suffix to add when user-agent request path end with "/"
|
||||||
pub index: String,
|
pub index: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -73,6 +99,7 @@ fn default_control_write_max_faults() -> usize {
|
||||||
1
|
1
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Read and parse configuration
|
||||||
pub fn read_config(config_file: PathBuf) -> Result<Config, Error> {
|
pub fn read_config(config_file: PathBuf) -> Result<Config, Error> {
|
||||||
let mut file = std::fs::OpenOptions::new()
|
let mut file = std::fs::OpenOptions::new()
|
||||||
.read(true)
|
.read(true)
|
||||||
|
|
|
@ -1,8 +1,10 @@
|
||||||
|
//! Contains common types and functions related to serialization and integrity
|
||||||
use rand::Rng;
|
use rand::Rng;
|
||||||
use serde::de::{self, Visitor};
|
use serde::de::{self, Visitor};
|
||||||
use serde::{Deserialize, Deserializer, Serialize, Serializer};
|
use serde::{Deserialize, Deserializer, Serialize, Serializer};
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
|
|
||||||
|
/// An array of 32 bytes
|
||||||
#[derive(Default, PartialOrd, Ord, Clone, Hash, PartialEq, Copy)]
|
#[derive(Default, PartialOrd, Ord, Clone, Hash, PartialEq, Copy)]
|
||||||
pub struct FixedBytes32([u8; 32]);
|
pub struct FixedBytes32([u8; 32]);
|
||||||
|
|
||||||
|
@ -61,15 +63,20 @@ impl Serialize for FixedBytes32 {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl FixedBytes32 {
|
impl FixedBytes32 {
|
||||||
|
/// Access the content as a slice
|
||||||
pub fn as_slice(&self) -> &[u8] {
|
pub fn as_slice(&self) -> &[u8] {
|
||||||
&self.0[..]
|
&self.0[..]
|
||||||
}
|
}
|
||||||
|
/// Access the content as a mutable slice
|
||||||
pub fn as_slice_mut(&mut self) -> &mut [u8] {
|
pub fn as_slice_mut(&mut self) -> &mut [u8] {
|
||||||
&mut self.0[..]
|
&mut self.0[..]
|
||||||
}
|
}
|
||||||
|
/// Copy to a slice
|
||||||
pub fn to_vec(&self) -> Vec<u8> {
|
pub fn to_vec(&self) -> Vec<u8> {
|
||||||
self.0.to_vec()
|
self.0.to_vec()
|
||||||
}
|
}
|
||||||
|
/// Try building a FixedBytes32 from a slice
|
||||||
|
/// Return None if the slice is not 32 bytes long
|
||||||
pub fn try_from(by: &[u8]) -> Option<Self> {
|
pub fn try_from(by: &[u8]) -> Option<Self> {
|
||||||
if by.len() != 32 {
|
if by.len() != 32 {
|
||||||
return None;
|
return None;
|
||||||
|
@ -80,9 +87,12 @@ impl FixedBytes32 {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// A 32 bytes UUID
|
||||||
pub type UUID = FixedBytes32;
|
pub type UUID = FixedBytes32;
|
||||||
|
/// A 256 bit cryptographic hash, can be sha256 or blake2 depending on provenance
|
||||||
pub type Hash = FixedBytes32;
|
pub type Hash = FixedBytes32;
|
||||||
|
|
||||||
|
/// Compute the sha256 of a slice
|
||||||
pub fn sha256sum(data: &[u8]) -> Hash {
|
pub fn sha256sum(data: &[u8]) -> Hash {
|
||||||
use sha2::{Digest, Sha256};
|
use sha2::{Digest, Sha256};
|
||||||
|
|
||||||
|
@ -93,6 +103,7 @@ pub fn sha256sum(data: &[u8]) -> Hash {
|
||||||
hash.into()
|
hash.into()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Compute the blake2 of a slice
|
||||||
pub fn blake2sum(data: &[u8]) -> Hash {
|
pub fn blake2sum(data: &[u8]) -> Hash {
|
||||||
use blake2::{Blake2b, Digest};
|
use blake2::{Blake2b, Digest};
|
||||||
|
|
||||||
|
@ -103,8 +114,10 @@ pub fn blake2sum(data: &[u8]) -> Hash {
|
||||||
hash.into()
|
hash.into()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// A 64 bit non cryptographic hash
|
||||||
pub type FastHash = u64;
|
pub type FastHash = u64;
|
||||||
|
|
||||||
|
/// Compute a (non cryptographic) of a slice
|
||||||
pub fn fasthash(data: &[u8]) -> FastHash {
|
pub fn fasthash(data: &[u8]) -> FastHash {
|
||||||
use xxhash_rust::xxh3::Xxh3;
|
use xxhash_rust::xxh3::Xxh3;
|
||||||
|
|
||||||
|
@ -113,12 +126,14 @@ pub fn fasthash(data: &[u8]) -> FastHash {
|
||||||
h.digest()
|
h.digest()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Generate a random 32 bytes UUID
|
||||||
pub fn gen_uuid() -> UUID {
|
pub fn gen_uuid() -> UUID {
|
||||||
rand::thread_rng().gen::<[u8; 32]>().into()
|
rand::thread_rng().gen::<[u8; 32]>().into()
|
||||||
}
|
}
|
||||||
|
|
||||||
// RMP serialization with names of fields and variants
|
// RMP serialization with names of fields and variants
|
||||||
|
|
||||||
|
/// Serialize to MessagePack
|
||||||
pub fn rmp_to_vec_all_named<T>(val: &T) -> Result<Vec<u8>, rmp_serde::encode::Error>
|
pub fn rmp_to_vec_all_named<T>(val: &T) -> Result<Vec<u8>, rmp_serde::encode::Error>
|
||||||
where
|
where
|
||||||
T: Serialize + ?Sized,
|
T: Serialize + ?Sized,
|
||||||
|
@ -131,10 +146,13 @@ where
|
||||||
Ok(wr)
|
Ok(wr)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Serialize to JSON, truncating long result
|
||||||
pub fn debug_serialize<T: Serialize>(x: T) -> String {
|
pub fn debug_serialize<T: Serialize>(x: T) -> String {
|
||||||
match serde_json::to_string(&x) {
|
match serde_json::to_string(&x) {
|
||||||
Ok(ss) => {
|
Ok(ss) => {
|
||||||
if ss.len() > 100 {
|
if ss.len() > 100 {
|
||||||
|
// TODO this can panic if 100 is not a codepoint boundary, but inside a 2 Bytes
|
||||||
|
// (or more) codepoint
|
||||||
ss[..100].to_string()
|
ss[..100].to_string()
|
||||||
} else {
|
} else {
|
||||||
ss
|
ss
|
||||||
|
|
|
@ -1,9 +1,11 @@
|
||||||
|
//! Module containing error types used in Garage
|
||||||
use err_derive::Error;
|
use err_derive::Error;
|
||||||
use hyper::StatusCode;
|
use hyper::StatusCode;
|
||||||
use std::io;
|
use std::io;
|
||||||
|
|
||||||
use crate::data::*;
|
use crate::data::*;
|
||||||
|
|
||||||
|
/// RPC related errors
|
||||||
#[derive(Debug, Error)]
|
#[derive(Debug, Error)]
|
||||||
pub enum RPCError {
|
pub enum RPCError {
|
||||||
#[error(display = "Node is down: {:?}.", _0)]
|
#[error(display = "Node is down: {:?}.", _0)]
|
||||||
|
@ -28,6 +30,7 @@ pub enum RPCError {
|
||||||
TooManyErrors(Vec<String>),
|
TooManyErrors(Vec<String>),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Regroup all Garage errors
|
||||||
#[derive(Debug, Error)]
|
#[derive(Debug, Error)]
|
||||||
pub enum Error {
|
pub enum Error {
|
||||||
#[error(display = "IO error: {}", _0)]
|
#[error(display = "IO error: {}", _0)]
|
||||||
|
|
|
@ -1,3 +1,5 @@
|
||||||
|
//! Crate containing common functions and types used in Garage
|
||||||
|
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
extern crate log;
|
extern crate log;
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,8 @@
|
||||||
|
//! Module containing helper functions to manipulate time
|
||||||
use chrono::{SecondsFormat, TimeZone, Utc};
|
use chrono::{SecondsFormat, TimeZone, Utc};
|
||||||
use std::time::{SystemTime, UNIX_EPOCH};
|
use std::time::{SystemTime, UNIX_EPOCH};
|
||||||
|
|
||||||
|
/// Returns milliseconds since UNIX Epoch
|
||||||
pub fn now_msec() -> u64 {
|
pub fn now_msec() -> u64 {
|
||||||
SystemTime::now()
|
SystemTime::now()
|
||||||
.duration_since(UNIX_EPOCH)
|
.duration_since(UNIX_EPOCH)
|
||||||
|
@ -8,6 +10,8 @@ pub fn now_msec() -> u64 {
|
||||||
.as_millis() as u64
|
.as_millis() as u64
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Convert a timestamp represented as milliseconds since UNIX Epoch to
|
||||||
|
/// its RFC3339 representation, such as "2021-01-01T12:30:00Z"
|
||||||
pub fn msec_to_rfc3339(msecs: u64) -> String {
|
pub fn msec_to_rfc3339(msecs: u64) -> String {
|
||||||
let secs = msecs as i64 / 1000;
|
let secs = msecs as i64 / 1000;
|
||||||
let nanos = (msecs as i64 % 1000) as u32 * 1_000_000;
|
let nanos = (msecs as i64 % 1000) as u32 * 1_000_000;
|
||||||
|
|
|
@ -3,30 +3,37 @@ use hyper::StatusCode;
|
||||||
|
|
||||||
use garage_util::error::Error as GarageError;
|
use garage_util::error::Error as GarageError;
|
||||||
|
|
||||||
|
/// Errors of this crate
|
||||||
#[derive(Debug, Error)]
|
#[derive(Debug, Error)]
|
||||||
pub enum Error {
|
pub enum Error {
|
||||||
|
/// An error received from the API crate
|
||||||
#[error(display = "API error: {}", _0)]
|
#[error(display = "API error: {}", _0)]
|
||||||
ApiError(#[error(source)] garage_api::error::Error),
|
ApiError(#[error(source)] garage_api::Error),
|
||||||
|
|
||||||
// Category: internal error
|
// Category: internal error
|
||||||
|
/// Error internal to garage
|
||||||
#[error(display = "Internal error: {}", _0)]
|
#[error(display = "Internal error: {}", _0)]
|
||||||
InternalError(#[error(source)] GarageError),
|
InternalError(#[error(source)] GarageError),
|
||||||
|
|
||||||
|
/// The file does not exist
|
||||||
#[error(display = "Not found")]
|
#[error(display = "Not found")]
|
||||||
NotFound,
|
NotFound,
|
||||||
|
|
||||||
// Category: bad request
|
/// The request contained an invalid UTF-8 sequence in its path or in other parameters
|
||||||
#[error(display = "Invalid UTF-8: {}", _0)]
|
#[error(display = "Invalid UTF-8: {}", _0)]
|
||||||
InvalidUTF8(#[error(source)] std::str::Utf8Error),
|
InvalidUTF8(#[error(source)] std::str::Utf8Error),
|
||||||
|
|
||||||
|
/// The client send a header with invalid value
|
||||||
#[error(display = "Invalid header value: {}", _0)]
|
#[error(display = "Invalid header value: {}", _0)]
|
||||||
InvalidHeader(#[error(source)] hyper::header::ToStrError),
|
InvalidHeader(#[error(source)] hyper::header::ToStrError),
|
||||||
|
|
||||||
|
/// The client sent a request without host, or with unsupported method
|
||||||
#[error(display = "Bad request: {}", _0)]
|
#[error(display = "Bad request: {}", _0)]
|
||||||
BadRequest(String),
|
BadRequest(String),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Error {
|
impl Error {
|
||||||
|
/// Transform errors into http status code
|
||||||
pub fn http_status_code(&self) -> StatusCode {
|
pub fn http_status_code(&self) -> StatusCode {
|
||||||
match self {
|
match self {
|
||||||
Error::NotFound => StatusCode::NOT_FOUND,
|
Error::NotFound => StatusCode::NOT_FOUND,
|
||||||
|
|
|
@ -1,6 +1,9 @@
|
||||||
|
//! Crate for handling web serving of s3 bucket
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
extern crate log;
|
extern crate log;
|
||||||
|
|
||||||
pub mod error;
|
mod error;
|
||||||
|
pub use error::Error;
|
||||||
|
|
||||||
pub mod web_server;
|
mod web_server;
|
||||||
|
pub use web_server::run_web_server;
|
||||||
|
|
|
@ -18,6 +18,7 @@ use garage_model::garage::Garage;
|
||||||
use garage_table::*;
|
use garage_table::*;
|
||||||
use garage_util::error::Error as GarageError;
|
use garage_util::error::Error as GarageError;
|
||||||
|
|
||||||
|
/// Run a web server
|
||||||
pub async fn run_web_server(
|
pub async fn run_web_server(
|
||||||
garage: Arc<Garage>,
|
garage: Arc<Garage>,
|
||||||
shutdown_signal: impl Future<Output = ()>,
|
shutdown_signal: impl Future<Output = ()>,
|
||||||
|
|
Loading…
Reference in a new issue