Compare commits

..

8 commits

11 changed files with 108 additions and 68 deletions

View file

@ -1,28 +1,17 @@
---
kind: pipeline kind: pipeline
name: default name: default
workspace:
base: /drone
clone:
disable: true
steps: steps:
- name: clone
image: alpine/git
commands:
- mkdir -p cargo
- git clone $DRONE_GIT_HTTP_URL
- cd netapp
- git checkout $DRONE_COMMIT
- name: style - name: style
image: rust:1.58-buster image: rust:1.58-buster
environment: environment:
CARGO_HOME: /drone/cargo CARGO_HOME: /drone/cargo
volumes:
- name: cargo
path: /drone/cargo
commands: commands:
- rustup component add rustfmt clippy - rustup component add rustfmt clippy
- cd netapp
- cargo fmt -- --check - cargo fmt -- --check
- cargo clippy --all-features -- --deny warnings - cargo clippy --all-features -- --deny warnings
- cargo clippy --example fullmesh -- --deny warnings - cargo clippy --example fullmesh -- --deny warnings
@ -32,11 +21,13 @@ steps:
image: rust:1.58-buster image: rust:1.58-buster
environment: environment:
CARGO_HOME: /drone/cargo CARGO_HOME: /drone/cargo
volumes:
- name: cargo
path: /drone/cargo
commands: commands:
- apt-get update - apt-get update
- apt-get install --yes libsodium-dev - apt-get install --yes libsodium-dev
- cargo install -f cargo-all-features - cargo install -f cargo-all-features
- cd netapp
- cargo build-all-features - cargo build-all-features
- cargo build --example fullmesh - cargo build --example fullmesh
- cargo build --example basalt --features "basalt" - cargo build --example basalt --features "basalt"
@ -45,8 +36,19 @@ steps:
image: rust:1.58-buster image: rust:1.58-buster
environment: environment:
CARGO_HOME: /drone/cargo CARGO_HOME: /drone/cargo
volumes:
- name: cargo
path: /drone/cargo
commands: commands:
- apt-get update - apt-get update
- apt-get install --yes libsodium-dev - apt-get install --yes libsodium-dev
- cd netapp
- cargo test --all-features -- --test-threads 1 - cargo test --all-features -- --test-threads 1
volumes:
- name: cargo
temp: {}
---
kind: signature
hmac: f0d1a9e8d85a22c1d9084b4d90c9930be9700da52284f1875ece996cc52a6ce9
...

6
Cargo.lock generated
View file

