New model for buckets #172
15 changed files with 174 additions and 96 deletions
|
@ -156,62 +156,67 @@ impl Error {
|
||||||
|
|
||||||
/// Trait to map error to the Bad Request error code
|
/// Trait to map error to the Bad Request error code
|
||||||
pub trait OkOrBadRequest {
|
pub trait OkOrBadRequest {
|
||||||
type S2;
|
type S;
|
||||||
fn ok_or_bad_request(self, reason: &'static str) -> Self::S2;
|
fn ok_or_bad_request<M: AsRef<str>>(self, reason: M) -> Result<Self::S, Error>;
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T, E> OkOrBadRequest for Result<T, E>
|
impl<T, E> OkOrBadRequest for Result<T, E>
|
||||||
where
|
where
|
||||||
E: std::fmt::Display,
|
E: std::fmt::Display,
|
||||||
{
|
{
|
||||||
type S2 = Result<T, Error>;
|
type S = T;
|
||||||
fn ok_or_bad_request(self, reason: &'static str) -> Result<T, Error> {
|
fn ok_or_bad_request<M: AsRef<str>>(self, reason: M) -> Result<T, Error> {
|
||||||
match self {
|
match self {
|
||||||
Ok(x) => Ok(x),
|
Ok(x) => Ok(x),
|
||||||
Err(e) => Err(Error::BadRequest(format!("{}: {}", reason, e))),
|
Err(e) => Err(Error::BadRequest(format!(
|
||||||
|
"{}: {}",
|
||||||
|
reason.as_ref(),
|
||||||
|
e.to_string()
|
||||||
|
))),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> OkOrBadRequest for Option<T> {
|
impl<T> OkOrBadRequest for Option<T> {
|
||||||
type S2 = Result<T, Error>;
|
type S = T;
|
||||||
fn ok_or_bad_request(self, reason: &'static str) -> Result<T, Error> {
|
fn ok_or_bad_request<M: AsRef<str>>(self, reason: M) -> Result<T, Error> {
|
||||||
match self {
|
match self {
|
||||||
Some(x) => Ok(x),
|
Some(x) => Ok(x),
|
||||||
None => Err(Error::BadRequest(reason.to_string())),
|
None => Err(Error::BadRequest(reason.as_ref().to_string())),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Trait to map an error to an Internal Error code
|
/// Trait to map an error to an Internal Error code
|
||||||
pub trait OkOrInternalError {
|
pub trait OkOrInternalError {
|
||||||
type S2;
|
type S;
|
||||||
fn ok_or_internal_error(self, reason: &'static str) -> Self::S2;
|
fn ok_or_internal_error<M: AsRef<str>>(self, reason: M) -> Result<Self::S, Error>;
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T, E> OkOrInternalError for Result<T, E>
|
impl<T, E> OkOrInternalError for Result<T, E>
|
||||||
where
|
where
|
||||||
E: std::fmt::Display,
|
E: std::fmt::Display,
|
||||||
{
|
{
|
||||||
type S2 = Result<T, Error>;
|
type S = T;
|
||||||
fn ok_or_internal_error(self, reason: &'static str) -> Result<T, Error> {
|
fn ok_or_internal_error<M: AsRef<str>>(self, reason: M) -> Result<T, Error> {
|
||||||
match self {
|
match self {
|
||||||
Ok(x) => Ok(x),
|
Ok(x) => Ok(x),
|
||||||
Err(e) => Err(Error::InternalError(GarageError::Message(format!(
|
Err(e) => Err(Error::InternalError(GarageError::Message(format!(
|
||||||
"{}: {}",
|
"{}: {}",
|
||||||
reason, e
|
reason.as_ref(),
|
||||||
|
e
|
||||||
)))),
|
)))),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> OkOrInternalError for Option<T> {
|
impl<T> OkOrInternalError for Option<T> {
|
||||||
type S2 = Result<T, Error>;
|
type S = T;
|
||||||
fn ok_or_internal_error(self, reason: &'static str) -> Result<T, Error> {
|
fn ok_or_internal_error<M: AsRef<str>>(self, reason: M) -> Result<T, Error> {
|
||||||
match self {
|
match self {
|
||||||
Some(x) => Ok(x),
|
Some(x) => Ok(x),
|
||||||
None => Err(Error::InternalError(GarageError::Message(
|
None => Err(Error::InternalError(GarageError::Message(
|
||||||
reason.to_string(),
|
reason.as_ref().to_string(),
|
||||||
))),
|
))),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,12 +3,12 @@ use std::sync::Arc;
|
||||||
|
|
||||||
use hyper::{Body, Request, Response, StatusCode};
|
use hyper::{Body, Request, Response, StatusCode};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use serde_bytes::ByteBuf;
|
|
||||||
|
|
||||||
use crate::error::*;
|
use crate::error::*;
|
||||||
use crate::s3_xml::{xmlns_tag, IntValue, Value};
|
use crate::s3_xml::{xmlns_tag, IntValue, Value};
|
||||||
use crate::signature::verify_signed_content;
|
use crate::signature::verify_signed_content;
|
||||||
|
|
||||||
|
use garage_model::bucket_table::*;
|
||||||
use garage_model::garage::Garage;
|
use garage_model::garage::Garage;
|
||||||
use garage_table::*;
|
use garage_table::*;
|
||||||
use garage_util::crdt;
|
use garage_util::crdt;
|
||||||
|
@ -58,7 +58,7 @@ pub async fn handle_put_website(
|
||||||
if let crdt::Deletable::Present(param) = &mut bucket.state {
|
if let crdt::Deletable::Present(param) = &mut bucket.state {
|
||||||
param
|
param
|
||||||
.website_config
|
.website_config
|
||||||
.update(Some(ByteBuf::from(body.to_vec())));
|
.update(Some(conf.into_garage_website_config()?));
|
||||||
garage.bucket_table.insert(&bucket).await?;
|
garage.bucket_table.insert(&bucket).await?;
|
||||||
} else {
|
} else {
|
||||||
unreachable!();
|
unreachable!();
|
||||||
|
@ -168,6 +168,26 @@ impl WebsiteConfiguration {
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn into_garage_website_config(self) -> Result<WebsiteConfig, Error> {
|
||||||
|
if let Some(rart) = self.redirect_all_requests_to {
|
||||||
|
Ok(WebsiteConfig::RedirectAll {
|
||||||
|
hostname: rart.hostname.0,
|
||||||
|
protocol: rart
|
||||||
|
.protocol
|
||||||
|
.map(|x| x.0)
|
||||||
|
.unwrap_or_else(|| "http".to_string()),
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
Ok(WebsiteConfig::Website {
|
||||||
|
index_document: self
|
||||||
|
.index_document
|
||||||
|
.map(|x| x.suffix.0)
|
||||||
|
.unwrap_or_else(|| "index.html".to_string()),
|
||||||
|
error_document: self.error_document.map(|x| x.key.0),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Key {
|
impl Key {
|
||||||
|
|
|
@ -4,7 +4,6 @@ use std::sync::Arc;
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use serde_bytes::ByteBuf;
|
|
||||||
|
|
||||||
use garage_util::crdt::*;
|
use garage_util::crdt::*;
|
||||||
use garage_util::data::*;
|
use garage_util::data::*;
|
||||||
|
@ -538,7 +537,10 @@ impl AdminRpcHandler {
|
||||||
}
|
}
|
||||||
|
|
||||||
let website = if query.allow {
|
let website = if query.allow {
|
||||||
Some(ByteBuf::from(DEFAULT_WEBSITE_CONFIGURATION.to_vec()))
|
Some(WebsiteConfig::Website {
|
||||||
|
index_document: "index.html".into(),
|
||||||
|
error_document: None,
|
||||||
|
})
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
};
|
};
|
||||||
|
|
|
@ -1,8 +1,10 @@
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
use garage_util::data::*;
|
||||||
|
use garage_util::time::*;
|
||||||
|
|
||||||
use garage_table::crdt::*;
|
use garage_table::crdt::*;
|
||||||
use garage_table::*;
|
use garage_table::*;
|
||||||
use garage_util::data::*;
|
|
||||||
|
|
||||||
/// The bucket alias table holds the names given to buckets
|
/// The bucket alias table holds the names given to buckets
|
||||||
/// in the global namespace.
|
/// in the global namespace.
|
||||||
|
@ -23,15 +25,19 @@ impl AutoCrdt for AliasParams {
|
||||||
|
|
||||||
impl BucketAlias {
|
impl BucketAlias {
|
||||||
pub fn new(name: String, bucket_id: Uuid) -> Option<Self> {
|
pub fn new(name: String, bucket_id: Uuid) -> Option<Self> {
|
||||||
|
Self::raw(name, now_msec(), bucket_id)
|
||||||
|
}
|
||||||
|
pub fn raw(name: String, ts: u64, bucket_id: Uuid) -> Option<Self> {
|
||||||
if !is_valid_bucket_name(&name) {
|
if !is_valid_bucket_name(&name) {
|
||||||
None
|
None
|
||||||
} else {
|
} else {
|
||||||
Some(BucketAlias {
|
Some(BucketAlias {
|
||||||
name,
|
name,
|
||||||
state: crdt::Lww::new(crdt::Deletable::present(AliasParams { bucket_id })),
|
state: crdt::Lww::raw(ts, crdt::Deletable::present(AliasParams { bucket_id })),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn is_deleted(&self) -> bool {
|
pub fn is_deleted(&self) -> bool {
|
||||||
self.state.get().is_deleted()
|
self.state.get().is_deleted()
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,4 @@
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use serde_bytes::ByteBuf;
|
|
||||||
|
|
||||||
use garage_table::crdt::Crdt;
|
use garage_table::crdt::Crdt;
|
||||||
use garage_table::*;
|
use garage_table::*;
|
||||||
|
@ -8,8 +7,6 @@ use garage_util::time::*;
|
||||||
|
|
||||||
use crate::permission::BucketKeyPerm;
|
use crate::permission::BucketKeyPerm;
|
||||||
|
|
||||||
pub const DEFAULT_WEBSITE_CONFIGURATION: &[u8] = b""; // TODO (an XML WebsiteConfiguration document per the AWS spec)
|
|
||||||
|
|
||||||
/// A bucket is a collection of objects
|
/// A bucket is a collection of objects
|
||||||
///
|
///
|
||||||
/// Its parameters are not directly accessible as:
|
/// Its parameters are not directly accessible as:
|
||||||
|
@ -33,7 +30,7 @@ pub struct BucketParams {
|
||||||
/// Whether this bucket is allowed for website access
|
/// Whether this bucket is allowed for website access
|
||||||
/// (under all of its global alias names),
|
/// (under all of its global alias names),
|
||||||
/// and if so, the website configuration XML document
|
/// and if so, the website configuration XML document
|
||||||
pub website_config: crdt::Lww<Option<ByteBuf>>,
|
pub website_config: crdt::Lww<Option<WebsiteConfig>>,
|
||||||
/// Map of aliases that are or have been given to this bucket
|
/// Map of aliases that are or have been given to this bucket
|
||||||
/// in the global namespace
|
/// in the global namespace
|
||||||
/// (not authoritative: this is just used as an indication to
|
/// (not authoritative: this is just used as an indication to
|
||||||
|
@ -45,6 +42,18 @@ pub struct BucketParams {
|
||||||
pub local_aliases: crdt::LwwMap<(String, String), bool>,
|
pub local_aliases: crdt::LwwMap<(String, String), bool>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
|
||||||
|
pub enum WebsiteConfig {
|
||||||
|
RedirectAll {
|
||||||
|
hostname: String,
|
||||||
|
protocol: String,
|
||||||
|
},
|
||||||
|
Website {
|
||||||
|
index_document: String,
|
||||||
|
error_document: Option<String>,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
impl BucketParams {
|
impl BucketParams {
|
||||||
/// Create an empty BucketParams with no authorized keys and no website accesss
|
/// Create an empty BucketParams with no authorized keys and no website accesss
|
||||||
pub fn new() -> Self {
|
pub fn new() -> Self {
|
||||||
|
|
|
@ -6,6 +6,8 @@ use garage_util::data::*;
|
||||||
|
|
||||||
use crate::permission::BucketKeyPerm;
|
use crate::permission::BucketKeyPerm;
|
||||||
|
|
||||||
|
use garage_model_050::key_table as old;
|
||||||
|
|
||||||
/// An api key
|
/// An api key
|
||||||
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
|
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
|
||||||
pub struct Key {
|
pub struct Key {
|
||||||
|
@ -173,11 +175,8 @@ impl TableSchema for KeyTable {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn try_migrate(bytes: &[u8]) -> Option<Self::E> {
|
fn try_migrate(bytes: &[u8]) -> Option<Self::E> {
|
||||||
let old_k =
|
let old_k = rmp_serde::decode::from_read_ref::<_, old::Key>(bytes).ok()?;
|
||||||
match rmp_serde::decode::from_read_ref::<_, garage_model_050::key_table::Key>(bytes) {
|
|
||||||
Ok(x) => x,
|
|
||||||
Err(_) => return None,
|
|
||||||
};
|
|
||||||
let state = if old_k.deleted.get() {
|
let state = if old_k.deleted.get() {
|
||||||
crdt::Deletable::Deleted
|
crdt::Deletable::Deleted
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -1,14 +1,17 @@
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
extern crate log;
|
extern crate log;
|
||||||
|
|
||||||
pub mod block;
|
pub mod permission;
|
||||||
|
|
||||||
pub mod block_ref_table;
|
pub mod block_ref_table;
|
||||||
pub mod bucket_alias_table;
|
pub mod bucket_alias_table;
|
||||||
pub mod bucket_helper;
|
|
||||||
pub mod bucket_table;
|
pub mod bucket_table;
|
||||||
pub mod garage;
|
|
||||||
pub mod key_table;
|
pub mod key_table;
|
||||||
pub mod migrate;
|
|
||||||
pub mod object_table;
|
pub mod object_table;
|
||||||
pub mod permission;
|
|
||||||
pub mod version_table;
|
pub mod version_table;
|
||||||
|
|
||||||
|
pub mod block;
|
||||||
|
|
||||||
|
pub mod bucket_helper;
|
||||||
|
pub mod garage;
|
||||||
|
pub mod migrate;
|
||||||
|
|
|
@ -1,7 +1,5 @@
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use serde_bytes::ByteBuf;
|
|
||||||
|
|
||||||
use garage_table::util::EmptyKey;
|
use garage_table::util::EmptyKey;
|
||||||
use garage_util::crdt::*;
|
use garage_util::crdt::*;
|
||||||
use garage_util::data::*;
|
use garage_util::data::*;
|
||||||
|
@ -50,9 +48,12 @@ impl Migrate {
|
||||||
hex::encode(&bucket_id.as_slice()[..16])
|
hex::encode(&bucket_id.as_slice()[..16])
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut new_ak = Map::new();
|
let new_ak = old_bucket_p
|
||||||
for (k, ts, perm) in old_bucket_p.authorized_keys.items().iter() {
|
.authorized_keys
|
||||||
new_ak.put(
|
.items()
|
||||||
|
.iter()
|
||||||
|
.map(|(k, ts, perm)| {
|
||||||
|
(
|
||||||
k.to_string(),
|
k.to_string(),
|
||||||
BucketKeyPerm {
|
BucketKeyPerm {
|
||||||
timestamp: *ts,
|
timestamp: *ts,
|
||||||
|
@ -60,14 +61,19 @@ impl Migrate {
|
||||||
allow_write: perm.allow_write,
|
allow_write: perm.allow_write,
|
||||||
allow_owner: false,
|
allow_owner: false,
|
||||||
},
|
},
|
||||||
);
|
)
|
||||||
}
|
})
|
||||||
|
.collect::<Map<_, _>>();
|
||||||
|
|
||||||
let mut aliases = LwwMap::new();
|
let mut aliases = LwwMap::new();
|
||||||
aliases.update_in_place(new_name.clone(), true);
|
aliases.update_in_place(new_name.clone(), true);
|
||||||
|
let alias_ts = aliases.get_timestamp(&new_name);
|
||||||
|
|
||||||
let website = if *old_bucket_p.website.get() {
|
let website = if *old_bucket_p.website.get() {
|
||||||
Some(ByteBuf::from(DEFAULT_WEBSITE_CONFIGURATION.to_vec()))
|
Some(WebsiteConfig::Website {
|
||||||
|
index_document: "index.html".into(),
|
||||||
|
error_document: None,
|
||||||
|
})
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
};
|
};
|
||||||
|
@ -84,7 +90,7 @@ impl Migrate {
|
||||||
};
|
};
|
||||||
self.garage.bucket_table.insert(&new_bucket).await?;
|
self.garage.bucket_table.insert(&new_bucket).await?;
|
||||||
|
|
||||||
let new_alias = BucketAlias::new(new_name.clone(), new_bucket.id).unwrap();
|
let new_alias = BucketAlias::raw(new_name.clone(), alias_ts, new_bucket.id).unwrap();
|
||||||
self.garage.bucket_alias_table.insert(&new_alias).await?;
|
self.garage.bucket_alias_table.insert(&new_alias).await?;
|
||||||
|
|
||||||
for (k, perm) in new_ak.items().iter() {
|
for (k, perm) in new_ak.items().iter() {
|
||||||
|
|
|
@ -259,11 +259,8 @@ impl TableSchema for ObjectTable {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn try_migrate(bytes: &[u8]) -> Option<Self::E> {
|
fn try_migrate(bytes: &[u8]) -> Option<Self::E> {
|
||||||
let old_v = match rmp_serde::decode::from_read_ref::<_, old::Object>(bytes) {
|
let old_obj = rmp_serde::decode::from_read_ref::<_, old::Object>(bytes).ok()?;
|
||||||
Ok(x) => x,
|
Some(migrate_object(old_obj))
|
||||||
Err(_) => return None,
|
|
||||||
};
|
|
||||||
Some(migrate_object(old_v))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -10,6 +10,8 @@ use garage_table::*;
|
||||||
|
|
||||||
use crate::block_ref_table::*;
|
use crate::block_ref_table::*;
|
||||||
|
|
||||||
|
use garage_model_050::version_table as old;
|
||||||
|
|
||||||
/// A version of an object
|
/// A version of an object
|
||||||
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
|
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
|
||||||
pub struct Version {
|
pub struct Version {
|
||||||
|
@ -149,16 +151,14 @@ impl TableSchema for VersionTable {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn try_migrate(bytes: &[u8]) -> Option<Self::E> {
|
fn try_migrate(bytes: &[u8]) -> Option<Self::E> {
|
||||||
let old =
|
let old = rmp_serde::decode::from_read_ref::<_, old::Version>(bytes).ok()?;
|
||||||
match rmp_serde::decode::from_read_ref::<_, garage_model_050::version_table::Version>(
|
|
||||||
bytes,
|
let blocks = old
|
||||||
) {
|
.blocks
|
||||||
Ok(x) => x,
|
.items()
|
||||||
Err(_) => return None,
|
.iter()
|
||||||
};
|
.map(|(k, v)| {
|
||||||
let mut new_blocks = crdt::Map::new();
|
(
|
||||||
for (k, v) in old.blocks.items().iter() {
|
|
||||||
new_blocks.put(
|
|
||||||
VersionBlockKey {
|
VersionBlockKey {
|
||||||
part_number: k.part_number,
|
part_number: k.part_number,
|
||||||
offset: k.offset,
|
offset: k.offset,
|
||||||
|
@ -167,17 +167,22 @@ impl TableSchema for VersionTable {
|
||||||
hash: Hash::try_from(v.hash.as_slice()).unwrap(),
|
hash: Hash::try_from(v.hash.as_slice()).unwrap(),
|
||||||
size: v.size,
|
size: v.size,
|
||||||
},
|
},
|
||||||
);
|
)
|
||||||
}
|
})
|
||||||
let mut new_parts_etags = crdt::Map::new();
|
.collect::<crdt::Map<_, _>>();
|
||||||
for (k, v) in old.parts_etags.items().iter() {
|
|
||||||
new_parts_etags.put(*k, v.clone());
|
let parts_etags = old
|
||||||
}
|
.parts_etags
|
||||||
|
.items()
|
||||||
|
.iter()
|
||||||
|
.map(|(k, v)| (*k, v.clone()))
|
||||||
|
.collect::<crdt::Map<_, _>>();
|
||||||
|
|
||||||
Some(Version {
|
Some(Version {
|
||||||
uuid: Hash::try_from(old.uuid.as_slice()).unwrap(),
|
uuid: Hash::try_from(old.uuid.as_slice()).unwrap(),
|
||||||
deleted: crdt::Bool::new(old.deleted.get()),
|
deleted: crdt::Bool::new(old.deleted.get()),
|
||||||
blocks: new_blocks,
|
blocks,
|
||||||
parts_etags: new_parts_etags,
|
parts_etags,
|
||||||
bucket_id: blake2sum(old.bucket.as_bytes()),
|
bucket_id: blake2sum(old.bucket.as_bytes()),
|
||||||
key: old.key,
|
key: old.key,
|
||||||
})
|
})
|
||||||
|
|
|
@ -16,6 +16,9 @@ impl PartitionKey for String {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Values of type FixedBytes32 are assumed to be random,
|
||||||
|
/// either a hash or a random UUID. This means we can use
|
||||||
|
/// them directly as an index into the hash table.
|
||||||
impl PartitionKey for FixedBytes32 {
|
impl PartitionKey for FixedBytes32 {
|
||||||
fn hash(&self) -> Hash {
|
fn hash(&self) -> Hash {
|
||||||
*self
|
*self
|
||||||
|
|
|
@ -57,10 +57,7 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Build a new CRDT from a previous non-compatible one
|
/// Build a new LWW CRDT from its raw pieces: a timestamp and the value
|
||||||
///
|
|
||||||
/// Compared to new, the CRDT's timestamp is not set to now
|
|
||||||
/// but must be set to the previous, non-compatible, CRDT's timestamp.
|
|
||||||
pub fn raw(ts: u64, value: T) -> Self {
|
pub fn raw(ts: u64, value: T) -> Self {
|
||||||
Self { ts, v: value }
|
Self { ts, v: value }
|
||||||
}
|
}
|
||||||
|
|
|
@ -38,10 +38,11 @@ where
|
||||||
Self { vals: vec![] }
|
Self { vals: vec![] }
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Used to migrate from a map defined in an incompatible format. This produces
|
/// This produces a map that contains a single item with the specified timestamp.
|
||||||
/// a map that contains a single item with the specified timestamp (copied from
|
///
|
||||||
/// the incompatible format). Do this as many times as you have items to migrate,
|
/// Used to migrate from a map defined in an incompatible format. Do this as many
|
||||||
/// and put them all together using the CRDT merge operator.
|
/// times as you have items to migrate, and put them all together using the
|
||||||
|
/// CRDT merge operator.
|
||||||
pub fn raw_item(k: K, ts: u64, v: V) -> Self {
|
pub fn raw_item(k: K, ts: u64, v: V) -> Self {
|
||||||
Self {
|
Self {
|
||||||
vals: vec![(k, ts, v)],
|
vals: vec![(k, ts, v)],
|
||||||
|
|
|
@ -1,3 +1,5 @@
|
||||||
|
use std::iter::{FromIterator, IntoIterator};
|
||||||
|
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
use crate::crdt::crdt::*;
|
use crate::crdt::crdt::*;
|
||||||
|
@ -98,3 +100,26 @@ where
|
||||||
Self::new()
|
Self::new()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// A crdt map can be created from an iterator of key-value pairs.
|
||||||
|
/// Note that all keys in the iterator must be distinct:
|
||||||
|
/// this function will throw a panic if it is not the case.
|
||||||
|
impl<K, V> FromIterator<(K, V)> for Map<K, V>
|
||||||
|
where
|
||||||
|
K: Clone + Ord,
|
||||||
|
V: Clone + Crdt,
|
||||||
|
{
|
||||||
|
fn from_iter<T: IntoIterator<Item = (K, V)>>(iter: T) -> Self {
|
||||||
|
let mut vals: Vec<(K, V)> = iter.into_iter().collect();
|
||||||
|
vals.sort_by_cached_key(|tup| tup.0.clone());
|
||||||
|
|
||||||
|
// sanity check
|
||||||
|
for i in 1..vals.len() {
|
||||||
|
if vals[i - 1].0 == vals[i].0 {
|
||||||
|
panic!("Duplicate key in crdt::Map resulting from .from_iter() or .collect()");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Self { vals }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -119,17 +119,17 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Trait to map error to the Bad Request error code
|
/// Trait to map any error type to Error::Message
|
||||||
pub trait OkOrMessage {
|
pub trait OkOrMessage {
|
||||||
type S2;
|
type S;
|
||||||
fn ok_or_message<M: Into<String>>(self, message: M) -> Self::S2;
|
fn ok_or_message<M: Into<String>>(self, message: M) -> Result<Self::S, Error>;
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T, E> OkOrMessage for Result<T, E>
|
impl<T, E> OkOrMessage for Result<T, E>
|
||||||
where
|
where
|
||||||
E: std::fmt::Display,
|
E: std::fmt::Display,
|
||||||
{
|
{
|
||||||
type S2 = Result<T, Error>;
|
type S = T;
|
||||||
fn ok_or_message<M: Into<String>>(self, message: M) -> Result<T, Error> {
|
fn ok_or_message<M: Into<String>>(self, message: M) -> Result<T, Error> {
|
||||||
match self {
|
match self {
|
||||||
Ok(x) => Ok(x),
|
Ok(x) => Ok(x),
|
||||||
|
@ -139,7 +139,7 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> OkOrMessage for Option<T> {
|
impl<T> OkOrMessage for Option<T> {
|
||||||
type S2 = Result<T, Error>;
|
type S = T;
|
||||||
fn ok_or_message<M: Into<String>>(self, message: M) -> Result<T, Error> {
|
fn ok_or_message<M: Into<String>>(self, message: M) -> Result<T, Error> {
|
||||||
match self {
|
match self {
|
||||||
Some(x) => Ok(x),
|
Some(x) => Ok(x),
|
||||||
|
|
Loading…
Reference in a new issue