Compare commits

..

22 commits

Author SHA1 Message Date
5bf58bd539
Fix resync queue to not drop items 2022-03-01 14:55:37 +01:00
2f9d606bd6
Spawn a single resync worker 2022-03-01 12:04:34 +01:00
9125a57234
Add appropriate fsync() calls in write_block
to ensure that data is persisted properly
2022-03-01 12:04:31 +01:00
68b2693cd4
Update Grafana dashboard 2022-03-01 12:02:07 +01:00
6059ce31c8
Implement exponential backoff for resync retries 2022-03-01 12:02:07 +01:00
8c2a8ecd98
Add Grafana dashboard for Garage 2022-03-01 11:59:58 +01:00
34a557e6a6
Add wrapper over sled tree to count items (used for big queues) 2022-03-01 11:59:58 +01:00
50096bc460
Bump version to 0.7 because of incompatible Netapp 2022-03-01 11:59:58 +01:00
238e17aa0b
Add spans to table calls, change span names in RPC 2022-03-01 11:59:58 +01:00
897baf8d75
add missing netapp telemetry feature 2022-03-01 11:59:58 +01:00
84110d682e
Refactoring: rename config files, make modifications less invasive 2022-03-01 11:59:58 +01:00
44edc0448b
Add metrics to web endpoint 2022-03-01 11:59:57 +01:00
0d04799694
Add metrics to API endpoint 2022-03-01 11:59:56 +01:00
c4a9a8e0bb
Refactor how durations are measured 2022-03-01 11:59:04 +01:00
547f175410
Remove strum crate dependency; add protobuf nix dependency 2022-03-01 11:59:03 +01:00
159047c6f6
Remove ... at end of hex IDs 2022-03-01 11:58:37 +01:00
12215a1997
Update to Netapp 0.4 which supports distributed tracing 2022-03-01 11:58:37 +01:00
3549a760f1
Add tracing integration with opentelemetry 2022-03-01 11:58:35 +01:00
57c0674323
Add docker-compose for traces & metrics 2022-03-01 11:58:01 +01:00
3a237a1336
Add many metrics in table/ and rpc/ 2022-03-01 11:58:01 +01:00
c51663073b
Implement basic metrics in table 2022-03-01 11:58:01 +01:00
mricher
95bff404bc
Update dependencies and add admin module with metrics
- Global dependencies updated in Cargo.lock
- New module created in src/admin to host:
  - the (future) admin REST API
  - the metric collection
- add configuration block

No metrics implemented yet
2022-03-01 11:58:01 +01:00
50 changed files with 5558 additions and 311 deletions

551
Cargo.lock generated
View file

