insert logic

This commit is contained in:
Quentin 2023-12-26 18:33:56 +01:00
parent 78f2d86fc8
commit 18bba784ee
Signed by: quentin
GPG key ID: E9602264D639FF68

View file

@ -97,33 +97,41 @@ fn causal_to_row_val(row_ref: RowRef, causal_value: k2v_client::CausalValue) ->
#[async_trait] #[async_trait]
impl IStore for GarageStore { impl IStore for GarageStore {
async fn row_fetch<'a>(&self, select: &Selector<'a>) -> Result<Vec<RowVal>, StorageError> { async fn row_fetch<'a>(&self, select: &Selector<'a>) -> Result<Vec<RowVal>, StorageError> {
let batch_op = match select { let (pk_list, batch_op) = match select {
Selector::Range { shard, sort_begin, sort_end } => vec![k2v_client::BatchReadOp { Selector::Range { shard, sort_begin, sort_end } => (
partition_key: shard, vec![shard.to_string()],
filter: k2v_client::Filter { vec![k2v_client::BatchReadOp {
start: Some(sort_begin), partition_key: shard,
end: Some(sort_end), filter: k2v_client::Filter {
..k2v_client::Filter::default() start: Some(sort_begin),
}, end: Some(sort_end),
..k2v_client::BatchReadOp::default() ..k2v_client::Filter::default()
}], },
Selector::List(row_ref_list) => row_ref_list.iter().map(|row_ref| k2v_client::BatchReadOp { ..k2v_client::BatchReadOp::default()
partition_key: &row_ref.uid.shard, }]
filter: k2v_client::Filter { ),
start: Some(&row_ref.uid.sort), Selector::List(row_ref_list) => (
..k2v_client::Filter::default() row_ref_list.iter().map(|row_ref| row_ref.uid.shard.to_string()).collect::<Vec<_>>(),
}, row_ref_list.iter().map(|row_ref| k2v_client::BatchReadOp {
single_item: true, partition_key: &row_ref.uid.shard,
..k2v_client::BatchReadOp::default() filter: k2v_client::Filter {
}).collect::<Vec<_>>(), start: Some(&row_ref.uid.sort),
Selector::Prefix { shard, sort_prefix } => vec![k2v_client::BatchReadOp { ..k2v_client::Filter::default()
},
single_item: true,
..k2v_client::BatchReadOp::default()
}).collect::<Vec<_>>()
),
Selector::Prefix { shard, sort_prefix } => (
vec![shard.to_string()],
vec![k2v_client::BatchReadOp {
partition_key: shard, partition_key: shard,
filter: k2v_client::Filter { filter: k2v_client::Filter {
prefix: Some(sort_prefix), prefix: Some(sort_prefix),
..k2v_client::Filter::default() ..k2v_client::Filter::default()
}, },
..k2v_client::BatchReadOp::default() ..k2v_client::BatchReadOp::default()
}], }]),
Selector::Single(row_ref) => { Selector::Single(row_ref) => {
let causal_value = match self.k2v.read_item(&row_ref.uid.shard, &row_ref.uid.sort).await { let causal_value = match self.k2v.read_item(&row_ref.uid.shard, &row_ref.uid.sort).await {
Err(e) => { Err(e) => {
@ -138,7 +146,7 @@ impl IStore for GarageStore {
}, },
}; };
let all_res = match self.k2v.read_batch(&batch_op).await { let all_raw_res = match self.k2v.read_batch(&batch_op).await {
Err(e) => { Err(e) => {
tracing::error!("k2v read batch failed for {:?}, bucket {} with err: {}", select, self.bucket, e); tracing::error!("k2v read batch failed for {:?}, bucket {} with err: {}", select, self.bucket, e);
return Err(StorageError::Internal); return Err(StorageError::Internal);
@ -146,15 +154,41 @@ impl IStore for GarageStore {
Ok(v) => v, Ok(v) => v,
}; };
unimplemented!(); let row_vals = all_raw_res
.into_iter()
.fold(vec![], |mut acc, v| {
acc.extend(v.items);
acc
})
.into_iter()
.zip(pk_list.into_iter())
.map(|((sk, cv), pk)| causal_to_row_val(RowRef::new(&pk, &sk), cv))
.collect::<Vec<_>>();
Ok(row_vals)
} }
async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError> { async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError> {
unimplemented!(); unimplemented!();
} }
async fn row_insert(&self, values: Vec<RowVal>) -> Result<(), StorageError> { async fn row_insert(&self, values: Vec<RowVal>) -> Result<(), StorageError> {
unimplemented!(); let batch_ops = values.iter().map(|v| k2v_client::BatchInsertOp {
partition_key: &v.row_ref.uid.shard,
sort_key: &v.row_ref.uid.sort,
causality: v.row_ref.causality.clone().map(|ct| ct.into()),
value: v.value.iter().next().map(|cv| match cv {
Alternative::Value(buff) => k2v_client::K2vValue::Value(buff.clone()),
Alternative::Tombstone => k2v_client::K2vValue::Tombstone,
}).unwrap_or(k2v_client::K2vValue::Tombstone)
}).collect::<Vec<_>>();
match self.k2v.insert_batch(&batch_ops).await {
Err(e) => {
tracing::error!("k2v can't insert some value: {}", e);
Err(StorageError::Internal)
},
Ok(v) => Ok(v),
}
} }
async fn row_poll(&self, value: &RowRef) -> Result<RowVal, StorageError> { async fn row_poll(&self, value: &RowRef) -> Result<RowVal, StorageError> {
unimplemented!(); unimplemented!();