diff --git a/Cargo.lock b/Cargo.lock index baf6494d..7454d58e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -730,7 +730,7 @@ dependencies = [ "hex", "http", "kuska-sodiumoxide", - "netapp", + "netapp 0.4.0", "pretty_env_logger", "rand", "rmp-serde 0.15.5", @@ -818,7 +818,7 @@ dependencies = [ "garage_util 0.5.1", "hex", "log", - "netapp", + "netapp 0.3.1", "rand", "rmp-serde 0.15.5", "serde", @@ -842,7 +842,7 @@ dependencies = [ "garage_table 0.6.0", "garage_util 0.6.0", "hex", - "netapp", + "netapp 0.4.0", "opentelemetry", "rand", "rmp-serde 0.15.5", @@ -871,7 +871,7 @@ dependencies = [ "hyper", "kuska-sodiumoxide", "log", - "netapp", + "netapp 0.3.1", "rand", "rmp-serde 0.15.5", "serde", @@ -895,7 +895,7 @@ dependencies = [ "hex", "hyper", "kuska-sodiumoxide", - "netapp", + "netapp 0.4.0", "opentelemetry", "rand", "rmp-serde 0.15.5", @@ -964,7 +964,7 @@ dependencies = [ "http", "hyper", "log", - "netapp", + "netapp 0.3.1", "rand", "rmp-serde 0.15.5", "serde", @@ -987,7 +987,7 @@ dependencies = [ "hex", "http", "hyper", - "netapp", + "netapp 0.4.0", "rand", "rmp-serde 0.15.5", "serde", @@ -1505,6 +1505,29 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "netapp" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22c545a13b0c47b47e8052b35c4884dbe33c9ea62607371b0f4f1b0490cafd38" +dependencies = [ + "arc-swap", + "async-trait", + "bytes 0.6.0", + "cfg-if", + "err-derive 0.2.4", + "futures", + "hex", + "kuska-handshake", + "kuska-sodiumoxide", + "log", + "rmp-serde 0.14.4", + "serde", + "tokio", + "tokio-stream", + "tokio-util", +] + [[package]] name = "nom" version = "7.1.0" diff --git a/Cargo.nix b/Cargo.nix index 47edf542..7fb76d2c 100644 --- a/Cargo.nix +++ b/Cargo.nix @@ -1059,7 +1059,7 @@ in git_version = rustPackages."registry+https://github.com/rust-lang/crates.io-index".git-version."0.3.5" { inherit profileName; }; hex = rustPackages."registry+https://github.com/rust-lang/crates.io-index".hex."0.4.3" { inherit profileName; }; sodiumoxide = rustPackages."registry+https://github.com/rust-lang/crates.io-index".kuska-sodiumoxide."0.2.5-0" { inherit profileName; }; - netapp = rustPackages."registry+https://github.com/rust-lang/crates.io-index".netapp."0.3.1" { inherit profileName; }; + netapp = rustPackages."registry+https://github.com/rust-lang/crates.io-index".netapp."0.4.0" { inherit profileName; }; pretty_env_logger = rustPackages."registry+https://github.com/rust-lang/crates.io-index".pretty_env_logger."0.4.0" { inherit profileName; }; rand = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand."0.8.4" { inherit profileName; }; rmp_serde = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rmp-serde."0.15.5" { inherit profileName; }; @@ -1185,7 +1185,7 @@ in garage_table = rustPackages."unknown".garage_table."0.6.0" { inherit profileName; }; garage_util = rustPackages."unknown".garage_util."0.6.0" { inherit profileName; }; hex = rustPackages."registry+https://github.com/rust-lang/crates.io-index".hex."0.4.3" { inherit profileName; }; - netapp = rustPackages."registry+https://github.com/rust-lang/crates.io-index".netapp."0.3.1" { inherit profileName; }; + netapp = rustPackages."registry+https://github.com/rust-lang/crates.io-index".netapp."0.4.0" { inherit profileName; }; opentelemetry = rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry."0.17.0" { inherit profileName; }; rand = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand."0.8.4" { inherit profileName; }; rmp_serde = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rmp-serde."0.15.5" { inherit profileName; }; @@ -1242,7 +1242,7 @@ in hex = rustPackages."registry+https://github.com/rust-lang/crates.io-index".hex."0.4.3" { inherit profileName; }; hyper = rustPackages."registry+https://github.com/rust-lang/crates.io-index".hyper."0.14.17" { inherit profileName; }; sodiumoxide = rustPackages."registry+https://github.com/rust-lang/crates.io-index".kuska-sodiumoxide."0.2.5-0" { inherit profileName; }; - netapp = rustPackages."registry+https://github.com/rust-lang/crates.io-index".netapp."0.3.1" { inherit profileName; }; + netapp = rustPackages."registry+https://github.com/rust-lang/crates.io-index".netapp."0.4.0" { inherit profileName; }; opentelemetry = rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry."0.17.0" { inherit profileName; }; rand = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand."0.8.4" { inherit profileName; }; rmp_serde = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rmp-serde."0.15.5" { inherit profileName; }; @@ -1342,7 +1342,7 @@ in hex = rustPackages."registry+https://github.com/rust-lang/crates.io-index".hex."0.4.3" { inherit profileName; }; http = rustPackages."registry+https://github.com/rust-lang/crates.io-index".http."0.2.5" { inherit profileName; }; hyper = rustPackages."registry+https://github.com/rust-lang/crates.io-index".hyper."0.14.17" { inherit profileName; }; - netapp = rustPackages."registry+https://github.com/rust-lang/crates.io-index".netapp."0.3.1" { inherit profileName; }; + netapp = rustPackages."registry+https://github.com/rust-lang/crates.io-index".netapp."0.4.0" { inherit profileName; }; rand = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand."0.8.4" { inherit profileName; }; rmp_serde = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rmp-serde."0.15.5" { inherit profileName; }; serde = rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde."1.0.130" { inherit profileName; }; @@ -2024,6 +2024,33 @@ in }; }); + "registry+https://github.com/rust-lang/crates.io-index".netapp."0.4.0" = overridableMkRustCrate (profileName: rec { + name = "netapp"; + version = "0.4.0"; + registry = "registry+https://github.com/rust-lang/crates.io-index"; + src = fetchCratesIo { inherit name version; sha256 = "22c545a13b0c47b47e8052b35c4884dbe33c9ea62607371b0f4f1b0490cafd38"; }; + features = builtins.concatLists [ + [ "default" ] + ]; + dependencies = { + arc_swap = rustPackages."registry+https://github.com/rust-lang/crates.io-index".arc-swap."1.4.0" { inherit profileName; }; + async_trait = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".async-trait."0.1.51" { profileName = "__noProfile"; }; + bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."0.6.0" { inherit profileName; }; + cfg_if = rustPackages."registry+https://github.com/rust-lang/crates.io-index".cfg-if."1.0.0" { inherit profileName; }; + err_derive = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".err-derive."0.2.4" { profileName = "__noProfile"; }; + futures = rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures."0.3.17" { inherit profileName; }; + hex = rustPackages."registry+https://github.com/rust-lang/crates.io-index".hex."0.4.3" { inherit profileName; }; + kuska_handshake = rustPackages."registry+https://github.com/rust-lang/crates.io-index".kuska-handshake."0.2.0" { inherit profileName; }; + sodiumoxide = rustPackages."registry+https://github.com/rust-lang/crates.io-index".kuska-sodiumoxide."0.2.5-0" { inherit profileName; }; + log = rustPackages."registry+https://github.com/rust-lang/crates.io-index".log."0.4.14" { inherit profileName; }; + rmp_serde = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rmp-serde."0.14.4" { inherit profileName; }; + serde = rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde."1.0.130" { inherit profileName; }; + tokio = rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio."1.12.0" { inherit profileName; }; + tokio_stream = rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio-stream."0.1.8" { inherit profileName; }; + tokio_util = rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio-util."0.6.8" { inherit profileName; }; + }; + }); + "registry+https://github.com/rust-lang/crates.io-index".nom."7.1.0" = overridableMkRustCrate (profileName: rec { name = "nom"; version = "7.1.0"; diff --git a/src/api/api_server.rs b/src/api/api_server.rs index 15b00dde..00d582d1 100644 --- a/src/api/api_server.rs +++ b/src/api/api_server.rs @@ -9,7 +9,7 @@ use hyper::{Body, Method, Request, Response, Server}; use opentelemetry::{ trace::{FutureExt, TraceContextExt, Tracer}, - Context, + Context, KeyValue, }; use garage_util::data::*; @@ -50,15 +50,19 @@ pub async fn run_api_server( let garage = garage.clone(); let tracer = opentelemetry::global::tracer("garage"); - let uuid = gen_uuid(); + let trace_id = gen_uuid(); let span = tracer .span_builder("S3 API call (unknown)") .with_trace_id( opentelemetry::trace::TraceId::from_hex(&hex::encode( - &uuid.as_slice()[..16], + &trace_id.as_slice()[..16], )) .unwrap(), ) + .with_attributes(vec![ + KeyValue::new("method", format!("{}", req.method())), + KeyValue::new("uri", req.uri().path().to_string()), + ]) .start(&tracer); handler(garage, req, client_addr).with_context(Context::current_with_span(span)) diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml index 76c6b51f..0e197593 100644 --- a/src/garage/Cargo.toml +++ b/src/garage/Cargo.toml @@ -50,8 +50,9 @@ futures = "0.3" futures-util = "0.3" tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } -#netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" } -netapp = "0.3.0" +#netapp = { version = "0.3.2", git = "https://git.deuxfleurs.fr/lx/netapp" } +#netapp = { version = "0.4", path = "../../../netapp" } +netapp = "0.4" [dev-dependencies] aws-sdk-s3 = "0.6" diff --git a/src/garage/main.rs b/src/garage/main.rs index 7de7740f..08ec912b 100644 --- a/src/garage/main.rs +++ b/src/garage/main.rs @@ -55,7 +55,7 @@ struct Opt { #[tokio::main] async fn main() { if std::env::var("RUST_LOG").is_err() { - std::env::set_var("RUST_LOG", "garage=info") + std::env::set_var("RUST_LOG", "netapp=info,garage=info") } pretty_env_logger::init(); sodiumoxide::init().expect("Unable to init sodiumoxide"); @@ -106,7 +106,7 @@ async fn cli_command(opt: Opt) -> Result<(), Error> { // Generate a temporary keypair for our RPC client let (_pk, sk) = sodiumoxide::crypto::sign::ed25519::gen_keypair(); - let netapp = NetApp::new(network_key, sk); + let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, sk); // Find and parse the address of the target host let (id, addr) = if let Some(h) = opt.rpc_host { diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml index 4465af1c..8083445e 100644 --- a/src/model/Cargo.toml +++ b/src/model/Cargo.toml @@ -39,4 +39,5 @@ tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi opentelemetry = "0.17" #netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" } -netapp = "0.3.0" +#netapp = { version = "0.4", path = "../../../netapp" } +netapp = "0.4" diff --git a/src/model/block.rs b/src/model/block.rs index ddda5e57..3799c6aa 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -14,7 +14,10 @@ use tokio::fs; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::sync::{watch, Mutex, Notify}; -use opentelemetry::KeyValue; +use opentelemetry::{ + trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer}, + Context, KeyValue, +}; use garage_util::data::*; use garage_util::error::*; @@ -554,7 +557,24 @@ impl BlockManager { let start_time = SystemTime::now(); let hash = Hash::try_from(&hash_bytes[..]).unwrap(); - let res = self.resync_block(&hash).await; + + let tracer = opentelemetry::global::tracer("garage"); + let trace_id = gen_uuid(); + let span = tracer + .span_builder("Resync block") + .with_trace_id( + opentelemetry::trace::TraceId::from_hex(&hex::encode( + &trace_id.as_slice()[..16], + )) + .unwrap(), + ) + .with_attributes(vec![KeyValue::new("block", format!("{:?}", hash))]) + .start(&tracer); + + let res = self + .resync_block(&hash) + .with_context(Context::current_with_span(span)) + .await; self.metrics.resync_counter.add(1); self.metrics diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index 87a499f4..bd84fde6 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -37,6 +37,8 @@ tokio-stream = { version = "0.1", features = ["net"] } opentelemetry = "0.17" #netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" } -netapp = "0.3.1" +#netapp = { version = "0.4", path = "../../../netapp", features = ["telemetry"] } +netapp = "0.4" + hyper = { version = "0.14", features = ["client", "http1", "runtime", "tcp"] } diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index 97716b18..4b4235f1 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -152,14 +152,8 @@ impl RpcHelper { self.0.metrics.rpc_counter.add(1, &metric_tags); let rpc_start_time = SystemTime::now(); - let tracer = opentelemetry::global::tracer("garage"); - let mut span = tracer.start(format!("RPC {}", endpoint.path())); - span.set_attribute(KeyValue::new("to", format!("{:?}", to))); - let node_id = to.into(); - let rpc_call = endpoint - .call(&node_id, &msg, strat.rs_priority) - .with_context(Context::current_with_span(span)); + let rpc_call = endpoint.call(&node_id, msg, strat.rs_priority); select! { res = rpc_call => { @@ -246,7 +240,7 @@ impl RpcHelper { let quorum = strategy.rs_quorum.unwrap_or(to.len()); let tracer = opentelemetry::global::tracer("garage"); - let mut span = tracer.start(format!("RPC {} to {:?}", endpoint.path(), to)); + let mut span = tracer.start(format!("RPC {} to {}", endpoint.path(), to.len())); span.set_attribute(KeyValue::new("to", format!("{:?}", to))); span.set_attribute(KeyValue::new("quorum", quorum as i64)); @@ -329,7 +323,6 @@ impl RpcHelper { // reach quorum, start some new requests. while successes.len() + resp_stream.len() < quorum { if let Some((_, _, _, req_to, fut)) = requests.next() { - let tracer = opentelemetry::global::tracer("garage"); let span = tracer.start(format!("RPC to {:?}", req_to)); resp_stream.push(tokio::spawn( fut.with_context(Context::current_with_span(span)), diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 6bca6e3e..e4d289ec 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -37,6 +37,9 @@ const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60); const STATUS_EXCHANGE_INTERVAL: Duration = Duration::from_secs(10); const PING_TIMEOUT: Duration = Duration::from_secs(2); +/// Version tag used for version check upon Netapp connection +pub const GARAGE_VERSION_TAG: u64 = 0x6761726167650006; // garage 0x0006 + /// RPC endpoint used for calls related to membership pub const SYSTEM_RPC_PATH: &str = "garage_rpc/membership.rs/SystemRpc"; @@ -188,7 +191,10 @@ impl System { ) -> Arc { let node_key = gen_node_key(&config.metadata_dir).expect("Unable to read or generate node ID"); - info!("Node public key: {}", hex::encode(&node_key.public_key())); + info!( + "Node ID of this node: {}", + hex::encode(&node_key.public_key()[..8]) + ); let persist_cluster_layout = Persister::new(&config.metadata_dir, "cluster_layout"); let persist_peer_list = Persister::new(&config.metadata_dir, "peer_list"); @@ -216,13 +222,7 @@ impl System { let ring = Ring::new(cluster_layout, replication_factor); let (update_ring, ring) = watch::channel(Arc::new(ring)); - if let Some(addr) = config.rpc_public_addr { - println!("{}@{}", hex::encode(&node_key.public_key()), addr); - } else { - println!("{}", hex::encode(&node_key.public_key())); - } - - let netapp = NetApp::new(network_key, node_key); + let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, node_key); let fullmesh = FullMeshPeeringStrategy::new( netapp.clone(), config.bootstrap_peers.clone(), diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml index 717cd954..1c426c86 100644 --- a/src/util/Cargo.toml +++ b/src/util/Cargo.toml @@ -34,7 +34,8 @@ futures = "0.3" tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } #netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" } -netapp = "0.3.0" +#netapp = { version = "0.4", path = "../../../netapp" } +netapp = "0.4" http = "0.2" hyper = "0.14"