From a146a0babc25547f269c784e090e308fa831ab32 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Wed, 27 Mar 2024 15:09:18 +0100 Subject: [PATCH] Sync algorithm --- aero-collections/src/davdag.rs | 82 +++++++++++++++++++++++++++++++--- 1 file changed, 77 insertions(+), 5 deletions(-) diff --git a/aero-collections/src/davdag.rs b/aero-collections/src/davdag.rs index 696b985..59dcc7b 100644 --- a/aero-collections/src/davdag.rs +++ b/aero-collections/src/davdag.rs @@ -1,3 +1,4 @@ +use anyhow::{bail, Result}; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use im::{OrdMap, OrdSet, ordset}; @@ -23,9 +24,14 @@ pub struct DavDag { /// Partial synchronization graph /// parent -> direct children pub successors: OrdMap<UniqueIdent, OrdSet<UniqueIdent>>, - + pub ancestors: OrdMap<UniqueIdent, OrdSet<UniqueIdent>>, + + /// All nodes + pub all_nodes: OrdSet<UniqueIdent>, /// Head nodes pub heads: OrdSet<UniqueIdent>, + /// Origin nodes + pub origins: OrdSet<UniqueIdent>, } #[derive(Clone, Serialize, Deserialize, Debug)] @@ -54,11 +60,57 @@ impl DavDag { } // HELPER functions + + /// All HEAD events pub fn heads_vec(&self) -> Vec<UniqueIdent> { self.heads.clone().into_iter().collect() } + /// Resolve a sync token + pub fn resolve(&self, known: UniqueIdent) -> Result<OrdSet<UniqueIdent>> { + let already_known = self.all_ancestors(known); + + // We can't capture all missing events if we are not connected + // to all sinks of the graph, ie. if we don't already know all the sinks. + if !self.origins.is_subset(already_known.clone()) { + bail!("Not enough history to produce a correct diff, a full resync is needed"); + } + + // Missing items are all existing graph items from which + // we removed all items known by the given node. + // In other words, all values in all_nodes that are not in already_known.
+ Ok(self.all_nodes.clone().relative_complement(already_known)) + } + + /// Find all ancestors of a given node (the node itself is included) + fn all_ancestors(&self, known: UniqueIdent) -> OrdSet<UniqueIdent> { + let mut all_known: OrdSet<UniqueIdent> = OrdSet::new(); + let mut to_collect = vec![known]; + loop { + let cursor = match to_collect.pop() { + // Loop stops here + None => break, + Some(v) => v, + }; + + if all_known.insert(cursor).is_some() { + // Item already processed + continue + } + + // Collect parents + let parents = match self.ancestors.get(&cursor) { + None => continue, + Some(c) => c, + }; + to_collect.extend(parents.iter()); + } + all_known + } + // INTERNAL functions + + /// Register a WebDAV item (put, copy, move) fn register(&mut self, ident: UniqueIdent, entry: IndexEntry) { // Insert item in the source of trust self.table.insert(ident, entry.clone()); @@ -68,6 +120,7 @@ impl DavDag { self.idx_by_filename.insert(filename, ident); } + /// Unregister a WebDAV item (delete, move) fn unregister(&mut self, ident: &UniqueIdent) { // Query the source of truth to get the information we // need to clean the indexes @@ -84,8 +137,11 @@ impl DavDag { // @FIXME: maybe in case of error we could simply disable the sync graph // and ask the client to rely on manual sync. For now, we are skipping the event // which is midly satisfying.
+ + /// When an event is processed, update the synchronization DAG fn sync_dag(&mut self, child: &UniqueIdent, parents: &[UniqueIdent]) -> bool { - // All parents must exist in successors otherwise we can't accept item + // --- Update SUCCESSORS + // All parents must exist in successors otherwise we can't accept item: // do the check + update successors let mut try_successors = self.successors.clone(); for par in parents.iter() { @@ -99,15 +155,29 @@ impl DavDag { } self.successors = try_successors; + // This event is also a future successor + self.successors.insert(*child, ordset![]); + + // --- Update ANCESTORS + // We register ancestors as it is required for the sync algorithm. NOTE(review): resolve() also reads all_nodes, which the visible hunks of this function never update; confirm whether `self.all_nodes.insert(*child)` is missing here + self.ancestors.insert(*child, parents.iter().fold(ordset![], |mut acc, p| { + acc.insert(*p); + acc + })); + + // --- Update ORIGINS + // If this event has no parents, it's an origin + if parents.is_empty() { + self.origins.insert(*child); + } + + // --- Update HEADS // Remove from HEADS this event's parents parents.iter().for_each(|par| { self.heads.remove(par); }); // This event becomes a new HEAD in turn self.heads.insert(*child); - // This event is also a future successor - self.successors.insert(*child, ordset![]); - true } } @@ -160,6 +230,8 @@ impl<'de> Deserialize<'de> for DavDag { val.heads.into_iter().for_each(|ident| { davdag.successors.insert(ident, ordset![]); davdag.heads.insert(ident); + davdag.origins.insert(ident); + davdag.all_nodes.insert(ident); }); Ok(davdag)