table::insert_many, version_table::updated
This commit is contained in:
parent
53289b69e5
commit
dcf58499a4
6 changed files with 104 additions and 35 deletions
|
@ -16,7 +16,7 @@ futures-core = "0.3"
|
||||||
futures-channel = "0.3"
|
futures-channel = "0.3"
|
||||||
futures-util = "0.3"
|
futures-util = "0.3"
|
||||||
tokio = { version = "0.2", features = ["full"] }
|
tokio = { version = "0.2", features = ["full"] }
|
||||||
serde = { version = "1.0", features = ["derive"] }
|
serde = { version = "1.0", features = ["derive", "rc"] }
|
||||||
serde_bytes = "0.11"
|
serde_bytes = "0.11"
|
||||||
bincode = "1.2.1"
|
bincode = "1.2.1"
|
||||||
err-derive = "0.2.3"
|
err-derive = "0.2.3"
|
||||||
|
|
|
@ -44,7 +44,7 @@ impl TableFormat for BlockRefTable {
|
||||||
type S = UUID;
|
type S = UUID;
|
||||||
type E = BlockRef;
|
type E = BlockRef;
|
||||||
|
|
||||||
async fn updated(&self, old: Option<&Self::E>, new: &Self::E) {
|
async fn updated(&self, old: Option<Self::E>, new: Self::E) {
|
||||||
//unimplemented!()
|
//unimplemented!()
|
||||||
// TODO
|
// TODO
|
||||||
}
|
}
|
||||||
|
|
|
@ -97,9 +97,7 @@ impl TableFormat for ObjectTable {
|
||||||
type S = String;
|
type S = String;
|
||||||
type E = Object;
|
type E = Object;
|
||||||
|
|
||||||
async fn updated(&self, old: Option<&Self::E>, new: &Self::E) {
|
async fn updated(&self, old: Option<Self::E>, new: Self::E) {
|
||||||
let old = old.cloned();
|
|
||||||
let new = new.clone();
|
|
||||||
let garage = self.garage.read().await.as_ref().cloned().unwrap();
|
let garage = self.garage.read().await.as_ref().cloned().unwrap();
|
||||||
garage.clone().background.spawn(async move {
|
garage.clone().background.spawn(async move {
|
||||||
// Propagate deletion of old versions
|
// Propagate deletion of old versions
|
||||||
|
|
|
@ -30,10 +30,10 @@ pub struct Config {
|
||||||
#[serde(default = "default_block_size")]
|
#[serde(default = "default_block_size")]
|
||||||
pub block_size: usize,
|
pub block_size: usize,
|
||||||
|
|
||||||
#[serde(default = "default_meta_replication_factor")]
|
#[serde(default = "default_replication_factor")]
|
||||||
pub meta_replication_factor: usize,
|
pub meta_replication_factor: usize,
|
||||||
|
|
||||||
#[serde(default = "default_data_replication_factor")]
|
#[serde(default = "default_replication_factor")]
|
||||||
pub data_replication_factor: usize,
|
pub data_replication_factor: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -139,10 +139,7 @@ impl Garage {
|
||||||
fn default_block_size() -> usize {
|
fn default_block_size() -> usize {
|
||||||
1048576
|
1048576
|
||||||
}
|
}
|
||||||
fn default_meta_replication_factor() -> usize {
|
fn default_replication_factor() -> usize {
|
||||||
3
|
|
||||||
}
|
|
||||||
fn default_data_replication_factor() -> usize {
|
|
||||||
3
|
3
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
101
src/table.rs
101
src/table.rs
|
@ -1,8 +1,12 @@
|
||||||
use async_trait::async_trait;
|
use std::collections::HashMap;
|
||||||
use serde::{Deserialize, Serialize};
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use serde_bytes::ByteBuf;
|
||||||
|
use futures::stream::*;
|
||||||
|
|
||||||
use crate::data::*;
|
use crate::data::*;
|
||||||
use crate::error::Error;
|
use crate::error::Error;
|
||||||
use crate::membership::System;
|
use crate::membership::System;
|
||||||
|
@ -52,9 +56,9 @@ pub enum TableRPC<F: TableFormat> {
|
||||||
Ok,
|
Ok,
|
||||||
|
|
||||||
ReadEntry(F::P, F::S),
|
ReadEntry(F::P, F::S),
|
||||||
ReadEntryResponse(Option<F::E>),
|
ReadEntryResponse(Option<ByteBuf>),
|
||||||
|
|
||||||
Update(Vec<F::E>),
|
Update(Vec<Arc<ByteBuf>>),
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct Partition {
|
pub struct Partition {
|
||||||
|
@ -116,7 +120,7 @@ pub trait TableFormat: Send + Sync {
|
||||||
type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
|
type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
|
||||||
type E: Entry<Self::P, Self::S>;
|
type E: Entry<Self::P, Self::S>;
|
||||||
|
|
||||||
async fn updated(&self, old: Option<&Self::E>, new: &Self::E);
|
async fn updated(&self, old: Option<Self::E>, new: Self::E);
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<F: TableFormat + 'static> Table<F> {
|
impl<F: TableFormat + 'static> Table<F> {
|
||||||
|
@ -152,13 +156,63 @@ impl<F: TableFormat + 'static> Table<F> {
|
||||||
.walk_ring(&hash, self.param.replication_factor);
|
.walk_ring(&hash, self.param.replication_factor);
|
||||||
eprintln!("insert who: {:?}", who);
|
eprintln!("insert who: {:?}", who);
|
||||||
|
|
||||||
let rpc = &TableRPC::<F>::Update(vec![e.clone()]);
|
let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?));
|
||||||
|
let rpc = &TableRPC::<F>::Update(vec![e_enc]);
|
||||||
|
|
||||||
self.rpc_try_call_many(&who[..], &rpc, self.param.write_quorum)
|
self.rpc_try_call_many(&who[..], &rpc, self.param.write_quorum)
|
||||||
.await?;
|
.await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn insert_many(&self, entries: &[F::E]) -> Result<(), Error> {
|
||||||
|
let mut call_list = HashMap::new();
|
||||||
|
|
||||||
|
for entry in entries.iter() {
|
||||||
|
let hash = entry.partition_key().hash();
|
||||||
|
let who = self
|
||||||
|
.system
|
||||||
|
.members
|
||||||
|
.read()
|
||||||
|
.await
|
||||||
|
.walk_ring(&hash, self.param.replication_factor);
|
||||||
|
let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?));
|
||||||
|
for node in who {
|
||||||
|
if !call_list.contains_key(&node) {
|
||||||
|
call_list.insert(node.clone(), vec![]);
|
||||||
|
}
|
||||||
|
call_list.get_mut(&node).unwrap().push(e_enc.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let call_futures = call_list.drain()
|
||||||
|
.map(|(node, entries)| async move {
|
||||||
|
let rpc = TableRPC::<F>::Update(entries);
|
||||||
|
let rpc_bytes = rmp_to_vec_all_named(&rpc)?;
|
||||||
|
let message = Message::TableRPC(self.name.to_string(), rpc_bytes);
|
||||||
|
|
||||||
|
let resp = rpc_call(
|
||||||
|
self.system.clone(),
|
||||||
|
&node,
|
||||||
|
&message,
|
||||||
|
self.param.timeout
|
||||||
|
).await?;
|
||||||
|
Ok::<_, Error>((node, resp))
|
||||||
|
});
|
||||||
|
let mut resps = call_futures.collect::<FuturesUnordered<_>>();
|
||||||
|
let mut errors = vec![];
|
||||||
|
|
||||||
|
while let Some(resp) = resps.next().await {
|
||||||
|
if let Err(e) = resp {
|
||||||
|
errors.push(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if errors.len() > self.param.replication_factor - self.param.write_quorum {
|
||||||
|
Err(Error::Message("Too many errors".into()))
|
||||||
|
} else {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn get(&self, partition_key: &F::P, sort_key: &F::S) -> Result<Option<F::E>, Error> {
|
pub async fn get(&self, partition_key: &F::P, sort_key: &F::S) -> Result<Option<F::E>, Error> {
|
||||||
let hash = partition_key.hash();
|
let hash = partition_key.hash();
|
||||||
let who = self
|
let who = self
|
||||||
|
@ -178,7 +232,8 @@ impl<F: TableFormat + 'static> Table<F> {
|
||||||
let mut not_all_same = false;
|
let mut not_all_same = false;
|
||||||
for resp in resps {
|
for resp in resps {
|
||||||
if let TableRPC::ReadEntryResponse(value) = resp {
|
if let TableRPC::ReadEntryResponse(value) = resp {
|
||||||
if let Some(v) = value {
|
if let Some(v_bytes) = value {
|
||||||
|
let v = rmp_serde::decode::from_read_ref::<_, F::E>(v_bytes.as_slice())?;
|
||||||
ret = match ret {
|
ret = match ret {
|
||||||
None => Some(v),
|
None => Some(v),
|
||||||
Some(mut x) => {
|
Some(mut x) => {
|
||||||
|
@ -196,19 +251,22 @@ impl<F: TableFormat + 'static> Table<F> {
|
||||||
}
|
}
|
||||||
if let Some(ret_entry) = &ret {
|
if let Some(ret_entry) = &ret {
|
||||||
if not_all_same {
|
if not_all_same {
|
||||||
// Repair on read
|
let _: Result<_, _> = self.repair_on_read(&who[..], &ret_entry).await;
|
||||||
let _: Result<_, _> = self
|
|
||||||
.rpc_try_call_many(
|
|
||||||
&who[..],
|
|
||||||
&TableRPC::<F>::Update(vec![ret_entry.clone()]),
|
|
||||||
who.len(),
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(ret)
|
Ok(ret)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn repair_on_read(&self, who: &[UUID], what: &F::E) -> Result<(), Error> {
|
||||||
|
let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(what)?));
|
||||||
|
self.rpc_try_call_many(&who[..],
|
||||||
|
&TableRPC::<F>::Update(vec![what_enc]),
|
||||||
|
who.len(),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.map(|_|())
|
||||||
|
}
|
||||||
|
|
||||||
async fn rpc_try_call_many(
|
async fn rpc_try_call_many(
|
||||||
&self,
|
&self,
|
||||||
who: &[UUID],
|
who: &[UUID],
|
||||||
|
@ -263,18 +321,19 @@ impl<F: TableFormat + 'static> Table<F> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handle_read_entry(&self, p: &F::P, s: &F::S) -> Result<Option<F::E>, Error> {
|
fn handle_read_entry(&self, p: &F::P, s: &F::S) -> Result<Option<ByteBuf>, Error> {
|
||||||
let tree_key = self.tree_key(p, s);
|
let tree_key = self.tree_key(p, s);
|
||||||
if let Some(bytes) = self.store.get(&tree_key)? {
|
if let Some(bytes) = self.store.get(&tree_key)? {
|
||||||
let e = rmp_serde::decode::from_read_ref::<_, F::E>(&bytes)?;
|
Ok(Some(ByteBuf::from(bytes.to_vec())))
|
||||||
Ok(Some(e))
|
|
||||||
} else {
|
} else {
|
||||||
Ok(None)
|
Ok(None)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_update(&self, mut entries: Vec<F::E>) -> Result<(), Error> {
|
async fn handle_update(&self, mut entries: Vec<Arc<ByteBuf>>) -> Result<(), Error> {
|
||||||
for update in entries.drain(..) {
|
for update_bytes in entries.drain(..) {
|
||||||
|
let update = rmp_serde::decode::from_read_ref::<_, F::E>(update_bytes.as_slice())?;
|
||||||
|
|
||||||
let tree_key = self.tree_key(update.partition_key(), update.sort_key());
|
let tree_key = self.tree_key(update.partition_key(), update.sort_key());
|
||||||
|
|
||||||
let (old_entry, new_entry) = self.store.transaction(|db| {
|
let (old_entry, new_entry) = self.store.transaction(|db| {
|
||||||
|
@ -297,7 +356,7 @@ impl<F: TableFormat + 'static> Table<F> {
|
||||||
Ok((old_entry, new_entry))
|
Ok((old_entry, new_entry))
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
self.instance.updated(old_entry.as_ref(), &new_entry).await;
|
self.instance.updated(old_entry, new_entry).await;
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -63,8 +63,23 @@ impl TableFormat for VersionTable {
|
||||||
type S = EmptySortKey;
|
type S = EmptySortKey;
|
||||||
type E = Version;
|
type E = Version;
|
||||||
|
|
||||||
async fn updated(&self, old: Option<&Self::E>, new: &Self::E) {
|
async fn updated(&self, old: Option<Self::E>, new: Self::E) {
|
||||||
//unimplemented!()
|
let garage = self.garage.read().await.as_ref().cloned().unwrap();
|
||||||
// TODO
|
garage.clone().background.spawn(async move {
|
||||||
|
// Propagate deletion of version blocks
|
||||||
|
if let Some(old_v) = old {
|
||||||
|
if new.deleted && !old_v.deleted {
|
||||||
|
let deleted_block_refs = old_v.blocks.iter()
|
||||||
|
.map(|vb| BlockRef{
|
||||||
|
block: vb.hash.clone(),
|
||||||
|
version: old_v.uuid.clone(),
|
||||||
|
deleted: true,
|
||||||
|
})
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
garage.block_ref_table.insert_many(&deleted_block_refs[..]).await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue