forked from lx/netapp
Implement version tag for application as well
This commit is contained in:
parent
96a3cc1e1f
commit
8858c94289
6 changed files with 46 additions and 28 deletions
|
@ -94,7 +94,7 @@ async fn main() {
|
||||||
info!("KYEV SK {}", hex::encode(&privkey));
|
info!("KYEV SK {}", hex::encode(&privkey));
|
||||||
info!("KYEV PK {}", hex::encode(&privkey.public_key()));
|
info!("KYEV PK {}", hex::encode(&privkey.public_key()));
|
||||||
|
|
||||||
let netapp = NetApp::new(netid, privkey);
|
let netapp = NetApp::new(0u64, netid, privkey);
|
||||||
|
|
||||||
let mut bootstrap_peers = vec![];
|
let mut bootstrap_peers = vec![];
|
||||||
for peer in opt.bootstrap_peers.iter() {
|
for peer in opt.bootstrap_peers.iter() {
|
||||||
|
|
|
@ -71,7 +71,7 @@ async fn main() {
|
||||||
info!("Node public address: {:?}", public_addr);
|
info!("Node public address: {:?}", public_addr);
|
||||||
info!("Node listen address: {}", listen_addr);
|
info!("Node listen address: {}", listen_addr);
|
||||||
|
|
||||||
let netapp = NetApp::new(netid.clone(), privkey.clone());
|
let netapp = NetApp::new(0u64, netid.clone(), privkey.clone());
|
||||||
|
|
||||||
let mut bootstrap_peers = vec![];
|
let mut bootstrap_peers = vec![];
|
||||||
for peer in opt.bootstrap_peers.iter() {
|
for peer in opt.bootstrap_peers.iter() {
|
||||||
|
|
|
@ -52,6 +52,7 @@ impl ClientConn {
|
||||||
let remote_addr = socket.peer_addr()?;
|
let remote_addr = socket.peer_addr()?;
|
||||||
let mut socket = socket.compat();
|
let mut socket = socket.compat();
|
||||||
|
|
||||||
|
// Do handshake to authenticate and prove our identity to server
|
||||||
let handshake = handshake_client(
|
let handshake = handshake_client(
|
||||||
&mut socket,
|
&mut socket,
|
||||||
netapp.netid.clone(),
|
netapp.netid.clone(),
|
||||||
|
@ -67,11 +68,25 @@ impl ClientConn {
|
||||||
remote_addr
|
remote_addr
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// Create BoxStream layer that encodes content
|
||||||
let (read, write) = socket.split();
|
let (read, write) = socket.split();
|
||||||
|
let (mut read, write) =
|
||||||
let (read, write) =
|
|
||||||
BoxStream::from_handshake(read, write, handshake, 0x8000).split_read_write();
|
BoxStream::from_handshake(read, write, handshake, 0x8000).split_read_write();
|
||||||
|
|
||||||
|
// Before doing anything, receive version tag and
|
||||||
|
// check they are running the same version as us
|
||||||
|
let mut their_version_tag = VersionTag::default();
|
||||||
|
read.read_exact(&mut their_version_tag[..]).await?;
|
||||||
|
if their_version_tag != netapp.version_tag {
|
||||||
|
let msg = format!(
|
||||||
|
"Different netapp versions: {:?} (theirs) vs. {:?} (ours)",
|
||||||
|
their_version_tag, netapp.version_tag
|
||||||
|
);
|
||||||
|
error!("{}", msg);
|
||||||
|
return Err(Error::VersionMismatch(msg));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Build and launch stuff that manages sending requests client-side
|
||||||
let (query_send, query_recv) = mpsc::unbounded_channel();
|
let (query_send, query_recv) = mpsc::unbounded_channel();
|
||||||
|
|
||||||
let (stop_recv_loop, stop_recv_loop_recv) = watch::channel(false);
|
let (stop_recv_loop, stop_recv_loop_recv) = watch::channel(false);
|
||||||
|
|
|
@ -24,6 +24,14 @@ use crate::proto::*;
|
||||||
use crate::server::*;
|
use crate::server::*;
|
||||||
use crate::util::*;
|
use crate::util::*;
|
||||||
|
|
||||||
|
/// Tag which is exchanged between client and server upon connection establishment
|
||||||
|
/// to check that they are running compatible versions of Netapp,
|
||||||
|
/// composed of 8 bytes for Netapp version and 8 bytes for client version
|
||||||
|
pub(crate) type VersionTag = [u8; 16];
|
||||||
|
|
||||||
|
/// Value of the Netapp version used in the version tag
|
||||||
|
pub(crate) const NETAPP_VERSION_TAG: u64 = 0x6e65746170700004; // netapp 0x0004
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Debug)]
|
#[derive(Serialize, Deserialize, Debug)]
|
||||||
pub(crate) struct HelloMessage {
|
pub(crate) struct HelloMessage {
|
||||||
pub server_addr: Option<IpAddr>,
|
pub server_addr: Option<IpAddr>,
|
||||||
|
@ -48,6 +56,8 @@ type OnDisconnectHandler = Box<dyn Fn(NodeID, bool) + Send + Sync>;
|
||||||
pub struct NetApp {
|
pub struct NetApp {
|
||||||
listen_params: ArcSwapOption<ListenParams>,
|
listen_params: ArcSwapOption<ListenParams>,
|
||||||
|
|
||||||
|
/// Version tag, 8 bytes for netapp version, 8 bytes for app version
|
||||||
|
pub version_tag: VersionTag,
|
||||||
/// Network secret key
|
/// Network secret key
|
||||||
pub netid: auth::Key,
|
pub netid: auth::Key,
|
||||||
/// Our peer ID
|
/// Our peer ID
|
||||||
|
@ -76,10 +86,15 @@ impl NetApp {
|
||||||
/// using `.listen()`
|
/// using `.listen()`
|
||||||
///
|
///
|
||||||
/// Our Peer ID is the public key associated to the secret key given here.
|
/// Our Peer ID is the public key associated to the secret key given here.
|
||||||
pub fn new(netid: auth::Key, privkey: ed25519::SecretKey) -> Arc<Self> {
|
pub fn new(app_version_tag: u64, netid: auth::Key, privkey: ed25519::SecretKey) -> Arc<Self> {
|
||||||
|
let mut version_tag = [0u8; 16];
|
||||||
|
version_tag[0..8].copy_from_slice(&u64::to_be_bytes(NETAPP_VERSION_TAG)[..]);
|
||||||
|
version_tag[8..16].copy_from_slice(&u64::to_be_bytes(app_version_tag)[..]);
|
||||||
|
|
||||||
let id = privkey.public_key();
|
let id = privkey.public_key();
|
||||||
let netapp = Arc::new(Self {
|
let netapp = Arc::new(Self {
|
||||||
listen_params: ArcSwapOption::new(None),
|
listen_params: ArcSwapOption::new(None),
|
||||||
|
version_tag,
|
||||||
netid,
|
netid,
|
||||||
id,
|
id,
|
||||||
privkey,
|
privkey,
|
||||||
|
|
21
src/proto.rs
21
src/proto.rs
|
@ -1,7 +1,7 @@
|
||||||
use std::collections::{HashMap, VecDeque};
|
use std::collections::{HashMap, VecDeque};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use log::{error, trace};
|
use log::trace;
|
||||||
|
|
||||||
use futures::{AsyncReadExt, AsyncWriteExt};
|
use futures::{AsyncReadExt, AsyncWriteExt};
|
||||||
use kuska_handshake::async_std::BoxStreamWrite;
|
use kuska_handshake::async_std::BoxStreamWrite;
|
||||||
|
@ -12,10 +12,6 @@ use async_trait::async_trait;
|
||||||
|
|
||||||
use crate::error::*;
|
use crate::error::*;
|
||||||
|
|
||||||
/// Tag which is exchanged between client and server upon connection establishment
|
|
||||||
/// to check that they are running compatible versions of Netapp
|
|
||||||
pub const VERSION_TAG: [u8; 8] = [b'n', b'e', b't', b'a', b'p', b'p', 0x00, 0x04];
|
|
||||||
|
|
||||||
/// Priority of a request (click to read more about priorities).
|
/// Priority of a request (click to read more about priorities).
|
||||||
///
|
///
|
||||||
/// This priority value is used to priorize messages
|
/// This priority value is used to priorize messages
|
||||||
|
@ -118,10 +114,6 @@ pub(crate) trait SendLoop: Sync {
|
||||||
where
|
where
|
||||||
W: AsyncWriteExt + Unpin + Send + Sync,
|
W: AsyncWriteExt + Unpin + Send + Sync,
|
||||||
{
|
{
|
||||||
// Before anything, send version tag, which is checked in recv_loop
|
|
||||||
write.write_all(&VERSION_TAG[..]).await?;
|
|
||||||
write.flush().await?;
|
|
||||||
|
|
||||||
let mut sending = SendQueue::new();
|
let mut sending = SendQueue::new();
|
||||||
let mut should_exit = false;
|
let mut should_exit = false;
|
||||||
while !should_exit || !sending.is_empty() {
|
while !should_exit || !sending.is_empty() {
|
||||||
|
@ -198,17 +190,6 @@ pub(crate) trait RecvLoop: Sync + 'static {
|
||||||
where
|
where
|
||||||
R: AsyncReadExt + Unpin + Send + Sync,
|
R: AsyncReadExt + Unpin + Send + Sync,
|
||||||
{
|
{
|
||||||
let mut their_version_tag = [0u8; 8];
|
|
||||||
read.read_exact(&mut their_version_tag[..]).await?;
|
|
||||||
if their_version_tag != VERSION_TAG {
|
|
||||||
let msg = format!(
|
|
||||||
"Different netapp versions: {:?} (theirs) vs. {:?} (ours)",
|
|
||||||
their_version_tag, VERSION_TAG
|
|
||||||
);
|
|
||||||
error!("{}", msg);
|
|
||||||
return Err(Error::VersionMismatch(msg));
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut receiving = HashMap::new();
|
let mut receiving = HashMap::new();
|
||||||
loop {
|
loop {
|
||||||
trace!("recv_loop: reading packet");
|
trace!("recv_loop: reading packet");
|
||||||
|
|
|
@ -20,7 +20,7 @@ use tokio::select;
|
||||||
use tokio::sync::{mpsc, watch};
|
use tokio::sync::{mpsc, watch};
|
||||||
use tokio_util::compat::*;
|
use tokio_util::compat::*;
|
||||||
|
|
||||||
use futures::io::AsyncReadExt;
|
use futures::io::{AsyncReadExt, AsyncWriteExt};
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
|
|
||||||
|
@ -67,6 +67,7 @@ impl ServerConn {
|
||||||
let remote_addr = socket.peer_addr()?;
|
let remote_addr = socket.peer_addr()?;
|
||||||
let mut socket = socket.compat();
|
let mut socket = socket.compat();
|
||||||
|
|
||||||
|
// Do handshake to authenticate client
|
||||||
let handshake = handshake_server(
|
let handshake = handshake_server(
|
||||||
&mut socket,
|
&mut socket,
|
||||||
netapp.netid.clone(),
|
netapp.netid.clone(),
|
||||||
|
@ -82,11 +83,17 @@ impl ServerConn {
|
||||||
remote_addr
|
remote_addr
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// Create BoxStream layer that encodes content
|
||||||
let (read, write) = socket.split();
|
let (read, write) = socket.split();
|
||||||
|
let (read, mut write) =
|
||||||
let (read, write) =
|
|
||||||
BoxStream::from_handshake(read, write, handshake, 0x8000).split_read_write();
|
BoxStream::from_handshake(read, write, handshake, 0x8000).split_read_write();
|
||||||
|
|
||||||
|
// Before doing anything, send version tag, so that client
|
||||||
|
// can check and disconnect if version is wrong
|
||||||
|
write.write_all(&netapp.version_tag[..]).await?;
|
||||||
|
write.flush().await?;
|
||||||
|
|
||||||
|
// Build and launch stuff that handles requests server-side
|
||||||
let (resp_send, resp_recv) = mpsc::unbounded_channel();
|
let (resp_send, resp_recv) = mpsc::unbounded_channel();
|
||||||
|
|
||||||
let conn = Arc::new(ServerConn {
|
let conn = Arc::new(ServerConn {
|
||||||
|
|
Loading…
Reference in a new issue