Garage v0.9 #473
7 changed files with 77 additions and 46 deletions
|
@ -32,7 +32,7 @@ args@{
|
||||||
ignoreLockHash,
|
ignoreLockHash,
|
||||||
}:
|
}:
|
||||||
let
|
let
|
||||||
nixifiedLockHash = "9b1f88c1c5b4639605886c7135957a8fb750d938f789300ba6dae958cae460d9";
|
nixifiedLockHash = "a68c589851ec1990d29cdc20e8b922b27c1a6b402b682f7b0d9a9e6258f25828";
|
||||||
workspaceSrc = if args.workspaceSrc == null then ./. else args.workspaceSrc;
|
workspaceSrc = if args.workspaceSrc == null then ./. else args.workspaceSrc;
|
||||||
currentLockHash = builtins.hashFile "sha256" (workspaceSrc + /Cargo.lock);
|
currentLockHash = builtins.hashFile "sha256" (workspaceSrc + /Cargo.lock);
|
||||||
lockHashIgnored = if ignoreLockHash
|
lockHashIgnored = if ignoreLockHash
|
||||||
|
@ -1736,6 +1736,7 @@ in
|
||||||
arc_swap = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".arc-swap."1.5.0" { inherit profileName; }).out;
|
arc_swap = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".arc-swap."1.5.0" { inherit profileName; }).out;
|
||||||
async_trait = (buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".async-trait."0.1.52" { profileName = "__noProfile"; }).out;
|
async_trait = (buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".async-trait."0.1.52" { profileName = "__noProfile"; }).out;
|
||||||
bytes = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.2.0" { inherit profileName; }).out;
|
bytes = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.2.0" { inherit profileName; }).out;
|
||||||
|
bytesize = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytesize."1.1.0" { inherit profileName; }).out;
|
||||||
${ if rootFeatures' ? "garage/consul-discovery" || rootFeatures' ? "garage_rpc/consul-discovery" || rootFeatures' ? "garage_rpc/err-derive" then "err_derive" else null } = (buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".err-derive."0.3.1" { profileName = "__noProfile"; }).out;
|
${ if rootFeatures' ? "garage/consul-discovery" || rootFeatures' ? "garage_rpc/consul-discovery" || rootFeatures' ? "garage_rpc/err-derive" then "err_derive" else null } = (buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".err-derive."0.3.1" { profileName = "__noProfile"; }).out;
|
||||||
futures = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures."0.3.21" { inherit profileName; }).out;
|
futures = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures."0.3.21" { inherit profileName; }).out;
|
||||||
futures_util = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures-util."0.3.21" { inherit profileName; }).out;
|
futures_util = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures-util."0.3.21" { inherit profileName; }).out;
|
||||||
|
|
|
@ -163,16 +163,13 @@ pub async fn handle_apply_cluster_layout(
|
||||||
|
|
||||||
let layout = garage.system.get_cluster_layout();
|
let layout = garage.system.get_cluster_layout();
|
||||||
let (layout, msg) = layout.apply_staged_changes(Some(param.version))?;
|
let (layout, msg) = layout.apply_staged_changes(Some(param.version))?;
|
||||||
//TODO : how to display msg ? Should it be in the Body Response ?
|
|
||||||
for s in msg.iter() {
|
|
||||||
println!("{}", s);
|
|
||||||
}
|
|
||||||
|
|
||||||
garage.system.update_cluster_layout(&layout).await?;
|
garage.system.update_cluster_layout(&layout).await?;
|
||||||
|
|
||||||
Ok(Response::builder()
|
Ok(Response::builder()
|
||||||
.status(StatusCode::NO_CONTENT)
|
.status(StatusCode::NO_CONTENT)
|
||||||
.body(Body::empty())?)
|
.header(http::header::CONTENT_TYPE, "text/plain")
|
||||||
|
.body(Body::from(msg.join("\n")))?)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn handle_revert_cluster_layout(
|
pub async fn handle_revert_cluster_layout(
|
||||||
|
|
|
@ -2,8 +2,6 @@
|
||||||
#[cfg(feature = "sqlite")]
|
#[cfg(feature = "sqlite")]
|
||||||
extern crate tracing;
|
extern crate tracing;
|
||||||
|
|
||||||
#[cfg(not(any(feature = "lmdb", feature = "sled", feature = "sqlite")))]
|
|
||||||
//compile_error!("Must activate the Cargo feature for at least one DB engine: lmdb, sled or sqlite.");
|
|
||||||
#[cfg(feature = "lmdb")]
|
#[cfg(feature = "lmdb")]
|
||||||
pub mod lmdb_adapter;
|
pub mod lmdb_adapter;
|
||||||
#[cfg(feature = "sled")]
|
#[cfg(feature = "sled")]
|
||||||
|
|
|
@ -330,7 +330,7 @@ pub async fn send_layout(
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn print_cluster_layout(layout: &ClusterLayout) -> bool {
|
pub fn print_cluster_layout(layout: &ClusterLayout) -> bool {
|
||||||
let mut table = vec!["ID\tTags\tZone\tCapacity\tUsable".to_string()];
|
let mut table = vec!["ID\tTags\tZone\tCapacity\tUsable capacity".to_string()];
|
||||||
for (id, _, role) in layout.roles.items().iter() {
|
for (id, _, role) in layout.roles.items().iter() {
|
||||||
let role = match &role.0 {
|
let role = match &role.0 {
|
||||||
Some(r) => r,
|
Some(r) => r,
|
||||||
|
@ -338,16 +338,26 @@ pub fn print_cluster_layout(layout: &ClusterLayout) -> bool {
|
||||||
};
|
};
|
||||||
let tags = role.tags.join(",");
|
let tags = role.tags.join(",");
|
||||||
let usage = layout.get_node_usage(id).unwrap_or(0);
|
let usage = layout.get_node_usage(id).unwrap_or(0);
|
||||||
let capacity = layout.get_node_capacity(id).unwrap_or(1);
|
let capacity = layout.get_node_capacity(id).unwrap_or(0);
|
||||||
table.push(format!(
|
if capacity > 0 {
|
||||||
"{:?}\t{}\t{}\t{}\t{} ({:.1}%)",
|
table.push(format!(
|
||||||
id,
|
"{:?}\t{}\t{}\t{}\t{} ({:.1}%)",
|
||||||
tags,
|
id,
|
||||||
role.zone,
|
tags,
|
||||||
role.capacity_string(),
|
role.zone,
|
||||||
ByteSize::b(usage as u64 * layout.partition_size).to_string_as(false),
|
role.capacity_string(),
|
||||||
(100.0 * usage as f32 * layout.partition_size as f32) / (capacity as f32)
|
ByteSize::b(usage as u64 * layout.partition_size).to_string_as(false),
|
||||||
));
|
(100.0 * usage as f32 * layout.partition_size as f32) / (capacity as f32)
|
||||||
|
));
|
||||||
|
} else {
|
||||||
|
table.push(format!(
|
||||||
|
"{:?}\t{}\t{}\t{}",
|
||||||
|
id,
|
||||||
|
tags,
|
||||||
|
role.zone,
|
||||||
|
role.capacity_string(),
|
||||||
|
));
|
||||||
|
};
|
||||||
}
|
}
|
||||||
println!();
|
println!();
|
||||||
println!("Parameters of the layout computation:");
|
println!("Parameters of the layout computation:");
|
||||||
|
|
|
@ -17,6 +17,9 @@ compile_error!("Either bundled-libs or system-libs Cargo feature must be enabled
|
||||||
#[cfg(all(feature = "bundled-libs", feature = "system-libs"))]
|
#[cfg(all(feature = "bundled-libs", feature = "system-libs"))]
|
||||||
compile_error!("Only one of bundled-libs and system-libs Cargo features must be enabled");
|
compile_error!("Only one of bundled-libs and system-libs Cargo features must be enabled");
|
||||||
|
|
||||||
|
#[cfg(not(any(feature = "lmdb", feature = "sled", feature = "sqlite")))]
|
||||||
|
compile_error!("Must activate the Cargo feature for at least one DB engine: lmdb, sled or sqlite.");
|
||||||
|
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
|
|
||||||
|
|
|
@ -187,11 +187,11 @@ To know the correct value of the new layout version, invoke `garage layout show`
|
||||||
self.roles.retain(|(_, _, v)| v.0.is_some());
|
self.roles.retain(|(_, _, v)| v.0.is_some());
|
||||||
self.parameters = self.staging_parameters.get().clone();
|
self.parameters = self.staging_parameters.get().clone();
|
||||||
|
|
||||||
let msg = self.calculate_partition_assignation()?;
|
|
||||||
|
|
||||||
self.staging_roles.clear();
|
self.staging_roles.clear();
|
||||||
self.staging_hash = self.calculate_staging_hash();
|
self.staging_hash = self.calculate_staging_hash();
|
||||||
|
|
||||||
|
let msg = self.calculate_partition_assignation()?;
|
||||||
|
|
||||||
self.version += 1;
|
self.version += 1;
|
||||||
|
|
||||||
Ok((self, msg))
|
Ok((self, msg))
|
||||||
|
@ -214,8 +214,8 @@ To know the correct value of the new layout version, invoke `garage layout show`
|
||||||
}
|
}
|
||||||
|
|
||||||
self.staging_roles.clear();
|
self.staging_roles.clear();
|
||||||
self.staging_hash = self.calculate_staging_hash();
|
|
||||||
self.staging_parameters.update(self.parameters.clone());
|
self.staging_parameters.update(self.parameters.clone());
|
||||||
|
self.staging_hash = self.calculate_staging_hash();
|
||||||
|
|
||||||
self.version += 1;
|
self.version += 1;
|
||||||
|
|
||||||
|
@ -310,11 +310,11 @@ To know the correct value of the new layout version, invoke `garage layout show`
|
||||||
/// Check a cluster layout for internal consistency
|
/// Check a cluster layout for internal consistency
|
||||||
/// (assignation, roles, parameters, partition size)
|
/// (assignation, roles, parameters, partition size)
|
||||||
/// returns true if consistent, false if error
|
/// returns true if consistent, false if error
|
||||||
pub fn check(&self) -> bool {
|
pub fn check(&self) -> Result<(), String> {
|
||||||
// Check that the hash of the staging data is correct
|
// Check that the hash of the staging data is correct
|
||||||
let staging_hash = self.calculate_staging_hash();
|
let staging_hash = self.calculate_staging_hash();
|
||||||
if staging_hash != self.staging_hash {
|
if staging_hash != self.staging_hash {
|
||||||
return false;
|
return Err("staging_hash is incorrect".into());
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check that node_id_vec contains the correct list of nodes
|
// Check that node_id_vec contains the correct list of nodes
|
||||||
|
@ -329,12 +329,17 @@ To know the correct value of the new layout version, invoke `garage layout show`
|
||||||
let mut node_id_vec = self.node_id_vec.clone();
|
let mut node_id_vec = self.node_id_vec.clone();
|
||||||
node_id_vec.sort();
|
node_id_vec.sort();
|
||||||
if expected_nodes != node_id_vec {
|
if expected_nodes != node_id_vec {
|
||||||
return false;
|
return Err(format!("node_id_vec does not contain the correct set of nodes\nnode_id_vec: {:?}\nexpected: {:?}", node_id_vec, expected_nodes));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check that the assignation data has the correct length
|
// Check that the assignation data has the correct length
|
||||||
if self.ring_assignation_data.len() != (1 << PARTITION_BITS) * self.replication_factor {
|
let expected_assignation_data_len = (1 << PARTITION_BITS) * self.replication_factor;
|
||||||
return false;
|
if self.ring_assignation_data.len() != expected_assignation_data_len {
|
||||||
|
return Err(format!(
|
||||||
|
"ring_assignation_data has incorrect length {} instead of {}",
|
||||||
|
self.ring_assignation_data.len(),
|
||||||
|
expected_assignation_data_len
|
||||||
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check that the assigned nodes are correct identifiers
|
// Check that the assigned nodes are correct identifiers
|
||||||
|
@ -342,12 +347,15 @@ To know the correct value of the new layout version, invoke `garage layout show`
|
||||||
// and that role is not the role of a gateway nodes
|
// and that role is not the role of a gateway nodes
|
||||||
for x in self.ring_assignation_data.iter() {
|
for x in self.ring_assignation_data.iter() {
|
||||||
if *x as usize >= self.node_id_vec.len() {
|
if *x as usize >= self.node_id_vec.len() {
|
||||||
return false;
|
return Err(format!(
|
||||||
|
"ring_assignation_data contains invalid node id {}",
|
||||||
|
*x
|
||||||
|
));
|
||||||
}
|
}
|
||||||
let node = self.node_id_vec[*x as usize];
|
let node = self.node_id_vec[*x as usize];
|
||||||
match self.roles.get(&node) {
|
match self.roles.get(&node) {
|
||||||
Some(NodeRoleV(Some(x))) if x.capacity.is_some() => (),
|
Some(NodeRoleV(Some(x))) if x.capacity.is_some() => (),
|
||||||
_ => return false,
|
_ => return Err("ring_assignation_data contains id of a gateway node".into()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -357,7 +365,7 @@ To know the correct value of the new layout version, invoke `garage layout show`
|
||||||
let mut nodes_of_p = self.ring_assignation_data[rf * p..rf * (p + 1)].to_vec();
|
let mut nodes_of_p = self.ring_assignation_data[rf * p..rf * (p + 1)].to_vec();
|
||||||
nodes_of_p.sort();
|
nodes_of_p.sort();
|
||||||
if nodes_of_p.iter().unique().count() != rf {
|
if nodes_of_p.iter().unique().count() != rf {
|
||||||
return false;
|
return Err(format!("partition does not contain {} unique node ids", rf));
|
||||||
}
|
}
|
||||||
// Check that every partition is spread over at least zone_redundancy zones.
|
// Check that every partition is spread over at least zone_redundancy zones.
|
||||||
let mut zones_of_p = nodes_of_p
|
let mut zones_of_p = nodes_of_p
|
||||||
|
@ -370,7 +378,10 @@ To know the correct value of the new layout version, invoke `garage layout show`
|
||||||
zones_of_p.sort();
|
zones_of_p.sort();
|
||||||
let redundancy = self.parameters.zone_redundancy;
|
let redundancy = self.parameters.zone_redundancy;
|
||||||
if zones_of_p.iter().unique().count() < redundancy {
|
if zones_of_p.iter().unique().count() < redundancy {
|
||||||
return false;
|
return Err(format!(
|
||||||
|
"nodes of partition are in less than {} distinct zones",
|
||||||
|
redundancy
|
||||||
|
));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -382,8 +393,14 @@ To know the correct value of the new layout version, invoke `garage layout show`
|
||||||
for (n, usage) in node_usage.iter().enumerate() {
|
for (n, usage) in node_usage.iter().enumerate() {
|
||||||
if *usage > 0 {
|
if *usage > 0 {
|
||||||
let uuid = self.node_id_vec[n];
|
let uuid = self.node_id_vec[n];
|
||||||
if usage * self.partition_size > self.get_node_capacity(&uuid).unwrap() {
|
let partusage = usage * self.partition_size;
|
||||||
return false;
|
let nodecap = self.get_node_capacity(&uuid).unwrap();
|
||||||
|
if partusage > nodecap {
|
||||||
|
return Err(format!(
|
||||||
|
"node usage ({}) is bigger than node capacity ({})",
|
||||||
|
usage * self.partition_size,
|
||||||
|
nodecap
|
||||||
|
));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -393,12 +410,17 @@ To know the correct value of the new layout version, invoke `garage layout show`
|
||||||
let cl2 = self.clone();
|
let cl2 = self.clone();
|
||||||
let (_, zone_to_id) = cl2.generate_nongateway_zone_ids().unwrap();
|
let (_, zone_to_id) = cl2.generate_nongateway_zone_ids().unwrap();
|
||||||
match cl2.compute_optimal_partition_size(&zone_to_id) {
|
match cl2.compute_optimal_partition_size(&zone_to_id) {
|
||||||
Ok(s) if s != self.partition_size => return false,
|
Ok(s) if s != self.partition_size => {
|
||||||
Err(_) => return false,
|
return Err(format!(
|
||||||
|
"partition_size ({}) is different than optimal value ({})",
|
||||||
|
self.partition_size, s
|
||||||
|
))
|
||||||
|
}
|
||||||
|
Err(e) => return Err(format!("could not calculate optimal partition size: {}", e)),
|
||||||
_ => (),
|
_ => (),
|
||||||
}
|
}
|
||||||
|
|
||||||
true
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -493,9 +515,9 @@ impl ClusterLayout {
|
||||||
// We update the layout structure
|
// We update the layout structure
|
||||||
self.update_ring_from_flow(id_to_zone.len(), &gflow)?;
|
self.update_ring_from_flow(id_to_zone.len(), &gflow)?;
|
||||||
|
|
||||||
if !self.check() {
|
if let Err(e) = self.check() {
|
||||||
return Err(Error::Message(
|
return Err(Error::Message(
|
||||||
"Critical error: The computed layout happens to be incorrect".into(),
|
format!("Layout check returned an error: {}\nOriginal result of computation: <<<<\n{}\n>>>>", e, msg.join("\n"))
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1062,9 +1084,9 @@ mod tests {
|
||||||
);
|
);
|
||||||
cl.staging_roles.merge(&update);
|
cl.staging_roles.merge(&update);
|
||||||
}
|
}
|
||||||
cl.staging_hash = cl.calculate_staging_hash();
|
|
||||||
cl.staging_parameters
|
cl.staging_parameters
|
||||||
.update(LayoutParameters { zone_redundancy });
|
.update(LayoutParameters { zone_redundancy });
|
||||||
|
cl.staging_hash = cl.calculate_staging_hash();
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
@ -1081,7 +1103,7 @@ mod tests {
|
||||||
let v = cl.version;
|
let v = cl.version;
|
||||||
let (mut cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap();
|
let (mut cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap();
|
||||||
show_msg(&msg);
|
show_msg(&msg);
|
||||||
assert!(cl.check());
|
assert_eq!(cl.check(), Ok(()));
|
||||||
assert!(matches!(check_against_naive(&cl), Ok(true)));
|
assert!(matches!(check_against_naive(&cl), Ok(true)));
|
||||||
|
|
||||||
node_id_vec = vec![1, 2, 3, 4, 5, 6, 7, 8, 9];
|
node_id_vec = vec![1, 2, 3, 4, 5, 6, 7, 8, 9];
|
||||||
|
@ -1094,7 +1116,7 @@ mod tests {
|
||||||
let v = cl.version;
|
let v = cl.version;
|
||||||
let (mut cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap();
|
let (mut cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap();
|
||||||
show_msg(&msg);
|
show_msg(&msg);
|
||||||
assert!(cl.check());
|
assert_eq!(cl.check(), Ok(()));
|
||||||
assert!(matches!(check_against_naive(&cl), Ok(true)));
|
assert!(matches!(check_against_naive(&cl), Ok(true)));
|
||||||
|
|
||||||
node_capacity_vec = vec![4000, 1000, 2000, 7000, 1000, 1000, 2000, 10000, 2000];
|
node_capacity_vec = vec![4000, 1000, 2000, 7000, 1000, 1000, 2000, 10000, 2000];
|
||||||
|
@ -1102,7 +1124,7 @@ mod tests {
|
||||||
let v = cl.version;
|
let v = cl.version;
|
||||||
let (mut cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap();
|
let (mut cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap();
|
||||||
show_msg(&msg);
|
show_msg(&msg);
|
||||||
assert!(cl.check());
|
assert_eq!(cl.check(), Ok(()));
|
||||||
assert!(matches!(check_against_naive(&cl), Ok(true)));
|
assert!(matches!(check_against_naive(&cl), Ok(true)));
|
||||||
|
|
||||||
node_capacity_vec = vec![
|
node_capacity_vec = vec![
|
||||||
|
@ -1112,7 +1134,7 @@ mod tests {
|
||||||
let v = cl.version;
|
let v = cl.version;
|
||||||
let (cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap();
|
let (cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap();
|
||||||
show_msg(&msg);
|
show_msg(&msg);
|
||||||
assert!(cl.check());
|
assert_eq!(cl.check(), Ok(()));
|
||||||
assert!(matches!(check_against_naive(&cl), Ok(true)));
|
assert!(matches!(check_against_naive(&cl), Ok(true)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -565,9 +565,9 @@ impl System {
|
||||||
let update_ring = self.update_ring.lock().await;
|
let update_ring = self.update_ring.lock().await;
|
||||||
let mut layout: ClusterLayout = self.ring.borrow().layout.clone();
|
let mut layout: ClusterLayout = self.ring.borrow().layout.clone();
|
||||||
|
|
||||||
let prev_layout_check = layout.check();
|
let prev_layout_check = layout.check().is_ok();
|
||||||
if layout.merge(adv) {
|
if layout.merge(adv) {
|
||||||
if prev_layout_check && !layout.check() {
|
if prev_layout_check && !layout.check().is_ok() {
|
||||||
error!("New cluster layout is invalid, discarding.");
|
error!("New cluster layout is invalid, discarding.");
|
||||||
return Err(Error::Message(
|
return Err(Error::Message(
|
||||||
"New cluster layout is invalid, discarding.".into(),
|
"New cluster layout is invalid, discarding.".into(),
|
||||||
|
@ -620,7 +620,7 @@ impl System {
|
||||||
|
|
||||||
async fn discovery_loop(self: &Arc<Self>, mut stop_signal: watch::Receiver<bool>) {
|
async fn discovery_loop(self: &Arc<Self>, mut stop_signal: watch::Receiver<bool>) {
|
||||||
while !*stop_signal.borrow() {
|
while !*stop_signal.borrow() {
|
||||||
let not_configured = !self.ring.borrow().layout.check();
|
let not_configured = !self.ring.borrow().layout.check().is_ok();
|
||||||
let no_peers = self.fullmesh.get_peer_list().len() < self.replication_factor;
|
let no_peers = self.fullmesh.get_peer_list().len() < self.replication_factor;
|
||||||
let expected_n_nodes = self.ring.borrow().layout.num_nodes();
|
let expected_n_nodes = self.ring.borrow().layout.num_nodes();
|
||||||
let bad_peers = self
|
let bad_peers = self
|
||||||
|
|
Loading…
Reference in a new issue