@ -11,6 +11,12 @@ dependencies = [
"memchr", "memchr",
] ]
[[package]]
name = "anyhow"
version = "1.0.55"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "159bb86af3a200e19a068f4224eae4c8bb2d0fa054c7e5d1cacd5cef95e684cd"
[[package]] [[package]]
name = "arc-swap" name = "arc-swap"
version = "1.4.0" version = "1.4.0"
@ -23,6 +29,27 @@ version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b" checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b"
[[package]]
name = "async-stream"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "171374e7e3b2504e0e5236e3b59260560f9fe94bfe9ac39ba5e4e929c5590625"
dependencies = [
"async-stream-impl",
"futures-core",
]
[[package]]
name = "async-stream-impl"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "648ed8c8d2ce5409ccd57453d9d1b214b342a0d69376a6feda1fd6cae3299308"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "async-trait" name = "async-trait"
version = "0.1.51" version = "0.1.51"
@ -368,6 +395,15 @@ dependencies = [
"unicode-width", "unicode-width",
] ]
[[package]]
name = "cloudabi"
version = "0.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f"
dependencies = [
"bitflags",
]
[[package]] [[package]]
name = "core-foundation" name = "core-foundation"
version = "0.9.2" version = "0.9.2"
@ -402,6 +438,16 @@ dependencies = [
"cfg-if", "cfg-if",
] ]
[[package]]
name = "crossbeam-channel"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e54ea8bc3fb1ee042f5aace6e3c6e025d3874866da222930f70ce62aceba0bfa"
dependencies = [
"cfg-if",
"crossbeam-utils",
]
[[package]] [[package]]
name = "crossbeam-epoch" name = "crossbeam-epoch"
version = "0.9.5" version = "0.9.5"
@ -454,6 +500,16 @@ dependencies = [
"sct", "sct",
] ]
[[package]]
name = "dashmap"
version = "4.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e77a43b28d0668df09411cb0bc9a8c2adc40f9a048afe863e05fd43251e8e39c"
dependencies = [
"cfg-if",
"num_cpus",
]
[[package]] [[package]]
name = "digest" name = "digest"
version = "0.9.0" version = "0.9.0"
@ -528,6 +584,12 @@ dependencies = [
"instant", "instant",
] ]
[[package]]
name = "fixedbitset"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "279fb028e20b3c4c320317955b77c5e0c9701f05a1d309905d6fc702cdc5053e"
[[package]] [[package]]
name = "fnv" name = "fnv"
version = "1.0.7" version = "1.0.7"
@ -554,6 +616,12 @@ dependencies = [
"winapi", "winapi",
] ]
[[package]]
name = "fuchsia-cprng"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba"
[[package]] [[package]]
name = "futures" name = "futures"
version = "0.3.17" version = "0.3.17"
@ -659,27 +727,27 @@ dependencies = [
[[package]] [[package]]
name = "garage" name = "garage"
version = "0.6.0" version = "0.7.0"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"aws-sdk-s3", "aws-sdk-s3",
"bytes 1.1.0", "bytes 1.1.0",
"futures", "futures",
"futures-util", "futures-util",
"garage_admin",
"garage_api", "garage_api",
"garage_model 0.6.0", "garage_model 0.7.0",
"garage_rpc 0.6.0", "garage_rpc 0.7.0",
"garage_table 0.6.0", "garage_table 0.7.0",
"garage_util 0.6.0", "garage_util 0.7.0",
"garage_web", "garage_web",
"git-version", "git-version",
"hex", "hex",
"http", "http",
"kuska-sodiumoxide", "kuska-sodiumoxide",
"log", "netapp 0.4.0",
"netapp",
"pretty_env_logger", "pretty_env_logger",
"rand", "rand 0.8.4",
"rmp-serde 0.15.5", "rmp-serde 0.15.5",
"serde", "serde",
"serde_bytes", "serde_bytes",
@ -688,11 +756,29 @@ dependencies = [
"structopt", "structopt",
"tokio", "tokio",
"toml", "toml",
"tracing",
]
[[package]]
name = "garage_admin"
version = "0.7.0"
dependencies = [
"futures",
"futures-util",
"garage_util 0.7.0",
"hex",
"http",
"hyper",
"opentelemetry",
"opentelemetry-otlp",
"opentelemetry-prometheus",
"prometheus",
"tracing",
] ]
[[package]] [[package]]
name = "garage_api" name = "garage_api"
version = "0.6.0" version = "0.7.0"
dependencies = [ dependencies = [
"base64", "base64",
"bytes 1.1.0", "bytes 1.1.0",
@ -702,9 +788,9 @@ dependencies = [
"form_urlencoded", "form_urlencoded",
"futures", "futures",
"futures-util", "futures-util",
"garage_model 0.6.0", "garage_model 0.7.0",
"garage_table 0.6.0", "garage_table 0.7.0",
"garage_util 0.6.0", "garage_util 0.7.0",
"hex", "hex",
"hmac", "hmac",
"http", "http",
@ -712,10 +798,10 @@ dependencies = [
"httpdate 0.3.2", "httpdate 0.3.2",
"hyper", "hyper",
"idna", "idna",
"log",
"md-5", "md-5",
"multer", "multer",
"nom", "nom",
"opentelemetry",
"percent-encoding", "percent-encoding",
"pin-project", "pin-project",
"quick-xml", "quick-xml",
@ -725,6 +811,7 @@ dependencies = [
"serde_json", "serde_json",
"sha2", "sha2",
"tokio", "tokio",
"tracing",
"url", "url",
] ]
@ -743,8 +830,8 @@ dependencies = [
"garage_util 0.5.1", "garage_util 0.5.1",
"hex", "hex",
"log", "log",
"netapp", "netapp 0.3.1",
"rand", "rand 0.8.4",
"rmp-serde 0.15.5", "rmp-serde 0.15.5",
"serde", "serde",
"serde_bytes", "serde_bytes",
@ -755,7 +842,7 @@ dependencies = [
[[package]] [[package]]
name = "garage_model" name = "garage_model"
version = "0.6.0" version = "0.7.0"
dependencies = [ dependencies = [
"arc-swap", "arc-swap",
"async-trait", "async-trait",
@ -763,18 +850,19 @@ dependencies = [
"futures", "futures",
"futures-util", "futures-util",
"garage_model 0.5.1", "garage_model 0.5.1",
"garage_rpc 0.6.0", "garage_rpc 0.7.0",
"garage_table 0.6.0", "garage_table 0.7.0",
"garage_util 0.6.0", "garage_util 0.7.0",
"hex", "hex",
"log", "netapp 0.4.0",
"netapp", "opentelemetry",
"rand", "rand 0.8.4",
"rmp-serde 0.15.5", "rmp-serde 0.15.5",
"serde", "serde",
"serde_bytes", "serde_bytes",
"sled", "sled",
"tokio", "tokio",
"tracing",
"zstd", "zstd",
] ]
@ -795,8 +883,8 @@ dependencies = [
"hyper", "hyper",
"kuska-sodiumoxide", "kuska-sodiumoxide",
"log", "log",
"netapp", "netapp 0.3.1",
"rand", "rand 0.8.4",
"rmp-serde 0.15.5", "rmp-serde 0.15.5",
"serde", "serde",
"serde_bytes", "serde_bytes",
@ -807,27 +895,29 @@ dependencies = [
[[package]] [[package]]
name = "garage_rpc" name = "garage_rpc"
version = "0.6.0" version = "0.7.0"
dependencies = [ dependencies = [
"arc-swap", "arc-swap",
"async-trait", "async-trait",
"bytes 1.1.0", "bytes 1.1.0",
"futures", "futures",
"futures-util", "futures-util",
"garage_util 0.6.0", "garage_admin",
"garage_util 0.7.0",
"gethostname", "gethostname",
"hex", "hex",
"hyper", "hyper",
"kuska-sodiumoxide", "kuska-sodiumoxide",
"log", "netapp 0.4.0",
"netapp", "opentelemetry",
"rand", "rand 0.8.4",
"rmp-serde 0.15.5", "rmp-serde 0.15.5",
"serde", "serde",
"serde_bytes", "serde_bytes",
"serde_json", "serde_json",
"tokio", "tokio",
"tokio-stream", "tokio-stream",
"tracing",
] ]
[[package]] [[package]]
@ -844,7 +934,7 @@ dependencies = [
"garage_util 0.5.1", "garage_util 0.5.1",
"hexdump", "hexdump",
"log", "log",
"rand", "rand 0.8.4",
"rmp-serde 0.15.5", "rmp-serde 0.15.5",
"serde", "serde",
"serde_bytes", "serde_bytes",
@ -854,22 +944,23 @@ dependencies = [
[[package]] [[package]]
name = "garage_table" name = "garage_table"
version = "0.6.0" version = "0.7.0"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"bytes 1.1.0", "bytes 1.1.0",
"futures", "futures",
"futures-util", "futures-util",
"garage_rpc 0.6.0", "garage_rpc 0.7.0",
"garage_util 0.6.0", "garage_util 0.7.0",
"hexdump", "hexdump",
"log", "opentelemetry",
"rand", "rand 0.8.4",
"rmp-serde 0.15.5", "rmp-serde 0.15.5",
"serde", "serde",
"serde_bytes", "serde_bytes",
"sled", "sled",
"tokio", "tokio",
"tracing",
] ]
[[package]] [[package]]
@ -886,8 +977,8 @@ dependencies = [
"http", "http",
"hyper", "hyper",
"log", "log",
"netapp", "netapp 0.3.1",
"rand", "rand 0.8.4",
"rmp-serde 0.15.5", "rmp-serde 0.15.5",
"serde", "serde",
"serde_json", "serde_json",
@ -900,7 +991,7 @@ dependencies = [
[[package]] [[package]]
name = "garage_util" name = "garage_util"
version = "0.6.0" version = "0.7.0"
dependencies = [ dependencies = [
"blake2", "blake2",
"chrono", "chrono",
@ -909,9 +1000,9 @@ dependencies = [
"hex", "hex",
"http", "http",
"hyper", "hyper",
"log", "netapp 0.4.0",
"netapp", "opentelemetry",
"rand", "rand 0.8.4",
"rmp-serde 0.15.5", "rmp-serde 0.15.5",
"serde", "serde",
"serde_json", "serde_json",
@ -919,23 +1010,25 @@ dependencies = [
"sled", "sled",
"tokio", "tokio",
"toml", "toml",
"tracing",
"xxhash-rust", "xxhash-rust",
] ]
[[package]] [[package]]
name = "garage_web" name = "garage_web"
version = "0.6.0" version = "0.7.0"
dependencies = [ dependencies = [
"err-derive 0.3.0", "err-derive 0.3.0",
"futures", "futures",
"garage_api", "garage_api",
"garage_model 0.6.0", "garage_model 0.7.0",
"garage_table 0.6.0", "garage_table 0.7.0",
"garage_util 0.6.0", "garage_util 0.7.0",
"http", "http",
"hyper", "hyper",
"log", "opentelemetry",
"percent-encoding", "percent-encoding",
"tracing",
] ]
[[package]] [[package]]
@ -1047,7 +1140,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e40283dadb02f3af778878be1d717b17b4e4ab92e1d935ab03a730b0542905f2" checksum = "e40283dadb02f3af778878be1d717b17b4e4ab92e1d935ab03a730b0542905f2"
dependencies = [ dependencies = [
"arrayvec", "arrayvec",
"itertools", "itertools 0.4.19",
] ]
[[package]] [[package]]
@ -1090,9 +1183,9 @@ checksum = "eee9694f83d9b7c09682fdb32213682939507884e5bcf227be9aff5d644b90dc"
[[package]] [[package]]
name = "httparse" name = "httparse"
version = "1.5.1" version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "acd94fdbe1d4ff688b67b04eee2e17bd50995534a61539e45adfefb45e5e5503" checksum = "9100414882e15fb7feccb4897e5f0ff0ff1ca7d1a86a23208ada4d7a18e6c6c4"
[[package]] [[package]]
name = "httpdate" name = "httpdate"
@ -1117,9 +1210,9 @@ dependencies = [
[[package]] [[package]]
name = "hyper" name = "hyper"
version = "0.14.13" version = "0.14.17"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "15d1cfb9e4f68655fa04c01f59edb405b6074a0f7118ea881e5026e4a1cd8593" checksum = "043f0e083e9901b6cc658a77d1eb86f4fc650bbb977a4337dd63192826aa85dd"
dependencies = [ dependencies = [
"bytes 1.1.0", "bytes 1.1.0",
"futures-channel", "futures-channel",
@ -1130,7 +1223,7 @@ dependencies = [
"http-body", "http-body",
"httparse", "httparse",
"httpdate 1.0.1", "httpdate 1.0.1",
"itoa 0.4.8", "itoa 1.0.1",
"pin-project-lite", "pin-project-lite",
"socket2", "socket2",
"tokio", "tokio",
@ -1156,6 +1249,18 @@ dependencies = [
"webpki", "webpki",
] ]
[[package]]
name = "hyper-timeout"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1"
dependencies = [
"hyper",
"pin-project-lite",
"tokio",
"tokio-io-timeout",
]
[[package]] [[package]]
name = "idna" name = "idna"
version = "0.2.3" version = "0.2.3"
@ -1192,6 +1297,15 @@ version = "0.4.19"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4a9b56eb56058f43dc66e58f40a214b2ccbc9f3df51861b63d51dec7b65bc3f" checksum = "c4a9b56eb56058f43dc66e58f40a214b2ccbc9f3df51861b63d51dec7b65bc3f"
[[package]]
name = "itertools"
version = "0.10.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9a9d19fa1e79b6215ff29b9d6880b706147f16e9b1dbb1e4e5947b5b02bc5e3"
dependencies = [
"either",
]
[[package]] [[package]]
name = "itoa" name = "itoa"
version = "0.4.8" version = "0.4.8"
@ -1379,10 +1493,16 @@ dependencies = [
] ]
[[package]] [[package]]
name = "netapp" name = "multimap"
version = "0.3.0" version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d202ff23744499a8e5d89a3199ca7b9a8aef9980a23b1b09d6508a8db2a118d2" checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a"
[[package]]
name = "netapp"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac7dcd2c6c5a9d5ea88ffc17d3339d49d308e68c4d8190c494b55ddbf78d80ad"
dependencies = [ dependencies = [
"arc-swap", "arc-swap",
"async-trait", "async-trait",
@ -1400,6 +1520,32 @@ dependencies = [
"tokio-util", "tokio-util",
] ]
[[package]]
name = "netapp"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22c545a13b0c47b47e8052b35c4884dbe33c9ea62607371b0f4f1b0490cafd38"
dependencies = [
"arc-swap",
"async-trait",
"bytes 0.6.0",
"cfg-if",
"err-derive 0.2.4",
"futures",
"hex",
"kuska-handshake",
"kuska-sodiumoxide",
"log",
"opentelemetry",
"opentelemetry-contrib",
"rand 0.5.6",
"rmp-serde 0.14.4",
"serde",
"tokio",
"tokio-stream",
"tokio-util",
]
[[package]] [[package]]
name = "nom" name = "nom"
version = "7.1.0" version = "7.1.0"
@ -1476,6 +1622,68 @@ version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
[[package]]
name = "opentelemetry"
version = "0.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6105e89802af13fdf48c49d7646d3b533a70e536d818aae7e78ba0433d01acb8"
dependencies = [
"async-trait",
"crossbeam-channel",
"dashmap",
"fnv",
"futures-channel",
"futures-executor",
"futures-util",
"js-sys",
"lazy_static",
"percent-encoding",
"pin-project",
"rand 0.8.4",
"thiserror",
"tokio",
"tokio-stream",
]
[[package]]
name = "opentelemetry-contrib"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85637add8f60bb4cac673469c14f47a329c6cec7365c72d72cd32f2d104a721a"
dependencies = [
"lazy_static",
"opentelemetry",
]
[[package]]
name = "opentelemetry-otlp"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d1a6ca9de4c8b00aa7f1a153bd76cb263287155cec642680d79d98706f3d28a"
dependencies = [
"async-trait",
"futures",
"futures-util",
"http",
"opentelemetry",
"prost",
"thiserror",
"tokio",
"tonic",
"tonic-build",
]
[[package]]
name = "opentelemetry-prometheus"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9328977e479cebe12ce0d3fcecdaea4721d234895a9440c5b5dfd113f0594ac6"
dependencies = [
"opentelemetry",
"prometheus",
"protobuf",
]
[[package]] [[package]]
name = "parking_lot" name = "parking_lot"
version = "0.11.2" version = "0.11.2"
@ -1507,6 +1715,16 @@ version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"
[[package]]
name = "petgraph"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4a13a2fa9d0b63e5f22328828741e523766fff0ee9e779316902290dff3f824f"
dependencies = [
"fixedbitset",
"indexmap",
]
[[package]] [[package]]
name = "pin-project" name = "pin-project"
version = "1.0.8" version = "1.0.8"
@ -1606,6 +1824,80 @@ dependencies = [
"unicode-xid", "unicode-xid",
] ]
[[package]]
name = "prometheus"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b7f64969ffd5dd8f39bd57a68ac53c163a095ed9d0fb707146da1b27025a3504"
dependencies = [
"cfg-if",
"fnv",
"lazy_static",
"memchr",
"parking_lot",
"protobuf",
"thiserror",
]
[[package]]
name = "prost"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "444879275cb4fd84958b1a1d5420d15e6fcf7c235fe47f053c9c2a80aceb6001"
dependencies = [
"bytes 1.1.0",
"prost-derive",
]
[[package]]
name = "prost-build"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62941722fb675d463659e49c4f3fe1fe792ff24fe5bbaa9c08cd3b98a1c354f5"
dependencies = [
"bytes 1.1.0",
"heck",
"itertools 0.10.3",
"lazy_static",
"log",
"multimap",
"petgraph",
"prost",
"prost-types",
"regex",
"tempfile",
"which",
]
[[package]]
name = "prost-derive"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9cc1a3263e07e0bf68e96268f37665207b49560d98739662cdfaae215c720fe"
dependencies = [
"anyhow",
"itertools 0.10.3",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "prost-types"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "534b7a0e836e3c482d2693070f982e39e7611da9695d4d1f5a4b186b51faef0a"
dependencies = [
"bytes 1.1.0",
"prost",
]
[[package]]
name = "protobuf"
version = "2.27.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf7e6d18738ecd0902d30d1ad232c9125985a3422929b16c65517b38adc14f96"
[[package]] [[package]]
name = "quick-error" name = "quick-error"
version = "1.2.3" version = "1.2.3"
@ -1631,6 +1923,19 @@ dependencies = [
"proc-macro2", "proc-macro2",
] ]
[[package]]
name = "rand"
version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c618c47cd3ebd209790115ab837de41425723956ad3ce2e6a7f09890947cacb9"
dependencies = [
"cloudabi",
"fuchsia-cprng",
"libc",
"rand_core 0.3.1",
"winapi",
]
[[package]] [[package]]
name = "rand" name = "rand"
version = "0.8.4" version = "0.8.4"
@ -1639,7 +1944,7 @@ checksum = "2e7573632e6454cf6b99d7aac4ccca54be06da05aca2ef7423d22d27d4d4bcd8"
dependencies = [ dependencies = [
"libc", "libc",
"rand_chacha", "rand_chacha",
"rand_core", "rand_core 0.6.3",
"rand_hc", "rand_hc",
] ]
@ -1650,9 +1955,24 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
dependencies = [ dependencies = [
"ppv-lite86", "ppv-lite86",
"rand_core", "rand_core 0.6.3",
] ]
[[package]]
name = "rand_core"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a6fdeb83b075e8266dcc8762c22776f6877a63111121f5f8c7411e5be7eed4b"
dependencies = [
"rand_core 0.4.2",
]
[[package]]
name = "rand_core"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c33a3c44ca05fa6f1807d8e6743f3824e8509beca625669633be0acbdf509dc"
[[package]] [[package]]
name = "rand_core" name = "rand_core"
version = "0.6.3" version = "0.6.3"
@ -1668,7 +1988,7 @@ version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d51e9f596de227fda2ea6c84607f5558e196eeaf43c986b724ba4fb8fdf497e7" checksum = "d51e9f596de227fda2ea6c84607f5558e196eeaf43c986b724ba4fb8fdf497e7"
dependencies = [ dependencies = [
"rand_core", "rand_core 0.6.3",
] ]
[[package]] [[package]]
@ -1697,6 +2017,15 @@ version = "0.6.25"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f497285884f3fcff424ffc933e56d7cbca511def0c9831a7f9b5f6153e3cc89b" checksum = "f497285884f3fcff424ffc933e56d7cbca511def0c9831a7f9b5f6153e3cc89b"
[[package]]
name = "remove_dir_all"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3acd125665422973a33ac9d3dd2df85edad0f4ae9b00dafb1a05e43a9f5ef8e7"
dependencies = [
"winapi",
]
[[package]] [[package]]
name = "ring" name = "ring"
version = "0.16.20" version = "0.16.20"
@ -2056,6 +2385,20 @@ dependencies = [
"unicode-xid", "unicode-xid",
] ]
[[package]]
name = "tempfile"
version = "3.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5cdb1ef4eaeeaddc8fbd371e5017057064af0911902ef36b39801f67cc6d79e4"
dependencies = [
"cfg-if",
"fastrand",
"libc",
"redox_syscall",
"remove_dir_all",
"winapi",
]
[[package]] [[package]]
name = "termcolor" name = "termcolor"
version = "1.1.2" version = "1.1.2"
@ -2142,12 +2485,23 @@ dependencies = [
"mio", "mio",
"num_cpus", "num_cpus",
"once_cell", "once_cell",
"parking_lot",
"pin-project-lite", "pin-project-lite",
"signal-hook-registry", "signal-hook-registry",
"tokio-macros", "tokio-macros",
"winapi", "winapi",
] ]
[[package]]
name = "tokio-io-timeout"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf"
dependencies = [
"pin-project-lite",
"tokio",
]
[[package]] [[package]]
name = "tokio-macros" name = "tokio-macros"
version = "1.5.0" version = "1.5.0"
@ -2205,6 +2559,49 @@ dependencies = [
"serde", "serde",
] ]
[[package]]
name = "tonic"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff08f4649d10a70ffa3522ca559031285d8e421d727ac85c60825761818f5d0a"
dependencies = [
"async-stream",
"async-trait",
"base64",
"bytes 1.1.0",
"futures-core",
"futures-util",
"h2",
"http",
"http-body",
"hyper",
"hyper-timeout",
"percent-encoding",
"pin-project",
"prost",
"prost-derive",
"tokio",
"tokio-stream",
"tokio-util",
"tower",
"tower-layer",
"tower-service",
"tracing",
"tracing-futures",
]
[[package]]
name = "tonic-build"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9403f1bafde247186684b230dc6f38b5cd514584e8bec1dd32514be4745fa757"
dependencies = [
"proc-macro2",
"prost-build",
"quote",
"syn",
]
[[package]] [[package]]
name = "tower" name = "tower"
version = "0.4.11" version = "0.4.11"
@ -2213,9 +2610,14 @@ checksum = "5651b5f6860a99bd1adb59dbfe1db8beb433e73709d9032b413a77e2fb7c066a"
dependencies = [ dependencies = [
"futures-core", "futures-core",
"futures-util", "futures-util",
"indexmap",
"pin-project", "pin-project",
"pin-project-lite", "pin-project-lite",
"rand 0.8.4",
"slab",
"tokio", "tokio",
"tokio-stream",
"tokio-util",
"tower-layer", "tower-layer",
"tower-service", "tower-service",
"tracing", "tracing",
@ -2235,9 +2637,9 @@ checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6"
[[package]] [[package]]
name = "tracing" name = "tracing"
version = "0.1.29" version = "0.1.31"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "375a639232caf30edfc78e8d89b2d4c375515393e7af7e16f01cd96917fb2105" checksum = "f6c650a8ef0cd2dd93736f033d21cbd1224c5a967aa0c258d00fcf7dafef9b9f"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"log", "log",
@ -2248,9 +2650,9 @@ dependencies = [
[[package]] [[package]]
name = "tracing-attributes" name = "tracing-attributes"
version = "0.1.18" version = "0.1.19"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f4f480b8f81512e825f337ad51e94c1eb5d3bbdf2b363dcd01e2b19a9ffe3f8e" checksum = "8276d9a4a3a558d7b7ad5303ad50b53d58264641b82914b7ada36bd762e7a716"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -2259,13 +2661,23 @@ dependencies = [
[[package]] [[package]]
name = "tracing-core" name = "tracing-core"
version = "0.1.21" version = "0.1.22"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f4ed65637b8390770814083d20756f87bfa2c21bf2f110babdc5438351746e4" checksum = "03cfcb51380632a72d3111cb8d3447a8d908e577d31beeac006f836383d29a23"
dependencies = [ dependencies = [
"lazy_static", "lazy_static",
] ]
[[package]]
name = "tracing-futures"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2"
dependencies = [
"pin-project",
"tracing",
]
[[package]] [[package]]
name = "try-lock" name = "try-lock"
version = "0.2.3" version = "0.2.3"
@ -2436,6 +2848,17 @@ dependencies = [
"untrusted", "untrusted",
] ]
[[package]]
name = "which"
version = "4.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a5a7e487e921cf220206864a94a89b6c6905bfc19f1057fa26a4cb360e5c1d2"
dependencies = [
"either",
"lazy_static",
"libc",
]
[[package]] [[package]]
name = "winapi" name = "winapi"
version = "0.3.9" version = "0.3.9"

860
Cargo.nix

File diff suppressed because it is too large Load diff

View file

@ -4,9 +4,10 @@ members = [
"src/rpc", "src/rpc",
"src/table", "src/table",
"src/model", "src/model",
"src/admin",
"src/api", "src/api",
"src/web", "src/web",
"src/garage", "src/garage"
] ]
[profile.dev] [profile.dev]

View file

@ -44,6 +44,9 @@ root_domain = ".s3.garage.localhost"
bind_addr = "0.0.0.0:$((3920+$count))" bind_addr = "0.0.0.0:$((3920+$count))"
root_domain = ".web.garage.localhost" root_domain = ".web.garage.localhost"
index = "index.html" index = "index.html"
[admin]
api_bind_addr = "0.0.0.0:$((9900+$count))"
EOF EOF
echo -en "$LABEL configuration written to $CONF_PATH\n" echo -en "$LABEL configuration written to $CONF_PATH\n"

View file

@ -0,0 +1,3 @@
COMPOSE_PROJECT_NAME=telemetry
OTEL_COLLECT_TAG=0.44.0
ELASTIC_BUNDLE_TAG=7.17.0

View file

@ -0,0 +1,10 @@
apm-server:
# Defines the host and port the server is listening on. Use "unix:/path/to.sock" to listen on a unix domain socket.
host: "0.0.0.0:8200"
#-------------------------- Elasticsearch output --------------------------
output.elasticsearch:
# Array of hosts to connect to.
# Scheme and port can be left out and will be set to the default (`http` and `9200`).
# In case you specify an additional path, the scheme is required: `http://localhost:9200/path`.
# IPv6 addresses should always be defined as: `https://[2001:db8::1]:9200`.
hosts: ["localhost:9200"]

View file

@ -0,0 +1,69 @@
version: "2"
services:
otel:
image: otel/opentelemetry-collector-contrib:${OTEL_COLLECT_TAG}
command: [ "--config=/etc/otel-config.yaml" ]
volumes:
- ./otel-config.yaml:/etc/otel-config.yaml
network_mode: "host"
elastic:
image: docker.elastic.co/elasticsearch/elasticsearch:${ELASTIC_BUNDLE_TAG}
container_name: elastic
environment:
- "node.name=elastic"
- "http.port=9200"
- "cluster.name=es-docker-cluster"
- "discovery.type=single-node"
- "bootstrap.memory_lock=true"
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
ulimits:
memlock:
soft: -1
hard: -1
nofile: 65536
volumes:
- "es_data:/usr/share/elasticsearch/data"
network_mode: "host"
# kibana instance and collectors
# see https://www.elastic.co/guide/en/elastic-stack-get-started/current/get-started-docker.html
kibana:
image: docker.elastic.co/kibana/kibana:${ELASTIC_BUNDLE_TAG}
container_name: kibana
environment:
SERVER_NAME: "kibana.local"
# ELASTICSEARCH_URL: "http://localhost:9700"
ELASTICSEARCH_HOSTS: "http://localhost:9200"
depends_on: [ 'elastic' ]
network_mode: "host"
apm:
image: docker.elastic.co/apm/apm-server:${ELASTIC_BUNDLE_TAG}
container_name: apm
volumes:
- "./apm-config.yaml:/usr/share/apm-server/apm-server.yml:ro"
depends_on: [ 'elastic' ]
network_mode: "host"
grafana:
# see https://grafana.com/docs/grafana/latest/installation/docker/
image: "grafana/grafana:8.3.5"
container_name: grafana
# restart: unless-stopped
environment:
- "GF_INSTALL_PLUGINS=grafana-clock-panel,grafana-simple-json-datasource,grafana-piechart-panel,grafana-worldmap-panel,grafana-polystat-panel"
network_mode: "host"
volumes:
# chown 472:472 if needed
- grafana:/var/lib/grafana
- ./grafana/provisioning/:/etc/grafana/provisioning/
volumes:
es_data:
driver: local
grafana:
driver: local
metricbeat:
driver: local

View file

@ -0,0 +1,20 @@
# Datasource definition: declares a single Elasticsearch-backed datasource
# named DS_ELASTICSEARCH and marks it as the default one.
apiVersion: 1
datasources:
- name: DS_ELASTICSEARCH
type: elasticsearch
access: proxy
# NOTE(review): the docker-compose file added in this change sets
# Elasticsearch's "http.port=9200" and runs every service with
# network_mode "host" -- confirm that host "elastic" and port 9700
# are actually reachable from Grafana.
url: http://elastic:9700
password: ''
user: ''
# Index pattern queried by this datasource.
database: metricbeat-*
basicAuth: false
isDefault: true
jsonData:
esVersion: 70
logLevelField: ''
logMessageField: ''
maxConcurrentShardRequests: 5
timeField: "@timestamp"
timeInterval: 10s
readOnly: false

View file

@ -0,0 +1,47 @@
receivers:
# Data sources: metrics, traces
otlp:
protocols:
grpc:
endpoint: ":4317"
http:
endpoint: ":55681"
# Data sources: metrics
prometheus:
config:
scrape_configs:
- job_name: "garage"
scrape_interval: 5s
static_configs:
- targets: ["localhost:3909"]
exporters:
logging:
logLevel: info
# see https://www.elastic.co/guide/en/apm/get-started/current/open-telemetry-elastic.html#open-telemetry-collector
otlp/elastic:
endpoint: "localhost:8200"
tls:
insecure: true
processors:
batch:
extensions:
health_check:
pprof:
endpoint: :1888
zpages:
endpoint: :55679
service:
extensions: [pprof, zpages, health_check]
pipelines:
traces:
receivers: [otlp]
processors: [batch]
exporters: [logging, otlp/elastic]
metrics:
receivers: [otlp, prometheus]
processors: [batch]
exporters: [logging, otlp/elastic]

File diff suppressed because it is too large Load diff

View file

@ -76,6 +76,7 @@ function refresh_toolchain {
pkgs.rustPlatform.rust.cargo pkgs.rustPlatform.rust.cargo
pkgs.clippy pkgs.clippy
pkgs.rustfmt pkgs.rustfmt
pkgs.protobuf
cargo2nix.packages.x86_64-linux.cargo2nix cargo2nix.packages.x86_64-linux.cargo2nix
] else []) ] else [])
++ ++

29
src/admin/Cargo.toml Normal file
View file

@ -0,0 +1,29 @@
[package]
name = "garage_admin"
version = "0.7.0"
authors = ["Maximilien Richer <code@mricher.fr>"]
edition = "2018"
license = "AGPL-3.0"
description = "Administration and metrics REST HTTP server for Garage"
repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage"
[lib]
path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
garage_util = { version = "0.7.0", path = "../util" }
hex = "0.4"
futures = "0.3"
futures-util = "0.3"
http = "0.2"
hyper = "0.14"
tracing = "0.1.30"
opentelemetry = { version = "0.17", features = [ "rt-tokio" ] }
opentelemetry-prometheus = "0.10"
opentelemetry-otlp = "0.10"
prometheus = "0.13"

6
src/admin/lib.rs Normal file
View file

@ -0,0 +1,6 @@
//! Crate for handling the admin and metric HTTP APIs
#[macro_use]
extern crate tracing;
pub mod metrics;
pub mod tracing_setup;

146
src/admin/metrics.rs Normal file
View file

@ -0,0 +1,146 @@
use std::convert::Infallible;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::SystemTime;
use futures::future::*;
use hyper::{
header::CONTENT_TYPE,
service::{make_service_fn, service_fn},
Body, Method, Request, Response, Server,
};
use opentelemetry::{
global,
metrics::{BoundCounter, BoundValueRecorder},
trace::{FutureExt, TraceContextExt, Tracer},
Context,
};
use opentelemetry_prometheus::PrometheusExporter;
use prometheus::{Encoder, TextEncoder};
use garage_util::error::Error as GarageError;
use garage_util::metrics::*;
// serve_req on metric endpoint
async fn serve_req(
req: Request<Body>,
admin_server: Arc<AdminServer>,
) -> Result<Response<Body>, hyper::Error> {
info!("Receiving request at path {}", req.uri());
let request_start = SystemTime::now();
admin_server.metrics.http_counter.add(1);
let response = match (req.method(), req.uri().path()) {
(&Method::GET, "/metrics") => {
let mut buffer = vec![];
let encoder = TextEncoder::new();
let tracer = opentelemetry::global::tracer("garage");
let metric_families = tracer.in_span("admin/gather_metrics", |_| {
admin_server.exporter.registry().gather()
});
encoder.encode(&metric_families, &mut buffer).unwrap();
admin_server
.metrics
.http_body_gauge
.record(buffer.len() as u64);
Response::builder()
.status(200)
.header(CONTENT_TYPE, encoder.format_type())
.body(Body::from(buffer))
.unwrap()
}
_ => Response::builder()
.status(404)
.body(Body::from("Not implemented"))
.unwrap(),
};
admin_server
.metrics
.http_req_histogram
.record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64()));
Ok(response)
}
/// AdminServer holds the state of the admin HTTP server: the Prometheus
/// metric exporter and the server's own HTTP metrics.
pub struct AdminServer {
// Exporter whose registry is gathered when `GET /metrics` is served.
exporter: PrometheusExporter,
// Instruments recording the activity of the admin HTTP server itself.
metrics: AdminServerMetrics,
}
/// AdminServerMetrics holds the metric instruments of the admin HTTP server.
// FIXME: we would rather have that split up among the different libraries?
struct AdminServerMetrics {
// Incremented once for every request received by `serve_req`.
http_counter: BoundCounter<u64>,
// Records the size in bytes of each encoded `/metrics` response body.
http_body_gauge: BoundValueRecorder<u64>,
// Records the time spent handling each request, in seconds.
http_req_histogram: BoundValueRecorder<f64>,
}
impl AdminServer {
    /// Initialize the `AdminServer`: set up the Prometheus exporter and create
    /// the admin server's own HTTP instruments on the `garage/admin_server`
    /// meter. No socket is opened here; call [`AdminServer::run`] to serve.
    pub fn init() -> AdminServer {
        let exporter = opentelemetry_prometheus::exporter().init();
        let meter = global::meter("garage/admin_server");

        // Instruments are bound to an empty label set once, up front.
        let http_counter = meter
            .u64_counter("admin.http_requests_total")
            .with_description("Total number of HTTP requests made.")
            .init()
            .bind(&[]);
        let http_body_gauge = meter
            .u64_value_recorder("admin.http_response_size_bytes")
            .with_description("The metrics HTTP response sizes in bytes.")
            .init()
            .bind(&[]);
        let http_req_histogram = meter
            .f64_value_recorder("admin.http_request_duration_seconds")
            .with_description("The HTTP request latencies in seconds.")
            .init()
            .bind(&[]);

        let metrics = AdminServerMetrics {
            http_counter,
            http_body_gauge,
            http_req_histogram,
        };

        AdminServer { exporter, metrics }
    }

    /// Run the admin server on `bind_addr`, handling requests until
    /// `shutdown_signal` resolves, at which point the server shuts down
    /// gracefully.
    ///
    /// Each request is handled by `serve_req` inside a fresh `admin/request`
    /// span with a newly generated trace ID. Errors reported by the HTTP
    /// server are converted into `GarageError`.
    pub async fn run(
        self,
        bind_addr: SocketAddr,
        shutdown_signal: impl Future<Output = ()>,
    ) -> Result<(), GarageError> {
        let shared = Arc::new(self);

        // One `Service` is built per incoming connection; it handles every
        // HTTP request arriving on that connection.
        let make_svc = make_service_fn(move |_conn| {
            let per_conn = shared.clone();

            async move {
                // `service_fn` turns a request-handling closure into a `Service`.
                let svc = service_fn(move |req| {
                    let tracer = opentelemetry::global::tracer("garage");
                    let span = tracer
                        .span_builder("admin/request")
                        .with_trace_id(gen_trace_id())
                        .start(&tracer);
                    let cx = Context::current_with_span(span);

                    serve_req(req, per_conn.clone()).with_context(cx)
                });
                Ok::<_, Infallible>(svc)
            }
        });

        let graceful = Server::bind(&bind_addr)
            .serve(make_svc)
            .with_graceful_shutdown(shutdown_signal);
        info!("Admin server listening on http://{}", bind_addr);

        graceful.await?;
        Ok(())
    }
}

View file

@ -0,0 +1,37 @@
use std::time::Duration;
use opentelemetry::sdk::{
trace::{self, IdGenerator, Sampler},
Resource,
};
use opentelemetry::KeyValue;
use opentelemetry_otlp::WithExportConfig;
use garage_util::data::*;
use garage_util::error::*;
/// Install the global OTLP tracing pipeline.
///
/// Spans are exported in batches on the Tokio runtime, over a tonic exporter,
/// to the collector at `export_to` with a 3 second export timeout. Every span
/// is sampled. The resource is tagged with `service.name = "garage"` and a
/// `service.instance.id` made of the hex-encoded first 8 bytes of `node_id`.
///
/// Returns an error with the message "Unable to initialize tracing" if the
/// pipeline cannot be installed.
pub fn init_tracing(export_to: &str, node_id: Uuid) -> Result<(), Error> {
    // Only a short prefix of the node ID is used to identify this instance.
    let instance_id = hex::encode(&node_id.as_slice()[..8]);

    let exporter = opentelemetry_otlp::new_exporter()
        .tonic()
        .with_endpoint(export_to)
        .with_timeout(Duration::from_secs(3));

    let resource = Resource::new(vec![
        KeyValue::new("service.name", "garage"),
        KeyValue::new("service.instance.id", instance_id),
    ]);
    let trace_config = trace::config()
        .with_id_generator(IdGenerator::default())
        .with_sampler(Sampler::AlwaysOn)
        .with_resource(resource);

    opentelemetry_otlp::new_pipeline()
        .tracing()
        .with_exporter(exporter)
        .with_trace_config(trace_config)
        .install_batch(opentelemetry::runtime::Tokio)
        .ok_or_message("Unable to initialize tracing")?;

    Ok(())
}

View file

@ -1,6 +1,6 @@
[package] [package]
name = "garage_api" name = "garage_api"
version = "0.6.0" version = "0.7.0"
authors = ["Alex Auvolat <alex@adnab.me>"] authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018" edition = "2018"
license = "AGPL-3.0" license = "AGPL-3.0"
@ -14,9 +14,9 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
garage_model = { version = "0.6.0", path = "../model" } garage_model = { version = "0.7.0", path = "../model" }
garage_table = { version = "0.6.0", path = "../table" } garage_table = { version = "0.7.0", path = "../table" }
garage_util = { version = "0.6.0", path = "../util" } garage_util = { version = "0.7.0", path = "../util" }
base64 = "0.13" base64 = "0.13"
bytes = "1.0" bytes = "1.0"
@ -26,7 +26,7 @@ err-derive = "0.3"
hex = "0.4" hex = "0.4"
hmac = "0.10" hmac = "0.10"
idna = "0.2" idna = "0.2"
log = "0.4" tracing = "0.1.30"
md-5 = "0.9" md-5 = "0.9"
nom = "7.1" nom = "7.1"
sha2 = "0.9" sha2 = "0.9"
@ -49,3 +49,5 @@ serde_bytes = "0.11"
serde_json = "1.0" serde_json = "1.0"
quick-xml = { version = "0.21", features = [ "serialize" ] } quick-xml = { version = "0.21", features = [ "serialize" ] }
url = "2.1" url = "2.1"
opentelemetry = "0.17"

View file

@ -7,8 +7,16 @@ use hyper::server::conn::AddrStream;
use hyper::service::{make_service_fn, service_fn}; use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Server}; use hyper::{Body, Method, Request, Response, Server};
use opentelemetry::{
global,
metrics::{Counter, ValueRecorder},
trace::{FutureExt, TraceContextExt, Tracer},
Context, KeyValue,
};
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::Error as GarageError; use garage_util::error::Error as GarageError;
use garage_util::metrics::{gen_trace_id, RecordDuration};
use garage_model::garage::Garage; use garage_model::garage::Garage;
use garage_model::key_table::Key; use garage_model::key_table::Key;
@ -30,6 +38,34 @@ use crate::s3_put::*;
use crate::s3_router::{Authorization, Endpoint}; use crate::s3_router::{Authorization, Endpoint};
use crate::s3_website::*; use crate::s3_website::*;
/// Metric instruments recorded by the S3 API server.
struct ApiMetrics {
// Number of API calls, tagged with the `api_endpoint` name.
request_counter: Counter<u64>,
// Number of API calls whose status code is a client or server error,
// tagged with the `api_endpoint` name and the `status_code`.
error_counter: Counter<u64>,
// Duration of API calls, tagged with the `api_endpoint` name.
request_duration: ValueRecorder<f64>,
}
impl ApiMetrics {
    /// Create the S3 API instruments on the global `garage/api` meter.
    fn new() -> Self {
        let meter = global::meter("garage/api");

        let request_counter = meter
            .u64_counter("api.request_counter")
            .with_description("Number of API calls to the various S3 API endpoints")
            .init();
        let error_counter = meter
            .u64_counter("api.error_counter")
            .with_description(
                "Number of API calls to the various S3 API endpoints that resulted in errors",
            )
            .init();
        let request_duration = meter
            .f64_value_recorder("api.request_duration")
            .with_description("Duration of API calls to the various S3 API endpoints")
            .init();

        Self {
            request_counter,
            error_counter,
            request_duration,
        }
    }
}
/// Run the S3 API server /// Run the S3 API server
pub async fn run_api_server( pub async fn run_api_server(
garage: Arc<Garage>, garage: Arc<Garage>,
@ -37,13 +73,19 @@ pub async fn run_api_server(
) -> Result<(), GarageError> { ) -> Result<(), GarageError> {
let addr = &garage.config.s3_api.api_bind_addr; let addr = &garage.config.s3_api.api_bind_addr;
let metrics = Arc::new(ApiMetrics::new());
let service = make_service_fn(|conn: &AddrStream| { let service = make_service_fn(|conn: &AddrStream| {
let garage = garage.clone(); let garage = garage.clone();
let metrics = metrics.clone();
let client_addr = conn.remote_addr(); let client_addr = conn.remote_addr();
async move { async move {
Ok::<_, GarageError>(service_fn(move |req: Request<Body>| { Ok::<_, GarageError>(service_fn(move |req: Request<Body>| {
let garage = garage.clone(); let garage = garage.clone();
handler(garage, req, client_addr) let metrics = metrics.clone();
handler(garage, metrics, req, client_addr)
})) }))
} }
}); });
@ -59,13 +101,29 @@ pub async fn run_api_server(
async fn handler( async fn handler(
garage: Arc<Garage>, garage: Arc<Garage>,
metrics: Arc<ApiMetrics>,
req: Request<Body>, req: Request<Body>,
addr: SocketAddr, addr: SocketAddr,
) -> Result<Response<Body>, GarageError> { ) -> Result<Response<Body>, GarageError> {
let uri = req.uri().clone(); let uri = req.uri().clone();
info!("{} {} {}", addr, req.method(), uri); info!("{} {} {}", addr, req.method(), uri);
debug!("{:?}", req); debug!("{:?}", req);
match handler_inner(garage.clone(), req).await {
let tracer = opentelemetry::global::tracer("garage");
let span = tracer
.span_builder("S3 API call (unknown)")
.with_trace_id(gen_trace_id())
.with_attributes(vec![
KeyValue::new("method", format!("{}", req.method())),
KeyValue::new("uri", req.uri().to_string()),
])
.start(&tracer);
let res = handler_stage2(garage.clone(), metrics, req)
.with_context(Context::current_with_span(span))
.await;
match res {
Ok(x) => { Ok(x) => {
debug!("{} {:?}", x.status(), x.headers()); debug!("{} {:?}", x.status(), x.headers());
Ok(x) Ok(x)
@ -92,7 +150,11 @@ async fn handler(
} }
} }
async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Response<Body>, Error> { async fn handler_stage2(
garage: Arc<Garage>,
metrics: Arc<ApiMetrics>,
req: Request<Body>,
) -> Result<Response<Body>, Error> {
let authority = req let authority = req
.headers() .headers()
.get(header::HOST) .get(header::HOST)
@ -111,6 +173,46 @@ async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Respon
let (endpoint, bucket_name) = Endpoint::from_request(&req, bucket_name.map(ToOwned::to_owned))?; let (endpoint, bucket_name) = Endpoint::from_request(&req, bucket_name.map(ToOwned::to_owned))?;
debug!("Endpoint: {:?}", endpoint); debug!("Endpoint: {:?}", endpoint);
let current_context = Context::current();
let current_span = current_context.span();
current_span.update_name::<String>(format!("S3 API {}", endpoint.name()));
current_span.set_attribute(KeyValue::new("endpoint", endpoint.name()));
current_span.set_attribute(KeyValue::new(
"bucket",
bucket_name.clone().unwrap_or_default(),
));
let metrics_tags = &[KeyValue::new("api_endpoint", endpoint.name())];
let res = handler_stage3(garage, req, endpoint, bucket_name)
.record_duration(&metrics.request_duration, &metrics_tags[..])
.await;
metrics.request_counter.add(1, &metrics_tags[..]);
let status_code = match &res {
Ok(r) => r.status(),
Err(e) => e.http_status_code(),
};
if status_code.is_client_error() || status_code.is_server_error() {
metrics.error_counter.add(
1,
&[
metrics_tags[0].clone(),
KeyValue::new("status_code", status_code.as_str().to_string()),
],
);
}
res
}
async fn handler_stage3(
garage: Arc<Garage>,
req: Request<Body>,
endpoint: Endpoint,
bucket_name: Option<String>,
) -> Result<Response<Body>, Error> {
// Some endpoints are processed early, before we even check for an API key // Some endpoints are processed early, before we even check for an API key
if let Endpoint::PostObject = endpoint { if let Endpoint::PostObject = endpoint {
return handle_post_object(garage, req, bucket_name.unwrap()).await; return handle_post_object(garage, req, bucket_name.unwrap()).await;

View file

@ -1,6 +1,6 @@
//! Crate for serving a S3 compatible API //! Crate for serving a S3 compatible API
#[macro_use] #[macro_use]
extern crate log; extern crate tracing;
pub mod error; pub mod error;
pub use error::Error; pub use error::Error;

View file

@ -1,6 +1,6 @@
[package] [package]
name = "garage" name = "garage"
version = "0.6.0" version = "0.7.0"
authors = ["Alex Auvolat <alex@adnab.me>"] authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018" edition = "2018"
license = "AGPL-3.0" license = "AGPL-3.0"
@ -21,17 +21,18 @@ path = "tests/lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
garage_api = { version = "0.6.0", path = "../api" } garage_api = { version = "0.7.0", path = "../api" }
garage_model = { version = "0.6.0", path = "../model" } garage_model = { version = "0.7.0", path = "../model" }
garage_rpc = { version = "0.6.0", path = "../rpc" } garage_rpc = { version = "0.7.0", path = "../rpc" }
garage_table = { version = "0.6.0", path = "../table" } garage_table = { version = "0.7.0", path = "../table" }
garage_util = { version = "0.6.0", path = "../util" } garage_util = { version = "0.7.0", path = "../util" }
garage_web = { version = "0.6.0", path = "../web" } garage_web = { version = "0.7.0", path = "../web" }
garage_admin = { version = "0.7.0", path = "../admin" }
bytes = "1.0" bytes = "1.0"
git-version = "0.3.4" git-version = "0.3.4"
hex = "0.4" hex = "0.4"
log = "0.4" tracing = { version = "0.1.30", features = ["log-always"] }
pretty_env_logger = "0.4" pretty_env_logger = "0.4"
rand = "0.8" rand = "0.8"
async-trait = "0.1.7" async-trait = "0.1.7"
@ -49,8 +50,9 @@ futures = "0.3"
futures-util = "0.3" futures-util = "0.3"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
#netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" } #netapp = { version = "0.3.2", git = "https://git.deuxfleurs.fr/lx/netapp" }
netapp = "0.3.0" #netapp = { version = "0.4", path = "../../../netapp" }
netapp = "0.4"
[dev-dependencies] [dev-dependencies]
aws-sdk-s3 = "0.6" aws-sdk-s3 = "0.6"

View file

@ -1,7 +1,5 @@
use std::path::PathBuf; use std::path::PathBuf;
use log::warn;
use garage_util::error::*; use garage_util::error::*;
pub const READ_KEY_ERROR: &str = "Unable to read node key. It will be generated by your garage node the first time is it launched. Ensure that your garage node is currently running. (The node key is supposed to be stored in your metadata directory.)"; pub const READ_KEY_ERROR: &str = "Unable to read node key. It will be generated by your garage node the first time is it launched. Ensure that your garage node is currently running. (The node key is supposed to be stored in your metadata directory.)";

View file

@ -2,7 +2,7 @@
//! Garage CLI, used to interact with a running Garage instance, and to launch a Garage instance //! Garage CLI, used to interact with a running Garage instance, and to launch a Garage instance
#[macro_use] #[macro_use]
extern crate log; extern crate tracing;
mod admin; mod admin;
mod cli; mod cli;
@ -55,7 +55,7 @@ struct Opt {
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
if std::env::var("RUST_LOG").is_err() { if std::env::var("RUST_LOG").is_err() {
std::env::set_var("RUST_LOG", "garage=info") std::env::set_var("RUST_LOG", "netapp=info,garage=info")
} }
pretty_env_logger::init(); pretty_env_logger::init();
sodiumoxide::init().expect("Unable to init sodiumoxide"); sodiumoxide::init().expect("Unable to init sodiumoxide");
@ -106,7 +106,7 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
// Generate a temporary keypair for our RPC client // Generate a temporary keypair for our RPC client
let (_pk, sk) = sodiumoxide::crypto::sign::ed25519::gen_keypair(); let (_pk, sk) = sodiumoxide::crypto::sign::ed25519::gen_keypair();
let netapp = NetApp::new(network_key, sk); let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, sk);
// Find and parse the address of the target host // Find and parse the address of the target host
let (id, addr) = if let Some(h) = opt.rpc_host { let (id, addr) = if let Some(h) = opt.rpc_host {

View file

@ -6,6 +6,8 @@ use garage_util::background::*;
use garage_util::config::*; use garage_util::config::*;
use garage_util::error::Error; use garage_util::error::Error;
use garage_admin::metrics::*;
use garage_admin::tracing_setup::*;
use garage_api::run_api_server; use garage_api::run_api_server;
use garage_model::garage::Garage; use garage_model::garage::Garage;
use garage_web::run_web_server; use garage_web::run_web_server;
@ -34,6 +36,9 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
.open() .open()
.expect("Unable to open sled DB"); .expect("Unable to open sled DB");
info!("Initialize admin web server and metric backend...");
let admin_server_init = AdminServer::init();
info!("Initializing background runner..."); info!("Initializing background runner...");
let watch_cancel = netapp::util::watch_ctrl_c(); let watch_cancel = netapp::util::watch_ctrl_c();
let (background, await_background_done) = BackgroundRunner::new(16, watch_cancel.clone()); let (background, await_background_done) = BackgroundRunner::new(16, watch_cancel.clone());
@ -41,9 +46,14 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
info!("Initializing Garage main data store..."); info!("Initializing Garage main data store...");
let garage = Garage::new(config.clone(), db, background); let garage = Garage::new(config.clone(), db, background);
info!("Initialize tracing...");
if let Some(export_to) = config.admin.trace_sink {
init_tracing(&export_to, garage.system.id)?;
}
let run_system = tokio::spawn(garage.system.clone().run(watch_cancel.clone())); let run_system = tokio::spawn(garage.system.clone().run(watch_cancel.clone()));
info!("Crate admin RPC handler..."); info!("Create admin RPC handler...");
AdminRpcHandler::new(garage.clone()); AdminRpcHandler::new(garage.clone());
info!("Initializing API server..."); info!("Initializing API server...");
@ -58,6 +68,11 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
wait_from(watch_cancel.clone()), wait_from(watch_cancel.clone()),
)); ));
info!("Configure and run admin web server...");
let admin_server = tokio::spawn(
admin_server_init.run(config.admin.api_bind_addr, wait_from(watch_cancel.clone())),
);
// Stuff runs // Stuff runs
// When a cancel signal is sent, stuff stops // When a cancel signal is sent, stuff stops
@ -67,6 +82,9 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
if let Err(e) = web_server.await? { if let Err(e) = web_server.await? {
warn!("Web server exited with error: {}", e); warn!("Web server exited with error: {}", e);
} }
if let Err(e) = admin_server.await? {
warn!("Admin web server exited with error: {}", e);
}
// Remove RPC handlers for system to break reference cycles // Remove RPC handlers for system to break reference cycles
garage.system.netapp.drop_all_handlers(); garage.system.netapp.drop_all_handlers();

View file

@ -65,6 +65,9 @@ root_domain = ".s3.garage"
bind_addr = "127.0.0.1:{web_port}" bind_addr = "127.0.0.1:{web_port}"
root_domain = ".web.garage" root_domain = ".web.garage"
index = "index.html" index = "index.html"
[admin]
api_bind_addr = "127.0.0.1:{admin_port}"
"#, "#,
path = path.display(), path = path.display(),
secret = GARAGE_TEST_SECRET, secret = GARAGE_TEST_SECRET,
@ -72,6 +75,7 @@ index = "index.html"
api_port = port, api_port = port,
rpc_port = port + 1, rpc_port = port + 1,
web_port = port + 2, web_port = port + 2,
admin_port = port + 3,
); );
fs::write(path.join("config.toml"), config).expect("Could not write garage config file"); fs::write(path.join("config.toml"), config).expect("Could not write garage config file");

View file

@ -1,6 +1,6 @@
[package] [package]
name = "garage_model" name = "garage_model"
version = "0.6.0" version = "0.7.0"
authors = ["Alex Auvolat <alex@adnab.me>"] authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018" edition = "2018"
license = "AGPL-3.0" license = "AGPL-3.0"
@ -14,16 +14,16 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
garage_rpc = { version = "0.6.0", path = "../rpc" } garage_rpc = { version = "0.7.0", path = "../rpc" }
garage_table = { version = "0.6.0", path = "../table" } garage_table = { version = "0.7.0", path = "../table" }
garage_util = { version = "0.6.0", path = "../util" } garage_util = { version = "0.7.0", path = "../util" }
garage_model_050 = { package = "garage_model", version = "0.5.1" } garage_model_050 = { package = "garage_model", version = "0.5.1" }
async-trait = "0.1.7" async-trait = "0.1.7"
arc-swap = "1.0" arc-swap = "1.0"
err-derive = "0.3" err-derive = "0.3"
hex = "0.4" hex = "0.4"
log = "0.4" tracing = "0.1.30"
rand = "0.8" rand = "0.8"
zstd = { version = "0.9", default-features = false } zstd = { version = "0.9", default-features = false }
@ -36,6 +36,8 @@ serde_bytes = "0.11"
futures = "0.3" futures = "0.3"
futures-util = "0.3" futures-util = "0.3"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
opentelemetry = "0.17"
#netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" } #netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" }
netapp = "0.3.0" #netapp = { version = "0.4", path = "../../../netapp" }
netapp = "0.4"

View file

@ -5,16 +5,24 @@ use std::time::Duration;
use arc_swap::ArcSwapOption; use arc_swap::ArcSwapOption;
use async_trait::async_trait; use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use zstd::stream::{decode_all as zstd_decode, Encoder};
use futures::future::*; use futures::future::*;
use futures::select; use futures::select;
use serde::{Deserialize, Serialize};
use tokio::fs; use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::{watch, Mutex, Notify}; use tokio::sync::{watch, Mutex, Notify};
use zstd::stream::{decode_all as zstd_decode, Encoder};
use opentelemetry::{
trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer},
Context, KeyValue,
};
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::*; use garage_util::error::*;
use garage_util::metrics::RecordDuration;
use garage_util::sled_counter::SledCountedTree;
use garage_util::time::*; use garage_util::time::*;
use garage_util::tranquilizer::Tranquilizer; use garage_util::tranquilizer::Tranquilizer;
@ -23,15 +31,14 @@ use garage_rpc::*;
use garage_table::replication::{TableReplication, TableShardedReplication}; use garage_table::replication::{TableReplication, TableShardedReplication};
use crate::block_metrics::*;
use crate::block_ref_table::*; use crate::block_ref_table::*;
use crate::garage::Garage; use crate::garage::Garage;
/// Size under which data will be stored inlined in database instead of as files /// Size under which data will be stored inlined in database instead of as files
pub const INLINE_THRESHOLD: usize = 3072; pub const INLINE_THRESHOLD: usize = 3072;
pub const BACKGROUND_WORKERS: u64 = 1; pub const BACKGROUND_TRANQUILITY: u32 = 2;
pub const BACKGROUND_TRANQUILITY: u32 = 3;
// Timeout for RPCs that read and write blocks to remote nodes // Timeout for RPCs that read and write blocks to remote nodes
const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(30); const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(30);
@ -40,7 +47,8 @@ const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(30);
const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5); const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5);
// The delay between the time where a resync operation fails // The delay between the time where a resync operation fails
// and the time when it is retried. // and the time when it is retried, with exponential backoff
// (multiplied by 2, 4, 8, 16, etc. for every consecutive failure).
const RESYNC_RETRY_DELAY: Duration = Duration::from_secs(60); const RESYNC_RETRY_DELAY: Duration = Duration::from_secs(60);
// The delay between the moment when the reference counter // The delay between the moment when the reference counter
@ -148,12 +156,15 @@ pub struct BlockManager {
rc: sled::Tree, rc: sled::Tree,
resync_queue: sled::Tree, resync_queue: SledCountedTree,
resync_notify: Notify, resync_notify: Notify,
resync_errors: SledCountedTree,
system: Arc<System>, system: Arc<System>,
endpoint: Arc<Endpoint<BlockRpc, Self>>, endpoint: Arc<Endpoint<BlockRpc, Self>>,
pub(crate) garage: ArcSwapOption<Garage>, pub(crate) garage: ArcSwapOption<Garage>,
metrics: BlockManagerMetrics,
} }
// This custom struct contains functions that must only be ran // This custom struct contains functions that must only be ran
@ -175,6 +186,12 @@ impl BlockManager {
let resync_queue = db let resync_queue = db
.open_tree("block_local_resync_queue") .open_tree("block_local_resync_queue")
.expect("Unable to open block_local_resync_queue tree"); .expect("Unable to open block_local_resync_queue tree");
let resync_queue = SledCountedTree::new(resync_queue);
let resync_errors = db
.open_tree("block_local_resync_errors")
.expect("Unable to open block_local_resync_errors tree");
let resync_errors = SledCountedTree::new(resync_errors);
let endpoint = system let endpoint = system
.netapp .netapp
@ -182,6 +199,8 @@ impl BlockManager {
let manager_locked = BlockManagerLocked(); let manager_locked = BlockManagerLocked();
let metrics = BlockManagerMetrics::new(resync_queue.clone(), resync_errors.clone());
let block_manager = Arc::new(Self { let block_manager = Arc::new(Self {
replication, replication,
data_dir, data_dir,
@ -189,9 +208,11 @@ impl BlockManager {
rc, rc,
resync_queue, resync_queue,
resync_notify: Notify::new(), resync_notify: Notify::new(),
resync_errors,
system, system,
endpoint, endpoint,
garage: ArcSwapOption::from(None), garage: ArcSwapOption::from(None),
metrics,
}); });
block_manager.endpoint.set_handler(block_manager.clone()); block_manager.endpoint.set_handler(block_manager.clone());
@ -380,15 +401,36 @@ impl BlockManager {
/// Write a block to disk /// Write a block to disk
async fn write_block(&self, hash: &Hash, data: &DataBlock) -> Result<BlockRpc, Error> { async fn write_block(&self, hash: &Hash, data: &DataBlock) -> Result<BlockRpc, Error> {
self.mutation_lock let write_size = data.inner_buffer().len() as u64;
let res = self
.mutation_lock
.lock() .lock()
.await .await
.write_block(hash, data, self) .write_block(hash, data, self)
.await .bound_record_duration(&self.metrics.block_write_duration)
.await?;
self.metrics.bytes_written.add(write_size);
Ok(res)
} }
/// Read block from disk, verifying it's integrity /// Read block from disk, verifying it's integrity
async fn read_block(&self, hash: &Hash) -> Result<BlockRpc, Error> { async fn read_block(&self, hash: &Hash) -> Result<BlockRpc, Error> {
let data = self
.read_block_internal(hash)
.bound_record_duration(&self.metrics.block_read_duration)
.await?;
self.metrics
.bytes_read
.add(data.inner_buffer().len() as u64);
Ok(BlockRpc::PutBlock { hash: *hash, data })
}
async fn read_block_internal(&self, hash: &Hash) -> Result<DataBlock, Error> {
let mut path = self.block_path(hash); let mut path = self.block_path(hash);
let compressed = match self.is_block_compressed(hash).await { let compressed = match self.is_block_compressed(hash).await {
Ok(c) => c, Ok(c) => c,
@ -414,6 +456,8 @@ impl BlockManager {
}; };
if data.verify(*hash).is_err() { if data.verify(*hash).is_err() {
self.metrics.corruption_counter.add(1);
self.mutation_lock self.mutation_lock
.lock() .lock()
.await .await
@ -423,7 +467,7 @@ impl BlockManager {
return Err(Error::CorruptData(*hash)); return Err(Error::CorruptData(*hash));
} }
Ok(BlockRpc::PutBlock { hash: *hash, data }) Ok(data)
} }
/// Check if this node should have a block, but don't actually have it /// Check if this node should have a block, but don't actually have it
@ -467,21 +511,22 @@ impl BlockManager {
// ---- Resync loop ---- // ---- Resync loop ----
pub fn spawn_background_worker(self: Arc<Self>) { pub fn spawn_background_worker(self: Arc<Self>) {
// Launch n simultaneous workers for background resync loop preprocessing // Launch a background workers for background resync loop processing
for i in 0..BACKGROUND_WORKERS {
let bm2 = self.clone();
let background = self.system.background.clone(); let background = self.system.background.clone();
tokio::spawn(async move { tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(10 * (i + 1))).await; tokio::time::sleep(Duration::from_secs(10)).await;
background.spawn_worker(format!("block resync worker {}", i), move |must_exit| { background.spawn_worker("block resync worker".into(), move |must_exit| {
bm2.resync_loop(must_exit) self.resync_loop(must_exit)
}); });
}); });
} }
}
fn put_to_resync(&self, hash: &Hash, delay: Duration) -> Result<(), Error> { fn put_to_resync(&self, hash: &Hash, delay: Duration) -> Result<(), Error> {
let when = now_msec() + delay.as_millis() as u64; let when = now_msec() + delay.as_millis() as u64;
self.put_to_resync_at(hash, when)
}
fn put_to_resync_at(&self, hash: &Hash, when: u64) -> Result<(), Error> {
trace!("Put resync_queue: {} {:?}", when, hash); trace!("Put resync_queue: {} {:?}", when, hash);
let mut key = u64::to_be_bytes(when).to_vec(); let mut key = u64::to_be_bytes(when).to_vec();
key.extend(hash.as_ref()); key.extend(hash.as_ref());
@ -517,19 +562,74 @@ impl BlockManager {
} }
async fn resync_iter(&self, must_exit: &mut watch::Receiver<bool>) -> Result<bool, Error> { async fn resync_iter(&self, must_exit: &mut watch::Receiver<bool>) -> Result<bool, Error> {
if let Some((time_bytes, hash_bytes)) = self.resync_queue.pop_min()? { if let Some(first_pair_res) = self.resync_queue.iter().next() {
let (time_bytes, hash_bytes) = first_pair_res?;
let time_msec = u64::from_be_bytes(time_bytes[0..8].try_into().unwrap()); let time_msec = u64::from_be_bytes(time_bytes[0..8].try_into().unwrap());
let now = now_msec(); let now = now_msec();
if now >= time_msec { if now >= time_msec {
let hash = Hash::try_from(&hash_bytes[..]).unwrap(); let hash = Hash::try_from(&hash_bytes[..]).unwrap();
let res = self.resync_block(&hash).await;
if let Err(e) = &res { if let Some(ec) = self.resync_errors.get(hash.as_slice())? {
warn!("Error when resyncing {:?}: {}", hash, e); let ec = ErrorCounter::decode(ec);
self.put_to_resync(&hash, RESYNC_RETRY_DELAY)?; if now < ec.next_try() {
// if next retry after an error is not yet,
// don't do resync and return early, but still
// make sure the item is still in queue at expected time
self.put_to_resync_at(&hash, ec.next_try())?;
// ec.next_try() > now >= time_msec, so this remove
// is not removing the one we added just above
self.resync_queue.remove(time_bytes)?;
return Ok(false);
} }
}
let tracer = opentelemetry::global::tracer("garage");
let trace_id = gen_uuid();
let span = tracer
.span_builder("Resync block")
.with_trace_id(
opentelemetry::trace::TraceId::from_hex(&hex::encode(
&trace_id.as_slice()[..16],
))
.unwrap(),
)
.with_attributes(vec![KeyValue::new("block", format!("{:?}", hash))])
.start(&tracer);
let res = self
.resync_block(&hash)
.with_context(Context::current_with_span(span))
.bound_record_duration(&self.metrics.resync_duration)
.await;
self.metrics.resync_counter.add(1);
if let Err(e) = &res {
self.metrics.resync_error_counter.add(1);
warn!("Error when resyncing {:?}: {}", hash, e);
let err_counter = match self.resync_errors.get(hash.as_slice())? {
Some(ec) => ErrorCounter::decode(ec).add1(now + 1),
None => ErrorCounter::new(now + 1),
};
self.resync_errors
.insert(hash.as_slice(), err_counter.encode())?;
self.put_to_resync_at(&hash, err_counter.next_try())?;
// err_counter.next_try() >= now + 1 > now,
// the entry we remove from the queue is not
// the entry we inserted with put_to_resync_at
self.resync_queue.remove(time_bytes)?;
} else {
self.resync_errors.remove(hash.as_slice())?;
self.resync_queue.remove(time_bytes)?;
}
Ok(true) Ok(true)
} else { } else {
self.resync_queue.insert(time_bytes, hash_bytes)?;
let delay = tokio::time::sleep(Duration::from_millis(time_msec - now)); let delay = tokio::time::sleep(Duration::from_millis(time_msec - now));
select! { select! {
_ = delay.fuse() => {}, _ = delay.fuse() => {},
@ -607,6 +707,12 @@ impl BlockManager {
need_nodes.len() need_nodes.len()
); );
for node in need_nodes.iter() {
self.metrics
.resync_send_counter
.add(1, &[KeyValue::new("to", format!("{:?}", node))]);
}
let put_block_message = self.read_block(hash).await?; let put_block_message = self.read_block(hash).await?;
self.system self.system
.rpc .rpc
@ -644,6 +750,9 @@ impl BlockManager {
); );
let block_data = self.rpc_get_raw_block(hash).await?; let block_data = self.rpc_get_raw_block(hash).await?;
self.metrics.resync_recv_counter.add(1);
self.write_block(hash, &block_data).await?; self.write_block(hash, &block_data).await?;
} }
@ -760,9 +869,11 @@ impl BlockManagerLocked {
let data = data.inner_buffer(); let data = data.inner_buffer();
let mut path = mgr.block_dir(hash); let mut path = mgr.block_dir(hash);
fs::create_dir_all(&path).await?; let directory = path.clone();
path.push(hex::encode(hash)); path.push(hex::encode(hash));
fs::create_dir_all(&directory).await?;
let to_delete = match (mgr.is_block_compressed(hash).await, compressed) { let to_delete = match (mgr.is_block_compressed(hash).await, compressed) {
(Ok(true), _) => return Ok(BlockRpc::Ok), (Ok(true), _) => return Ok(BlockRpc::Ok),
(Ok(false), false) => return Ok(BlockRpc::Ok), (Ok(false), false) => return Ok(BlockRpc::Ok),
@ -783,6 +894,7 @@ impl BlockManagerLocked {
path2.set_extension("tmp"); path2.set_extension("tmp");
let mut f = fs::File::create(&path2).await?; let mut f = fs::File::create(&path2).await?;
f.write_all(data).await?; f.write_all(data).await?;
f.sync_all().await?;
drop(f); drop(f);
fs::rename(path2, path).await?; fs::rename(path2, path).await?;
@ -790,6 +902,14 @@ impl BlockManagerLocked {
fs::remove_file(to_delete).await?; fs::remove_file(to_delete).await?;
} }
let dir = fs::OpenOptions::new()
.read(true)
.mode(0)
.open(directory)
.await?;
dir.sync_all().await?;
drop(dir);
Ok(BlockRpc::Ok) Ok(BlockRpc::Ok)
} }
@ -819,6 +939,7 @@ impl BlockManagerLocked {
path.set_extension("zst"); path.set_extension("zst");
} }
fs::remove_file(path).await?; fs::remove_file(path).await?;
mgr.metrics.delete_counter.add(1);
} }
Ok(()) Ok(())
} }
@ -925,6 +1046,52 @@ impl RcEntry {
} }
} }
/// Counts the number of errors when resyncing a block,
/// and the time of the last try.
/// Used to implement exponential backoff.
///
/// The serialized form (see `encode` / `decode`) is 16 bytes long:
/// `errors` followed by `last_try`, both as big-endian `u64`.
#[derive(Clone, Copy, Debug)]
struct ErrorCounter {
    /// Number of failed attempts recorded so far (1 after the first failure).
    errors: u64,
    /// Timestamp of the last try, in milliseconds (it is summed with
    /// `delay_msec()` to obtain the next try).
    last_try: u64,
}
impl ErrorCounter {
    /// Creates a counter for a first failure whose attempt happened at `now`.
    fn new(now: u64) -> Self {
        Self {
            errors: 1,
            last_try: now,
        }
    }

    /// Rebuilds a counter from the 16-byte representation produced by `encode`.
    ///
    /// # Panics
    ///
    /// Panics if `data` holds fewer than 16 bytes.
    fn decode(data: sled::IVec) -> Self {
        Self {
            errors: u64::from_be_bytes(data[0..8].try_into().unwrap()),
            last_try: u64::from_be_bytes(data[8..16].try_into().unwrap()),
        }
    }

    /// Serializes the counter as `errors` then `last_try`, big-endian.
    fn encode(&self) -> Vec<u8> {
        [
            u64::to_be_bytes(self.errors),
            u64::to_be_bytes(self.last_try),
        ]
        .concat()
    }

    /// Returns a counter with one more error, whose last try is `now`.
    fn add1(self, now: u64) -> Self {
        Self {
            // Saturate rather than overflow: the backoff exponent is capped
            // anyway, so a pinned counter does not change the delay.
            errors: self.errors.saturating_add(1),
            last_try: now,
        }
    }

    /// Delay to wait after the last try, in milliseconds: the base retry
    /// delay doubled for every error after the first, up to a factor of 2^10.
    fn delay_msec(&self) -> u64 {
        // `errors` is at least 1 for counters built through `new`/`add1`, but
        // a decoded record could hold 0; saturating_sub keeps that case from
        // underflowing (panic in debug builds, wrap-around in release builds).
        let exponent = std::cmp::min(self.errors.saturating_sub(1), 10);
        (RESYNC_RETRY_DELAY.as_millis() as u64) << exponent
    }

    /// Earliest timestamp (milliseconds) at which the next try may happen.
    fn next_try(&self) -> u64 {
        self.last_try + self.delay_msec()
    }
}
fn zstd_encode<R: std::io::Read>(mut source: R, level: i32) -> std::io::Result<Vec<u8>> { fn zstd_encode<R: std::io::Read>(mut source: R, level: i32) -> std::io::Result<Vec<u8>> {
let mut result = Vec::<u8>::new(); let mut result = Vec::<u8>::new();
let mut encoder = Encoder::new(&mut result, level)?; let mut encoder = Encoder::new(&mut result, level)?;

102
src/model/block_metrics.rs Normal file
View file

@ -0,0 +1,102 @@
use opentelemetry::{global, metrics::*};
use garage_util::sled_counter::SledCountedTree;
/// Groups every OpenTelemetry instrument reported by the block manager.
pub struct BlockManagerMetrics {
    // Observers for the lengths of the two resync trees; the leading
    // underscore suggests they are only stored here and not read directly.
    pub(crate) _resync_queue_len: ValueObserver<u64>,
    pub(crate) _resync_errored_blocks: ValueObserver<u64>,

    // Resync activity.
    pub(crate) resync_counter: BoundCounter<u64>,
    pub(crate) resync_error_counter: BoundCounter<u64>,
    pub(crate) resync_duration: BoundValueRecorder<f64>,
    // Not bound to a fixed attribute set: attributes are given on each `add`.
    pub(crate) resync_send_counter: Counter<u64>,
    pub(crate) resync_recv_counter: BoundCounter<u64>,

    // Disk reads and writes.
    pub(crate) bytes_read: BoundCounter<u64>,
    pub(crate) block_read_duration: BoundValueRecorder<f64>,
    pub(crate) bytes_written: BoundCounter<u64>,
    pub(crate) block_write_duration: BoundValueRecorder<f64>,

    // Deletions and detected corruptions.
    pub(crate) delete_counter: BoundCounter<u64>,
    pub(crate) corruption_counter: BoundCounter<u64>,
}
impl BlockManagerMetrics {
    /// Creates all instruments on the global meter `garage_model/block`.
    ///
    /// `resync_queue` and `resync_errors` are moved into the observer
    /// callbacks, which report their `len()` each time they are invoked.
    pub fn new(resync_queue: SledCountedTree, resync_errors: SledCountedTree) -> Self {
        let meter = global::meter("garage_model/block");

        // Factories for the two recurring shapes: an instrument with a
        // description, bound to the empty attribute set.
        let bound_counter = |name: &'static str, description: &'static str| {
            meter
                .u64_counter(name)
                .with_description(description)
                .init()
                .bind(&[])
        };
        let bound_recorder = |name: &'static str, description: &'static str| {
            meter
                .f64_value_recorder(name)
                .with_description(description)
                .init()
                .bind(&[])
        };

        let _resync_queue_len = meter
            .u64_value_observer("block.resync_queue_length", move |observer| {
                observer.observe(resync_queue.len() as u64, &[])
            })
            .with_description("Number of block hashes queued for local check and possible resync")
            .init();
        let _resync_errored_blocks = meter
            .u64_value_observer("block.resync_errored_blocks", move |observer| {
                observer.observe(resync_errors.len() as u64, &[])
            })
            .with_description("Number of block hashes whose last resync resulted in an error")
            .init();

        let resync_counter = bound_counter(
            "block.resync_counter",
            "Number of calls to resync_block",
        );
        let resync_error_counter = bound_counter(
            "block.resync_error_counter",
            "Number of calls to resync_block that returned an error",
        );
        let resync_duration = bound_recorder(
            "block.resync_duration",
            "Duration of resync_block operations",
        );
        let resync_send_counter = meter
            .u64_counter("block.resync_send_counter")
            .with_description("Number of blocks sent to another node in resync operations")
            .init();
        let resync_recv_counter = bound_counter(
            "block.resync_recv_counter",
            "Number of blocks received from other nodes in resync operations",
        );

        let bytes_read = bound_counter(
            "block.bytes_read",
            "Number of bytes read from disk",
        );
        let block_read_duration = bound_recorder(
            "block.read_duration",
            "Duration of block read operations",
        );
        let bytes_written = bound_counter(
            "block.bytes_written",
            "Number of bytes written to disk",
        );
        let block_write_duration = bound_recorder(
            "block.write_duration",
            "Duration of block write operations",
        );

        let delete_counter = bound_counter(
            "block.delete_counter",
            "Number of blocks deleted",
        );
        let corruption_counter = bound_counter(
            "block.corruption_counter",
            "Data corruptions detected on block reads",
        );

        Self {
            _resync_queue_len,
            _resync_errored_blocks,
            resync_counter,
            resync_error_counter,
            resync_duration,
            resync_send_counter,
            resync_recv_counter,
            bytes_read,
            block_read_duration,
            bytes_written,
            block_write_duration,
            delete_counter,
            corruption_counter,
        }
    }
}

View file

@ -1,5 +1,5 @@
#[macro_use] #[macro_use]
extern crate log; extern crate tracing;
pub mod permission; pub mod permission;
@ -11,6 +11,7 @@ pub mod object_table;
pub mod version_table; pub mod version_table;
pub mod block; pub mod block;
mod block_metrics;
pub mod garage; pub mod garage;
pub mod helper; pub mod helper;

View file

@ -1,6 +1,6 @@
[package] [package]
name = "garage_rpc" name = "garage_rpc"
version = "0.6.0" version = "0.7.0"
authors = ["Alex Auvolat <alex@adnab.me>"] authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018" edition = "2018"
license = "AGPL-3.0" license = "AGPL-3.0"
@ -14,13 +14,14 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
garage_util = { version = "0.6.0", path = "../util" } garage_util = { version = "0.7.0", path = "../util" }
garage_admin = { version = "0.7.0", path = "../admin" }
arc-swap = "1.0" arc-swap = "1.0"
bytes = "1.0" bytes = "1.0"
gethostname = "0.2" gethostname = "0.2"
hex = "0.4" hex = "0.4"
log = "0.4" tracing = "0.1.30"
rand = "0.8" rand = "0.8"
sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" } sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" }
@ -34,8 +35,11 @@ futures = "0.3"
futures-util = "0.3" futures-util = "0.3"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
tokio-stream = { version = "0.1", features = ["net"] } tokio-stream = { version = "0.1", features = ["net"] }
opentelemetry = "0.17"
#netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" } #netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" }
netapp = "0.3.0" #netapp = { version = "0.4", path = "../../../netapp", features = ["telemetry"] }
netapp = { version = "0.4", features = ["telemetry"] }
hyper = { version = "0.14", features = ["client", "http1", "runtime", "tcp"] } hyper = { version = "0.14", features = ["client", "http1", "runtime", "tcp"] }

View file

@ -139,10 +139,10 @@ pub async fn publish_consul_service(
let resp = client.request(req).await?; let resp = client.request(req).await?;
debug!("Response of advertising to Consul: {:?}", resp); debug!("Response of advertising to Consul: {:?}", resp);
let resp_code = resp.status(); let resp_code = resp.status();
let resp_bytes = &hyper::body::to_bytes(resp.into_body()).await?;
debug!( debug!(
"{}", "{}",
std::str::from_utf8(&hyper::body::to_bytes(resp.into_body()).await?) std::str::from_utf8(resp_bytes).unwrap_or("<invalid utf8>")
.unwrap_or("<invalid utf8>")
); );
if resp_code != StatusCode::OK { if resp_code != StatusCode::OK {

View file

@ -1,7 +1,7 @@
//! Crate containing rpc related functions and types used in Garage //! Crate containing rpc related functions and types used in Garage
#[macro_use] #[macro_use]
extern crate log; extern crate tracing;
mod consul; mod consul;
@ -9,6 +9,7 @@ pub mod layout;
pub mod ring; pub mod ring;
pub mod system; pub mod system;
mod metrics;
pub mod rpc_helper; pub mod rpc_helper;
pub use rpc_helper::*; pub use rpc_helper::*;

55
src/rpc/metrics.rs Normal file
View file

@ -0,0 +1,55 @@
use std::sync::Arc;
use opentelemetry::{global, metrics::*};
use tokio::sync::Semaphore;
/// Groups every OpenTelemetry instrument reported for RPC calls.
pub struct RpcMetrics {
    // Observer for the semaphore's free permits; the leading underscore
    // suggests it is only stored here and not read directly.
    pub(crate) _rpc_available_permits: ValueObserver<u64>,

    // Counters; none is bound, so attributes are given on each `add`.
    pub(crate) rpc_counter: Counter<u64>,
    pub(crate) rpc_timeout_counter: Counter<u64>,
    pub(crate) rpc_netapp_error_counter: Counter<u64>,
    pub(crate) rpc_garage_error_counter: Counter<u64>,

    // Recorders; likewise unbound.
    pub(crate) rpc_duration: ValueRecorder<f64>,
    pub(crate) rpc_queueing_time: ValueRecorder<f64>,
}
impl RpcMetrics {
    /// Creates all instruments on the global meter `garage_rpc`.
    ///
    /// `sem` is moved into the observer callback, which reports
    /// `sem.available_permits()` each time it is invoked.
    pub fn new(sem: Arc<Semaphore>) -> Self {
        let meter = global::meter("garage_rpc");

        // Factories for the two recurring shapes: a described, unbound instrument.
        let counter = |name: &'static str, description: &'static str| {
            meter.u64_counter(name).with_description(description).init()
        };
        let recorder = |name: &'static str, description: &'static str| {
            meter
                .f64_value_recorder(name)
                .with_description(description)
                .init()
        };

        let _rpc_available_permits = meter
            .u64_value_observer("rpc.available_permits", move |observer| {
                observer.observe(sem.available_permits() as u64, &[])
            })
            .with_description("Number of available RPC permits")
            .init();

        let rpc_counter = counter(
            "rpc.request_counter",
            "Number of RPC requests emitted",
        );
        let rpc_timeout_counter = counter(
            "rpc.timeout_counter",
            "Number of RPC timeouts",
        );
        let rpc_netapp_error_counter = counter(
            "rpc.netapp_error_counter",
            "Number of communication errors (errors in the Netapp library)",
        );
        let rpc_garage_error_counter = counter(
            "rpc.garage_error_counter",
            "Number of RPC errors (errors happening when handling the RPC)",
        );

        let rpc_duration = recorder(
            "rpc.duration",
            "Duration of RPCs",
        );
        let rpc_queueing_time = recorder(
            "rpc.queueing_time",
            "Time RPC requests were queued for before being sent",
        );

        Self {
            _rpc_available_permits,
            rpc_counter,
            rpc_timeout_counter,
            rpc_netapp_error_counter,
            rpc_garage_error_counter,
            rpc_duration,
            rpc_queueing_time,
        }
    }
}

View file

@ -9,6 +9,12 @@ use futures_util::future::FutureExt;
use tokio::select; use tokio::select;
use tokio::sync::{watch, Semaphore}; use tokio::sync::{watch, Semaphore};
use opentelemetry::KeyValue;
use opentelemetry::{
trace::{FutureExt as OtelFutureExt, Span, TraceContextExt, Tracer},
Context,
};
pub use netapp::endpoint::{Endpoint, EndpointHandler, Message as Rpc}; pub use netapp::endpoint::{Endpoint, EndpointHandler, Message as Rpc};
use netapp::peering::fullmesh::FullMeshPeeringStrategy; use netapp::peering::fullmesh::FullMeshPeeringStrategy;
pub use netapp::proto::*; pub use netapp::proto::*;
@ -17,7 +23,9 @@ pub use netapp::{NetApp, NodeID};
use garage_util::background::BackgroundRunner; use garage_util::background::BackgroundRunner;
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::Error; use garage_util::error::Error;
use garage_util::metrics::RecordDuration;
use crate::metrics::RpcMetrics;
use crate::ring::Ring; use crate::ring::Ring;
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10); const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
@ -76,7 +84,8 @@ struct RpcHelperInner {
fullmesh: Arc<FullMeshPeeringStrategy>, fullmesh: Arc<FullMeshPeeringStrategy>,
background: Arc<BackgroundRunner>, background: Arc<BackgroundRunner>,
ring: watch::Receiver<Arc<Ring>>, ring: watch::Receiver<Arc<Ring>>,
request_buffer_semaphore: Semaphore, request_buffer_semaphore: Arc<Semaphore>,
metrics: RpcMetrics,
} }
impl RpcHelper { impl RpcHelper {
@ -86,12 +95,17 @@ impl RpcHelper {
background: Arc<BackgroundRunner>, background: Arc<BackgroundRunner>,
ring: watch::Receiver<Arc<Ring>>, ring: watch::Receiver<Arc<Ring>>,
) -> Self { ) -> Self {
let sem = Arc::new(Semaphore::new(REQUEST_BUFFER_SIZE));
let metrics = RpcMetrics::new(sem.clone());
Self(Arc::new(RpcHelperInner { Self(Arc::new(RpcHelperInner {
our_node_id, our_node_id,
fullmesh, fullmesh,
background, background,
ring, ring,
request_buffer_semaphore: Semaphore::new(REQUEST_BUFFER_SIZE), request_buffer_semaphore: sem,
metrics,
})) }))
} }
@ -120,21 +134,45 @@ impl RpcHelper {
M: Rpc<Response = Result<S, Error>>, M: Rpc<Response = Result<S, Error>>,
H: EndpointHandler<M>, H: EndpointHandler<M>,
{ {
let metric_tags = [
KeyValue::new("rpc_endpoint", endpoint.path().to_string()),
KeyValue::new("from", format!("{:?}", self.0.our_node_id)),
KeyValue::new("to", format!("{:?}", to)),
];
let msg_size = rmp_to_vec_all_named(&msg)?.len() as u32; let msg_size = rmp_to_vec_all_named(&msg)?.len() as u32;
let permit = self let permit = self
.0 .0
.request_buffer_semaphore .request_buffer_semaphore
.acquire_many(msg_size) .acquire_many(msg_size)
.record_duration(&self.0.metrics.rpc_queueing_time, &metric_tags)
.await?; .await?;
self.0.metrics.rpc_counter.add(1, &metric_tags);
let node_id = to.into(); let node_id = to.into();
let rpc_call = endpoint
.call(&node_id, msg, strat.rs_priority)
.record_duration(&self.0.metrics.rpc_duration, &metric_tags);
select! { select! {
res = endpoint.call(&node_id, &msg, strat.rs_priority) => { res = rpc_call => {
drop(permit); drop(permit);
Ok(res??)
if res.is_err() {
self.0.metrics.rpc_netapp_error_counter.add(1, &metric_tags);
}
let res = res?;
if res.is_err() {
self.0.metrics.rpc_garage_error_counter.add(1, &metric_tags);
}
Ok(res?)
} }
_ = tokio::time::sleep(strat.rs_timeout) => { _ = tokio::time::sleep(strat.rs_timeout) => {
drop(permit); drop(permit);
self.0.metrics.rpc_timeout_counter.add(1, &metric_tags);
Err(Error::Timeout) Err(Error::Timeout)
} }
} }
@ -195,7 +233,47 @@ impl RpcHelper {
where where
M: Rpc<Response = Result<S, Error>> + 'static, M: Rpc<Response = Result<S, Error>> + 'static,
H: EndpointHandler<M> + 'static, H: EndpointHandler<M> + 'static,
S: Send, S: Send + 'static,
{
let quorum = strategy.rs_quorum.unwrap_or(to.len());
let tracer = opentelemetry::global::tracer("garage");
let span_name = if strategy.rs_interrupt_after_quorum {
format!("RPC {} to {} of {}", endpoint.path(), quorum, to.len())
} else {
format!(
"RPC {} to {} (quorum {})",
endpoint.path(),
to.len(),
quorum
)
};
let mut span = tracer.start(span_name);
span.set_attribute(KeyValue::new("from", format!("{:?}", self.0.our_node_id)));
span.set_attribute(KeyValue::new("to", format!("{:?}", to)));
span.set_attribute(KeyValue::new("quorum", quorum as i64));
span.set_attribute(KeyValue::new(
"interrupt_after_quorum",
strategy.rs_interrupt_after_quorum.to_string(),
));
self.try_call_many_internal(endpoint, to, msg, strategy, quorum)
.with_context(Context::current_with_span(span))
.await
}
async fn try_call_many_internal<M, H, S>(
&self,
endpoint: &Arc<Endpoint<M, H>>,
to: &[Uuid],
msg: M,
strategy: RequestStrategy,
quorum: usize,
) -> Result<Vec<S>, Error>
where
M: Rpc<Response = Result<S, Error>> + 'static,
H: EndpointHandler<M> + 'static,
S: Send + 'static,
{ {
let msg = Arc::new(msg); let msg = Arc::new(msg);
@ -210,7 +288,6 @@ impl RpcHelper {
self2.call_arc(&endpoint2, to, msg, strategy).await self2.call_arc(&endpoint2, to, msg, strategy).await
}) })
}); });
let quorum = strategy.rs_quorum.unwrap_or(to.len());
// Vectors in which success results and errors will be collected // Vectors in which success results and errors will be collected
let mut successes = vec![]; let mut successes = vec![];
@ -274,8 +351,12 @@ impl RpcHelper {
// If the current set of requests that are running is not enough to possibly // If the current set of requests that are running is not enough to possibly
// reach quorum, start some new requests. // reach quorum, start some new requests.
while successes.len() + resp_stream.len() < quorum { while successes.len() + resp_stream.len() < quorum {
if let Some((_, _, _, _to, fut)) = requests.next() { if let Some((_, _, _, req_to, fut)) = requests.next() {
resp_stream.push(fut); let tracer = opentelemetry::global::tracer("garage");
let span = tracer.start(format!("RPC to {:?}", req_to));
resp_stream.push(tokio::spawn(
fut.with_context(Context::current_with_span(span)),
));
} else { } else {
// If we have no request to add, we know that we won't ever // If we have no request to add, we know that we won't ever
// reach quorum: bail out now. // reach quorum: bail out now.
@ -285,7 +366,7 @@ impl RpcHelper {
assert!(!resp_stream.is_empty()); // because of loop invariants assert!(!resp_stream.is_empty()); // because of loop invariants
// Wait for one request to terminate // Wait for one request to terminate
match resp_stream.next().await.unwrap() { match resp_stream.next().await.unwrap().unwrap() {
Ok(msg) => { Ok(msg) => {
successes.push(msg); successes.push(msg);
} }

View file

@ -37,6 +37,9 @@ const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60);
const STATUS_EXCHANGE_INTERVAL: Duration = Duration::from_secs(10); const STATUS_EXCHANGE_INTERVAL: Duration = Duration::from_secs(10);
const PING_TIMEOUT: Duration = Duration::from_secs(2); const PING_TIMEOUT: Duration = Duration::from_secs(2);
/// Version tag used for version check upon Netapp connection.
///
/// The six high-order bytes (`0x676172616765`) are the ASCII codes of
/// "garage"; the two low-order bytes hold `0x0007`.
pub const GARAGE_VERSION_TAG: u64 = 0x6761726167650007; // garage 0x0007
/// RPC endpoint used for calls related to membership /// RPC endpoint used for calls related to membership
pub const SYSTEM_RPC_PATH: &str = "garage_rpc/membership.rs/SystemRpc"; pub const SYSTEM_RPC_PATH: &str = "garage_rpc/membership.rs/SystemRpc";
@ -188,7 +191,10 @@ impl System {
) -> Arc<Self> { ) -> Arc<Self> {
let node_key = let node_key =
gen_node_key(&config.metadata_dir).expect("Unable to read or generate node ID"); gen_node_key(&config.metadata_dir).expect("Unable to read or generate node ID");
info!("Node public key: {}", hex::encode(&node_key.public_key())); info!(
"Node ID of this node: {}",
hex::encode(&node_key.public_key()[..8])
);
let persist_cluster_layout = Persister::new(&config.metadata_dir, "cluster_layout"); let persist_cluster_layout = Persister::new(&config.metadata_dir, "cluster_layout");
let persist_peer_list = Persister::new(&config.metadata_dir, "peer_list"); let persist_peer_list = Persister::new(&config.metadata_dir, "peer_list");
@ -216,13 +222,7 @@ impl System {
let ring = Ring::new(cluster_layout, replication_factor); let ring = Ring::new(cluster_layout, replication_factor);
let (update_ring, ring) = watch::channel(Arc::new(ring)); let (update_ring, ring) = watch::channel(Arc::new(ring));
if let Some(addr) = config.rpc_public_addr { let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, node_key);
println!("{}@{}", hex::encode(&node_key.public_key()), addr);
} else {
println!("{}", hex::encode(&node_key.public_key()));
}
let netapp = NetApp::new(network_key, node_key);
let fullmesh = FullMeshPeeringStrategy::new( let fullmesh = FullMeshPeeringStrategy::new(
netapp.clone(), netapp.clone(),
config.bootstrap_peers.clone(), config.bootstrap_peers.clone(),

View file

@ -1,6 +1,6 @@
[package] [package]
name = "garage_table" name = "garage_table"
version = "0.6.0" version = "0.7.0"
authors = ["Alex Auvolat <alex@adnab.me>"] authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018" edition = "2018"
license = "AGPL-3.0" license = "AGPL-3.0"
@ -14,13 +14,15 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
garage_rpc = { version = "0.6.0", path = "../rpc" } garage_rpc = { version = "0.7.0", path = "../rpc" }
garage_util = { version = "0.6.0", path = "../util" } garage_util = { version = "0.7.0", path = "../util" }
opentelemetry = "0.17"
async-trait = "0.1.7" async-trait = "0.1.7"
bytes = "1.0" bytes = "1.0"
hexdump = "0.1" hexdump = "0.1"
log = "0.4" tracing = "0.1.30"
rand = "0.8" rand = "0.8"
sled = "0.34" sled = "0.34"

View file

@ -1,18 +1,19 @@
use core::borrow::Borrow; use core::borrow::Borrow;
use std::sync::Arc; use std::sync::Arc;
use log::warn;
use serde_bytes::ByteBuf; use serde_bytes::ByteBuf;
use sled::Transactional; use sled::Transactional;
use tokio::sync::Notify; use tokio::sync::Notify;
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::*; use garage_util::error::*;
use garage_util::sled_counter::SledCountedTree;
use garage_rpc::system::System; use garage_rpc::system::System;
use crate::crdt::Crdt; use crate::crdt::Crdt;
use crate::gc::GcTodoEntry; use crate::gc::GcTodoEntry;
use crate::metrics::*;
use crate::replication::*; use crate::replication::*;
use crate::schema::*; use crate::schema::*;
@ -27,7 +28,9 @@ pub struct TableData<F: TableSchema, R: TableReplication> {
pub(crate) merkle_tree: sled::Tree, pub(crate) merkle_tree: sled::Tree,
pub(crate) merkle_todo: sled::Tree, pub(crate) merkle_todo: sled::Tree,
pub(crate) merkle_todo_notify: Notify, pub(crate) merkle_todo_notify: Notify,
pub(crate) gc_todo: sled::Tree, pub(crate) gc_todo: SledCountedTree,
pub(crate) metrics: TableMetrics,
} }
impl<F, R> TableData<F, R> impl<F, R> TableData<F, R>
@ -50,6 +53,9 @@ where
let gc_todo = db let gc_todo = db
.open_tree(&format!("{}:gc_todo_v2", F::TABLE_NAME)) .open_tree(&format!("{}:gc_todo_v2", F::TABLE_NAME))
.expect("Unable to open DB tree"); .expect("Unable to open DB tree");
let gc_todo = SledCountedTree::new(gc_todo);
let metrics = TableMetrics::new(F::TABLE_NAME, merkle_todo.clone(), gc_todo.clone());
Arc::new(Self { Arc::new(Self {
system, system,
@ -60,6 +66,7 @@ where
merkle_todo, merkle_todo,
merkle_todo_notify: Notify::new(), merkle_todo_notify: Notify::new(),
gc_todo, gc_todo,
metrics,
}) })
} }
@ -165,6 +172,8 @@ where
})?; })?;
if let Some((old_entry, new_entry, new_bytes_hash)) = changed { if let Some((old_entry, new_entry, new_bytes_hash)) = changed {
self.metrics.internal_update_counter.add(1);
let is_tombstone = new_entry.is_tombstone(); let is_tombstone = new_entry.is_tombstone();
self.instance.updated(old_entry, Some(new_entry)); self.instance.updated(old_entry, Some(new_entry));
self.merkle_todo_notify.notify_one(); self.merkle_todo_notify.notify_one();
@ -199,6 +208,8 @@ where
})?; })?;
if removed { if removed {
self.metrics.internal_delete_counter.add(1);
let old_entry = self.decode_entry(v)?; let old_entry = self.decode_entry(v)?;
self.instance.updated(Some(old_entry), None); self.instance.updated(Some(old_entry), None);
self.merkle_todo_notify.notify_one(); self.merkle_todo_notify.notify_one();

View file

@ -14,6 +14,7 @@ use tokio::sync::watch;
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::*; use garage_util::error::*;
use garage_util::sled_counter::SledCountedTree;
use garage_util::time::*; use garage_util::time::*;
use garage_rpc::system::System; use garage_rpc::system::System;
@ -362,7 +363,7 @@ impl GcTodoEntry {
} }
/// Saves the GcTodoEntry in the gc_todo tree /// Saves the GcTodoEntry in the gc_todo tree
pub(crate) fn save(&self, gc_todo_tree: &sled::Tree) -> Result<(), Error> { pub(crate) fn save(&self, gc_todo_tree: &SledCountedTree) -> Result<(), Error> {
gc_todo_tree.insert(self.todo_table_key(), self.value_hash.as_slice())?; gc_todo_tree.insert(self.todo_table_key(), self.value_hash.as_slice())?;
Ok(()) Ok(())
} }
@ -372,7 +373,7 @@ impl GcTodoEntry {
/// This is usefull to remove a todo entry only under the condition /// This is usefull to remove a todo entry only under the condition
/// that it has not changed since the time it was read, i.e. /// that it has not changed since the time it was read, i.e.
/// what we have to do is still the same /// what we have to do is still the same
pub(crate) fn remove_if_equal(&self, gc_todo_tree: &sled::Tree) -> Result<(), Error> { pub(crate) fn remove_if_equal(&self, gc_todo_tree: &SledCountedTree) -> Result<(), Error> {
let _ = gc_todo_tree.compare_and_swap::<_, _, Vec<u8>>( let _ = gc_todo_tree.compare_and_swap::<_, _, Vec<u8>>(
&self.todo_table_key()[..], &self.todo_table_key()[..],
Some(self.value_hash), Some(self.value_hash),

View file

@ -2,8 +2,9 @@
#![allow(clippy::comparison_chain)] #![allow(clippy::comparison_chain)]
#[macro_use] #[macro_use]
extern crate log; extern crate tracing;
mod metrics;
pub mod schema; pub mod schema;
pub mod util; pub mod util;

View file

@ -3,7 +3,6 @@ use std::time::Duration;
use futures::select; use futures::select;
use futures_util::future::*; use futures_util::future::*;
use log::{debug, warn};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use sled::transaction::{ use sled::transaction::{
ConflictableTransactionError, ConflictableTransactionResult, TransactionalTree, ConflictableTransactionError, ConflictableTransactionResult, TransactionalTree,

96
src/table/metrics.rs Normal file
View file

@ -0,0 +1,96 @@
use opentelemetry::{global, metrics::*, KeyValue};
use garage_util::sled_counter::SledCountedTree;
/// Groups every OpenTelemetry instrument reported for one table.
pub struct TableMetrics {
    // Observers for the lengths of the two TODO trees; the leading
    // underscore suggests they are only stored here and not read directly.
    pub(crate) _merkle_todo_len: ValueObserver<u64>,
    pub(crate) _gc_todo_len: ValueObserver<u64>,

    // Request-level instruments, bound to the `table_name` attribute.
    pub(crate) get_request_counter: BoundCounter<u64>,
    pub(crate) get_request_duration: BoundValueRecorder<f64>,
    pub(crate) put_request_counter: BoundCounter<u64>,
    pub(crate) put_request_duration: BoundValueRecorder<f64>,

    // Local storage changes, bound to the `table_name` attribute.
    pub(crate) internal_update_counter: BoundCounter<u64>,
    pub(crate) internal_delete_counter: BoundCounter<u64>,

    // Sync traffic; unbound, so attributes are given on each `add`.
    pub(crate) sync_items_sent: Counter<u64>,
    pub(crate) sync_items_received: Counter<u64>,
}
impl TableMetrics {
    /// Creates all instruments on the global meter named after `table_name`.
    ///
    /// `merkle_todo` and `gc_todo` are moved into the observer callbacks,
    /// which report their `len()` tagged with `table_name` each time they
    /// are invoked.
    pub fn new(
        table_name: &'static str,
        merkle_todo: sled::Tree,
        gc_todo: SledCountedTree,
    ) -> Self {
        let meter = global::meter(table_name);

        // Factories for instruments bound to the `table_name` attribute.
        let table_counter = |name: &'static str, description: &'static str| {
            meter
                .u64_counter(name)
                .with_description(description)
                .init()
                .bind(&[KeyValue::new("table_name", table_name)])
        };
        let table_recorder = |name: &'static str, description: &'static str| {
            meter
                .f64_value_recorder(name)
                .with_description(description)
                .init()
                .bind(&[KeyValue::new("table_name", table_name)])
        };
        // Factory for the unbound sync counters.
        let plain_counter = |name: &'static str, description: &'static str| {
            meter.u64_counter(name).with_description(description).init()
        };

        // NOTE(review): `merkle_todo` is a plain `sled::Tree`; its `len()` is
        // believed to scan the whole tree on every observation — confirm this
        // is acceptable for the expected queue sizes.
        let _merkle_todo_len = meter
            .u64_value_observer("table.merkle_updater_todo_queue_length", move |observer| {
                let attributes = [KeyValue::new("table_name", table_name)];
                observer.observe(merkle_todo.len() as u64, &attributes)
            })
            .with_description("Merkle tree updater TODO queue length")
            .init();
        let _gc_todo_len = meter
            .u64_value_observer("table.gc_todo_queue_length", move |observer| {
                let attributes = [KeyValue::new("table_name", table_name)];
                observer.observe(gc_todo.len() as u64, &attributes)
            })
            .with_description("Table garbage collector TODO queue length")
            .init();

        let get_request_counter = table_counter(
            "table.get_request_counter",
            "Number of get/get_range requests internally made on this table",
        );
        let get_request_duration = table_recorder(
            "table.get_request_duration",
            "Duration of get/get_range requests internally made on this table, in seconds",
        );
        let put_request_counter = table_counter(
            "table.put_request_counter",
            "Number of insert/insert_many requests internally made on this table",
        );
        let put_request_duration = table_recorder(
            "table.put_request_duration",
            "Duration of insert/insert_many requests internally made on this table, in seconds",
        );

        let internal_update_counter = table_counter(
            "table.internal_update_counter",
            "Number of value updates where the value actually changes (includes creation of new key and update of existing key)",
        );
        let internal_delete_counter = table_counter(
            "table.internal_delete_counter",
            "Number of value deletions in the tree (due to GC or repartitioning)",
        );

        let sync_items_sent = plain_counter(
            "table.sync_items_sent",
            "Number of data items sent to other nodes during resync procedures",
        );
        let sync_items_received = plain_counter(
            "table.sync_items_received",
            "Number of data items received from other nodes during resync procedures",
        );

        Self {
            _merkle_todo_len,
            _gc_todo_len,
            get_request_counter,
            get_request_duration,
            put_request_counter,
            put_request_duration,
            internal_update_counter,
            internal_delete_counter,
            sync_items_sent,
            sync_items_received,
        }
    }
}

View file

@ -6,6 +6,7 @@ use async_trait::async_trait;
use futures::select; use futures::select;
use futures_util::future::*; use futures_util::future::*;
use futures_util::stream::*; use futures_util::stream::*;
use opentelemetry::KeyValue;
use rand::Rng; use rand::Rng;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf; use serde_bytes::ByteBuf;
@ -312,6 +313,16 @@ where
) -> Result<(), Error> { ) -> Result<(), Error> {
let values = items.iter().map(|(_k, v)| v.clone()).collect::<Vec<_>>(); let values = items.iter().map(|(_k, v)| v.clone()).collect::<Vec<_>>();
for to in nodes.iter() {
self.data.metrics.sync_items_sent.add(
values.len() as u64,
&[
KeyValue::new("table_name", F::TABLE_NAME),
KeyValue::new("to", format!("{:?}", to)),
],
);
}
self.system self.system
.rpc .rpc
.try_call_many( .try_call_many(
@ -500,6 +511,14 @@ where
.map(|x| Arc::new(ByteBuf::from(x))) .map(|x| Arc::new(ByteBuf::from(x)))
.collect::<Vec<_>>(); .collect::<Vec<_>>();
self.data.metrics.sync_items_sent.add(
values.len() as u64,
&[
KeyValue::new("table_name", F::TABLE_NAME),
KeyValue::new("to", format!("{:?}", who)),
],
);
let rpc_resp = self let rpc_resp = self
.system .system
.rpc .rpc
@ -527,7 +546,7 @@ where
F: TableSchema + 'static, F: TableSchema + 'static,
R: TableReplication + 'static, R: TableReplication + 'static,
{ {
async fn handle(self: &Arc<Self>, message: &SyncRpc, _from: NodeID) -> Result<SyncRpc, Error> { async fn handle(self: &Arc<Self>, message: &SyncRpc, from: NodeID) -> Result<SyncRpc, Error> {
match message { match message {
SyncRpc::RootCkHash(range, h) => { SyncRpc::RootCkHash(range, h) => {
let (_root_ck_key, root_ck) = self.get_root_ck(*range)?; let (_root_ck_key, root_ck) = self.get_root_ck(*range)?;
@ -539,6 +558,17 @@ where
Ok(SyncRpc::Node(k.clone(), node)) Ok(SyncRpc::Node(k.clone(), node))
} }
SyncRpc::Items(items) => { SyncRpc::Items(items) => {
self.data.metrics.sync_items_received.add(
items.len() as u64,
&[
KeyValue::new("table_name", F::TABLE_NAME),
KeyValue::new(
"from",
format!("{:?}", Uuid::try_from(from.as_ref()).unwrap()),
),
],
);
self.data.update_many(items)?; self.data.update_many(items)?;
Ok(SyncRpc::Ok) Ok(SyncRpc::Ok)
} }

View file

@ -7,8 +7,14 @@ use futures::stream::*;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf; use serde_bytes::ByteBuf;
use opentelemetry::{
trace::{FutureExt, TraceContextExt, Tracer},
Context,
};
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::Error; use garage_util::error::Error;
use garage_util::metrics::RecordDuration;
use garage_rpc::system::System; use garage_rpc::system::System;
use garage_rpc::*; use garage_rpc::*;
@ -81,6 +87,20 @@ where
} }
pub async fn insert(&self, e: &F::E) -> Result<(), Error> { pub async fn insert(&self, e: &F::E) -> Result<(), Error> {
let tracer = opentelemetry::global::tracer("garage_table");
let span = tracer.start(format!("{} insert", F::TABLE_NAME));
self.insert_internal(e)
.bound_record_duration(&self.data.metrics.put_request_duration)
.with_context(Context::current_with_span(span))
.await?;
self.data.metrics.put_request_counter.add(1);
Ok(())
}
async fn insert_internal(&self, e: &F::E) -> Result<(), Error> {
let hash = e.partition_key().hash(); let hash = e.partition_key().hash();
let who = self.data.replication.write_nodes(&hash); let who = self.data.replication.write_nodes(&hash);
//eprintln!("insert who: {:?}", who); //eprintln!("insert who: {:?}", who);
@ -99,10 +119,25 @@ where
.with_timeout(TABLE_RPC_TIMEOUT), .with_timeout(TABLE_RPC_TIMEOUT),
) )
.await?; .await?;
Ok(()) Ok(())
} }
pub async fn insert_many(&self, entries: &[F::E]) -> Result<(), Error> { pub async fn insert_many(&self, entries: &[F::E]) -> Result<(), Error> {
let tracer = opentelemetry::global::tracer("garage_table");
let span = tracer.start(format!("{} insert_many {}", F::TABLE_NAME, entries.len()));
self.insert_many_internal(entries)
.bound_record_duration(&self.data.metrics.put_request_duration)
.with_context(Context::current_with_span(span))
.await?;
self.data.metrics.put_request_counter.add(1);
Ok(())
}
async fn insert_many_internal(&self, entries: &[F::E]) -> Result<(), Error> {
let mut call_list: HashMap<_, Vec<_>> = HashMap::new(); let mut call_list: HashMap<_, Vec<_>> = HashMap::new();
for entry in entries.iter() { for entry in entries.iter() {
@ -148,10 +183,28 @@ where
self: &Arc<Self>, self: &Arc<Self>,
partition_key: &F::P, partition_key: &F::P,
sort_key: &F::S, sort_key: &F::S,
) -> Result<Option<F::E>, Error> {
let tracer = opentelemetry::global::tracer("garage_table");
let span = tracer.start(format!("{} get", F::TABLE_NAME));
let res = self
.get_internal(partition_key, sort_key)
.bound_record_duration(&self.data.metrics.get_request_duration)
.with_context(Context::current_with_span(span))
.await?;
self.data.metrics.get_request_counter.add(1);
Ok(res)
}
async fn get_internal(
self: &Arc<Self>,
partition_key: &F::P,
sort_key: &F::S,
) -> Result<Option<F::E>, Error> { ) -> Result<Option<F::E>, Error> {
let hash = partition_key.hash(); let hash = partition_key.hash();
let who = self.data.replication.read_nodes(&hash); let who = self.data.replication.read_nodes(&hash);
//eprintln!("get who: {:?}", who);
let rpc = TableRpc::<F>::ReadEntry(partition_key.clone(), sort_key.clone()); let rpc = TableRpc::<F>::ReadEntry(partition_key.clone(), sort_key.clone());
let resps = self let resps = self
@ -198,6 +251,7 @@ where
.spawn_cancellable(async move { self2.repair_on_read(&who[..], ent2).await }); .spawn_cancellable(async move { self2.repair_on_read(&who[..], ent2).await });
} }
} }
Ok(ret) Ok(ret)
} }
@ -207,6 +261,27 @@ where
begin_sort_key: Option<F::S>, begin_sort_key: Option<F::S>,
filter: Option<F::Filter>, filter: Option<F::Filter>,
limit: usize, limit: usize,
) -> Result<Vec<F::E>, Error> {
let tracer = opentelemetry::global::tracer("garage_table");
let span = tracer.start(format!("{} get_range", F::TABLE_NAME));
let res = self
.get_range_internal(partition_key, begin_sort_key, filter, limit)
.bound_record_duration(&self.data.metrics.get_request_duration)
.with_context(Context::current_with_span(span))
.await?;
self.data.metrics.get_request_counter.add(1);
Ok(res)
}
async fn get_range_internal(
self: &Arc<Self>,
partition_key: &F::P,
begin_sort_key: Option<F::S>,
filter: Option<F::Filter>,
limit: usize,
) -> Result<Vec<F::E>, Error> { ) -> Result<Vec<F::E>, Error> {
let hash = partition_key.hash(); let hash = partition_key.hash();
let who = self.data.replication.read_nodes(&hash); let who = self.data.replication.read_nodes(&hash);

View file

@ -1,6 +1,6 @@
[package] [package]
name = "garage_util" name = "garage_util"
version = "0.6.0" version = "0.7.0"
authors = ["Alex Auvolat <alex@adnab.me>"] authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018" edition = "2018"
license = "AGPL-3.0" license = "AGPL-3.0"
@ -18,7 +18,7 @@ blake2 = "0.9"
err-derive = "0.3" err-derive = "0.3"
xxhash-rust = { version = "0.8", default-features = false, features = ["xxh3"] } xxhash-rust = { version = "0.8", default-features = false, features = ["xxh3"] }
hex = "0.4" hex = "0.4"
log = "0.4" tracing = "0.1.30"
rand = "0.8" rand = "0.8"
sha2 = "0.9" sha2 = "0.9"
@ -34,7 +34,9 @@ futures = "0.3"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
#netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" } #netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" }
netapp = "0.3.0" #netapp = { version = "0.4", path = "../../../netapp" }
netapp = "0.4"
http = "0.2" http = "0.2"
hyper = "0.14" hyper = "0.14"
opentelemetry = "0.17"

View file

@ -66,6 +66,9 @@ pub struct Config {
/// Configuration for serving files as normal web server /// Configuration for serving files as normal web server
pub s3_web: WebConfig, pub s3_web: WebConfig,
/// Configuration for the admin API endpoint
pub admin: AdminConfig,
} }
/// Configuration for S3 api /// Configuration for S3 api
@ -89,6 +92,15 @@ pub struct WebConfig {
pub root_domain: String, pub root_domain: String,
} }
/// Configuration for the admin and monitoring HTTP API
#[derive(Deserialize, Debug, Clone)]
pub struct AdminConfig {
/// Address and port on which the admin API is served
pub api_bind_addr: SocketAddr,
/// Address of the OTLP server to which traces are exported, if any
pub trace_sink: Option<String>,
}
fn default_sled_cache_capacity() -> u64 { fn default_sled_cache_capacity() -> u64 {
128 * 1024 * 1024 128 * 1024 * 1024
} }

View file

@ -22,7 +22,7 @@ impl std::convert::AsRef<[u8]> for FixedBytes32 {
impl fmt::Debug for FixedBytes32 { impl fmt::Debug for FixedBytes32 {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", hex::encode(&self.0[..8])) write!(f, "{}", hex::encode(&self.0[..8]))
} }
} }

View file

@ -1,14 +1,16 @@
//! Crate containing common functions and types used in Garage //! Crate containing common functions and types used in Garage
#[macro_use] #[macro_use]
extern crate log; extern crate tracing;
pub mod background; pub mod background;
pub mod config; pub mod config;
pub mod crdt; pub mod crdt;
pub mod data; pub mod data;
pub mod error; pub mod error;
pub mod metrics;
pub mod persister; pub mod persister;
pub mod sled_counter;
pub mod time; pub mod time;
pub mod token_bucket; pub mod token_bucket;
pub mod tranquilizer; pub mod tranquilizer;

57
src/util/metrics.rs Normal file
View file

@ -0,0 +1,57 @@
use std::time::SystemTime;
use futures::{future::BoxFuture, Future, FutureExt};
use rand::Rng;
use opentelemetry::{metrics::*, trace::TraceId, KeyValue};
/// Extension trait that times a future and reports the elapsed time to an
/// OpenTelemetry value recorder once the future has completed.
///
/// Both methods consume `self` and return a boxed future yielding the same
/// output as the original one.
pub trait RecordDuration<'a>: 'a {
/// Value produced by the wrapped future.
type Output;
/// Wraps `self` so that its duration, in seconds, is recorded on `r`
/// together with `attributes` when it completes.
fn record_duration(
self,
r: &'a ValueRecorder<f64>,
attributes: &'a [KeyValue],
) -> BoxFuture<'a, Self::Output>;
/// Same as `record_duration`, but for a recorder that is already bound to
/// its attributes.
fn bound_record_duration(self, r: &'a BoundValueRecorder<f64>) -> BoxFuture<'a, Self::Output>;
}
impl<'a, T, O> RecordDuration<'a> for T
where
T: Future<Output = O> + Send + 'a,
{
type Output = O;
fn record_duration(
self,
r: &'a ValueRecorder<f64>,
attributes: &'a [KeyValue],
) -> BoxFuture<'a, Self::Output> {
async move {
let request_start = SystemTime::now();
let res = self.await;
r.record(
request_start.elapsed().map_or(0.0, |d| d.as_secs_f64()),
attributes,
);
res
}
.boxed()
}
fn bound_record_duration(self, r: &'a BoundValueRecorder<f64>) -> BoxFuture<'a, Self::Output> {
async move {
let request_start = SystemTime::now();
let res = self.await;
r.record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64()));
res
}
.boxed()
}
}
// ----
pub fn gen_trace_id() -> TraceId {
rand::thread_rng().gen::<[u8; 16]>().into()
}

100
src/util/sled_counter.rs Normal file
View file

@ -0,0 +1,100 @@
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use sled::{CompareAndSwapError, IVec, Iter, Result, Tree};
/// Handle to a sled `Tree` paired with an in-memory count of its items.
///
/// Cloning the handle shares the same tree and counter through an `Arc`.
#[derive(Clone)]
pub struct SledCountedTree(Arc<SledCountedTreeInternal>);
/// Shared state behind a `SledCountedTree`.
struct SledCountedTreeInternal {
// Underlying sled tree holding the actual data.
tree: Tree,
// Item count: seeded from `tree.len()` at construction and adjusted by the
// writing methods of `SledCountedTree`.
len: AtomicUsize,
}
/// Subset of the sled `Tree` API that keeps the in-memory item counter in
/// step with every successful write.
impl SledCountedTree {
    /// Wraps `tree`, seeding the counter with the tree's current length.
    pub fn new(tree: Tree) -> Self {
        let initial_len = tree.len();
        let inner = SledCountedTreeInternal {
            tree,
            len: AtomicUsize::new(initial_len),
        };
        Self(Arc::new(inner))
    }

    /// Returns the item count as tracked by the in-memory counter.
    pub fn len(&self) -> usize {
        self.0.len.load(Ordering::Relaxed)
    }

    /// Returns whether the underlying tree is empty (asks the tree itself,
    /// not the counter).
    pub fn is_empty(&self) -> bool {
        self.0.tree.is_empty()
    }

    /// Looks up `key` in the underlying tree.
    pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<IVec>> {
        self.0.tree.get(key)
    }

    /// Returns an iterator over the underlying tree.
    pub fn iter(&self) -> Iter {
        self.0.tree.iter()
    }

    // ---- counter helpers ----

    /// Records that one item was added to the tree.
    fn count_added(&self) {
        self.0.len.fetch_add(1, Ordering::Relaxed);
    }

    /// Records that one item was removed from the tree.
    fn count_removed(&self) {
        self.0.len.fetch_sub(1, Ordering::Relaxed);
    }

    // ---- writing functions ----

    /// Inserts `value` at `key`, returning the previous value if any.
    /// The counter grows only when the key was not present before.
    pub fn insert<K, V>(&self, key: K, value: V) -> Result<Option<IVec>>
    where
        K: AsRef<[u8]>,
        V: Into<IVec>,
    {
        let outcome = self.0.tree.insert(key, value);
        // `Ok(None)` means no previous value existed, i.e. a new key.
        if let Ok(None) = &outcome {
            self.count_added();
        }
        outcome
    }

    /// Removes `key`, returning the removed value if any.
    /// The counter shrinks only when a value was actually removed.
    pub fn remove<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<IVec>> {
        let outcome = self.0.tree.remove(key);
        if let Ok(Some(_)) = &outcome {
            self.count_removed();
        }
        outcome
    }

    /// Pops the smallest entry of the tree, if any, decrementing the counter
    /// when an entry was returned.
    pub fn pop_min(&self) -> Result<Option<(IVec, IVec)>> {
        let outcome = self.0.tree.pop_min();
        if matches!(outcome, Ok(Some(_))) {
            self.count_removed();
        }
        outcome
    }

    /// Performs a compare-and-swap on `key`. On success the counter is
    /// incremented for a creation (`None` -> `Some`), decremented for a
    /// deletion (`Some` -> `None`), and left untouched otherwise.
    pub fn compare_and_swap<K, OV, NV>(
        &self,
        key: K,
        old: Option<OV>,
        new: Option<NV>,
    ) -> Result<std::result::Result<(), CompareAndSwapError>>
    where
        K: AsRef<[u8]>,
        OV: AsRef<[u8]>,
        NV: Into<IVec>,
    {
        // Capture presence before `old` and `new` are moved into sled.
        let had_old = old.is_some();
        let has_new = new.is_some();

        let outcome = self.0.tree.compare_and_swap(key, old, new);
        if matches!(outcome, Ok(Ok(()))) {
            if has_new && !had_old {
                self.count_added();
            } else if had_old && !has_new {
                self.count_removed();
            }
        }
        outcome
    }
}

View file

@ -1,6 +1,6 @@
[package] [package]
name = "garage_web" name = "garage_web"
version = "0.6.0" version = "0.7.0"
authors = ["Alex Auvolat <alex@adnab.me>", "Quentin Dufour <quentin@dufour.io>"] authors = ["Alex Auvolat <alex@adnab.me>", "Quentin Dufour <quentin@dufour.io>"]
edition = "2018" edition = "2018"
license = "AGPL-3.0" license = "AGPL-3.0"
@ -14,16 +14,18 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
garage_api = { version = "0.6.0", path = "../api" } garage_api = { version = "0.7.0", path = "../api" }
garage_model = { version = "0.6.0", path = "../model" } garage_model = { version = "0.7.0", path = "../model" }
garage_util = { version = "0.6.0", path = "../util" } garage_util = { version = "0.7.0", path = "../util" }
garage_table = { version = "0.6.0", path = "../table" } garage_table = { version = "0.7.0", path = "../table" }
err-derive = "0.3" err-derive = "0.3"
log = "0.4" tracing = "0.1.30"
percent-encoding = "2.1.0" percent-encoding = "2.1.0"
futures = "0.3" futures = "0.3"
http = "0.2" http = "0.2"
hyper = { version = "0.14", features = ["server", "http1", "runtime", "tcp", "stream"] } hyper = { version = "0.14", features = ["server", "http1", "runtime", "tcp", "stream"] }
opentelemetry = "0.17"

View file

@ -1,6 +1,6 @@
//! Crate for handling web serving of s3 bucket //! Crate for handling web serving of s3 bucket
#[macro_use] #[macro_use]
extern crate log; extern crate tracing;
mod error; mod error;
pub use error::Error; pub use error::Error;

View file

@ -9,6 +9,13 @@ use hyper::{
Body, Method, Request, Response, Server, Body, Method, Request, Response, Server,
}; };
use opentelemetry::{
global,
metrics::{Counter, ValueRecorder},
trace::{FutureExt, TraceContextExt, Tracer},
Context, KeyValue,
};
use crate::error::*; use crate::error::*;
use garage_api::error::{Error as ApiError, OkOrBadRequest, OkOrInternalError}; use garage_api::error::{Error as ApiError, OkOrBadRequest, OkOrInternalError};
@ -20,6 +27,33 @@ use garage_model::garage::Garage;
use garage_table::*; use garage_table::*;
use garage_util::error::Error as GarageError; use garage_util::error::Error as GarageError;
use garage_util::metrics::{gen_trace_id, RecordDuration};
/// OpenTelemetry instruments used by the web endpoint.
struct WebMetrics {
// Counts requests to the web endpoint.
request_counter: Counter<u64>,
// Counts requests to the web endpoint that resulted in an error.
error_counter: Counter<u64>,
// Records the duration of requests to the web endpoint.
request_duration: ValueRecorder<f64>,
}
impl WebMetrics {
    /// Creates the web endpoint instruments on the global `garage/web` meter.
    fn new() -> Self {
        let meter = global::meter("garage/web");

        let request_counter = meter
            .u64_counter("web.request_counter")
            .with_description("Number of requests to the web endpoint")
            .init();

        let error_counter = meter
            .u64_counter("web.error_counter")
            .with_description("Number of requests to the web endpoint resulting in errors")
            .init();

        let request_duration = meter
            .f64_value_recorder("web.request_duration")
            .with_description("Duration of requests to the web endpoint")
            .init();

        Self {
            request_counter,
            error_counter,
            request_duration,
        }
    }
}
/// Run a web server /// Run a web server
pub async fn run_web_server( pub async fn run_web_server(
@ -28,13 +62,19 @@ pub async fn run_web_server(
) -> Result<(), GarageError> { ) -> Result<(), GarageError> {
let addr = &garage.config.s3_web.bind_addr; let addr = &garage.config.s3_web.bind_addr;
let metrics = Arc::new(WebMetrics::new());
let service = make_service_fn(|conn: &AddrStream| { let service = make_service_fn(|conn: &AddrStream| {
let garage = garage.clone(); let garage = garage.clone();
let metrics = metrics.clone();
let client_addr = conn.remote_addr(); let client_addr = conn.remote_addr();
async move { async move {
Ok::<_, Error>(service_fn(move |req: Request<Body>| { Ok::<_, Error>(service_fn(move |req: Request<Body>| {
let garage = garage.clone(); let garage = garage.clone();
handle_request(garage, req, client_addr) let metrics = metrics.clone();
handle_request(garage, metrics, req, client_addr)
})) }))
} }
}); });
@ -49,23 +89,55 @@ pub async fn run_web_server(
async fn handle_request( async fn handle_request(
garage: Arc<Garage>, garage: Arc<Garage>,
metrics: Arc<WebMetrics>,
req: Request<Body>, req: Request<Body>,
addr: SocketAddr, addr: SocketAddr,
) -> Result<Response<Body>, Infallible> { ) -> Result<Response<Body>, Infallible> {
info!("{} {} {}", addr, req.method(), req.uri()); info!("{} {} {}", addr, req.method(), req.uri());
match serve_file(garage, &req).await {
// Lots of instrumentation
let tracer = opentelemetry::global::tracer("garage");
let span = tracer
.span_builder(format!("Web {} request", req.method()))
.with_trace_id(gen_trace_id())
.with_attributes(vec![
KeyValue::new("method", format!("{}", req.method())),
KeyValue::new("uri", req.uri().to_string()),
])
.start(&tracer);
let metrics_tags = &[KeyValue::new("method", req.method().to_string())];
// The actual handler
let res = serve_file(garage, &req)
.with_context(Context::current_with_span(span))
.record_duration(&metrics.request_duration, &metrics_tags[..])
.await;
// More instrumentation
metrics.request_counter.add(1, &metrics_tags[..]);
// Returning the result
match res {
Ok(res) => { Ok(res) => {
debug!("{} {} {}", req.method(), req.uri(), res.status()); debug!("{} {} {}", req.method(), res.status(), req.uri());
Ok(res) Ok(res)
} }
Err(error) => { Err(error) => {
info!( info!(
"{} {} {} {}", "{} {} {} {}",
req.method(), req.method(),
req.uri(),
error.http_status_code(), error.http_status_code(),
req.uri(),
error error
); );
metrics.error_counter.add(
1,
&[
metrics_tags[0].clone(),
KeyValue::new("status_code", error.http_status_code().to_string()),
],
);
Ok(error_to_res(error)) Ok(error_to_res(error))
} }
} }