RPC performance changes #387
3 changed files with 26 additions and 14 deletions
|
@ -15,7 +15,7 @@ pub fn make_node_id(node_id: Uuid) -> K2VNodeId {
|
||||||
u64::from_be_bytes(tmp)
|
u64::from_be_bytes(tmp)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(PartialEq, Debug, Serialize, Deserialize)]
|
#[derive(PartialEq, Eq, Debug, Serialize, Deserialize)]
|
||||||
pub struct CausalContext {
|
pub struct CausalContext {
|
||||||
pub vector_clock: BTreeMap<K2VNodeId, u64>,
|
pub vector_clock: BTreeMap<K2VNodeId, u64>,
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,7 +17,7 @@ pub const CONFLICTS: &str = "conflicts";
|
||||||
pub const VALUES: &str = "values";
|
pub const VALUES: &str = "values";
|
||||||
pub const BYTES: &str = "bytes";
|
pub const BYTES: &str = "bytes";
|
||||||
|
|
||||||
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
|
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
|
||||||
pub struct K2VItem {
|
pub struct K2VItem {
|
||||||
pub partition: K2VItemPartition,
|
pub partition: K2VItemPartition,
|
||||||
pub sort_key: String,
|
pub sort_key: String,
|
||||||
|
@ -25,19 +25,19 @@ pub struct K2VItem {
|
||||||
items: BTreeMap<K2VNodeId, DvvsEntry>,
|
items: BTreeMap<K2VNodeId, DvvsEntry>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize, Hash, Eq)]
|
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize, Hash)]
|
||||||
pub struct K2VItemPartition {
|
pub struct K2VItemPartition {
|
||||||
pub bucket_id: Uuid,
|
pub bucket_id: Uuid,
|
||||||
pub partition_key: String,
|
pub partition_key: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
|
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
|
||||||
struct DvvsEntry {
|
struct DvvsEntry {
|
||||||
t_discard: u64,
|
t_discard: u64,
|
||||||
values: Vec<(u64, DvvsValue)>,
|
values: Vec<(u64, DvvsValue)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
|
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
|
||||||
pub enum DvvsValue {
|
pub enum DvvsValue {
|
||||||
Value(#[serde(with = "serde_bytes")] Vec<u8>),
|
Value(#[serde(with = "serde_bytes")] Vec<u8>),
|
||||||
Deleted,
|
Deleted,
|
||||||
|
|
|
@ -44,8 +44,15 @@ pub struct RequestStrategy {
|
||||||
pub rs_interrupt_after_quorum: bool,
|
pub rs_interrupt_after_quorum: bool,
|
||||||
/// Request priority
|
/// Request priority
|
||||||
pub rs_priority: RequestPriority,
|
pub rs_priority: RequestPriority,
|
||||||
/// Deactivate timeout for this request
|
/// Custom timeout for this request
|
||||||
pub rs_no_timeout: bool,
|
rs_timeout: Timeout,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Copy, Clone)]
|
||||||
|
enum Timeout {
|
||||||
|
None,
|
||||||
|
Default,
|
||||||
|
Custom(Duration),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl RequestStrategy {
|
impl RequestStrategy {
|
||||||
|
@ -55,7 +62,7 @@ impl RequestStrategy {
|
||||||
rs_quorum: None,
|
rs_quorum: None,
|
||||||
rs_interrupt_after_quorum: false,
|
rs_interrupt_after_quorum: false,
|
||||||
rs_priority: prio,
|
rs_priority: prio,
|
||||||
rs_no_timeout: false,
|
rs_timeout: Timeout::Default,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
/// Set quorum to be reached for request
|
/// Set quorum to be reached for request
|
||||||
|
@ -71,7 +78,12 @@ impl RequestStrategy {
|
||||||
}
|
}
|
||||||
/// Deactivate timeout for this request
|
/// Deactivate timeout for this request
|
||||||
pub fn without_timeout(mut self) -> Self {
|
pub fn without_timeout(mut self) -> Self {
|
||||||
self.rs_no_timeout = true;
|
self.rs_timeout = Timeout::None;
|
||||||
|
self
|
||||||
|
}
|
||||||
|
/// Set custom timeout for this request
|
||||||
|
pub fn with_custom_timeout(mut self, timeout: Duration) -> Self {
|
||||||
|
self.rs_timeout = Timeout::Custom(timeout);
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -138,10 +150,10 @@ impl RpcHelper {
|
||||||
.record_duration(&self.0.metrics.rpc_duration, &metric_tags);
|
.record_duration(&self.0.metrics.rpc_duration, &metric_tags);
|
||||||
|
|
||||||
let timeout = async {
|
let timeout = async {
|
||||||
if strat.rs_no_timeout {
|
match strat.rs_timeout {
|
||||||
futures::future::pending().await
|
Timeout::None => futures::future::pending().await,
|
||||||
} else {
|
Timeout::Default => tokio::time::sleep(self.0.rpc_timeout).await,
|
||||||
tokio::time::sleep(self.0.rpc_timeout).await
|
Timeout::Custom(t) => tokio::time::sleep(t).await,
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -412,7 +424,7 @@ impl RpcHelper {
|
||||||
.iter()
|
.iter()
|
||||||
.find(|x| x.id.as_ref() == to.as_slice())
|
.find(|x| x.id.as_ref() == to.as_slice())
|
||||||
.and_then(|pi| pi.avg_ping)
|
.and_then(|pi| pi.avg_ping)
|
||||||
.unwrap_or_else(|| Duration::from_secs(1));
|
.unwrap_or_else(|| Duration::from_secs(10));
|
||||||
(
|
(
|
||||||
*to != self.0.our_node_id,
|
*to != self.0.our_node_id,
|
||||||
peer_zone != our_zone,
|
peer_zone != our_zone,
|
||||||
|
|
Loading…
Reference in a new issue