in-memory storage #32

Merged
quentin merged 65 commits from in-memory into main 2023-12-27 16:35:43 +00:00
7 changed files with 36 additions and 16 deletions
Showing only changes of commit 012c6ad672 - Show all commits

View file

@ -58,12 +58,12 @@ pub struct Bayou<S: BayouState> {
}
impl<S: BayouState> Bayou<S> {
pub fn new(creds: &Credentials, path: String) -> Result<Self> {
let storage = creds.storage.build()?;
pub async fn new(creds: &Credentials, path: String) -> Result<Self> {
let storage = creds.storage.build().await?;
//let target = k2v_client.row(&path, WATCH_SK);
let target = storage::RowRef::new(&path, WATCH_SK);
let watch = K2vWatch::new(creds, target.clone())?;
let watch = K2vWatch::new(creds, target.clone()).await?;
Ok(Self {
path,
@ -418,8 +418,8 @@ impl K2vWatch {
/// Creates a new watch and launches subordinate threads.
/// These threads hold Weak pointers to the struct;
/// they exit when the Arc is dropped.
fn new(creds: &Credentials, target: storage::RowRef) -> Result<Arc<Self>> {
let storage = creds.storage.build()?;
async fn new(creds: &Credentials, target: storage::RowRef) -> Result<Arc<Self>> {
let storage = creds.storage.build().await?;
let (tx, rx) = watch::channel::<storage::RowRef>(target.clone());
let notify = Notify::new();

View file

@ -51,8 +51,8 @@ async fn incoming_mail_watch_process_internal(
creds: Credentials,
mut rx_inbox_id: watch::Receiver<Option<(UniqueIdent, ImapUidvalidity)>>,
) -> Result<()> {
let mut lock_held = k2v_lock_loop(creds.storage.build()?, storage::RowRef::new(INCOMING_PK, INCOMING_LOCK_SK));
let storage = creds.storage.build()?;
let mut lock_held = k2v_lock_loop(creds.storage.build().await?, storage::RowRef::new(INCOMING_PK, INCOMING_LOCK_SK));
let storage = creds.storage.build().await?;
let mut inbox: Option<Arc<Mailbox>> = None;
let mut incoming_key = storage::RowRef::new(INCOMING_PK, INCOMING_WATCH_SK);
@ -411,7 +411,7 @@ impl EncryptedMessage {
}
pub async fn deliver_to(self: Arc<Self>, creds: PublicCredentials) -> Result<()> {
let storage = creds.storage.build()?;
let storage = creds.storage.build().await?;
// Get causality token of previous watch key
let query = storage::RowRef::new(INCOMING_PK, INCOMING_WATCH_SK);

View file

@ -25,7 +25,7 @@ impl Mailbox {
let index_path = format!("index/{}", id);
let mail_path = format!("mail/{}", id);
let mut uid_index = Bayou::<UidIndex>::new(creds, index_path)?;
let mut uid_index = Bayou::<UidIndex>::new(creds, index_path).await?;
uid_index.sync().await?;
let uidvalidity = uid_index.state().uidvalidity;
@ -44,7 +44,7 @@ impl Mailbox {
let mbox = RwLock::new(MailboxInternal {
id,
encryption_key: creds.keys.master.clone(),
storage: creds.storage.build()?,
storage: creds.storage.build().await?,
uid_index,
mail_path,
});

View file

@ -174,7 +174,7 @@ impl User {
// ---- Internal user & mailbox management ----
async fn open(username: String, creds: Credentials) -> Result<Arc<Self>> {
let storage = creds.storage.build()?;
let storage = creds.storage.build().await?;
let (tx_inbox_id, rx_inbox_id) = watch::channel(None);

View file

@ -1,5 +1,6 @@
use crate::storage::*;
use serde::Serialize;
use aws_sdk_s3 as s3;
#[derive(Clone, Debug, Serialize)]
pub struct GarageConf {
@ -26,9 +27,26 @@ impl GarageBuilder {
}
}
#[async_trait]
impl IBuilder for GarageBuilder {
fn build(&self) -> Result<Store, StorageError> {
unimplemented!();
async fn build(&self) -> Result<Store, StorageError> {
let creds = s3::config::Credentials::new(
self.conf.aws_access_key_id.clone(),
self.conf.aws_secret_access_key.clone(),
None,
None,
"aerogramme"
);
let config = aws_config::from_env()
.region(aws_config::Region::new(self.conf.region.clone()))
.credentials_provider(creds)
.endpoint_url(self.conf.s3_endpoint.clone())
.load()
.await;
let s3_client = aws_sdk_s3::Client::new(&config);
Ok(Box::new(GarageStore { s3: s3_client }))
}
fn unique(&self) -> UnicityBuffer {
UnicityBuffer(self.unicity.clone())
@ -36,7 +54,7 @@ impl IBuilder for GarageBuilder {
}
pub struct GarageStore {
dummy: String,
s3: s3::Client,
}
#[async_trait]

View file

@ -106,8 +106,9 @@ impl MemBuilder {
}
}
#[async_trait]
impl IBuilder for MemBuilder {
fn build(&self) -> Result<Store, StorageError> {
async fn build(&self) -> Result<Store, StorageError> {
Ok(Box::new(MemStore {
row: self.row.clone(),
blob: self.blob.clone(),

View file

@ -157,8 +157,9 @@ pub trait IStore {
#[derive(Clone,Debug,PartialEq,Eq,Hash)]
pub struct UnicityBuffer(Vec<u8>);
#[async_trait]
pub trait IBuilder: std::fmt::Debug {
fn build(&self) -> Result<Store, StorageError>;
async fn build(&self) -> Result<Store, StorageError>;
/// Returns an opaque buffer that uniquely identifies this builder
fn unique(&self) -> UnicityBuffer;