From 87a83155466a1e0f1e5a8aa31565d6f23124929b Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 21 Apr 2022 13:27:10 +0200 Subject: [PATCH] First implementation of ReadIndex that might work if I'm lucky --- Cargo.lock | 1 + src/api/Cargo.toml | 1 + src/api/k2v/api_server.rs | 7 +++ src/api/k2v/index.rs | 89 +++++++++++++++++++++++++++++++++++++++ src/api/k2v/item.rs | 3 +- src/api/k2v/mod.rs | 4 +- src/api/k2v/range.rs | 80 +++++++++++++++++++++++++++++++++++ 7 files changed, 182 insertions(+), 3 deletions(-) create mode 100644 src/api/k2v/index.rs create mode 100644 src/api/k2v/range.rs diff --git a/Cargo.lock b/Cargo.lock index 452b8eac..cbc251d6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -887,6 +887,7 @@ dependencies = [ "futures-util", "garage_block", "garage_model 0.7.0", + "garage_rpc 0.7.0", "garage_table 0.7.0", "garage_util 0.7.0", "hex", diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index 1ba3fd2a..05730a4e 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -18,6 +18,7 @@ garage_model = { version = "0.7.0", path = "../model" } garage_table = { version = "0.7.0", path = "../table" } garage_block = { version = "0.7.0", path = "../block" } garage_util = { version = "0.7.0", path = "../util" } +garage_rpc = { version = "0.7.0", path = "../rpc" } async-trait = "0.1.7" base64 = "0.13" diff --git a/src/api/k2v/api_server.rs b/src/api/k2v/api_server.rs index 7a9b039f..0efa5d8e 100644 --- a/src/api/k2v/api_server.rs +++ b/src/api/k2v/api_server.rs @@ -19,6 +19,7 @@ use crate::signature::payload::check_payload_signature; use crate::signature::streaming::*; use crate::helpers::*; +use crate::k2v::index::*; use crate::k2v::item::*; use crate::k2v::router::Endpoint; use crate::s3::cors::*; @@ -135,6 +136,12 @@ impl ApiHandler for K2VApiServer { partition_key, sort_key, } => handle_read_item(garage, &req, bucket_id, &partition_key, &sort_key).await, + Endpoint::ReadIndex { + prefix, + start, + end, + limit, + } => handle_read_index(garage, bucket_id, prefix, start, end, limit).await, //TODO endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())), }; diff --git a/src/api/k2v/index.rs b/src/api/k2v/index.rs new file mode 100644 index 00000000..71e04cd4 --- /dev/null +++ b/src/api/k2v/index.rs @@ -0,0 +1,89 @@ +use std::sync::Arc; + +use hyper::{Body, Response, StatusCode}; +use serde::Serialize; + +use garage_util::data::*; +use garage_util::error::Error as GarageError; + +use garage_rpc::ring::Ring; + +use garage_model::garage::Garage; + +use crate::error::*; +use crate::k2v::range::read_range; + +pub async fn handle_read_index( + garage: Arc, + bucket_id: Uuid, + prefix: Option, + start: Option, + end: Option, + limit: Option, +) -> Result, Error> { + let ring: Arc = garage.system.ring.borrow().clone(); + + let (partition_keys, more, next_start) = read_range( + &garage.k2v_counter_table.table, + &bucket_id, + &prefix, + &start, + &end, + limit, + None, + ) + .await?; + + let s_entries = "entries".to_string(); + let s_values = "values".to_string(); + let s_bytes = "bytes".to_string(); + + let resp = ReadIndexResponse { + prefix, + start, + end, + limit, + partition_keys: partition_keys + .into_iter() + .map(|part| { + let vals = part.filtered_values(&ring); + ReadIndexResponseEntry { + pk: part.sk, + entries: *vals.get(&s_entries).unwrap_or(&0), + values: *vals.get(&s_values).unwrap_or(&0), + bytes: *vals.get(&s_bytes).unwrap_or(&0), + } + }) + .collect::>(), + more, + next_start, + }; + + let resp_json = serde_json::to_string_pretty(&resp).map_err(GarageError::from)?; + Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::from(resp_json))?) +} + +#[derive(Serialize)] +struct ReadIndexResponse { + prefix: Option, + start: Option, + end: Option, + limit: Option, + + #[serde(rename = "partitionKeys")] + partition_keys: Vec, + + more: bool, + #[serde(rename = "nextStart")] + next_start: Option, +} + +#[derive(Serialize)] +struct ReadIndexResponseEntry { + pk: String, + entries: i64, + values: i64, + bytes: i64, +} diff --git a/src/api/k2v/item.rs b/src/api/k2v/item.rs index 3aa20afe..c74e4192 100644 --- a/src/api/k2v/item.rs +++ b/src/api/k2v/item.rs @@ -1,4 +1,3 @@ -//! Function related to GET and HEAD requests use std::sync::Arc; use http::header; @@ -13,7 +12,7 @@ use garage_model::k2v::item_table::*; use crate::error::*; -const X_GARAGE_CAUSALITY_TOKEN: &str = "X-Garage-Causality-Token"; +pub const X_GARAGE_CAUSALITY_TOKEN: &str = "X-Garage-Causality-Token"; pub enum ReturnFormat { Json, diff --git a/src/api/k2v/mod.rs b/src/api/k2v/mod.rs index cf8247f7..62eeaa5b 100644 --- a/src/api/k2v/mod.rs +++ b/src/api/k2v/mod.rs @@ -1,5 +1,7 @@ pub mod api_server; - mod router; +mod index; mod item; + +mod range; diff --git a/src/api/k2v/range.rs b/src/api/k2v/range.rs new file mode 100644 index 00000000..29bca19e --- /dev/null +++ b/src/api/k2v/range.rs @@ -0,0 +1,80 @@ +//! Utility module for retrieving ranges of items in Garage tables +//! Implements parameters (prefix, start, end, limit) as specified +//! for endpoints ReadIndex, ReadBatch and DeleteBatch + +use std::sync::Arc; + +use garage_table::replication::TableShardedReplication; +use garage_table::*; + +use crate::error::*; + +/// Read range in a Garage table. +/// Returns (entries, more?, nextStart) +pub(crate) async fn read_range( + table: &Arc>, + partition_key: &F::P, + prefix: &Option, + start: &Option, + end: &Option, + limit: Option, + filter: Option, +) -> Result<(Vec, bool, Option), Error> +where + F: TableSchema + 'static, +{ + let mut start = match (prefix, start) { + (None, None) => "".to_string(), + (Some(p), None) => p.clone(), + (None, Some(s)) => s.clone(), + (Some(p), Some(s)) => { + if !s.starts_with(p) { + return Err(Error::BadRequest(format!( + "Start key '{}' does not start with prefix '{}'", + s, p + ))); + } + s.clone() + } + }; + let mut start_ignore = false; + + let mut entries = vec![]; + loop { + let n_get = std::cmp::min(1000, limit.unwrap_or(u64::MAX) as usize - entries.len() + 2); + let get_ret = table + .get_range(partition_key, Some(start.clone()), filter.clone(), n_get) + .await?; + + let get_ret_len = get_ret.len(); + + for entry in get_ret { + if let Some(p) = prefix { + if !entry.sort_key().starts_with(p) { + return Ok((entries, false, None)); + } + } + if let Some(e) = end { + if entry.sort_key() == e { + return Ok((entries, false, None)); + } + } + if let Some(l) = limit { + if entries.len() >= l as usize { + return Ok((entries, true, Some(entry.sort_key().clone()))); + } + } + if start_ignore && entry.sort_key() == &start { + continue; + } + entries.push(entry); + } + + if get_ret_len < n_get { + return Ok((entries, false, None)); + } + + start = entries.last().unwrap().sort_key().clone(); + start_ignore = true; + } +}