Merge branch 'main' into next-0.10

This commit is contained in:
Alex 2024-03-04 15:56:10 +01:00
commit bbde9bc912
Signed by untrusted user: lx
GPG key ID: 0E496D15096376BE
54 changed files with 6525 additions and 763 deletions

24
Cargo.lock generated
View file

@ -120,6 +120,18 @@ version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6"
[[package]]
name = "argon2"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c3610892ee6e0cbce8ae2700349fcf8f98adb0dbfbee85aec3c9179d29cc072"
dependencies = [
"base64ct",
"blake2",
"cpufeatures",
"password-hash",
]
[[package]]
name = "arrayvec"
version = "0.5.2"
@ -1321,6 +1333,7 @@ dependencies = [
name = "garage_api"
version = "0.10.0"
dependencies = [
"argon2",
"async-trait",
"base64 0.21.7",
"bytes",
@ -2799,6 +2812,17 @@ dependencies = [
"regex",
]
[[package]]
name = "password-hash"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "346f04948ba92c43e8469c1ee6736c7563d71012b17d40745260fe106aac2166"
dependencies = [
"base64ct",
"rand_core",
"subtle",
]
[[package]]
name = "paste"
version = "1.0.14"

View file

@ -34,7 +34,7 @@ args@{
ignoreLockHash,
}:
let
nixifiedLockHash = "78a919892c20922859f8146234cfb36542303861f757e6ebb11d010965285f04";
nixifiedLockHash = "263873397c8aa960f9ef6a815187218ab9c58b5ab35bbeb9c3dc70d032dcc963";
workspaceSrc = if args.workspaceSrc == null then ./. else args.workspaceSrc;
currentLockHash = builtins.hashFile "sha256" (workspaceSrc + /Cargo.lock);
lockHashIgnored = if ignoreLockHash
@ -235,6 +235,25 @@ in
src = fetchCratesIo { inherit name version; sha256 = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6"; };
});
"registry+https://github.com/rust-lang/crates.io-index".argon2."0.5.3" = overridableMkRustCrate (profileName: rec {
name = "argon2";
version = "0.5.3";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "3c3610892ee6e0cbce8ae2700349fcf8f98adb0dbfbee85aec3c9179d29cc072"; };
features = builtins.concatLists [
[ "alloc" ]
[ "default" ]
[ "password-hash" ]
[ "rand" ]
];
dependencies = {
base64ct = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".base64ct."1.6.0" { inherit profileName; }).out;
blake2 = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".blake2."0.10.6" { inherit profileName; }).out;
${ if hostPlatform.parsed.cpu.name == "i686" || hostPlatform.parsed.cpu.name == "x86_64" then "cpufeatures" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".cpufeatures."0.2.12" { inherit profileName; }).out;
password_hash = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".password-hash."0.5.0" { inherit profileName; }).out;
};
});
"registry+https://github.com/rust-lang/crates.io-index".arrayvec."0.5.2" = overridableMkRustCrate (profileName: rec {
name = "arrayvec";
version = "0.5.2";
@ -1939,6 +1958,7 @@ in
(lib.optional (rootFeatures' ? "garage/default" || rootFeatures' ? "garage/metrics" || rootFeatures' ? "garage_api/metrics" || rootFeatures' ? "garage_api/prometheus") "prometheus")
];
dependencies = {
argon2 = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".argon2."0.5.3" { inherit profileName; }).out;
async_trait = (buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".async-trait."0.1.77" { profileName = "__noProfile"; }).out;
base64 = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".base64."0.21.7" { inherit profileName; }).out;
bytes = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.5.0" { inherit profileName; }).out;
@ -3989,6 +4009,23 @@ in
};
});
"registry+https://github.com/rust-lang/crates.io-index".password-hash."0.5.0" = overridableMkRustCrate (profileName: rec {
name = "password-hash";
version = "0.5.0";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "346f04948ba92c43e8469c1ee6736c7563d71012b17d40745260fe106aac2166"; };
features = builtins.concatLists [
[ "alloc" ]
[ "default" ]
[ "rand_core" ]
];
dependencies = {
base64ct = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".base64ct."1.6.0" { inherit profileName; }).out;
rand_core = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand_core."0.6.4" { inherit profileName; }).out;
subtle = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".subtle."2.5.0" { inherit profileName; }).out;
};
});
"registry+https://github.com/rust-lang/crates.io-index".paste."1.0.14" = overridableMkRustCrate (profileName: rec {
name = "paste";
version = "1.0.14";

View file

@ -34,6 +34,7 @@ k2v-client = { version = "0.0.4", path = "src/k2v-client" }
# External crates from crates.io
arc-swap = "1.0"
argon2 = "0.5"
async-trait = "0.1.7"
backtrace = "0.3"
base64 = "0.21"

View file

@ -55,8 +55,8 @@ Create your key and bucket:
```bash
garage key create my-key
garage bucket create backup
garage bucket allow backup --read --write --key my-key
garage bucket create backups
garage bucket allow backups --read --write --key my-key
```
Then register your Key ID and Secret key in your environment:

View file

@ -85,14 +85,14 @@ to store 2 TB of data in total.
## Get a Docker image
Our docker image is currently named `dxflrs/garage` and is stored on the [Docker Hub](https://hub.docker.com/r/dxflrs/garage/tags?page=1&ordering=last_updated).
We encourage you to use a fixed tag (eg. `v0.9.1`) and not the `latest` tag.
For this example, we will use the latest published version at the time of the writing which is `v0.9.1` but it's up to you
We encourage you to use a fixed tag (eg. `v0.9.3`) and not the `latest` tag.
For this example, we will use the latest published version at the time of the writing which is `v0.9.3` but it's up to you
to check [the most recent versions on the Docker Hub](https://hub.docker.com/r/dxflrs/garage/tags?page=1&ordering=last_updated).
For example:
```
sudo docker pull dxflrs/garage:v0.9.1
sudo docker pull dxflrs/garage:v0.9.3
```
## Deploying and configuring Garage
@ -157,7 +157,7 @@ docker run \
-v /etc/garage.toml:/etc/garage.toml \
-v /var/lib/garage/meta:/var/lib/garage/meta \
-v /var/lib/garage/data:/var/lib/garage/data \
dxflrs/garage:v0.9.1
dxflrs/garage:v0.9.3
```
With this command line, Garage should be started automatically at each boot.
@ -171,7 +171,7 @@ If you want to use `docker-compose`, you may use the following `docker-compose.y
version: "3"
services:
garage:
image: dxflrs/garage:v0.9.1
image: dxflrs/garage:v0.9.3
network_mode: "host"
restart: unless-stopped
volumes:

View file

@ -79,8 +79,9 @@ index = "index.html"
api_bind_addr = "[::]:3904"
[admin]
api_bind_addr = "0.0.0.0:3903"
api_bind_addr = "[::]:3903"
admin_token = "$(openssl rand -base64 32)"
metrics_token = "$(openssl rand -base64 32)"
EOF
```

View file

@ -69,8 +69,8 @@ root_domain = ".web.garage"
[admin]
api_bind_addr = "0.0.0.0:3903"
metrics_token = "cacce0b2de4bc2d9f5b5fdff551e01ac1496055aed248202d415398987e35f81"
admin_token = "ae8cb40ea7368bbdbb6430af11cca7da833d3458a5f52086f4e805a570fb5c2a"
metrics_token = "BCAdFjoa9G0KJR0WXnHHm7fs1ZAbfpI8iIZ+Z/a2NgI="
admin_token = "UkLeGWEvHnXBqnueR3ISEMWpOnm40jH2tM2HnnL/0F4="
trace_sink = "http://localhost:4317"
```
@ -417,7 +417,7 @@ the node, even in the case of a NAT: the NAT should be configured to forward the
port number to the same internal port nubmer. This means that if you have several nodes running
behind a NAT, they should each use a different RPC port number.
#### `rpc_bind_outgoing` {#rpc_bind_outgoing} (since v0.9.2)
#### `rpc_bind_outgoing`(since v0.9.2) {#rpc_bind_outgoing}
If enabled, pre-bind all sockets for outgoing connections to the same IP address
used for listening (the IP address specified in `rpc_bind_addr`) before
@ -612,7 +612,7 @@ the socket will have 0220 mode. Make sure to set user and group permissions acco
The token for accessing the Metrics endpoint. If this token is not set, the
Metrics endpoint can be accessed without access control.
You can use any random string for this value. We recommend generating a random token with `openssl rand -hex 32`.
You can use any random string for this value. We recommend generating a random token with `openssl rand -base64 32`.
`metrics_token` was introduced in Garage `v0.7.2`.
`metrics_token_file` and the `GARAGE_METRICS_TOKEN` environment variable are supported since Garage `v0.8.2`.
@ -624,7 +624,7 @@ You can use any random string for this value. We recommend generating a random t
The token for accessing all of the other administration endpoints. If this
token is not set, access to these endpoints is disabled entirely.
You can use any random string for this value. We recommend generating a random token with `openssl rand -hex 32`.
You can use any random string for this value. We recommend generating a random token with `openssl rand -base64 32`.
`admin_token` was introduced in Garage `v0.7.2`.
`admin_token_file` and the `GARAGE_ADMIN_TOKEN` environment variable are supported since Garage `v0.8.2`.

View file

@ -37,6 +37,21 @@ A Garage cluster can very easily evolve over time, as storage nodes are added or
Garage will automatically rebalance data between nodes as needed to ensure the desired number of copies.
Read about cluster layout management [here](@/documentation/operations/layout.md).
### Several replication modes
Garage supports a variety of replication modes, with 1 copy, 2 copies or 3 copies of your data,
and with various levels of consistency, in order to adapt to a variety of usage scenarios.
Read our reference page on [supported replication modes](@/documentation/reference-manual/configuration.md#replication_mode)
to select the replication mode best suited to your use case (hint: in most cases, `replication_mode = "3"` is what you want).
### Compression and deduplication
All data stored in Garage is deduplicated, and optionnally compressed using
Zstd. Objects uploaded to Garage are chunked in blocks of constant sizes (see
[`block_size`](@/documentation/reference-manual/configuration.md#block_size)),
and the hashes of individual blocks are used to dispatch them to storage nodes
and to deduplicate them.
### No RAFT slowing you down
It might seem strange to tout the absence of something as a desirable feature,
@ -48,13 +63,6 @@ As a consequence, requests can be handled much faster, even in cases where laten
between cluster nodes is important (see our [benchmarks](@/documentation/design/benchmarks/index.md) for data on this).
This is particularly usefull when nodes are far from one another and talk to one other through standard Internet connections.
### Several replication modes
Garage supports a variety of replication modes, with 1 copy, 2 copies or 3 copies of your data,
and with various levels of consistency, in order to adapt to a variety of usage scenarios.
Read our reference page on [supported replication modes](@/documentation/reference-manual/configuration.md#replication_mode)
to select the replication mode best suited to your use case (hint: in most cases, `replication_mode = "3"` is what you want).
### Web server for static websites
A storage bucket can easily be configured to be served directly by Garage as a static web site.

View file

@ -0,0 +1,17 @@
*
!*.txt
!*.md
!assets
!.gitignore
!*.svg
!*.png
!*.jpg
!*.tex
!Makefile
!.gitignore
!assets/*.drawio.pdf
!talk.pdf

View file

@ -0,0 +1,10 @@
ASSETS=../assets/logos/deuxfleurs.pdf
talk.pdf: talk.tex $(ASSETS)
pdflatex talk.tex
%.pdf: %.svg
inkscape -D -z --file=$^ --export-pdf=$@
%.pdf_tex: %.svg
inkscape -D -z --file=$^ --export-pdf=$@ --export-latex

Binary file not shown.

View file

@ -0,0 +1,543 @@
\nonstopmode
\documentclass[aspectratio=169,xcolor={svgnames}]{beamer}
\usepackage[utf8]{inputenc}
% \usepackage[frenchb]{babel}
\usepackage{amsmath}
\usepackage{mathtools}
\usepackage{breqn}
\usepackage{multirow}
\usetheme{boxes}
\usepackage{graphicx}
\usepackage{import}
\usepackage{adjustbox}
\usepackage[absolute,overlay]{textpos}
%\useoutertheme[footline=authortitle,subsection=false]{miniframes}
%\useoutertheme[footline=authorinstitute,subsection=false]{miniframes}
\useoutertheme{infolines}
\setbeamertemplate{headline}{}
\beamertemplatenavigationsymbolsempty
\definecolor{TitleOrange}{RGB}{255,137,0}
\setbeamercolor{title}{fg=TitleOrange}
\setbeamercolor{frametitle}{fg=TitleOrange}
\definecolor{ListOrange}{RGB}{255,145,5}
\setbeamertemplate{itemize item}{\color{ListOrange}$\blacktriangleright$}
\definecolor{verygrey}{RGB}{70,70,70}
\setbeamercolor{normal text}{fg=verygrey}
\usepackage{tabu}
\usepackage{multicol}
\usepackage{vwcol}
\usepackage{stmaryrd}
\usepackage{graphicx}
\usepackage[normalem]{ulem}
\AtBeginSection[]{
\begin{frame}
\vfill
\centering
\begin{beamercolorbox}[sep=8pt,center,shadow=true,rounded=true]{title}
\usebeamerfont{title}\insertsectionhead\par%
\end{beamercolorbox}
\vfill
\end{frame}
}
\title{Garage}
\author{Alex Auvolat, Deuxfleurs}
\date{Capitoul, 2024-02-29}
\begin{document}
\begin{frame}
\centering
\includegraphics[width=.3\linewidth]{../../sticker/Garage.png}
\vspace{1em}
{\large\bf Alex Auvolat, Deuxfleurs Association}
\vspace{1em}
\url{https://garagehq.deuxfleurs.fr/}
Matrix channel: \texttt{\#garage:deuxfleurs.fr}
\end{frame}
\begin{frame}
\frametitle{Who I am}
\begin{columns}[t]
\begin{column}{.2\textwidth}
\centering
\adjincludegraphics[width=.4\linewidth, valign=t]{../assets/alex.jpg}
\end{column}
\begin{column}{.6\textwidth}
\textbf{Alex Auvolat}\\
PhD; co-founder of Deuxfleurs
\end{column}
\begin{column}{.2\textwidth}
~
\end{column}
\end{columns}
\vspace{2em}
\begin{columns}[t]
\begin{column}{.2\textwidth}
\centering
\adjincludegraphics[width=.5\linewidth, valign=t]{../assets/logos/deuxfleurs.pdf}
\end{column}
\begin{column}{.6\textwidth}
\textbf{Deuxfleurs}\\
A non-profit self-hosting collective,\\
member of the CHATONS network
\end{column}
\begin{column}{.2\textwidth}
\centering
\adjincludegraphics[width=.7\linewidth, valign=t]{../assets/logos/logo_chatons.png}
\end{column}
\end{columns}
\end{frame}
\begin{frame}
\frametitle{Our objective at Deuxfleurs}
\begin{center}
\textbf{Promote self-hosting and small-scale hosting\\
as an alternative to large cloud providers}
\end{center}
\vspace{2em}
\visible<2->{
Why is it hard?
\vspace{2em}
\begin{center}
\textbf{\underline{Resilience}}\\
{\footnotesize we want good uptime/availability with low supervision}
\end{center}
}
\end{frame}
\begin{frame}
\frametitle{Our very low-tech infrastructure}
\only<1,3-6>{
\begin{itemize}
\item \textcolor<4->{gray}{Commodity hardware (e.g. old desktop PCs)\\
\vspace{.5em}
\visible<3->{{\footnotesize (can die at any time)}}}
\vspace{1.5em}
\item<4-> \textcolor<6->{gray}{Regular Internet (e.g. FTTB, FTTH) and power grid connections\\
\vspace{.5em}
\visible<5->{{\footnotesize (can be unavailable randomly)}}}
\vspace{1.5em}
\item<6-> \textbf{Geographical redundancy} (multi-site replication)
\end{itemize}
}
\only<2>{
\begin{center}
\includegraphics[width=.8\linewidth]{../assets/neptune.jpg}
\end{center}
}
\only<7>{
\begin{center}
\includegraphics[width=.8\linewidth]{../assets/inframap_jdll2023.pdf}
\end{center}
}
\end{frame}
\begin{frame}
\frametitle{How to make this happen}
\begin{center}
\only<1>{\includegraphics[width=.8\linewidth]{../assets/intro/slide1.png}}%
\only<2>{\includegraphics[width=.8\linewidth]{../assets/intro/slide2.png}}%
\only<3>{\includegraphics[width=.8\linewidth]{../assets/intro/slide3.png}}%
\end{center}
\end{frame}
\begin{frame}
\frametitle{Distributed file systems are slow}
File systems are complex, for example:
\vspace{1em}
\begin{itemize}
\item Concurrent modification by several processes
\vspace{1em}
\item Folder hierarchies
\vspace{1em}
\item Other requirements of the POSIX spec (e.g.~locks)
\end{itemize}
\vspace{1em}
Coordination in a distributed system is costly
\vspace{1em}
Costs explode with commodity hardware / Internet connections\\
{\small (we experienced this!)}
\end{frame}
\begin{frame}
\frametitle{A simpler solution: object storage}
Only two operations:
\vspace{1em}
\begin{itemize}
\item Put an object at a key
\vspace{1em}
\item Retrieve an object from its key
\end{itemize}
\vspace{1em}
{\footnotesize (and a few others)}
\vspace{1em}
Sufficient for many applications!
\end{frame}
\begin{frame}
\frametitle{A simpler solution: object storage}
\begin{center}
\includegraphics[height=6em]{../assets/logos/Amazon-S3.jpg}
\hspace{3em}
\visible<2->{\includegraphics[height=5em]{../assets/logos/minio.png}}
\hspace{3em}
\visible<3>{\includegraphics[height=6em]{../../logo/garage_hires_crop.png}}
\end{center}
\vspace{1em}
S3: a de-facto standard, many compatible applications
\vspace{1em}
\visible<2->{MinIO is self-hostable but not suited for geo-distributed deployments}
\vspace{1em}
\visible<3->{\textbf{Garage is a self-hosted drop-in replacement for the Amazon S3 object store}}
\end{frame}
% --------- BASED ON CRDTS ----------
\section{Principle 1: based on CRDTs}
\begin{frame}
\frametitle{CRDTs / weak consistency instead of consensus}
\underline{Internally, Garage uses only CRDTs} (conflict-free replicated data types)
\vspace{2em}
Why not Raft, Paxos, ...? Issues of consensus algorithms:
\vspace{1em}
\begin{itemize}
\item<2-> \textbf{Software complexity}
\vspace{1em}
\item<3-> \textbf{Performance issues:}
\vspace{.5em}
\begin{itemize}
\item<4-> The leader is a \textbf{bottleneck} for all requests\\
\vspace{.5em}
\item<5-> \textbf{Sensitive to higher latency} between nodes
\vspace{.5em}
\item<6-> \textbf{Takes time to reconverge} when disrupted (e.g. node going down)
\end{itemize}
\end{itemize}
\end{frame}
\begin{frame}
\frametitle{The data model of object storage}
Object storage is basically a \textbf{key-value store}:
\vspace{.5em}
{\scriptsize
\begin{center}
\begin{tabular}{|l|p{7cm}|}
\hline
\textbf{Key: file path + name} & \textbf{Value: file data + metadata} \\
\hline
\hline
\texttt{index.html} &
\texttt{Content-Type: text/html; charset=utf-8} \newline
\texttt{Content-Length: 24929} \newline
\texttt{<binary blob>} \\
\hline
\texttt{img/logo.svg} &
\texttt{Content-Type: text/svg+xml} \newline
\texttt{Content-Length: 13429} \newline
\texttt{<binary blob>} \\
\hline
\texttt{download/index.html} &
\texttt{Content-Type: text/html; charset=utf-8} \newline
\texttt{Content-Length: 26563} \newline
\texttt{<binary blob>} \\
\hline
\end{tabular}
\end{center}
}
\vspace{.5em}
\begin{itemize}
\item<2-> Maps well to CRDT data types
\item<3> Read-after-write consistency with quorums
\end{itemize}
\end{frame}
\begin{frame}
\frametitle{Performance gains in practice}
\begin{center}
\includegraphics[width=.8\linewidth]{../assets/perf/endpoint_latency_0.7_0.8_minio.png}
\end{center}
\end{frame}
% --------- GEO-DISTRIBUTED MODEL ----------
\section{Principle 2: geo-distributed data model}
\begin{frame}
\frametitle{Key-value stores, upgraded: the Dynamo model}
\textbf{Two keys:}
\begin{itemize}
\item Partition key: used to divide data into partitions {\small (a.k.a.~shards)}
\item Sort key: used to identify items inside a partition
\end{itemize}
\vspace{1em}
\begin{center}
\begin{tabular}{|l|l|p{3cm}|}
\hline
\textbf{Partition key: bucket} & \textbf{Sort key: filename} & \textbf{Value} \\
\hline
\hline
\texttt{website} & \texttt{index.html} & (file data) \\
\hline
\texttt{website} & \texttt{img/logo.svg} & (file data) \\
\hline
\texttt{website} & \texttt{download/index.html} & (file data) \\
\hline
\hline
\texttt{backup} & \texttt{borg/index.2822} & (file data) \\
\hline
\texttt{backup} & \texttt{borg/data/2/2329} & (file data) \\
\hline
\texttt{backup} & \texttt{borg/data/2/2680} & (file data) \\
\hline
\hline
\texttt{private} & \texttt{qq3a2nbe1qjq0ebbvo6ocsp6co} & (file data) \\
\hline
\end{tabular}
\end{center}
\end{frame}
\begin{frame}
\frametitle{Layout computation}
\begin{overprint}
\onslide<1>
\begin{center}
\includegraphics[width=\linewidth, trim=0 0 0 -4cm]{../assets/screenshots/garage_status_0.9_prod_zonehl.png}
\end{center}
\onslide<2>
\begin{center}
\includegraphics[width=.7\linewidth]{../assets/map.png}
\end{center}
\end{overprint}
\vspace{1em}
Garage stores replicas on different zones when possible
\end{frame}
\begin{frame}
\frametitle{What a "layout" is}
\textbf{A layout is a precomputed index table:}
\vspace{1em}
{\footnotesize
\begin{center}
\begin{tabular}{|l|l|l|l|}
\hline
\textbf{Partition} & \textbf{Node 1} & \textbf{Node 2} & \textbf{Node 3} \\
\hline
\hline
Partition 0 & df-ymk (bespin) & Abricot (scorpio) & Courgette (neptune) \\
\hline
Partition 1 & Ananas (scorpio) & Courgette (neptune) & df-ykl (bespin) \\
\hline
Partition 2 & df-ymf (bespin) & Celeri (neptune) & Abricot (scorpio) \\
\hline
\hspace{1em}$\vdots$ & \hspace{1em}$\vdots$ & \hspace{1em}$\vdots$ & \hspace{1em}$\vdots$ \\
\hline
Partition 255 & Concombre (neptune) & df-ykl (bespin) & Abricot (scorpio) \\
\hline
\end{tabular}
\end{center}
}
\vspace{2em}
\visible<2->{
The index table is built centrally using an optimal algorithm,\\
then propagated to all nodes
}
\vspace{1em}
\visible<3->{
\footnotesize
Oulamara, M., \& Auvolat, A. (2023). \emph{An algorithm for geo-distributed and redundant storage in Garage}.\\ arXiv preprint arXiv:2302.13798.
}
\end{frame}
\begin{frame}
\frametitle{The relationship between \emph{partition} and \emph{partition key}}
\begin{center}
\begin{tabular}{|l|l|l|l|}
\hline
\textbf{Partition key} & \textbf{Partition} & \textbf{Sort key} & \textbf{Value} \\
\hline
\hline
\texttt{website} & Partition 12 & \texttt{index.html} & (file data) \\
\hline
\texttt{website} & Partition 12 & \texttt{img/logo.svg} & (file data) \\
\hline
\texttt{website} & Partition 12 &\texttt{download/index.html} & (file data) \\
\hline
\hline
\texttt{backup} & Partition 42 & \texttt{borg/index.2822} & (file data) \\
\hline
\texttt{backup} & Partition 42 & \texttt{borg/data/2/2329} & (file data) \\
\hline
\texttt{backup} & Partition 42 & \texttt{borg/data/2/2680} & (file data) \\
\hline
\hline
\texttt{private} & Partition 42 & \texttt{qq3a2nbe1qjq0ebbvo6ocsp6co} & (file data) \\
\hline
\end{tabular}
\end{center}
\vspace{1em}
\textbf{To read or write an item:} hash partition key
\\ \hspace{5cm} $\to$ determine partition number (first 8 bits)
\\ \hspace{5cm} $\to$ find associated nodes
\end{frame}
\begin{frame}
\frametitle{Garage's internal data structures}
\centering
\includegraphics[width=.75\columnwidth]{../assets/garage_tables.pdf}
\end{frame}
% ---------- OPERATING GARAGE ---------
\section{Operating Garage clusters}
\begin{frame}
\frametitle{Operating Garage}
\begin{center}
\only<1-2>{
\includegraphics[width=.9\linewidth]{../assets/screenshots/garage_status_0.10.png}
\\\vspace{1em}
\visible<2>{\includegraphics[width=.9\linewidth]{../assets/screenshots/garage_status_unhealthy_0.10.png}}
}
\end{center}
\end{frame}
\begin{frame}
\frametitle{Background synchronization}
\begin{center}
\includegraphics[width=.6\linewidth]{../assets/garage_sync.drawio.pdf}
\end{center}
\end{frame}
\begin{frame}
\frametitle{Digging deeper}
\begin{center}
\only<1>{\includegraphics[width=.9\linewidth]{../assets/screenshots/garage_stats_0.10.png}}
\only<2>{\includegraphics[width=.5\linewidth]{../assets/screenshots/garage_worker_list_0.10.png}}
\only<3>{\includegraphics[width=.6\linewidth]{../assets/screenshots/garage_worker_param_0.10.png}}
\end{center}
\end{frame}
\begin{frame}
\frametitle{Monitoring with Prometheus + Grafana}
\begin{center}
\includegraphics[width=.9\linewidth]{../assets/screenshots/grafana_dashboard.png}
\end{center}
\end{frame}
\begin{frame}
\frametitle{Debugging with traces}
\begin{center}
\includegraphics[width=.8\linewidth]{../assets/screenshots/jaeger_listobjects.png}
\end{center}
\end{frame}
% ---------- SCALING GARAGE ---------
\section{Scaling Garage clusters}
\begin{frame}
\frametitle{Potential limitations and bottlenecks}
\begin{itemize}
\item Global:
\begin{itemize}
\item Max. $\sim$100 nodes per cluster (excluding gateways)
\end{itemize}
\vspace{1em}
\item Metadata:
\begin{itemize}
\item One big bucket = bottleneck, object list on 3 nodes only
\end{itemize}
\vspace{1em}
\item Block manager:
\begin{itemize}
\item Lots of small files on disk
\item Processing the resync queue can be slow
\end{itemize}
\end{itemize}
\end{frame}
\begin{frame}
\frametitle{Deployment advice for very large clusters}
\begin{itemize}
\item Metadata storage:
\begin{itemize}
\item ZFS mirror (x2) on fast NVMe
\item Use LMDB storage engine
\end{itemize}
\vspace{.5em}
\item Data block storage:
\begin{itemize}
\item Use Garage's native multi-HDD support
\item XFS on individual drives
\item Increase block size (1MB $\to$ 10MB, requires more RAM and good networking)
\item Tune \texttt{resync-tranquility} and \texttt{resync-worker-count} dynamically
\end{itemize}
\vspace{.5em}
\item Other :
\begin{itemize}
\item Split data over several buckets
\item Use less than 100 storage nodes
\item Use gateway nodes
\end{itemize}
\vspace{.5em}
\end{itemize}
Our deployments: $< 10$ TB. Some people have done more!
\end{frame}
% ======================================== END
% ======================================== END
% ======================================== END
\begin{frame}
\frametitle{Where to find us}
\begin{center}
\includegraphics[width=.25\linewidth]{../../logo/garage_hires.png}\\
\vspace{-1em}
\url{https://garagehq.deuxfleurs.fr/}\\
\url{mailto:garagehq@deuxfleurs.fr}\\
\texttt{\#garage:deuxfleurs.fr} on Matrix
\vspace{1.5em}
\includegraphics[width=.06\linewidth]{../assets/logos/rust_logo.png}
\includegraphics[width=.13\linewidth]{../assets/logos/AGPLv3_Logo.png}
\end{center}
\end{frame}
\end{document}
%% vim: set ts=4 sw=4 tw=0 noet spelllang=en :

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

After

Width:  |  Height:  |  Size: 87 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 81 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 124 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 84 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 81 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 81 KiB

File diff suppressed because it is too large Load diff

After

Width:  |  Height:  |  Size: 315 KiB

File diff suppressed because one or more lines are too long

After

Width:  |  Height:  |  Size: 286 KiB

View file

@ -21,4 +21,4 @@ version: 0.4.1
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
# It is recommended to use it with quotes.
appVersion: "v0.9.1"
appVersion: "v0.9.3"

View file

@ -81,11 +81,9 @@ if [ -z "$SKIP_AWS" ]; then
echo "Invalid multipart upload"
exit 1
fi
aws s3api delete-object --bucket eprouvette --key upload
fi
echo "OK!!"
exit 0
# S3CMD
if [ -z "$SKIP_S3CMD" ]; then
echo "🛠️ Testing with s3cmd"

View file

@ -21,6 +21,7 @@ garage_net.workspace = true
garage_util.workspace = true
garage_rpc.workspace = true
argon2.workspace = true
async-trait.workspace = true
base64.workspace = true
bytes.workspace = true

View file

@ -1,6 +1,7 @@
use std::collections::HashMap;
use std::sync::Arc;
use argon2::password_hash::PasswordHash;
use async_trait::async_trait;
use http::header::{ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN, ALLOW};
@ -45,14 +46,8 @@ impl AdminApiServer {
#[cfg(feature = "metrics")] exporter: PrometheusExporter,
) -> Self {
let cfg = &garage.config.admin;
let metrics_token = cfg
.metrics_token
.as_ref()
.map(|tok| format!("Bearer {}", tok));
let admin_token = cfg
.admin_token
.as_ref()
.map(|tok| format!("Bearer {}", tok));
let metrics_token = cfg.metrics_token.as_deref().map(hash_bearer_token);
let admin_token = cfg.admin_token.as_deref().map(hash_bearer_token);
Self {
garage,
#[cfg(feature = "metrics")]
@ -248,11 +243,11 @@ impl ApiHandler for AdminApiServer {
req: Request<IncomingBody>,
endpoint: Endpoint,
) -> Result<Response<ResBody>, Error> {
let expected_auth_header =
let required_auth_hash =
match endpoint.authorization_type() {
Authorization::None => None,
Authorization::MetricsToken => self.metrics_token.as_ref(),
Authorization::AdminToken => match &self.admin_token {
Authorization::MetricsToken => self.metrics_token.as_deref(),
Authorization::AdminToken => match self.admin_token.as_deref() {
None => return Err(Error::forbidden(
"Admin token isn't configured, admin API access is disabled for security.",
)),
@ -260,14 +255,11 @@ impl ApiHandler for AdminApiServer {
},
};
if let Some(h) = expected_auth_header {
if let Some(password_hash) = required_auth_hash {
match req.headers().get("Authorization") {
None => return Err(Error::forbidden("Authorization token must be provided")),
Some(v) => {
let authorized = v.to_str().map(|hv| hv.trim() == h).unwrap_or(false);
if !authorized {
return Err(Error::forbidden("Invalid authorization token provided"));
}
Some(authorization) => {
verify_bearer_token(&authorization, password_hash)?;
}
}
}
@ -342,3 +334,35 @@ impl ApiEndpoint for Endpoint {
fn add_span_attributes(&self, _span: SpanRef<'_>) {}
}
fn hash_bearer_token(token: &str) -> String {
use argon2::{
password_hash::{rand_core::OsRng, PasswordHasher, SaltString},
Argon2,
};
let salt = SaltString::generate(&mut OsRng);
let argon2 = Argon2::default();
argon2
.hash_password(token.trim().as_bytes(), &salt)
.expect("could not hash API token")
.to_string()
}
fn verify_bearer_token(token: &hyper::http::HeaderValue, password_hash: &str) -> Result<(), Error> {
use argon2::{password_hash::PasswordVerifier, Argon2};
let parsed_hash = PasswordHash::new(&password_hash).unwrap();
token
.to_str()?
.strip_prefix("Bearer ")
.and_then(|token| {
Argon2::default()
.verify_password(token.trim().as_bytes(), &parsed_hash)
.ok()
})
.ok_or_else(|| Error::forbidden("Invalid authorization token"))?;
Ok(())
}

View file

@ -1,4 +1,5 @@
use std::convert::Infallible;
use std::sync::Arc;
use futures::{Stream, StreamExt, TryStreamExt};
@ -10,6 +11,10 @@ use hyper::{
use idna::domain_to_unicode;
use serde::{Deserialize, Serialize};
use garage_model::bucket_table::BucketParams;
use garage_model::garage::Garage;
use garage_model::key_table::Key;
use garage_util::data::Uuid;
use garage_util::error::Error as GarageError;
use crate::common_error::{CommonError as Error, *};
@ -27,6 +32,15 @@ pub enum Authorization {
Owner,
}
/// The values which are known for each request related to a bucket
pub struct ReqCtx {
pub garage: Arc<Garage>,
pub bucket_id: Uuid,
pub bucket_name: String,
pub bucket_params: BucketParams,
pub api_key: Key,
}
/// Host to bucket
///
/// Convert a host, like "bucket.garage-site.tld" to the corresponding bucket "bucket",

View file

@ -15,8 +15,7 @@ use garage_model::garage::Garage;
use crate::generic_server::*;
use crate::k2v::error::*;
use crate::signature::payload::check_payload_signature;
use crate::signature::streaming::*;
use crate::signature::verify_request;
use crate::helpers::*;
use crate::k2v::batch::*;
@ -86,17 +85,7 @@ impl ApiHandler for K2VApiServer {
return Ok(options_res.map(|_empty_body: EmptyBody| empty_body()));
}
let (api_key, mut content_sha256) = check_payload_signature(&garage, "k2v", &req).await?;
let api_key = api_key
.ok_or_else(|| Error::forbidden("Garage does not support anonymous access yet"))?;
let req = parse_streaming_body(
&api_key,
req,
&mut content_sha256,
&garage.config.s3_api.s3_region,
"k2v",
)?;
let (req, api_key, _content_sha256) = verify_request(&garage, req, "k2v").await?;
let bucket_id = garage
.bucket_helper()
@ -106,6 +95,7 @@ impl ApiHandler for K2VApiServer {
.bucket_helper()
.get_existing_bucket(bucket_id)
.await?;
let bucket_params = bucket.state.into_option().unwrap();
let allowed = match endpoint.authorization_type() {
Authorization::Read => api_key.allow_read(&bucket_id),
@ -123,40 +113,42 @@ impl ApiHandler for K2VApiServer {
// are always preflighted, i.e. the browser should make
// an OPTIONS call before to check it is allowed
let matching_cors_rule = match *req.method() {
Method::GET | Method::HEAD | Method::POST => find_matching_cors_rule(&bucket, &req)
.ok_or_internal_error("Error looking up CORS rule")?,
Method::GET | Method::HEAD | Method::POST => {
find_matching_cors_rule(&bucket_params, &req)
.ok_or_internal_error("Error looking up CORS rule")?
.cloned()
}
_ => None,
};
let ctx = ReqCtx {
garage,
bucket_id,
bucket_name,
bucket_params,
api_key,
};
let resp = match endpoint {
Endpoint::DeleteItem {
partition_key,
sort_key,
} => handle_delete_item(garage, req, bucket_id, &partition_key, &sort_key).await,
} => handle_delete_item(ctx, req, &partition_key, &sort_key).await,
Endpoint::InsertItem {
partition_key,
sort_key,
} => handle_insert_item(garage, req, bucket_id, &partition_key, &sort_key).await,
} => handle_insert_item(ctx, req, &partition_key, &sort_key).await,
Endpoint::ReadItem {
partition_key,
sort_key,
} => handle_read_item(garage, &req, bucket_id, &partition_key, &sort_key).await,
} => handle_read_item(ctx, &req, &partition_key, &sort_key).await,
Endpoint::PollItem {
partition_key,
sort_key,
causality_token,
timeout,
} => {
handle_poll_item(
garage,
&req,
bucket_id,
partition_key,
sort_key,
causality_token,
timeout,
)
.await
handle_poll_item(ctx, &req, partition_key, sort_key, causality_token, timeout).await
}
Endpoint::ReadIndex {
prefix,
@ -164,12 +156,12 @@ impl ApiHandler for K2VApiServer {
end,
limit,
reverse,
} => handle_read_index(garage, bucket_id, prefix, start, end, limit, reverse).await,
Endpoint::InsertBatch {} => handle_insert_batch(garage, bucket_id, req).await,
Endpoint::ReadBatch {} => handle_read_batch(garage, bucket_id, req).await,
Endpoint::DeleteBatch {} => handle_delete_batch(garage, bucket_id, req).await,
} => handle_read_index(ctx, prefix, start, end, limit, reverse).await,
Endpoint::InsertBatch {} => handle_insert_batch(ctx, req).await,
Endpoint::ReadBatch {} => handle_read_batch(ctx, req).await,
Endpoint::DeleteBatch {} => handle_delete_batch(ctx, req).await,
Endpoint::PollRange { partition_key } => {
handle_poll_range(garage, bucket_id, &partition_key, req).await
handle_poll_range(ctx, &partition_key, req).await
}
Endpoint::Options => unreachable!(),
};
@ -178,7 +170,7 @@ impl ApiHandler for K2VApiServer {
// add the corresponding CORS headers to the response
let mut resp_ok = resp?;
if let Some(rule) = matching_cors_rule {
add_cors_headers(&mut resp_ok, rule)
add_cors_headers(&mut resp_ok, &rule)
.ok_or_internal_error("Invalid bucket CORS configuration")?;
}

View file

@ -1,14 +1,9 @@
use std::sync::Arc;
use base64::prelude::*;
use hyper::{Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
use garage_util::data::*;
use garage_table::{EnumerationOrder, TableSchema};
use garage_model::garage::Garage;
use garage_model::k2v::causality::*;
use garage_model::k2v::item_table::*;
@ -18,10 +13,12 @@ use crate::k2v::error::*;
use crate::k2v::range::read_range;
pub async fn handle_insert_batch(
garage: Arc<Garage>,
bucket_id: Uuid,
ctx: ReqCtx,
req: Request<ReqBody>,
) -> Result<Response<ResBody>, Error> {
let ReqCtx {
garage, bucket_id, ..
} = &ctx;
let items = parse_json_body::<Vec<InsertBatchItem>, _, Error>(req).await?;
let mut items2 = vec![];
@ -38,7 +35,7 @@ pub async fn handle_insert_batch(
items2.push((it.pk, it.sk, ct, v));
}
garage.k2v.rpc.insert_batch(bucket_id, items2).await?;
garage.k2v.rpc.insert_batch(*bucket_id, items2).await?;
Ok(Response::builder()
.status(StatusCode::NO_CONTENT)
@ -46,8 +43,7 @@ pub async fn handle_insert_batch(
}
pub async fn handle_read_batch(
garage: Arc<Garage>,
bucket_id: Uuid,
ctx: ReqCtx,
req: Request<ReqBody>,
) -> Result<Response<ResBody>, Error> {
let queries = parse_json_body::<Vec<ReadBatchQuery>, _, Error>(req).await?;
@ -55,7 +51,7 @@ pub async fn handle_read_batch(
let resp_results = futures::future::join_all(
queries
.into_iter()
.map(|q| handle_read_batch_query(&garage, bucket_id, q)),
.map(|q| handle_read_batch_query(&ctx, q)),
)
.await;
@ -68,12 +64,15 @@ pub async fn handle_read_batch(
}
async fn handle_read_batch_query(
garage: &Arc<Garage>,
bucket_id: Uuid,
ctx: &ReqCtx,
query: ReadBatchQuery,
) -> Result<ReadBatchResponse, Error> {
let ReqCtx {
garage, bucket_id, ..
} = ctx;
let partition = K2VItemPartition {
bucket_id,
bucket_id: *bucket_id,
partition_key: query.partition_key.clone(),
};
@ -138,8 +137,7 @@ async fn handle_read_batch_query(
}
pub async fn handle_delete_batch(
garage: Arc<Garage>,
bucket_id: Uuid,
ctx: ReqCtx,
req: Request<ReqBody>,
) -> Result<Response<ResBody>, Error> {
let queries = parse_json_body::<Vec<DeleteBatchQuery>, _, Error>(req).await?;
@ -147,7 +145,7 @@ pub async fn handle_delete_batch(
let resp_results = futures::future::join_all(
queries
.into_iter()
.map(|q| handle_delete_batch_query(&garage, bucket_id, q)),
.map(|q| handle_delete_batch_query(&ctx, q)),
)
.await;
@ -160,12 +158,15 @@ pub async fn handle_delete_batch(
}
async fn handle_delete_batch_query(
garage: &Arc<Garage>,
bucket_id: Uuid,
ctx: &ReqCtx,
query: DeleteBatchQuery,
) -> Result<DeleteBatchResponse, Error> {
let ReqCtx {
garage, bucket_id, ..
} = &ctx;
let partition = K2VItemPartition {
bucket_id,
bucket_id: *bucket_id,
partition_key: query.partition_key.clone(),
};
@ -195,7 +196,7 @@ async fn handle_delete_batch_query(
.k2v
.rpc
.insert(
bucket_id,
*bucket_id,
i.partition.partition_key,
i.sort_key,
Some(cc),
@ -235,7 +236,7 @@ async fn handle_delete_batch_query(
.collect::<Vec<_>>();
let n = items.len();
garage.k2v.rpc.insert_batch(bucket_id, items).await?;
garage.k2v.rpc.insert_batch(*bucket_id, items).await?;
n
};
@ -251,11 +252,13 @@ async fn handle_delete_batch_query(
}
pub(crate) async fn handle_poll_range(
garage: Arc<Garage>,
bucket_id: Uuid,
ctx: ReqCtx,
partition_key: &str,
req: Request<ReqBody>,
) -> Result<Response<ResBody>, Error> {
let ReqCtx {
garage, bucket_id, ..
} = ctx;
use garage_model::k2v::sub::PollRange;
let query = parse_json_body::<PollRangeQuery, _, Error>(req).await?;

View file

@ -1,13 +1,8 @@
use std::sync::Arc;
use hyper::Response;
use serde::Serialize;
use garage_util::data::*;
use garage_table::util::*;
use garage_model::garage::Garage;
use garage_model::k2v::item_table::{BYTES, CONFLICTS, ENTRIES, VALUES};
use crate::helpers::*;
@ -16,14 +11,17 @@ use crate::k2v::error::*;
use crate::k2v::range::read_range;
pub async fn handle_read_index(
garage: Arc<Garage>,
bucket_id: Uuid,
ctx: ReqCtx,
prefix: Option<String>,
start: Option<String>,
end: Option<String>,
limit: Option<u64>,
reverse: Option<bool>,
) -> Result<Response<ResBody>, Error> {
let ReqCtx {
garage, bucket_id, ..
} = &ctx;
let reverse = reverse.unwrap_or(false);
let node_id_vec = garage

View file

@ -1,13 +1,8 @@
use std::sync::Arc;
use base64::prelude::*;
use http::header;
use hyper::{Request, Response, StatusCode};
use garage_util::data::*;
use garage_model::garage::Garage;
use garage_model::k2v::causality::*;
use garage_model::k2v::item_table::*;
@ -100,12 +95,15 @@ impl ReturnFormat {
/// Handle ReadItem request
#[allow(clippy::ptr_arg)]
pub async fn handle_read_item(
garage: Arc<Garage>,
ctx: ReqCtx,
req: &Request<ReqBody>,
bucket_id: Uuid,
partition_key: &str,
sort_key: &String,
) -> Result<Response<ResBody>, Error> {
let ReqCtx {
garage, bucket_id, ..
} = &ctx;
let format = ReturnFormat::from(req)?;
let item = garage
@ -113,7 +111,7 @@ pub async fn handle_read_item(
.item_table
.get(
&K2VItemPartition {
bucket_id,
bucket_id: *bucket_id,
partition_key: partition_key.to_string(),
},
sort_key,
@ -125,12 +123,14 @@ pub async fn handle_read_item(
}
pub async fn handle_insert_item(
garage: Arc<Garage>,
ctx: ReqCtx,
req: Request<ReqBody>,
bucket_id: Uuid,
partition_key: &str,
sort_key: &str,
) -> Result<Response<ResBody>, Error> {
let ReqCtx {
garage, bucket_id, ..
} = &ctx;
let causal_context = req
.headers()
.get(X_GARAGE_CAUSALITY_TOKEN)
@ -149,7 +149,7 @@ pub async fn handle_insert_item(
.k2v
.rpc
.insert(
bucket_id,
*bucket_id,
partition_key.to_string(),
sort_key.to_string(),
causal_context,
@ -163,12 +163,14 @@ pub async fn handle_insert_item(
}
pub async fn handle_delete_item(
garage: Arc<Garage>,
ctx: ReqCtx,
req: Request<ReqBody>,
bucket_id: Uuid,
partition_key: &str,
sort_key: &str,
) -> Result<Response<ResBody>, Error> {
let ReqCtx {
garage, bucket_id, ..
} = &ctx;
let causal_context = req
.headers()
.get(X_GARAGE_CAUSALITY_TOKEN)
@ -183,7 +185,7 @@ pub async fn handle_delete_item(
.k2v
.rpc
.insert(
bucket_id,
*bucket_id,
partition_key.to_string(),
sort_key.to_string(),
causal_context,
@ -199,14 +201,16 @@ pub async fn handle_delete_item(
/// Handle ReadItem request
#[allow(clippy::ptr_arg)]
pub async fn handle_poll_item(
garage: Arc<Garage>,
ctx: ReqCtx,
req: &Request<ReqBody>,
bucket_id: Uuid,
partition_key: String,
sort_key: String,
causality_token: String,
timeout_secs: Option<u64>,
) -> Result<Response<ResBody>, Error> {
let ReqCtx {
garage, bucket_id, ..
} = &ctx;
let format = ReturnFormat::from(req)?;
let causal_context =
@ -218,7 +222,7 @@ pub async fn handle_poll_item(
.k2v
.rpc
.poll_item(
bucket_id,
*bucket_id,
partition_key,
sort_key,
causal_context,

View file

@ -17,8 +17,7 @@ use garage_model::key_table::Key;
use crate::generic_server::*;
use crate::s3::error::*;
use crate::signature::payload::check_payload_signature;
use crate::signature::streaming::*;
use crate::signature::verify_request;
use crate::helpers::*;
use crate::s3::bucket::*;
@ -125,17 +124,7 @@ impl ApiHandler for S3ApiServer {
return Ok(options_res.map(|_empty_body: EmptyBody| empty_body()));
}
let (api_key, mut content_sha256) = check_payload_signature(&garage, "s3", &req).await?;
let api_key = api_key
.ok_or_else(|| Error::forbidden("Garage does not support anonymous access yet"))?;
let req = parse_streaming_body(
&api_key,
req,
&mut content_sha256,
&garage.config.s3_api.s3_region,
"s3",
)?;
let (req, api_key, content_sha256) = verify_request(&garage, req, "s3").await?;
let bucket_name = match bucket_name {
None => {
@ -166,6 +155,7 @@ impl ApiHandler for S3ApiServer {
.bucket_helper()
.get_existing_bucket(bucket_id)
.await?;
let bucket_params = bucket.state.into_option().unwrap();
let allowed = match endpoint.authorization_type() {
Authorization::Read => api_key.allow_read(&bucket_id),
@ -178,12 +168,20 @@ impl ApiHandler for S3ApiServer {
return Err(Error::forbidden("Operation is not allowed for this key."));
}
let matching_cors_rule = find_matching_cors_rule(&bucket, &req)?;
let matching_cors_rule = find_matching_cors_rule(&bucket_params, &req)?.cloned();
let ctx = ReqCtx {
garage,
bucket_id,
bucket_name,
bucket_params,
api_key,
};
let resp = match endpoint {
Endpoint::HeadObject {
key, part_number, ..
} => handle_head(garage, &req, bucket_id, &key, part_number).await,
} => handle_head(ctx, &req, &key, part_number).await,
Endpoint::GetObject {
key,
part_number,
@ -203,74 +201,37 @@ impl ApiHandler for S3ApiServer {
response_content_type,
response_expires,
};
handle_get(garage, &req, bucket_id, &key, part_number, overrides).await
handle_get(ctx, &req, &key, part_number, overrides).await
}
Endpoint::UploadPart {
key,
part_number,
upload_id,
} => {
handle_put_part(
garage,
req,
bucket_id,
&key,
part_number,
&upload_id,
content_sha256,
)
.await
}
Endpoint::CopyObject { key } => {
handle_copy(garage, &api_key, &req, bucket_id, &key).await
}
} => handle_put_part(ctx, req, &key, part_number, &upload_id, content_sha256).await,
Endpoint::CopyObject { key } => handle_copy(ctx, &req, &key).await,
Endpoint::UploadPartCopy {
key,
part_number,
upload_id,
} => {
handle_upload_part_copy(
garage,
&api_key,
&req,
bucket_id,
&key,
part_number,
&upload_id,
)
.await
}
Endpoint::PutObject { key } => {
handle_put(garage, req, &bucket, &key, content_sha256).await
}
} => handle_upload_part_copy(ctx, &req, &key, part_number, &upload_id).await,
Endpoint::PutObject { key } => handle_put(ctx, req, &key, content_sha256).await,
Endpoint::AbortMultipartUpload { key, upload_id } => {
handle_abort_multipart_upload(garage, bucket_id, &key, &upload_id).await
handle_abort_multipart_upload(ctx, &key, &upload_id).await
}
Endpoint::DeleteObject { key, .. } => handle_delete(garage, bucket_id, &key).await,
Endpoint::DeleteObject { key, .. } => handle_delete(ctx, &key).await,
Endpoint::CreateMultipartUpload { key } => {
handle_create_multipart_upload(garage, &req, &bucket_name, bucket_id, &key).await
handle_create_multipart_upload(ctx, &req, &key).await
}
Endpoint::CompleteMultipartUpload { key, upload_id } => {
handle_complete_multipart_upload(
garage,
req,
&bucket_name,
&bucket,
&key,
&upload_id,
content_sha256,
)
.await
handle_complete_multipart_upload(ctx, req, &key, &upload_id, content_sha256).await
}
Endpoint::CreateBucket {} => unreachable!(),
Endpoint::HeadBucket {} => {
let response = Response::builder().body(empty_body()).unwrap();
Ok(response)
}
Endpoint::DeleteBucket {} => {
handle_delete_bucket(&garage, bucket_id, bucket_name, &api_key.key_id).await
}
Endpoint::GetBucketLocation {} => handle_get_bucket_location(garage),
Endpoint::DeleteBucket {} => handle_delete_bucket(ctx).await,
Endpoint::GetBucketLocation {} => handle_get_bucket_location(ctx),
Endpoint::GetBucketVersioning {} => handle_get_bucket_versioning(),
Endpoint::ListObjects {
delimiter,
@ -279,24 +240,21 @@ impl ApiHandler for S3ApiServer {
max_keys,
prefix,
} => {
handle_list(
garage,
&ListObjectsQuery {
common: ListQueryCommon {
bucket_name,
bucket_id,
delimiter,
page_size: max_keys.unwrap_or(1000).clamp(1, 1000),
prefix: prefix.unwrap_or_default(),
urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
},
is_v2: false,
marker,
continuation_token: None,
start_after: None,
let query = ListObjectsQuery {
common: ListQueryCommon {
bucket_name: ctx.bucket_name.clone(),
bucket_id,
delimiter,
page_size: max_keys.unwrap_or(1000).clamp(1, 1000),
prefix: prefix.unwrap_or_default(),
urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
},
)
.await
is_v2: false,
marker,
continuation_token: None,
start_after: None,
};
handle_list(ctx, &query).await
}
Endpoint::ListObjectsV2 {
delimiter,
@ -309,24 +267,21 @@ impl ApiHandler for S3ApiServer {
..
} => {
if list_type == "2" {
handle_list(
garage,
&ListObjectsQuery {
common: ListQueryCommon {
bucket_name,
bucket_id,
delimiter,
page_size: max_keys.unwrap_or(1000).clamp(1, 1000),
urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
prefix: prefix.unwrap_or_default(),
},
is_v2: true,
marker: None,
continuation_token,
start_after,
let query = ListObjectsQuery {
common: ListQueryCommon {
bucket_name: ctx.bucket_name.clone(),
bucket_id,
delimiter,
page_size: max_keys.unwrap_or(1000).clamp(1, 1000),
urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
prefix: prefix.unwrap_or_default(),
},
)
.await
is_v2: true,
marker: None,
continuation_token,
start_after,
};
handle_list(ctx, &query).await
} else {
Err(Error::bad_request(format!(
"Invalid endpoint: list-type={}",
@ -342,22 +297,19 @@ impl ApiHandler for S3ApiServer {
prefix,
upload_id_marker,
} => {
handle_list_multipart_upload(
garage,
&ListMultipartUploadsQuery {
common: ListQueryCommon {
bucket_name,
bucket_id,
delimiter,
page_size: max_uploads.unwrap_or(1000).clamp(1, 1000),
prefix: prefix.unwrap_or_default(),
urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
},
key_marker,
upload_id_marker,
let query = ListMultipartUploadsQuery {
common: ListQueryCommon {
bucket_name: ctx.bucket_name.clone(),
bucket_id,
delimiter,
page_size: max_uploads.unwrap_or(1000).clamp(1, 1000),
prefix: prefix.unwrap_or_default(),
urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
},
)
.await
key_marker,
upload_id_marker,
};
handle_list_multipart_upload(ctx, &query).await
}
Endpoint::ListParts {
key,
@ -365,39 +317,28 @@ impl ApiHandler for S3ApiServer {
part_number_marker,
upload_id,
} => {
handle_list_parts(
garage,
&ListPartsQuery {
bucket_name,
bucket_id,
key,
upload_id,
part_number_marker: part_number_marker.map(|p| p.min(10000)),
max_parts: max_parts.unwrap_or(1000).clamp(1, 1000),
},
)
.await
let query = ListPartsQuery {
bucket_name: ctx.bucket_name.clone(),
bucket_id,
key,
upload_id,
part_number_marker: part_number_marker.map(|p| p.min(10000)),
max_parts: max_parts.unwrap_or(1000).clamp(1, 1000),
};
handle_list_parts(ctx, &query).await
}
Endpoint::DeleteObjects {} => {
handle_delete_objects(garage, bucket_id, req, content_sha256).await
}
Endpoint::GetBucketWebsite {} => handle_get_website(&bucket).await,
Endpoint::PutBucketWebsite {} => {
handle_put_website(garage, bucket.clone(), req, content_sha256).await
}
Endpoint::DeleteBucketWebsite {} => handle_delete_website(garage, bucket.clone()).await,
Endpoint::GetBucketCors {} => handle_get_cors(&bucket).await,
Endpoint::PutBucketCors {} => {
handle_put_cors(garage, bucket.clone(), req, content_sha256).await
}
Endpoint::DeleteBucketCors {} => handle_delete_cors(garage, bucket.clone()).await,
Endpoint::GetBucketLifecycleConfiguration {} => handle_get_lifecycle(&bucket).await,
Endpoint::DeleteObjects {} => handle_delete_objects(ctx, req, content_sha256).await,
Endpoint::GetBucketWebsite {} => handle_get_website(ctx).await,
Endpoint::PutBucketWebsite {} => handle_put_website(ctx, req, content_sha256).await,
Endpoint::DeleteBucketWebsite {} => handle_delete_website(ctx).await,
Endpoint::GetBucketCors {} => handle_get_cors(ctx).await,
Endpoint::PutBucketCors {} => handle_put_cors(ctx, req, content_sha256).await,
Endpoint::DeleteBucketCors {} => handle_delete_cors(ctx).await,
Endpoint::GetBucketLifecycleConfiguration {} => handle_get_lifecycle(ctx).await,
Endpoint::PutBucketLifecycleConfiguration {} => {
handle_put_lifecycle(garage, bucket.clone(), req, content_sha256).await
}
Endpoint::DeleteBucketLifecycle {} => {
handle_delete_lifecycle(garage, bucket.clone()).await
handle_put_lifecycle(ctx, req, content_sha256).await
}
Endpoint::DeleteBucketLifecycle {} => handle_delete_lifecycle(ctx).await,
endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())),
};
@ -405,7 +346,7 @@ impl ApiHandler for S3ApiServer {
// add the corresponding CORS headers to the response
let mut resp_ok = resp?;
if let Some(rule) = matching_cors_rule {
add_cors_headers(&mut resp_ok, rule)
add_cors_headers(&mut resp_ok, &rule)
.ok_or_internal_error("Invalid bucket CORS configuration")?;
}

View file

@ -1,5 +1,4 @@
use std::collections::HashMap;
use std::sync::Arc;
use http_body_util::BodyExt;
use hyper::{Request, Response, StatusCode};
@ -21,7 +20,8 @@ use crate::s3::error::*;
use crate::s3::xml as s3_xml;
use crate::signature::verify_signed_content;
pub fn handle_get_bucket_location(garage: Arc<Garage>) -> Result<Response<ResBody>, Error> {
pub fn handle_get_bucket_location(ctx: ReqCtx) -> Result<Response<ResBody>, Error> {
let ReqCtx { garage, .. } = ctx;
let loc = s3_xml::LocationConstraint {
xmlns: (),
region: garage.config.s3_api.s3_region.to_string(),
@ -204,21 +204,20 @@ pub async fn handle_create_bucket(
.unwrap())
}
pub async fn handle_delete_bucket(
garage: &Garage,
bucket_id: Uuid,
bucket_name: String,
api_key_id: &String,
) -> Result<Response<ResBody>, Error> {
pub async fn handle_delete_bucket(ctx: ReqCtx) -> Result<Response<ResBody>, Error> {
let ReqCtx {
garage,
bucket_id,
bucket_name,
bucket_params: bucket_state,
api_key,
..
} = &ctx;
let helper = garage.locked_helper().await;
let api_key = helper.key().get_existing_key(api_key_id).await?;
let key_params = api_key.params().unwrap();
let is_local_alias = matches!(key_params.local_aliases.get(&bucket_name), Some(Some(_)));
let mut bucket = helper.bucket().get_existing_bucket(bucket_id).await?;
let bucket_state = bucket.state.as_option().unwrap();
let is_local_alias = matches!(key_params.local_aliases.get(bucket_name), Some(Some(_)));
// If the bucket has no other aliases, this is a true deletion.
// Otherwise, it is just an alias removal.
@ -228,20 +227,20 @@ pub async fn handle_delete_bucket(
.items()
.iter()
.filter(|(_, _, active)| *active)
.any(|(n, _, _)| is_local_alias || (*n != bucket_name));
.any(|(n, _, _)| is_local_alias || (*n != *bucket_name));
let has_other_local_aliases = bucket_state
.local_aliases
.items()
.iter()
.filter(|(_, _, active)| *active)
.any(|((k, n), _, _)| !is_local_alias || *n != bucket_name || *k != api_key.key_id);
.any(|((k, n), _, _)| !is_local_alias || *n != *bucket_name || *k != api_key.key_id);
if !has_other_global_aliases && !has_other_local_aliases {
// Delete bucket
// Check bucket is empty
if !helper.bucket().is_bucket_empty(bucket_id).await? {
if !helper.bucket().is_bucket_empty(*bucket_id).await? {
return Err(CommonError::BucketNotEmpty.into());
}
@ -249,33 +248,36 @@ pub async fn handle_delete_bucket(
// 1. delete bucket alias
if is_local_alias {
helper
.unset_local_bucket_alias(bucket_id, &api_key.key_id, &bucket_name)
.unset_local_bucket_alias(*bucket_id, &api_key.key_id, bucket_name)
.await?;
} else {
helper
.unset_global_bucket_alias(bucket_id, &bucket_name)
.unset_global_bucket_alias(*bucket_id, bucket_name)
.await?;
}
// 2. delete authorization from keys that had access
for (key_id, _) in bucket.authorized_keys() {
for (key_id, _) in bucket_state.authorized_keys.items() {
helper
.set_bucket_key_permissions(bucket.id, key_id, BucketKeyPerm::NO_PERMISSIONS)
.set_bucket_key_permissions(*bucket_id, key_id, BucketKeyPerm::NO_PERMISSIONS)
.await?;
}
let bucket = Bucket {
id: *bucket_id,
state: Deletable::delete(),
};
// 3. delete bucket
bucket.state = Deletable::delete();
garage.bucket_table.insert(&bucket).await?;
} else if is_local_alias {
// Just unalias
helper
.unset_local_bucket_alias(bucket_id, &api_key.key_id, &bucket_name)
.unset_local_bucket_alias(*bucket_id, &api_key.key_id, bucket_name)
.await?;
} else {
// Just unalias (but from global namespace)
helper
.unset_global_bucket_alias(bucket_id, &bucket_name)
.unset_global_bucket_alias(*bucket_id, bucket_name)
.await?;
}

View file

@ -1,5 +1,4 @@
use std::pin::Pin;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use futures::{stream, stream::Stream, StreamExt};
@ -15,8 +14,6 @@ use garage_table::*;
use garage_util::data::*;
use garage_util::time::*;
use garage_model::garage::Garage;
use garage_model::key_table::Key;
use garage_model::s3::block_ref_table::*;
use garage_model::s3::mpu_table::*;
use garage_model::s3::object_table::*;
@ -30,15 +27,19 @@ use crate::s3::put::get_headers;
use crate::s3::xml::{self as s3_xml, xmlns_tag};
pub async fn handle_copy(
garage: Arc<Garage>,
api_key: &Key,
ctx: ReqCtx,
req: &Request<ReqBody>,
dest_bucket_id: Uuid,
dest_key: &str,
) -> Result<Response<ResBody>, Error> {
let copy_precondition = CopyPreconditionHeaders::parse(req)?;
let source_object = get_copy_source(&garage, api_key, req).await?;
let source_object = get_copy_source(&ctx, req).await?;
let ReqCtx {
garage,
bucket_id: dest_bucket_id,
..
} = ctx;
let (source_version, source_version_data, source_version_meta) =
extract_source_info(&source_object)?;
@ -181,10 +182,8 @@ pub async fn handle_copy(
}
pub async fn handle_upload_part_copy(
garage: Arc<Garage>,
api_key: &Key,
ctx: ReqCtx,
req: &Request<ReqBody>,
dest_bucket_id: Uuid,
dest_key: &str,
part_number: u64,
upload_id: &str,
@ -195,10 +194,12 @@ pub async fn handle_upload_part_copy(
let dest_key = dest_key.to_string();
let (source_object, (_, _, mut dest_mpu)) = futures::try_join!(
get_copy_source(&garage, api_key, req),
multipart::get_upload(&garage, &dest_bucket_id, &dest_key, &dest_upload_id)
get_copy_source(&ctx, req),
multipart::get_upload(&ctx, &dest_key, &dest_upload_id)
)?;
let ReqCtx { garage, .. } = ctx;
let (source_object_version, source_version_data, source_version_meta) =
extract_source_info(&source_object)?;
@ -439,11 +440,11 @@ pub async fn handle_upload_part_copy(
.body(string_body(resp_xml))?)
}
async fn get_copy_source(
garage: &Garage,
api_key: &Key,
req: &Request<ReqBody>,
) -> Result<Object, Error> {
async fn get_copy_source(ctx: &ReqCtx, req: &Request<ReqBody>) -> Result<Object, Error> {
let ReqCtx {
garage, api_key, ..
} = ctx;
let copy_source = req.headers().get("x-amz-copy-source").unwrap().to_str()?;
let copy_source = percent_encoding::percent_decode_str(copy_source).decode_utf8()?;

View file

@ -21,16 +21,13 @@ use crate::s3::error::*;
use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value};
use crate::signature::verify_signed_content;
use garage_model::bucket_table::{Bucket, CorsRule as GarageCorsRule};
use garage_model::bucket_table::{Bucket, BucketParams, CorsRule as GarageCorsRule};
use garage_model::garage::Garage;
use garage_util::data::*;
pub async fn handle_get_cors(bucket: &Bucket) -> Result<Response<ResBody>, Error> {
let param = bucket
.params()
.ok_or_internal_error("Bucket should not be deleted at this point")?;
if let Some(cors) = param.cors_config.get() {
pub async fn handle_get_cors(ctx: ReqCtx) -> Result<Response<ResBody>, Error> {
let ReqCtx { bucket_params, .. } = ctx;
if let Some(cors) = bucket_params.cors_config.get() {
let wc = CorsConfiguration {
xmlns: (),
cors_rules: cors
@ -50,16 +47,18 @@ pub async fn handle_get_cors(bucket: &Bucket) -> Result<Response<ResBody>, Error
}
}
pub async fn handle_delete_cors(
garage: Arc<Garage>,
mut bucket: Bucket,
) -> Result<Response<ResBody>, Error> {
let param = bucket
.params_mut()
.ok_or_internal_error("Bucket should not be deleted at this point")?;
param.cors_config.update(None);
garage.bucket_table.insert(&bucket).await?;
pub async fn handle_delete_cors(ctx: ReqCtx) -> Result<Response<ResBody>, Error> {
let ReqCtx {
garage,
bucket_id,
mut bucket_params,
..
} = ctx;
bucket_params.cors_config.update(None);
garage
.bucket_table
.insert(&Bucket::present(bucket_id, bucket_params))
.await?;
Ok(Response::builder()
.status(StatusCode::NO_CONTENT)
@ -67,28 +66,33 @@ pub async fn handle_delete_cors(
}
pub async fn handle_put_cors(
garage: Arc<Garage>,
mut bucket: Bucket,
ctx: ReqCtx,
req: Request<ReqBody>,
content_sha256: Option<Hash>,
) -> Result<Response<ResBody>, Error> {
let ReqCtx {
garage,
bucket_id,
mut bucket_params,
..
} = ctx;
let body = BodyExt::collect(req.into_body()).await?.to_bytes();
if let Some(content_sha256) = content_sha256 {
verify_signed_content(content_sha256, &body[..])?;
}
let param = bucket
.params_mut()
.ok_or_internal_error("Bucket should not be deleted at this point")?;
let conf: CorsConfiguration = from_reader(&body as &[u8])?;
conf.validate()?;
param
bucket_params
.cors_config
.update(Some(conf.into_garage_cors_config()?));
garage.bucket_table.insert(&bucket).await?;
garage
.bucket_table
.insert(&Bucket::present(bucket_id, bucket_params))
.await?;
Ok(Response::builder()
.status(StatusCode::OK)
@ -115,7 +119,8 @@ pub async fn handle_options_api(
let bucket_id = helper.resolve_global_bucket_name(&bn).await?;
if let Some(id) = bucket_id {
let bucket = garage.bucket_helper().get_existing_bucket(id).await?;
handle_options_for_bucket(req, &bucket)
let bucket_params = bucket.state.into_option().unwrap();
handle_options_for_bucket(req, &bucket_params)
} else {
// If there is a bucket name in the request, but that name
// does not correspond to a global alias for a bucket,
@ -145,7 +150,7 @@ pub async fn handle_options_api(
pub fn handle_options_for_bucket(
req: &Request<IncomingBody>,
bucket: &Bucket,
bucket_params: &BucketParams,
) -> Result<Response<EmptyBody>, CommonError> {
let origin = req
.headers()
@ -162,7 +167,7 @@ pub fn handle_options_for_bucket(
None => vec![],
};
if let Some(cors_config) = bucket.params().unwrap().cors_config.get() {
if let Some(cors_config) = bucket_params.cors_config.get() {
let matching_rule = cors_config
.iter()
.find(|rule| cors_rule_matches(rule, origin, request_method, request_headers.iter()));
@ -181,10 +186,10 @@ pub fn handle_options_for_bucket(
}
pub fn find_matching_cors_rule<'a>(
bucket: &'a Bucket,
bucket_params: &'a BucketParams,
req: &Request<impl Body>,
) -> Result<Option<&'a GarageCorsRule>, Error> {
if let Some(cors_config) = bucket.params().unwrap().cors_config.get() {
if let Some(cors_config) = bucket_params.cors_config.get() {
if let Some(origin) = req.headers().get("Origin") {
let origin = origin.to_str()?;
let request_headers = match req.headers().get(ACCESS_CONTROL_REQUEST_HEADERS) {

View file

@ -1,11 +1,8 @@
use std::sync::Arc;
use http_body_util::BodyExt;
use hyper::{Request, Response, StatusCode};
use garage_util::data::*;
use garage_model::garage::Garage;
use garage_model::s3::object_table::*;
use crate::helpers::*;
@ -15,14 +12,13 @@ use crate::s3::put::next_timestamp;
use crate::s3::xml as s3_xml;
use crate::signature::verify_signed_content;
async fn handle_delete_internal(
garage: &Garage,
bucket_id: Uuid,
key: &str,
) -> Result<(Uuid, Uuid), Error> {
async fn handle_delete_internal(ctx: &ReqCtx, key: &str) -> Result<(Uuid, Uuid), Error> {
let ReqCtx {
garage, bucket_id, ..
} = ctx;
let object = garage
.object_table
.get(&bucket_id, &key.to_string())
.get(bucket_id, &key.to_string())
.await?
.ok_or(Error::NoSuchKey)?; // No need to delete
@ -44,7 +40,7 @@ async fn handle_delete_internal(
};
let object = Object::new(
bucket_id,
*bucket_id,
key.into(),
vec![ObjectVersion {
uuid: del_uuid,
@ -58,12 +54,8 @@ async fn handle_delete_internal(
Ok((deleted_version, del_uuid))
}
pub async fn handle_delete(
garage: Arc<Garage>,
bucket_id: Uuid,
key: &str,
) -> Result<Response<ResBody>, Error> {
match handle_delete_internal(&garage, bucket_id, key).await {
pub async fn handle_delete(ctx: ReqCtx, key: &str) -> Result<Response<ResBody>, Error> {
match handle_delete_internal(&ctx, key).await {
Ok(_) | Err(Error::NoSuchKey) => Ok(Response::builder()
.status(StatusCode::NO_CONTENT)
.body(empty_body())
@ -73,8 +65,7 @@ pub async fn handle_delete(
}
pub async fn handle_delete_objects(
garage: Arc<Garage>,
bucket_id: Uuid,
ctx: ReqCtx,
req: Request<ReqBody>,
content_sha256: Option<Hash>,
) -> Result<Response<ResBody>, Error> {
@ -91,7 +82,7 @@ pub async fn handle_delete_objects(
let mut ret_errors = Vec::new();
for obj in cmd.objects.iter() {
match handle_delete_internal(&garage, bucket_id, &obj.key).await {
match handle_delete_internal(&ctx, &obj.key).await {
Ok((deleted_version, delete_marker_version)) => {
if cmd.quiet {
continue;

View file

@ -131,6 +131,16 @@ fn try_answer_cached(
/// Handle HEAD request
pub async fn handle_head(
ctx: ReqCtx,
req: &Request<impl Body>,
key: &str,
part_number: Option<u64>,
) -> Result<Response<ResBody>, Error> {
handle_head_without_ctx(ctx.garage, req, ctx.bucket_id, key, part_number).await
}
/// Handle HEAD request for website
pub async fn handle_head_without_ctx(
garage: Arc<Garage>,
req: &Request<impl Body>,
bucket_id: Uuid,
@ -218,6 +228,17 @@ pub async fn handle_head(
/// Handle GET request
pub async fn handle_get(
ctx: ReqCtx,
req: &Request<impl Body>,
key: &str,
part_number: Option<u64>,
overrides: GetObjectOverrides,
) -> Result<Response<ResBody>, Error> {
handle_get_without_ctx(ctx.garage, req, ctx.bucket_id, key, part_number, overrides).await
}
/// Handle GET request
pub async fn handle_get_without_ctx(
garage: Arc<Garage>,
req: &Request<impl Body>,
bucket_id: Uuid,

View file

@ -1,5 +1,4 @@
use quick_xml::de::from_reader;
use std::sync::Arc;
use http_body_util::BodyExt;
use hyper::{Request, Response, StatusCode};
@ -16,15 +15,12 @@ use garage_model::bucket_table::{
parse_lifecycle_date, Bucket, LifecycleExpiration as GarageLifecycleExpiration,
LifecycleFilter as GarageLifecycleFilter, LifecycleRule as GarageLifecycleRule,
};
use garage_model::garage::Garage;
use garage_util::data::*;
pub async fn handle_get_lifecycle(bucket: &Bucket) -> Result<Response<ResBody>, Error> {
let param = bucket
.params()
.ok_or_internal_error("Bucket should not be deleted at this point")?;
pub async fn handle_get_lifecycle(ctx: ReqCtx) -> Result<Response<ResBody>, Error> {
let ReqCtx { bucket_params, .. } = ctx;
if let Some(lifecycle) = param.lifecycle_config.get() {
if let Some(lifecycle) = bucket_params.lifecycle_config.get() {
let wc = LifecycleConfiguration::from_garage_lifecycle_config(lifecycle);
let xml = to_xml_with_header(&wc)?;
Ok(Response::builder()
@ -38,16 +34,18 @@ pub async fn handle_get_lifecycle(bucket: &Bucket) -> Result<Response<ResBody>,
}
}
pub async fn handle_delete_lifecycle(
garage: Arc<Garage>,
mut bucket: Bucket,
) -> Result<Response<ResBody>, Error> {
let param = bucket
.params_mut()
.ok_or_internal_error("Bucket should not be deleted at this point")?;
param.lifecycle_config.update(None);
garage.bucket_table.insert(&bucket).await?;
pub async fn handle_delete_lifecycle(ctx: ReqCtx) -> Result<Response<ResBody>, Error> {
let ReqCtx {
garage,
bucket_id,
mut bucket_params,
..
} = ctx;
bucket_params.lifecycle_config.update(None);
garage
.bucket_table
.insert(&Bucket::present(bucket_id, bucket_params))
.await?;
Ok(Response::builder()
.status(StatusCode::NO_CONTENT)
@ -55,28 +53,33 @@ pub async fn handle_delete_lifecycle(
}
pub async fn handle_put_lifecycle(
garage: Arc<Garage>,
mut bucket: Bucket,
ctx: ReqCtx,
req: Request<ReqBody>,
content_sha256: Option<Hash>,
) -> Result<Response<ResBody>, Error> {
let ReqCtx {
garage,
bucket_id,
mut bucket_params,
..
} = ctx;
let body = BodyExt::collect(req.into_body()).await?.to_bytes();
if let Some(content_sha256) = content_sha256 {
verify_signed_content(content_sha256, &body[..])?;
}
let param = bucket
.params_mut()
.ok_or_internal_error("Bucket should not be deleted at this point")?;
let conf: LifecycleConfiguration = from_reader(&body as &[u8])?;
let config = conf
.validate_into_garage_lifecycle_config()
.ok_or_bad_request("Invalid lifecycle configuration")?;
param.lifecycle_config.update(Some(config));
garage.bucket_table.insert(&bucket).await?;
bucket_params.lifecycle_config.update(Some(config));
garage
.bucket_table
.insert(&Bucket::present(bucket_id, bucket_params))
.await?;
Ok(Response::builder()
.status(StatusCode::OK)

View file

@ -1,6 +1,5 @@
use std::collections::{BTreeMap, BTreeSet};
use std::iter::{Iterator, Peekable};
use std::sync::Arc;
use base64::prelude::*;
use hyper::Response;
@ -9,7 +8,6 @@ use garage_util::data::*;
use garage_util::error::Error as GarageError;
use garage_util::time::*;
use garage_model::garage::Garage;
use garage_model::s3::mpu_table::*;
use garage_model::s3::object_table::*;
@ -62,9 +60,10 @@ pub struct ListPartsQuery {
}
pub async fn handle_list(
garage: Arc<Garage>,
ctx: ReqCtx,
query: &ListObjectsQuery,
) -> Result<Response<ResBody>, Error> {
let ReqCtx { garage, .. } = &ctx;
let io = |bucket, key, count| {
let t = &garage.object_table;
async move {
@ -167,9 +166,11 @@ pub async fn handle_list(
}
pub async fn handle_list_multipart_upload(
garage: Arc<Garage>,
ctx: ReqCtx,
query: &ListMultipartUploadsQuery,
) -> Result<Response<ResBody>, Error> {
let ReqCtx { garage, .. } = &ctx;
let io = |bucket, key, count| {
let t = &garage.object_table;
async move {
@ -269,15 +270,14 @@ pub async fn handle_list_multipart_upload(
}
pub async fn handle_list_parts(
garage: Arc<Garage>,
ctx: ReqCtx,
query: &ListPartsQuery,
) -> Result<Response<ResBody>, Error> {
debug!("ListParts {:?}", query);
let upload_id = s3_multipart::decode_upload_id(&query.upload_id)?;
let (_, _, mpu) =
s3_multipart::get_upload(&garage, &query.bucket_id, &query.key, &upload_id).await?;
let (_, _, mpu) = s3_multipart::get_upload(&ctx, &query.key, &upload_id).await?;
let (info, next) = fetch_part_info(query, &mpu)?;

View file

@ -8,7 +8,6 @@ use md5::{Digest as Md5Digest, Md5};
use garage_table::*;
use garage_util::data::*;
use garage_model::bucket_table::Bucket;
use garage_model::garage::Garage;
use garage_model::s3::block_ref_table::*;
use garage_model::s3::mpu_table::*;
@ -25,12 +24,16 @@ use crate::signature::verify_signed_content;
// ----
pub async fn handle_create_multipart_upload(
garage: Arc<Garage>,
ctx: ReqCtx,
req: &Request<ReqBody>,
bucket_name: &str,
bucket_id: Uuid,
key: &String,
) -> Result<Response<ResBody>, Error> {
let ReqCtx {
garage,
bucket_id,
bucket_name,
..
} = &ctx;
let existing_object = garage.object_table.get(&bucket_id, &key).await?;
let upload_id = gen_uuid();
@ -47,13 +50,13 @@ pub async fn handle_create_multipart_upload(
headers,
},
};
let object = Object::new(bucket_id, key.to_string(), vec![object_version]);
let object = Object::new(*bucket_id, key.to_string(), vec![object_version]);
garage.object_table.insert(&object).await?;
// Create multipart upload in mpu table
// This multipart upload will hold references to uploaded parts
// (which are entries in the Version table)
let mpu = MultipartUpload::new(upload_id, timestamp, bucket_id, key.into(), false);
let mpu = MultipartUpload::new(upload_id, timestamp, *bucket_id, key.into(), false);
garage.mpu_table.insert(&mpu).await?;
// Send success response
@ -69,14 +72,15 @@ pub async fn handle_create_multipart_upload(
}
pub async fn handle_put_part(
garage: Arc<Garage>,
ctx: ReqCtx,
req: Request<ReqBody>,
bucket_id: Uuid,
key: &str,
part_number: u64,
upload_id: &str,
content_sha256: Option<Hash>,
) -> Result<Response<ResBody>, Error> {
let ReqCtx { garage, .. } = &ctx;
let upload_id = decode_upload_id(upload_id)?;
let content_md5 = match req.headers().get("content-md5") {
@ -90,10 +94,8 @@ pub async fn handle_put_part(
let stream = body_stream(req.into_body());
let mut chunker = StreamChunker::new(stream, garage.config.block_size);
let ((_, _, mut mpu), first_block) = futures::try_join!(
get_upload(&garage, &bucket_id, &key, &upload_id),
chunker.next(),
)?;
let ((_, _, mut mpu), first_block) =
futures::try_join!(get_upload(&ctx, &key, &upload_id), chunker.next(),)?;
// Check object is valid and part can be accepted
let first_block = first_block.ok_or_bad_request("Empty body")?;
@ -135,7 +137,7 @@ pub async fn handle_put_part(
// Copy data to version
let (total_size, data_md5sum, data_sha256sum, _) =
read_and_put_blocks(&garage, &version, part_number, first_block, &mut chunker).await?;
read_and_put_blocks(&ctx, &version, part_number, first_block, &mut chunker).await?;
// Verify that checksums map
ensure_checksum_matches(
@ -200,14 +202,19 @@ impl Drop for InterruptedCleanup {
}
pub async fn handle_complete_multipart_upload(
garage: Arc<Garage>,
ctx: ReqCtx,
req: Request<ReqBody>,
bucket_name: &str,
bucket: &Bucket,
key: &str,
upload_id: &str,
content_sha256: Option<Hash>,
) -> Result<Response<ResBody>, Error> {
let ReqCtx {
garage,
bucket_id,
bucket_name,
..
} = &ctx;
let body = http_body_util::BodyExt::collect(req.into_body())
.await?
.to_bytes();
@ -228,8 +235,7 @@ pub async fn handle_complete_multipart_upload(
// Get object and multipart upload
let key = key.to_string();
let (object, mut object_version, mpu) =
get_upload(&garage, &bucket.id, &key, &upload_id).await?;
let (object, mut object_version, mpu) = get_upload(&ctx, &key, &upload_id).await?;
if mpu.parts.is_empty() {
return Err(Error::bad_request("No data was uploaded"));
@ -283,7 +289,7 @@ pub async fn handle_complete_multipart_upload(
let mut final_version = Version::new(
upload_id,
VersionBacklink::Object {
bucket_id: bucket.id,
bucket_id: *bucket_id,
key: key.to_string(),
},
false,
@ -327,9 +333,9 @@ pub async fn handle_complete_multipart_upload(
// Calculate total size of final object
let total_size = parts.iter().map(|x| x.size.unwrap()).sum();
if let Err(e) = check_quotas(&garage, bucket, total_size, Some(&object)).await {
if let Err(e) = check_quotas(&ctx, total_size, Some(&object)).await {
object_version.state = ObjectVersionState::Aborted;
let final_object = Object::new(bucket.id, key.clone(), vec![object_version]);
let final_object = Object::new(*bucket_id, key.clone(), vec![object_version]);
garage.object_table.insert(&final_object).await?;
return Err(e);
@ -345,7 +351,7 @@ pub async fn handle_complete_multipart_upload(
final_version.blocks.items()[0].1.hash,
));
let final_object = Object::new(bucket.id, key.clone(), vec![object_version]);
let final_object = Object::new(*bucket_id, key.clone(), vec![object_version]);
garage.object_table.insert(&final_object).await?;
// Send response saying ok we're done
@ -362,18 +368,20 @@ pub async fn handle_complete_multipart_upload(
}
pub async fn handle_abort_multipart_upload(
garage: Arc<Garage>,
bucket_id: Uuid,
ctx: ReqCtx,
key: &str,
upload_id: &str,
) -> Result<Response<ResBody>, Error> {
let ReqCtx {
garage, bucket_id, ..
} = &ctx;
let upload_id = decode_upload_id(upload_id)?;
let (_, mut object_version, _) =
get_upload(&garage, &bucket_id, &key.to_string(), &upload_id).await?;
let (_, mut object_version, _) = get_upload(&ctx, &key.to_string(), &upload_id).await?;
object_version.state = ObjectVersionState::Aborted;
let final_object = Object::new(bucket_id, key.to_string(), vec![object_version]);
let final_object = Object::new(*bucket_id, key.to_string(), vec![object_version]);
garage.object_table.insert(&final_object).await?;
Ok(Response::new(empty_body()))
@ -383,11 +391,13 @@ pub async fn handle_abort_multipart_upload(
#[allow(clippy::ptr_arg)]
pub(crate) async fn get_upload(
garage: &Garage,
bucket_id: &Uuid,
ctx: &ReqCtx,
key: &String,
upload_id: &Uuid,
) -> Result<(Object, ObjectVersion, MultipartUpload), Error> {
let ReqCtx {
garage, bucket_id, ..
} = ctx;
let (object, mpu) = futures::try_join!(
garage.object_table.get(bucket_id, key).map_err(Error::from),
garage

View file

@ -21,7 +21,7 @@ use crate::s3::cors::*;
use crate::s3::error::*;
use crate::s3::put::{get_headers, save_stream};
use crate::s3::xml as s3_xml;
use crate::signature::payload::{parse_date, verify_v4};
use crate::signature::payload::{verify_v4, Authorization};
pub async fn handle_post_object(
garage: Arc<Garage>,
@ -88,22 +88,11 @@ pub async fn handle_post_object(
.get("key")
.ok_or_bad_request("No key was provided")?
.to_str()?;
let credential = params
.get("x-amz-credential")
.ok_or_else(|| Error::forbidden("Garage does not support anonymous access yet"))?
.to_str()?;
let policy = params
.get("policy")
.ok_or_bad_request("No policy was provided")?
.to_str()?;
let signature = params
.get("x-amz-signature")
.ok_or_bad_request("No signature was provided")?
.to_str()?;
let date = params
.get("x-amz-date")
.ok_or_bad_request("No date was provided")?
.to_str()?;
let authorization = Authorization::parse_form(&params)?;
let key = if key.contains("${filename}") {
// if no filename is provided, don't replace. This matches the behavior of AWS.
@ -116,16 +105,7 @@ pub async fn handle_post_object(
key.to_owned()
};
let date = parse_date(date)?;
let api_key = verify_v4(
&garage,
"s3",
credential,
&date,
signature,
policy.as_bytes(),
)
.await?;
let api_key = verify_v4(&garage, "s3", &authorization, policy.as_bytes()).await?;
let bucket_id = garage
.bucket_helper()
@ -140,6 +120,12 @@ pub async fn handle_post_object(
.bucket_helper()
.get_existing_bucket(bucket_id)
.await?;
let bucket_params = bucket.state.into_option().unwrap();
let matching_cors_rule = find_matching_cors_rule(
&bucket_params,
&Request::from_parts(head.clone(), empty_body::<Infallible>()),
)?
.cloned();
let decoded_policy = BASE64_STANDARD
.decode(policy)
@ -233,11 +219,19 @@ pub async fn handle_post_object(
let headers = get_headers(&params)?;
let stream = field.map(|r| r.map_err(Into::into));
let (_, md5) = save_stream(
let ctx = ReqCtx {
garage,
bucket_id,
bucket_name,
bucket_params,
api_key,
};
let (_, md5) = save_stream(
&ctx,
headers,
StreamLimiter::new(stream, conditions.content_length),
&bucket,
&key,
None,
None,
@ -254,7 +248,7 @@ pub async fn handle_post_object(
{
target
.query_pairs_mut()
.append_pair("bucket", &bucket_name)
.append_pair("bucket", &ctx.bucket_name)
.append_pair("key", &key)
.append_pair("etag", &etag);
let target = target.to_string();
@ -298,7 +292,7 @@ pub async fn handle_post_object(
let xml = s3_xml::PostObject {
xmlns: (),
location: s3_xml::Value(location),
bucket: s3_xml::Value(bucket_name),
bucket: s3_xml::Value(ctx.bucket_name),
key: s3_xml::Value(key),
etag: s3_xml::Value(etag),
};
@ -311,12 +305,8 @@ pub async fn handle_post_object(
}
};
let matching_cors_rule = find_matching_cors_rule(
&bucket,
&Request::from_parts(head, empty_body::<Infallible>()),
)?;
if let Some(rule) = matching_cors_rule {
add_cors_headers(&mut resp, rule)
add_cors_headers(&mut resp, &rule)
.ok_or_internal_error("Invalid bucket CORS configuration")?;
}

View file

@ -28,7 +28,6 @@ use garage_util::error::Error as GarageError;
use garage_util::time::*;
use garage_block::manager::INLINE_THRESHOLD;
use garage_model::bucket_table::Bucket;
use garage_model::garage::Garage;
use garage_model::index_counter::CountedItem;
use garage_model::s3::block_ref_table::*;
@ -42,9 +41,8 @@ use crate::s3::error::*;
const PUT_BLOCKS_MAX_PARALLEL: usize = 3;
pub async fn handle_put(
garage: Arc<Garage>,
ctx: ReqCtx,
req: Request<ReqBody>,
bucket: &Bucket,
key: &String,
content_sha256: Option<Hash>,
) -> Result<Response<ResBody>, Error> {
@ -59,35 +57,27 @@ pub async fn handle_put(
let stream = body_stream(req.into_body());
save_stream(
garage,
headers,
stream,
bucket,
key,
content_md5,
content_sha256,
)
.await
.map(|(uuid, md5)| put_response(uuid, md5))
save_stream(&ctx, headers, stream, key, content_md5, content_sha256)
.await
.map(|(uuid, md5)| put_response(uuid, md5))
}
pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
garage: Arc<Garage>,
ctx: &ReqCtx,
headers: ObjectVersionHeaders,
body: S,
bucket: &Bucket,
key: &String,
content_md5: Option<String>,
content_sha256: Option<FixedBytes32>,
) -> Result<(Uuid, String), Error> {
let ReqCtx {
garage, bucket_id, ..
} = ctx;
let mut chunker = StreamChunker::new(body, garage.config.block_size);
let (first_block_opt, existing_object) = try_join!(
chunker.next(),
garage
.object_table
.get(&bucket.id, key)
.map_err(Error::from),
garage.object_table.get(bucket_id, key).map_err(Error::from),
)?;
let first_block = first_block_opt.unwrap_or_default();
@ -114,7 +104,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
content_sha256,
)?;
check_quotas(&garage, bucket, size, existing_object.as_ref()).await?;
check_quotas(ctx, size, existing_object.as_ref()).await?;
let object_version = ObjectVersion {
uuid: version_uuid,
@ -129,7 +119,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
)),
};
let object = Object::new(bucket.id, key.into(), vec![object_version]);
let object = Object::new(*bucket_id, key.into(), vec![object_version]);
garage.object_table.insert(&object).await?;
return Ok((version_uuid, data_md5sum_hex));
@ -140,7 +130,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
// before everything is finished (cleanup is done using the Drop trait).
let mut interrupted_cleanup = InterruptedCleanup(Some(InterruptedCleanupInner {
garage: garage.clone(),
bucket_id: bucket.id,
bucket_id: *bucket_id,
key: key.into(),
version_uuid,
version_timestamp,
@ -156,7 +146,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
multipart: false,
},
};
let object = Object::new(bucket.id, key.into(), vec![object_version.clone()]);
let object = Object::new(*bucket_id, key.into(), vec![object_version.clone()]);
garage.object_table.insert(&object).await?;
// Initialize corresponding entry in version table
@ -166,7 +156,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
let version = Version::new(
version_uuid,
VersionBacklink::Object {
bucket_id: bucket.id,
bucket_id: *bucket_id,
key: key.into(),
},
false,
@ -175,7 +165,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
// Transfer data and verify checksum
let (total_size, data_md5sum, data_sha256sum, first_block_hash) =
read_and_put_blocks(&garage, &version, 1, first_block, &mut chunker).await?;
read_and_put_blocks(ctx, &version, 1, first_block, &mut chunker).await?;
ensure_checksum_matches(
data_md5sum.as_slice(),
@ -184,7 +174,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
content_sha256,
)?;
check_quotas(&garage, bucket, total_size, existing_object.as_ref()).await?;
check_quotas(ctx, total_size, existing_object.as_ref()).await?;
// Save final object state, marked as Complete
let md5sum_hex = hex::encode(data_md5sum);
@ -196,7 +186,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
},
first_block_hash,
));
let object = Object::new(bucket.id, key.into(), vec![object_version]);
let object = Object::new(*bucket_id, key.into(), vec![object_version]);
garage.object_table.insert(&object).await?;
// We were not interrupted, everything went fine.
@ -235,12 +225,18 @@ pub(crate) fn ensure_checksum_matches(
/// Check that inserting this object with this size doesn't exceed bucket quotas
pub(crate) async fn check_quotas(
garage: &Arc<Garage>,
bucket: &Bucket,
ctx: &ReqCtx,
size: u64,
prev_object: Option<&Object>,
) -> Result<(), Error> {
let quotas = bucket.state.as_option().unwrap().quotas.get();
let ReqCtx {
garage,
bucket_id,
bucket_params,
..
} = ctx;
let quotas = bucket_params.quotas.get();
if quotas.max_objects.is_none() && quotas.max_size.is_none() {
return Ok(());
};
@ -248,7 +244,7 @@ pub(crate) async fn check_quotas(
let counters = garage
.object_counter_table
.table
.get(&bucket.id, &EmptyKey)
.get(bucket_id, &EmptyKey)
.await?;
let counters = counters
@ -292,7 +288,7 @@ pub(crate) async fn check_quotas(
}
pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
garage: &Garage,
ctx: &ReqCtx,
version: &Version,
part_number: u64,
first_block: Bytes,
@ -417,7 +413,7 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> +
let offset = written_bytes;
written_bytes += block.len() as u64;
write_futs.push_back(put_block_and_meta(
garage,
ctx,
version,
part_number,
offset,
@ -447,7 +443,7 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> +
}
async fn put_block_and_meta(
garage: &Garage,
ctx: &ReqCtx,
version: &Version,
part_number: u64,
offset: u64,
@ -455,6 +451,8 @@ async fn put_block_and_meta(
block: Bytes,
order_tag: OrderTag,
) -> Result<(), GarageError> {
let ReqCtx { garage, .. } = ctx;
let mut version = version.clone();
version.blocks.put(
VersionBlockKey {

View file

@ -1,5 +1,4 @@
use quick_xml::de::from_reader;
use std::sync::Arc;
use http_body_util::BodyExt;
use hyper::{Request, Response, StatusCode};
@ -12,15 +11,11 @@ use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value};
use crate::signature::verify_signed_content;
use garage_model::bucket_table::*;
use garage_model::garage::Garage;
use garage_util::data::*;
pub async fn handle_get_website(bucket: &Bucket) -> Result<Response<ResBody>, Error> {
let param = bucket
.params()
.ok_or_internal_error("Bucket should not be deleted at this point")?;
if let Some(website) = param.website_config.get() {
pub async fn handle_get_website(ctx: ReqCtx) -> Result<Response<ResBody>, Error> {
let ReqCtx { bucket_params, .. } = ctx;
if let Some(website) = bucket_params.website_config.get() {
let wc = WebsiteConfiguration {
xmlns: (),
error_document: website.error_document.as_ref().map(|v| Key {
@ -44,16 +39,18 @@ pub async fn handle_get_website(bucket: &Bucket) -> Result<Response<ResBody>, Er
}
}
pub async fn handle_delete_website(
garage: Arc<Garage>,
mut bucket: Bucket,
) -> Result<Response<ResBody>, Error> {
let param = bucket
.params_mut()
.ok_or_internal_error("Bucket should not be deleted at this point")?;
param.website_config.update(None);
garage.bucket_table.insert(&bucket).await?;
pub async fn handle_delete_website(ctx: ReqCtx) -> Result<Response<ResBody>, Error> {
let ReqCtx {
garage,
bucket_id,
mut bucket_params,
..
} = ctx;
bucket_params.website_config.update(None);
garage
.bucket_table
.insert(&Bucket::present(bucket_id, bucket_params))
.await?;
Ok(Response::builder()
.status(StatusCode::NO_CONTENT)
@ -61,28 +58,33 @@ pub async fn handle_delete_website(
}
pub async fn handle_put_website(
garage: Arc<Garage>,
mut bucket: Bucket,
ctx: ReqCtx,
req: Request<ReqBody>,
content_sha256: Option<Hash>,
) -> Result<Response<ResBody>, Error> {
let ReqCtx {
garage,
bucket_id,
mut bucket_params,
..
} = ctx;
let body = BodyExt::collect(req.into_body()).await?.to_bytes();
if let Some(content_sha256) = content_sha256 {
verify_signed_content(content_sha256, &body[..])?;
}
let param = bucket
.params_mut()
.ok_or_internal_error("Bucket should not be deleted at this point")?;
let conf: WebsiteConfiguration = from_reader(&body as &[u8])?;
conf.validate()?;
param
bucket_params
.website_config
.update(Some(conf.into_garage_website_config()?));
garage.bucket_table.insert(&bucket).await?;
garage
.bucket_table
.insert(&Bucket::present(bucket_id, bucket_params))
.await?;
Ok(Response::builder()
.status(StatusCode::OK)

View file

@ -2,19 +2,44 @@ use chrono::{DateTime, Utc};
use hmac::{Hmac, Mac};
use sha2::Sha256;
use hyper::{body::Incoming as IncomingBody, Request};
use garage_model::garage::Garage;
use garage_model::key_table::Key;
use garage_util::data::{sha256sum, Hash};
use error::*;
pub mod error;
pub mod payload;
pub mod streaming;
use error::*;
pub const SHORT_DATE: &str = "%Y%m%d";
pub const LONG_DATETIME: &str = "%Y%m%dT%H%M%SZ";
type HmacSha256 = Hmac<Sha256>;
pub async fn verify_request(
garage: &Garage,
mut req: Request<IncomingBody>,
service: &'static str,
) -> Result<(Request<streaming::ReqBody>, Key, Option<Hash>), Error> {
let (api_key, mut content_sha256) =
payload::check_payload_signature(&garage, &mut req, service).await?;
let api_key =
api_key.ok_or_else(|| Error::forbidden("Garage does not support anonymous access yet"))?;
let req = streaming::parse_streaming_body(
&api_key,
req,
&mut content_sha256,
&garage.config.s3_api.s3_region,
service,
)?;
Ok((req, api_key, content_sha256))
}
pub fn verify_signed_content(expected_sha256: Hash, body: &[u8]) -> Result<(), Error> {
if expected_sha256 != sha256sum(body) {
return Err(Error::bad_request(

View file

@ -1,7 +1,9 @@
use std::collections::HashMap;
use std::convert::TryFrom;
use chrono::{DateTime, Duration, NaiveDateTime, TimeZone, Utc};
use hmac::Mac;
use hyper::header::{HeaderMap, HeaderName, HeaderValue, AUTHORIZATION, HOST};
use hyper::{body::Incoming as IncomingBody, Method, Request};
use sha2::{Digest, Sha256};
@ -17,66 +19,98 @@ use super::{compute_scope, signing_hmac};
use crate::encoding::uri_encode;
use crate::signature::error::*;
pub const X_AMZ_ALGORITHM: HeaderName = HeaderName::from_static("x-amz-algorithm");
pub const X_AMZ_CREDENTIAL: HeaderName = HeaderName::from_static("x-amz-credential");
pub const X_AMZ_DATE: HeaderName = HeaderName::from_static("x-amz-date");
pub const X_AMZ_EXPIRES: HeaderName = HeaderName::from_static("x-amz-expires");
pub const X_AMZ_SIGNEDHEADERS: HeaderName = HeaderName::from_static("x-amz-signedheaders");
pub const X_AMZ_SIGNATURE: HeaderName = HeaderName::from_static("x-amz-signature");
pub const X_AMZ_CONTENT_SH256: HeaderName = HeaderName::from_static("x-amz-content-sha256");
pub const AWS4_HMAC_SHA256: &str = "AWS4-HMAC-SHA256";
pub const UNSIGNED_PAYLOAD: &str = "UNSIGNED-PAYLOAD";
pub const STREAMING_AWS4_HMAC_SHA256_PAYLOAD: &str = "STREAMING-AWS4-HMAC-SHA256-PAYLOAD";
pub type QueryMap = HeaderMap<QueryValue>;
pub struct QueryValue {
/// Original key with potential uppercase characters,
/// for use in signature calculation
key: String,
value: String,
}
pub async fn check_payload_signature(
garage: &Garage,
request: &mut Request<IncomingBody>,
service: &'static str,
request: &Request<IncomingBody>,
) -> Result<(Option<Key>, Option<Hash>), Error> {
let mut headers = HashMap::new();
for (key, val) in request.headers() {
headers.insert(key.to_string(), val.to_str()?.to_string());
}
if let Some(query) = request.uri().query() {
let query_pairs = url::form_urlencoded::parse(query.as_bytes());
for (key, val) in query_pairs {
headers.insert(key.to_lowercase(), val.to_string());
}
}
let query = parse_query_map(request.uri())?;
let authorization = if let Some(authorization) = headers.get("authorization") {
parse_authorization(authorization, &headers)?
} else if let Some(algorithm) = headers.get("x-amz-algorithm") {
parse_query_authorization(algorithm, &headers)?
if query.contains_key(&X_AMZ_ALGORITHM) {
// We check for presigned-URL-style authentification first, because
// the browser or someting else could inject an Authorization header
// that is totally unrelated to AWS signatures.
check_presigned_signature(garage, service, request, query).await
} else if request.headers().contains_key(AUTHORIZATION) {
check_standard_signature(garage, service, request, query).await
} else {
let content_sha256 = headers.get("x-amz-content-sha256");
if let Some(content_sha256) = content_sha256.filter(|c| "UNSIGNED-PAYLOAD" != c.as_str()) {
// Unsigned (anonymous) request
let content_sha256 = request
.headers()
.get("x-amz-content-sha256")
.filter(|c| c.as_bytes() != UNSIGNED_PAYLOAD.as_bytes());
if let Some(content_sha256) = content_sha256 {
let sha256 = hex::decode(content_sha256)
.ok()
.and_then(|bytes| Hash::try_from(&bytes))
.ok_or_bad_request("Invalid content sha256 hash")?;
return Ok((None, Some(sha256)));
Ok((None, Some(sha256)))
} else {
return Ok((None, None));
Ok((None, None))
}
};
}
}
async fn check_standard_signature(
garage: &Garage,
service: &'static str,
request: &Request<IncomingBody>,
query: QueryMap,
) -> Result<(Option<Key>, Option<Hash>), Error> {
let authorization = Authorization::parse_header(request.headers())?;
// Verify that all necessary request headers are included in signed_headers
// The following must be included for all signatures:
// - the Host header (mandatory)
// - all x-amz-* headers used in the request
// AWS also indicates that the Content-Type header should be signed if
// it is used, but Minio client doesn't sign it so we don't check it for compatibility.
let signed_headers = split_signed_headers(&authorization)?;
verify_signed_headers(request.headers(), &signed_headers)?;
let canonical_request = canonical_request(
service,
request.method(),
request.uri(),
&headers,
&authorization.signed_headers,
request.uri().path(),
&query,
request.headers(),
&signed_headers,
&authorization.content_sha256,
)?;
let string_to_sign = string_to_sign(
&authorization.date,
&authorization.scope,
&canonical_request,
);
let (_, scope) = parse_credential(&authorization.credential)?;
let string_to_sign = string_to_sign(&authorization.date, &scope, &canonical_request);
trace!("canonical request:\n{}", canonical_request);
trace!("string to sign:\n{}", string_to_sign);
let key = verify_v4(
garage,
service,
&authorization.credential,
&authorization.date,
&authorization.signature,
string_to_sign.as_bytes(),
)
.await?;
let key = verify_v4(garage, service, &authorization, string_to_sign.as_bytes()).await?;
let content_sha256 = if authorization.content_sha256 == "UNSIGNED-PAYLOAD" {
let content_sha256 = if authorization.content_sha256 == UNSIGNED_PAYLOAD {
None
} else if authorization.content_sha256 == "STREAMING-AWS4-HMAC-SHA256-PAYLOAD" {
} else if authorization.content_sha256 == STREAMING_AWS4_HMAC_SHA256_PAYLOAD {
let bytes = hex::decode(authorization.signature).ok_or_bad_request("Invalid signature")?;
Some(Hash::try_from(&bytes).ok_or_bad_request("Invalid signature")?)
} else {
@ -88,124 +122,102 @@ pub async fn check_payload_signature(
Ok((Some(key), content_sha256))
}
struct Authorization {
credential: String,
signed_headers: String,
signature: String,
content_sha256: String,
date: DateTime<Utc>,
async fn check_presigned_signature(
garage: &Garage,
service: &'static str,
request: &mut Request<IncomingBody>,
mut query: QueryMap,
) -> Result<(Option<Key>, Option<Hash>), Error> {
let algorithm = query.get(&X_AMZ_ALGORITHM).unwrap();
let authorization = Authorization::parse_presigned(&algorithm.value, &query)?;
// Verify that all necessary request headers are included in signed_headers
// For AWSv4 pre-signed URLs, the following must be incldued:
// - the Host header (mandatory)
// - all x-amz-* headers used in the request
let signed_headers = split_signed_headers(&authorization)?;
verify_signed_headers(request.headers(), &signed_headers)?;
// The X-Amz-Signature value is passed as a query parameter,
// but the signature cannot be computed from a string that contains itself.
// AWS specifies that all query params except X-Amz-Signature are included
// in the canonical request.
query.remove(&X_AMZ_SIGNATURE);
let canonical_request = canonical_request(
service,
request.method(),
request.uri().path(),
&query,
request.headers(),
&signed_headers,
&authorization.content_sha256,
)?;
let string_to_sign = string_to_sign(
&authorization.date,
&authorization.scope,
&canonical_request,
);
trace!("canonical request (presigned url):\n{}", canonical_request);
trace!("string to sign (presigned url):\n{}", string_to_sign);
let key = verify_v4(garage, service, &authorization, string_to_sign.as_bytes()).await?;
// In the page on presigned URLs, AWS specifies that if a signed query
// parameter and a signed header of the same name have different values,
// then an InvalidRequest error is raised.
let headers_mut = request.headers_mut();
for (name, value) in query.iter() {
if let Some(existing) = headers_mut.get(name) {
if signed_headers.contains(&name) && existing.as_bytes() != value.value.as_bytes() {
return Err(Error::bad_request(format!(
"Conflicting values for `{}` in query parameters and request headers",
name
)));
}
}
if name.as_str().starts_with("x-amz-") {
// Query parameters that start by x-amz- are actually intended to stand in for
// headers that can't be added at the time the request is made.
// What we do is just add them to the Request object as regular headers,
// that will be handled downstream as if they were included like in a normal request.
// (Here we allow such query parameters to override headers with the same name
// that are not signed, however there is not much reason that this would happen)
headers_mut.insert(
name,
HeaderValue::from_bytes(value.value.as_bytes())
.ok_or_bad_request("invalid query parameter value")?,
);
}
}
// Presigned URLs always use UNSIGNED-PAYLOAD,
// so there is no sha256 hash to return.
Ok((Some(key), None))
}
fn parse_authorization(
authorization: &str,
headers: &HashMap<String, String>,
) -> Result<Authorization, Error> {
let first_space = authorization
.find(' ')
.ok_or_bad_request("Authorization field to short")?;
let (auth_kind, rest) = authorization.split_at(first_space);
pub fn parse_query_map(uri: &http::uri::Uri) -> Result<QueryMap, Error> {
let mut query = QueryMap::with_capacity(0);
if let Some(query_str) = uri.query() {
let query_pairs = url::form_urlencoded::parse(query_str.as_bytes());
for (key, val) in query_pairs {
let name =
HeaderName::from_bytes(key.as_bytes()).ok_or_bad_request("Invalid header name")?;
if auth_kind != "AWS4-HMAC-SHA256" {
return Err(Error::bad_request("Unsupported authorization method"));
let value = QueryValue {
key: key.to_string(),
value: val.into_owned(),
};
if query.insert(name, value).is_some() {
return Err(Error::bad_request(format!(
"duplicate query parameter: `{}`",
key
)));
}
}
}
let mut auth_params = HashMap::new();
for auth_part in rest.split(',') {
let auth_part = auth_part.trim();
let eq = auth_part
.find('=')
.ok_or_bad_request("Field without value in authorization header")?;
let (key, value) = auth_part.split_at(eq);
auth_params.insert(key.to_string(), value.trim_start_matches('=').to_string());
}
let cred = auth_params
.get("Credential")
.ok_or_bad_request("Could not find Credential in Authorization field")?;
let content_sha256 = headers
.get("x-amz-content-sha256")
.ok_or_bad_request("Missing X-Amz-Content-Sha256 field")?;
let date = headers
.get("x-amz-date")
.ok_or_bad_request("Missing X-Amz-Date field")
.map_err(Error::from)
.and_then(|d| parse_date(d))?;
if Utc::now() - date > Duration::hours(24) {
return Err(Error::bad_request("Date is too old".to_string()));
}
let auth = Authorization {
credential: cred.to_string(),
signed_headers: auth_params
.get("SignedHeaders")
.ok_or_bad_request("Could not find SignedHeaders in Authorization field")?
.to_string(),
signature: auth_params
.get("Signature")
.ok_or_bad_request("Could not find Signature in Authorization field")?
.to_string(),
content_sha256: content_sha256.to_string(),
date,
};
Ok(auth)
}
fn parse_query_authorization(
algorithm: &str,
headers: &HashMap<String, String>,
) -> Result<Authorization, Error> {
if algorithm != "AWS4-HMAC-SHA256" {
return Err(Error::bad_request(
"Unsupported authorization method".to_string(),
));
}
let cred = headers
.get("x-amz-credential")
.ok_or_bad_request("X-Amz-Credential not found in query parameters")?;
let signed_headers = headers
.get("x-amz-signedheaders")
.ok_or_bad_request("X-Amz-SignedHeaders not found in query parameters")?;
let signature = headers
.get("x-amz-signature")
.ok_or_bad_request("X-Amz-Signature not found in query parameters")?;
let content_sha256 = headers
.get("x-amz-content-sha256")
.map(|x| x.as_str())
.unwrap_or("UNSIGNED-PAYLOAD");
let duration = headers
.get("x-amz-expires")
.ok_or_bad_request("X-Amz-Expires not found in query parameters")?
.parse()
.map_err(|_| Error::bad_request("X-Amz-Expires is not a number".to_string()))?;
if duration > 7 * 24 * 3600 {
return Err(Error::bad_request(
"X-Amz-Expires may not exceed a week".to_string(),
));
}
let date = headers
.get("x-amz-date")
.ok_or_bad_request("Missing X-Amz-Date field")
.map_err(Error::from)
.and_then(|d| parse_date(d))?;
if Utc::now() - date > Duration::seconds(duration) {
return Err(Error::bad_request("Date is too old".to_string()));
}
Ok(Authorization {
credential: cred.to_string(),
signed_headers: signed_headers.to_string(),
signature: signature.to_string(),
content_sha256: content_sha256.to_string(),
date,
})
Ok(query)
}
fn parse_credential(cred: &str) -> Result<(String, String), Error> {
@ -219,11 +231,39 @@ fn parse_credential(cred: &str) -> Result<(String, String), Error> {
))
}
fn split_signed_headers(authorization: &Authorization) -> Result<Vec<HeaderName>, Error> {
let mut signed_headers = authorization
.signed_headers
.split(';')
.map(HeaderName::try_from)
.collect::<Result<Vec<HeaderName>, _>>()
.ok_or_bad_request("invalid header name")?;
signed_headers.sort_by(|h1, h2| h1.as_str().cmp(h2.as_str()));
Ok(signed_headers)
}
fn verify_signed_headers(headers: &HeaderMap, signed_headers: &[HeaderName]) -> Result<(), Error> {
if !signed_headers.contains(&HOST) {
return Err(Error::bad_request("Header `Host` should be signed"));
}
for (name, _) in headers.iter() {
if name.as_str().starts_with("x-amz-") {
if !signed_headers.contains(name) {
return Err(Error::bad_request(format!(
"Header `{}` should be signed",
name
)));
}
}
}
Ok(())
}
pub fn string_to_sign(datetime: &DateTime<Utc>, scope_string: &str, canonical_req: &str) -> String {
let mut hasher = Sha256::default();
hasher.update(canonical_req.as_bytes());
[
"AWS4-HMAC-SHA256",
AWS4_HMAC_SHA256,
&datetime.format(LONG_DATETIME).to_string(),
scope_string,
&hex::encode(hasher.finalize().as_slice()),
@ -234,11 +274,12 @@ pub fn string_to_sign(datetime: &DateTime<Utc>, scope_string: &str, canonical_re
pub fn canonical_request(
service: &'static str,
method: &Method,
uri: &hyper::Uri,
headers: &HashMap<String, String>,
signed_headers: &str,
canonical_uri: &str,
query: &QueryMap,
headers: &HeaderMap,
signed_headers: &[HeaderName],
content_sha256: &str,
) -> String {
) -> Result<String, Error> {
// There seems to be evidence that in AWSv4 signatures, the path component is url-encoded
// a second time when building the canonical request, as specified in this documentation page:
// -> https://docs.aws.amazon.com/rolesanywhere/latest/userguide/authentication-sign-process.html
@ -268,49 +309,46 @@ pub fn canonical_request(
// it mentions it in the comments (same link to the souce code as above).
// We make the explicit choice of NOT normalizing paths in the K2V API because doing so
// would make non-normalized paths invalid K2V partition keys, and we don't want that.
let path: std::borrow::Cow<str> = if service != "s3" {
uri_encode(uri.path(), false).into()
let canonical_uri: std::borrow::Cow<str> = if service != "s3" {
uri_encode(canonical_uri, false).into()
} else {
uri.path().into()
canonical_uri.into()
};
[
method.as_str(),
&path,
&canonical_query_string(uri),
&canonical_header_string(headers, signed_headers),
"",
signed_headers,
content_sha256,
]
.join("\n")
}
fn canonical_header_string(headers: &HashMap<String, String>, signed_headers: &str) -> String {
let signed_headers_vec = signed_headers.split(';').collect::<Vec<_>>();
let mut items = headers
.iter()
.filter(|(key, _)| signed_headers_vec.contains(&key.as_str()))
.collect::<Vec<_>>();
items.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
items
.iter()
.map(|(key, value)| key.to_lowercase() + ":" + value.trim())
.collect::<Vec<_>>()
.join("\n")
}
fn canonical_query_string(uri: &hyper::Uri) -> String {
if let Some(query) = uri.query() {
let query_pairs = url::form_urlencoded::parse(query.as_bytes());
let mut items = query_pairs
.filter(|(key, _)| key != "X-Amz-Signature")
.map(|(key, value)| uri_encode(&key, true) + "=" + &uri_encode(&value, true))
.collect::<Vec<_>>();
// Canonical query string from passed HeaderMap
let canonical_query_string = {
let mut items = Vec::with_capacity(query.len());
for (_, QueryValue { key, value }) in query.iter() {
items.push(uri_encode(&key, true) + "=" + &uri_encode(&value, true));
}
items.sort();
items.join("&")
} else {
"".to_string()
}
};
// Canonical header string calculated from signed headers
let canonical_header_string = signed_headers
.iter()
.map(|name| {
let value = headers
.get(name)
.ok_or_bad_request(format!("signed header `{}` is not present", name))?
.to_str()?;
Ok(format!("{}:{}", name.as_str(), value.trim()))
})
.collect::<Result<Vec<String>, Error>>()?
.join("\n");
let signed_headers = signed_headers.join(";");
let list = [
method.as_str(),
&canonical_uri,
&canonical_query_string,
&canonical_header_string,
"",
&signed_headers,
content_sha256,
];
Ok(list.join("\n"))
}
pub fn parse_date(date: &str) -> Result<DateTime<Utc>, Error> {
@ -322,38 +360,203 @@ pub fn parse_date(date: &str) -> Result<DateTime<Utc>, Error> {
pub async fn verify_v4(
garage: &Garage,
service: &str,
credential: &str,
date: &DateTime<Utc>,
signature: &str,
auth: &Authorization,
payload: &[u8],
) -> Result<Key, Error> {
let (key_id, scope) = parse_credential(credential)?;
let scope_expected = compute_scope(date, &garage.config.s3_api.s3_region, service);
if scope != scope_expected {
return Err(Error::AuthorizationHeaderMalformed(scope.to_string()));
let scope_expected = compute_scope(&auth.date, &garage.config.s3_api.s3_region, service);
if auth.scope != scope_expected {
return Err(Error::AuthorizationHeaderMalformed(auth.scope.to_string()));
}
let key = garage
.key_table
.get(&EmptyKey, &key_id)
.get(&EmptyKey, &auth.key_id)
.await?
.filter(|k| !k.state.is_deleted())
.ok_or_else(|| Error::forbidden(format!("No such key: {}", &key_id)))?;
.ok_or_else(|| Error::forbidden(format!("No such key: {}", &auth.key_id)))?;
let key_p = key.params().unwrap();
let mut hmac = signing_hmac(
date,
&auth.date,
&key_p.secret_key,
&garage.config.s3_api.s3_region,
service,
)
.ok_or_internal_error("Unable to build signing HMAC")?;
hmac.update(payload);
let our_signature = hex::encode(hmac.finalize().into_bytes());
if signature != our_signature {
return Err(Error::forbidden("Invalid signature".to_string()));
let signature =
hex::decode(&auth.signature).map_err(|_| Error::forbidden("Invalid signature"))?;
if hmac.verify_slice(&signature).is_err() {
return Err(Error::forbidden("Invalid signature"));
}
Ok(key)
}
// ============ Authorization header, or X-Amz-* query params =========
pub struct Authorization {
key_id: String,
scope: String,
signed_headers: String,
signature: String,
content_sha256: String,
date: DateTime<Utc>,
}
impl Authorization {
fn parse_header(headers: &HeaderMap) -> Result<Self, Error> {
let authorization = headers
.get(AUTHORIZATION)
.ok_or_bad_request("Missing authorization header")?
.to_str()?;
let (auth_kind, rest) = authorization
.split_once(' ')
.ok_or_bad_request("Authorization field to short")?;
if auth_kind != AWS4_HMAC_SHA256 {
return Err(Error::bad_request("Unsupported authorization method"));
}
let mut auth_params = HashMap::new();
for auth_part in rest.split(',') {
let auth_part = auth_part.trim();
let eq = auth_part
.find('=')
.ok_or_bad_request("Field without value in authorization header")?;
let (key, value) = auth_part.split_at(eq);
auth_params.insert(key.to_string(), value.trim_start_matches('=').to_string());
}
let cred = auth_params
.get("Credential")
.ok_or_bad_request("Could not find Credential in Authorization field")?;
let signed_headers = auth_params
.get("SignedHeaders")
.ok_or_bad_request("Could not find SignedHeaders in Authorization field")?
.to_string();
let signature = auth_params
.get("Signature")
.ok_or_bad_request("Could not find Signature in Authorization field")?
.to_string();
let content_sha256 = headers
.get(X_AMZ_CONTENT_SH256)
.ok_or_bad_request("Missing X-Amz-Content-Sha256 field")?;
let date = headers
.get(X_AMZ_DATE)
.ok_or_bad_request("Missing X-Amz-Date field")
.map_err(Error::from)?
.to_str()?;
let date = parse_date(date)?;
if Utc::now() - date > Duration::hours(24) {
return Err(Error::bad_request("Date is too old".to_string()));
}
let (key_id, scope) = parse_credential(cred)?;
let auth = Authorization {
key_id,
scope,
signed_headers,
signature,
content_sha256: content_sha256.to_str()?.to_string(),
date,
};
Ok(auth)
}
fn parse_presigned(algorithm: &str, query: &QueryMap) -> Result<Self, Error> {
if algorithm != AWS4_HMAC_SHA256 {
return Err(Error::bad_request(
"Unsupported authorization method".to_string(),
));
}
let cred = query
.get(&X_AMZ_CREDENTIAL)
.ok_or_bad_request("X-Amz-Credential not found in query parameters")?;
let signed_headers = query
.get(&X_AMZ_SIGNEDHEADERS)
.ok_or_bad_request("X-Amz-SignedHeaders not found in query parameters")?;
let signature = query
.get(&X_AMZ_SIGNATURE)
.ok_or_bad_request("X-Amz-Signature not found in query parameters")?;
let duration = query
.get(&X_AMZ_EXPIRES)
.ok_or_bad_request("X-Amz-Expires not found in query parameters")?
.value
.parse()
.map_err(|_| Error::bad_request("X-Amz-Expires is not a number".to_string()))?;
if duration > 7 * 24 * 3600 {
return Err(Error::bad_request(
"X-Amz-Expires may not exceed a week".to_string(),
));
}
let date = query
.get(&X_AMZ_DATE)
.ok_or_bad_request("Missing X-Amz-Date field")?;
let date = parse_date(&date.value)?;
if Utc::now() - date > Duration::seconds(duration) {
return Err(Error::bad_request("Date is too old".to_string()));
}
let (key_id, scope) = parse_credential(&cred.value)?;
Ok(Authorization {
key_id,
scope,
signed_headers: signed_headers.value.clone(),
signature: signature.value.clone(),
content_sha256: UNSIGNED_PAYLOAD.to_string(),
date,
})
}
pub(crate) fn parse_form(params: &HeaderMap) -> Result<Self, Error> {
let algorithm = params
.get(X_AMZ_ALGORITHM)
.ok_or_bad_request("Missing X-Amz-Algorithm header")?
.to_str()?;
if algorithm != AWS4_HMAC_SHA256 {
return Err(Error::bad_request(
"Unsupported authorization method".to_string(),
));
}
let credential = params
.get(X_AMZ_CREDENTIAL)
.ok_or_else(|| Error::forbidden("Garage does not support anonymous access yet"))?
.to_str()?;
let signature = params
.get(X_AMZ_SIGNATURE)
.ok_or_bad_request("No signature was provided")?
.to_str()?
.to_string();
let date = params
.get(X_AMZ_DATE)
.ok_or_bad_request("No date was provided")?
.to_str()?;
let date = parse_date(date)?;
if Utc::now() - date > Duration::hours(24) {
return Err(Error::bad_request("Date is too old".to_string()));
}
let (key_id, scope) = parse_credential(credential)?;
let auth = Authorization {
key_id,
scope,
signed_headers: "".to_string(),
signature,
content_sha256: UNSIGNED_PAYLOAD.to_string(),
date,
};
Ok(auth)
}
}

View file

@ -15,6 +15,11 @@ use super::{compute_scope, sha256sum, HmacSha256, LONG_DATETIME};
use crate::helpers::*;
use crate::signature::error::*;
use crate::signature::payload::{
STREAMING_AWS4_HMAC_SHA256_PAYLOAD, X_AMZ_CONTENT_SH256, X_AMZ_DATE,
};
pub const AWS4_HMAC_SHA256_PAYLOAD: &str = "AWS4-HMAC-SHA256-PAYLOAD";
pub type ReqBody = BoxBody<Error>;
@ -25,8 +30,8 @@ pub fn parse_streaming_body(
region: &str,
service: &str,
) -> Result<Request<ReqBody>, Error> {
match req.headers().get("x-amz-content-sha256") {
Some(header) if header == "STREAMING-AWS4-HMAC-SHA256-PAYLOAD" => {
match req.headers().get(X_AMZ_CONTENT_SH256) {
Some(header) if header == STREAMING_AWS4_HMAC_SHA256_PAYLOAD => {
let signature = content_sha256
.take()
.ok_or_bad_request("No signature provided")?;
@ -39,7 +44,7 @@ pub fn parse_streaming_body(
let date = req
.headers()
.get("x-amz-date")
.get(X_AMZ_DATE)
.ok_or_bad_request("Missing X-Amz-Date field")?
.to_str()?;
let date: NaiveDateTime = NaiveDateTime::parse_from_str(date, LONG_DATETIME)
@ -75,7 +80,7 @@ fn compute_streaming_payload_signature(
content_sha256: Hash,
) -> Result<Hash, Error> {
let string_to_sign = [
"AWS4-HMAC-SHA256-PAYLOAD",
AWS4_HMAC_SHA256_PAYLOAD,
&date.format(LONG_DATETIME).to_string(),
scope,
&hex::encode(previous_signature),

View file

@ -1,12 +1,15 @@
#![allow(dead_code)]
use std::collections::HashMap;
use std::convert::TryFrom;
use std::convert::{TryFrom, TryInto};
use chrono::{offset::Utc, DateTime};
use hmac::{Hmac, Mac};
use http_body_util::BodyExt;
use http_body_util::Full as FullBody;
use hyper::header::{
HeaderMap, HeaderName, HeaderValue, AUTHORIZATION, CONTENT_ENCODING, CONTENT_LENGTH, HOST,
};
use hyper::{Method, Request, Response, Uri};
use hyper_util::client::legacy::{connect::HttpConnector, Client};
use hyper_util::rt::TokioExecutor;
@ -61,6 +64,10 @@ impl CustomRequester {
vhost_style: false,
}
}
pub fn client(&self) -> &Client<HttpConnector, Body> {
&self.client
}
}
pub struct RequestBuilder<'a> {
@ -173,54 +180,85 @@ impl<'a> RequestBuilder<'a> {
.unwrap();
let streaming_signer = signer.clone();
let mut all_headers = self.signed_headers.clone();
let mut all_headers = self
.signed_headers
.iter()
.map(|(k, v)| {
(
HeaderName::try_from(k).expect("invalid header name"),
HeaderValue::try_from(v).expect("invalid header value"),
)
})
.collect::<HeaderMap>();
let date = now.format(signature::LONG_DATETIME).to_string();
all_headers.insert("x-amz-date".to_owned(), date);
all_headers.insert("host".to_owned(), host);
all_headers.insert(
signature::payload::X_AMZ_DATE,
HeaderValue::from_str(&date).unwrap(),
);
all_headers.insert(HOST, HeaderValue::from_str(&host).unwrap());
let body_sha = match self.body_signature {
BodySignature::Unsigned => "UNSIGNED-PAYLOAD".to_owned(),
BodySignature::Classic => hex::encode(garage_util::data::sha256sum(&self.body)),
BodySignature::Streaming(size) => {
all_headers.insert("content-encoding".to_owned(), "aws-chunked".to_owned());
all_headers.insert(
"x-amz-decoded-content-length".to_owned(),
self.body.len().to_string(),
CONTENT_ENCODING,
HeaderValue::from_str("aws-chunked").unwrap(),
);
all_headers.insert(
HeaderName::from_static("x-amz-decoded-content-length"),
HeaderValue::from_str(&self.body.len().to_string()).unwrap(),
);
// Get lenght of body by doing the conversion to a streaming body with an
// invalid signature (we don't know the seed) just to get its length. This
// is a pretty lazy and inefficient way to do it, but it's enought for test
// code.
all_headers.insert(
"content-length".to_owned(),
CONTENT_LENGTH,
to_streaming_body(&self.body, size, String::new(), signer.clone(), now, "")
.len()
.to_string(),
.to_string()
.try_into()
.unwrap(),
);
"STREAMING-AWS4-HMAC-SHA256-PAYLOAD".to_owned()
}
};
all_headers.insert("x-amz-content-sha256".to_owned(), body_sha.clone());
all_headers.insert(
signature::payload::X_AMZ_CONTENT_SH256,
HeaderValue::from_str(&body_sha).unwrap(),
);
let mut signed_headers = all_headers
.keys()
.map(|k| k.as_ref())
.collect::<Vec<&str>>();
signed_headers.sort();
let signed_headers = signed_headers.join(";");
let mut signed_headers = all_headers.keys().cloned().collect::<Vec<_>>();
signed_headers.sort_by(|h1, h2| h1.as_str().cmp(h2.as_str()));
let signed_headers_str = signed_headers
.iter()
.map(ToString::to_string)
.collect::<Vec<_>>()
.join(";");
all_headers.extend(self.unsigned_headers.clone());
all_headers.extend(self.unsigned_headers.iter().map(|(k, v)| {
(
HeaderName::try_from(k).expect("invalid header name"),
HeaderValue::try_from(v).expect("invalid header value"),
)
}));
let uri = Uri::try_from(&uri).unwrap();
let query = signature::payload::parse_query_map(&uri).unwrap();
let canonical_request = signature::payload::canonical_request(
self.service,
&self.method,
&Uri::try_from(&uri).unwrap(),
uri.path(),
&query,
&all_headers,
&signed_headers,
&body_sha,
);
)
.unwrap();
let string_to_sign = signature::payload::string_to_sign(&now, &scope, &canonical_request);
@ -228,14 +266,15 @@ impl<'a> RequestBuilder<'a> {
let signature = hex::encode(signer.finalize().into_bytes());
let authorization = format!(
"AWS4-HMAC-SHA256 Credential={}/{},SignedHeaders={},Signature={}",
self.requester.key.id, scope, signed_headers, signature
self.requester.key.id, scope, signed_headers_str, signature
);
all_headers.insert(
AUTHORIZATION,
HeaderValue::from_str(&authorization).unwrap(),
);
all_headers.insert("authorization".to_owned(), authorization);
let mut request = Request::builder();
for (k, v) in all_headers {
request = request.header(k, v);
}
*request.headers_mut().unwrap() = all_headers;
let body = if let BodySignature::Streaming(size) = self.body_signature {
to_streaming_body(&self.body, size, signature, streaming_signer, now, &scope)

View file

@ -1,6 +1,7 @@
mod list;
mod multipart;
mod objects;
mod presigned;
mod simple;
mod streaming_signature;
mod website;

View file

@ -0,0 +1,72 @@
use std::time::{Duration, SystemTime};
use crate::common;
use aws_sdk_s3::presigning::PresigningConfig;
use bytes::Bytes;
use http_body_util::{BodyExt, Full};
use hyper::Request;
const STD_KEY: &str = "hello world";
const BODY: &[u8; 62] = b"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
#[tokio::test]
async fn test_presigned_url() {
let ctx = common::context();
let bucket = ctx.create_bucket("presigned");
let etag = "\"46cf18a9b447991b450cad3facf5937e\"";
let body = Bytes::from(BODY.to_vec());
let psc = PresigningConfig::builder()
.start_time(SystemTime::now() - Duration::from_secs(60))
.expires_in(Duration::from_secs(3600))
.build()
.unwrap();
{
// PutObject
let req = ctx
.client
.put_object()
.bucket(&bucket)
.key(STD_KEY)
.presigned(psc.clone())
.await
.unwrap();
let client = ctx.custom_request.client();
let req = Request::builder()
.method("PUT")
.uri(req.uri())
.body(Full::new(body.clone()))
.unwrap();
let res = client.request(req).await.unwrap();
assert_eq!(res.status(), 200);
assert_eq!(res.headers().get("etag").unwrap(), etag);
}
{
// GetObject
let req = ctx
.client
.get_object()
.bucket(&bucket)
.key(STD_KEY)
.presigned(psc)
.await
.unwrap();
let client = ctx.custom_request.client();
let req = Request::builder()
.method("GET")
.uri(req.uri())
.body(Full::new(Bytes::new()))
.unwrap();
let res = client.request(req).await.unwrap();
assert_eq!(res.status(), 200);
assert_eq!(res.headers().get("etag").unwrap(), etag);
let body2 = BodyExt::collect(res.into_body()).await.unwrap().to_bytes();
assert_eq!(body, body2);
}
}

View file

@ -26,7 +26,7 @@ async fn test_putobject_streaming() {
.builder(bucket.clone())
.method(Method::PUT)
.path(STD_KEY.to_owned())
.unsigned_headers(headers)
.signed_headers(headers)
.vhost_style(true)
.body(vec![])
.body_signature(BodySignature::Streaming(10))

View file

@ -191,6 +191,13 @@ impl Bucket {
}
}
pub fn present(id: Uuid, params: BucketParams) -> Self {
Bucket {
id,
state: crdt::Deletable::present(params),
}
}
/// Returns true if this represents a deleted bucket
pub fn is_deleted(&self) -> bool {
self.state.is_deleted()

View file

@ -3,7 +3,7 @@ name = "garage_net"
version = "0.10.0"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license-file = "AGPL-3.0"
license = "AGPL-3.0"
description = "Networking library for Garage RPC communication, forked from Netapp"
repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage"
readme = "../../README.md"

View file

@ -26,7 +26,7 @@ use garage_api::s3::cors::{add_cors_headers, find_matching_cors_rule, handle_opt
use garage_api::s3::error::{
CommonErrorDerivative, Error as ApiError, OkOrBadRequest, OkOrInternalError,
};
use garage_api::s3::get::{handle_get, handle_head};
use garage_api::s3::get::{handle_get_without_ctx, handle_head_without_ctx};
use garage_model::garage::Garage;
@ -219,14 +219,13 @@ impl WebServer {
// Check bucket isn't deleted and has website access enabled
let bucket = self
.garage
.bucket_table
.get(&EmptyKey, &bucket_id)
.await?
.ok_or(Error::NotFound)?;
.bucket_helper()
.get_existing_bucket(bucket_id)
.await
.map_err(|_| Error::NotFound)?;
let bucket_params = bucket.state.into_option().unwrap();
let website_config = bucket
.params()
.ok_or(Error::NotFound)?
let website_config = bucket_params
.website_config
.get()
.as_ref()
@ -243,14 +242,16 @@ impl WebServer {
);
let ret_doc = match *req.method() {
Method::OPTIONS => handle_options_for_bucket(req, &bucket)
Method::OPTIONS => handle_options_for_bucket(req, &bucket_params)
.map_err(ApiError::from)
.map(|res| res.map(|_empty_body: EmptyBody| empty_body())),
Method::HEAD => handle_head(self.garage.clone(), &req, bucket_id, &key, None).await,
Method::HEAD => {
handle_head_without_ctx(self.garage.clone(), req, bucket_id, &key, None).await
}
Method::GET => {
handle_get(
handle_get_without_ctx(
self.garage.clone(),
&req,
req,
bucket_id,
&key,
None,
@ -301,7 +302,7 @@ impl WebServer {
.body(empty_body::<Infallible>())
.unwrap();
match handle_get(
match handle_get_without_ctx(
self.garage.clone(),
&req2,
bucket_id,
@ -344,7 +345,7 @@ impl WebServer {
}
Ok(mut resp) => {
// Maybe add CORS headers
if let Some(rule) = find_matching_cors_rule(&bucket, req)? {
if let Some(rule) = find_matching_cors_rule(&bucket_params, req)? {
add_cors_headers(&mut resp, rule)
.ok_or_internal_error("Invalid bucket CORS configuration")?;
}