K2V: Fix #305 #306

Merged
lx merged 2 commits from fix-k2v-305 into main 2022-05-17 11:10:39 +00:00
3 changed files with 115 additions and 17 deletions

View file

@ -74,7 +74,11 @@ where
}
}
if let Some(e) = end {
if entry.sort_key() == e {
let is_finished = match enumeration_order {
EnumerationOrder::Forward => entry.sort_key() >= e,
EnumerationOrder::Reverse => entry.sort_key() <= e,
};
if is_finished {
return Ok((entries, false, None));
}
}

View file

@ -92,7 +92,9 @@ async fn test_batch() {
br#"[
{"partitionKey": "root"},
{"partitionKey": "root", "start": "c"},
{"partitionKey": "root", "start": "c", "end": "dynamite"},
{"partitionKey": "root", "start": "c", "reverse": true, "end": "a"},
{"partitionKey": "root", "start": "c", "reverse": true, "end": "azerty"},
{"partitionKey": "root", "limit": 1},
{"partitionKey": "root", "prefix": "d"}
]"#
@ -147,6 +149,24 @@ async fn test_batch() {
"more": false,
"nextStart": null,
},
{
"partitionKey": "root",
"prefix": null,
"start": "c",
"end": "dynamite",
"limit": null,
"reverse": false,
"conflictsOnly": false,
"tombstones": false,
"singleItem": false,
"items": [
{"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap())]},
{"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1").unwrap())]},
{"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap())]},
],
"more": false,
"nextStart": null,
},
{
"partitionKey": "root",
"prefix": null,
@ -164,6 +184,23 @@ async fn test_batch() {
"more": false,
"nextStart": null,
},
{
"partitionKey": "root",
"prefix": null,
"start": "c",
"end": "azerty",
"limit": null,
"reverse": true,
"conflictsOnly": false,
"tombstones": false,
"singleItem": false,
"items": [
{"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap())]},
{"sk": "b", "ct": ct.get("b").unwrap(), "v": [base64::encode(values.get("b").unwrap())]},
],
"more": false,
"nextStart": null,
},
{
"partitionKey": "root",
"prefix": null,
@ -465,6 +502,34 @@ async fn test_batch() {
])
);
// update our known tombstones
for sk in ["a", "b", "d.1", "d.2"] {
let res = ctx
.k2v
.request
.builder(bucket.clone())
.path("root")
.query_param("sort_key", Some(sk))
.signed_header("accept", "application/octet-stream")
.send()
.await
.unwrap();
assert_eq!(res.status(), 204);
assert_eq!(
res.headers().get("content-type").unwrap().to_str().unwrap(),
"application/octet-stream"
);
ct.insert(
sk,
res.headers()
.get("x-garage-causality-token")
.unwrap()
.to_str()
.unwrap()
.to_string(),
);
}
let res = ctx
.k2v
.request
@ -473,7 +538,8 @@ async fn test_batch() {
.body(
br#"[
{"partitionKey": "root"},
{"partitionKey": "root", "reverse": true}
{"partitionKey": "root", "reverse": true},
{"partitionKey": "root", "tombstones": true}
]"#
.to_vec(),
)
@ -520,6 +586,27 @@ async fn test_batch() {
"more": false,
"nextStart": null,
},
{
"partitionKey": "root",
"prefix": null,
"start": null,
"end": null,
"limit": null,
"reverse": false,
"conflictsOnly": false,
"tombstones": true,
"singleItem": false,
"items": [
{"sk": "a", "ct": ct.get("a").unwrap(), "v": [null]},
{"sk": "b", "ct": ct.get("b").unwrap(), "v": [null]},
{"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap()), base64::encode(values.get("c'").unwrap())]},
{"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [null]},
{"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [null]},
{"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]},
],
"more": false,
"nextStart": null,
},
])
);
}

View file

@ -6,7 +6,9 @@ use std::time::Duration;
use futures::future::*;
use futures::select;
use tokio::sync::{mpsc, watch, Mutex};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use tokio::sync::{mpsc, mpsc::error::TryRecvError, watch, Mutex};
use crate::error::Error;
@ -30,26 +32,31 @@ impl BackgroundRunner {
let stop_signal_2 = stop_signal.clone();
let await_all_done = tokio::spawn(async move {
let mut workers = FuturesUnordered::new();
let mut shutdown_timer = 0;
loop {
let wkr = {
select! {
item = worker_out.recv().fuse() => {
match item {
Some(x) => x,
None => break,
}
let closed = match worker_out.try_recv() {
Ok(wkr) => {
workers.push(wkr);
false
}
Err(TryRecvError::Empty) => false,
Err(TryRecvError::Disconnected) => true,
};
select! {
res = workers.next() => {
if let Some(Err(e)) = res {
error!("Worker exited with error: {}", e);
}
_ = tokio::time::sleep(Duration::from_secs(5)).fuse() => {
if *stop_signal_2.borrow() {
}
_ = tokio::time::sleep(Duration::from_secs(1)).fuse() => {
if closed || *stop_signal_2.borrow() {
shutdown_timer += 1;
if shutdown_timer >= 10 {
break;
} else {
continue;
}
}
}
};
if let Err(e) = wkr.await {
error!("Error while awaiting for worker: {}", e);
}
}
});