@ -428,7 +428,7 @@ dependencies = [
[[package]] [[package]]
name = "netapp" name = "netapp"
version = "0.5.0" version = "0.10.0"
dependencies = [ dependencies = [
"arc-swap", "arc-swap",
"async-trait", "async-trait",
@ -677,9 +677,9 @@ dependencies = [
[[package]] [[package]]
name = "rmp-serde" name = "rmp-serde"
version = "0.15.5" version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "723ecff9ad04f4ad92fe1c8ca6c20d2196d9286e9c60727c4cb5511629260e9d" checksum = "bffea85eea980d8a74453e5d02a8d93028f3c34725de143085a844ebe953258a"
dependencies = [ dependencies = [
"byteorder", "byteorder",
"rmp", "rmp",

View file

@ -1,6 +1,6 @@
[package] [package]
name = "netapp" name = "netapp"
version = "0.5.0" version = "0.10.0"
authors = ["Alex Auvolat <alex@adnab.me>"] authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018" edition = "2018"
license-file = "LICENSE" license-file = "LICENSE"
@ -27,7 +27,7 @@ tokio-util = { version = "0.7", default-features = false, features = ["compat",
tokio-stream = "0.1.7" tokio-stream = "0.1.7"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
rmp-serde = "0.15" rmp-serde = "1.1"
hex = "0.4.2" hex = "0.4.2"
rand = { version = "0.8" } rand = { version = "0.8" }

View file

@ -72,7 +72,7 @@ async fn main() {
}; };
info!("Node private key: {}", hex::encode(&privkey)); info!("Node private key: {}", hex::encode(&privkey));
info!("Node public key: {}", hex::encode(&privkey.public_key())); info!("Node public key: {}", hex::encode(privkey.public_key()));
let public_addr = opt.public_addr.map(|x| x.parse().unwrap()); let public_addr = opt.public_addr.map(|x| x.parse().unwrap());
let listen_addr: SocketAddr = opt.listen_addr.parse().unwrap(); let listen_addr: SocketAddr = opt.listen_addr.parse().unwrap();
@ -94,7 +94,7 @@ async fn main() {
info!("Add more peers to this mesh by running: fullmesh -n {} -l 127.0.0.1:$((1000 + $RANDOM)) -b {}@{}", info!("Add more peers to this mesh by running: fullmesh -n {} -l 127.0.0.1:$((1000 + $RANDOM)) -b {}@{}",
hex::encode(&netid), hex::encode(&netid),
hex::encode(&privkey.public_key()), hex::encode(privkey.public_key()),
listen_addr); listen_addr);
let watch_cancel = netapp::util::watch_ctrl_c(); let watch_cancel = netapp::util::watch_ctrl_c();

View file

@ -1,3 +1,4 @@
use std::cmp::Ordering;
use std::collections::VecDeque; use std::collections::VecDeque;
use bytes::BytesMut; use bytes::BytesMut;
@ -44,7 +45,7 @@ impl BytesBuf {
/// Takes the whole content of the buffer and returns it as a single Bytes unit /// Takes the whole content of the buffer and returns it as a single Bytes unit
pub fn take_all(&mut self) -> Bytes { pub fn take_all(&mut self) -> Bytes {
if self.buf.len() == 0 { if self.buf.is_empty() {
Bytes::new() Bytes::new()
} else if self.buf.len() == 1 { } else if self.buf.len() == 1 {
self.buf_len = 0; self.buf_len = 0;
@ -82,31 +83,35 @@ impl BytesBuf {
fn take_exact_ok(&mut self, len: usize) -> Bytes { fn take_exact_ok(&mut self, len: usize) -> Bytes {
assert!(len <= self.buf_len); assert!(len <= self.buf_len);
let front = self.buf.pop_front().unwrap(); let front = self.buf.pop_front().unwrap();
if front.len() > len { match front.len().cmp(&len) {
self.buf.push_front(front.slice(len..)); Ordering::Greater => {
self.buf_len -= len; self.buf.push_front(front.slice(len..));
front.slice(..len) self.buf_len -= len;
} else if front.len() == len { front.slice(..len)
self.buf_len -= len; }
front Ordering::Equal => {
} else { self.buf_len -= len;
let mut ret = BytesMut::with_capacity(len); front
ret.extend_from_slice(&front[..]); }
self.buf_len -= front.len(); Ordering::Less => {
while ret.len() < len { let mut ret = BytesMut::with_capacity(len);
let front = self.buf.pop_front().unwrap(); ret.extend_from_slice(&front[..]);
if front.len() > len - ret.len() { self.buf_len -= front.len();
let take = len - ret.len(); while ret.len() < len {
ret.extend_from_slice(&front[..take]); let front = self.buf.pop_front().unwrap();
self.buf.push_front(front.slice(take..)); if front.len() > len - ret.len() {
self.buf_len -= take; let take = len - ret.len();
break; ret.extend_from_slice(&front[..take]);
} else { self.buf.push_front(front.slice(take..));
ret.extend_from_slice(&front[..]); self.buf_len -= take;
self.buf_len -= front.len(); break;
} } else {
ret.extend_from_slice(&front[..]);
self.buf_len -= front.len();
}
}
ret.freeze()
} }
ret.freeze()
} }
} }
@ -116,6 +121,12 @@ impl BytesBuf {
} }
} }
impl Default for BytesBuf {
fn default() -> Self {
Self::new()
}
}
impl From<Bytes> for BytesBuf { impl From<Bytes> for BytesBuf {
fn from(b: Bytes) -> BytesBuf { fn from(b: Bytes) -> BytesBuf {
let mut ret = BytesBuf::new(); let mut ret = BytesBuf::new();

View file

@ -65,7 +65,7 @@ impl ClientConn {
debug!( debug!(
"Handshake complete (client) with {}@{}", "Handshake complete (client) with {}@{}",
hex::encode(&peer_id), hex::encode(peer_id),
remote_addr remote_addr
); );
@ -250,7 +250,7 @@ impl CancelOnDrop {
fn for_stream(self, stream: ByteStream) -> CancelOnDropStream { fn for_stream(self, stream: ByteStream) -> CancelOnDropStream {
CancelOnDropStream { CancelOnDropStream {
cancel: Some(self), cancel: Some(self),
stream: stream, stream,
} }
} }
} }

View file

@ -54,11 +54,11 @@ pub struct OrderTagStream(u64);
impl OrderTag { impl OrderTag {
/// Create a new stream from which to generate order tags. Example: /// Create a new stream from which to generate order tags. Example:
/// ```ignore /// ```ignore
/// let stream = OrderTag.stream(); /// let stream = OrderTag.stream();
/// let tag_1 = stream.order(1); /// let tag_1 = stream.order(1);
/// let tag_2 = stream.order(2); /// let tag_2 = stream.order(2);
/// ``` /// ```
pub fn stream() -> OrderTagStream { pub fn stream() -> OrderTagStream {
OrderTagStream(thread_rng().gen()) OrderTagStream(thread_rng().gen())
} }
@ -155,7 +155,7 @@ impl<M: Message> Req<M> {
} }
pub(crate) fn from_enc(enc: ReqEnc) -> Result<Self, rmp_serde::decode::Error> { pub(crate) fn from_enc(enc: ReqEnc) -> Result<Self, rmp_serde::decode::Error> {
let msg = rmp_serde::decode::from_read_ref(&enc.msg)?; let msg = rmp_serde::decode::from_slice(&enc.msg)?;
Ok(Req { Ok(Req {
msg: Arc::new(msg), msg: Arc::new(msg),
msg_ser: Some(enc.msg), msg_ser: Some(enc.msg),
@ -316,7 +316,7 @@ impl<M: Message> Resp<M> {
} }
pub(crate) fn from_enc(enc: RespEnc) -> Result<Self, Error> { pub(crate) fn from_enc(enc: RespEnc) -> Result<Self, Error> {
let msg = rmp_serde::decode::from_read_ref(&enc.msg)?; let msg = rmp_serde::decode::from_slice(&enc.msg)?;
Ok(Self { Ok(Self {
_phantom: Default::default(), _phantom: Default::default(),
msg, msg,

View file

@ -25,9 +25,10 @@ const CONN_RETRY_INTERVAL: Duration = Duration::from_secs(30);
const CONN_MAX_RETRIES: usize = 10; const CONN_MAX_RETRIES: usize = 10;
const PING_INTERVAL: Duration = Duration::from_secs(15); const PING_INTERVAL: Duration = Duration::from_secs(15);
const LOOP_DELAY: Duration = Duration::from_secs(1); const LOOP_DELAY: Duration = Duration::from_secs(1);
const PING_TIMEOUT: Duration = Duration::from_secs(10);
const FAILED_PING_THRESHOLD: usize = 4; const FAILED_PING_THRESHOLD: usize = 4;
const DEFAULT_PING_TIMEOUT_MILLIS: u64 = 10_000;
// -- Protocol messages -- // -- Protocol messages --
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
@ -112,7 +113,7 @@ impl PeerInfo {
/// PeerConnState: possible states for our tentative connections to given peer /// PeerConnState: possible states for our tentative connections to given peer
/// This structure is only interested in recording connection info for outgoing /// This structure is only interested in recording connection info for outgoing
/// TCP connections /// TCP connections
#[derive(Copy, Clone, Debug, PartialEq)] #[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum PeerConnState { pub enum PeerConnState {
/// This entry represents ourself (the local node) /// This entry represents ourself (the local node)
Ourself, Ourself,
@ -184,6 +185,8 @@ pub struct FullMeshPeeringStrategy {
next_ping_id: AtomicU64, next_ping_id: AtomicU64,
ping_endpoint: Arc<Endpoint<PingMessage, Self>>, ping_endpoint: Arc<Endpoint<PingMessage, Self>>,
peer_list_endpoint: Arc<Endpoint<PeerListMessage, Self>>, peer_list_endpoint: Arc<Endpoint<PeerListMessage, Self>>,
ping_timeout_millis: AtomicU64,
} }
impl FullMeshPeeringStrategy { impl FullMeshPeeringStrategy {
@ -220,6 +223,7 @@ impl FullMeshPeeringStrategy {
next_ping_id: AtomicU64::new(42), next_ping_id: AtomicU64::new(42),
ping_endpoint: netapp.endpoint("__netapp/peering/fullmesh.rs/Ping".into()), ping_endpoint: netapp.endpoint("__netapp/peering/fullmesh.rs/Ping".into()),
peer_list_endpoint: netapp.endpoint("__netapp/peering/fullmesh.rs/PeerList".into()), peer_list_endpoint: netapp.endpoint("__netapp/peering/fullmesh.rs/PeerList".into()),
ping_timeout_millis: DEFAULT_PING_TIMEOUT_MILLIS.into(),
}); });
strat.update_public_peer_list(&strat.known_hosts.read().unwrap()); strat.update_public_peer_list(&strat.known_hosts.read().unwrap());
@ -331,6 +335,12 @@ impl FullMeshPeeringStrategy {
self.public_peer_list.load_full() self.public_peer_list.load_full()
} }
/// Set the timeout for ping messages, in milliseconds
pub fn set_ping_timeout_millis(&self, timeout: u64) {
self.ping_timeout_millis
.store(timeout, atomic::Ordering::Relaxed);
}
// -- internal stuff -- // -- internal stuff --
fn update_public_peer_list(&self, known_hosts: &KnownHosts) { fn update_public_peer_list(&self, known_hosts: &KnownHosts) {
@ -372,6 +382,8 @@ impl FullMeshPeeringStrategy {
let peer_list_hash = self.known_hosts.read().unwrap().hash; let peer_list_hash = self.known_hosts.read().unwrap().hash;
let ping_id = self.next_ping_id.fetch_add(1u64, atomic::Ordering::Relaxed); let ping_id = self.next_ping_id.fetch_add(1u64, atomic::Ordering::Relaxed);
let ping_time = Instant::now(); let ping_time = Instant::now();
let ping_timeout =
Duration::from_millis(self.ping_timeout_millis.load(atomic::Ordering::Relaxed));
let ping_msg = PingMessage { let ping_msg = PingMessage {
id: ping_id, id: ping_id,
peer_list_hash, peer_list_hash,
@ -385,7 +397,7 @@ impl FullMeshPeeringStrategy {
); );
let ping_response = select! { let ping_response = select! {
r = self.ping_endpoint.call(&id, ping_msg, PRIO_HIGH) => r, r = self.ping_endpoint.call(&id, ping_msg, PRIO_HIGH) => r,
_ = tokio::time::sleep(PING_TIMEOUT) => Err(Error::Message("Ping timeout".into())), _ = tokio::time::sleep(ping_timeout) => Err(Error::Message("Ping timeout".into())),
}; };
match ping_response { match ping_response {

View file

@ -79,7 +79,7 @@ impl ServerConn {
debug!( debug!(
"Handshake complete (server) with {}@{}", "Handshake complete (server) with {}@{}",
hex::encode(&peer_id), hex::encode(peer_id),
remote_addr remote_addr
); );

View file

@ -1,5 +1,4 @@
use std::net::SocketAddr; use std::net::SocketAddr;
use std::net::ToSocketAddrs;
use log::info; use log::info;
use serde::Serialize; use serde::Serialize;
@ -18,9 +17,7 @@ where
T: Serialize + ?Sized, T: Serialize + ?Sized,
{ {
let mut wr = Vec::with_capacity(128); let mut wr = Vec::with_capacity(128);
let mut se = rmp_serde::Serializer::new(&mut wr) let mut se = rmp_serde::Serializer::new(&mut wr).with_struct_map();
.with_struct_map()
.with_string_variants();
val.serialize(&mut se)?; val.serialize(&mut se)?;
Ok(wr) Ok(wr)
} }
@ -63,7 +60,7 @@ pub fn watch_ctrl_c() -> watch::Receiver<bool> {
pub fn parse_peer_addr(peer: &str) -> Option<(NodeID, SocketAddr)> { pub fn parse_peer_addr(peer: &str) -> Option<(NodeID, SocketAddr)> {
let delim = peer.find('@')?; let delim = peer.find('@')?;
let (key, ip) = peer.split_at(delim); let (key, ip) = peer.split_at(delim);
let pubkey = NodeID::from_slice(&hex::decode(&key).ok()?)?; let pubkey = NodeID::from_slice(&hex::decode(key).ok()?)?;
let ip = ip[1..].parse::<SocketAddr>().ok()?; let ip = ip[1..].parse::<SocketAddr>().ok()?;
Some((pubkey, ip)) Some((pubkey, ip))
} }
@ -71,12 +68,29 @@ pub fn parse_peer_addr(peer: &str) -> Option<(NodeID, SocketAddr)> {
/// Parse and resolve a peer's address including public key, written in the format: /// Parse and resolve a peer's address including public key, written in the format:
/// `<public key hex>@<ip or hostname>:<port>` /// `<public key hex>@<ip or hostname>:<port>`
pub fn parse_and_resolve_peer_addr(peer: &str) -> Option<(NodeID, Vec<SocketAddr>)> { pub fn parse_and_resolve_peer_addr(peer: &str) -> Option<(NodeID, Vec<SocketAddr>)> {
use std::net::ToSocketAddrs;
let delim = peer.find('@')?; let delim = peer.find('@')?;
let (key, host) = peer.split_at(delim); let (key, host) = peer.split_at(delim);
let pubkey = NodeID::from_slice(&hex::decode(&key).ok()?)?; let pubkey = NodeID::from_slice(&hex::decode(key).ok()?)?;
let hosts = host[1..].to_socket_addrs().ok()?.collect::<Vec<_>>(); let hosts = host[1..].to_socket_addrs().ok()?.collect::<Vec<_>>();
if hosts.is_empty() { if hosts.is_empty() {
return None; return None;
} }
Some((pubkey, hosts)) Some((pubkey, hosts))
} }
/// async version of parse_and_resolve_peer_addr
pub async fn parse_and_resolve_peer_addr_async(peer: &str) -> Option<(NodeID, Vec<SocketAddr>)> {
let delim = peer.find('@')?;
let (key, host) = peer.split_at(delim);
let pubkey = NodeID::from_slice(&hex::decode(key).ok()?)?;
let hosts = tokio::net::lookup_host(&host[1..])
.await
.ok()?
.collect::<Vec<_>>();
if hosts.is_empty() {
return None;
}
Some((pubkey, hosts))
}

1
target Symbolic link
View file

@ -0,0 +1 @@
/home/lx.nobackup/rust/netapp.target/