K2V: Fix #305 #306
3 changed files with 115 additions and 17 deletions
|
@ -74,7 +74,11 @@ where
|
|||
}
|
||||
}
|
||||
if let Some(e) = end {
|
||||
if entry.sort_key() == e {
|
||||
let is_finished = match enumeration_order {
|
||||
EnumerationOrder::Forward => entry.sort_key() >= e,
|
||||
EnumerationOrder::Reverse => entry.sort_key() <= e,
|
||||
};
|
||||
if is_finished {
|
||||
return Ok((entries, false, None));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -92,7 +92,9 @@ async fn test_batch() {
|
|||
br#"[
|
||||
{"partitionKey": "root"},
|
||||
{"partitionKey": "root", "start": "c"},
|
||||
{"partitionKey": "root", "start": "c", "end": "dynamite"},
|
||||
{"partitionKey": "root", "start": "c", "reverse": true, "end": "a"},
|
||||
{"partitionKey": "root", "start": "c", "reverse": true, "end": "azerty"},
|
||||
{"partitionKey": "root", "limit": 1},
|
||||
{"partitionKey": "root", "prefix": "d"}
|
||||
]"#
|
||||
|
@ -147,6 +149,24 @@ async fn test_batch() {
|
|||
"more": false,
|
||||
"nextStart": null,
|
||||
},
|
||||
{
|
||||
"partitionKey": "root",
|
||||
"prefix": null,
|
||||
"start": "c",
|
||||
"end": "dynamite",
|
||||
"limit": null,
|
||||
"reverse": false,
|
||||
"conflictsOnly": false,
|
||||
"tombstones": false,
|
||||
"singleItem": false,
|
||||
"items": [
|
||||
{"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap())]},
|
||||
{"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1").unwrap())]},
|
||||
{"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap())]},
|
||||
],
|
||||
"more": false,
|
||||
"nextStart": null,
|
||||
},
|
||||
{
|
||||
"partitionKey": "root",
|
||||
"prefix": null,
|
||||
|
@ -164,6 +184,23 @@ async fn test_batch() {
|
|||
"more": false,
|
||||
"nextStart": null,
|
||||
},
|
||||
{
|
||||
"partitionKey": "root",
|
||||
"prefix": null,
|
||||
"start": "c",
|
||||
"end": "azerty",
|
||||
"limit": null,
|
||||
"reverse": true,
|
||||
"conflictsOnly": false,
|
||||
"tombstones": false,
|
||||
"singleItem": false,
|
||||
"items": [
|
||||
{"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap())]},
|
||||
{"sk": "b", "ct": ct.get("b").unwrap(), "v": [base64::encode(values.get("b").unwrap())]},
|
||||
],
|
||||
"more": false,
|
||||
"nextStart": null,
|
||||
},
|
||||
{
|
||||
"partitionKey": "root",
|
||||
"prefix": null,
|
||||
|
@ -465,6 +502,34 @@ async fn test_batch() {
|
|||
])
|
||||
);
|
||||
|
||||
// update our known tombstones
|
||||
for sk in ["a", "b", "d.1", "d.2"] {
|
||||
let res = ctx
|
||||
.k2v
|
||||
.request
|
||||
.builder(bucket.clone())
|
||||
.path("root")
|
||||
.query_param("sort_key", Some(sk))
|
||||
.signed_header("accept", "application/octet-stream")
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(res.status(), 204);
|
||||
assert_eq!(
|
||||
res.headers().get("content-type").unwrap().to_str().unwrap(),
|
||||
"application/octet-stream"
|
||||
);
|
||||
ct.insert(
|
||||
sk,
|
||||
res.headers()
|
||||
.get("x-garage-causality-token")
|
||||
.unwrap()
|
||||
.to_str()
|
||||
.unwrap()
|
||||
.to_string(),
|
||||
);
|
||||
}
|
||||
|
||||
let res = ctx
|
||||
.k2v
|
||||
.request
|
||||
|
@ -473,7 +538,8 @@ async fn test_batch() {
|
|||
.body(
|
||||
br#"[
|
||||
{"partitionKey": "root"},
|
||||
{"partitionKey": "root", "reverse": true}
|
||||
{"partitionKey": "root", "reverse": true},
|
||||
{"partitionKey": "root", "tombstones": true}
|
||||
]"#
|
||||
.to_vec(),
|
||||
)
|
||||
|
@ -520,6 +586,27 @@ async fn test_batch() {
|
|||
"more": false,
|
||||
"nextStart": null,
|
||||
},
|
||||
{
|
||||
"partitionKey": "root",
|
||||
"prefix": null,
|
||||
"start": null,
|
||||
"end": null,
|
||||
"limit": null,
|
||||
"reverse": false,
|
||||
"conflictsOnly": false,
|
||||
"tombstones": true,
|
||||
"singleItem": false,
|
||||
"items": [
|
||||
{"sk": "a", "ct": ct.get("a").unwrap(), "v": [null]},
|
||||
{"sk": "b", "ct": ct.get("b").unwrap(), "v": [null]},
|
||||
{"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap()), base64::encode(values.get("c'").unwrap())]},
|
||||
{"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [null]},
|
||||
{"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [null]},
|
||||
{"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]},
|
||||
],
|
||||
"more": false,
|
||||
"nextStart": null,
|
||||
},
|
||||
])
|
||||
);
|
||||
}
|
||||
|
|
|
@ -6,7 +6,9 @@ use std::time::Duration;
|
|||
|
||||
use futures::future::*;
|
||||
use futures::select;
|
||||
use tokio::sync::{mpsc, watch, Mutex};
|
||||
use futures::stream::FuturesUnordered;
|
||||
use futures::StreamExt;
|
||||
use tokio::sync::{mpsc, mpsc::error::TryRecvError, watch, Mutex};
|
||||
|
||||
use crate::error::Error;
|
||||
|
||||
|
@ -30,26 +32,31 @@ impl BackgroundRunner {
|
|||
|
||||
let stop_signal_2 = stop_signal.clone();
|
||||
let await_all_done = tokio::spawn(async move {
|
||||
let mut workers = FuturesUnordered::new();
|
||||
let mut shutdown_timer = 0;
|
||||
loop {
|
||||
let wkr = {
|
||||
select! {
|
||||
item = worker_out.recv().fuse() => {
|
||||
match item {
|
||||
Some(x) => x,
|
||||
None => break,
|
||||
}
|
||||
let closed = match worker_out.try_recv() {
|
||||
Ok(wkr) => {
|
||||
workers.push(wkr);
|
||||
false
|
||||
}
|
||||
Err(TryRecvError::Empty) => false,
|
||||
Err(TryRecvError::Disconnected) => true,
|
||||
};
|
||||
select! {
|
||||
res = workers.next() => {
|
||||
if let Some(Err(e)) = res {
|
||||
error!("Worker exited with error: {}", e);
|
||||
}
|
||||
_ = tokio::time::sleep(Duration::from_secs(5)).fuse() => {
|
||||
if *stop_signal_2.borrow() {
|
||||
}
|
||||
_ = tokio::time::sleep(Duration::from_secs(1)).fuse() => {
|
||||
if closed || *stop_signal_2.borrow() {
|
||||
shutdown_timer += 1;
|
||||
if shutdown_timer >= 10 {
|
||||
break;
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
if let Err(e) = wkr.await {
|
||||
error!("Error while awaiting for worker: {}", e);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
|
Loading…
Reference in a new issue