diff --git a/aero-collections/src/calendar/mod.rs b/aero-collections/src/calendar/mod.rs index 936f8c3..7e5a8c1 100644 --- a/aero-collections/src/calendar/mod.rs +++ b/aero-collections/src/calendar/mod.rs @@ -38,6 +38,8 @@ impl Calendar { Ok(Self { id, internal }) } + // ---- DAG sync utilities + /// Sync data with backing store pub async fn force_sync(&self) -> Result<()> { self.internal.write().await.force_sync().await @@ -49,24 +51,40 @@ impl Calendar { self.internal.write().await.opportunistic_sync().await } - pub async fn get(&self, blob_id: UniqueIdent, message_key: &Key) -> Result> { - self.internal.read().await.get(blob_id, message_key).await + // ---- Data API + + /// Access the DAG internal data (you can get the list of files for example) + pub async fn dag(&self) -> DavDag { + // Cloning is cheap + self.internal.read().await.davdag.state().clone() } + /// The diff API is a write API as we might need to push a merge node + /// to get a new sync token pub async fn diff(&self, sync_token: Token) -> Result<(Token, Vec)> { - self.internal.read().await.diff(sync_token).await + self.internal.write().await.diff(sync_token).await } + /// Get a specific event + pub async fn get(&self, evt_id: UniqueIdent, message_key: &Key) -> Result> { + self.internal.read().await.get(evt_id, message_key).await + } + + /// Put a specific event pub async fn put<'a>(&self, entry: IndexEntry, evt: &'a [u8]) -> Result { self.internal.write().await.put(entry, evt).await } + /// Delete a specific event pub async fn delete(&self, blob_id: UniqueIdent) -> Result { self.internal.write().await.delete(blob_id).await } } +use base64::Engine; +const MESSAGE_KEY: &str = "message-key"; struct CalendarInternal { + #[allow(dead_code)] id: UniqueIdent, cal_path: String, encryption_key: Key, @@ -90,16 +108,37 @@ impl CalendarInternal { } async fn put<'a>(&mut self, entry: IndexEntry, evt: &'a [u8]) -> Result { - //@TODO write event to S3 - //@TODO add entry into Bayou - todo!(); + let message_key = gen_key(); + + let encrypted_msg_key = cryptoblob::seal(&message_key.as_ref(), &self.encryption_key)?; + let key_header = base64::engine::general_purpose::STANDARD.encode(&encrypted_msg_key); + + // Write event to S3 + let message_blob = cryptoblob::seal(evt, &message_key)?; + let blob_val = BlobVal::new( + BlobRef(format!("{}/{}", self.cal_path, entry.0)), + message_blob, + ) + .with_meta(MESSAGE_KEY.to_string(), key_header); + + self.storage + .blob_insert(blob_val) + .await?; + + // Add entry to Bayou + let davstate = self.davdag.state(); + let put_op = davstate.op_put(entry); + let token = put_op.token(); + self.davdag.push(put_op).await?; + + Ok(token) } async fn delete(&mut self, blob_id: BlobId) -> Result { todo!(); } - async fn diff(&self, sync_token: Token) -> Result<(Token, Vec)> { + async fn diff(&mut self, sync_token: Token) -> Result<(Token, Vec)> { todo!(); } }