improve internal item counter mechanisms and implement bucket quotas (#326)
All checks were successful
continuous-integration/drone/push Build is passing

- [x] Refactoring of internal counting API
- [x] Repair procedure for counters (it's an offline procedure!!!)
- [x] New counter for objects in buckets
- [x] Add quotas to buckets struct
- [x] Add CLI to manage bucket quotas
- [x] Add admin API to manage bucket quotas
- [x] Apply quotas by adding checks on put operations
- [x] Proof-read

Co-authored-by: Alex Auvolat <alex@adnab.me>
Reviewed-on: #326
Co-authored-by: Alex <alex@adnab.me>
Co-committed-by: Alex <alex@adnab.me>
This commit is contained in:
Alex 2022-06-15 20:20:28 +02:00
parent d544a0e0e0
commit 77e3fd6db2
32 changed files with 962 additions and 347 deletions

7
Cargo.lock generated
View file

@ -387,6 +387,12 @@ dependencies = [
"either",
]
[[package]]
name = "bytesize"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c58ec36aac5066d5ca17df51b3e70279f5670a72102f5752cb7e7c856adfc70"
[[package]]
name = "cc"
version = "1.0.73"
@ -948,6 +954,7 @@ dependencies = [
"aws-sdk-s3",
"base64",
"bytes 1.1.0",
"bytesize",
"chrono",
"futures",
"futures-util",

View file

@ -564,6 +564,13 @@ in
};
});
"registry+https://github.com/rust-lang/crates.io-index".bytesize."1.1.0" = overridableMkRustCrate (profileName: rec {
name = "bytesize";
version = "1.1.0";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "6c58ec36aac5066d5ca17df51b3e70279f5670a72102f5752cb7e7c856adfc70"; };
});
"registry+https://github.com/rust-lang/crates.io-index".cc."1.0.73" = overridableMkRustCrate (profileName: rec {
name = "cc";
version = "1.0.73";
@ -732,7 +739,7 @@ in
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "59a6001667ab124aebae2a495118e11d30984c3a653e99d86d58971708cf5e4b"; };
dependencies = {
${ if hostPlatform.parsed.cpu.name == "aarch64" && hostPlatform.parsed.kernel.name == "linux" || hostPlatform.config == "aarch64-apple-darwin" || hostPlatform.config == "aarch64-linux-android" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; };
${ if hostPlatform.config == "aarch64-linux-android" || hostPlatform.parsed.cpu.name == "aarch64" && hostPlatform.parsed.kernel.name == "linux" || hostPlatform.config == "aarch64-apple-darwin" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; };
};
});
@ -1353,6 +1360,7 @@ in
dependencies = {
async_trait = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".async-trait."0.1.52" { profileName = "__noProfile"; };
bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.1.0" { inherit profileName; };
bytesize = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytesize."1.1.0" { inherit profileName; };
futures = rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures."0.3.21" { inherit profileName; };
futures_util = rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures-util."0.3.21" { inherit profileName; };
garage_api = rustPackages."unknown".garage_api."0.7.0" { inherit profileName; };
@ -2718,7 +2726,7 @@ in
[ "os-poll" ]
];
dependencies = {
${ if hostPlatform.parsed.kernel.name == "wasi" || hostPlatform.isUnix then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; };
${ if hostPlatform.isUnix || hostPlatform.parsed.kernel.name == "wasi" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; };
log = rustPackages."registry+https://github.com/rust-lang/crates.io-index".log."0.4.16" { inherit profileName; };
${ if hostPlatform.isWindows then "miow" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".miow."0.3.7" { inherit profileName; };
${ if hostPlatform.isWindows then "ntapi" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".ntapi."0.3.7" { inherit profileName; };
@ -3730,7 +3738,7 @@ in
];
dependencies = {
${ if hostPlatform.parsed.kernel.name == "android" || hostPlatform.parsed.kernel.name == "linux" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; };
${ if hostPlatform.parsed.kernel.name == "android" || hostPlatform.parsed.kernel.name == "linux" || hostPlatform.parsed.kernel.name == "dragonfly" || hostPlatform.parsed.kernel.name == "freebsd" || hostPlatform.parsed.kernel.name == "illumos" || hostPlatform.parsed.kernel.name == "netbsd" || hostPlatform.parsed.kernel.name == "openbsd" || hostPlatform.parsed.kernel.name == "solaris" then "once_cell" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".once_cell."1.10.0" { inherit profileName; };
${ if hostPlatform.parsed.kernel.name == "dragonfly" || hostPlatform.parsed.kernel.name == "freebsd" || hostPlatform.parsed.kernel.name == "illumos" || hostPlatform.parsed.kernel.name == "netbsd" || hostPlatform.parsed.kernel.name == "openbsd" || hostPlatform.parsed.kernel.name == "solaris" || hostPlatform.parsed.kernel.name == "android" || hostPlatform.parsed.kernel.name == "linux" then "once_cell" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".once_cell."1.10.0" { inherit profileName; };
${ if hostPlatform.parsed.cpu.name == "i686" || hostPlatform.parsed.cpu.name == "x86_64" || (hostPlatform.parsed.cpu.name == "aarch64" || hostPlatform.parsed.cpu.name == "armv6l" || hostPlatform.parsed.cpu.name == "armv7l") && (hostPlatform.parsed.kernel.name == "android" || hostPlatform.parsed.kernel.name == "fuchsia" || hostPlatform.parsed.kernel.name == "linux") then "spin" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".spin."0.5.2" { inherit profileName; };
untrusted = rustPackages."registry+https://github.com/rust-lang/crates.io-index".untrusted."0.7.1" { inherit profileName; };
${ if hostPlatform.parsed.cpu.name == "wasm32" && hostPlatform.parsed.vendor.name == "unknown" && hostPlatform.parsed.kernel.name == "unknown" && hostPlatform.parsed.abi.name == "" then "web_sys" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".web-sys."0.3.56" { inherit profileName; };
@ -4319,7 +4327,7 @@ in
];
dependencies = {
bitflags = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bitflags."1.3.2" { inherit profileName; };
${ if hostPlatform.parsed.kernel.name == "android" || hostPlatform.parsed.kernel.name == "linux" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; };
${ if hostPlatform.parsed.kernel.name == "linux" || hostPlatform.parsed.kernel.name == "android" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; };
${ if !(hostPlatform.parsed.kernel.name == "linux" || hostPlatform.parsed.kernel.name == "android") then "parking_lot" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".parking_lot."0.11.2" { inherit profileName; };
${ if !(hostPlatform.parsed.kernel.name == "linux" || hostPlatform.parsed.kernel.name == "android") then "parking_lot_core" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".parking_lot_core."0.8.5" { inherit profileName; };
static_init_macro = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".static_init_macro."1.0.2" { profileName = "__noProfile"; };
@ -5325,7 +5333,7 @@ in
${ if hostPlatform.config == "aarch64-uwp-windows-msvc" || hostPlatform.config == "aarch64-pc-windows-msvc" then "windows_aarch64_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_aarch64_msvc."0.32.0" { inherit profileName; };
${ if hostPlatform.config == "i686-pc-windows-gnu" || hostPlatform.config == "i686-uwp-windows-gnu" then "windows_i686_gnu" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_i686_gnu."0.32.0" { inherit profileName; };
${ if hostPlatform.config == "i686-uwp-windows-msvc" || hostPlatform.config == "i686-pc-windows-msvc" then "windows_i686_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_i686_msvc."0.32.0" { inherit profileName; };
${ if hostPlatform.config == "x86_64-pc-windows-gnu" || hostPlatform.config == "x86_64-uwp-windows-gnu" then "windows_x86_64_gnu" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_x86_64_gnu."0.32.0" { inherit profileName; };
${ if hostPlatform.config == "x86_64-uwp-windows-gnu" || hostPlatform.config == "x86_64-pc-windows-gnu" then "windows_x86_64_gnu" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_x86_64_gnu."0.32.0" { inherit profileName; };
${ if hostPlatform.config == "x86_64-uwp-windows-msvc" || hostPlatform.config == "x86_64-pc-windows-msvc" then "windows_x86_64_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_x86_64_msvc."0.32.0" { inherit profileName; };
};
});

View file

@ -473,24 +473,31 @@ Example response:
```json
{
"id": "e6a14cd6a27f48684579ec6b381c078ab11697e6bc8513b72b2f5307e25fff9b",
"globalAliases": [
"alex"
],
"id": "afa8f0a22b40b1247ccd0affb869b0af5cff980924a20e4b5e0720a44deb8d39",
"globalAliases": [],
"websiteAccess": false,
"websiteConfig": null,
"keys": [
{
"accessKeyId": "GK31c2f218a2e44f485b94239e",
"name": "alex",
"name": "Imported key",
"permissions": {
"read": true,
"write": true,
"owner": true
},
"bucketLocalAliases": [
"test"
"debug"
]
}
]
],
"objects": 14827,
"bytes": 13189855625,
"unfinshedUploads": 0,
"quotas": {
"maxSize": null,
"maxObjects": null
}
}
```
@ -540,26 +547,37 @@ Deletes a storage bucket. A bucket cannot be deleted if it is not empty.
Warning: this will delete all aliases associated with the bucket!
#### PutBucketWebsite `PUT /v0/bucket/website?id=<bucket id>`
#### UpdateBucket `PUT /v0/bucket?id=<bucket id>`
Sets the website configuration for a bucket (this also enables website access for this bucket).
Updates configuration of the given bucket.
Request body format:
```json
{
"websiteAccess": {
"enabled": true,
"indexDocument": "index.html",
"errorDocument": "404.html"
},
"quotas": {
"maxSize": 19029801,
"maxObjects": null,
}
}
```
The field `errorDocument` is optional, if no error document is set a generic error message is displayed when errors happen.
All fields (`websiteAccess` and `quotas`) are optionnal.
If they are present, the corresponding modifications are applied to the bucket, otherwise nothing is changed.
In `websiteAccess`: if `enabled` is `true`, `indexDocument` must be specified.
The field `errorDocument` is optional, if no error document is set a generic
error message is displayed when errors happen. Conversely, if `enabled` is
`false`, neither `indexDocument` nor `errorDocument` must be specified.
#### DeleteBucketWebsite `DELETE /v0/bucket/website?id=<bucket id>`
Deletes the website configuration for a bucket (disables website access for this bucket).
In `quotas`: new values of `maxSize` and `maxObjects` must both be specified, or set to `null`
to remove the quotas. An absent value will be considered the same as a `null`. It is not possible
to change only one of the two quotas.
### Operations on permissions for keys on buckets

View file

@ -156,12 +156,7 @@ impl ApiHandler for AdminApiServer {
}
Endpoint::CreateBucket => handle_create_bucket(&self.garage, req).await,
Endpoint::DeleteBucket { id } => handle_delete_bucket(&self.garage, id).await,
Endpoint::PutBucketWebsite { id } => {
handle_put_bucket_website(&self.garage, id, req).await
}
Endpoint::DeleteBucketWebsite { id } => {
handle_delete_bucket_website(&self.garage, id).await
}
Endpoint::UpdateBucket { id } => handle_update_bucket(&self.garage, id, req).await,
// Bucket-key permissions
Endpoint::BucketAllowKey => {
handle_bucket_change_key_perm(&self.garage, req, true).await

View file

@ -14,6 +14,7 @@ use garage_model::bucket_alias_table::*;
use garage_model::bucket_table::*;
use garage_model::garage::Garage;
use garage_model::permission::*;
use garage_model::s3::object_table::*;
use crate::admin::error::*;
use crate::admin::key::ApiBucketKeyPerm;
@ -77,6 +78,13 @@ struct BucketLocalAlias {
alias: String,
}
#[derive(Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct ApiBucketQuotas {
max_size: Option<u64>,
max_objects: Option<u64>,
}
pub async fn handle_get_bucket_info(
garage: &Arc<Garage>,
id: Option<String>,
@ -108,6 +116,14 @@ async fn bucket_info_results(
.get_existing_bucket(bucket_id)
.await?;
let counters = garage
.object_counter_table
.table
.get(&bucket_id, &EmptyKey)
.await?
.map(|x| x.filtered_values(&garage.system.ring.borrow()))
.unwrap_or_default();
let mut relevant_keys = HashMap::new();
for (k, _) in bucket
.state
@ -148,6 +164,7 @@ async fn bucket_info_results(
let state = bucket.state.as_option().unwrap();
let quotas = state.quotas.get();
let res =
GetBucketInfoResult {
id: hex::encode(&bucket.id),
@ -191,6 +208,16 @@ async fn bucket_info_results(
}
})
.collect::<Vec<_>>(),
objects: counters.get(OBJECTS).cloned().unwrap_or_default(),
bytes: counters.get(BYTES).cloned().unwrap_or_default(),
unfinshed_uploads: counters
.get(UNFINISHED_UPLOADS)
.cloned()
.unwrap_or_default(),
quotas: ApiBucketQuotas {
max_size: quotas.max_size,
max_objects: quotas.max_objects,
},
};
Ok(json_ok_response(&res)?)
@ -205,6 +232,10 @@ struct GetBucketInfoResult {
#[serde(default)]
website_config: Option<GetBucketInfoWebsiteResult>,
keys: Vec<GetBucketInfoKey>,
objects: i64,
bytes: i64,
unfinshed_uploads: i64,
quotas: ApiBucketQuotas,
}
#[derive(Serialize)]
@ -363,14 +394,12 @@ pub async fn handle_delete_bucket(
.body(Body::empty())?)
}
// ---- BUCKET WEBSITE CONFIGURATION ----
pub async fn handle_put_bucket_website(
pub async fn handle_update_bucket(
garage: &Arc<Garage>,
id: String,
req: Request<Body>,
) -> Result<Response<Body>, Error> {
let req = parse_json_body::<PutBucketWebsiteRequest>(req).await?;
let req = parse_json_body::<UpdateBucketRequest>(req).await?;
let bucket_id = parse_bucket_id(&id)?;
let mut bucket = garage
@ -379,10 +408,31 @@ pub async fn handle_put_bucket_website(
.await?;
let state = bucket.state.as_option_mut().unwrap();
if let Some(wa) = req.website_access {
if wa.enabled {
state.website_config.update(Some(WebsiteConfig {
index_document: req.index_document,
error_document: req.error_document,
index_document: wa.index_document.ok_or_bad_request(
"Please specify indexDocument when enabling website access.",
)?,
error_document: wa.error_document,
}));
} else {
if wa.index_document.is_some() || wa.error_document.is_some() {
return Err(Error::bad_request(
"Cannot specify indexDocument or errorDocument when disabling website access.",
));
}
state.website_config.update(None);
}
}
if let Some(q) = req.quotas {
state.quotas.update(BucketQuotas {
max_size: q.max_size,
max_objects: q.max_objects,
});
}
garage.bucket_table.insert(&bucket).await?;
@ -391,29 +441,17 @@ pub async fn handle_put_bucket_website(
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct PutBucketWebsiteRequest {
index_document: String,
#[serde(default)]
error_document: Option<String>,
struct UpdateBucketRequest {
website_access: Option<UpdateBucketWebsiteAccess>,
quotas: Option<ApiBucketQuotas>,
}
pub async fn handle_delete_bucket_website(
garage: &Arc<Garage>,
id: String,
) -> Result<Response<Body>, Error> {
let bucket_id = parse_bucket_id(&id)?;
let mut bucket = garage
.bucket_helper()
.get_existing_bucket(bucket_id)
.await?;
let state = bucket.state.as_option_mut().unwrap();
state.website_config.update(None);
garage.bucket_table.insert(&bucket).await?;
bucket_info_results(garage, bucket_id).await
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct UpdateBucketWebsiteAccess {
enabled: bool,
index_document: Option<String>,
error_document: Option<String>,
}
// ---- BUCKET/KEY PERMISSIONS ----

View file

@ -48,10 +48,7 @@ pub enum Endpoint {
DeleteBucket {
id: String,
},
PutBucketWebsite {
id: String,
},
DeleteBucketWebsite {
UpdateBucket {
id: String,
},
// Bucket-Key Permissions
@ -113,8 +110,7 @@ impl Endpoint {
GET "/v0/bucket" => ListBuckets,
POST "/v0/bucket" => CreateBucket,
DELETE "/v0/bucket" if id => DeleteBucket (query::id),
PUT "/v0/bucket/website" if id => PutBucketWebsite (query::id),
DELETE "/v0/bucket/website" if id => DeleteBucketWebsite (query::id),
PUT "/v0/bucket" if id => UpdateBucket (query::id),
// Bucket-key permissions
POST "/v0/bucket/allow" => BucketAllowKey,
POST "/v0/bucket/deny" => BucketDenyKey,

View file

@ -10,7 +10,7 @@ use garage_rpc::ring::Ring;
use garage_table::util::*;
use garage_model::garage::Garage;
use garage_model::k2v::counter_table::{BYTES, CONFLICTS, ENTRIES, VALUES};
use garage_model::k2v::item_table::{BYTES, CONFLICTS, ENTRIES, VALUES};
use crate::k2v::error::*;
use crate::k2v::range::read_range;

View file

@ -212,7 +212,7 @@ impl ApiHandler for S3ApiServer {
.await
}
Endpoint::PutObject { key } => {
handle_put(garage, req, bucket_id, &key, content_sha256).await
handle_put(garage, req, &bucket, &key, content_sha256).await
}
Endpoint::AbortMultipartUpload { key, upload_id } => {
handle_abort_multipart_upload(garage, bucket_id, &key, &upload_id).await
@ -226,7 +226,7 @@ impl ApiHandler for S3ApiServer {
garage,
req,
&bucket_name,
bucket_id,
&bucket,
&key,
&upload_id,
content_sha256,

View file

@ -22,7 +22,7 @@ use crate::signature::payload::{parse_date, verify_v4};
pub async fn handle_post_object(
garage: Arc<Garage>,
req: Request<Body>,
bucket: String,
bucket_name: String,
) -> Result<Response<Body>, Error> {
let boundary = req
.headers()
@ -126,13 +126,18 @@ pub async fn handle_post_object(
let bucket_id = garage
.bucket_helper()
.resolve_bucket(&bucket, &api_key)
.resolve_bucket(&bucket_name, &api_key)
.await?;
if !api_key.allow_write(&bucket_id) {
return Err(Error::forbidden("Operation is not allowed for this key."));
}
let bucket = garage
.bucket_helper()
.get_existing_bucket(bucket_id)
.await?;
let decoded_policy = base64::decode(&policy).ok_or_bad_request("Invalid policy")?;
let decoded_policy: Policy =
serde_json::from_slice(&decoded_policy).ok_or_bad_request("Invalid policy")?;
@ -227,7 +232,7 @@ pub async fn handle_post_object(
garage,
headers,
StreamLimiter::new(stream, conditions.content_length),
bucket_id,
&bucket,
&key,
None,
None,
@ -244,7 +249,7 @@ pub async fn handle_post_object(
{
target
.query_pairs_mut()
.append_pair("bucket", &bucket)
.append_pair("bucket", &bucket_name)
.append_pair("key", &key)
.append_pair("etag", &etag);
let target = target.to_string();
@ -289,7 +294,7 @@ pub async fn handle_post_object(
let xml = s3_xml::PostObject {
xmlns: (),
location: s3_xml::Value(location),
bucket: s3_xml::Value(bucket),
bucket: s3_xml::Value(bucket_name),
key: s3_xml::Value(key),
etag: s3_xml::Value(etag),
};

View file

@ -1,4 +1,4 @@
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
use std::sync::Arc;
use futures::prelude::*;
@ -14,7 +14,9 @@ use garage_util::error::Error as GarageError;
use garage_util::time::*;
use garage_block::manager::INLINE_THRESHOLD;
use garage_model::bucket_table::Bucket;
use garage_model::garage::Garage;
use garage_model::index_counter::CountedItem;
use garage_model::s3::block_ref_table::*;
use garage_model::s3::object_table::*;
use garage_model::s3::version_table::*;
@ -26,7 +28,7 @@ use crate::signature::verify_signed_content;
pub async fn handle_put(
garage: Arc<Garage>,
req: Request<Body>,
bucket_id: Uuid,
bucket: &Bucket,
key: &str,
content_sha256: Option<Hash>,
) -> Result<Response<Body>, Error> {
@ -46,7 +48,7 @@ pub async fn handle_put(
garage,
headers,
body,
bucket_id,
bucket,
key,
content_md5,
content_sha256,
@ -59,7 +61,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
garage: Arc<Garage>,
headers: ObjectVersionHeaders,
body: S,
bucket_id: Uuid,
bucket: &Bucket,
key: &str,
content_md5: Option<String>,
content_sha256: Option<FixedBytes32>,
@ -80,6 +82,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
let data_md5sum_hex = hex::encode(data_md5sum);
let data_sha256sum = sha256sum(&first_block[..]);
let size = first_block.len() as u64;
ensure_checksum_matches(
data_md5sum.as_slice(),
@ -88,20 +91,22 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
content_sha256,
)?;
check_quotas(&garage, bucket, key, size).await?;
let object_version = ObjectVersion {
uuid: version_uuid,
timestamp: version_timestamp,
state: ObjectVersionState::Complete(ObjectVersionData::Inline(
ObjectVersionMeta {
headers,
size: first_block.len() as u64,
size,
etag: data_md5sum_hex.clone(),
},
first_block,
)),
};
let object = Object::new(bucket_id, key.into(), vec![object_version]);
let object = Object::new(bucket.id, key.into(), vec![object_version]);
garage.object_table.insert(&object).await?;
return Ok((version_uuid, data_md5sum_hex));
@ -114,19 +119,21 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
timestamp: version_timestamp,
state: ObjectVersionState::Uploading(headers.clone()),
};
let object = Object::new(bucket_id, key.into(), vec![object_version.clone()]);
let object = Object::new(bucket.id, key.into(), vec![object_version.clone()]);
garage.object_table.insert(&object).await?;
// Initialize corresponding entry in version table
// Write this entry now, even with empty block list,
// to prevent block_ref entries from being deleted (they can be deleted
// if the reference a version that isn't found in the version table)
let version = Version::new(version_uuid, bucket_id, key.into(), false);
let version = Version::new(version_uuid, bucket.id, key.into(), false);
garage.version_table.insert(&version).await?;
// Transfer data and verify checksum
let first_block_hash = blake2sum(&first_block[..]);
let tx_result = read_and_put_blocks(
let tx_result = (|| async {
let (total_size, data_md5sum, data_sha256sum) = read_and_put_blocks(
&garage,
&version,
1,
@ -134,16 +141,20 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
first_block_hash,
&mut chunker,
)
.await
.and_then(|(total_size, data_md5sum, data_sha256sum)| {
.await?;
ensure_checksum_matches(
data_md5sum.as_slice(),
data_sha256sum,
content_md5.as_deref(),
content_sha256,
)
.map(|()| (total_size, data_md5sum))
});
)?;
check_quotas(&garage, bucket, key, total_size).await?;
Ok((total_size, data_md5sum))
})()
.await;
// If something went wrong, clean up
let (total_size, md5sum_arr) = match tx_result {
@ -151,7 +162,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
Err(e) => {
// Mark object as aborted, this will free the blocks further down
object_version.state = ObjectVersionState::Aborted;
let object = Object::new(bucket_id, key.into(), vec![object_version.clone()]);
let object = Object::new(bucket.id, key.into(), vec![object_version.clone()]);
garage.object_table.insert(&object).await?;
return Err(e);
}
@ -167,7 +178,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
},
first_block_hash,
));
let object = Object::new(bucket_id, key.into(), vec![object_version]);
let object = Object::new(bucket.id, key.into(), vec![object_version]);
garage.object_table.insert(&object).await?;
Ok((version_uuid, md5sum_hex))
@ -200,6 +211,64 @@ fn ensure_checksum_matches(
Ok(())
}
/// Check that inserting this object with this size doesn't exceed bucket quotas
async fn check_quotas(
garage: &Arc<Garage>,
bucket: &Bucket,
key: &str,
size: u64,
) -> Result<(), Error> {
let quotas = bucket.state.as_option().unwrap().quotas.get();
if quotas.max_objects.is_none() && quotas.max_size.is_none() {
return Ok(());
};
let key = key.to_string();
let (prev_object, counters) = futures::try_join!(
garage.object_table.get(&bucket.id, &key),
garage.object_counter_table.table.get(&bucket.id, &EmptyKey),
)?;
let counters = counters
.map(|x| x.filtered_values(&garage.system.ring.borrow()))
.unwrap_or_default();
let (prev_cnt_obj, prev_cnt_size) = match prev_object {
Some(o) => {
let prev_cnt = o.counts().into_iter().collect::<HashMap<_, _>>();
(
prev_cnt.get(OBJECTS).cloned().unwrap_or_default(),
prev_cnt.get(BYTES).cloned().unwrap_or_default(),
)
}
None => (0, 0),
};
let cnt_obj_diff = 1 - prev_cnt_obj;
let cnt_size_diff = size as i64 - prev_cnt_size;
if let Some(mo) = quotas.max_objects {
let current_objects = counters.get(OBJECTS).cloned().unwrap_or_default();
if cnt_obj_diff > 0 && current_objects + cnt_obj_diff > mo as i64 {
return Err(Error::forbidden(format!(
"Object quota is reached, maximum objects for this bucket: {}",
mo
)));
}
}
if let Some(ms) = quotas.max_size {
let current_size = counters.get(BYTES).cloned().unwrap_or_default();
if cnt_size_diff > 0 && current_size + cnt_size_diff > ms as i64 {
return Err(Error::forbidden(format!(
"Bucket size quota is reached, maximum total size of objects for this bucket: {}. The bucket is already {} bytes, and this object would add {} bytes.",
ms, current_size, size
)));
}
}
Ok(())
}
async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
garage: &Garage,
version: &Version,
@ -473,7 +542,7 @@ pub async fn handle_complete_multipart_upload(
garage: Arc<Garage>,
req: Request<Body>,
bucket_name: &str,
bucket_id: Uuid,
bucket: &Bucket,
key: &str,
upload_id: &str,
content_sha256: Option<Hash>,
@ -497,7 +566,7 @@ pub async fn handle_complete_multipart_upload(
// Get object and version
let key = key.to_string();
let (object, version) = futures::try_join!(
garage.object_table.get(&bucket_id, &key),
garage.object_table.get(&bucket.id, &key),
garage.version_table.get(&version_uuid, &EmptyKey),
)?;
@ -590,6 +659,14 @@ pub async fn handle_complete_multipart_upload(
// Calculate total size of final object
let total_size = version.blocks.items().iter().map(|x| x.1.size).sum();
if let Err(e) = check_quotas(&garage, bucket, &key, total_size).await {
object_version.state = ObjectVersionState::Aborted;
let final_object = Object::new(bucket.id, key.clone(), vec![object_version]);
garage.object_table.insert(&final_object).await?;
return Err(e);
}
// Write final object version
object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock(
ObjectVersionMeta {
@ -600,7 +677,7 @@ pub async fn handle_complete_multipart_upload(
version.blocks.items()[0].1.hash,
));
let final_object = Object::new(bucket_id, key.clone(), vec![object_version]);
let final_object = Object::new(bucket.id, key.clone(), vec![object_version]);
garage.object_table.insert(&final_object).await?;
// Send response saying ok we're done

View file

@ -197,6 +197,11 @@ impl Tree {
pub fn remove<T: AsRef<[u8]>>(&self, key: T) -> Result<Option<Value>> {
self.0.remove(self.1, key.as_ref())
}
/// Clears all values from the tree
#[inline]
pub fn clear(&self) -> Result<()> {
self.0.clear(self.1)
}
#[inline]
pub fn iter(&self) -> Result<ValueIter<'_>> {
@ -311,6 +316,7 @@ pub(crate) trait IDb: Send + Sync {
fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<Option<Value>>;
fn remove(&self, tree: usize, key: &[u8]) -> Result<Option<Value>>;
fn clear(&self, tree: usize) -> Result<()>;
fn iter(&self, tree: usize) -> Result<ValueIter<'_>>;
fn iter_rev(&self, tree: usize) -> Result<ValueIter<'_>>;

View file

@ -139,6 +139,14 @@ impl IDb for LmdbDb {
Ok(old_val)
}
fn clear(&self, tree: usize) -> Result<()> {
let tree = self.get_tree(tree)?;
let mut tx = self.db.write_txn()?;
tree.clear(&mut tx)?;
tx.commit()?;
Ok(())
}
fn iter(&self, tree: usize) -> Result<ValueIter<'_>> {
let tree = self.get_tree(tree)?;
let tx = self.db.read_txn()?;

View file

@ -113,6 +113,12 @@ impl IDb for SledDb {
Ok(old_val.map(|x| x.to_vec()))
}
fn clear(&self, tree: usize) -> Result<()> {
let tree = self.get_tree(tree)?;
tree.clear()?;
Ok(())
}
fn iter(&self, tree: usize) -> Result<ValueIter<'_>> {
let tree = self.get_tree(tree)?;
Ok(Box::new(tree.iter().map(|v| {

View file

@ -182,6 +182,16 @@ impl IDb for SqliteDb {
Ok(old_val)
}
fn clear(&self, tree: usize) -> Result<()> {
trace!("clear {}: lock db", tree);
let this = self.0.lock().unwrap();
trace!("clear {}: lock acquired", tree);
let tree = this.get_tree(tree)?;
this.db.execute(&format!("DELETE FROM {}", tree), [])?;
Ok(())
}
fn iter(&self, tree: usize) -> Result<ValueIter<'_>> {
trace!("iter {}: lock db", tree);
let this = self.0.lock().unwrap();

View file

@ -30,6 +30,7 @@ garage_util = { version = "0.7.0", path = "../util" }
garage_web = { version = "0.7.0", path = "../web" }
bytes = "1.0"
bytesize = "1.1"
hex = "0.4"
tracing = { version = "0.1.30", features = ["log-always"] }
pretty_env_logger = "0.4"

View file

@ -24,11 +24,12 @@ use garage_model::migrate::Migrate;
use garage_model::permission::*;
use crate::cli::*;
use crate::repair::Repair;
use crate::repair::online::OnlineRepair;
pub const ADMIN_RPC_PATH: &str = "garage/admin_rpc.rs/Rpc";
#[derive(Debug, Serialize, Deserialize)]
#[allow(clippy::large_enum_variant)]
pub enum AdminRpc {
BucketOperation(BucketOperation),
KeyOperation(KeyOperation),
@ -39,7 +40,11 @@ pub enum AdminRpc {
// Replies
Ok(String),
BucketList(Vec<Bucket>),
BucketInfo(Bucket, HashMap<String, Key>),
BucketInfo {
bucket: Bucket,
relevant_keys: HashMap<String, Key>,
counters: HashMap<String, i64>,
},
KeyList(Vec<(String, String)>),
KeyInfo(Key, HashMap<Uuid, Bucket>),
}
@ -72,6 +77,7 @@ impl AdminRpcHandler {
BucketOperation::Allow(query) => self.handle_bucket_allow(query).await,
BucketOperation::Deny(query) => self.handle_bucket_deny(query).await,
BucketOperation::Website(query) => self.handle_bucket_website(query).await,
BucketOperation::SetQuotas(query) => self.handle_bucket_set_quotas(query).await,
}
}
@ -87,6 +93,7 @@ impl AdminRpcHandler {
EnumerationOrder::Forward,
)
.await?;
Ok(AdminRpc::BucketList(buckets))
}
@ -104,6 +111,15 @@ impl AdminRpcHandler {
.get_existing_bucket(bucket_id)
.await?;
let counters = self
.garage
.object_counter_table
.table
.get(&bucket_id, &EmptyKey)
.await?
.map(|x| x.filtered_values(&self.garage.system.ring.borrow()))
.unwrap_or_default();
let mut relevant_keys = HashMap::new();
for (k, _) in bucket
.state
@ -139,7 +155,11 @@ impl AdminRpcHandler {
}
}
Ok(AdminRpc::BucketInfo(bucket, relevant_keys))
Ok(AdminRpc::BucketInfo {
bucket,
relevant_keys,
counters,
})
}
#[allow(clippy::ptr_arg)]
@ -431,6 +451,60 @@ impl AdminRpcHandler {
Ok(AdminRpc::Ok(msg))
}
async fn handle_bucket_set_quotas(&self, query: &SetQuotasOpt) -> Result<AdminRpc, Error> {
let bucket_id = self
.garage
.bucket_helper()
.resolve_global_bucket_name(&query.bucket)
.await?
.ok_or_bad_request("Bucket not found")?;
let mut bucket = self
.garage
.bucket_helper()
.get_existing_bucket(bucket_id)
.await?;
let bucket_state = bucket.state.as_option_mut().unwrap();
if query.max_size.is_none() && query.max_objects.is_none() {
return Err(Error::BadRequest(
"You must specify either --max-size or --max-objects (or both) for this command to do something.".to_string(),
));
}
let mut quotas = bucket_state.quotas.get().clone();
match query.max_size.as_ref().map(String::as_ref) {
Some("none") => quotas.max_size = None,
Some(v) => {
let bs = v
.parse::<bytesize::ByteSize>()
.ok_or_bad_request(format!("Invalid size specified: {}", v))?;
quotas.max_size = Some(bs.as_u64());
}
_ => (),
}
match query.max_objects.as_ref().map(String::as_ref) {
Some("none") => quotas.max_objects = None,
Some(v) => {
let mo = v
.parse::<u64>()
.ok_or_bad_request(format!("Invalid number specified: {}", v))?;
quotas.max_objects = Some(mo);
}
_ => (),
}
bucket_state.quotas.update(quotas);
self.garage.bucket_table.insert(&bucket).await?;
Ok(AdminRpc::Ok(format!(
"Quotas updated for {}",
&query.bucket
)))
}
async fn handle_key_cmd(&self, cmd: &KeyOperation) -> Result<AdminRpc, Error> {
match cmd {
KeyOperation::List => self.handle_list_keys().await,
@ -619,7 +693,7 @@ impl AdminRpcHandler {
)))
}
} else {
let repair = Repair {
let repair = OnlineRepair {
garage: self.garage.clone(),
};
self.garage

View file

@ -169,8 +169,12 @@ pub async fn cmd_admin(
AdminRpc::BucketList(bl) => {
print_bucket_list(bl);
}
AdminRpc::BucketInfo(bucket, rk) => {
print_bucket_info(&bucket, &rk);
AdminRpc::BucketInfo {
bucket,
relevant_keys,
counters,
} => {
print_bucket_info(&bucket, &relevant_keys, &counters);
}
AdminRpc::KeyList(kl) => {
print_key_list(kl);

View file

@ -33,10 +33,15 @@ pub enum Command {
#[structopt(name = "migrate")]
Migrate(MigrateOpt),
/// Start repair of node data
/// Start repair of node data on remote node
#[structopt(name = "repair")]
Repair(RepairOpt),
/// Offline reparation of node data (these repairs must be run offline
/// directly on the server node)
#[structopt(name = "offline-repair")]
OfflineRepair(OfflineRepairOpt),
/// Gather node statistics
#[structopt(name = "stats")]
Stats(StatsOpt),
@ -175,6 +180,10 @@ pub enum BucketOperation {
/// Expose as website or not
#[structopt(name = "website")]
Website(WebsiteOpt),
/// Set the quotas for this bucket
#[structopt(name = "set-quotas")]
SetQuotas(SetQuotasOpt),
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
@ -261,6 +270,21 @@ pub struct PermBucketOpt {
pub bucket: String,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
pub struct SetQuotasOpt {
/// Bucket name
pub bucket: String,
/// Set a maximum size for the bucket (specify a size e.g. in MiB or GiB,
/// or `none` for no size restriction)
#[structopt(long = "max-size")]
pub max_size: Option<String>,
/// Set a maximum number of objects for the bucket (or `none` for no restriction)
#[structopt(long = "max-objects")]
pub max_objects: Option<String>,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
pub enum KeyOperation {
/// List keys
@ -405,6 +429,27 @@ pub enum RepairWhat {
},
}
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
pub struct OfflineRepairOpt {
/// Confirm the launch of the repair operation
#[structopt(long = "yes")]
pub yes: bool,
#[structopt(subcommand)]
pub what: OfflineRepairWhat,
}
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
pub enum OfflineRepairWhat {
/// Repair K2V item counters
#[cfg(feature = "k2v")]
#[structopt(name = "k2v_item_counters")]
K2VItemCounters,
/// Repair object counters
#[structopt(name = "object_counters")]
ObjectCounters,
}
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
pub struct StatsOpt {
/// Gather statistics from all nodes

View file

@ -7,6 +7,7 @@ use garage_util::formater::format_table;
use garage_model::bucket_table::*;
use garage_model::key_table::*;
use garage_model::s3::object_table::{BYTES, OBJECTS, UNFINISHED_UPLOADS};
pub fn print_bucket_list(bl: Vec<Bucket>) {
println!("List of buckets:");
@ -29,11 +30,12 @@ pub fn print_bucket_list(bl: Vec<Bucket>) {
[((k, n), _, _)] => format!("{}:{}", k, n),
s => format!("[{} local aliases]", s.len()),
};
table.push(format!(
"\t{}\t{}\t{}",
aliases.join(","),
local_aliases_n,
hex::encode(bucket.id)
hex::encode(bucket.id),
));
}
format_table(table);
@ -121,7 +123,11 @@ pub fn print_key_info(key: &Key, relevant_buckets: &HashMap<Uuid, Bucket>) {
}
}
pub fn print_bucket_info(bucket: &Bucket, relevant_keys: &HashMap<String, Key>) {
pub fn print_bucket_info(
bucket: &Bucket,
relevant_keys: &HashMap<String, Key>,
counters: &HashMap<String, i64>,
) {
let key_name = |k| {
relevant_keys
.get(k)
@ -133,7 +139,42 @@ pub fn print_bucket_info(bucket: &Bucket, relevant_keys: &HashMap<String, Key>)
match &bucket.state {
Deletable::Deleted => println!("Bucket is deleted."),
Deletable::Present(p) => {
println!("Website access: {}", p.website_config.get().is_some());
let size =
bytesize::ByteSize::b(counters.get(BYTES).cloned().unwrap_or_default() as u64);
println!(
"\nSize: {} ({})",
size.to_string_as(true),
size.to_string_as(false)
);
println!(
"Objects: {}",
counters.get(OBJECTS).cloned().unwrap_or_default()
);
println!(
"Unfinished multipart uploads: {}",
counters
.get(UNFINISHED_UPLOADS)
.cloned()
.unwrap_or_default()
);
println!("\nWebsite access: {}", p.website_config.get().is_some());
let quotas = p.quotas.get();
if quotas.max_size.is_some() || quotas.max_objects.is_some() {
println!("\nQuotas:");
if let Some(ms) = quotas.max_size {
let ms = bytesize::ByteSize::b(ms);
println!(
" maximum size: {} ({})",
ms.to_string_as(true),
ms.to_string_as(false)
);
}
if let Some(mo) = quotas.max_objects {
println!(" maximum number of objects: {}", mo);
}
}
println!("\nGlobal aliases:");
for (alias, _, active) in p.aliases.items().iter() {

View file

@ -61,17 +61,17 @@ async fn main() {
pretty_env_logger::init();
sodiumoxide::init().expect("Unable to init sodiumoxide");
let opt = Opt::from_args();
let res = match opt.cmd {
Command::Server => {
// Abort on panic (same behavior as in Go)
std::panic::set_hook(Box::new(|panic_info| {
error!("{}", panic_info.to_string());
std::process::abort();
}));
server::run_server(opt.config_file).await
let opt = Opt::from_args();
let res = match opt.cmd {
Command::Server => server::run_server(opt.config_file).await,
Command::OfflineRepair(repair_opt) => {
repair::offline::offline_repair(opt.config_file, repair_opt).await
}
Command::Node(NodeOperation::NodeId(node_id_opt)) => {
node_id_command(opt.config_file, node_id_opt.quiet)

2
src/garage/repair/mod.rs Normal file
View file

@ -0,0 +1,2 @@
pub mod offline;
pub mod online;

View file

@ -0,0 +1,55 @@
use std::path::PathBuf;
use tokio::sync::watch;
use garage_util::background::*;
use garage_util::config::*;
use garage_util::error::*;
use garage_model::garage::Garage;
use crate::cli::structs::*;
pub async fn offline_repair(config_file: PathBuf, opt: OfflineRepairOpt) -> Result<(), Error> {
if !opt.yes {
return Err(Error::Message(
"Please add the --yes flag to launch repair operation".into(),
));
}
info!("Loading configuration...");
let config = read_config(config_file)?;
info!("Initializing background runner...");
let (done_tx, done_rx) = watch::channel(false);
let (background, await_background_done) = BackgroundRunner::new(16, done_rx);
info!("Initializing Garage main data store...");
let garage = Garage::new(config.clone(), background)?;
info!("Launching repair operation...");
match opt.what {
#[cfg(feature = "k2v")]
OfflineRepairWhat::K2VItemCounters => {
garage
.k2v
.counter_table
.offline_recount_all(&garage.k2v.item_table)?;
}
OfflineRepairWhat::ObjectCounters => {
garage
.object_counter_table
.offline_recount_all(&garage.object_table)?;
}
}
info!("Repair operation finished, shutting down Garage internals...");
done_tx.send(true).unwrap();
drop(garage);
await_background_done.await?;
info!("Cleaning up...");
Ok(())
}

View file

@ -11,11 +11,11 @@ use garage_util::error::Error;
use crate::*;
pub struct Repair {
pub struct OnlineRepair {
pub garage: Arc<Garage>,
}
impl Repair {
impl OnlineRepair {
pub async fn repair_worker(&self, opt: RepairOpt, must_exit: watch::Receiver<bool>) {
if let Err(e) = self.repair_worker_aux(opt, must_exit).await {
warn!("Repair worker failed with error: {}", e);

View file

@ -2,8 +2,6 @@ use std::path::PathBuf;
use tokio::sync::watch;
use garage_db as db;
use garage_util::background::*;
use garage_util::config::*;
use garage_util::error::Error;
@ -29,57 +27,14 @@ async fn wait_from(mut chan: watch::Receiver<bool>) {
pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
info!("Loading configuration...");
let config = read_config(config_file).expect("Unable to read config file");
info!("Opening database...");
let mut db_path = config.metadata_dir.clone();
std::fs::create_dir_all(&db_path).expect("Unable to create Garage meta data directory");
let db = match config.db_engine.as_str() {
"sled" => {
db_path.push("db");
info!("Opening Sled database at: {}", db_path.display());
let db = db::sled_adapter::sled::Config::default()
.path(&db_path)
.cache_capacity(config.sled_cache_capacity)
.flush_every_ms(Some(config.sled_flush_every_ms))
.open()
.expect("Unable to open sled DB");
db::sled_adapter::SledDb::init(db)
}
"sqlite" | "sqlite3" | "rusqlite" => {
db_path.push("db.sqlite");
info!("Opening Sqlite database at: {}", db_path.display());
let db = db::sqlite_adapter::rusqlite::Connection::open(db_path)
.expect("Unable to open sqlite DB");
db::sqlite_adapter::SqliteDb::init(db)
}
"lmdb" | "heed" => {
db_path.push("db.lmdb");
info!("Opening LMDB database at: {}", db_path.display());
std::fs::create_dir_all(&db_path).expect("Unable to create LMDB data directory");
let map_size = garage_db::lmdb_adapter::recommended_map_size();
let db = db::lmdb_adapter::heed::EnvOpenOptions::new()
.max_dbs(100)
.map_size(map_size)
.open(&db_path)
.expect("Unable to open LMDB DB");
db::lmdb_adapter::LmdbDb::init(db)
}
e => {
return Err(Error::Message(format!(
"Unsupported DB engine: {} (options: sled, sqlite, lmdb)",
e
)));
}
};
let config = read_config(config_file)?;
info!("Initializing background runner...");
let watch_cancel = netapp::util::watch_ctrl_c();
let (background, await_background_done) = BackgroundRunner::new(16, watch_cancel.clone());
info!("Initializing Garage main data store...");
let garage = Garage::new(config.clone(), db, background);
let garage = Garage::new(config.clone(), background)?;
info!("Initialize tracing...");
if let Some(export_to) = config.admin.trace_sink {
@ -89,6 +44,7 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
info!("Initialize Admin API server and metrics collector...");
let admin_server = AdminApiServer::new(garage.clone());
info!("Launching internal Garage cluster communications...");
let run_system = tokio::spawn(garage.system.clone().run(watch_cancel.clone()));
info!("Create admin RPC handler...");

View file

@ -1,6 +1,6 @@
use serde::{Deserialize, Serialize};
use garage_table::crdt::Crdt;
use garage_table::crdt::*;
use garage_table::*;
use garage_util::data::*;
use garage_util::time::*;
@ -44,6 +44,9 @@ pub struct BucketParams {
pub website_config: crdt::Lww<Option<WebsiteConfig>>,
/// CORS rules
pub cors_config: crdt::Lww<Option<Vec<CorsRule>>>,
/// Bucket quotas
#[serde(default)]
pub quotas: crdt::Lww<BucketQuotas>,
}
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
@ -62,6 +65,18 @@ pub struct CorsRule {
pub expose_headers: Vec<String>,
}
#[derive(Default, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct BucketQuotas {
/// Maximum size in bytes (bucket size = sum of sizes of objects in the bucket)
pub max_size: Option<u64>,
/// Maximum number of non-deleted objects in the bucket
pub max_objects: Option<u64>,
}
impl AutoCrdt for BucketQuotas {
const WARN_IF_DIFFERENT: bool = true;
}
impl BucketParams {
/// Create an empty BucketParams with no authorized keys and no website accesss
pub fn new() -> Self {
@ -72,6 +87,7 @@ impl BucketParams {
local_aliases: crdt::LwwMap::new(),
website_config: crdt::Lww::new(None),
cors_config: crdt::Lww::new(None),
quotas: crdt::Lww::new(BucketQuotas::default()),
}
}
}
@ -86,6 +102,7 @@ impl Crdt for BucketParams {
self.website_config.merge(&o.website_config);
self.cors_config.merge(&o.cors_config);
self.quotas.merge(&o.quotas);
}
}

View file

@ -6,6 +6,7 @@ use garage_db as db;
use garage_util::background::*;
use garage_util::config::*;
use garage_util::error::Error;
use garage_rpc::system::System;
@ -22,12 +23,11 @@ use crate::s3::version_table::*;
use crate::bucket_alias_table::*;
use crate::bucket_table::*;
use crate::helper;
use crate::index_counter::*;
use crate::key_table::*;
#[cfg(feature = "k2v")]
use crate::index_counter::*;
#[cfg(feature = "k2v")]
use crate::k2v::{counter_table::*, item_table::*, poll::*, rpc::*};
use crate::k2v::{item_table::*, poll::*, rpc::*};
/// An entire Garage full of data
pub struct Garage {
@ -52,6 +52,8 @@ pub struct Garage {
/// Table containing S3 objects
pub object_table: Arc<Table<ObjectTable, TableShardedReplication>>,
/// Counting table containing object counters
pub object_counter_table: Arc<IndexCounter<Object>>,
/// Table containing S3 object versions
pub version_table: Arc<Table<VersionTable, TableShardedReplication>>,
/// Table containing S3 block references (not blocks themselves)
@ -66,14 +68,57 @@ pub struct GarageK2V {
/// Table containing K2V items
pub item_table: Arc<Table<K2VItemTable, TableShardedReplication>>,
/// Indexing table containing K2V item counters
pub counter_table: Arc<IndexCounter<K2VCounterTable>>,
pub counter_table: Arc<IndexCounter<K2VItem>>,
/// K2V RPC handler
pub rpc: Arc<K2VRpcHandler>,
}
impl Garage {
/// Create and run garage
pub fn new(config: Config, db: db::Db, background: Arc<BackgroundRunner>) -> Arc<Self> {
pub fn new(config: Config, background: Arc<BackgroundRunner>) -> Result<Arc<Self>, Error> {
info!("Opening database...");
let mut db_path = config.metadata_dir.clone();
std::fs::create_dir_all(&db_path).expect("Unable to create Garage meta data directory");
let db = match config.db_engine.as_str() {
"sled" => {
db_path.push("db");
info!("Opening Sled database at: {}", db_path.display());
let db = db::sled_adapter::sled::Config::default()
.path(&db_path)
.cache_capacity(config.sled_cache_capacity)
.flush_every_ms(Some(config.sled_flush_every_ms))
.open()
.expect("Unable to open sled DB");
db::sled_adapter::SledDb::init(db)
}
"sqlite" | "sqlite3" | "rusqlite" => {
db_path.push("db.sqlite");
info!("Opening Sqlite database at: {}", db_path.display());
let db = db::sqlite_adapter::rusqlite::Connection::open(db_path)
.expect("Unable to open sqlite DB");
db::sqlite_adapter::SqliteDb::init(db)
}
"lmdb" | "heed" => {
db_path.push("db.lmdb");
info!("Opening LMDB database at: {}", db_path.display());
std::fs::create_dir_all(&db_path).expect("Unable to create LMDB data directory");
let map_size = garage_db::lmdb_adapter::recommended_map_size();
let db = db::lmdb_adapter::heed::EnvOpenOptions::new()
.max_dbs(100)
.map_size(map_size)
.open(&db_path)
.expect("Unable to open LMDB DB");
db::lmdb_adapter::LmdbDb::init(db)
}
e => {
return Err(Error::Message(format!(
"Unsupported DB engine: {} (options: sled, sqlite, lmdb)",
e
)));
}
};
let network_key = NetworkKey::from_slice(
&hex::decode(&config.rpc_secret).expect("Invalid RPC secret key")[..],
)
@ -155,12 +200,16 @@ impl Garage {
&db,
);
info!("Initialize object counter table...");
let object_counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), &db);
info!("Initialize object_table...");
#[allow(clippy::redundant_clone)]
let object_table = Table::new(
ObjectTable {
background: background.clone(),
version_table: version_table.clone(),
object_counter_table: object_counter_table.clone(),
},
meta_rep_param.clone(),
system.clone(),
@ -171,9 +220,8 @@ impl Garage {
#[cfg(feature = "k2v")]
let k2v = GarageK2V::new(system.clone(), &db, meta_rep_param);
info!("Initialize Garage...");
Arc::new(Self {
// -- done --
Ok(Arc::new(Self {
config,
db,
background,
@ -183,11 +231,12 @@ impl Garage {
bucket_alias_table,
key_table,
object_table,
object_counter_table,
version_table,
block_ref_table,
#[cfg(feature = "k2v")]
k2v,
})
}))
}
pub fn bucket_helper(&self) -> helper::bucket::BucketHelper {

View file

@ -1,3 +1,4 @@
use core::ops::Bound;
use std::collections::{hash_map, BTreeMap, HashMap};
use std::marker::PhantomData;
use std::sync::Arc;
@ -12,30 +13,36 @@ use garage_rpc::ring::Ring;
use garage_rpc::system::System;
use garage_util::data::*;
use garage_util::error::*;
use garage_util::time::*;
use garage_table::crdt::*;
use garage_table::replication::TableShardedReplication;
use garage_table::replication::*;
use garage_table::*;
pub trait CounterSchema: Clone + PartialEq + Send + Sync + 'static {
const NAME: &'static str;
type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
type S: SortKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
pub trait CountedItem: Clone + PartialEq + Send + Sync + 'static {
const COUNTER_TABLE_NAME: &'static str;
type CP: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
type CS: SortKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
fn counter_partition_key(&self) -> &Self::CP;
fn counter_sort_key(&self) -> &Self::CS;
fn counts(&self) -> Vec<(&'static str, i64)>;
}
/// A counter entry in the global table
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct CounterEntry<T: CounterSchema> {
pub pk: T::P,
pub sk: T::S,
#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
pub struct CounterEntry<T: CountedItem> {
pub pk: T::CP,
pub sk: T::CS,
pub values: BTreeMap<String, CounterValue>,
}
impl<T: CounterSchema> Entry<T::P, T::S> for CounterEntry<T> {
fn partition_key(&self) -> &T::P {
impl<T: CountedItem> Entry<T::CP, T::CS> for CounterEntry<T> {
fn partition_key(&self) -> &T::CP {
&self.pk
}
fn sort_key(&self) -> &T::S {
fn sort_key(&self) -> &T::CS {
&self.sk
}
fn is_tombstone(&self) -> bool {
@ -45,7 +52,7 @@ impl<T: CounterSchema> Entry<T::P, T::S> for CounterEntry<T> {
}
}
impl<T: CounterSchema> CounterEntry<T> {
impl<T: CountedItem> CounterEntry<T> {
pub fn filtered_values(&self, ring: &Ring) -> HashMap<String, i64> {
let nodes = &ring.layout.node_id_vec[..];
self.filtered_values_with_nodes(nodes)
@ -78,7 +85,7 @@ pub struct CounterValue {
pub node_values: BTreeMap<Uuid, (u64, i64)>,
}
impl<T: CounterSchema> Crdt for CounterEntry<T> {
impl<T: CountedItem> Crdt for CounterEntry<T> {
fn merge(&mut self, other: &Self) {
for (name, e2) in other.values.iter() {
if let Some(e) = self.values.get_mut(name) {
@ -104,15 +111,15 @@ impl Crdt for CounterValue {
}
}
pub struct CounterTable<T: CounterSchema> {
pub struct CounterTable<T: CountedItem> {
_phantom_t: PhantomData<T>,
}
impl<T: CounterSchema> TableSchema for CounterTable<T> {
const TABLE_NAME: &'static str = T::NAME;
impl<T: CountedItem> TableSchema for CounterTable<T> {
const TABLE_NAME: &'static str = T::COUNTER_TABLE_NAME;
type P = T::P;
type S = T::S;
type P = T::CP;
type S = T::CS;
type E = CounterEntry<T>;
type Filter = (DeletedFilter, Vec<Uuid>);
@ -131,14 +138,14 @@ impl<T: CounterSchema> TableSchema for CounterTable<T> {
// ----
pub struct IndexCounter<T: CounterSchema> {
pub struct IndexCounter<T: CountedItem> {
this_node: Uuid,
local_counter: db::Tree,
propagate_tx: mpsc::UnboundedSender<(T::P, T::S, LocalCounterEntry)>,
propagate_tx: mpsc::UnboundedSender<(T::CP, T::CS, LocalCounterEntry<T>)>,
pub table: Arc<Table<CounterTable<T>, TableShardedReplication>>,
}
impl<T: CounterSchema> IndexCounter<T> {
impl<T: CountedItem> IndexCounter<T> {
pub fn new(
system: Arc<System>,
replication: TableShardedReplication,
@ -151,7 +158,7 @@ impl<T: CounterSchema> IndexCounter<T> {
let this = Arc::new(Self {
this_node: system.id,
local_counter: db
.open_tree(format!("local_counter:{}", T::NAME))
.open_tree(format!("local_counter_v2:{}", T::COUNTER_TABLE_NAME))
.expect("Unable to open local counter tree"),
propagate_tx,
table: Table::new(
@ -166,7 +173,7 @@ impl<T: CounterSchema> IndexCounter<T> {
let this2 = this.clone();
background.spawn_worker(
format!("{} index counter propagator", T::NAME),
format!("{} index counter propagator", T::COUNTER_TABLE_NAME),
move |must_exit| this2.clone().propagate_loop(propagate_rx, must_exit),
);
this
@ -175,24 +182,45 @@ impl<T: CounterSchema> IndexCounter<T> {
pub fn count(
&self,
tx: &mut db::Transaction,
pk: &T::P,
sk: &T::S,
counts: &[(&str, i64)],
old: Option<&T>,
new: Option<&T>,
) -> db::TxResult<(), Error> {
let pk = old
.map(|e| e.counter_partition_key())
.unwrap_or_else(|| new.unwrap().counter_partition_key());
let sk = old
.map(|e| e.counter_sort_key())
.unwrap_or_else(|| new.unwrap().counter_sort_key());
// calculate counter differences
let mut counts = HashMap::new();
for (k, v) in old.map(|x| x.counts()).unwrap_or_default() {
*counts.entry(k).or_insert(0) -= v;
}
for (k, v) in new.map(|x| x.counts()).unwrap_or_default() {
*counts.entry(k).or_insert(0) += v;
}
// update local counter table
let tree_key = self.table.data.tree_key(pk, sk);
let mut entry = match tx.get(&self.local_counter, &tree_key[..])? {
Some(old_bytes) => rmp_serde::decode::from_read_ref::<_, LocalCounterEntry>(&old_bytes)
Some(old_bytes) => {
rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>(&old_bytes)
.map_err(Error::RmpDecode)
.map_err(db::TxError::Abort)?,
.map_err(db::TxError::Abort)?
}
None => LocalCounterEntry {
pk: pk.clone(),
sk: sk.clone(),
values: BTreeMap::new(),
},
};
let now = now_msec();
for (s, inc) in counts.iter() {
let mut ent = entry.values.entry(s.to_string()).or_insert((0, 0));
ent.0 += 1;
ent.0 = std::cmp::max(ent.0 + 1, now);
ent.1 += *inc;
}
@ -213,7 +241,7 @@ impl<T: CounterSchema> IndexCounter<T> {
async fn propagate_loop(
self: Arc<Self>,
mut propagate_rx: mpsc::UnboundedReceiver<(T::P, T::S, LocalCounterEntry)>,
mut propagate_rx: mpsc::UnboundedReceiver<(T::CP, T::CS, LocalCounterEntry<T>)>,
must_exit: watch::Receiver<bool>,
) {
// This loop batches updates to counters to be sent all at once.
@ -236,7 +264,7 @@ impl<T: CounterSchema> IndexCounter<T> {
if let Some((pk, sk, counters)) = ent {
let tree_key = self.table.data.tree_key(&pk, &sk);
let dist_entry = counters.into_counter_entry::<T>(self.this_node, pk, sk);
let dist_entry = counters.into_counter_entry(self.this_node);
match buf.entry(tree_key) {
hash_map::Entry::Vacant(e) => {
e.insert(dist_entry);
@ -255,10 +283,10 @@ impl<T: CounterSchema> IndexCounter<T> {
if let Err(e) = self.table.insert_many(entries).await {
errors += 1;
if errors >= 2 && *must_exit.borrow() {
error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::NAME, buf.len(), e);
error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::COUNTER_TABLE_NAME, buf.len(), e);
break;
}
warn!("({}) Could not propagate {} counter values: {}, retrying in 5 seconds (retry #{})", T::NAME, buf.len(), e, errors);
warn!("({}) Could not propagate {} counter values: {}, retrying in 5 seconds (retry #{})", T::COUNTER_TABLE_NAME, buf.len(), e, errors);
tokio::time::sleep(Duration::from_secs(5)).await;
continue;
}
@ -272,23 +300,155 @@ impl<T: CounterSchema> IndexCounter<T> {
}
}
}
pub fn offline_recount_all<TS, TR>(
&self,
counted_table: &Arc<Table<TS, TR>>,
) -> Result<(), Error>
where
TS: TableSchema<E = T>,
TR: TableReplication,
{
let save_counter_entry = |entry: CounterEntry<T>| -> Result<(), Error> {
let entry_k = self
.table
.data
.tree_key(entry.partition_key(), entry.sort_key());
self.table
.data
.update_entry_with(&entry_k, |ent| match ent {
Some(mut ent) => {
ent.merge(&entry);
ent
}
None => entry.clone(),
})?;
Ok(())
};
// 1. Set all old local counters to zero
let now = now_msec();
let mut next_start: Option<Vec<u8>> = None;
loop {
let low_bound = match next_start.take() {
Some(v) => Bound::Excluded(v),
None => Bound::Unbounded,
};
let mut batch = vec![];
for item in self.local_counter.range((low_bound, Bound::Unbounded))? {
batch.push(item?);
if batch.len() > 1000 {
break;
}
}
if batch.is_empty() {
break;
}
info!("zeroing old counters... ({})", hex::encode(&batch[0].0));
for (local_counter_k, local_counter) in batch {
let mut local_counter =
rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>(&local_counter)?;
for (_, tv) in local_counter.values.iter_mut() {
tv.0 = std::cmp::max(tv.0 + 1, now);
tv.1 = 0;
}
let local_counter_bytes = rmp_to_vec_all_named(&local_counter)?;
self.local_counter
.insert(&local_counter_k, &local_counter_bytes)?;
let counter_entry = local_counter.into_counter_entry(self.this_node);
save_counter_entry(counter_entry)?;
next_start = Some(local_counter_k);
}
}
// 2. Recount all table entries
let now = now_msec();
let mut next_start: Option<Vec<u8>> = None;
loop {
let low_bound = match next_start.take() {
Some(v) => Bound::Excluded(v),
None => Bound::Unbounded,
};
let mut batch = vec![];
for item in counted_table
.data
.store
.range((low_bound, Bound::Unbounded))?
{
batch.push(item?);
if batch.len() > 1000 {
break;
}
}
if batch.is_empty() {
break;
}
info!("counting entries... ({})", hex::encode(&batch[0].0));
for (counted_entry_k, counted_entry) in batch {
let counted_entry = counted_table.data.decode_entry(&counted_entry)?;
let pk = counted_entry.counter_partition_key();
let sk = counted_entry.counter_sort_key();
let counts = counted_entry.counts();
let local_counter_key = self.table.data.tree_key(pk, sk);
let mut local_counter = match self.local_counter.get(&local_counter_key)? {
Some(old_bytes) => {
let ent = rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>(
&old_bytes,
)?;
assert!(ent.pk == *pk);
assert!(ent.sk == *sk);
ent
}
None => LocalCounterEntry {
pk: pk.clone(),
sk: sk.clone(),
values: BTreeMap::new(),
},
};
for (s, v) in counts.iter() {
let mut tv = local_counter.values.entry(s.to_string()).or_insert((0, 0));
tv.0 = std::cmp::max(tv.0 + 1, now);
tv.1 += v;
}
let local_counter_bytes = rmp_to_vec_all_named(&local_counter)?;
self.local_counter
.insert(&local_counter_key, local_counter_bytes)?;
let counter_entry = local_counter.into_counter_entry(self.this_node);
save_counter_entry(counter_entry)?;
next_start = Some(counted_entry_k);
}
}
// Done
Ok(())
}
}
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
struct LocalCounterEntry {
struct LocalCounterEntry<T: CountedItem> {
pk: T::CP,
sk: T::CS,
values: BTreeMap<String, (u64, i64)>,
}
impl LocalCounterEntry {
fn into_counter_entry<T: CounterSchema>(
self,
this_node: Uuid,
pk: T::P,
sk: T::S,
) -> CounterEntry<T> {
impl<T: CountedItem> LocalCounterEntry<T> {
fn into_counter_entry(self, this_node: Uuid) -> CounterEntry<T> {
CounterEntry {
pk,
sk,
pk: self.pk,
sk: self.sk,
values: self
.values
.into_iter()

View file

@ -1,20 +0,0 @@
use garage_util::data::*;
use crate::index_counter::*;
pub const ENTRIES: &str = "entries";
pub const CONFLICTS: &str = "conflicts";
pub const VALUES: &str = "values";
pub const BYTES: &str = "bytes";
#[derive(PartialEq, Clone)]
pub struct K2VCounterTable;
impl CounterSchema for K2VCounterTable {
const NAME: &'static str = "k2v_index_counter";
// Partition key = bucket id
type P = Uuid;
// Sort key = K2V item's partition key
type S = String;
}

View file

@ -10,9 +10,13 @@ use garage_table::*;
use crate::index_counter::*;
use crate::k2v::causality::*;
use crate::k2v::counter_table::*;
use crate::k2v::poll::*;
pub const ENTRIES: &str = "entries";
pub const CONFLICTS: &str = "conflicts";
pub const VALUES: &str = "values";
pub const BYTES: &str = "bytes";
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct K2VItem {
pub partition: K2VItemPartition,
@ -112,27 +116,6 @@ impl K2VItem {
ent.discard();
}
}
// returns counters: (non-deleted entries, conflict entries, non-tombstone values, bytes used)
fn stats(&self) -> (i64, i64, i64, i64) {
let values = self.values();
let n_entries = if self.is_tombstone() { 0 } else { 1 };
let n_conflicts = if values.len() > 1 { 1 } else { 0 };
let n_values = values
.iter()
.filter(|v| matches!(v, DvvsValue::Value(_)))
.count() as i64;
let n_bytes = values
.iter()
.map(|v| match v {
DvvsValue::Deleted => 0,
DvvsValue::Value(v) => v.len() as i64,
})
.sum();
(n_entries, n_conflicts, n_values, n_bytes)
}
}
impl DvvsEntry {
@ -204,7 +187,7 @@ impl Entry<K2VItemPartition, String> for K2VItem {
}
pub struct K2VItemTable {
pub(crate) counter_table: Arc<IndexCounter<K2VCounterTable>>,
pub(crate) counter_table: Arc<IndexCounter<K2VItem>>,
pub(crate) subscriptions: Arc<SubscriptionManager>,
}
@ -229,40 +212,14 @@ impl TableSchema for K2VItemTable {
new: Option<&Self::E>,
) -> db::TxOpResult<()> {
// 1. Count
let (old_entries, old_conflicts, old_values, old_bytes) = match old {
None => (0, 0, 0, 0),
Some(e) => e.stats(),
};
let (new_entries, new_conflicts, new_values, new_bytes) = match new {
None => (0, 0, 0, 0),
Some(e) => e.stats(),
};
let count_pk = old
.map(|e| e.partition.bucket_id)
.unwrap_or_else(|| new.unwrap().partition.bucket_id);
let count_sk = old
.map(|e| &e.partition.partition_key)
.unwrap_or_else(|| &new.unwrap().partition.partition_key);
let counter_res = self.counter_table.count(
tx,
&count_pk,
count_sk,
&[
(ENTRIES, new_entries - old_entries),
(CONFLICTS, new_conflicts - old_conflicts),
(VALUES, new_values - old_values),
(BYTES, new_bytes - old_bytes),
],
);
let counter_res = self.counter_table.count(tx, old, new);
if let Err(e) = db::unabort(counter_res)? {
// This result can be returned by `counter_table.count()` for instance
// if messagepack serialization or deserialization fails at some step.
// Warn admin but ignore this error for now, that's all we can do.
error!(
"Unable to update K2V item counter for bucket {:?} partition {}: {}. Index values will be wrong!",
count_pk, count_sk, e
"Unable to update K2V item counter: {}. Index values will be wrong!",
e
);
}
@ -282,6 +239,47 @@ impl TableSchema for K2VItemTable {
}
}
impl CountedItem for K2VItem {
const COUNTER_TABLE_NAME: &'static str = "k2v_index_counter_v2";
// Partition key = bucket id
type CP = Uuid;
// Sort key = K2V item's partition key
type CS = String;
fn counter_partition_key(&self) -> &Uuid {
&self.partition.bucket_id
}
fn counter_sort_key(&self) -> &String {
&self.partition.partition_key
}
fn counts(&self) -> Vec<(&'static str, i64)> {
let values = self.values();
let n_entries = if self.is_tombstone() { 0 } else { 1 };
let n_conflicts = if values.len() > 1 { 1 } else { 0 };
let n_values = values
.iter()
.filter(|v| matches!(v, DvvsValue::Value(_)))
.count() as i64;
let n_bytes = values
.iter()
.map(|v| match v {
DvvsValue::Deleted => 0,
DvvsValue::Value(v) => v.len() as i64,
})
.sum();
vec![
(ENTRIES, n_entries),
(CONFLICTS, n_conflicts),
(VALUES, n_values),
(BYTES, n_bytes),
]
}
}
#[cfg(test)]
mod tests {
use super::*;

View file

@ -1,6 +1,5 @@
pub mod causality;
pub mod counter_table;
pub mod item_table;
pub mod poll;

View file

@ -77,6 +77,7 @@ impl Migrate {
local_aliases: LwwMap::new(),
website_config: Lww::new(website),
cors_config: Lww::new(None),
quotas: Lww::new(Default::default()),
}),
})
.await?;

View file

@ -11,10 +11,15 @@ use garage_table::crdt::*;
use garage_table::replication::TableShardedReplication;
use garage_table::*;
use crate::index_counter::*;
use crate::s3::version_table::*;
use garage_model_050::object_table as old;
pub const OBJECTS: &str = "objects";
pub const UNFINISHED_UPLOADS: &str = "unfinished_uploads";
pub const BYTES: &str = "bytes";
/// An object
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct Object {
@ -218,6 +223,7 @@ impl Crdt for Object {
pub struct ObjectTable {
pub background: Arc<BackgroundRunner>,
pub version_table: Arc<Table<VersionTable, TableShardedReplication>>,
pub object_counter_table: Arc<IndexCounter<Object>>,
}
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
@ -236,10 +242,20 @@ impl TableSchema for ObjectTable {
fn updated(
&self,
_tx: &mut db::Transaction,
tx: &mut db::Transaction,
old: Option<&Self::E>,
new: Option<&Self::E>,
) -> db::TxOpResult<()> {
// 1. Count
let counter_res = self.object_counter_table.count(tx, old, new);
if let Err(e) = db::unabort(counter_res)? {
error!(
"Unable to update object counter: {}. Index values will be wrong!",
e
);
}
// 2. Spawn threads that propagates deletions to version table
let version_table = self.version_table.clone();
let old = old.cloned();
let new = new.cloned();
@ -283,6 +299,49 @@ impl TableSchema for ObjectTable {
}
}
impl CountedItem for Object {
const COUNTER_TABLE_NAME: &'static str = "bucket_object_counter";
// Partition key = bucket id
type CP = Uuid;
// Sort key = nothing
type CS = EmptyKey;
fn counter_partition_key(&self) -> &Uuid {
&self.bucket_id
}
fn counter_sort_key(&self) -> &EmptyKey {
&EmptyKey
}
fn counts(&self) -> Vec<(&'static str, i64)> {
let versions = self.versions();
let n_objects = if versions.iter().any(|v| v.is_data()) {
1
} else {
0
};
let n_unfinished_uploads = versions
.iter()
.filter(|v| matches!(v.state, ObjectVersionState::Uploading(_)))
.count();
let n_bytes = versions
.iter()
.map(|v| match &v.state {
ObjectVersionState::Complete(ObjectVersionData::Inline(meta, _))
| ObjectVersionState::Complete(ObjectVersionData::FirstBlock(meta, _)) => meta.size,
_ => 0,
})
.sum::<u64>();
vec![
(OBJECTS, n_objects),
(UNFINISHED_UPLOADS, n_unfinished_uploads as i64),
(BYTES, n_bytes as i64),
]
}
}
// vvvvvvvv migration code, stupid stuff vvvvvvvvvvvv
// (we just want to change bucket into bucket_id by hashing it)