New model for buckets #172

Merged
lx merged 19 commits from new-buckets into main 2022-01-10 11:32:42 +00:00
18 changed files with 243 additions and 107 deletions
Showing only changes of commit b1cfd16913 - Show all commits

1
Cargo.lock generated
View file

@ -403,6 +403,7 @@ dependencies = [
"rand",
"rmp-serde 0.15.5",
"serde",
"serde_bytes",
"sled",
"structopt",
"tokio",

View file

@ -109,11 +109,9 @@ async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Respon
let endpoint = Endpoint::from_request(&req, bucket.map(ToOwned::to_owned))?;
let bucket_name = match endpoint.authorization_type() {
Authorization::None => {
return handle_request_without_bucket(garage, req, api_key, endpoint).await
}
Authorization::Read(bucket) | Authorization::Write(bucket) => bucket.to_string(),
let bucket_name = match endpoint.get_bucket() {
None => return handle_request_without_bucket(garage, req, api_key, endpoint).await,
Some(bucket) => bucket.to_string(),
};
let bucket_id = resolve_bucket(&garage, &bucket_name, &api_key).await?;

View file

@ -67,7 +67,7 @@ pub async fn handle_list_buckets(garage: &Garage, api_key: &Key) -> Result<Respo
if let Some(alias_ent) = alias_ent {
if let Some(alias_p) = alias_ent.state.get().as_option() {
if alias_p.bucket_id == *bucket_id {
aliases.insert(alias_ent.name.clone(), *bucket_id);
aliases.insert(alias_ent.name().to_string(), *bucket_id);
}
}
}

View file

@ -25,7 +25,6 @@ pub async fn handle_delete_website(
.ok_or(Error::NotFound)?;
if let crdt::Deletable::Present(param) = &mut bucket.state {
param.website_access.update(false);
param.website_config.update(None);
garage.bucket_table.insert(&bucket).await?;
} else {
@ -57,7 +56,6 @@ pub async fn handle_put_website(
conf.validate()?;
if let crdt::Deletable::Present(param) = &mut bucket.state {
param.website_access.update(true);
param
.website_config
.update(Some(ByteBuf::from(body.to_vec())));

View file

@ -35,6 +35,7 @@ sled = "0.34"
rmp-serde = "0.15"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_bytes = "0.11"
structopt = { version = "0.3", default-features = false }
toml = "0.5"

View file

@ -4,6 +4,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
use garage_util::crdt::*;
use garage_util::data::*;
@ -27,6 +28,8 @@ use crate::repair::Repair;
pub const ADMIN_RPC_PATH: &str = "garage/admin_rpc.rs/Rpc";
macro_rules! INVALID_BUCKET_NAME_MESSAGE { () => { "Invalid bucket name: {}. See AWS documentation for constraints on S3 bucket names:\nhttps://docs.aws.amazon.com/AmazonS3/latest/userguide/bucketnamingrules.html" }; }
#[derive(Debug, Serialize, Deserialize)]
pub enum AdminRpc {
BucketOperation(BucketOperation),
@ -142,14 +145,14 @@ impl AdminRpcHandler {
}));
alias
}
None => BucketAlias::new(name.clone(), bucket.id),
None => BucketAlias::new(name.clone(), bucket.id)
.ok_or_message(format!(INVALID_BUCKET_NAME_MESSAGE!(), name))?,
};
bucket
.state
.as_option_mut()
.unwrap()
.aliases
.update_in_place(name.clone(), true);
bucket.state.as_option_mut().unwrap().aliases.merge_raw(
name,
alias.state.timestamp(),
&true,
);
self.garage.bucket_table.insert(&bucket).await?;
self.garage.bucket_alias_table.insert(&alias).await?;
Ok(AdminRpc::Ok(format!("Bucket {} was created.", name)))
@ -222,7 +225,7 @@ impl AdminRpcHandler {
// 2. delete bucket alias
bucket_alias.state.update(Deletable::Deleted);
self.garage.bucket_alias_table.insert(&bucket_alias).await?;
// 3. delete bucket alias
// 3. delete bucket
bucket.state = Deletable::delete();
self.garage.bucket_table.insert(&bucket).await?;
@ -259,15 +262,36 @@ impl AdminRpcHandler {
}
}
key_param.local_aliases = key_param
.local_aliases
.update_mutator(query.new_name.clone(), Deletable::present(bucket_id));
if !is_valid_bucket_name(&query.new_name) {
return Err(Error::Message(format!(
INVALID_BUCKET_NAME_MESSAGE!(),
query.new_name
)));
}
// Checks ok, add alias
let mut bucket_p = bucket.state.as_option_mut().unwrap();
let bucket_p_local_alias_key = (key.key_id.clone(), query.new_name.clone());
// Calculate the timestamp to assign to this aliasing in the two local_aliases maps
// (the one from key to bucket, and the reverse one stored in the bucket iself)
// so that merges on both maps in case of a concurrent operation resolve
// to the same alias being set
let alias_ts = increment_logical_clock_2(
key_param.local_aliases.get_timestamp(&query.new_name),
bucket_p
.local_aliases
.get_timestamp(&bucket_p_local_alias_key),
);
key_param.local_aliases = LwwMap::raw_item(
query.new_name.clone(),
alias_ts,
Deletable::present(bucket_id),
);
self.garage.key_table.insert(&key).await?;
let mut bucket_p = bucket.state.as_option_mut().unwrap();
bucket_p.local_aliases = bucket_p
.local_aliases
.update_mutator((key.key_id.clone(), query.new_name.clone()), true);
bucket_p.local_aliases = LwwMap::raw_item(bucket_p_local_alias_key, alias_ts, true);
self.garage.bucket_table.insert(&bucket).await?;
Ok(AdminRpc::Ok(format!(
@ -275,40 +299,47 @@ impl AdminRpcHandler {
query.new_name, bucket_id, key.key_id
)))
} else {
let mut alias = self
let alias = self
.garage
.bucket_alias_table
.get(&EmptyKey, &query.new_name)
.await?
.unwrap_or(BucketAlias {
name: query.new_name.clone(),
state: Lww::new(Deletable::delete()),
});
.await?;
if let Some(existing_alias) = alias.state.get().as_option() {
if existing_alias.bucket_id == bucket_id {
return Ok(AdminRpc::Ok(format!(
"Alias {} already points to bucket {:?}",
query.new_name, bucket_id
)));
} else {
return Err(Error::Message(format!(
"Alias {} already exists and points to different bucket: {:?}",
query.new_name, existing_alias.bucket_id
)));
if let Some(existing_alias) = alias.as_ref() {
if let Some(p) = existing_alias.state.get().as_option() {
if p.bucket_id == bucket_id {
return Ok(AdminRpc::Ok(format!(
"Alias {} already points to bucket {:?}",
query.new_name, bucket_id
)));
} else {
return Err(Error::Message(format!(
"Alias {} already exists and points to different bucket: {:?}",
query.new_name, p.bucket_id
)));
}
}
}
// Checks ok, add alias
alias
.state
.update(Deletable::present(AliasParams { bucket_id }));
let mut bucket_p = bucket.state.as_option_mut().unwrap();
let alias_ts = increment_logical_clock_2(
bucket_p.aliases.get_timestamp(&query.new_name),
alias.as_ref().map(|a| a.state.timestamp()).unwrap_or(0),
);
let alias = match alias {
None => BucketAlias::new(query.new_name.clone(), bucket_id)
.ok_or_message(format!(INVALID_BUCKET_NAME_MESSAGE!(), query.new_name))?,
Some(mut a) => {
a.state = Lww::raw(alias_ts, Deletable::present(AliasParams { bucket_id }));
a
}
};
self.garage.bucket_alias_table.insert(&alias).await?;
let mut bucket_p = bucket.state.as_option_mut().unwrap();
bucket_p.aliases = bucket_p
.aliases
.update_mutator(query.new_name.clone(), true);
bucket_p.aliases = LwwMap::raw_item(query.new_name.clone(), alias_ts, true);
self.garage.bucket_table.insert(&bucket).await?;
Ok(AdminRpc::Ok(format!(
@ -336,14 +367,14 @@ impl AdminRpcHandler {
.bucket_helper()
.get_existing_bucket(bucket_id)
.await?;
let mut bucket_state = bucket.state.as_option_mut().unwrap();
let mut bucket_p = bucket.state.as_option_mut().unwrap();
let has_other_aliases = bucket_state
let has_other_aliases = bucket_p
.aliases
.items()
.iter()
.any(|(_, _, active)| *active)
|| bucket_state
|| bucket_p
.local_aliases
.items()
.iter()
@ -352,15 +383,22 @@ impl AdminRpcHandler {
return Err(Error::Message(format!("Bucket {} doesn't have other aliases, please delete it instead of just unaliasing.", query.name)));
}
// Checks ok, remove alias
let mut key_param = key.state.as_option_mut().unwrap();
key_param.local_aliases = key_param
.local_aliases
.update_mutator(query.name.clone(), Deletable::delete());
let bucket_p_local_alias_key = (key.key_id.clone(), query.name.clone());
let alias_ts = increment_logical_clock_2(
key_param.local_aliases.get_timestamp(&query.name),
bucket_p
.local_aliases
.get_timestamp(&bucket_p_local_alias_key),
);
key_param.local_aliases =
LwwMap::raw_item(query.name.clone(), alias_ts, Deletable::delete());
self.garage.key_table.insert(&key).await?;
bucket_state.local_aliases = bucket_state
.local_aliases
.update_mutator((key.key_id.clone(), query.name.clone()), false);
bucket_p.local_aliases = LwwMap::raw_item(bucket_p_local_alias_key, alias_ts, false);
self.garage.bucket_table.insert(&bucket).await?;
Ok(AdminRpc::Ok(format!(
@ -401,12 +439,17 @@ impl AdminRpcHandler {
.get(&EmptyKey, &query.name)
.await?
.ok_or_message("Internal error: alias not found")?;
alias.state.update(Deletable::delete());
// Checks ok, remove alias
let alias_ts = increment_logical_clock_2(
alias.state.timestamp(),
bucket_state.aliases.get_timestamp(&query.name),
);
alias.state = Lww::raw(alias_ts, Deletable::delete());
self.garage.bucket_alias_table.insert(&alias).await?;
bucket_state.aliases = bucket_state
.aliases
.update_mutator(query.name.clone(), false);
bucket_state.aliases = LwwMap::raw_item(query.name.clone(), alias_ts, false);
self.garage.bucket_table.insert(&bucket).await?;
Ok(AdminRpc::Ok(format!("Bucket alias {} deleted", query.name)))
@ -494,7 +537,13 @@ impl AdminRpcHandler {
));
}
bucket_state.website_access.update(query.allow);
let website = if query.allow {
Some(ByteBuf::from(DEFAULT_WEBSITE_CONFIGURATION.to_vec()))
} else {
None
};
bucket_state.website_config.update(website);
self.garage.bucket_table.insert(&bucket).await?;
let msg = if query.allow {

View file

@ -167,7 +167,7 @@ pub async fn cmd_admin(
let mut table = vec![];
for alias in bl {
if let Some(p) = alias.state.get().as_option() {
table.push(format!("\t{}\t{:?}", alias.name, p.bucket_id));
table.push(format!("\t{}\t{:?}", alias.name(), p.bucket_id));
}
}
format_table(table);

View file

@ -82,7 +82,7 @@ pub fn print_bucket_info(bucket: &Bucket, relevant_keys: &HashMap<String, Key>)
match &bucket.state {
Deletable::Deleted => println!("Bucket is deleted."),
Deletable::Present(p) => {
println!("Website access: {}", p.website_access.get());
println!("Website access: {}", p.website_config.get().is_some());
println!("\nGlobal aliases:");
for (alias, _, active) in p.aliases.items().iter() {

View file

@ -8,7 +8,7 @@ use garage_util::data::*;
/// in the global namespace.
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct BucketAlias {
pub name: String,
name: String,
pub state: crdt::Lww<crdt::Deletable<AliasParams>>,
}
@ -22,15 +22,22 @@ impl AutoCrdt for AliasParams {
}
impl BucketAlias {
pub fn new(name: String, bucket_id: Uuid) -> Self {
BucketAlias {
name,
state: crdt::Lww::new(crdt::Deletable::present(AliasParams { bucket_id })),
pub fn new(name: String, bucket_id: Uuid) -> Option<Self> {
if !is_valid_bucket_name(&name) {
None
} else {
Some(BucketAlias {
name,
state: crdt::Lww::new(crdt::Deletable::present(AliasParams { bucket_id })),
})
}
}
pub fn is_deleted(&self) -> bool {
self.state.get().is_deleted()
}
pub fn name(&self) -> &str {
&self.name
}
}
impl Crdt for BucketAlias {
@ -62,3 +69,29 @@ impl TableSchema for BucketAliasTable {
filter.apply(entry.is_deleted())
}
}
/// Check if a bucket name is valid.
///
/// The requirements are listed here:
///
/// <https://docs.aws.amazon.com/AmazonS3/latest/userguide/bucketnamingrules.html>
///
/// In the case of Garage, bucket names must not be hex-encoded
/// 32 byte string, which is excluded thanks to the
/// maximum length of 63 bytes given in the spec.
pub fn is_valid_bucket_name(n: &str) -> bool {
// Bucket names must be between 3 and 63 characters
n.len() >= 3 && n.len() <= 63
// Bucket names must be composed of lowercase letters, numbers,
// dashes and dots
&& n.chars().all(|c| matches!(c, '.' | '-' | 'a'..='z' | '0'..='9'))
// Bucket names must start and end with a letter or a number
&& !n.starts_with(&['-', '.'][..])
&& !n.ends_with(&['-', '.'][..])
// Bucket names must not be formated as an IP address
&& n.parse::<std::net::IpAddr>().is_err()
// Bucket names must not start wih "xn--"
&& !n.starts_with("xn--")
// Bucket names must not end with "-s3alias"
&& !n.ends_with("-s3alias")
}

View file

@ -8,12 +8,21 @@ use crate::garage::Garage;
pub struct BucketHelper<'a>(pub(crate) &'a Garage);
#[allow(clippy::ptr_arg)]
impl<'a> BucketHelper<'a> {
#[allow(clippy::ptr_arg)]
pub async fn resolve_global_bucket_name(
&self,
bucket_name: &String,
) -> Result<Option<Uuid>, Error> {
// Bucket names in Garage are aliases, true bucket identifiers
// are 32-byte UUIDs. This function resolves bucket names into
// their full identifier by looking up in the bucket_alias_table.
// This function also allows buckets to be identified by their
// full UUID (hex-encoded). Here, if the name to be resolved is a
// hex string of the correct length, it is directly parsed as a bucket
// identifier which is returned. There is no risk of this conflicting
// with an actual bucket name: bucket names are max 63 chars long by
// the AWS spec, and hex-encoded UUIDs are 64 chars long.
let hexbucket = hex::decode(bucket_name.as_str())
.ok()
.map(|by| Uuid::try_from(&by))
@ -37,7 +46,6 @@ impl<'a> BucketHelper<'a> {
}
}
#[allow(clippy::ptr_arg)]
pub async fn get_existing_bucket(&self, bucket_id: Uuid) -> Result<Bucket, Error> {
self.0
.bucket_table

View file

@ -8,6 +8,8 @@ use garage_util::time::*;
use crate::permission::BucketKeyPerm;
pub const DEFAULT_WEBSITE_CONFIGURATION: &[u8] = b""; // TODO (an XML WebsiteConfiguration document per the AWS spec)
/// A bucket is a collection of objects
///
/// Its parameters are not directly accessible as:
@ -29,9 +31,8 @@ pub struct BucketParams {
/// Map of key with access to the bucket, and what kind of access they give
pub authorized_keys: crdt::Map<String, BucketKeyPerm>,
/// Whether this bucket is allowed for website access
/// (under all of its global alias names)
pub website_access: crdt::Lww<bool>,
/// The website configuration XML document
/// (under all of its global alias names),
/// and if so, the website configuration XML document
pub website_config: crdt::Lww<Option<ByteBuf>>,
/// Map of aliases that are or have been given to this bucket
/// in the global namespace
@ -50,7 +51,6 @@ impl BucketParams {
BucketParams {
creation_date: now_msec(),
authorized_keys: crdt::Map::new(),
website_access: crdt::Lww::new(false),
website_config: crdt::Lww::new(None),
aliases: crdt::LwwMap::new(),
local_aliases: crdt::LwwMap::new(),
@ -61,7 +61,6 @@ impl BucketParams {
impl Crdt for BucketParams {
fn merge(&mut self, o: &Self) {
self.authorized_keys.merge(&o.authorized_keys);
self.website_access.merge(&o.website_access);
self.website_config.merge(&o.website_config);
self.aliases.merge(&o.aliases);
self.local_aliases.merge(&o.local_aliases);

View file

@ -190,7 +190,7 @@ impl TableSchema for KeyTable {
local_aliases: crdt::LwwMap::new(),
})
};
let name = crdt::Lww::migrate_from_raw(old_k.name.timestamp(), old_k.name.get().clone());
let name = crdt::Lww::raw(old_k.name.timestamp(), old_k.name.get().clone());
Some(Key {
key_id: old_k.key_id,
secret_key: old_k.secret_key,

View file

@ -1,5 +1,7 @@
use std::sync::Arc;
use serde_bytes::ByteBuf;
use garage_table::util::EmptyKey;
use garage_util::crdt::*;
use garage_util::data::*;
@ -38,6 +40,16 @@ impl Migrate {
old_bucket: &old_bucket::Bucket,
old_bucket_p: &old_bucket::BucketParams,
) -> Result<(), Error> {
let bucket_id = blake2sum(old_bucket.name.as_bytes());
let new_name = if is_valid_bucket_name(&old_bucket.name) {
old_bucket.name.clone()
} else {
// if old bucket name was not valid, replace it by
// a hex-encoded name derived from its identifier
hex::encode(&bucket_id.as_slice()[..16])
};
let mut new_ak = Map::new();
for (k, ts, perm) in old_bucket_p.authorized_keys.items().iter() {
new_ak.put(
@ -52,27 +64,27 @@ impl Migrate {
}
let mut aliases = LwwMap::new();
aliases.update_in_place(old_bucket.name.clone(), true);
aliases.update_in_place(new_name.clone(), true);
let website = if *old_bucket_p.website.get() {
Some(ByteBuf::from(DEFAULT_WEBSITE_CONFIGURATION.to_vec()))
} else {
None
};
let new_bucket = Bucket {
id: blake2sum(old_bucket.name.as_bytes()),
id: bucket_id,
state: Deletable::Present(BucketParams {
creation_date: now_msec(),
authorized_keys: new_ak.clone(),
website_access: Lww::new(*old_bucket_p.website.get()),
website_config: Lww::new(None),
website_config: Lww::new(website),
aliases,
local_aliases: LwwMap::new(),
}),
};
self.garage.bucket_table.insert(&new_bucket).await?;
let new_alias = BucketAlias {
name: old_bucket.name.clone(),
state: Lww::new(Deletable::Present(AliasParams {
bucket_id: new_bucket.id,
})),
};
let new_alias = BucketAlias::new(new_name.clone(), new_bucket.id).unwrap();
self.garage.bucket_alias_table.insert(&new_alias).await?;
for (k, perm) in new_ak.items().iter() {

View file

@ -34,6 +34,9 @@ impl Crdt for BucketKeyPerm {
if !other.allow_write {
self.allow_write = false;
}
if !other.allow_owner {
self.allow_owner = false;
}
}
_ => (),
}

View file

@ -61,7 +61,7 @@ where
///
/// Compared to new, the CRDT's timestamp is not set to now
/// but must be set to the previous, non-compatible, CRDT's timestamp.
pub fn migrate_from_raw(ts: u64, value: T) -> Self {
pub fn raw(ts: u64, value: T) -> Self {
Self { ts, v: value }
}
@ -77,6 +77,11 @@ where
self.v = new_value;
}
/// Get the timestamp currently associated with the value
pub fn timestamp(&self) -> u64 {
self.ts
}
/// Get the CRDT value
pub fn get(&self) -> &T {
&self.v

View file

@ -37,11 +37,12 @@ where
pub fn new() -> Self {
Self { vals: vec![] }
}
/// Used to migrate from a map defined in an incompatible format. This produces
/// a map that contains a single item with the specified timestamp (copied from
/// the incompatible format). Do this as many times as you have items to migrate,
/// and put them all together using the CRDT merge operator.
pub fn migrate_from_raw_item(k: K, ts: u64, v: V) -> Self {
pub fn raw_item(k: K, ts: u64, v: V) -> Self {
Self {
vals: vec![(k, ts, v)],
}
@ -74,9 +75,37 @@ where
Self { vals: new_vals }
}
/// Updates a value in place in the map (this generates
/// a new timestamp)
pub fn update_in_place(&mut self, k: K, new_v: V) {
self.merge(&self.update_mutator(k, new_v));
}
/// Updates a value in place in the map, from a
/// (key, timestamp, value) triple, only if the given
/// timestamp is larger than the timestamp currently
/// in the map
pub fn merge_raw(&mut self, k: &K, ts2: u64, v2: &V) {
match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(k)) {
Ok(i) => {
let (_, ts1, _v1) = &self.vals[i];
match ts2.cmp(ts1) {
Ordering::Greater => {
self.vals[i].1 = ts2;
self.vals[i].2 = v2.clone();
}
Ordering::Equal => {
self.vals[i].2.merge(v2);
}
Ordering::Less => (),
}
}
Err(i) => {
self.vals.insert(i, (k.clone(), ts2, v2.clone()));
}
}
}
/// Takes all of the values of the map and returns them. The current map is reset to the
/// empty map. This is very usefull to produce in-place a new map that contains only a delta
/// that modifies a certain value:
@ -103,10 +132,12 @@ where
let vals = std::mem::take(&mut self.vals);
Self { vals }
}
/// Removes all values from the map
pub fn clear(&mut self) {
self.vals.clear();
}
/// Get a reference to the value assigned to a key
pub fn get(&self, k: &K) -> Option<&V> {
match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(k)) {
@ -114,6 +145,16 @@ where
Err(_) => None,
}
}
/// Get the timestamp of the value assigned to a key, or 0 if
/// no value is assigned
pub fn get_timestamp(&self, k: &K) -> u64 {
match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(k)) {
Ok(i) => self.vals[i].1,
Err(_) => 0,
}
}
/// Gets a reference to all of the items, as a slice. Usefull to iterate on all map values.
/// In most case you will want to ignore the timestamp (second item of the tuple).
pub fn items(&self) -> &[(K, u64, V)] {
@ -138,24 +179,7 @@ where
{
fn merge(&mut self, other: &Self) {
for (k, ts2, v2) in other.vals.iter() {
match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(k)) {
Ok(i) => {
let (_, ts1, _v1) = &self.vals[i];
match ts2.cmp(ts1) {
Ordering::Greater => {
self.vals[i].1 = *ts2;
self.vals[i].2 = v2.clone();
}
Ordering::Equal => {
self.vals[i].2.merge(v2);
}
Ordering::Less => (),
}
}
Err(i) => {
self.vals.insert(i, (k.clone(), *ts2, v2.clone()));
}
}
self.merge_raw(k, *ts2, v2);
}
}
}

View file

@ -15,6 +15,11 @@ pub fn increment_logical_clock(prev: u64) -> u64 {
std::cmp::max(prev + 1, now_msec())
}
/// Increment two logical clocks
pub fn increment_logical_clock_2(prev: u64, prev2: u64) -> u64 {
std::cmp::max(prev2 + 1, std::cmp::max(prev + 1, now_msec()))
}
/// Convert a timestamp represented as milliseconds since UNIX Epoch to
/// its RFC3339 representation, such as "2021-01-01T12:30:00Z"
pub fn msec_to_rfc3339(msecs: u64) -> String {

View file

@ -99,7 +99,7 @@ async fn serve_file(garage: Arc<Garage>, req: Request<Body>) -> Result<Response<
.filter(|b| {
b.state
.as_option()
.map(|x| *x.website_access.get())
.map(|x| x.website_config.get().is_some())
.unwrap_or(false)
})
.ok_or(Error::NotFound)?;