Compare commits

..

No commits in common. "2128b5febd97eb6c95e3bdd380dcba39b860c0e3" and "1ace34adbb05bb10cf7a2c8d0d2b84769ca797df" have entirely different histories.

11 changed files with 400 additions and 21 deletions

View file

@ -27,6 +27,7 @@ use garage_model::bucket_table::*;
use garage_model::garage::Garage; use garage_model::garage::Garage;
use garage_model::helper::error::{Error, OkOrBadRequest}; use garage_model::helper::error::{Error, OkOrBadRequest};
use garage_model::key_table::*; use garage_model::key_table::*;
use garage_model::migrate::Migrate;
use garage_model::s3::mpu_table::MultipartUpload; use garage_model::s3::mpu_table::MultipartUpload;
use garage_model::s3::version_table::Version; use garage_model::s3::version_table::Version;
@ -41,6 +42,7 @@ pub enum AdminRpc {
BucketOperation(BucketOperation), BucketOperation(BucketOperation),
KeyOperation(KeyOperation), KeyOperation(KeyOperation),
LaunchRepair(RepairOpt), LaunchRepair(RepairOpt),
Migrate(MigrateOpt),
Stats(StatsOpt), Stats(StatsOpt),
Worker(WorkerOperation), Worker(WorkerOperation),
BlockOperation(BlockOperation), BlockOperation(BlockOperation),
@ -93,6 +95,24 @@ impl AdminRpcHandler {
admin admin
} }
// ================ MIGRATION COMMANDS ====================
async fn handle_migrate(self: &Arc<Self>, opt: MigrateOpt) -> Result<AdminRpc, Error> {
if !opt.yes {
return Err(Error::BadRequest(
"Please provide the --yes flag to initiate migration operation.".to_string(),
));
}
let m = Migrate {
garage: self.garage.clone(),
};
match opt.what {
MigrateWhat::Buckets050 => m.migrate_buckets050().await,
}?;
Ok(AdminRpc::Ok("Migration successfull.".into()))
}
// ================ REPAIR COMMANDS ==================== // ================ REPAIR COMMANDS ====================
async fn handle_launch_repair(self: &Arc<Self>, opt: RepairOpt) -> Result<AdminRpc, Error> { async fn handle_launch_repair(self: &Arc<Self>, opt: RepairOpt) -> Result<AdminRpc, Error> {
@ -510,6 +530,7 @@ impl EndpointHandler<AdminRpc> for AdminRpcHandler {
match message { match message {
AdminRpc::BucketOperation(bo) => self.handle_bucket_cmd(bo).await, AdminRpc::BucketOperation(bo) => self.handle_bucket_cmd(bo).await,
AdminRpc::KeyOperation(ko) => self.handle_key_cmd(ko).await, AdminRpc::KeyOperation(ko) => self.handle_key_cmd(ko).await,
AdminRpc::Migrate(opt) => self.handle_migrate(opt.clone()).await,
AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await, AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await,
AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await, AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await,
AdminRpc::Worker(wo) => self.handle_worker_cmd(wo).await, AdminRpc::Worker(wo) => self.handle_worker_cmd(wo).await,

View file

@ -33,6 +33,9 @@ pub async fn cli_command_dispatch(
Command::Key(ko) => { Command::Key(ko) => {
cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::KeyOperation(ko)).await cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::KeyOperation(ko)).await
} }
Command::Migrate(mo) => {
cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::Migrate(mo)).await
}
Command::Repair(ro) => { Command::Repair(ro) => {
cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::LaunchRepair(ro)).await cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::LaunchRepair(ro)).await
} }

View file

@ -31,6 +31,11 @@ pub enum Command {
#[structopt(name = "key", version = garage_version())] #[structopt(name = "key", version = garage_version())]
Key(KeyOperation), Key(KeyOperation),
/// Run migrations from previous Garage version
/// (DO NOT USE WITHOUT READING FULL DOCUMENTATION)
#[structopt(name = "migrate", version = garage_version())]
Migrate(MigrateOpt),
/// Start repair of node data on remote node /// Start repair of node data on remote node
#[structopt(name = "repair", version = garage_version())] #[structopt(name = "repair", version = garage_version())]
Repair(RepairOpt), Repair(RepairOpt),
@ -440,6 +445,23 @@ pub struct KeyImportOpt {
pub yes: bool, pub yes: bool,
} }
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
pub struct MigrateOpt {
/// Confirm the launch of the migrate operation
#[structopt(long = "yes")]
pub yes: bool,
#[structopt(subcommand)]
pub what: MigrateWhat,
}
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
pub enum MigrateWhat {
/// Migrate buckets and permissions from v0.5.0
#[structopt(name = "buckets050", version = garage_version())]
Buckets050,
}
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)] #[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
pub struct RepairOpt { pub struct RepairOpt {
/// Launch repair operation on all nodes /// Launch repair operation on all nodes

View file

@ -7,7 +7,48 @@ use garage_table::{DeletedFilter, EmptyKey, Entry, TableSchema};
use crate::permission::BucketKeyPerm; use crate::permission::BucketKeyPerm;
pub(crate) mod v05 {
use garage_util::crdt;
use serde::{Deserialize, Serialize};
/// An api key
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct Key {
/// The id of the key (immutable), used as partition key
pub key_id: String,
/// The secret_key associated
pub secret_key: String,
/// Name for the key
pub name: crdt::Lww<String>,
/// Is the key deleted
pub deleted: crdt::Bool,
/// Buckets in which the key is authorized. Empty if `Key` is deleted
// CRDT interaction: deleted implies authorized_buckets is empty
pub authorized_buckets: crdt::LwwMap<String, PermissionSet>,
}
/// Permission given to a key in a bucket
#[derive(PartialOrd, Ord, PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct PermissionSet {
/// The key can be used to read the bucket
pub allow_read: bool,
/// The key can be used to write in the bucket
pub allow_write: bool,
}
impl crdt::AutoCrdt for PermissionSet {
const WARN_IF_DIFFERENT: bool = true;
}
impl garage_util::migrate::InitialFormat for Key {}
}
mod v08 { mod v08 {
use super::v05;
use crate::permission::BucketKeyPerm; use crate::permission::BucketKeyPerm;
use garage_util::crdt; use garage_util::crdt;
use garage_util::data::Uuid; use garage_util::data::Uuid;
@ -45,7 +86,32 @@ mod v08 {
pub local_aliases: crdt::LwwMap<String, Option<Uuid>>, pub local_aliases: crdt::LwwMap<String, Option<Uuid>>,
} }
impl garage_util::migrate::InitialFormat for Key {} impl garage_util::migrate::Migrate for Key {
type Previous = v05::Key;
fn migrate(old_k: v05::Key) -> Key {
let name = crdt::Lww::raw(old_k.name.timestamp(), old_k.name.get().clone());
let state = if old_k.deleted.get() {
crdt::Deletable::Deleted
} else {
// Authorized buckets is ignored here,
// migration is performed in specific migration code in
// garage/migrate.rs
crdt::Deletable::Present(KeyParams {
secret_key: old_k.secret_key,
name,
allow_create_bucket: crdt::Lww::new(false),
authorized_buckets: crdt::Map::new(),
local_aliases: crdt::LwwMap::new(),
})
};
Key {
key_id: old_k.key_id,
state,
}
}
}
} }
pub use v08::*; pub use v08::*;

