use std::collections::BTreeMap; use std::time::Duration; use http::header::{ACCEPT, CONTENT_LENGTH, CONTENT_TYPE}; use http::status::StatusCode; use http::HeaderMap; use rusoto_core::{ByteStream, DispatchSignedRequest, HttpClient}; use rusoto_credential::AwsCredentials; use rusoto_signature::region::Region; use rusoto_signature::signature::SignedRequest; use serde::de::Error as DeError; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use tokio::io::AsyncReadExt; mod error; pub use error::Error; const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5); const DEFAULT_POLL_TIMEOUT: Duration = Duration::from_secs(300); const SERVICE: &str = "k2v"; const GARAGE_CAUSALITY_TOKEN: &str = "X-Garage-Causality-Token"; /// Client used to query a K2V server. pub struct K2vClient { region: Region, bucket: String, creds: AwsCredentials, client: HttpClient, } impl K2vClient { /// Create a new K2V client. pub fn new( region: Region, bucket: String, creds: AwsCredentials, user_agent: Option, ) -> Result { let mut client = HttpClient::new()?; if let Some(ua) = user_agent { client.local_agent_prepend(ua); } else { client.local_agent_prepend(format!("k2v/{}", env!("CARGO_PKG_VERSION"))); } Ok(K2vClient { region, bucket, creds, client, }) } /// Perform a ReadItem request, reading the value(s) stored for a single pk+sk. pub async fn read_item( &self, partition_key: &str, sort_key: &str, ) -> Result { let mut req = SignedRequest::new( "GET", SERVICE, &self.region, &format!("/{}/{}", self.bucket, partition_key), ); req.add_param("sort_key", sort_key); req.add_header(ACCEPT, "application/octet-stream, application/json"); let res = self.dispatch(req, None).await?; let causality = res .causality_token .ok_or_else(|| Error::InvalidResponse("missing causality token".into()))?; if res.status == StatusCode::NO_CONTENT { return Ok(CausalValue { causality, value: vec![K2vValue::Tombstone], }); } match res.content_type.as_deref() { Some("application/octet-stream") => Ok(CausalValue { causality, value: vec![K2vValue::Value(res.body)], }), Some("application/json") => { let value = serde_json::from_slice(&res.body)?; Ok(CausalValue { causality, value }) } Some(ct) => Err(Error::InvalidResponse( format!("invalid content type: {}", ct).into(), )), None => Err(Error::InvalidResponse("missing content type".into())), } } /// Perform a PollItem request, waiting for the value(s) stored for a single pk+sk to be /// updated. pub async fn poll_item( &self, partition_key: &str, sort_key: &str, causality: CausalityToken, timeout: Option, ) -> Result, Error> { let timeout = timeout.unwrap_or(DEFAULT_POLL_TIMEOUT); let mut req = SignedRequest::new( "GET", SERVICE, &self.region, &format!("/{}/{}", self.bucket, partition_key), ); req.add_param("sort_key", sort_key); req.add_param("causality_token", &causality.0); req.add_param("timeout", &timeout.as_secs().to_string()); req.add_header(ACCEPT, "application/octet-stream, application/json"); let res = self.dispatch(req, Some(timeout + DEFAULT_TIMEOUT)).await?; let causality = res .causality_token .ok_or_else(|| Error::InvalidResponse("missing causality token".into()))?; if res.status == StatusCode::NOT_MODIFIED { return Ok(None); } if res.status == StatusCode::NO_CONTENT { return Ok(Some(CausalValue { causality, value: vec![K2vValue::Tombstone], })); } match res.content_type.as_deref() { Some("application/octet-stream") => Ok(Some(CausalValue { causality, value: vec![K2vValue::Value(res.body)], })), Some("application/json") => { let value = serde_json::from_slice(&res.body)?; Ok(Some(CausalValue { causality, value })) } Some(ct) => Err(Error::InvalidResponse( format!("invalid content type: {}", ct).into(), )), None => Err(Error::InvalidResponse("missing content type".into())), } } /// Perform an InsertItem request, inserting a value for a single pk+sk. pub async fn insert_item( &self, partition_key: &str, sort_key: &str, value: Vec, causality: Option, ) -> Result<(), Error> { let mut req = SignedRequest::new( "PUT", SERVICE, &self.region, &format!("/{}/{}", self.bucket, partition_key), ); req.add_param("sort_key", sort_key); req.set_payload(Some(value)); if let Some(causality) = causality { req.add_header(GARAGE_CAUSALITY_TOKEN, &causality.0); } self.dispatch(req, None).await?; Ok(()) } /// Perform a DeleteItem request, deleting the value(s) stored for a single pk+sk. pub async fn delete_item( &self, partition_key: &str, sort_key: &str, causality: CausalityToken, ) -> Result<(), Error> { let mut req = SignedRequest::new( "DELETE", SERVICE, &self.region, &format!("/{}/{}", self.bucket, partition_key), ); req.add_param("sort_key", sort_key); req.add_header(GARAGE_CAUSALITY_TOKEN, &causality.0); self.dispatch(req, None).await?; Ok(()) } /// Perform a ReadIndex request, listing partition key which have at least one associated /// sort key, and which matches the filter. pub async fn read_index( &self, filter: Filter<'_>, ) -> Result, Error> { let mut req = SignedRequest::new("GET", SERVICE, &self.region, &format!("/{}", self.bucket)); filter.insert_params(&mut req); let res = self.dispatch(req, None).await?; let resp: ReadIndexResponse = serde_json::from_slice(&res.body)?; let items = resp .partition_keys .into_iter() .map(|ReadIndexItem { pk, info }| (pk, info)) .collect(); Ok(PaginatedRange { items, next_start: resp.next_start, }) } /// Perform an InsertBatch request, inserting multiple values at once. Note: this operation is /// *not* atomic: it is possible for some sub-operations to fails and others to success. In /// that case, failure is reported. pub async fn insert_batch(&self, operations: &[BatchInsertOp<'_>]) -> Result<(), Error> { let mut req = SignedRequest::new("POST", SERVICE, &self.region, &format!("/{}", self.bucket)); let payload = serde_json::to_vec(operations)?; req.set_payload(Some(payload)); self.dispatch(req, None).await?; Ok(()) } /// Perform a ReadBatch request, reading multiple values or range of values at once. pub async fn read_batch( &self, operations: &[BatchReadOp<'_>], ) -> Result>, Error> { let mut req = SignedRequest::new("POST", SERVICE, &self.region, &format!("/{}", self.bucket)); req.add_param("search", ""); let payload = serde_json::to_vec(operations)?; req.set_payload(Some(payload)); let res = self.dispatch(req, None).await?; let resp: Vec = serde_json::from_slice(&res.body)?; Ok(resp .into_iter() .map(|e| PaginatedRange { items: e .items .into_iter() .map(|BatchReadItem { sk, ct, v }| { ( sk, CausalValue { causality: ct, value: v, }, ) }) .collect(), next_start: e.next_start, }) .collect()) } /// Perform a DeleteBatch request, deleting mutiple values or range of values at once, without /// providing causality information. pub async fn delete_batch(&self, operations: &[BatchDeleteOp<'_>]) -> Result, Error> { let mut req = SignedRequest::new("POST", SERVICE, &self.region, &format!("/{}", self.bucket)); req.add_param("delete", ""); let payload = serde_json::to_vec(operations)?; req.set_payload(Some(payload)); let res = self.dispatch(req, None).await?; let resp: Vec = serde_json::from_slice(&res.body)?; Ok(resp.into_iter().map(|r| r.deleted_items).collect()) } async fn dispatch( &self, mut req: SignedRequest, timeout: Option, ) -> Result { req.sign(&self.creds); let mut res = self .client .dispatch(req, Some(timeout.unwrap_or(DEFAULT_TIMEOUT))) .await?; let causality_token = res .headers .remove(GARAGE_CAUSALITY_TOKEN) .map(CausalityToken); let content_type = res.headers.remove(CONTENT_TYPE); let body = match res.status { StatusCode::OK => read_body(&mut res.headers, res.body).await?, StatusCode::NO_CONTENT => Vec::new(), StatusCode::NOT_FOUND => return Err(Error::NotFound), StatusCode::NOT_MODIFIED => Vec::new(), _ => { return Err(Error::InvalidResponse( format!("invalid error code: {}", res.status).into(), )) } }; Ok(Response { body, status: res.status, causality_token, content_type, }) } } async fn read_body(headers: &mut HeaderMap, body: ByteStream) -> Result, Error> { let body_len = headers .get(CONTENT_LENGTH) .and_then(|h| h.parse().ok()) .unwrap_or(0); let mut res = Vec::with_capacity(body_len); body.into_async_read().read_to_end(&mut res).await?; Ok(res) } /// An opaque token used to convey causality between operations. #[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)] #[serde(transparent)] pub struct CausalityToken(String); impl From for CausalityToken { fn from(v: String) -> Self { CausalityToken(v) } } impl From for String { fn from(v: CausalityToken) -> Self { v.0 } } /// A value in K2V. can be either a binary value, or a tombstone. #[derive(Debug, Clone, PartialEq, Eq)] pub enum K2vValue { Tombstone, Value(Vec), } impl From> for K2vValue { fn from(v: Vec) -> Self { K2vValue::Value(v) } } impl From>> for K2vValue { fn from(v: Option>) -> Self { match v { Some(v) => K2vValue::Value(v), None => K2vValue::Tombstone, } } } impl<'de> Deserialize<'de> for K2vValue { fn deserialize(d: D) -> Result where D: Deserializer<'de>, { let val: Option<&str> = Option::deserialize(d)?; Ok(match val { Some(s) => { K2vValue::Value(base64::decode(s).map_err(|_| DeError::custom("invalid base64"))?) } None => K2vValue::Tombstone, }) } } impl Serialize for K2vValue { fn serialize(&self, serializer: S) -> Result where S: Serializer, { match self { K2vValue::Tombstone => serializer.serialize_none(), K2vValue::Value(v) => { let b64 = base64::encode(v); serializer.serialize_str(&b64) } } } } /// A set of K2vValue and associated causality information. #[derive(Debug, Clone, Serialize)] pub struct CausalValue { pub causality: CausalityToken, pub value: Vec, } /// Result of paginated requests. #[derive(Debug, Clone)] pub struct PaginatedRange { pub items: BTreeMap, pub next_start: Option, } /// Filter for batch operations. #[derive(Debug, Default, Clone, Deserialize, Serialize)] pub struct Filter<'a> { pub start: Option<&'a str>, pub end: Option<&'a str>, pub prefix: Option<&'a str>, pub limit: Option, #[serde(default)] pub reverse: bool, } impl<'a> Filter<'a> { fn insert_params(&self, req: &mut SignedRequest) { if let Some(start) = &self.start { req.add_param("start", start); } if let Some(end) = &self.end { req.add_param("end", end); } if let Some(prefix) = &self.prefix { req.add_param("prefix", prefix); } if let Some(limit) = &self.limit { req.add_param("limit", &limit.to_string()); } if self.reverse { req.add_param("reverse", "true"); } } } #[derive(Debug, Clone, Deserialize)] #[serde(rename_all = "camelCase")] struct ReadIndexResponse<'a> { #[serde(flatten, borrow)] #[allow(dead_code)] filter: Filter<'a>, partition_keys: Vec, #[allow(dead_code)] more: bool, next_start: Option, } #[derive(Debug, Clone, Deserialize)] struct ReadIndexItem { pk: String, #[serde(flatten)] info: PartitionInfo, } /// Information about data stored with a given partition key. #[derive(Debug, Clone, Deserialize, Serialize)] pub struct PartitionInfo { pub entries: u64, pub conflicts: u64, pub values: u64, pub bytes: u64, } /// Single sub-operation of an InsertBatch. #[derive(Debug, Clone, Serialize)] pub struct BatchInsertOp<'a> { #[serde(rename = "pk")] pub partition_key: &'a str, #[serde(rename = "sk")] pub sort_key: &'a str, #[serde(rename = "ct")] pub causality: Option, #[serde(rename = "v")] pub value: K2vValue, } /// Single sub-operation of a ReadBatch. #[derive(Debug, Default, Clone, Deserialize, Serialize)] #[serde(rename_all = "camelCase")] pub struct BatchReadOp<'a> { pub partition_key: &'a str, #[serde(flatten, borrow)] pub filter: Filter<'a>, #[serde(default)] pub single_item: bool, #[serde(default)] pub conflicts_only: bool, #[serde(default)] pub tombstones: bool, } #[derive(Debug, Clone, Deserialize)] #[serde(rename_all = "camelCase")] struct BatchReadResponse<'a> { #[serde(flatten, borrow)] #[allow(dead_code)] op: BatchReadOp<'a>, items: Vec, #[allow(dead_code)] more: bool, next_start: Option, } #[derive(Debug, Clone, Deserialize)] struct BatchReadItem { sk: String, ct: CausalityToken, v: Vec, } /// Single sub-operation of a DeleteBatch #[derive(Debug, Clone, Deserialize, Serialize)] #[serde(rename_all = "camelCase")] pub struct BatchDeleteOp<'a> { pub partition_key: &'a str, pub prefix: Option<&'a str>, pub start: Option<&'a str>, pub end: Option<&'a str>, #[serde(default)] pub single_item: bool, } impl<'a> BatchDeleteOp<'a> { pub fn new(partition_key: &'a str) -> Self { BatchDeleteOp { partition_key, prefix: None, start: None, end: None, single_item: false, } } } #[derive(Debug, Clone, Deserialize)] #[serde(rename_all = "camelCase")] struct BatchDeleteResponse<'a> { #[serde(flatten, borrow)] #[allow(dead_code)] filter: BatchDeleteOp<'a>, deleted_items: u64, } struct Response { body: Vec, status: StatusCode, causality_token: Option, content_type: Option, }