Actually distribute counters over nodes
This commit is contained in:
parent
c054de43dd
commit
180e7fef0a
6 changed files with 18 additions and 69 deletions
|
@ -33,28 +33,10 @@ pub async fn handle_list_buckets(garage: &Arc<Garage>) -> Result<Response<Body>,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let ring = garage.system.ring.borrow().clone();
|
|
||||||
let counters = garage
|
|
||||||
.object_counter_table
|
|
||||||
.table
|
|
||||||
.get_range(
|
|
||||||
&EmptyKey,
|
|
||||||
None,
|
|
||||||
Some((DeletedFilter::NotDeleted, ring.layout.node_id_vec.clone())),
|
|
||||||
15000,
|
|
||||||
EnumerationOrder::Forward,
|
|
||||||
)
|
|
||||||
.await?
|
|
||||||
.iter()
|
|
||||||
.map(|x| (x.sk, x.filtered_values(&ring)))
|
|
||||||
.collect::<HashMap<_, _>>();
|
|
||||||
|
|
||||||
let res = buckets
|
let res = buckets
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|b| {
|
.map(|b| {
|
||||||
let state = b.state.as_option().unwrap();
|
let state = b.state.as_option().unwrap();
|
||||||
let empty_cnts = HashMap::new();
|
|
||||||
let cnts = counters.get(&b.id).unwrap_or(&empty_cnts);
|
|
||||||
ListBucketResultItem {
|
ListBucketResultItem {
|
||||||
id: hex::encode(b.id),
|
id: hex::encode(b.id),
|
||||||
global_aliases: state
|
global_aliases: state
|
||||||
|
@ -74,9 +56,6 @@ pub async fn handle_list_buckets(garage: &Arc<Garage>) -> Result<Response<Body>,
|
||||||
alias: n.to_string(),
|
alias: n.to_string(),
|
||||||
})
|
})
|
||||||
.collect::<Vec<_>>(),
|
.collect::<Vec<_>>(),
|
||||||
objects: cnts.get(OBJECTS).cloned().unwrap_or_default(),
|
|
||||||
bytes: cnts.get(BYTES).cloned().unwrap_or_default(),
|
|
||||||
unfinshed_uploads: cnts.get(UNFINISHED_UPLOADS).cloned().unwrap_or_default(),
|
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
|
@ -90,9 +69,6 @@ struct ListBucketResultItem {
|
||||||
id: String,
|
id: String,
|
||||||
global_aliases: Vec<String>,
|
global_aliases: Vec<String>,
|
||||||
local_aliases: Vec<BucketLocalAlias>,
|
local_aliases: Vec<BucketLocalAlias>,
|
||||||
objects: i64,
|
|
||||||
bytes: i64,
|
|
||||||
unfinshed_uploads: i64,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize)]
|
#[derive(Serialize)]
|
||||||
|
@ -143,7 +119,7 @@ async fn bucket_info_results(
|
||||||
let counters = garage
|
let counters = garage
|
||||||
.object_counter_table
|
.object_counter_table
|
||||||
.table
|
.table
|
||||||
.get(&EmptyKey, &bucket_id)
|
.get(&bucket_id, &EmptyKey)
|
||||||
.await?
|
.await?
|
||||||
.map(|x| x.filtered_values(&garage.system.ring.borrow()))
|
.map(|x| x.filtered_values(&garage.system.ring.borrow()))
|
||||||
.unwrap_or_default();
|
.unwrap_or_default();
|
||||||
|
|
|
@ -226,7 +226,7 @@ async fn check_quotas(
|
||||||
let key = key.to_string();
|
let key = key.to_string();
|
||||||
let (prev_object, counters) = futures::try_join!(
|
let (prev_object, counters) = futures::try_join!(
|
||||||
garage.object_table.get(&bucket.id, &key),
|
garage.object_table.get(&bucket.id, &key),
|
||||||
garage.object_counter_table.table.get(&EmptyKey, &bucket.id),
|
garage.object_counter_table.table.get(&bucket.id, &EmptyKey),
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
let counters = counters
|
let counters = counters
|
||||||
|
|
|
@ -39,10 +39,7 @@ pub enum AdminRpc {
|
||||||
|
|
||||||
// Replies
|
// Replies
|
||||||
Ok(String),
|
Ok(String),
|
||||||
BucketList {
|
BucketList(Vec<Bucket>),
|
||||||
buckets: Vec<Bucket>,
|
|
||||||
counters: HashMap<Uuid, HashMap<String, i64>>,
|
|
||||||
},
|
|
||||||
BucketInfo {
|
BucketInfo {
|
||||||
bucket: Bucket,
|
bucket: Bucket,
|
||||||
relevant_keys: HashMap<String, Key>,
|
relevant_keys: HashMap<String, Key>,
|
||||||
|
@ -97,24 +94,7 @@ impl AdminRpcHandler {
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let ring = self.garage.system.ring.borrow().clone();
|
Ok(AdminRpc::BucketList(buckets))
|
||||||
let counters = self
|
|
||||||
.garage
|
|
||||||
.object_counter_table
|
|
||||||
.table
|
|
||||||
.get_range(
|
|
||||||
&EmptyKey,
|
|
||||||
None,
|
|
||||||
Some((DeletedFilter::NotDeleted, ring.layout.node_id_vec.clone())),
|
|
||||||
15000,
|
|
||||||
EnumerationOrder::Forward,
|
|
||||||
)
|
|
||||||
.await?
|
|
||||||
.iter()
|
|
||||||
.map(|x| (x.sk, x.filtered_values(&ring)))
|
|
||||||
.collect::<HashMap<_, _>>();
|
|
||||||
|
|
||||||
Ok(AdminRpc::BucketList { buckets, counters })
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_bucket_info(&self, query: &BucketOpt) -> Result<AdminRpc, Error> {
|
async fn handle_bucket_info(&self, query: &BucketOpt) -> Result<AdminRpc, Error> {
|
||||||
|
@ -135,7 +115,7 @@ impl AdminRpcHandler {
|
||||||
.garage
|
.garage
|
||||||
.object_counter_table
|
.object_counter_table
|
||||||
.table
|
.table
|
||||||
.get(&EmptyKey, &bucket_id)
|
.get(&bucket_id, &EmptyKey)
|
||||||
.await?
|
.await?
|
||||||
.map(|x| x.filtered_values(&self.garage.system.ring.borrow()))
|
.map(|x| x.filtered_values(&self.garage.system.ring.borrow()))
|
||||||
.unwrap_or_default();
|
.unwrap_or_default();
|
||||||
|
|
|
@ -166,8 +166,8 @@ pub async fn cmd_admin(
|
||||||
AdminRpc::Ok(msg) => {
|
AdminRpc::Ok(msg) => {
|
||||||
println!("{}", msg);
|
println!("{}", msg);
|
||||||
}
|
}
|
||||||
AdminRpc::BucketList { buckets, counters } => {
|
AdminRpc::BucketList(bl) => {
|
||||||
print_bucket_list(buckets, counters);
|
print_bucket_list(bl);
|
||||||
}
|
}
|
||||||
AdminRpc::BucketInfo {
|
AdminRpc::BucketInfo {
|
||||||
bucket,
|
bucket,
|
||||||
|
|
|
@ -9,11 +9,11 @@ use garage_model::bucket_table::*;
|
||||||
use garage_model::key_table::*;
|
use garage_model::key_table::*;
|
||||||
use garage_model::s3::object_table::{BYTES, OBJECTS, UNFINISHED_UPLOADS};
|
use garage_model::s3::object_table::{BYTES, OBJECTS, UNFINISHED_UPLOADS};
|
||||||
|
|
||||||
pub fn print_bucket_list(buckets: Vec<Bucket>, counters: HashMap<Uuid, HashMap<String, i64>>) {
|
pub fn print_bucket_list(bl: Vec<Bucket>) {
|
||||||
println!("List of buckets:");
|
println!("List of buckets:");
|
||||||
|
|
||||||
let mut table = vec![];
|
let mut table = vec![];
|
||||||
for bucket in buckets {
|
for bucket in bl {
|
||||||
let aliases = bucket
|
let aliases = bucket
|
||||||
.aliases()
|
.aliases()
|
||||||
.iter()
|
.iter()
|
||||||
|
@ -31,18 +31,11 @@ pub fn print_bucket_list(buckets: Vec<Bucket>, counters: HashMap<Uuid, HashMap<S
|
||||||
s => format!("[{} local aliases]", s.len()),
|
s => format!("[{} local aliases]", s.len()),
|
||||||
};
|
};
|
||||||
|
|
||||||
let empty_counters = HashMap::new();
|
|
||||||
let cnt = counters.get(&bucket.id).unwrap_or(&empty_counters);
|
|
||||||
|
|
||||||
table.push(format!(
|
table.push(format!(
|
||||||
"\t{}\t{}\t{}\t{}\t{}\t{}",
|
"\t{}\t{}\t{}",
|
||||||
aliases.join(","),
|
aliases.join(","),
|
||||||
local_aliases_n,
|
local_aliases_n,
|
||||||
hex::encode(bucket.id),
|
hex::encode(bucket.id),
|
||||||
bytesize::ByteSize::b(cnt.get(BYTES).cloned().unwrap_or_default() as u64)
|
|
||||||
.to_string_as(true),
|
|
||||||
cnt.get(OBJECTS).cloned().unwrap_or_default(),
|
|
||||||
cnt.get(UNFINISHED_UPLOADS).cloned().unwrap_or_default(),
|
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
format_table(table);
|
format_table(table);
|
||||||
|
|
|
@ -302,17 +302,17 @@ impl TableSchema for ObjectTable {
|
||||||
impl CountedItem for Object {
|
impl CountedItem for Object {
|
||||||
const COUNTER_TABLE_NAME: &'static str = "bucket_object_counter";
|
const COUNTER_TABLE_NAME: &'static str = "bucket_object_counter";
|
||||||
|
|
||||||
// Partition key = nothing
|
// Partition key = bucket id
|
||||||
type CP = EmptyKey;
|
type CP = Uuid;
|
||||||
// Sort key = bucket id
|
// Sort key = nothing
|
||||||
type CS = Uuid;
|
type CS = EmptyKey;
|
||||||
|
|
||||||
fn counter_partition_key(&self) -> &EmptyKey {
|
fn counter_partition_key(&self) -> &Uuid {
|
||||||
&EmptyKey
|
|
||||||
}
|
|
||||||
fn counter_sort_key(&self) -> &Uuid {
|
|
||||||
&self.bucket_id
|
&self.bucket_id
|
||||||
}
|
}
|
||||||
|
fn counter_sort_key(&self) -> &EmptyKey {
|
||||||
|
&EmptyKey
|
||||||
|
}
|
||||||
|
|
||||||
fn counts(&self) -> Vec<(&'static str, i64)> {
|
fn counts(&self) -> Vec<(&'static str, i64)> {
|
||||||
let versions = self.versions();
|
let versions = self.versions();
|
||||||
|
|
Loading…
Reference in a new issue