WIP implem storage
parent 9afbfeb427
commit f179479c30
2 changed files with 113 additions and 27 deletions
@@ -9,7 +9,7 @@ use aero_user::cryptoblob::{self, gen_key, open_deserialize, seal_serialize, Key
 use aero_user::storage::{self, BlobRef, BlobVal, RowRef, RowVal, Selector, Store};
 use crate::unique_ident::*;
-use crate::davdag::DavDag;
+use crate::davdag::{DavDag, IndexEntry, Token, BlobId, SyncChange};
 
 pub struct Calendar {
     pub(super) id: UniqueIdent,
@@ -20,8 +20,49 @@ impl Calendar {
     pub(crate) async fn open(
         creds: &Credentials,
         id: UniqueIdent,
     ) -> Result<Self> {
-        todo!();
+        let bayou_path = format!("calendar/dag/{}", id);
+        let cal_path = format!("calendar/events/{}", id);
+
+        let mut davdag = Bayou::<DavDag>::new(creds, bayou_path).await?;
+        davdag.sync().await?;
+
+        let internal = RwLock::new(CalendarInternal {
+            id,
+            encryption_key: creds.keys.master.clone(),
+            storage: creds.storage.build().await?,
+            davdag,
+            cal_path,
+        });
+
+        Ok(Self { id, internal })
     }
+
+    /// Sync data with backing store
+    pub async fn force_sync(&self) -> Result<()> {
+        self.internal.write().await.force_sync().await
+    }
+
+    /// Sync data with backing store only if changes are detected
+    /// or last sync is too old
+    pub async fn opportunistic_sync(&self) -> Result<()> {
+        self.internal.write().await.opportunistic_sync().await
+    }
+
+    pub async fn get(&self, blob_id: UniqueIdent, message_key: &Key) -> Result<Vec<u8>> {
+        self.internal.read().await.get(blob_id, message_key).await
+    }
+
+    pub async fn diff(&self, sync_token: Token) -> Result<(Token, Vec<SyncChange>)> {
+        self.internal.read().await.diff(sync_token).await
+    }
+
+    pub async fn put<'a>(&self, entry: IndexEntry, evt: &'a [u8]) -> Result<Token> {
+        self.internal.write().await.put(entry, evt).await
+    }
+
+    pub async fn delete(&self, blob_id: UniqueIdent) -> Result<Token> {
+        self.internal.write().await.delete(blob_id).await
+    }
 }
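With this hunk, `Calendar` becomes a thin async facade: every public method acquires the `RwLock` around `CalendarInternal` (read lock for `get`/`diff`, write lock for mutations) and delegates. A minimal caller-side sketch under stated assumptions: `creds` and `cal_id` stand in for values the application already has, `gen_ident()` is assumed to be exported by `crate::unique_ident`, and `FileName`/`Etag` are assumed to be plain strings.

// Hypothetical usage sketch, not part of the commit.
async fn demo(creds: &Credentials, cal_id: UniqueIdent, ics_body: &[u8]) -> Result<()> {
    let cal = Calendar::open(creds, cal_id).await?;

    // Cheap sync: only hits the backing store when changes are detected
    // or the last sync is too old.
    cal.opportunistic_sync().await?;

    // Store one encrypted event; keep the returned token for later diffs.
    let entry: IndexEntry = (gen_ident(), "event.ics".into(), "\"etag-1\"".into());
    let token = cal.put(entry, ics_body).await?;

    // Later: ask what changed since `token`.
    let (_new_token, _changes) = cal.diff(token).await?;
    Ok(())
}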
@@ -30,5 +71,35 @@ struct CalendarInternal {
     cal_path: String,
     encryption_key: Key,
     storage: Store,
-    uid_index: Bayou<DavDag>,
+    davdag: Bayou<DavDag>,
 }
+
+impl CalendarInternal {
+    async fn force_sync(&mut self) -> Result<()> {
+        self.davdag.sync().await?;
+        Ok(())
+    }
+
+    async fn opportunistic_sync(&mut self) -> Result<()> {
+        self.davdag.opportunistic_sync().await?;
+        Ok(())
+    }
+
+    async fn get(&self, blob_id: BlobId, message_key: &Key) -> Result<Vec<u8>> {
+        todo!()
+    }
+
+    async fn put<'a>(&mut self, entry: IndexEntry, evt: &'a [u8]) -> Result<Token> {
+        //@TODO write event to S3
+        //@TODO add entry into Bayou
+        todo!();
+    }
+
+    async fn delete(&mut self, blob_id: BlobId) -> Result<Token> {
+        todo!();
+    }
+
+    async fn diff(&self, sync_token: Token) -> Result<(Token, Vec<SyncChange>)> {
+        todo!();
+    }
+}
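The `put` path is still a `todo!()`, but the two `//@TODO` comments plus the imports at the top of the file (`cryptoblob`, `BlobRef`, `BlobVal`, `Store`) suggest its eventual shape. A sketch under those assumptions; `cryptoblob::seal`, `BlobVal::new`, `Store::blob_insert`, and the Bayou `state()`/`push()` calls are modeled on the imports, not confirmed by this diff.

async fn put<'a>(&mut self, entry: IndexEntry, evt: &'a [u8]) -> Result<Token> {
    let blob_id = entry.0;

    // Write event to S3, sealed with the calendar's encryption key
    // (assumed API: cryptoblob::seal + Store::blob_insert).
    let encrypted = cryptoblob::seal(evt, &self.encryption_key)?;
    let blob_ref = BlobRef(format!("{}/{}", self.cal_path, blob_id));
    self.storage.blob_insert(BlobVal::new(blob_ref, encrypted)).await?;

    // Add entry into Bayou: build a Put op against the current state,
    // extract its child token, then push the op.
    let op = self.davdag.state().op_put(entry);
    let token = match &op {
        // sync_desc.1 is the op's freshly minted child token (see apply()).
        DavDagOp::Put(sync_desc, _) => sync_desc.1,
        _ => unreachable!(),
    };
    self.davdag.push(op).await?;
    Ok(token)
}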
@@ -20,20 +20,31 @@ pub type IndexEntry = (BlobId, FileName, Etag);
 #[derive(Clone, Default)]
 pub struct DavDag {
     /// Source of trust
-    pub table: OrdMap<UniqueIdent, IndexEntry>,
+    pub table: OrdMap<BlobId, IndexEntry>,
 
     /// Indexes optimized for queries
-    pub idx_by_filename: OrdMap<FileName, UniqueIdent>,
+    pub idx_by_filename: OrdMap<FileName, BlobId>,
 
     // ------------ Below this line, data is ephemeral, ie. not checkpointed
 
     /// Partial synchronization graph
-    pub ancestors: OrdMap<UniqueIdent, OrdSet<UniqueIdent>>,
+    pub ancestors: OrdMap<Token, OrdSet<Token>>,
 
     /// All nodes
-    pub all_nodes: OrdSet<UniqueIdent>,
+    pub all_nodes: OrdSet<Token>,
     /// Head nodes
-    pub heads: OrdSet<UniqueIdent>,
+    pub heads: OrdSet<Token>,
     /// Origin nodes
-    pub origins: OrdSet<UniqueIdent>,
+    pub origins: OrdSet<Token>,
+
+    /// File change token by token
+    pub change: OrdMap<Token, SyncChange>,
 }
+
+#[derive(Clone, Debug)]
+pub enum SyncChange {
+    Ok(FileName),
+    NotFound(FileName),
+}
 
 #[derive(Clone, Serialize, Deserialize, Debug)]
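The map keys are retyped here from the catch-all `UniqueIdent` to the domain aliases `BlobId` (file contents) and `Token` (DAG nodes), and the new ephemeral `change` map records, per token, what a synchronizing client should see. A hypothetical consumer-side sketch of the new `SyncChange` enum, roughly what a CalDAV sync-collection REPORT handler would do with `Calendar::diff`:

// Hypothetical consumer of Calendar::diff(), not part of the commit.
async fn push_changes(cal: &Calendar, client_token: Token) -> Result<Token> {
    let (new_token, changes) = cal.diff(client_token).await?;
    for change in changes {
        match change {
            // File exists: created or modified since the client's token.
            SyncChange::Ok(filename) => println!("updated: {}", filename),
            // File is gone: deleted since the client's token.
            SyncChange::NotFound(filename) => println!("deleted: {}", filename),
        }
    }
    // The client stores the new token for its next incremental sync.
    Ok(new_token)
}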
@@ -66,8 +77,8 @@ impl DavDag {
         DavDagOp::Put(self.sync_desc(), entry)
     }
 
-    pub fn op_delete(&self, ident: BlobId) -> DavDagOp {
-        DavDagOp::Delete(self.sync_desc(), ident)
+    pub fn op_delete(&self, blob_id: BlobId) -> DavDagOp {
+        DavDagOp::Delete(self.sync_desc(), blob_id)
     }
 
     // HELPER functions
@@ -129,33 +140,41 @@ impl DavDag {
     // INTERNAL functions
 
     /// Register a WebDAV item (put, copy, move)
-    fn register(&mut self, entry: IndexEntry) {
+    fn register(&mut self, sync_token: Option<Token>, entry: IndexEntry) {
         let (blob_id, filename, _etag) = entry.clone();
 
         // Insert item in the source of trust
         self.table.insert(blob_id, entry);
 
         // Update the cache
-        self.idx_by_filename.insert(filename, blob_id);
+        self.idx_by_filename.insert(filename.to_string(), blob_id);
+
+        // Record the change in the ephemeral synchronization map
+        if let Some(sync_token) = sync_token {
+            self.change.insert(sync_token, SyncChange::Ok(filename));
+        }
     }
 
     /// Unregister a WebDAV item (delete, move)
-    fn unregister(&mut self, ident: &UniqueIdent) {
+    fn unregister(&mut self, sync_token: Token, blob_id: &BlobId) {
         // Query the source of truth to get the information we
         // need to clean the indexes
-        let (_blob_id, filename, _etag) = match self.table.get(ident) {
+        let (_blob_id, filename, _etag) = match self.table.get(blob_id) {
             Some(v) => v,
             // Element does not exist, return early
             None => return,
         };
         self.idx_by_filename.remove(filename);
 
+        // Record the change in the ephemeral synchronization map
+        self.change.insert(sync_token, SyncChange::NotFound(filename.to_string()));
+
         // Finally clear item from the source of trust
-        self.table.remove(ident);
+        self.table.remove(blob_id);
     }
 
     /// When an event is processed, update the synchronization DAG
-    fn sync_dag(&mut self, sync_desc: &SyncDesc) -> bool {
+    fn sync_dag(&mut self, sync_desc: &SyncDesc) {
         let (parents, child) = sync_desc;
 
         // --- Update ANCESTORS
@@ -180,8 +199,6 @@ impl DavDag {
 
         // --- Update ALL NODES
         self.all_nodes.insert(*child);
-
-        true
     }
 }
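With `ancestors`, `heads`, and `change` all maintained, an incremental `diff` can be computed entirely from the ephemeral state: walk backwards from the current heads, stop at the client's token, and translate each visited token through `change`. A sketch of such a helper, assuming these `OrdMap`/`OrdSet` types come from the `im` crate; it is deliberately simplified in that it only prunes at `since` itself, which suffices for a linear history.

impl DavDag {
    /// Hypothetical helper: every recorded change the client has not seen.
    fn changes_since(&self, since: Token) -> Vec<SyncChange> {
        let mut to_visit: Vec<Token> = self.heads.iter().cloned().collect();
        let mut seen: OrdSet<Token> = OrdSet::new();
        let mut out = Vec::new();

        while let Some(token) = to_visit.pop() {
            if token == since || seen.contains(&token) {
                continue; // client horizon reached, or already visited
            }
            seen.insert(token);
            // Ephemeral change map: token -> what the client should see.
            if let Some(change) = self.change.get(&token) {
                out.push(change.clone());
            }
            // ancestors: child token -> set of parent tokens.
            if let Some(parents) = self.ancestors.get(&token) {
                to_visit.extend(parents.iter().cloned());
            }
        }
        out
    }
}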
@@ -193,14 +210,12 @@ impl BayouState for DavDag {
 
         match op {
             DavDagOp::Put(sync_desc, entry) => {
-                if new.sync_dag(sync_desc) {
-                    new.register(entry.clone());
-                }
+                new.sync_dag(sync_desc);
+                new.register(Some(sync_desc.1), entry.clone());
             },
             DavDagOp::Delete(sync_desc, blob_id) => {
-                if new.sync_dag(sync_desc) {
-                    new.unregister(blob_id);
-                }
+                new.sync_dag(sync_desc);
+                new.unregister(sync_desc.1, blob_id);
            },
             DavDagOp::Merge(sync_desc) => {
                 new.sync_dag(sync_desc);
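This hunk is why `sync_dag` lost its `bool` return: registration is no longer gated on the DAG update, every op is applied, and the change is recorded against `sync_desc.1`, the op's own child token. The `SyncDesc` type itself is not shown in this diff; a presumed sketch of what the plumbing relies on, with `Token` assumed to alias `UniqueIdent` and `gen_ident()` assumed from `use crate::unique_ident::*;`:

// Presumed definitions, NOT part of this diff: a sync descriptor pairs
// the parent tokens an op was built on with a freshly generated child token.
pub type SyncDesc = (Vec<Token>, Token);

impl DavDag {
    fn sync_desc(&self) -> SyncDesc {
        // Parents = current heads of the DAG; child = new random identifier.
        (self.heads.iter().cloned().collect(), gen_ident())
    }
}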
@@ -227,7 +242,7 @@ impl<'de> Deserialize<'de> for DavDag {
         let mut davdag = DavDag::default();
 
         // Build the table + index
-        val.items.into_iter().for_each(|entry| davdag.register(entry));
+        val.items.into_iter().for_each(|entry| davdag.register(None, entry));
 
         // Initialize the synchronization DAG with its roots
         val.heads.into_iter().for_each(|ident| {