fix clippy warnings on table
This commit is contained in:
parent
f05bb111c2
commit
f5a0cf0414
10 changed files with 63 additions and 43 deletions
|
@ -52,12 +52,10 @@ where
|
||||||
*self = other.clone();
|
*self = other.clone();
|
||||||
}
|
}
|
||||||
warn!("Making an arbitrary choice: {:?}", self);
|
warn!("Making an arbitrary choice: {:?}", self);
|
||||||
} else {
|
} else if other > self {
|
||||||
if other > self {
|
|
||||||
*self = other.clone();
|
*self = other.clone();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl AutoCRDT for String {
|
impl AutoCRDT for String {
|
||||||
|
|
|
@ -94,7 +94,7 @@ where
|
||||||
/// put_my_crdt_value(a);
|
/// put_my_crdt_value(a);
|
||||||
/// ```
|
/// ```
|
||||||
pub fn take_and_clear(&mut self) -> Self {
|
pub fn take_and_clear(&mut self) -> Self {
|
||||||
let vals = std::mem::replace(&mut self.vals, vec![]);
|
let vals = std::mem::take(&mut self.vals);
|
||||||
Self { vals }
|
Self { vals }
|
||||||
}
|
}
|
||||||
/// Removes all values from the map
|
/// Removes all values from the map
|
||||||
|
@ -113,10 +113,16 @@ where
|
||||||
pub fn items(&self) -> &[(K, u64, V)] {
|
pub fn items(&self) -> &[(K, u64, V)] {
|
||||||
&self.vals[..]
|
&self.vals[..]
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the number of items in the map
|
/// Returns the number of items in the map
|
||||||
pub fn len(&self) -> usize {
|
pub fn len(&self) -> usize {
|
||||||
self.vals.len()
|
self.vals.len()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns true if the map is empty
|
||||||
|
pub fn is_empty(&self) -> bool {
|
||||||
|
self.len() == 0
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<K, V> CRDT for LWWMap<K, V>
|
impl<K, V> CRDT for LWWMap<K, V>
|
||||||
|
@ -143,3 +149,13 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<K, V> Default for LWWMap<K, V>
|
||||||
|
where
|
||||||
|
K: Ord,
|
||||||
|
V: CRDT,
|
||||||
|
{
|
||||||
|
fn default() -> Self {
|
||||||
|
Self::new()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -62,6 +62,11 @@ where
|
||||||
pub fn len(&self) -> usize {
|
pub fn len(&self) -> usize {
|
||||||
self.vals.len()
|
self.vals.len()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns true if the map is empty
|
||||||
|
pub fn is_empty(&self) -> bool {
|
||||||
|
self.len() == 0
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<K, V> CRDT for Map<K, V>
|
impl<K, V> CRDT for Map<K, V>
|
||||||
|
@ -82,3 +87,13 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<K, V> Default for Map<K, V>
|
||||||
|
where
|
||||||
|
K: Clone + Ord,
|
||||||
|
V: Clone + CRDT,
|
||||||
|
{
|
||||||
|
fn default() -> Self {
|
||||||
|
Self::new()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -10,6 +10,7 @@
|
||||||
//! Learn more about CRDT [on Wikipedia](https://en.wikipedia.org/wiki/Conflict-free_replicated_data_type)
|
//! Learn more about CRDT [on Wikipedia](https://en.wikipedia.org/wiki/Conflict-free_replicated_data_type)
|
||||||
|
|
||||||
mod bool;
|
mod bool;
|
||||||
|
#[allow(clippy::module_inception)]
|
||||||
mod crdt;
|
mod crdt;
|
||||||
mod lww;
|
mod lww;
|
||||||
mod lww_map;
|
mod lww_map;
|
||||||
|
|
|
@ -85,8 +85,8 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
select! {
|
select! {
|
||||||
_ = tokio::time::sleep(Duration::from_secs(10)).fuse() => (),
|
_ = tokio::time::sleep(Duration::from_secs(10)).fuse() => {},
|
||||||
_ = must_exit.changed().fuse() => (),
|
_ = must_exit.changed().fuse() => {},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -120,7 +120,7 @@ where
|
||||||
self.todo_remove_if_equal(&k[..], vhash)?;
|
self.todo_remove_if_equal(&k[..], vhash)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
if entries.len() == 0 {
|
if entries.is_empty() {
|
||||||
// Nothing to do in this iteration
|
// Nothing to do in this iteration
|
||||||
return Ok(false);
|
return Ok(false);
|
||||||
}
|
}
|
||||||
|
@ -247,7 +247,7 @@ where
|
||||||
}
|
}
|
||||||
Ok(GcRPC::Ok)
|
Ok(GcRPC::Ok)
|
||||||
}
|
}
|
||||||
_ => Err(Error::Message(format!("Unexpected GC RPC"))),
|
_ => Err(Error::Message("Unexpected GC RPC".to_string())),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
#![recursion_limit = "1024"]
|
#![recursion_limit = "1024"]
|
||||||
|
#![allow(clippy::comparison_chain, clippy::upper_case_acronyms)]
|
||||||
|
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
extern crate log;
|
extern crate log;
|
||||||
|
|
|
@ -111,8 +111,8 @@ where
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
select! {
|
select! {
|
||||||
_ = self.data.merkle_todo_notify.notified().fuse() => (),
|
_ = self.data.merkle_todo_notify.notified().fuse() => {},
|
||||||
_ = must_exit.changed().fuse() => (),
|
_ = must_exit.changed().fuse() => {},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -121,10 +121,10 @@ where
|
||||||
fn update_item(&self, k: &[u8], vhash_by: &[u8]) -> Result<(), Error> {
|
fn update_item(&self, k: &[u8], vhash_by: &[u8]) -> Result<(), Error> {
|
||||||
let khash = blake2sum(k);
|
let khash = blake2sum(k);
|
||||||
|
|
||||||
let new_vhash = if vhash_by.len() == 0 {
|
let new_vhash = if vhash_by.is_empty() {
|
||||||
None
|
None
|
||||||
} else {
|
} else {
|
||||||
Some(Hash::try_from(&vhash_by[..]).unwrap())
|
Some(Hash::try_from(vhash_by).unwrap())
|
||||||
};
|
};
|
||||||
|
|
||||||
let key = MerkleNodeKey {
|
let key = MerkleNodeKey {
|
||||||
|
@ -168,14 +168,7 @@ where
|
||||||
// This update is an Option<_>, so that it is None if the update is a no-op
|
// This update is an Option<_>, so that it is None if the update is a no-op
|
||||||
// and we can thus skip recalculating and re-storing everything
|
// and we can thus skip recalculating and re-storing everything
|
||||||
let mutate = match self.read_node_txn(tx, &key)? {
|
let mutate = match self.read_node_txn(tx, &key)? {
|
||||||
MerkleNode::Empty => {
|
MerkleNode::Empty => new_vhash.map(|vhv| MerkleNode::Leaf(k.to_vec(), vhv)),
|
||||||
if let Some(vhv) = new_vhash {
|
|
||||||
Some(MerkleNode::Leaf(k.to_vec(), vhv))
|
|
||||||
} else {
|
|
||||||
// Nothing to do, keep empty node
|
|
||||||
None
|
|
||||||
}
|
|
||||||
}
|
|
||||||
MerkleNode::Intermediate(mut children) => {
|
MerkleNode::Intermediate(mut children) => {
|
||||||
let key2 = key.next_key(khash);
|
let key2 = key.next_key(khash);
|
||||||
if let Some(subhash) = self.update_item_rec(tx, k, khash, &key2, new_vhash)? {
|
if let Some(subhash) = self.update_item_rec(tx, k, khash, &key2, new_vhash)? {
|
||||||
|
@ -186,7 +179,7 @@ where
|
||||||
intermediate_set_child(&mut children, key2.prefix[i], subhash);
|
intermediate_set_child(&mut children, key2.prefix[i], subhash);
|
||||||
}
|
}
|
||||||
|
|
||||||
if children.len() == 0 {
|
if children.is_empty() {
|
||||||
// should not happen
|
// should not happen
|
||||||
warn!(
|
warn!(
|
||||||
"({}) Replacing intermediate node with empty node, should not happen.",
|
"({}) Replacing intermediate node with empty node, should not happen.",
|
||||||
|
|
|
@ -18,7 +18,7 @@ impl PartitionKey for String {
|
||||||
|
|
||||||
impl PartitionKey for Hash {
|
impl PartitionKey for Hash {
|
||||||
fn hash(&self) -> Hash {
|
fn hash(&self) -> Hash {
|
||||||
self.clone()
|
*self
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -150,14 +150,12 @@ where
|
||||||
if let Some(busy) = busy_opt {
|
if let Some(busy) = busy_opt {
|
||||||
if busy {
|
if busy {
|
||||||
nothing_to_do_since = None;
|
nothing_to_do_since = None;
|
||||||
} else {
|
} else if nothing_to_do_since.is_none() {
|
||||||
if nothing_to_do_since.is_none() {
|
|
||||||
nothing_to_do_since = Some(Instant::now());
|
nothing_to_do_since = Some(Instant::now());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
_ = must_exit.changed().fuse() => {},
|
||||||
_ = must_exit.changed().fuse() => (),
|
|
||||||
_ = tokio::time::sleep(Duration::from_secs(1)).fuse() => {
|
_ = tokio::time::sleep(Duration::from_secs(1)).fuse() => {
|
||||||
if nothing_to_do_since.map(|t| Instant::now() - t >= ANTI_ENTROPY_INTERVAL).unwrap_or(false) {
|
if nothing_to_do_since.map(|t| Instant::now() - t >= ANTI_ENTROPY_INTERVAL).unwrap_or(false) {
|
||||||
nothing_to_do_since = None;
|
nothing_to_do_since = None;
|
||||||
|
@ -277,7 +275,7 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if items.len() > 0 {
|
if !items.is_empty() {
|
||||||
let nodes = self
|
let nodes = self
|
||||||
.data
|
.data
|
||||||
.replication
|
.replication
|
||||||
|
@ -292,9 +290,10 @@ where
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if nodes.len() < self.data.replication.write_quorum() {
|
if nodes.len() < self.data.replication.write_quorum() {
|
||||||
return Err(Error::Message(format!(
|
return Err(Error::Message(
|
||||||
"Not offloading as we don't have a quorum of nodes to write to."
|
"Not offloading as we don't have a quorum of nodes to write to."
|
||||||
)));
|
.to_string(),
|
||||||
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
counter += 1;
|
counter += 1;
|
||||||
|
@ -317,14 +316,14 @@ where
|
||||||
|
|
||||||
async fn offload_items(
|
async fn offload_items(
|
||||||
self: &Arc<Self>,
|
self: &Arc<Self>,
|
||||||
items: &Vec<(Vec<u8>, Arc<ByteBuf>)>,
|
items: &[(Vec<u8>, Arc<ByteBuf>)],
|
||||||
nodes: &[UUID],
|
nodes: &[UUID],
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
let values = items.iter().map(|(_k, v)| v.clone()).collect::<Vec<_>>();
|
let values = items.iter().map(|(_k, v)| v.clone()).collect::<Vec<_>>();
|
||||||
|
|
||||||
self.rpc_client
|
self.rpc_client
|
||||||
.try_call_many(
|
.try_call_many(
|
||||||
&nodes[..],
|
nodes,
|
||||||
SyncRPC::Items(values),
|
SyncRPC::Items(values),
|
||||||
RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_SYNC_RPC_TIMEOUT),
|
RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_SYNC_RPC_TIMEOUT),
|
||||||
)
|
)
|
||||||
|
@ -467,7 +466,7 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
if todo_items.len() >= 256 {
|
if todo_items.len() >= 256 {
|
||||||
self.send_items(who, std::mem::replace(&mut todo_items, vec![]))
|
self.send_items(who, std::mem::take(&mut todo_items))
|
||||||
.await?;
|
.await?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -523,7 +522,7 @@ where
|
||||||
self.data.update_many(items)?;
|
self.data.update_many(items)?;
|
||||||
Ok(SyncRPC::Ok)
|
Ok(SyncRPC::Ok)
|
||||||
}
|
}
|
||||||
_ => Err(Error::Message(format!("Unexpected sync RPC"))),
|
_ => Err(Error::Message("Unexpected sync RPC".to_string())),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -109,17 +109,14 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn insert_many(&self, entries: &[F::E]) -> Result<(), Error> {
|
pub async fn insert_many(&self, entries: &[F::E]) -> Result<(), Error> {
|
||||||
let mut call_list = HashMap::new();
|
let mut call_list: HashMap<_, Vec<_>> = HashMap::new();
|
||||||
|
|
||||||
for entry in entries.iter() {
|
for entry in entries.iter() {
|
||||||
let hash = entry.partition_key().hash();
|
let hash = entry.partition_key().hash();
|
||||||
let who = self.data.replication.write_nodes(&hash);
|
let who = self.data.replication.write_nodes(&hash);
|
||||||
let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?));
|
let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?));
|
||||||
for node in who {
|
for node in who {
|
||||||
if !call_list.contains_key(&node) {
|
call_list.entry(node).or_default().push(e_enc.clone());
|
||||||
call_list.insert(node, vec![]);
|
|
||||||
}
|
|
||||||
call_list.get_mut(&node).unwrap().push(e_enc.clone());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -183,7 +180,7 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
return Err(Error::Message(format!("Invalid return value to read")));
|
return Err(Error::Message("Invalid return value to read".to_string()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if let Some(ret_entry) = &ret {
|
if let Some(ret_entry) = &ret {
|
||||||
|
@ -268,7 +265,7 @@ where
|
||||||
let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?));
|
let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?));
|
||||||
self.rpc_client
|
self.rpc_client
|
||||||
.try_call_many(
|
.try_call_many(
|
||||||
&who[..],
|
who,
|
||||||
TableRPC::<F>::Update(vec![what_enc]),
|
TableRPC::<F>::Update(vec![what_enc]),
|
||||||
RequestStrategy::with_quorum(who.len()).with_timeout(TABLE_RPC_TIMEOUT),
|
RequestStrategy::with_quorum(who.len()).with_timeout(TABLE_RPC_TIMEOUT),
|
||||||
)
|
)
|
||||||
|
@ -307,7 +304,7 @@ where
|
||||||
self.data.update_many(pairs)?;
|
self.data.update_many(pairs)?;
|
||||||
Ok(TableRPC::Ok)
|
Ok(TableRPC::Ok)
|
||||||
}
|
}
|
||||||
_ => Err(Error::BadRPC(format!("Unexpected table RPC"))),
|
_ => Err(Error::BadRPC("Unexpected table RPC".to_string())),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue