2022-05-10 11:16:57 +00:00
|
|
|
use std::sync::Arc;
|
|
|
|
|
|
|
|
use async_trait::async_trait;
|
|
|
|
|
2024-02-05 18:27:12 +00:00
|
|
|
use hyper::{body::Incoming as IncomingBody, Method, Request, Response};
|
2024-02-08 22:43:59 +00:00
|
|
|
use tokio::sync::watch;
|
2022-05-10 11:16:57 +00:00
|
|
|
|
|
|
|
use opentelemetry::{trace::SpanRef, KeyValue};
|
|
|
|
|
|
|
|
use garage_util::error::Error as GarageError;
|
2023-09-29 16:41:00 +00:00
|
|
|
use garage_util::socket_address::UnixOrTCPSocketAddress;
|
2022-05-10 11:16:57 +00:00
|
|
|
|
|
|
|
use garage_model::garage::Garage;
|
|
|
|
|
|
|
|
use crate::generic_server::*;
|
2022-05-24 10:16:39 +00:00
|
|
|
use crate::k2v::error::*;
|
2022-05-10 11:16:57 +00:00
|
|
|
|
2024-02-28 09:51:08 +00:00
|
|
|
use crate::signature::verify_request;
|
2022-05-10 11:16:57 +00:00
|
|
|
|
|
|
|
use crate::helpers::*;
|
|
|
|
use crate::k2v::batch::*;
|
|
|
|
use crate::k2v::index::*;
|
|
|
|
use crate::k2v::item::*;
|
|
|
|
use crate::k2v::router::Endpoint;
|
|
|
|
use crate::s3::cors::*;
|
|
|
|
|
2024-02-05 18:27:12 +00:00
|
|
|
pub use crate::signature::streaming::ReqBody;
|
|
|
|
pub type ResBody = BoxBody<Error>;
|
|
|
|
|
2022-05-10 11:16:57 +00:00
|
|
|
pub struct K2VApiServer {
|
|
|
|
garage: Arc<Garage>,
|
|
|
|
}
|
|
|
|
|
|
|
|
pub(crate) struct K2VApiEndpoint {
|
|
|
|
bucket_name: String,
|
|
|
|
endpoint: Endpoint,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl K2VApiServer {
|
|
|
|
pub async fn run(
|
|
|
|
garage: Arc<Garage>,
|
2023-09-29 16:41:00 +00:00
|
|
|
bind_addr: UnixOrTCPSocketAddress,
|
2022-09-07 15:54:16 +00:00
|
|
|
s3_region: String,
|
2024-02-08 22:43:59 +00:00
|
|
|
must_exit: watch::Receiver<bool>,
|
2022-05-10 11:16:57 +00:00
|
|
|
) -> Result<(), GarageError> {
|
2022-09-07 15:54:16 +00:00
|
|
|
ApiServer::new(s3_region, K2VApiServer { garage })
|
2024-02-08 22:43:59 +00:00
|
|
|
.run_server(bind_addr, None, must_exit)
|
2022-05-10 11:16:57 +00:00
|
|
|
.await
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[async_trait]
|
|
|
|
impl ApiHandler for K2VApiServer {
|
|
|
|
const API_NAME: &'static str = "k2v";
|
|
|
|
const API_NAME_DISPLAY: &'static str = "K2V";
|
|
|
|
|
|
|
|
type Endpoint = K2VApiEndpoint;
|
2022-05-24 10:16:39 +00:00
|
|
|
type Error = Error;
|
2022-05-10 11:16:57 +00:00
|
|
|
|
2024-02-05 18:27:12 +00:00
|
|
|
fn parse_endpoint(&self, req: &Request<IncomingBody>) -> Result<K2VApiEndpoint, Error> {
|
2022-05-10 11:16:57 +00:00
|
|
|
let (endpoint, bucket_name) = Endpoint::from_request(req)?;
|
|
|
|
|
|
|
|
Ok(K2VApiEndpoint {
|
|
|
|
bucket_name,
|
|
|
|
endpoint,
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
async fn handle(
|
|
|
|
&self,
|
2024-02-28 09:51:08 +00:00
|
|
|
req: Request<IncomingBody>,
|
2022-05-10 11:16:57 +00:00
|
|
|
endpoint: K2VApiEndpoint,
|
2024-02-05 18:27:12 +00:00
|
|
|
) -> Result<Response<ResBody>, Error> {
|
2022-05-10 11:16:57 +00:00
|
|
|
let K2VApiEndpoint {
|
|
|
|
bucket_name,
|
|
|
|
endpoint,
|
|
|
|
} = endpoint;
|
|
|
|
let garage = self.garage.clone();
|
|
|
|
|
|
|
|
// The OPTIONS method is procesed early, before we even check for an API key
|
|
|
|
if let Endpoint::Options = endpoint {
|
2024-02-05 18:27:12 +00:00
|
|
|
let options_res = handle_options_api(garage, &req, Some(bucket_name))
|
2022-05-24 10:16:39 +00:00
|
|
|
.await
|
2024-02-05 18:27:12 +00:00
|
|
|
.ok_or_bad_request("Error handling OPTIONS")?;
|
|
|
|
return Ok(options_res.map(|_empty_body: EmptyBody| empty_body()));
|
2022-05-10 11:16:57 +00:00
|
|
|
}
|
|
|
|
|
2024-02-28 09:51:08 +00:00
|
|
|
let (req, api_key, _content_sha256) = verify_request(&garage, req, "k2v").await?;
|
2022-05-10 11:16:57 +00:00
|
|
|
|
2022-05-24 10:16:39 +00:00
|
|
|
let bucket_id = garage
|
|
|
|
.bucket_helper()
|
|
|
|
.resolve_bucket(&bucket_name, &api_key)
|
|
|
|
.await?;
|
2022-05-10 11:16:57 +00:00
|
|
|
let bucket = garage
|
2022-05-24 10:16:39 +00:00
|
|
|
.bucket_helper()
|
|
|
|
.get_existing_bucket(bucket_id)
|
|
|
|
.await?;
|
2022-05-10 11:16:57 +00:00
|
|
|
|
|
|
|
let allowed = match endpoint.authorization_type() {
|
|
|
|
Authorization::Read => api_key.allow_read(&bucket_id),
|
|
|
|
Authorization::Write => api_key.allow_write(&bucket_id),
|
|
|
|
Authorization::Owner => api_key.allow_owner(&bucket_id),
|
|
|
|
_ => unreachable!(),
|
|
|
|
};
|
|
|
|
|
|
|
|
if !allowed {
|
2022-05-24 10:16:39 +00:00
|
|
|
return Err(Error::forbidden("Operation is not allowed for this key."));
|
2022-05-10 11:16:57 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Look up what CORS rule might apply to response.
|
|
|
|
// Requests for methods different than GET, HEAD or POST
|
|
|
|
// are always preflighted, i.e. the browser should make
|
|
|
|
// an OPTIONS call before to check it is allowed
|
|
|
|
let matching_cors_rule = match *req.method() {
|
2022-05-24 10:16:39 +00:00
|
|
|
Method::GET | Method::HEAD | Method::POST => find_matching_cors_rule(&bucket, &req)
|
|
|
|
.ok_or_internal_error("Error looking up CORS rule")?,
|
2022-05-10 11:16:57 +00:00
|
|
|
_ => None,
|
|
|
|
};
|
|
|
|
|
|
|
|
let resp = match endpoint {
|
|
|
|
Endpoint::DeleteItem {
|
|
|
|
partition_key,
|
|
|
|
sort_key,
|
|
|
|
} => handle_delete_item(garage, req, bucket_id, &partition_key, &sort_key).await,
|
|
|
|
Endpoint::InsertItem {
|
|
|
|
partition_key,
|
|
|
|
sort_key,
|
|
|
|
} => handle_insert_item(garage, req, bucket_id, &partition_key, &sort_key).await,
|
|
|
|
Endpoint::ReadItem {
|
|
|
|
partition_key,
|
|
|
|
sort_key,
|
|
|
|
} => handle_read_item(garage, &req, bucket_id, &partition_key, &sort_key).await,
|
|
|
|
Endpoint::PollItem {
|
|
|
|
partition_key,
|
|
|
|
sort_key,
|
|
|
|
causality_token,
|
|
|
|
timeout,
|
|
|
|
} => {
|
|
|
|
handle_poll_item(
|
|
|
|
garage,
|
|
|
|
&req,
|
|
|
|
bucket_id,
|
|
|
|
partition_key,
|
|
|
|
sort_key,
|
|
|
|
causality_token,
|
|
|
|
timeout,
|
|
|
|
)
|
|
|
|
.await
|
|
|
|
}
|
|
|
|
Endpoint::ReadIndex {
|
|
|
|
prefix,
|
|
|
|
start,
|
|
|
|
end,
|
|
|
|
limit,
|
|
|
|
reverse,
|
|
|
|
} => handle_read_index(garage, bucket_id, prefix, start, end, limit, reverse).await,
|
|
|
|
Endpoint::InsertBatch {} => handle_insert_batch(garage, bucket_id, req).await,
|
|
|
|
Endpoint::ReadBatch {} => handle_read_batch(garage, bucket_id, req).await,
|
|
|
|
Endpoint::DeleteBatch {} => handle_delete_batch(garage, bucket_id, req).await,
|
2023-01-10 14:22:25 +00:00
|
|
|
Endpoint::PollRange { partition_key } => {
|
|
|
|
handle_poll_range(garage, bucket_id, &partition_key, req).await
|
|
|
|
}
|
2022-05-10 11:16:57 +00:00
|
|
|
Endpoint::Options => unreachable!(),
|
|
|
|
};
|
|
|
|
|
|
|
|
// If request was a success and we have a CORS rule that applies to it,
|
|
|
|
// add the corresponding CORS headers to the response
|
|
|
|
let mut resp_ok = resp?;
|
|
|
|
if let Some(rule) = matching_cors_rule {
|
|
|
|
add_cors_headers(&mut resp_ok, rule)
|
|
|
|
.ok_or_internal_error("Invalid bucket CORS configuration")?;
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(resp_ok)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl ApiEndpoint for K2VApiEndpoint {
|
|
|
|
fn name(&self) -> &'static str {
|
|
|
|
self.endpoint.name()
|
|
|
|
}
|
|
|
|
|
|
|
|
fn add_span_attributes(&self, span: SpanRef<'_>) {
|
|
|
|
span.set_attribute(KeyValue::new("bucket", self.bucket_name.clone()));
|
|
|
|
}
|
|
|
|
}
|