Add spans to table calls, change span names in RPC
This commit is contained in:
parent
897baf8d75
commit
238e17aa0b
3 changed files with 54 additions and 11 deletions
|
@ -20,8 +20,8 @@ use opentelemetry_prometheus::PrometheusExporter;
|
|||
|
||||
use prometheus::{Encoder, TextEncoder};
|
||||
|
||||
use garage_util::data::*;
|
||||
use garage_util::error::Error as GarageError;
|
||||
use garage_util::metrics::*;
|
||||
|
||||
// serve_req on metric endpoint
|
||||
async fn serve_req(
|
||||
|
@ -125,15 +125,9 @@ impl AdminServer {
|
|||
async move {
|
||||
Ok::<_, Infallible>(service_fn(move |req| {
|
||||
let tracer = opentelemetry::global::tracer("garage");
|
||||
let uuid = gen_uuid();
|
||||
let span = tracer
|
||||
.span_builder("admin/request")
|
||||
.with_trace_id(
|
||||
opentelemetry::trace::TraceId::from_hex(&hex::encode(
|
||||
&uuid.as_slice()[..16],
|
||||
))
|
||||
.unwrap(),
|
||||
)
|
||||
.with_trace_id(gen_trace_id())
|
||||
.start(&tracer);
|
||||
|
||||
serve_req(req, admin_server.clone())
|
||||
|
|
|
@ -234,9 +234,23 @@ impl RpcHelper {
|
|||
let quorum = strategy.rs_quorum.unwrap_or(to.len());
|
||||
|
||||
let tracer = opentelemetry::global::tracer("garage");
|
||||
let mut span = tracer.start(format!("RPC {} to {}", endpoint.path(), to.len()));
|
||||
let span_name = if strategy.rs_interrupt_after_quorum {
|
||||
format!("RPC {} to {} of {}", endpoint.path(), quorum, to.len())
|
||||
} else {
|
||||
format!(
|
||||
"RPC {} to {} (quorum {})",
|
||||
endpoint.path(),
|
||||
to.len(),
|
||||
quorum
|
||||
)
|
||||
};
|
||||
let mut span = tracer.start(span_name);
|
||||
span.set_attribute(KeyValue::new("to", format!("{:?}", to)));
|
||||
span.set_attribute(KeyValue::new("quorum", quorum as i64));
|
||||
span.set_attribute(KeyValue::new(
|
||||
"interrupt_after_quorum",
|
||||
strategy.rs_interrupt_after_quorum.to_string(),
|
||||
));
|
||||
|
||||
self.try_call_many_internal(endpoint, to, msg, strategy, quorum)
|
||||
.with_context(Context::current_with_span(span))
|
||||
|
|
|
@ -7,6 +7,11 @@ use futures::stream::*;
|
|||
use serde::{Deserialize, Serialize};
|
||||
use serde_bytes::ByteBuf;
|
||||
|
||||
use opentelemetry::{
|
||||
trace::{FutureExt, TraceContextExt, Tracer},
|
||||
Context,
|
||||
};
|
||||
|
||||
use garage_util::data::*;
|
||||
use garage_util::error::Error;
|
||||
use garage_util::metrics::RecordDuration;
|
||||
|
@ -82,6 +87,20 @@ where
|
|||
}
|
||||
|
||||
pub async fn insert(&self, e: &F::E) -> Result<(), Error> {
|
||||
let tracer = opentelemetry::global::tracer("garage_table");
|
||||
let span = tracer.start(format!("{} insert", F::TABLE_NAME));
|
||||
|
||||
self.insert_internal(e)
|
||||
.bound_record_duration(&self.data.metrics.put_request_duration)
|
||||
.with_context(Context::current_with_span(span))
|
||||
.await?;
|
||||
|
||||
self.data.metrics.put_request_counter.add(1);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn insert_internal(&self, e: &F::E) -> Result<(), Error> {
|
||||
let hash = e.partition_key().hash();
|
||||
let who = self.data.replication.write_nodes(&hash);
|
||||
//eprintln!("insert who: {:?}", who);
|
||||
|
@ -99,18 +118,22 @@ where
|
|||
.with_quorum(self.data.replication.write_quorum())
|
||||
.with_timeout(TABLE_RPC_TIMEOUT),
|
||||
)
|
||||
.bound_record_duration(&self.data.metrics.put_request_duration)
|
||||
.await?;
|
||||
|
||||
self.data.metrics.put_request_counter.add(1);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn insert_many(&self, entries: &[F::E]) -> Result<(), Error> {
|
||||
let tracer = opentelemetry::global::tracer("garage_table");
|
||||
let span = tracer.start(format!("{} insert_many {}", F::TABLE_NAME, entries.len()));
|
||||
|
||||
self.insert_many_internal(entries)
|
||||
.bound_record_duration(&self.data.metrics.put_request_duration)
|
||||
.with_context(Context::current_with_span(span))
|
||||
.await?;
|
||||
|
||||
self.data.metrics.put_request_counter.add(1);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -161,11 +184,17 @@ where
|
|||
partition_key: &F::P,
|
||||
sort_key: &F::S,
|
||||
) -> Result<Option<F::E>, Error> {
|
||||
let tracer = opentelemetry::global::tracer("garage_table");
|
||||
let span = tracer.start(format!("{} get", F::TABLE_NAME));
|
||||
|
||||
let res = self
|
||||
.get_internal(partition_key, sort_key)
|
||||
.bound_record_duration(&self.data.metrics.get_request_duration)
|
||||
.with_context(Context::current_with_span(span))
|
||||
.await?;
|
||||
|
||||
self.data.metrics.get_request_counter.add(1);
|
||||
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
|
@ -233,11 +262,17 @@ where
|
|||
filter: Option<F::Filter>,
|
||||
limit: usize,
|
||||
) -> Result<Vec<F::E>, Error> {
|
||||
let tracer = opentelemetry::global::tracer("garage_table");
|
||||
let span = tracer.start(format!("{} get_range", F::TABLE_NAME));
|
||||
|
||||
let res = self
|
||||
.get_range_internal(partition_key, begin_sort_key, filter, limit)
|
||||
.bound_record_duration(&self.data.metrics.get_request_duration)
|
||||
.with_context(Context::current_with_span(span))
|
||||
.await?;
|
||||
|
||||
self.data.metrics.get_request_counter.add(1);
|
||||
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue