Implementn basic LMTP server
This commit is contained in:
parent
01d82c14ca
commit
553a15a82a
7 changed files with 350 additions and 14 deletions
5
Cargo.lock
generated
5
Cargo.lock
generated
|
@ -1106,6 +1106,8 @@ dependencies = [
|
||||||
"async-trait",
|
"async-trait",
|
||||||
"base64",
|
"base64",
|
||||||
"clap",
|
"clap",
|
||||||
|
"duplexify",
|
||||||
|
"futures",
|
||||||
"hex",
|
"hex",
|
||||||
"im",
|
"im",
|
||||||
"itertools",
|
"itertools",
|
||||||
|
@ -1122,9 +1124,11 @@ dependencies = [
|
||||||
"rusoto_s3",
|
"rusoto_s3",
|
||||||
"rusoto_signature",
|
"rusoto_signature",
|
||||||
"serde",
|
"serde",
|
||||||
|
"smtp-message",
|
||||||
"smtp-server",
|
"smtp-server",
|
||||||
"sodiumoxide",
|
"sodiumoxide",
|
||||||
"tokio",
|
"tokio",
|
||||||
|
"tokio-util",
|
||||||
"toml",
|
"toml",
|
||||||
"zstd",
|
"zstd",
|
||||||
]
|
]
|
||||||
|
@ -2059,6 +2063,7 @@ checksum = "f988a1a1adc2fb21f9c12aa96441da33a1728193ae0b95d2be22dbd17fcb4e5c"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"bytes",
|
"bytes",
|
||||||
"futures-core",
|
"futures-core",
|
||||||
|
"futures-io",
|
||||||
"futures-sink",
|
"futures-sink",
|
||||||
"pin-project-lite 0.2.9",
|
"pin-project-lite 0.2.9",
|
||||||
"tokio",
|
"tokio",
|
||||||
|
|
|
@ -12,7 +12,9 @@ argon2 = "0.3"
|
||||||
async-trait = "0.1"
|
async-trait = "0.1"
|
||||||
base64 = "0.13"
|
base64 = "0.13"
|
||||||
clap = { version = "3.1.18", features = ["derive", "env"] }
|
clap = { version = "3.1.18", features = ["derive", "env"] }
|
||||||
|
duplexify = "1.1.0"
|
||||||
hex = "0.4"
|
hex = "0.4"
|
||||||
|
futures = "0.3"
|
||||||
im = "15"
|
im = "15"
|
||||||
itertools = "0.10"
|
itertools = "0.10"
|
||||||
lazy_static = "1.4"
|
lazy_static = "1.4"
|
||||||
|
@ -28,10 +30,12 @@ rand = "0.8.5"
|
||||||
rmp-serde = "0.15"
|
rmp-serde = "0.15"
|
||||||
rpassword = "6.0"
|
rpassword = "6.0"
|
||||||
sodiumoxide = "0.2"
|
sodiumoxide = "0.2"
|
||||||
tokio = "1.17.0"
|
tokio = { version = "1.18", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
|
||||||
|
tokio-util = { version = "0.7", features = [ "compat" ] }
|
||||||
toml = "0.5"
|
toml = "0.5"
|
||||||
zstd = { version = "0.9", default-features = false }
|
zstd = { version = "0.9", default-features = false }
|
||||||
|
|
||||||
|
smtp-message = { git = "http://github.com/Alexis211/kannader", branch = "feature/lmtp" }
|
||||||
smtp-server = { git = "http://github.com/Alexis211/kannader", branch = "feature/lmtp" }
|
smtp-server = { git = "http://github.com/Alexis211/kannader", branch = "feature/lmtp" }
|
||||||
|
|
||||||
k2v-client = { git = "https://git.deuxfleurs.fr/Deuxfleurs/garage.git", branch = "improve-k2v-client" }
|
k2v-client = { git = "https://git.deuxfleurs.fr/Deuxfleurs/garage.git", branch = "improve-k2v-client" }
|
||||||
|
|
|
@ -68,6 +68,7 @@ pub struct LoginLdapConfig {
|
||||||
#[derive(Deserialize, Debug, Clone)]
|
#[derive(Deserialize, Debug, Clone)]
|
||||||
pub struct LmtpConfig {
|
pub struct LmtpConfig {
|
||||||
pub bind_addr: SocketAddr,
|
pub bind_addr: SocketAddr,
|
||||||
|
pub hostname: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn read_config(config_file: PathBuf) -> Result<Config> {
|
pub fn read_config(config_file: PathBuf) -> Result<Config> {
|
||||||
|
|
263
src/lmtp.rs
Normal file
263
src/lmtp.rs
Normal file
|
@ -0,0 +1,263 @@
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use std::net::SocketAddr;
|
||||||
|
use std::{pin::Pin, sync::Arc};
|
||||||
|
|
||||||
|
use anyhow::{bail, Result};
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use duplexify::Duplex;
|
||||||
|
use futures::{io, AsyncRead, AsyncReadExt, AsyncWrite};
|
||||||
|
use futures::{stream, stream::FuturesUnordered, StreamExt};
|
||||||
|
use log::*;
|
||||||
|
use rusoto_s3::{PutObjectRequest, S3Client, S3};
|
||||||
|
use tokio::net::{TcpListener, TcpStream};
|
||||||
|
use tokio::select;
|
||||||
|
use tokio::sync::watch;
|
||||||
|
use tokio_util::compat::*;
|
||||||
|
|
||||||
|
use smtp_message::{Email, EscapedDataReader, Reply, ReplyCode};
|
||||||
|
use smtp_server::{reply, Config, ConnectionMetadata, Decision, MailMetadata, Protocol};
|
||||||
|
|
||||||
|
use crate::config::*;
|
||||||
|
use crate::cryptoblob::*;
|
||||||
|
use crate::login::*;
|
||||||
|
use crate::mail_uuid::*;
|
||||||
|
|
||||||
|
pub struct LmtpServer {
|
||||||
|
bind_addr: SocketAddr,
|
||||||
|
hostname: String,
|
||||||
|
login_provider: Arc<dyn LoginProvider + Send + Sync>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl LmtpServer {
|
||||||
|
pub fn new(
|
||||||
|
config: LmtpConfig,
|
||||||
|
login_provider: Arc<dyn LoginProvider + Send + Sync>,
|
||||||
|
) -> Arc<Self> {
|
||||||
|
Arc::new(Self {
|
||||||
|
bind_addr: config.bind_addr,
|
||||||
|
hostname: config.hostname,
|
||||||
|
login_provider,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn run(self: &Arc<Self>, mut must_exit: watch::Receiver<bool>) -> Result<()> {
|
||||||
|
let tcp = TcpListener::bind(self.bind_addr).await?;
|
||||||
|
let mut connections = FuturesUnordered::new();
|
||||||
|
|
||||||
|
while !*must_exit.borrow() {
|
||||||
|
let wait_conn_finished = async {
|
||||||
|
if connections.is_empty() {
|
||||||
|
futures::future::pending().await
|
||||||
|
} else {
|
||||||
|
connections.next().await
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let (socket, remote_addr) = select! {
|
||||||
|
a = tcp.accept() => a?,
|
||||||
|
_ = wait_conn_finished => continue,
|
||||||
|
_ = must_exit.changed() => continue,
|
||||||
|
};
|
||||||
|
|
||||||
|
let conn = tokio::spawn(smtp_server::interact(
|
||||||
|
socket.compat(),
|
||||||
|
smtp_server::IsAlreadyTls::No,
|
||||||
|
Conn { remote_addr },
|
||||||
|
self.clone(),
|
||||||
|
));
|
||||||
|
|
||||||
|
connections.push(conn);
|
||||||
|
}
|
||||||
|
drop(tcp);
|
||||||
|
|
||||||
|
info!("LMTP server shutting down, draining remaining connections...");
|
||||||
|
while connections.next().await.is_some() {}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ----
|
||||||
|
|
||||||
|
pub struct Conn {
|
||||||
|
remote_addr: SocketAddr,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct Message {
|
||||||
|
to: Vec<PublicCredentials>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl Config for LmtpServer {
|
||||||
|
const PROTOCOL: Protocol = Protocol::Lmtp;
|
||||||
|
|
||||||
|
type ConnectionUserMeta = Conn;
|
||||||
|
type MailUserMeta = Message;
|
||||||
|
|
||||||
|
fn hostname(&self, _conn_meta: &ConnectionMetadata<Conn>) -> &str {
|
||||||
|
&self.hostname
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn new_mail(&self, _conn_meta: &mut ConnectionMetadata<Conn>) -> Message {
|
||||||
|
Message { to: vec![] }
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn tls_accept<IO>(
|
||||||
|
&self,
|
||||||
|
_io: IO,
|
||||||
|
_conn_meta: &mut ConnectionMetadata<Conn>,
|
||||||
|
) -> io::Result<Duplex<Pin<Box<dyn Send + AsyncRead>>, Pin<Box<dyn Send + AsyncWrite>>>>
|
||||||
|
where
|
||||||
|
IO: Send + AsyncRead + AsyncWrite,
|
||||||
|
{
|
||||||
|
Err(io::Error::new(
|
||||||
|
io::ErrorKind::InvalidInput,
|
||||||
|
"TLS not implemented for LMTP server",
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn filter_from(
|
||||||
|
&self,
|
||||||
|
from: Option<Email>,
|
||||||
|
meta: &mut MailMetadata<Message>,
|
||||||
|
_conn_meta: &mut ConnectionMetadata<Conn>,
|
||||||
|
) -> Decision<Option<Email>> {
|
||||||
|
Decision::Accept {
|
||||||
|
reply: reply::okay_from().convert(),
|
||||||
|
res: from,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn filter_to(
|
||||||
|
&self,
|
||||||
|
to: Email,
|
||||||
|
meta: &mut MailMetadata<Message>,
|
||||||
|
_conn_meta: &mut ConnectionMetadata<Conn>,
|
||||||
|
) -> Decision<Email> {
|
||||||
|
let to_str = match to.hostname.as_ref() {
|
||||||
|
Some(h) => format!("{}@{}", to.localpart, h),
|
||||||
|
None => to.localpart.to_string(),
|
||||||
|
};
|
||||||
|
match self.login_provider.public_login(&to_str).await {
|
||||||
|
Ok(creds) => {
|
||||||
|
meta.user.to.push(creds);
|
||||||
|
Decision::Accept {
|
||||||
|
reply: reply::okay_to().convert(),
|
||||||
|
res: to,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => Decision::Reject {
|
||||||
|
reply: Reply {
|
||||||
|
code: ReplyCode::POLICY_REASON,
|
||||||
|
ecode: None,
|
||||||
|
text: vec![smtp_message::MaybeUtf8::Utf8(e.to_string())],
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_mail<'a, R>(
|
||||||
|
&self,
|
||||||
|
reader: &mut EscapedDataReader<'a, R>,
|
||||||
|
_mail: MailMetadata<Message>,
|
||||||
|
_conn_meta: &mut ConnectionMetadata<Conn>,
|
||||||
|
) -> Decision<()>
|
||||||
|
where
|
||||||
|
R: Send + Unpin + AsyncRead,
|
||||||
|
{
|
||||||
|
unreachable!();
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_mail_multi<'a, 'slife0, 'slife1, 'stream, R>(
|
||||||
|
&'slife0 self,
|
||||||
|
reader: &mut EscapedDataReader<'a, R>,
|
||||||
|
meta: MailMetadata<Message>,
|
||||||
|
conn_meta: &'slife1 mut ConnectionMetadata<Conn>,
|
||||||
|
) -> Pin<Box<dyn futures::Stream<Item = Decision<()>> + Send + 'stream>>
|
||||||
|
where
|
||||||
|
R: Send + Unpin + AsyncRead,
|
||||||
|
'slife0: 'stream,
|
||||||
|
'slife1: 'stream,
|
||||||
|
Self: 'stream,
|
||||||
|
{
|
||||||
|
let err_response_stream = |meta: MailMetadata<Message>, msg: String| {
|
||||||
|
Box::pin(
|
||||||
|
stream::iter(meta.user.to.into_iter()).map(move |_| Decision::Reject {
|
||||||
|
reply: Reply {
|
||||||
|
code: ReplyCode::POLICY_REASON,
|
||||||
|
ecode: None,
|
||||||
|
text: vec![smtp_message::MaybeUtf8::Utf8(msg.clone())],
|
||||||
|
},
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut text = Vec::new();
|
||||||
|
if reader.read_to_end(&mut text).await.is_err() {
|
||||||
|
return err_response_stream(meta, "io error".into());
|
||||||
|
}
|
||||||
|
reader.complete();
|
||||||
|
|
||||||
|
let encrypted_message = match EncryptedMessage::new(text) {
|
||||||
|
Ok(x) => Arc::new(x),
|
||||||
|
Err(e) => return err_response_stream(meta, e.to_string()),
|
||||||
|
};
|
||||||
|
|
||||||
|
Box::pin(stream::iter(meta.user.to.into_iter()).then(move |creds| {
|
||||||
|
let encrypted_message = encrypted_message.clone();
|
||||||
|
async move {
|
||||||
|
match encrypted_message.deliver_to(creds).await {
|
||||||
|
Ok(()) => Decision::Accept {
|
||||||
|
reply: reply::okay_mail().convert(),
|
||||||
|
res: (),
|
||||||
|
},
|
||||||
|
Err(e) => Decision::Reject {
|
||||||
|
reply: Reply {
|
||||||
|
code: ReplyCode::POLICY_REASON,
|
||||||
|
ecode: None,
|
||||||
|
text: vec![smtp_message::MaybeUtf8::Utf8(e.to_string())],
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ----
|
||||||
|
|
||||||
|
struct EncryptedMessage {
|
||||||
|
key: Key,
|
||||||
|
encrypted_body: Vec<u8>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl EncryptedMessage {
|
||||||
|
fn new(body: Vec<u8>) -> Result<Self> {
|
||||||
|
let key = gen_key();
|
||||||
|
let encrypted_body = seal(&body, &key)?;
|
||||||
|
Ok(Self {
|
||||||
|
key,
|
||||||
|
encrypted_body,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn deliver_to(self: Arc<Self>, creds: PublicCredentials) -> Result<()> {
|
||||||
|
let s3_client = creds.storage.s3_client()?;
|
||||||
|
|
||||||
|
let encrypted_key =
|
||||||
|
sodiumoxide::crypto::sealedbox::seal(self.key.as_ref(), &creds.public_key);
|
||||||
|
let key_header = base64::encode(&encrypted_key);
|
||||||
|
|
||||||
|
let mut por = PutObjectRequest::default();
|
||||||
|
por.bucket = creds.storage.bucket.clone();
|
||||||
|
por.key = format!("incoming/{}", gen_uuid().to_string());
|
||||||
|
por.metadata = Some(
|
||||||
|
[("Message-Key".to_string(), key_header)]
|
||||||
|
.into_iter()
|
||||||
|
.collect::<HashMap<_, _>>(),
|
||||||
|
);
|
||||||
|
por.body = Some(self.encrypted_body.clone().into());
|
||||||
|
s3_client.put_object(por).await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
|
@ -71,6 +71,12 @@ impl Serialize for MailUuid {
|
||||||
where
|
where
|
||||||
S: Serializer,
|
S: Serializer,
|
||||||
{
|
{
|
||||||
serializer.serialize_str(&hex::encode(self.0))
|
serializer.serialize_str(&self.to_string())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ToString for MailUuid {
|
||||||
|
fn to_string(&self) -> String {
|
||||||
|
hex::encode(self.0)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
12
src/main.rs
12
src/main.rs
|
@ -1,6 +1,7 @@
|
||||||
mod bayou;
|
mod bayou;
|
||||||
mod config;
|
mod config;
|
||||||
mod cryptoblob;
|
mod cryptoblob;
|
||||||
|
mod lmtp;
|
||||||
mod login;
|
mod login;
|
||||||
mod mail_uuid;
|
mod mail_uuid;
|
||||||
mod mailbox;
|
mod mailbox;
|
||||||
|
@ -35,6 +36,11 @@ enum Command {
|
||||||
#[clap(short, long, env = "CONFIG_FILE", default_value = "mailrage.toml")]
|
#[clap(short, long, env = "CONFIG_FILE", default_value = "mailrage.toml")]
|
||||||
config_file: PathBuf,
|
config_file: PathBuf,
|
||||||
},
|
},
|
||||||
|
/// TEST TEST TEST
|
||||||
|
Test {
|
||||||
|
#[clap(short, long, env = "CONFIG_FILE", default_value = "mailrage.toml")]
|
||||||
|
config_file: PathBuf,
|
||||||
|
},
|
||||||
/// Initializes key pairs for a user and adds a key decryption password
|
/// Initializes key pairs for a user and adds a key decryption password
|
||||||
FirstLogin {
|
FirstLogin {
|
||||||
#[clap(flatten)]
|
#[clap(flatten)]
|
||||||
|
@ -125,6 +131,12 @@ async fn main() -> Result<()> {
|
||||||
let server = Server::new(config)?;
|
let server = Server::new(config)?;
|
||||||
server.run().await?;
|
server.run().await?;
|
||||||
}
|
}
|
||||||
|
Command::Test { config_file } => {
|
||||||
|
let config = read_config(config_file)?;
|
||||||
|
|
||||||
|
let server = Server::new(config)?;
|
||||||
|
server.test().await?;
|
||||||
|
}
|
||||||
Command::FirstLogin {
|
Command::FirstLogin {
|
||||||
creds,
|
creds,
|
||||||
user_secrets,
|
user_secrets,
|
||||||
|
|
|
@ -1,18 +1,23 @@
|
||||||
use anyhow::{bail, Result};
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use anyhow::{bail, Result};
|
||||||
|
use futures::{try_join, StreamExt};
|
||||||
|
use log::*;
|
||||||
use rusoto_signature::Region;
|
use rusoto_signature::Region;
|
||||||
|
use tokio::sync::watch;
|
||||||
|
|
||||||
use crate::config::*;
|
use crate::config::*;
|
||||||
|
use crate::lmtp::*;
|
||||||
use crate::login::{ldap_provider::*, static_provider::*, *};
|
use crate::login::{ldap_provider::*, static_provider::*, *};
|
||||||
use crate::mailbox::Mailbox;
|
use crate::mailbox::Mailbox;
|
||||||
|
|
||||||
pub struct Server {
|
pub struct Server {
|
||||||
pub login_provider: Box<dyn LoginProvider>,
|
pub login_provider: Arc<dyn LoginProvider + Send + Sync>,
|
||||||
|
pub lmtp_server: Option<Arc<LmtpServer>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Server {
|
impl Server {
|
||||||
pub fn new(config: Config) -> Result<Arc<Self>> {
|
pub fn new(config: Config) -> Result<Self> {
|
||||||
let s3_region = Region::Custom {
|
let s3_region = Region::Custom {
|
||||||
name: config.aws_region.clone(),
|
name: config.aws_region.clone(),
|
||||||
endpoint: config.s3_endpoint,
|
endpoint: config.s3_endpoint,
|
||||||
|
@ -21,17 +26,43 @@ impl Server {
|
||||||
name: config.aws_region,
|
name: config.aws_region,
|
||||||
endpoint: config.k2v_endpoint,
|
endpoint: config.k2v_endpoint,
|
||||||
};
|
};
|
||||||
let login_provider: Box<dyn LoginProvider> = match (config.login_static, config.login_ldap)
|
let login_provider: Arc<dyn LoginProvider + Send + Sync> =
|
||||||
{
|
match (config.login_static, config.login_ldap) {
|
||||||
(Some(st), None) => Box::new(StaticLoginProvider::new(st, k2v_region, s3_region)?),
|
(Some(st), None) => Arc::new(StaticLoginProvider::new(st, k2v_region, s3_region)?),
|
||||||
(None, Some(ld)) => Box::new(LdapLoginProvider::new(ld, k2v_region, s3_region)?),
|
(None, Some(ld)) => Arc::new(LdapLoginProvider::new(ld, k2v_region, s3_region)?),
|
||||||
(Some(_), Some(_)) => bail!("A single login provider must be set up in config file"),
|
(Some(_), Some(_)) => {
|
||||||
(None, None) => bail!("No login provider is set up in config file"),
|
bail!("A single login provider must be set up in config file")
|
||||||
};
|
}
|
||||||
Ok(Arc::new(Self { login_provider }))
|
(None, None) => bail!("No login provider is set up in config file"),
|
||||||
|
};
|
||||||
|
|
||||||
|
let lmtp_server = config
|
||||||
|
.lmtp
|
||||||
|
.map(|cfg| LmtpServer::new(cfg, login_provider.clone()));
|
||||||
|
|
||||||
|
Ok(Self {
|
||||||
|
login_provider,
|
||||||
|
lmtp_server,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn run(self: &Arc<Self>) -> Result<()> {
|
pub async fn run(&self) -> Result<()> {
|
||||||
|
let (exit_signal, provoke_exit) = watch_ctrl_c();
|
||||||
|
let exit_on_err = move |err: anyhow::Error| {
|
||||||
|
error!("Error: {}", err);
|
||||||
|
let _ = provoke_exit.send(true);
|
||||||
|
};
|
||||||
|
|
||||||
|
try_join!(async {
|
||||||
|
match self.lmtp_server.as_ref() {
|
||||||
|
None => Ok(()),
|
||||||
|
Some(s) => s.run(exit_signal.clone()).await,
|
||||||
|
}
|
||||||
|
})?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn test(&self) -> Result<()> {
|
||||||
let creds = self.login_provider.login("lx", "plop").await?;
|
let creds = self.login_provider.login("lx", "plop").await?;
|
||||||
|
|
||||||
let mut mailbox = Mailbox::new(&creds, "TestMailbox".to_string()).await?;
|
let mut mailbox = Mailbox::new(&creds, "TestMailbox".to_string()).await?;
|
||||||
|
@ -41,3 +72,17 @@ impl Server {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn watch_ctrl_c() -> (watch::Receiver<bool>, Arc<watch::Sender<bool>>) {
|
||||||
|
let (send_cancel, watch_cancel) = watch::channel(false);
|
||||||
|
let send_cancel = Arc::new(send_cancel);
|
||||||
|
let send_cancel_2 = send_cancel.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
tokio::signal::ctrl_c()
|
||||||
|
.await
|
||||||
|
.expect("failed to install CTRL+C signal handler");
|
||||||
|
info!("Received CTRL+C, shutting down.");
|
||||||
|
send_cancel.send(true).unwrap();
|
||||||
|
});
|
||||||
|
(watch_cancel, send_cancel_2)
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue