Correct implementation of distributed tracing
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
Alex 2022-02-18 20:10:46 +01:00
parent ab0f7785ae
commit fb6b4dc9a9
4 changed files with 67 additions and 37 deletions

13
Cargo.lock generated
View file

@ -433,7 +433,7 @@ dependencies = [
[[package]] [[package]]
name = "netapp" name = "netapp"
version = "0.3.2" version = "0.4.0"
dependencies = [ dependencies = [
"arc-swap", "arc-swap",
"async-trait", "async-trait",
@ -449,6 +449,7 @@ dependencies = [
"log", "log",
"lru", "lru",
"opentelemetry", "opentelemetry",
"opentelemetry-contrib",
"rand 0.5.6", "rand 0.5.6",
"rmp-serde", "rmp-serde",
"serde", "serde",
@ -521,6 +522,16 @@ dependencies = [
"thiserror", "thiserror",
] ]
[[package]]
name = "opentelemetry-contrib"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85637add8f60bb4cac673469c14f47a329c6cec7365c72d72cd32f2d104a721a"
dependencies = [
"lazy_static",
"opentelemetry",
]
[[package]] [[package]]
name = "percent-encoding" name = "percent-encoding"
version = "2.1.0" version = "2.1.0"

View file

@ -1,6 +1,6 @@
[package] [package]
name = "netapp" name = "netapp"
version = "0.3.2" version = "0.4.0"
authors = ["Alex Auvolat <alex@adnab.me>"] authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018" edition = "2018"
license-file = "LICENSE" license-file = "LICENSE"
@ -17,7 +17,7 @@ name = "netapp"
[features] [features]
default = [] default = []
basalt = ["lru", "rand"] basalt = ["lru", "rand"]
telemetry = ["opentelemetry", "rand"] telemetry = ["opentelemetry", "opentelemetry-contrib", "rand"]
[dependencies] [dependencies]
futures = "0.3.17" futures = "0.3.17"
@ -43,6 +43,7 @@ sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" }
kuska-handshake = { version = "0.2.0", features = ["default", "async_std"] } kuska-handshake = { version = "0.2.0", features = ["default", "async_std"] }
opentelemetry = { version = "0.17", optional = true } opentelemetry = { version = "0.17", optional = true }
opentelemetry-contrib = { version = "0.9", optional = true }
[dev-dependencies] [dev-dependencies]
env_logger = "0.8" env_logger = "0.8"

View file

@ -11,6 +11,14 @@ use tokio::select;
use tokio::sync::{mpsc, oneshot, watch}; use tokio::sync::{mpsc, oneshot, watch};
use tokio_util::compat::*; use tokio_util::compat::*;
#[cfg(feature = "telemetry")]
use opentelemetry::{
trace::{FutureExt, Span, SpanKind, TraceContextExt, Tracer},
Context,
};
#[cfg(feature = "telemetry")]
use opentelemetry_contrib::trace::propagator::binary::*;
use futures::io::AsyncReadExt; use futures::io::AsyncReadExt;
use async_trait::async_trait; use async_trait::async_trait;
@ -127,18 +135,14 @@ impl ClientConn {
cfg_if::cfg_if! { cfg_if::cfg_if! {
if #[cfg(feature = "telemetry")] { if #[cfg(feature = "telemetry")] {
use opentelemetry::{trace::{TraceId, TraceContextExt}, Context}; let tracer = opentelemetry::global::tracer("netapp");
let trace_id_int = Context::current() let span = tracer.span_builder(format!("RPC >> {}", path))
.span() .with_kind(SpanKind::Server)
.span_context() .start(&tracer);
.trace_id(); let propagator = BinaryPropagator::new();
let trace_id = if trace_id_int == TraceId::INVALID { let telemetry_id = Some(propagator.to_bytes(span.span_context()).to_vec());
None
} else { } else {
Some(trace_id_int.to_bytes().to_vec()) let telemetry_id: Option<Vec<u8>> = None;
};
} else {
let trace_id: Option<Vec<u8>> = None;
} }
}; };
@ -148,7 +152,7 @@ impl ClientConn {
bytes.extend_from_slice(&[prio, path.as_bytes().len() as u8]); bytes.extend_from_slice(&[prio, path.as_bytes().len() as u8]);
bytes.extend_from_slice(path.as_bytes()); bytes.extend_from_slice(path.as_bytes());
if let Some(by) = trace_id { if let Some(by) = telemetry_id {
bytes.push(by.len() as u8); bytes.push(by.len() as u8);
bytes.extend(by); bytes.extend(by);
} else { } else {
@ -172,7 +176,16 @@ impl ClientConn {
trace!("request: query_send {}, {} bytes", id, bytes.len()); trace!("request: query_send {}, {} bytes", id, bytes.len());
query_send.send((id, prio, bytes))?; query_send.send((id, prio, bytes))?;
cfg_if::cfg_if! {
if #[cfg(feature = "telemetry")] {
let resp = resp_recv
.with_context(Context::current_with_span(span))
.await?;
} else {
let resp = resp_recv.await?; let resp = resp_recv.await?;
}
}
if resp.is_empty() { if resp.is_empty() {
return Err(Error::Message( return Err(Error::Message(
"Response is 0 bytes, either a collision or a protocol error".into(), "Response is 0 bytes, either a collision or a protocol error".into(),

View file

@ -5,6 +5,13 @@ use arc_swap::ArcSwapOption;
use bytes::Bytes; use bytes::Bytes;
use log::{debug, trace}; use log::{debug, trace};
#[cfg(feature = "telemetry")]
use opentelemetry::{
trace::{FutureExt, Span, SpanKind, TraceContextExt, TraceId, Tracer},
Context, KeyValue,
};
#[cfg(feature = "telemetry")]
use opentelemetry_contrib::trace::propagator::binary::*;
#[cfg(feature = "telemetry")] #[cfg(feature = "telemetry")]
use rand::{thread_rng, Rng}; use rand::{thread_rng, Rng};
@ -122,9 +129,9 @@ impl ServerConn {
let path = &bytes[2..2 + path_length]; let path = &bytes[2..2 + path_length];
let path = String::from_utf8(path.to_vec())?; let path = String::from_utf8(path.to_vec())?;
let trace_id_len = bytes[2 + path_length] as usize; let telemetry_id_len = bytes[2 + path_length] as usize;
let data = &bytes[3 + path_length + trace_id_len..]; let data = &bytes[3 + path_length + telemetry_id_len..];
let handler_opt = { let handler_opt = {
let endpoints = self.netapp.endpoints.read().unwrap(); let endpoints = self.netapp.endpoints.read().unwrap();
@ -134,28 +141,26 @@ impl ServerConn {
if let Some(handler) = handler_opt { if let Some(handler) = handler_opt {
cfg_if::cfg_if! { cfg_if::cfg_if! {
if #[cfg(feature = "telemetry")] { if #[cfg(feature = "telemetry")] {
use opentelemetry::{ let tracer = opentelemetry::global::tracer("netapp");
KeyValue,
trace::{FutureExt, TraceContextExt, Tracer}, let mut span = if telemetry_id_len > 0 {
Context, trace::TraceId let by = bytes[3+path_length..3+path_length+telemetry_id_len].to_vec();
}; let propagator = BinaryPropagator::new();
let trace_id = if trace_id_len == 16 { let context = propagator.from_bytes(by);
let mut by = [0u8; 16]; let context = Context::new().with_remote_span_context(context);
by.copy_from_slice(&bytes[3+path_length..19+path_length]); tracer.span_builder(format!(">> RPC {}", path))
TraceId::from_bytes(by) .with_kind(SpanKind::Server)
.start_with_context(&tracer, &context)
} else { } else {
let mut rng = thread_rng(); let mut rng = thread_rng();
TraceId::from_bytes(rng.gen()) let trace_id = TraceId::from_bytes(rng.gen());
}; tracer
.span_builder(format!(">> RPC {}", path))
let tracer = opentelemetry::global::tracer("garage"); .with_kind(SpanKind::Server)
let span = tracer
.span_builder(format!("RPC handler {}", path))
.with_trace_id(trace_id) .with_trace_id(trace_id)
.with_attributes(vec![ .start(&tracer)
KeyValue::new("path", path), };
]) span.set_attribute(KeyValue::new("path", path.to_string()));
.start(&tracer);
handler.handle(data, self.peer_id) handler.handle(data, self.peer_id)
.with_context(Context::current_with_span(span)) .with_context(Context::current_with_span(span))