s3 is now implemented
This commit is contained in:
parent
1057661da7
commit
0f7764d9f0
6 changed files with 83 additions and 10 deletions
|
@ -352,7 +352,7 @@ impl<S: BayouState> Bayou<S> {
|
||||||
storage::BlobRef(format!("{}/checkpoint/{}", self.path, ts_cp.to_string())),
|
storage::BlobRef(format!("{}/checkpoint/{}", self.path, ts_cp.to_string())),
|
||||||
cryptoblob.into(),
|
cryptoblob.into(),
|
||||||
);
|
);
|
||||||
self.storage.blob_insert(&blob_val).await?;
|
self.storage.blob_insert(blob_val).await?;
|
||||||
|
|
||||||
// Drop old checkpoints (but keep at least CHECKPOINTS_TO_KEEP of them)
|
// Drop old checkpoints (but keep at least CHECKPOINTS_TO_KEEP of them)
|
||||||
let ecp_len = existing_checkpoints.len();
|
let ecp_len = existing_checkpoints.len();
|
||||||
|
|
|
@ -429,7 +429,7 @@ impl EncryptedMessage {
|
||||||
storage::BlobRef(format!("incoming/{}", gen_ident())),
|
storage::BlobRef(format!("incoming/{}", gen_ident())),
|
||||||
self.encrypted_body.clone().into(),
|
self.encrypted_body.clone().into(),
|
||||||
).with_meta(MESSAGE_KEY.to_string(), key_header);
|
).with_meta(MESSAGE_KEY.to_string(), key_header);
|
||||||
storage.blob_insert(&blob_val).await?;
|
storage.blob_insert(blob_val).await?;
|
||||||
|
|
||||||
// Update watch key to signal new mail
|
// Update watch key to signal new mail
|
||||||
let watch_val = storage::RowVal::new(
|
let watch_val = storage::RowVal::new(
|
||||||
|
|
|
@ -266,7 +266,7 @@ impl MailboxInternal {
|
||||||
async {
|
async {
|
||||||
// Encrypt and save mail body
|
// Encrypt and save mail body
|
||||||
let message_blob = cryptoblob::seal(mail.raw, &message_key)?;
|
let message_blob = cryptoblob::seal(mail.raw, &message_key)?;
|
||||||
self.storage.blob_insert(&BlobVal::new(
|
self.storage.blob_insert(BlobVal::new(
|
||||||
BlobRef(format!("{}/{}", self.mail_path, ident)),
|
BlobRef(format!("{}/{}", self.mail_path, ident)),
|
||||||
message_blob,
|
message_blob,
|
||||||
)).await?;
|
)).await?;
|
||||||
|
|
|
@ -117,20 +117,93 @@ impl IStore for GarageStore {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
tracing::debug!("Fetched {}/{}", self.s3_bucket, blob_ref.0);
|
||||||
Ok(BlobVal::new(blob_ref.clone(), buffer))
|
Ok(BlobVal::new(blob_ref.clone(), buffer))
|
||||||
}
|
}
|
||||||
async fn blob_insert(&self, blob_val: &BlobVal) -> Result<(), StorageError> {
|
async fn blob_insert(&self, blob_val: BlobVal) -> Result<(), StorageError> {
|
||||||
unimplemented!();
|
let streamable_value = s3::primitives::ByteStream::from(blob_val.value);
|
||||||
|
|
||||||
|
let maybe_send = self.s3
|
||||||
|
.put_object()
|
||||||
|
.bucket(self.s3_bucket.to_string())
|
||||||
|
.key(blob_val.blob_ref.0.to_string())
|
||||||
|
.body(streamable_value)
|
||||||
|
.send()
|
||||||
|
.await;
|
||||||
|
|
||||||
|
match maybe_send {
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!("unable to send object: {}", e);
|
||||||
|
Err(StorageError::Internal)
|
||||||
|
}
|
||||||
|
Ok(_) => {
|
||||||
|
tracing::debug!("Inserted {}/{}", self.s3_bucket, blob_val.blob_ref.0);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<(), StorageError> {
|
async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<(), StorageError> {
|
||||||
unimplemented!();
|
let maybe_copy = self.s3
|
||||||
|
.copy_object()
|
||||||
|
.bucket(self.s3_bucket.to_string())
|
||||||
|
.key(dst.0.clone())
|
||||||
|
.copy_source(format!("/{}/{}", self.s3_bucket.to_string(), src.0.clone()))
|
||||||
|
.send()
|
||||||
|
.await;
|
||||||
|
|
||||||
|
match maybe_copy {
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!("unable to copy object {} to {} (bucket: {}), error: {}", src.0, dst.0, self.s3_bucket, e);
|
||||||
|
Err(StorageError::Internal)
|
||||||
|
},
|
||||||
|
Ok(_) => {
|
||||||
|
tracing::debug!("copied {} to {} (bucket: {})", src.0, dst.0, self.s3_bucket);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
async fn blob_list(&self, prefix: &str) -> Result<Vec<BlobRef>, StorageError> {
|
async fn blob_list(&self, prefix: &str) -> Result<Vec<BlobRef>, StorageError> {
|
||||||
unimplemented!();
|
let maybe_list = self.s3
|
||||||
|
.list_objects_v2()
|
||||||
|
.bucket(self.s3_bucket.to_string())
|
||||||
|
.prefix(prefix)
|
||||||
|
.into_paginator()
|
||||||
|
.send()
|
||||||
|
.try_collect()
|
||||||
|
.await;
|
||||||
|
|
||||||
|
match maybe_list {
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!("listing prefix {} on bucket {} failed: {}", prefix, self.s3_bucket, e);
|
||||||
|
Err(StorageError::Internal)
|
||||||
|
}
|
||||||
|
Ok(pagin_list_out) => Ok(pagin_list_out
|
||||||
|
.into_iter()
|
||||||
|
.map(|list_out| list_out.contents.unwrap_or(vec![]))
|
||||||
|
.flatten()
|
||||||
|
.map(|obj| BlobRef(obj.key.unwrap_or(String::new())))
|
||||||
|
.collect::<Vec<_>>()),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError> {
|
async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError> {
|
||||||
unimplemented!();
|
let maybe_delete = self.s3
|
||||||
|
.delete_object()
|
||||||
|
.bucket(self.s3_bucket.to_string())
|
||||||
|
.key(blob_ref.0.clone())
|
||||||
|
.send()
|
||||||
|
.await;
|
||||||
|
|
||||||
|
match maybe_delete {
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!("unable to delete {} (bucket: {}), error {}", blob_ref.0, self.s3_bucket, e);
|
||||||
|
Err(StorageError::Internal)
|
||||||
|
},
|
||||||
|
Ok(_) => {
|
||||||
|
tracing::debug!("deleted {} (bucket: {})", blob_ref.0, self.s3_bucket);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -278,7 +278,7 @@ impl IStore for MemStore {
|
||||||
let store = self.blob.read().or(Err(StorageError::Internal))?;
|
let store = self.blob.read().or(Err(StorageError::Internal))?;
|
||||||
store.get(&blob_ref.0).ok_or(StorageError::NotFound).map(|v| v.to_blob_val(blob_ref))
|
store.get(&blob_ref.0).ok_or(StorageError::NotFound).map(|v| v.to_blob_val(blob_ref))
|
||||||
}
|
}
|
||||||
async fn blob_insert(&self, blob_val: &BlobVal) -> Result<(), StorageError> {
|
async fn blob_insert(&self, blob_val: BlobVal) -> Result<(), StorageError> {
|
||||||
tracing::trace!(entry=%blob_val.blob_ref, command="blob_insert");
|
tracing::trace!(entry=%blob_val.blob_ref, command="blob_insert");
|
||||||
let mut store = self.blob.write().or(Err(StorageError::Internal))?;
|
let mut store = self.blob.write().or(Err(StorageError::Internal))?;
|
||||||
let entry = store.entry(blob_val.blob_ref.0.clone()).or_default();
|
let entry = store.entry(blob_val.blob_ref.0.clone()).or_default();
|
||||||
|
|
|
@ -148,7 +148,7 @@ pub trait IStore {
|
||||||
async fn row_poll(&self, value: &RowRef) -> Result<RowVal, StorageError>;
|
async fn row_poll(&self, value: &RowRef) -> Result<RowVal, StorageError>;
|
||||||
|
|
||||||
async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result<BlobVal, StorageError>;
|
async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result<BlobVal, StorageError>;
|
||||||
async fn blob_insert(&self, blob_val: &BlobVal) -> Result<(), StorageError>;
|
async fn blob_insert(&self, blob_val: BlobVal) -> Result<(), StorageError>;
|
||||||
async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<(), StorageError>;
|
async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<(), StorageError>;
|
||||||
async fn blob_list(&self, prefix: &str) -> Result<Vec<BlobRef>, StorageError>;
|
async fn blob_list(&self, prefix: &str) -> Result<Vec<BlobRef>, StorageError>;
|
||||||
async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError>;
|
async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError>;
|
||||||
|
|
Loading…
Reference in a new issue