From f4bf9876278ff59fb7105203ab5f75bbcaa53ded Mon Sep 17 00:00:00 2001 From: Trinity Pointard Date: Sat, 20 Mar 2021 20:38:44 +0100 Subject: [PATCH 01/10] document util crate --- src/util/background.rs | 10 +++++++--- src/util/config.rs | 27 +++++++++++++++++++++++++++ src/util/data.rs | 18 ++++++++++++++++++ src/util/error.rs | 4 ++++ src/util/lib.rs | 3 +++ src/util/time.rs | 4 ++++ 6 files changed, 63 insertions(+), 3 deletions(-) diff --git a/src/util/background.rs b/src/util/background.rs index b5eb8bc8..3cbcac2a 100644 --- a/src/util/background.rs +++ b/src/util/background.rs @@ -1,3 +1,4 @@ +//! Job runner for futures and async functions use core::future::Future; use std::pin::Pin; use std::sync::Arc; @@ -12,14 +13,15 @@ use crate::error::Error; type JobOutput = Result<(), Error>; type Job = Pin + Send>>; +/// Job runner for futures and async functions pub struct BackgroundRunner { - pub stop_signal: watch::Receiver, - + stop_signal: watch::Receiver, queue_in: mpsc::UnboundedSender<(Job, bool)>, worker_in: mpsc::UnboundedSender>, } impl BackgroundRunner { + /// Create a new BackgroundRunner pub fn new( n_runners: usize, stop_signal: watch::Receiver, @@ -103,7 +105,7 @@ impl BackgroundRunner { (bgrunner, await_all_done) } - // Spawn a task to be run in background + /// Spawn a task to be run in background pub fn spawn(&self, job: T) where T: Future + Send + 'static, @@ -115,6 +117,8 @@ impl BackgroundRunner { .unwrap(); } + /// Spawn a task to be run in background. It may get discarded before running if spawned while + /// the runner is stopping pub fn spawn_cancellable(&self, job: T) where T: Future + Send + 'static, diff --git a/src/util/config.rs b/src/util/config.rs index 9ff67711..fcbb32a9 100644 --- a/src/util/config.rs +++ b/src/util/config.rs @@ -1,3 +1,4 @@ +//! 
Contains type and functions related to Garage configuration file use std::io::Read; use std::net::SocketAddr; use std::path::PathBuf; @@ -6,57 +7,82 @@ use serde::{de, Deserialize}; use crate::error::Error; +/// Represent the whole configuration #[derive(Deserialize, Debug, Clone)] pub struct Config { + /// Path where to store metadata. Should be fast, but low volume pub metadata_dir: PathBuf, + /// Path where to store data. Can be slower, but need higher volume pub data_dir: PathBuf, + /// Address to bind for RPC pub rpc_bind_addr: SocketAddr, + /// Bootstrap peers RPC address #[serde(deserialize_with = "deserialize_vec_addr")] pub bootstrap_peers: Vec, + /// Consule host to connect to to discover more peers pub consul_host: Option, + /// Consul service name to use pub consul_service_name: Option, + /// Max number of concurrent RPC request #[serde(default = "default_max_concurrent_rpc_requests")] pub max_concurrent_rpc_requests: usize, + /// Size of data blocks to save to disk #[serde(default = "default_block_size")] pub block_size: usize, #[serde(default = "default_control_write_max_faults")] pub control_write_max_faults: usize, + /// How many nodes should hold a copy of meta data #[serde(default = "default_replication_factor")] pub meta_replication_factor: usize, + /// How many nodes should hold a copy of data #[serde(default = "default_replication_factor")] pub data_replication_factor: usize, + /// Configuration for RPC TLS pub rpc_tls: Option, + /// Configuration for S3 api pub s3_api: ApiConfig, + /// Configuration for serving files as normal web server pub s3_web: WebConfig, } +/// Configuration for RPC TLS #[derive(Deserialize, Debug, Clone)] pub struct TlsConfig { + /// Path to certificate autority used for all nodes pub ca_cert: String, + /// Path to public certificate for this node pub node_cert: String, + /// Path to private key for this node pub node_key: String, } +/// Configuration for S3 api #[derive(Deserialize, Debug, Clone)] pub struct ApiConfig 
{ + /// Address and port to bind for api serving pub api_bind_addr: SocketAddr, + /// S3 region to use pub s3_region: String, } +/// Configuration for serving files as normal web server #[derive(Deserialize, Debug, Clone)] pub struct WebConfig { + /// Address and port to bind for web serving pub bind_addr: SocketAddr, + /// Suffix to remove from domain name to find bucket pub root_domain: String, + /// Suffix to add when user-agent request path end with "/" pub index: String, } @@ -73,6 +99,7 @@ fn default_control_write_max_faults() -> usize { 1 } +/// Read and parse configuration pub fn read_config(config_file: PathBuf) -> Result { let mut file = std::fs::OpenOptions::new() .read(true) diff --git a/src/util/data.rs b/src/util/data.rs index 8cd6dd96..b269edac 100644 --- a/src/util/data.rs +++ b/src/util/data.rs @@ -1,8 +1,10 @@ +//! Contains common types and functions related to serialization and integrity use rand::Rng; use serde::de::{self, Visitor}; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use std::fmt; +/// An array of 32 bytes #[derive(Default, PartialOrd, Ord, Clone, Hash, PartialEq, Copy)] pub struct FixedBytes32([u8; 32]); @@ -61,15 +63,20 @@ impl Serialize for FixedBytes32 { } impl FixedBytes32 { + /// Access the content as a slice pub fn as_slice(&self) -> &[u8] { &self.0[..] } + /// Access the content as a mutable slice pub fn as_slice_mut(&mut self) -> &mut [u8] { &mut self.0[..] 
} + /// Copy to a slice pub fn to_vec(&self) -> Vec { self.0.to_vec() } + /// Try building a FixedBytes32 from a slice + /// Return None if the slice is not 32 bytes long pub fn try_from(by: &[u8]) -> Option { if by.len() != 32 { return None; @@ -80,9 +87,12 @@ impl FixedBytes32 { } } +/// A 32 bytes UUID pub type UUID = FixedBytes32; +/// A 256 bit cryptographic hash, can be sha256 or blake2 depending on provenance pub type Hash = FixedBytes32; +/// Compute the sha256 of a slice pub fn sha256sum(data: &[u8]) -> Hash { use sha2::{Digest, Sha256}; @@ -93,6 +103,7 @@ pub fn sha256sum(data: &[u8]) -> Hash { hash.into() } +/// Compute the blake2 of a slice pub fn blake2sum(data: &[u8]) -> Hash { use blake2::{Blake2b, Digest}; @@ -103,8 +114,10 @@ pub fn blake2sum(data: &[u8]) -> Hash { hash.into() } +/// A 64 bit non cryptographic hash pub type FastHash = u64; +/// Compute a (non cryptographic) of a slice pub fn fasthash(data: &[u8]) -> FastHash { use xxhash_rust::xxh3::Xxh3; @@ -113,12 +126,14 @@ pub fn fasthash(data: &[u8]) -> FastHash { h.digest() } +/// Generate a random 32 bytes UUID pub fn gen_uuid() -> UUID { rand::thread_rng().gen::<[u8; 32]>().into() } // RMP serialization with names of fields and variants +/// Serialize to MessagePack pub fn rmp_to_vec_all_named(val: &T) -> Result, rmp_serde::encode::Error> where T: Serialize + ?Sized, @@ -131,10 +146,13 @@ where Ok(wr) } +/// Serialize to JSON, truncating long result pub fn debug_serialize(x: T) -> String { match serde_json::to_string(&x) { Ok(ss) => { if ss.len() > 100 { + // TODO this can panic if 100 is not a codepoint boundary, but inside a 2 Bytes + // (or more) codepoint ss[..100].to_string() } else { ss diff --git a/src/util/error.rs b/src/util/error.rs index a9bf0824..13cbeba3 100644 --- a/src/util/error.rs +++ b/src/util/error.rs @@ -1,9 +1,12 @@ +//! 
Module containing error types used in Garage +#![allow(missing_docs)] use err_derive::Error; use hyper::StatusCode; use std::io; use crate::data::*; +/// RPC related errors #[derive(Debug, Error)] pub enum RPCError { #[error(display = "Node is down: {:?}.", _0)] @@ -28,6 +31,7 @@ pub enum RPCError { TooManyErrors(Vec), } +/// Regroup all Garage errors #[derive(Debug, Error)] pub enum Error { #[error(display = "IO error: {}", _0)] diff --git a/src/util/lib.rs b/src/util/lib.rs index 055e9ab0..faacd9c1 100644 --- a/src/util/lib.rs +++ b/src/util/lib.rs @@ -1,3 +1,6 @@ +#![warn(missing_crate_level_docs, missing_docs)] +//! Crate containing common functions and types used in Garage + #[macro_use] extern crate log; diff --git a/src/util/time.rs b/src/util/time.rs index 148860e0..dfedcb26 100644 --- a/src/util/time.rs +++ b/src/util/time.rs @@ -1,6 +1,8 @@ +//! Module containing helper functions to manipulate time use chrono::{SecondsFormat, TimeZone, Utc}; use std::time::{SystemTime, UNIX_EPOCH}; +/// Returns milliseconds since UNIX Epoch pub fn now_msec() -> u64 { SystemTime::now() .duration_since(UNIX_EPOCH) @@ -8,6 +10,8 @@ pub fn now_msec() -> u64 { .as_millis() as u64 } +/// Convert a timestamp represented as milliseconds since UNIX Epoch to +/// its RFC3339 representation, such as "2021-01-01T12:30:00Z" pub fn msec_to_rfc3339(msecs: u64) -> String { let secs = msecs as i64 / 1000; let nanos = (msecs as i64 % 1000) as u32 * 1_000_000; From 7d3a95183658afea1aeb0cbd435bdec4ebfc71e8 Mon Sep 17 00:00:00 2001 From: Trinity Pointard Date: Mon, 22 Mar 2021 00:00:09 +0100 Subject: [PATCH 02/10] document rpc crate --- src/garage/cli.rs | 2 +- src/rpc/lib.rs | 5 ++++- src/rpc/membership.rs | 34 +++++++++++++++++++++++++++++++++- src/rpc/ring.rs | 34 ++++++++++++++++++++++++++++------ src/rpc/rpc_client.rs | 27 ++++++++++++++++++++++++++- src/rpc/rpc_server.rs | 8 ++++++++ 6 files changed, 100 insertions(+), 10 deletions(-) diff --git a/src/garage/cli.rs b/src/garage/cli.rs 
index eb8275a9..55cd222b 100644 --- a/src/garage/cli.rs +++ b/src/garage/cli.rs @@ -80,7 +80,7 @@ pub struct ConfigureNodeOpt { #[structopt(short = "c", long = "capacity")] capacity: Option, - /// Optionnal node tag + /// Optional node tag #[structopt(short = "t", long = "tag")] tag: Option, diff --git a/src/rpc/lib.rs b/src/rpc/lib.rs index 00e31f57..38326077 100644 --- a/src/rpc/lib.rs +++ b/src/rpc/lib.rs @@ -1,7 +1,10 @@ +#![deny(missing_crate_level_docs, missing_docs)] +//! Crate containing rpc related functions and types used in Garage + #[macro_use] extern crate log; -pub mod consul; +mod consul; pub(crate) mod tls_util; pub mod membership; diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs index 9fb24ad4..c465ce68 100644 --- a/src/rpc/membership.rs +++ b/src/rpc/membership.rs @@ -1,3 +1,4 @@ +/// Module containing structs related to membership management use std::collections::HashMap; use std::fmt::Write as FmtWrite; use std::io::{Read, Write}; @@ -30,20 +31,29 @@ const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60); const PING_TIMEOUT: Duration = Duration::from_secs(2); const MAX_FAILURES_BEFORE_CONSIDERED_DOWN: usize = 5; +/// RPC endpoint used for calls related to membership pub const MEMBERSHIP_RPC_PATH: &str = "_membership"; +/// RPC messages related to membership #[derive(Debug, Serialize, Deserialize)] pub enum Message { + /// Response to successfull advertisements Ok, + /// Message sent to detect other nodes status Ping(PingMessage), + /// Ask other node for the nodes it knows. Answered with AdvertiseNodesUp PullStatus, + /// Ask other node its config. Answered with AdvertiseConfig PullConfig, + /// Advertisement of nodes the host knows up. Sent spontanously or in response to PullStatus AdvertiseNodesUp(Vec), + /// Advertisement of nodes config. 
Sent spontanously or in response to PullConfig AdvertiseConfig(NetworkConfig), } impl RpcMessage for Message {} +/// A ping, containing informations about status and config #[derive(Debug, Serialize, Deserialize)] pub struct PingMessage { id: UUID, @@ -55,18 +65,25 @@ pub struct PingMessage { state_info: StateInfo, } +/// A node advertisement #[derive(Clone, Debug, Serialize, Deserialize)] pub struct AdvertisedNode { + /// Id of the node this advertisement relates to pub id: UUID, + /// IP and port of the node pub addr: SocketAddr, + /// Is the node considered up pub is_up: bool, + /// When was the node last seen up, in milliseconds since UNIX epoch pub last_seen: u64, pub state_info: StateInfo, } +/// This node's membership manager pub struct System { + /// The id of this node pub id: UUID, persist_config: Persister, @@ -79,10 +96,12 @@ pub struct System { rpc_client: Arc>, pub(crate) status: watch::Receiver>, + /// The ring, viewed by this node pub ring: watch::Receiver>, update_lock: Mutex, + /// The job runner of this node pub background: Arc, } @@ -91,21 +110,30 @@ struct Updaters { update_ring: watch::Sender>, } +/// The status of each nodes, viewed by this node #[derive(Debug, Clone)] pub struct Status { + /// Mapping of each node id to its known status + // considering its sorted regularly, maybe it should be a btreeset? 
pub nodes: HashMap>, + /// Hash of this entry pub hash: Hash, } +/// The status of a single node #[derive(Debug)] pub struct StatusEntry { + /// The IP and port used to connect to this node pub addr: SocketAddr, + /// Last time this node was seen pub last_seen: u64, + /// Number of consecutive pings sent without reply to this node pub num_failures: AtomicUsize, pub state_info: StateInfo, } impl StatusEntry { + /// is the node associated to this entry considered up pub fn is_up(&self) -> bool { self.num_failures.load(Ordering::SeqCst) < MAX_FAILURES_BEFORE_CONSIDERED_DOWN } @@ -195,6 +223,7 @@ fn gen_node_id(metadata_dir: &PathBuf) -> Result { } impl System { + /// Create this node's membership manager pub fn new( metadata_dir: PathBuf, rpc_http_client: Arc, @@ -279,6 +308,7 @@ impl System { }); } + /// Get an RPC client pub fn rpc_client(self: &Arc, path: &str) -> Arc> { RpcClient::new( RpcAddrClient::new(self.rpc_http_client.clone(), path.to_string()), @@ -287,6 +317,7 @@ impl System { ) } + /// Save network configuration to disc async fn save_network_config(self: Arc) -> Result<(), Error> { let ring = self.ring.borrow().clone(); self.persist_config @@ -319,6 +350,7 @@ impl System { self.rpc_client.call_many(&to[..], msg, timeout).await; } + /// Perform bootstraping, starting the ping loop pub async fn bootstrap( self: Arc, peers: Vec, @@ -348,7 +380,7 @@ impl System { id_option, addr, sys.rpc_client - .by_addr() + .get_addr() .call(&addr, ping_msg_ref, PING_TIMEOUT) .await, ) diff --git a/src/rpc/ring.rs b/src/rpc/ring.rs index 2e997523..6f341fa8 100644 --- a/src/rpc/ring.rs +++ b/src/rpc/ring.rs @@ -1,3 +1,4 @@ +//! Module containing types related to computing nodes which should receive a copy of data blocks use std::collections::{HashMap, HashSet}; use std::convert::TryInto; @@ -8,23 +9,30 @@ use garage_util::data::*; // A partition number is encoded on 16 bits, // i.e. we have up to 2**16 partitions. 
// (in practice we have exactly 2**PARTITION_BITS partitions) +/// A partition id, stored on 16 bits pub type Partition = u16; // TODO: make this constant parametrizable in the config file // For deployments with many nodes it might make sense to bump // it up to 10. // Maximum value : 16 +/// How many bits from the hash are used to make partitions. Higher numbers means more fairness in +/// presence of numerous nodes, but exponentially bigger ring. Max 16 pub const PARTITION_BITS: usize = 8; const PARTITION_MASK_U16: u16 = ((1 << PARTITION_BITS) - 1) << (16 - PARTITION_BITS); // TODO: make this constant paraetrizable in the config file // (most deployments use a replication factor of 3, so...) +/// The maximum number of time an object might get replicated pub const MAX_REPLICATION: usize = 3; +/// The versionned configurations of all nodes known in the network #[derive(Clone, Debug, Serialize, Deserialize)] pub struct NetworkConfig { + /// Map of each node's id to it's configuration pub members: HashMap, + /// Version of this config pub version: u64, } @@ -37,26 +45,40 @@ impl NetworkConfig { } } +/// The overall configuration of one (possibly remote) node #[derive(Clone, Debug, Serialize, Deserialize)] pub struct NetworkConfigEntry { + /// Datacenter at which this entry belong. 
This infromation might be used to perform a better + /// geodistribution pub datacenter: String, + /// The (relative) capacity of the node pub capacity: u32, + /// A tag to recognize the entry, not used for other things than display pub tag: String, } +/// A ring distributing fairly objects to nodes #[derive(Clone)] pub struct Ring { + /// The network configuration used to generate this ring pub config: NetworkConfig, + /// The list of entries in the ring pub ring: Vec, } +/// An entry in the ring #[derive(Clone, Debug)] pub struct RingEntry { + /// The prefix of the Hash of object which should use this entry pub location: Hash, + /// The nodes in which a matching object should get stored pub nodes: [UUID; MAX_REPLICATION], } impl Ring { + // TODO this function MUST be refactored, it's 100 lines long, with a 50 lines loop, going up to 6 + // levels of imbrication. It is basically impossible to test, maintain, or understand for an + // outsider. pub(crate) fn new(config: NetworkConfig) -> Self { // Create a vector of partition indices (0 to 2**PARTITION_BITS-1) let partitions_idx = (0usize..(1usize << PARTITION_BITS)).collect::>(); @@ -166,20 +188,16 @@ impl Ring { }) .collect::>(); - // eprintln!("RING: --"); - // for e in ring.iter() { - // eprintln!("{:?}", e); - // } - // eprintln!("END --"); - Self { config, ring } } + /// Get the partition in which data would fall on pub fn partition_of(&self, from: &Hash) -> Partition { let top = u16::from_be_bytes(from.as_slice()[0..2].try_into().unwrap()); top >> (16 - PARTITION_BITS) } + /// Get the list of partitions and pub fn partitions(&self) -> Vec<(Partition, Hash)> { let mut ret = vec![]; @@ -193,6 +211,7 @@ impl Ring { ret } + /// Walk the ring to find the n servers in which data should be replicated pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec { if self.ring.len() != 1 << PARTITION_BITS { warn!("Ring not yet ready, read/writes will be lost!"); @@ -201,12 +220,15 @@ impl Ring { let top = 
u16::from_be_bytes(from.as_slice()[0..2].try_into().unwrap()); let partition_idx = (top >> (16 - PARTITION_BITS)) as usize; + // TODO why computing two time in the same way and asserting? assert_eq!(partition_idx, self.partition_of(from) as usize); let partition = &self.ring[partition_idx]; let partition_top = u16::from_be_bytes(partition.location.as_slice()[0..2].try_into().unwrap()); + // TODO is this an assertion on the validity of PARTITION_MASK_U16? If so, it should + // probably be a test more than a runtime assertion assert_eq!(partition_top & PARTITION_MASK_U16, top & PARTITION_MASK_U16); assert!(n <= partition.nodes.len()); diff --git a/src/rpc/rpc_client.rs b/src/rpc/rpc_client.rs index eb4f6620..261dec7a 100644 --- a/src/rpc/rpc_client.rs +++ b/src/rpc/rpc_client.rs @@ -1,3 +1,4 @@ +//! Contain structs related to making RPCs use std::borrow::Borrow; use std::marker::PhantomData; use std::net::SocketAddr; @@ -26,14 +27,19 @@ use crate::tls_util; const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10); +/// Strategy to apply when making RPC #[derive(Copy, Clone)] pub struct RequestStrategy { + /// Max time to wait for reponse pub rs_timeout: Duration, + /// Min number of response to consider the request successful pub rs_quorum: usize, + /// Should requests be dropped after enough response are received pub rs_interrupt_after_quorum: bool, } impl RequestStrategy { + /// Create a RequestStrategy with default timeout and not interrupting when quorum reached pub fn with_quorum(quorum: usize) -> Self { RequestStrategy { rs_timeout: DEFAULT_TIMEOUT, @@ -41,19 +47,24 @@ impl RequestStrategy { rs_interrupt_after_quorum: false, } } + /// Set timeout of the strategy pub fn with_timeout(mut self, timeout: Duration) -> Self { self.rs_timeout = timeout; self } + /// Set if requests can be dropped after quorum has been reached pub fn interrupt_after_quorum(mut self, interrupt: bool) -> Self { self.rs_interrupt_after_quorum = interrupt; self } } +/// Shortcut for a 
boxed async function taking a message, and resolving to another message or an +/// error pub type LocalHandlerFn = Box) -> Pin> + Send>> + Send + Sync>; +/// Client used to send RPC pub struct RpcClient { status: watch::Receiver>, background: Arc, @@ -64,6 +75,7 @@ pub struct RpcClient { } impl RpcClient { + /// Create a new RpcClient from an address, a job runner, and the status of all RPC servers pub fn new( rac: RpcAddrClient, background: Arc, @@ -77,6 +89,7 @@ impl RpcClient { }) } + /// Set the local handler, to process RPC to this node without network usage pub fn set_local_handler(&self, my_id: UUID, handler: F) where F: Fn(Arc) -> Fut + Send + Sync + 'static, @@ -90,14 +103,17 @@ impl RpcClient { self.local_handler.swap(Some(Arc::new((my_id, handler)))); } - pub fn by_addr(&self) -> &RpcAddrClient { + /// Get the server address this client connect to + pub fn get_addr(&self) -> &RpcAddrClient { &self.rpc_addr_client } + /// Make a RPC call pub async fn call(&self, to: UUID, msg: M, timeout: Duration) -> Result { self.call_arc(to, Arc::new(msg), timeout).await } + /// Make a RPC call from a message stored in an Arc pub async fn call_arc(&self, to: UUID, msg: Arc, timeout: Duration) -> Result { if let Some(lh) = self.local_handler.load_full() { let (my_id, local_handler) = lh.as_ref(); @@ -135,6 +151,7 @@ impl RpcClient { } } + /// Make a RPC call to multiple servers, returning a Vec containing each result pub async fn call_many(&self, to: &[UUID], msg: M, timeout: Duration) -> Vec> { let msg = Arc::new(msg); let mut resp_stream = to @@ -149,6 +166,8 @@ impl RpcClient { results } + /// Make a RPC call to multiple servers, returning either a Vec of responses, or an error if + /// strategy could not be respected due to to many errors pub async fn try_call_many( self: &Arc, to: &[UUID], @@ -208,6 +227,7 @@ impl RpcClient { } } +/// Endpoint to which send RPC pub struct RpcAddrClient { phantom: PhantomData, @@ -216,6 +236,7 @@ pub struct RpcAddrClient { } impl 
RpcAddrClient { + /// Create an RpcAddrClient from an HTTP client and the endpoint to reach for RPCs pub fn new(http_client: Arc, path: String) -> Self { Self { phantom: PhantomData::default(), @@ -224,6 +245,7 @@ impl RpcAddrClient { } } + /// Make a RPC pub async fn call( &self, to_addr: &SocketAddr, @@ -239,6 +261,7 @@ impl RpcAddrClient { } } +/// HTTP client used to make RPCs pub struct RpcHttpClient { request_limiter: Semaphore, method: ClientMethod, @@ -250,6 +273,7 @@ enum ClientMethod { } impl RpcHttpClient { + /// Create a new RpcHttpClient pub fn new( max_concurrent_requests: usize, tls_config: &Option, @@ -280,6 +304,7 @@ impl RpcHttpClient { }) } + /// Make a RPC async fn call( &self, path: &str, diff --git a/src/rpc/rpc_server.rs b/src/rpc/rpc_server.rs index 0d82d796..4419a6f0 100644 --- a/src/rpc/rpc_server.rs +++ b/src/rpc/rpc_server.rs @@ -1,3 +1,4 @@ +//! Contains structs related to receiving RPCs use std::collections::HashMap; use std::net::SocketAddr; use std::pin::Pin; @@ -22,13 +23,17 @@ use garage_util::error::Error; use crate::tls_util; +/// Trait for messages that can be sent as RPC pub trait RpcMessage: Serialize + for<'de> Deserialize<'de> + Send + Sync {} type ResponseFuture = Pin, Error>> + Send>>; type Handler = Box, SocketAddr) -> ResponseFuture + Send + Sync>; +/// Structure handling RPCs pub struct RpcServer { + /// The address the RpcServer will bind pub bind_addr: SocketAddr, + /// The tls configuration used for RPC pub tls_config: Option, handlers: HashMap, @@ -87,6 +92,7 @@ where } impl RpcServer { + /// Create a new RpcServer pub fn new(bind_addr: SocketAddr, tls_config: Option) -> Self { Self { bind_addr, @@ -95,6 +101,7 @@ impl RpcServer { } } + /// Add handler handling request made to `name` pub fn add_handler(&mut self, name: String, handler: F) where M: RpcMessage + 'static, @@ -156,6 +163,7 @@ impl RpcServer { } } + /// Run the RpcServer pub async fn run( self: Arc, shutdown_signal: impl Future, From 
b476b702c8a1e4027a1abc16d54afbd61fdcf984 Mon Sep 17 00:00:00 2001 From: Trinity Pointard Date: Mon, 22 Mar 2021 00:01:44 +0100 Subject: [PATCH 03/10] run cargo fmt on util and make missing doc warning --- src/rpc/lib.rs | 2 +- src/util/background.rs | 6 +++--- src/util/config.rs | 42 +++++++++++++++++++++--------------------- src/util/data.rs | 14 +++++++------- 4 files changed, 32 insertions(+), 32 deletions(-) diff --git a/src/rpc/lib.rs b/src/rpc/lib.rs index 38326077..308baa83 100644 --- a/src/rpc/lib.rs +++ b/src/rpc/lib.rs @@ -1,4 +1,4 @@ -#![deny(missing_crate_level_docs, missing_docs)] +#![warn(missing_crate_level_docs, missing_docs)] //! Crate containing rpc related functions and types used in Garage #[macro_use] diff --git a/src/util/background.rs b/src/util/background.rs index 3cbcac2a..bfdaaf1e 100644 --- a/src/util/background.rs +++ b/src/util/background.rs @@ -21,7 +21,7 @@ pub struct BackgroundRunner { } impl BackgroundRunner { - /// Create a new BackgroundRunner + /// Create a new BackgroundRunner pub fn new( n_runners: usize, stop_signal: watch::Receiver, @@ -117,8 +117,8 @@ impl BackgroundRunner { .unwrap(); } - /// Spawn a task to be run in background. It may get discarded before running if spawned while - /// the runner is stopping + /// Spawn a task to be run in background. It may get discarded before running if spawned while + /// the runner is stopping pub fn spawn_cancellable(&self, job: T) where T: Future + Send + 'static, diff --git a/src/util/config.rs b/src/util/config.rs index fcbb32a9..bb70467b 100644 --- a/src/util/config.rs +++ b/src/util/config.rs @@ -10,79 +10,79 @@ use crate::error::Error; /// Represent the whole configuration #[derive(Deserialize, Debug, Clone)] pub struct Config { - /// Path where to store metadata. Should be fast, but low volume + /// Path where to store metadata. Should be fast, but low volume pub metadata_dir: PathBuf, - /// Path where to store data. 
Can be slower, but need higher volume + /// Path where to store data. Can be slower, but need higher volume pub data_dir: PathBuf, - /// Address to bind for RPC + /// Address to bind for RPC pub rpc_bind_addr: SocketAddr, - /// Bootstrap peers RPC address + /// Bootstrap peers RPC address #[serde(deserialize_with = "deserialize_vec_addr")] pub bootstrap_peers: Vec, - /// Consule host to connect to to discover more peers + /// Consule host to connect to to discover more peers pub consul_host: Option, - /// Consul service name to use + /// Consul service name to use pub consul_service_name: Option, - /// Max number of concurrent RPC request + /// Max number of concurrent RPC request #[serde(default = "default_max_concurrent_rpc_requests")] pub max_concurrent_rpc_requests: usize, - /// Size of data blocks to save to disk + /// Size of data blocks to save to disk #[serde(default = "default_block_size")] pub block_size: usize, #[serde(default = "default_control_write_max_faults")] pub control_write_max_faults: usize, - /// How many nodes should hold a copy of meta data + /// How many nodes should hold a copy of meta data #[serde(default = "default_replication_factor")] pub meta_replication_factor: usize, - /// How many nodes should hold a copy of data + /// How many nodes should hold a copy of data #[serde(default = "default_replication_factor")] pub data_replication_factor: usize, - /// Configuration for RPC TLS + /// Configuration for RPC TLS pub rpc_tls: Option, - /// Configuration for S3 api + /// Configuration for S3 api pub s3_api: ApiConfig, - /// Configuration for serving files as normal web server + /// Configuration for serving files as normal web server pub s3_web: WebConfig, } /// Configuration for RPC TLS #[derive(Deserialize, Debug, Clone)] pub struct TlsConfig { - /// Path to certificate autority used for all nodes + /// Path to certificate autority used for all nodes pub ca_cert: String, - /// Path to public certificate for this node + /// Path to public 
certificate for this node pub node_cert: String, - /// Path to private key for this node + /// Path to private key for this node pub node_key: String, } /// Configuration for S3 api #[derive(Deserialize, Debug, Clone)] pub struct ApiConfig { - /// Address and port to bind for api serving + /// Address and port to bind for api serving pub api_bind_addr: SocketAddr, - /// S3 region to use + /// S3 region to use pub s3_region: String, } /// Configuration for serving files as normal web server #[derive(Deserialize, Debug, Clone)] pub struct WebConfig { - /// Address and port to bind for web serving + /// Address and port to bind for web serving pub bind_addr: SocketAddr, - /// Suffix to remove from domain name to find bucket + /// Suffix to remove from domain name to find bucket pub root_domain: String, - /// Suffix to add when user-agent request path end with "/" + /// Suffix to add when user-agent request path end with "/" pub index: String, } diff --git a/src/util/data.rs b/src/util/data.rs index b269edac..34ee8a18 100644 --- a/src/util/data.rs +++ b/src/util/data.rs @@ -63,20 +63,20 @@ impl Serialize for FixedBytes32 { } impl FixedBytes32 { - /// Access the content as a slice + /// Access the content as a slice pub fn as_slice(&self) -> &[u8] { &self.0[..] } - /// Access the content as a mutable slice + /// Access the content as a mutable slice pub fn as_slice_mut(&mut self) -> &mut [u8] { &mut self.0[..] 
} - /// Copy to a slice + /// Copy to a slice pub fn to_vec(&self) -> Vec { self.0.to_vec() } - /// Try building a FixedBytes32 from a slice - /// Return None if the slice is not 32 bytes long + /// Try building a FixedBytes32 from a slice + /// Return None if the slice is not 32 bytes long pub fn try_from(by: &[u8]) -> Option { if by.len() != 32 { return None; @@ -151,8 +151,8 @@ pub fn debug_serialize(x: T) -> String { match serde_json::to_string(&x) { Ok(ss) => { if ss.len() > 100 { - // TODO this can panic if 100 is not a codepoint boundary, but inside a 2 Bytes - // (or more) codepoint + // TODO this can panic if 100 is not a codepoint boundary, but inside a 2 Bytes + // (or more) codepoint ss[..100].to_string() } else { ss From 30bec0758b943351d946234329b9a46bd83749a1 Mon Sep 17 00:00:00 2001 From: Trinity Pointard Date: Fri, 26 Mar 2021 19:41:46 +0100 Subject: [PATCH 04/10] attempt at documenting table crate --- src/model/block.rs | 2 +- src/model/bucket_table.rs | 4 ++++ src/model/garage.rs | 4 ++-- src/model/key_table.rs | 4 ++++ src/model/object_table.rs | 2 +- src/model/version_table.rs | 2 +- src/table/crdt/lww.rs | 2 +- src/table/crdt/map.rs | 1 + src/table/gc.rs | 2 +- src/table/lib.rs | 7 ++++--- src/table/replication/fullcopy.rs | 12 ++++++------ src/table/replication/mod.rs | 6 ++++-- src/table/replication/parameters.rs | 10 ++++++++-- src/table/replication/sharded.rs | 17 ++++++++++------- src/table/schema.rs | 19 +++++++++++++++---- src/web/lib.rs | 3 ++- 16 files changed, 65 insertions(+), 32 deletions(-) diff --git a/src/model/block.rs b/src/model/block.rs index 0d9af38f..2b145615 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -18,7 +18,7 @@ use garage_rpc::membership::System; use garage_rpc::rpc_client::*; use garage_rpc::rpc_server::*; -use garage_table::replication::{sharded::TableShardedReplication, TableReplication}; +use garage_table::replication::{TableReplication, TableShardedReplication}; use crate::block_ref_table::*; 
diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs index 6330dced..2ede4904 100644 --- a/src/model/bucket_table.rs +++ b/src/model/bucket_table.rs @@ -100,6 +100,10 @@ impl TableSchema for BucketTable { type E = Bucket; type Filter = DeletedFilter; + fn updated(&self, _old: Option, _new: Option) { + // nothing to do when updated + } + fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { filter.apply(entry.is_deleted()) } diff --git a/src/model/garage.rs b/src/model/garage.rs index 5f7a67c9..3f51f9fe 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -7,8 +7,8 @@ use garage_rpc::membership::System; use garage_rpc::rpc_client::RpcHttpClient; use garage_rpc::rpc_server::RpcServer; -use garage_table::replication::fullcopy::*; -use garage_table::replication::sharded::*; +use garage_table::replication::TableFullReplication; +use garage_table::replication::TableShardedReplication; use garage_table::*; use crate::block::*; diff --git a/src/model/key_table.rs b/src/model/key_table.rs index fcca3835..e6ebe8de 100644 --- a/src/model/key_table.rs +++ b/src/model/key_table.rs @@ -113,6 +113,10 @@ impl TableSchema for KeyTable { type E = Key; type Filter = KeyFilter; + fn updated(&self, _old: Option, _new: Option) { + // nothing to do when updated + } + fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { match filter { KeyFilter::Deleted(df) => df.apply(entry.deleted.get()), diff --git a/src/model/object_table.rs b/src/model/object_table.rs index 34ac798a..d5be62e5 100644 --- a/src/model/object_table.rs +++ b/src/model/object_table.rs @@ -6,7 +6,7 @@ use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_table::crdt::*; -use garage_table::replication::sharded::*; +use garage_table::replication::TableShardedReplication; use garage_table::*; use crate::version_table::*; diff --git a/src/model/version_table.rs b/src/model/version_table.rs index 841fbfea..fabd1fb1 100644 --- 
a/src/model/version_table.rs +++ b/src/model/version_table.rs @@ -5,7 +5,7 @@ use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_table::crdt::*; -use garage_table::replication::sharded::*; +use garage_table::replication::TableShardedReplication; use garage_table::*; use crate::block_ref_table::*; diff --git a/src/table/crdt/lww.rs b/src/table/crdt/lww.rs index 25ecdb07..3b1b2406 100644 --- a/src/table/crdt/lww.rs +++ b/src/table/crdt/lww.rs @@ -34,7 +34,7 @@ use crate::crdt::crdt::*; /// and may differ from what you observed with your atomic clock! /// /// This scheme is used by AWS S3 or Soundcloud and often without knowing -/// in entreprise when reconciliating databases with ad-hoc scripts. +/// in enterprise when reconciliating databases with ad-hoc scripts. #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] pub struct LWW { ts: u64, diff --git a/src/table/crdt/map.rs b/src/table/crdt/map.rs index 1193e6db..c4a30a26 100644 --- a/src/table/crdt/map.rs +++ b/src/table/crdt/map.rs @@ -37,6 +37,7 @@ where Self { vals: vec![(k, v)] } } + /// Add a value to the map pub fn put(&mut self, k: K, v: V) { self.merge(&Self::put_mutator(k, v)); } diff --git a/src/table/gc.rs b/src/table/gc.rs index e52bf599..694a3789 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -74,7 +74,7 @@ where while !*must_exit.borrow() { match self.gc_loop_iter().await { Ok(true) => { - // Stuff was done, loop imediately + // Stuff was done, loop immediately continue; } Ok(false) => { diff --git a/src/table/lib.rs b/src/table/lib.rs index 3b73163b..8dcb115d 100644 --- a/src/table/lib.rs +++ b/src/table/lib.rs @@ -1,3 +1,4 @@ +#![warn(missing_docs)] #![recursion_limit = "1024"] #[macro_use] @@ -8,10 +9,10 @@ pub mod schema; pub mod util; pub mod data; -pub mod gc; -pub mod merkle; +mod gc; +mod merkle; pub mod replication; -pub mod sync; +mod sync; pub mod table; pub use schema::*; diff --git a/src/table/replication/fullcopy.rs 
b/src/table/replication/fullcopy.rs index bd658f63..a6b4c98c 100644 --- a/src/table/replication/fullcopy.rs +++ b/src/table/replication/fullcopy.rs @@ -6,19 +6,19 @@ use garage_util::data::*; use crate::replication::*; +/// Full replication schema: all nodes store everything +/// Writes are disseminated in an epidemic manner in the network +/// Advantage: do all reads locally, extremely fast +/// Inconvenient: only suitable to reasonably small tables #[derive(Clone)] pub struct TableFullReplication { + /// The membership manager of this node pub system: Arc, + /// Max number of faults allowed while replicating a record pub max_faults: usize, } impl TableReplication for TableFullReplication { - // Full replication schema: all nodes store everything - // Writes are disseminated in an epidemic manner in the network - - // Advantage: do all reads locally, extremely fast - // Inconvenient: only suitable to reasonably small tables - fn read_nodes(&self, _hash: &Hash) -> Vec { vec![self.system.id] } diff --git a/src/table/replication/mod.rs b/src/table/replication/mod.rs index d43d7f19..dfcb026a 100644 --- a/src/table/replication/mod.rs +++ b/src/table/replication/mod.rs @@ -1,6 +1,8 @@ mod parameters; -pub mod fullcopy; -pub mod sharded; +mod fullcopy; +mod sharded; +pub use fullcopy::TableFullReplication; pub use parameters::*; +pub use sharded::TableShardedReplication; diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs index e46bd172..0ab9ee5a 100644 --- a/src/table/replication/parameters.rs +++ b/src/table/replication/parameters.rs @@ -2,20 +2,26 @@ use garage_rpc::ring::*; use garage_util::data::*; +/// Trait to describe how a table shall be replicated pub trait TableReplication: Send + Sync { // See examples in table_sharded.rs and table_fullcopy.rs // To understand various replication methods - // Which nodes to send reads from + /// Which nodes to send read requests to fn read_nodes(&self, hash: &Hash) -> Vec; + /// Responses 
needed to consider a read succesfull fn read_quorum(&self) -> usize; - // Which nodes to send writes to + /// Which nodes to send writes to fn write_nodes(&self, hash: &Hash) -> Vec; + /// Responses needed to consider a write succesfull fn write_quorum(&self) -> usize; + // this feels like its write_nodes().len() - write_quorum() fn max_write_errors(&self) -> usize; // Accessing partitions, for Merkle tree & sync + /// Get partition for data with given hash fn partition_of(&self, hash: &Hash) -> Partition; + /// List of existing partitions fn partitions(&self) -> Vec<(Partition, Hash)>; } diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs index dce74b03..f2d89729 100644 --- a/src/table/replication/sharded.rs +++ b/src/table/replication/sharded.rs @@ -6,22 +6,25 @@ use garage_util::data::*; use crate::replication::*; +/// Sharded replication schema: +/// - based on the ring of nodes, a certain set of neighbors +/// store entries, given as a function of the position of the +/// entry's hash in the ring +/// - reads are done on all of the nodes that replicate the data +/// - writes as well #[derive(Clone)] pub struct TableShardedReplication { + /// The membership manager of this node pub system: Arc, + /// How many time each data should be replicated pub replication_factor: usize, + /// How many nodes to contact for a read, should be at most `replication_factor` pub read_quorum: usize, + /// How many nodes to contact for a write, should be at most `replication_factor` pub write_quorum: usize, } impl TableReplication for TableShardedReplication { - // Sharded replication schema: - // - based on the ring of nodes, a certain set of neighbors - // store entries, given as a function of the position of the - // entry's hash in the ring - // - reads are done on all of the nodes that replicate the data - // - writes as well - fn read_nodes(&self, hash: &Hash) -> Vec { let ring = self.system.ring.borrow().clone(); ring.walk_ring(&hash, 
self.replication_factor) diff --git a/src/table/schema.rs b/src/table/schema.rs index 4d754664..f5fde95f 100644 --- a/src/table/schema.rs +++ b/src/table/schema.rs @@ -4,7 +4,9 @@ use garage_util::data::*; use crate::crdt::CRDT; +/// Trait for partitionnable data pub trait PartitionKey { + /// Get the key used to partition fn hash(&self) -> Hash; } @@ -20,7 +22,9 @@ impl PartitionKey for Hash { } } +/// Trait for sortable data pub trait SortKey { + /// Get the key used to sort fn sort_key(&self) -> &[u8]; } @@ -36,25 +40,34 @@ impl SortKey for Hash { } } +/// Trait for an entry in a table. It must be sortable and partitionnable. pub trait Entry: CRDT + PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync { + /// Get the key used to partition fn partition_key(&self) -> &P; + /// Get the key used to sort fn sort_key(&self) -> &S; + /// Is the entry a tombstone? Default implementation always return false fn is_tombstone(&self) -> bool { false } } +/// Trait for the schema used in a table pub trait TableSchema: Send + Sync { + /// The partition key used in that table type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync; + /// The sort key used int that table type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync; + /// They type for an entry in that table type E: Entry; type Filter: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync; // Action to take if not able to decode current version: // try loading from an older version + /// Try migrating an entry from an older version fn try_migrate(_bytes: &[u8]) -> Option { None } @@ -63,9 +76,7 @@ pub trait TableSchema: Send + Sync { // as the update itself is an unchangeable fact that will never go back // due to CRDT logic. Typically errors in propagation of info should be logged // to stderr. 
- fn updated(&self, _old: Option, _new: Option) {} + fn updated(&self, old: Option, new: Option); - fn matches_filter(_entry: &Self::E, _filter: &Self::Filter) -> bool { - true - } + fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool; } diff --git a/src/web/lib.rs b/src/web/lib.rs index f28937b9..7d3b4d54 100644 --- a/src/web/lib.rs +++ b/src/web/lib.rs @@ -1,6 +1,7 @@ #[macro_use] extern crate log; -pub mod error; +mod error; +pub use error::Error; pub mod web_server; From 92d54770bbdc2f1a64eb60ab899ca559244c0f48 Mon Sep 17 00:00:00 2001 From: Trinity Pointard Date: Fri, 26 Mar 2021 21:53:28 +0100 Subject: [PATCH 05/10] attempt at documenting model crate --- src/model/block.rs | 27 ++++++++++++++++++++++++- src/model/block_ref_table.rs | 3 +++ src/model/bucket_table.rs | 17 ++++++++++++++-- src/model/garage.rs | 9 +++++++++ src/model/key_table.rs | 26 +++++++++++++++++------- src/model/lib.rs | 1 + src/model/object_table.rs | 38 ++++++++++++++++++++++++++++++++---- src/model/version_table.rs | 13 +++++++++++- 8 files changed, 119 insertions(+), 15 deletions(-) diff --git a/src/model/block.rs b/src/model/block.rs index 2b145615..38b2325c 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -24,6 +24,7 @@ use crate::block_ref_table::*; use crate::garage::Garage; +/// Size under which data will be stored inlined in database instead of as files pub const INLINE_THRESHOLD: usize = 3072; pub const BACKGROUND_WORKERS: u64 = 1; @@ -33,28 +34,41 @@ const BLOCK_GC_TIMEOUT: Duration = Duration::from_secs(60); const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5); const RESYNC_RETRY_TIMEOUT: Duration = Duration::from_secs(10); +/// RPC messages used to share blocks of data between nodes #[derive(Debug, Serialize, Deserialize)] pub enum Message { Ok, + /// Message to ask for a block of data, by hash GetBlock(Hash), + /// Message to send a block of data, either because requested, of for first delivery of new + /// block 
PutBlock(PutBlockMessage), + /// Ask other node if they should have this block, but don't actually have it NeedBlockQuery(Hash), + /// Response : whether the node do require that block NeedBlockReply(bool), } +/// Structure used to send a block #[derive(Debug, Serialize, Deserialize)] pub struct PutBlockMessage { + /// Hash of the block pub hash: Hash, + /// Content of the block #[serde(with = "serde_bytes")] pub data: Vec, } impl RpcMessage for Message {} +/// The block manager, handling block exchange between nodes, and block storage on local node pub struct BlockManager { + /// Replication strategy, allowing to find on which node blocks should be located pub replication: TableShardedReplication, + /// Directory in which block are stored pub data_dir: PathBuf, + /// Lock to prevent concurrent edition of the directory pub data_dir_lock: Mutex<()>, rc: sled::Tree, @@ -128,7 +142,8 @@ impl BlockManager { } pub fn spawn_background_worker(self: Arc) { - // Launch 2 simultaneous workers for background resync loop preprocessing + // Launch 2 simultaneous workers for background resync loop preprocessing <= TODO actually this + // launches only one worker with current value of BACKGROUND_WORKERS for i in 0..BACKGROUND_WORKERS { let bm2 = self.clone(); let background = self.system.background.clone(); @@ -141,6 +156,7 @@ impl BlockManager { } } + /// Write a block to disk pub async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result { let _lock = self.data_dir_lock.lock().await; @@ -159,6 +175,7 @@ impl BlockManager { Ok(Message::Ok) } + /// Read block from disk, verifying it's integrity pub async fn read_block(&self, hash: &Hash) -> Result { let path = self.block_path(hash); @@ -190,6 +207,7 @@ impl BlockManager { Ok(Message::PutBlock(PutBlockMessage { hash: *hash, data })) } + /// Check if this node should have a block, but don't actually have it pub async fn need_block(&self, hash: &Hash) -> Result { let needed = self .rc @@ -217,6 +235,8 @@ impl BlockManager { 
path } + /// Increment the number of time a block is used, putting it to resynchronization if it is + /// required, but not known pub fn block_incref(&self, hash: &Hash) -> Result<(), Error> { let old_rc = self.rc.fetch_and_update(&hash, |old| { let old_v = old.map(u64_from_be_bytes).unwrap_or(0); @@ -229,6 +249,8 @@ impl BlockManager { Ok(()) } + /// Decrement the number of time a block is used + // when counter reach 0, it seems not put to resync which I assume put it to gc? pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> { let new_rc = self.rc.update_and_fetch(&hash, |old| { let old_v = old.map(u64_from_be_bytes).unwrap_or(0); @@ -388,6 +410,7 @@ impl BlockManager { Ok(()) } + /// Ask nodes that might have a block for it pub async fn rpc_get_block(&self, hash: &Hash) -> Result, Error> { let who = self.replication.read_nodes(&hash); let resps = self @@ -412,6 +435,7 @@ impl BlockManager { ))) } + /// Send block to nodes that should have it pub async fn rpc_put_block(&self, hash: Hash, data: Vec) -> Result<(), Error> { let who = self.replication.write_nodes(&hash); self.rpc_client @@ -498,6 +522,7 @@ impl BlockManager { .boxed() } + /// Get lenght of resync queue pub fn resync_queue_len(&self) -> usize { self.resync_queue.len() } diff --git a/src/model/block_ref_table.rs b/src/model/block_ref_table.rs index e4372717..2c5f9bf9 100644 --- a/src/model/block_ref_table.rs +++ b/src/model/block_ref_table.rs @@ -11,12 +11,15 @@ use crate::block::*; #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct BlockRef { // Primary key + /// Hash of the block pub block: Hash, // Sort key + // why a version on a hashed (probably immutable) piece of data? 
pub version: UUID, // Keep track of deleted status + /// Is that block deleted pub deleted: crdt::Bool, } diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs index 2ede4904..8198deb7 100644 --- a/src/model/bucket_table.rs +++ b/src/model/bucket_table.rs @@ -12,15 +12,18 @@ use crate::key_table::PermissionSet; /// - A bucket has 2 states, Present or Deleted and parameters make sense only if present. #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct Bucket { - // Primary key + /// Name of the bucket pub name: String, - + /// State, and configuration if not deleted, of the bucket pub state: crdt::LWW, } +/// State of a bucket #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub enum BucketState { + /// The bucket is deleted Deleted, + /// The bucket exists Present(BucketParams), } @@ -37,9 +40,12 @@ impl CRDT for BucketState { } } +/// Configuration for a bucket #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct BucketParams { + /// Map of key with access to the bucket, and what kind of access they give pub authorized_keys: crdt::LWWMap, + /// Is the bucket served as http pub website: crdt::LWW, } @@ -51,6 +57,7 @@ impl CRDT for BucketParams { } impl BucketParams { + /// Create a new default `BucketParams` pub fn new() -> Self { BucketParams { authorized_keys: crdt::LWWMap::new(), @@ -60,15 +67,21 @@ impl BucketParams { } impl Bucket { + /// Create a new bucket pub fn new(name: String) -> Self { Bucket { name, state: crdt::LWW::new(BucketState::Present(BucketParams::new())), } } + + /// Query if bucket is deleted pub fn is_deleted(&self) -> bool { *self.state.get() == BucketState::Deleted } + + /// Return the list of authorized keys, when each was updated, and the permission associated to + /// the key pub fn authorized_keys(&self) -> &[(String, u64, PermissionSet)] { match self.state.get() { BucketState::Deleted => &[], diff --git a/src/model/garage.rs b/src/model/garage.rs index 3f51f9fe..797a91e5 
100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -18,15 +18,23 @@ use crate::key_table::*; use crate::object_table::*; use crate::version_table::*; +/// An entire Garage full of data pub struct Garage { + /// The parsed configuration Garage is running pub config: Config, + /// The local database pub db: sled::Db, + /// A background job runner pub background: Arc, + /// The membership manager pub system: Arc, + /// The block manager pub block_manager: Arc, + /// Table containing informations about buckets pub bucket_table: Arc>, + /// Table containing informations about api keys pub key_table: Arc>, pub object_table: Arc>, @@ -35,6 +43,7 @@ pub struct Garage { } impl Garage { + /// Create and run garage pub fn new( config: Config, db: sled::Db, diff --git a/src/model/key_table.rs b/src/model/key_table.rs index e6ebe8de..653a38e2 100644 --- a/src/model/key_table.rs +++ b/src/model/key_table.rs @@ -3,26 +3,28 @@ use serde::{Deserialize, Serialize}; use garage_table::crdt::*; use garage_table::*; +/// An api key #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct Key { - // Primary key + /// The id of the key (immutable) pub key_id: String, - // Associated secret key (immutable) + /// The secret_key associated + // shouldn't it be hashed or something, so it's trully secret? pub secret_key: String, - // Name + /// Name for the key pub name: crdt::LWW, - // Deletion + /// Is the key deleted pub deleted: crdt::Bool, - // Authorized keys + /// Buckets in which the key is authorized. 
Empty if `Key` is deleted pub authorized_buckets: crdt::LWWMap, - // CRDT interaction: deleted implies authorized_buckets is empty } impl Key { + /// Create a new key pub fn new(name: String) -> Self { let key_id = format!("GK{}", hex::encode(&rand::random::<[u8; 12]>()[..])); let secret_key = hex::encode(&rand::random::<[u8; 32]>()[..]); @@ -34,6 +36,8 @@ impl Key { authorized_buckets: crdt::LWWMap::new(), } } + + /// Import a key from it's parts pub fn import(key_id: &str, secret_key: &str, name: &str) -> Self { Self { key_id: key_id.to_string(), @@ -43,6 +47,8 @@ impl Key { authorized_buckets: crdt::LWWMap::new(), } } + + /// Create a new Key which can me merged to mark an existing key deleted pub fn delete(key_id: String) -> Self { Self { key_id, @@ -52,13 +58,16 @@ impl Key { authorized_buckets: crdt::LWWMap::new(), } } - /// Add an authorized bucket, only if it wasn't there before + + /// Check if `Key` is allowed to read in bucket pub fn allow_read(&self, bucket: &str) -> bool { self.authorized_buckets .get(&bucket.to_string()) .map(|x| x.allow_read) .unwrap_or(false) } + + /// Check if `Key` is allowed to write in bucket pub fn allow_write(&self, bucket: &str) -> bool { self.authorized_buckets .get(&bucket.to_string()) @@ -67,9 +76,12 @@ impl Key { } } +/// Permission given to a key in a bucket #[derive(PartialOrd, Ord, PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub struct PermissionSet { + /// The key can be used to read the bucket pub allow_read: bool, + /// The key can be used to write in the bucket pub allow_write: bool, } diff --git a/src/model/lib.rs b/src/model/lib.rs index b4a8ddb7..70d2e2ce 100644 --- a/src/model/lib.rs +++ b/src/model/lib.rs @@ -1,3 +1,4 @@ +#![warn(missing_docs)] #[macro_use] extern crate log; diff --git a/src/model/object_table.rs b/src/model/object_table.rs index d5be62e5..5b026ceb 100644 --- a/src/model/object_table.rs +++ b/src/model/object_table.rs @@ -11,19 +11,21 @@ use garage_table::*; use 
crate::version_table::*; +/// An object #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct Object { - // Primary key + /// The bucket in which the object is stored pub bucket: String, - // Sort key + /// The key at which the object is stored in its bucket pub key: String, - // Data + /// The list of known versions of the object versions: Vec, } impl Object { + /// Create an object from parts pub fn new(bucket: String, key: String, versions: Vec) -> Self { let mut ret = Self { bucket, @@ -36,6 +38,7 @@ impl Object { } ret } + /// Adds a version if it wasn't already present pub fn add_version(&mut self, new: ObjectVersion) -> Result<(), ()> { match self @@ -49,23 +52,32 @@ impl Object { Ok(_) => Err(()), } } + + /// Get a list of all versions known for `Object` pub fn versions(&self) -> &[ObjectVersion] { &self.versions[..] } } +/// Informations about a version of an object #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct ObjectVersion { + /// Id of the version pub uuid: UUID, + /// Timestamp of when the object was created pub timestamp: u64, - + /// State of the version pub state: ObjectVersionState, } +/// State of an object version #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub enum ObjectVersionState { + /// The version is being received Uploading(ObjectVersionHeaders), + /// The version is fully received Complete(ObjectVersionData), + /// The version was never fully received Aborted, } @@ -90,10 +102,15 @@ impl CRDT for ObjectVersionState { } } +/// Data about an object version #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] pub enum ObjectVersionData { + /// The version is deleted DeleteMarker, + /// The object is short, it's stored inlined Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec), + /// The object is not short, Hash of first block is stored here, next segments hashes are + /// stored in the version table FirstBlock(ObjectVersionMeta, Hash), } @@ -101,16 
+118,23 @@ impl AutoCRDT for ObjectVersionData { const WARN_IF_DIFFERENT: bool = true; } +/// Metadata about the object version #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] pub struct ObjectVersionMeta { + /// Headers to send to the client pub headers: ObjectVersionHeaders, + /// Size of the object pub size: u64, + /// etag of the object pub etag: String, } +/// Additional headers for an object #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] pub struct ObjectVersionHeaders { + /// Content type of the object pub content_type: String, + /// Any other http headers to send pub other: BTreeMap, } @@ -118,18 +142,24 @@ impl ObjectVersion { fn cmp_key(&self) -> (u64, UUID) { (self.timestamp, self.uuid) } + + /// Is the object version currently being uploaded pub fn is_uploading(&self) -> bool { match self.state { ObjectVersionState::Uploading(_) => true, _ => false, } } + + /// Is the object version completely received pub fn is_complete(&self) -> bool { match self.state { ObjectVersionState::Complete(_) => true, _ => false, } } + + /// Is the object version available (received and not deleted) pub fn is_data(&self) -> bool { match self.state { ObjectVersionState::Complete(ObjectVersionData::DeleteMarker) => false, diff --git a/src/model/version_table.rs b/src/model/version_table.rs index fabd1fb1..428fac10 100644 --- a/src/model/version_table.rs +++ b/src/model/version_table.rs @@ -10,21 +10,27 @@ use garage_table::*; use crate::block_ref_table::*; +/// A version of an object #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct Version { - // Primary key + /// UUID of the version pub uuid: UUID, // Actual data: the blocks for this version // In the case of a multipart upload, also store the etags // of individual parts and check them when doing CompleteMultipartUpload + /// Is this version deleted pub deleted: crdt::Bool, + /// list of blocks of data composing the version pub blocks: 
crdt::Map, + /// Etag of each part in case of a multipart upload, empty otherwise pub parts_etags: crdt::Map, // Back link to bucket+key so that we can figure if // this was deleted later on + /// Bucket in which the related object is stored pub bucket: String, + /// Key in which the related object is stored pub key: String, } @@ -43,7 +49,9 @@ impl Version { #[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)] pub struct VersionBlockKey { + /// Number of the part, starting at 1 pub part_number: u64, + /// offset of the block in the file, starting at 0 pub offset: u64, } @@ -61,9 +69,12 @@ impl PartialOrd for VersionBlockKey { } } +/// Informations about a single block #[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)] pub struct VersionBlock { + /// Hash of the block pub hash: Hash, + /// Size of the block pub size: u64, } From ee9b685994a95787cd735ee17dc43875f549514f Mon Sep 17 00:00:00 2001 From: Trinity Pointard Date: Fri, 26 Mar 2021 22:05:16 +0100 Subject: [PATCH 06/10] document web crate --- src/garage/server.rs | 4 ++-- src/web/error.rs | 9 ++++++++- src/web/lib.rs | 5 ++++- src/web/web_server.rs | 1 + 4 files changed, 15 insertions(+), 4 deletions(-) diff --git a/src/garage/server.rs b/src/garage/server.rs index feb858e4..a5bf68a6 100644 --- a/src/garage/server.rs +++ b/src/garage/server.rs @@ -11,7 +11,7 @@ use garage_util::error::Error; use garage_api::api_server; use garage_model::garage::Garage; use garage_rpc::rpc_server::RpcServer; -use garage_web::web_server; +use garage_web::run_web_server; use crate::admin_rpc::*; @@ -63,7 +63,7 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { info!("Initializing RPC and API servers..."); let run_rpc_server = Arc::new(rpc_server).run(wait_from(watch_cancel.clone())); let api_server = api_server::run_api_server(garage.clone(), wait_from(watch_cancel.clone())); - let web_server = web_server::run_web_server(garage, 
wait_from(watch_cancel.clone())); + let web_server = run_web_server(garage, wait_from(watch_cancel.clone())); futures::try_join!( bootstrap.map(|rv| { diff --git a/src/web/error.rs b/src/web/error.rs index 14bc3b75..6da86199 100644 --- a/src/web/error.rs +++ b/src/web/error.rs @@ -3,30 +3,37 @@ use hyper::StatusCode; use garage_util::error::Error as GarageError; +/// Errors of this crate #[derive(Debug, Error)] pub enum Error { + /// An error received from the API crate #[error(display = "API error: {}", _0)] ApiError(#[error(source)] garage_api::error::Error), // Category: internal error + /// Error internal to garage #[error(display = "Internal error: {}", _0)] InternalError(#[error(source)] GarageError), + /// The file does not exist #[error(display = "Not found")] NotFound, - // Category: bad request + /// The client requested a malformed path #[error(display = "Invalid UTF-8: {}", _0)] InvalidUTF8(#[error(source)] std::str::Utf8Error), + /// The client send a header with invalid value #[error(display = "Invalid header value: {}", _0)] InvalidHeader(#[error(source)] hyper::header::ToStrError), + /// The client sent a request without host, or with unsupported method #[error(display = "Bad request: {}", _0)] BadRequest(String), } impl Error { + /// Transform errors into http status code pub fn http_status_code(&self) -> StatusCode { match self { Error::NotFound => StatusCode::NOT_FOUND, diff --git a/src/web/lib.rs b/src/web/lib.rs index 7d3b4d54..3e978af6 100644 --- a/src/web/lib.rs +++ b/src/web/lib.rs @@ -1,7 +1,10 @@ +#![deny(missing_crate_level_docs, missing_docs)] +//! 
Crate for handling web serving of s3 bucket #[macro_use] extern crate log; mod error; pub use error::Error; -pub mod web_server; +mod web_server; +pub use web_server::run_web_server; diff --git a/src/web/web_server.rs b/src/web/web_server.rs index cfde2bcc..9635eca6 100644 --- a/src/web/web_server.rs +++ b/src/web/web_server.rs @@ -18,6 +18,7 @@ use garage_model::garage::Garage; use garage_table::*; use garage_util::error::Error as GarageError; +/// Run a web server pub async fn run_web_server( garage: Arc, shutdown_signal: impl Future, From a74eccfd6e1cd438912800ea4f96a2f86d267d12 Mon Sep 17 00:00:00 2001 From: Trinity Pointard Date: Fri, 26 Mar 2021 22:32:09 +0100 Subject: [PATCH 07/10] document api crate --- src/api/api_server.rs | 1 + src/api/encoding.rs | 5 +++++ src/api/error.rs | 16 ++++++++++++++++ src/api/lib.rs | 21 +++++++++++++-------- src/api/s3_get.rs | 5 ++++- src/garage/server.rs | 4 ++-- src/model/key_table.rs | 1 - src/web/error.rs | 2 +- 8 files changed, 42 insertions(+), 13 deletions(-) diff --git a/src/api/api_server.rs b/src/api/api_server.rs index 2feb0e3a..dcc9f478 100644 --- a/src/api/api_server.rs +++ b/src/api/api_server.rs @@ -20,6 +20,7 @@ use crate::s3_get::*; use crate::s3_list::*; use crate::s3_put::*; +/// Run the S3 API server pub async fn run_api_server( garage: Arc, shutdown_signal: impl Future, diff --git a/src/api/encoding.rs b/src/api/encoding.rs index 25999207..63c5dee2 100644 --- a/src/api/encoding.rs +++ b/src/api/encoding.rs @@ -1,9 +1,13 @@ +//! 
Module containing various helpers for encoding + +/// Escape &str for xml inclusion pub fn xml_escape(s: &str) -> String { s.replace("<", "<") .replace(">", ">") .replace("\"", """) } +/// Encode &str for use in a URI pub fn uri_encode(string: &str, encode_slash: bool) -> String { let mut result = String::with_capacity(string.len() * 2); for c in string.chars() { @@ -24,6 +28,7 @@ pub fn uri_encode(string: &str, encode_slash: bool) -> String { result } +/// Encode &str either as an uri, or a valid string for xml inclusion pub fn xml_encode_key(k: &str, urlencode: bool) -> String { if urlencode { uri_encode(k, true) diff --git a/src/api/error.rs b/src/api/error.rs index 42a7ab10..acd8ebf7 100644 --- a/src/api/error.rs +++ b/src/api/error.rs @@ -3,44 +3,57 @@ use hyper::StatusCode; use garage_util::error::Error as GarageError; +/// Errors of this crate #[derive(Debug, Error)] pub enum Error { // Category: internal error + /// Error related to deeper parts of Garage #[error(display = "Internal error: {}", _0)] InternalError(#[error(source)] GarageError), + /// Error related to Hyper #[error(display = "Internal error (Hyper error): {}", _0)] Hyper(#[error(source)] hyper::Error), + /// Error related to HTTP #[error(display = "Internal error (HTTP error): {}", _0)] HTTP(#[error(source)] http::Error), // Category: cannot process + /// No proper api key was used, or the signature was invalid #[error(display = "Forbidden: {}", _0)] Forbidden(String), + /// The object requested don't exists #[error(display = "Not found")] NotFound, // Category: bad request + /// The request used an invalid path #[error(display = "Invalid UTF-8: {}", _0)] InvalidUTF8Str(#[error(source)] std::str::Utf8Error), + /// The request used an invalid path #[error(display = "Invalid UTF-8: {}", _0)] InvalidUTF8String(#[error(source)] std::string::FromUtf8Error), + /// Some base64 encoded data was badly encoded #[error(display = "Invalid base64: {}", _0)] InvalidBase64(#[error(source)] 
base64::DecodeError), + /// The client sent invalid XML data #[error(display = "Invalid XML: {}", _0)] InvalidXML(String), + /// The client sent a header with invalid value #[error(display = "Invalid header value: {}", _0)] InvalidHeader(#[error(source)] hyper::header::ToStrError), + /// The client sent a range header with invalid value #[error(display = "Invalid HTTP range: {:?}", _0)] InvalidRange(#[error(from)] http_range::HttpRangeParseError), + /// The client sent an invalid request #[error(display = "Bad request: {}", _0)] BadRequest(String), } @@ -52,6 +65,7 @@ impl From for Error { } impl Error { + /// Convert an error into an Http status code pub fn http_status_code(&self) -> StatusCode { match self { Error::NotFound => StatusCode::NOT_FOUND, @@ -65,6 +79,7 @@ impl Error { } } +/// Trait to map error to the Bad Request error code pub trait OkOrBadRequest { type S2; fn ok_or_bad_request(self, reason: &'static str) -> Self::S2; @@ -93,6 +108,7 @@ impl OkOrBadRequest for Option { } } +/// Trait to map an error to an Internal Error code pub trait OkOrInternalError { type S2; fn ok_or_internal_error(self, reason: &'static str) -> Self::S2; diff --git a/src/api/lib.rs b/src/api/lib.rs index 9bb07925..d036cdd6 100644 --- a/src/api/lib.rs +++ b/src/api/lib.rs @@ -1,15 +1,20 @@ +#![deny(missing_crate_level_docs, missing_docs)] +//! Crate for serving a S3 compatible API #[macro_use] extern crate log; -pub mod error; +mod error; +pub use error::Error; -pub mod encoding; +mod encoding; -pub mod api_server; -pub mod signature; +mod api_server; +pub use api_server::run_api_server; -pub mod s3_copy; -pub mod s3_delete; +mod signature; + +mod s3_copy; +mod s3_delete; pub mod s3_get; -pub mod s3_list; -pub mod s3_put; +mod s3_list; +mod s3_put; diff --git a/src/api/s3_get.rs b/src/api/s3_get.rs index 2590c9bd..15c0ed0a 100644 --- a/src/api/s3_get.rs +++ b/src/api/s3_get.rs @@ -1,3 +1,4 @@ +//! 
Function related to GET and HEAD requests use std::sync::Arc; use std::time::{Duration, UNIX_EPOCH}; @@ -79,6 +80,7 @@ fn try_answer_cached( } } +/// Handle HEAD request pub async fn handle_head( garage: Arc, req: &Request, @@ -118,6 +120,7 @@ pub async fn handle_head( Ok(response) } +/// Handle GET request pub async fn handle_get( garage: Arc, req: &Request, @@ -224,7 +227,7 @@ pub async fn handle_get( } } -pub async fn handle_get_range( +async fn handle_get_range( garage: Arc, version: &ObjectVersion, version_data: &ObjectVersionData, diff --git a/src/garage/server.rs b/src/garage/server.rs index a5bf68a6..97a9bec2 100644 --- a/src/garage/server.rs +++ b/src/garage/server.rs @@ -8,7 +8,7 @@ use garage_util::background::*; use garage_util::config::*; use garage_util::error::Error; -use garage_api::api_server; +use garage_api::run_api_server; use garage_model::garage::Garage; use garage_rpc::rpc_server::RpcServer; use garage_web::run_web_server; @@ -62,7 +62,7 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { info!("Initializing RPC and API servers..."); let run_rpc_server = Arc::new(rpc_server).run(wait_from(watch_cancel.clone())); - let api_server = api_server::run_api_server(garage.clone(), wait_from(watch_cancel.clone())); + let api_server = run_api_server(garage.clone(), wait_from(watch_cancel.clone())); let web_server = run_web_server(garage, wait_from(watch_cancel.clone())); futures::try_join!( diff --git a/src/model/key_table.rs b/src/model/key_table.rs index 653a38e2..444f3949 100644 --- a/src/model/key_table.rs +++ b/src/model/key_table.rs @@ -10,7 +10,6 @@ pub struct Key { pub key_id: String, /// The secret_key associated - // shouldn't it be hashed or something, so it's trully secret? 
pub secret_key: String, /// Name for the key diff --git a/src/web/error.rs b/src/web/error.rs index 6da86199..2b8aeebe 100644 --- a/src/web/error.rs +++ b/src/web/error.rs @@ -8,7 +8,7 @@ use garage_util::error::Error as GarageError; pub enum Error { /// An error received from the API crate #[error(display = "API error: {}", _0)] - ApiError(#[error(source)] garage_api::error::Error), + ApiError(#[error(source)] garage_api::Error), // Category: internal error /// Error internal to garage From 653d3d588f912bcf86b24c456f23d6db0ca755d8 Mon Sep 17 00:00:00 2001 From: Trinity Pointard Date: Fri, 26 Mar 2021 22:36:23 +0100 Subject: [PATCH 08/10] document garage crate --- src/garage/main.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/garage/main.rs b/src/garage/main.rs index 6c86d0fb..34bf5501 100644 --- a/src/garage/main.rs +++ b/src/garage/main.rs @@ -1,4 +1,6 @@ +#![deny(missing_crate_level_docs, missing_docs)] #![recursion_limit = "1024"] +//! Garage CLI, used to interact with a running Garage instance, and to launch a Garage #[macro_use] extern crate log; @@ -25,7 +27,7 @@ use cli::*; #[derive(StructOpt, Debug)] #[structopt(name = "garage")] -pub struct Opt { +struct Opt { /// RPC connect to this host to execute client operations #[structopt(short = "h", long = "rpc-host", default_value = "127.0.0.1:3901")] pub rpc_host: SocketAddr, From c8906f200bf907272bf9fba7d183df4332fa085b Mon Sep 17 00:00:00 2001 From: Trinity Pointard Date: Tue, 6 Apr 2021 05:25:28 +0200 Subject: [PATCH 09/10] make most requested changes --- src/api/error.rs | 4 ++-- src/api/lib.rs | 1 - src/garage/main.rs | 3 +-- src/model/block.rs | 6 +++--- src/model/block_ref_table.rs | 8 +++----- src/model/key_table.rs | 3 ++- src/model/lib.rs | 1 - src/model/object_table.rs | 14 +++++++------- src/model/version_table.rs | 6 +++--- src/rpc/lib.rs | 1 - src/rpc/membership.rs | 11 ++++++----- src/rpc/ring.rs | 4 +++- src/rpc/rpc_client.rs | 9 +++++---- src/table/lib.rs | 1 - 
src/table/replication/parameters.rs | 1 - src/table/schema.rs | 4 ++-- src/util/error.rs | 1 - src/util/lib.rs | 1 - src/web/error.rs | 2 +- src/web/lib.rs | 1 - 20 files changed, 38 insertions(+), 44 deletions(-) diff --git a/src/api/error.rs b/src/api/error.rs index acd8ebf7..ad0174ad 100644 --- a/src/api/error.rs +++ b/src/api/error.rs @@ -29,7 +29,7 @@ pub enum Error { NotFound, // Category: bad request - /// The request used an invalid path + /// The request contained an invalid UTF-8 sequence in its path or in other parameters #[error(display = "Invalid UTF-8: {}", _0)] InvalidUTF8Str(#[error(source)] std::str::Utf8Error), @@ -65,7 +65,7 @@ impl From for Error { } impl Error { - /// Convert an error into an Http status code + /// Get the HTTP status code that best represents the meaning of the error for the client pub fn http_status_code(&self) -> StatusCode { match self { Error::NotFound => StatusCode::NOT_FOUND, diff --git a/src/api/lib.rs b/src/api/lib.rs index d036cdd6..be7e37c8 100644 --- a/src/api/lib.rs +++ b/src/api/lib.rs @@ -1,4 +1,3 @@ -#![deny(missing_crate_level_docs, missing_docs)] //! Crate for serving a S3 compatible API #[macro_use] extern crate log; diff --git a/src/garage/main.rs b/src/garage/main.rs index 34bf5501..a78e0f03 100644 --- a/src/garage/main.rs +++ b/src/garage/main.rs @@ -1,6 +1,5 @@ -#![deny(missing_crate_level_docs, missing_docs)] #![recursion_limit = "1024"] -//! Garage CLI, used to interact with a running Garage instance, and to launch a Garage +//! 
Garage CLI, used to interact with a running Garage instance, and to launch a Garage instance #[macro_use] extern crate log; diff --git a/src/model/block.rs b/src/model/block.rs index 38b2325c..89685630 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -157,7 +157,7 @@ impl BlockManager { } /// Write a block to disk - pub async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result { + async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result { let _lock = self.data_dir_lock.lock().await; let mut path = self.block_dir(hash); @@ -176,7 +176,7 @@ impl BlockManager { } /// Read block from disk, verifying it's integrity - pub async fn read_block(&self, hash: &Hash) -> Result { + async fn read_block(&self, hash: &Hash) -> Result { let path = self.block_path(hash); let mut f = match fs::File::open(&path).await { @@ -208,7 +208,7 @@ impl BlockManager { } /// Check if this node should have a block, but don't actually have it - pub async fn need_block(&self, hash: &Hash) -> Result { + async fn need_block(&self, hash: &Hash) -> Result { let needed = self .rc .get(hash.as_ref())? diff --git a/src/model/block_ref_table.rs b/src/model/block_ref_table.rs index 2c5f9bf9..1f0c7bb0 100644 --- a/src/model/block_ref_table.rs +++ b/src/model/block_ref_table.rs @@ -10,16 +10,14 @@ use crate::block::*; #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct BlockRef { - // Primary key - /// Hash of the block + /// Hash of the block, used as partition key pub block: Hash, - // Sort key - // why a version on a hashed (probably immutable) piece of data? 
+ /// Id of the Version for the object containing this block, used as sorting key pub version: UUID, // Keep track of deleted status - /// Is that block deleted + /// Is the Version that contains this block deleted pub deleted: crdt::Bool, } diff --git a/src/model/key_table.rs b/src/model/key_table.rs index 444f3949..e1dcd7f4 100644 --- a/src/model/key_table.rs +++ b/src/model/key_table.rs @@ -6,7 +6,7 @@ use garage_table::*; /// An api key #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct Key { - /// The id of the key (immutable) + /// The id of the key (immutable), used as partition key pub key_id: String, /// The secret_key associated @@ -19,6 +19,7 @@ pub struct Key { pub deleted: crdt::Bool, /// Buckets in which the key is authorized. Empty if `Key` is deleted + // CRDT interaction: deleted implies authorized_buckets is empty pub authorized_buckets: crdt::LWWMap, } diff --git a/src/model/lib.rs b/src/model/lib.rs index 70d2e2ce..b4a8ddb7 100644 --- a/src/model/lib.rs +++ b/src/model/lib.rs @@ -1,4 +1,3 @@ -#![warn(missing_docs)] #[macro_use] extern crate log; diff --git a/src/model/object_table.rs b/src/model/object_table.rs index 5b026ceb..ff42d065 100644 --- a/src/model/object_table.rs +++ b/src/model/object_table.rs @@ -14,13 +14,13 @@ use crate::version_table::*; /// An object #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct Object { - /// The bucket in which the object is stored + /// The bucket in which the object is stored, used as partition key pub bucket: String, - /// The key at which the object is stored in its bucket + /// The key at which the object is stored in its bucket, used as sorting key pub key: String, - /// The list of known versions of the object + /// The list of currently stored versions of the object versions: Vec, } @@ -53,7 +53,7 @@ impl Object { } } - /// Get a list of all versions known for `Object` + /// Get a list of currently stored versions of `Object` pub fn versions(&self) ->
&[ObjectVersion] { &self.versions[..] } @@ -77,7 +77,7 @@ pub enum ObjectVersionState { Uploading(ObjectVersionHeaders), /// The version is fully received Complete(ObjectVersionData), - /// The version was never fully received + /// The version uploaded contained errors or the upload was explicitly aborted Aborted, } @@ -105,7 +105,7 @@ impl CRDT for ObjectVersionState { /// Data about an object version #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] pub enum ObjectVersionData { - /// The version is deleted + /// The object was deleted, this Version is a tombstone to mark it as such DeleteMarker, /// The object is short, it's stored inlined Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec), @@ -159,7 +159,7 @@ impl ObjectVersion { } } - /// Is the object version available (received and not deleted) + /// Is the object version available (received and not a tombstone) pub fn is_data(&self) -> bool { match self.state { ObjectVersionState::Complete(ObjectVersionData::DeleteMarker) => false, diff --git a/src/model/version_table.rs b/src/model/version_table.rs index 428fac10..bb836868 100644 --- a/src/model/version_table.rs +++ b/src/model/version_table.rs @@ -13,7 +13,7 @@ use crate::block_ref_table::*; /// A version of an object #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct Version { - /// UUID of the version + /// UUID of the version, used as partition key pub uuid: UUID, // Actual data: the blocks for this version @@ -49,9 +49,9 @@ impl Version { #[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)] pub struct VersionBlockKey { - /// Number of the part, starting at 1 + /// Number of the part pub part_number: u64, - /// offset of the block in the file, starting at 0 + /// Offset of this sub-segment in its part pub offset: u64, } diff --git a/src/rpc/lib.rs b/src/rpc/lib.rs index 308baa83..96561d0e 100644 --- a/src/rpc/lib.rs +++ b/src/rpc/lib.rs @@ -1,4 +1,3 @@
-#![warn(missing_crate_level_docs, missing_docs)] //! Crate containing rpc related functions and types used in Garage #[macro_use] diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs index c465ce68..4fce1a7b 100644 --- a/src/rpc/membership.rs +++ b/src/rpc/membership.rs @@ -1,4 +1,4 @@ -/// Module containing structs related to membership management +//! Module containing structs related to membership management use std::collections::HashMap; use std::fmt::Write as FmtWrite; use std::io::{Read, Write}; @@ -96,7 +96,7 @@ pub struct System { rpc_client: Arc>, pub(crate) status: watch::Receiver>, - /// The ring, viewed by this node + /// The ring pub ring: watch::Receiver>, update_lock: Mutex, @@ -114,9 +114,8 @@ struct Updaters { #[derive(Debug, Clone)] pub struct Status { /// Mapping of each node id to its known status - // considering its sorted regularly, maybe it should be a btreeset? pub nodes: HashMap>, - /// Hash of this entry + /// Hash of `nodes`, used to detect when nodes have different views of the cluster pub hash: Hash, } @@ -380,7 +379,7 @@ impl System { id_option, addr, sys.rpc_client - .get_addr() + .by_addr() .call(&addr, ping_msg_ref, PING_TIMEOUT) .await, ) @@ -418,6 +417,8 @@ impl System { } } else if let Some(id) = id_option { if let Some(st) = status.nodes.get_mut(id) { + // TODO this might double-increment the value as the counter is already + // incremented for any kind of failure in rpc_client st.num_failures.fetch_add(1, Ordering::SeqCst); if !st.is_up() { warn!("Node {:?} seems to be down.", id); diff --git a/src/rpc/ring.rs b/src/rpc/ring.rs index 6f341fa8..04f8b590 100644 --- a/src/rpc/ring.rs +++ b/src/rpc/ring.rs @@ -1,4 +1,5 @@ //! Module containing types related to computing nodes which should receive a copy of data blocks +//! 
and metadata use std::collections::{HashMap, HashSet}; use std::convert::TryInto; @@ -197,7 +198,7 @@ impl Ring { top >> (16 - PARTITION_BITS) } - /// Get the list of partitions and + /// Get the list of partitions and the first hash of a partition key that would fall in it pub fn partitions(&self) -> Vec<(Partition, Hash)> { let mut ret = vec![]; @@ -211,6 +212,7 @@ impl Ring { ret } + // TODO rename this function as it no longer walks the ring /// Walk the ring to find the n servers in which data should be replicated pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec { if self.ring.len() != 1 << PARTITION_BITS { diff --git a/src/rpc/rpc_client.rs b/src/rpc/rpc_client.rs index 261dec7a..3bb58c91 100644 --- a/src/rpc/rpc_client.rs +++ b/src/rpc/rpc_client.rs @@ -53,6 +53,7 @@ impl RequestStrategy { self } /// Set if requests can be dropped after quorum has been reached + /// In general true for read requests, and false for write pub fn interrupt_after_quorum(mut self, interrupt: bool) -> Self { self.rs_interrupt_after_quorum = interrupt; self @@ -103,8 +104,8 @@ impl RpcClient { self.local_handler.swap(Some(Arc::new((my_id, handler)))); } - /// Get the server address this client connect to - pub fn get_addr(&self) -> &RpcAddrClient { + /// Get a RPC client to make calls using node's SocketAddr instead of its ID + pub fn by_addr(&self) -> &RpcAddrClient { &self.rpc_addr_client } @@ -167,7 +168,7 @@ impl RpcClient { } /// Make a RPC call to multiple servers, returning either a Vec of responses, or an error if - /// strategy could not be respected due to to many errors + /// strategy could not be respected due to too many errors pub async fn try_call_many( self: &Arc, to: &[UUID], @@ -227,7 +228,7 @@ impl RpcClient { } } -/// Endpoint to which send RPC +/// Thin wrapper around an `RpcHttpClient` specifying the path of the request pub struct RpcAddrClient { phantom: PhantomData, diff --git a/src/table/lib.rs b/src/table/lib.rs index 8dcb115d..c3e14ab8 100644 ---
a/src/table/lib.rs +++ b/src/table/lib.rs @@ -1,4 +1,3 @@ -#![warn(missing_docs)] #![recursion_limit = "1024"] #[macro_use] diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs index 0ab9ee5a..c2c78c8b 100644 --- a/src/table/replication/parameters.rs +++ b/src/table/replication/parameters.rs @@ -16,7 +16,6 @@ pub trait TableReplication: Send + Sync { fn write_nodes(&self, hash: &Hash) -> Vec; /// Responses needed to consider a write succesfull fn write_quorum(&self) -> usize; - // this feels like its write_nodes().len() - write_quorum() fn max_write_errors(&self) -> usize; // Accessing partitions, for Merkle tree & sync diff --git a/src/table/schema.rs b/src/table/schema.rs index f5fde95f..c17ccc15 100644 --- a/src/table/schema.rs +++ b/src/table/schema.rs @@ -4,7 +4,7 @@ use garage_util::data::*; use crate::crdt::CRDT; -/// Trait for partitionnable data +/// Trait for field used to partition data pub trait PartitionKey { /// Get the key used to partition fn hash(&self) -> Hash; @@ -22,7 +22,7 @@ impl PartitionKey for Hash { } } -/// Trait for sortable data +/// Trait for field used to sort data pub trait SortKey { /// Get the key used to sort fn sort_key(&self) -> &[u8]; diff --git a/src/util/error.rs b/src/util/error.rs index 13cbeba3..32dccbe6 100644 --- a/src/util/error.rs +++ b/src/util/error.rs @@ -1,5 +1,4 @@ //! Module containing error types used in Garage -#![allow(missing_docs)] use err_derive::Error; use hyper::StatusCode; use std::io; diff --git a/src/util/lib.rs b/src/util/lib.rs index faacd9c1..c080e3a3 100644 --- a/src/util/lib.rs +++ b/src/util/lib.rs @@ -1,4 +1,3 @@ -#![warn(missing_crate_level_docs, missing_docs)] //! 
Crate containing common functions and types used in Garage #[macro_use] diff --git a/src/web/error.rs b/src/web/error.rs index 2b8aeebe..f6afbb42 100644 --- a/src/web/error.rs +++ b/src/web/error.rs @@ -19,7 +19,7 @@ pub enum Error { #[error(display = "Not found")] NotFound, - /// The client requested a malformed path + /// The request contained an invalid UTF-8 sequence in its path or in other parameters #[error(display = "Invalid UTF-8: {}", _0)] InvalidUTF8(#[error(source)] std::str::Utf8Error), diff --git a/src/web/lib.rs b/src/web/lib.rs index 3e978af6..c06492a3 100644 --- a/src/web/lib.rs +++ b/src/web/lib.rs @@ -1,4 +1,3 @@ -#![deny(missing_crate_level_docs, missing_docs)] //! Crate for handling web serving of s3 bucket #[macro_use] extern crate log; From 718ae005486baeed358d56cc7cd319fedd1e76eb Mon Sep 17 00:00:00 2001 From: Trinity Pointard Date: Wed, 7 Apr 2021 13:39:34 +0200 Subject: [PATCH 10/10] change some more comments and revert changes on TableSchema --- src/model/block.rs | 1 - src/model/bucket_table.rs | 8 ++------ src/model/key_table.rs | 4 ---- src/rpc/membership.rs | 4 ++-- src/rpc/ring.rs | 2 +- src/table/schema.rs | 2 +- 6 files changed, 6 insertions(+), 15 deletions(-) diff --git a/src/model/block.rs b/src/model/block.rs index 89685630..5f428fe1 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -250,7 +250,6 @@ impl BlockManager { } /// Decrement the number of time a block is used - // when counter reach 0, it seems not put to resync which I assume put it to gc? 
pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> { let new_rc = self.rc.update_and_fetch(&hash, |old| { let old_v = old.map(u64_from_be_bytes).unwrap_or(0); diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs index 8198deb7..6a4b021d 100644 --- a/src/model/bucket_table.rs +++ b/src/model/bucket_table.rs @@ -57,7 +57,7 @@ impl CRDT for BucketParams { } impl BucketParams { - /// Create a new default `BucketParams` + /// Initializes a new instance of the Bucket struct pub fn new() -> Self { BucketParams { authorized_keys: crdt::LWWMap::new(), @@ -75,7 +75,7 @@ impl Bucket { } } - /// Query if bucket is deleted + /// Returns true if this represents a deleted bucket pub fn is_deleted(&self) -> bool { *self.state.get() == BucketState::Deleted } @@ -113,10 +113,6 @@ impl TableSchema for BucketTable { type E = Bucket; type Filter = DeletedFilter; - fn updated(&self, _old: Option, _new: Option) { - // nothing to do when updated - } - fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { filter.apply(entry.is_deleted()) } diff --git a/src/model/key_table.rs b/src/model/key_table.rs index e1dcd7f4..578f8683 100644 --- a/src/model/key_table.rs +++ b/src/model/key_table.rs @@ -125,10 +125,6 @@ impl TableSchema for KeyTable { type E = Key; type Filter = KeyFilter; - fn updated(&self, _old: Option, _new: Option) { - // nothing to do when updated - } - fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { match filter { KeyFilter::Deleted(df) => df.apply(entry.deleted.get()), diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs index 4fce1a7b..5f7bbc96 100644 --- a/src/rpc/membership.rs +++ b/src/rpc/membership.rs @@ -417,8 +417,8 @@ impl System { } } else if let Some(id) = id_option { if let Some(st) = status.nodes.get_mut(id) { - // TODO this might double-increment the value as the counter is already - // incremented for any kind of failure in rpc_client + // we need to increment failure counter as call was done using 
by_addr so the + // counter was not auto-incremented st.num_failures.fetch_add(1, Ordering::SeqCst); if !st.is_up() { warn!("Node {:?} seems to be down.", id); diff --git a/src/rpc/ring.rs b/src/rpc/ring.rs index 04f8b590..bffd7f1f 100644 --- a/src/rpc/ring.rs +++ b/src/rpc/ring.rs @@ -28,7 +28,7 @@ const PARTITION_MASK_U16: u16 = ((1 << PARTITION_BITS) - 1) << (16 - PARTITION_B /// The maximum number of time an object might get replicated pub const MAX_REPLICATION: usize = 3; -/// The versionned configurations of all nodes known in the network +/// The user-defined configuration of the cluster's nodes #[derive(Clone, Debug, Serialize, Deserialize)] pub struct NetworkConfig { /// Map of each node's id to it's configuration diff --git a/src/table/schema.rs b/src/table/schema.rs index c17ccc15..13517271 100644 --- a/src/table/schema.rs +++ b/src/table/schema.rs @@ -76,7 +76,7 @@ pub trait TableSchema: Send + Sync { // as the update itself is an unchangeable fact that will never go back // due to CRDT logic. Typically errors in propagation of info should be logged // to stderr. - fn updated(&self, old: Option, new: Option); + fn updated(&self, _old: Option, _new: Option) {} fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool; }