document util crate

This commit is contained in:
Trinity Pointard 2021-03-20 20:38:44 +01:00
parent c4c4b7dedc
commit f4bf987627
6 changed files with 63 additions and 3 deletions

View file

@ -1,3 +1,4 @@
//! Job runner for futures and async functions
use core::future::Future; use core::future::Future;
use std::pin::Pin; use std::pin::Pin;
use std::sync::Arc; use std::sync::Arc;
@ -12,14 +13,15 @@ use crate::error::Error;
type JobOutput = Result<(), Error>; type JobOutput = Result<(), Error>;
type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>; type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>;
/// Job runner for futures and async functions
pub struct BackgroundRunner { pub struct BackgroundRunner {
pub stop_signal: watch::Receiver<bool>, stop_signal: watch::Receiver<bool>,
queue_in: mpsc::UnboundedSender<(Job, bool)>, queue_in: mpsc::UnboundedSender<(Job, bool)>,
worker_in: mpsc::UnboundedSender<tokio::task::JoinHandle<()>>, worker_in: mpsc::UnboundedSender<tokio::task::JoinHandle<()>>,
} }
impl BackgroundRunner { impl BackgroundRunner {
/// Create a new BackgroundRunner
pub fn new( pub fn new(
n_runners: usize, n_runners: usize,
stop_signal: watch::Receiver<bool>, stop_signal: watch::Receiver<bool>,
@ -103,7 +105,7 @@ impl BackgroundRunner {
(bgrunner, await_all_done) (bgrunner, await_all_done)
} }
// Spawn a task to be run in background /// Spawn a task to be run in background
pub fn spawn<T>(&self, job: T) pub fn spawn<T>(&self, job: T)
where where
T: Future<Output = JobOutput> + Send + 'static, T: Future<Output = JobOutput> + Send + 'static,
@ -115,6 +117,8 @@ impl BackgroundRunner {
.unwrap(); .unwrap();
} }
/// Spawn a task to be run in background. It may get discarded before running if spawned while
/// the runner is stopping
pub fn spawn_cancellable<T>(&self, job: T) pub fn spawn_cancellable<T>(&self, job: T)
where where
T: Future<Output = JobOutput> + Send + 'static, T: Future<Output = JobOutput> + Send + 'static,

View file

@ -1,3 +1,4 @@
//! Contains type and functions related to Garage configuration file
use std::io::Read; use std::io::Read;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::path::PathBuf; use std::path::PathBuf;
@ -6,57 +7,82 @@ use serde::{de, Deserialize};
use crate::error::Error; use crate::error::Error;
/// Represent the whole configuration
#[derive(Deserialize, Debug, Clone)] #[derive(Deserialize, Debug, Clone)]
pub struct Config { pub struct Config {
/// Path where to store metadata. Should be fast, but low volume
pub metadata_dir: PathBuf, pub metadata_dir: PathBuf,
/// Path where to store data. Can be slower, but need higher volume
pub data_dir: PathBuf, pub data_dir: PathBuf,
/// Address to bind for RPC
pub rpc_bind_addr: SocketAddr, pub rpc_bind_addr: SocketAddr,
/// Bootstrap peers RPC address
#[serde(deserialize_with = "deserialize_vec_addr")] #[serde(deserialize_with = "deserialize_vec_addr")]
pub bootstrap_peers: Vec<SocketAddr>, pub bootstrap_peers: Vec<SocketAddr>,
/// Consule host to connect to to discover more peers
pub consul_host: Option<String>, pub consul_host: Option<String>,
/// Consul service name to use
pub consul_service_name: Option<String>, pub consul_service_name: Option<String>,
/// Max number of concurrent RPC request
#[serde(default = "default_max_concurrent_rpc_requests")] #[serde(default = "default_max_concurrent_rpc_requests")]
pub max_concurrent_rpc_requests: usize, pub max_concurrent_rpc_requests: usize,
/// Size of data blocks to save to disk
#[serde(default = "default_block_size")] #[serde(default = "default_block_size")]
pub block_size: usize, pub block_size: usize,
#[serde(default = "default_control_write_max_faults")] #[serde(default = "default_control_write_max_faults")]
pub control_write_max_faults: usize, pub control_write_max_faults: usize,
/// How many nodes should hold a copy of meta data
#[serde(default = "default_replication_factor")] #[serde(default = "default_replication_factor")]
pub meta_replication_factor: usize, pub meta_replication_factor: usize,
/// How many nodes should hold a copy of data
#[serde(default = "default_replication_factor")] #[serde(default = "default_replication_factor")]
pub data_replication_factor: usize, pub data_replication_factor: usize,
/// Configuration for RPC TLS
pub rpc_tls: Option<TlsConfig>, pub rpc_tls: Option<TlsConfig>,
/// Configuration for S3 api
pub s3_api: ApiConfig, pub s3_api: ApiConfig,
/// Configuration for serving files as normal web server
pub s3_web: WebConfig, pub s3_web: WebConfig,
} }
/// Configuration for RPC TLS
#[derive(Deserialize, Debug, Clone)] #[derive(Deserialize, Debug, Clone)]
pub struct TlsConfig { pub struct TlsConfig {
/// Path to certificate autority used for all nodes
pub ca_cert: String, pub ca_cert: String,
/// Path to public certificate for this node
pub node_cert: String, pub node_cert: String,
/// Path to private key for this node
pub node_key: String, pub node_key: String,
} }
/// Configuration for S3 api
#[derive(Deserialize, Debug, Clone)] #[derive(Deserialize, Debug, Clone)]
pub struct ApiConfig { pub struct ApiConfig {
/// Address and port to bind for api serving
pub api_bind_addr: SocketAddr, pub api_bind_addr: SocketAddr,
/// S3 region to use
pub s3_region: String, pub s3_region: String,
} }
/// Configuration for serving files as normal web server
#[derive(Deserialize, Debug, Clone)] #[derive(Deserialize, Debug, Clone)]
pub struct WebConfig { pub struct WebConfig {
/// Address and port to bind for web serving
pub bind_addr: SocketAddr, pub bind_addr: SocketAddr,
/// Suffix to remove from domain name to find bucket
pub root_domain: String, pub root_domain: String,
/// Suffix to add when user-agent request path end with "/"
pub index: String, pub index: String,
} }
@ -73,6 +99,7 @@ fn default_control_write_max_faults() -> usize {
1 1
} }
/// Read and parse configuration
pub fn read_config(config_file: PathBuf) -> Result<Config, Error> { pub fn read_config(config_file: PathBuf) -> Result<Config, Error> {
let mut file = std::fs::OpenOptions::new() let mut file = std::fs::OpenOptions::new()
.read(true) .read(true)

View file

@ -1,8 +1,10 @@
//! Contains common types and functions related to serialization and integrity
use rand::Rng; use rand::Rng;
use serde::de::{self, Visitor}; use serde::de::{self, Visitor};
use serde::{Deserialize, Deserializer, Serialize, Serializer}; use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::fmt; use std::fmt;
/// An array of 32 bytes
#[derive(Default, PartialOrd, Ord, Clone, Hash, PartialEq, Copy)] #[derive(Default, PartialOrd, Ord, Clone, Hash, PartialEq, Copy)]
pub struct FixedBytes32([u8; 32]); pub struct FixedBytes32([u8; 32]);
@ -61,15 +63,20 @@ impl Serialize for FixedBytes32 {
} }
impl FixedBytes32 { impl FixedBytes32 {
/// Access the content as a slice
pub fn as_slice(&self) -> &[u8] { pub fn as_slice(&self) -> &[u8] {
&self.0[..] &self.0[..]
} }
/// Access the content as a mutable slice
pub fn as_slice_mut(&mut self) -> &mut [u8] { pub fn as_slice_mut(&mut self) -> &mut [u8] {
&mut self.0[..] &mut self.0[..]
} }
/// Copy to a slice
pub fn to_vec(&self) -> Vec<u8> { pub fn to_vec(&self) -> Vec<u8> {
self.0.to_vec() self.0.to_vec()
} }
/// Try building a FixedBytes32 from a slice
/// Return None if the slice is not 32 bytes long
pub fn try_from(by: &[u8]) -> Option<Self> { pub fn try_from(by: &[u8]) -> Option<Self> {
if by.len() != 32 { if by.len() != 32 {
return None; return None;
@ -80,9 +87,12 @@ impl FixedBytes32 {
} }
} }
/// A 32 bytes UUID
pub type UUID = FixedBytes32; pub type UUID = FixedBytes32;
/// A 256 bit cryptographic hash, can be sha256 or blake2 depending on provenance
pub type Hash = FixedBytes32; pub type Hash = FixedBytes32;
/// Compute the sha256 of a slice
pub fn sha256sum(data: &[u8]) -> Hash { pub fn sha256sum(data: &[u8]) -> Hash {
use sha2::{Digest, Sha256}; use sha2::{Digest, Sha256};
@ -93,6 +103,7 @@ pub fn sha256sum(data: &[u8]) -> Hash {
hash.into() hash.into()
} }
/// Compute the blake2 of a slice
pub fn blake2sum(data: &[u8]) -> Hash { pub fn blake2sum(data: &[u8]) -> Hash {
use blake2::{Blake2b, Digest}; use blake2::{Blake2b, Digest};
@ -103,8 +114,10 @@ pub fn blake2sum(data: &[u8]) -> Hash {
hash.into() hash.into()
} }
/// A 64 bit non cryptographic hash
pub type FastHash = u64; pub type FastHash = u64;
/// Compute a (non cryptographic) of a slice
pub fn fasthash(data: &[u8]) -> FastHash { pub fn fasthash(data: &[u8]) -> FastHash {
use xxhash_rust::xxh3::Xxh3; use xxhash_rust::xxh3::Xxh3;
@ -113,12 +126,14 @@ pub fn fasthash(data: &[u8]) -> FastHash {
h.digest() h.digest()
} }
/// Generate a random 32 bytes UUID
pub fn gen_uuid() -> UUID { pub fn gen_uuid() -> UUID {
rand::thread_rng().gen::<[u8; 32]>().into() rand::thread_rng().gen::<[u8; 32]>().into()
} }
// RMP serialization with names of fields and variants // RMP serialization with names of fields and variants
/// Serialize to MessagePack
pub fn rmp_to_vec_all_named<T>(val: &T) -> Result<Vec<u8>, rmp_serde::encode::Error> pub fn rmp_to_vec_all_named<T>(val: &T) -> Result<Vec<u8>, rmp_serde::encode::Error>
where where
T: Serialize + ?Sized, T: Serialize + ?Sized,
@ -131,10 +146,13 @@ where
Ok(wr) Ok(wr)
} }
/// Serialize to JSON, truncating long result
pub fn debug_serialize<T: Serialize>(x: T) -> String { pub fn debug_serialize<T: Serialize>(x: T) -> String {
match serde_json::to_string(&x) { match serde_json::to_string(&x) {
Ok(ss) => { Ok(ss) => {
if ss.len() > 100 { if ss.len() > 100 {
// TODO this can panic if 100 is not a codepoint boundary, but inside a 2 Bytes
// (or more) codepoint
ss[..100].to_string() ss[..100].to_string()
} else { } else {
ss ss

View file

@ -1,9 +1,12 @@
//! Module containing error types used in Garage
#![allow(missing_docs)]
use err_derive::Error; use err_derive::Error;
use hyper::StatusCode; use hyper::StatusCode;
use std::io; use std::io;
use crate::data::*; use crate::data::*;
/// RPC related errors
#[derive(Debug, Error)] #[derive(Debug, Error)]
pub enum RPCError { pub enum RPCError {
#[error(display = "Node is down: {:?}.", _0)] #[error(display = "Node is down: {:?}.", _0)]
@ -28,6 +31,7 @@ pub enum RPCError {
TooManyErrors(Vec<String>), TooManyErrors(Vec<String>),
} }
/// Regroup all Garage errors
#[derive(Debug, Error)] #[derive(Debug, Error)]
pub enum Error { pub enum Error {
#[error(display = "IO error: {}", _0)] #[error(display = "IO error: {}", _0)]

View file

@ -1,3 +1,6 @@
#![warn(missing_crate_level_docs, missing_docs)]
//! Crate containing common functions and types used in Garage
#[macro_use] #[macro_use]
extern crate log; extern crate log;

View file

@ -1,6 +1,8 @@
//! Module containing helper functions to manipulate time
use chrono::{SecondsFormat, TimeZone, Utc}; use chrono::{SecondsFormat, TimeZone, Utc};
use std::time::{SystemTime, UNIX_EPOCH}; use std::time::{SystemTime, UNIX_EPOCH};
/// Returns milliseconds since UNIX Epoch
pub fn now_msec() -> u64 { pub fn now_msec() -> u64 {
SystemTime::now() SystemTime::now()
.duration_since(UNIX_EPOCH) .duration_since(UNIX_EPOCH)
@ -8,6 +10,8 @@ pub fn now_msec() -> u64 {
.as_millis() as u64 .as_millis() as u64
} }
/// Convert a timestamp represented as milliseconds since UNIX Epoch to
/// its RFC3339 representation, such as "2021-01-01T12:30:00Z"
pub fn msec_to_rfc3339(msecs: u64) -> String { pub fn msec_to_rfc3339(msecs: u64) -> String {
let secs = msecs as i64 / 1000; let secs = msecs as i64 / 1000;
let nanos = (msecs as i64 % 1000) as u32 * 1_000_000; let nanos = (msecs as i64 % 1000) as u32 * 1_000_000;