forked from Deuxfleurs/garage
Implement missing handler for read_range
This commit is contained in:
parent
b4e96bdcf0
commit
29a1e94f23
1 changed files with 36 additions and 0 deletions
36
src/table.rs
36
src/table.rs
|
@ -388,6 +388,10 @@ impl<F: TableSchema + 'static> Table<F> {
|
||||||
let value = self.handle_read_entry(&key, &sort_key)?;
|
let value = self.handle_read_entry(&key, &sort_key)?;
|
||||||
Ok(TableRPC::ReadEntryResponse(value))
|
Ok(TableRPC::ReadEntryResponse(value))
|
||||||
}
|
}
|
||||||
|
TableRPC::ReadRange(key, begin_sort_key, filter, limit) => {
|
||||||
|
let values = self.handle_read_range(&key, &begin_sort_key, &filter, limit)?;
|
||||||
|
Ok(TableRPC::Update(values))
|
||||||
|
}
|
||||||
TableRPC::Update(pairs) => {
|
TableRPC::Update(pairs) => {
|
||||||
self.handle_update(pairs).await?;
|
self.handle_update(pairs).await?;
|
||||||
Ok(TableRPC::Ok)
|
Ok(TableRPC::Ok)
|
||||||
|
@ -412,6 +416,38 @@ impl<F: TableSchema + 'static> Table<F> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn handle_read_range(
|
||||||
|
&self,
|
||||||
|
p: &F::P,
|
||||||
|
s: &F::S,
|
||||||
|
filter: &Option<F::Filter>,
|
||||||
|
limit: usize,
|
||||||
|
) -> Result<Vec<Arc<ByteBuf>>, Error> {
|
||||||
|
let partition_hash = p.hash();
|
||||||
|
let first_key = self.tree_key(p, s);
|
||||||
|
let mut ret = vec![];
|
||||||
|
for item in self.store.range(first_key..) {
|
||||||
|
let (key, value) = item?;
|
||||||
|
if &key[..32] != partition_hash.as_slice() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
let keep = match filter {
|
||||||
|
None => true,
|
||||||
|
Some(f) => {
|
||||||
|
let entry = rmp_serde::decode::from_read_ref::<_, F::E>(value.as_ref())?;
|
||||||
|
F::matches_filter(&entry, f)
|
||||||
|
}
|
||||||
|
};
|
||||||
|
if keep {
|
||||||
|
ret.push(Arc::new(ByteBuf::from(value.as_ref())));
|
||||||
|
}
|
||||||
|
if ret.len() >= limit {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(ret)
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn handle_update(
|
pub async fn handle_update(
|
||||||
self: &Arc<Self>,
|
self: &Arc<Self>,
|
||||||
mut entries: Vec<Arc<ByteBuf>>,
|
mut entries: Vec<Arc<ByteBuf>>,
|
||||||
|
|
Loading…
Reference in a new issue