2024-03-20 16:31:54 +00:00
|
|
|
pub mod namespace;
|
|
|
|
|
2024-04-04 13:40:26 +00:00
|
|
|
use anyhow::{anyhow, bail, Result};
|
2024-03-27 09:33:46 +00:00
|
|
|
use tokio::sync::RwLock;
|
2024-03-26 14:08:04 +00:00
|
|
|
|
2024-03-27 09:33:46 +00:00
|
|
|
use aero_bayou::Bayou;
|
2024-03-26 14:08:04 +00:00
|
|
|
use aero_user::login::Credentials;
|
2024-04-04 13:40:26 +00:00
|
|
|
use aero_user::cryptoblob::{self, gen_key, Key};
|
|
|
|
use aero_user::storage::{self, BlobRef, BlobVal, Store};
|
2024-03-26 14:08:04 +00:00
|
|
|
|
|
|
|
use crate::unique_ident::*;
|
2024-04-04 09:28:15 +00:00
|
|
|
use crate::davdag::{DavDag, IndexEntry, Token, BlobId, SyncChange};
|
2024-03-26 14:08:04 +00:00
|
|
|
|
2024-03-20 16:31:54 +00:00
|
|
|
pub struct Calendar {
|
2024-03-27 09:33:46 +00:00
|
|
|
pub(super) id: UniqueIdent,
|
|
|
|
internal: RwLock<CalendarInternal>,
|
2024-03-20 16:31:54 +00:00
|
|
|
}
|
2024-03-26 14:08:04 +00:00
|
|
|
|
|
|
|
impl Calendar {
|
|
|
|
pub(crate) async fn open(
|
|
|
|
creds: &Credentials,
|
|
|
|
id: UniqueIdent,
|
2024-04-04 09:28:15 +00:00
|
|
|
) -> Result<Self> {
|
|
|
|
let bayou_path = format!("calendar/dag/{}", id);
|
|
|
|
let cal_path = format!("calendar/events/{}", id);
|
|
|
|
|
|
|
|
let mut davdag = Bayou::<DavDag>::new(creds, bayou_path).await?;
|
|
|
|
davdag.sync().await?;
|
|
|
|
|
|
|
|
let internal = RwLock::new(CalendarInternal {
|
|
|
|
id,
|
|
|
|
encryption_key: creds.keys.master.clone(),
|
|
|
|
storage: creds.storage.build().await?,
|
|
|
|
davdag,
|
|
|
|
cal_path,
|
|
|
|
});
|
|
|
|
|
|
|
|
Ok(Self { id, internal })
|
|
|
|
}
|
|
|
|
|
2024-04-04 09:57:32 +00:00
|
|
|
// ---- DAG sync utilities
|
|
|
|
|
2024-04-04 09:28:15 +00:00
|
|
|
/// Sync data with backing store
|
|
|
|
pub async fn force_sync(&self) -> Result<()> {
|
|
|
|
self.internal.write().await.force_sync().await
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Sync data with backing store only if changes are detected
|
|
|
|
/// or last sync is too old
|
|
|
|
pub async fn opportunistic_sync(&self) -> Result<()> {
|
|
|
|
self.internal.write().await.opportunistic_sync().await
|
|
|
|
}
|
|
|
|
|
2024-04-04 09:57:32 +00:00
|
|
|
// ---- Data API
|
|
|
|
|
|
|
|
/// Access the DAG internal data (you can get the list of files for example)
|
|
|
|
pub async fn dag(&self) -> DavDag {
|
|
|
|
// Cloning is cheap
|
|
|
|
self.internal.read().await.davdag.state().clone()
|
2024-04-04 09:28:15 +00:00
|
|
|
}
|
|
|
|
|
2024-04-04 09:57:32 +00:00
|
|
|
/// The diff API is a write API as we might need to push a merge node
|
|
|
|
/// to get a new sync token
|
2024-04-04 09:28:15 +00:00
|
|
|
pub async fn diff(&self, sync_token: Token) -> Result<(Token, Vec<SyncChange>)> {
|
2024-04-04 09:57:32 +00:00
|
|
|
self.internal.write().await.diff(sync_token).await
|
2024-04-04 09:28:15 +00:00
|
|
|
}
|
|
|
|
|
2024-04-04 09:57:32 +00:00
|
|
|
/// Get a specific event
|
2024-04-04 12:59:47 +00:00
|
|
|
pub async fn get(&self, evt_id: UniqueIdent) -> Result<Vec<u8>> {
|
|
|
|
self.internal.read().await.get(evt_id).await
|
2024-04-04 09:57:32 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Put a specific event
|
2024-04-04 09:28:15 +00:00
|
|
|
pub async fn put<'a>(&self, entry: IndexEntry, evt: &'a [u8]) -> Result<Token> {
|
|
|
|
self.internal.write().await.put(entry, evt).await
|
|
|
|
}
|
|
|
|
|
2024-04-04 09:57:32 +00:00
|
|
|
/// Delete a specific event
|
2024-04-04 09:28:15 +00:00
|
|
|
pub async fn delete(&self, blob_id: UniqueIdent) -> Result<Token> {
|
|
|
|
self.internal.write().await.delete(blob_id).await
|
2024-03-26 14:08:04 +00:00
|
|
|
}
|
|
|
|
}
|
2024-03-27 09:33:46 +00:00
|
|
|
|
2024-04-04 09:57:32 +00:00
|
|
|
use base64::Engine;
|
|
|
|
/// Blob metadata key under which the base64-encoded, sealed per-event key is stored.
const MESSAGE_KEY: &str = "message-key";
|
2024-03-27 09:33:46 +00:00
|
|
|
struct CalendarInternal {
|
2024-04-04 09:57:32 +00:00
|
|
|
#[allow(dead_code)]
|
2024-03-27 09:33:46 +00:00
|
|
|
id: UniqueIdent,
|
|
|
|
cal_path: String,
|
|
|
|
encryption_key: Key,
|
|
|
|
storage: Store,
|
2024-04-04 09:28:15 +00:00
|
|
|
davdag: Bayou<DavDag>,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl CalendarInternal {
|
|
|
|
async fn force_sync(&mut self) -> Result<()> {
|
|
|
|
self.davdag.sync().await?;
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
async fn opportunistic_sync(&mut self) -> Result<()> {
|
|
|
|
self.davdag.opportunistic_sync().await?;
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2024-04-04 12:59:47 +00:00
|
|
|
async fn get(&self, blob_id: BlobId) -> Result<Vec<u8>> {
|
|
|
|
// Fetch message from S3
|
|
|
|
let blob_ref = storage::BlobRef(format!("{}/{}", self.cal_path, blob_id));
|
|
|
|
let object = self.storage.blob_fetch(&blob_ref).await?;
|
|
|
|
|
|
|
|
// Decrypt message key from headers
|
|
|
|
let key_encrypted_b64 = object
|
|
|
|
.meta
|
|
|
|
.get(MESSAGE_KEY)
|
|
|
|
.ok_or(anyhow!("Missing key in metadata"))?;
|
|
|
|
let key_encrypted = base64::engine::general_purpose::STANDARD.decode(key_encrypted_b64)?;
|
|
|
|
let message_key_raw = cryptoblob::open(&key_encrypted, &self.encryption_key)?;
|
|
|
|
let message_key =
|
|
|
|
cryptoblob::Key::from_slice(&message_key_raw).ok_or(anyhow!("Invalid message key"))?;
|
|
|
|
|
|
|
|
// Decrypt body
|
|
|
|
let body = object.value;
|
|
|
|
cryptoblob::open(&body, &message_key)
|
2024-04-04 09:28:15 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
async fn put<'a>(&mut self, entry: IndexEntry, evt: &'a [u8]) -> Result<Token> {
|
2024-04-04 09:57:32 +00:00
|
|
|
let message_key = gen_key();
|
|
|
|
|
|
|
|
let encrypted_msg_key = cryptoblob::seal(&message_key.as_ref(), &self.encryption_key)?;
|
|
|
|
let key_header = base64::engine::general_purpose::STANDARD.encode(&encrypted_msg_key);
|
|
|
|
|
|
|
|
// Write event to S3
|
|
|
|
let message_blob = cryptoblob::seal(evt, &message_key)?;
|
|
|
|
let blob_val = BlobVal::new(
|
|
|
|
BlobRef(format!("{}/{}", self.cal_path, entry.0)),
|
|
|
|
message_blob,
|
|
|
|
)
|
|
|
|
.with_meta(MESSAGE_KEY.to_string(), key_header);
|
|
|
|
|
|
|
|
self.storage
|
|
|
|
.blob_insert(blob_val)
|
|
|
|
.await?;
|
|
|
|
|
|
|
|
// Add entry to Bayou
|
|
|
|
let davstate = self.davdag.state();
|
|
|
|
let put_op = davstate.op_put(entry);
|
|
|
|
let token = put_op.token();
|
|
|
|
self.davdag.push(put_op).await?;
|
|
|
|
|
|
|
|
Ok(token)
|
2024-04-04 09:28:15 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
async fn delete(&mut self, blob_id: BlobId) -> Result<Token> {
|
2024-04-04 13:40:26 +00:00
|
|
|
let davstate = self.davdag.state();
|
|
|
|
|
|
|
|
if davstate.table.contains_key(&blob_id) {
|
|
|
|
bail!("Cannot delete event that doesn't exist");
|
|
|
|
}
|
|
|
|
|
|
|
|
let del_op = davstate.op_delete(blob_id);
|
|
|
|
let token = del_op.token();
|
|
|
|
self.davdag.push(del_op).await?;
|
|
|
|
|
|
|
|
let blob_ref = BlobRef(format!("{}/{}", self.cal_path, blob_id));
|
|
|
|
self.storage.blob_rm(&blob_ref).await?;
|
|
|
|
|
|
|
|
Ok(token)
|
2024-04-04 09:28:15 +00:00
|
|
|
}
|
|
|
|
|
2024-04-04 09:57:32 +00:00
|
|
|
async fn diff(&mut self, sync_token: Token) -> Result<(Token, Vec<SyncChange>)> {
|
2024-04-04 13:40:26 +00:00
|
|
|
let davstate = self.davdag.state();
|
|
|
|
|
|
|
|
let token_changed = davstate.resolve(sync_token)?;
|
|
|
|
let changes = token_changed
|
|
|
|
.iter()
|
|
|
|
.filter_map(|t: &Token| davstate.change.get(t))
|
|
|
|
.map(|s| s.clone())
|
|
|
|
.collect();
|
|
|
|
|
|
|
|
let heads = davstate.heads_vec();
|
|
|
|
let token = match heads.as_slice() {
|
|
|
|
[ token ] => *token,
|
|
|
|
_ => {
|
|
|
|
let op_mg = davstate.op_merge();
|
|
|
|
let token = op_mg.token();
|
|
|
|
self.davdag.push(op_mg).await?;
|
|
|
|
token
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
Ok((token, changes))
|
2024-04-04 09:28:15 +00:00
|
|
|
}
|
2024-03-27 09:33:46 +00:00
|
|
|
}
|