First implementation of ReadIndex that might work if I'm lucky
Some checks failed
continuous-integration/drone/pr Build is failing
continuous-integration/drone/push Build is failing

This commit is contained in:
Alex 2022-04-21 13:27:10 +02:00
parent d3a9075cd3
commit 87a8315546
Signed by: lx
GPG key ID: 0E496D15096376BE
7 changed files with 182 additions and 3 deletions

1
Cargo.lock generated
View file

@ -887,6 +887,7 @@ dependencies = [
"futures-util",
"garage_block",
"garage_model 0.7.0",
"garage_rpc 0.7.0",
"garage_table 0.7.0",
"garage_util 0.7.0",
"hex",

View file

@ -18,6 +18,7 @@ garage_model = { version = "0.7.0", path = "../model" }
garage_table = { version = "0.7.0", path = "../table" }
garage_block = { version = "0.7.0", path = "../block" }
garage_util = { version = "0.7.0", path = "../util" }
garage_rpc = { version = "0.7.0", path = "../rpc" }
async-trait = "0.1.7"
base64 = "0.13"

View file

@ -19,6 +19,7 @@ use crate::signature::payload::check_payload_signature;
use crate::signature::streaming::*;
use crate::helpers::*;
use crate::k2v::index::*;
use crate::k2v::item::*;
use crate::k2v::router::Endpoint;
use crate::s3::cors::*;
@ -135,6 +136,12 @@ impl ApiHandler for K2VApiServer {
partition_key,
sort_key,
} => handle_read_item(garage, &req, bucket_id, &partition_key, &sort_key).await,
Endpoint::ReadIndex {
prefix,
start,
end,
limit,
} => handle_read_index(garage, bucket_id, prefix, start, end, limit).await,
//TODO
endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())),
};

89
src/api/k2v/index.rs Normal file
View file

@ -0,0 +1,89 @@
use std::sync::Arc;
use hyper::{Body, Response, StatusCode};
use serde::Serialize;
use garage_util::data::*;
use garage_util::error::Error as GarageError;
use garage_rpc::ring::Ring;
use garage_model::garage::Garage;
use crate::error::*;
use crate::k2v::range::read_range;
pub async fn handle_read_index(
garage: Arc<Garage>,
bucket_id: Uuid,
prefix: Option<String>,
start: Option<String>,
end: Option<String>,
limit: Option<u64>,
) -> Result<Response<Body>, Error> {
let ring: Arc<Ring> = garage.system.ring.borrow().clone();
let (partition_keys, more, next_start) = read_range(
&garage.k2v_counter_table.table,
&bucket_id,
&prefix,
&start,
&end,
limit,
None,
)
.await?;
let s_entries = "entries".to_string();
let s_values = "values".to_string();
let s_bytes = "bytes".to_string();
let resp = ReadIndexResponse {
prefix,
start,
end,
limit,
partition_keys: partition_keys
.into_iter()
.map(|part| {
let vals = part.filtered_values(&ring);
ReadIndexResponseEntry {
pk: part.sk,
entries: *vals.get(&s_entries).unwrap_or(&0),
values: *vals.get(&s_values).unwrap_or(&0),
bytes: *vals.get(&s_bytes).unwrap_or(&0),
}
})
.collect::<Vec<_>>(),
more,
next_start,
};
let resp_json = serde_json::to_string_pretty(&resp).map_err(GarageError::from)?;
Ok(Response::builder()
.status(StatusCode::OK)
.body(Body::from(resp_json))?)
}
#[derive(Serialize)]
struct ReadIndexResponse {
prefix: Option<String>,
start: Option<String>,
end: Option<String>,
limit: Option<u64>,
#[serde(rename = "partitionKeys")]
partition_keys: Vec<ReadIndexResponseEntry>,
more: bool,
#[serde(rename = "nextStart")]
next_start: Option<String>,
}
#[derive(Serialize)]
struct ReadIndexResponseEntry {
pk: String,
entries: i64,
values: i64,
bytes: i64,
}

View file

@ -1,4 +1,3 @@
//! Function related to GET and HEAD requests
use std::sync::Arc;
use http::header;
@ -13,7 +12,7 @@ use garage_model::k2v::item_table::*;
use crate::error::*;
const X_GARAGE_CAUSALITY_TOKEN: &str = "X-Garage-Causality-Token";
pub const X_GARAGE_CAUSALITY_TOKEN: &str = "X-Garage-Causality-Token";
pub enum ReturnFormat {
Json,

View file

@ -1,5 +1,7 @@
pub mod api_server;
mod router;
mod index;
mod item;
mod range;

80
src/api/k2v/range.rs Normal file
View file

@ -0,0 +1,80 @@
//! Utility module for retrieving ranges of items in Garage tables
//! Implements parameters (prefix, start, end, limit) as specified
//! for endpoints ReadIndex, ReadBatch and DeleteBatch
use std::sync::Arc;
use garage_table::replication::TableShardedReplication;
use garage_table::*;
use crate::error::*;
/// Read range in a Garage table.
/// Returns (entries, more?, nextStart)
pub(crate) async fn read_range<F>(
table: &Arc<Table<F, TableShardedReplication>>,
partition_key: &F::P,
prefix: &Option<String>,
start: &Option<String>,
end: &Option<String>,
limit: Option<u64>,
filter: Option<F::Filter>,
) -> Result<(Vec<F::E>, bool, Option<String>), Error>
where
F: TableSchema<S = String> + 'static,
{
let mut start = match (prefix, start) {
(None, None) => "".to_string(),
(Some(p), None) => p.clone(),
(None, Some(s)) => s.clone(),
(Some(p), Some(s)) => {
if !s.starts_with(p) {
return Err(Error::BadRequest(format!(
"Start key '{}' does not start with prefix '{}'",
s, p
)));
}
s.clone()
}
};
let mut start_ignore = false;
let mut entries = vec![];
loop {
let n_get = std::cmp::min(1000, limit.unwrap_or(u64::MAX) as usize - entries.len() + 2);
let get_ret = table
.get_range(partition_key, Some(start.clone()), filter.clone(), n_get)
.await?;
let get_ret_len = get_ret.len();
for entry in get_ret {
if let Some(p) = prefix {
if !entry.sort_key().starts_with(p) {
return Ok((entries, false, None));
}
}
if let Some(e) = end {
if entry.sort_key() == e {
return Ok((entries, false, None));
}
}
if let Some(l) = limit {
if entries.len() >= l as usize {
return Ok((entries, true, Some(entry.sort_key().clone())));
}
}
if start_ignore && entry.sort_key() == &start {
continue;
}
entries.push(entry);
}
if get_ret_len < n_get {
return Ok((entries, false, None));
}
start = entries.last().unwrap().sort_key().clone();
start_ignore = true;
}
}