View file

@ -1,6 +1,9 @@
#[macro_use] #[macro_use]
extern crate tracing; extern crate tracing;
// For migration from previous versions
pub(crate) mod prev;
pub mod permission; pub mod permission;
pub mod index_counter; pub mod index_counter;
@ -15,3 +18,4 @@ pub mod s3;
pub mod garage; pub mod garage;
pub mod helper; pub mod helper;
pub mod migrate;

108
src/model/migrate.rs Normal file
View file

@ -0,0 +1,108 @@
use std::sync::Arc;
use garage_util::crdt::*;
use garage_util::data::*;
use garage_util::encode::nonversioned_decode;
use garage_util::error::Error as GarageError;
use garage_util::time::*;
use crate::prev::v051::bucket_table as old_bucket;
use crate::bucket_alias_table::*;
use crate::bucket_table::*;
use crate::garage::Garage;
use crate::helper::error::*;
use crate::permission::*;
pub struct Migrate {
pub garage: Arc<Garage>,
}
impl Migrate {
pub async fn migrate_buckets050(&self) -> Result<(), Error> {
let tree = self
.garage
.db
.open_tree("bucket:table")
.map_err(GarageError::from)?;
let mut old_buckets = vec![];
for res in tree.iter().map_err(GarageError::from)? {
let (_k, v) = res.map_err(GarageError::from)?;
let bucket =
nonversioned_decode::<old_bucket::Bucket>(&v[..]).map_err(GarageError::from)?;
old_buckets.push(bucket);
}
for bucket in old_buckets {
if let old_bucket::BucketState::Present(p) = bucket.state.get() {
self.migrate_buckets050_do_bucket(&bucket, p).await?;
}
}
Ok(())
}
pub async fn migrate_buckets050_do_bucket(
&self,
old_bucket: &old_bucket::Bucket,
old_bucket_p: &old_bucket::BucketParams,
) -> Result<(), Error> {
let bucket_id = blake2sum(old_bucket.name.as_bytes());
let new_name = if is_valid_bucket_name(&old_bucket.name) {
old_bucket.name.clone()
} else {
// if old bucket name was not valid, replace it by
// a hex-encoded name derived from its identifier
hex::encode(&bucket_id.as_slice()[..16])
};
let website = if *old_bucket_p.website.get() {
Some(WebsiteConfig {
index_document: "index.html".into(),
error_document: None,
})
} else {
None
};
let helper = self.garage.locked_helper().await;
self.garage
.bucket_table
.insert(&Bucket {
id: bucket_id,
state: Deletable::Present(BucketParams {
creation_date: now_msec(),
authorized_keys: Map::new(),
aliases: LwwMap::new(),
local_aliases: LwwMap::new(),
website_config: Lww::new(website),
cors_config: Lww::new(None),
lifecycle_config: Lww::new(None),
quotas: Lww::new(Default::default()),
}),
})
.await?;
helper.set_global_bucket_alias(bucket_id, &new_name).await?;
for (k, ts, perm) in old_bucket_p.authorized_keys.items().iter() {
helper
.set_bucket_key_permissions(
bucket_id,
k,
BucketKeyPerm {
timestamp: *ts,
allow_read: perm.allow_read,
allow_write: perm.allow_write,
allow_owner: false,
},
)
.await?;
}
Ok(())
}
}

