garage/src/api/k2v/api_server.rs

170 lines
4.3 KiB
Rust
Raw Normal View History

use std::sync::Arc;
use async_trait::async_trait;
2022-04-14 12:44:18 +00:00
use futures::future::Future;
use hyper::{Body, Method, Request, Response};
use opentelemetry::{trace::SpanRef, KeyValue};
use garage_table::util::*;
use garage_util::error::Error as GarageError;
use garage_model::garage::Garage;
2022-04-14 12:44:18 +00:00
use crate::error::*;
use crate::generic_server::*;
2022-04-14 12:44:18 +00:00
use crate::signature::payload::check_payload_signature;
use crate::signature::streaming::*;
2022-04-14 12:44:18 +00:00
use crate::helpers::*;
use crate::k2v::index::*;
2022-04-14 14:19:31 +00:00
use crate::k2v::item::*;
2022-04-14 13:02:49 +00:00
use crate::k2v::router::Endpoint;
use crate::s3::cors::*;
pub struct K2VApiServer {
garage: Arc<Garage>,
}
pub(crate) struct K2VApiEndpoint {
bucket_name: String,
endpoint: Endpoint,
}
impl K2VApiServer {
pub async fn run(
garage: Arc<Garage>,
shutdown_signal: impl Future<Output = ()>,
) -> Result<(), GarageError> {
let addr = garage.config.s3_api.api_bind_addr;
ApiServer::new(
garage.config.s3_api.s3_region.clone(),
K2VApiServer { garage },
)
.run_server(addr, shutdown_signal)
.await
}
}
#[async_trait]
impl ApiHandler for K2VApiServer {
const API_NAME: &'static str = "k2v";
const API_NAME_DISPLAY: &'static str = "K2V";
type Endpoint = K2VApiEndpoint;
fn parse_endpoint(&self, req: &Request<Body>) -> Result<K2VApiEndpoint, Error> {
let (endpoint, bucket_name) = Endpoint::from_request(req)?;
Ok(K2VApiEndpoint {
bucket_name,
endpoint,
})
}
async fn handle(
&self,
req: Request<Body>,
endpoint: K2VApiEndpoint,
) -> Result<Response<Body>, Error> {
let K2VApiEndpoint {
bucket_name,
endpoint,
} = endpoint;
let garage = self.garage.clone();
// The OPTIONS method is procesed early, before we even check for an API key
if let Endpoint::Options = endpoint {
return handle_options_s3api(garage, &req, Some(bucket_name)).await;
}
let (api_key, mut content_sha256) = check_payload_signature(&garage, "k2v", &req).await?;
let api_key = api_key.ok_or_else(|| {
Error::Forbidden("Garage does not support anonymous access yet".to_string())
})?;
2022-04-14 13:02:49 +00:00
let req = parse_streaming_body(
&api_key,
req,
&mut content_sha256,
&garage.config.s3_api.s3_region,
)?;
let bucket_id = resolve_bucket(&garage, &bucket_name, &api_key).await?;
let bucket = garage
.bucket_table
.get(&EmptyKey, &bucket_id)
.await?
.filter(|b| !b.state.is_deleted())
.ok_or(Error::NoSuchBucket)?;
let allowed = match endpoint.authorization_type() {
Authorization::Read => api_key.allow_read(&bucket_id),
Authorization::Write => api_key.allow_write(&bucket_id),
Authorization::Owner => api_key.allow_owner(&bucket_id),
_ => unreachable!(),
};
if !allowed {
return Err(Error::Forbidden(
"Operation is not allowed for this key.".to_string(),
));
}
// Look up what CORS rule might apply to response.
// Requests for methods different than GET, HEAD or POST
// are always preflighted, i.e. the browser should make
// an OPTIONS call before to check it is allowed
let matching_cors_rule = match *req.method() {
Method::GET | Method::HEAD | Method::POST => find_matching_cors_rule(&bucket, &req)?,
_ => None,
};
let resp = match endpoint {
2022-04-15 13:49:30 +00:00
Endpoint::DeleteItem {
partition_key,
sort_key,
} => handle_delete_item(garage, req, bucket_id, &partition_key, &sort_key).await,
Endpoint::InsertItem {
partition_key,
sort_key,
} => handle_insert_item(garage, req, bucket_id, &partition_key, &sort_key).await,
2022-04-14 14:19:31 +00:00
Endpoint::ReadItem {
partition_key,
sort_key,
} => handle_read_item(garage, &req, bucket_id, &partition_key, &sort_key).await,
Endpoint::ReadIndex {
prefix,
start,
end,
limit,
} => handle_read_index(garage, bucket_id, prefix, start, end, limit).await,
//TODO
endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())),
};
// If request was a success and we have a CORS rule that applies to it,
// add the corresponding CORS headers to the response
let mut resp_ok = resp?;
if let Some(rule) = matching_cors_rule {
add_cors_headers(&mut resp_ok, rule)
.ok_or_internal_error("Invalid bucket CORS configuration")?;
}
Ok(resp_ok)
}
}
impl ApiEndpoint for K2VApiEndpoint {
fn name(&self) -> &'static str {
self.endpoint.name()
}
fn add_span_attributes(&self, span: SpanRef<'_>) {
2022-04-14 13:02:49 +00:00
span.set_attribute(KeyValue::new("bucket", self.bucket_name.clone()));
}
}