in-memory storage #32

Merged
quentin merged 65 commits from in-memory into main 2023-12-27 16:35:43 +00:00
5 changed files with 239 additions and 222 deletions
Showing only changes of commit 684f4de225 - Show all commits

View file

@ -87,7 +87,9 @@ impl LdapLoginProvider {
fn storage_creds_from_ldap_user(&self, user: &SearchEntry) -> Result<Builders> { fn storage_creds_from_ldap_user(&self, user: &SearchEntry) -> Result<Builders> {
let storage: Builders = match &self.storage_specific { let storage: Builders = match &self.storage_specific {
StorageSpecific::InMemory => Box::new(storage::in_memory::FullMem {}), StorageSpecific::InMemory => Box::new(storage::in_memory::FullMem::new(
&get_attr(user, &self.username_attr)?
)),
StorageSpecific::Garage { from_config, bucket_source } => { StorageSpecific::Garage { from_config, bucket_source } => {
let aws_access_key_id = get_attr(user, &from_config.aws_access_key_id_attr)?; let aws_access_key_id = get_attr(user, &from_config.aws_access_key_id_attr)?;
let aws_secret_access_key = get_attr(user, &from_config.aws_secret_access_key_attr)?; let aws_secret_access_key = get_attr(user, &from_config.aws_secret_access_key_attr)?;

View file

@ -11,10 +11,15 @@ use crate::config::*;
use crate::login::*; use crate::login::*;
use crate::storage; use crate::storage;
pub struct ContextualUserEntry {
pub username: String,
pub config: UserEntry,
}
#[derive(Default)] #[derive(Default)]
pub struct UserDatabase { pub struct UserDatabase {
users: HashMap<String, Arc<UserEntry>>, users: HashMap<String, Arc<ContextualUserEntry>>,
users_by_email: HashMap<String, Arc<UserEntry>>, users_by_email: HashMap<String, Arc<ContextualUserEntry>>,
} }
pub struct StaticLoginProvider { pub struct StaticLoginProvider {
@ -35,12 +40,12 @@ pub async fn update_user_list(config: PathBuf, up: watch::Sender<UserDatabase>)
let users = ulist let users = ulist
.into_iter() .into_iter()
.map(|(k, v)| (k, Arc::new(v))) .map(|(username, config)| (username.clone() , Arc::new(ContextualUserEntry { username, config })))
.collect::<HashMap<_, _>>(); .collect::<HashMap<_, _>>();
let mut users_by_email = HashMap::new(); let mut users_by_email = HashMap::new();
for (_, u) in users.iter() { for (_, u) in users.iter() {
for m in u.email_addresses.iter() { for m in u.config.email_addresses.iter() {
if users_by_email.contains_key(m) { if users_by_email.contains_key(m) {
tracing::warn!("Several users have the same email address: {}", m); tracing::warn!("Several users have the same email address: {}", m);
continue continue
@ -78,13 +83,13 @@ impl LoginProvider for StaticLoginProvider {
}; };
tracing::debug!(user=%username, "verify password"); tracing::debug!(user=%username, "verify password");
if !verify_password(password, &user.password)? { if !verify_password(password, &user.config.password)? {
bail!("Wrong password"); bail!("Wrong password");
} }
tracing::debug!(user=%username, "fetch keys"); tracing::debug!(user=%username, "fetch keys");
let storage: storage::Builders = match &user.storage { let storage: storage::Builders = match &user.config.storage {
StaticStorage::InMemory => Box::new(storage::in_memory::FullMem {}), StaticStorage::InMemory => Box::new(storage::in_memory::FullMem::new(username)),
StaticStorage::Garage(grgconf) => Box::new(storage::garage::GrgCreds { StaticStorage::Garage(grgconf) => Box::new(storage::garage::GrgCreds {
region: grgconf.aws_region.clone(), region: grgconf.aws_region.clone(),
k2v_endpoint: grgconf.k2v_endpoint.clone(), k2v_endpoint: grgconf.k2v_endpoint.clone(),
@ -95,7 +100,7 @@ impl LoginProvider for StaticLoginProvider {
}), }),
}; };
let cr = CryptoRoot(user.crypto_root.clone()); let cr = CryptoRoot(user.config.crypto_root.clone());
let keys = cr.crypto_keys(password)?; let keys = cr.crypto_keys(password)?;
tracing::debug!(user=%username, "logged"); tracing::debug!(user=%username, "logged");
@ -109,8 +114,8 @@ impl LoginProvider for StaticLoginProvider {
Some(u) => u, Some(u) => u,
}; };
let storage: storage::Builders = match &user.storage { let storage: storage::Builders = match &user.config.storage {
StaticStorage::InMemory => Box::new(storage::in_memory::FullMem {}), StaticStorage::InMemory => Box::new(storage::in_memory::FullMem::new(&user.username)),
StaticStorage::Garage(grgconf) => Box::new(storage::garage::GrgCreds { StaticStorage::Garage(grgconf) => Box::new(storage::garage::GrgCreds {
region: grgconf.aws_region.clone(), region: grgconf.aws_region.clone(),
k2v_endpoint: grgconf.k2v_endpoint.clone(), k2v_endpoint: grgconf.k2v_endpoint.clone(),
@ -121,7 +126,7 @@ impl LoginProvider for StaticLoginProvider {
}), }),
}; };
let cr = CryptoRoot(user.crypto_root.clone()); let cr = CryptoRoot(user.config.crypto_root.clone());
let public_key = cr.public_key()?; let public_key = cr.public_key()?;
Ok(PublicCredentials { Ok(PublicCredentials {

View file

@ -1,7 +1,7 @@
use crate::storage::*; use crate::storage::*;
#[derive(Clone, Debug, Hash)] #[derive(Clone, Debug, Hash)]
pub struct GrgCreds { pub struct GarageBuilder {
pub region: String, pub region: String,
pub s3_endpoint: String, pub s3_endpoint: String,
pub k2v_endpoint: String, pub k2v_endpoint: String,
@ -9,133 +9,47 @@ pub struct GrgCreds {
pub aws_secret_access_key: String, pub aws_secret_access_key: String,
pub bucket: String, pub bucket: String,
} }
pub struct GrgStore {}
pub struct GrgRef {}
pub struct GrgValue {}
#[derive(Clone, Debug, PartialEq)] impl IBuilder for GarageBuilder {
pub struct GrgOrphanRowRef {} fn build(&self) -> Box<dyn IStore> {
impl IBuilders for GrgCreds {
fn row_store(&self) -> Result<RowStore, StorageError> {
unimplemented!();
}
fn blob_store(&self) -> Result<BlobStore, StorageError> {
unimplemented!();
}
fn url(&self) -> &str {
return "grg://unimplemented;"
}
}
impl IRowStore for GrgStore {
fn row(&self, partition: &str, sort: &str) -> RowRef {
unimplemented!();
}
fn select(&self, selector: Selector) -> AsyncResult<Vec<RowValue>> {
unimplemented!();
}
fn rm(&self, selector: Selector) -> AsyncResult<()> {
unimplemented!();
}
fn from_orphan(&self, orphan: OrphanRowRef) -> Result<RowRef, StorageError> {
unimplemented!(); unimplemented!();
} }
} }
impl IRowRef for GrgRef { pub struct GarageStore {
/*fn clone_boxed(&self) -> RowRef { dummy: String,
unimplemented!(); }
}*/
fn to_orphan(&self) -> OrphanRowRef {
unimplemented!()
}
fn key(&self) -> (&str, &str) { #[async_trait]
impl IStore for GarageStore {
async fn row_fetch<'a>(&self, select: &Selector<'a>) -> Result<Vec<RowVal>, StorageError> {
unimplemented!();
}
async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError> {
unimplemented!(); unimplemented!();
} }
fn set_value(&self, content: &[u8]) -> RowValue { async fn row_insert(&self, values: Vec<RowVal>) -> Result<(), StorageError> {
unimplemented!();
}
async fn row_poll(&self, value: RowRef) -> Result<RowVal, StorageError> {
unimplemented!(); unimplemented!();
} }
fn fetch(&self) -> AsyncResult<RowValue> {
async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result<BlobVal, StorageError> {
unimplemented!();
}
async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<BlobVal, StorageError> {
unimplemented!();
}
async fn blob_list(&self, prefix: &str) -> Result<Vec<BlobRef>, StorageError> {
unimplemented!(); unimplemented!();
} }
fn rm(&self) -> AsyncResult<()> { async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError> {
unimplemented!();
}
fn poll(&self) -> AsyncResult<RowValue> {
unimplemented!(); unimplemented!();
} }
} }
impl std::fmt::Debug for GrgRef {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
unimplemented!();
}
}
impl IRowValue for GrgValue {
fn to_ref(&self) -> RowRef {
unimplemented!();
}
fn content(&self) -> ConcurrentValues {
unimplemented!();
}
fn push(&self) -> AsyncResult<()> {
unimplemented!();
}
}
impl std::fmt::Debug for GrgValue {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
unimplemented!();
}
}
/*
/// A custom S3 region, composed of a region name and endpoint.
/// We use this instead of rusoto_signature::Region so that we can
/// derive Hash and Eq
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub struct Region {
pub name: String,
pub endpoint: String,
}
impl Region {
pub fn as_rusoto_region(&self) -> rusoto_signature::Region {
rusoto_signature::Region::Custom {
name: self.name.clone(),
endpoint: self.endpoint.clone(),
}
}
}
*/
/*
pub struct Garage {
pub s3_region: Region,
pub k2v_region: Region,
pub aws_access_key_id: String,
pub aws_secret_access_key: String,
pub bucket: String,
}
impl StoreBuilder<> for Garage {
fn row_store(&self) ->
}
pub struct K2V {}
impl RowStore for K2V {
}*/

View file

@ -1,97 +1,134 @@
use futures::FutureExt;
use crate::storage::*; use crate::storage::*;
use std::collections::{HashMap, BTreeMap};
use std::ops::Bound::{Included, Unbounded, Excluded};
use std::sync::{Arc, RwLock};
#[derive(Clone, Debug, Hash)] /// This implementation is very inneficient, and not completely correct
pub struct FullMem {} /// Indeed, when the connector is dropped, the memory is freed.
pub struct MemStore {} /// It means that when a user disconnects, its data are lost.
pub struct MemRef {} /// It's intended only for basic debugging, do not use it for advanced tests...
pub struct MemValue {}
#[derive(Clone, Debug, PartialEq)] pub type ArcRow = Arc<RwLock<HashMap<String, BTreeMap<String, Vec<u8>>>>>;
pub struct MemOrphanRowRef {} pub type ArcBlob = Arc<RwLock<HashMap<String, Vec<u8>>>>;
impl IBuilders for FullMem { #[derive(Clone, Debug)]
fn row_store(&self) -> Result<RowStore, StorageError> { pub struct MemBuilder {
unimplemented!(); user: String,
} url: String,
row: ArcRow,
blob: ArcBlob,
}
fn blob_store(&self) -> Result<BlobStore, StorageError> { impl IBuilder for MemBuilder {
unimplemented!(); fn build(&self) -> Box<dyn IStore> {
} Box::new(MemStore {
row: self.row.clone(),
fn url(&self) -> &str { blob: self.blob.clone(),
return "mem://unimplemented;" })
} }
} }
impl IRowStore for MemStore { pub struct MemStore {
fn row(&self, partition: &str, sort: &str) -> RowRef { row: ArcRow,
unimplemented!(); blob: ArcBlob,
} }
fn select(&self, selector: Selector) -> AsyncResult<Vec<RowValue>> { impl MemStore {
unimplemented!() fn inner_fetch(&self, row_ref: &RowRef) -> Result<Vec<u8>, StorageError> {
} Ok(self.row
.read()
fn rm(&self, selector: Selector) -> AsyncResult<()> { .or(Err(StorageError::Internal))?
unimplemented!(); .get(&row_ref.uid.shard)
} .ok_or(StorageError::NotFound)?
.get(&row_ref.uid.sort)
fn from_orphan(&self, orphan: OrphanRowRef) -> Result<RowRef, StorageError> { .ok_or(StorageError::NotFound)?
unimplemented!(); .clone())
} }
} }
impl IRowRef for MemRef { #[async_trait]
fn to_orphan(&self) -> OrphanRowRef { impl IStore for MemStore {
unimplemented!() async fn row_fetch<'a>(&self, select: &Selector<'a>) -> Result<Vec<RowVal>, StorageError> {
match select {
Selector::Range { shard, sort_begin, sort_end } => {
Ok(self.row
.read()
.or(Err(StorageError::Internal))?
.get(*shard)
.ok_or(StorageError::NotFound)?
.range((Included(sort_begin.to_string()), Included(sort_end.to_string())))
.map(|(k, v)| RowVal {
row_ref: RowRef { uid: RowUid { shard: shard.to_string(), sort: k.to_string() }, causality: Some("c".to_string()) },
value: vec![Alternative::Value(v.clone())],
})
.collect::<Vec<_>>())
},
Selector::List(rlist) => {
let mut acc = vec![];
for row_ref in rlist {
let bytes = self.inner_fetch(row_ref)?;
let row_val = RowVal {
row_ref: row_ref.clone(),
value: vec![Alternative::Value(bytes)]
};
acc.push(row_val);
}
Ok(acc)
},
Selector::Prefix { shard, sort_prefix } => {
let mut sort_end = sort_prefix.to_string();
let last_bound = match sort_end.pop() {
None => Unbounded,
Some(ch) => {
let nc = char::from_u32(ch as u32 + 1).unwrap();
sort_end.push(nc);
Excluded(sort_end)
}
};
Ok(self.row
.read()
.or(Err(StorageError::Internal))?
.get(*shard)
.ok_or(StorageError::NotFound)?
.range((Included(sort_prefix.to_string()), last_bound))
.map(|(k, v)| RowVal {
row_ref: RowRef { uid: RowUid { shard: shard.to_string(), sort: k.to_string() }, causality: Some("c".to_string()) },
value: vec![Alternative::Value(v.clone())],
})
.collect::<Vec<_>>())
},
Selector::Single(row_ref) => {
let bytes = self.inner_fetch(row_ref)?;
Ok(vec![RowVal{ row_ref: row_ref.clone(), value: vec![Alternative::Value(bytes)]}])
}
}
} }
fn key(&self) -> (&str, &str) { async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError> {
unimplemented!(); unimplemented!();
} }
/*fn clone_boxed(&self) -> RowRef { async fn row_insert(&self, values: Vec<RowVal>) -> Result<(), StorageError> {
unimplemented!(); unimplemented!();
}*/
fn set_value(&self, content: &[u8]) -> RowValue { }
async fn row_poll(&self, value: RowRef) -> Result<RowVal, StorageError> {
unimplemented!(); unimplemented!();
} }
fn fetch(&self) -> AsyncResult<RowValue> {
unimplemented!();
}
fn rm(&self) -> AsyncResult<()> {
unimplemented!();
}
fn poll(&self) -> AsyncResult<RowValue> {
async {
let rv: RowValue = Box::new(MemValue{});
Ok(rv)
}.boxed()
}
}
impl std::fmt::Debug for MemRef { async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result<BlobVal, StorageError> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { unimplemented!();
unimplemented!();
} }
} async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<BlobVal, StorageError> {
unimplemented!();
impl IRowValue for MemValue {
fn to_ref(&self) -> RowRef { }
unimplemented!(); async fn blob_list(&self, prefix: &str) -> Result<Vec<BlobRef>, StorageError> {
} unimplemented!();
fn content(&self) -> ConcurrentValues { }
unimplemented!(); async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError> {
}
fn push(&self) -> AsyncResult<()> {
unimplemented!();
}
}
impl std::fmt::Debug for MemValue {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
unimplemented!(); unimplemented!();
} }
} }

View file

@ -8,39 +8,97 @@
* into the object system so it is not exposed. * into the object system so it is not exposed.
*/ */
use std::hash::{Hash, Hasher};
use futures::future::BoxFuture;
pub mod in_memory; pub mod in_memory;
pub mod garage; pub mod garage;
use std::hash::{Hash, Hasher};
use std::collections::HashMap;
use futures::future::BoxFuture;
use async_trait::async_trait;
#[derive(Debug, Clone)]
pub enum Alternative { pub enum Alternative {
Tombstone, Tombstone,
Value(Vec<u8>), Value(Vec<u8>),
} }
type ConcurrentValues = Vec<Alternative>; type ConcurrentValues = Vec<Alternative>;
#[derive(Debug)]
pub enum StorageError {
NotFound,
Internal,
}
#[derive(Debug, Clone)]
pub struct RowUid {
shard: String,
sort: String,
}
#[derive(Debug, Clone)]
pub struct RowRef {
uid: RowUid,
causality: Option<String>,
}
#[derive(Debug, Clone)]
pub struct RowVal {
row_ref: RowRef,
value: ConcurrentValues,
}
#[derive(Debug, Clone)]
pub struct BlobRef(String);
#[derive(Debug, Clone)]
pub struct BlobVal {
blob_ref: BlobRef,
meta: HashMap<String, String>,
value: Vec<u8>,
}
pub enum Selector<'a> {
Range { shard: &'a str, sort_begin: &'a str, sort_end: &'a str },
List (Vec<RowRef>), // list of (shard_key, sort_key)
Prefix { shard: &'a str, sort_prefix: &'a str },
Single(RowRef),
}
#[async_trait]
pub trait IStore {
async fn row_fetch<'a>(&self, select: &Selector<'a>) -> Result<Vec<RowVal>, StorageError>;
async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError>;
async fn row_insert(&self, values: Vec<RowVal>) -> Result<(), StorageError>;
async fn row_poll(&self, value: RowRef) -> Result<RowVal, StorageError>;
async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result<BlobVal, StorageError>;
async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<BlobVal, StorageError>;
async fn blob_list(&self, prefix: &str) -> Result<Vec<BlobRef>, StorageError>;
async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError>;
}
pub trait IBuilder {
fn build(&self) -> Box<dyn IStore>;
}
/*
#[derive(Clone, Debug, PartialEq)] #[derive(Clone, Debug, PartialEq)]
pub enum OrphanRowRef { pub enum OrphanRowRef {
Garage(garage::GrgOrphanRowRef), Garage(garage::GrgOrphanRowRef),
Memory(in_memory::MemOrphanRowRef), Memory(in_memory::MemOrphanRowRef),
} }
pub enum Selector<'a> {
Range { shard_key: &'a str, begin: &'a str, end: &'a str },
List (Vec<(&'a str, &'a str)>), // list of (shard_key, sort_key)
Prefix { shard_key: &'a str, prefix: &'a str },
}
#[derive(Debug)]
pub enum StorageError {
NotFound,
Internal,
IncompatibleOrphan,
}
impl std::fmt::Display for StorageError { impl std::fmt::Display for StorageError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str("Storage Error: "); f.write_str("Storage Error: ")?;
match self { match self {
Self::NotFound => f.write_str("Item not found"), Self::NotFound => f.write_str("Item not found"),
Self::Internal => f.write_str("An internal error occured"), Self::Internal => f.write_str("An internal error occured"),
@ -55,6 +113,7 @@ pub type AsyncResult<'a, T> = BoxFuture<'a, Result<T, StorageError>>;
// ----- Builders // ----- Builders
pub trait IBuilders { pub trait IBuilders {
fn box_clone(&self) -> Builders;
fn row_store(&self) -> Result<RowStore, StorageError>; fn row_store(&self) -> Result<RowStore, StorageError>;
fn blob_store(&self) -> Result<BlobStore, StorageError>; fn blob_store(&self) -> Result<BlobStore, StorageError>;
fn url(&self) -> &str; fn url(&self) -> &str;
@ -62,8 +121,7 @@ pub trait IBuilders {
pub type Builders = Box<dyn IBuilders + Send + Sync>; pub type Builders = Box<dyn IBuilders + Send + Sync>;
impl Clone for Builders { impl Clone for Builders {
fn clone(&self) -> Self { fn clone(&self) -> Self {
// @FIXME write a real implementation with a box_clone function self.box_clone()
Box::new(in_memory::FullMem{})
} }
} }
impl std::fmt::Debug for Builders { impl std::fmt::Debug for Builders {
@ -102,7 +160,7 @@ pub trait IRowRef: std::fmt::Debug
fn rm(&self) -> AsyncResult<()>; fn rm(&self) -> AsyncResult<()>;
fn poll(&self) -> AsyncResult<RowValue>; fn poll(&self) -> AsyncResult<RowValue>;
} }
pub type RowRef = Box<dyn IRowRef + Send + Sync>; pub type RowRef<'a> = Box<dyn IRowRef + Send + Sync + 'a>;
pub trait IRowValue: std::fmt::Debug pub trait IRowValue: std::fmt::Debug
{ {
@ -138,3 +196,4 @@ pub trait IBlobValue {
fn push(&self) -> AsyncResult<()>; fn push(&self) -> AsyncResult<()>;
} }
pub type BlobValue = Box<dyn IBlobValue + Send + Sync>; pub type BlobValue = Box<dyn IBlobValue + Send + Sync>;
*/