New model for buckets #172
13 changed files with 70 additions and 70 deletions
|
@ -466,7 +466,7 @@ impl AdminRpcHandler {
|
||||||
F: TableSchema + 'static,
|
F: TableSchema + 'static,
|
||||||
R: TableReplication + 'static,
|
R: TableReplication + 'static,
|
||||||
{
|
{
|
||||||
writeln!(to, "\nTable stats for {}", t.data.name).unwrap();
|
writeln!(to, "\nTable stats for {}", F::TABLE_NAME).unwrap();
|
||||||
if opt.detailed {
|
if opt.detailed {
|
||||||
writeln!(to, " number of items: {}", t.data.store.len()).unwrap();
|
writeln!(to, " number of items: {}", t.data.store.len()).unwrap();
|
||||||
writeln!(
|
writeln!(
|
||||||
|
|
|
@ -44,6 +44,8 @@ pub struct BlockRefTable {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TableSchema for BlockRefTable {
|
impl TableSchema for BlockRefTable {
|
||||||
|
const TABLE_NAME: &'static str = "block_ref";
|
||||||
|
|
||||||
type P = Hash;
|
type P = Hash;
|
||||||
type S = Uuid;
|
type S = Uuid;
|
||||||
type E = BlockRef;
|
type E = BlockRef;
|
||||||
|
|
|
@ -114,6 +114,8 @@ impl Crdt for Bucket {
|
||||||
pub struct BucketTable;
|
pub struct BucketTable;
|
||||||
|
|
||||||
impl TableSchema for BucketTable {
|
impl TableSchema for BucketTable {
|
||||||
|
const TABLE_NAME: &'static str = "bucket";
|
||||||
|
|
||||||
type P = EmptyKey;
|
type P = EmptyKey;
|
||||||
type S = String;
|
type S = String;
|
||||||
type E = Bucket;
|
type E = Bucket;
|
||||||
|
|
|
@ -93,7 +93,6 @@ impl Garage {
|
||||||
meta_rep_param.clone(),
|
meta_rep_param.clone(),
|
||||||
system.clone(),
|
system.clone(),
|
||||||
&db,
|
&db,
|
||||||
"block_ref".to_string(),
|
|
||||||
);
|
);
|
||||||
|
|
||||||
info!("Initialize version_table...");
|
info!("Initialize version_table...");
|
||||||
|
@ -105,7 +104,6 @@ impl Garage {
|
||||||
meta_rep_param.clone(),
|
meta_rep_param.clone(),
|
||||||
system.clone(),
|
system.clone(),
|
||||||
&db,
|
&db,
|
||||||
"version".to_string(),
|
|
||||||
);
|
);
|
||||||
|
|
||||||
info!("Initialize object_table...");
|
info!("Initialize object_table...");
|
||||||
|
@ -117,26 +115,13 @@ impl Garage {
|
||||||
meta_rep_param,
|
meta_rep_param,
|
||||||
system.clone(),
|
system.clone(),
|
||||||
&db,
|
&db,
|
||||||
"object".to_string(),
|
|
||||||
);
|
);
|
||||||
|
|
||||||
info!("Initialize bucket_table...");
|
info!("Initialize bucket_table...");
|
||||||
let bucket_table = Table::new(
|
let bucket_table = Table::new(BucketTable, control_rep_param.clone(), system.clone(), &db);
|
||||||
BucketTable,
|
|
||||||
control_rep_param.clone(),
|
|
||||||
system.clone(),
|
|
||||||
&db,
|
|
||||||
"bucket".to_string(),
|
|
||||||
);
|
|
||||||
|
|
||||||
info!("Initialize key_table_table...");
|
info!("Initialize key_table_table...");
|
||||||
let key_table = Table::new(
|
let key_table = Table::new(KeyTable, control_rep_param, system.clone(), &db);
|
||||||
KeyTable,
|
|
||||||
control_rep_param,
|
|
||||||
system.clone(),
|
|
||||||
&db,
|
|
||||||
"key".to_string(),
|
|
||||||
);
|
|
||||||
|
|
||||||
info!("Initialize Garage...");
|
info!("Initialize Garage...");
|
||||||
let garage = Arc::new(Self {
|
let garage = Arc::new(Self {
|
||||||
|
|
|
@ -120,6 +120,8 @@ pub enum KeyFilter {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TableSchema for KeyTable {
|
impl TableSchema for KeyTable {
|
||||||
|
const TABLE_NAME: &'static str = "key";
|
||||||
|
|
||||||
type P = EmptyKey;
|
type P = EmptyKey;
|
||||||
type S = String;
|
type S = String;
|
||||||
type E = Key;
|
type E = Key;
|
||||||
|
|
|
@ -217,6 +217,8 @@ pub struct ObjectTable {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TableSchema for ObjectTable {
|
impl TableSchema for ObjectTable {
|
||||||
|
const TABLE_NAME: &'static str = "object";
|
||||||
|
|
||||||
type P = String;
|
type P = String;
|
||||||
type S = String;
|
type S = String;
|
||||||
type E = Object;
|
type E = Object;
|
||||||
|
|
|
@ -114,6 +114,8 @@ pub struct VersionTable {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TableSchema for VersionTable {
|
impl TableSchema for VersionTable {
|
||||||
|
const TABLE_NAME: &'static str = "version";
|
||||||
|
|
||||||
type P = Hash;
|
type P = Hash;
|
||||||
type S = EmptyKey;
|
type S = EmptyKey;
|
||||||
type E = Version;
|
type E = Version;
|
||||||
|
|
|
@ -19,7 +19,6 @@ use crate::schema::*;
|
||||||
pub struct TableData<F: TableSchema, R: TableReplication> {
|
pub struct TableData<F: TableSchema, R: TableReplication> {
|
||||||
system: Arc<System>,
|
system: Arc<System>,
|
||||||
|
|
||||||
pub name: String,
|
|
||||||
pub(crate) instance: F,
|
pub(crate) instance: F,
|
||||||
pub(crate) replication: R,
|
pub(crate) replication: R,
|
||||||
|
|
||||||
|
@ -36,31 +35,24 @@ where
|
||||||
F: TableSchema,
|
F: TableSchema,
|
||||||
R: TableReplication,
|
R: TableReplication,
|
||||||
{
|
{
|
||||||
pub fn new(
|
pub fn new(system: Arc<System>, instance: F, replication: R, db: &sled::Db) -> Arc<Self> {
|
||||||
system: Arc<System>,
|
|
||||||
name: String,
|
|
||||||
instance: F,
|
|
||||||
replication: R,
|
|
||||||
db: &sled::Db,
|
|
||||||
) -> Arc<Self> {
|
|
||||||
let store = db
|
let store = db
|
||||||
.open_tree(&format!("{}:table", name))
|
.open_tree(&format!("{}:table", F::TABLE_NAME))
|
||||||
.expect("Unable to open DB tree");
|
.expect("Unable to open DB tree");
|
||||||
|
|
||||||
let merkle_tree = db
|
let merkle_tree = db
|
||||||
.open_tree(&format!("{}:merkle_tree", name))
|
.open_tree(&format!("{}:merkle_tree", F::TABLE_NAME))
|
||||||
.expect("Unable to open DB Merkle tree tree");
|
.expect("Unable to open DB Merkle tree tree");
|
||||||
let merkle_todo = db
|
let merkle_todo = db
|
||||||
.open_tree(&format!("{}:merkle_todo", name))
|
.open_tree(&format!("{}:merkle_todo", F::TABLE_NAME))
|
||||||
.expect("Unable to open DB Merkle TODO tree");
|
.expect("Unable to open DB Merkle TODO tree");
|
||||||
|
|
||||||
let gc_todo = db
|
let gc_todo = db
|
||||||
.open_tree(&format!("{}:gc_todo_v2", name))
|
.open_tree(&format!("{}:gc_todo_v2", F::TABLE_NAME))
|
||||||
.expect("Unable to open DB tree");
|
.expect("Unable to open DB tree");
|
||||||
|
|
||||||
Arc::new(Self {
|
Arc::new(Self {
|
||||||
system,
|
system,
|
||||||
name,
|
|
||||||
instance,
|
instance,
|
||||||
replication,
|
replication,
|
||||||
store,
|
store,
|
||||||
|
@ -245,7 +237,7 @@ where
|
||||||
Err(e) => match F::try_migrate(bytes) {
|
Err(e) => match F::try_migrate(bytes) {
|
||||||
Some(x) => Ok(x),
|
Some(x) => Ok(x),
|
||||||
None => {
|
None => {
|
||||||
warn!("Unable to decode entry of {}: {}", self.name, e);
|
warn!("Unable to decode entry of {}: {}", F::TABLE_NAME, e);
|
||||||
for line in hexdump::hexdump_iter(bytes) {
|
for line in hexdump::hexdump_iter(bytes) {
|
||||||
debug!("{}", line);
|
debug!("{}", line);
|
||||||
}
|
}
|
||||||
|
|
|
@ -57,11 +57,11 @@ where
|
||||||
pub(crate) fn launch(system: Arc<System>, data: Arc<TableData<F, R>>) -> Arc<Self> {
|
pub(crate) fn launch(system: Arc<System>, data: Arc<TableData<F, R>>) -> Arc<Self> {
|
||||||
let endpoint = system
|
let endpoint = system
|
||||||
.netapp
|
.netapp
|
||||||
.endpoint(format!("garage_table/gc.rs/Rpc:{}", data.name));
|
.endpoint(format!("garage_table/gc.rs/Rpc:{}", F::TABLE_NAME));
|
||||||
|
|
||||||
let gc = Arc::new(Self {
|
let gc = Arc::new(Self {
|
||||||
system: system.clone(),
|
system: system.clone(),
|
||||||
data: data.clone(),
|
data,
|
||||||
endpoint,
|
endpoint,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -69,7 +69,7 @@ where
|
||||||
|
|
||||||
let gc1 = gc.clone();
|
let gc1 = gc.clone();
|
||||||
system.background.spawn_worker(
|
system.background.spawn_worker(
|
||||||
format!("GC loop for {}", data.name),
|
format!("GC loop for {}", F::TABLE_NAME),
|
||||||
move |must_exit: watch::Receiver<bool>| gc1.gc_loop(must_exit),
|
move |must_exit: watch::Receiver<bool>| gc1.gc_loop(must_exit),
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -90,7 +90,7 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
warn!("({}) Error doing GC: {}", self.data.name, e);
|
warn!("({}) Error doing GC: {}", F::TABLE_NAME, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -160,7 +160,7 @@ where
|
||||||
return Ok(Some(Duration::from_secs(60)));
|
return Ok(Some(Duration::from_secs(60)));
|
||||||
}
|
}
|
||||||
|
|
||||||
debug!("({}) GC: doing {} items", self.data.name, entries.len());
|
debug!("({}) GC: doing {} items", F::TABLE_NAME, entries.len());
|
||||||
|
|
||||||
// Split entries to GC by the set of nodes on which they are stored.
|
// Split entries to GC by the set of nodes on which they are stored.
|
||||||
// Here we call them partitions but they are not exactly
|
// Here we call them partitions but they are not exactly
|
||||||
|
@ -262,7 +262,8 @@ where
|
||||||
|
|
||||||
info!(
|
info!(
|
||||||
"({}) GC: {} items successfully pushed, will try to delete.",
|
"({}) GC: {} items successfully pushed, will try to delete.",
|
||||||
self.data.name, n_items
|
F::TABLE_NAME,
|
||||||
|
n_items
|
||||||
);
|
);
|
||||||
|
|
||||||
// Step 2: delete tombstones everywhere.
|
// Step 2: delete tombstones everywhere.
|
||||||
|
|
|
@ -82,7 +82,7 @@ where
|
||||||
|
|
||||||
let ret2 = ret.clone();
|
let ret2 = ret.clone();
|
||||||
background.spawn_worker(
|
background.spawn_worker(
|
||||||
format!("Merkle tree updater for {}", ret.data.name),
|
format!("Merkle tree updater for {}", F::TABLE_NAME),
|
||||||
|must_exit: watch::Receiver<bool>| ret2.updater_loop(must_exit),
|
|must_exit: watch::Receiver<bool>| ret2.updater_loop(must_exit),
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -97,14 +97,16 @@ where
|
||||||
if let Err(e) = self.update_item(&key[..], &valhash[..]) {
|
if let Err(e) = self.update_item(&key[..], &valhash[..]) {
|
||||||
warn!(
|
warn!(
|
||||||
"({}) Error while updating Merkle tree item: {}",
|
"({}) Error while updating Merkle tree item: {}",
|
||||||
self.data.name, e
|
F::TABLE_NAME,
|
||||||
|
e
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
warn!(
|
warn!(
|
||||||
"({}) Error while iterating on Merkle todo tree: {}",
|
"({}) Error while iterating on Merkle todo tree: {}",
|
||||||
self.data.name, e
|
F::TABLE_NAME,
|
||||||
|
e
|
||||||
);
|
);
|
||||||
tokio::time::sleep(Duration::from_secs(10)).await;
|
tokio::time::sleep(Duration::from_secs(10)).await;
|
||||||
}
|
}
|
||||||
|
@ -147,7 +149,8 @@ where
|
||||||
if !deleted {
|
if !deleted {
|
||||||
debug!(
|
debug!(
|
||||||
"({}) Item not deleted from Merkle todo because it changed: {:?}",
|
"({}) Item not deleted from Merkle todo because it changed: {:?}",
|
||||||
self.data.name, k
|
F::TABLE_NAME,
|
||||||
|
k
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -183,7 +186,7 @@ where
|
||||||
// should not happen
|
// should not happen
|
||||||
warn!(
|
warn!(
|
||||||
"({}) Replacing intermediate node with empty node, should not happen.",
|
"({}) Replacing intermediate node with empty node, should not happen.",
|
||||||
self.data.name
|
F::TABLE_NAME
|
||||||
);
|
);
|
||||||
Some(MerkleNode::Empty)
|
Some(MerkleNode::Empty)
|
||||||
} else if children.len() == 1 {
|
} else if children.len() == 1 {
|
||||||
|
@ -195,7 +198,7 @@ where
|
||||||
MerkleNode::Empty => {
|
MerkleNode::Empty => {
|
||||||
warn!(
|
warn!(
|
||||||
"({}) Single subnode in tree is empty Merkle node",
|
"({}) Single subnode in tree is empty Merkle node",
|
||||||
self.data.name
|
F::TABLE_NAME
|
||||||
);
|
);
|
||||||
Some(MerkleNode::Empty)
|
Some(MerkleNode::Empty)
|
||||||
}
|
}
|
||||||
|
|
|
@ -57,12 +57,19 @@ pub trait Entry<P: PartitionKey, S: SortKey>:
|
||||||
|
|
||||||
/// Trait for the schema used in a table
|
/// Trait for the schema used in a table
|
||||||
pub trait TableSchema: Send + Sync {
|
pub trait TableSchema: Send + Sync {
|
||||||
|
/// The name of the table in the database
|
||||||
|
const TABLE_NAME: &'static str;
|
||||||
|
|
||||||
/// The partition key used in that table
|
/// The partition key used in that table
|
||||||
type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
|
type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
|
||||||
/// The sort key used int that table
|
/// The sort key used int that table
|
||||||
type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
|
type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
|
||||||
|
|
||||||
/// They type for an entry in that table
|
/// They type for an entry in that table
|
||||||
type E: Entry<Self::P, Self::S>;
|
type E: Entry<Self::P, Self::S>;
|
||||||
|
|
||||||
|
/// The type for a filter that can be applied to select entries
|
||||||
|
/// (e.g. filter out deleted entries)
|
||||||
type Filter: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
|
type Filter: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
|
||||||
|
|
||||||
// Action to take if not able to decode current version:
|
// Action to take if not able to decode current version:
|
||||||
|
|
|
@ -77,13 +77,13 @@ where
|
||||||
) -> Arc<Self> {
|
) -> Arc<Self> {
|
||||||
let endpoint = system
|
let endpoint = system
|
||||||
.netapp
|
.netapp
|
||||||
.endpoint(format!("garage_table/sync.rs/Rpc:{}", data.name));
|
.endpoint(format!("garage_table/sync.rs/Rpc:{}", F::TABLE_NAME));
|
||||||
|
|
||||||
let todo = SyncTodo { todo: vec![] };
|
let todo = SyncTodo { todo: vec![] };
|
||||||
|
|
||||||
let syncer = Arc::new(Self {
|
let syncer = Arc::new(Self {
|
||||||
system: system.clone(),
|
system: system.clone(),
|
||||||
data: data.clone(),
|
data,
|
||||||
merkle,
|
merkle,
|
||||||
todo: Mutex::new(todo),
|
todo: Mutex::new(todo),
|
||||||
endpoint,
|
endpoint,
|
||||||
|
@ -95,13 +95,13 @@ where
|
||||||
|
|
||||||
let s1 = syncer.clone();
|
let s1 = syncer.clone();
|
||||||
system.background.spawn_worker(
|
system.background.spawn_worker(
|
||||||
format!("table sync watcher for {}", data.name),
|
format!("table sync watcher for {}", F::TABLE_NAME),
|
||||||
move |must_exit: watch::Receiver<bool>| s1.watcher_task(must_exit, busy_rx),
|
move |must_exit: watch::Receiver<bool>| s1.watcher_task(must_exit, busy_rx),
|
||||||
);
|
);
|
||||||
|
|
||||||
let s2 = syncer.clone();
|
let s2 = syncer.clone();
|
||||||
system.background.spawn_worker(
|
system.background.spawn_worker(
|
||||||
format!("table syncer for {}", data.name),
|
format!("table syncer for {}", F::TABLE_NAME),
|
||||||
move |must_exit: watch::Receiver<bool>| s2.syncer_task(must_exit, busy_tx),
|
move |must_exit: watch::Receiver<bool>| s2.syncer_task(must_exit, busy_tx),
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -128,7 +128,7 @@ where
|
||||||
_ = ring_recv.changed().fuse() => {
|
_ = ring_recv.changed().fuse() => {
|
||||||
let new_ring = ring_recv.borrow();
|
let new_ring = ring_recv.borrow();
|
||||||
if !Arc::ptr_eq(&new_ring, &prev_ring) {
|
if !Arc::ptr_eq(&new_ring, &prev_ring) {
|
||||||
debug!("({}) Ring changed, adding full sync to syncer todo list", self.data.name);
|
debug!("({}) Ring changed, adding full sync to syncer todo list", F::TABLE_NAME);
|
||||||
self.add_full_sync();
|
self.add_full_sync();
|
||||||
prev_ring = new_ring.clone();
|
prev_ring = new_ring.clone();
|
||||||
}
|
}
|
||||||
|
@ -146,7 +146,7 @@ where
|
||||||
_ = tokio::time::sleep(Duration::from_secs(1)).fuse() => {
|
_ = tokio::time::sleep(Duration::from_secs(1)).fuse() => {
|
||||||
if nothing_to_do_since.map(|t| Instant::now() - t >= ANTI_ENTROPY_INTERVAL).unwrap_or(false) {
|
if nothing_to_do_since.map(|t| Instant::now() - t >= ANTI_ENTROPY_INTERVAL).unwrap_or(false) {
|
||||||
nothing_to_do_since = None;
|
nothing_to_do_since = None;
|
||||||
debug!("({}) Interval passed, adding full sync to syncer todo list", self.data.name);
|
debug!("({}) Interval passed, adding full sync to syncer todo list", F::TABLE_NAME);
|
||||||
self.add_full_sync();
|
self.add_full_sync();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -177,7 +177,9 @@ where
|
||||||
if let Err(e) = res {
|
if let Err(e) = res {
|
||||||
warn!(
|
warn!(
|
||||||
"({}) Error while syncing {:?}: {}",
|
"({}) Error while syncing {:?}: {}",
|
||||||
self.data.name, partition, e
|
F::TABLE_NAME,
|
||||||
|
partition,
|
||||||
|
e
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -205,7 +207,9 @@ where
|
||||||
|
|
||||||
debug!(
|
debug!(
|
||||||
"({}) Syncing {:?} with {:?}...",
|
"({}) Syncing {:?} with {:?}...",
|
||||||
self.data.name, partition, nodes
|
F::TABLE_NAME,
|
||||||
|
partition,
|
||||||
|
nodes
|
||||||
);
|
);
|
||||||
let mut sync_futures = nodes
|
let mut sync_futures = nodes
|
||||||
.iter()
|
.iter()
|
||||||
|
@ -219,7 +223,7 @@ where
|
||||||
while let Some(r) = sync_futures.next().await {
|
while let Some(r) = sync_futures.next().await {
|
||||||
if let Err(e) = r {
|
if let Err(e) = r {
|
||||||
n_errors += 1;
|
n_errors += 1;
|
||||||
warn!("({}) Sync error: {}", self.data.name, e);
|
warn!("({}) Sync error: {}", F::TABLE_NAME, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if n_errors > self.data.replication.max_write_errors() {
|
if n_errors > self.data.replication.max_write_errors() {
|
||||||
|
@ -272,7 +276,7 @@ where
|
||||||
if nodes.contains(&self.system.id) {
|
if nodes.contains(&self.system.id) {
|
||||||
warn!(
|
warn!(
|
||||||
"({}) Interrupting offload as partitions seem to have changed",
|
"({}) Interrupting offload as partitions seem to have changed",
|
||||||
self.data.name
|
F::TABLE_NAME
|
||||||
);
|
);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -286,7 +290,7 @@ where
|
||||||
counter += 1;
|
counter += 1;
|
||||||
info!(
|
info!(
|
||||||
"({}) Offloading {} items from {:?}..{:?} ({})",
|
"({}) Offloading {} items from {:?}..{:?} ({})",
|
||||||
self.data.name,
|
F::TABLE_NAME,
|
||||||
items.len(),
|
items.len(),
|
||||||
begin,
|
begin,
|
||||||
end,
|
end,
|
||||||
|
@ -329,7 +333,7 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
if not_removed > 0 {
|
if not_removed > 0 {
|
||||||
debug!("({}) {} items not removed during offload because they changed in between (trying again...)", self.data.name, not_removed);
|
debug!("({}) {} items not removed during offload because they changed in between (trying again...)", F::TABLE_NAME, not_removed);
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -360,7 +364,9 @@ where
|
||||||
if root_ck.is_empty() {
|
if root_ck.is_empty() {
|
||||||
debug!(
|
debug!(
|
||||||
"({}) Sync {:?} with {:?}: partition is empty.",
|
"({}) Sync {:?} with {:?}: partition is empty.",
|
||||||
self.data.name, partition, who
|
F::TABLE_NAME,
|
||||||
|
partition,
|
||||||
|
who
|
||||||
);
|
);
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
@ -384,7 +390,9 @@ where
|
||||||
SyncRpc::RootCkDifferent(false) => {
|
SyncRpc::RootCkDifferent(false) => {
|
||||||
debug!(
|
debug!(
|
||||||
"({}) Sync {:?} with {:?}: no difference",
|
"({}) Sync {:?} with {:?}: no difference",
|
||||||
self.data.name, partition, who
|
F::TABLE_NAME,
|
||||||
|
partition,
|
||||||
|
who
|
||||||
);
|
);
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
@ -413,11 +421,11 @@ where
|
||||||
// Just send that item directly
|
// Just send that item directly
|
||||||
if let Some(val) = self.data.store.get(&ik[..])? {
|
if let Some(val) = self.data.store.get(&ik[..])? {
|
||||||
if blake2sum(&val[..]) != ivhash {
|
if blake2sum(&val[..]) != ivhash {
|
||||||
warn!("({}) Hashes differ between stored value and Merkle tree, key: {:?} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", self.data.name, ik);
|
warn!("({}) Hashes differ between stored value and Merkle tree, key: {:?} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", F::TABLE_NAME, ik);
|
||||||
}
|
}
|
||||||
todo_items.push(val.to_vec());
|
todo_items.push(val.to_vec());
|
||||||
} else {
|
} else {
|
||||||
warn!("({}) Item from Merkle tree not found in store: {:?} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", self.data.name, ik);
|
warn!("({}) Item from Merkle tree not found in store: {:?} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", F::TABLE_NAME, ik);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
MerkleNode::Intermediate(l) => {
|
MerkleNode::Intermediate(l) => {
|
||||||
|
@ -482,7 +490,7 @@ where
|
||||||
async fn send_items(&self, who: Uuid, item_value_list: Vec<Vec<u8>>) -> Result<(), Error> {
|
async fn send_items(&self, who: Uuid, item_value_list: Vec<Vec<u8>>) -> Result<(), Error> {
|
||||||
info!(
|
info!(
|
||||||
"({}) Sending {} items to {:?}",
|
"({}) Sending {} items to {:?}",
|
||||||
self.data.name,
|
F::TABLE_NAME,
|
||||||
item_value_list.len(),
|
item_value_list.len(),
|
||||||
who
|
who
|
||||||
);
|
);
|
||||||
|
|
|
@ -55,18 +55,12 @@ where
|
||||||
{
|
{
|
||||||
// =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) ===============
|
// =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) ===============
|
||||||
|
|
||||||
pub fn new(
|
pub fn new(instance: F, replication: R, system: Arc<System>, db: &sled::Db) -> Arc<Self> {
|
||||||
instance: F,
|
|
||||||
replication: R,
|
|
||||||
system: Arc<System>,
|
|
||||||
db: &sled::Db,
|
|
||||||
name: String,
|
|
||||||
) -> Arc<Self> {
|
|
||||||
let endpoint = system
|
let endpoint = system
|
||||||
.netapp
|
.netapp
|
||||||
.endpoint(format!("garage_table/table.rs/Rpc:{}", name));
|
.endpoint(format!("garage_table/table.rs/Rpc:{}", F::TABLE_NAME));
|
||||||
|
|
||||||
let data = TableData::new(system.clone(), name, instance, replication, db);
|
let data = TableData::new(system.clone(), instance, replication, db);
|
||||||
|
|
||||||
let merkle_updater = MerkleUpdater::launch(&system.background, data.clone());
|
let merkle_updater = MerkleUpdater::launch(&system.background, data.clone());
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue