K2V #293

Merged
lx merged 68 commits from k2v into main 2022-05-10 11:16:58 +00:00
5 changed files with 63 additions and 10 deletions
Showing only changes of commit 91faae679f - Show all commits

View file

@ -1,6 +1,7 @@
#!/usr/bin/env python #!/usr/bin/env python
import requests import requests
from datetime import datetime
# let's talk to our AWS Elasticsearch cluster # let's talk to our AWS Elasticsearch cluster
#from requests_aws4auth import AWS4Auth #from requests_aws4auth import AWS4Auth
@ -17,6 +18,57 @@ auth = AWSRequestsAuth(aws_access_key='GK31c2f218a2e44f485b94239e',
aws_service='k2v') aws_service='k2v')
print("-- ReadIndex")
response = requests.get('http://localhost:3812/alex', response = requests.get('http://localhost:3812/alex',
auth=auth) auth=auth)
print(response.content) print(response.headers)
print(response.text)
print("-- Put initial (no CT)")
response = requests.put('http://localhost:3812/alex/root?sort_key=b',
auth=auth,
data='{}: Hello, world!'.format(datetime.timestamp(datetime.now())))
print(response.headers)
print(response.text)
print("-- Get")
response = requests.get('http://localhost:3812/alex/root?sort_key=b',
auth=auth)
print(response.headers)
print(response.text)
ct = response.headers["x-garage-causality-token"]
print("-- ReadIndex")
response = requests.get('http://localhost:3812/alex',
auth=auth)
print(response.headers)
print(response.text)
print("-- Put with CT")
response = requests.put('http://localhost:3812/alex/root?sort_key=b',
auth=auth,
headers={'x-garage-causality-token': ct},
data='{}: Good bye, world!'.format(datetime.timestamp(datetime.now())))
print(response.headers)
print(response.text)
print("-- Get")
response = requests.get('http://localhost:3812/alex/root?sort_key=b',
auth=auth)
print(response.headers)
print(response.text)
print("-- Put again with same CT (concurrent)")
response = requests.put('http://localhost:3812/alex/root?sort_key=b',
auth=auth,
headers={'x-garage-causality-token': ct},
data='{}: Concurrent value, oops'.format(datetime.timestamp(datetime.now())))
print(response.headers)
print(response.text)
print("-- Get")
response = requests.get('http://localhost:3812/alex/root?sort_key=b',
auth=auth)
print(response.headers)
print(response.text)

View file

@ -28,8 +28,8 @@ impl ReturnFormat {
}; };
let accept = accept.split(',').map(|s| s.trim()).collect::<Vec<_>>(); let accept = accept.split(',').map(|s| s.trim()).collect::<Vec<_>>();
let accept_json = accept.contains(&"application/json"); let accept_json = accept.contains(&"application/json") || accept.contains(&"*/*");
let accept_binary = accept.contains(&"application/octet-stream"); let accept_binary = accept.contains(&"application/octet-stream") || accept.contains(&"*/*");
match (accept_json, accept_binary) { match (accept_json, accept_binary) {
(true, true) => Ok(Self::Either), (true, true) => Ok(Self::Either),

View file

@ -41,7 +41,7 @@ where
let mut entries = vec![]; let mut entries = vec![];
loop { loop {
let n_get = std::cmp::min(1000, limit.unwrap_or(u64::MAX) as usize - entries.len() + 2); let n_get = std::cmp::min(1000, limit.map(|x| x as usize).unwrap_or(usize::MAX - 10) - entries.len() + 2);
let get_ret = table let get_ret = table
.get_range(partition_key, Some(start.clone()), filter.clone(), n_get) .get_range(partition_key, Some(start.clone()), filter.clone(), n_get)
.await?; .await?;

View file

@ -24,7 +24,7 @@ pub trait CounterSchema: Clone + PartialEq + Send + Sync + 'static {
pub struct CounterEntry<T: CounterSchema> { pub struct CounterEntry<T: CounterSchema> {
pub pk: T::P, pub pk: T::P,
pub sk: T::S, pub sk: T::S,
values: BTreeMap<String, CounterValue>, pub values: BTreeMap<String, CounterValue>,
} }
impl<T: CounterSchema> Entry<T::P, T::S> for CounterEntry<T> { impl<T: CounterSchema> Entry<T::P, T::S> for CounterEntry<T> {
@ -51,10 +51,10 @@ impl<T: CounterSchema> CounterEntry<T> {
.node_values .node_values
.iter() .iter()
.filter(|(n, _)| nodes.contains(n)) .filter(|(n, _)| nodes.contains(n))
.map(|(_, (_, v))| v) .map(|(_, (_, v))| *v)
.collect::<Vec<_>>(); .collect::<Vec<_>>();
if !new_vals.is_empty() { if !new_vals.is_empty() {
ret.insert(name.clone(), new_vals.iter().fold(i64::MIN, |a, b| a + *b)); ret.insert(name.clone(), new_vals.iter().fold(i64::MIN, |a, b| std::cmp::max(a, *b)));
} }
} }
@ -64,8 +64,8 @@ impl<T: CounterSchema> CounterEntry<T> {
/// A counter entry in the global table /// A counter entry in the global table
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
struct CounterValue { pub struct CounterValue {
node_values: BTreeMap<Uuid, (u64, i64)>, pub node_values: BTreeMap<Uuid, (u64, i64)>,
} }
impl<T: CounterSchema> Crdt for CounterEntry<T> { impl<T: CounterSchema> Crdt for CounterEntry<T> {

View file

@ -97,7 +97,8 @@ impl K2VRpcHandler {
}, },
RequestStrategy::with_priority(PRIO_NORMAL) RequestStrategy::with_priority(PRIO_NORMAL)
.with_quorum(1) .with_quorum(1)
.with_timeout(TABLE_RPC_TIMEOUT), .with_timeout(TABLE_RPC_TIMEOUT)
.interrupt_after_quorum(true),
) )
.await?; .await?;