Storage trait new implementation
This commit is contained in:
parent
684f4de225
commit
3d41f40dc8
10 changed files with 315 additions and 373 deletions
134
src/bayou.rs
134
src/bayou.rs
|
@ -45,8 +45,7 @@ pub struct Bayou<S: BayouState> {
|
||||||
path: String,
|
path: String,
|
||||||
key: Key,
|
key: Key,
|
||||||
|
|
||||||
k2v: storage::RowStore,
|
storage: storage::Store,
|
||||||
s3: storage::BlobStore,
|
|
||||||
|
|
||||||
checkpoint: (Timestamp, S),
|
checkpoint: (Timestamp, S),
|
||||||
history: Vec<(Timestamp, S::Op, Option<S>)>,
|
history: Vec<(Timestamp, S::Op, Option<S>)>,
|
||||||
|
@ -60,17 +59,16 @@ pub struct Bayou<S: BayouState> {
|
||||||
|
|
||||||
impl<S: BayouState> Bayou<S> {
|
impl<S: BayouState> Bayou<S> {
|
||||||
pub fn new(creds: &Credentials, path: String) -> Result<Self> {
|
pub fn new(creds: &Credentials, path: String) -> Result<Self> {
|
||||||
let k2v_client = creds.row_client()?;
|
let storage = creds.storage.build()?;
|
||||||
let s3_client = creds.blob_client()?;
|
|
||||||
|
|
||||||
let target = k2v_client.row(&path, WATCH_SK);
|
//let target = k2v_client.row(&path, WATCH_SK);
|
||||||
let watch = K2vWatch::new(creds, path.clone(), WATCH_SK.to_string())?;
|
let target = storage::RowRef::new(&path, WATCH_SK);
|
||||||
|
let watch = K2vWatch::new(creds, target.clone())?;
|
||||||
|
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
path,
|
path,
|
||||||
|
storage,
|
||||||
key: creds.keys.master.clone(),
|
key: creds.keys.master.clone(),
|
||||||
k2v: k2v_client,
|
|
||||||
s3: s3_client,
|
|
||||||
checkpoint: (Timestamp::zero(), S::default()),
|
checkpoint: (Timestamp::zero(), S::default()),
|
||||||
history: vec![],
|
history: vec![],
|
||||||
last_sync: None,
|
last_sync: None,
|
||||||
|
@ -96,9 +94,7 @@ impl<S: BayouState> Bayou<S> {
|
||||||
} else {
|
} else {
|
||||||
debug!("(sync) loading checkpoint: {}", key);
|
debug!("(sync) loading checkpoint: {}", key);
|
||||||
|
|
||||||
let obj_res = self.s3.blob(key).fetch().await?;
|
let buf = self.storage.blob_fetch(&storage::BlobRef(key.to_string())).await?.value;
|
||||||
let buf = obj_res.content().ok_or(anyhow!("object can't be empty"))?;
|
|
||||||
|
|
||||||
debug!("(sync) checkpoint body length: {}", buf.len());
|
debug!("(sync) checkpoint body length: {}", buf.len());
|
||||||
|
|
||||||
let ck = open_deserialize::<S>(&buf, &self.key)?;
|
let ck = open_deserialize::<S>(&buf, &self.key)?;
|
||||||
|
@ -129,42 +125,26 @@ impl<S: BayouState> Bayou<S> {
|
||||||
// 3. List all operations starting from checkpoint
|
// 3. List all operations starting from checkpoint
|
||||||
let ts_ser = self.checkpoint.0.to_string();
|
let ts_ser = self.checkpoint.0.to_string();
|
||||||
debug!("(sync) looking up operations starting at {}", ts_ser);
|
debug!("(sync) looking up operations starting at {}", ts_ser);
|
||||||
let ops_map = self.k2v.select(storage::Selector::Range { shard_key: &self.path, begin: &ts_ser, end: WATCH_SK }).await?;
|
let ops_map = self.storage.row_fetch(&storage::Selector::Range {
|
||||||
/*let ops_map = self
|
shard: &self.path,
|
||||||
.k2v
|
sort_begin: &ts_ser,
|
||||||
.read_batch(&[BatchReadOp {
|
sort_end: WATCH_SK
|
||||||
partition_key: &self.path,
|
}).await?;
|
||||||
filter: Filter {
|
|
||||||
start: Some(&ts_ser),
|
|
||||||
end: Some(WATCH_SK),
|
|
||||||
prefix: None,
|
|
||||||
limit: None,
|
|
||||||
reverse: false,
|
|
||||||
},
|
|
||||||
single_item: false,
|
|
||||||
conflicts_only: false,
|
|
||||||
tombstones: false,
|
|
||||||
}])
|
|
||||||
.await?
|
|
||||||
.into_iter()
|
|
||||||
.next()
|
|
||||||
.ok_or(anyhow!("Missing K2V result"))?
|
|
||||||
.items;*/
|
|
||||||
|
|
||||||
let mut ops = vec![];
|
let mut ops = vec![];
|
||||||
for row_value in ops_map {
|
for row_value in ops_map {
|
||||||
let row = row_value.to_ref();
|
let row = row_value.row_ref;
|
||||||
let sort_key = row.key().1;
|
let sort_key = row.uid.sort;
|
||||||
let ts = sort_key.parse::<Timestamp>().map_err(|_| anyhow!("Invalid operation timestamp: {}", sort_key))?;
|
let ts = sort_key.parse::<Timestamp>().map_err(|_| anyhow!("Invalid operation timestamp: {}", sort_key))?;
|
||||||
|
|
||||||
let val = row_value.content();
|
let val = row_value.value;
|
||||||
if val.len() != 1 {
|
if val.len() != 1 {
|
||||||
bail!("Invalid operation, has {} values", row_value.content().len());
|
bail!("Invalid operation, has {} values", val.len());
|
||||||
}
|
}
|
||||||
match &val[0] {
|
match &val[0] {
|
||||||
storage::Alternative::Value(v) => {
|
storage::Alternative::Value(v) => {
|
||||||
let op = open_deserialize::<S::Op>(v, &self.key)?;
|
let op = open_deserialize::<S::Op>(v, &self.key)?;
|
||||||
debug!("(sync) operation {}: {} {:?}", sort_key, base64::encode(v), op);
|
debug!("(sync) operation {}: {:?}", sort_key, op);
|
||||||
ops.push((ts, op));
|
ops.push((ts, op));
|
||||||
}
|
}
|
||||||
storage::Alternative::Tombstone => {
|
storage::Alternative::Tombstone => {
|
||||||
|
@ -231,7 +211,7 @@ impl<S: BayouState> Bayou<S> {
|
||||||
|
|
||||||
// Save info that sync has been done
|
// Save info that sync has been done
|
||||||
self.last_sync = new_last_sync;
|
self.last_sync = new_last_sync;
|
||||||
self.last_sync_watch_ct = self.k2v.from_orphan(new_last_sync_watch_ct).expect("Source & target storage must be compatible");
|
self.last_sync_watch_ct = new_last_sync_watch_ct;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -243,7 +223,7 @@ impl<S: BayouState> Bayou<S> {
|
||||||
Some(t) => Instant::now() > t + (CHECKPOINT_INTERVAL / 5),
|
Some(t) => Instant::now() > t + (CHECKPOINT_INTERVAL / 5),
|
||||||
_ => true,
|
_ => true,
|
||||||
};
|
};
|
||||||
let changed = self.last_sync_watch_ct.to_orphan() != *self.watch.rx.borrow();
|
let changed = self.last_sync_watch_ct != *self.watch.rx.borrow();
|
||||||
if too_old || changed {
|
if too_old || changed {
|
||||||
self.sync().await?;
|
self.sync().await?;
|
||||||
}
|
}
|
||||||
|
@ -263,12 +243,12 @@ impl<S: BayouState> Bayou<S> {
|
||||||
.map(|(ts, _, _)| ts)
|
.map(|(ts, _, _)| ts)
|
||||||
.unwrap_or(&self.checkpoint.0),
|
.unwrap_or(&self.checkpoint.0),
|
||||||
);
|
);
|
||||||
self.k2v
|
|
||||||
.row(&self.path, &ts.to_string())
|
|
||||||
.set_value(&seal_serialize(&op, &self.key)?)
|
|
||||||
.push()
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
|
let row_val = storage::RowVal::new(
|
||||||
|
storage::RowRef::new(&self.path, &ts.to_string()),
|
||||||
|
seal_serialize(&op, &self.key)?,
|
||||||
|
);
|
||||||
|
self.storage.row_insert(vec![row_val]).await?;
|
||||||
self.watch.notify.notify_one();
|
self.watch.notify.notify_one();
|
||||||
|
|
||||||
let new_state = self.state().apply(&op);
|
let new_state = self.state().apply(&op);
|
||||||
|
@ -368,12 +348,11 @@ impl<S: BayouState> Bayou<S> {
|
||||||
let cryptoblob = seal_serialize(&state_cp, &self.key)?;
|
let cryptoblob = seal_serialize(&state_cp, &self.key)?;
|
||||||
debug!("(cp) checkpoint body length: {}", cryptoblob.len());
|
debug!("(cp) checkpoint body length: {}", cryptoblob.len());
|
||||||
|
|
||||||
self.s3
|
let blob_val = storage::BlobVal::new(
|
||||||
.blob(format!("{}/checkpoint/{}", self.path, ts_cp.to_string()).as_str())
|
storage::BlobRef(format!("{}/checkpoint/{}", self.path, ts_cp.to_string())),
|
||||||
.set_value(cryptoblob.into())
|
cryptoblob.into(),
|
||||||
.push()
|
);
|
||||||
.await?;
|
self.storage.blob_insert(&blob_val).await?;
|
||||||
|
|
||||||
|
|
||||||
// Drop old checkpoints (but keep at least CHECKPOINTS_TO_KEEP of them)
|
// Drop old checkpoints (but keep at least CHECKPOINTS_TO_KEEP of them)
|
||||||
let ecp_len = existing_checkpoints.len();
|
let ecp_len = existing_checkpoints.len();
|
||||||
|
@ -383,22 +362,16 @@ impl<S: BayouState> Bayou<S> {
|
||||||
// Delete blobs
|
// Delete blobs
|
||||||
for (_ts, key) in existing_checkpoints[..last_to_keep].iter() {
|
for (_ts, key) in existing_checkpoints[..last_to_keep].iter() {
|
||||||
debug!("(cp) drop old checkpoint {}", key);
|
debug!("(cp) drop old checkpoint {}", key);
|
||||||
self.s3
|
self.storage.blob_rm(&storage::BlobRef(key.to_string())).await?;
|
||||||
.blob(key)
|
|
||||||
.rm()
|
|
||||||
.await?;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete corresponding range of operations
|
// Delete corresponding range of operations
|
||||||
let ts_ser = existing_checkpoints[last_to_keep].0.to_string();
|
let ts_ser = existing_checkpoints[last_to_keep].0.to_string();
|
||||||
self.k2v
|
self.storage.row_rm(&storage::Selector::Range {
|
||||||
.rm(storage::Selector::Range{
|
shard: &self.path,
|
||||||
shard_key: &self.path,
|
sort_begin: "",
|
||||||
begin: "",
|
sort_end: &ts_ser
|
||||||
end: &ts_ser
|
}).await?
|
||||||
})
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -417,11 +390,11 @@ impl<S: BayouState> Bayou<S> {
|
||||||
async fn list_checkpoints(&self) -> Result<Vec<(Timestamp, String)>> {
|
async fn list_checkpoints(&self) -> Result<Vec<(Timestamp, String)>> {
|
||||||
let prefix = format!("{}/checkpoint/", self.path);
|
let prefix = format!("{}/checkpoint/", self.path);
|
||||||
|
|
||||||
let checkpoints_res = self.s3.list(&prefix).await?;
|
let checkpoints_res = self.storage.blob_list(&prefix).await?;
|
||||||
|
|
||||||
let mut checkpoints = vec![];
|
let mut checkpoints = vec![];
|
||||||
for object in checkpoints_res {
|
for object in checkpoints_res {
|
||||||
let key = object.key();
|
let key = object.0;
|
||||||
if let Some(ckid) = key.strip_prefix(&prefix) {
|
if let Some(ckid) = key.strip_prefix(&prefix) {
|
||||||
if let Ok(ts) = ckid.parse::<Timestamp>() {
|
if let Ok(ts) = ckid.parse::<Timestamp>() {
|
||||||
checkpoints.push((ts, key.into()));
|
checkpoints.push((ts, key.into()));
|
||||||
|
@ -436,9 +409,8 @@ impl<S: BayouState> Bayou<S> {
|
||||||
// ---- Bayou watch in K2V ----
|
// ---- Bayou watch in K2V ----
|
||||||
|
|
||||||
struct K2vWatch {
|
struct K2vWatch {
|
||||||
pk: String,
|
target: storage::RowRef,
|
||||||
sk: String,
|
rx: watch::Receiver<storage::RowRef>,
|
||||||
rx: watch::Receiver<storage::OrphanRowRef>,
|
|
||||||
notify: Notify,
|
notify: Notify,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -446,17 +418,17 @@ impl K2vWatch {
|
||||||
/// Creates a new watch and launches subordinate threads.
|
/// Creates a new watch and launches subordinate threads.
|
||||||
/// These threads hold Weak pointers to the struct;
|
/// These threads hold Weak pointers to the struct;
|
||||||
/// they exit when the Arc is dropped.
|
/// they exit when the Arc is dropped.
|
||||||
fn new(creds: &Credentials, pk: String, sk: String) -> Result<Arc<Self>> {
|
fn new(creds: &Credentials, target: storage::RowRef) -> Result<Arc<Self>> {
|
||||||
let row_client = creds.row_client()?;
|
let storage = creds.storage.build()?;
|
||||||
|
|
||||||
let (tx, rx) = watch::channel::<storage::OrphanRowRef>(row_client.row(&pk, &sk).to_orphan());
|
let (tx, rx) = watch::channel::<storage::RowRef>(target.clone());
|
||||||
let notify = Notify::new();
|
let notify = Notify::new();
|
||||||
|
|
||||||
let watch = Arc::new(K2vWatch { pk, sk, rx, notify });
|
let watch = Arc::new(K2vWatch { target, rx, notify });
|
||||||
|
|
||||||
tokio::spawn(Self::background_task(
|
tokio::spawn(Self::background_task(
|
||||||
Arc::downgrade(&watch),
|
Arc::downgrade(&watch),
|
||||||
row_client,
|
storage,
|
||||||
tx,
|
tx,
|
||||||
));
|
));
|
||||||
|
|
||||||
|
@ -465,11 +437,11 @@ impl K2vWatch {
|
||||||
|
|
||||||
async fn background_task(
|
async fn background_task(
|
||||||
self_weak: Weak<Self>,
|
self_weak: Weak<Self>,
|
||||||
k2v: storage::RowStore,
|
storage: storage::Store,
|
||||||
tx: watch::Sender<storage::OrphanRowRef>,
|
tx: watch::Sender<storage::RowRef>,
|
||||||
) {
|
) {
|
||||||
let mut row = match Weak::upgrade(&self_weak) {
|
let mut row = match Weak::upgrade(&self_weak) {
|
||||||
Some(this) => k2v.row(&this.pk, &this.sk),
|
Some(this) => this.target.clone(),
|
||||||
None => {
|
None => {
|
||||||
error!("can't start loop");
|
error!("can't start loop");
|
||||||
return
|
return
|
||||||
|
@ -479,20 +451,19 @@ impl K2vWatch {
|
||||||
while let Some(this) = Weak::upgrade(&self_weak) {
|
while let Some(this) = Weak::upgrade(&self_weak) {
|
||||||
debug!(
|
debug!(
|
||||||
"bayou k2v watch bg loop iter ({}, {})",
|
"bayou k2v watch bg loop iter ({}, {})",
|
||||||
this.pk, this.sk
|
this.target.uid.shard, this.target.uid.sort
|
||||||
);
|
);
|
||||||
tokio::select!(
|
tokio::select!(
|
||||||
_ = tokio::time::sleep(Duration::from_secs(60)) => continue,
|
_ = tokio::time::sleep(Duration::from_secs(60)) => continue,
|
||||||
update = row.poll() => {
|
update = storage.row_poll(&row) => {
|
||||||
//update = k2v_wait_value_changed(&k2v, &this.pk, &this.sk, &ct) => {
|
|
||||||
match update {
|
match update {
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!("Error in bayou k2v wait value changed: {}", e);
|
error!("Error in bayou k2v wait value changed: {}", e);
|
||||||
tokio::time::sleep(Duration::from_secs(30)).await;
|
tokio::time::sleep(Duration::from_secs(30)).await;
|
||||||
}
|
}
|
||||||
Ok(new_value) => {
|
Ok(new_value) => {
|
||||||
row = new_value.to_ref();
|
row = new_value.row_ref;
|
||||||
if tx.send(row.to_orphan()).is_err() {
|
if tx.send(row.clone()).is_err() {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -500,7 +471,8 @@ impl K2vWatch {
|
||||||
}
|
}
|
||||||
_ = this.notify.notified() => {
|
_ = this.notify.notified() => {
|
||||||
let rand = u128::to_be_bytes(thread_rng().gen()).to_vec();
|
let rand = u128::to_be_bytes(thread_rng().gen()).to_vec();
|
||||||
if let Err(e) = row.set_value(&rand).push().await
|
let row_val = storage::RowVal::new(row.clone(), rand);
|
||||||
|
if let Err(e) = storage.row_insert(vec![row_val]).await
|
||||||
{
|
{
|
||||||
error!("Error in bayou k2v watch updater loop: {}", e);
|
error!("Error in bayou k2v watch updater loop: {}", e);
|
||||||
tokio::time::sleep(Duration::from_secs(30)).await;
|
tokio::time::sleep(Duration::from_secs(30)).await;
|
||||||
|
|
|
@ -85,11 +85,11 @@ impl LdapLoginProvider {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn storage_creds_from_ldap_user(&self, user: &SearchEntry) -> Result<Builders> {
|
fn storage_creds_from_ldap_user(&self, user: &SearchEntry) -> Result<Builder> {
|
||||||
let storage: Builders = match &self.storage_specific {
|
let storage: Builder = match &self.storage_specific {
|
||||||
StorageSpecific::InMemory => Box::new(storage::in_memory::FullMem::new(
|
StorageSpecific::InMemory => storage::in_memory::MemBuilder::new(
|
||||||
&get_attr(user, &self.username_attr)?
|
&get_attr(user, &self.username_attr)?
|
||||||
)),
|
),
|
||||||
StorageSpecific::Garage { from_config, bucket_source } => {
|
StorageSpecific::Garage { from_config, bucket_source } => {
|
||||||
let aws_access_key_id = get_attr(user, &from_config.aws_access_key_id_attr)?;
|
let aws_access_key_id = get_attr(user, &from_config.aws_access_key_id_attr)?;
|
||||||
let aws_secret_access_key = get_attr(user, &from_config.aws_secret_access_key_attr)?;
|
let aws_secret_access_key = get_attr(user, &from_config.aws_secret_access_key_attr)?;
|
||||||
|
@ -99,14 +99,14 @@ impl LdapLoginProvider {
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
Box::new(storage::garage::GrgCreds {
|
storage::garage::GarageBuilder::new(storage::garage::GarageConf {
|
||||||
region: from_config.aws_region.clone(),
|
region: from_config.aws_region.clone(),
|
||||||
s3_endpoint: from_config.s3_endpoint.clone(),
|
s3_endpoint: from_config.s3_endpoint.clone(),
|
||||||
k2v_endpoint: from_config.k2v_endpoint.clone(),
|
k2v_endpoint: from_config.k2v_endpoint.clone(),
|
||||||
aws_access_key_id,
|
aws_access_key_id,
|
||||||
aws_secret_access_key,
|
aws_secret_access_key,
|
||||||
bucket
|
bucket,
|
||||||
})
|
})?
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -33,23 +33,15 @@ pub type ArcLoginProvider = Arc<dyn LoginProvider + Send + Sync>;
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct Credentials {
|
pub struct Credentials {
|
||||||
/// The storage credentials are used to authenticate access to the underlying storage (S3, K2V)
|
/// The storage credentials are used to authenticate access to the underlying storage (S3, K2V)
|
||||||
pub storage: Builders,
|
pub storage: Builder,
|
||||||
/// The cryptographic keys are used to encrypt and decrypt data stored in S3 and K2V
|
/// The cryptographic keys are used to encrypt and decrypt data stored in S3 and K2V
|
||||||
pub keys: CryptoKeys,
|
pub keys: CryptoKeys,
|
||||||
}
|
}
|
||||||
impl Credentials {
|
|
||||||
pub fn row_client(&self) -> Result<RowStore> {
|
|
||||||
Ok(self.storage.row_store()?)
|
|
||||||
}
|
|
||||||
pub fn blob_client(&self) -> Result<BlobStore> {
|
|
||||||
Ok(self.storage.blob_store()?)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct PublicCredentials {
|
pub struct PublicCredentials {
|
||||||
/// The storage credentials are used to authenticate access to the underlying storage (S3, K2V)
|
/// The storage credentials are used to authenticate access to the underlying storage (S3, K2V)
|
||||||
pub storage: Builders,
|
pub storage: Builder,
|
||||||
pub public_key: PublicKey,
|
pub public_key: PublicKey,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -88,16 +88,16 @@ impl LoginProvider for StaticLoginProvider {
|
||||||
}
|
}
|
||||||
|
|
||||||
tracing::debug!(user=%username, "fetch keys");
|
tracing::debug!(user=%username, "fetch keys");
|
||||||
let storage: storage::Builders = match &user.config.storage {
|
let storage: storage::Builder = match &user.config.storage {
|
||||||
StaticStorage::InMemory => Box::new(storage::in_memory::FullMem::new(username)),
|
StaticStorage::InMemory => storage::in_memory::MemBuilder::new(username),
|
||||||
StaticStorage::Garage(grgconf) => Box::new(storage::garage::GrgCreds {
|
StaticStorage::Garage(grgconf) => storage::garage::GarageBuilder::new(storage::garage::GarageConf {
|
||||||
region: grgconf.aws_region.clone(),
|
region: grgconf.aws_region.clone(),
|
||||||
k2v_endpoint: grgconf.k2v_endpoint.clone(),
|
k2v_endpoint: grgconf.k2v_endpoint.clone(),
|
||||||
s3_endpoint: grgconf.s3_endpoint.clone(),
|
s3_endpoint: grgconf.s3_endpoint.clone(),
|
||||||
aws_access_key_id: grgconf.aws_access_key_id.clone(),
|
aws_access_key_id: grgconf.aws_access_key_id.clone(),
|
||||||
aws_secret_access_key: grgconf.aws_secret_access_key.clone(),
|
aws_secret_access_key: grgconf.aws_secret_access_key.clone(),
|
||||||
bucket: grgconf.bucket.clone(),
|
bucket: grgconf.bucket.clone(),
|
||||||
}),
|
})?,
|
||||||
};
|
};
|
||||||
|
|
||||||
let cr = CryptoRoot(user.config.crypto_root.clone());
|
let cr = CryptoRoot(user.config.crypto_root.clone());
|
||||||
|
@ -114,16 +114,16 @@ impl LoginProvider for StaticLoginProvider {
|
||||||
Some(u) => u,
|
Some(u) => u,
|
||||||
};
|
};
|
||||||
|
|
||||||
let storage: storage::Builders = match &user.config.storage {
|
let storage: storage::Builder = match &user.config.storage {
|
||||||
StaticStorage::InMemory => Box::new(storage::in_memory::FullMem::new(&user.username)),
|
StaticStorage::InMemory => storage::in_memory::MemBuilder::new(&user.username),
|
||||||
StaticStorage::Garage(grgconf) => Box::new(storage::garage::GrgCreds {
|
StaticStorage::Garage(grgconf) => storage::garage::GarageBuilder::new(storage::garage::GarageConf {
|
||||||
region: grgconf.aws_region.clone(),
|
region: grgconf.aws_region.clone(),
|
||||||
k2v_endpoint: grgconf.k2v_endpoint.clone(),
|
k2v_endpoint: grgconf.k2v_endpoint.clone(),
|
||||||
s3_endpoint: grgconf.s3_endpoint.clone(),
|
s3_endpoint: grgconf.s3_endpoint.clone(),
|
||||||
aws_access_key_id: grgconf.aws_access_key_id.clone(),
|
aws_access_key_id: grgconf.aws_access_key_id.clone(),
|
||||||
aws_secret_access_key: grgconf.aws_secret_access_key.clone(),
|
aws_secret_access_key: grgconf.aws_secret_access_key.clone(),
|
||||||
bucket: grgconf.bucket.clone(),
|
bucket: grgconf.bucket.clone(),
|
||||||
}),
|
})?,
|
||||||
};
|
};
|
||||||
|
|
||||||
let cr = CryptoRoot(user.config.crypto_root.clone());
|
let cr = CryptoRoot(user.config.crypto_root.clone());
|
||||||
|
|
|
@ -5,6 +5,7 @@ use std::sync::{Arc, Weak};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use anyhow::{anyhow, bail, Result};
|
use anyhow::{anyhow, bail, Result};
|
||||||
|
use base64::Engine;
|
||||||
use futures::{future::BoxFuture, FutureExt};
|
use futures::{future::BoxFuture, FutureExt};
|
||||||
//use tokio::io::AsyncReadExt;
|
//use tokio::io::AsyncReadExt;
|
||||||
use tokio::sync::watch;
|
use tokio::sync::watch;
|
||||||
|
@ -50,13 +51,11 @@ async fn incoming_mail_watch_process_internal(
|
||||||
creds: Credentials,
|
creds: Credentials,
|
||||||
mut rx_inbox_id: watch::Receiver<Option<(UniqueIdent, ImapUidvalidity)>>,
|
mut rx_inbox_id: watch::Receiver<Option<(UniqueIdent, ImapUidvalidity)>>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let mut lock_held = k2v_lock_loop(creds.row_client()?, INCOMING_PK, INCOMING_LOCK_SK);
|
let mut lock_held = k2v_lock_loop(creds.storage.build()?, storage::RowRef::new(INCOMING_PK, INCOMING_LOCK_SK));
|
||||||
|
let storage = creds.storage.build()?;
|
||||||
let k2v = creds.row_client()?;
|
|
||||||
let s3 = creds.blob_client()?;
|
|
||||||
|
|
||||||
let mut inbox: Option<Arc<Mailbox>> = None;
|
let mut inbox: Option<Arc<Mailbox>> = None;
|
||||||
let mut incoming_key = k2v.row(INCOMING_PK, INCOMING_WATCH_SK);
|
let mut incoming_key = storage::RowRef::new(INCOMING_PK, INCOMING_WATCH_SK);
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let maybe_updated_incoming_key = if *lock_held.borrow() {
|
let maybe_updated_incoming_key = if *lock_held.borrow() {
|
||||||
|
@ -64,9 +63,9 @@ async fn incoming_mail_watch_process_internal(
|
||||||
|
|
||||||
let wait_new_mail = async {
|
let wait_new_mail = async {
|
||||||
loop {
|
loop {
|
||||||
match incoming_key.poll().await
|
match storage.row_poll(&incoming_key).await
|
||||||
{
|
{
|
||||||
Ok(row_val) => break row_val.to_ref(),
|
Ok(row_val) => break row_val.row_ref,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!("Error in wait_new_mail: {}", e);
|
error!("Error in wait_new_mail: {}", e);
|
||||||
tokio::time::sleep(Duration::from_secs(30)).await;
|
tokio::time::sleep(Duration::from_secs(30)).await;
|
||||||
|
@ -77,7 +76,7 @@ async fn incoming_mail_watch_process_internal(
|
||||||
|
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
inc_k = wait_new_mail => Some(inc_k),
|
inc_k = wait_new_mail => Some(inc_k),
|
||||||
_ = tokio::time::sleep(MAIL_CHECK_INTERVAL) => Some(k2v.from_orphan(incoming_key.to_orphan()).expect("Incompatible source & target storage")),
|
_ = tokio::time::sleep(MAIL_CHECK_INTERVAL) => Some(incoming_key.clone()),
|
||||||
_ = lock_held.changed() => None,
|
_ = lock_held.changed() => None,
|
||||||
_ = rx_inbox_id.changed() => None,
|
_ = rx_inbox_id.changed() => None,
|
||||||
}
|
}
|
||||||
|
@ -119,7 +118,7 @@ async fn incoming_mail_watch_process_internal(
|
||||||
// If we were able to open INBOX, and we have mail,
|
// If we were able to open INBOX, and we have mail,
|
||||||
// fetch new mail
|
// fetch new mail
|
||||||
if let (Some(inbox), Some(updated_incoming_key)) = (&inbox, maybe_updated_incoming_key) {
|
if let (Some(inbox), Some(updated_incoming_key)) = (&inbox, maybe_updated_incoming_key) {
|
||||||
match handle_incoming_mail(&user, &s3, inbox, &lock_held).await {
|
match handle_incoming_mail(&user, &storage, inbox, &lock_held).await {
|
||||||
Ok(()) => {
|
Ok(()) => {
|
||||||
incoming_key = updated_incoming_key;
|
incoming_key = updated_incoming_key;
|
||||||
}
|
}
|
||||||
|
@ -136,20 +135,20 @@ async fn incoming_mail_watch_process_internal(
|
||||||
|
|
||||||
async fn handle_incoming_mail(
|
async fn handle_incoming_mail(
|
||||||
user: &Arc<User>,
|
user: &Arc<User>,
|
||||||
blobs: &storage::BlobStore,
|
storage: &storage::Store,
|
||||||
inbox: &Arc<Mailbox>,
|
inbox: &Arc<Mailbox>,
|
||||||
lock_held: &watch::Receiver<bool>,
|
lock_held: &watch::Receiver<bool>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let mails_res = blobs.list("incoming/").await?;
|
let mails_res = storage.blob_list("incoming/").await?;
|
||||||
|
|
||||||
for object in mails_res {
|
for object in mails_res {
|
||||||
if !*lock_held.borrow() {
|
if !*lock_held.borrow() {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
let key = object.key();
|
let key = object.0;
|
||||||
if let Some(mail_id) = key.strip_prefix("incoming/") {
|
if let Some(mail_id) = key.strip_prefix("incoming/") {
|
||||||
if let Ok(mail_id) = mail_id.parse::<UniqueIdent>() {
|
if let Ok(mail_id) = mail_id.parse::<UniqueIdent>() {
|
||||||
move_incoming_message(user, blobs, inbox, mail_id).await?;
|
move_incoming_message(user, storage, inbox, mail_id).await?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -159,7 +158,7 @@ async fn handle_incoming_mail(
|
||||||
|
|
||||||
async fn move_incoming_message(
|
async fn move_incoming_message(
|
||||||
user: &Arc<User>,
|
user: &Arc<User>,
|
||||||
s3: &storage::BlobStore,
|
storage: &storage::Store,
|
||||||
inbox: &Arc<Mailbox>,
|
inbox: &Arc<Mailbox>,
|
||||||
id: UniqueIdent,
|
id: UniqueIdent,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
|
@ -168,14 +167,15 @@ async fn move_incoming_message(
|
||||||
let object_key = format!("incoming/{}", id);
|
let object_key = format!("incoming/{}", id);
|
||||||
|
|
||||||
// 1. Fetch message from S3
|
// 1. Fetch message from S3
|
||||||
let object = s3.blob(&object_key).fetch().await?;
|
let object = storage.blob_fetch(&storage::BlobRef(object_key)).await?;
|
||||||
|
|
||||||
// 1.a decrypt message key from headers
|
// 1.a decrypt message key from headers
|
||||||
//info!("Object metadata: {:?}", get_result.metadata);
|
//info!("Object metadata: {:?}", get_result.metadata);
|
||||||
let key_encrypted_b64 = object
|
let key_encrypted_b64 = object
|
||||||
.get_meta(MESSAGE_KEY)
|
.meta
|
||||||
|
.get(MESSAGE_KEY)
|
||||||
.ok_or(anyhow!("Missing key in metadata"))?;
|
.ok_or(anyhow!("Missing key in metadata"))?;
|
||||||
let key_encrypted = base64::decode(key_encrypted_b64)?;
|
let key_encrypted = base64::engine::general_purpose::STANDARD.decode(key_encrypted_b64)?;
|
||||||
let message_key = sodiumoxide::crypto::sealedbox::open(
|
let message_key = sodiumoxide::crypto::sealedbox::open(
|
||||||
&key_encrypted,
|
&key_encrypted,
|
||||||
&user.creds.keys.public,
|
&user.creds.keys.public,
|
||||||
|
@ -186,28 +186,28 @@ async fn move_incoming_message(
|
||||||
cryptoblob::Key::from_slice(&message_key).ok_or(anyhow!("Invalid message key"))?;
|
cryptoblob::Key::from_slice(&message_key).ok_or(anyhow!("Invalid message key"))?;
|
||||||
|
|
||||||
// 1.b retrieve message body
|
// 1.b retrieve message body
|
||||||
let obj_body = object.content().ok_or(anyhow!("Missing object body"))?;
|
let obj_body = object.value;
|
||||||
let plain_mail = cryptoblob::open(&obj_body, &message_key)
|
let plain_mail = cryptoblob::open(&obj_body, &message_key)
|
||||||
.map_err(|_| anyhow!("Cannot decrypt email content"))?;
|
.map_err(|_| anyhow!("Cannot decrypt email content"))?;
|
||||||
|
|
||||||
// 2 parse mail and add to inbox
|
// 2 parse mail and add to inbox
|
||||||
let msg = IMF::try_from(&plain_mail[..]).map_err(|_| anyhow!("Invalid email body"))?;
|
let msg = IMF::try_from(&plain_mail[..]).map_err(|_| anyhow!("Invalid email body"))?;
|
||||||
inbox
|
inbox
|
||||||
.append_from_s3(msg, id, object.to_ref(), message_key)
|
.append_from_s3(msg, id, object.blob_ref.clone(), message_key)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
// 3 delete from incoming
|
// 3 delete from incoming
|
||||||
object.to_ref().rm().await?;
|
storage.blob_rm(&object.blob_ref).await?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---- UTIL: K2V locking loop, use this to try to grab a lock using a K2V entry as a signal ----
|
// ---- UTIL: K2V locking loop, use this to try to grab a lock using a K2V entry as a signal ----
|
||||||
|
|
||||||
fn k2v_lock_loop(k2v: storage::RowStore, pk: &'static str, sk: &'static str) -> watch::Receiver<bool> {
|
fn k2v_lock_loop(storage: storage::Store, row_ref: storage::RowRef) -> watch::Receiver<bool> {
|
||||||
let (held_tx, held_rx) = watch::channel(false);
|
let (held_tx, held_rx) = watch::channel(false);
|
||||||
|
|
||||||
tokio::spawn(k2v_lock_loop_internal(k2v, pk, sk, held_tx));
|
tokio::spawn(k2v_lock_loop_internal(storage, row_ref, held_tx));
|
||||||
|
|
||||||
held_rx
|
held_rx
|
||||||
}
|
}
|
||||||
|
@ -216,13 +216,12 @@ fn k2v_lock_loop(k2v: storage::RowStore, pk: &'static str, sk: &'static str) ->
|
||||||
enum LockState {
|
enum LockState {
|
||||||
Unknown,
|
Unknown,
|
||||||
Empty,
|
Empty,
|
||||||
Held(UniqueIdent, u64, storage::OrphanRowRef),
|
Held(UniqueIdent, u64, storage::RowRef),
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn k2v_lock_loop_internal(
|
async fn k2v_lock_loop_internal(
|
||||||
k2v: storage::RowStore,
|
storage: storage::Store,
|
||||||
pk: &'static str,
|
row_ref: storage::RowRef,
|
||||||
sk: &'static str,
|
|
||||||
held_tx: watch::Sender<bool>,
|
held_tx: watch::Sender<bool>,
|
||||||
) {
|
) {
|
||||||
let (state_tx, mut state_rx) = watch::channel::<LockState>(LockState::Unknown);
|
let (state_tx, mut state_rx) = watch::channel::<LockState>(LockState::Unknown);
|
||||||
|
@ -232,10 +231,10 @@ async fn k2v_lock_loop_internal(
|
||||||
|
|
||||||
// Loop 1: watch state of lock in K2V, save that in corresponding watch channel
|
// Loop 1: watch state of lock in K2V, save that in corresponding watch channel
|
||||||
let watch_lock_loop: BoxFuture<Result<()>> = async {
|
let watch_lock_loop: BoxFuture<Result<()>> = async {
|
||||||
let mut ct = k2v.row(pk, sk);
|
let mut ct = row_ref.clone();
|
||||||
loop {
|
loop {
|
||||||
info!("k2v watch lock loop iter: ct = {:?}", ct);
|
info!("k2v watch lock loop iter: ct = {:?}", ct);
|
||||||
match ct.poll().await {
|
match storage.row_poll(&ct).await {
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!(
|
error!(
|
||||||
"Error in k2v wait value changed: {} ; assuming we no longer hold lock.",
|
"Error in k2v wait value changed: {} ; assuming we no longer hold lock.",
|
||||||
|
@ -246,7 +245,7 @@ async fn k2v_lock_loop_internal(
|
||||||
}
|
}
|
||||||
Ok(cv) => {
|
Ok(cv) => {
|
||||||
let mut lock_state = None;
|
let mut lock_state = None;
|
||||||
for v in cv.content().iter() {
|
for v in cv.value.iter() {
|
||||||
if let storage::Alternative::Value(vbytes) = v {
|
if let storage::Alternative::Value(vbytes) = v {
|
||||||
if vbytes.len() == 32 {
|
if vbytes.len() == 32 {
|
||||||
let ts = u64::from_be_bytes(vbytes[..8].try_into().unwrap());
|
let ts = u64::from_be_bytes(vbytes[..8].try_into().unwrap());
|
||||||
|
@ -260,7 +259,7 @@ async fn k2v_lock_loop_internal(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
let new_ct = cv.to_ref();
|
let new_ct = cv.row_ref;
|
||||||
|
|
||||||
info!(
|
info!(
|
||||||
"k2v watch lock loop: changed, old ct = {:?}, new ct = {:?}, v = {:?}",
|
"k2v watch lock loop: changed, old ct = {:?}, new ct = {:?}, v = {:?}",
|
||||||
|
@ -268,7 +267,7 @@ async fn k2v_lock_loop_internal(
|
||||||
);
|
);
|
||||||
state_tx.send(
|
state_tx.send(
|
||||||
lock_state
|
lock_state
|
||||||
.map(|(pid, ts)| LockState::Held(pid, ts, new_ct.to_orphan()))
|
.map(|(pid, ts)| LockState::Held(pid, ts, new_ct.clone()))
|
||||||
.unwrap_or(LockState::Empty),
|
.unwrap_or(LockState::Empty),
|
||||||
)?;
|
)?;
|
||||||
ct = new_ct;
|
ct = new_ct;
|
||||||
|
@ -358,10 +357,10 @@ async fn k2v_lock_loop_internal(
|
||||||
));
|
));
|
||||||
lock[8..].copy_from_slice(&our_pid.0);
|
lock[8..].copy_from_slice(&our_pid.0);
|
||||||
let row = match ct {
|
let row = match ct {
|
||||||
Some(orphan) => k2v.from_orphan(orphan).expect("Source & target must be storage compatible"),
|
Some(existing) => existing,
|
||||||
None => k2v.row(pk, sk),
|
None => row_ref.clone(),
|
||||||
};
|
};
|
||||||
if let Err(e) = row.set_value(&lock).push().await {
|
if let Err(e) = storage.row_insert(vec![storage::RowVal::new(row, lock)]).await {
|
||||||
error!("Could not take lock: {}", e);
|
error!("Could not take lock: {}", e);
|
||||||
tokio::time::sleep(Duration::from_secs(30)).await;
|
tokio::time::sleep(Duration::from_secs(30)).await;
|
||||||
}
|
}
|
||||||
|
@ -377,7 +376,7 @@ async fn k2v_lock_loop_internal(
|
||||||
info!("lock loop exited, releasing");
|
info!("lock loop exited, releasing");
|
||||||
|
|
||||||
if !held_tx.is_closed() {
|
if !held_tx.is_closed() {
|
||||||
warn!("wierd...");
|
warn!("weird...");
|
||||||
let _ = held_tx.send(false);
|
let _ = held_tx.send(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -387,8 +386,10 @@ async fn k2v_lock_loop_internal(
|
||||||
_ => None,
|
_ => None,
|
||||||
};
|
};
|
||||||
if let Some(ct) = release {
|
if let Some(ct) = release {
|
||||||
let row = k2v.from_orphan(ct).expect("Incompatible source & target storage");
|
match storage.row_rm(&storage::Selector::Single(&ct)).await {
|
||||||
let _ = row.rm().await;
|
Err(e) => warn!("Unable to release lock {:?}: {}", ct, e),
|
||||||
|
Ok(_) => (),
|
||||||
|
};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -410,30 +411,32 @@ impl EncryptedMessage {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn deliver_to(self: Arc<Self>, creds: PublicCredentials) -> Result<()> {
|
pub async fn deliver_to(self: Arc<Self>, creds: PublicCredentials) -> Result<()> {
|
||||||
let s3_client = creds.storage.blob_store()?;
|
let storage = creds.storage.build()?;
|
||||||
let k2v_client = creds.storage.row_store()?;
|
|
||||||
|
|
||||||
// Get causality token of previous watch key
|
// Get causality token of previous watch key
|
||||||
let query = k2v_client.row(INCOMING_PK, INCOMING_WATCH_SK);
|
let query = storage::RowRef::new(INCOMING_PK, INCOMING_WATCH_SK);
|
||||||
let watch_ct = match query.fetch().await {
|
let watch_ct = match storage.row_fetch(&storage::Selector::Single(&query)).await {
|
||||||
Err(_) => query,
|
Err(_) => query,
|
||||||
Ok(cv) => cv.to_ref(),
|
Ok(cv) => cv.into_iter().next().map(|v| v.row_ref).unwrap_or(query),
|
||||||
};
|
};
|
||||||
|
|
||||||
// Write mail to encrypted storage
|
// Write mail to encrypted storage
|
||||||
let encrypted_key =
|
let encrypted_key =
|
||||||
sodiumoxide::crypto::sealedbox::seal(self.key.as_ref(), &creds.public_key);
|
sodiumoxide::crypto::sealedbox::seal(self.key.as_ref(), &creds.public_key);
|
||||||
let key_header = base64::encode(&encrypted_key);
|
let key_header = base64::engine::general_purpose::STANDARD.encode(&encrypted_key);
|
||||||
|
|
||||||
let mut send = s3_client
|
let blob_val = storage::BlobVal::new(
|
||||||
.blob(&format!("incoming/{}", gen_ident()))
|
storage::BlobRef(format!("incoming/{}", gen_ident())),
|
||||||
.set_value(self.encrypted_body.clone().into());
|
self.encrypted_body.clone().into(),
|
||||||
send.set_meta(MESSAGE_KEY, &key_header);
|
).with_meta(MESSAGE_KEY.to_string(), key_header);
|
||||||
send.push().await?;
|
storage.blob_insert(&blob_val).await?;
|
||||||
|
|
||||||
// Update watch key to signal new mail
|
// Update watch key to signal new mail
|
||||||
watch_ct.set_value(gen_ident().0.as_ref()).push().await?;
|
let watch_val = storage::RowVal::new(
|
||||||
|
watch_ct.clone(),
|
||||||
|
gen_ident().0.to_vec(),
|
||||||
|
);
|
||||||
|
storage.row_insert(vec![watch_val]).await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,7 +8,7 @@ use crate::login::Credentials;
|
||||||
use crate::mail::uidindex::*;
|
use crate::mail::uidindex::*;
|
||||||
use crate::mail::unique_ident::*;
|
use crate::mail::unique_ident::*;
|
||||||
use crate::mail::IMF;
|
use crate::mail::IMF;
|
||||||
use crate::storage::{RowStore, BlobStore, self};
|
use crate::storage::{Store, RowRef, RowVal, BlobRef, BlobVal, Selector, self};
|
||||||
use crate::timestamp::now_msec;
|
use crate::timestamp::now_msec;
|
||||||
|
|
||||||
pub struct Mailbox {
|
pub struct Mailbox {
|
||||||
|
@ -44,8 +44,7 @@ impl Mailbox {
|
||||||
let mbox = RwLock::new(MailboxInternal {
|
let mbox = RwLock::new(MailboxInternal {
|
||||||
id,
|
id,
|
||||||
encryption_key: creds.keys.master.clone(),
|
encryption_key: creds.keys.master.clone(),
|
||||||
k2v: creds.storage.row_store()?,
|
storage: creds.storage.build()?,
|
||||||
s3: creds.storage.blob_store()?,
|
|
||||||
uid_index,
|
uid_index,
|
||||||
mail_path,
|
mail_path,
|
||||||
});
|
});
|
||||||
|
@ -178,10 +177,7 @@ struct MailboxInternal {
|
||||||
id: UniqueIdent,
|
id: UniqueIdent,
|
||||||
mail_path: String,
|
mail_path: String,
|
||||||
encryption_key: Key,
|
encryption_key: Key,
|
||||||
|
storage: Store,
|
||||||
k2v: RowStore,
|
|
||||||
s3: BlobStore,
|
|
||||||
|
|
||||||
uid_index: Bayou<UidIndex>,
|
uid_index: Bayou<UidIndex>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -200,15 +196,15 @@ impl MailboxInternal {
|
||||||
|
|
||||||
async fn fetch_meta(&self, ids: &[UniqueIdent]) -> Result<Vec<MailMeta>> {
|
async fn fetch_meta(&self, ids: &[UniqueIdent]) -> Result<Vec<MailMeta>> {
|
||||||
let ids = ids.iter().map(|x| x.to_string()).collect::<Vec<_>>();
|
let ids = ids.iter().map(|x| x.to_string()).collect::<Vec<_>>();
|
||||||
let ops = ids.iter().map(|id| (self.mail_path.as_str(), id.as_str())).collect::<Vec<_>>();
|
let ops = ids.iter().map(|id| RowRef::new(self.mail_path.as_str(), id.as_str())).collect::<Vec<_>>();
|
||||||
let res_vec = self.k2v.select(storage::Selector::List(ops)).await?;
|
let res_vec = self.storage.row_fetch(&Selector::List(ops)).await?;
|
||||||
|
|
||||||
let mut meta_vec = vec![];
|
let mut meta_vec = vec![];
|
||||||
for res in res_vec.into_iter() {
|
for res in res_vec.into_iter() {
|
||||||
let mut meta_opt = None;
|
let mut meta_opt = None;
|
||||||
|
|
||||||
// Resolve conflicts
|
// Resolve conflicts
|
||||||
for v in res.content().iter() {
|
for v in res.value.iter() {
|
||||||
match v {
|
match v {
|
||||||
storage::Alternative::Tombstone => (),
|
storage::Alternative::Tombstone => (),
|
||||||
storage::Alternative::Value(v) => {
|
storage::Alternative::Value(v) => {
|
||||||
|
@ -227,7 +223,7 @@ impl MailboxInternal {
|
||||||
if let Some(meta) = meta_opt {
|
if let Some(meta) = meta_opt {
|
||||||
meta_vec.push(meta);
|
meta_vec.push(meta);
|
||||||
} else {
|
} else {
|
||||||
bail!("No valid meta value in k2v for {:?}", res.to_ref().key());
|
bail!("No valid meta value in k2v for {:?}", res.row_ref);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -235,9 +231,9 @@ impl MailboxInternal {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn fetch_full(&self, id: UniqueIdent, message_key: &Key) -> Result<Vec<u8>> {
|
async fn fetch_full(&self, id: UniqueIdent, message_key: &Key) -> Result<Vec<u8>> {
|
||||||
let obj_res = self.s3.blob(&format!("{}/{}", self.mail_path, id)).fetch().await?;
|
let obj_res = self.storage.blob_fetch(&BlobRef(format!("{}/{}", self.mail_path, id))).await?;
|
||||||
let body = obj_res.content().ok_or(anyhow!("missing body"))?;
|
let body = obj_res.value;
|
||||||
cryptoblob::open(body, message_key)
|
cryptoblob::open(&body, message_key)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---- Functions for changing the mailbox ----
|
// ---- Functions for changing the mailbox ----
|
||||||
|
@ -270,7 +266,10 @@ impl MailboxInternal {
|
||||||
async {
|
async {
|
||||||
// Encrypt and save mail body
|
// Encrypt and save mail body
|
||||||
let message_blob = cryptoblob::seal(mail.raw, &message_key)?;
|
let message_blob = cryptoblob::seal(mail.raw, &message_key)?;
|
||||||
self.s3.blob(&format!("{}/{}", self.mail_path, ident)).set_value(message_blob).push().await?;
|
self.storage.blob_insert(&BlobVal::new(
|
||||||
|
BlobRef(format!("{}/{}", self.mail_path, ident)),
|
||||||
|
message_blob,
|
||||||
|
)).await?;
|
||||||
Ok::<_, anyhow::Error>(())
|
Ok::<_, anyhow::Error>(())
|
||||||
},
|
},
|
||||||
async {
|
async {
|
||||||
|
@ -282,7 +281,10 @@ impl MailboxInternal {
|
||||||
rfc822_size: mail.raw.len(),
|
rfc822_size: mail.raw.len(),
|
||||||
};
|
};
|
||||||
let meta_blob = seal_serialize(&meta, &self.encryption_key)?;
|
let meta_blob = seal_serialize(&meta, &self.encryption_key)?;
|
||||||
self.k2v.row(&self.mail_path, &ident.to_string()).set_value(&meta_blob).push().await?;
|
self.storage.row_insert(vec![RowVal::new(
|
||||||
|
RowRef::new(&self.mail_path, &ident.to_string()),
|
||||||
|
meta_blob,
|
||||||
|
)]).await?;
|
||||||
Ok::<_, anyhow::Error>(())
|
Ok::<_, anyhow::Error>(())
|
||||||
},
|
},
|
||||||
self.uid_index.opportunistic_sync()
|
self.uid_index.opportunistic_sync()
|
||||||
|
@ -307,14 +309,14 @@ impl MailboxInternal {
|
||||||
&mut self,
|
&mut self,
|
||||||
mail: IMF<'a>,
|
mail: IMF<'a>,
|
||||||
ident: UniqueIdent,
|
ident: UniqueIdent,
|
||||||
blob_ref: storage::BlobRef,
|
blob_src: storage::BlobRef,
|
||||||
message_key: Key,
|
message_key: Key,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
futures::try_join!(
|
futures::try_join!(
|
||||||
async {
|
async {
|
||||||
// Copy mail body from previous location
|
// Copy mail body from previous location
|
||||||
let dst = self.s3.blob(&format!("{}/{}", self.mail_path, ident));
|
let blob_dst = BlobRef(format!("{}/{}", self.mail_path, ident));
|
||||||
blob_ref.copy(&dst).await?;
|
self.storage.blob_copy(&blob_src, &blob_dst).await?;
|
||||||
Ok::<_, anyhow::Error>(())
|
Ok::<_, anyhow::Error>(())
|
||||||
},
|
},
|
||||||
async {
|
async {
|
||||||
|
@ -326,7 +328,10 @@ impl MailboxInternal {
|
||||||
rfc822_size: mail.raw.len(),
|
rfc822_size: mail.raw.len(),
|
||||||
};
|
};
|
||||||
let meta_blob = seal_serialize(&meta, &self.encryption_key)?;
|
let meta_blob = seal_serialize(&meta, &self.encryption_key)?;
|
||||||
self.k2v.row(&self.mail_path, &ident.to_string()).set_value(&meta_blob).push().await?;
|
self.storage.row_insert(vec![RowVal::new(
|
||||||
|
RowRef::new(&self.mail_path, &ident.to_string()),
|
||||||
|
meta_blob,
|
||||||
|
)]).await?;
|
||||||
Ok::<_, anyhow::Error>(())
|
Ok::<_, anyhow::Error>(())
|
||||||
},
|
},
|
||||||
self.uid_index.opportunistic_sync()
|
self.uid_index.opportunistic_sync()
|
||||||
|
@ -350,13 +355,13 @@ impl MailboxInternal {
|
||||||
futures::try_join!(
|
futures::try_join!(
|
||||||
async {
|
async {
|
||||||
// Delete mail body from S3
|
// Delete mail body from S3
|
||||||
self.s3.blob(&format!("{}/{}", self.mail_path, ident)).rm().await?;
|
self.storage.blob_rm(&BlobRef(format!("{}/{}", self.mail_path, ident))).await?;
|
||||||
Ok::<_, anyhow::Error>(())
|
Ok::<_, anyhow::Error>(())
|
||||||
},
|
},
|
||||||
async {
|
async {
|
||||||
// Delete mail meta from K2V
|
// Delete mail meta from K2V
|
||||||
let sk = ident.to_string();
|
let sk = ident.to_string();
|
||||||
self.k2v.row(&self.mail_path, &sk).fetch().await?.to_ref().rm().await?;
|
self.storage.row_rm(&Selector::Single(&RowRef::new(&self.mail_path, &sk))).await?;
|
||||||
Ok::<_, anyhow::Error>(())
|
Ok::<_, anyhow::Error>(())
|
||||||
}
|
}
|
||||||
)?;
|
)?;
|
||||||
|
@ -402,15 +407,19 @@ impl MailboxInternal {
|
||||||
|
|
||||||
futures::try_join!(
|
futures::try_join!(
|
||||||
async {
|
async {
|
||||||
let dst = self.s3.blob(&format!("{}/{}", self.mail_path, new_id));
|
let dst = BlobRef(format!("{}/{}", self.mail_path, new_id));
|
||||||
self.s3.blob(&format!("{}/{}", from.mail_path, source_id)).copy(&dst).await?;
|
let src = BlobRef(format!("{}/{}", from.mail_path, source_id));
|
||||||
|
self.storage.blob_copy(&src, &dst).await?;
|
||||||
Ok::<_, anyhow::Error>(())
|
Ok::<_, anyhow::Error>(())
|
||||||
},
|
},
|
||||||
async {
|
async {
|
||||||
// Copy mail meta in K2V
|
// Copy mail meta in K2V
|
||||||
let meta = &from.fetch_meta(&[source_id]).await?[0];
|
let meta = &from.fetch_meta(&[source_id]).await?[0];
|
||||||
let meta_blob = seal_serialize(meta, &self.encryption_key)?;
|
let meta_blob = seal_serialize(meta, &self.encryption_key)?;
|
||||||
self.k2v.row(&self.mail_path, &new_id.to_string()).set_value(&meta_blob).push().await?;
|
self.storage.row_insert(vec![RowVal::new(
|
||||||
|
RowRef::new(&self.mail_path, &new_id.to_string()),
|
||||||
|
meta_blob,
|
||||||
|
)]).await?;
|
||||||
Ok::<_, anyhow::Error>(())
|
Ok::<_, anyhow::Error>(())
|
||||||
},
|
},
|
||||||
self.uid_index.opportunistic_sync(),
|
self.uid_index.opportunistic_sync(),
|
||||||
|
|
|
@ -33,7 +33,7 @@ const MAILBOX_LIST_SK: &str = "list";
|
||||||
pub struct User {
|
pub struct User {
|
||||||
pub username: String,
|
pub username: String,
|
||||||
pub creds: Credentials,
|
pub creds: Credentials,
|
||||||
pub k2v: storage::RowStore,
|
pub storage: storage::Store,
|
||||||
pub mailboxes: std::sync::Mutex<HashMap<UniqueIdent, Weak<Mailbox>>>,
|
pub mailboxes: std::sync::Mutex<HashMap<UniqueIdent, Weak<Mailbox>>>,
|
||||||
|
|
||||||
tx_inbox_id: watch::Sender<Option<(UniqueIdent, ImapUidvalidity)>>,
|
tx_inbox_id: watch::Sender<Option<(UniqueIdent, ImapUidvalidity)>>,
|
||||||
|
@ -41,7 +41,7 @@ pub struct User {
|
||||||
|
|
||||||
impl User {
|
impl User {
|
||||||
pub async fn new(username: String, creds: Credentials) -> Result<Arc<Self>> {
|
pub async fn new(username: String, creds: Credentials) -> Result<Arc<Self>> {
|
||||||
let cache_key = (username.clone(), creds.storage.clone());
|
let cache_key = (username.clone(), creds.storage.unique());
|
||||||
|
|
||||||
{
|
{
|
||||||
let cache = USER_CACHE.lock().unwrap();
|
let cache = USER_CACHE.lock().unwrap();
|
||||||
|
@ -81,11 +81,7 @@ impl User {
|
||||||
let mb_uidvalidity = mb.current_uid_index().await.uidvalidity;
|
let mb_uidvalidity = mb.current_uid_index().await.uidvalidity;
|
||||||
if mb_uidvalidity > uidvalidity {
|
if mb_uidvalidity > uidvalidity {
|
||||||
list.update_uidvalidity(name, mb_uidvalidity);
|
list.update_uidvalidity(name, mb_uidvalidity);
|
||||||
let orphan = match ct {
|
self.save_mailbox_list(&list, ct).await?;
|
||||||
Some(x) => Some(x.to_orphan()),
|
|
||||||
None => None,
|
|
||||||
};
|
|
||||||
self.save_mailbox_list(&list, orphan).await?;
|
|
||||||
}
|
}
|
||||||
Ok(Some(mb))
|
Ok(Some(mb))
|
||||||
} else {
|
} else {
|
||||||
|
@ -108,11 +104,7 @@ impl User {
|
||||||
let (mut list, ct) = self.load_mailbox_list().await?;
|
let (mut list, ct) = self.load_mailbox_list().await?;
|
||||||
match list.create_mailbox(name) {
|
match list.create_mailbox(name) {
|
||||||
CreatedMailbox::Created(_, _) => {
|
CreatedMailbox::Created(_, _) => {
|
||||||
let orphan = match ct {
|
self.save_mailbox_list(&list, ct).await?;
|
||||||
Some(x) => Some(x.to_orphan()),
|
|
||||||
None => None,
|
|
||||||
};
|
|
||||||
self.save_mailbox_list(&list, orphan).await?;
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
CreatedMailbox::Existed(_, _) => Err(anyhow!("Mailbox {} already exists", name)),
|
CreatedMailbox::Existed(_, _) => Err(anyhow!("Mailbox {} already exists", name)),
|
||||||
|
@ -129,11 +121,7 @@ impl User {
|
||||||
if list.has_mailbox(name) {
|
if list.has_mailbox(name) {
|
||||||
// TODO: actually delete mailbox contents
|
// TODO: actually delete mailbox contents
|
||||||
list.set_mailbox(name, None);
|
list.set_mailbox(name, None);
|
||||||
let orphan = match ct {
|
self.save_mailbox_list(&list, ct).await?;
|
||||||
Some(x) => Some(x.to_orphan()),
|
|
||||||
None => None,
|
|
||||||
};
|
|
||||||
self.save_mailbox_list(&list, orphan).await?;
|
|
||||||
Ok(())
|
Ok(())
|
||||||
} else {
|
} else {
|
||||||
bail!("Mailbox {} does not exist", name);
|
bail!("Mailbox {} does not exist", name);
|
||||||
|
@ -154,11 +142,7 @@ impl User {
|
||||||
if old_name == INBOX {
|
if old_name == INBOX {
|
||||||
list.rename_mailbox(old_name, new_name)?;
|
list.rename_mailbox(old_name, new_name)?;
|
||||||
if !self.ensure_inbox_exists(&mut list, &ct).await? {
|
if !self.ensure_inbox_exists(&mut list, &ct).await? {
|
||||||
let orphan = match ct {
|
self.save_mailbox_list(&list, ct).await?;
|
||||||
Some(x) => Some(x.to_orphan()),
|
|
||||||
None => None,
|
|
||||||
};
|
|
||||||
self.save_mailbox_list(&list, orphan).await?;
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
let names = list.existing_mailbox_names();
|
let names = list.existing_mailbox_names();
|
||||||
|
@ -182,11 +166,7 @@ impl User {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let orphan = match ct {
|
self.save_mailbox_list(&list, ct).await?;
|
||||||
Some(x) => Some(x.to_orphan()),
|
|
||||||
None => None,
|
|
||||||
};
|
|
||||||
self.save_mailbox_list(&list, orphan).await?;
|
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -194,14 +174,14 @@ impl User {
|
||||||
// ---- Internal user & mailbox management ----
|
// ---- Internal user & mailbox management ----
|
||||||
|
|
||||||
async fn open(username: String, creds: Credentials) -> Result<Arc<Self>> {
|
async fn open(username: String, creds: Credentials) -> Result<Arc<Self>> {
|
||||||
let k2v = creds.row_client()?;
|
let storage = creds.storage.build()?;
|
||||||
|
|
||||||
let (tx_inbox_id, rx_inbox_id) = watch::channel(None);
|
let (tx_inbox_id, rx_inbox_id) = watch::channel(None);
|
||||||
|
|
||||||
let user = Arc::new(Self {
|
let user = Arc::new(Self {
|
||||||
username,
|
username,
|
||||||
creds: creds.clone(),
|
creds: creds.clone(),
|
||||||
k2v,
|
storage,
|
||||||
tx_inbox_id,
|
tx_inbox_id,
|
||||||
mailboxes: std::sync::Mutex::new(HashMap::new()),
|
mailboxes: std::sync::Mutex::new(HashMap::new()),
|
||||||
});
|
});
|
||||||
|
@ -245,19 +225,25 @@ impl User {
|
||||||
// ---- Mailbox list management ----
|
// ---- Mailbox list management ----
|
||||||
|
|
||||||
async fn load_mailbox_list(&self) -> Result<(MailboxList, Option<storage::RowRef>)> {
|
async fn load_mailbox_list(&self) -> Result<(MailboxList, Option<storage::RowRef>)> {
|
||||||
let (mut list, row) = match self.k2v.row(MAILBOX_LIST_PK, MAILBOX_LIST_SK).fetch().await {
|
let row_ref = storage::RowRef::new(MAILBOX_LIST_PK, MAILBOX_LIST_SK);
|
||||||
|
let (mut list, row) = match self.storage.row_fetch(&storage::Selector::Single(&row_ref)).await {
|
||||||
Err(storage::StorageError::NotFound) => (MailboxList::new(), None),
|
Err(storage::StorageError::NotFound) => (MailboxList::new(), None),
|
||||||
Err(e) => return Err(e.into()),
|
Err(e) => return Err(e.into()),
|
||||||
Ok(rv) => {
|
Ok(rv) => {
|
||||||
let mut list = MailboxList::new();
|
let mut list = MailboxList::new();
|
||||||
for v in rv.content() {
|
let (row_ref, row_vals) = match rv.into_iter().next() {
|
||||||
|
Some(row_val) => (row_val.row_ref, row_val.value),
|
||||||
|
None => (row_ref, vec![]),
|
||||||
|
};
|
||||||
|
|
||||||
|
for v in row_vals {
|
||||||
if let storage::Alternative::Value(vbytes) = v {
|
if let storage::Alternative::Value(vbytes) = v {
|
||||||
let list2 =
|
let list2 =
|
||||||
open_deserialize::<MailboxList>(&vbytes, &self.creds.keys.master)?;
|
open_deserialize::<MailboxList>(&vbytes, &self.creds.keys.master)?;
|
||||||
list.merge(list2);
|
list.merge(list2);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
(list, Some(rv.to_ref()))
|
(list, Some(row_ref))
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -278,11 +264,7 @@ impl User {
|
||||||
let saved;
|
let saved;
|
||||||
let (inbox_id, inbox_uidvalidity) = match list.create_mailbox(INBOX) {
|
let (inbox_id, inbox_uidvalidity) = match list.create_mailbox(INBOX) {
|
||||||
CreatedMailbox::Created(i, v) => {
|
CreatedMailbox::Created(i, v) => {
|
||||||
let orphan = match ct {
|
self.save_mailbox_list(list, ct.clone()).await?;
|
||||||
Some(x) => Some(x.to_orphan()),
|
|
||||||
None => None,
|
|
||||||
};
|
|
||||||
self.save_mailbox_list(list, orphan).await?;
|
|
||||||
saved = true;
|
saved = true;
|
||||||
(i, v)
|
(i, v)
|
||||||
}
|
}
|
||||||
|
@ -302,14 +284,12 @@ impl User {
|
||||||
async fn save_mailbox_list(
|
async fn save_mailbox_list(
|
||||||
&self,
|
&self,
|
||||||
list: &MailboxList,
|
list: &MailboxList,
|
||||||
ct: Option<storage::OrphanRowRef>,
|
ct: Option<storage::RowRef>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let list_blob = seal_serialize(list, &self.creds.keys.master)?;
|
let list_blob = seal_serialize(list, &self.creds.keys.master)?;
|
||||||
let rref = match ct {
|
let rref = ct.unwrap_or(storage::RowRef::new(MAILBOX_LIST_PK, MAILBOX_LIST_SK));
|
||||||
Some(x) => self.k2v.from_orphan(x).expect("Source & target must be same storage"),
|
let row_val = storage::RowVal::new(rref, list_blob);
|
||||||
None => self.k2v.row(MAILBOX_LIST_PK, MAILBOX_LIST_SK),
|
self.storage.row_insert(vec![row_val]).await?;
|
||||||
};
|
|
||||||
rref.set_value(&list_blob).push().await?;
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -482,6 +462,6 @@ enum CreatedMailbox {
|
||||||
// ---- User cache ----
|
// ---- User cache ----
|
||||||
|
|
||||||
lazy_static! {
|
lazy_static! {
|
||||||
static ref USER_CACHE: std::sync::Mutex<HashMap<(String, storage::Builders), Weak<User>>> =
|
static ref USER_CACHE: std::sync::Mutex<HashMap<(String, storage::UnicityBuffer), Weak<User>>> =
|
||||||
std::sync::Mutex::new(HashMap::new());
|
std::sync::Mutex::new(HashMap::new());
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,7 +1,8 @@
|
||||||
use crate::storage::*;
|
use crate::storage::*;
|
||||||
|
use serde::Serialize;
|
||||||
|
|
||||||
#[derive(Clone, Debug, Hash)]
|
#[derive(Clone, Debug, Serialize)]
|
||||||
pub struct GarageBuilder {
|
pub struct GarageConf {
|
||||||
pub region: String,
|
pub region: String,
|
||||||
pub s3_endpoint: String,
|
pub s3_endpoint: String,
|
||||||
pub k2v_endpoint: String,
|
pub k2v_endpoint: String,
|
||||||
|
@ -10,10 +11,28 @@ pub struct GarageBuilder {
|
||||||
pub bucket: String,
|
pub bucket: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug)]
|
||||||
|
pub struct GarageBuilder {
|
||||||
|
conf: GarageConf,
|
||||||
|
unicity: Vec<u8>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl GarageBuilder {
|
||||||
|
pub fn new(conf: GarageConf) -> anyhow::Result<Arc<Self>> {
|
||||||
|
let mut unicity: Vec<u8> = vec![];
|
||||||
|
unicity.extend_from_slice(file!().as_bytes());
|
||||||
|
unicity.append(&mut rmp_serde::to_vec(&conf)?);
|
||||||
|
Ok(Arc::new(Self { conf, unicity }))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl IBuilder for GarageBuilder {
|
impl IBuilder for GarageBuilder {
|
||||||
fn build(&self) -> Box<dyn IStore> {
|
fn build(&self) -> Result<Store, StorageError> {
|
||||||
unimplemented!();
|
unimplemented!();
|
||||||
}
|
}
|
||||||
|
fn unique(&self) -> UnicityBuffer {
|
||||||
|
UnicityBuffer(self.unicity.clone())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct GarageStore {
|
pub struct GarageStore {
|
||||||
|
@ -33,7 +52,7 @@ impl IStore for GarageStore {
|
||||||
unimplemented!();
|
unimplemented!();
|
||||||
|
|
||||||
}
|
}
|
||||||
async fn row_poll(&self, value: RowRef) -> Result<RowVal, StorageError> {
|
async fn row_poll(&self, value: &RowRef) -> Result<RowVal, StorageError> {
|
||||||
unimplemented!();
|
unimplemented!();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -41,6 +60,9 @@ impl IStore for GarageStore {
|
||||||
unimplemented!();
|
unimplemented!();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
async fn blob_insert(&self, blob_val: &BlobVal) -> Result<BlobVal, StorageError> {
|
||||||
|
unimplemented!();
|
||||||
|
}
|
||||||
async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<BlobVal, StorageError> {
|
async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<BlobVal, StorageError> {
|
||||||
unimplemented!();
|
unimplemented!();
|
||||||
|
|
||||||
|
|
|
@ -14,17 +14,35 @@ pub type ArcBlob = Arc<RwLock<HashMap<String, Vec<u8>>>>;
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct MemBuilder {
|
pub struct MemBuilder {
|
||||||
user: String,
|
user: String,
|
||||||
url: String,
|
unicity: Vec<u8>,
|
||||||
row: ArcRow,
|
row: ArcRow,
|
||||||
blob: ArcBlob,
|
blob: ArcBlob,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl MemBuilder {
|
||||||
|
pub fn new(user: &str) -> Arc<Self> {
|
||||||
|
let mut unicity: Vec<u8> = vec![];
|
||||||
|
unicity.extend_from_slice(file!().as_bytes());
|
||||||
|
unicity.extend_from_slice(user.as_bytes());
|
||||||
|
Arc::new(Self {
|
||||||
|
user: user.to_string(),
|
||||||
|
unicity,
|
||||||
|
row: Arc::new(RwLock::new(HashMap::new())),
|
||||||
|
blob: Arc::new(RwLock::new(HashMap::new())),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl IBuilder for MemBuilder {
|
impl IBuilder for MemBuilder {
|
||||||
fn build(&self) -> Box<dyn IStore> {
|
fn build(&self) -> Result<Store, StorageError> {
|
||||||
Box::new(MemStore {
|
Ok(Box::new(MemStore {
|
||||||
row: self.row.clone(),
|
row: self.row.clone(),
|
||||||
blob: self.blob.clone(),
|
blob: self.blob.clone(),
|
||||||
})
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn unique(&self) -> UnicityBuffer {
|
||||||
|
UnicityBuffer(self.unicity.clone())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -56,7 +74,7 @@ impl IStore for MemStore {
|
||||||
.or(Err(StorageError::Internal))?
|
.or(Err(StorageError::Internal))?
|
||||||
.get(*shard)
|
.get(*shard)
|
||||||
.ok_or(StorageError::NotFound)?
|
.ok_or(StorageError::NotFound)?
|
||||||
.range((Included(sort_begin.to_string()), Included(sort_end.to_string())))
|
.range((Included(sort_begin.to_string()), Excluded(sort_end.to_string())))
|
||||||
.map(|(k, v)| RowVal {
|
.map(|(k, v)| RowVal {
|
||||||
row_ref: RowRef { uid: RowUid { shard: shard.to_string(), sort: k.to_string() }, causality: Some("c".to_string()) },
|
row_ref: RowRef { uid: RowUid { shard: shard.to_string(), sort: k.to_string() }, causality: Some("c".to_string()) },
|
||||||
value: vec![Alternative::Value(v.clone())],
|
value: vec![Alternative::Value(v.clone())],
|
||||||
|
@ -100,7 +118,7 @@ impl IStore for MemStore {
|
||||||
},
|
},
|
||||||
Selector::Single(row_ref) => {
|
Selector::Single(row_ref) => {
|
||||||
let bytes = self.inner_fetch(row_ref)?;
|
let bytes = self.inner_fetch(row_ref)?;
|
||||||
Ok(vec![RowVal{ row_ref: row_ref.clone(), value: vec![Alternative::Value(bytes)]}])
|
Ok(vec![RowVal{ row_ref: (*row_ref).clone(), value: vec![Alternative::Value(bytes)]}])
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -113,7 +131,7 @@ impl IStore for MemStore {
|
||||||
unimplemented!();
|
unimplemented!();
|
||||||
|
|
||||||
}
|
}
|
||||||
async fn row_poll(&self, value: RowRef) -> Result<RowVal, StorageError> {
|
async fn row_poll(&self, value: &RowRef) -> Result<RowVal, StorageError> {
|
||||||
unimplemented!();
|
unimplemented!();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -121,6 +139,9 @@ impl IStore for MemStore {
|
||||||
unimplemented!();
|
unimplemented!();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
async fn blob_insert(&self, blob_val: &BlobVal) -> Result<BlobVal, StorageError> {
|
||||||
|
unimplemented!();
|
||||||
|
}
|
||||||
async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<BlobVal, StorageError> {
|
async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<BlobVal, StorageError> {
|
||||||
unimplemented!();
|
unimplemented!();
|
||||||
|
|
||||||
|
|
|
@ -11,9 +11,9 @@
|
||||||
pub mod in_memory;
|
pub mod in_memory;
|
||||||
pub mod garage;
|
pub mod garage;
|
||||||
|
|
||||||
use std::hash::{Hash, Hasher};
|
use std::sync::Arc;
|
||||||
|
use std::hash::Hash;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use futures::future::BoxFuture;
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
|
@ -23,45 +23,95 @@ pub enum Alternative {
|
||||||
}
|
}
|
||||||
type ConcurrentValues = Vec<Alternative>;
|
type ConcurrentValues = Vec<Alternative>;
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug, Clone)]
|
||||||
pub enum StorageError {
|
pub enum StorageError {
|
||||||
NotFound,
|
NotFound,
|
||||||
Internal,
|
Internal,
|
||||||
}
|
}
|
||||||
|
impl std::fmt::Display for StorageError {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
f.write_str("Storage Error: ")?;
|
||||||
|
match self {
|
||||||
|
Self::NotFound => f.write_str("Item not found"),
|
||||||
|
Self::Internal => f.write_str("An internal error occured"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl std::error::Error for StorageError {}
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone, PartialEq)]
|
||||||
pub struct RowUid {
|
pub struct RowUid {
|
||||||
shard: String,
|
pub shard: String,
|
||||||
sort: String,
|
pub sort: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone, PartialEq)]
|
||||||
pub struct RowRef {
|
pub struct RowRef {
|
||||||
uid: RowUid,
|
pub uid: RowUid,
|
||||||
causality: Option<String>,
|
pub causality: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RowRef {
|
||||||
|
pub fn new(shard: &str, sort: &str) -> Self {
|
||||||
|
Self {
|
||||||
|
uid: RowUid {
|
||||||
|
shard: shard.to_string(),
|
||||||
|
sort: sort.to_string(),
|
||||||
|
},
|
||||||
|
causality: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct RowVal {
|
pub struct RowVal {
|
||||||
row_ref: RowRef,
|
pub row_ref: RowRef,
|
||||||
value: ConcurrentValues,
|
pub value: ConcurrentValues,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RowVal {
|
||||||
|
pub fn new(row_ref: RowRef, value: Vec<u8>) -> Self {
|
||||||
|
Self {
|
||||||
|
row_ref,
|
||||||
|
value: vec![Alternative::Value(value)],
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct BlobRef(pub String);
|
||||||
|
impl BlobRef {
|
||||||
|
pub fn new(key: &str) -> Self {
|
||||||
|
Self(key.to_string())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
pub struct BlobRef(String);
|
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct BlobVal {
|
pub struct BlobVal {
|
||||||
blob_ref: BlobRef,
|
pub blob_ref: BlobRef,
|
||||||
meta: HashMap<String, String>,
|
pub meta: HashMap<String, String>,
|
||||||
value: Vec<u8>,
|
pub value: Vec<u8>,
|
||||||
|
}
|
||||||
|
impl BlobVal {
|
||||||
|
pub fn new(blob_ref: BlobRef, value: Vec<u8>) -> Self {
|
||||||
|
Self {
|
||||||
|
blob_ref, value,
|
||||||
|
meta: HashMap::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn with_meta(mut self, k: String, v: String) -> Self {
|
||||||
|
self.meta.insert(k, v);
|
||||||
|
self
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub enum Selector<'a> {
|
pub enum Selector<'a> {
|
||||||
Range { shard: &'a str, sort_begin: &'a str, sort_end: &'a str },
|
Range { shard: &'a str, sort_begin: &'a str, sort_end: &'a str },
|
||||||
List (Vec<RowRef>), // list of (shard_key, sort_key)
|
List (Vec<RowRef>), // list of (shard_key, sort_key)
|
||||||
Prefix { shard: &'a str, sort_prefix: &'a str },
|
Prefix { shard: &'a str, sort_prefix: &'a str },
|
||||||
Single(RowRef),
|
Single(&'a RowRef),
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
|
@ -69,131 +119,24 @@ pub trait IStore {
|
||||||
async fn row_fetch<'a>(&self, select: &Selector<'a>) -> Result<Vec<RowVal>, StorageError>;
|
async fn row_fetch<'a>(&self, select: &Selector<'a>) -> Result<Vec<RowVal>, StorageError>;
|
||||||
async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError>;
|
async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError>;
|
||||||
async fn row_insert(&self, values: Vec<RowVal>) -> Result<(), StorageError>;
|
async fn row_insert(&self, values: Vec<RowVal>) -> Result<(), StorageError>;
|
||||||
async fn row_poll(&self, value: RowRef) -> Result<RowVal, StorageError>;
|
async fn row_poll(&self, value: &RowRef) -> Result<RowVal, StorageError>;
|
||||||
|
|
||||||
async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result<BlobVal, StorageError>;
|
async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result<BlobVal, StorageError>;
|
||||||
|
async fn blob_insert(&self, blob_val: &BlobVal) -> Result<BlobVal, StorageError>;
|
||||||
async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<BlobVal, StorageError>;
|
async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<BlobVal, StorageError>;
|
||||||
async fn blob_list(&self, prefix: &str) -> Result<Vec<BlobRef>, StorageError>;
|
async fn blob_list(&self, prefix: &str) -> Result<Vec<BlobRef>, StorageError>;
|
||||||
async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError>;
|
async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError>;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait IBuilder {
|
#[derive(Clone,Debug,PartialEq,Eq,Hash)]
|
||||||
fn build(&self) -> Box<dyn IStore>;
|
pub struct UnicityBuffer(Vec<u8>);
|
||||||
|
|
||||||
|
pub trait IBuilder: std::fmt::Debug {
|
||||||
|
fn build(&self) -> Result<Store, StorageError>;
|
||||||
|
|
||||||
|
/// Returns an opaque buffer that uniquely identifies this builder
|
||||||
|
fn unique(&self) -> UnicityBuffer;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub type Builder = Arc<dyn IBuilder + Send + Sync>;
|
||||||
|
pub type Store = Box<dyn IStore + Send + Sync>;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
#[derive(Clone, Debug, PartialEq)]
|
|
||||||
pub enum OrphanRowRef {
|
|
||||||
Garage(garage::GrgOrphanRowRef),
|
|
||||||
Memory(in_memory::MemOrphanRowRef),
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
impl std::fmt::Display for StorageError {
|
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
||||||
f.write_str("Storage Error: ")?;
|
|
||||||
match self {
|
|
||||||
Self::NotFound => f.write_str("Item not found"),
|
|
||||||
Self::Internal => f.write_str("An internal error occured"),
|
|
||||||
Self::IncompatibleOrphan => f.write_str("Incompatible orphan"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
impl std::error::Error for StorageError {}
|
|
||||||
|
|
||||||
// Utils
|
|
||||||
pub type AsyncResult<'a, T> = BoxFuture<'a, Result<T, StorageError>>;
|
|
||||||
|
|
||||||
// ----- Builders
|
|
||||||
pub trait IBuilders {
|
|
||||||
fn box_clone(&self) -> Builders;
|
|
||||||
fn row_store(&self) -> Result<RowStore, StorageError>;
|
|
||||||
fn blob_store(&self) -> Result<BlobStore, StorageError>;
|
|
||||||
fn url(&self) -> &str;
|
|
||||||
}
|
|
||||||
pub type Builders = Box<dyn IBuilders + Send + Sync>;
|
|
||||||
impl Clone for Builders {
|
|
||||||
fn clone(&self) -> Self {
|
|
||||||
self.box_clone()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
impl std::fmt::Debug for Builders {
|
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
||||||
f.write_str("aerogramme::storage::Builder")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
impl PartialEq for Builders {
|
|
||||||
fn eq(&self, other: &Self) -> bool {
|
|
||||||
self.url() == other.url()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
impl Eq for Builders {}
|
|
||||||
impl Hash for Builders {
|
|
||||||
fn hash<H: Hasher>(&self, state: &mut H) {
|
|
||||||
self.url().hash(state);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ------ Row
|
|
||||||
pub trait IRowStore
|
|
||||||
{
|
|
||||||
fn row(&self, partition: &str, sort: &str) -> RowRef;
|
|
||||||
fn select(&self, selector: Selector) -> AsyncResult<Vec<RowValue>>;
|
|
||||||
fn rm(&self, selector: Selector) -> AsyncResult<()>;
|
|
||||||
fn from_orphan(&self, orphan: OrphanRowRef) -> Result<RowRef, StorageError>;
|
|
||||||
}
|
|
||||||
pub type RowStore = Box<dyn IRowStore + Sync + Send>;
|
|
||||||
|
|
||||||
pub trait IRowRef: std::fmt::Debug
|
|
||||||
{
|
|
||||||
fn to_orphan(&self) -> OrphanRowRef;
|
|
||||||
fn key(&self) -> (&str, &str);
|
|
||||||
fn set_value(&self, content: &[u8]) -> RowValue;
|
|
||||||
fn fetch(&self) -> AsyncResult<RowValue>;
|
|
||||||
fn rm(&self) -> AsyncResult<()>;
|
|
||||||
fn poll(&self) -> AsyncResult<RowValue>;
|
|
||||||
}
|
|
||||||
pub type RowRef<'a> = Box<dyn IRowRef + Send + Sync + 'a>;
|
|
||||||
|
|
||||||
pub trait IRowValue: std::fmt::Debug
|
|
||||||
{
|
|
||||||
fn to_ref(&self) -> RowRef;
|
|
||||||
fn content(&self) -> ConcurrentValues;
|
|
||||||
fn push(&self) -> AsyncResult<()>;
|
|
||||||
}
|
|
||||||
pub type RowValue = Box<dyn IRowValue + Send + Sync>;
|
|
||||||
|
|
||||||
// ------- Blob
|
|
||||||
pub trait IBlobStore
|
|
||||||
{
|
|
||||||
fn blob(&self, key: &str) -> BlobRef;
|
|
||||||
fn list(&self, prefix: &str) -> AsyncResult<Vec<BlobRef>>;
|
|
||||||
}
|
|
||||||
pub type BlobStore = Box<dyn IBlobStore + Send + Sync>;
|
|
||||||
|
|
||||||
pub trait IBlobRef
|
|
||||||
{
|
|
||||||
fn set_value(&self, content: Vec<u8>) -> BlobValue;
|
|
||||||
fn key(&self) -> &str;
|
|
||||||
fn fetch(&self) -> AsyncResult<BlobValue>;
|
|
||||||
fn copy(&self, dst: &BlobRef) -> AsyncResult<()>;
|
|
||||||
fn rm(&self) -> AsyncResult<()>;
|
|
||||||
}
|
|
||||||
pub type BlobRef = Box<dyn IBlobRef + Send + Sync>;
|
|
||||||
|
|
||||||
pub trait IBlobValue {
|
|
||||||
fn to_ref(&self) -> BlobRef;
|
|
||||||
fn get_meta(&self, key: &str) -> Option<&[u8]>;
|
|
||||||
fn set_meta(&mut self, key: &str, val: &str);
|
|
||||||
fn content(&self) -> Option<&[u8]>;
|
|
||||||
fn push(&self) -> AsyncResult<()>;
|
|
||||||
}
|
|
||||||
pub type BlobValue = Box<dyn IBlobValue + Send + Sync>;
|
|
||||||
*/
|
|
||||||
|
|
Loading…
Reference in a new issue