Implement ToString and FromStr for bayou timestamp
This commit is contained in:
parent
dd62efa24c
commit
0700e27127
2 changed files with 24 additions and 16 deletions
42
src/bayou.rs
42
src/bayou.rs
|
@ -1,3 +1,4 @@
|
||||||
|
use std::str::FromStr;
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
use anyhow::{anyhow, bail, Result};
|
use anyhow::{anyhow, bail, Result};
|
||||||
|
@ -123,7 +124,7 @@ impl<S: BayouState> Bayou<S> {
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
// 3. List all operations starting from checkpoint
|
// 3. List all operations starting from checkpoint
|
||||||
let ts_ser = self.checkpoint.0.serialize();
|
let ts_ser = self.checkpoint.0.to_string();
|
||||||
debug!("(sync) looking up operations starting at {}", ts_ser);
|
debug!("(sync) looking up operations starting at {}", ts_ser);
|
||||||
let ops_map = self
|
let ops_map = self
|
||||||
.k2v
|
.k2v
|
||||||
|
@ -148,8 +149,9 @@ impl<S: BayouState> Bayou<S> {
|
||||||
|
|
||||||
let mut ops = vec![];
|
let mut ops = vec![];
|
||||||
for (tsstr, val) in ops_map {
|
for (tsstr, val) in ops_map {
|
||||||
let ts = Timestamp::parse(&tsstr)
|
let ts = tsstr
|
||||||
.ok_or(anyhow!("Invalid operation timestamp: {}", tsstr))?;
|
.parse::<Timestamp>()
|
||||||
|
.map_err(|_| anyhow!("Invalid operation timestamp: {}", tsstr))?;
|
||||||
if val.value.len() != 1 {
|
if val.value.len() != 1 {
|
||||||
bail!("Invalid operation, has {} values", val.value.len());
|
bail!("Invalid operation, has {} values", val.value.len());
|
||||||
}
|
}
|
||||||
|
@ -251,7 +253,7 @@ impl<S: BayouState> Bayou<S> {
|
||||||
self.k2v
|
self.k2v
|
||||||
.insert_item(
|
.insert_item(
|
||||||
&self.path,
|
&self.path,
|
||||||
&ts.serialize(),
|
&ts.to_string(),
|
||||||
seal_serialize(&op, &self.key)?,
|
seal_serialize(&op, &self.key)?,
|
||||||
None,
|
None,
|
||||||
)
|
)
|
||||||
|
@ -316,7 +318,7 @@ impl<S: BayouState> Bayou<S> {
|
||||||
let ts_cp = self.history[i_cp].0;
|
let ts_cp = self.history[i_cp].0;
|
||||||
debug!(
|
debug!(
|
||||||
"(cp) we could checkpoint at time {} (index {} in history)",
|
"(cp) we could checkpoint at time {} (index {} in history)",
|
||||||
ts_cp.serialize(),
|
ts_cp.to_string(),
|
||||||
i_cp
|
i_cp
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -330,13 +332,13 @@ impl<S: BayouState> Bayou<S> {
|
||||||
{
|
{
|
||||||
debug!(
|
debug!(
|
||||||
"(cp) last checkpoint is too recent: {}, not checkpointing",
|
"(cp) last checkpoint is too recent: {}, not checkpointing",
|
||||||
last_cp.0.serialize()
|
last_cp.0.to_string()
|
||||||
);
|
);
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
debug!("(cp) saving checkpoint at {}", ts_cp.serialize());
|
debug!("(cp) saving checkpoint at {}", ts_cp.to_string());
|
||||||
|
|
||||||
// Calculate state at time of checkpoint
|
// Calculate state at time of checkpoint
|
||||||
let mut last_known_state = (0, &self.checkpoint.1);
|
let mut last_known_state = (0, &self.checkpoint.1);
|
||||||
|
@ -356,7 +358,7 @@ impl<S: BayouState> Bayou<S> {
|
||||||
|
|
||||||
let mut por = PutObjectRequest::default();
|
let mut por = PutObjectRequest::default();
|
||||||
por.bucket = self.bucket.clone();
|
por.bucket = self.bucket.clone();
|
||||||
por.key = format!("{}/checkpoint/{}", self.path, ts_cp.serialize());
|
por.key = format!("{}/checkpoint/{}", self.path, ts_cp.to_string());
|
||||||
por.body = Some(cryptoblob.into());
|
por.body = Some(cryptoblob.into());
|
||||||
self.s3.put_object(por).await?;
|
self.s3.put_object(por).await?;
|
||||||
|
|
||||||
|
@ -375,7 +377,7 @@ impl<S: BayouState> Bayou<S> {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete corresponding range of operations
|
// Delete corresponding range of operations
|
||||||
let ts_ser = existing_checkpoints[last_to_keep].0.serialize();
|
let ts_ser = existing_checkpoints[last_to_keep].0.to_string();
|
||||||
self.k2v
|
self.k2v
|
||||||
.delete_batch(&[BatchDeleteOp {
|
.delete_batch(&[BatchDeleteOp {
|
||||||
partition_key: &self.path,
|
partition_key: &self.path,
|
||||||
|
@ -414,7 +416,7 @@ impl<S: BayouState> Bayou<S> {
|
||||||
for object in checkpoints_res.contents.unwrap_or_default() {
|
for object in checkpoints_res.contents.unwrap_or_default() {
|
||||||
if let Some(key) = object.key {
|
if let Some(key) = object.key {
|
||||||
if let Some(ckid) = key.strip_prefix(&prefix) {
|
if let Some(ckid) = key.strip_prefix(&prefix) {
|
||||||
if let Some(ts) = Timestamp::parse(ckid) {
|
if let Ok(ts) = ckid.parse::<Timestamp>() {
|
||||||
checkpoints.push((ts, key));
|
checkpoints.push((ts, key));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -451,20 +453,26 @@ impl Timestamp {
|
||||||
pub fn zero() -> Self {
|
pub fn zero() -> Self {
|
||||||
Self { msec: 0, rand: 0 }
|
Self { msec: 0, rand: 0 }
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub fn serialize(&self) -> String {
|
impl ToString for Timestamp {
|
||||||
|
fn to_string(&self) -> String {
|
||||||
let mut bytes = [0u8; 16];
|
let mut bytes = [0u8; 16];
|
||||||
bytes[0..8].copy_from_slice(&u64::to_be_bytes(self.msec));
|
bytes[0..8].copy_from_slice(&u64::to_be_bytes(self.msec));
|
||||||
bytes[8..16].copy_from_slice(&u64::to_be_bytes(self.rand));
|
bytes[8..16].copy_from_slice(&u64::to_be_bytes(self.rand));
|
||||||
hex::encode(&bytes)
|
hex::encode(&bytes)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn parse(v: &str) -> Option<Self> {
|
|
||||||
let bytes = hex::decode(v).ok()?;
|
|
||||||
if bytes.len() != 16 {
|
|
||||||
return None;
|
|
||||||
}
|
}
|
||||||
Some(Self {
|
|
||||||
|
impl FromStr for Timestamp {
|
||||||
|
type Err = &'static str;
|
||||||
|
|
||||||
|
fn from_str(s: &str) -> Result<Timestamp, &'static str> {
|
||||||
|
let bytes = hex::decode(s).map_err(|_| "invalid hex")?;
|
||||||
|
if bytes.len() != 16 {
|
||||||
|
return Err("bad length");
|
||||||
|
}
|
||||||
|
Ok(Self {
|
||||||
msec: u64::from_be_bytes(bytes[0..8].try_into().unwrap()),
|
msec: u64::from_be_bytes(bytes[0..8].try_into().unwrap()),
|
||||||
rand: u64::from_be_bytes(bytes[8..16].try_into().unwrap()),
|
rand: u64::from_be_bytes(bytes[8..16].try_into().unwrap()),
|
||||||
})
|
})
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
use std::sync::atomic::{AtomicU64, Ordering};
|
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
|
use std::sync::atomic::{AtomicU64, Ordering};
|
||||||
|
|
||||||
use lazy_static::lazy_static;
|
use lazy_static::lazy_static;
|
||||||
use rand::prelude::*;
|
use rand::prelude::*;
|
||||||
|
|
Loading…
Reference in a new issue