From 29a1e94f233af943243e4334447db6209e3e80f1 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 17 Apr 2020 19:38:47 +0200 Subject: [PATCH] Implement missing handler for read_range --- src/table.rs | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/src/table.rs b/src/table.rs index 2ae70398..9b67ad94 100644 --- a/src/table.rs +++ b/src/table.rs @@ -388,6 +388,10 @@ impl Table { let value = self.handle_read_entry(&key, &sort_key)?; Ok(TableRPC::ReadEntryResponse(value)) } + TableRPC::ReadRange(key, begin_sort_key, filter, limit) => { + let values = self.handle_read_range(&key, &begin_sort_key, &filter, limit)?; + Ok(TableRPC::Update(values)) + } TableRPC::Update(pairs) => { self.handle_update(pairs).await?; Ok(TableRPC::Ok) @@ -412,6 +416,38 @@ impl Table { } } + fn handle_read_range( + &self, + p: &F::P, + s: &F::S, + filter: &Option, + limit: usize, + ) -> Result>, Error> { + let partition_hash = p.hash(); + let first_key = self.tree_key(p, s); + let mut ret = vec![]; + for item in self.store.range(first_key..) { + let (key, value) = item?; + if &key[..32] != partition_hash.as_slice() { + break; + } + let keep = match filter { + None => true, + Some(f) => { + let entry = rmp_serde::decode::from_read_ref::<_, F::E>(value.as_ref())?; + F::matches_filter(&entry, f) + } + }; + if keep { + ret.push(Arc::new(ByteBuf::from(value.as_ref()))); + } + if ret.len() >= limit { + break; + } + } + Ok(ret) + } + pub async fn handle_update( self: &Arc, mut entries: Vec>,