incoming has been fully ported
This commit is contained in:
parent
7eb690e49d
commit
4a33ac2265
5 changed files with 47 additions and 44 deletions
|
@ -47,7 +47,7 @@ pub struct Credentials {
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct PublicCredentials {
|
pub struct PublicCredentials {
|
||||||
/// The storage credentials are used to authenticate access to the underlying storage (S3, K2V)
|
/// The storage credentials are used to authenticate access to the underlying storage (S3, K2V)
|
||||||
pub storage: StorageCredentials,
|
pub storage: Builders,
|
||||||
pub public_key: PublicKey,
|
pub public_key: PublicKey,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
use std::collections::HashMap;
|
//use std::collections::HashMap;
|
||||||
use std::convert::TryFrom;
|
use std::convert::TryFrom;
|
||||||
|
|
||||||
use std::sync::{Arc, Weak};
|
use std::sync::{Arc, Weak};
|
||||||
|
@ -6,11 +6,7 @@ use std::time::Duration;
|
||||||
|
|
||||||
use anyhow::{anyhow, bail, Result};
|
use anyhow::{anyhow, bail, Result};
|
||||||
use futures::{future::BoxFuture, FutureExt};
|
use futures::{future::BoxFuture, FutureExt};
|
||||||
use k2v_client::{CausalityToken, K2vClient, K2vValue};
|
//use tokio::io::AsyncReadExt;
|
||||||
use rusoto_s3::{
|
|
||||||
DeleteObjectRequest, GetObjectRequest, ListObjectsV2Request, PutObjectRequest, S3Client, S3,
|
|
||||||
};
|
|
||||||
use tokio::io::AsyncReadExt;
|
|
||||||
use tokio::sync::watch;
|
use tokio::sync::watch;
|
||||||
use tracing::{error, info, warn};
|
use tracing::{error, info, warn};
|
||||||
|
|
||||||
|
@ -81,7 +77,7 @@ async fn incoming_mail_watch_process_internal(
|
||||||
|
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
inc_k = wait_new_mail => Some(inc_k),
|
inc_k = wait_new_mail => Some(inc_k),
|
||||||
_ = tokio::time::sleep(MAIL_CHECK_INTERVAL) => Some(incoming_key),
|
_ = tokio::time::sleep(MAIL_CHECK_INTERVAL) => Some(k2v.from_orphan(incoming_key.to_orphan())),
|
||||||
_ = lock_held.changed() => None,
|
_ = lock_held.changed() => None,
|
||||||
_ = rx_inbox_id.changed() => None,
|
_ = rx_inbox_id.changed() => None,
|
||||||
}
|
}
|
||||||
|
@ -220,7 +216,7 @@ fn k2v_lock_loop(k2v: storage::RowStore, pk: &'static str, sk: &'static str) ->
|
||||||
enum LockState {
|
enum LockState {
|
||||||
Unknown,
|
Unknown,
|
||||||
Empty,
|
Empty,
|
||||||
Held(UniqueIdent, u64, CausalityToken),
|
Held(UniqueIdent, u64, storage::OrphanRowRef),
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn k2v_lock_loop_internal(
|
async fn k2v_lock_loop_internal(
|
||||||
|
@ -236,10 +232,10 @@ async fn k2v_lock_loop_internal(
|
||||||
|
|
||||||
// Loop 1: watch state of lock in K2V, save that in corresponding watch channel
|
// Loop 1: watch state of lock in K2V, save that in corresponding watch channel
|
||||||
let watch_lock_loop: BoxFuture<Result<()>> = async {
|
let watch_lock_loop: BoxFuture<Result<()>> = async {
|
||||||
let mut ct = None;
|
let mut ct = k2v.row(pk, sk);
|
||||||
loop {
|
loop {
|
||||||
info!("k2v watch lock loop iter: ct = {:?}", ct);
|
info!("k2v watch lock loop iter: ct = {:?}", ct);
|
||||||
match k2v_wait_value_changed(&k2v, pk, sk, &ct).await {
|
match ct.poll().await {
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!(
|
error!(
|
||||||
"Error in k2v wait value changed: {} ; assuming we no longer hold lock.",
|
"Error in k2v wait value changed: {} ; assuming we no longer hold lock.",
|
||||||
|
@ -250,8 +246,8 @@ async fn k2v_lock_loop_internal(
|
||||||
}
|
}
|
||||||
Ok(cv) => {
|
Ok(cv) => {
|
||||||
let mut lock_state = None;
|
let mut lock_state = None;
|
||||||
for v in cv.value.iter() {
|
for v in cv.content().iter() {
|
||||||
if let K2vValue::Value(vbytes) = v {
|
if let storage::Alternative::Value(vbytes) = v {
|
||||||
if vbytes.len() == 32 {
|
if vbytes.len() == 32 {
|
||||||
let ts = u64::from_be_bytes(vbytes[..8].try_into().unwrap());
|
let ts = u64::from_be_bytes(vbytes[..8].try_into().unwrap());
|
||||||
let pid = UniqueIdent(vbytes[8..].try_into().unwrap());
|
let pid = UniqueIdent(vbytes[8..].try_into().unwrap());
|
||||||
|
@ -264,16 +260,18 @@ async fn k2v_lock_loop_internal(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
let new_ct = cv.to_ref();
|
||||||
|
|
||||||
info!(
|
info!(
|
||||||
"k2v watch lock loop: changed, old ct = {:?}, new ct = {:?}, v = {:?}",
|
"k2v watch lock loop: changed, old ct = {:?}, new ct = {:?}, v = {:?}",
|
||||||
ct, cv.causality, lock_state
|
ct, new_ct, lock_state
|
||||||
);
|
);
|
||||||
state_tx.send(
|
state_tx.send(
|
||||||
lock_state
|
lock_state
|
||||||
.map(|(pid, ts)| LockState::Held(pid, ts, cv.causality.clone()))
|
.map(|(pid, ts)| LockState::Held(pid, ts, new_ct.to_orphan()))
|
||||||
.unwrap_or(LockState::Empty),
|
.unwrap_or(LockState::Empty),
|
||||||
)?;
|
)?;
|
||||||
ct = Some(cv.causality);
|
ct = new_ct;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -359,7 +357,11 @@ async fn k2v_lock_loop_internal(
|
||||||
now_msec() + LOCK_DURATION.as_millis() as u64,
|
now_msec() + LOCK_DURATION.as_millis() as u64,
|
||||||
));
|
));
|
||||||
lock[8..].copy_from_slice(&our_pid.0);
|
lock[8..].copy_from_slice(&our_pid.0);
|
||||||
if let Err(e) = k2v.insert_item(pk, sk, lock, ct).await {
|
let row = match ct {
|
||||||
|
Some(orphan) => k2v.from_orphan(orphan),
|
||||||
|
None => k2v.row(pk, sk),
|
||||||
|
};
|
||||||
|
if let Err(e) = row.set_value(lock).push().await {
|
||||||
error!("Could not take lock: {}", e);
|
error!("Could not take lock: {}", e);
|
||||||
tokio::time::sleep(Duration::from_secs(30)).await;
|
tokio::time::sleep(Duration::from_secs(30)).await;
|
||||||
}
|
}
|
||||||
|
@ -385,7 +387,8 @@ async fn k2v_lock_loop_internal(
|
||||||
_ => None,
|
_ => None,
|
||||||
};
|
};
|
||||||
if let Some(ct) = release {
|
if let Some(ct) = release {
|
||||||
let _ = k2v.delete_item(pk, sk, ct.clone()).await;
|
let row = k2v.from_orphan(ct);
|
||||||
|
let _ = row.rm().await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -407,13 +410,14 @@ impl EncryptedMessage {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn deliver_to(self: Arc<Self>, creds: PublicCredentials) -> Result<()> {
|
pub async fn deliver_to(self: Arc<Self>, creds: PublicCredentials) -> Result<()> {
|
||||||
let s3_client = creds.storage.s3_client()?;
|
let s3_client = creds.storage.blob_store()?;
|
||||||
let k2v_client = creds.storage.k2v_client()?;
|
let k2v_client = creds.storage.row_store()?;
|
||||||
|
|
||||||
// Get causality token of previous watch key
|
// Get causality token of previous watch key
|
||||||
let watch_ct = match k2v_client.read_item(INCOMING_PK, INCOMING_WATCH_SK).await {
|
let query = k2v_client.row(INCOMING_PK, INCOMING_WATCH_SK);
|
||||||
Err(_) => None,
|
let watch_ct = match query.fetch().await {
|
||||||
Ok(cv) => Some(cv.causality),
|
Err(_) => query,
|
||||||
|
Ok(cv) => cv.to_ref(),
|
||||||
};
|
};
|
||||||
|
|
||||||
// Write mail to encrypted storage
|
// Write mail to encrypted storage
|
||||||
|
@ -421,28 +425,14 @@ impl EncryptedMessage {
|
||||||
sodiumoxide::crypto::sealedbox::seal(self.key.as_ref(), &creds.public_key);
|
sodiumoxide::crypto::sealedbox::seal(self.key.as_ref(), &creds.public_key);
|
||||||
let key_header = base64::encode(&encrypted_key);
|
let key_header = base64::encode(&encrypted_key);
|
||||||
|
|
||||||
let por = PutObjectRequest {
|
let mut send = s3_client
|
||||||
bucket: creds.storage.bucket.clone(),
|
.blob(&format!("incoming/{}", gen_ident()))
|
||||||
key: format!("incoming/{}", gen_ident()),
|
.set_value(self.encrypted_body.clone().into());
|
||||||
metadata: Some(
|
send.set_meta(MESSAGE_KEY, &key_header);
|
||||||
[(MESSAGE_KEY.to_string(), key_header)]
|
send.push().await?;
|
||||||
.into_iter()
|
|
||||||
.collect::<HashMap<_, _>>(),
|
|
||||||
),
|
|
||||||
body: Some(self.encrypted_body.clone().into()),
|
|
||||||
..Default::default()
|
|
||||||
};
|
|
||||||
s3_client.put_object(por).await?;
|
|
||||||
|
|
||||||
// Update watch key to signal new mail
|
// Update watch key to signal new mail
|
||||||
k2v_client
|
watch_ct.set_value(gen_ident().0.to_vec()).push().await?;
|
||||||
.insert_item(
|
|
||||||
INCOMING_PK,
|
|
||||||
INCOMING_WATCH_SK,
|
|
||||||
gen_ident().0.to_vec(),
|
|
||||||
watch_ct,
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -67,6 +67,12 @@ impl IRowRef for GrgRef {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl std::fmt::Debug for GrgRef {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
unimplemented!();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl IRowValue for GrgValue {
|
impl IRowValue for GrgValue {
|
||||||
fn to_ref(&self) -> RowRef {
|
fn to_ref(&self) -> RowRef {
|
||||||
unimplemented!();
|
unimplemented!();
|
||||||
|
|
|
@ -72,6 +72,12 @@ impl IRowRef for MemRef {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl std::fmt::Debug for MemRef {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
unimplemented!();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl IRowValue for MemValue {
|
impl IRowValue for MemValue {
|
||||||
fn to_ref(&self) -> RowRef {
|
fn to_ref(&self) -> RowRef {
|
||||||
unimplemented!();
|
unimplemented!();
|
||||||
|
|
|
@ -91,7 +91,7 @@ pub trait IRowStore
|
||||||
}
|
}
|
||||||
pub type RowStore = Box<dyn IRowStore + Sync + Send>;
|
pub type RowStore = Box<dyn IRowStore + Sync + Send>;
|
||||||
|
|
||||||
pub trait IRowRef
|
pub trait IRowRef: std::fmt::Debug
|
||||||
{
|
{
|
||||||
/*fn clone_boxed(&self) -> RowRef;*/
|
/*fn clone_boxed(&self) -> RowRef;*/
|
||||||
fn to_orphan(&self) -> OrphanRowRef;
|
fn to_orphan(&self) -> OrphanRowRef;
|
||||||
|
@ -138,6 +138,7 @@ pub type BlobRef = Box<dyn IBlobRef + Send + Sync>;
|
||||||
pub trait IBlobValue {
|
pub trait IBlobValue {
|
||||||
fn to_ref(&self) -> BlobRef;
|
fn to_ref(&self) -> BlobRef;
|
||||||
fn get_meta(&self, key: &str) -> Option<&[u8]>;
|
fn get_meta(&self, key: &str) -> Option<&[u8]>;
|
||||||
|
fn set_meta(&mut self, key: &str, val: &str);
|
||||||
fn content(&self) -> Option<&[u8]>;
|
fn content(&self) -> Option<&[u8]>;
|
||||||
fn push(&self) -> AsyncResult<()>;
|
fn push(&self) -> AsyncResult<()>;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue