cargo format

Quentin 2023-12-27 14:58:28 +01:00
parent 54c9736a24
commit 7ac24ad913
Signed by: quentin
GPG key ID: E9602264D639FF68
12 changed files with 547 additions and 348 deletions
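
Every hunk below is the output of a single `cargo fmt` (rustfmt) pass, so the changes are layout-only: `use` statements are reordered, method chains and argument lists that overflow rustfmt's default 100-column `max_width` are broken onto one element per line, trailing commas are added to vertical lists, and trailing commas after block-bodied match arms are dropped. As a representative before/after (taken from the checkpoint-loading hunk in the first file; rustfmt defaults are assumed, and only the line breaking changes, not the behavior):

    // before: a single method chain, well over 100 columns
    let buf = self.storage.blob_fetch(&storage::BlobRef(key.to_string())).await?.value;

    // after: rustfmt splits the chain, one segment per line
    let buf = self
        .storage
        .blob_fetch(&storage::BlobRef(key.to_string()))
        .await?
        .value;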

View file

@@ -9,9 +9,8 @@ use tokio::sync::{watch, Notify};
 use crate::cryptoblob::*;
 use crate::login::Credentials;
-use crate::timestamp::*;
 use crate::storage;
+use crate::timestamp::*;

 const KEEP_STATE_EVERY: usize = 64;
@@ -94,7 +93,11 @@ impl<S: BayouState> Bayou<S> {
         } else {
             debug!("(sync) loading checkpoint: {}", key);
-            let buf = self.storage.blob_fetch(&storage::BlobRef(key.to_string())).await?.value;
+            let buf = self
+                .storage
+                .blob_fetch(&storage::BlobRef(key.to_string()))
+                .await?
+                .value;
             debug!("(sync) checkpoint body length: {}", buf.len());
             let ck = open_deserialize::<S>(&buf, &self.key)?;
@@ -125,17 +128,22 @@ impl<S: BayouState> Bayou<S> {
         // 3. List all operations starting from checkpoint
         let ts_ser = self.checkpoint.0.to_string();
         debug!("(sync) looking up operations starting at {}", ts_ser);
-        let ops_map = self.storage.row_fetch(&storage::Selector::Range {
-            shard: &self.path,
-            sort_begin: &ts_ser,
-            sort_end: WATCH_SK
-        }).await?;
+        let ops_map = self
+            .storage
+            .row_fetch(&storage::Selector::Range {
+                shard: &self.path,
+                sort_begin: &ts_ser,
+                sort_end: WATCH_SK,
+            })
+            .await?;

         let mut ops = vec![];
         for row_value in ops_map {
             let row = row_value.row_ref;
             let sort_key = row.uid.sort;
-            let ts = sort_key.parse::<Timestamp>().map_err(|_| anyhow!("Invalid operation timestamp: {}", sort_key))?;
+            let ts = sort_key
+                .parse::<Timestamp>()
+                .map_err(|_| anyhow!("Invalid operation timestamp: {}", sort_key))?;

             let val = row_value.value;
             if val.len() != 1 {
@@ -211,7 +219,7 @@ impl<S: BayouState> Bayou<S> {
         // Save info that sync has been done
         self.last_sync = new_last_sync;
         self.last_sync_watch_ct = new_last_sync_watch_ct;

         Ok(())
     }
@@ -362,16 +370,20 @@ impl<S: BayouState> Bayou<S> {
             // Delete blobs
             for (_ts, key) in existing_checkpoints[..last_to_keep].iter() {
                 debug!("(cp) drop old checkpoint {}", key);
-                self.storage.blob_rm(&storage::BlobRef(key.to_string())).await?;
+                self.storage
+                    .blob_rm(&storage::BlobRef(key.to_string()))
+                    .await?;
             }

             // Delete corresponding range of operations
             let ts_ser = existing_checkpoints[last_to_keep].0.to_string();
-            self.storage.row_rm(&storage::Selector::Range {
-                shard: &self.path,
-                sort_begin: "",
-                sort_end: &ts_ser
-            }).await?
+            self.storage
+                .row_rm(&storage::Selector::Range {
+                    shard: &self.path,
+                    sort_begin: "",
+                    sort_end: &ts_ser,
+                })
+                .await?
         }

         Ok(())
@@ -426,11 +438,7 @@ impl K2vWatch {
         let watch = Arc::new(K2vWatch { target, rx, notify });

-        tokio::spawn(Self::background_task(
-            Arc::downgrade(&watch),
-            storage,
-            tx,
-        ));
+        tokio::spawn(Self::background_task(Arc::downgrade(&watch), storage, tx));

         Ok(watch)
     }
@@ -444,8 +452,8 @@ impl K2vWatch {
             Some(this) => this.target.clone(),
             None => {
                 error!("can't start loop");
-                return
-            },
+                return;
+            }
         };

         while let Some(this) = Weak::upgrade(&self_weak) {

View file

@@ -30,7 +30,10 @@ enum BucketSource {

 enum StorageSpecific {
     InMemory,
-    Garage { from_config: LdapGarageConfig, bucket_source: BucketSource },
+    Garage {
+        from_config: LdapGarageConfig,
+        bucket_source: BucketSource,
+    },
 }

 impl LdapLoginProvider {
@@ -57,22 +60,24 @@ impl LdapLoginProvider {
         let specific = match config.storage {
             LdapStorage::InMemory => StorageSpecific::InMemory,
             LdapStorage::Garage(grgconf) => {
-                let bucket_source = match (grgconf.default_bucket.clone(), grgconf.bucket_attr.clone()) {
-                    (Some(b), None) => BucketSource::Constant(b),
-                    (None, Some(a)) => BucketSource::Attr(a),
-                    _ => bail!("Must set `bucket` or `bucket_attr`, but not both"),
-                };
+                let bucket_source =
+                    match (grgconf.default_bucket.clone(), grgconf.bucket_attr.clone()) {
+                        (Some(b), None) => BucketSource::Constant(b),
+                        (None, Some(a)) => BucketSource::Attr(a),
+                        _ => bail!("Must set `bucket` or `bucket_attr`, but not both"),
+                    };

                 if let BucketSource::Attr(a) = &bucket_source {
                     attrs_to_retrieve.push(a.clone());
                 }

-                StorageSpecific::Garage { from_config: grgconf, bucket_source }
-            },
+                StorageSpecific::Garage {
+                    from_config: grgconf,
+                    bucket_source,
+                }
+            }
         };

         Ok(Self {
             ldap_server: config.ldap_server,
             pre_bind_on_login: config.pre_bind_on_login,
@@ -89,27 +94,32 @@ impl LdapLoginProvider {
     async fn storage_creds_from_ldap_user(&self, user: &SearchEntry) -> Result<Builder> {
         let storage: Builder = match &self.storage_specific {
-            StorageSpecific::InMemory => self.in_memory_store.builder(
-                &get_attr(user, &self.username_attr)?
-            ).await,
-            StorageSpecific::Garage { from_config, bucket_source } => {
+            StorageSpecific::InMemory => {
+                self.in_memory_store
+                    .builder(&get_attr(user, &self.username_attr)?)
+                    .await
+            }
+            StorageSpecific::Garage {
+                from_config,
+                bucket_source,
+            } => {
                 let aws_access_key_id = get_attr(user, &from_config.aws_access_key_id_attr)?;
-                let aws_secret_access_key = get_attr(user, &from_config.aws_secret_access_key_attr)?;
+                let aws_secret_access_key =
+                    get_attr(user, &from_config.aws_secret_access_key_attr)?;
                 let bucket = match bucket_source {
                     BucketSource::Constant(b) => b.clone(),
                     BucketSource::Attr(a) => get_attr(user, &a)?,
                 };

                 storage::garage::GarageBuilder::new(storage::garage::GarageConf {
                     region: from_config.aws_region.clone(),
                     s3_endpoint: from_config.s3_endpoint.clone(),
                     k2v_endpoint: from_config.k2v_endpoint.clone(),
                     aws_access_key_id,
                     aws_secret_access_key,
                     bucket,
                 })?
-            },
+            }
         };

         Ok(storage)
@@ -172,7 +182,6 @@ impl LoginProvider for LdapLoginProvider {
         drop(ldap);

         Ok(Credentials { storage, keys })
     }
@@ -215,7 +224,7 @@ impl LoginProvider for LdapLoginProvider {
         let cr = CryptoRoot(crstr);
         let public_key = cr.public_key()?;

         // storage
         let storage = self.storage_creds_from_ldap_user(&user).await?;
         drop(ldap);

View file

@@ -1,8 +1,8 @@
 pub mod ldap_provider;
 pub mod static_provider;

-use std::sync::Arc;
 use base64::Engine;
+use std::sync::Arc;

 use anyhow::{anyhow, bail, Context, Result};
 use async_trait::async_trait;
@@ -45,7 +45,7 @@ pub struct PublicCredentials {
     pub public_key: PublicKey,
 }

-use serde::{Serialize, Deserialize};
+use serde::{Deserialize, Serialize};

 #[derive(Serialize, Deserialize, Debug, Clone)]
 pub struct CryptoRoot(pub String);
@@ -73,47 +73,59 @@ impl CryptoRoot {
     pub fn public_key(&self) -> Result<PublicKey> {
         match self.0.splitn(4, ':').collect::<Vec<&str>>()[..] {
-            [ "aero", "cryptoroot", "pass", b64blob ] => {
+            ["aero", "cryptoroot", "pass", b64blob] => {
                 let blob = base64::engine::general_purpose::STANDARD_NO_PAD.decode(b64blob)?;
                 if blob.len() < 32 {
-                    bail!("Decoded data is {} bytes long, expect at least 32 bytes", blob.len());
+                    bail!(
+                        "Decoded data is {} bytes long, expect at least 32 bytes",
+                        blob.len()
+                    );
                 }
                 PublicKey::from_slice(&blob[..32]).context("must be a valid public key")
-            },
-            [ "aero", "cryptoroot", "cleartext", b64blob ] => {
+            }
+            ["aero", "cryptoroot", "cleartext", b64blob] => {
                 let blob = base64::engine::general_purpose::STANDARD_NO_PAD.decode(b64blob)?;
                 Ok(CryptoKeys::deserialize(&blob)?.public)
-            },
-            [ "aero", "cryptoroot", "incoming", b64blob ] => {
+            }
+            ["aero", "cryptoroot", "incoming", b64blob] => {
                 let blob = base64::engine::general_purpose::STANDARD_NO_PAD.decode(b64blob)?;
                 if blob.len() < 32 {
-                    bail!("Decoded data is {} bytes long, expect at least 32 bytes", blob.len());
+                    bail!(
+                        "Decoded data is {} bytes long, expect at least 32 bytes",
+                        blob.len()
+                    );
                 }
                 PublicKey::from_slice(&blob[..32]).context("must be a valid public key")
-            },
-            [ "aero", "cryptoroot", "keyring", _ ] => {
+            }
+            ["aero", "cryptoroot", "keyring", _] => {
                 bail!("keyring is not yet implemented!")
-            },
-            _ => bail!(format!("passed string '{}' is not a valid cryptoroot", self.0)),
+            }
+            _ => bail!(format!(
+                "passed string '{}' is not a valid cryptoroot",
+                self.0
+            )),
         }
     }

     pub fn crypto_keys(&self, password: &str) -> Result<CryptoKeys> {
         match self.0.splitn(4, ':').collect::<Vec<&str>>()[..] {
-            [ "aero", "cryptoroot", "pass", b64blob ] => {
+            ["aero", "cryptoroot", "pass", b64blob] => {
                 let blob = base64::engine::general_purpose::STANDARD_NO_PAD.decode(b64blob)?;
                 CryptoKeys::password_open(password, &blob)
-            },
-            [ "aero", "cryptoroot", "cleartext", b64blob ] => {
+            }
+            ["aero", "cryptoroot", "cleartext", b64blob] => {
                 let blob = base64::engine::general_purpose::STANDARD_NO_PAD.decode(b64blob)?;
                 CryptoKeys::deserialize(&blob)
-            },
-            [ "aero", "cryptoroot", "incoming", _ ] => {
+            }
+            ["aero", "cryptoroot", "incoming", _] => {
                 bail!("incoming cryptoroot does not contain a crypto key!")
-            },
-            [ "aero", "cryptoroot", "keyring", _ ] =>{
+            }
+            ["aero", "cryptoroot", "keyring", _] => {
                 bail!("keyring is not yet implemented!")
-            },
-            _ => bail!(format!("passed string '{}' is not a valid cryptoroot", self.0)),
+            }
+            _ => bail!(format!(
+                "passed string '{}' is not a valid cryptoroot",
+                self.0
+            )),
         }
     }
 }
@@ -132,9 +144,6 @@ pub struct CryptoKeys {

 // ----

 impl CryptoKeys {
     /// Initialize a new cryptography root
     pub fn init() -> Self {
@@ -202,7 +211,11 @@ fn derive_password_key(kdf_salt: &[u8], password: &str) -> Result<Key> {
     Ok(Key::from_slice(&argon2_kdf(kdf_salt, password.as_bytes(), 32)?).unwrap())
 }

-fn try_open_encrypted_keys(kdf_salt: &[u8], password: &str, encrypted_keys: &[u8]) -> Result<Vec<u8>> {
+fn try_open_encrypted_keys(
+    kdf_salt: &[u8],
+    password: &str,
+    encrypted_keys: &[u8],
+) -> Result<Vec<u8>> {
     let password_key = derive_password_key(kdf_salt, password)?;
     open(encrypted_keys, &password_key)
 }
@@ -210,7 +223,7 @@ fn try_open_encrypted_keys(kdf_salt: &[u8], password: &str, encrypted_keys: &[u8
 // ---- UTIL ----

 pub fn argon2_kdf(salt: &[u8], password: &[u8], output_len: usize) -> Result<Vec<u8>> {
-    use argon2::{Algorithm, Argon2, ParamsBuilder, PasswordHasher, Version, password_hash};
+    use argon2::{password_hash, Algorithm, Argon2, ParamsBuilder, PasswordHasher, Version};

     let params = ParamsBuilder::new()
         .output_len(output_len)
@@ -219,7 +232,8 @@ pub fn argon2_kdf(salt: &[u8], password: &[u8], output_len: usize) -> Result<Vec
     let argon2 = Argon2::new(Algorithm::default(), Version::default(), params);

     let b64_salt = base64::engine::general_purpose::STANDARD_NO_PAD.encode(salt);
-    let valid_salt = password_hash::Salt::from_b64(&b64_salt).map_err(|e| anyhow!("Invalid salt, error {}", e))?;
+    let valid_salt = password_hash::Salt::from_b64(&b64_salt)
+        .map_err(|e| anyhow!("Invalid salt, error {}", e))?;
     let hash = argon2
         .hash_password(password, valid_salt)
         .map_err(|e| anyhow!("Unable to hash: {}", e))?;

View file

@@ -1,8 +1,8 @@
 use std::collections::HashMap;
-use std::sync::Arc;
 use std::path::PathBuf;
-use tokio::sync::watch;
+use std::sync::Arc;
 use tokio::signal::unix::{signal, SignalKind};
+use tokio::sync::watch;

 use anyhow::{anyhow, bail, Result};
 use async_trait::async_trait;
@@ -28,7 +28,8 @@ pub struct StaticLoginProvider {
 }

 pub async fn update_user_list(config: PathBuf, up: watch::Sender<UserDatabase>) -> Result<()> {
-    let mut stream = signal(SignalKind::user_defined1()).expect("failed to install SIGUSR1 signal hander for reload");
+    let mut stream = signal(SignalKind::user_defined1())
+        .expect("failed to install SIGUSR1 signal hander for reload");

     loop {
         let ulist: UserList = match read_config(config.clone()) {
@@ -42,7 +43,12 @@ pub async fn update_user_list(config: PathBuf, up: watch::Sender<UserDatabase>)

         let users = ulist
             .into_iter()
-            .map(|(username, config)| (username.clone() , Arc::new(ContextualUserEntry { username, config })))
+            .map(|(username, config)| {
+                (
+                    username.clone(),
+                    Arc::new(ContextualUserEntry { username, config }),
+                )
+            })
             .collect::<HashMap<_, _>>();

         let mut users_by_email = HashMap::new();
@@ -51,14 +57,18 @@ pub async fn update_user_list(config: PathBuf, up: watch::Sender<UserDatabase>)
                 if users_by_email.contains_key(m) {
                     tracing::warn!("Several users have the same email address: {}", m);
                     stream.recv().await;
-                    continue
+                    continue;
                 }
                 users_by_email.insert(m.clone(), u.clone());
             }
         }

         tracing::info!("{} users loaded", users.len());
-        up.send(UserDatabase { users, users_by_email }).context("update user db config")?;
+        up.send(UserDatabase {
+            users,
+            users_by_email,
+        })
+        .context("update user db config")?;
         stream.recv().await;
         tracing::info!("Received SIGUSR1, reloading");
     }
@@ -71,7 +81,10 @@ impl StaticLoginProvider {
         tokio::spawn(update_user_list(config.user_list, tx));
         rx.changed().await?;

-        Ok(Self { user_db: rx, in_memory_store: storage::in_memory::MemDb::new() })
+        Ok(Self {
+            user_db: rx,
+            in_memory_store: storage::in_memory::MemDb::new(),
+        })
     }
 }
@@ -95,14 +108,16 @@ impl LoginProvider for StaticLoginProvider {
         tracing::debug!(user=%username, "fetch keys");
         let storage: storage::Builder = match &user.config.storage {
             StaticStorage::InMemory => self.in_memory_store.builder(username).await,
-            StaticStorage::Garage(grgconf) => storage::garage::GarageBuilder::new(storage::garage::GarageConf {
-                region: grgconf.aws_region.clone(),
-                k2v_endpoint: grgconf.k2v_endpoint.clone(),
-                s3_endpoint: grgconf.s3_endpoint.clone(),
-                aws_access_key_id: grgconf.aws_access_key_id.clone(),
-                aws_secret_access_key: grgconf.aws_secret_access_key.clone(),
-                bucket: grgconf.bucket.clone(),
-            })?,
+            StaticStorage::Garage(grgconf) => {
+                storage::garage::GarageBuilder::new(storage::garage::GarageConf {
+                    region: grgconf.aws_region.clone(),
+                    k2v_endpoint: grgconf.k2v_endpoint.clone(),
+                    s3_endpoint: grgconf.s3_endpoint.clone(),
+                    aws_access_key_id: grgconf.aws_access_key_id.clone(),
+                    aws_secret_access_key: grgconf.aws_secret_access_key.clone(),
+                    bucket: grgconf.bucket.clone(),
+                })?
+            }
         };

         let cr = CryptoRoot(user.config.crypto_root.clone());
@@ -124,14 +139,16 @@ impl LoginProvider for StaticLoginProvider {
         let storage: storage::Builder = match &user.config.storage {
             StaticStorage::InMemory => self.in_memory_store.builder(&user.username).await,
-            StaticStorage::Garage(grgconf) => storage::garage::GarageBuilder::new(storage::garage::GarageConf {
-                region: grgconf.aws_region.clone(),
-                k2v_endpoint: grgconf.k2v_endpoint.clone(),
-                s3_endpoint: grgconf.s3_endpoint.clone(),
-                aws_access_key_id: grgconf.aws_access_key_id.clone(),
-                aws_secret_access_key: grgconf.aws_secret_access_key.clone(),
-                bucket: grgconf.bucket.clone(),
-            })?,
+            StaticStorage::Garage(grgconf) => {
+                storage::garage::GarageBuilder::new(storage::garage::GarageConf {
+                    region: grgconf.aws_region.clone(),
+                    k2v_endpoint: grgconf.k2v_endpoint.clone(),
+                    s3_endpoint: grgconf.s3_endpoint.clone(),
+                    aws_access_key_id: grgconf.aws_access_key_id.clone(),
+                    aws_secret_access_key: grgconf.aws_secret_access_key.clone(),
+                    bucket: grgconf.bucket.clone(),
+                })?
+            }
         };

         let cr = CryptoRoot(user.config.crypto_root.clone());

View file

@@ -51,7 +51,10 @@ async fn incoming_mail_watch_process_internal(
     creds: Credentials,
     mut rx_inbox_id: watch::Receiver<Option<(UniqueIdent, ImapUidvalidity)>>,
 ) -> Result<()> {
-    let mut lock_held = k2v_lock_loop(creds.storage.build().await?, storage::RowRef::new(INCOMING_PK, INCOMING_LOCK_SK));
+    let mut lock_held = k2v_lock_loop(
+        creds.storage.build().await?,
+        storage::RowRef::new(INCOMING_PK, INCOMING_LOCK_SK),
+    );
     let storage = creds.storage.build().await?;

     let mut inbox: Option<Arc<Mailbox>> = None;
@@ -63,8 +66,7 @@ async fn incoming_mail_watch_process_internal(
         let wait_new_mail = async {
             loop {
-                match storage.row_poll(&incoming_key).await
-                {
+                match storage.row_poll(&incoming_key).await {
                     Ok(row_val) => break row_val.row_ref,
                     Err(e) => {
                         error!("Error in wait_new_mail: {}", e);
@@ -360,7 +362,10 @@ async fn k2v_lock_loop_internal(
             Some(existing) => existing,
             None => row_ref.clone(),
         };
-        if let Err(e) = storage.row_insert(vec![storage::RowVal::new(row, lock)]).await {
+        if let Err(e) = storage
+            .row_insert(vec![storage::RowVal::new(row, lock)])
+            .await
+        {
             error!("Could not take lock: {}", e);
             tokio::time::sleep(Duration::from_secs(30)).await;
         }
@@ -428,14 +433,12 @@ impl EncryptedMessage {
         let blob_val = storage::BlobVal::new(
             storage::BlobRef(format!("incoming/{}", gen_ident())),
             self.encrypted_body.clone().into(),
-        ).with_meta(MESSAGE_KEY.to_string(), key_header);
+        )
+        .with_meta(MESSAGE_KEY.to_string(), key_header);
         storage.blob_insert(blob_val).await?;

         // Update watch key to signal new mail
-        let watch_val = storage::RowVal::new(
-            watch_ct.clone(),
-            gen_ident().0.to_vec(),
-        );
+        let watch_val = storage::RowVal::new(watch_ct.clone(), gen_ident().0.to_vec());
         storage.row_insert(vec![watch_val]).await?;
         Ok(())
     }

View file

@@ -8,7 +8,7 @@ use crate::login::Credentials;
 use crate::mail::uidindex::*;
 use crate::mail::unique_ident::*;
 use crate::mail::IMF;
-use crate::storage::{Store, RowRef, RowVal, BlobRef, BlobVal, Selector, self};
+use crate::storage::{self, BlobRef, BlobVal, RowRef, RowVal, Selector, Store};
 use crate::timestamp::now_msec;

 pub struct Mailbox {
@@ -196,7 +196,10 @@ impl MailboxInternal {
     async fn fetch_meta(&self, ids: &[UniqueIdent]) -> Result<Vec<MailMeta>> {
         let ids = ids.iter().map(|x| x.to_string()).collect::<Vec<_>>();
-        let ops = ids.iter().map(|id| RowRef::new(self.mail_path.as_str(), id.as_str())).collect::<Vec<_>>();
+        let ops = ids
+            .iter()
+            .map(|id| RowRef::new(self.mail_path.as_str(), id.as_str()))
+            .collect::<Vec<_>>();
         let res_vec = self.storage.row_fetch(&Selector::List(ops)).await?;

         let mut meta_vec = vec![];
@@ -231,7 +234,10 @@ impl MailboxInternal {
     }

     async fn fetch_full(&self, id: UniqueIdent, message_key: &Key) -> Result<Vec<u8>> {
-        let obj_res = self.storage.blob_fetch(&BlobRef(format!("{}/{}", self.mail_path, id))).await?;
+        let obj_res = self
+            .storage
+            .blob_fetch(&BlobRef(format!("{}/{}", self.mail_path, id)))
+            .await?;
         let body = obj_res.value;
         cryptoblob::open(&body, message_key)
     }
@@ -266,10 +272,12 @@ impl MailboxInternal {
             async {
                 // Encrypt and save mail body
                 let message_blob = cryptoblob::seal(mail.raw, &message_key)?;
-                self.storage.blob_insert(BlobVal::new(
-                    BlobRef(format!("{}/{}", self.mail_path, ident)),
-                    message_blob,
-                )).await?;
+                self.storage
+                    .blob_insert(BlobVal::new(
+                        BlobRef(format!("{}/{}", self.mail_path, ident)),
+                        message_blob,
+                    ))
+                    .await?;
                 Ok::<_, anyhow::Error>(())
             },
             async {
@@ -281,10 +289,12 @@ impl MailboxInternal {
                     rfc822_size: mail.raw.len(),
                 };
                 let meta_blob = seal_serialize(&meta, &self.encryption_key)?;
-                self.storage.row_insert(vec![RowVal::new(
-                    RowRef::new(&self.mail_path, &ident.to_string()),
-                    meta_blob,
-                )]).await?;
+                self.storage
+                    .row_insert(vec![RowVal::new(
+                        RowRef::new(&self.mail_path, &ident.to_string()),
+                        meta_blob,
+                    )])
+                    .await?;
                 Ok::<_, anyhow::Error>(())
             },
             self.uid_index.opportunistic_sync()
@@ -328,10 +338,12 @@ impl MailboxInternal {
                     rfc822_size: mail.raw.len(),
                 };
                 let meta_blob = seal_serialize(&meta, &self.encryption_key)?;
-                self.storage.row_insert(vec![RowVal::new(
-                    RowRef::new(&self.mail_path, &ident.to_string()),
-                    meta_blob,
-                )]).await?;
+                self.storage
+                    .row_insert(vec![RowVal::new(
+                        RowRef::new(&self.mail_path, &ident.to_string()),
+                        meta_blob,
+                    )])
+                    .await?;
                 Ok::<_, anyhow::Error>(())
             },
             self.uid_index.opportunistic_sync()
@@ -355,17 +367,25 @@ impl MailboxInternal {
         futures::try_join!(
             async {
                 // Delete mail body from S3
-                self.storage.blob_rm(&BlobRef(format!("{}/{}", self.mail_path, ident))).await?;
+                self.storage
+                    .blob_rm(&BlobRef(format!("{}/{}", self.mail_path, ident)))
+                    .await?;
                 Ok::<_, anyhow::Error>(())
             },
             async {
                 // Delete mail meta from K2V
                 let sk = ident.to_string();
-                let res = self.storage
-                    .row_fetch(&storage::Selector::Single(&RowRef::new(&self.mail_path, &sk)))
+                let res = self
+                    .storage
+                    .row_fetch(&storage::Selector::Single(&RowRef::new(
+                        &self.mail_path,
+                        &sk,
+                    )))
                     .await?;
                 if let Some(row_val) = res.into_iter().next() {
-                    self.storage.row_rm(&storage::Selector::Single(&row_val.row_ref)).await?;
+                    self.storage
+                        .row_rm(&storage::Selector::Single(&row_val.row_ref))
+                        .await?;
                 }
                 Ok::<_, anyhow::Error>(())
             }
@@ -421,10 +441,12 @@ impl MailboxInternal {
                 // Copy mail meta in K2V
                 let meta = &from.fetch_meta(&[source_id]).await?[0];
                 let meta_blob = seal_serialize(meta, &self.encryption_key)?;
-                self.storage.row_insert(vec![RowVal::new(
-                    RowRef::new(&self.mail_path, &new_id.to_string()),
-                    meta_blob,
-                )]).await?;
+                self.storage
+                    .row_insert(vec![RowVal::new(
+                        RowRef::new(&self.mail_path, &new_id.to_string()),
+                        meta_blob,
+                    )])
+                    .await?;
                 Ok::<_, anyhow::Error>(())
             },
             self.uid_index.opportunistic_sync(),

View file

@@ -226,7 +226,11 @@ impl User {
     async fn load_mailbox_list(&self) -> Result<(MailboxList, Option<storage::RowRef>)> {
         let row_ref = storage::RowRef::new(MAILBOX_LIST_PK, MAILBOX_LIST_SK);
-        let (mut list, row) = match self.storage.row_fetch(&storage::Selector::Single(&row_ref)).await {
+        let (mut list, row) = match self
+            .storage
+            .row_fetch(&storage::Selector::Single(&row_ref))
+            .await
+        {
             Err(storage::StorageError::NotFound) => (MailboxList::new(), None),
             Err(e) => return Err(e.into()),
             Ok(rv) => {

View file

@ -1,6 +1,5 @@
#![feature(async_fn_in_trait)] #![feature(async_fn_in_trait)]
mod timestamp;
mod bayou; mod bayou;
mod config; mod config;
mod cryptoblob; mod cryptoblob;
@ -11,17 +10,18 @@ mod login;
mod mail; mod mail;
mod server; mod server;
mod storage; mod storage;
mod timestamp;
use std::path::PathBuf;
use std::io::Read; use std::io::Read;
use std::path::PathBuf;
use anyhow::{bail, Result, Context}; use anyhow::{bail, Context, Result};
use clap::{Parser, Subcommand}; use clap::{Parser, Subcommand};
use nix::{unistd::Pid, sys::signal}; use nix::{sys::signal, unistd::Pid};
use config::*; use config::*;
use server::Server;
use login::{static_provider::*, *}; use login::{static_provider::*, *};
use server::Server;
#[derive(Parser, Debug)] #[derive(Parser, Debug)]
#[clap(author, version, about, long_about = None)] #[clap(author, version, about, long_about = None)]
@ -58,7 +58,7 @@ enum ToolsCommand {
PasswordHash { PasswordHash {
#[clap(env = "AEROGRAMME_PASSWORD")] #[clap(env = "AEROGRAMME_PASSWORD")]
maybe_password: Option<String>, maybe_password: Option<String>,
} },
} }
#[derive(Subcommand, Debug)] #[derive(Subcommand, Debug)]
@ -138,7 +138,7 @@ enum AccountManagement {
maybe_new_password: Option<String>, maybe_new_password: Option<String>,
#[clap(short, long)] #[clap(short, long)]
login: String login: String,
}, },
} }
@ -165,11 +165,11 @@ async fn main() -> Result<()> {
CompanionCommand::Daemon => { CompanionCommand::Daemon => {
let server = Server::from_companion_config(config).await?; let server = Server::from_companion_config(config).await?;
server.run().await?; server.run().await?;
}, }
CompanionCommand::Reload { pid } => reload(*pid, config.pid)?, CompanionCommand::Reload { pid } => reload(*pid, config.pid)?,
CompanionCommand::Wizard => { CompanionCommand::Wizard => {
unimplemented!(); unimplemented!();
}, }
CompanionCommand::Account(cmd) => { CompanionCommand::Account(cmd) => {
let user_file = config.users.user_list; let user_file = config.users.user_list;
account_management(&args.command, cmd, user_file)?; account_management(&args.command, cmd, user_file)?;
@ -179,22 +179,24 @@ async fn main() -> Result<()> {
ProviderCommand::Daemon => { ProviderCommand::Daemon => {
let server = Server::from_provider_config(config).await?; let server = Server::from_provider_config(config).await?;
server.run().await?; server.run().await?;
}, }
ProviderCommand::Reload { pid } => reload(*pid, config.pid)?, ProviderCommand::Reload { pid } => reload(*pid, config.pid)?,
ProviderCommand::Account(cmd) => { ProviderCommand::Account(cmd) => {
let user_file = match config.users { let user_file = match config.users {
UserManagement::Static(conf) => conf.user_list, UserManagement::Static(conf) => conf.user_list,
UserManagement::Ldap(_) => panic!("LDAP account management is not supported from Aerogramme.") UserManagement::Ldap(_) => {
panic!("LDAP account management is not supported from Aerogramme.")
}
}; };
account_management(&args.command, cmd, user_file)?; account_management(&args.command, cmd, user_file)?;
} }
}, },
(Command::Provider(_), AnyConfig::Companion(_)) => { (Command::Provider(_), AnyConfig::Companion(_)) => {
bail!("Your want to run a 'Provider' command but your configuration file has role 'Companion'."); bail!("Your want to run a 'Provider' command but your configuration file has role 'Companion'.");
}, }
(Command::Companion(_), AnyConfig::Provider(_)) => { (Command::Companion(_), AnyConfig::Provider(_)) => {
bail!("Your want to run a 'Companion' command but your configuration file has role 'Provider'."); bail!("Your want to run a 'Companion' command but your configuration file has role 'Provider'.");
}, }
(Command::Tools(subcommand), _) => match subcommand { (Command::Tools(subcommand), _) => match subcommand {
ToolsCommand::PasswordHash { maybe_password } => { ToolsCommand::PasswordHash { maybe_password } => {
let password = match maybe_password { let password = match maybe_password {
@ -202,60 +204,64 @@ async fn main() -> Result<()> {
None => rpassword::prompt_password("Enter password: ")?, None => rpassword::prompt_password("Enter password: ")?,
}; };
println!("{}", hash_password(&password)?); println!("{}", hash_password(&password)?);
}, }
ToolsCommand::CryptoRoot(crcommand) => { ToolsCommand::CryptoRoot(crcommand) => match crcommand {
match crcommand { CryptoRootCommand::New { maybe_password } => {
CryptoRootCommand::New { maybe_password } => { let password = match maybe_password {
let password = match maybe_password { Some(pwd) => pwd.clone(),
Some(pwd) => pwd.clone(), None => {
None => { let password = rpassword::prompt_password("Enter password: ")?;
let password = rpassword::prompt_password("Enter password: ")?; let password_confirm =
let password_confirm = rpassword::prompt_password("Confirm password: ")?; rpassword::prompt_password("Confirm password: ")?;
if password != password_confirm { if password != password_confirm {
bail!("Passwords don't match."); bail!("Passwords don't match.");
}
password
} }
}; password
let crypto_keys = CryptoKeys::init(); }
let cr = CryptoRoot::create_pass(&password, &crypto_keys)?; };
println!("{}", cr.0); let crypto_keys = CryptoKeys::init();
}, let cr = CryptoRoot::create_pass(&password, &crypto_keys)?;
CryptoRootCommand::NewClearText => { println!("{}", cr.0);
let crypto_keys = CryptoKeys::init(); }
let cr = CryptoRoot::create_cleartext(&crypto_keys); CryptoRootCommand::NewClearText => {
println!("{}", cr.0); let crypto_keys = CryptoKeys::init();
}, let cr = CryptoRoot::create_cleartext(&crypto_keys);
CryptoRootCommand::ChangePassword { maybe_old_password, maybe_new_password, crypto_root } => { println!("{}", cr.0);
let old_password = match maybe_old_password { }
Some(pwd) => pwd.to_string(), CryptoRootCommand::ChangePassword {
None => rpassword::prompt_password("Enter old password: ")?, maybe_old_password,
}; maybe_new_password,
crypto_root,
} => {
let old_password = match maybe_old_password {
Some(pwd) => pwd.to_string(),
None => rpassword::prompt_password("Enter old password: ")?,
};
let new_password = match maybe_new_password { let new_password = match maybe_new_password {
Some(pwd) => pwd.to_string(), Some(pwd) => pwd.to_string(),
None => { None => {
let password = rpassword::prompt_password("Enter new password: ")?; let password = rpassword::prompt_password("Enter new password: ")?;
let password_confirm = rpassword::prompt_password("Confirm new password: ")?; let password_confirm =
if password != password_confirm { rpassword::prompt_password("Confirm new password: ")?;
bail!("Passwords don't match."); if password != password_confirm {
} bail!("Passwords don't match.");
password
} }
}; password
}
};
let keys = CryptoRoot(crypto_root.to_string()).crypto_keys(&old_password)?; let keys = CryptoRoot(crypto_root.to_string()).crypto_keys(&old_password)?;
let cr = CryptoRoot::create_pass(&new_password, &keys)?; let cr = CryptoRoot::create_pass(&new_password, &keys)?;
println!("{}", cr.0); println!("{}", cr.0);
}, }
CryptoRootCommand::DeriveIncoming { crypto_root } => { CryptoRootCommand::DeriveIncoming { crypto_root } => {
let pubkey = CryptoRoot(crypto_root.to_string()).public_key()?; let pubkey = CryptoRoot(crypto_root.to_string()).public_key()?;
let cr = CryptoRoot::create_incoming(&pubkey); let cr = CryptoRoot::create_incoming(&pubkey);
println!("{}", cr.0); println!("{}", cr.0);
},
} }
}, },
} },
} }
Ok(()) Ok(())
@ -264,12 +270,12 @@ async fn main() -> Result<()> {
fn reload(pid: Option<i32>, pid_path: Option<PathBuf>) -> Result<()> { fn reload(pid: Option<i32>, pid_path: Option<PathBuf>) -> Result<()> {
let final_pid = match (pid, pid_path) { let final_pid = match (pid, pid_path) {
(Some(pid), _) => pid, (Some(pid), _) => pid,
(_, Some(path)) => { (_, Some(path)) => {
let mut f = std::fs::OpenOptions::new().read(true).open(path)?; let mut f = std::fs::OpenOptions::new().read(true).open(path)?;
let mut pidstr = String::new(); let mut pidstr = String::new();
f.read_to_string(&mut pidstr)?; f.read_to_string(&mut pidstr)?;
pidstr.parse::<i32>()? pidstr.parse::<i32>()?
}, }
_ => bail!("Unable to infer your daemon's PID"), _ => bail!("Unable to infer your daemon's PID"),
}; };
let pid = Pid::from_raw(final_pid); let pid = Pid::from_raw(final_pid);
@ -278,13 +284,15 @@ fn reload(pid: Option<i32>, pid_path: Option<PathBuf>) -> Result<()> {
} }
fn account_management(root: &Command, cmd: &AccountManagement, users: PathBuf) -> Result<()> { fn account_management(root: &Command, cmd: &AccountManagement, users: PathBuf) -> Result<()> {
let mut ulist: UserList = read_config(users.clone()).context(format!("'{:?}' must be a user database", users))?; let mut ulist: UserList =
read_config(users.clone()).context(format!("'{:?}' must be a user database", users))?;
match cmd { match cmd {
AccountManagement::Add { login, setup } => { AccountManagement::Add { login, setup } => {
tracing::debug!(user=login, "will-create"); tracing::debug!(user = login, "will-create");
let stp: SetupEntry = read_config(setup.clone()).context(format!("'{:?}' must be a setup file", setup))?; let stp: SetupEntry = read_config(setup.clone())
tracing::debug!(user=login, "loaded setup entry"); .context(format!("'{:?}' must be a setup file", setup))?;
tracing::debug!(user = login, "loaded setup entry");
let password = match stp.clear_password { let password = match stp.clear_password {
Some(pwd) => pwd, Some(pwd) => pwd,
@ -307,21 +315,28 @@ fn account_management(root: &Command, cmd: &AccountManagement, users: PathBuf) -
let hash = hash_password(password.as_str()).context("unable to hash password")?; let hash = hash_password(password.as_str()).context("unable to hash password")?;
ulist.insert(login.clone(), UserEntry { ulist.insert(
email_addresses: stp.email_addresses, login.clone(),
password: hash, UserEntry {
crypto_root: crypto_root.0, email_addresses: stp.email_addresses,
storage: stp.storage, password: hash,
}); crypto_root: crypto_root.0,
storage: stp.storage,
},
);
write_config(users.clone(), &ulist)?; write_config(users.clone(), &ulist)?;
}, }
AccountManagement::Delete { login } => { AccountManagement::Delete { login } => {
tracing::debug!(user=login, "will-delete"); tracing::debug!(user = login, "will-delete");
ulist.remove(login); ulist.remove(login);
write_config(users.clone(), &ulist)?; write_config(users.clone(), &ulist)?;
}, }
AccountManagement::ChangePassword { maybe_old_password, maybe_new_password, login } => { AccountManagement::ChangePassword {
maybe_old_password,
maybe_new_password,
login,
} => {
let mut user = ulist.remove(login).context("user must exist first")?; let mut user = ulist.remove(login).context("user must exist first")?;
let old_password = match maybe_old_password { let old_password = match maybe_old_password {
@ -345,16 +360,16 @@ fn account_management(root: &Command, cmd: &AccountManagement, users: PathBuf) -
} }
password password
} }
}; };
let new_hash = hash_password(&new_password)?; let new_hash = hash_password(&new_password)?;
let new_crypto_root = CryptoRoot::create_pass(&new_password, &crypto_keys)?; let new_crypto_root = CryptoRoot::create_pass(&new_password, &crypto_keys)?;
user.password = new_hash; user.password = new_hash;
user.crypto_root = new_crypto_root.0; user.crypto_root = new_crypto_root.0;
ulist.insert(login.clone(), user); ulist.insert(login.clone(), user);
write_config(users.clone(), &ulist)?; write_config(users.clone(), &ulist)?;
}, }
}; };
Ok(()) Ok(())

View file

@@ -1,6 +1,6 @@
-use std::sync::Arc;
-use std::path::PathBuf;
 use std::io::Write;
+use std::path::PathBuf;
+use std::sync::Arc;

 use anyhow::Result;
 use futures::try_join;
@@ -26,7 +26,11 @@ impl Server {
         let lmtp_server = None;
         let imap_server = Some(imap::new(config.imap, login.clone()).await?);

-        Ok(Self { lmtp_server, imap_server, pid_file: config.pid })
+        Ok(Self {
+            lmtp_server,
+            imap_server,
+            pid_file: config.pid,
+        })
     }

     pub async fn from_provider_config(config: ProviderConfig) -> Result<Self> {
@@ -39,12 +43,16 @@ impl Server {
         let lmtp_server = Some(LmtpServer::new(config.lmtp, login.clone()));
         let imap_server = Some(imap::new(config.imap, login.clone()).await?);

-        Ok(Self { lmtp_server, imap_server, pid_file: config.pid })
+        Ok(Self {
+            lmtp_server,
+            imap_server,
+            pid_file: config.pid,
+        })
     }

     pub async fn run(self) -> Result<()> {
         let pid = std::process::id();
-        tracing::info!(pid=pid, "Starting main loops");
+        tracing::info!(pid = pid, "Starting main loops");

         // write the pid file
         if let Some(pid_file) = self.pid_file {
@@ -57,7 +65,6 @@ impl Server {
             drop(file);
         }

         let (exit_signal, provoke_exit) = watch_ctrl_c();
         let _exit_on_err = move |err: anyhow::Error| {
             error!("Error: {}", err);

View file

@ -1,10 +1,6 @@
use crate::storage::*; use crate::storage::*;
use aws_sdk_s3::{self as s3, error::SdkError, operation::get_object::GetObjectError};
use serde::Serialize; use serde::Serialize;
use aws_sdk_s3::{
self as s3,
error::SdkError,
operation::get_object::GetObjectError,
};
#[derive(Clone, Debug, Serialize)] #[derive(Clone, Debug, Serialize)]
pub struct GarageConf { pub struct GarageConf {
@ -28,18 +24,18 @@ impl GarageBuilder {
unicity.extend_from_slice(file!().as_bytes()); unicity.extend_from_slice(file!().as_bytes());
unicity.append(&mut rmp_serde::to_vec(&conf)?); unicity.append(&mut rmp_serde::to_vec(&conf)?);
Ok(Arc::new(Self { conf, unicity })) Ok(Arc::new(Self { conf, unicity }))
} }
} }
#[async_trait] #[async_trait]
impl IBuilder for GarageBuilder { impl IBuilder for GarageBuilder {
async fn build(&self) -> Result<Store, StorageError> { async fn build(&self) -> Result<Store, StorageError> {
let s3_creds = s3::config::Credentials::new( let s3_creds = s3::config::Credentials::new(
self.conf.aws_access_key_id.clone(), self.conf.aws_access_key_id.clone(),
self.conf.aws_secret_access_key.clone(), self.conf.aws_secret_access_key.clone(),
None, None,
None, None,
"aerogramme" "aerogramme",
); );
let s3_config = aws_config::from_env() let s3_config = aws_config::from_env()
@ -51,12 +47,12 @@ impl IBuilder for GarageBuilder {
let s3_client = aws_sdk_s3::Client::new(&s3_config); let s3_client = aws_sdk_s3::Client::new(&s3_config);
let k2v_config = k2v_client::K2vClientConfig { let k2v_config = k2v_client::K2vClientConfig {
endpoint: self.conf.k2v_endpoint.clone(), endpoint: self.conf.k2v_endpoint.clone(),
region: self.conf.region.clone(), region: self.conf.region.clone(),
aws_access_key_id: self.conf.aws_access_key_id.clone(), aws_access_key_id: self.conf.aws_access_key_id.clone(),
aws_secret_access_key: self.conf.aws_secret_access_key.clone(), aws_secret_access_key: self.conf.aws_secret_access_key.clone(),
bucket: self.conf.bucket.clone(), bucket: self.conf.bucket.clone(),
user_agent: None, user_agent: None,
}; };
let k2v_client = match k2v_client::K2vClient::new(k2v_config) { let k2v_client = match k2v_client::K2vClient::new(k2v_config) {
@ -67,7 +63,7 @@ impl IBuilder for GarageBuilder {
Ok(v) => v, Ok(v) => v,
}; };
Ok(Box::new(GarageStore { Ok(Box::new(GarageStore {
bucket: self.conf.bucket.clone(), bucket: self.conf.bucket.clone(),
s3: s3_client, s3: s3_client,
k2v: k2v_client, k2v: k2v_client,
@ -86,19 +82,30 @@ pub struct GarageStore {
fn causal_to_row_val(row_ref: RowRef, causal_value: k2v_client::CausalValue) -> RowVal { fn causal_to_row_val(row_ref: RowRef, causal_value: k2v_client::CausalValue) -> RowVal {
let new_row_ref = row_ref.with_causality(causal_value.causality.into()); let new_row_ref = row_ref.with_causality(causal_value.causality.into());
let row_values = causal_value.value.into_iter().map(|k2v_value| match k2v_value { let row_values = causal_value
k2v_client::K2vValue::Tombstone => Alternative::Tombstone, .value
k2v_client::K2vValue::Value(v) => Alternative::Value(v), .into_iter()
}).collect::<Vec<_>>(); .map(|k2v_value| match k2v_value {
k2v_client::K2vValue::Tombstone => Alternative::Tombstone,
k2v_client::K2vValue::Value(v) => Alternative::Value(v),
})
.collect::<Vec<_>>();
RowVal { row_ref: new_row_ref, value: row_values } RowVal {
row_ref: new_row_ref,
value: row_values,
}
} }
#[async_trait] #[async_trait]
impl IStore for GarageStore { impl IStore for GarageStore {
async fn row_fetch<'a>(&self, select: &Selector<'a>) -> Result<Vec<RowVal>, StorageError> { async fn row_fetch<'a>(&self, select: &Selector<'a>) -> Result<Vec<RowVal>, StorageError> {
let (pk_list, batch_op) = match select { let (pk_list, batch_op) = match select {
Selector::Range { shard, sort_begin, sort_end } => ( Selector::Range {
shard,
sort_begin,
sort_end,
} => (
vec![shard.to_string()], vec![shard.to_string()],
vec![k2v_client::BatchReadOp { vec![k2v_client::BatchReadOp {
partition_key: shard, partition_key: shard,
@ -108,49 +115,71 @@ impl IStore for GarageStore {
..k2v_client::Filter::default() ..k2v_client::Filter::default()
}, },
..k2v_client::BatchReadOp::default() ..k2v_client::BatchReadOp::default()
}] }],
), ),
Selector::List(row_ref_list) => ( Selector::List(row_ref_list) => (
row_ref_list.iter().map(|row_ref| row_ref.uid.shard.to_string()).collect::<Vec<_>>(), row_ref_list
row_ref_list.iter().map(|row_ref| k2v_client::BatchReadOp { .iter()
partition_key: &row_ref.uid.shard, .map(|row_ref| row_ref.uid.shard.to_string())
filter: k2v_client::Filter { .collect::<Vec<_>>(),
start: Some(&row_ref.uid.sort), row_ref_list
..k2v_client::Filter::default() .iter()
}, .map(|row_ref| k2v_client::BatchReadOp {
single_item: true, partition_key: &row_ref.uid.shard,
..k2v_client::BatchReadOp::default() filter: k2v_client::Filter {
}).collect::<Vec<_>>() start: Some(&row_ref.uid.sort),
..k2v_client::Filter::default()
},
single_item: true,
..k2v_client::BatchReadOp::default()
})
.collect::<Vec<_>>(),
), ),
Selector::Prefix { shard, sort_prefix } => ( Selector::Prefix { shard, sort_prefix } => (
vec![shard.to_string()], vec![shard.to_string()],
vec![k2v_client::BatchReadOp { vec![k2v_client::BatchReadOp {
partition_key: shard, partition_key: shard,
filter: k2v_client::Filter { filter: k2v_client::Filter {
prefix: Some(sort_prefix), prefix: Some(sort_prefix),
..k2v_client::Filter::default() ..k2v_client::Filter::default()
},
..k2v_client::BatchReadOp::default()
}]),
Selector::Single(row_ref) => {
let causal_value = match self.k2v.read_item(&row_ref.uid.shard, &row_ref.uid.sort).await {
Err(e) => {
tracing::error!("K2V read item shard={}, sort={}, bucket={} failed: {}", row_ref.uid.shard, row_ref.uid.sort, self.bucket, e);
return Err(StorageError::Internal);
}, },
..k2v_client::BatchReadOp::default()
}],
),
Selector::Single(row_ref) => {
let causal_value = match self
.k2v
.read_item(&row_ref.uid.shard, &row_ref.uid.sort)
.await
{
Err(e) => {
tracing::error!(
"K2V read item shard={}, sort={}, bucket={} failed: {}",
row_ref.uid.shard,
row_ref.uid.sort,
self.bucket,
e
);
return Err(StorageError::Internal);
}
Ok(v) => v, Ok(v) => v,
}; };
let row_val = causal_to_row_val((*row_ref).clone(), causal_value); let row_val = causal_to_row_val((*row_ref).clone(), causal_value);
return Ok(vec![row_val]) return Ok(vec![row_val]);
}, }
}; };
let all_raw_res = match self.k2v.read_batch(&batch_op).await { let all_raw_res = match self.k2v.read_batch(&batch_op).await {
Err(e) => { Err(e) => {
tracing::error!("k2v read batch failed for {:?}, bucket {} with err: {}", select, self.bucket, e); tracing::error!(
"k2v read batch failed for {:?}, bucket {} with err: {}",
select,
self.bucket,
e
);
return Err(StorageError::Internal); return Err(StorageError::Internal);
}, }
Ok(v) => v, Ok(v) => v,
}; };
@ -163,13 +192,17 @@ impl IStore for GarageStore {
.into_iter() .into_iter()
.zip(pk_list.into_iter()) .zip(pk_list.into_iter())
.map(|((sk, cv), pk)| causal_to_row_val(RowRef::new(&pk, &sk), cv)) .map(|((sk, cv), pk)| causal_to_row_val(RowRef::new(&pk, &sk), cv))
.collect::<Vec<_>>(); .collect::<Vec<_>>();
Ok(row_vals) Ok(row_vals)
} }
async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError> { async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError> {
let del_op = match select { let del_op = match select {
Selector::Range { shard, sort_begin, sort_end } => vec![k2v_client::BatchDeleteOp { Selector::Range {
shard,
sort_begin,
sort_end,
} => vec![k2v_client::BatchDeleteOp {
partition_key: shard, partition_key: shard,
prefix: None, prefix: None,
start: Some(sort_begin), start: Some(sort_begin),
@ -178,21 +211,24 @@ impl IStore for GarageStore {
}], }],
Selector::List(row_ref_list) => { Selector::List(row_ref_list) => {
// Insert null values with causality token = delete // Insert null values with causality token = delete
let batch_op = row_ref_list.iter().map(|v| k2v_client::BatchInsertOp { let batch_op = row_ref_list
partition_key: &v.uid.shard, .iter()
sort_key: &v.uid.sort, .map(|v| k2v_client::BatchInsertOp {
causality: v.causality.clone().map(|ct| ct.into()), partition_key: &v.uid.shard,
value: k2v_client::K2vValue::Tombstone, sort_key: &v.uid.sort,
}).collect::<Vec<_>>(); causality: v.causality.clone().map(|ct| ct.into()),
value: k2v_client::K2vValue::Tombstone,
})
.collect::<Vec<_>>();
return match self.k2v.insert_batch(&batch_op).await { return match self.k2v.insert_batch(&batch_op).await {
Err(e) => { Err(e) => {
tracing::error!("Unable to delete the list of values: {}", e); tracing::error!("Unable to delete the list of values: {}", e);
Err(StorageError::Internal) Err(StorageError::Internal)
}, }
Ok(_) => Ok(()), Ok(_) => Ok(()),
}; };
}, }
Selector::Prefix { shard, sort_prefix } => vec![k2v_client::BatchDeleteOp { Selector::Prefix { shard, sort_prefix } => vec![k2v_client::BatchDeleteOp {
partition_key: shard, partition_key: shard,
prefix: Some(sort_prefix), prefix: Some(sort_prefix),
@ -208,15 +244,15 @@ impl IStore for GarageStore {
causality: row_ref.causality.clone().map(|ct| ct.into()), causality: row_ref.causality.clone().map(|ct| ct.into()),
value: k2v_client::K2vValue::Tombstone, value: k2v_client::K2vValue::Tombstone,
}]; }];
return match self.k2v.insert_batch(&batch_op).await { return match self.k2v.insert_batch(&batch_op).await {
Err(e) => { Err(e) => {
tracing::error!("Unable to delete the list of values: {}", e); tracing::error!("Unable to delete the list of values: {}", e);
Err(StorageError::Internal) Err(StorageError::Internal)
}, }
Ok(_) => Ok(()), Ok(_) => Ok(()),
}; };
}, }
}; };
// Finally here we only have prefix & range // Finally here we only have prefix & range
@ -224,34 +260,46 @@ impl IStore for GarageStore {
Err(e) => { Err(e) => {
tracing::error!("delete batch error: {}", e); tracing::error!("delete batch error: {}", e);
Err(StorageError::Internal) Err(StorageError::Internal)
}, }
Ok(_) => Ok(()), Ok(_) => Ok(()),
} }
} }
async fn row_insert(&self, values: Vec<RowVal>) -> Result<(), StorageError> { async fn row_insert(&self, values: Vec<RowVal>) -> Result<(), StorageError> {
let batch_ops = values.iter().map(|v| k2v_client::BatchInsertOp { let batch_ops = values
partition_key: &v.row_ref.uid.shard, .iter()
sort_key: &v.row_ref.uid.sort, .map(|v| k2v_client::BatchInsertOp {
causality: v.row_ref.causality.clone().map(|ct| ct.into()), partition_key: &v.row_ref.uid.shard,
value: v.value.iter().next().map(|cv| match cv { sort_key: &v.row_ref.uid.sort,
Alternative::Value(buff) => k2v_client::K2vValue::Value(buff.clone()), causality: v.row_ref.causality.clone().map(|ct| ct.into()),
Alternative::Tombstone => k2v_client::K2vValue::Tombstone, value: v
}).unwrap_or(k2v_client::K2vValue::Tombstone) .value
}).collect::<Vec<_>>(); .iter()
.next()
.map(|cv| match cv {
Alternative::Value(buff) => k2v_client::K2vValue::Value(buff.clone()),
Alternative::Tombstone => k2v_client::K2vValue::Tombstone,
})
.unwrap_or(k2v_client::K2vValue::Tombstone),
})
.collect::<Vec<_>>();
match self.k2v.insert_batch(&batch_ops).await { match self.k2v.insert_batch(&batch_ops).await {
Err(e) => { Err(e) => {
tracing::error!("k2v can't insert some value: {}", e); tracing::error!("k2v can't insert some value: {}", e);
Err(StorageError::Internal) Err(StorageError::Internal)
}, }
Ok(v) => Ok(v), Ok(v) => Ok(v),
} }
} }
async fn row_poll(&self, value: &RowRef) -> Result<RowVal, StorageError> { async fn row_poll(&self, value: &RowRef) -> Result<RowVal, StorageError> {
loop { loop {
if let Some(ct) = &value.causality { if let Some(ct) = &value.causality {
match self.k2v.poll_item(&value.uid.shard, &value.uid.sort, ct.clone().into(), None).await { match self
.k2v
.poll_item(&value.uid.shard, &value.uid.sort, ct.clone().into(), None)
.await
{
Err(e) => { Err(e) => {
tracing::error!("Unable to poll item: {}", e); tracing::error!("Unable to poll item: {}", e);
return Err(StorageError::Internal); return Err(StorageError::Internal);
@ -262,8 +310,7 @@ impl IStore for GarageStore {
} else { } else {
match self.k2v.read_item(&value.uid.shard, &value.uid.sort).await { match self.k2v.read_item(&value.uid.shard, &value.uid.sort).await {
Err(k2v_client::Error::NotFound) => { Err(k2v_client::Error::NotFound) => {
self self.k2v
.k2v
.insert_item(&value.uid.shard, &value.uid.sort, vec![0u8], None) .insert_item(&value.uid.shard, &value.uid.sort, vec![0u8], None)
.await .await
.map_err(|e| { .map_err(|e| {
@ -273,8 +320,8 @@ impl IStore for GarageStore {
} }
Err(e) => { Err(e) => {
tracing::error!("Unable to read item in polling logic: {}", e); tracing::error!("Unable to read item in polling logic: {}", e);
return Err(StorageError::Internal) return Err(StorageError::Internal);
}, }
Ok(cv) => return Ok(causal_to_row_val(value.clone(), cv)), Ok(cv) => return Ok(causal_to_row_val(value.clone(), cv)),
} }
} }
@ -282,7 +329,8 @@ impl IStore for GarageStore {
} }
async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result<BlobVal, StorageError> { async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result<BlobVal, StorageError> {
let maybe_out = self.s3 let maybe_out = self
.s3
.get_object() .get_object()
.bucket(self.bucket.to_string()) .bucket(self.bucket.to_string())
.key(blob_ref.0.to_string()) .key(blob_ref.0.to_string())
@ -296,12 +344,12 @@ impl IStore for GarageStore {
e => { e => {
tracing::warn!("Blob Fetch Error, Service Error: {}", e); tracing::warn!("Blob Fetch Error, Service Error: {}", e);
return Err(StorageError::Internal); return Err(StorageError::Internal);
}, }
}, },
Err(e) => { Err(e) => {
tracing::warn!("Blob Fetch Error, {}", e); tracing::warn!("Blob Fetch Error, {}", e);
return Err(StorageError::Internal); return Err(StorageError::Internal);
}, }
}; };
let buffer = match object_output.body.collect().await { let buffer = match object_output.body.collect().await {
@ -316,9 +364,10 @@ impl IStore for GarageStore {
Ok(BlobVal::new(blob_ref.clone(), buffer)) Ok(BlobVal::new(blob_ref.clone(), buffer))
} }
async fn blob_insert(&self, blob_val: BlobVal) -> Result<(), StorageError> { async fn blob_insert(&self, blob_val: BlobVal) -> Result<(), StorageError> {
let streamable_value = s3::primitives::ByteStream::from(blob_val.value); let streamable_value = s3::primitives::ByteStream::from(blob_val.value);
let maybe_send = self.s3 let maybe_send = self
.s3
.put_object() .put_object()
.bucket(self.bucket.to_string()) .bucket(self.bucket.to_string())
.key(blob_val.blob_ref.0.to_string()) .key(blob_val.blob_ref.0.to_string())
@ -338,7 +387,8 @@ impl IStore for GarageStore {
} }
} }
async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<(), StorageError> { async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<(), StorageError> {
let maybe_copy = self.s3 let maybe_copy = self
.s3
.copy_object() .copy_object()
.bucket(self.bucket.to_string()) .bucket(self.bucket.to_string())
.key(dst.0.clone()) .key(dst.0.clone())
@ -348,18 +398,24 @@ impl IStore for GarageStore {
match maybe_copy { match maybe_copy {
Err(e) => { Err(e) => {
tracing::error!("unable to copy object {} to {} (bucket: {}), error: {}", src.0, dst.0, self.bucket, e); tracing::error!(
"unable to copy object {} to {} (bucket: {}), error: {}",
src.0,
dst.0,
self.bucket,
e
);
Err(StorageError::Internal) Err(StorageError::Internal)
}, }
Ok(_) => { Ok(_) => {
tracing::debug!("copied {} to {} (bucket: {})", src.0, dst.0, self.bucket); tracing::debug!("copied {} to {} (bucket: {})", src.0, dst.0, self.bucket);
Ok(()) Ok(())
} }
} }
} }
async fn blob_list(&self, prefix: &str) -> Result<Vec<BlobRef>, StorageError> { async fn blob_list(&self, prefix: &str) -> Result<Vec<BlobRef>, StorageError> {
let maybe_list = self.s3 let maybe_list = self
.s3
.list_objects_v2() .list_objects_v2()
.bucket(self.bucket.to_string()) .bucket(self.bucket.to_string())
.prefix(prefix) .prefix(prefix)
@ -370,7 +426,12 @@ impl IStore for GarageStore {
match maybe_list { match maybe_list {
Err(e) => { Err(e) => {
tracing::error!("listing prefix {} on bucket {} failed: {}", prefix, self.bucket, e); tracing::error!(
"listing prefix {} on bucket {} failed: {}",
prefix,
self.bucket,
e
);
Err(StorageError::Internal) Err(StorageError::Internal)
} }
Ok(pagin_list_out) => Ok(pagin_list_out Ok(pagin_list_out) => Ok(pagin_list_out
@ -382,7 +443,8 @@ impl IStore for GarageStore {
} }
} }
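Every S3 call in this impl resolves errors the same way: log the SDK error with context through `tracing`, then collapse it into `StorageError::Internal`. The codebase writes these closures inline, as above; a hypothetical helper like the following captures the convention in one place (`map_internal` is illustrative, not part of the crate):

```rust
use crate::storage::StorageError;

// Log-and-collapse convention used throughout this impl: trace the SDK
// error with a context string, hide the details behind Internal.
fn map_internal<T, E: std::fmt::Display>(ctx: &str, res: Result<T, E>) -> Result<T, StorageError> {
    res.map_err(|e| {
        tracing::error!("{}: {}", ctx, e);
        StorageError::Internal
    })
}
```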
async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError> { async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError> {
let maybe_delete = self.s3 let maybe_delete = self
.s3
.delete_object() .delete_object()
.bucket(self.bucket.to_string()) .bucket(self.bucket.to_string())
.key(blob_ref.0.clone()) .key(blob_ref.0.clone())
@ -391,9 +453,14 @@ impl IStore for GarageStore {
match maybe_delete { match maybe_delete {
Err(e) => { Err(e) => {
tracing::error!("unable to delete {} (bucket: {}), error {}", blob_ref.0, self.bucket, e); tracing::error!(
"unable to delete {} (bucket: {}), error {}",
blob_ref.0,
self.bucket,
e
);
Err(StorageError::Internal) Err(StorageError::Internal)
}, }
Ok(_) => { Ok(_) => {
tracing::debug!("deleted {} (bucket: {})", blob_ref.0, self.bucket); tracing::debug!("deleted {} (bucket: {})", blob_ref.0, self.bucket);
Ok(()) Ok(())
@ -401,4 +468,3 @@ impl IStore for GarageStore {
} }
} }
} }
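Taken together, the blob methods above form a small object-storage API. A hedged sketch of a round trip written against the `IStore` trait from `storage/mod.rs` (shown later in this commit); the store value itself is assumed to come from one of the builders:

```rust
use crate::storage::*;

// Insert, fetch, list, delete: one pass over the blob half of IStore.
// Works against either backend since it only touches the trait.
async fn blob_roundtrip(store: &dyn IStore) -> Result<(), StorageError> {
    let bref = BlobRef("demo/hello".to_string());

    // BlobVal::new starts with empty metadata.
    store
        .blob_insert(BlobVal::new(bref.clone(), b"hello".to_vec()))
        .await?;

    // Fetch it back and check the payload survived the trip.
    let fetched = store.blob_fetch(&bref).await?;
    assert_eq!(fetched.value, b"hello".to_vec());

    // Enumerate everything under the "demo/" prefix, then clean up.
    for r in store.blob_list("demo/").await? {
        store.blob_rm(&r).await?;
    }
    Ok(())
}
```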
View file
@ -1,6 +1,6 @@
use crate::storage::*; use crate::storage::*;
use std::collections::{HashMap, BTreeMap}; use std::collections::{BTreeMap, HashMap};
use std::ops::Bound::{Included, Unbounded, Excluded, self}; use std::ops::Bound::{self, Excluded, Included, Unbounded};
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use tokio::sync::Notify; use tokio::sync::Notify;
@ -16,7 +16,7 @@ impl MemDb {
Self(tokio::sync::Mutex::new(HashMap::new())) Self(tokio::sync::Mutex::new(HashMap::new()))
} }
pub async fn builder(&self, username: &str) -> Arc<MemBuilder> { pub async fn builder(&self, username: &str) -> Arc<MemBuilder> {
let mut global_storage = self.0.lock().await; let mut global_storage = self.0.lock().await;
global_storage global_storage
.entry(username.to_string()) .entry(username.to_string())
@ -60,8 +60,8 @@ impl InternalRowVal {
} }
fn to_row_val(&self, row_ref: RowRef) -> RowVal { fn to_row_val(&self, row_ref: RowRef) -> RowVal {
RowVal{ RowVal {
row_ref: row_ref.with_causality(self.version.to_string()), row_ref: row_ref.with_causality(self.version.to_string()),
value: self.concurrent_values(), value: self.concurrent_values(),
} }
} }
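`to_row_val` shows how causality tokens travel: the internal version is stringified and attached to the returned `RowRef` via `with_causality`. A small sketch under that reading (the `String` parameter of `with_causality` is inferred from the call above):

```rust
use crate::storage::*;

// Sketch: hand back a RowRef tagged with the version that was observed,
// so a later write can be ordered against the value that was read.
fn tag_with_version(version: u64) -> RowRef {
    RowRef::new("mailbox", "0001").with_causality(version.to_string())
}
```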
@ -75,7 +75,7 @@ struct InternalBlobVal {
impl InternalBlobVal { impl InternalBlobVal {
fn to_blob_val(&self, bref: &BlobRef) -> BlobVal { fn to_blob_val(&self, bref: &BlobRef) -> BlobVal {
BlobVal { BlobVal {
blob_ref: bref.clone(), blob_ref: bref.clone(),
meta: self.metadata.clone(), meta: self.metadata.clone(),
value: self.data.clone(), value: self.data.clone(),
} }
@ -113,7 +113,7 @@ impl IBuilder for MemBuilder {
row: self.row.clone(), row: self.row.clone(),
blob: self.blob.clone(), blob: self.blob.clone(),
})) }))
} }
fn unique(&self) -> UnicityBuffer { fn unique(&self) -> UnicityBuffer {
UnicityBuffer(self.unicity.clone()) UnicityBuffer(self.unicity.clone())
@ -170,24 +170,32 @@ impl IStore for MemStore {
let store = self.row.read().or(Err(StorageError::Internal))?; let store = self.row.read().or(Err(StorageError::Internal))?;
match select { match select {
Selector::Range { shard, sort_begin, sort_end } => { Selector::Range {
Ok(store shard,
.get(*shard) sort_begin,
.unwrap_or(&BTreeMap::new()) sort_end,
.range((Included(sort_begin.to_string()), Excluded(sort_end.to_string()))) } => Ok(store
.map(|(k, v)| v.to_row_val(RowRef::new(shard, k))) .get(*shard)
.collect::<Vec<_>>()) .unwrap_or(&BTreeMap::new())
}, .range((
Included(sort_begin.to_string()),
Excluded(sort_end.to_string()),
))
.map(|(k, v)| v.to_row_val(RowRef::new(shard, k)))
.collect::<Vec<_>>()),
Selector::List(rlist) => { Selector::List(rlist) => {
let mut acc = vec![]; let mut acc = vec![];
for row_ref in rlist { for row_ref in rlist {
let maybe_intval = store.get(&row_ref.uid.shard).map(|v| v.get(&row_ref.uid.sort)).flatten(); let maybe_intval = store
.get(&row_ref.uid.shard)
.map(|v| v.get(&row_ref.uid.sort))
.flatten();
if let Some(intval) = maybe_intval { if let Some(intval) = maybe_intval {
acc.push(intval.to_row_val(row_ref.clone())); acc.push(intval.to_row_val(row_ref.clone()));
} }
} }
Ok(acc) Ok(acc)
}, }
Selector::Prefix { shard, sort_prefix } => { Selector::Prefix { shard, sort_prefix } => {
let last_bound = prefix_last_bound(sort_prefix); let last_bound = prefix_last_bound(sort_prefix);
@ -197,13 +205,13 @@ impl IStore for MemStore {
.range((Included(sort_prefix.to_string()), last_bound)) .range((Included(sort_prefix.to_string()), last_bound))
.map(|(k, v)| v.to_row_val(RowRef::new(shard, k))) .map(|(k, v)| v.to_row_val(RowRef::new(shard, k)))
.collect::<Vec<_>>()) .collect::<Vec<_>>())
}, }
Selector::Single(row_ref) => { Selector::Single(row_ref) => {
let intval = store let intval = store
.get(&row_ref.uid.shard) .get(&row_ref.uid.shard)
.ok_or(StorageError::NotFound)? .ok_or(StorageError::NotFound)?
.get(&row_ref.uid.sort) .get(&row_ref.uid.sort)
.ok_or(StorageError::NotFound)?; .ok_or(StorageError::NotFound)?;
Ok(vec![intval.to_row_val((*row_ref).clone())]) Ok(vec![intval.to_row_val((*row_ref).clone())])
} }
} }
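The match above is the whole query surface of MemStore's `row_fetch`: one arm per `Selector` shape. A hedged sketch of how a caller builds each variant (shard and sort keys are made up for illustration):

```rust
use crate::storage::*;

async fn selector_examples(store: &dyn IStore) -> Result<(), StorageError> {
    let single = RowRef::new("mailbox", "0001");

    // Half-open range [sort_begin, sort_end) within one shard.
    let _ = store
        .row_fetch(&Selector::Range {
            shard: "mailbox",
            sort_begin: "0001",
            sort_end: "0042",
        })
        .await?;

    // Explicit list of (shard, sort) references.
    let _ = store.row_fetch(&Selector::List(vec![single.clone()])).await?;

    // Every row whose sort key starts with the given prefix.
    let _ = store
        .row_fetch(&Selector::Prefix {
            shard: "mailbox",
            sort_prefix: "00",
        })
        .await?;

    // Exactly one row; NotFound if the shard or sort key is absent.
    let _ = store.row_fetch(&Selector::Single(&single)).await?;
    Ok(())
}
```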
@ -213,7 +221,12 @@ impl IStore for MemStore {
tracing::trace!(select=%select, command="row_rm"); tracing::trace!(select=%select, command="row_rm");
let values = match select { let values = match select {
Selector::Range { .. } | Selector::Prefix { .. } => self.row_fetch(select).await?.into_iter().map(|rv| rv.row_ref).collect::<Vec<_>>(), Selector::Range { .. } | Selector::Prefix { .. } => self
.row_fetch(select)
.await?
.into_iter()
.map(|rv| rv.row_ref)
.collect::<Vec<_>>(),
Selector::List(rlist) => rlist.clone(), Selector::List(rlist) => rlist.clone(),
Selector::Single(row_ref) => vec![(*row_ref).clone()], Selector::Single(row_ref) => vec![(*row_ref).clone()],
}; };
@ -282,7 +295,10 @@ impl IStore for MemStore {
async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result<BlobVal, StorageError> { async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result<BlobVal, StorageError> {
tracing::trace!(entry=%blob_ref, command="blob_fetch"); tracing::trace!(entry=%blob_ref, command="blob_fetch");
let store = self.blob.read().or(Err(StorageError::Internal))?; let store = self.blob.read().or(Err(StorageError::Internal))?;
store.get(&blob_ref.0).ok_or(StorageError::NotFound).map(|v| v.to_blob_val(blob_ref)) store
.get(&blob_ref.0)
.ok_or(StorageError::NotFound)
.map(|v| v.to_blob_val(blob_ref))
} }
async fn blob_insert(&self, blob_val: BlobVal) -> Result<(), StorageError> { async fn blob_insert(&self, blob_val: BlobVal) -> Result<(), StorageError> {
tracing::trace!(entry=%blob_val.blob_ref, command="blob_insert"); tracing::trace!(entry=%blob_val.blob_ref, command="blob_insert");
@ -300,10 +316,13 @@ impl IStore for MemStore {
Ok(()) Ok(())
} }
async fn blob_list(&self, prefix: &str) -> Result<Vec<BlobRef>, StorageError> { async fn blob_list(&self, prefix: &str) -> Result<Vec<BlobRef>, StorageError> {
tracing::trace!(prefix=prefix, command="blob_list"); tracing::trace!(prefix = prefix, command = "blob_list");
let store = self.blob.read().or(Err(StorageError::Internal))?; let store = self.blob.read().or(Err(StorageError::Internal))?;
let last_bound = prefix_last_bound(prefix); let last_bound = prefix_last_bound(prefix);
let blist = store.range((Included(prefix.to_string()), last_bound)).map(|(k, _)| BlobRef(k.to_string())).collect::<Vec<_>>(); let blist = store
.range((Included(prefix.to_string()), last_bound))
.map(|(k, _)| BlobRef(k.to_string()))
.collect::<Vec<_>>();
Ok(blist) Ok(blist)
} }
async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError> { async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError> {
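Both the `Prefix` arm and `blob_list` above build their upper range bound with `prefix_last_bound`, which this commit does not touch, so its body is not visible here. A plausible reconstruction, assuming ASCII sort keys, not the actual implementation:

```rust
use std::collections::BTreeMap;
use std::ops::Bound::{self, Excluded, Included, Unbounded};

// Plausible sketch of prefix_last_bound: the smallest key strictly greater
// than every string starting with `prefix` is the prefix with its last
// bumpable byte incremented; if no byte can be bumped, scan to the end.
fn prefix_last_bound(prefix: &str) -> Bound<String> {
    let mut bytes = prefix.as_bytes().to_vec();
    while let Some(last) = bytes.last().copied() {
        if last < 0xff {
            *bytes.last_mut().unwrap() = last + 1;
            return Excluded(String::from_utf8_lossy(&bytes).into_owned());
        }
        bytes.pop();
    }
    Unbounded
}

// Same shape as the MemStore scans above: half-open range over a BTreeMap.
fn scan_prefix<'a>(map: &'a BTreeMap<String, Vec<u8>>, prefix: &str) -> Vec<&'a str> {
    map.range((Included(prefix.to_string()), prefix_last_bound(prefix)))
        .map(|(k, _)| k.as_str())
        .collect()
}
```

This is the standard prefix-scan trick for ordered maps: one range query replaces a filter over the whole keyspace.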
View file
@ -8,13 +8,13 @@
* into the object system so it is not exposed. * into the object system so it is not exposed.
*/ */
pub mod in_memory;
pub mod garage; pub mod garage;
pub mod in_memory;
use std::sync::Arc;
use std::hash::Hash;
use std::collections::HashMap;
use async_trait::async_trait; use async_trait::async_trait;
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::Arc;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum Alternative { pub enum Alternative {
@ -52,14 +52,18 @@ pub struct RowRef {
} }
impl std::fmt::Display for RowRef { impl std::fmt::Display for RowRef {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "RowRef({}, {}, {:?})", self.uid.shard, self.uid.sort, self.causality) write!(
f,
"RowRef({}, {}, {:?})",
self.uid.shard, self.uid.sort, self.causality
)
} }
} }
impl RowRef { impl RowRef {
pub fn new(shard: &str, sort: &str) -> Self { pub fn new(shard: &str, sort: &str) -> Self {
Self { Self {
uid: RowUid { uid: RowUid {
shard: shard.to_string(), shard: shard.to_string(),
sort: sort.to_string(), sort: sort.to_string(),
}, },
@ -87,7 +91,6 @@ impl RowVal {
} }
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct BlobRef(pub String); pub struct BlobRef(pub String);
impl std::fmt::Display for BlobRef { impl std::fmt::Display for BlobRef {
@ -105,7 +108,8 @@ pub struct BlobVal {
impl BlobVal { impl BlobVal {
pub fn new(blob_ref: BlobRef, value: Vec<u8>) -> Self { pub fn new(blob_ref: BlobRef, value: Vec<u8>) -> Self {
Self { Self {
blob_ref, value, blob_ref,
value,
meta: HashMap::new(), meta: HashMap::new(),
} }
} }
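`BlobVal::new` above pairs a key with its bytes and starts the metadata map out empty. A minimal sketch of constructing one (the key is invented for illustration):

```rust
use crate::storage::*;

// One blob value ready for blob_insert; meta defaults to an empty HashMap.
fn demo_blob() -> BlobVal {
    BlobVal::new(
        BlobRef("mail/inbox/0001".to_string()),
        b"raw rfc822 bytes".to_vec(),
    )
}
```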
@ -118,16 +122,27 @@ impl BlobVal {
#[derive(Debug)] #[derive(Debug)]
pub enum Selector<'a> { pub enum Selector<'a> {
Range { shard: &'a str, sort_begin: &'a str, sort_end: &'a str }, Range {
List (Vec<RowRef>), // list of (shard_key, sort_key) shard: &'a str,
sort_begin: &'a str,
sort_end: &'a str,
},
List(Vec<RowRef>), // list of (shard_key, sort_key)
#[allow(dead_code)] #[allow(dead_code)]
Prefix { shard: &'a str, sort_prefix: &'a str }, Prefix {
shard: &'a str,
sort_prefix: &'a str,
},
Single(&'a RowRef), Single(&'a RowRef),
} }
impl<'a> std::fmt::Display for Selector<'a> { impl<'a> std::fmt::Display for Selector<'a> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self { match self {
Self::Range { shard, sort_begin, sort_end } => write!(f, "Range({}, [{}, {}[)", shard, sort_begin, sort_end), Self::Range {
shard,
sort_begin,
sort_end,
} => write!(f, "Range({}, [{}, {}[)", shard, sort_begin, sort_end),
Self::List(list) => write!(f, "List({:?})", list), Self::List(list) => write!(f, "List({:?})", list),
Self::Prefix { shard, sort_prefix } => write!(f, "Prefix({}, {})", shard, sort_prefix), Self::Prefix { shard, sort_prefix } => write!(f, "Prefix({}, {})", shard, sort_prefix),
Self::Single(row_ref) => write!(f, "Single({})", row_ref), Self::Single(row_ref) => write!(f, "Single({})", row_ref),
@ -149,7 +164,7 @@ pub trait IStore {
async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError>; async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError>;
} }
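With the trait assembled, cross-backend helpers can be written once. A hedged sketch composing a "move" from the trait's copy and delete; note it is not atomic, since a crash between the two calls leaves both blobs in place:

```rust
use crate::storage::*;

// Move = copy then delete, expressed once against IStore so it works for
// both GarageStore and MemStore.
async fn move_blob(store: &dyn IStore, src: &BlobRef, dst: &BlobRef) -> Result<(), StorageError> {
    store.blob_copy(src, dst).await?;
    store.blob_rm(src).await
}
```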
#[derive(Clone,Debug,PartialEq,Eq,Hash)] #[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct UnicityBuffer(Vec<u8>); pub struct UnicityBuffer(Vec<u8>);
#[async_trait] #[async_trait]