Compare commits
5 commits
Author | SHA1 | Date | |
---|---|---|---|
170ddbfac4 | |||
2484d7654a | |||
b7beb15492 | |||
34aade6ce9 | |||
6df6411b72 |
11 changed files with 78 additions and 66 deletions
36
.drone.yml
36
.drone.yml
|
@ -1,28 +1,17 @@
|
||||||
|
---
|
||||||
kind: pipeline
|
kind: pipeline
|
||||||
name: default
|
name: default
|
||||||
|
|
||||||
workspace:
|
|
||||||
base: /drone
|
|
||||||
|
|
||||||
clone:
|
|
||||||
disable: true
|
|
||||||
|
|
||||||
steps:
|
steps:
|
||||||
- name: clone
|
|
||||||
image: alpine/git
|
|
||||||
commands:
|
|
||||||
- mkdir -p cargo
|
|
||||||
- git clone $DRONE_GIT_HTTP_URL
|
|
||||||
- cd netapp
|
|
||||||
- git checkout $DRONE_COMMIT
|
|
||||||
|
|
||||||
- name: style
|
- name: style
|
||||||
image: rust:1.58-buster
|
image: rust:1.58-buster
|
||||||
environment:
|
environment:
|
||||||
CARGO_HOME: /drone/cargo
|
CARGO_HOME: /drone/cargo
|
||||||
|
volumes:
|
||||||
|
- name: cargo
|
||||||
|
path: /drone/cargo
|
||||||
commands:
|
commands:
|
||||||
- rustup component add rustfmt clippy
|
- rustup component add rustfmt clippy
|
||||||
- cd netapp
|
|
||||||
- cargo fmt -- --check
|
- cargo fmt -- --check
|
||||||
- cargo clippy --all-features -- --deny warnings
|
- cargo clippy --all-features -- --deny warnings
|
||||||
- cargo clippy --example fullmesh -- --deny warnings
|
- cargo clippy --example fullmesh -- --deny warnings
|
||||||
|
@ -32,11 +21,13 @@ steps:
|
||||||
image: rust:1.58-buster
|
image: rust:1.58-buster
|
||||||
environment:
|
environment:
|
||||||
CARGO_HOME: /drone/cargo
|
CARGO_HOME: /drone/cargo
|
||||||
|
volumes:
|
||||||
|
- name: cargo
|
||||||
|
path: /drone/cargo
|
||||||
commands:
|
commands:
|
||||||
- apt-get update
|
- apt-get update
|
||||||
- apt-get install --yes libsodium-dev
|
- apt-get install --yes libsodium-dev
|
||||||
- cargo install -f cargo-all-features
|
- cargo install -f cargo-all-features
|
||||||
- cd netapp
|
|
||||||
- cargo build-all-features
|
- cargo build-all-features
|
||||||
- cargo build --example fullmesh
|
- cargo build --example fullmesh
|
||||||
- cargo build --example basalt --features "basalt"
|
- cargo build --example basalt --features "basalt"
|
||||||
|
@ -45,8 +36,19 @@ steps:
|
||||||
image: rust:1.58-buster
|
image: rust:1.58-buster
|
||||||
environment:
|
environment:
|
||||||
CARGO_HOME: /drone/cargo
|
CARGO_HOME: /drone/cargo
|
||||||
|
volumes:
|
||||||
|
- name: cargo
|
||||||
|
path: /drone/cargo
|
||||||
commands:
|
commands:
|
||||||
- apt-get update
|
- apt-get update
|
||||||
- apt-get install --yes libsodium-dev
|
- apt-get install --yes libsodium-dev
|
||||||
- cd netapp
|
|
||||||
- cargo test --all-features -- --test-threads 1
|
- cargo test --all-features -- --test-threads 1
|
||||||
|
|
||||||
|
volumes:
|
||||||
|
- name: cargo
|
||||||
|
temp: {}
|
||||||
|
---
|
||||||
|
kind: signature
|
||||||
|
hmac: f0d1a9e8d85a22c1d9084b4d90c9930be9700da52284f1875ece996cc52a6ce9
|
||||||
|
|
||||||
|
...
|
||||||
|
|
6
Cargo.lock
generated
6
Cargo.lock
generated
|
@ -428,7 +428,7 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "netapp"
|
name = "netapp"
|
||||||
version = "0.5.2"
|
version = "0.10.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"arc-swap",
|
"arc-swap",
|
||||||
"async-trait",
|
"async-trait",
|
||||||
|
@ -677,9 +677,9 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "rmp-serde"
|
name = "rmp-serde"
|
||||||
version = "0.15.5"
|
version = "1.1.2"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "723ecff9ad04f4ad92fe1c8ca6c20d2196d9286e9c60727c4cb5511629260e9d"
|
checksum = "bffea85eea980d8a74453e5d02a8d93028f3c34725de143085a844ebe953258a"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"byteorder",
|
"byteorder",
|
||||||
"rmp",
|
"rmp",
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
[package]
|
[package]
|
||||||
name = "netapp"
|
name = "netapp"
|
||||||
version = "0.5.2"
|
version = "0.10.0"
|
||||||
authors = ["Alex Auvolat <alex@adnab.me>"]
|
authors = ["Alex Auvolat <alex@adnab.me>"]
|
||||||
edition = "2018"
|
edition = "2018"
|
||||||
license-file = "LICENSE"
|
license-file = "LICENSE"
|
||||||
|
@ -27,7 +27,7 @@ tokio-util = { version = "0.7", default-features = false, features = ["compat",
|
||||||
tokio-stream = "0.1.7"
|
tokio-stream = "0.1.7"
|
||||||
|
|
||||||
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
|
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
|
||||||
rmp-serde = "0.15"
|
rmp-serde = "1.1"
|
||||||
hex = "0.4.2"
|
hex = "0.4.2"
|
||||||
|
|
||||||
rand = { version = "0.8" }
|
rand = { version = "0.8" }
|
||||||
|
|
|
@ -72,7 +72,7 @@ async fn main() {
|
||||||
};
|
};
|
||||||
|
|
||||||
info!("Node private key: {}", hex::encode(&privkey));
|
info!("Node private key: {}", hex::encode(&privkey));
|
||||||
info!("Node public key: {}", hex::encode(&privkey.public_key()));
|
info!("Node public key: {}", hex::encode(privkey.public_key()));
|
||||||
|
|
||||||
let public_addr = opt.public_addr.map(|x| x.parse().unwrap());
|
let public_addr = opt.public_addr.map(|x| x.parse().unwrap());
|
||||||
let listen_addr: SocketAddr = opt.listen_addr.parse().unwrap();
|
let listen_addr: SocketAddr = opt.listen_addr.parse().unwrap();
|
||||||
|
@ -94,7 +94,7 @@ async fn main() {
|
||||||
|
|
||||||
info!("Add more peers to this mesh by running: fullmesh -n {} -l 127.0.0.1:$((1000 + $RANDOM)) -b {}@{}",
|
info!("Add more peers to this mesh by running: fullmesh -n {} -l 127.0.0.1:$((1000 + $RANDOM)) -b {}@{}",
|
||||||
hex::encode(&netid),
|
hex::encode(&netid),
|
||||||
hex::encode(&privkey.public_key()),
|
hex::encode(privkey.public_key()),
|
||||||
listen_addr);
|
listen_addr);
|
||||||
|
|
||||||
let watch_cancel = netapp::util::watch_ctrl_c();
|
let watch_cancel = netapp::util::watch_ctrl_c();
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
use std::cmp::Ordering;
|
||||||
use std::collections::VecDeque;
|
use std::collections::VecDeque;
|
||||||
|
|
||||||
use bytes::BytesMut;
|
use bytes::BytesMut;
|
||||||
|
@ -44,7 +45,7 @@ impl BytesBuf {
|
||||||
|
|
||||||
/// Takes the whole content of the buffer and returns it as a single Bytes unit
|
/// Takes the whole content of the buffer and returns it as a single Bytes unit
|
||||||
pub fn take_all(&mut self) -> Bytes {
|
pub fn take_all(&mut self) -> Bytes {
|
||||||
if self.buf.len() == 0 {
|
if self.buf.is_empty() {
|
||||||
Bytes::new()
|
Bytes::new()
|
||||||
} else if self.buf.len() == 1 {
|
} else if self.buf.len() == 1 {
|
||||||
self.buf_len = 0;
|
self.buf_len = 0;
|
||||||
|
@ -82,31 +83,35 @@ impl BytesBuf {
|
||||||
fn take_exact_ok(&mut self, len: usize) -> Bytes {
|
fn take_exact_ok(&mut self, len: usize) -> Bytes {
|
||||||
assert!(len <= self.buf_len);
|
assert!(len <= self.buf_len);
|
||||||
let front = self.buf.pop_front().unwrap();
|
let front = self.buf.pop_front().unwrap();
|
||||||
if front.len() > len {
|
match front.len().cmp(&len) {
|
||||||
self.buf.push_front(front.slice(len..));
|
Ordering::Greater => {
|
||||||
self.buf_len -= len;
|
self.buf.push_front(front.slice(len..));
|
||||||
front.slice(..len)
|
self.buf_len -= len;
|
||||||
} else if front.len() == len {
|
front.slice(..len)
|
||||||
self.buf_len -= len;
|
}
|
||||||
front
|
Ordering::Equal => {
|
||||||
} else {
|
self.buf_len -= len;
|
||||||
let mut ret = BytesMut::with_capacity(len);
|
front
|
||||||
ret.extend_from_slice(&front[..]);
|
}
|
||||||
self.buf_len -= front.len();
|
Ordering::Less => {
|
||||||
while ret.len() < len {
|
let mut ret = BytesMut::with_capacity(len);
|
||||||
let front = self.buf.pop_front().unwrap();
|
ret.extend_from_slice(&front[..]);
|
||||||
if front.len() > len - ret.len() {
|
self.buf_len -= front.len();
|
||||||
let take = len - ret.len();
|
while ret.len() < len {
|
||||||
ret.extend_from_slice(&front[..take]);
|
let front = self.buf.pop_front().unwrap();
|
||||||
self.buf.push_front(front.slice(take..));
|
if front.len() > len - ret.len() {
|
||||||
self.buf_len -= take;
|
let take = len - ret.len();
|
||||||
break;
|
ret.extend_from_slice(&front[..take]);
|
||||||
} else {
|
self.buf.push_front(front.slice(take..));
|
||||||
ret.extend_from_slice(&front[..]);
|
self.buf_len -= take;
|
||||||
self.buf_len -= front.len();
|
break;
|
||||||
}
|
} else {
|
||||||
|
ret.extend_from_slice(&front[..]);
|
||||||
|
self.buf_len -= front.len();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ret.freeze()
|
||||||
}
|
}
|
||||||
ret.freeze()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -116,6 +121,12 @@ impl BytesBuf {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Default for BytesBuf {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self::new()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl From<Bytes> for BytesBuf {
|
impl From<Bytes> for BytesBuf {
|
||||||
fn from(b: Bytes) -> BytesBuf {
|
fn from(b: Bytes) -> BytesBuf {
|
||||||
let mut ret = BytesBuf::new();
|
let mut ret = BytesBuf::new();
|
||||||
|
|
|
@ -65,7 +65,7 @@ impl ClientConn {
|
||||||
|
|
||||||
debug!(
|
debug!(
|
||||||
"Handshake complete (client) with {}@{}",
|
"Handshake complete (client) with {}@{}",
|
||||||
hex::encode(&peer_id),
|
hex::encode(peer_id),
|
||||||
remote_addr
|
remote_addr
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -250,7 +250,7 @@ impl CancelOnDrop {
|
||||||
fn for_stream(self, stream: ByteStream) -> CancelOnDropStream {
|
fn for_stream(self, stream: ByteStream) -> CancelOnDropStream {
|
||||||
CancelOnDropStream {
|
CancelOnDropStream {
|
||||||
cancel: Some(self),
|
cancel: Some(self),
|
||||||
stream: stream,
|
stream,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -54,11 +54,11 @@ pub struct OrderTagStream(u64);
|
||||||
|
|
||||||
impl OrderTag {
|
impl OrderTag {
|
||||||
/// Create a new stream from which to generate order tags. Example:
|
/// Create a new stream from which to generate order tags. Example:
|
||||||
/// ```ignore
|
/// ```ignore
|
||||||
/// let stream = OrderTag.stream();
|
/// let stream = OrderTag.stream();
|
||||||
/// let tag_1 = stream.order(1);
|
/// let tag_1 = stream.order(1);
|
||||||
/// let tag_2 = stream.order(2);
|
/// let tag_2 = stream.order(2);
|
||||||
/// ```
|
/// ```
|
||||||
pub fn stream() -> OrderTagStream {
|
pub fn stream() -> OrderTagStream {
|
||||||
OrderTagStream(thread_rng().gen())
|
OrderTagStream(thread_rng().gen())
|
||||||
}
|
}
|
||||||
|
@ -155,7 +155,7 @@ impl<M: Message> Req<M> {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn from_enc(enc: ReqEnc) -> Result<Self, rmp_serde::decode::Error> {
|
pub(crate) fn from_enc(enc: ReqEnc) -> Result<Self, rmp_serde::decode::Error> {
|
||||||
let msg = rmp_serde::decode::from_read_ref(&enc.msg)?;
|
let msg = rmp_serde::decode::from_slice(&enc.msg)?;
|
||||||
Ok(Req {
|
Ok(Req {
|
||||||
msg: Arc::new(msg),
|
msg: Arc::new(msg),
|
||||||
msg_ser: Some(enc.msg),
|
msg_ser: Some(enc.msg),
|
||||||
|
@ -316,7 +316,7 @@ impl<M: Message> Resp<M> {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn from_enc(enc: RespEnc) -> Result<Self, Error> {
|
pub(crate) fn from_enc(enc: RespEnc) -> Result<Self, Error> {
|
||||||
let msg = rmp_serde::decode::from_read_ref(&enc.msg)?;
|
let msg = rmp_serde::decode::from_slice(&enc.msg)?;
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
_phantom: Default::default(),
|
_phantom: Default::default(),
|
||||||
msg,
|
msg,
|
||||||
|
|
|
@ -113,7 +113,7 @@ impl PeerInfo {
|
||||||
/// PeerConnState: possible states for our tentative connections to given peer
|
/// PeerConnState: possible states for our tentative connections to given peer
|
||||||
/// This structure is only interested in recording connection info for outgoing
|
/// This structure is only interested in recording connection info for outgoing
|
||||||
/// TCP connections
|
/// TCP connections
|
||||||
#[derive(Copy, Clone, Debug, PartialEq)]
|
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
|
||||||
pub enum PeerConnState {
|
pub enum PeerConnState {
|
||||||
/// This entry represents ourself (the local node)
|
/// This entry represents ourself (the local node)
|
||||||
Ourself,
|
Ourself,
|
||||||
|
|
|
@ -79,7 +79,7 @@ impl ServerConn {
|
||||||
|
|
||||||
debug!(
|
debug!(
|
||||||
"Handshake complete (server) with {}@{}",
|
"Handshake complete (server) with {}@{}",
|
||||||
hex::encode(&peer_id),
|
hex::encode(peer_id),
|
||||||
remote_addr
|
remote_addr
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|
10
src/util.rs
10
src/util.rs
|
@ -17,9 +17,7 @@ where
|
||||||
T: Serialize + ?Sized,
|
T: Serialize + ?Sized,
|
||||||
{
|
{
|
||||||
let mut wr = Vec::with_capacity(128);
|
let mut wr = Vec::with_capacity(128);
|
||||||
let mut se = rmp_serde::Serializer::new(&mut wr)
|
let mut se = rmp_serde::Serializer::new(&mut wr).with_struct_map();
|
||||||
.with_struct_map()
|
|
||||||
.with_string_variants();
|
|
||||||
val.serialize(&mut se)?;
|
val.serialize(&mut se)?;
|
||||||
Ok(wr)
|
Ok(wr)
|
||||||
}
|
}
|
||||||
|
@ -62,7 +60,7 @@ pub fn watch_ctrl_c() -> watch::Receiver<bool> {
|
||||||
pub fn parse_peer_addr(peer: &str) -> Option<(NodeID, SocketAddr)> {
|
pub fn parse_peer_addr(peer: &str) -> Option<(NodeID, SocketAddr)> {
|
||||||
let delim = peer.find('@')?;
|
let delim = peer.find('@')?;
|
||||||
let (key, ip) = peer.split_at(delim);
|
let (key, ip) = peer.split_at(delim);
|
||||||
let pubkey = NodeID::from_slice(&hex::decode(&key).ok()?)?;
|
let pubkey = NodeID::from_slice(&hex::decode(key).ok()?)?;
|
||||||
let ip = ip[1..].parse::<SocketAddr>().ok()?;
|
let ip = ip[1..].parse::<SocketAddr>().ok()?;
|
||||||
Some((pubkey, ip))
|
Some((pubkey, ip))
|
||||||
}
|
}
|
||||||
|
@ -74,7 +72,7 @@ pub fn parse_and_resolve_peer_addr(peer: &str) -> Option<(NodeID, Vec<SocketAddr
|
||||||
|
|
||||||
let delim = peer.find('@')?;
|
let delim = peer.find('@')?;
|
||||||
let (key, host) = peer.split_at(delim);
|
let (key, host) = peer.split_at(delim);
|
||||||
let pubkey = NodeID::from_slice(&hex::decode(&key).ok()?)?;
|
let pubkey = NodeID::from_slice(&hex::decode(key).ok()?)?;
|
||||||
let hosts = host[1..].to_socket_addrs().ok()?.collect::<Vec<_>>();
|
let hosts = host[1..].to_socket_addrs().ok()?.collect::<Vec<_>>();
|
||||||
if hosts.is_empty() {
|
if hosts.is_empty() {
|
||||||
return None;
|
return None;
|
||||||
|
@ -86,7 +84,7 @@ pub fn parse_and_resolve_peer_addr(peer: &str) -> Option<(NodeID, Vec<SocketAddr
|
||||||
pub async fn parse_and_resolve_peer_addr_async(peer: &str) -> Option<(NodeID, Vec<SocketAddr>)> {
|
pub async fn parse_and_resolve_peer_addr_async(peer: &str) -> Option<(NodeID, Vec<SocketAddr>)> {
|
||||||
let delim = peer.find('@')?;
|
let delim = peer.find('@')?;
|
||||||
let (key, host) = peer.split_at(delim);
|
let (key, host) = peer.split_at(delim);
|
||||||
let pubkey = NodeID::from_slice(&hex::decode(&key).ok()?)?;
|
let pubkey = NodeID::from_slice(&hex::decode(key).ok()?)?;
|
||||||
let hosts = tokio::net::lookup_host(&host[1..])
|
let hosts = tokio::net::lookup_host(&host[1..])
|
||||||
.await
|
.await
|
||||||
.ok()?
|
.ok()?
|
||||||
|
|
1
target
Symbolic link
1
target
Symbolic link
|
@ -0,0 +1 @@
|
||||||
|
/home/lx.nobackup/rust/netapp.target/
|
Loading…
Reference in a new issue