Testing DAG sync

This commit is contained in:
Quentin 2024-03-27 16:16:37 +01:00
parent a146a0babc
commit 9afbfeb427
Signed by: quentin
GPG key ID: E9602264D639FF68

View file

@ -4,14 +4,18 @@ use im::{OrdMap, OrdSet, ordset};
use aero_bayou::*; use aero_bayou::*;
use crate::unique_ident::UniqueIdent; use crate::unique_ident::{gen_ident, UniqueIdent};
/// Parents are only persisted in the event log, /// Parents are only persisted in the event log,
/// not in the checkpoints. /// not in the checkpoints.
pub type Parents = Vec<UniqueIdent>; pub type Token = UniqueIdent;
pub type Parents = Vec<Token>;
pub type SyncDesc = (Parents, Token);
pub type BlobId = UniqueIdent;
pub type Etag = String; pub type Etag = String;
pub type FileName = String; pub type FileName = String;
pub type IndexEntry = (FileName, Etag); pub type IndexEntry = (BlobId, FileName, Etag);
#[derive(Clone, Default)] #[derive(Clone, Default)]
pub struct DavDag { pub struct DavDag {
@ -22,8 +26,6 @@ pub struct DavDag {
pub idx_by_filename: OrdMap<FileName, UniqueIdent>, pub idx_by_filename: OrdMap<FileName, UniqueIdent>,
/// Partial synchronization graph /// Partial synchronization graph
/// parent -> direct children
pub successors: OrdMap<UniqueIdent, OrdSet<UniqueIdent>>,
pub ancestors: OrdMap<UniqueIdent, OrdSet<UniqueIdent>>, pub ancestors: OrdMap<UniqueIdent, OrdSet<UniqueIdent>>,
/// All nodes /// All nodes
@ -37,52 +39,68 @@ pub struct DavDag {
#[derive(Clone, Serialize, Deserialize, Debug)] #[derive(Clone, Serialize, Deserialize, Debug)]
pub enum DavDagOp { pub enum DavDagOp {
/// Merge is a virtual operation run when multiple heads are discovered /// Merge is a virtual operation run when multiple heads are discovered
Merge(Parents, UniqueIdent), Merge(SyncDesc),
/// Add an item to the collection /// Add an item to the collection
Put(Parents, UniqueIdent, IndexEntry), Put(SyncDesc, IndexEntry),
/// Delete an item from the collection /// Delete an item from the collection
Delete(Parents, UniqueIdent), Delete(SyncDesc, BlobId),
}
impl DavDagOp {
pub fn token(&self) -> Token {
match self {
Self::Merge((_, t)) => *t,
Self::Put((_, t), _) => *t,
Self::Delete((_, t), _) => *t,
}
}
} }
impl DavDag { impl DavDag {
pub fn op_merge(&self, ident: UniqueIdent) -> DavDagOp { pub fn op_merge(&self) -> DavDagOp {
DavDagOp::Merge(self.heads_vec(), ident) DavDagOp::Merge(self.sync_desc())
} }
pub fn op_put(&self, ident: UniqueIdent, entry: IndexEntry) -> DavDagOp { pub fn op_put(&self, entry: IndexEntry) -> DavDagOp {
DavDagOp::Put(self.heads_vec(), ident, entry) DavDagOp::Put(self.sync_desc(), entry)
} }
pub fn op_delete(&self, ident: UniqueIdent) -> DavDagOp { pub fn op_delete(&self, ident: BlobId) -> DavDagOp {
DavDagOp::Delete(self.heads_vec(), ident) DavDagOp::Delete(self.sync_desc(), ident)
} }
// HELPER functions // HELPER functions
/// All HEAD events pub fn heads_vec(&self) -> Vec<Token> {
pub fn heads_vec(&self) -> Vec<UniqueIdent> {
self.heads.clone().into_iter().collect() self.heads.clone().into_iter().collect()
} }
/// A sync descriptor
pub fn sync_desc(&self) -> SyncDesc {
(self.heads_vec(), gen_ident())
}
/// Resolve a sync token /// Resolve a sync token
pub fn resolve(&self, known: UniqueIdent) -> Result<OrdSet<UniqueIdent>> { pub fn resolve(&self, known: UniqueIdent) -> Result<OrdSet<UniqueIdent>> {
let already_known = self.all_ancestors(known); let already_known = self.all_ancestors(known);
// We can't capture all missing events if we are not connected // We can't capture all missing events if we are not connected
// to all sinks of the graph, ie. if we don't already know all the sinks. // to all sinks of the graph,
// ie. if we don't already know all the sinks,
// ie. if we are missing so much history that
// the event log has been transformed into a checkpoint
if !self.origins.is_subset(already_known.clone()) { if !self.origins.is_subset(already_known.clone()) {
bail!("Not enough history to produce a correct diff, a full resync is needed"); bail!("Not enough history to produce a correct diff, a full resync is needed");
} }
// Missing items are all existing graph items from which // Missing items are *all existing graph items* from which
// we removed all items known by the given node. // we removed *all items known by the given node*.
// In other words, all values in all_nodes that are not in already_known. // In other words, all values in `all_nodes` that are not in `already_known`.
Ok(self.all_nodes.clone().relative_complement(already_known)) Ok(self.all_nodes.clone().relative_complement(already_known))
} }
/// Find all ancestors of a given /// Find all ancestors of a given node
fn all_ancestors(&self, known: UniqueIdent) -> OrdSet<UniqueIdent> { fn all_ancestors(&self, known: UniqueIdent) -> OrdSet<UniqueIdent> {
let mut all_known: OrdSet<UniqueIdent> = OrdSet::new(); let mut all_known: OrdSet<UniqueIdent> = OrdSet::new();
let mut to_collect = vec![known]; let mut to_collect = vec![known];
@ -111,21 +129,23 @@ impl DavDag {
// INTERNAL functions // INTERNAL functions
/// Register a WebDAV item (put, copy, move) /// Register a WebDAV item (put, copy, move)
fn register(&mut self, ident: UniqueIdent, entry: IndexEntry) { fn register(&mut self, entry: IndexEntry) {
let (blob_id, filename, _etag) = entry.clone();
// Insert item in the source of trust // Insert item in the source of trust
self.table.insert(ident, entry.clone()); self.table.insert(blob_id, entry);
// Update the cache // Update the cache
let (filename, _etag) = entry; self.idx_by_filename.insert(filename, blob_id);
self.idx_by_filename.insert(filename, ident);
} }
/// Unregister a WebDAV item (delete, move) /// Unregister a WebDAV item (delete, move)
fn unregister(&mut self, ident: &UniqueIdent) { fn unregister(&mut self, ident: &UniqueIdent) {
// Query the source of truth to get the information we // Query the source of truth to get the information we
// need to clean the indexes // need to clean the indexes
let (filename, _etag) = match self.table.get(ident) { let (_blob_id, filename, _etag) = match self.table.get(ident) {
Some(v) => v, Some(v) => v,
// Element does not exist, return early
None => return, None => return,
}; };
self.idx_by_filename.remove(filename); self.idx_by_filename.remove(filename);
@ -134,29 +154,9 @@ impl DavDag {
self.table.remove(ident); self.table.remove(ident);
} }
// @FIXME: maybe in case of error we could simply disable the sync graph
// and ask the client to rely on manual sync. For now, we are skipping the event
// which is midly satisfying.
/// When an event is processed, update the synchronization DAG /// When an event is processed, update the synchronization DAG
fn sync_dag(&mut self, child: &UniqueIdent, parents: &[UniqueIdent]) -> bool { fn sync_dag(&mut self, sync_desc: &SyncDesc) -> bool {
// --- Update SUCCESSORS let (parents, child) = sync_desc;
// All parents must exist in successors otherwise we can't accept item:
// do the check + update successors
let mut try_successors = self.successors.clone();
for par in parents.iter() {
match try_successors.get_mut(par) {
None => {
tracing::warn!("Unable to push a Dav DAG sync op into the graph, an event is missing, it's a bug");
return false
},
Some(v) => v.insert(*child),
};
}
self.successors = try_successors;
// This event is also a future successor
self.successors.insert(*child, ordset![]);
// --- Update ANCESTORS // --- Update ANCESTORS
// We register ancestors as it is required for the sync algorithm // We register ancestors as it is required for the sync algorithm
@ -178,6 +178,9 @@ impl DavDag {
// This event becomes a new HEAD in turn // This event becomes a new HEAD in turn
self.heads.insert(*child); self.heads.insert(*child);
// --- Update ALL NODES
self.all_nodes.insert(*child);
true true
} }
} }
@ -189,18 +192,18 @@ impl BayouState for DavDag {
let mut new = self.clone(); let mut new = self.clone();
match op { match op {
DavDagOp::Put(parents, ident, entry) => { DavDagOp::Put(sync_desc, entry) => {
if new.sync_dag(ident, parents.as_slice()) { if new.sync_dag(sync_desc) {
new.register(*ident, entry.clone()); new.register(entry.clone());
} }
}, },
DavDagOp::Delete(parents, ident) => { DavDagOp::Delete(sync_desc, blob_id) => {
if new.sync_dag(ident, parents.as_slice()) { if new.sync_dag(sync_desc) {
new.unregister(ident); new.unregister(blob_id);
} }
}, },
DavDagOp::Merge(parents, ident) => { DavDagOp::Merge(sync_desc) => {
new.sync_dag(ident, parents.as_slice()); new.sync_dag(sync_desc);
} }
} }
@ -211,7 +214,7 @@ impl BayouState for DavDag {
// CUSTOM SERIALIZATION & DESERIALIZATION // CUSTOM SERIALIZATION & DESERIALIZATION
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
struct DavDagSerializedRepr { struct DavDagSerializedRepr {
items: Vec<(UniqueIdent, IndexEntry)>, items: Vec<IndexEntry>,
heads: Vec<UniqueIdent>, heads: Vec<UniqueIdent>,
} }
@ -224,11 +227,10 @@ impl<'de> Deserialize<'de> for DavDag {
let mut davdag = DavDag::default(); let mut davdag = DavDag::default();
// Build the table + index // Build the table + index
val.items.into_iter().for_each(|(ident, entry)| davdag.register(ident, entry)); val.items.into_iter().for_each(|entry| davdag.register(entry));
// Initialize the synchronization DAG with its roots // Initialize the synchronization DAG with its roots
val.heads.into_iter().for_each(|ident| { val.heads.into_iter().for_each(|ident| {
davdag.successors.insert(ident, ordset![]);
davdag.heads.insert(ident); davdag.heads.insert(ident);
davdag.origins.insert(ident); davdag.origins.insert(ident);
davdag.all_nodes.insert(ident); davdag.all_nodes.insert(ident);
@ -244,7 +246,7 @@ impl Serialize for DavDag {
S: Serializer, S: Serializer,
{ {
// Indexes are rebuilt on the fly, we serialize only the core database // Indexes are rebuilt on the fly, we serialize only the core database
let items = self.table.iter().map(|(ident, entry)| (*ident, entry.clone())).collect(); let items = self.table.iter().map(|(_, entry)| entry.clone()).collect();
// We keep only the head entries from the sync graph, // We keep only the head entries from the sync graph,
// these entries will be used to initialize it back when deserializing // these entries will be used to initialize it back when deserializing
@ -255,3 +257,53 @@ impl Serialize for DavDag {
val.serialize(serializer) val.serialize(serializer)
} }
} }
// ---- TESTS ----
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn base() {
let mut state = DavDag::default();
// Add item 1
{
let m = UniqueIdent([0x01; 24]);
let ev = state.op_put((m, "cal.ics".into(), "321-321".into()));
state = state.apply(&ev);
assert_eq!(state.table.len(), 1);
assert_eq!(state.resolve(ev.token()).unwrap().len(), 0);
}
// Add 2 concurrent items
let (t1, t2) = {
let blob1 = UniqueIdent([0x02; 24]);
let ev1 = state.op_put((blob1, "cal2.ics".into(), "321-321".into()));
let blob2 = UniqueIdent([0x01; 24]);
let ev2 = state.op_delete(blob2);
state = state.apply(&ev1);
state = state.apply(&ev2);
assert_eq!(state.table.len(), 1);
assert_eq!(state.resolve(ev1.token()).unwrap(), ordset![ev2.token()]);
(ev1.token(), ev2.token())
};
// Add later a new item
{
let blob3 = UniqueIdent([0x03; 24]);
let ev = state.op_put((blob3, "cal3.ics".into(), "321-321".into()));
state = state.apply(&ev);
assert_eq!(state.table.len(), 2);
assert_eq!(state.resolve(ev.token()).unwrap().len(), 0);
assert_eq!(state.resolve(t1).unwrap(), ordset![t2, ev.token()]);
}
}
}