implement poll

This commit is contained in:
Quentin 2023-12-26 20:02:13 +01:00
parent 18bba784ee
commit 477a784e45
Signed by: quentin
GPG key ID: E9602264D639FF68

View file

@ -191,7 +191,36 @@ impl IStore for GarageStore {
} }
} }
async fn row_poll(&self, value: &RowRef) -> Result<RowVal, StorageError> { async fn row_poll(&self, value: &RowRef) -> Result<RowVal, StorageError> {
unimplemented!(); loop {
if let Some(ct) = &value.causality {
match self.k2v.poll_item(&value.uid.shard, &value.uid.sort, ct.clone().into(), None).await {
Err(e) => {
tracing::error!("Unable to poll item: {}", e);
return Err(StorageError::Internal);
}
Ok(None) => continue,
Ok(Some(cv)) => return Ok(causal_to_row_val(value.clone(), cv)),
}
} else {
match self.k2v.read_item(&value.uid.shard, &value.uid.sort).await {
Err(k2v_client::Error::NotFound) => {
self
.k2v
.insert_item(&value.uid.shard, &value.uid.sort, vec![0u8], None)
.await
.map_err(|e| {
tracing::error!("Unable to insert item in polling logic: {}", e);
StorageError::Internal
})?;
}
Err(e) => {
tracing::error!("Unable to read item in polling logic: {}", e);
return Err(StorageError::Internal)
},
Ok(cv) => return Ok(causal_to_row_val(value.clone(), cv)),
}
}
}
} }
async fn row_rm_single(&self, entry: &RowRef) -> Result<(), StorageError> { async fn row_rm_single(&self, entry: &RowRef) -> Result<(), StorageError> {