Compare commits
8 commits
stream-bod
...
main
Author | SHA1 | Date | |
---|---|---|---|
170ddbfac4 | |||
2484d7654a | |||
b7beb15492 | |||
34aade6ce9 | |||
6df6411b72 | |||
e4c0be848d | |||
1a413eef97 | |||
8ac109e3a8 |
11 changed files with 108 additions and 68 deletions
36
.drone.yml
36
.drone.yml
|
@ -1,28 +1,17 @@
|
|||
---
|
||||
kind: pipeline
|
||||
name: default
|
||||
|
||||
workspace:
|
||||
base: /drone
|
||||
|
||||
clone:
|
||||
disable: true
|
||||
|
||||
steps:
|
||||
- name: clone
|
||||
image: alpine/git
|
||||
commands:
|
||||
- mkdir -p cargo
|
||||
- git clone $DRONE_GIT_HTTP_URL
|
||||
- cd netapp
|
||||
- git checkout $DRONE_COMMIT
|
||||
|
||||
- name: style
|
||||
image: rust:1.58-buster
|
||||
environment:
|
||||
CARGO_HOME: /drone/cargo
|
||||
volumes:
|
||||
- name: cargo
|
||||
path: /drone/cargo
|
||||
commands:
|
||||
- rustup component add rustfmt clippy
|
||||
- cd netapp
|
||||
- cargo fmt -- --check
|
||||
- cargo clippy --all-features -- --deny warnings
|
||||
- cargo clippy --example fullmesh -- --deny warnings
|
||||
|
@ -32,11 +21,13 @@ steps:
|
|||
image: rust:1.58-buster
|
||||
environment:
|
||||
CARGO_HOME: /drone/cargo
|
||||
volumes:
|
||||
- name: cargo
|
||||
path: /drone/cargo
|
||||
commands:
|
||||
- apt-get update
|
||||
- apt-get install --yes libsodium-dev
|
||||
- cargo install -f cargo-all-features
|
||||
- cd netapp
|
||||
- cargo build-all-features
|
||||
- cargo build --example fullmesh
|
||||
- cargo build --example basalt --features "basalt"
|
||||
|
@ -45,8 +36,19 @@ steps:
|
|||
image: rust:1.58-buster
|
||||
environment:
|
||||
CARGO_HOME: /drone/cargo
|
||||
volumes:
|
||||
- name: cargo
|
||||
path: /drone/cargo
|
||||
commands:
|
||||
- apt-get update
|
||||
- apt-get install --yes libsodium-dev
|
||||
- cd netapp
|
||||
- cargo test --all-features -- --test-threads 1
|
||||
|
||||
volumes:
|
||||
- name: cargo
|
||||
temp: {}
|
||||
---
|
||||
kind: signature
|
||||
hmac: f0d1a9e8d85a22c1d9084b4d90c9930be9700da52284f1875ece996cc52a6ce9
|
||||
|
||||
...
|
||||
|
|
6
Cargo.lock
generated
6
Cargo.lock
generated
|
@ -428,7 +428,7 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "netapp"
|
||||
version = "0.5.0"
|
||||
version = "0.10.0"
|
||||
dependencies = [
|
||||
"arc-swap",
|
||||
"async-trait",
|
||||
|
@ -677,9 +677,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "rmp-serde"
|
||||
version = "0.15.5"
|
||||
version = "1.1.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "723ecff9ad04f4ad92fe1c8ca6c20d2196d9286e9c60727c4cb5511629260e9d"
|
||||
checksum = "bffea85eea980d8a74453e5d02a8d93028f3c34725de143085a844ebe953258a"
|
||||
dependencies = [
|
||||
"byteorder",
|
||||
"rmp",
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
[package]
|
||||
name = "netapp"
|
||||
version = "0.5.0"
|
||||
version = "0.10.0"
|
||||
authors = ["Alex Auvolat <alex@adnab.me>"]
|
||||
edition = "2018"
|
||||
license-file = "LICENSE"
|
||||
|
@ -27,7 +27,7 @@ tokio-util = { version = "0.7", default-features = false, features = ["compat",
|
|||
tokio-stream = "0.1.7"
|
||||
|
||||
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
|
||||
rmp-serde = "0.15"
|
||||
rmp-serde = "1.1"
|
||||
hex = "0.4.2"
|
||||
|
||||
rand = { version = "0.8" }
|
||||
|
|
|
@ -72,7 +72,7 @@ async fn main() {
|
|||
};
|
||||
|
||||
info!("Node private key: {}", hex::encode(&privkey));
|
||||
info!("Node public key: {}", hex::encode(&privkey.public_key()));
|
||||
info!("Node public key: {}", hex::encode(privkey.public_key()));
|
||||
|
||||
let public_addr = opt.public_addr.map(|x| x.parse().unwrap());
|
||||
let listen_addr: SocketAddr = opt.listen_addr.parse().unwrap();
|
||||
|
@ -94,7 +94,7 @@ async fn main() {
|
|||
|
||||
info!("Add more peers to this mesh by running: fullmesh -n {} -l 127.0.0.1:$((1000 + $RANDOM)) -b {}@{}",
|
||||
hex::encode(&netid),
|
||||
hex::encode(&privkey.public_key()),
|
||||
hex::encode(privkey.public_key()),
|
||||
listen_addr);
|
||||
|
||||
let watch_cancel = netapp::util::watch_ctrl_c();
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
use std::cmp::Ordering;
|
||||
use std::collections::VecDeque;
|
||||
|
||||
use bytes::BytesMut;
|
||||
|
@ -44,7 +45,7 @@ impl BytesBuf {
|
|||
|
||||
/// Takes the whole content of the buffer and returns it as a single Bytes unit
|
||||
pub fn take_all(&mut self) -> Bytes {
|
||||
if self.buf.len() == 0 {
|
||||
if self.buf.is_empty() {
|
||||
Bytes::new()
|
||||
} else if self.buf.len() == 1 {
|
||||
self.buf_len = 0;
|
||||
|
@ -82,31 +83,35 @@ impl BytesBuf {
|
|||
fn take_exact_ok(&mut self, len: usize) -> Bytes {
|
||||
assert!(len <= self.buf_len);
|
||||
let front = self.buf.pop_front().unwrap();
|
||||
if front.len() > len {
|
||||
self.buf.push_front(front.slice(len..));
|
||||
self.buf_len -= len;
|
||||
front.slice(..len)
|
||||
} else if front.len() == len {
|
||||
self.buf_len -= len;
|
||||
front
|
||||
} else {
|
||||
let mut ret = BytesMut::with_capacity(len);
|
||||
ret.extend_from_slice(&front[..]);
|
||||
self.buf_len -= front.len();
|
||||
while ret.len() < len {
|
||||
let front = self.buf.pop_front().unwrap();
|
||||
if front.len() > len - ret.len() {
|
||||
let take = len - ret.len();
|
||||
ret.extend_from_slice(&front[..take]);
|
||||
self.buf.push_front(front.slice(take..));
|
||||
self.buf_len -= take;
|
||||
break;
|
||||
} else {
|
||||
ret.extend_from_slice(&front[..]);
|
||||
self.buf_len -= front.len();
|
||||
}
|
||||
match front.len().cmp(&len) {
|
||||
Ordering::Greater => {
|
||||
self.buf.push_front(front.slice(len..));
|
||||
self.buf_len -= len;
|
||||
front.slice(..len)
|
||||
}
|
||||
Ordering::Equal => {
|
||||
self.buf_len -= len;
|
||||
front
|
||||
}
|
||||
Ordering::Less => {
|
||||
let mut ret = BytesMut::with_capacity(len);
|
||||
ret.extend_from_slice(&front[..]);
|
||||
self.buf_len -= front.len();
|
||||
while ret.len() < len {
|
||||
let front = self.buf.pop_front().unwrap();
|
||||
if front.len() > len - ret.len() {
|
||||
let take = len - ret.len();
|
||||
ret.extend_from_slice(&front[..take]);
|
||||
self.buf.push_front(front.slice(take..));
|
||||
self.buf_len -= take;
|
||||
break;
|
||||
} else {
|
||||
ret.extend_from_slice(&front[..]);
|
||||
self.buf_len -= front.len();
|
||||
}
|
||||
}
|
||||
ret.freeze()
|
||||
}
|
||||
ret.freeze()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -116,6 +121,12 @@ impl BytesBuf {
|
|||
}
|
||||
}
|
||||
|
||||
impl Default for BytesBuf {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Bytes> for BytesBuf {
|
||||
fn from(b: Bytes) -> BytesBuf {
|
||||
let mut ret = BytesBuf::new();
|
||||
|
|
|
@ -65,7 +65,7 @@ impl ClientConn {
|
|||
|
||||
debug!(
|
||||
"Handshake complete (client) with {}@{}",
|
||||
hex::encode(&peer_id),
|
||||
hex::encode(peer_id),
|
||||
remote_addr
|
||||
);
|
||||
|
||||
|
@ -250,7 +250,7 @@ impl CancelOnDrop {
|
|||
fn for_stream(self, stream: ByteStream) -> CancelOnDropStream {
|
||||
CancelOnDropStream {
|
||||
cancel: Some(self),
|
||||
stream: stream,
|
||||
stream,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -54,11 +54,11 @@ pub struct OrderTagStream(u64);
|
|||
|
||||
impl OrderTag {
|
||||
/// Create a new stream from which to generate order tags. Example:
|
||||
/// ```ignore
|
||||
/// let stream = OrderTag.stream();
|
||||
/// let tag_1 = stream.order(1);
|
||||
/// let tag_2 = stream.order(2);
|
||||
/// ```
|
||||
/// ```ignore
|
||||
/// let stream = OrderTag.stream();
|
||||
/// let tag_1 = stream.order(1);
|
||||
/// let tag_2 = stream.order(2);
|
||||
/// ```
|
||||
pub fn stream() -> OrderTagStream {
|
||||
OrderTagStream(thread_rng().gen())
|
||||
}
|
||||
|
@ -155,7 +155,7 @@ impl<M: Message> Req<M> {
|
|||
}
|
||||
|
||||
pub(crate) fn from_enc(enc: ReqEnc) -> Result<Self, rmp_serde::decode::Error> {
|
||||
let msg = rmp_serde::decode::from_read_ref(&enc.msg)?;
|
||||
let msg = rmp_serde::decode::from_slice(&enc.msg)?;
|
||||
Ok(Req {
|
||||
msg: Arc::new(msg),
|
||||
msg_ser: Some(enc.msg),
|
||||
|
@ -316,7 +316,7 @@ impl<M: Message> Resp<M> {
|
|||
}
|
||||
|
||||
pub(crate) fn from_enc(enc: RespEnc) -> Result<Self, Error> {
|
||||
let msg = rmp_serde::decode::from_read_ref(&enc.msg)?;
|
||||
let msg = rmp_serde::decode::from_slice(&enc.msg)?;
|
||||
Ok(Self {
|
||||
_phantom: Default::default(),
|
||||
msg,
|
||||
|
|
|
@ -25,9 +25,10 @@ const CONN_RETRY_INTERVAL: Duration = Duration::from_secs(30);
|
|||
const CONN_MAX_RETRIES: usize = 10;
|
||||
const PING_INTERVAL: Duration = Duration::from_secs(15);
|
||||
const LOOP_DELAY: Duration = Duration::from_secs(1);
|
||||
const PING_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
const FAILED_PING_THRESHOLD: usize = 4;
|
||||
|
||||
const DEFAULT_PING_TIMEOUT_MILLIS: u64 = 10_000;
|
||||
|
||||
// -- Protocol messages --
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
|
@ -112,7 +113,7 @@ impl PeerInfo {
|
|||
/// PeerConnState: possible states for our tentative connections to given peer
|
||||
/// This structure is only interested in recording connection info for outgoing
|
||||
/// TCP connections
|
||||
#[derive(Copy, Clone, Debug, PartialEq)]
|
||||
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
|
||||
pub enum PeerConnState {
|
||||
/// This entry represents ourself (the local node)
|
||||
Ourself,
|
||||
|
@ -184,6 +185,8 @@ pub struct FullMeshPeeringStrategy {
|
|||
next_ping_id: AtomicU64,
|
||||
ping_endpoint: Arc<Endpoint<PingMessage, Self>>,
|
||||
peer_list_endpoint: Arc<Endpoint<PeerListMessage, Self>>,
|
||||
|
||||
ping_timeout_millis: AtomicU64,
|
||||
}
|
||||
|
||||
impl FullMeshPeeringStrategy {
|
||||
|
@ -220,6 +223,7 @@ impl FullMeshPeeringStrategy {
|
|||
next_ping_id: AtomicU64::new(42),
|
||||
ping_endpoint: netapp.endpoint("__netapp/peering/fullmesh.rs/Ping".into()),
|
||||
peer_list_endpoint: netapp.endpoint("__netapp/peering/fullmesh.rs/PeerList".into()),
|
||||
ping_timeout_millis: DEFAULT_PING_TIMEOUT_MILLIS.into(),
|
||||
});
|
||||
|
||||
strat.update_public_peer_list(&strat.known_hosts.read().unwrap());
|
||||
|
@ -331,6 +335,12 @@ impl FullMeshPeeringStrategy {
|
|||
self.public_peer_list.load_full()
|
||||
}
|
||||
|
||||
/// Set the timeout for ping messages, in milliseconds
|
||||
pub fn set_ping_timeout_millis(&self, timeout: u64) {
|
||||
self.ping_timeout_millis
|
||||
.store(timeout, atomic::Ordering::Relaxed);
|
||||
}
|
||||
|
||||
// -- internal stuff --
|
||||
|
||||
fn update_public_peer_list(&self, known_hosts: &KnownHosts) {
|
||||
|
@ -372,6 +382,8 @@ impl FullMeshPeeringStrategy {
|
|||
let peer_list_hash = self.known_hosts.read().unwrap().hash;
|
||||
let ping_id = self.next_ping_id.fetch_add(1u64, atomic::Ordering::Relaxed);
|
||||
let ping_time = Instant::now();
|
||||
let ping_timeout =
|
||||
Duration::from_millis(self.ping_timeout_millis.load(atomic::Ordering::Relaxed));
|
||||
let ping_msg = PingMessage {
|
||||
id: ping_id,
|
||||
peer_list_hash,
|
||||
|
@ -385,7 +397,7 @@ impl FullMeshPeeringStrategy {
|
|||
);
|
||||
let ping_response = select! {
|
||||
r = self.ping_endpoint.call(&id, ping_msg, PRIO_HIGH) => r,
|
||||
_ = tokio::time::sleep(PING_TIMEOUT) => Err(Error::Message("Ping timeout".into())),
|
||||
_ = tokio::time::sleep(ping_timeout) => Err(Error::Message("Ping timeout".into())),
|
||||
};
|
||||
|
||||
match ping_response {
|
||||
|
|
|
@ -79,7 +79,7 @@ impl ServerConn {
|
|||
|
||||
debug!(
|
||||
"Handshake complete (server) with {}@{}",
|
||||
hex::encode(&peer_id),
|
||||
hex::encode(peer_id),
|
||||
remote_addr
|
||||
);
|
||||
|
||||
|
|
26
src/util.rs
26
src/util.rs
|
@ -1,5 +1,4 @@
|
|||
use std::net::SocketAddr;
|
||||
use std::net::ToSocketAddrs;
|
||||
|
||||
use log::info;
|
||||
use serde::Serialize;
|
||||
|
@ -18,9 +17,7 @@ where
|
|||
T: Serialize + ?Sized,
|
||||
{
|
||||
let mut wr = Vec::with_capacity(128);
|
||||
let mut se = rmp_serde::Serializer::new(&mut wr)
|
||||
.with_struct_map()
|
||||
.with_string_variants();
|
||||
let mut se = rmp_serde::Serializer::new(&mut wr).with_struct_map();
|
||||
val.serialize(&mut se)?;
|
||||
Ok(wr)
|
||||
}
|
||||
|
@ -63,7 +60,7 @@ pub fn watch_ctrl_c() -> watch::Receiver<bool> {
|
|||
pub fn parse_peer_addr(peer: &str) -> Option<(NodeID, SocketAddr)> {
|
||||
let delim = peer.find('@')?;
|
||||
let (key, ip) = peer.split_at(delim);
|
||||
let pubkey = NodeID::from_slice(&hex::decode(&key).ok()?)?;
|
||||
let pubkey = NodeID::from_slice(&hex::decode(key).ok()?)?;
|
||||
let ip = ip[1..].parse::<SocketAddr>().ok()?;
|
||||
Some((pubkey, ip))
|
||||
}
|
||||
|
@ -71,12 +68,29 @@ pub fn parse_peer_addr(peer: &str) -> Option<(NodeID, SocketAddr)> {
|
|||
/// Parse and resolve a peer's address including public key, written in the format:
|
||||
/// `<public key hex>@<ip or hostname>:<port>`
|
||||
pub fn parse_and_resolve_peer_addr(peer: &str) -> Option<(NodeID, Vec<SocketAddr>)> {
|
||||
use std::net::ToSocketAddrs;
|
||||
|
||||
let delim = peer.find('@')?;
|
||||
let (key, host) = peer.split_at(delim);
|
||||
let pubkey = NodeID::from_slice(&hex::decode(&key).ok()?)?;
|
||||
let pubkey = NodeID::from_slice(&hex::decode(key).ok()?)?;
|
||||
let hosts = host[1..].to_socket_addrs().ok()?.collect::<Vec<_>>();
|
||||
if hosts.is_empty() {
|
||||
return None;
|
||||
}
|
||||
Some((pubkey, hosts))
|
||||
}
|
||||
|
||||
/// async version of parse_and_resolve_peer_addr
|
||||
pub async fn parse_and_resolve_peer_addr_async(peer: &str) -> Option<(NodeID, Vec<SocketAddr>)> {
|
||||
let delim = peer.find('@')?;
|
||||
let (key, host) = peer.split_at(delim);
|
||||
let pubkey = NodeID::from_slice(&hex::decode(key).ok()?)?;
|
||||
let hosts = tokio::net::lookup_host(&host[1..])
|
||||
.await
|
||||
.ok()?
|
||||
.collect::<Vec<_>>();
|
||||
if hosts.is_empty() {
|
||||
return None;
|
||||
}
|
||||
Some((pubkey, hosts))
|
||||
}
|
||||
|
|
1
target
Symbolic link
1
target
Symbolic link
|
@ -0,0 +1 @@
|
|||
/home/lx.nobackup/rust/netapp.target/
|
Loading…
Reference in a new issue