forked from Deuxfleurs/garage
Merge branch 'master' into feature/website
This commit is contained in:
commit
ccda9ab1ca
5 changed files with 207 additions and 38 deletions
|
@ -8,6 +8,9 @@ use garage_util::error::Error;
|
||||||
|
|
||||||
use crate::key_table::PermissionSet;
|
use crate::key_table::PermissionSet;
|
||||||
|
|
||||||
|
// We import the same file but in its version 0.1.0.
|
||||||
|
// We can then access v0.1.0 data structures.
|
||||||
|
// We use them to perform migrations.
|
||||||
use model010::bucket_table as prev;
|
use model010::bucket_table as prev;
|
||||||
|
|
||||||
/// A bucket is a collection of objects
|
/// A bucket is a collection of objects
|
||||||
|
|
|
@ -5,4 +5,4 @@ pub mod consul;
|
||||||
pub mod membership;
|
pub mod membership;
|
||||||
pub mod rpc_client;
|
pub mod rpc_client;
|
||||||
pub mod rpc_server;
|
pub mod rpc_server;
|
||||||
pub mod tls_util;
|
pub(crate) mod tls_util;
|
||||||
|
|
|
@ -46,13 +46,13 @@ impl RpcMessage for Message {}
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
pub struct PingMessage {
|
pub struct PingMessage {
|
||||||
pub id: UUID,
|
id: UUID,
|
||||||
pub rpc_port: u16,
|
rpc_port: u16,
|
||||||
|
|
||||||
pub status_hash: Hash,
|
status_hash: Hash,
|
||||||
pub config_version: u64,
|
config_version: u64,
|
||||||
|
|
||||||
pub state_info: StateInfo,
|
state_info: StateInfo,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||||
|
@ -81,12 +81,13 @@ pub struct NetworkConfigEntry {
|
||||||
|
|
||||||
pub struct System {
|
pub struct System {
|
||||||
pub id: UUID,
|
pub id: UUID,
|
||||||
pub data_dir: PathBuf,
|
|
||||||
pub rpc_local_port: u16,
|
|
||||||
|
|
||||||
pub state_info: StateInfo,
|
metadata_dir: PathBuf,
|
||||||
|
rpc_local_port: u16,
|
||||||
|
|
||||||
pub rpc_http_client: Arc<RpcHttpClient>,
|
state_info: StateInfo,
|
||||||
|
|
||||||
|
rpc_http_client: Arc<RpcHttpClient>,
|
||||||
rpc_client: Arc<RpcClient<Message>>,
|
rpc_client: Arc<RpcClient<Message>>,
|
||||||
|
|
||||||
pub status: watch::Receiver<Arc<Status>>,
|
pub status: watch::Receiver<Arc<Status>>,
|
||||||
|
@ -296,15 +297,15 @@ fn read_network_config(metadata_dir: &PathBuf) -> Result<NetworkConfig, Error> {
|
||||||
|
|
||||||
impl System {
|
impl System {
|
||||||
pub fn new(
|
pub fn new(
|
||||||
data_dir: PathBuf,
|
metadata_dir: PathBuf,
|
||||||
rpc_http_client: Arc<RpcHttpClient>,
|
rpc_http_client: Arc<RpcHttpClient>,
|
||||||
background: Arc<BackgroundRunner>,
|
background: Arc<BackgroundRunner>,
|
||||||
rpc_server: &mut RpcServer,
|
rpc_server: &mut RpcServer,
|
||||||
) -> Arc<Self> {
|
) -> Arc<Self> {
|
||||||
let id = gen_node_id(&data_dir).expect("Unable to read or generate node ID");
|
let id = gen_node_id(&metadata_dir).expect("Unable to read or generate node ID");
|
||||||
info!("Node ID: {}", hex::encode(&id));
|
info!("Node ID: {}", hex::encode(&id));
|
||||||
|
|
||||||
let net_config = match read_network_config(&data_dir) {
|
let net_config = match read_network_config(&metadata_dir) {
|
||||||
Ok(x) => x,
|
Ok(x) => x,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
info!(
|
info!(
|
||||||
|
@ -347,7 +348,7 @@ impl System {
|
||||||
|
|
||||||
let sys = Arc::new(System {
|
let sys = Arc::new(System {
|
||||||
id,
|
id,
|
||||||
data_dir,
|
metadata_dir,
|
||||||
rpc_local_port: rpc_server.bind_addr.port(),
|
rpc_local_port: rpc_server.bind_addr.port(),
|
||||||
state_info,
|
state_info,
|
||||||
rpc_http_client,
|
rpc_http_client,
|
||||||
|
@ -388,7 +389,7 @@ impl System {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn save_network_config(self: Arc<Self>) -> Result<(), Error> {
|
async fn save_network_config(self: Arc<Self>) -> Result<(), Error> {
|
||||||
let mut path = self.data_dir.clone();
|
let mut path = self.metadata_dir.clone();
|
||||||
path.push("network_config");
|
path.push("network_config");
|
||||||
|
|
||||||
let ring = self.ring.borrow().clone();
|
let ring = self.ring.borrow().clone();
|
||||||
|
@ -399,7 +400,7 @@ impl System {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn make_ping(&self) -> Message {
|
fn make_ping(&self) -> Message {
|
||||||
let status = self.status.borrow().clone();
|
let status = self.status.borrow().clone();
|
||||||
let ring = self.ring.borrow().clone();
|
let ring = self.ring.borrow().clone();
|
||||||
Message::Ping(PingMessage {
|
Message::Ping(PingMessage {
|
||||||
|
@ -411,7 +412,7 @@ impl System {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn broadcast(self: Arc<Self>, msg: Message, timeout: Duration) {
|
async fn broadcast(self: Arc<Self>, msg: Message, timeout: Duration) {
|
||||||
let status = self.status.borrow().clone();
|
let status = self.status.borrow().clone();
|
||||||
let to = status
|
let to = status
|
||||||
.nodes
|
.nodes
|
||||||
|
@ -527,7 +528,7 @@ impl System {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn handle_ping(
|
async fn handle_ping(
|
||||||
self: Arc<Self>,
|
self: Arc<Self>,
|
||||||
from: &SocketAddr,
|
from: &SocketAddr,
|
||||||
ping: &PingMessage,
|
ping: &PingMessage,
|
||||||
|
@ -557,7 +558,7 @@ impl System {
|
||||||
Ok(self.make_ping())
|
Ok(self.make_ping())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn handle_pull_status(&self) -> Result<Message, Error> {
|
fn handle_pull_status(&self) -> Result<Message, Error> {
|
||||||
let status = self.status.borrow().clone();
|
let status = self.status.borrow().clone();
|
||||||
let mut mem = vec![];
|
let mut mem = vec![];
|
||||||
for (node, status) in status.nodes.iter() {
|
for (node, status) in status.nodes.iter() {
|
||||||
|
@ -577,12 +578,12 @@ impl System {
|
||||||
Ok(Message::AdvertiseNodesUp(mem))
|
Ok(Message::AdvertiseNodesUp(mem))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn handle_pull_config(&self) -> Result<Message, Error> {
|
fn handle_pull_config(&self) -> Result<Message, Error> {
|
||||||
let ring = self.ring.borrow().clone();
|
let ring = self.ring.borrow().clone();
|
||||||
Ok(Message::AdvertiseConfig(ring.config.clone()))
|
Ok(Message::AdvertiseConfig(ring.config.clone()))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn handle_advertise_nodes_up(
|
async fn handle_advertise_nodes_up(
|
||||||
self: Arc<Self>,
|
self: Arc<Self>,
|
||||||
adv: &[AdvertisedNode],
|
adv: &[AdvertisedNode],
|
||||||
) -> Result<Message, Error> {
|
) -> Result<Message, Error> {
|
||||||
|
@ -635,7 +636,7 @@ impl System {
|
||||||
Ok(Message::Ok)
|
Ok(Message::Ok)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn handle_advertise_config(
|
async fn handle_advertise_config(
|
||||||
self: Arc<Self>,
|
self: Arc<Self>,
|
||||||
adv: &NetworkConfig,
|
adv: &NetworkConfig,
|
||||||
) -> Result<Message, Error> {
|
) -> Result<Message, Error> {
|
||||||
|
@ -716,7 +717,7 @@ impl System {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn pull_status(
|
fn pull_status(
|
||||||
self: Arc<Self>,
|
self: Arc<Self>,
|
||||||
peer: UUID,
|
peer: UUID,
|
||||||
) -> impl futures::future::Future<Output = ()> + Send + 'static {
|
) -> impl futures::future::Future<Output = ()> + Send + 'static {
|
||||||
|
@ -731,7 +732,7 @@ impl System {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn pull_config(self: Arc<Self>, peer: UUID) {
|
async fn pull_config(self: Arc<Self>, peer: UUID) {
|
||||||
let resp = self
|
let resp = self
|
||||||
.rpc_client
|
.rpc_client
|
||||||
.call(peer, Message::PullConfig, PING_TIMEOUT)
|
.call(peer, Message::PullConfig, PING_TIMEOUT)
|
||||||
|
|
|
@ -61,7 +61,7 @@ pub struct RpcClient<M: RpcMessage> {
|
||||||
|
|
||||||
local_handler: ArcSwapOption<(UUID, LocalHandlerFn<M>)>,
|
local_handler: ArcSwapOption<(UUID, LocalHandlerFn<M>)>,
|
||||||
|
|
||||||
pub rpc_addr_client: RpcAddrClient<M>,
|
rpc_addr_client: RpcAddrClient<M>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<M: RpcMessage + 'static> RpcClient<M> {
|
impl<M: RpcMessage + 'static> RpcClient<M> {
|
||||||
|
@ -215,8 +215,8 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
|
||||||
pub struct RpcAddrClient<M: RpcMessage> {
|
pub struct RpcAddrClient<M: RpcMessage> {
|
||||||
phantom: PhantomData<M>,
|
phantom: PhantomData<M>,
|
||||||
|
|
||||||
pub http_client: Arc<RpcHttpClient>,
|
http_client: Arc<RpcHttpClient>,
|
||||||
pub path: String,
|
path: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<M: RpcMessage> RpcAddrClient<M> {
|
impl<M: RpcMessage> RpcAddrClient<M> {
|
||||||
|
|
|
@ -1,11 +1,48 @@
|
||||||
|
//! This package provides a simple implementation of conflict-free replicated data types (CRDTs)
|
||||||
|
//!
|
||||||
|
//! CRDTs are a type of data structures that do not require coordination. In other words, we can
|
||||||
|
//! edit them in parallel, we will always find a way to merge it.
|
||||||
|
//!
|
||||||
|
//! A general example is a counter. Its initial value is 0. Alice and Bob get a copy of the
|
||||||
|
//! counter. Alice does +1 on her copy, she reads 1. Bob does +3 on his copy, he reads 3. Now,
|
||||||
|
//! it is easy to merge their counters, order does not count: we always get 4.
|
||||||
|
//!
|
||||||
|
//! Learn more about CRDT [on Wikipedia](https://en.wikipedia.org/wiki/Conflict-free_replicated_data_type)
|
||||||
|
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
use garage_util::data::*;
|
use garage_util::data::*;
|
||||||
|
|
||||||
|
/// Definition of a CRDT - all CRDT Rust types implement this.
|
||||||
|
///
|
||||||
|
/// A CRDT is defined as a merge operator that respects a certain set of axioms.
|
||||||
|
///
|
||||||
|
/// In particular, the merge operator must be commutative, associative,
|
||||||
|
/// idempotent, and monotonic.
|
||||||
|
/// In other words, if `a`, `b` and `c` are CRDTs, and `⊔` denotes the merge operator,
|
||||||
|
/// the following axioms must apply:
|
||||||
|
///
|
||||||
|
/// ```text
|
||||||
|
/// a ⊔ b = b ⊔ a (commutativity)
|
||||||
|
/// (a ⊔ b) ⊔ c = a ⊔ (b ⊔ c) (associativity)
|
||||||
|
/// (a ⊔ b) ⊔ b = a ⊔ b (idempotence)
|
||||||
|
/// ```
|
||||||
|
///
|
||||||
|
/// Moreover, the relationship `≥` defined by `a ≥ b ⇔ ∃c. a = b ⊔ c` must be a partial order.
|
||||||
|
/// This implies a few properties such as: if `a ⊔ b ≠ a`, then there is no `c` such that `(a ⊔ b) ⊔ c = a`,
|
||||||
|
/// as this would imply a cycle in the partial order.
|
||||||
pub trait CRDT {
|
pub trait CRDT {
|
||||||
|
/// Merge the two datastructures according to the CRDT rules.
|
||||||
|
/// `self` is modified to contain the merged CRDT value. `other` is not modified.
|
||||||
|
///
|
||||||
|
/// # Arguments
|
||||||
|
///
|
||||||
|
/// * `other` - the other CRDT we wish to merge with
|
||||||
fn merge(&mut self, other: &Self);
|
fn merge(&mut self, other: &Self);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// All types that implement `Ord` (a total order) also implement a trivial CRDT
|
||||||
|
/// defined by the merge rule: `a ⊔ b = max(a, b)`.
|
||||||
impl<T> CRDT for T
|
impl<T> CRDT for T
|
||||||
where
|
where
|
||||||
T: Ord + Clone,
|
T: Ord + Clone,
|
||||||
|
@ -19,6 +56,37 @@ where
|
||||||
|
|
||||||
// ---- LWW Register ----
|
// ---- LWW Register ----
|
||||||
|
|
||||||
|
/// Last Write Win (LWW)
|
||||||
|
///
|
||||||
|
/// An LWW CRDT associates a timestamp with a value, in order to implement a
|
||||||
|
/// time-based reconciliation rule: the most recent write wins.
|
||||||
|
/// For completeness, the LWW reconciliation rule must also be defined for two LWW CRDTs
|
||||||
|
/// with the same timestamp but different values.
|
||||||
|
///
|
||||||
|
/// In our case, we add the constraint that the value that is wrapped inside the LWW CRDT must
|
||||||
|
/// itself be a CRDT: in the case when the timestamp does not allow us to decide on which value to
|
||||||
|
/// keep, the merge rule of the inner CRDT is applied on the wrapped values. (Note that all types
|
||||||
|
/// that implement the `Ord` trait get a default CRDT implemetnation that keeps the maximum value.
|
||||||
|
/// This enables us to use LWW directly with primitive data types such as numbers or strings. It is
|
||||||
|
/// generally desirable in this case to never explicitly produce LWW values with the same timestamp
|
||||||
|
/// but different inner values, as the rule to keep the maximum value isn't generally the desired
|
||||||
|
/// semantics.)
|
||||||
|
///
|
||||||
|
/// As multiple computers clocks are always desynchronized,
|
||||||
|
/// when operations are close enough, it is equivalent to
|
||||||
|
/// take one copy and drop the other one.
|
||||||
|
///
|
||||||
|
/// Given that clocks are not too desynchronized, this assumption
|
||||||
|
/// is enough for most cases, as there is few chance that two humans
|
||||||
|
/// coordonate themself faster than the time difference between two NTP servers.
|
||||||
|
///
|
||||||
|
/// As a more concret example, let's suppose you want to upload a file
|
||||||
|
/// with the same key (path) in the same bucket at the very same time.
|
||||||
|
/// For each request, the file will be timestamped by the receiving server
|
||||||
|
/// and may differ from what you observed with your atomic clock!
|
||||||
|
///
|
||||||
|
/// This scheme is used by AWS S3 or Soundcloud and often without knowing
|
||||||
|
/// in entreprise when reconciliating databases with ad-hoc scripts.
|
||||||
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
|
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
|
||||||
pub struct LWW<T> {
|
pub struct LWW<T> {
|
||||||
ts: u64,
|
ts: u64,
|
||||||
|
@ -29,22 +97,55 @@ impl<T> LWW<T>
|
||||||
where
|
where
|
||||||
T: CRDT,
|
T: CRDT,
|
||||||
{
|
{
|
||||||
|
/// Creates a new CRDT
|
||||||
|
///
|
||||||
|
/// CRDT's internal timestamp is set with current node's clock.
|
||||||
pub fn new(value: T) -> Self {
|
pub fn new(value: T) -> Self {
|
||||||
Self {
|
Self {
|
||||||
ts: now_msec(),
|
ts: now_msec(),
|
||||||
v: value,
|
v: value,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Build a new CRDT from a previous non-compatible one
|
||||||
|
///
|
||||||
|
/// Compared to new, the CRDT's timestamp is not set to now
|
||||||
|
/// but must be set to the previous, non-compatible, CRDT's timestamp.
|
||||||
pub fn migrate_from_raw(ts: u64, value: T) -> Self {
|
pub fn migrate_from_raw(ts: u64, value: T) -> Self {
|
||||||
Self { ts, v: value }
|
Self { ts, v: value }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Update the LWW CRDT while keeping some causal ordering.
|
||||||
|
///
|
||||||
|
/// The timestamp of the LWW CRDT is updated to be the current node's clock
|
||||||
|
/// at time of update, or the previous timestamp + 1 if that's bigger,
|
||||||
|
/// so that the new timestamp is always strictly larger than the previous one.
|
||||||
|
/// This ensures that merging the update with the old value will result in keeping
|
||||||
|
/// the updated value.
|
||||||
pub fn update(&mut self, new_value: T) {
|
pub fn update(&mut self, new_value: T) {
|
||||||
self.ts = std::cmp::max(self.ts + 1, now_msec());
|
self.ts = std::cmp::max(self.ts + 1, now_msec());
|
||||||
self.v = new_value;
|
self.v = new_value;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get the CRDT value
|
||||||
pub fn get(&self) -> &T {
|
pub fn get(&self) -> &T {
|
||||||
&self.v
|
&self.v
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get a mutable reference to the CRDT's value
|
||||||
|
///
|
||||||
|
/// This is usefull to mutate the inside value without changing the LWW timestamp.
|
||||||
|
/// When such mutation is done, the merge between two LWW values is done using the inner
|
||||||
|
/// CRDT's merge operation. This is usefull in the case where the inner CRDT is a large
|
||||||
|
/// data type, such as a map, and we only want to change a single item in the map.
|
||||||
|
/// To do this, we can produce a "CRDT delta", i.e. a LWW that contains only the modification.
|
||||||
|
/// This delta consists in a LWW with the same timestamp, and the map
|
||||||
|
/// inside only contains the updated value.
|
||||||
|
/// The advantage of such a delta is that it is much smaller than the whole map.
|
||||||
|
///
|
||||||
|
/// Avoid using this if the inner data type is a primitive type such as a number or a string,
|
||||||
|
/// as you will then rely on the merge function defined on `Ord` types by keeping the maximum
|
||||||
|
/// of both values.
|
||||||
pub fn get_mut(&mut self) -> &mut T {
|
pub fn get_mut(&mut self) -> &mut T {
|
||||||
&mut self.v
|
&mut self.v
|
||||||
}
|
}
|
||||||
|
@ -64,18 +165,20 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---- Boolean (true as absorbing state) ----
|
/// Boolean, where `true` is an absorbing state
|
||||||
|
|
||||||
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)]
|
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)]
|
||||||
pub struct Bool(bool);
|
pub struct Bool(bool);
|
||||||
|
|
||||||
impl Bool {
|
impl Bool {
|
||||||
|
/// Create a new boolean with the specified value
|
||||||
pub fn new(b: bool) -> Self {
|
pub fn new(b: bool) -> Self {
|
||||||
Self(b)
|
Self(b)
|
||||||
}
|
}
|
||||||
|
/// Set the boolean to true
|
||||||
pub fn set(&mut self) {
|
pub fn set(&mut self) {
|
||||||
self.0 = true;
|
self.0 = true;
|
||||||
}
|
}
|
||||||
|
/// Get the boolean value
|
||||||
pub fn get(&self) -> bool {
|
pub fn get(&self) -> bool {
|
||||||
self.0
|
self.0
|
||||||
}
|
}
|
||||||
|
@ -87,8 +190,23 @@ impl CRDT for Bool {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---- LWW Map ----
|
/// Last Write Win Map
|
||||||
|
///
|
||||||
|
/// This types defines a CRDT for a map from keys to values.
|
||||||
|
/// The values have an associated timestamp, such that the last written value
|
||||||
|
/// takes precedence over previous ones. As for the simpler `LWW` type, the value
|
||||||
|
/// type `V` is also required to implement the CRDT trait.
|
||||||
|
/// We do not encourage mutating the values associated with a given key
|
||||||
|
/// without updating the timestamp, in fact at the moment we do not provide a `.get_mut()`
|
||||||
|
/// method that would allow that.
|
||||||
|
///
|
||||||
|
/// Internally, the map is stored as a vector of keys and values, sorted by ascending key order.
|
||||||
|
/// This is why the key type `K` must implement `Ord` (and also to ensure a unique serialization,
|
||||||
|
/// such that two values can be compared for equality based on their hashes). As a consequence,
|
||||||
|
/// insertions take `O(n)` time. This means that LWWMap should be used for reasonably small maps.
|
||||||
|
/// However, note that even if we were using a more efficient data structure such as a `BTreeMap`,
|
||||||
|
/// the serialization cost `O(n)` would still have to be paid at each modification, so we are
|
||||||
|
/// actually not losing anything here.
|
||||||
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
|
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
|
||||||
pub struct LWWMap<K, V> {
|
pub struct LWWMap<K, V> {
|
||||||
vals: Vec<(K, u64, V)>,
|
vals: Vec<(K, u64, V)>,
|
||||||
|
@ -99,21 +217,35 @@ where
|
||||||
K: Ord,
|
K: Ord,
|
||||||
V: CRDT,
|
V: CRDT,
|
||||||
{
|
{
|
||||||
|
/// Create a new empty map CRDT
|
||||||
pub fn new() -> Self {
|
pub fn new() -> Self {
|
||||||
Self { vals: vec![] }
|
Self { vals: vec![] }
|
||||||
}
|
}
|
||||||
|
/// Used to migrate from a map defined in an incompatible format. This produces
|
||||||
|
/// a map that contains a single item with the specified timestamp (copied from
|
||||||
|
/// the incompatible format). Do this as many times as you have items to migrate,
|
||||||
|
/// and put them all together using the CRDT merge operator.
|
||||||
pub fn migrate_from_raw_item(k: K, ts: u64, v: V) -> Self {
|
pub fn migrate_from_raw_item(k: K, ts: u64, v: V) -> Self {
|
||||||
Self {
|
Self {
|
||||||
vals: vec![(k, ts, v)],
|
vals: vec![(k, ts, v)],
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pub fn take_and_clear(&mut self) -> Self {
|
/// Returns a map that contains a single mapping from the specified key to the specified value.
|
||||||
let vals = std::mem::replace(&mut self.vals, vec![]);
|
/// This map is a mutator, or a delta-CRDT, such that when it is merged with the original map,
|
||||||
Self { vals }
|
/// the previous value will be replaced with the one specified here.
|
||||||
}
|
/// The timestamp in the provided mutator is set to the maximum of the current system's clock
|
||||||
pub fn clear(&mut self) {
|
/// and 1 + the previous value's timestamp (if there is one), so that the new value will always
|
||||||
self.vals.clear();
|
/// take precedence (LWW rule).
|
||||||
}
|
///
|
||||||
|
/// Typically, to update the value associated to a key in the map, you would do the following:
|
||||||
|
///
|
||||||
|
/// ```
|
||||||
|
/// let my_update = my_crdt.update_mutator(key_to_modify, new_value);
|
||||||
|
/// my_crdt.merge(&my_update);
|
||||||
|
/// ```
|
||||||
|
///
|
||||||
|
/// However extracting the mutator on its own and only sending that on the network is very
|
||||||
|
/// interesting as it is much smaller than the whole map.
|
||||||
pub fn update_mutator(&self, k: K, new_v: V) -> Self {
|
pub fn update_mutator(&self, k: K, new_v: V) -> Self {
|
||||||
let new_vals = match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) {
|
let new_vals = match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) {
|
||||||
Ok(i) => {
|
Ok(i) => {
|
||||||
|
@ -125,12 +257,45 @@ where
|
||||||
};
|
};
|
||||||
Self { vals: new_vals }
|
Self { vals: new_vals }
|
||||||
}
|
}
|
||||||
|
/// Takes all of the values of the map and returns them. The current map is reset to the
|
||||||
|
/// empty map. This is very usefull to produce in-place a new map that contains only a delta
|
||||||
|
/// that modifies a certain value:
|
||||||
|
///
|
||||||
|
/// ```
|
||||||
|
/// let mut a = get_my_crdt_value();
|
||||||
|
/// let old_a = a.take_and_clear();
|
||||||
|
/// a.merge(&old_a.update_mutator(key_to_modify, new_value));
|
||||||
|
/// put_my_crdt_value(a);
|
||||||
|
/// ```
|
||||||
|
///
|
||||||
|
/// Of course in this simple example we could have written simply
|
||||||
|
/// `pyt_my_crdt_value(a.update_mutator(key_to_modify, new_value))`,
|
||||||
|
/// but in the case where the map is a field in a struct for instance (as is always the case),
|
||||||
|
/// this becomes very handy:
|
||||||
|
///
|
||||||
|
/// ```
|
||||||
|
/// let mut a = get_my_crdt_value();
|
||||||
|
/// let old_a_map = a.map_field.take_and_clear();
|
||||||
|
/// a.map_field.merge(&old_a_map.update_mutator(key_to_modify, new_value));
|
||||||
|
/// put_my_crdt_value(a);
|
||||||
|
/// ```
|
||||||
|
pub fn take_and_clear(&mut self) -> Self {
|
||||||
|
let vals = std::mem::replace(&mut self.vals, vec![]);
|
||||||
|
Self { vals }
|
||||||
|
}
|
||||||
|
/// Removes all values from the map
|
||||||
|
pub fn clear(&mut self) {
|
||||||
|
self.vals.clear();
|
||||||
|
}
|
||||||
|
/// Get a reference to the value assigned to a key
|
||||||
pub fn get(&self, k: &K) -> Option<&V> {
|
pub fn get(&self, k: &K) -> Option<&V> {
|
||||||
match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) {
|
match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) {
|
||||||
Ok(i) => Some(&self.vals[i].2),
|
Ok(i) => Some(&self.vals[i].2),
|
||||||
Err(_) => None,
|
Err(_) => None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
/// Gets a reference to all of the items, as a slice. Usefull to iterate on all map values.
|
||||||
|
/// In most case you will want to ignore the timestamp (second item of the tuple).
|
||||||
pub fn items(&self) -> &[(K, u64, V)] {
|
pub fn items(&self) -> &[(K, u64, V)] {
|
||||||
&self.vals[..]
|
&self.vals[..]
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue