in-memory storage #32
4 changed files with 8 additions and 13 deletions
15
src/bayou.rs
15
src/bayou.rs
|
@ -1,4 +1,3 @@
|
||||||
use std::str::FromStr;
|
|
||||||
use std::sync::{Arc, Weak};
|
use std::sync::{Arc, Weak};
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
|
@ -6,7 +5,6 @@ use anyhow::{anyhow, bail, Result};
|
||||||
use log::{debug, error, info};
|
use log::{debug, error, info};
|
||||||
use rand::prelude::*;
|
use rand::prelude::*;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use tokio::io::AsyncReadExt;
|
|
||||||
use tokio::sync::{watch, Notify};
|
use tokio::sync::{watch, Notify};
|
||||||
|
|
||||||
use crate::cryptoblob::*;
|
use crate::cryptoblob::*;
|
||||||
|
@ -233,7 +231,7 @@ impl<S: BayouState> Bayou<S> {
|
||||||
|
|
||||||
// Save info that sync has been done
|
// Save info that sync has been done
|
||||||
self.last_sync = new_last_sync;
|
self.last_sync = new_last_sync;
|
||||||
self.last_sync_watch_ct = new_last_sync_watch_ct;
|
self.last_sync_watch_ct = self.k2v.from_orphan(new_last_sync_watch_ct).expect("Source & target storage must be compatible");
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -245,7 +243,7 @@ impl<S: BayouState> Bayou<S> {
|
||||||
Some(t) => Instant::now() > t + (CHECKPOINT_INTERVAL / 5),
|
Some(t) => Instant::now() > t + (CHECKPOINT_INTERVAL / 5),
|
||||||
_ => true,
|
_ => true,
|
||||||
};
|
};
|
||||||
let changed = self.last_sync_watch_ct != *self.watch.rx.borrow();
|
let changed = self.last_sync_watch_ct.to_orphan() != *self.watch.rx.borrow();
|
||||||
if too_old || changed {
|
if too_old || changed {
|
||||||
self.sync().await?;
|
self.sync().await?;
|
||||||
}
|
}
|
||||||
|
@ -266,12 +264,9 @@ impl<S: BayouState> Bayou<S> {
|
||||||
.unwrap_or(&self.checkpoint.0),
|
.unwrap_or(&self.checkpoint.0),
|
||||||
);
|
);
|
||||||
self.k2v
|
self.k2v
|
||||||
.insert_item(
|
.row(&self.path, &ts.to_string())
|
||||||
&self.path,
|
.set_value(seal_serialize(&op, &self.key)?)
|
||||||
&ts.to_string(),
|
.push()
|
||||||
seal_serialize(&op, &self.key)?,
|
|
||||||
None,
|
|
||||||
)
|
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
self.watch.notify.notify_one();
|
self.watch.notify.notify_one();
|
||||||
|
|
|
@ -6,7 +6,7 @@ pub struct GrgStore {}
|
||||||
pub struct GrgRef {}
|
pub struct GrgRef {}
|
||||||
pub struct GrgValue {}
|
pub struct GrgValue {}
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug, PartialEq)]
|
||||||
pub struct GrgOrphanRowRef {}
|
pub struct GrgOrphanRowRef {}
|
||||||
|
|
||||||
impl IBuilders for GrgCreds {
|
impl IBuilders for GrgCreds {
|
||||||
|
|
|
@ -7,7 +7,7 @@ pub struct MemStore {}
|
||||||
pub struct MemRef {}
|
pub struct MemRef {}
|
||||||
pub struct MemValue {}
|
pub struct MemValue {}
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug, PartialEq)]
|
||||||
pub struct MemOrphanRowRef {}
|
pub struct MemOrphanRowRef {}
|
||||||
|
|
||||||
impl IBuilders for FullMem {
|
impl IBuilders for FullMem {
|
||||||
|
|
|
@ -20,7 +20,7 @@ pub enum Alternative {
|
||||||
}
|
}
|
||||||
type ConcurrentValues = Vec<Alternative>;
|
type ConcurrentValues = Vec<Alternative>;
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug, PartialEq)]
|
||||||
pub enum OrphanRowRef {
|
pub enum OrphanRowRef {
|
||||||
Garage(garage::GrgOrphanRowRef),
|
Garage(garage::GrgOrphanRowRef),
|
||||||
Memory(in_memory::MemOrphanRowRef),
|
Memory(in_memory::MemOrphanRowRef),
|
||||||
|
|
Loading…
Reference in a new issue