diff --git a/src/bayou.rs b/src/bayou.rs index 3201783..f95bd82 100644 --- a/src/bayou.rs +++ b/src/bayou.rs @@ -9,16 +9,12 @@ use serde::{Deserialize, Serialize}; use tokio::io::AsyncReadExt; use tokio::sync::{watch, Notify}; -use k2v_client::{BatchDeleteOp, BatchReadOp, CausalityToken, Filter, K2vClient, K2vValue}; -use rusoto_s3::{ - DeleteObjectRequest, GetObjectRequest, ListObjectsV2Request, PutObjectRequest, S3Client, S3, -}; - use crate::cryptoblob::*; use crate::login::Credentials; use crate::timestamp::*; use crate::storage; + const KEEP_STATE_EVERY: usize = 64; // Checkpointing interval constants: a checkpoint is not made earlier @@ -61,7 +57,7 @@ pub struct Bayou { last_try_checkpoint: Option, watch: Arc, - last_sync_watch_ct: Option, + last_sync_watch_ct: storage::RowRef, } impl Bayou { @@ -69,6 +65,7 @@ impl Bayou { let k2v_client = creds.row_client()?; let s3_client = creds.blob_client()?; + let target = k2v_client.row(&path, WATCH_SK); let watch = K2vWatch::new(creds, path.clone(), WATCH_SK.to_string())?; Ok(Self { @@ -81,7 +78,7 @@ impl Bayou { last_sync: None, last_try_checkpoint: None, watch, - last_sync_watch_ct: None, + last_sync_watch_ct: target, }) } @@ -134,7 +131,7 @@ impl Bayou { // 3. List all operations starting from checkpoint let ts_ser = self.checkpoint.0.to_string(); debug!("(sync) looking up operations starting at {}", ts_ser); - let ops_map = self.k2v.select(storage::Selector::Range { begin: &ts_ser, end: WATCH_SK }).await?; + let ops_map = self.k2v.select(storage::Selector::Range { shard_key: &self.path, begin: &ts_ser, end: WATCH_SK }).await?; /*let ops_map = self .k2v .read_batch(&[BatchReadOp { @@ -158,18 +155,22 @@ impl Bayou { let mut ops = vec![]; for row_value in ops_map { - let ts = row_value.timestamp(); - if val.value.len() != 1 { - bail!("Invalid operation, has {} values", val.value.len()); + let row = row_value.to_ref(); + let sort_key = row.key().1; + let ts = sort_key.parse::().map_err(|_| anyhow!("Invalid operation timestamp: {}", sort_key))?; + + let val = row_value.content(); + if val.len() != 1 { + bail!("Invalid operation, has {} values", row_value.content().len()); } - match &val.value[0] { - K2vValue::Value(v) => { + match &val[0] { + storage::Alternative::Value(v) => { let op = open_deserialize::(v, &self.key)?; - debug!("(sync) operation {}: {} {:?}", tsstr, base64::encode(v), op); + debug!("(sync) operation {}: {} {:?}", sort_key, base64::encode(v), op); ops.push((ts, op)); } - K2vValue::Tombstone => { - unreachable!(); + storage::Alternative::Tombstone => { + continue; } } } @@ -372,13 +373,12 @@ impl Bayou { let cryptoblob = seal_serialize(&state_cp, &self.key)?; debug!("(cp) checkpoint body length: {}", cryptoblob.len()); - let por = PutObjectRequest { - bucket: self.bucket.clone(), - key: format!("{}/checkpoint/{}", self.path, ts_cp.to_string()), - body: Some(cryptoblob.into()), - ..Default::default() - }; - self.s3.put_object(por).await?; + self.s3 + .blob(format!("{}/checkpoint/{}", self.path, ts_cp.to_string()).as_str()) + .set_value(cryptoblob.into()) + .push() + .await?; + // Drop old checkpoints (but keep at least CHECKPOINTS_TO_KEEP of them) let ecp_len = existing_checkpoints.len(); @@ -388,25 +388,22 @@ impl Bayou { // Delete blobs for (_ts, key) in existing_checkpoints[..last_to_keep].iter() { debug!("(cp) drop old checkpoint {}", key); - let dor = DeleteObjectRequest { - bucket: self.bucket.clone(), - key: key.to_string(), - ..Default::default() - }; - self.s3.delete_object(dor).await?; + self.s3 + .blob(key) + .rm() + .await?; } // Delete corresponding range of operations let ts_ser = existing_checkpoints[last_to_keep].0.to_string(); self.k2v - .delete_batch(&[BatchDeleteOp { - partition_key: &self.path, - prefix: None, - start: None, - end: Some(&ts_ser), - single_item: false, - }]) + .rm(storage::Selector::Range{ + shard_key: &self.path, + begin: "", + end: &ts_ser + }) .await?; + } Ok(()) @@ -425,22 +422,14 @@ impl Bayou { async fn list_checkpoints(&self) -> Result> { let prefix = format!("{}/checkpoint/", self.path); - let lor = ListObjectsV2Request { - bucket: self.bucket.clone(), - max_keys: Some(1000), - prefix: Some(prefix.clone()), - ..Default::default() - }; - - let checkpoints_res = self.s3.list_objects_v2(lor).await?; + let checkpoints_res = self.s3.list(&prefix).await?; let mut checkpoints = vec![]; - for object in checkpoints_res.contents.unwrap_or_default() { - if let Some(key) = object.key { - if let Some(ckid) = key.strip_prefix(&prefix) { - if let Ok(ts) = ckid.parse::() { - checkpoints.push((ts, key)); - } + for object in checkpoints_res { + let key = object.key(); + if let Some(ckid) = key.strip_prefix(&prefix) { + if let Ok(ts) = ckid.parse::() { + checkpoints.push((ts, key.into())); } } } @@ -454,23 +443,25 @@ impl Bayou { struct K2vWatch { pk: String, sk: String, - rx: watch::Receiver>, + rx: watch::Receiver, notify: Notify, } impl K2vWatch { /// Creates a new watch and launches subordinate threads. /// These threads hold Weak pointers to the struct; - /// the exit when the Arc is dropped. + /// they exit when the Arc is dropped. fn new(creds: &Credentials, pk: String, sk: String) -> Result> { - let (tx, rx) = watch::channel::>(None); + let row_client = creds.row_client()?; + + let (tx, rx) = watch::channel::(row_client.row(&pk, &sk)); let notify = Notify::new(); let watch = Arc::new(K2vWatch { pk, sk, rx, notify }); tokio::spawn(Self::background_task( Arc::downgrade(&watch), - creds.k2v_client()?, + row_client, tx, )); @@ -479,41 +470,42 @@ impl K2vWatch { async fn background_task( self_weak: Weak, - k2v: K2vClient, - tx: watch::Sender>, + k2v: storage::RowStore, + tx: watch::Sender, ) { - let mut ct = None; + let mut row = match Weak::upgrade(&self_weak) { + Some(this) => k2v.row(&this.pk, &this.sk), + None => { + error!("can't start loop"); + return + }, + }; + while let Some(this) = Weak::upgrade(&self_weak) { debug!( - "bayou k2v watch bg loop iter ({}, {}): ct = {:?}", - this.pk, this.sk, ct + "bayou k2v watch bg loop iter ({}, {})", + this.pk, this.sk ); tokio::select!( _ = tokio::time::sleep(Duration::from_secs(60)) => continue, - update = k2v_wait_value_changed(&k2v, &this.pk, &this.sk, &ct) => { + update = row.poll() => { + //update = k2v_wait_value_changed(&k2v, &this.pk, &this.sk, &ct) => { match update { Err(e) => { error!("Error in bayou k2v wait value changed: {}", e); tokio::time::sleep(Duration::from_secs(30)).await; } - Ok(cv) => { - if tx.send(Some(cv.causality.clone())).is_err() { + Ok(new_value) => { + row = new_value.to_ref(); + if tx.send(XXX).is_err() { break; } - ct = Some(cv.causality); } } } _ = this.notify.notified() => { let rand = u128::to_be_bytes(thread_rng().gen()).to_vec(); - if let Err(e) = k2v - .insert_item( - &this.pk, - &this.sk, - rand, - ct.clone(), - ) - .await + if let Err(e) = row.set_value(rand).push().await { error!("Error in bayou k2v watch updater loop: {}", e); tokio::time::sleep(Duration::from_secs(30)).await; diff --git a/src/mail/incoming.rs b/src/mail/incoming.rs index c3a9390..9899ae8 100644 --- a/src/mail/incoming.rs +++ b/src/mail/incoming.rs @@ -81,7 +81,7 @@ async fn incoming_mail_watch_process_internal( tokio::select! { inc_k = wait_new_mail => Some(inc_k), - _ = tokio::time::sleep(MAIL_CHECK_INTERVAL) => Some(incoming_key.clone()), + _ = tokio::time::sleep(MAIL_CHECK_INTERVAL) => Some(incoming_key), _ = lock_held.changed() => None, _ = rx_inbox_id.changed() => None, } diff --git a/src/storage/garage.rs b/src/storage/garage.rs index 0abeb4d..aef9a0d 100644 --- a/src/storage/garage.rs +++ b/src/storage/garage.rs @@ -28,11 +28,18 @@ impl IRowStore for GrgStore { fn select(&self, selector: Selector) -> AsyncResult> { unimplemented!(); } + + fn rm(&self, selector: Selector) -> AsyncResult<()> { + unimplemented!(); + } } impl IRowRef for GrgRef { - fn clone_boxed(&self) -> RowRef { + /*fn clone_boxed(&self) -> RowRef { unimplemented!(); + }*/ + fn to_orphan(&self) -> RowRefOrphan { + unimplemented!() } fn key(&self) -> (&str, &str) { diff --git a/src/storage/in_memory.rs b/src/storage/in_memory.rs index 8db4eff..a4436e6 100644 --- a/src/storage/in_memory.rs +++ b/src/storage/in_memory.rs @@ -29,6 +29,10 @@ impl IRowStore for MemStore { fn select(&self, selector: Selector) -> AsyncResult> { unimplemented!(); } + + fn rm(&self, selector: Selector) -> AsyncResult<()> { + unimplemented!(); + } } impl IRowRef for MemRef { diff --git a/src/storage/mod.rs b/src/storage/mod.rs index c3bf19f..2e3c0ee 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -21,9 +21,9 @@ pub enum Alternative { type ConcurrentValues = Vec; pub enum Selector<'a> { - Range { begin: &'a str, end: &'a str }, - List (Vec<(&'a str, &'a str)>), - Prefix (&'a str), + Range { shard_key: &'a str, begin: &'a str, end: &'a str }, + List (Vec<(&'a str, &'a str)>), // list of (shard_key, sort_key) + Prefix { shard_key: &'a str, prefix: &'a str }, } #[derive(Debug)] @@ -80,12 +80,14 @@ pub trait IRowStore { fn row(&self, partition: &str, sort: &str) -> RowRef; fn select(&self, selector: Selector) -> AsyncResult>; + fn rm(&self, selector: Selector) -> AsyncResult<()>; } pub type RowStore = Box; pub trait IRowRef { - fn clone_boxed(&self) -> RowRef; + /*fn clone_boxed(&self) -> RowRef;*/ + fn to_orphan(&self) -> RowRefOrphan; fn key(&self) -> (&str, &str); fn set_value(&self, content: Vec) -> RowValue; fn fetch(&self) -> AsyncResult; @@ -93,11 +95,17 @@ pub trait IRowRef fn poll(&self) -> AsyncResult; } pub type RowRef = Box; -impl Clone for RowRef { +/*impl Clone for RowRef { fn clone(&self) -> Self { return self.clone_boxed() } +}*/ + +pub trait IRowRefOrphan +{ + fn attach(&self, store: &RowStore) -> RowRef; } +pub type RowRefOrphan = Box; pub trait IRowValue {