forked from Deuxfleurs/garage
Make UUID & Hash Copy and remove some .clone() noise
This commit is contained in:
parent
8915224966
commit
ec59e896c6
12 changed files with 64 additions and 80 deletions
|
@ -123,7 +123,7 @@ async fn handle_put(
|
||||||
versions: Vec::new(),
|
versions: Vec::new(),
|
||||||
};
|
};
|
||||||
object.versions.push(Box::new(ObjectVersion {
|
object.versions.push(Box::new(ObjectVersion {
|
||||||
uuid: version_uuid.clone(),
|
uuid: version_uuid,
|
||||||
timestamp: now_msec(),
|
timestamp: now_msec(),
|
||||||
mime_type: mime_type.to_string(),
|
mime_type: mime_type.to_string(),
|
||||||
size: first_block.len() as u64,
|
size: first_block.len() as u64,
|
||||||
|
@ -139,7 +139,7 @@ async fn handle_put(
|
||||||
}
|
}
|
||||||
|
|
||||||
let version = Version {
|
let version = Version {
|
||||||
uuid: version_uuid.clone(),
|
uuid: version_uuid,
|
||||||
deleted: false,
|
deleted: false,
|
||||||
blocks: Vec::new(),
|
blocks: Vec::new(),
|
||||||
bucket: bucket.into(),
|
bucket: bucket.into(),
|
||||||
|
@ -147,12 +147,11 @@ async fn handle_put(
|
||||||
};
|
};
|
||||||
|
|
||||||
let first_block_hash = hash(&first_block[..]);
|
let first_block_hash = hash(&first_block[..]);
|
||||||
object.versions[0].data = ObjectVersionData::FirstBlock(first_block_hash.clone());
|
object.versions[0].data = ObjectVersionData::FirstBlock(first_block_hash);
|
||||||
garage.object_table.insert(&object).await?;
|
garage.object_table.insert(&object).await?;
|
||||||
|
|
||||||
let mut next_offset = first_block.len();
|
let mut next_offset = first_block.len();
|
||||||
let mut put_curr_version_block =
|
let mut put_curr_version_block = put_block_meta(garage.clone(), &version, 0, first_block_hash);
|
||||||
put_block_meta(garage.clone(), &version, 0, first_block_hash.clone());
|
|
||||||
let mut put_curr_block = garage
|
let mut put_curr_block = garage
|
||||||
.block_manager
|
.block_manager
|
||||||
.rpc_put_block(first_block_hash, first_block);
|
.rpc_put_block(first_block_hash, first_block);
|
||||||
|
@ -163,12 +162,8 @@ async fn handle_put(
|
||||||
if let Some(block) = next_block {
|
if let Some(block) = next_block {
|
||||||
let block_hash = hash(&block[..]);
|
let block_hash = hash(&block[..]);
|
||||||
let block_len = block.len();
|
let block_len = block.len();
|
||||||
put_curr_version_block = put_block_meta(
|
put_curr_version_block =
|
||||||
garage.clone(),
|
put_block_meta(garage.clone(), &version, next_offset as u64, block_hash);
|
||||||
&version,
|
|
||||||
next_offset as u64,
|
|
||||||
block_hash.clone(),
|
|
||||||
);
|
|
||||||
put_curr_block = garage.block_manager.rpc_put_block(block_hash, block);
|
put_curr_block = garage.block_manager.rpc_put_block(block_hash, block);
|
||||||
next_offset += block_len;
|
next_offset += block_len;
|
||||||
} else {
|
} else {
|
||||||
|
@ -191,14 +186,11 @@ async fn put_block_meta(
|
||||||
hash: Hash,
|
hash: Hash,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
let mut version = version.clone();
|
let mut version = version.clone();
|
||||||
version.blocks.push(VersionBlock {
|
version.blocks.push(VersionBlock { offset, hash: hash });
|
||||||
offset,
|
|
||||||
hash: hash.clone(),
|
|
||||||
});
|
|
||||||
|
|
||||||
let block_ref = BlockRef {
|
let block_ref = BlockRef {
|
||||||
block: hash,
|
block: hash,
|
||||||
version: version.uuid.clone(),
|
version: version.uuid,
|
||||||
deleted: false,
|
deleted: false,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -279,7 +271,7 @@ async fn handle_delete(garage: Arc<Garage>, bucket: &str, key: &str) -> Result<U
|
||||||
versions: Vec::new(),
|
versions: Vec::new(),
|
||||||
};
|
};
|
||||||
object.versions.push(Box::new(ObjectVersion {
|
object.versions.push(Box::new(ObjectVersion {
|
||||||
uuid: version_uuid.clone(),
|
uuid: version_uuid,
|
||||||
timestamp: now_msec(),
|
timestamp: now_msec(),
|
||||||
mime_type: "application/x-delete-marker".into(),
|
mime_type: "application/x-delete-marker".into(),
|
||||||
size: 0,
|
size: 0,
|
||||||
|
@ -339,7 +331,7 @@ async fn handle_get(
|
||||||
let mut blocks = version
|
let mut blocks = version
|
||||||
.blocks
|
.blocks
|
||||||
.iter()
|
.iter()
|
||||||
.map(|vb| (vb.hash.clone(), None))
|
.map(|vb| (vb.hash, None))
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
blocks[0].1 = Some(first_block);
|
blocks[0].1 = Some(first_block);
|
||||||
|
|
||||||
|
|
13
src/block.rs
13
src/block.rs
|
@ -156,13 +156,10 @@ impl BlockManager {
|
||||||
warn!("Block {:?} is corrupted. Deleting and resyncing.", hash);
|
warn!("Block {:?} is corrupted. Deleting and resyncing.", hash);
|
||||||
fs::remove_file(path).await?;
|
fs::remove_file(path).await?;
|
||||||
self.put_to_resync(&hash, 0)?;
|
self.put_to_resync(&hash, 0)?;
|
||||||
return Err(Error::CorruptData(hash.clone()));
|
return Err(Error::CorruptData(*hash));
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(Message::PutBlock(PutBlockMessage {
|
Ok(Message::PutBlock(PutBlockMessage { hash: *hash, data }))
|
||||||
hash: hash.clone(),
|
|
||||||
data,
|
|
||||||
}))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn need_block(&self, hash: &Hash) -> Result<bool, Error> {
|
pub async fn need_block(&self, hash: &Hash) -> Result<bool, Error> {
|
||||||
|
@ -273,7 +270,7 @@ impl BlockManager {
|
||||||
if needed_by_others {
|
if needed_by_others {
|
||||||
let ring = garage.system.ring.borrow().clone();
|
let ring = garage.system.ring.borrow().clone();
|
||||||
let who = ring.walk_ring(&hash, garage.system.config.data_replication_factor);
|
let who = ring.walk_ring(&hash, garage.system.config.data_replication_factor);
|
||||||
let msg = Arc::new(Message::NeedBlockQuery(hash.clone()));
|
let msg = Arc::new(Message::NeedBlockQuery(*hash));
|
||||||
let who_needs_fut = who.iter().map(|to| {
|
let who_needs_fut = who.iter().map(|to| {
|
||||||
self.rpc_client
|
self.rpc_client
|
||||||
.call(to, msg.clone(), NEED_BLOCK_QUERY_TIMEOUT)
|
.call(to, msg.clone(), NEED_BLOCK_QUERY_TIMEOUT)
|
||||||
|
@ -329,7 +326,7 @@ impl BlockManager {
|
||||||
pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> {
|
pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> {
|
||||||
let ring = self.system.ring.borrow().clone();
|
let ring = self.system.ring.borrow().clone();
|
||||||
let who = ring.walk_ring(&hash, self.system.config.data_replication_factor);
|
let who = ring.walk_ring(&hash, self.system.config.data_replication_factor);
|
||||||
let msg = Arc::new(Message::GetBlock(hash.clone()));
|
let msg = Arc::new(Message::GetBlock(*hash));
|
||||||
let mut resp_stream = who
|
let mut resp_stream = who
|
||||||
.iter()
|
.iter()
|
||||||
.map(|to| self.rpc_client.call(to, msg.clone(), BLOCK_RW_TIMEOUT))
|
.map(|to| self.rpc_client.call(to, msg.clone(), BLOCK_RW_TIMEOUT))
|
||||||
|
@ -374,7 +371,7 @@ impl BlockManager {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if !block_ref.deleted {
|
if !block_ref.deleted {
|
||||||
last_hash = Some(block_ref.block.clone());
|
last_hash = Some(block_ref.block);
|
||||||
self.put_to_resync(&block_ref.block, 0)?;
|
self.put_to_resync(&block_ref.block, 0)?;
|
||||||
}
|
}
|
||||||
i += 1;
|
i += 1;
|
||||||
|
|
|
@ -5,7 +5,7 @@ use sha2::{Digest, Sha256};
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::time::{SystemTime, UNIX_EPOCH};
|
use std::time::{SystemTime, UNIX_EPOCH};
|
||||||
|
|
||||||
#[derive(Default, PartialOrd, Ord, Clone, Hash, PartialEq)]
|
#[derive(Default, PartialOrd, Ord, Clone, Hash, PartialEq, Copy)]
|
||||||
pub struct FixedBytes32([u8; 32]);
|
pub struct FixedBytes32([u8; 32]);
|
||||||
|
|
||||||
impl From<[u8; 32]> for FixedBytes32 {
|
impl From<[u8; 32]> for FixedBytes32 {
|
||||||
|
|
|
@ -299,7 +299,7 @@ async fn cmd_status(rpc_cli: RpcAddrClient<Message>, rpc_host: SocketAddr) -> Re
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let status_keys = status.iter().map(|x| x.id.clone()).collect::<HashSet<_>>();
|
let status_keys = status.iter().map(|x| x.id).collect::<HashSet<_>>();
|
||||||
if config
|
if config
|
||||||
.members
|
.members
|
||||||
.iter()
|
.iter()
|
||||||
|
@ -347,7 +347,7 @@ async fn cmd_configure(
|
||||||
let mut candidates = vec![];
|
let mut candidates = vec![];
|
||||||
for adv in status.iter() {
|
for adv in status.iter() {
|
||||||
if hex::encode(&adv.id).starts_with(&args.node_id) {
|
if hex::encode(&adv.id).starts_with(&args.node_id) {
|
||||||
candidates.push(adv.id.clone());
|
candidates.push(adv.id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if candidates.len() != 1 {
|
if candidates.len() != 1 {
|
||||||
|
@ -401,7 +401,7 @@ async fn cmd_remove(
|
||||||
let mut candidates = vec![];
|
let mut candidates = vec![];
|
||||||
for (key, _) in config.members.iter() {
|
for (key, _) in config.members.iter() {
|
||||||
if hex::encode(key).starts_with(&args.node_id) {
|
if hex::encode(key).starts_with(&args.node_id) {
|
||||||
candidates.push(key.clone());
|
candidates.push(*key);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if candidates.len() != 1 {
|
if candidates.len() != 1 {
|
||||||
|
|
|
@ -125,9 +125,9 @@ impl Status {
|
||||||
fn handle_ping(&mut self, ip: IpAddr, info: &PingMessage) -> bool {
|
fn handle_ping(&mut self, ip: IpAddr, info: &PingMessage) -> bool {
|
||||||
let addr = SocketAddr::new(ip, info.rpc_port);
|
let addr = SocketAddr::new(ip, info.rpc_port);
|
||||||
let old_status = self.nodes.insert(
|
let old_status = self.nodes.insert(
|
||||||
info.id.clone(),
|
info.id,
|
||||||
StatusEntry {
|
StatusEntry {
|
||||||
addr: addr.clone(),
|
addr,
|
||||||
remaining_ping_attempts: MAX_FAILED_PINGS,
|
remaining_ping_attempts: MAX_FAILED_PINGS,
|
||||||
state_info: info.state_info.clone(),
|
state_info: info.state_info.clone(),
|
||||||
},
|
},
|
||||||
|
@ -177,7 +177,7 @@ impl Ring {
|
||||||
|
|
||||||
new_ring.push(RingEntry {
|
new_ring.push(RingEntry {
|
||||||
location: location.into(),
|
location: location.into(),
|
||||||
node: id.clone(),
|
node: *id,
|
||||||
datacenter,
|
datacenter,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -227,10 +227,10 @@ impl Ring {
|
||||||
delta += 1;
|
delta += 1;
|
||||||
|
|
||||||
if !datacenters.contains(&self.ring[i].datacenter) {
|
if !datacenters.contains(&self.ring[i].datacenter) {
|
||||||
ret.push(self.ring[i].node.clone());
|
ret.push(self.ring[i].node);
|
||||||
datacenters.push(self.ring[i].datacenter);
|
datacenters.push(self.ring[i].datacenter);
|
||||||
} else if datacenters.len() == self.n_datacenters && !ret.contains(&self.ring[i].node) {
|
} else if datacenters.len() == self.n_datacenters && !ret.contains(&self.ring[i].node) {
|
||||||
ret.push(self.ring[i].node.clone());
|
ret.push(self.ring[i].node);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -363,9 +363,9 @@ impl System {
|
||||||
let status = self.status.borrow().clone();
|
let status = self.status.borrow().clone();
|
||||||
let ring = self.ring.borrow().clone();
|
let ring = self.ring.borrow().clone();
|
||||||
Message::Ping(PingMessage {
|
Message::Ping(PingMessage {
|
||||||
id: self.id.clone(),
|
id: self.id,
|
||||||
rpc_port: self.config.rpc_bind_addr.port(),
|
rpc_port: self.config.rpc_bind_addr.port(),
|
||||||
status_hash: status.hash.clone(),
|
status_hash: status.hash,
|
||||||
config_version: ring.config.version,
|
config_version: ring.config.version,
|
||||||
state_info: self.state_info.clone(),
|
state_info: self.state_info.clone(),
|
||||||
})
|
})
|
||||||
|
@ -387,7 +387,7 @@ impl System {
|
||||||
.config
|
.config
|
||||||
.bootstrap_peers
|
.bootstrap_peers
|
||||||
.iter()
|
.iter()
|
||||||
.map(|ip| (ip.clone(), None))
|
.map(|ip| (*ip, None))
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
self.clone().ping_nodes(bootstrap_peers).await;
|
self.clone().ping_nodes(bootstrap_peers).await;
|
||||||
|
|
||||||
|
@ -407,7 +407,7 @@ impl System {
|
||||||
async move {
|
async move {
|
||||||
(
|
(
|
||||||
id_option,
|
id_option,
|
||||||
addr.clone(),
|
addr,
|
||||||
sys.rpc_client
|
sys.rpc_client
|
||||||
.by_addr()
|
.by_addr()
|
||||||
.call(&addr, ping_msg_ref, PING_TIMEOUT)
|
.call(&addr, ping_msg_ref, PING_TIMEOUT)
|
||||||
|
@ -430,18 +430,18 @@ impl System {
|
||||||
if is_new {
|
if is_new {
|
||||||
has_changes = true;
|
has_changes = true;
|
||||||
to_advertise.push(AdvertisedNode {
|
to_advertise.push(AdvertisedNode {
|
||||||
id: info.id.clone(),
|
id: info.id,
|
||||||
addr: addr.clone(),
|
addr: *addr,
|
||||||
state_info: info.state_info.clone(),
|
state_info: info.state_info.clone(),
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
if is_new || status.hash != info.status_hash {
|
if is_new || status.hash != info.status_hash {
|
||||||
self.background
|
self.background
|
||||||
.spawn_cancellable(self.clone().pull_status(info.id.clone()).map(Ok));
|
.spawn_cancellable(self.clone().pull_status(info.id).map(Ok));
|
||||||
}
|
}
|
||||||
if is_new || ring.config.version < info.config_version {
|
if is_new || ring.config.version < info.config_version {
|
||||||
self.background
|
self.background
|
||||||
.spawn_cancellable(self.clone().pull_config(info.id.clone()).map(Ok));
|
.spawn_cancellable(self.clone().pull_config(info.id).map(Ok));
|
||||||
}
|
}
|
||||||
} else if let Some(id) = id_option {
|
} else if let Some(id) = id_option {
|
||||||
let remaining_attempts = status
|
let remaining_attempts = status
|
||||||
|
@ -489,7 +489,7 @@ impl System {
|
||||||
if is_new {
|
if is_new {
|
||||||
status.recalculate_hash();
|
status.recalculate_hash();
|
||||||
}
|
}
|
||||||
let status_hash = status.hash.clone();
|
let status_hash = status.hash;
|
||||||
let config_version = self.ring.borrow().config.version;
|
let config_version = self.ring.borrow().config.version;
|
||||||
|
|
||||||
update_locked.0.broadcast(Arc::new(status))?;
|
update_locked.0.broadcast(Arc::new(status))?;
|
||||||
|
@ -497,11 +497,11 @@ impl System {
|
||||||
|
|
||||||
if is_new || status_hash != ping.status_hash {
|
if is_new || status_hash != ping.status_hash {
|
||||||
self.background
|
self.background
|
||||||
.spawn_cancellable(self.clone().pull_status(ping.id.clone()).map(Ok));
|
.spawn_cancellable(self.clone().pull_status(ping.id).map(Ok));
|
||||||
}
|
}
|
||||||
if is_new || config_version < ping.config_version {
|
if is_new || config_version < ping.config_version {
|
||||||
self.background
|
self.background
|
||||||
.spawn_cancellable(self.clone().pull_config(ping.id.clone()).map(Ok));
|
.spawn_cancellable(self.clone().pull_config(ping.id).map(Ok));
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(self.make_ping())
|
Ok(self.make_ping())
|
||||||
|
@ -517,8 +517,8 @@ impl System {
|
||||||
status.state_info.clone()
|
status.state_info.clone()
|
||||||
};
|
};
|
||||||
mem.push(AdvertisedNode {
|
mem.push(AdvertisedNode {
|
||||||
id: node.clone(),
|
id: *node,
|
||||||
addr: status.addr.clone(),
|
addr: status.addr,
|
||||||
state_info,
|
state_info,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -545,7 +545,7 @@ impl System {
|
||||||
// learn our own ip address
|
// learn our own ip address
|
||||||
let self_addr = SocketAddr::new(node.addr.ip(), self.config.rpc_bind_addr.port());
|
let self_addr = SocketAddr::new(node.addr.ip(), self.config.rpc_bind_addr.port());
|
||||||
let old_self = status.nodes.insert(
|
let old_self = status.nodes.insert(
|
||||||
node.id.clone(),
|
node.id,
|
||||||
StatusEntry {
|
StatusEntry {
|
||||||
addr: self_addr,
|
addr: self_addr,
|
||||||
remaining_ping_attempts: MAX_FAILED_PINGS,
|
remaining_ping_attempts: MAX_FAILED_PINGS,
|
||||||
|
@ -557,7 +557,7 @@ impl System {
|
||||||
Some(x) => x.addr != self_addr,
|
Some(x) => x.addr != self_addr,
|
||||||
};
|
};
|
||||||
} else if !status.nodes.contains_key(&node.id) {
|
} else if !status.nodes.contains_key(&node.id) {
|
||||||
to_ping.push((node.addr.clone(), Some(node.id.clone())));
|
to_ping.push((node.addr, Some(node.id)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if has_changed {
|
if has_changed {
|
||||||
|
@ -607,7 +607,7 @@ impl System {
|
||||||
.nodes
|
.nodes
|
||||||
.iter()
|
.iter()
|
||||||
.filter(|(id, _)| **id != self.id)
|
.filter(|(id, _)| **id != self.id)
|
||||||
.map(|(id, status)| (status.addr.clone(), Some(id.clone())))
|
.map(|(id, status)| (status.addr, Some(*id)))
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
self.clone().ping_nodes(ping_addrs).await;
|
self.clone().ping_nodes(ping_addrs).await;
|
||||||
|
|
|
@ -113,7 +113,7 @@ impl TableSchema for ObjectTable {
|
||||||
.is_err()
|
.is_err()
|
||||||
{
|
{
|
||||||
let deleted_version = Version {
|
let deleted_version = Version {
|
||||||
uuid: v.uuid.clone(),
|
uuid: v.uuid,
|
||||||
deleted: true,
|
deleted: true,
|
||||||
blocks: vec![],
|
blocks: vec![],
|
||||||
bucket: old_v.bucket.clone(),
|
bucket: old_v.bucket.clone(),
|
||||||
|
|
|
@ -53,7 +53,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
|
||||||
let addr = {
|
let addr = {
|
||||||
let status = self.status.borrow().clone();
|
let status = self.status.borrow().clone();
|
||||||
match status.nodes.get(to.borrow()) {
|
match status.nodes.get(to.borrow()) {
|
||||||
Some(status) => status.addr.clone(),
|
Some(status) => status.addr,
|
||||||
None => {
|
None => {
|
||||||
return Err(Error::Message(format!(
|
return Err(Error::Message(format!(
|
||||||
"Peer ID not found: {:?}",
|
"Peer ID not found: {:?}",
|
||||||
|
@ -93,7 +93,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
|
||||||
.map(|to| {
|
.map(|to| {
|
||||||
let self2 = self.clone();
|
let self2 = self.clone();
|
||||||
let msg = msg.clone();
|
let msg = msg.clone();
|
||||||
async move { self2.call(to.clone(), msg, timeout).await }
|
async move { self2.call(to, msg, timeout).await }
|
||||||
})
|
})
|
||||||
.collect::<FuturesUnordered<_>>();
|
.collect::<FuturesUnordered<_>>();
|
||||||
|
|
||||||
|
|
|
@ -195,7 +195,7 @@ where
|
||||||
let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?));
|
let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?));
|
||||||
for node in who {
|
for node in who {
|
||||||
if !call_list.contains_key(&node) {
|
if !call_list.contains_key(&node) {
|
||||||
call_list.insert(node.clone(), vec![]);
|
call_list.insert(node, vec![]);
|
||||||
}
|
}
|
||||||
call_list.get_mut(&node).unwrap().push(e_enc.clone());
|
call_list.get_mut(&node).unwrap().push(e_enc.clone());
|
||||||
}
|
}
|
||||||
|
|
|
@ -38,12 +38,12 @@ impl TableFullReplication {
|
||||||
|
|
||||||
// Recalculate neighbors
|
// Recalculate neighbors
|
||||||
let ring = system.ring.borrow().clone();
|
let ring = system.ring.borrow().clone();
|
||||||
let my_id = system.id.clone();
|
let my_id = system.id;
|
||||||
|
|
||||||
let mut nodes = vec![];
|
let mut nodes = vec![];
|
||||||
for (node, _) in ring.config.members.iter() {
|
for (node, _) in ring.config.members.iter() {
|
||||||
let node_ranking = hash(&[node.as_slice(), my_id.as_slice()].concat());
|
let node_ranking = hash(&[node.as_slice(), my_id.as_slice()].concat());
|
||||||
nodes.push((node.clone(), node_ranking));
|
nodes.push((*node, node_ranking));
|
||||||
}
|
}
|
||||||
nodes.sort_by(|(_, rank1), (_, rank2)| rank1.cmp(rank2));
|
nodes.sort_by(|(_, rank1), (_, rank2)| rank1.cmp(rank2));
|
||||||
let mut neighbors = nodes
|
let mut neighbors = nodes
|
||||||
|
@ -69,7 +69,7 @@ impl TableReplication for TableFullReplication {
|
||||||
// Inconvenient: only suitable to reasonably small tables
|
// Inconvenient: only suitable to reasonably small tables
|
||||||
|
|
||||||
fn read_nodes(&self, _hash: &Hash, system: &System) -> Vec<UUID> {
|
fn read_nodes(&self, _hash: &Hash, system: &System) -> Vec<UUID> {
|
||||||
vec![system.id.clone()]
|
vec![system.id]
|
||||||
}
|
}
|
||||||
fn read_quorum(&self) -> usize {
|
fn read_quorum(&self) -> usize {
|
||||||
1
|
1
|
||||||
|
|
|
@ -47,7 +47,7 @@ impl TableReplication for TableShardedReplication {
|
||||||
|
|
||||||
ret.push([0u8; 32].into());
|
ret.push([0u8; 32].into());
|
||||||
for entry in ring.ring.iter() {
|
for entry in ring.ring.iter() {
|
||||||
ret.push(entry.location.clone());
|
ret.push(entry.location);
|
||||||
}
|
}
|
||||||
ret.push([0xFFu8; 32].into());
|
ret.push([0xFFu8; 32].into());
|
||||||
ret
|
ret
|
||||||
|
|
|
@ -228,7 +228,7 @@ where
|
||||||
partition: &TodoPartition,
|
partition: &TodoPartition,
|
||||||
must_exit: &mut watch::Receiver<bool>,
|
must_exit: &mut watch::Receiver<bool>,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
let my_id = self.table.system.id.clone();
|
let my_id = self.table.system.id;
|
||||||
let nodes = self
|
let nodes = self
|
||||||
.table
|
.table
|
||||||
.replication
|
.replication
|
||||||
|
@ -251,7 +251,7 @@ where
|
||||||
self.clone().do_sync_with(
|
self.clone().do_sync_with(
|
||||||
partition.clone(),
|
partition.clone(),
|
||||||
root_cks.clone(),
|
root_cks.clone(),
|
||||||
node.clone(),
|
*node,
|
||||||
partition.retain,
|
partition.retain,
|
||||||
must_exit.clone(),
|
must_exit.clone(),
|
||||||
)
|
)
|
||||||
|
@ -361,8 +361,8 @@ where
|
||||||
.range_checksum_cached_hash(&sub_range, must_exit)
|
.range_checksum_cached_hash(&sub_range, must_exit)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
if let Some(hash) = &sub_ck.hash {
|
if let Some(hash) = sub_ck.hash {
|
||||||
children.push((sub_range.clone(), hash.clone()));
|
children.push((sub_range.clone(), hash));
|
||||||
if sub_ck.time < time {
|
if sub_ck.time < time {
|
||||||
time = sub_ck.time;
|
time = sub_ck.time;
|
||||||
}
|
}
|
||||||
|
@ -527,7 +527,7 @@ where
|
||||||
self.table.handle_update(diff_items).await?;
|
self.table.handle_update(diff_items).await?;
|
||||||
}
|
}
|
||||||
if items_to_send.len() > 0 {
|
if items_to_send.len() > 0 {
|
||||||
self.send_items(who.clone(), items_to_send).await?;
|
self.send_items(who, items_to_send).await?;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
return Err(Error::BadRequest(format!(
|
return Err(Error::BadRequest(format!(
|
||||||
|
@ -688,7 +688,7 @@ where
|
||||||
|
|
||||||
impl SyncTodo {
|
impl SyncTodo {
|
||||||
fn add_full_scan<F: TableSchema, R: TableReplication>(&mut self, table: &Table<F, R>) {
|
fn add_full_scan<F: TableSchema, R: TableReplication>(&mut self, table: &Table<F, R>) {
|
||||||
let my_id = table.system.id.clone();
|
let my_id = table.system.id;
|
||||||
|
|
||||||
self.todo.clear();
|
self.todo.clear();
|
||||||
|
|
||||||
|
@ -696,19 +696,14 @@ impl SyncTodo {
|
||||||
let split_points = table.replication.split_points(&ring);
|
let split_points = table.replication.split_points(&ring);
|
||||||
|
|
||||||
for i in 0..split_points.len() - 1 {
|
for i in 0..split_points.len() - 1 {
|
||||||
let begin = split_points[i].clone();
|
let begin = split_points[i];
|
||||||
let end = split_points[i + 1].clone();
|
let end = split_points[i + 1];
|
||||||
let nodes = table.replication.replication_nodes(&begin, &ring);
|
let nodes = table.replication.replication_nodes(&begin, &ring);
|
||||||
|
|
||||||
let retain = nodes.contains(&my_id);
|
let retain = nodes.contains(&my_id);
|
||||||
if !retain {
|
if !retain {
|
||||||
// Check if we have some data to send, otherwise skip
|
// Check if we have some data to send, otherwise skip
|
||||||
if table
|
if table.store.range(begin..end).next().is_none() {
|
||||||
.store
|
|
||||||
.range(begin.clone()..end.clone())
|
|
||||||
.next()
|
|
||||||
.is_none()
|
|
||||||
{
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -723,7 +718,7 @@ impl SyncTodo {
|
||||||
old_ring: &Ring,
|
old_ring: &Ring,
|
||||||
new_ring: &Ring,
|
new_ring: &Ring,
|
||||||
) {
|
) {
|
||||||
let my_id = table.system.id.clone();
|
let my_id = table.system.id;
|
||||||
|
|
||||||
// If it is us who are entering or leaving the system,
|
// If it is us who are entering or leaving the system,
|
||||||
// initiate a full sync instead of incremental sync
|
// initiate a full sync instead of incremental sync
|
||||||
|
@ -738,8 +733,8 @@ impl SyncTodo {
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.chain(table.replication.split_points(old_ring).drain(..))
|
.chain(table.replication.split_points(old_ring).drain(..))
|
||||||
.chain(table.replication.split_points(new_ring).drain(..))
|
.chain(table.replication.split_points(new_ring).drain(..))
|
||||||
.chain(self.todo.iter().map(|x| x.begin.clone()))
|
.chain(self.todo.iter().map(|x| x.begin))
|
||||||
.chain(self.todo.iter().map(|x| x.end.clone()))
|
.chain(self.todo.iter().map(|x| x.end))
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
all_points.sort();
|
all_points.sort();
|
||||||
all_points.dedup();
|
all_points.dedup();
|
||||||
|
@ -749,8 +744,8 @@ impl SyncTodo {
|
||||||
let mut new_todo = vec![];
|
let mut new_todo = vec![];
|
||||||
|
|
||||||
for i in 0..all_points.len() - 1 {
|
for i in 0..all_points.len() - 1 {
|
||||||
let begin = all_points[i].clone();
|
let begin = all_points[i];
|
||||||
let end = all_points[i + 1].clone();
|
let end = all_points[i + 1];
|
||||||
let was_ours = table
|
let was_ours = table
|
||||||
.replication
|
.replication
|
||||||
.replication_nodes(&begin, &old_ring)
|
.replication_nodes(&begin, &old_ring)
|
||||||
|
|
|
@ -77,8 +77,8 @@ impl TableSchema for VersionTable {
|
||||||
.blocks
|
.blocks
|
||||||
.iter()
|
.iter()
|
||||||
.map(|vb| BlockRef {
|
.map(|vb| BlockRef {
|
||||||
block: vb.hash.clone(),
|
block: vb.hash,
|
||||||
version: old_v.uuid.clone(),
|
version: old_v.uuid,
|
||||||
deleted: true,
|
deleted: true,
|
||||||
})
|
})
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
|
|
Loading…
Reference in a new issue