Add telemetry

This commit is contained in:
Alex 2022-02-18 19:01:23 +01:00
parent dc0b5c0305
commit ab0f7785ae
4 changed files with 248 additions and 6 deletions

176
Cargo.lock generated
View file

@ -62,6 +62,12 @@ version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "bumpalo"
version = "3.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4a45a46ab1f2412e53d3a0ade76ffad2025804294569aae387231a0cd6e0899"
[[package]]
name = "byteorder"
version = "1.4.3"
@ -125,6 +131,26 @@ dependencies = [
"bitflags",
]
[[package]]
name = "crossbeam-channel"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e54ea8bc3fb1ee042f5aace6e3c6e025d3874866da222930f70ce62aceba0bfa"
dependencies = [
"cfg-if",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5e5bed1f1c269533fa816a0a5492b3545209a205ca1a54842be180eb63a16a6"
dependencies = [
"cfg-if",
"lazy_static",
]
[[package]]
name = "env_logger"
version = "0.8.4"
@ -302,6 +328,15 @@ version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
[[package]]
name = "js-sys"
version = "0.3.56"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a38fc24e30fd564ce974c02bf1d337caddff65be6cc4735a1f7eab22a7440f04"
dependencies = [
"wasm-bindgen",
]
[[package]]
name = "kuska-handshake"
version = "0.2.0"
@ -398,11 +433,12 @@ dependencies = [
[[package]]
name = "netapp"
version = "0.3.1"
version = "0.3.2"
dependencies = [
"arc-swap",
"async-trait",
"bytes 0.6.0",
"cfg-if",
"chrono",
"env_logger",
"err-derive",
@ -412,7 +448,8 @@ dependencies = [
"kuska-sodiumoxide",
"log",
"lru",
"rand",
"opentelemetry",
"rand 0.5.6",
"rmp-serde",
"serde",
"structopt",
@ -465,6 +502,51 @@ version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "692fcb63b64b1758029e0a96ee63e049ce8c5948587f2f7208df04625e5f6b56"
[[package]]
name = "opentelemetry"
version = "0.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6105e89802af13fdf48c49d7646d3b533a70e536d818aae7e78ba0433d01acb8"
dependencies = [
"async-trait",
"crossbeam-channel",
"futures-channel",
"futures-executor",
"futures-util",
"js-sys",
"lazy_static",
"percent-encoding",
"pin-project",
"rand 0.8.5",
"thiserror",
]
[[package]]
name = "percent-encoding"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"
[[package]]
name = "pin-project"
version = "1.0.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "58ad3879ad3baf4e44784bc6a718a8698867bb991f8ce24d1bcbe2cfb4c3a75e"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
version = "1.0.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "744b6f092ba29c3650faf274db506afd39944f48420f6c86b17cfe0ee1cb36bb"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "pin-project-lite"
version = "0.2.7"
@ -483,6 +565,12 @@ version = "0.3.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c9b1041b4387893b91ee6746cddfc28516aff326a3519fb2adf820932c5e6cb"
[[package]]
name = "ppv-lite86"
version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872"
[[package]]
name = "proc-macro-error"
version = "1.0.4"
@ -550,6 +638,27 @@ dependencies = [
"winapi",
]
[[package]]
name = "rand"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
dependencies = [
"libc",
"rand_chacha",
"rand_core 0.6.3",
]
[[package]]
name = "rand_chacha"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
dependencies = [
"ppv-lite86",
"rand_core 0.6.3",
]
[[package]]
name = "rand_core"
version = "0.3.1"
@ -565,6 +674,15 @@ version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c33a3c44ca05fa6f1807d8e6743f3824e8509beca625669633be0acbdf509dc"
[[package]]
name = "rand_core"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d34f1408f55294453790c48b2f1ebbb1c5b4b7563eb1f418bcfcfdbb06ebb4e7"
dependencies = [
"getrandom",
]
[[package]]
name = "regex"
version = "1.5.4"
@ -845,6 +963,60 @@ version = "0.10.2+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6"
[[package]]
name = "wasm-bindgen"
version = "0.2.79"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "25f1af7423d8588a3d840681122e72e6a24ddbcb3f0ec385cac0d12d24256c06"
dependencies = [
"cfg-if",
"wasm-bindgen-macro",
]
[[package]]
name = "wasm-bindgen-backend"
version = "0.2.79"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b21c0df030f5a177f3cba22e9bc4322695ec43e7257d865302900290bcdedca"
dependencies = [
"bumpalo",
"lazy_static",
"log",
"proc-macro2",
"quote",
"syn",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.79"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2f4203d69e40a52ee523b2529a773d5ffc1dc0071801c87b3d270b471b80ed01"
dependencies = [
"quote",
"wasm-bindgen-macro-support",
]
[[package]]
name = "wasm-bindgen-macro-support"
version = "0.2.79"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bfa8a30d46208db204854cadbb5d4baf5fcf8071ba5bf48190c3e59937962ebc"
dependencies = [
"proc-macro2",
"quote",
"syn",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-shared"
version = "0.2.79"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d958d035c4438e28c70e4321a2911302f10135ce78a9c7834c0cab4123d06a2"
[[package]]
name = "winapi"
version = "0.3.9"

View file

@ -1,6 +1,6 @@
[package]
name = "netapp"
version = "0.3.1"
version = "0.3.2"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license-file = "LICENSE"
@ -17,6 +17,7 @@ name = "netapp"
[features]
default = []
basalt = ["lru", "rand"]
telemetry = ["opentelemetry", "rand"]
[dependencies]
futures = "0.3.17"
@ -36,10 +37,13 @@ async-trait = "0.1.7"
err-derive = "0.2.3"
bytes = "0.6.0"
lru = { version = "0.6", optional = true }
cfg-if = "1.0"
sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" }
kuska-handshake = { version = "0.2.0", features = ["default", "async_std"] }
opentelemetry = { version = "0.17", optional = true }
[dev-dependencies]
env_logger = "0.8"
structopt = { version = "0.3", default-features = false }

View file

@ -125,10 +125,39 @@ impl ClientConn {
.next_query_number
.fetch_add(1, atomic::Ordering::Relaxed);
let mut bytes = vec![prio, path.as_bytes().len() as u8];
cfg_if::cfg_if! {
if #[cfg(feature = "telemetry")] {
use opentelemetry::{trace::{TraceId, TraceContextExt}, Context};
let trace_id_int = Context::current()
.span()
.span_context()
.trace_id();
let trace_id = if trace_id_int == TraceId::INVALID {
None
} else {
Some(trace_id_int.to_bytes().to_vec())
};
} else {
let trace_id: Option<Vec<u8>> = None;
}
};
// Encode request
let mut bytes = vec![];
bytes.extend_from_slice(&[prio, path.as_bytes().len() as u8]);
bytes.extend_from_slice(path.as_bytes());
if let Some(by) = trace_id {
bytes.push(by.len() as u8);
bytes.extend(by);
} else {
bytes.push(0);
}
bytes.extend_from_slice(&rmp_to_vec_all_named(rq)?[..]);
// Send request through
let (resp_send, resp_recv) = oneshot::channel();
let old = self.inflight.lock().unwrap().insert(id, resp_send);
if let Some(old_ch) = old {

View file

@ -5,6 +5,9 @@ use arc_swap::ArcSwapOption;
use bytes::Bytes;
use log::{debug, trace};
#[cfg(feature = "telemetry")]
use rand::{thread_rng, Rng};
use tokio::net::TcpStream;
use tokio::select;
use tokio::sync::{mpsc, watch};
@ -118,7 +121,10 @@ impl ServerConn {
let path = &bytes[2..2 + path_length];
let path = String::from_utf8(path.to_vec())?;
let data = &bytes[2 + path_length..];
let trace_id_len = bytes[2 + path_length] as usize;
let data = &bytes[3 + path_length + trace_id_len..];
let handler_opt = {
let endpoints = self.netapp.endpoints.read().unwrap();
@ -126,7 +132,38 @@ impl ServerConn {
};
if let Some(handler) = handler_opt {
handler.handle(data, self.peer_id).await
cfg_if::cfg_if! {
if #[cfg(feature = "telemetry")] {
use opentelemetry::{
KeyValue,
trace::{FutureExt, TraceContextExt, Tracer},
Context, trace::TraceId
};
let trace_id = if trace_id_len == 16 {
let mut by = [0u8; 16];
by.copy_from_slice(&bytes[3+path_length..19+path_length]);
TraceId::from_bytes(by)
} else {
let mut rng = thread_rng();
TraceId::from_bytes(rng.gen())
};
let tracer = opentelemetry::global::tracer("garage");
let span = tracer
.span_builder(format!("RPC handler {}", path))
.with_trace_id(trace_id)
.with_attributes(vec![
KeyValue::new("path", path),
])
.start(&tracer);
handler.handle(data, self.peer_id)
.with_context(Context::current_with_span(span))
.await
} else {
handler.handle(data, self.peer_id).await
}
}
} else {
Err(Error::NoHandler)
}