From fb6b4dc9a95f4268775e674323a2b84c7b4ae7f0 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 18 Feb 2022 20:10:46 +0100 Subject: [PATCH] Correct implementation of distributed tracing --- Cargo.lock | 13 ++++++++++++- Cargo.toml | 5 +++-- src/client.rs | 39 ++++++++++++++++++++++++++------------- src/server.rs | 47 ++++++++++++++++++++++++++--------------------- 4 files changed, 67 insertions(+), 37 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 36facc8..3b7f2d0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -433,7 +433,7 @@ dependencies = [ [[package]] name = "netapp" -version = "0.3.2" +version = "0.4.0" dependencies = [ "arc-swap", "async-trait", @@ -449,6 +449,7 @@ dependencies = [ "log", "lru", "opentelemetry", + "opentelemetry-contrib", "rand 0.5.6", "rmp-serde", "serde", @@ -521,6 +522,16 @@ dependencies = [ "thiserror", ] +[[package]] +name = "opentelemetry-contrib" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85637add8f60bb4cac673469c14f47a329c6cec7365c72d72cd32f2d104a721a" +dependencies = [ + "lazy_static", + "opentelemetry", +] + [[package]] name = "percent-encoding" version = "2.1.0" diff --git a/Cargo.toml b/Cargo.toml index 4b6db48..8eb32db 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "netapp" -version = "0.3.2" +version = "0.4.0" authors = ["Alex Auvolat "] edition = "2018" license-file = "LICENSE" @@ -17,7 +17,7 @@ name = "netapp" [features] default = [] basalt = ["lru", "rand"] -telemetry = ["opentelemetry", "rand"] +telemetry = ["opentelemetry", "opentelemetry-contrib", "rand"] [dependencies] futures = "0.3.17" @@ -43,6 +43,7 @@ sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" } kuska-handshake = { version = "0.2.0", features = ["default", "async_std"] } opentelemetry = { version = "0.17", optional = true } +opentelemetry-contrib = { version = "0.9", optional = true } [dev-dependencies] env_logger = "0.8" diff --git a/src/client.rs 
b/src/client.rs index 7ef772d..e08b30b 100644 --- a/src/client.rs +++ b/src/client.rs @@ -11,6 +11,14 @@ use tokio::select; use tokio::sync::{mpsc, oneshot, watch}; use tokio_util::compat::*; +#[cfg(feature = "telemetry")] +use opentelemetry::{ + trace::{FutureExt, Span, SpanKind, TraceContextExt, Tracer}, + Context, +}; +#[cfg(feature = "telemetry")] +use opentelemetry_contrib::trace::propagator::binary::*; + use futures::io::AsyncReadExt; use async_trait::async_trait; @@ -127,18 +135,14 @@ impl ClientConn { cfg_if::cfg_if! { if #[cfg(feature = "telemetry")] { - use opentelemetry::{trace::{TraceId, TraceContextExt}, Context}; - let trace_id_int = Context::current() - .span() - .span_context() - .trace_id(); - let trace_id = if trace_id_int == TraceId::INVALID { - None - } else { - Some(trace_id_int.to_bytes().to_vec()) - }; + let tracer = opentelemetry::global::tracer("netapp"); + let span = tracer.span_builder(format!("RPC >> {}", path)) + .with_kind(SpanKind::Server) + .start(&tracer); + let propagator = BinaryPropagator::new(); + let telemetry_id = Some(propagator.to_bytes(span.span_context()).to_vec()); } else { - let trace_id: Option<Vec<u8>> = None; + let telemetry_id: Option<Vec<u8>> = None; } }; @@ -148,7 +152,7 @@ impl ClientConn { bytes.extend_from_slice(&[prio, path.as_bytes().len() as u8]); bytes.extend_from_slice(path.as_bytes()); - if let Some(by) = trace_id { + if let Some(by) = telemetry_id { bytes.push(by.len() as u8); bytes.extend(by); } else { @@ -172,7 +176,16 @@ impl ClientConn { trace!("request: query_send {}, {} bytes", id, bytes.len()); query_send.send((id, prio, bytes))?; - let resp = resp_recv.await?; + cfg_if::cfg_if! 
{ + if #[cfg(feature = "telemetry")] { + let resp = resp_recv + .with_context(Context::current_with_span(span)) + .await?; + } else { + let resp = resp_recv.await?; + } + } + if resp.is_empty() { return Err(Error::Message( "Response is 0 bytes, either a collision or a protocol error".into(), diff --git a/src/server.rs b/src/server.rs index 937d65a..8b60e17 100644 --- a/src/server.rs +++ b/src/server.rs @@ -5,6 +5,13 @@ use arc_swap::ArcSwapOption; use bytes::Bytes; use log::{debug, trace}; +#[cfg(feature = "telemetry")] +use opentelemetry::{ + trace::{FutureExt, Span, SpanKind, TraceContextExt, TraceId, Tracer}, + Context, KeyValue, +}; +#[cfg(feature = "telemetry")] +use opentelemetry_contrib::trace::propagator::binary::*; #[cfg(feature = "telemetry")] use rand::{thread_rng, Rng}; @@ -122,9 +129,9 @@ impl ServerConn { let path = &bytes[2..2 + path_length]; let path = String::from_utf8(path.to_vec())?; - let trace_id_len = bytes[2 + path_length] as usize; + let telemetry_id_len = bytes[2 + path_length] as usize; - let data = &bytes[3 + path_length + trace_id_len..]; + let data = &bytes[3 + path_length + telemetry_id_len..]; let handler_opt = { let endpoints = self.netapp.endpoints.read().unwrap(); @@ -134,28 +141,26 @@ impl ServerConn { if let Some(handler) = handler_opt { cfg_if::cfg_if! 
{ if #[cfg(feature = "telemetry")] { - use opentelemetry::{ - KeyValue, - trace::{FutureExt, TraceContextExt, Tracer}, - Context, trace::TraceId - }; - let trace_id = if trace_id_len == 16 { - let mut by = [0u8; 16]; - by.copy_from_slice(&bytes[3+path_length..19+path_length]); - TraceId::from_bytes(by) + let tracer = opentelemetry::global::tracer("netapp"); + + let mut span = if telemetry_id_len > 0 { + let by = bytes[3+path_length..3+path_length+telemetry_id_len].to_vec(); + let propagator = BinaryPropagator::new(); + let context = propagator.from_bytes(by); + let context = Context::new().with_remote_span_context(context); + tracer.span_builder(format!(">> RPC {}", path)) + .with_kind(SpanKind::Server) + .start_with_context(&tracer, &context) } else { let mut rng = thread_rng(); - TraceId::from_bytes(rng.gen()) + let trace_id = TraceId::from_bytes(rng.gen()); + tracer + .span_builder(format!(">> RPC {}", path)) + .with_kind(SpanKind::Server) + .with_trace_id(trace_id) + .start(&tracer) }; - - let tracer = opentelemetry::global::tracer("garage"); - let span = tracer - .span_builder(format!("RPC handler {}", path)) - .with_trace_id(trace_id) - .with_attributes(vec![ - KeyValue::new("path", path), - ]) - .start(&tracer); + span.set_attribute(KeyValue::new("path", path.to_string())); handler.handle(data, self.peer_id) .with_context(Context::current_with_span(span))