1
src/model/prev/mod.rs Normal file
View file

@ -0,0 +1 @@
pub(crate) mod v051;

View file

@ -0,0 +1,63 @@
use serde::{Deserialize, Serialize};
use garage_table::crdt::Crdt;
use garage_table::*;
use crate::key_table::v05::PermissionSet;
/// A bucket is a collection of objects
///
/// Its parameters are not directly accessible as:
/// - It must be possible to merge paramaters, hence the use of a LWW CRDT.
/// - A bucket has 2 states, Present or Deleted and parameters make sense only if present.
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct Bucket {
/// Name of the bucket
pub name: String,
/// State, and configuration if not deleted, of the bucket
pub state: crdt::Lww<BucketState>,
}
/// State of a bucket
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub enum BucketState {
/// The bucket is deleted
Deleted,
/// The bucket exists
Present(BucketParams),
}
impl Crdt for BucketState {
fn merge(&mut self, o: &Self) {
match o {
BucketState::Deleted => *self = BucketState::Deleted,
BucketState::Present(other_params) => {
if let BucketState::Present(params) = self {
params.merge(other_params);
}
}
}
}
}
/// Configuration for a bucket
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct BucketParams {
/// Map of key with access to the bucket, and what kind of access they give
pub authorized_keys: crdt::LwwMap<String, PermissionSet>,
/// Is the bucket served as http
pub website: crdt::Lww<bool>,
}
impl Crdt for BucketParams {
fn merge(&mut self, o: &Self) {
self.authorized_keys.merge(&o.authorized_keys);
self.website.merge(&o.website);
}
}
impl Crdt for Bucket {
fn merge(&mut self, other: &Self) {
self.state.merge(&other.state);
}
}

View file

@ -0,0 +1 @@
pub(crate) mod bucket_table;

View file

