Fix #204 (full Multipart Uploads semantics) #553

Merged
lx merged 20 commits from nlnet-task1 into next 2023-06-09 15:34:10 +00:00
10 changed files with 64 additions and 24 deletions
Showing only changes of commit 511e07ecd4 - Show all commits

View file

@ -535,7 +535,10 @@ Example response:
], ],
"objects": 14827, "objects": 14827,
"bytes": 13189855625, "bytes": 13189855625,
"unfinshedUploads": 0, "unfinishedUploads": 1,
"unfinishedMultipartUploads": 1,
"unfinishedMultipartUploadParts": 11,
"unfinishedMultipartUploadBytes": 41943040,
"quotas": { "quotas": {
"maxSize": null, "maxSize": null,
"maxObjects": null "maxObjects": null

View file

@ -14,6 +14,7 @@ use garage_model::bucket_alias_table::*;
use garage_model::bucket_table::*; use garage_model::bucket_table::*;
use garage_model::garage::Garage; use garage_model::garage::Garage;
use garage_model::permission::*; use garage_model::permission::*;
use garage_model::s3::mpu_table;
use garage_model::s3::object_table::*; use garage_model::s3::object_table::*;
use crate::admin::error::*; use crate::admin::error::*;
@ -124,6 +125,14 @@ async fn bucket_info_results(
.map(|x| x.filtered_values(&garage.system.ring.borrow())) .map(|x| x.filtered_values(&garage.system.ring.borrow()))
.unwrap_or_default(); .unwrap_or_default();
let mpu_counters = garage
.mpu_counter_table
.table
.get(&bucket_id, &EmptyKey)
.await?
.map(|x| x.filtered_values(&garage.system.ring.borrow()))
.unwrap_or_default();
let mut relevant_keys = HashMap::new(); let mut relevant_keys = HashMap::new();
for (k, _) in bucket for (k, _) in bucket
.state .state
@ -208,12 +217,12 @@ async fn bucket_info_results(
} }
}) })
.collect::<Vec<_>>(), .collect::<Vec<_>>(),
objects: counters.get(OBJECTS).cloned().unwrap_or_default(), objects: *counters.get(OBJECTS).unwrap_or(&0),
bytes: counters.get(BYTES).cloned().unwrap_or_default(), bytes: *counters.get(BYTES).unwrap_or(&0),
unfinished_uploads: counters unfinished_uploads: *counters.get(UNFINISHED_UPLOADS).unwrap_or(&0),
.get(UNFINISHED_UPLOADS) unfinished_multipart_uploads: *mpu_counters.get(mpu_table::UPLOADS).unwrap_or(&0),
.cloned() unfinished_multipart_upload_parts: *mpu_counters.get(mpu_table::PARTS).unwrap_or(&0),
.unwrap_or_default(), unfinished_multipart_upload_bytes: *mpu_counters.get(mpu_table::BYTES).unwrap_or(&0),
quotas: ApiBucketQuotas { quotas: ApiBucketQuotas {
max_size: quotas.max_size, max_size: quotas.max_size,
max_objects: quotas.max_objects, max_objects: quotas.max_objects,
@ -235,6 +244,9 @@ struct GetBucketInfoResult {
objects: i64, objects: i64,
bytes: i64, bytes: i64,
unfinished_uploads: i64, unfinished_uploads: i64,
unfinished_multipart_uploads: i64,
unfinished_multipart_upload_parts: i64,
unfinished_multipart_upload_bytes: i64,
quotas: ApiBucketQuotas, quotas: ApiBucketQuotas,
} }

View file

@ -73,6 +73,15 @@ impl AdminRpcHandler {
.map(|x| x.filtered_values(&self.garage.system.ring.borrow())) .map(|x| x.filtered_values(&self.garage.system.ring.borrow()))
.unwrap_or_default(); .unwrap_or_default();
let mpu_counters = self
.garage
.mpu_counter_table
.table
.get(&bucket_id, &EmptyKey)
.await?
.map(|x| x.filtered_values(&self.garage.system.ring.borrow()))
.unwrap_or_default();
let mut relevant_keys = HashMap::new(); let mut relevant_keys = HashMap::new();
for (k, _) in bucket for (k, _) in bucket
.state .state
@ -112,6 +121,7 @@ impl AdminRpcHandler {
bucket, bucket,
relevant_keys, relevant_keys,
counters, counters,
mpu_counters,
}) })
} }

View file

@ -53,6 +53,7 @@ pub enum AdminRpc {
bucket: Bucket, bucket: Bucket,
relevant_keys: HashMap<String, Key>, relevant_keys: HashMap<String, Key>,
counters: HashMap<String, i64>, counters: HashMap<String, i64>,
mpu_counters: HashMap<String, i64>,
}, },
KeyList(Vec<(String, String)>), KeyList(Vec<(String, String)>),
KeyInfo(Key, HashMap<Uuid, Bucket>), KeyInfo(Key, HashMap<Uuid, Bucket>),

View file

