From 91faae679f35ecf1d6339113efe4c64665afdbc0 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 22 Apr 2022 15:07:18 +0200 Subject: [PATCH] Fix bugs, test does interesting things --- k2v_test.py | 54 +++++++++++++++++++++++++++++++++++++- src/api/k2v/item.rs | 4 +-- src/api/k2v/range.rs | 2 +- src/model/index_counter.rs | 10 +++---- src/model/k2v/rpc.rs | 3 ++- 5 files changed, 63 insertions(+), 10 deletions(-) diff --git a/k2v_test.py b/k2v_test.py index 653c7489..d56f5413 100755 --- a/k2v_test.py +++ b/k2v_test.py @@ -1,6 +1,7 @@ #!/usr/bin/env python import requests +from datetime import datetime # let's talk to our AWS Elasticsearch cluster #from requests_aws4auth import AWS4Auth @@ -17,6 +18,57 @@ auth = AWSRequestsAuth(aws_access_key='GK31c2f218a2e44f485b94239e', aws_service='k2v') +print("-- ReadIndex") response = requests.get('http://localhost:3812/alex', auth=auth) -print(response.content) +print(response.headers) +print(response.text) + + +print("-- Put initial (no CT)") +response = requests.put('http://localhost:3812/alex/root?sort_key=b', + auth=auth, + data='{}: Hello, world!'.format(datetime.timestamp(datetime.now()))) +print(response.headers) +print(response.text) + +print("-- Get") +response = requests.get('http://localhost:3812/alex/root?sort_key=b', + auth=auth) +print(response.headers) +print(response.text) +ct = response.headers["x-garage-causality-token"] + +print("-- ReadIndex") +response = requests.get('http://localhost:3812/alex', + auth=auth) +print(response.headers) +print(response.text) + +print("-- Put with CT") +response = requests.put('http://localhost:3812/alex/root?sort_key=b', + auth=auth, + headers={'x-garage-causality-token': ct}, + data='{}: Good bye, world!'.format(datetime.timestamp(datetime.now()))) +print(response.headers) +print(response.text) + +print("-- Get") +response = requests.get('http://localhost:3812/alex/root?sort_key=b', + auth=auth) +print(response.headers) +print(response.text) + +print("-- Put again with same CT (concurrent)") +response = requests.put('http://localhost:3812/alex/root?sort_key=b', + auth=auth, + headers={'x-garage-causality-token': ct}, + data='{}: Concurrent value, oops'.format(datetime.timestamp(datetime.now()))) +print(response.headers) +print(response.text) + +print("-- Get") +response = requests.get('http://localhost:3812/alex/root?sort_key=b', + auth=auth) +print(response.headers) +print(response.text) diff --git a/src/api/k2v/item.rs b/src/api/k2v/item.rs index c74e4192..0eb4ed70 100644 --- a/src/api/k2v/item.rs +++ b/src/api/k2v/item.rs @@ -28,8 +28,8 @@ impl ReturnFormat { }; let accept = accept.split(',').map(|s| s.trim()).collect::>(); - let accept_json = accept.contains(&"application/json"); - let accept_binary = accept.contains(&"application/octet-stream"); + let accept_json = accept.contains(&"application/json") || accept.contains(&"*/*"); + let accept_binary = accept.contains(&"application/octet-stream") || accept.contains(&"*/*"); match (accept_json, accept_binary) { (true, true) => Ok(Self::Either), diff --git a/src/api/k2v/range.rs b/src/api/k2v/range.rs index 29bca19e..37ab7aa1 100644 --- a/src/api/k2v/range.rs +++ b/src/api/k2v/range.rs @@ -41,7 +41,7 @@ where let mut entries = vec![]; loop { - let n_get = std::cmp::min(1000, limit.unwrap_or(u64::MAX) as usize - entries.len() + 2); + let n_get = std::cmp::min(1000, limit.map(|x| x as usize).unwrap_or(usize::MAX - 10) - entries.len() + 2); let get_ret = table .get_range(partition_key, Some(start.clone()), filter.clone(), n_get) .await?; diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs index cabe9de5..13273956 100644 --- a/src/model/index_counter.rs +++ b/src/model/index_counter.rs @@ -24,7 +24,7 @@ pub trait CounterSchema: Clone + PartialEq + Send + Sync + 'static { pub struct CounterEntry { pub pk: T::P, pub sk: T::S, - values: BTreeMap, + pub values: BTreeMap, } impl Entry for CounterEntry { @@ -51,10 +51,10 @@ impl CounterEntry { .node_values .iter() .filter(|(n, _)| nodes.contains(n)) - .map(|(_, (_, v))| v) + .map(|(_, (_, v))| *v) .collect::>(); if !new_vals.is_empty() { - ret.insert(name.clone(), new_vals.iter().fold(i64::MIN, |a, b| a + *b)); + ret.insert(name.clone(), new_vals.iter().fold(i64::MIN, |a, b| std::cmp::max(a, *b))); } } @@ -64,8 +64,8 @@ impl CounterEntry { /// A counter entry in the global table #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] -struct CounterValue { - node_values: BTreeMap, +pub struct CounterValue { + pub node_values: BTreeMap, } impl Crdt for CounterEntry { diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs index 857b494d..397496c9 100644 --- a/src/model/k2v/rpc.rs +++ b/src/model/k2v/rpc.rs @@ -97,7 +97,8 @@ impl K2VRpcHandler { }, RequestStrategy::with_priority(PRIO_NORMAL) .with_quorum(1) - .with_timeout(TABLE_RPC_TIMEOUT), + .with_timeout(TABLE_RPC_TIMEOUT) + .interrupt_after_quorum(true), ) .await?;