garage/src/util/error.rs
Max Audron 9d44127245
All checks were successful
continuous-integration/drone/pr Build is passing
continuous-integration/drone/push Build is passing
add support for kubernetes service discovery
This commit adds support to discover garage instances running in
kubernetes.

Once enabled by setting `kubernetes_namespace` and
`kubernetes_service_name` garage will create a Custom Resources
`garagenodes.deuxfleurs.fr` with nodes public key as the resource name.
and IP and Port information as spec in the namespace configured by
`kubernetes_namespace`.

For discovering nodes the resources are filtered with the optionally set
`kubernetes_service_name` which sets a label
`garage.deuxfleurs.fr/service` on the resources.

This allows to separate multiple garage deployments in a single
namespace.

the `kubernetes_skip_crd` variable allows to disable the creation of the
CRD by garage itself. The user must deploy this manually.
2022-03-12 13:05:52 +01:00

199 lines
5 KiB
Rust

//! Module containing error types used in Garage
use std::fmt;
use std::io;
use err_derive::Error;
use serde::{de::Visitor, Deserialize, Deserializer, Serialize, Serializer};
use crate::data::*;
/// Regroup all Garage errors
#[derive(Debug, Error)]
pub enum Error {
#[error(display = "IO error: {}", _0)]
Io(#[error(source)] io::Error),
#[error(display = "Hyper error: {}", _0)]
Hyper(#[error(source)] hyper::Error),
#[error(display = "HTTP error: {}", _0)]
Http(#[error(source)] http::Error),
#[error(display = "Invalid HTTP header value: {}", _0)]
HttpHeader(#[error(source)] http::header::ToStrError),
#[error(display = "kubernetes error: {}", _0)]
Kubernetes(#[error(source)] kube::Error),
#[error(display = "Netapp error: {}", _0)]
Netapp(#[error(source)] netapp::error::Error),
#[error(display = "Sled error: {}", _0)]
Sled(#[error(source)] sled::Error),
#[error(display = "Messagepack encode error: {}", _0)]
RmpEncode(#[error(source)] rmp_serde::encode::Error),
#[error(display = "Messagepack decode error: {}", _0)]
RmpDecode(#[error(source)] rmp_serde::decode::Error),
#[error(display = "JSON error: {}", _0)]
Json(#[error(source)] serde_json::error::Error),
#[error(display = "TOML decode error: {}", _0)]
TomlDecode(#[error(source)] toml::de::Error),
#[error(display = "Tokio join error: {}", _0)]
TokioJoin(#[error(source)] tokio::task::JoinError),
#[error(display = "Tokio semaphore acquire error: {}", _0)]
TokioSemAcquire(#[error(source)] tokio::sync::AcquireError),
#[error(display = "Remote error: {}", _0)]
RemoteError(String),
#[error(display = "Timeout")]
Timeout,
#[error(
display = "Could not reach quorum of {}. {} of {} request succeeded, others returned errors: {:?}",
_0,
_1,
_2,
_3
)]
Quorum(usize, usize, usize, Vec<String>),
#[error(display = "Unexpected RPC message: {}", _0)]
UnexpectedRpcMessage(String),
#[error(display = "Corrupt data: does not match hash {:?}", _0)]
CorruptData(Hash),
#[error(display = "{}", _0)]
Message(String),
}
impl Error {
pub fn unexpected_rpc_message<T: Serialize>(v: T) -> Self {
Self::UnexpectedRpcMessage(debug_serialize(&v))
}
}
impl From<sled::transaction::TransactionError<Error>> for Error {
fn from(e: sled::transaction::TransactionError<Error>) -> Error {
match e {
sled::transaction::TransactionError::Abort(x) => x,
sled::transaction::TransactionError::Storage(x) => Error::Sled(x),
}
}
}
impl<T> From<tokio::sync::watch::error::SendError<T>> for Error {
fn from(_e: tokio::sync::watch::error::SendError<T>) -> Error {
Error::Message("Watch send error".to_string())
}
}
impl<T> From<tokio::sync::mpsc::error::SendError<T>> for Error {
fn from(_e: tokio::sync::mpsc::error::SendError<T>) -> Error {
Error::Message("MPSC send error".to_string())
}
}
impl<'a> From<&'a str> for Error {
fn from(v: &'a str) -> Error {
Error::Message(v.to_string())
}
}
impl From<String> for Error {
fn from(v: String) -> Error {
Error::Message(v)
}
}
pub trait ErrorContext<T, E> {
fn err_context<C: std::borrow::Borrow<str>>(self, ctx: C) -> Result<T, Error>;
}
impl<T, E> ErrorContext<T, E> for Result<T, E>
where
E: std::fmt::Display,
{
#[inline]
fn err_context<C: std::borrow::Borrow<str>>(self, ctx: C) -> Result<T, Error> {
match self {
Ok(x) => Ok(x),
Err(e) => Err(Error::Message(format!("{}\n{}", ctx.borrow(), e))),
}
}
}
/// Trait to map any error type to Error::Message
pub trait OkOrMessage {
type S;
fn ok_or_message<M: Into<String>>(self, message: M) -> Result<Self::S, Error>;
}
impl<T, E> OkOrMessage for Result<T, E>
where
E: std::fmt::Display,
{
type S = T;
fn ok_or_message<M: Into<String>>(self, message: M) -> Result<T, Error> {
match self {
Ok(x) => Ok(x),
Err(e) => Err(Error::Message(format!("{}: {}", message.into(), e))),
}
}
}
impl<T> OkOrMessage for Option<T> {
type S = T;
fn ok_or_message<M: Into<String>>(self, message: M) -> Result<T, Error> {
match self {
Some(x) => Ok(x),
None => Err(Error::Message(message.into())),
}
}
}
// Custom serialization for our error type, for use in RPC.
// Errors are serialized as a string of their Display representation.
// Upon deserialization, they all become a RemoteError with the
// given representation.
impl Serialize for Error {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(&format!("{}", self))
}
}
impl<'de> Deserialize<'de> for Error {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
deserializer.deserialize_string(ErrorVisitor)
}
}
struct ErrorVisitor;
impl<'de> Visitor<'de> for ErrorVisitor {
type Value = Error;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
write!(formatter, "a string that represents an error value")
}
fn visit_str<E>(self, error_msg: &str) -> Result<Self::Value, E> {
Ok(Error::RemoteError(error_msg.to_string()))
}
fn visit_string<E>(self, error_msg: String) -> Result<Self::Value, E> {
Ok(Error::RemoteError(error_msg))
}
}