not very clear how we pass data across channel

This commit is contained in:
Quentin 2023-11-16 18:27:24 +01:00
parent 916b27d87e
commit 6da8b815b6
Signed by: quentin
GPG key ID: E9602264D639FF68
5 changed files with 89 additions and 78 deletions

View file

@ -9,16 +9,12 @@ use serde::{Deserialize, Serialize};
use tokio::io::AsyncReadExt; use tokio::io::AsyncReadExt;
use tokio::sync::{watch, Notify}; use tokio::sync::{watch, Notify};
use k2v_client::{BatchDeleteOp, BatchReadOp, CausalityToken, Filter, K2vClient, K2vValue};
use rusoto_s3::{
DeleteObjectRequest, GetObjectRequest, ListObjectsV2Request, PutObjectRequest, S3Client, S3,
};
use crate::cryptoblob::*; use crate::cryptoblob::*;
use crate::login::Credentials; use crate::login::Credentials;
use crate::timestamp::*; use crate::timestamp::*;
use crate::storage; use crate::storage;
const KEEP_STATE_EVERY: usize = 64; const KEEP_STATE_EVERY: usize = 64;
// Checkpointing interval constants: a checkpoint is not made earlier // Checkpointing interval constants: a checkpoint is not made earlier
@ -61,7 +57,7 @@ pub struct Bayou<S: BayouState> {
last_try_checkpoint: Option<Instant>, last_try_checkpoint: Option<Instant>,
watch: Arc<K2vWatch>, watch: Arc<K2vWatch>,
last_sync_watch_ct: Option<CausalityToken>, last_sync_watch_ct: storage::RowRef,
} }
impl<S: BayouState> Bayou<S> { impl<S: BayouState> Bayou<S> {
@ -69,6 +65,7 @@ impl<S: BayouState> Bayou<S> {
let k2v_client = creds.row_client()?; let k2v_client = creds.row_client()?;
let s3_client = creds.blob_client()?; let s3_client = creds.blob_client()?;
let target = k2v_client.row(&path, WATCH_SK);
let watch = K2vWatch::new(creds, path.clone(), WATCH_SK.to_string())?; let watch = K2vWatch::new(creds, path.clone(), WATCH_SK.to_string())?;
Ok(Self { Ok(Self {
@ -81,7 +78,7 @@ impl<S: BayouState> Bayou<S> {
last_sync: None, last_sync: None,
last_try_checkpoint: None, last_try_checkpoint: None,
watch, watch,
last_sync_watch_ct: None, last_sync_watch_ct: target,
}) })
} }
@ -134,7 +131,7 @@ impl<S: BayouState> Bayou<S> {
// 3. List all operations starting from checkpoint // 3. List all operations starting from checkpoint
let ts_ser = self.checkpoint.0.to_string(); let ts_ser = self.checkpoint.0.to_string();
debug!("(sync) looking up operations starting at {}", ts_ser); debug!("(sync) looking up operations starting at {}", ts_ser);
let ops_map = self.k2v.select(storage::Selector::Range { begin: &ts_ser, end: WATCH_SK }).await?; let ops_map = self.k2v.select(storage::Selector::Range { shard_key: &self.path, begin: &ts_ser, end: WATCH_SK }).await?;
/*let ops_map = self /*let ops_map = self
.k2v .k2v
.read_batch(&[BatchReadOp { .read_batch(&[BatchReadOp {
@ -158,18 +155,22 @@ impl<S: BayouState> Bayou<S> {
let mut ops = vec![]; let mut ops = vec![];
for row_value in ops_map { for row_value in ops_map {
let ts = row_value.timestamp(); let row = row_value.to_ref();
if val.value.len() != 1 { let sort_key = row.key().1;
bail!("Invalid operation, has {} values", val.value.len()); let ts = sort_key.parse::<Timestamp>().map_err(|_| anyhow!("Invalid operation timestamp: {}", sort_key))?;
let val = row_value.content();
if val.len() != 1 {
bail!("Invalid operation, has {} values", row_value.content().len());
} }
match &val.value[0] { match &val[0] {
K2vValue::Value(v) => { storage::Alternative::Value(v) => {
let op = open_deserialize::<S::Op>(v, &self.key)?; let op = open_deserialize::<S::Op>(v, &self.key)?;
debug!("(sync) operation {}: {} {:?}", tsstr, base64::encode(v), op); debug!("(sync) operation {}: {} {:?}", sort_key, base64::encode(v), op);
ops.push((ts, op)); ops.push((ts, op));
} }
K2vValue::Tombstone => { storage::Alternative::Tombstone => {
unreachable!(); continue;
} }
} }
} }
@ -372,13 +373,12 @@ impl<S: BayouState> Bayou<S> {
let cryptoblob = seal_serialize(&state_cp, &self.key)?; let cryptoblob = seal_serialize(&state_cp, &self.key)?;
debug!("(cp) checkpoint body length: {}", cryptoblob.len()); debug!("(cp) checkpoint body length: {}", cryptoblob.len());
let por = PutObjectRequest { self.s3
bucket: self.bucket.clone(), .blob(format!("{}/checkpoint/{}", self.path, ts_cp.to_string()).as_str())
key: format!("{}/checkpoint/{}", self.path, ts_cp.to_string()), .set_value(cryptoblob.into())
body: Some(cryptoblob.into()), .push()
..Default::default() .await?;
};
self.s3.put_object(por).await?;
// Drop old checkpoints (but keep at least CHECKPOINTS_TO_KEEP of them) // Drop old checkpoints (but keep at least CHECKPOINTS_TO_KEEP of them)
let ecp_len = existing_checkpoints.len(); let ecp_len = existing_checkpoints.len();
@ -388,25 +388,22 @@ impl<S: BayouState> Bayou<S> {
// Delete blobs // Delete blobs
for (_ts, key) in existing_checkpoints[..last_to_keep].iter() { for (_ts, key) in existing_checkpoints[..last_to_keep].iter() {
debug!("(cp) drop old checkpoint {}", key); debug!("(cp) drop old checkpoint {}", key);
let dor = DeleteObjectRequest { self.s3
bucket: self.bucket.clone(), .blob(key)
key: key.to_string(), .rm()
..Default::default() .await?;
};
self.s3.delete_object(dor).await?;
} }
// Delete corresponding range of operations // Delete corresponding range of operations
let ts_ser = existing_checkpoints[last_to_keep].0.to_string(); let ts_ser = existing_checkpoints[last_to_keep].0.to_string();
self.k2v self.k2v
.delete_batch(&[BatchDeleteOp { .rm(storage::Selector::Range{
partition_key: &self.path, shard_key: &self.path,
prefix: None, begin: "",
start: None, end: &ts_ser
end: Some(&ts_ser), })
single_item: false,
}])
.await?; .await?;
} }
Ok(()) Ok(())
@ -425,22 +422,14 @@ impl<S: BayouState> Bayou<S> {
async fn list_checkpoints(&self) -> Result<Vec<(Timestamp, String)>> { async fn list_checkpoints(&self) -> Result<Vec<(Timestamp, String)>> {
let prefix = format!("{}/checkpoint/", self.path); let prefix = format!("{}/checkpoint/", self.path);
let lor = ListObjectsV2Request { let checkpoints_res = self.s3.list(&prefix).await?;
bucket: self.bucket.clone(),
max_keys: Some(1000),
prefix: Some(prefix.clone()),
..Default::default()
};
let checkpoints_res = self.s3.list_objects_v2(lor).await?;
let mut checkpoints = vec![]; let mut checkpoints = vec![];
for object in checkpoints_res.contents.unwrap_or_default() { for object in checkpoints_res {
if let Some(key) = object.key { let key = object.key();
if let Some(ckid) = key.strip_prefix(&prefix) { if let Some(ckid) = key.strip_prefix(&prefix) {
if let Ok(ts) = ckid.parse::<Timestamp>() { if let Ok(ts) = ckid.parse::<Timestamp>() {
checkpoints.push((ts, key)); checkpoints.push((ts, key.into()));
}
} }
} }
} }
@ -454,23 +443,25 @@ impl<S: BayouState> Bayou<S> {
struct K2vWatch { struct K2vWatch {
pk: String, pk: String,
sk: String, sk: String,
rx: watch::Receiver<Option<CausalityToken>>, rx: watch::Receiver<storage::RowRef>,
notify: Notify, notify: Notify,
} }
impl K2vWatch { impl K2vWatch {
/// Creates a new watch and launches subordinate threads. /// Creates a new watch and launches subordinate threads.
/// These threads hold Weak pointers to the struct; /// These threads hold Weak pointers to the struct;
/// the exit when the Arc is dropped. /// they exit when the Arc is dropped.
fn new(creds: &Credentials, pk: String, sk: String) -> Result<Arc<Self>> { fn new(creds: &Credentials, pk: String, sk: String) -> Result<Arc<Self>> {
let (tx, rx) = watch::channel::<Option<CausalityToken>>(None); let row_client = creds.row_client()?;
let (tx, rx) = watch::channel::<storage::RowRef>(row_client.row(&pk, &sk));
let notify = Notify::new(); let notify = Notify::new();
let watch = Arc::new(K2vWatch { pk, sk, rx, notify }); let watch = Arc::new(K2vWatch { pk, sk, rx, notify });
tokio::spawn(Self::background_task( tokio::spawn(Self::background_task(
Arc::downgrade(&watch), Arc::downgrade(&watch),
creds.k2v_client()?, row_client,
tx, tx,
)); ));
@ -479,41 +470,42 @@ impl K2vWatch {
async fn background_task( async fn background_task(
self_weak: Weak<Self>, self_weak: Weak<Self>,
k2v: K2vClient, k2v: storage::RowStore,
tx: watch::Sender<Option<CausalityToken>>, tx: watch::Sender<storage::RowRef>,
) { ) {
let mut ct = None; let mut row = match Weak::upgrade(&self_weak) {
Some(this) => k2v.row(&this.pk, &this.sk),
None => {
error!("can't start loop");
return
},
};
while let Some(this) = Weak::upgrade(&self_weak) { while let Some(this) = Weak::upgrade(&self_weak) {
debug!( debug!(
"bayou k2v watch bg loop iter ({}, {}): ct = {:?}", "bayou k2v watch bg loop iter ({}, {})",
this.pk, this.sk, ct this.pk, this.sk
); );
tokio::select!( tokio::select!(
_ = tokio::time::sleep(Duration::from_secs(60)) => continue, _ = tokio::time::sleep(Duration::from_secs(60)) => continue,
update = k2v_wait_value_changed(&k2v, &this.pk, &this.sk, &ct) => { update = row.poll() => {
//update = k2v_wait_value_changed(&k2v, &this.pk, &this.sk, &ct) => {
match update { match update {
Err(e) => { Err(e) => {
error!("Error in bayou k2v wait value changed: {}", e); error!("Error in bayou k2v wait value changed: {}", e);
tokio::time::sleep(Duration::from_secs(30)).await; tokio::time::sleep(Duration::from_secs(30)).await;
} }
Ok(cv) => { Ok(new_value) => {
if tx.send(Some(cv.causality.clone())).is_err() { row = new_value.to_ref();
if tx.send(XXX).is_err() {
break; break;
} }
ct = Some(cv.causality);
} }
} }
} }
_ = this.notify.notified() => { _ = this.notify.notified() => {
let rand = u128::to_be_bytes(thread_rng().gen()).to_vec(); let rand = u128::to_be_bytes(thread_rng().gen()).to_vec();
if let Err(e) = k2v if let Err(e) = row.set_value(rand).push().await
.insert_item(
&this.pk,
&this.sk,
rand,
ct.clone(),
)
.await
{ {
error!("Error in bayou k2v watch updater loop: {}", e); error!("Error in bayou k2v watch updater loop: {}", e);
tokio::time::sleep(Duration::from_secs(30)).await; tokio::time::sleep(Duration::from_secs(30)).await;

View file

@ -81,7 +81,7 @@ async fn incoming_mail_watch_process_internal(
tokio::select! { tokio::select! {
inc_k = wait_new_mail => Some(inc_k), inc_k = wait_new_mail => Some(inc_k),
_ = tokio::time::sleep(MAIL_CHECK_INTERVAL) => Some(incoming_key.clone()), _ = tokio::time::sleep(MAIL_CHECK_INTERVAL) => Some(incoming_key),
_ = lock_held.changed() => None, _ = lock_held.changed() => None,
_ = rx_inbox_id.changed() => None, _ = rx_inbox_id.changed() => None,
} }

View file

@ -28,11 +28,18 @@ impl IRowStore for GrgStore {
fn select(&self, selector: Selector) -> AsyncResult<Vec<RowValue>> { fn select(&self, selector: Selector) -> AsyncResult<Vec<RowValue>> {
unimplemented!(); unimplemented!();
} }
fn rm(&self, selector: Selector) -> AsyncResult<()> {
unimplemented!();
}
} }
impl IRowRef for GrgRef { impl IRowRef for GrgRef {
fn clone_boxed(&self) -> RowRef { /*fn clone_boxed(&self) -> RowRef {
unimplemented!(); unimplemented!();
}*/
fn to_orphan(&self) -> RowRefOrphan {
unimplemented!()
} }
fn key(&self) -> (&str, &str) { fn key(&self) -> (&str, &str) {

View file

@ -29,6 +29,10 @@ impl IRowStore for MemStore {
fn select(&self, selector: Selector) -> AsyncResult<Vec<RowValue>> { fn select(&self, selector: Selector) -> AsyncResult<Vec<RowValue>> {
unimplemented!(); unimplemented!();
} }
fn rm(&self, selector: Selector) -> AsyncResult<()> {
unimplemented!();
}
} }
impl IRowRef for MemRef { impl IRowRef for MemRef {

View file

@ -21,9 +21,9 @@ pub enum Alternative {
type ConcurrentValues = Vec<Alternative>; type ConcurrentValues = Vec<Alternative>;
pub enum Selector<'a> { pub enum Selector<'a> {
Range { begin: &'a str, end: &'a str }, Range { shard_key: &'a str, begin: &'a str, end: &'a str },
List (Vec<(&'a str, &'a str)>), List (Vec<(&'a str, &'a str)>), // list of (shard_key, sort_key)
Prefix (&'a str), Prefix { shard_key: &'a str, prefix: &'a str },
} }
#[derive(Debug)] #[derive(Debug)]
@ -80,12 +80,14 @@ pub trait IRowStore
{ {
fn row(&self, partition: &str, sort: &str) -> RowRef; fn row(&self, partition: &str, sort: &str) -> RowRef;
fn select(&self, selector: Selector) -> AsyncResult<Vec<RowValue>>; fn select(&self, selector: Selector) -> AsyncResult<Vec<RowValue>>;
fn rm(&self, selector: Selector) -> AsyncResult<()>;
} }
pub type RowStore = Box<dyn IRowStore + Sync + Send>; pub type RowStore = Box<dyn IRowStore + Sync + Send>;
pub trait IRowRef pub trait IRowRef
{ {
fn clone_boxed(&self) -> RowRef; /*fn clone_boxed(&self) -> RowRef;*/
fn to_orphan(&self) -> RowRefOrphan;
fn key(&self) -> (&str, &str); fn key(&self) -> (&str, &str);
fn set_value(&self, content: Vec<u8>) -> RowValue; fn set_value(&self, content: Vec<u8>) -> RowValue;
fn fetch(&self) -> AsyncResult<RowValue>; fn fetch(&self) -> AsyncResult<RowValue>;
@ -93,11 +95,17 @@ pub trait IRowRef
fn poll(&self) -> AsyncResult<RowValue>; fn poll(&self) -> AsyncResult<RowValue>;
} }
pub type RowRef = Box<dyn IRowRef + Send + Sync>; pub type RowRef = Box<dyn IRowRef + Send + Sync>;
impl Clone for RowRef { /*impl Clone for RowRef {
fn clone(&self) -> Self { fn clone(&self) -> Self {
return self.clone_boxed() return self.clone_boxed()
} }
}*/
pub trait IRowRefOrphan
{
fn attach(&self, store: &RowStore) -> RowRef;
} }
pub type RowRefOrphan = Box<dyn IRowRefOrphan + Send + Sync>;
pub trait IRowValue pub trait IRowValue
{ {