Update to Netapp 0.4 which supports distributed tracing

This commit is contained in:
Alex 2022-02-18 20:39:55 +01:00
parent 3549a760f1
commit 12215a1997
Signed by: lx
GPG key ID: 0E496D15096376BE
11 changed files with 112 additions and 40 deletions

37
Cargo.lock generated
View file

@ -730,7 +730,7 @@ dependencies = [
"hex",
"http",
"kuska-sodiumoxide",
"netapp",
"netapp 0.4.0",
"pretty_env_logger",
"rand",
"rmp-serde 0.15.5",
@ -818,7 +818,7 @@ dependencies = [
"garage_util 0.5.1",
"hex",
"log",
"netapp",
"netapp 0.3.1",
"rand",
"rmp-serde 0.15.5",
"serde",
@ -842,7 +842,7 @@ dependencies = [
"garage_table 0.6.0",
"garage_util 0.6.0",
"hex",
"netapp",
"netapp 0.4.0",
"opentelemetry",
"rand",
"rmp-serde 0.15.5",
@ -871,7 +871,7 @@ dependencies = [
"hyper",
"kuska-sodiumoxide",
"log",
"netapp",
"netapp 0.3.1",
"rand",
"rmp-serde 0.15.5",
"serde",
@ -895,7 +895,7 @@ dependencies = [
"hex",
"hyper",
"kuska-sodiumoxide",
"netapp",
"netapp 0.4.0",
"opentelemetry",
"rand",
"rmp-serde 0.15.5",
@ -964,7 +964,7 @@ dependencies = [
"http",
"hyper",
"log",
"netapp",
"netapp 0.3.1",
"rand",
"rmp-serde 0.15.5",
"serde",
@ -987,7 +987,7 @@ dependencies = [
"hex",
"http",
"hyper",
"netapp",
"netapp 0.4.0",
"rand",
"rmp-serde 0.15.5",
"serde",
@ -1505,6 +1505,29 @@ dependencies = [
"tokio-util",
]
[[package]]
name = "netapp"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22c545a13b0c47b47e8052b35c4884dbe33c9ea62607371b0f4f1b0490cafd38"
dependencies = [
"arc-swap",
"async-trait",
"bytes 0.6.0",
"cfg-if",
"err-derive 0.2.4",
"futures",
"hex",
"kuska-handshake",
"kuska-sodiumoxide",
"log",
"rmp-serde 0.14.4",
"serde",
"tokio",
"tokio-stream",
"tokio-util",
]
[[package]]
name = "nom"
version = "7.1.0"

View file

@ -1059,7 +1059,7 @@ in
git_version = rustPackages."registry+https://github.com/rust-lang/crates.io-index".git-version."0.3.5" { inherit profileName; };
hex = rustPackages."registry+https://github.com/rust-lang/crates.io-index".hex."0.4.3" { inherit profileName; };
sodiumoxide = rustPackages."registry+https://github.com/rust-lang/crates.io-index".kuska-sodiumoxide."0.2.5-0" { inherit profileName; };
netapp = rustPackages."registry+https://github.com/rust-lang/crates.io-index".netapp."0.3.1" { inherit profileName; };
netapp = rustPackages."registry+https://github.com/rust-lang/crates.io-index".netapp."0.4.0" { inherit profileName; };
pretty_env_logger = rustPackages."registry+https://github.com/rust-lang/crates.io-index".pretty_env_logger."0.4.0" { inherit profileName; };
rand = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand."0.8.4" { inherit profileName; };
rmp_serde = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rmp-serde."0.15.5" { inherit profileName; };
@ -1185,7 +1185,7 @@ in
garage_table = rustPackages."unknown".garage_table."0.6.0" { inherit profileName; };
garage_util = rustPackages."unknown".garage_util."0.6.0" { inherit profileName; };
hex = rustPackages."registry+https://github.com/rust-lang/crates.io-index".hex."0.4.3" { inherit profileName; };
netapp = rustPackages."registry+https://github.com/rust-lang/crates.io-index".netapp."0.3.1" { inherit profileName; };
netapp = rustPackages."registry+https://github.com/rust-lang/crates.io-index".netapp."0.4.0" { inherit profileName; };
opentelemetry = rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry."0.17.0" { inherit profileName; };
rand = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand."0.8.4" { inherit profileName; };
rmp_serde = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rmp-serde."0.15.5" { inherit profileName; };
@ -1242,7 +1242,7 @@ in
hex = rustPackages."registry+https://github.com/rust-lang/crates.io-index".hex."0.4.3" { inherit profileName; };
hyper = rustPackages."registry+https://github.com/rust-lang/crates.io-index".hyper."0.14.17" { inherit profileName; };
sodiumoxide = rustPackages."registry+https://github.com/rust-lang/crates.io-index".kuska-sodiumoxide."0.2.5-0" { inherit profileName; };
netapp = rustPackages."registry+https://github.com/rust-lang/crates.io-index".netapp."0.3.1" { inherit profileName; };
netapp = rustPackages."registry+https://github.com/rust-lang/crates.io-index".netapp."0.4.0" { inherit profileName; };
opentelemetry = rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry."0.17.0" { inherit profileName; };
rand = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand."0.8.4" { inherit profileName; };
rmp_serde = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rmp-serde."0.15.5" { inherit profileName; };
@ -1342,7 +1342,7 @@ in
hex = rustPackages."registry+https://github.com/rust-lang/crates.io-index".hex."0.4.3" { inherit profileName; };
http = rustPackages."registry+https://github.com/rust-lang/crates.io-index".http."0.2.5" { inherit profileName; };
hyper = rustPackages."registry+https://github.com/rust-lang/crates.io-index".hyper."0.14.17" { inherit profileName; };
netapp = rustPackages."registry+https://github.com/rust-lang/crates.io-index".netapp."0.3.1" { inherit profileName; };
netapp = rustPackages."registry+https://github.com/rust-lang/crates.io-index".netapp."0.4.0" { inherit profileName; };
rand = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand."0.8.4" { inherit profileName; };
rmp_serde = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rmp-serde."0.15.5" { inherit profileName; };
serde = rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde."1.0.130" { inherit profileName; };
@ -2024,6 +2024,33 @@ in
};
});
"registry+https://github.com/rust-lang/crates.io-index".netapp."0.4.0" = overridableMkRustCrate (profileName: rec {
name = "netapp";
version = "0.4.0";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "22c545a13b0c47b47e8052b35c4884dbe33c9ea62607371b0f4f1b0490cafd38"; };
features = builtins.concatLists [
[ "default" ]
];
dependencies = {
arc_swap = rustPackages."registry+https://github.com/rust-lang/crates.io-index".arc-swap."1.4.0" { inherit profileName; };
async_trait = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".async-trait."0.1.51" { profileName = "__noProfile"; };
bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."0.6.0" { inherit profileName; };
cfg_if = rustPackages."registry+https://github.com/rust-lang/crates.io-index".cfg-if."1.0.0" { inherit profileName; };
err_derive = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".err-derive."0.2.4" { profileName = "__noProfile"; };
futures = rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures."0.3.17" { inherit profileName; };
hex = rustPackages."registry+https://github.com/rust-lang/crates.io-index".hex."0.4.3" { inherit profileName; };
kuska_handshake = rustPackages."registry+https://github.com/rust-lang/crates.io-index".kuska-handshake."0.2.0" { inherit profileName; };
sodiumoxide = rustPackages."registry+https://github.com/rust-lang/crates.io-index".kuska-sodiumoxide."0.2.5-0" { inherit profileName; };
log = rustPackages."registry+https://github.com/rust-lang/crates.io-index".log."0.4.14" { inherit profileName; };
rmp_serde = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rmp-serde."0.14.4" { inherit profileName; };
serde = rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde."1.0.130" { inherit profileName; };
tokio = rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio."1.12.0" { inherit profileName; };
tokio_stream = rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio-stream."0.1.8" { inherit profileName; };
tokio_util = rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio-util."0.6.8" { inherit profileName; };
};
});
"registry+https://github.com/rust-lang/crates.io-index".nom."7.1.0" = overridableMkRustCrate (profileName: rec {
name = "nom";
version = "7.1.0";

View file

@ -9,7 +9,7 @@ use hyper::{Body, Method, Request, Response, Server};
use opentelemetry::{
trace::{FutureExt, TraceContextExt, Tracer},
Context,
Context, KeyValue,
};
use garage_util::data::*;
@ -50,15 +50,19 @@ pub async fn run_api_server(
let garage = garage.clone();
let tracer = opentelemetry::global::tracer("garage");
let uuid = gen_uuid();
let trace_id = gen_uuid();
let span = tracer
.span_builder("S3 API call (unknown)")
.with_trace_id(
opentelemetry::trace::TraceId::from_hex(&hex::encode(
&uuid.as_slice()[..16],
&trace_id.as_slice()[..16],
))
.unwrap(),
)
.with_attributes(vec![
KeyValue::new("method", format!("{}", req.method())),
KeyValue::new("uri", req.uri().path().to_string()),
])
.start(&tracer);
handler(garage, req, client_addr).with_context(Context::current_with_span(span))

View file

@ -50,8 +50,9 @@ futures = "0.3"
futures-util = "0.3"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
#netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" }
netapp = "0.3.0"
#netapp = { version = "0.3.2", git = "https://git.deuxfleurs.fr/lx/netapp" }
#netapp = { version = "0.4", path = "../../../netapp" }
netapp = "0.4"
[dev-dependencies]
aws-sdk-s3 = "0.6"

View file

@ -55,7 +55,7 @@ struct Opt {
#[tokio::main]
async fn main() {
if std::env::var("RUST_LOG").is_err() {
std::env::set_var("RUST_LOG", "garage=info")
std::env::set_var("RUST_LOG", "netapp=info,garage=info")
}
pretty_env_logger::init();
sodiumoxide::init().expect("Unable to init sodiumoxide");
@ -106,7 +106,7 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
// Generate a temporary keypair for our RPC client
let (_pk, sk) = sodiumoxide::crypto::sign::ed25519::gen_keypair();
let netapp = NetApp::new(network_key, sk);
let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, sk);
// Find and parse the address of the target host
let (id, addr) = if let Some(h) = opt.rpc_host {

View file

@ -39,4 +39,5 @@ tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi
opentelemetry = "0.17"
#netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" }
netapp = "0.3.0"
#netapp = { version = "0.4", path = "../../../netapp" }
netapp = "0.4"

View file

@ -14,7 +14,10 @@ use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::{watch, Mutex, Notify};
use opentelemetry::KeyValue;
use opentelemetry::{
trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer},
Context, KeyValue,
};
use garage_util::data::*;
use garage_util::error::*;
@ -554,7 +557,24 @@ impl BlockManager {
let start_time = SystemTime::now();
let hash = Hash::try_from(&hash_bytes[..]).unwrap();
let res = self.resync_block(&hash).await;
let tracer = opentelemetry::global::tracer("garage");
let trace_id = gen_uuid();
let span = tracer
.span_builder("Resync block")
.with_trace_id(
opentelemetry::trace::TraceId::from_hex(&hex::encode(
&trace_id.as_slice()[..16],
))
.unwrap(),
)
.with_attributes(vec![KeyValue::new("block", format!("{:?}", hash))])
.start(&tracer);
let res = self
.resync_block(&hash)
.with_context(Context::current_with_span(span))
.await;
self.metrics.resync_counter.add(1);
self.metrics

View file

@ -37,6 +37,8 @@ tokio-stream = { version = "0.1", features = ["net"] }
opentelemetry = "0.17"
#netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" }
netapp = "0.3.1"
#netapp = { version = "0.4", path = "../../../netapp", features = ["telemetry"] }
netapp = "0.4"
hyper = { version = "0.14", features = ["client", "http1", "runtime", "tcp"] }

View file

@ -152,14 +152,8 @@ impl RpcHelper {
self.0.metrics.rpc_counter.add(1, &metric_tags);
let rpc_start_time = SystemTime::now();
let tracer = opentelemetry::global::tracer("garage");
let mut span = tracer.start(format!("RPC {}", endpoint.path()));
span.set_attribute(KeyValue::new("to", format!("{:?}", to)));
let node_id = to.into();
let rpc_call = endpoint
.call(&node_id, &msg, strat.rs_priority)
.with_context(Context::current_with_span(span));
let rpc_call = endpoint.call(&node_id, msg, strat.rs_priority);
select! {
res = rpc_call => {
@ -246,7 +240,7 @@ impl RpcHelper {
let quorum = strategy.rs_quorum.unwrap_or(to.len());
let tracer = opentelemetry::global::tracer("garage");
let mut span = tracer.start(format!("RPC {} to {:?}", endpoint.path(), to));
let mut span = tracer.start(format!("RPC {} to {}", endpoint.path(), to.len()));
span.set_attribute(KeyValue::new("to", format!("{:?}", to)));
span.set_attribute(KeyValue::new("quorum", quorum as i64));
@ -329,7 +323,6 @@ impl RpcHelper {
// reach quorum, start some new requests.
while successes.len() + resp_stream.len() < quorum {
if let Some((_, _, _, req_to, fut)) = requests.next() {
let tracer = opentelemetry::global::tracer("garage");
let span = tracer.start(format!("RPC to {:?}", req_to));
resp_stream.push(tokio::spawn(
fut.with_context(Context::current_with_span(span)),

View file

@ -37,6 +37,9 @@ const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60);
const STATUS_EXCHANGE_INTERVAL: Duration = Duration::from_secs(10);
const PING_TIMEOUT: Duration = Duration::from_secs(2);
/// Version tag used for version check upon Netapp connection
pub const GARAGE_VERSION_TAG: u64 = 0x6761726167650006; // garage 0x0006
/// RPC endpoint used for calls related to membership
pub const SYSTEM_RPC_PATH: &str = "garage_rpc/membership.rs/SystemRpc";
@ -188,7 +191,10 @@ impl System {
) -> Arc<Self> {
let node_key =
gen_node_key(&config.metadata_dir).expect("Unable to read or generate node ID");
info!("Node public key: {}", hex::encode(&node_key.public_key()));
info!(
"Node ID of this node: {}",
hex::encode(&node_key.public_key()[..8])
);
let persist_cluster_layout = Persister::new(&config.metadata_dir, "cluster_layout");
let persist_peer_list = Persister::new(&config.metadata_dir, "peer_list");
@ -216,13 +222,7 @@ impl System {
let ring = Ring::new(cluster_layout, replication_factor);
let (update_ring, ring) = watch::channel(Arc::new(ring));
if let Some(addr) = config.rpc_public_addr {
println!("{}@{}", hex::encode(&node_key.public_key()), addr);
} else {
println!("{}", hex::encode(&node_key.public_key()));
}
let netapp = NetApp::new(network_key, node_key);
let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, node_key);
let fullmesh = FullMeshPeeringStrategy::new(
netapp.clone(),
config.bootstrap_peers.clone(),

View file

@ -34,7 +34,8 @@ futures = "0.3"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
#netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" }
netapp = "0.3.0"
#netapp = { version = "0.4", path = "../../../netapp" }
netapp = "0.4"
http = "0.2"
hyper = "0.14"