diff --git a/src/model/k2v/causality.rs b/src/model/k2v/causality.rs index 8c76a32b..9a692870 100644 --- a/src/model/k2v/causality.rs +++ b/src/model/k2v/causality.rs @@ -15,7 +15,7 @@ pub fn make_node_id(node_id: Uuid) -> K2VNodeId { u64::from_be_bytes(tmp) } -#[derive(PartialEq, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Eq, Debug, Serialize, Deserialize)] pub struct CausalContext { pub vector_clock: BTreeMap, } diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs index baa1db4b..7860cb17 100644 --- a/src/model/k2v/item_table.rs +++ b/src/model/k2v/item_table.rs @@ -17,7 +17,7 @@ pub const CONFLICTS: &str = "conflicts"; pub const VALUES: &str = "values"; pub const BYTES: &str = "bytes"; -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub struct K2VItem { pub partition: K2VItemPartition, pub sort_key: String, @@ -25,19 +25,19 @@ pub struct K2VItem { items: BTreeMap, } -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize, Hash, Eq)] +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize, Hash)] pub struct K2VItemPartition { pub bucket_id: Uuid, pub partition_key: String, } -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] struct DvvsEntry { t_discard: u64, values: Vec<(u64, DvvsValue)>, } -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub enum DvvsValue { Value(#[serde(with = "serde_bytes")] Vec), Deleted, diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index 857ed620..949aced6 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -44,8 +44,15 @@ pub struct RequestStrategy { pub rs_interrupt_after_quorum: bool, /// Request priority pub rs_priority: RequestPriority, - /// Deactivate timeout for this request - pub rs_no_timeout: bool, + /// Custom timeout for this request + rs_timeout: Timeout, +} + +#[derive(Copy, Clone)] +enum Timeout { + None, + Default, + Custom(Duration), } impl RequestStrategy { @@ -55,7 +62,7 @@ impl RequestStrategy { rs_quorum: None, rs_interrupt_after_quorum: false, rs_priority: prio, - rs_no_timeout: false, + rs_timeout: Timeout::Default, } } /// Set quorum to be reached for request @@ -71,7 +78,12 @@ impl RequestStrategy { } /// Deactivate timeout for this request pub fn without_timeout(mut self) -> Self { - self.rs_no_timeout = true; + self.rs_timeout = Timeout::None; + self + } + /// Set custom timeout for this request + pub fn with_custom_timeout(mut self, timeout: Duration) -> Self { + self.rs_timeout = Timeout::Custom(timeout); self } } @@ -138,10 +150,10 @@ impl RpcHelper { .record_duration(&self.0.metrics.rpc_duration, &metric_tags); let timeout = async { - if strat.rs_no_timeout { - futures::future::pending().await - } else { - tokio::time::sleep(self.0.rpc_timeout).await + match strat.rs_timeout { + Timeout::None => futures::future::pending().await, + Timeout::Default => tokio::time::sleep(self.0.rpc_timeout).await, + Timeout::Custom(t) => tokio::time::sleep(t).await, } }; @@ -412,7 +424,7 @@ impl RpcHelper { .iter() .find(|x| x.id.as_ref() == to.as_slice()) .and_then(|pi| pi.avg_ping) - .unwrap_or_else(|| Duration::from_secs(1)); + .unwrap_or_else(|| Duration::from_secs(10)); ( *to != self.0.our_node_id, peer_zone != our_zone,