Compare commits
8 commits
stream-bod
...
main
Author | SHA1 | Date | |
---|---|---|---|
170ddbfac4 | |||
2484d7654a | |||
b7beb15492 | |||
34aade6ce9 | |||
6df6411b72 | |||
e4c0be848d | |||
1a413eef97 | |||
8ac109e3a8 |
11 changed files with 108 additions and 68 deletions
36
.drone.yml
36
.drone.yml
|
@ -1,28 +1,17 @@
|
||||||
|
---
|
||||||
kind: pipeline
|
kind: pipeline
|
||||||
name: default
|
name: default
|
||||||
|
|
||||||
workspace:
|
|
||||||
base: /drone
|
|
||||||
|
|
||||||
clone:
|
|
||||||
disable: true
|
|
||||||
|
|
||||||
steps:
|
steps:
|
||||||
- name: clone
|
|
||||||
image: alpine/git
|
|
||||||
commands:
|
|
||||||
- mkdir -p cargo
|
|
||||||
- git clone $DRONE_GIT_HTTP_URL
|
|
||||||
- cd netapp
|
|
||||||
- git checkout $DRONE_COMMIT
|
|
||||||
|
|
||||||
- name: style
|
- name: style
|
||||||
image: rust:1.58-buster
|
image: rust:1.58-buster
|
||||||
environment:
|
environment:
|
||||||
CARGO_HOME: /drone/cargo
|
CARGO_HOME: /drone/cargo
|
||||||
|
volumes:
|
||||||
|
- name: cargo
|
||||||
|
path: /drone/cargo
|
||||||
commands:
|
commands:
|
||||||
- rustup component add rustfmt clippy
|
- rustup component add rustfmt clippy
|
||||||
- cd netapp
|
|
||||||
- cargo fmt -- --check
|
- cargo fmt -- --check
|
||||||
- cargo clippy --all-features -- --deny warnings
|
- cargo clippy --all-features -- --deny warnings
|
||||||
- cargo clippy --example fullmesh -- --deny warnings
|
- cargo clippy --example fullmesh -- --deny warnings
|
||||||
|
@ -32,11 +21,13 @@ steps:
|
||||||
image: rust:1.58-buster
|
image: rust:1.58-buster
|
||||||
environment:
|
environment:
|
||||||
CARGO_HOME: /drone/cargo
|
CARGO_HOME: /drone/cargo
|
||||||
|
volumes:
|
||||||
|
- name: cargo
|
||||||
|
path: /drone/cargo
|
||||||
commands:
|
commands:
|
||||||
- apt-get update
|
- apt-get update
|
||||||
- apt-get install --yes libsodium-dev
|
- apt-get install --yes libsodium-dev
|
||||||
- cargo install -f cargo-all-features
|
- cargo install -f cargo-all-features
|
||||||
- cd netapp
|
|
||||||
- cargo build-all-features
|
- cargo build-all-features
|
||||||
- cargo build --example fullmesh
|
- cargo build --example fullmesh
|
||||||
- cargo build --example basalt --features "basalt"
|
- cargo build --example basalt --features "basalt"
|
||||||
|
@ -45,8 +36,19 @@ steps:
|
||||||
image: rust:1.58-buster
|
image: rust:1.58-buster
|
||||||
environment:
|
environment:
|
||||||
CARGO_HOME: /drone/cargo
|
CARGO_HOME: /drone/cargo
|
||||||
|
volumes:
|
||||||
|
- name: cargo
|
||||||
|
path: /drone/cargo
|
||||||
commands:
|
commands:
|
||||||
- apt-get update
|
- apt-get update
|
||||||
- apt-get install --yes libsodium-dev
|
- apt-get install --yes libsodium-dev
|
||||||
- cd netapp
|
|
||||||
- cargo test --all-features -- --test-threads 1
|
- cargo test --all-features -- --test-threads 1
|
||||||
|
|
||||||
|
volumes:
|
||||||
|
- name: cargo
|
||||||
|
temp: {}
|
||||||
|
---
|
||||||
|
kind: signature
|
||||||
|
hmac: f0d1a9e8d85a22c1d9084b4d90c9930be9700da52284f1875ece996cc52a6ce9
|
||||||
|
|
||||||
|
...
|
||||||
|
|
6
Cargo.lock
generated
6
Cargo.lock
generated
|
@ -428,7 +428,7 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "netapp"
|
name = "netapp"
|
||||||
version = "0.5.0"
|
version = "0.10.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"arc-swap",
|
"arc-swap",
|
||||||
"async-trait",
|
"async-trait",
|
||||||
|
@ -677,9 +677,9 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "rmp-serde"
|
name = "rmp-serde"
|
||||||
version = "0.15.5"
|
version = "1.1.2"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "723ecff9ad04f4ad92fe1c8ca6c20d2196d9286e9c60727c4cb5511629260e9d"
|
checksum = "bffea85eea980d8a74453e5d02a8d93028f3c34725de143085a844ebe953258a"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"byteorder",
|
"byteorder",
|
||||||
"rmp",
|
"rmp",
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
[package]
|
[package]
|
||||||
name = "netapp"
|
name = "netapp"
|
||||||
version = "0.5.0"
|
version = "0.10.0"
|
||||||
authors = ["Alex Auvolat <alex@adnab.me>"]
|
authors = ["Alex Auvolat <alex@adnab.me>"]
|
||||||
edition = "2018"
|
edition = "2018"
|
||||||
license-file = "LICENSE"
|
license-file = "LICENSE"
|
||||||
|
@ -27,7 +27,7 @@ tokio-util = { version = "0.7", default-features = false, features = ["compat",
|
||||||
tokio-stream = "0.1.7"
|
tokio-stream = "0.1.7"
|
||||||
|
|
||||||
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
|
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
|
||||||
rmp-serde = "0.15"
|
rmp-serde = "1.1"
|
||||||
hex = "0.4.2"
|
hex = "0.4.2"
|
||||||
|
|
||||||
rand = { version = "0.8" }
|
rand = { version = "0.8" }
|
||||||
|
|
|
@ -72,7 +72,7 @@ async fn main() {
|
||||||
};
|
};
|
||||||
|
|
||||||
info!("Node private key: {}", hex::encode(&privkey));
|
info!("Node private key: {}", hex::encode(&privkey));
|
||||||
info!("Node public key: {}", hex::encode(&privkey.public_key()));
|
info!("Node public key: {}", hex::encode(privkey.public_key()));
|
||||||
|
|
||||||
let public_addr = opt.public_addr.map(|x| x.parse().unwrap());
|
let public_addr = opt.public_addr.map(|x| x.parse().unwrap());
|
||||||
let listen_addr: SocketAddr = opt.listen_addr.parse().unwrap();
|
let listen_addr: SocketAddr = opt.listen_addr.parse().unwrap();
|
||||||
|
@ -94,7 +94,7 @@ async fn main() {
|
||||||
|
|
||||||
info!("Add more peers to this mesh by running: fullmesh -n {} -l 127.0.0.1:$((1000 + $RANDOM)) -b {}@{}",
|
info!("Add more peers to this mesh by running: fullmesh -n {} -l 127.0.0.1:$((1000 + $RANDOM)) -b {}@{}",
|
||||||
hex::encode(&netid),
|
hex::encode(&netid),
|
||||||
hex::encode(&privkey.public_key()),
|
hex::encode(privkey.public_key()),
|
||||||
listen_addr);
|
listen_addr);
|
||||||
|
|
||||||
let watch_cancel = netapp::util::watch_ctrl_c();
|
let watch_cancel = netapp::util::watch_ctrl_c();
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
use std::cmp::Ordering;
|
||||||
use std::collections::VecDeque;
|
use std::collections::VecDeque;
|
||||||
|
|
||||||
use bytes::BytesMut;
|
use bytes::BytesMut;
|
||||||
|
@ -44,7 +45,7 @@ impl BytesBuf {
|
||||||
|
|
||||||
/// Takes the whole content of the buffer and returns it as a single Bytes unit
|
/// Takes the whole content of the buffer and returns it as a single Bytes unit
|
||||||
pub fn take_all(&mut self) -> Bytes {
|
pub fn take_all(&mut self) -> Bytes {
|
||||||
if self.buf.len() == 0 {
|
if self.buf.is_empty() {
|
||||||
Bytes::new()
|
Bytes::new()
|
||||||
} else if self.buf.len() == 1 {
|
} else if self.buf.len() == 1 {
|
||||||
self.buf_len = 0;
|
self.buf_len = 0;
|
||||||
|
@ -82,31 +83,35 @@ impl BytesBuf {
|
||||||
fn take_exact_ok(&mut self, len: usize) -> Bytes {
|
fn take_exact_ok(&mut self, len: usize) -> Bytes {
|
||||||
assert!(len <= self.buf_len);
|
assert!(len <= self.buf_len);
|
||||||
let front = self.buf.pop_front().unwrap();
|
let front = self.buf.pop_front().unwrap();
|
||||||
if front.len() > len {
|
match front.len().cmp(&len) {
|
||||||
self.buf.push_front(front.slice(len..));
|
Ordering::Greater => {
|
||||||
self.buf_len -= len;
|
self.buf.push_front(front.slice(len..));
|
||||||
front.slice(..len)
|
self.buf_len -= len;
|
||||||
} else if front.len() == len {
|
front.slice(..len)
|
||||||
self.buf_len -= len;
|
}
|
||||||
front
|
Ordering::Equal => {
|
||||||
} else {
|
self.buf_len -= len;
|
||||||
let mut ret = BytesMut::with_capacity(len);
|
front
|
||||||
ret.extend_from_slice(&front[..]);
|
}
|
||||||
self.buf_len -= front.len();
|
Ordering::Less => {
|
||||||
while ret.len() < len {
|
let mut ret = BytesMut::with_capacity(len);
|
||||||
let front = self.buf.pop_front().unwrap();
|
ret.extend_from_slice(&front[..]);
|
||||||
if front.len() > len - ret.len() {
|
self.buf_len -= front.len();
|
||||||
let take = len - ret.len();
|
while ret.len() < len {
|
||||||
ret.extend_from_slice(&front[..take]);
|
let front = self.buf.pop_front().unwrap();
|
||||||
self.buf.push_front(front.slice(take..));
|
if front.len() > len - ret.len() {
|
||||||
self.buf_len -= take;
|
let take = len - ret.len();
|
||||||
break;
|
ret.extend_from_slice(&front[..take]);
|
||||||
} else {
|
self.buf.push_front(front.slice(take..));
|
||||||
ret.extend_from_slice(&front[..]);
|
self.buf_len -= take;
|
||||||
self.buf_len -= front.len();
|
break;
|
||||||
}
|
} else {
|
||||||
|
ret.extend_from_slice(&front[..]);
|
||||||
|
self.buf_len -= front.len();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ret.freeze()
|
||||||
}
|
}
|
||||||
ret.freeze()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -116,6 +121,12 @@ impl BytesBuf {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Default for BytesBuf {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self::new()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl From<Bytes> for BytesBuf {
|
impl From<Bytes> for BytesBuf {
|
||||||
fn from(b: Bytes) -> BytesBuf {
|
fn from(b: Bytes) -> BytesBuf {
|
||||||
let mut ret = BytesBuf::new();
|
let mut ret = BytesBuf::new();
|
||||||
|
|
|
@ -65,7 +65,7 @@ impl ClientConn {
|
||||||
|
|
||||||
debug!(
|
debug!(
|
||||||
"Handshake complete (client) with {}@{}",
|
"Handshake complete (client) with {}@{}",
|
||||||
hex::encode(&peer_id),
|
hex::encode(peer_id),
|
||||||
remote_addr
|
remote_addr
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -250,7 +250,7 @@ impl CancelOnDrop {
|
||||||
fn for_stream(self, stream: ByteStream) -> CancelOnDropStream {
|
fn for_stream(self, stream: ByteStream) -> CancelOnDropStream {
|
||||||
CancelOnDropStream {
|
CancelOnDropStream {
|
||||||
cancel: Some(self),
|
cancel: Some(self),
|
||||||
stream: stream,
|
stream,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -54,11 +54,11 @@ pub struct OrderTagStream(u64);
|
||||||
|
|
||||||
impl OrderTag {
|
impl OrderTag {
|
||||||
/// Create a new stream from which to generate order tags. Example:
|
/// Create a new stream from which to generate order tags. Example:
|
||||||
/// ```ignore
|
/// ```ignore
|
||||||
/// let stream = OrderTag.stream();
|
/// let stream = OrderTag.stream();
|
||||||
/// let tag_1 = stream.order(1);
|
/// let tag_1 = stream.order(1);
|
||||||
/// let tag_2 = stream.order(2);
|
/// let tag_2 = stream.order(2);
|
||||||
/// ```
|
/// ```
|
||||||
pub fn stream() -> OrderTagStream {
|
pub fn stream() -> OrderTagStream {
|
||||||
OrderTagStream(thread_rng().gen())
|
OrderTagStream(thread_rng().gen())
|
||||||
}
|
}
|
||||||
|
@ -155,7 +155,7 @@ impl<M: Message> Req<M> {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn from_enc(enc: ReqEnc) -> Result<Self, rmp_serde::decode::Error> {
|
pub(crate) fn from_enc(enc: ReqEnc) -> Result<Self, rmp_serde::decode::Error> {
|
||||||
let msg = rmp_serde::decode::from_read_ref(&enc.msg)?;
|
let msg = rmp_serde::decode::from_slice(&enc.msg)?;
|
||||||
Ok(Req {
|
Ok(Req {
|
||||||
msg: Arc::new(msg),
|
msg: Arc::new(msg),
|
||||||
msg_ser: Some(enc.msg),
|
msg_ser: Some(enc.msg),
|
||||||
|
@ -316,7 +316,7 @@ impl<M: Message> Resp<M> {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn from_enc(enc: RespEnc) -> Result<Self, Error> {
|
pub(crate) fn from_enc(enc: RespEnc) -> Result<Self, Error> {
|
||||||
let msg = rmp_serde::decode::from_read_ref(&enc.msg)?;
|
let msg = rmp_serde::decode::from_slice(&enc.msg)?;
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
_phantom: Default::default(),
|
_phantom: Default::default(),
|
||||||
msg,
|
msg,
|
||||||
|
|
|
@ -25,9 +25,10 @@ const CONN_RETRY_INTERVAL: Duration = Duration::from_secs(30);
|
||||||
const CONN_MAX_RETRIES: usize = 10;
|
const CONN_MAX_RETRIES: usize = 10;
|
||||||
const PING_INTERVAL: Duration = Duration::from_secs(15);
|
const PING_INTERVAL: Duration = Duration::from_secs(15);
|
||||||
const LOOP_DELAY: Duration = Duration::from_secs(1);
|
const LOOP_DELAY: Duration = Duration::from_secs(1);
|
||||||
const PING_TIMEOUT: Duration = Duration::from_secs(10);
|
|
||||||
const FAILED_PING_THRESHOLD: usize = 4;
|
const FAILED_PING_THRESHOLD: usize = 4;
|
||||||
|
|
||||||
|
const DEFAULT_PING_TIMEOUT_MILLIS: u64 = 10_000;
|
||||||
|
|
||||||
// -- Protocol messages --
|
// -- Protocol messages --
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize)]
|
#[derive(Serialize, Deserialize)]
|
||||||
|
@ -112,7 +113,7 @@ impl PeerInfo {
|
||||||
/// PeerConnState: possible states for our tentative connections to given peer
|
/// PeerConnState: possible states for our tentative connections to given peer
|
||||||
/// This structure is only interested in recording connection info for outgoing
|
/// This structure is only interested in recording connection info for outgoing
|
||||||
/// TCP connections
|
/// TCP connections
|
||||||
#[derive(Copy, Clone, Debug, PartialEq)]
|
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
|
||||||
pub enum PeerConnState {
|
pub enum PeerConnState {
|
||||||
/// This entry represents ourself (the local node)
|
/// This entry represents ourself (the local node)
|
||||||
Ourself,
|
Ourself,
|
||||||
|
@ -184,6 +185,8 @@ pub struct FullMeshPeeringStrategy {
|
||||||
next_ping_id: AtomicU64,
|
next_ping_id: AtomicU64,
|
||||||
ping_endpoint: Arc<Endpoint<PingMessage, Self>>,
|
ping_endpoint: Arc<Endpoint<PingMessage, Self>>,
|
||||||
peer_list_endpoint: Arc<Endpoint<PeerListMessage, Self>>,
|
peer_list_endpoint: Arc<Endpoint<PeerListMessage, Self>>,
|
||||||
|
|
||||||
|
ping_timeout_millis: AtomicU64,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl FullMeshPeeringStrategy {
|
impl FullMeshPeeringStrategy {
|
||||||
|
@ -220,6 +223,7 @@ impl FullMeshPeeringStrategy {
|
||||||
next_ping_id: AtomicU64::new(42),
|
next_ping_id: AtomicU64::new(42),
|
||||||
ping_endpoint: netapp.endpoint("__netapp/peering/fullmesh.rs/Ping".into()),
|
ping_endpoint: netapp.endpoint("__netapp/peering/fullmesh.rs/Ping".into()),
|
||||||
peer_list_endpoint: netapp.endpoint("__netapp/peering/fullmesh.rs/PeerList".into()),
|
peer_list_endpoint: netapp.endpoint("__netapp/peering/fullmesh.rs/PeerList".into()),
|
||||||
|
ping_timeout_millis: DEFAULT_PING_TIMEOUT_MILLIS.into(),
|
||||||
});
|
});
|
||||||
|
|
||||||
strat.update_public_peer_list(&strat.known_hosts.read().unwrap());
|
strat.update_public_peer_list(&strat.known_hosts.read().unwrap());
|
||||||
|
@ -331,6 +335,12 @@ impl FullMeshPeeringStrategy {
|
||||||
self.public_peer_list.load_full()
|
self.public_peer_list.load_full()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Set the timeout for ping messages, in milliseconds
|
||||||
|
pub fn set_ping_timeout_millis(&self, timeout: u64) {
|
||||||
|
self.ping_timeout_millis
|
||||||
|
.store(timeout, atomic::Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
// -- internal stuff --
|
// -- internal stuff --
|
||||||
|
|
||||||
fn update_public_peer_list(&self, known_hosts: &KnownHosts) {
|
fn update_public_peer_list(&self, known_hosts: &KnownHosts) {
|
||||||
|
@ -372,6 +382,8 @@ impl FullMeshPeeringStrategy {
|
||||||
let peer_list_hash = self.known_hosts.read().unwrap().hash;
|
let peer_list_hash = self.known_hosts.read().unwrap().hash;
|
||||||
let ping_id = self.next_ping_id.fetch_add(1u64, atomic::Ordering::Relaxed);
|
let ping_id = self.next_ping_id.fetch_add(1u64, atomic::Ordering::Relaxed);
|
||||||
let ping_time = Instant::now();
|
let ping_time = Instant::now();
|
||||||
|
let ping_timeout =
|
||||||
|
Duration::from_millis(self.ping_timeout_millis.load(atomic::Ordering::Relaxed));
|
||||||
let ping_msg = PingMessage {
|
let ping_msg = PingMessage {
|
||||||
id: ping_id,
|
id: ping_id,
|
||||||
peer_list_hash,
|
peer_list_hash,
|
||||||
|
@ -385,7 +397,7 @@ impl FullMeshPeeringStrategy {
|
||||||
);
|
);
|
||||||
let ping_response = select! {
|
let ping_response = select! {
|
||||||
r = self.ping_endpoint.call(&id, ping_msg, PRIO_HIGH) => r,
|
r = self.ping_endpoint.call(&id, ping_msg, PRIO_HIGH) => r,
|
||||||
_ = tokio::time::sleep(PING_TIMEOUT) => Err(Error::Message("Ping timeout".into())),
|
_ = tokio::time::sleep(ping_timeout) => Err(Error::Message("Ping timeout".into())),
|
||||||
};
|
};
|
||||||
|
|
||||||
match ping_response {
|
match ping_response {
|
||||||
|
|
|
@ -79,7 +79,7 @@ impl ServerConn {
|
||||||
|
|
||||||
debug!(
|
debug!(
|
||||||
"Handshake complete (server) with {}@{}",
|
"Handshake complete (server) with {}@{}",
|
||||||
hex::encode(&peer_id),
|
hex::encode(peer_id),
|
||||||
remote_addr
|
remote_addr
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|
26
src/util.rs
26
src/util.rs
|
@ -1,5 +1,4 @@
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::net::ToSocketAddrs;
|
|
||||||
|
|
||||||
use log::info;
|
use log::info;
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
|
@ -18,9 +17,7 @@ where
|
||||||
T: Serialize + ?Sized,
|
T: Serialize + ?Sized,
|
||||||
{
|
{
|
||||||
let mut wr = Vec::with_capacity(128);
|
let mut wr = Vec::with_capacity(128);
|
||||||
let mut se = rmp_serde::Serializer::new(&mut wr)
|
let mut se = rmp_serde::Serializer::new(&mut wr).with_struct_map();
|
||||||
.with_struct_map()
|
|
||||||
.with_string_variants();
|
|
||||||
val.serialize(&mut se)?;
|
val.serialize(&mut se)?;
|
||||||
Ok(wr)
|
Ok(wr)
|
||||||
}
|
}
|
||||||
|
@ -63,7 +60,7 @@ pub fn watch_ctrl_c() -> watch::Receiver<bool> {
|
||||||
pub fn parse_peer_addr(peer: &str) -> Option<(NodeID, SocketAddr)> {
|
pub fn parse_peer_addr(peer: &str) -> Option<(NodeID, SocketAddr)> {
|
||||||
let delim = peer.find('@')?;
|
let delim = peer.find('@')?;
|
||||||
let (key, ip) = peer.split_at(delim);
|
let (key, ip) = peer.split_at(delim);
|
||||||
let pubkey = NodeID::from_slice(&hex::decode(&key).ok()?)?;
|
let pubkey = NodeID::from_slice(&hex::decode(key).ok()?)?;
|
||||||
let ip = ip[1..].parse::<SocketAddr>().ok()?;
|
let ip = ip[1..].parse::<SocketAddr>().ok()?;
|
||||||
Some((pubkey, ip))
|
Some((pubkey, ip))
|
||||||
}
|
}
|
||||||
|
@ -71,12 +68,29 @@ pub fn parse_peer_addr(peer: &str) -> Option<(NodeID, SocketAddr)> {
|
||||||
/// Parse and resolve a peer's address including public key, written in the format:
|
/// Parse and resolve a peer's address including public key, written in the format:
|
||||||
/// `<public key hex>@<ip or hostname>:<port>`
|
/// `<public key hex>@<ip or hostname>:<port>`
|
||||||
pub fn parse_and_resolve_peer_addr(peer: &str) -> Option<(NodeID, Vec<SocketAddr>)> {
|
pub fn parse_and_resolve_peer_addr(peer: &str) -> Option<(NodeID, Vec<SocketAddr>)> {
|
||||||
|
use std::net::ToSocketAddrs;
|
||||||
|
|
||||||
let delim = peer.find('@')?;
|
let delim = peer.find('@')?;
|
||||||
let (key, host) = peer.split_at(delim);
|
let (key, host) = peer.split_at(delim);
|
||||||
let pubkey = NodeID::from_slice(&hex::decode(&key).ok()?)?;
|
let pubkey = NodeID::from_slice(&hex::decode(key).ok()?)?;
|
||||||
let hosts = host[1..].to_socket_addrs().ok()?.collect::<Vec<_>>();
|
let hosts = host[1..].to_socket_addrs().ok()?.collect::<Vec<_>>();
|
||||||
if hosts.is_empty() {
|
if hosts.is_empty() {
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
Some((pubkey, hosts))
|
Some((pubkey, hosts))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// async version of parse_and_resolve_peer_addr
|
||||||
|
pub async fn parse_and_resolve_peer_addr_async(peer: &str) -> Option<(NodeID, Vec<SocketAddr>)> {
|
||||||
|
let delim = peer.find('@')?;
|
||||||
|
let (key, host) = peer.split_at(delim);
|
||||||
|
let pubkey = NodeID::from_slice(&hex::decode(key).ok()?)?;
|
||||||
|
let hosts = tokio::net::lookup_host(&host[1..])
|
||||||
|
.await
|
||||||
|
.ok()?
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
if hosts.is_empty() {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
Some((pubkey, hosts))
|
||||||
|
}
|
||||||
|
|
1
target
Symbolic link
1
target
Symbolic link
|
@ -0,0 +1 @@
|
||||||
|
/home/lx.nobackup/rust/netapp.target/
|
Loading…
Add table
Reference in a new issue