@ -190,8 +190,9 @@ pub async fn cmd_admin(
bucket, bucket,
relevant_keys, relevant_keys,
counters, counters,
mpu_counters,
} => { } => {
print_bucket_info(&bucket, &relevant_keys, &counters); print_bucket_info(&bucket, &relevant_keys, &counters, &mpu_counters);
} }
AdminRpc::KeyList(kl) => { AdminRpc::KeyList(kl) => {
print_key_list(kl); print_key_list(kl);

View file

@ -443,22 +443,22 @@ pub struct RepairOpt {
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] #[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
pub enum RepairWhat { pub enum RepairWhat {
/// Only do a full sync of metadata tables /// Do a full sync of metadata tables
#[structopt(name = "tables", version = garage_version())] #[structopt(name = "tables", version = garage_version())]
Tables, Tables,
/// Only repair (resync/rebalance) the set of stored blocks /// Repair (resync/rebalance) the set of stored blocks
#[structopt(name = "blocks", version = garage_version())] #[structopt(name = "blocks", version = garage_version())]
Blocks, Blocks,
/// Only redo the propagation of object deletions to the version table (slow) /// Repropagate object deletions to the version table
#[structopt(name = "versions", version = garage_version())] #[structopt(name = "versions", version = garage_version())]
Versions, Versions,
/// Only redo the propagation of object deletions to the multipart upload table (slow) /// Repropagate object deletions to the multipart upload table
#[structopt(name = "mpu", version = garage_version())] #[structopt(name = "mpu", version = garage_version())]
MultipartUploads, MultipartUploads,
/// Only redo the propagation of version deletions to the block ref table (extremely slow) /// Repropagate version deletions to the block ref table
#[structopt(name = "block_refs", version = garage_version())] #[structopt(name = "block_refs", version = garage_version())]
BlockRefs, BlockRefs,
/// Verify integrity of all blocks on disc (extremely slow, i/o intensive) /// Verify integrity of all blocks on disc
#[structopt(name = "scrub", version = garage_version())] #[structopt(name = "scrub", version = garage_version())]
Scrub { Scrub {
#[structopt(subcommand)] #[structopt(subcommand)]

View file

@ -12,8 +12,8 @@ use garage_block::manager::BlockResyncErrorInfo;
use garage_model::bucket_table::*; use garage_model::bucket_table::*;
use garage_model::key_table::*; use garage_model::key_table::*;
use garage_model::s3::mpu_table::MultipartUpload; use garage_model::s3::mpu_table::{self, MultipartUpload};
use garage_model::s3::object_table::{BYTES, OBJECTS, UNFINISHED_UPLOADS}; use garage_model::s3::object_table;
use garage_model::s3::version_table::*; use garage_model::s3::version_table::*;
use crate::cli::structs::WorkerListOpt; use crate::cli::structs::WorkerListOpt;
@ -136,6 +136,7 @@ pub fn print_bucket_info(
bucket: &Bucket, bucket: &Bucket,
relevant_keys: &HashMap<String, Key>, relevant_keys: &HashMap<String, Key>,
counters: &HashMap<String, i64>, counters: &HashMap<String, i64>,
mpu_counters: &HashMap<String, i64>,
) { ) {
let key_name = |k| { let key_name = |k| {
relevant_keys relevant_keys
@ -149,7 +150,7 @@ pub fn print_bucket_info(
Deletable::Deleted => println!("Bucket is deleted."), Deletable::Deleted => println!("Bucket is deleted."),
Deletable::Present(p) => { Deletable::Present(p) => {
let size = let size =
bytesize::ByteSize::b(counters.get(BYTES).cloned().unwrap_or_default() as u64); bytesize::ByteSize::b(*counters.get(object_table::BYTES).unwrap_or(&0) as u64);
println!( println!(
"\nSize: {} ({})", "\nSize: {} ({})",
size.to_string_as(true), size.to_string_as(true),
@ -157,14 +158,22 @@ pub fn print_bucket_info(
); );
println!( println!(
"Objects: {}", "Objects: {}",
counters.get(OBJECTS).cloned().unwrap_or_default() *counters.get(object_table::OBJECTS).unwrap_or(&0)
);
println!(
"Unfinished uploads (multipart and non-multipart): {}",
*counters.get(object_table::UNFINISHED_UPLOADS).unwrap_or(&0)
); );
println!( println!(
"Unfinished multipart uploads: {}", "Unfinished multipart uploads: {}",
counters *mpu_counters.get(mpu_table::UPLOADS).unwrap_or(&0)
.get(UNFINISHED_UPLOADS) );
.cloned() let mpu_size =
.unwrap_or_default() bytesize::ByteSize::b(*mpu_counters.get(mpu_table::BYTES).unwrap_or(&0) as u64);
println!(
"Size of unfinished multipart uploads: {} ({})",
mpu_size.to_string_as(true),
mpu_size.to_string_as(false),
); );
println!("\nWebsite access: {}", p.website_config.get().is_some()); println!("\nWebsite access: {}", p.website_config.get().is_some());

View file

@ -335,6 +335,8 @@ impl Garage {
self.object_table.spawn_workers(bg); self.object_table.spawn_workers(bg);
self.object_counter_table.spawn_workers(bg); self.object_counter_table.spawn_workers(bg);
self.mpu_table.spawn_workers(bg);
self.mpu_counter_table.spawn_workers(bg);
self.version_table.spawn_workers(bg); self.version_table.spawn_workers(bg);
self.block_ref_table.spawn_workers(bg); self.block_ref_table.spawn_workers(bg);

View file

@ -478,7 +478,9 @@ impl<'a> BucketHelper<'a> {
// ---- // ----
/// Deletes all incomplete multipart uploads that are older than a certain time. /// Deletes all incomplete multipart uploads that are older than a certain time.
/// Returns the number of uploads aborted /// Returns the number of uploads aborted.
/// This will also include non-multipart uploads, which may be lingering
/// after a node crash
pub async fn cleanup_incomplete_uploads( pub async fn cleanup_incomplete_uploads(
&self, &self,
bucket_id: &Uuid, bucket_id: &Uuid,

View file

@ -208,7 +208,7 @@ impl TableSchema for MultipartUploadTable {
} }
impl CountedItem for MultipartUpload { impl CountedItem for MultipartUpload {
const COUNTER_TABLE_NAME: &'static str = "bucket_mpu_part_counter"; const COUNTER_TABLE_NAME: &'static str = "bucket_mpu_counter";
// Partition key = bucket id // Partition key = bucket id
type CP = Uuid; type CP = Uuid;