From 7b474855e3a8491fcdde69d12d3fbae27f520383 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 5 May 2022 10:56:44 +0200 Subject: [PATCH 1/2] Make background runner terminate correctly --- src/util/background.rs | 37 ++++++++++++++++++++++--------------- 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/src/util/background.rs b/src/util/background.rs index bfdaaf1e..d35425f5 100644 --- a/src/util/background.rs +++ b/src/util/background.rs @@ -6,7 +6,9 @@ use std::time::Duration; use futures::future::*; use futures::select; -use tokio::sync::{mpsc, watch, Mutex}; +use futures::stream::FuturesUnordered; +use futures::StreamExt; +use tokio::sync::{mpsc, mpsc::error::TryRecvError, watch, Mutex}; use crate::error::Error; @@ -30,26 +32,31 @@ impl BackgroundRunner { let stop_signal_2 = stop_signal.clone(); let await_all_done = tokio::spawn(async move { + let mut workers = FuturesUnordered::new(); + let mut shutdown_timer = 0; loop { - let wkr = { - select! { - item = worker_out.recv().fuse() => { - match item { - Some(x) => x, - None => break, - } + let closed = match worker_out.try_recv() { + Ok(wkr) => { + workers.push(wkr); + false + } + Err(TryRecvError::Empty) => false, + Err(TryRecvError::Disconnected) => true, + }; + select! { + res = workers.next() => { + if let Some(Err(e)) = res { + error!("Worker exited with error: {}", e); } - _ = tokio::time::sleep(Duration::from_secs(5)).fuse() => { - if *stop_signal_2.borrow() { + } + _ = tokio::time::sleep(Duration::from_secs(1)).fuse() => { + if closed || *stop_signal_2.borrow() { + shutdown_timer += 1; + if shutdown_timer >= 10 { break; - } else { - continue; } } } - }; - if let Err(e) = wkr.await { - error!("Error while awaiting for worker: {}", e); } } }); -- 2.45.2 From c692f55d5ce2c3ed08db7fbc4844debcc0aeb134 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 17 May 2022 11:50:23 +0200 Subject: [PATCH 2/2] K2V: Fix `end` parameter and add tests (fix #305) --- src/api/k2v/range.rs | 6 ++- src/garage/tests/k2v/batch.rs | 89 ++++++++++++++++++++++++++++++++++- 2 files changed, 93 insertions(+), 2 deletions(-) diff --git a/src/api/k2v/range.rs b/src/api/k2v/range.rs index cd019723..295c34aa 100644 --- a/src/api/k2v/range.rs +++ b/src/api/k2v/range.rs @@ -74,7 +74,11 @@ where } } if let Some(e) = end { - if entry.sort_key() == e { + let is_finished = match enumeration_order { + EnumerationOrder::Forward => entry.sort_key() >= e, + EnumerationOrder::Reverse => entry.sort_key() <= e, + }; + if is_finished { return Ok((entries, false, None)); } } diff --git a/src/garage/tests/k2v/batch.rs b/src/garage/tests/k2v/batch.rs index 1182a298..acae1910 100644 --- a/src/garage/tests/k2v/batch.rs +++ b/src/garage/tests/k2v/batch.rs @@ -92,7 +92,9 @@ async fn test_batch() { br#"[ {"partitionKey": "root"}, {"partitionKey": "root", "start": "c"}, + {"partitionKey": "root", "start": "c", "end": "dynamite"}, {"partitionKey": "root", "start": "c", "reverse": true, "end": "a"}, + {"partitionKey": "root", "start": "c", "reverse": true, "end": "azerty"}, {"partitionKey": "root", "limit": 1}, {"partitionKey": "root", "prefix": "d"} ]"# @@ -147,6 +149,24 @@ async fn test_batch() { "more": false, "nextStart": null, }, + { + "partitionKey": "root", + "prefix": null, + "start": "c", + "end": "dynamite", + "limit": null, + "reverse": false, + "conflictsOnly": false, + "tombstones": false, + "singleItem": false, + "items": [ + {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap())]}, + {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1").unwrap())]}, + {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap())]}, + ], + "more": false, + "nextStart": null, + }, { "partitionKey": "root", "prefix": null, @@ -164,6 +184,23 @@ async fn test_batch() { "more": false, "nextStart": null, }, + { + "partitionKey": "root", + "prefix": null, + "start": "c", + "end": "azerty", + "limit": null, + "reverse": true, + "conflictsOnly": false, + "tombstones": false, + "singleItem": false, + "items": [ + {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap())]}, + {"sk": "b", "ct": ct.get("b").unwrap(), "v": [base64::encode(values.get("b").unwrap())]}, + ], + "more": false, + "nextStart": null, + }, { "partitionKey": "root", "prefix": null, @@ -465,6 +502,34 @@ async fn test_batch() { ]) ); + // update our known tombstones + for sk in ["a", "b", "d.1", "d.2"] { + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some(sk)) + .signed_header("accept", "application/octet-stream") + .send() + .await + .unwrap(); + assert_eq!(res.status(), 204); + assert_eq!( + res.headers().get("content-type").unwrap().to_str().unwrap(), + "application/octet-stream" + ); + ct.insert( + sk, + res.headers() + .get("x-garage-causality-token") + .unwrap() + .to_str() + .unwrap() + .to_string(), + ); + } + let res = ctx .k2v .request @@ -473,7 +538,8 @@ async fn test_batch() { .body( br#"[ {"partitionKey": "root"}, - {"partitionKey": "root", "reverse": true} + {"partitionKey": "root", "reverse": true}, + {"partitionKey": "root", "tombstones": true} ]"# .to_vec(), ) @@ -520,6 +586,27 @@ async fn test_batch() { "more": false, "nextStart": null, }, + { + "partitionKey": "root", + "prefix": null, + "start": null, + "end": null, + "limit": null, + "reverse": false, + "conflictsOnly": false, + "tombstones": true, + "singleItem": false, + "items": [ + {"sk": "a", "ct": ct.get("a").unwrap(), "v": [null]}, + {"sk": "b", "ct": ct.get("b").unwrap(), "v": [null]}, + {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap()), base64::encode(values.get("c'").unwrap())]}, + {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [null]}, + {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [null]}, + {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]}, + ], + "more": false, + "nextStart": null, + }, ]) ); } -- 2.45.2