Make K2V item timestamps globally increasing on each node
This commit is contained in:
parent
a48e2e0cb2
commit
9f5419f465
4 changed files with 65 additions and 17 deletions
|
@ -305,8 +305,10 @@ impl GarageK2V {
|
||||||
fn new(system: Arc<System>, db: &db::Db, meta_rep_param: TableShardedReplication) -> Self {
|
fn new(system: Arc<System>, db: &db::Db, meta_rep_param: TableShardedReplication) -> Self {
|
||||||
info!("Initialize K2V counter table...");
|
info!("Initialize K2V counter table...");
|
||||||
let counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), db);
|
let counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), db);
|
||||||
|
|
||||||
info!("Initialize K2V subscription manager...");
|
info!("Initialize K2V subscription manager...");
|
||||||
let subscriptions = Arc::new(SubscriptionManager::new());
|
let subscriptions = Arc::new(SubscriptionManager::new());
|
||||||
|
|
||||||
info!("Initialize K2V item table...");
|
info!("Initialize K2V item table...");
|
||||||
let item_table = Table::new(
|
let item_table = Table::new(
|
||||||
K2VItemTable {
|
K2VItemTable {
|
||||||
|
@ -317,7 +319,9 @@ impl GarageK2V {
|
||||||
system.clone(),
|
system.clone(),
|
||||||
db,
|
db,
|
||||||
);
|
);
|
||||||
let rpc = K2VRpcHandler::new(system, item_table.clone(), subscriptions);
|
|
||||||
|
info!("Initialize K2V RPC handler...");
|
||||||
|
let rpc = K2VRpcHandler::new(system, db, item_table.clone(), subscriptions);
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
item_table,
|
item_table,
|
||||||
|
|
|
@ -73,7 +73,8 @@ impl K2VItem {
|
||||||
this_node: Uuid,
|
this_node: Uuid,
|
||||||
context: &Option<CausalContext>,
|
context: &Option<CausalContext>,
|
||||||
new_value: DvvsValue,
|
new_value: DvvsValue,
|
||||||
) {
|
node_ts: u64,
|
||||||
|
) -> u64 {
|
||||||
if let Some(context) = context {
|
if let Some(context) = context {
|
||||||
for (node, t_discard) in context.vector_clock.iter() {
|
for (node, t_discard) in context.vector_clock.iter() {
|
||||||
if let Some(e) = self.items.get_mut(node) {
|
if let Some(e) = self.items.get_mut(node) {
|
||||||
|
@ -98,7 +99,9 @@ impl K2VItem {
|
||||||
values: vec![],
|
values: vec![],
|
||||||
});
|
});
|
||||||
let t_prev = e.max_time();
|
let t_prev = e.max_time();
|
||||||
e.values.push((t_prev + 1, new_value));
|
let t_new = std::cmp::max(t_prev + 1, node_ts + 1);
|
||||||
|
e.values.push((t_new, new_value));
|
||||||
|
t_new
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Extract the causality context of a K2V Item
|
/// Extract the causality context of a K2V Item
|
||||||
|
|
|
@ -6,7 +6,8 @@
|
||||||
//! mean the vector clock gets much larger than needed).
|
//! mean the vector clock gets much larger than needed).
|
||||||
|
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::sync::Arc;
|
use std::convert::TryInto;
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
|
@ -15,9 +16,12 @@ use futures::StreamExt;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use tokio::select;
|
use tokio::select;
|
||||||
|
|
||||||
|
use garage_db as db;
|
||||||
|
|
||||||
use garage_util::crdt::*;
|
use garage_util::crdt::*;
|
||||||
use garage_util::data::*;
|
use garage_util::data::*;
|
||||||
use garage_util::error::*;
|
use garage_util::error::*;
|
||||||
|
use garage_util::time::now_msec;
|
||||||
|
|
||||||
use garage_rpc::system::System;
|
use garage_rpc::system::System;
|
||||||
use garage_rpc::*;
|
use garage_rpc::*;
|
||||||
|
@ -29,6 +33,8 @@ use crate::k2v::causality::*;
|
||||||
use crate::k2v::item_table::*;
|
use crate::k2v::item_table::*;
|
||||||
use crate::k2v::sub::*;
|
use crate::k2v::sub::*;
|
||||||
|
|
||||||
|
const TIMESTAMP_KEY: &'static [u8] = b"timestamp";
|
||||||
|
|
||||||
/// RPC messages for K2V
|
/// RPC messages for K2V
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
enum K2VRpc {
|
enum K2VRpc {
|
||||||
|
@ -59,6 +65,7 @@ impl Rpc for K2VRpc {
|
||||||
pub struct K2VRpcHandler {
|
pub struct K2VRpcHandler {
|
||||||
system: Arc<System>,
|
system: Arc<System>,
|
||||||
item_table: Arc<Table<K2VItemTable, TableShardedReplication>>,
|
item_table: Arc<Table<K2VItemTable, TableShardedReplication>>,
|
||||||
|
local_timestamp_tree: Mutex<db::Tree>,
|
||||||
endpoint: Arc<Endpoint<K2VRpc, Self>>,
|
endpoint: Arc<Endpoint<K2VRpc, Self>>,
|
||||||
subscriptions: Arc<SubscriptionManager>,
|
subscriptions: Arc<SubscriptionManager>,
|
||||||
}
|
}
|
||||||
|
@ -66,14 +73,19 @@ pub struct K2VRpcHandler {
|
||||||
impl K2VRpcHandler {
|
impl K2VRpcHandler {
|
||||||
pub fn new(
|
pub fn new(
|
||||||
system: Arc<System>,
|
system: Arc<System>,
|
||||||
|
db: &db::Db,
|
||||||
item_table: Arc<Table<K2VItemTable, TableShardedReplication>>,
|
item_table: Arc<Table<K2VItemTable, TableShardedReplication>>,
|
||||||
subscriptions: Arc<SubscriptionManager>,
|
subscriptions: Arc<SubscriptionManager>,
|
||||||
) -> Arc<Self> {
|
) -> Arc<Self> {
|
||||||
|
let local_timestamp_tree = db
|
||||||
|
.open_tree("k2v_local_timestamp")
|
||||||
|
.expect("Unable to open DB tree for k2v local timestamp");
|
||||||
let endpoint = system.netapp.endpoint("garage_model/k2v/Rpc".to_string());
|
let endpoint = system.netapp.endpoint("garage_model/k2v/Rpc".to_string());
|
||||||
|
|
||||||
let rpc_handler = Arc::new(Self {
|
let rpc_handler = Arc::new(Self {
|
||||||
system,
|
system,
|
||||||
item_table,
|
item_table,
|
||||||
|
local_timestamp_tree: Mutex::new(local_timestamp_tree),
|
||||||
endpoint,
|
endpoint,
|
||||||
subscriptions,
|
subscriptions,
|
||||||
});
|
});
|
||||||
|
@ -273,9 +285,22 @@ impl K2VRpcHandler {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn local_insert(&self, item: &InsertedItem) -> Result<Option<K2VItem>, Error> {
|
fn local_insert(&self, item: &InsertedItem) -> Result<Option<K2VItem>, Error> {
|
||||||
|
// Using a mutex on the local_timestamp_tree is not strictly necessary,
|
||||||
|
// but it helps to not try to do several inserts at the same time,
|
||||||
|
// which would create transaction conflicts and force many useless retries.
|
||||||
|
let local_timestamp_tree = self.local_timestamp_tree.lock().unwrap();
|
||||||
|
|
||||||
|
let now = now_msec();
|
||||||
|
|
||||||
self.item_table
|
self.item_table
|
||||||
.data
|
.data
|
||||||
.update_entry_with(&item.partition, &item.sort_key, |ent| {
|
.update_entry_with(&item.partition, &item.sort_key, |tx, ent| {
|
||||||
|
let old_local_timestamp = tx
|
||||||
|
.get(&local_timestamp_tree, TIMESTAMP_KEY)?
|
||||||
|
.and_then(|x| x.try_into().ok())
|
||||||
|
.map(u64::from_be_bytes)
|
||||||
|
.unwrap_or_default();
|
||||||
|
|
||||||
let mut ent = ent.unwrap_or_else(|| {
|
let mut ent = ent.unwrap_or_else(|| {
|
||||||
K2VItem::new(
|
K2VItem::new(
|
||||||
item.partition.bucket_id,
|
item.partition.bucket_id,
|
||||||
|
@ -283,8 +308,20 @@ impl K2VRpcHandler {
|
||||||
item.sort_key.clone(),
|
item.sort_key.clone(),
|
||||||
)
|
)
|
||||||
});
|
});
|
||||||
ent.update(self.system.id, &item.causal_context, item.value.clone());
|
let new_local_timestamp = ent.update(
|
||||||
ent
|
self.system.id,
|
||||||
|
&item.causal_context,
|
||||||
|
item.value.clone(),
|
||||||
|
std::cmp::max(old_local_timestamp, now),
|
||||||
|
);
|
||||||
|
|
||||||
|
tx.insert(
|
||||||
|
&local_timestamp_tree,
|
||||||
|
TIMESTAMP_KEY,
|
||||||
|
u64::to_be_bytes(new_local_timestamp),
|
||||||
|
)?;
|
||||||
|
|
||||||
|
Ok(ent)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -181,13 +181,17 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
|
||||||
pub(crate) fn update_entry(&self, update_bytes: &[u8]) -> Result<(), Error> {
|
pub(crate) fn update_entry(&self, update_bytes: &[u8]) -> Result<(), Error> {
|
||||||
let update = self.decode_entry(update_bytes)?;
|
let update = self.decode_entry(update_bytes)?;
|
||||||
|
|
||||||
self.update_entry_with(update.partition_key(), update.sort_key(), |ent| match ent {
|
self.update_entry_with(
|
||||||
Some(mut ent) => {
|
update.partition_key(),
|
||||||
ent.merge(&update);
|
update.sort_key(),
|
||||||
ent
|
|_tx, ent| match ent {
|
||||||
}
|
Some(mut ent) => {
|
||||||
None => update.clone(),
|
ent.merge(&update);
|
||||||
})?;
|
Ok(ent)
|
||||||
|
}
|
||||||
|
None => Ok(update.clone()),
|
||||||
|
},
|
||||||
|
)?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -195,7 +199,7 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
|
||||||
&self,
|
&self,
|
||||||
partition_key: &F::P,
|
partition_key: &F::P,
|
||||||
sort_key: &F::S,
|
sort_key: &F::S,
|
||||||
f: impl Fn(Option<F::E>) -> F::E,
|
update_fn: impl Fn(&mut db::Transaction, Option<F::E>) -> db::TxOpResult<F::E>,
|
||||||
) -> Result<Option<F::E>, Error> {
|
) -> Result<Option<F::E>, Error> {
|
||||||
let tree_key = self.tree_key(partition_key, sort_key);
|
let tree_key = self.tree_key(partition_key, sort_key);
|
||||||
|
|
||||||
|
@ -203,10 +207,10 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
|
||||||
let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, &tree_key)? {
|
let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, &tree_key)? {
|
||||||
Some(old_bytes) => {
|
Some(old_bytes) => {
|
||||||
let old_entry = self.decode_entry(&old_bytes).map_err(db::TxError::Abort)?;
|
let old_entry = self.decode_entry(&old_bytes).map_err(db::TxError::Abort)?;
|
||||||
let new_entry = f(Some(old_entry.clone()));
|
let new_entry = update_fn(&mut tx, Some(old_entry.clone()))?;
|
||||||
(Some(old_entry), Some(old_bytes), new_entry)
|
(Some(old_entry), Some(old_bytes), new_entry)
|
||||||
}
|
}
|
||||||
None => (None, None, f(None)),
|
None => (None, None, update_fn(&mut tx, None)?),
|
||||||
};
|
};
|
||||||
|
|
||||||
// Changed can be true in two scenarios
|
// Changed can be true in two scenarios
|
||||||
|
|
Loading…
Reference in a new issue