From ab0f7785ae73e2f5aaf912fc3c0f2cd724967546 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 18 Feb 2022 19:01:23 +0100 Subject: [PATCH] Add telemetry --- Cargo.lock | 176 +++++++++++++++++++++++++++++++++++++++++++++++++- Cargo.toml | 6 +- src/client.rs | 31 ++++++++- src/server.rs | 41 +++++++++++- 4 files changed, 248 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c5bf7ba..36facc8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -62,6 +62,12 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" +[[package]] +name = "bumpalo" +version = "3.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4a45a46ab1f2412e53d3a0ade76ffad2025804294569aae387231a0cd6e0899" + [[package]] name = "byteorder" version = "1.4.3" @@ -125,6 +131,26 @@ dependencies = [ "bitflags", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e54ea8bc3fb1ee042f5aace6e3c6e025d3874866da222930f70ce62aceba0bfa" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b5e5bed1f1c269533fa816a0a5492b3545209a205ca1a54842be180eb63a16a6" +dependencies = [ + "cfg-if", + "lazy_static", +] + [[package]] name = "env_logger" version = "0.8.4" @@ -302,6 +328,15 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" +[[package]] +name = "js-sys" +version = "0.3.56" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a38fc24e30fd564ce974c02bf1d337caddff65be6cc4735a1f7eab22a7440f04" +dependencies = [ + "wasm-bindgen", +] + [[package]] name = "kuska-handshake" version = "0.2.0" @@ -398,11 +433,12 @@ dependencies = [ [[package]] name = "netapp" -version = "0.3.1" +version = "0.3.2" dependencies = [ "arc-swap", "async-trait", "bytes 0.6.0", + "cfg-if", "chrono", "env_logger", "err-derive", @@ -412,7 +448,8 @@ dependencies = [ "kuska-sodiumoxide", "log", "lru", - "rand", + "opentelemetry", + "rand 0.5.6", "rmp-serde", "serde", "structopt", @@ -465,6 +502,51 @@ version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "692fcb63b64b1758029e0a96ee63e049ce8c5948587f2f7208df04625e5f6b56" +[[package]] +name = "opentelemetry" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6105e89802af13fdf48c49d7646d3b533a70e536d818aae7e78ba0433d01acb8" +dependencies = [ + "async-trait", + "crossbeam-channel", + "futures-channel", + "futures-executor", + "futures-util", + "js-sys", + "lazy_static", + "percent-encoding", + "pin-project", + "rand 0.8.5", + "thiserror", +] + +[[package]] +name = "percent-encoding" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" + +[[package]] +name = "pin-project" +version = "1.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "58ad3879ad3baf4e44784bc6a718a8698867bb991f8ce24d1bcbe2cfb4c3a75e" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "744b6f092ba29c3650faf274db506afd39944f48420f6c86b17cfe0ee1cb36bb" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "pin-project-lite" version = "0.2.7" @@ -483,6 +565,12 @@ version = "0.3.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c9b1041b4387893b91ee6746cddfc28516aff326a3519fb2adf820932c5e6cb" +[[package]] +name = "ppv-lite86" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872" + [[package]] name = "proc-macro-error" version = "1.0.4" @@ -550,6 +638,27 @@ dependencies = [ "winapi", ] +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha", + "rand_core 0.6.3", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core 0.6.3", +] + [[package]] name = "rand_core" version = "0.3.1" @@ -565,6 +674,15 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c33a3c44ca05fa6f1807d8e6743f3824e8509beca625669633be0acbdf509dc" +[[package]] +name = "rand_core" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d34f1408f55294453790c48b2f1ebbb1c5b4b7563eb1f418bcfcfdbb06ebb4e7" +dependencies = [ + "getrandom", +] + [[package]] name = "regex" version = "1.5.4" @@ -845,6 +963,60 @@ version = "0.10.2+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6" +[[package]] +name = "wasm-bindgen" +version = "0.2.79" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25f1af7423d8588a3d840681122e72e6a24ddbcb3f0ec385cac0d12d24256c06" +dependencies = [ + "cfg-if", + "wasm-bindgen-macro", +] + +[[package]] +name = "wasm-bindgen-backend" +version = "0.2.79" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b21c0df030f5a177f3cba22e9bc4322695ec43e7257d865302900290bcdedca" +dependencies = [ + "bumpalo", + "lazy_static", + "log", + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.79" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f4203d69e40a52ee523b2529a773d5ffc1dc0071801c87b3d270b471b80ed01" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.79" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa8a30d46208db204854cadbb5d4baf5fcf8071ba5bf48190c3e59937962ebc" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-backend", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.79" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d958d035c4438e28c70e4321a2911302f10135ce78a9c7834c0cab4123d06a2" + [[package]] name = "winapi" version = "0.3.9" diff --git a/Cargo.toml b/Cargo.toml index 3621309..4b6db48 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "netapp" -version = "0.3.1" +version = "0.3.2" authors = ["Alex Auvolat "] edition = "2018" license-file = "LICENSE" @@ -17,6 +17,7 @@ name = "netapp" [features] default = [] basalt = ["lru", "rand"] +telemetry = ["opentelemetry", "rand"] [dependencies] futures = "0.3.17" @@ -36,10 +37,13 @@ async-trait = "0.1.7" err-derive = "0.2.3" bytes = "0.6.0" lru = { version = "0.6", optional = true } +cfg-if = "1.0" sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" } kuska-handshake = { version = "0.2.0", features = ["default", "async_std"] } +opentelemetry = { version = "0.17", optional = true } + [dev-dependencies] env_logger = "0.8" structopt = { version = "0.3", default-features = false } diff --git a/src/client.rs b/src/client.rs index ca1bcf9..7ef772d 100644 --- a/src/client.rs +++ b/src/client.rs @@ -125,10 +125,39 @@ impl ClientConn { .next_query_number .fetch_add(1, atomic::Ordering::Relaxed); - let mut bytes = vec![prio, path.as_bytes().len() as u8]; + cfg_if::cfg_if! { + if #[cfg(feature = "telemetry")] { + use opentelemetry::{trace::{TraceId, TraceContextExt}, Context}; + let trace_id_int = Context::current() + .span() + .span_context() + .trace_id(); + let trace_id = if trace_id_int == TraceId::INVALID { + None + } else { + Some(trace_id_int.to_bytes().to_vec()) + }; + } else { + let trace_id: Option> = None; + } + }; + + // Encode request + let mut bytes = vec![]; + + bytes.extend_from_slice(&[prio, path.as_bytes().len() as u8]); bytes.extend_from_slice(path.as_bytes()); + + if let Some(by) = trace_id { + bytes.push(by.len() as u8); + bytes.extend(by); + } else { + bytes.push(0); + } + bytes.extend_from_slice(&rmp_to_vec_all_named(rq)?[..]); + // Send request through let (resp_send, resp_recv) = oneshot::channel(); let old = self.inflight.lock().unwrap().insert(id, resp_send); if let Some(old_ch) = old { diff --git a/src/server.rs b/src/server.rs index f23b810..937d65a 100644 --- a/src/server.rs +++ b/src/server.rs @@ -5,6 +5,9 @@ use arc_swap::ArcSwapOption; use bytes::Bytes; use log::{debug, trace}; +#[cfg(feature = "telemetry")] +use rand::{thread_rng, Rng}; + use tokio::net::TcpStream; use tokio::select; use tokio::sync::{mpsc, watch}; @@ -118,7 +121,10 @@ impl ServerConn { let path = &bytes[2..2 + path_length]; let path = String::from_utf8(path.to_vec())?; - let data = &bytes[2 + path_length..]; + + let trace_id_len = bytes[2 + path_length] as usize; + + let data = &bytes[3 + path_length + trace_id_len..]; let handler_opt = { let endpoints = self.netapp.endpoints.read().unwrap(); @@ -126,7 +132,38 @@ impl ServerConn { }; if let Some(handler) = handler_opt { - handler.handle(data, self.peer_id).await + cfg_if::cfg_if! { + if #[cfg(feature = "telemetry")] { + use opentelemetry::{ + KeyValue, + trace::{FutureExt, TraceContextExt, Tracer}, + Context, trace::TraceId + }; + let trace_id = if trace_id_len == 16 { + let mut by = [0u8; 16]; + by.copy_from_slice(&bytes[3+path_length..19+path_length]); + TraceId::from_bytes(by) + } else { + let mut rng = thread_rng(); + TraceId::from_bytes(rng.gen()) + }; + + let tracer = opentelemetry::global::tracer("garage"); + let span = tracer + .span_builder(format!("RPC handler {}", path)) + .with_trace_id(trace_id) + .with_attributes(vec![ + KeyValue::new("path", path), + ]) + .start(&tracer); + + handler.handle(data, self.peer_id) + .with_context(Context::current_with_span(span)) + .await + } else { + handler.handle(data, self.peer_id).await + } + } } else { Err(Error::NoHandler) }