Sync algorithm

This commit is contained in:
Quentin 2024-03-27 15:09:18 +01:00
parent 0b57200eeb
commit a146a0babc
Signed by: quentin
GPG key ID: E9602264D639FF68

View file

@ -1,3 +1,4 @@
use anyhow::{bail, Result};
use serde::{Deserialize, Deserializer, Serialize, Serializer}; use serde::{Deserialize, Deserializer, Serialize, Serializer};
use im::{OrdMap, OrdSet, ordset}; use im::{OrdMap, OrdSet, ordset};
@ -23,9 +24,14 @@ pub struct DavDag {
/// Partial synchronization graph /// Partial synchronization graph
/// parent -> direct children /// parent -> direct children
pub successors: OrdMap<UniqueIdent, OrdSet<UniqueIdent>>, pub successors: OrdMap<UniqueIdent, OrdSet<UniqueIdent>>,
pub ancestors: OrdMap<UniqueIdent, OrdSet<UniqueIdent>>,
/// All nodes
pub all_nodes: OrdSet<UniqueIdent>,
/// Head nodes /// Head nodes
pub heads: OrdSet<UniqueIdent>, pub heads: OrdSet<UniqueIdent>,
/// Origin nodes
pub origins: OrdSet<UniqueIdent>,
} }
#[derive(Clone, Serialize, Deserialize, Debug)] #[derive(Clone, Serialize, Deserialize, Debug)]
@ -54,11 +60,57 @@ impl DavDag {
} }
// HELPER functions // HELPER functions
/// All HEAD events
pub fn heads_vec(&self) -> Vec<UniqueIdent> { pub fn heads_vec(&self) -> Vec<UniqueIdent> {
self.heads.clone().into_iter().collect() self.heads.clone().into_iter().collect()
} }
/// Resolve a sync token
pub fn resolve(&self, known: UniqueIdent) -> Result<OrdSet<UniqueIdent>> {
let already_known = self.all_ancestors(known);
// We can't capture all missing events if we are not connected
// to all sinks of the graph, ie. if we don't already know all the sinks.
if !self.origins.is_subset(already_known.clone()) {
bail!("Not enough history to produce a correct diff, a full resync is needed");
}
// Missing items are all existing graph items from which
// we removed all items known by the given node.
// In other words, all values in all_nodes that are not in already_known.
Ok(self.all_nodes.clone().relative_complement(already_known))
}
/// Find all ancestors of a given
fn all_ancestors(&self, known: UniqueIdent) -> OrdSet<UniqueIdent> {
let mut all_known: OrdSet<UniqueIdent> = OrdSet::new();
let mut to_collect = vec![known];
loop {
let cursor = match to_collect.pop() {
// Loop stops here
None => break,
Some(v) => v,
};
if all_known.insert(cursor).is_some() {
// Item already processed
continue
}
// Collect parents
let parents = match self.ancestors.get(&cursor) {
None => continue,
Some(c) => c,
};
to_collect.extend(parents.iter());
}
all_known
}
// INTERNAL functions // INTERNAL functions
/// Register a WebDAV item (put, copy, move)
fn register(&mut self, ident: UniqueIdent, entry: IndexEntry) { fn register(&mut self, ident: UniqueIdent, entry: IndexEntry) {
// Insert item in the source of trust // Insert item in the source of trust
self.table.insert(ident, entry.clone()); self.table.insert(ident, entry.clone());
@ -68,6 +120,7 @@ impl DavDag {
self.idx_by_filename.insert(filename, ident); self.idx_by_filename.insert(filename, ident);
} }
/// Unregister a WebDAV item (delete, move)
fn unregister(&mut self, ident: &UniqueIdent) { fn unregister(&mut self, ident: &UniqueIdent) {
// Query the source of truth to get the information we // Query the source of truth to get the information we
// need to clean the indexes // need to clean the indexes
@ -84,8 +137,11 @@ impl DavDag {
// @FIXME: maybe in case of error we could simply disable the sync graph // @FIXME: maybe in case of error we could simply disable the sync graph
// and ask the client to rely on manual sync. For now, we are skipping the event // and ask the client to rely on manual sync. For now, we are skipping the event
// which is midly satisfying. // which is midly satisfying.
/// When an event is processed, update the synchronization DAG
fn sync_dag(&mut self, child: &UniqueIdent, parents: &[UniqueIdent]) -> bool { fn sync_dag(&mut self, child: &UniqueIdent, parents: &[UniqueIdent]) -> bool {
// All parents must exist in successors otherwise we can't accept item // --- Update SUCCESSORS
// All parents must exist in successors otherwise we can't accept item:
// do the check + update successors // do the check + update successors
let mut try_successors = self.successors.clone(); let mut try_successors = self.successors.clone();
for par in parents.iter() { for par in parents.iter() {
@ -99,15 +155,29 @@ impl DavDag {
} }
self.successors = try_successors; self.successors = try_successors;
// This event is also a future successor
self.successors.insert(*child, ordset![]);
// --- Update ANCESTORS
// We register ancestors as it is required for the sync algorithm
self.ancestors.insert(*child, parents.iter().fold(ordset![], |mut acc, p| {
acc.insert(*p);
acc
}));
// --- Update ORIGINS
// If this event has no parents, it's an origin
if parents.is_empty() {
self.origins.insert(*child);
}
// --- Update HEADS
// Remove from HEADS this event's parents // Remove from HEADS this event's parents
parents.iter().for_each(|par| { self.heads.remove(par); }); parents.iter().for_each(|par| { self.heads.remove(par); });
// This event becomes a new HEAD in turn // This event becomes a new HEAD in turn
self.heads.insert(*child); self.heads.insert(*child);
// This event is also a future successor
self.successors.insert(*child, ordset![]);
true true
} }
} }
@ -160,6 +230,8 @@ impl<'de> Deserialize<'de> for DavDag {
val.heads.into_iter().for_each(|ident| { val.heads.into_iter().for_each(|ident| {
davdag.successors.insert(ident, ordset![]); davdag.successors.insert(ident, ordset![]);
davdag.heads.insert(ident); davdag.heads.insert(ident);
davdag.origins.insert(ident);
davdag.all_nodes.insert(ident);
}); });
Ok(davdag) Ok(davdag)