forked from Deuxfleurs/garage
document util crate
This commit is contained in:
parent
bf36f1f16a
commit
f9bd2d8fb7
6 changed files with 63 additions and 3 deletions
|
@ -1,3 +1,4 @@
|
||||||
|
//! Job runner for futures and async functions
|
||||||
use core::future::Future;
|
use core::future::Future;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
@ -12,14 +13,15 @@ use crate::error::Error;
|
||||||
type JobOutput = Result<(), Error>;
|
type JobOutput = Result<(), Error>;
|
||||||
type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>;
|
type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>;
|
||||||
|
|
||||||
|
/// Job runner for futures and async functions
|
||||||
pub struct BackgroundRunner {
|
pub struct BackgroundRunner {
|
||||||
pub stop_signal: watch::Receiver<bool>,
|
stop_signal: watch::Receiver<bool>,
|
||||||
|
|
||||||
queue_in: mpsc::UnboundedSender<(Job, bool)>,
|
queue_in: mpsc::UnboundedSender<(Job, bool)>,
|
||||||
worker_in: mpsc::UnboundedSender<tokio::task::JoinHandle<()>>,
|
worker_in: mpsc::UnboundedSender<tokio::task::JoinHandle<()>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BackgroundRunner {
|
impl BackgroundRunner {
|
||||||
|
/// Create a new BackgroundRunner
|
||||||
pub fn new(
|
pub fn new(
|
||||||
n_runners: usize,
|
n_runners: usize,
|
||||||
stop_signal: watch::Receiver<bool>,
|
stop_signal: watch::Receiver<bool>,
|
||||||
|
@ -103,7 +105,7 @@ impl BackgroundRunner {
|
||||||
(bgrunner, await_all_done)
|
(bgrunner, await_all_done)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Spawn a task to be run in background
|
/// Spawn a task to be run in background
|
||||||
pub fn spawn<T>(&self, job: T)
|
pub fn spawn<T>(&self, job: T)
|
||||||
where
|
where
|
||||||
T: Future<Output = JobOutput> + Send + 'static,
|
T: Future<Output = JobOutput> + Send + 'static,
|
||||||
|
@ -115,6 +117,8 @@ impl BackgroundRunner {
|
||||||
.unwrap();
|
.unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Spawn a task to be run in background. It may get discarded before running if spawned while
|
||||||
|
/// the runner is stopping
|
||||||
pub fn spawn_cancellable<T>(&self, job: T)
|
pub fn spawn_cancellable<T>(&self, job: T)
|
||||||
where
|
where
|
||||||
T: Future<Output = JobOutput> + Send + 'static,
|
T: Future<Output = JobOutput> + Send + 'static,
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
//! Contains type and functions related to Garage configuration file
|
||||||
use std::io::Read;
|
use std::io::Read;
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
|
@ -6,57 +7,82 @@ use serde::{de, Deserialize};
|
||||||
|
|
||||||
use crate::error::Error;
|
use crate::error::Error;
|
||||||
|
|
||||||
|
/// Represent the whole configuration
|
||||||
#[derive(Deserialize, Debug, Clone)]
|
#[derive(Deserialize, Debug, Clone)]
|
||||||
pub struct Config {
|
pub struct Config {
|
||||||
|
/// Path where to store metadata. Should be fast, but low volume
|
||||||
pub metadata_dir: PathBuf,
|
pub metadata_dir: PathBuf,
|
||||||
|
/// Path where to store data. Can be slower, but need higher volume
|
||||||
pub data_dir: PathBuf,
|
pub data_dir: PathBuf,
|
||||||
|
|
||||||
|
/// Address to bind for RPC
|
||||||
pub rpc_bind_addr: SocketAddr,
|
pub rpc_bind_addr: SocketAddr,
|
||||||
|
|
||||||
|
/// Bootstrap peers RPC address
|
||||||
#[serde(deserialize_with = "deserialize_vec_addr")]
|
#[serde(deserialize_with = "deserialize_vec_addr")]
|
||||||
pub bootstrap_peers: Vec<SocketAddr>,
|
pub bootstrap_peers: Vec<SocketAddr>,
|
||||||
|
/// Consule host to connect to to discover more peers
|
||||||
pub consul_host: Option<String>,
|
pub consul_host: Option<String>,
|
||||||
|
/// Consul service name to use
|
||||||
pub consul_service_name: Option<String>,
|
pub consul_service_name: Option<String>,
|
||||||
|
|
||||||
|
/// Max number of concurrent RPC request
|
||||||
#[serde(default = "default_max_concurrent_rpc_requests")]
|
#[serde(default = "default_max_concurrent_rpc_requests")]
|
||||||
pub max_concurrent_rpc_requests: usize,
|
pub max_concurrent_rpc_requests: usize,
|
||||||
|
|
||||||
|
/// Size of data blocks to save to disk
|
||||||
#[serde(default = "default_block_size")]
|
#[serde(default = "default_block_size")]
|
||||||
pub block_size: usize,
|
pub block_size: usize,
|
||||||
|
|
||||||
#[serde(default = "default_control_write_max_faults")]
|
#[serde(default = "default_control_write_max_faults")]
|
||||||
pub control_write_max_faults: usize,
|
pub control_write_max_faults: usize,
|
||||||
|
|
||||||
|
/// How many nodes should hold a copy of meta data
|
||||||
#[serde(default = "default_replication_factor")]
|
#[serde(default = "default_replication_factor")]
|
||||||
pub meta_replication_factor: usize,
|
pub meta_replication_factor: usize,
|
||||||
|
|
||||||
|
/// How many nodes should hold a copy of data
|
||||||
#[serde(default = "default_replication_factor")]
|
#[serde(default = "default_replication_factor")]
|
||||||
pub data_replication_factor: usize,
|
pub data_replication_factor: usize,
|
||||||
|
|
||||||
|
/// Configuration for RPC TLS
|
||||||
pub rpc_tls: Option<TlsConfig>,
|
pub rpc_tls: Option<TlsConfig>,
|
||||||
|
|
||||||
|
/// Configuration for S3 api
|
||||||
pub s3_api: ApiConfig,
|
pub s3_api: ApiConfig,
|
||||||
|
|
||||||
|
/// Configuration for serving files as normal web server
|
||||||
pub s3_web: WebConfig,
|
pub s3_web: WebConfig,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Configuration for RPC TLS
|
||||||
#[derive(Deserialize, Debug, Clone)]
|
#[derive(Deserialize, Debug, Clone)]
|
||||||
pub struct TlsConfig {
|
pub struct TlsConfig {
|
||||||
|
/// Path to certificate autority used for all nodes
|
||||||
pub ca_cert: String,
|
pub ca_cert: String,
|
||||||
|
/// Path to public certificate for this node
|
||||||
pub node_cert: String,
|
pub node_cert: String,
|
||||||
|
/// Path to private key for this node
|
||||||
pub node_key: String,
|
pub node_key: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Configuration for S3 api
|
||||||
#[derive(Deserialize, Debug, Clone)]
|
#[derive(Deserialize, Debug, Clone)]
|
||||||
pub struct ApiConfig {
|
pub struct ApiConfig {
|
||||||
|
/// Address and port to bind for api serving
|
||||||
pub api_bind_addr: SocketAddr,
|
pub api_bind_addr: SocketAddr,
|
||||||
|
/// S3 region to use
|
||||||
pub s3_region: String,
|
pub s3_region: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Configuration for serving files as normal web server
|
||||||
#[derive(Deserialize, Debug, Clone)]
|
#[derive(Deserialize, Debug, Clone)]
|
||||||
pub struct WebConfig {
|
pub struct WebConfig {
|
||||||
|
/// Address and port to bind for web serving
|
||||||
pub bind_addr: SocketAddr,
|
pub bind_addr: SocketAddr,
|
||||||
|
/// Suffix to remove from domain name to find bucket
|
||||||
pub root_domain: String,
|
pub root_domain: String,
|
||||||
|
/// Suffix to add when user-agent request path end with "/"
|
||||||
pub index: String,
|
pub index: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -73,6 +99,7 @@ fn default_control_write_max_faults() -> usize {
|
||||||
1
|
1
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Read and parse configuration
|
||||||
pub fn read_config(config_file: PathBuf) -> Result<Config, Error> {
|
pub fn read_config(config_file: PathBuf) -> Result<Config, Error> {
|
||||||
let mut file = std::fs::OpenOptions::new()
|
let mut file = std::fs::OpenOptions::new()
|
||||||
.read(true)
|
.read(true)
|
||||||
|
|
|
@ -1,8 +1,10 @@
|
||||||
|
//! Contains common types and functions related to serialization and integrity
|
||||||
use rand::Rng;
|
use rand::Rng;
|
||||||
use serde::de::{self, Visitor};
|
use serde::de::{self, Visitor};
|
||||||
use serde::{Deserialize, Deserializer, Serialize, Serializer};
|
use serde::{Deserialize, Deserializer, Serialize, Serializer};
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
|
|
||||||
|
/// An array of 32 bytes
|
||||||
#[derive(Default, PartialOrd, Ord, Clone, Hash, PartialEq, Copy)]
|
#[derive(Default, PartialOrd, Ord, Clone, Hash, PartialEq, Copy)]
|
||||||
pub struct FixedBytes32([u8; 32]);
|
pub struct FixedBytes32([u8; 32]);
|
||||||
|
|
||||||
|
@ -61,15 +63,20 @@ impl Serialize for FixedBytes32 {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl FixedBytes32 {
|
impl FixedBytes32 {
|
||||||
|
/// Access the content as a slice
|
||||||
pub fn as_slice(&self) -> &[u8] {
|
pub fn as_slice(&self) -> &[u8] {
|
||||||
&self.0[..]
|
&self.0[..]
|
||||||
}
|
}
|
||||||
|
/// Access the content as a mutable slice
|
||||||
pub fn as_slice_mut(&mut self) -> &mut [u8] {
|
pub fn as_slice_mut(&mut self) -> &mut [u8] {
|
||||||
&mut self.0[..]
|
&mut self.0[..]
|
||||||
}
|
}
|
||||||
|
/// Copy to a slice
|
||||||
pub fn to_vec(&self) -> Vec<u8> {
|
pub fn to_vec(&self) -> Vec<u8> {
|
||||||
self.0.to_vec()
|
self.0.to_vec()
|
||||||
}
|
}
|
||||||
|
/// Try building a FixedBytes32 from a slice
|
||||||
|
/// Return None if the slice is not 32 bytes long
|
||||||
pub fn try_from(by: &[u8]) -> Option<Self> {
|
pub fn try_from(by: &[u8]) -> Option<Self> {
|
||||||
if by.len() != 32 {
|
if by.len() != 32 {
|
||||||
return None;
|
return None;
|
||||||
|
@ -80,9 +87,12 @@ impl FixedBytes32 {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// A 32 bytes UUID
|
||||||
pub type UUID = FixedBytes32;
|
pub type UUID = FixedBytes32;
|
||||||
|
/// A 256 bit cryptographic hash, can be sha256 or blake2 depending on provenance
|
||||||
pub type Hash = FixedBytes32;
|
pub type Hash = FixedBytes32;
|
||||||
|
|
||||||
|
/// Compute the sha256 of a slice
|
||||||
pub fn sha256sum(data: &[u8]) -> Hash {
|
pub fn sha256sum(data: &[u8]) -> Hash {
|
||||||
use sha2::{Digest, Sha256};
|
use sha2::{Digest, Sha256};
|
||||||
|
|
||||||
|
@ -93,6 +103,7 @@ pub fn sha256sum(data: &[u8]) -> Hash {
|
||||||
hash.into()
|
hash.into()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Compute the blake2 of a slice
|
||||||
pub fn blake2sum(data: &[u8]) -> Hash {
|
pub fn blake2sum(data: &[u8]) -> Hash {
|
||||||
use blake2::{Blake2b, Digest};
|
use blake2::{Blake2b, Digest};
|
||||||
|
|
||||||
|
@ -103,8 +114,10 @@ pub fn blake2sum(data: &[u8]) -> Hash {
|
||||||
hash.into()
|
hash.into()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// A 64 bit non cryptographic hash
|
||||||
pub type FastHash = u64;
|
pub type FastHash = u64;
|
||||||
|
|
||||||
|
/// Compute a (non cryptographic) of a slice
|
||||||
pub fn fasthash(data: &[u8]) -> FastHash {
|
pub fn fasthash(data: &[u8]) -> FastHash {
|
||||||
use xxhash_rust::xxh3::Xxh3;
|
use xxhash_rust::xxh3::Xxh3;
|
||||||
|
|
||||||
|
@ -113,12 +126,14 @@ pub fn fasthash(data: &[u8]) -> FastHash {
|
||||||
h.digest()
|
h.digest()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Generate a random 32 bytes UUID
|
||||||
pub fn gen_uuid() -> UUID {
|
pub fn gen_uuid() -> UUID {
|
||||||
rand::thread_rng().gen::<[u8; 32]>().into()
|
rand::thread_rng().gen::<[u8; 32]>().into()
|
||||||
}
|
}
|
||||||
|
|
||||||
// RMP serialization with names of fields and variants
|
// RMP serialization with names of fields and variants
|
||||||
|
|
||||||
|
/// Serialize to MessagePack
|
||||||
pub fn rmp_to_vec_all_named<T>(val: &T) -> Result<Vec<u8>, rmp_serde::encode::Error>
|
pub fn rmp_to_vec_all_named<T>(val: &T) -> Result<Vec<u8>, rmp_serde::encode::Error>
|
||||||
where
|
where
|
||||||
T: Serialize + ?Sized,
|
T: Serialize + ?Sized,
|
||||||
|
@ -131,10 +146,13 @@ where
|
||||||
Ok(wr)
|
Ok(wr)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Serialize to JSON, truncating long result
|
||||||
pub fn debug_serialize<T: Serialize>(x: T) -> String {
|
pub fn debug_serialize<T: Serialize>(x: T) -> String {
|
||||||
match serde_json::to_string(&x) {
|
match serde_json::to_string(&x) {
|
||||||
Ok(ss) => {
|
Ok(ss) => {
|
||||||
if ss.len() > 100 {
|
if ss.len() > 100 {
|
||||||
|
// TODO this can panic if 100 is not a codepoint boundary, but inside a 2 Bytes
|
||||||
|
// (or more) codepoint
|
||||||
ss[..100].to_string()
|
ss[..100].to_string()
|
||||||
} else {
|
} else {
|
||||||
ss
|
ss
|
||||||
|
|
|
@ -1,9 +1,12 @@
|
||||||
|
//! Module containing error types used in Garage
|
||||||
|
#![allow(missing_docs)]
|
||||||
use err_derive::Error;
|
use err_derive::Error;
|
||||||
use hyper::StatusCode;
|
use hyper::StatusCode;
|
||||||
use std::io;
|
use std::io;
|
||||||
|
|
||||||
use crate::data::*;
|
use crate::data::*;
|
||||||
|
|
||||||
|
/// RPC related errors
|
||||||
#[derive(Debug, Error)]
|
#[derive(Debug, Error)]
|
||||||
pub enum RPCError {
|
pub enum RPCError {
|
||||||
#[error(display = "Node is down: {:?}.", _0)]
|
#[error(display = "Node is down: {:?}.", _0)]
|
||||||
|
@ -28,6 +31,7 @@ pub enum RPCError {
|
||||||
TooManyErrors(Vec<String>),
|
TooManyErrors(Vec<String>),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Regroup all Garage errors
|
||||||
#[derive(Debug, Error)]
|
#[derive(Debug, Error)]
|
||||||
pub enum Error {
|
pub enum Error {
|
||||||
#[error(display = "IO error: {}", _0)]
|
#[error(display = "IO error: {}", _0)]
|
||||||
|
|
|
@ -1,3 +1,6 @@
|
||||||
|
#![warn(missing_crate_level_docs, missing_docs)]
|
||||||
|
//! Crate containing common functions and types used in Garage
|
||||||
|
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
extern crate log;
|
extern crate log;
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,8 @@
|
||||||
|
//! Module containing helper functions to manipulate time
|
||||||
use chrono::{SecondsFormat, TimeZone, Utc};
|
use chrono::{SecondsFormat, TimeZone, Utc};
|
||||||
use std::time::{SystemTime, UNIX_EPOCH};
|
use std::time::{SystemTime, UNIX_EPOCH};
|
||||||
|
|
||||||
|
/// Returns milliseconds since UNIX Epoch
|
||||||
pub fn now_msec() -> u64 {
|
pub fn now_msec() -> u64 {
|
||||||
SystemTime::now()
|
SystemTime::now()
|
||||||
.duration_since(UNIX_EPOCH)
|
.duration_since(UNIX_EPOCH)
|
||||||
|
@ -8,6 +10,8 @@ pub fn now_msec() -> u64 {
|
||||||
.as_millis() as u64
|
.as_millis() as u64
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Convert a timestamp represented as milliseconds since UNIX Epoch to
|
||||||
|
/// its RFC3339 representation, such as "2021-01-01T12:30:00Z"
|
||||||
pub fn msec_to_rfc3339(msecs: u64) -> String {
|
pub fn msec_to_rfc3339(msecs: u64) -> String {
|
||||||
let secs = msecs as i64 / 1000;
|
let secs = msecs as i64 / 1000;
|
||||||
let nanos = (msecs as i64 % 1000) as u32 * 1_000_000;
|
let nanos = (msecs as i64 % 1000) as u32 * 1_000_000;
|
||||||
|
|
Loading…
Reference in a new issue