diff --git a/src/bayou.rs b/src/bayou.rs index afe3c75..3c525b3 100644 --- a/src/bayou.rs +++ b/src/bayou.rs @@ -58,12 +58,12 @@ pub struct Bayou { } impl Bayou { - pub fn new(creds: &Credentials, path: String) -> Result { - let storage = creds.storage.build()?; + pub async fn new(creds: &Credentials, path: String) -> Result { + let storage = creds.storage.build().await?; //let target = k2v_client.row(&path, WATCH_SK); let target = storage::RowRef::new(&path, WATCH_SK); - let watch = K2vWatch::new(creds, target.clone())?; + let watch = K2vWatch::new(creds, target.clone()).await?; Ok(Self { path, @@ -418,8 +418,8 @@ impl K2vWatch { /// Creates a new watch and launches subordinate threads. /// These threads hold Weak pointers to the struct; /// they exit when the Arc is dropped. - fn new(creds: &Credentials, target: storage::RowRef) -> Result> { - let storage = creds.storage.build()?; + async fn new(creds: &Credentials, target: storage::RowRef) -> Result> { + let storage = creds.storage.build().await?; let (tx, rx) = watch::channel::(target.clone()); let notify = Notify::new(); diff --git a/src/mail/incoming.rs b/src/mail/incoming.rs index 2a6c947..3eafac7 100644 --- a/src/mail/incoming.rs +++ b/src/mail/incoming.rs @@ -51,8 +51,8 @@ async fn incoming_mail_watch_process_internal( creds: Credentials, mut rx_inbox_id: watch::Receiver>, ) -> Result<()> { - let mut lock_held = k2v_lock_loop(creds.storage.build()?, storage::RowRef::new(INCOMING_PK, INCOMING_LOCK_SK)); - let storage = creds.storage.build()?; + let mut lock_held = k2v_lock_loop(creds.storage.build().await?, storage::RowRef::new(INCOMING_PK, INCOMING_LOCK_SK)); + let storage = creds.storage.build().await?; let mut inbox: Option> = None; let mut incoming_key = storage::RowRef::new(INCOMING_PK, INCOMING_WATCH_SK); @@ -411,7 +411,7 @@ impl EncryptedMessage { } pub async fn deliver_to(self: Arc, creds: PublicCredentials) -> Result<()> { - let storage = creds.storage.build()?; + let storage = creds.storage.build().await?; // Get causality token of previous watch key let query = storage::RowRef::new(INCOMING_PK, INCOMING_WATCH_SK); diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs index 65f44b1..60a91dd 100644 --- a/src/mail/mailbox.rs +++ b/src/mail/mailbox.rs @@ -25,7 +25,7 @@ impl Mailbox { let index_path = format!("index/{}", id); let mail_path = format!("mail/{}", id); - let mut uid_index = Bayou::::new(creds, index_path)?; + let mut uid_index = Bayou::::new(creds, index_path).await?; uid_index.sync().await?; let uidvalidity = uid_index.state().uidvalidity; @@ -44,7 +44,7 @@ impl Mailbox { let mbox = RwLock::new(MailboxInternal { id, encryption_key: creds.keys.master.clone(), - storage: creds.storage.build()?, + storage: creds.storage.build().await?, uid_index, mail_path, }); diff --git a/src/mail/user.rs b/src/mail/user.rs index 8413cbf..8d12c58 100644 --- a/src/mail/user.rs +++ b/src/mail/user.rs @@ -174,7 +174,7 @@ impl User { // ---- Internal user & mailbox management ---- async fn open(username: String, creds: Credentials) -> Result> { - let storage = creds.storage.build()?; + let storage = creds.storage.build().await?; let (tx_inbox_id, rx_inbox_id) = watch::channel(None); diff --git a/src/storage/garage.rs b/src/storage/garage.rs index f202067..5d00ed6 100644 --- a/src/storage/garage.rs +++ b/src/storage/garage.rs @@ -1,5 +1,6 @@ use crate::storage::*; use serde::Serialize; +use aws_sdk_s3 as s3; #[derive(Clone, Debug, Serialize)] pub struct GarageConf { @@ -26,9 +27,26 @@ impl GarageBuilder { } } +#[async_trait] impl IBuilder for GarageBuilder { - fn build(&self) -> Result { - unimplemented!(); + async fn build(&self) -> Result { + let creds = s3::config::Credentials::new( + self.conf.aws_access_key_id.clone(), + self.conf.aws_secret_access_key.clone(), + None, + None, + "aerogramme" + ); + + let config = aws_config::from_env() + .region(aws_config::Region::new(self.conf.region.clone())) + .credentials_provider(creds) + .endpoint_url(self.conf.s3_endpoint.clone()) + .load() + .await; + + let s3_client = aws_sdk_s3::Client::new(&config); + Ok(Box::new(GarageStore { s3: s3_client })) } fn unique(&self) -> UnicityBuffer { UnicityBuffer(self.unicity.clone()) @@ -36,7 +54,7 @@ impl IBuilder for GarageBuilder { } pub struct GarageStore { - dummy: String, + s3: s3::Client, } #[async_trait] diff --git a/src/storage/in_memory.rs b/src/storage/in_memory.rs index fb6e599..723bca0 100644 --- a/src/storage/in_memory.rs +++ b/src/storage/in_memory.rs @@ -106,8 +106,9 @@ impl MemBuilder { } } +#[async_trait] impl IBuilder for MemBuilder { - fn build(&self) -> Result { + async fn build(&self) -> Result { Ok(Box::new(MemStore { row: self.row.clone(), blob: self.blob.clone(), diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 0fedfab..10149e9 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -157,8 +157,9 @@ pub trait IStore { #[derive(Clone,Debug,PartialEq,Eq,Hash)] pub struct UnicityBuffer(Vec); +#[async_trait] pub trait IBuilder: std::fmt::Debug { - fn build(&self) -> Result; + async fn build(&self) -> Result; /// Returns an opaque buffer that uniquely identifies this builder fn unique(&self) -> UnicityBuffer;