Compare commits

..

15 Commits

Author SHA1 Message Date
Jill b2eda2c13e
garage_api(fixup): Fix stream early ending edge-case
continuous-integration/drone/pr Build is passing Details
2022-01-13 16:45:23 +01:00
Jill e7e8ce73e3
garage_api(fixup): Fix unlimited buffering 2022-01-13 16:23:55 +01:00
Jill 847b5ad407
garage_api(fixup): Fix unexpected EOF error 2022-01-13 16:23:54 +01:00
Jill 561f614b6e
ci: Use HTTP endpoint for minio 2022-01-13 16:23:54 +01:00
Jill bf08e62ce0
garage_api(fixup): Handle interrupted stream (unittest included) 2022-01-13 16:23:54 +01:00
Jill 2b034b7c4e
garage_api(fixup): Fixups from reviews 2022-01-13 16:23:52 +01:00
Jill f0cb931a45
nix(garage_api): Add missing signature submodule 2022-01-13 16:14:56 +01:00
Jill 006e5cc231
garage_api: Validate signature for chunked PUT payload 2022-01-13 16:14:56 +01:00
Jill f1bfc939aa
garage_api: Handle chunked PUT payload 2022-01-13 16:14:55 +01:00
Jill 732b4d0b63
garage_api: Refactor BodyChunker for stream-composition 2022-01-13 16:14:53 +01:00
Alex f7349f4005
Add quotes in returned etags
continuous-integration/drone/pr Build is passing Details
continuous-integration/drone/push Build is passing Details
2022-01-13 14:03:33 +01:00
Alex 1ee8f596ee
Testing for UploadPartCopies and bugfixes in AWS signatures 2022-01-13 14:03:30 +01:00
Alex 6617a72220
Implement UploadPartCopy 2022-01-13 13:58:47 +01:00
Alex 3770a34e3d
Implement x-amz-copy-if-xxx copy preconditions and return more headers on copy (fix #187) 2022-01-13 13:56:55 +01:00
Quentin b4592a00fe Implement ListMultipartUploads (#171)
continuous-integration/drone/push Build is passing Details
Implement ListMultipartUploads, also refactor ListObjects and ListObjectsV2.

It took me some times as I wanted to propose the following things:
  - Using an iterator instead of the loop+goto pattern. I find it easier to read and it should enable some optimizations. For example, when consuming keys of a common prefix, we do many [redundant checks](https://git.deuxfleurs.fr/Deuxfleurs/garage/src/branch/main/src/api/s3_list.rs#L125-L156) while the only thing to do is to [check if the following key is still part of the common prefix](https://git.deuxfleurs.fr/Deuxfleurs/garage/src/branch/feature/s3-multipart-compat/src/api/s3_list.rs#L476).
  - Try to name things (see ExtractionResult and RangeBegin enums) and to separate concerns (see ListQuery and Accumulator)
  - An IO closure to make unit tests possibles.
  - Unit tests, to track regressions and document how to interact with the code
  - Integration tests with `s3api`. In the future, I would like to move them in Rust with the aws rust SDK.

Merging of the logic of ListMultipartUploads and ListObjects was not a goal but a consequence of the previous modifications.

Some points that we might want to discuss:
  - ListObjectsV1, when using pagination and delimiters, has a weird behavior (it lists multiple times the same prefix) with `aws s3api` due to the fact that it can not use our optimization to skip the whole prefix. It is independant from my refactor and can be tested with the commented `s3api` tests in `test-smoke.sh`. It probably has the same weird behavior on the official AWS S3 implementation.
  - Considering ListMultipartUploads, I had to "abuse" upload id marker to support prefix skipping. I send an `upload-id-marker` with the hardcoded value `include` to emulate your "including" token.
  - Some ways to test ListMultipartUploads with existing software (my tests are limited to s3api for now).

Co-authored-by: Quentin Dufour <quentin@deuxfleurs.fr>
Reviewed-on: #171
Co-authored-by: Quentin <quentin@dufour.io>
Co-committed-by: Quentin <quentin@dufour.io>
2022-01-12 19:04:55 +01:00
23 changed files with 1904 additions and 390 deletions

View File

@ -24,6 +24,11 @@ in let
rustChannel = pkgs.rustPlatform.rust;
overrides = pkgs.buildPackages.rustBuilder.overrides.all ++ [
/*
We want to inject the git version while keeping the build deterministic.
As we do not want to consider the .git folder as part of the input source,
we ask the user (the CI often) to pass the value to Nix.
*/
(pkgs.rustBuilder.rustLib.makeOverride {
name = "garage";
overrideAttrs = drv: if git_version != null then {
@ -33,6 +38,21 @@ in let
'';
} else {};
})
/*
On a sandbox pure NixOS environment, /usr/bin/file is not available.
This is a known problem: https://github.com/NixOS/nixpkgs/issues/98440
We simply patch the file as suggested
*/
/*(pkgs.rustBuilder.rustLib.makeOverride {
name = "libsodium-sys";
overrideAttrs = drv: {
preConfigure = ''
${drv.preConfigure or ""}
sed -i 's,/usr/bin/file,${file}/bin/file,g' ./configure
'';
}
})*/
];
packageFun = import ./Cargo.nix;

View File

@ -21,7 +21,7 @@ Not implemented:
## Endpoint implementation
All APIs that are not mentionned are not implemented and will return a 400 bad request.
All APIs that are not mentionned are not implemented and will return a 501 Not Implemented.
| Endpoint | Status |
|------------------------------|----------------------------------|
@ -43,9 +43,12 @@ All APIs that are not mentionned are not implemented and will return a 400 bad r
| ListBuckets | Implemented |
| ListObjects | Implemented, bugs? (see below) |
| ListObjectsV2 | Implemented |
| ListMultipartUpload | Implemented |
| ListParts | Missing |
| PutObject | Implemented |
| PutBucketWebsite | Partially implemented (see below)|
| UploadPart | Implemented |
| UploadPartCopy | Implemented |
- **GetBucketVersioning:** Stub implementation (Garage does not yet support versionning so this always returns

View File

@ -30,7 +30,7 @@ your motivations for doing so in the PR message.
| | [*GetBucketCors*](https://git.deuxfleurs.fr/Deuxfleurs/garage/issues/138) |
| | [*PutBucketCors*](https://git.deuxfleurs.fr/Deuxfleurs/garage/issues/138) |
| | [*DeleteBucketCors*](https://git.deuxfleurs.fr/Deuxfleurs/garage/issues/138) |
| | [*UploadPartCopy*](https://git.deuxfleurs.fr/Deuxfleurs/garage/issues/160) |
| | UploadPartCopy |
| | [*GetBucketWebsite*](https://git.deuxfleurs.fr/Deuxfleurs/garage/issues/77) |
| | [*PutBucketWebsite*](https://git.deuxfleurs.fr/Deuxfleurs/garage/issues/77) |
| | DeleteBucketWebsite |

View File

@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash
set -ex

View File

@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash
set -ex

View File

@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash
set -e

View File

@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash
set -ex

View File

@ -1,5 +1,3 @@
#!/bin/bash
export AWS_ACCESS_KEY_ID=`cat /tmp/garage.s3 |cut -d' ' -f1`
export AWS_SECRET_ACCESS_KEY=`cat /tmp/garage.s3 |cut -d' ' -f2`
export AWS_DEFAULT_REGION='garage'

View File

@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash
set -ex
@ -10,6 +10,7 @@ GARAGE_DEBUG="${REPO_FOLDER}/target/debug/"
GARAGE_RELEASE="${REPO_FOLDER}/target/release/"
NIX_RELEASE="${REPO_FOLDER}/result/bin/"
PATH="${GARAGE_DEBUG}:${GARAGE_RELEASE}:${NIX_RELEASE}:$PATH"
CMDOUT=/tmp/garage.cmd.tmp
# @FIXME Duck is not ready for testing, we have a bug
SKIP_DUCK=1
@ -115,21 +116,196 @@ if [ -z "$SKIP_DUCK" ]; then
done
fi
rm /tmp/garage.{1..3}.{rnd,b64}
# Advanced testing via S3API
if [ -z "$SKIP_AWS" ]; then
echo "🧪 Website Testing"
echo "<h1>hello world</h1>" > /tmp/garage-index.html
aws s3 cp /tmp/garage-index.html s3://eprouvette/index.html
[ `curl -s -o /dev/null -w "%{http_code}" --header "Host: eprouvette.garage.tld" http://127.0.0.1:3921/ ` == 404 ]
garage -c /tmp/config.1.toml bucket website --allow eprouvette
[ `curl -s -o /dev/null -w "%{http_code}" --header "Host: eprouvette.garage.tld" http://127.0.0.1:3921/ ` == 200 ]
garage -c /tmp/config.1.toml bucket website --deny eprouvette
[ `curl -s -o /dev/null -w "%{http_code}" --header "Host: eprouvette.garage.tld" http://127.0.0.1:3921/ ` == 404 ]
aws s3 rm s3://eprouvette/index.html
rm /tmp/garage-index.html
echo "🔌 Test S3API"
echo "Test Objects"
aws s3api put-object --bucket eprouvette --key a
aws s3api put-object --bucket eprouvette --key a/a
aws s3api put-object --bucket eprouvette --key a/b
aws s3api put-object --bucket eprouvette --key a/c
aws s3api put-object --bucket eprouvette --key a/d/a
aws s3api put-object --bucket eprouvette --key a/é
aws s3api put-object --bucket eprouvette --key b
aws s3api put-object --bucket eprouvette --key c
aws s3api list-objects-v2 --bucket eprouvette >$CMDOUT
[ $(jq '.Contents | length' $CMDOUT) == 8 ]
[ $(jq '.CommonPrefixes | length' $CMDOUT) == 0 ]
aws s3api list-objects-v2 --bucket eprouvette --page-size 0 >$CMDOUT
[ $(jq '.Contents | length' $CMDOUT) == 8 ]
[ $(jq '.CommonPrefixes | length' $CMDOUT) == 0 ]
aws s3api list-objects-v2 --bucket eprouvette --page-size 999999999999999 >$CMDOUT
[ $(jq '.Contents | length' $CMDOUT) == 8 ]
[ $(jq '.CommonPrefixes | length' $CMDOUT) == 0 ]
aws s3api list-objects-v2 --bucket eprouvette --page-size 1 >$CMDOUT
[ $(jq '.Contents | length' $CMDOUT) == 8 ]
[ $(jq '.CommonPrefixes | length' $CMDOUT) == 0 ]
aws s3api list-objects-v2 --bucket eprouvette --delimiter '/' >$CMDOUT
[ $(jq '.Contents | length' $CMDOUT) == 3 ]
[ $(jq '.CommonPrefixes | length' $CMDOUT) == 1 ]
aws s3api list-objects-v2 --bucket eprouvette --delimiter '/' --page-size 1 >$CMDOUT
[ $(jq '.Contents | length' $CMDOUT) == 3 ]
[ $(jq '.CommonPrefixes | length' $CMDOUT) == 1 ]
aws s3api list-objects-v2 --bucket eprouvette --prefix 'a/' >$CMDOUT
[ $(jq '.Contents | length' $CMDOUT) == 5 ]
[ $(jq '.CommonPrefixes | length' $CMDOUT) == 0 ]
aws s3api list-objects-v2 --bucket eprouvette --prefix 'a/' --delimiter '/' >$CMDOUT
[ $(jq '.Contents | length' $CMDOUT) == 4 ]
[ $(jq '.CommonPrefixes | length' $CMDOUT) == 1 ]
aws s3api list-objects-v2 --bucket eprouvette --prefix 'a/' --page-size 1 >$CMDOUT
[ $(jq '.Contents | length' $CMDOUT) == 5 ]
[ $(jq '.CommonPrefixes | length' $CMDOUT) == 0 ]
aws s3api list-objects-v2 --bucket eprouvette --prefix 'a/' --delimiter '/' --page-size 1 >$CMDOUT
[ $(jq '.Contents | length' $CMDOUT) == 4 ]
[ $(jq '.CommonPrefixes | length' $CMDOUT) == 1 ]
aws s3api list-objects-v2 --bucket eprouvette --start-after 'Z' >$CMDOUT
[ $(jq '.Contents | length' $CMDOUT) == 8 ]
[ $(jq '.CommonPrefixes | length' $CMDOUT) == 0 ]
aws s3api list-objects-v2 --bucket eprouvette --start-after 'c' >$CMDOUT
! [ -s $CMDOUT ]
aws s3api list-objects --bucket eprouvette >$CMDOUT
[ $(jq '.Contents | length' $CMDOUT) == 8 ]
[ $(jq '.CommonPrefixes | length' $CMDOUT) == 0 ]
aws s3api list-objects --bucket eprouvette --page-size 1 >$CMDOUT
[ $(jq '.Contents | length' $CMDOUT) == 8 ]
[ $(jq '.CommonPrefixes | length' $CMDOUT) == 0 ]
aws s3api list-objects --bucket eprouvette --delimiter '/' >$CMDOUT
[ $(jq '.Contents | length' $CMDOUT) == 3 ]
[ $(jq '.CommonPrefixes | length' $CMDOUT) == 1 ]
# @FIXME it does not work as expected but might be a limitation of aws s3api
# The problem is the conjunction of a delimiter + pagination + v1 of listobjects
#aws s3api list-objects --bucket eprouvette --delimiter '/' --page-size 1 >$CMDOUT
#[ $(jq '.Contents | length' $CMDOUT) == 3 ]
#[ $(jq '.CommonPrefixes | length' $CMDOUT) == 1 ]
aws s3api list-objects --bucket eprouvette --prefix 'a/' >$CMDOUT
[ $(jq '.Contents | length' $CMDOUT) == 5 ]
[ $(jq '.CommonPrefixes | length' $CMDOUT) == 0 ]
aws s3api list-objects --bucket eprouvette --prefix 'a/' --delimiter '/' >$CMDOUT
[ $(jq '.Contents | length' $CMDOUT) == 4 ]
[ $(jq '.CommonPrefixes | length' $CMDOUT) == 1 ]
aws s3api list-objects --bucket eprouvette --prefix 'a/' --page-size 1 >$CMDOUT
[ $(jq '.Contents | length' $CMDOUT) == 5 ]
[ $(jq '.CommonPrefixes | length' $CMDOUT) == 0 ]
# @FIXME idem
#aws s3api list-objects --bucket eprouvette --prefix 'a/' --delimiter '/' --page-size 1 >$CMDOUT
#[ $(jq '.Contents | length' $CMDOUT) == 4 ]
#[ $(jq '.CommonPrefixes | length' $CMDOUT) == 1 ]
aws s3api list-objects --bucket eprouvette --starting-token 'Z' >$CMDOUT
[ $(jq '.Contents | length' $CMDOUT) == 8 ]
[ $(jq '.CommonPrefixes | length' $CMDOUT) == 0 ]
aws s3api list-objects --bucket eprouvette --starting-token 'c' >$CMDOUT
! [ -s $CMDOUT ]
aws s3api list-objects-v2 --bucket eprouvette | \
jq -c '. | {Objects: [.Contents[] | {Key: .Key}], Quiet: true}' | \
aws s3api delete-objects --bucket eprouvette --delete file:///dev/stdin
echo "Test Multipart Upload"
aws s3api create-multipart-upload --bucket eprouvette --key a
aws s3api create-multipart-upload --bucket eprouvette --key a
aws s3api create-multipart-upload --bucket eprouvette --key c
aws s3api create-multipart-upload --bucket eprouvette --key c/a
aws s3api create-multipart-upload --bucket eprouvette --key c/b
aws s3api list-multipart-uploads --bucket eprouvette >$CMDOUT
[ $(jq '.Uploads | length' $CMDOUT) == 5 ]
[ $(jq '.CommonPrefixes | length' $CMDOUT) == 0 ]
aws s3api list-multipart-uploads --bucket eprouvette --page-size 1 >$CMDOUT
[ $(jq '.Uploads | length' $CMDOUT) == 5 ]
[ $(jq '.CommonPrefixes | length' $CMDOUT) == 0 ]
aws s3api list-multipart-uploads --bucket eprouvette --delimiter '/' >$CMDOUT
[ $(jq '.Uploads | length' $CMDOUT) == 3 ]
[ $(jq '.CommonPrefixes | length' $CMDOUT) == 1 ]
aws s3api list-multipart-uploads --bucket eprouvette --delimiter '/' --page-size 1 >$CMDOUT
[ $(jq '.Uploads | length' $CMDOUT) == 3 ]
[ $(jq '.CommonPrefixes | length' $CMDOUT) == 1 ]
aws s3api list-multipart-uploads --bucket eprouvette --prefix 'c' >$CMDOUT
[ $(jq '.Uploads | length' $CMDOUT) == 3 ]
[ $(jq '.CommonPrefixes | length' $CMDOUT) == 0 ]
aws s3api list-multipart-uploads --bucket eprouvette --prefix 'c' --page-size 1 >$CMDOUT
[ $(jq '.Uploads | length' $CMDOUT) == 3 ]
[ $(jq '.CommonPrefixes | length' $CMDOUT) == 0 ]
aws s3api list-multipart-uploads --bucket eprouvette --prefix 'c' --delimiter '/' >$CMDOUT
[ $(jq '.Uploads | length' $CMDOUT) == 1 ]
[ $(jq '.CommonPrefixes | length' $CMDOUT) == 1 ]
aws s3api list-multipart-uploads --bucket eprouvette --prefix 'c' --delimiter '/' --page-size 1 >$CMDOUT
[ $(jq '.Uploads | length' $CMDOUT) == 1 ]
[ $(jq '.CommonPrefixes | length' $CMDOUT) == 1 ]
aws s3api list-multipart-uploads --bucket eprouvette --starting-token 'ZZZZZ' >$CMDOUT
[ $(jq '.Uploads | length' $CMDOUT) == 5 ]
[ $(jq '.CommonPrefixes | length' $CMDOUT) == 0 ]
aws s3api list-multipart-uploads --bucket eprouvette --starting-token 'd' >$CMDOUT
! [ -s $CMDOUT ]
aws s3api list-multipart-uploads --bucket eprouvette | \
jq -r '.Uploads[] | "\(.Key) \(.UploadId)"' | \
while read r; do
key=$(echo $r|cut -d' ' -f 1);
uid=$(echo $r|cut -d' ' -f 2);
aws s3api abort-multipart-upload --bucket eprouvette --key $key --upload-id $uid;
echo "Deleted ${key}:${uid}"
done
# Test for UploadPartCopy
aws s3 cp "/tmp/garage.3.rnd" "s3://eprouvette/copy_part_source"
UPLOAD_ID=$(aws s3api create-multipart-upload --bucket eprouvette --key test_multipart | jq -r .UploadId)
PART1=$(aws s3api upload-part \
--bucket eprouvette --key test_multipart \
--upload-id $UPLOAD_ID --part-number 1 \
--body /tmp/garage.2.rnd | jq .ETag)
PART2=$(aws s3api upload-part-copy \
--bucket eprouvette --key test_multipart \
--upload-id $UPLOAD_ID --part-number 2 \
--copy-source "/eprouvette/copy_part_source" \
--copy-source-range "bytes=500-5000500" \
| jq .CopyPartResult.ETag)
PART3=$(aws s3api upload-part \
--bucket eprouvette --key test_multipart \
--upload-id $UPLOAD_ID --part-number 3 \
--body /tmp/garage.3.rnd | jq .ETag)
cat >/tmp/garage.multipart_struct <<EOF
{
"Parts": [
{
"ETag": $PART1,
"PartNumber": 1
},
{
"ETag": $PART2,
"PartNumber": 2
},
{
"ETag": $PART3,
"PartNumber": 3
}
]
}
EOF
aws s3api complete-multipart-upload \
--bucket eprouvette --key test_multipart --upload-id $UPLOAD_ID \
--multipart-upload file:///tmp/garage.multipart_struct
aws s3 cp "s3://eprouvette/test_multipart" /tmp/garage.test_multipart
cat /tmp/garage.2.rnd <(tail -c +501 /tmp/garage.3.rnd | head -c 5000001) /tmp/garage.3.rnd > /tmp/garage.test_multipart_reference
diff /tmp/garage.test_multipart /tmp/garage.test_multipart_reference >/tmp/garage.test_multipart_diff 2>&1
aws s3 rm "s3://eprouvette/copy_part_source"
aws s3 rm "s3://eprouvette/test_multipart"
rm /tmp/garage.multipart_struct
rm /tmp/garage.test_multipart
rm /tmp/garage.test_multipart_reference
rm /tmp/garage.test_multipart_diff
fi
rm /tmp/garage.{1..3}.{rnd,b64}
if [ -z "$SKIP_AWS" ]; then
echo "🪣 Test bucket logic "
AWS_ACCESS_KEY_ID=`cat /tmp/garage.s3 |cut -d' ' -f1`
@ -145,11 +321,25 @@ if [ -z "$SKIP_AWS" ]; then
[ $(aws s3 ls | wc -l) == 1 ]
fi
if [ -z "$SKIP_AWS" ]; then
echo "🧪 Website Testing"
echo "<h1>hello world</h1>" > /tmp/garage-index.html
aws s3 cp /tmp/garage-index.html s3://eprouvette/index.html
[ `curl -s -o /dev/null -w "%{http_code}" --header "Host: eprouvette.garage.tld" http://127.0.0.1:3921/ ` == 404 ]
garage -c /tmp/config.1.toml bucket website --allow eprouvette
[ `curl -s -o /dev/null -w "%{http_code}" --header "Host: eprouvette.garage.tld" http://127.0.0.1:3921/ ` == 200 ]
garage -c /tmp/config.1.toml bucket website --deny eprouvette
[ `curl -s -o /dev/null -w "%{http_code}" --header "Host: eprouvette.garage.tld" http://127.0.0.1:3921/ ` == 404 ]
aws s3 rm s3://eprouvette/index.html
rm /tmp/garage-index.html
fi
echo "🏁 Teardown"
AWS_ACCESS_KEY_ID=`cat /tmp/garage.s3 |cut -d' ' -f1`
AWS_SECRET_ACCESS_KEY=`cat /tmp/garage.s3 |cut -d' ' -f2`
garage -c /tmp/config.1.toml bucket deny --read --write eprouvette --key $AWS_ACCESS_KEY_ID
garage -c /tmp/config.1.toml bucket delete --yes eprouvette
garage -c /tmp/config.1.toml key delete --yes $AWS_ACCESS_KEY_ID
exec 3>&-
echo "✅ Success"

View File

@ -83,6 +83,7 @@ function refresh_toolchain {
pkgs.which
pkgs.openssl
pkgs.curl
pkgs.jq
] else [])
++
(if release then [

View File

@ -1,3 +1,4 @@
use std::cmp::{max, min};
use std::net::SocketAddr;
use std::sync::Arc;
@ -155,19 +156,24 @@ async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Respon
.await
}
Endpoint::CopyObject { key, .. } => {
let copy_source = req.headers().get("x-amz-copy-source").unwrap().to_str()?;
let copy_source = percent_encoding::percent_decode_str(copy_source).decode_utf8()?;
let (source_bucket, source_key) = parse_bucket_key(&copy_source, None)?;
let source_bucket_id =
resolve_bucket(&garage, &source_bucket.to_string(), &api_key).await?;
if !api_key.allow_read(&source_bucket_id) {
return Err(Error::Forbidden(format!(
"Reading from bucket {} not allowed for this key",
source_bucket
)));
}
let source_key = source_key.ok_or_bad_request("No source key specified")?;
handle_copy(garage, &req, bucket_id, &key, source_bucket_id, source_key).await
handle_copy(garage, &api_key, &req, bucket_id, &key).await
}
Endpoint::UploadPartCopy {
key,
part_number,
upload_id,
..
} => {
handle_upload_part_copy(
garage,
&api_key,
&req,
bucket_id,
&key,
part_number,
&upload_id,
)
.await
}
Endpoint::PutObject { key, .. } => {
handle_put(garage, req, bucket_id, &key, &api_key, content_sha256).await
@ -217,16 +223,18 @@ async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Respon
handle_list(
garage,
&ListObjectsQuery {
common: ListQueryCommon {
bucket_name: bucket,
bucket_id,
delimiter: delimiter.map(|d| d.to_string()),
page_size: max_keys.map(|p| min(1000, max(1, p))).unwrap_or(1000),
prefix: prefix.unwrap_or_default(),
urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
},
is_v2: false,
bucket_name: bucket,
bucket_id,
delimiter: delimiter.map(|d| d.to_string()),
max_keys: max_keys.unwrap_or(1000),
prefix: prefix.unwrap_or_default(),
marker,
continuation_token: None,
start_after: None,
urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
},
)
.await
@ -246,16 +254,18 @@ async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Respon
handle_list(
garage,
&ListObjectsQuery {
common: ListQueryCommon {
bucket_name: bucket,
bucket_id,
delimiter: delimiter.map(|d| d.to_string()),
page_size: max_keys.map(|p| min(1000, max(1, p))).unwrap_or(1000),
urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
prefix: prefix.unwrap_or_default(),
},
is_v2: true,
bucket_name: bucket,
bucket_id,
delimiter: delimiter.map(|d| d.to_string()),
max_keys: max_keys.unwrap_or(1000),
prefix: prefix.unwrap_or_default(),
marker: None,
continuation_token,
start_after,
urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
},
)
.await
@ -266,6 +276,32 @@ async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Respon
)))
}
}
Endpoint::ListMultipartUploads {
bucket,
delimiter,
encoding_type,
key_marker,
max_uploads,
prefix,
upload_id_marker,
} => {
handle_list_multipart_upload(
garage,
&ListMultipartUploadsQuery {
common: ListQueryCommon {
bucket_name: bucket,
bucket_id,
delimiter: delimiter.map(|d| d.to_string()),
page_size: max_uploads.map(|p| min(1000, max(1, p))).unwrap_or(1000),
prefix: prefix.unwrap_or_default(),
urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
},
key_marker,
upload_id_marker,
},
)
.await
}
Endpoint::DeleteObjects { .. } => {
handle_delete_objects(garage, bucket_id, req, content_sha256).await
}
@ -290,7 +326,7 @@ async fn handle_request_without_bucket(
}
#[allow(clippy::ptr_arg)]
async fn resolve_bucket(
pub async fn resolve_bucket(
garage: &Garage,
bucket_name: &String,
api_key: &Key,
@ -316,7 +352,7 @@ async fn resolve_bucket(
///
/// S3 internally manages only buckets and keys. This function splits
/// an HTTP path to get the corresponding bucket name and key.
fn parse_bucket_key<'a>(
pub fn parse_bucket_key<'a>(
path: &'a str,
host_bucket: Option<&'a str>,
) -> Result<(&'a str, Option<&'a str>), Error> {

View File

@ -54,6 +54,10 @@ pub enum Error {
#[error(display = "Tried to delete a non-empty bucket")]
BucketNotEmpty,
/// Precondition failed (e.g. x-amz-copy-source-if-match)
#[error(display = "At least one of the preconditions you specified did not hold")]
PreconditionFailed,
// Category: bad request
/// The request contained an invalid UTF-8 sequence in its path or in other parameters
#[error(display = "Invalid UTF-8: {}", _0)]
@ -115,6 +119,7 @@ impl Error {
match self {
Error::NoSuchKey | Error::NoSuchBucket | Error::NoSuchUpload => StatusCode::NOT_FOUND,
Error::BucketNotEmpty | Error::BucketAlreadyExists => StatusCode::CONFLICT,
Error::PreconditionFailed => StatusCode::PRECONDITION_FAILED,
Error::Forbidden(_) => StatusCode::FORBIDDEN,
Error::InternalError(
GarageError::Timeout
@ -137,6 +142,7 @@ impl Error {
Error::NoSuchUpload => "NoSuchUpload",
Error::BucketAlreadyExists => "BucketAlreadyExists",
Error::BucketNotEmpty => "BucketNotEmpty",
Error::PreconditionFailed => "PreconditionFailed",
Error::Forbidden(_) => "AccessDenied",
Error::AuthorizationHeaderMalformed(_) => "AuthorizationHeaderMalformed",
Error::NotImplemented(_) => "NotImplemented",

View File

@ -7,6 +7,7 @@ use garage_model::bucket_alias_table::*;
use garage_model::bucket_table::Bucket;
use garage_model::garage::Garage;
use garage_model::key_table::Key;
use garage_model::object_table::ObjectFilter;
use garage_model::permission::BucketKeyPerm;
use garage_table::util::*;
use garage_util::crdt::*;
@ -229,7 +230,7 @@ pub async fn handle_delete_bucket(
// Check bucket is empty
let objects = garage
.object_table
.get_range(&bucket_id, None, Some(DeletedFilter::NotDeleted), 10)
.get_range(&bucket_id, None, Some(ObjectFilter::IsData), 10)
.await?;
if !objects.is_empty() {
return Err(Error::BucketNotEmpty);

View File

@ -1,6 +1,11 @@
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use futures::TryFutureExt;
use md5::{Digest as Md5Digest, Md5};
use hyper::{Body, Request, Response};
use serde::Serialize;
use garage_table::*;
use garage_util::data::*;
@ -8,63 +13,50 @@ use garage_util::time::*;
use garage_model::block_ref_table::*;
use garage_model::garage::Garage;
use garage_model::key_table::Key;
use garage_model::object_table::*;
use garage_model::version_table::*;
use crate::api_server::{parse_bucket_key, resolve_bucket};
use crate::error::*;
use crate::s3_put::get_headers;
use crate::s3_xml;
use crate::s3_put::{decode_upload_id, get_headers};
use crate::s3_xml::{self, xmlns_tag};
pub async fn handle_copy(
garage: Arc<Garage>,
api_key: &Key,
req: &Request<Body>,
dest_bucket_id: Uuid,
dest_key: &str,
source_bucket_id: Uuid,
source_key: &str,
) -> Result<Response<Body>, Error> {
let source_object = garage
.object_table
.get(&source_bucket_id, &source_key.to_string())
.await?
.ok_or(Error::NoSuchKey)?;
let copy_precondition = CopyPreconditionHeaders::parse(req)?;
let source_last_v = source_object
.versions()
.iter()
.rev()
.find(|v| v.is_complete())
.ok_or(Error::NoSuchKey)?;
let source_object = get_copy_source(&garage, api_key, req).await?;
let source_last_state = match &source_last_v.state {
ObjectVersionState::Complete(x) => x,
_ => unreachable!(),
};
let (source_version, source_version_data, source_version_meta) =
extract_source_info(&source_object)?;
// Check precondition, e.g. x-amz-copy-source-if-match
copy_precondition.check(source_version, &source_version_meta.etag)?;
// Generate parameters for copied object
let new_uuid = gen_uuid();
let new_timestamp = now_msec();
// Implement x-amz-metadata-directive: REPLACE
let old_meta = match source_last_state {
ObjectVersionData::DeleteMarker => {
return Err(Error::NoSuchKey);
}
ObjectVersionData::Inline(meta, _bytes) => meta,
ObjectVersionData::FirstBlock(meta, _fbh) => meta,
};
let new_meta = match req.headers().get("x-amz-metadata-directive") {
Some(v) if v == hyper::header::HeaderValue::from_static("REPLACE") => ObjectVersionMeta {
headers: get_headers(req)?,
size: old_meta.size,
etag: old_meta.etag.clone(),
size: source_version_meta.size,
etag: source_version_meta.etag.clone(),
},
_ => old_meta.clone(),
_ => source_version_meta.clone(),
};
let etag = new_meta.etag.to_string();
// Save object copy
match source_last_state {
match source_version_data {
ObjectVersionData::DeleteMarker => unreachable!(),
ObjectVersionData::Inline(_meta, bytes) => {
let dest_object_version = ObjectVersion {
@ -86,7 +78,7 @@ pub async fn handle_copy(
// Get block list from source version
let source_version = garage
.version_table
.get(&source_last_v.uuid, &EmptyKey)
.get(&source_version.uuid, &EmptyKey)
.await?;
let source_version = source_version.ok_or(Error::NoSuchKey)?;
@ -156,13 +148,468 @@ pub async fn handle_copy(
}
let last_modified = msec_to_rfc3339(new_timestamp);
let result = s3_xml::CopyObjectResult {
let result = CopyObjectResult {
last_modified: s3_xml::Value(last_modified),
etag: s3_xml::Value(etag),
etag: s3_xml::Value(format!("\"{}\"", etag)),
};
let xml = s3_xml::to_xml_with_header(&result)?;
Ok(Response::builder()
.header("Content-Type", "application/xml")
.header("x-amz-version-id", hex::encode(new_uuid))
.header(
"x-amz-copy-source-version-id",
hex::encode(source_version.uuid),
)
.body(Body::from(xml))?)
}
pub async fn handle_upload_part_copy(
garage: Arc<Garage>,
api_key: &Key,
req: &Request<Body>,
dest_bucket_id: Uuid,
dest_key: &str,
part_number: u64,
upload_id: &str,
) -> Result<Response<Body>, Error> {
let copy_precondition = CopyPreconditionHeaders::parse(req)?;
let dest_version_uuid = decode_upload_id(upload_id)?;
let dest_key = dest_key.to_string();
let (source_object, dest_object) = futures::try_join!(
get_copy_source(&garage, api_key, req),
garage
.object_table
.get(&dest_bucket_id, &dest_key)
.map_err(Error::from),
)?;
let dest_object = dest_object.ok_or(Error::NoSuchKey)?;
let (source_object_version, source_version_data, source_version_meta) =
extract_source_info(&source_object)?;
// Check precondition on source, e.g. x-amz-copy-source-if-match
copy_precondition.check(source_object_version, &source_version_meta.etag)?;
// Check source range is valid
let source_range = match req.headers().get("x-amz-copy-source-range") {
Some(range) => {
let range_str = range.to_str()?;
let mut ranges = http_range::HttpRange::parse(range_str, source_version_meta.size)
.map_err(|e| (e, source_version_meta.size))?;
if ranges.len() != 1 {
return Err(Error::BadRequest(
"Invalid x-amz-copy-source-range header: exactly 1 range must be given".into(),
));
} else {
ranges.pop().unwrap()
}
}
None => http_range::HttpRange {
start: 0,
length: source_version_meta.size,
},
};
// Check destination version is indeed in uploading state
if !dest_object
.versions()
.iter()
.any(|v| v.uuid == dest_version_uuid && v.is_uploading())
{
return Err(Error::NoSuchUpload);
}
// Check source version is not inlined
match source_version_data {
ObjectVersionData::DeleteMarker => unreachable!(),
ObjectVersionData::Inline(_meta, _bytes) => {
// This is only for small files, we don't bother handling this.
// (in AWS UploadPartCopy works for parts at least 5MB which
// is never the case of an inline object)
return Err(Error::BadRequest(
"Source object is too small (minimum part size is 5Mb)".into(),
));
}
ObjectVersionData::FirstBlock(_meta, _first_block_hash) => (),
};
// Fetch source versin with its block list,
// and destination version to check part hasn't yet been uploaded
let (source_version, dest_version) = futures::try_join!(
garage
.version_table
.get(&source_object_version.uuid, &EmptyKey),
garage.version_table.get(&dest_version_uuid, &EmptyKey),
)?;
let source_version = source_version.ok_or(Error::NoSuchKey)?;
// Check this part number hasn't yet been uploaded
if let Some(dv) = dest_version {
if dv.has_part_number(part_number) {
return Err(Error::BadRequest(format!(
"Part number {} has already been uploaded",
part_number
)));
}
}
// We want to reuse blocks from the source version as much as possible.
// However, we still need to get the data from these blocks
// because we need to know it to calculate the MD5sum of the part
// which is used as its ETag.
// First, calculate what blocks we want to keep,
// and the subrange of the block to take, if the bounds of the
// requested range are in the middle.
let (range_begin, range_end) = (source_range.start, source_range.start + source_range.length);
let mut blocks_to_copy = vec![];
let mut current_offset = 0;
let mut size_to_copy = 0;
for (_bk, block) in source_version.blocks.items().iter() {
let (block_begin, block_end) = (current_offset, current_offset + block.size);
if block_begin < range_end && block_end > range_begin {
let subrange_begin = if block_begin < range_begin {
Some(range_begin - block_begin)
} else {
None
};
let subrange_end = if block_end > range_end {
Some(range_end - block_begin)
} else {
None
};
let range_to_copy = match (subrange_begin, subrange_end) {
(Some(b), Some(e)) => Some(b as usize..e as usize),
(None, Some(e)) => Some(0..e as usize),
(Some(b), None) => Some(b as usize..block.size as usize),
(None, None) => None,
};
size_to_copy += range_to_copy
.as_ref()
.map(|x| x.len() as u64)
.unwrap_or(block.size);
blocks_to_copy.push((block.hash, range_to_copy));
}
current_offset = block_end;
}
if size_to_copy < 1024 * 1024 {
return Err(Error::BadRequest(format!(
"Not enough data to copy: {} bytes (minimum: 1MB)",
size_to_copy
)));
}
// Now, actually copy the blocks
let mut md5hasher = Md5::new();
let mut block = Some(
garage
.block_manager
.rpc_get_block(&blocks_to_copy[0].0)
.await?,
);
let mut current_offset = 0;
for (i, (block_hash, range_to_copy)) in blocks_to_copy.iter().enumerate() {
let (current_block, subrange_hash) = match range_to_copy.clone() {
Some(r) => {
let subrange = block.take().unwrap()[r].to_vec();
let hash = blake2sum(&subrange);
(subrange, hash)
}
None => (block.take().unwrap(), *block_hash),
};
md5hasher.update(&current_block[..]);
let mut version = Version::new(dest_version_uuid, dest_bucket_id, dest_key.clone(), false);
version.blocks.put(
VersionBlockKey {
part_number,
offset: current_offset,
},
VersionBlock {
hash: subrange_hash,
size: current_block.len() as u64,
},
);
current_offset += current_block.len() as u64;
let block_ref = BlockRef {
block: subrange_hash,
version: dest_version_uuid,
deleted: false.into(),
};
let next_block_hash = blocks_to_copy.get(i + 1).map(|(h, _)| *h);
let garage2 = garage.clone();
let garage3 = garage.clone();
let is_subrange = range_to_copy.is_some();
let (_, _, _, next_block) = futures::try_join!(
// Thing 1: if we are taking a subrange of the source block,
// we need to insert that subrange as a new block.
async move {
if is_subrange {
garage2
.block_manager
.rpc_put_block(subrange_hash, current_block)
.await
} else {
Ok(())
}
},
// Thing 2: we need to insert the block in the version
garage.version_table.insert(&version),
// Thing 3: we need to add a block reference
garage.block_ref_table.insert(&block_ref),
// Thing 4: we need to prefetch the next block
async move {
match next_block_hash {
Some(h) => Ok(Some(garage3.block_manager.rpc_get_block(&h).await?)),
None => Ok(None),
}
},
)?;
block = next_block;
}
let data_md5sum = md5hasher.finalize();
let etag = hex::encode(data_md5sum);
// Put the part's ETag in the Versiontable
let mut version = Version::new(dest_version_uuid, dest_bucket_id, dest_key.clone(), false);
version.parts_etags.put(part_number, etag.clone());
garage.version_table.insert(&version).await?;
// LGTM
let resp_xml = s3_xml::to_xml_with_header(&CopyPartResult {
xmlns: (),
etag: s3_xml::Value(format!("\"{}\"", etag)),
last_modified: s3_xml::Value(msec_to_rfc3339(source_object_version.timestamp)),
})?;
Ok(Response::builder()
.header("Content-Type", "application/xml")
.header(
"x-amz-copy-source-version-id",
hex::encode(source_object_version.uuid),
)
.body(Body::from(resp_xml))?)
}
async fn get_copy_source(
garage: &Garage,
api_key: &Key,
req: &Request<Body>,
) -> Result<Object, Error> {
let copy_source = req.headers().get("x-amz-copy-source").unwrap().to_str()?;
let copy_source = percent_encoding::percent_decode_str(copy_source).decode_utf8()?;
let (source_bucket, source_key) = parse_bucket_key(&copy_source, None)?;
let source_bucket_id = resolve_bucket(garage, &source_bucket.to_string(), api_key).await?;
if !api_key.allow_read(&source_bucket_id) {
return Err(Error::Forbidden(format!(
"Reading from bucket {} not allowed for this key",
source_bucket
)));
}
let source_key = source_key.ok_or_bad_request("No source key specified")?;
let source_object = garage
.object_table
.get(&source_bucket_id, &source_key.to_string())
.await?
.ok_or(Error::NoSuchKey)?;
Ok(source_object)
}
fn extract_source_info(
source_object: &Object,
) -> Result<(&ObjectVersion, &ObjectVersionData, &ObjectVersionMeta), Error> {
let source_version = source_object
.versions()
.iter()
.rev()
.find(|v| v.is_complete())
.ok_or(Error::NoSuchKey)?;
let source_version_data = match &source_version.state {
ObjectVersionState::Complete(x) => x,
_ => unreachable!(),
};
let source_version_meta = match source_version_data {
ObjectVersionData::DeleteMarker => {
return Err(Error::NoSuchKey);
}
ObjectVersionData::Inline(meta, _bytes) => meta,
ObjectVersionData::FirstBlock(meta, _fbh) => meta,
};
Ok((source_version, source_version_data, source_version_meta))
}
struct CopyPreconditionHeaders {
copy_source_if_match: Option<Vec<String>>,
copy_source_if_modified_since: Option<SystemTime>,
copy_source_if_none_match: Option<Vec<String>>,
copy_source_if_unmodified_since: Option<SystemTime>,
}
impl CopyPreconditionHeaders {
fn parse(req: &Request<Body>) -> Result<Self, Error> {
Ok(Self {
copy_source_if_match: req
.headers()
.get("x-amz-copy-source-if-match")
.map(|x| x.to_str())
.transpose()?
.map(|x| {
x.split(',')
.map(|m| m.trim().trim_matches('"').to_string())
.collect::<Vec<_>>()
}),
copy_source_if_modified_since: req
.headers()
.get("x-amz-copy-source-if-modified-since")
.map(|x| x.to_str())
.transpose()?
.map(|x| httpdate::parse_http_date(x))
.transpose()
.ok_or_bad_request("Invalid date in x-amz-copy-source-if-modified-since")?,
copy_source_if_none_match: req
.headers()
.get("x-amz-copy-source-if-none-match")
.map(|x| x.to_str())
.transpose()?
.map(|x| {
x.split(',')
.map(|m| m.trim().trim_matches('"').to_string())
.collect::<Vec<_>>()
}),
copy_source_if_unmodified_since: req
.headers()
.get("x-amz-copy-source-if-unmodified-since")
.map(|x| x.to_str())
.transpose()?
.map(|x| httpdate::parse_http_date(x))
.transpose()
.ok_or_bad_request("Invalid date in x-amz-copy-source-if-unmodified-since")?,
})
}
fn check(&self, v: &ObjectVersion, etag: &str) -> Result<(), Error> {
let v_date = UNIX_EPOCH + Duration::from_millis(v.timestamp);
let ok = match (
&self.copy_source_if_match,
&self.copy_source_if_unmodified_since,
&self.copy_source_if_none_match,
&self.copy_source_if_modified_since,
) {
// TODO I'm not sure all of the conditions are evaluated correctly here
// If we have both if-match and if-unmodified-since,
// basically we don't care about if-unmodified-since,
// because in the spec it says that if if-match evaluates to
// true but if-unmodified-since evaluates to false,
// the copy is still done.
(Some(im), _, None, None) => im.iter().any(|x| x == etag || x == "*"),
(None, Some(ius), None, None) => v_date <= *ius,
// If we have both if-none-match and if-modified-since,
// then both of the two conditions must evaluate to true
(None, None, Some(inm), Some(ims)) => {
!inm.iter().any(|x| x == etag || x == "*") && v_date > *ims
}
(None, None, Some(inm), None) => !inm.iter().any(|x| x == etag || x == "*"),
(None, None, None, Some(ims)) => v_date > *ims,
(None, None, None, None) => true,
_ => {
return Err(Error::BadRequest(
"Invalid combination of x-amz-copy-source-if-xxxxx headers".into(),
))
}
};
if ok {
Ok(())
} else {
Err(Error::PreconditionFailed)
}
}
}
#[derive(Debug, Serialize, PartialEq)]
pub struct CopyObjectResult {
#[serde(rename = "LastModified")]
pub last_modified: s3_xml::Value,
#[serde(rename = "ETag")]
pub etag: s3_xml::Value,
}
#[derive(Debug, Serialize, PartialEq)]
pub struct CopyPartResult {
#[serde(serialize_with = "xmlns_tag")]
pub xmlns: (),
#[serde(rename = "LastModified")]
pub last_modified: s3_xml::Value,
#[serde(rename = "ETag")]
pub etag: s3_xml::Value,
}
#[cfg(test)]
mod tests {
use super::*;
use crate::s3_xml::to_xml_with_header;
#[test]
fn copy_object_result() -> Result<(), Error> {
let copy_result = CopyObjectResult {
last_modified: s3_xml::Value(msec_to_rfc3339(0)),
etag: s3_xml::Value("\"9b2cf535f27731c974343645a3985328\"".to_string()),
};
assert_eq!(
to_xml_with_header(&copy_result)?,
"<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
<CopyObjectResult>\
<LastModified>1970-01-01T00:00:00.000Z</LastModified>\
<ETag>&quot;9b2cf535f27731c974343645a3985328&quot;</ETag>\
</CopyObjectResult>\
"
);
Ok(())
}
#[test]
fn serialize_copy_part_result() -> Result<(), Error> {
let expected_retval = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
<CopyPartResult xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
<LastModified>2011-04-11T20:34:56.000Z</LastModified>\
<ETag>&quot;9b2cf535f27731c974343645a3985328&quot;</ETag>\
</CopyPartResult>";
let v = CopyPartResult {
xmlns: (),
last_modified: s3_xml::Value("2011-04-11T20:34:56.000Z".into()),
etag: s3_xml::Value("\"9b2cf535f27731c974343645a3985328\"".into()),
};
println!("{}", to_xml_with_header(&v)?);
assert_eq!(to_xml_with_header(&v)?, expected_retval);
Ok(())
}
}

File diff suppressed because it is too large Load Diff

View File

@ -425,17 +425,21 @@ pub async fn handle_put_part(
let body = req.into_body().map_err(Error::from);
let mut chunker = StreamChunker::new(body, garage.config.block_size);
let (object, first_block) = futures::try_join!(
let (object, version, first_block) = futures::try_join!(
garage
.object_table
.get(&bucket_id, &key)
.map_err(Error::from),
garage
.version_table
.get(&version_uuid, &EmptyKey)
.map_err(Error::from),
chunker.next(),
)?;
// Check object is valid and multipart block can be accepted
let first_block = first_block.ok_or_else(|| Error::BadRequest("Empty body".to_string()))?;
let object = object.ok_or_else(|| Error::BadRequest("Object not found".to_string()))?;
let first_block = first_block.ok_or_bad_request("Empty body")?;
let object = object.ok_or_bad_request("Object not found")?;
if !object
.versions()
@ -445,6 +449,16 @@ pub async fn handle_put_part(
return Err(Error::NoSuchUpload);
}
// Check part hasn't already been uploaded
if let Some(v) = version {
if v.has_part_number(part_number) {
return Err(Error::BadRequest(format!(
"Part number {} has already been uploaded",
part_number
)));
}
}
// Copy block to store
let version = Version::new(version_uuid, bucket_id, key, false);
let first_block_hash = blake2sum(&first_block[..]);
@ -582,7 +596,7 @@ pub async fn handle_complete_multipart_upload(
location: None,
bucket: s3_xml::Value(bucket_name.to_string()),
key: s3_xml::Value(key),
etag: s3_xml::Value(etag),
etag: s3_xml::Value(format!("\"{}\"", etag)),
};
let xml = s3_xml::to_xml_with_header(&result)?;
@ -673,7 +687,7 @@ pub(crate) fn get_headers(req: &Request<Body>) -> Result<ObjectVersionHeaders, E
})
}
fn decode_upload_id(id: &str) -> Result<Uuid, Error> {
pub fn decode_upload_id(id: &str) -> Result<Uuid, Error> {
let id_bin = hex::decode(id).map_err(|_| Error::NoSuchUpload)?;
if id_bin.len() != 32 {
return Err(Error::NoSuchUpload);

View File

@ -350,7 +350,7 @@ pub enum Endpoint {
delimiter: Option<char>,
encoding_type: Option<String>,
key_marker: Option<String>,
max_uploads: Option<u64>,
max_uploads: Option<usize>,
prefix: Option<String>,
upload_id_marker: Option<String>,
},

View File

@ -107,14 +107,6 @@ pub struct DeleteResult {
pub errors: Vec<DeleteError>,
}
#[derive(Debug, Serialize, PartialEq)]
pub struct CopyObjectResult {
#[serde(rename = "LastModified")]
pub last_modified: Value,
#[serde(rename = "ETag")]
pub etag: Value,
}
#[derive(Debug, Serialize, PartialEq)]
pub struct InitiateMultipartUploadResult {
#[serde(serialize_with = "xmlns_tag")]
@ -141,6 +133,60 @@ pub struct CompleteMultipartUploadResult {
pub etag: Value,
}
#[derive(Debug, Serialize, PartialEq)]
pub struct Initiator {
#[serde(rename = "DisplayName")]
pub display_name: Value,
#[serde(rename = "ID")]
pub id: Value,
}
#[derive(Debug, Serialize, PartialEq)]
pub struct ListMultipartItem {
#[serde(rename = "Initiated")]
pub initiated: Value,
#[serde(rename = "Initiator")]
pub initiator: Initiator,
#[serde(rename = "Key")]
pub key: Value,
#[serde(rename = "UploadId")]
pub upload_id: Value,
#[serde(rename = "Owner")]
pub owner: Owner,
#[serde(rename = "StorageClass")]
pub storage_class: Value,
}
#[derive(Debug, Serialize, PartialEq)]
pub struct ListMultipartUploadsResult {
#[serde(serialize_with = "xmlns_tag")]
pub xmlns: (),
#[serde(rename = "Bucket")]
pub bucket: Value,
#[serde(rename = "KeyMarker")]
pub key_marker: Option<Value>,
#[serde(rename = "UploadIdMarker")]
pub upload_id_marker: Option<Value>,
#[serde(rename = "NextKeyMarker")]
pub next_key_marker: Option<Value>,
#[serde(rename = "NextUploadIdMarker")]
pub next_upload_id_marker: Option<Value>,
#[serde(rename = "Prefix")]
pub prefix: Value,
#[serde(rename = "Delimiter")]
pub delimiter: Option<Value>,
#[serde(rename = "MaxUploads")]
pub max_uploads: IntValue,
#[serde(rename = "IsTruncated")]
pub is_truncated: Value,
#[serde(rename = "Upload")]
pub upload: Vec<ListMultipartItem>,
#[serde(rename = "CommonPrefixes")]
pub common_prefixes: Vec<CommonPrefix>,
#[serde(rename = "EncodingType")]
pub encoding_type: Option<Value>,
}
#[derive(Debug, Serialize, PartialEq)]
pub struct ListBucketItem {
#[serde(rename = "Key")]
@ -372,24 +418,6 @@ mod tests {
Ok(())
}
#[test]
fn copy_object_result() -> Result<(), ApiError> {
let copy_result = CopyObjectResult {
last_modified: Value(msec_to_rfc3339(0)),
etag: Value("9b2cf535f27731c974343645a3985328".to_string()),
};
assert_eq!(
to_xml_with_header(&copy_result)?,
"<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
<CopyObjectResult>\
<LastModified>1970-01-01T00:00:00.000Z</LastModified>\
<ETag>9b2cf535f27731c974343645a3985328</ETag>\
</CopyObjectResult>\
"
);
Ok(())
}
#[test]
fn initiate_multipart_upload_result() -> Result<(), ApiError> {
let result = InitiateMultipartUploadResult {
@ -417,7 +445,7 @@ mod tests {
location: Some(Value("https://garage.tld/mybucket/a/plop".to_string())),
bucket: Value("mybucket".to_string()),
key: Value("a/plop".to_string()),
etag: Value("3858f62230ac3c915f300c664312c11f-9".to_string()),
etag: Value("\"3858f62230ac3c915f300c664312c11f-9\"".to_string()),
};
assert_eq!(
to_xml_with_header(&result)?,
@ -426,12 +454,64 @@ mod tests {
<Location>https://garage.tld/mybucket/a/plop</Location>\
<Bucket>mybucket</Bucket>\
<Key>a/plop</Key>\
<ETag>3858f62230ac3c915f300c664312c11f-9</ETag>\
<ETag>&quot;3858f62230ac3c915f300c664312c11f-9&quot;</ETag>\
</CompleteMultipartUploadResult>"
);
Ok(())
}
#[test]
fn list_multipart_uploads_result() -> Result<(), ApiError> {
let result = ListMultipartUploadsResult {
xmlns: (),
bucket: Value("example-bucket".to_string()),
key_marker: None,
next_key_marker: None,
upload_id_marker: None,
encoding_type: None,
next_upload_id_marker: None,
upload: vec![],
delimiter: Some(Value("/".to_string())),
prefix: Value("photos/2006/".to_string()),
max_uploads: IntValue(1000),
is_truncated: Value("false".to_string()),
common_prefixes: vec![
CommonPrefix {
prefix: Value("photos/2006/February/".to_string()),
},
CommonPrefix {
prefix: Value("photos/2006/January/".to_string()),
},
CommonPrefix {
prefix: Value("photos/2006/March/".to_string()),
},
],
};
assert_eq!(
to_xml_with_header(&result)?,
"<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
<ListMultipartUploadsResult xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
<Bucket>example-bucket</Bucket>\
<Prefix>photos/2006/</Prefix>\
<Delimiter>/</Delimiter>\
<MaxUploads>1000</MaxUploads>\
<IsTruncated>false</IsTruncated>\
<CommonPrefixes>\
<Prefix>photos/2006/February/</Prefix>\
</CommonPrefixes>\
<CommonPrefixes>\
<Prefix>photos/2006/January/</Prefix>\
</CommonPrefixes>\
<CommonPrefixes>\
<Prefix>photos/2006/March/</Prefix>\
</CommonPrefixes>\
</ListMultipartUploadsResult>"
);
Ok(())
}
#[test]
fn list_objects_v1_1() -> Result<(), ApiError> {
let result = ListBucketResult {
@ -451,7 +531,7 @@ mod tests {
contents: vec![ListBucketItem {
key: Value("sample.jpg".to_string()),
last_modified: Value(msec_to_rfc3339(0)),
etag: Value("bf1d737a4d46a19f3bced6905cc8b902".to_string()),
etag: Value("\"bf1d737a4d46a19f3bced6905cc8b902\"".to_string()),
size: IntValue(142863),
storage_class: Value("STANDARD".to_string()),
}],
@ -472,7 +552,7 @@ mod tests {
<Contents>\
<Key>sample.jpg</Key>\
<LastModified>1970-01-01T00:00:00.000Z</LastModified>\
<ETag>bf1d737a4d46a19f3bced6905cc8b902</ETag>\
<ETag>&quot;bf1d737a4d46a19f3bced6905cc8b902&quot;</ETag>\
<Size>142863</Size>\
<StorageClass>STANDARD</StorageClass>\
</Contents>\
@ -550,7 +630,7 @@ mod tests {
contents: vec![ListBucketItem {
key: Value("ExampleObject.txt".to_string()),
last_modified: Value(msec_to_rfc3339(0)),
etag: Value("599bab3ed2c697f1d26842727561fd94".to_string()),
etag: Value("\"599bab3ed2c697f1d26842727561fd94\"".to_string()),
size: IntValue(857),
storage_class: Value("REDUCED_REDUNDANCY".to_string()),
}],
@ -568,7 +648,7 @@ mod tests {
<Contents>\
<Key>ExampleObject.txt</Key>\
<LastModified>1970-01-01T00:00:00.000Z</LastModified>\
<ETag>599bab3ed2c697f1d26842727561fd94</ETag>\
<ETag>&quot;599bab3ed2c697f1d26842727561fd94&quot;</ETag>\
<Size>857</Size>\
<StorageClass>REDUCED_REDUNDANCY</StorageClass>\
</Contents>\
@ -598,7 +678,7 @@ mod tests {
contents: vec![ListBucketItem {
key: Value("happyfacex.jpg".to_string()),
last_modified: Value(msec_to_rfc3339(0)),
etag: Value("70ee1738b6b21e2c8a43f3a5ab0eee71".to_string()),
etag: Value("\"70ee1738b6b21e2c8a43f3a5ab0eee71\"".to_string()),
size: IntValue(1111),
storage_class: Value("STANDARD".to_string()),
}],
@ -618,7 +698,7 @@ mod tests {
<Contents>\
<Key>happyfacex.jpg</Key>\
<LastModified>1970-01-01T00:00:00.000Z</LastModified>\
<ETag>70ee1738b6b21e2c8a43f3a5ab0eee71</ETag>\
<ETag>&quot;70ee1738b6b21e2c8a43f3a5ab0eee71&quot;</ETag>\
<Size>1111</Size>\
<StorageClass>STANDARD</StorageClass>\
</Contents>\

View File

@ -245,10 +245,13 @@ fn canonical_header_string(headers: &HashMap<String, String>, signed_headers: &s
let mut items = headers
.iter()
.filter(|(key, _)| signed_headers_vec.contains(&key.as_str()))
.map(|(key, value)| key.to_lowercase() + ":" + value.trim())
.collect::<Vec<_>>();
items.sort();
items.join("\n")
items.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
items
.iter()
.map(|(key, value)| key.to_lowercase() + ":" + value.trim())
.collect::<Vec<_>>()
.join("\n")
}
fn canonical_query_string(uri: &hyper::Uri) -> String {

View File

@ -233,10 +233,6 @@ where
return Poll::Ready(Some(Err(SignedPayloadStreamError::Stream(e))))
}
None => {
if this.buf.is_empty() {
return Poll::Ready(None);
}
return Poll::Ready(Some(Err(SignedPayloadStreamError::message(
"Unexpected EOF",
))));

View File

@ -21,6 +21,7 @@ use garage_model::garage::Garage;
use garage_model::helper::error::{Error, OkOrBadRequest};
use garage_model::key_table::*;
use garage_model::migrate::Migrate;
use garage_model::object_table::ObjectFilter;
use garage_model::permission::*;
use crate::cli::*;
@ -209,7 +210,7 @@ impl AdminRpcHandler {
let objects = self
.garage
.object_table
.get_range(&bucket_id, None, Some(DeletedFilter::NotDeleted), 10)
.get_range(&bucket_id, None, Some(ObjectFilter::IsData), 10)
.await?;
if !objects.is_empty() {
return Err(Error::BadRequest(format!(

View File

@ -218,13 +218,19 @@ pub struct ObjectTable {
pub version_table: Arc<Table<VersionTable, TableShardedReplication>>,
}
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub enum ObjectFilter {
IsData,
IsUploading,
}
impl TableSchema for ObjectTable {
const TABLE_NAME: &'static str = "object";
type P = Uuid;
type S = String;
type E = Object;
type Filter = DeletedFilter;
type Filter = ObjectFilter;
fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) {
let version_table = self.version_table.clone();
@ -254,8 +260,10 @@ impl TableSchema for ObjectTable {
}
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
let deleted = !entry.versions.iter().any(|v| v.is_data());
filter.apply(deleted)
match filter {
ObjectFilter::IsData => entry.versions.iter().any(|v| v.is_data()),
ObjectFilter::IsUploading => entry.versions.iter().any(|v| v.is_uploading()),
}
}
fn try_migrate(bytes: &[u8]) -> Option<Self::E> {

View File

@ -47,6 +47,20 @@ impl Version {
key,
}
}
pub fn has_part_number(&self, part_number: u64) -> bool {
let case1 = self
.parts_etags
.items()
.binary_search_by(|(k, _)| k.cmp(&part_number))
.is_ok();
let case2 = self
.blocks
.items()
.binary_search_by(|(k, _)| k.part_number.cmp(&part_number))
.is_ok();
case1 || case2
}
}
#[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)]