@ -17,7 +17,7 @@ pub const OBJECTS: &str = "objects";
pub const UNFINISHED_UPLOADS: &str = "unfinished_uploads"; pub const UNFINISHED_UPLOADS: &str = "unfinished_uploads";
pub const BYTES: &str = "bytes"; pub const BYTES: &str = "bytes";
mod v08 { mod v05 {
use garage_util::data::{Hash, Uuid}; use garage_util::data::{Hash, Uuid};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::collections::BTreeMap; use std::collections::BTreeMap;
@ -26,7 +26,7 @@ mod v08 {
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct Object { pub struct Object {
/// The bucket in which the object is stored, used as partition key /// The bucket in which the object is stored, used as partition key
pub bucket_id: Uuid, pub bucket: String,
/// The key at which the object is stored in its bucket, used as sorting key /// The key at which the object is stored in its bucket, used as sorting key
pub key: String, pub key: String,
@ -92,6 +92,45 @@ mod v08 {
impl garage_util::migrate::InitialFormat for Object {} impl garage_util::migrate::InitialFormat for Object {}
} }
mod v08 {
use garage_util::data::Uuid;
use serde::{Deserialize, Serialize};
use super::v05;
pub use v05::{
ObjectVersion, ObjectVersionData, ObjectVersionHeaders, ObjectVersionMeta,
ObjectVersionState,
};
/// An object
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct Object {
/// The bucket in which the object is stored, used as partition key
pub bucket_id: Uuid,
/// The key at which the object is stored in its bucket, used as sorting key
pub key: String,
/// The list of currenty stored versions of the object
pub(super) versions: Vec<ObjectVersion>,
}
impl garage_util::migrate::Migrate for Object {
type Previous = v05::Object;
fn migrate(old: v05::Object) -> Object {
use garage_util::data::blake2sum;
Object {
bucket_id: blake2sum(old.bucket.as_bytes()),
key: old.key,
versions: old.versions,
}
}
}
}
mod v09 { mod v09 {
use garage_util::data::Uuid; use garage_util::data::Uuid;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};

View file

@ -11,11 +11,65 @@ use garage_table::*;
use crate::s3::block_ref_table::*; use crate::s3::block_ref_table::*;
mod v08 { mod v05 {
use garage_util::crdt; use garage_util::crdt;
use garage_util::data::{Hash, Uuid}; use garage_util::data::{Hash, Uuid};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
/// A version of an object
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct Version {
/// UUID of the version, used as partition key
pub uuid: Uuid,
// Actual data: the blocks for this version
// In the case of a multipart upload, also store the etags
// of individual parts and check them when doing CompleteMultipartUpload
/// Is this version deleted
pub deleted: crdt::Bool,
/// list of blocks of data composing the version
pub blocks: crdt::Map<VersionBlockKey, VersionBlock>,
/// Etag of each part in case of a multipart upload, empty otherwise
pub parts_etags: crdt::Map<u64, String>,
// Back link to bucket+key so that we can figure if
// this was deleted later on
/// Bucket in which the related object is stored
pub bucket: String,
/// Key in which the related object is stored
pub key: String,
}
#[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)]
pub struct VersionBlockKey {
/// Number of the part
pub part_number: u64,
/// Offset of this sub-segment in its part as sent by the client
/// (before any kind of compression or encryption)
pub offset: u64,
}
/// Informations about a single block
#[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)]
pub struct VersionBlock {
/// Blake2 sum of the block
pub hash: Hash,
/// Size of the block, before any kind of compression or encryption
pub size: u64,
}
impl garage_util::migrate::InitialFormat for Version {}
}
mod v08 {
use garage_util::crdt;
use garage_util::data::Uuid;
use serde::{Deserialize, Serialize};
use super::v05;
pub use v05::{VersionBlock, VersionBlockKey};
/// A version of an object /// A version of an object
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct Version { pub struct Version {
@ -40,25 +94,22 @@ mod v08 {
pub key: String, pub key: String,
} }
#[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)] impl garage_util::migrate::Migrate for Version {
pub struct VersionBlockKey { type Previous = v05::Version;
/// Number of the part
pub part_number: u64,
/// Offset of this sub-segment in its part as sent by the client
/// (before any kind of compression or encryption)
pub offset: u64,
}
/// Informations about a single block fn migrate(old: v05::Version) -> Version {
#[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)] use garage_util::data::blake2sum;
pub struct VersionBlock {
/// Blake2 sum of the block
pub hash: Hash,
/// Size of the block, before any kind of compression or encryption
pub size: u64,
}
impl garage_util::migrate::InitialFormat for Version {} Version {
uuid: old.uuid,
deleted: old.deleted,
blocks: old.blocks,
parts_etags: old.parts_etags,
bucket_id: blake2sum(old.bucket.as_bytes()),
key: old.key,
}
}
}
} }
pub(crate) mod v09 { pub(crate) mod v09 {