Refactor code for apply/revert, implement Update/Apply/RevertLayout
Some checks failed
continuous-integration/drone/push Build is failing
continuous-integration/drone/pr Build is failing

This commit is contained in:
Alex 2022-05-06 17:14:09 +02:00
parent 01c4876fb4
commit dd54d0b2b1
Signed by: lx
GPG key ID: 0E496D15096376BE
13 changed files with 179 additions and 63 deletions

2
Cargo.lock generated
View file

@ -845,7 +845,6 @@ dependencies = [
"garage_table 0.7.0", "garage_table 0.7.0",
"garage_util 0.7.0", "garage_util 0.7.0",
"garage_web", "garage_web",
"git-version",
"hex", "hex",
"hmac", "hmac",
"http", "http",
@ -1031,6 +1030,7 @@ dependencies = [
"futures-util", "futures-util",
"garage_util 0.7.0", "garage_util 0.7.0",
"gethostname", "gethostname",
"git-version",
"hex", "hex",
"hyper", "hyper",
"k8s-openapi", "k8s-openapi",

View file

@ -19,6 +19,7 @@ Returns internal Garage metrics in Prometheus format.
Returns the cluster's current status in JSON, including: Returns the cluster's current status in JSON, including:
- ID of the node being queried and its version of the Garage daemon
- Live nodes - Live nodes
- Currently configured cluster layout - Currently configured cluster layout
- Staged changes to the cluster layout - Staged changes to the cluster layout
@ -27,6 +28,8 @@ Example response body:
```json ```json
{ {
"node": "ec79480e0ce52ae26fd00c9da684e4fa56658d9c64cdcecb094e936de0bfe71f",
"garage_version": "git:v0.8.0",
"knownNodes": { "knownNodes": {
"ec79480e0ce52ae26fd00c9da684e4fa56658d9c64cdcecb094e936de0bfe71f": { "ec79480e0ce52ae26fd00c9da684e4fa56658d9c64cdcecb094e936de0bfe71f": {
"addr": "10.0.0.11:3901", "addr": "10.0.0.11:3901",

View file

@ -126,10 +126,15 @@ impl ApiHandler for AdminApiServer {
Endpoint::Metrics => self.handle_metrics(), Endpoint::Metrics => self.handle_metrics(),
Endpoint::GetClusterStatus => handle_get_cluster_status(&self.garage).await, Endpoint::GetClusterStatus => handle_get_cluster_status(&self.garage).await,
Endpoint::GetClusterLayout => handle_get_cluster_layout(&self.garage).await, Endpoint::GetClusterLayout => handle_get_cluster_layout(&self.garage).await,
Endpoint::UpdateClusterLayout => handle_update_cluster_layout(&self.garage, req).await,
Endpoint::ApplyClusterLayout => handle_apply_cluster_layout(&self.garage, req).await,
Endpoint::RevertClusterLayout => handle_revert_cluster_layout(&self.garage, req).await,
/*
_ => Err(Error::NotImplemented(format!( _ => Err(Error::NotImplemented(format!(
"Admin endpoint {} not implemented yet", "Admin endpoint {} not implemented yet",
endpoint.name() endpoint.name()
))), ))),
*/
} }
} }
} }

View file

@ -2,19 +2,24 @@ use std::collections::HashMap;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use serde::Serialize; use hyper::{Body, Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
use hyper::{Body, Response, StatusCode}; use garage_util::crdt::*;
use garage_util::data::*;
use garage_util::error::Error as GarageError;
use garage_rpc::layout::*; use garage_rpc::layout::*;
use garage_util::error::Error as GarageError;
use garage_model::garage::Garage; use garage_model::garage::Garage;
use crate::error::*; use crate::error::*;
use crate::helpers::*;
pub async fn handle_get_cluster_status(garage: &Arc<Garage>) -> Result<Response<Body>, Error> { pub async fn handle_get_cluster_status(garage: &Arc<Garage>) -> Result<Response<Body>, Error> {
let res = GetClusterStatusResponse { let res = GetClusterStatusResponse {
node: hex::encode(garage.system.id),
garage_version: garage.system.garage_version(),
known_nodes: garage known_nodes: garage
.system .system
.get_known_nodes() .get_known_nodes()
@ -72,6 +77,8 @@ fn get_cluster_layout(garage: &Arc<Garage>) -> GetClusterLayoutResponse {
#[derive(Serialize)] #[derive(Serialize)]
struct GetClusterStatusResponse { struct GetClusterStatusResponse {
node: String,
garage_version: &'static str,
#[serde(rename = "knownNodes")] #[serde(rename = "knownNodes")]
known_nodes: HashMap<String, KnownNodeResp>, known_nodes: HashMap<String, KnownNodeResp>,
layout: GetClusterLayoutResponse, layout: GetClusterLayoutResponse,
@ -92,3 +99,67 @@ struct KnownNodeResp {
last_seen_secs_ago: Option<u64>, last_seen_secs_ago: Option<u64>,
hostname: String, hostname: String,
} }
pub async fn handle_update_cluster_layout(
garage: &Arc<Garage>,
req: Request<Body>,
) -> Result<Response<Body>, Error> {
let updates = parse_json_body::<UpdateClusterLayoutRequest>(req).await?;
let mut layout = garage.system.get_cluster_layout();
let mut roles = layout.roles.clone();
roles.merge(&layout.staging);
for (node, role) in updates {
let node = hex::decode(node).ok_or_bad_request("Invalid node identifier")?;
let node = Uuid::try_from(&node).ok_or_bad_request("Invalid node identifier")?;
layout
.staging
.merge(&roles.update_mutator(node, NodeRoleV(role)));
}
garage.system.update_cluster_layout(&layout).await?;
Ok(Response::builder()
.status(StatusCode::OK)
.body(Body::empty())?)
}
pub async fn handle_apply_cluster_layout(
garage: &Arc<Garage>,
req: Request<Body>,
) -> Result<Response<Body>, Error> {
let param = parse_json_body::<ApplyRevertLayoutRequest>(req).await?;
let layout = garage.system.get_cluster_layout();
let layout = layout.apply_staged_changes(Some(param.version))?;
garage.system.update_cluster_layout(&layout).await?;
Ok(Response::builder()
.status(StatusCode::OK)
.body(Body::empty())?)
}
pub async fn handle_revert_cluster_layout(
garage: &Arc<Garage>,
req: Request<Body>,
) -> Result<Response<Body>, Error> {
let param = parse_json_body::<ApplyRevertLayoutRequest>(req).await?;
let layout = garage.system.get_cluster_layout();
let layout = layout.revert_staged_changes(Some(param.version))?;
garage.system.update_cluster_layout(&layout).await?;
Ok(Response::builder()
.status(StatusCode::OK)
.body(Body::empty())?)
}
type UpdateClusterLayoutRequest = HashMap<String, Option<NodeRole>>;
#[derive(Deserialize)]
struct ApplyRevertLayoutRequest {
version: u64,
}

View file

@ -1,4 +1,6 @@
use hyper::{Body, Request};
use idna::domain_to_unicode; use idna::domain_to_unicode;
use serde::Deserialize;
use garage_util::data::*; use garage_util::data::*;
@ -163,6 +165,12 @@ pub fn key_after_prefix(pfx: &str) -> Option<String> {
None None
} }
pub async fn parse_json_body<T: for<'de> Deserialize<'de>>(req: Request<Body>) -> Result<T, Error> {
let body = hyper::body::to_bytes(req.into_body()).await?;
let resp: T = serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?;
Ok(resp)
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;

View file

@ -13,6 +13,7 @@ use garage_model::k2v::causality::*;
use garage_model::k2v::item_table::*; use garage_model::k2v::item_table::*;
use crate::error::*; use crate::error::*;
use crate::helpers::*;
use crate::k2v::range::read_range; use crate::k2v::range::read_range;
pub async fn handle_insert_batch( pub async fn handle_insert_batch(
@ -20,9 +21,7 @@ pub async fn handle_insert_batch(
bucket_id: Uuid, bucket_id: Uuid,
req: Request<Body>, req: Request<Body>,
) -> Result<Response<Body>, Error> { ) -> Result<Response<Body>, Error> {
let body = hyper::body::to_bytes(req.into_body()).await?; let items = parse_json_body::<Vec<InsertBatchItem>>(req).await?;
let items: Vec<InsertBatchItem> =
serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?;
let mut items2 = vec![]; let mut items2 = vec![];
for it in items { for it in items {
@ -52,9 +51,7 @@ pub async fn handle_read_batch(
bucket_id: Uuid, bucket_id: Uuid,
req: Request<Body>, req: Request<Body>,
) -> Result<Response<Body>, Error> { ) -> Result<Response<Body>, Error> {
let body = hyper::body::to_bytes(req.into_body()).await?; let queries = parse_json_body::<Vec<ReadBatchQuery>>(req).await?;
let queries: Vec<ReadBatchQuery> =
serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?;
let resp_results = futures::future::join_all( let resp_results = futures::future::join_all(
queries queries
@ -149,9 +146,7 @@ pub async fn handle_delete_batch(
bucket_id: Uuid, bucket_id: Uuid,
req: Request<Body>, req: Request<Body>,
) -> Result<Response<Body>, Error> { ) -> Result<Response<Body>, Error> {
let body = hyper::body::to_bytes(req.into_body()).await?; let queries = parse_json_body::<Vec<DeleteBatchQuery>>(req).await?;
let queries: Vec<DeleteBatchQuery> =
serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?;
let resp_results = futures::future::join_all( let resp_results = futures::future::join_all(
queries queries

View file

@ -29,7 +29,6 @@ garage_util = { version = "0.7.0", path = "../util" }
garage_web = { version = "0.7.0", path = "../web" } garage_web = { version = "0.7.0", path = "../web" }
bytes = "1.0" bytes = "1.0"
git-version = "0.3.4"
hex = "0.4" hex = "0.4"
tracing = { version = "0.1.30", features = ["log-always"] } tracing = { version = "0.1.30", features = ["log-always"] }
pretty_env_logger = "0.4" pretty_env_logger = "0.4"

View file

@ -696,11 +696,7 @@ impl AdminRpcHandler {
writeln!( writeln!(
&mut ret, &mut ret,
"\nGarage version: {}", "\nGarage version: {}",
option_env!("GIT_VERSION").unwrap_or(git_version::git_version!( self.garage.system.garage_version(),
prefix = "git:",
cargo_prefix = "cargo:",
fallback = "unknown"
))
) )
.unwrap(); .unwrap();

View file

@ -1,5 +1,4 @@
use garage_util::crdt::Crdt; use garage_util::crdt::Crdt;
use garage_util::data::*;
use garage_util::error::*; use garage_util::error::*;
use garage_rpc::layout::*; use garage_rpc::layout::*;
@ -211,31 +210,9 @@ pub async fn cmd_apply_layout(
rpc_host: NodeID, rpc_host: NodeID,
apply_opt: ApplyLayoutOpt, apply_opt: ApplyLayoutOpt,
) -> Result<(), Error> { ) -> Result<(), Error> {
let mut layout = fetch_layout(rpc_cli, rpc_host).await?; let layout = fetch_layout(rpc_cli, rpc_host).await?;
match apply_opt.version { let layout = layout.apply_staged_changes(apply_opt.version)?;
None => {
println!("Please pass the --version flag to ensure that you are writing the correct version of the cluster layout.");
println!("To know the correct value of the --version flag, invoke `garage layout show` and review the proposed changes.");
return Err(Error::Message("--version flag is missing".into()));
}
Some(v) => {
if v != layout.version + 1 {
return Err(Error::Message("Invalid value of --version flag".into()));
}
}
}
layout.roles.merge(&layout.staging);
if !layout.calculate_partition_assignation() {
return Err(Error::Message("Could not calculate new assignation of partitions to nodes. This can happen if there are less nodes than the desired number of copies of your data (see the replication_mode configuration parameter).".into()));
}
layout.staging.clear();
layout.staging_hash = blake2sum(&rmp_to_vec_all_named(&layout.staging).unwrap()[..]);
layout.version += 1;
send_layout(rpc_cli, rpc_host, layout).await?; send_layout(rpc_cli, rpc_host, layout).await?;
@ -250,25 +227,9 @@ pub async fn cmd_revert_layout(
rpc_host: NodeID, rpc_host: NodeID,
revert_opt: RevertLayoutOpt, revert_opt: RevertLayoutOpt,
) -> Result<(), Error> { ) -> Result<(), Error> {
let mut layout = fetch_layout(rpc_cli, rpc_host).await?; let layout = fetch_layout(rpc_cli, rpc_host).await?;
match revert_opt.version { let layout = layout.revert_staged_changes(revert_opt.version)?;
None => {
println!("Please pass the --version flag to ensure that you are writing the correct version of the cluster layout.");
println!("To know the correct value of the --version flag, invoke `garage layout show` and review the proposed changes.");
return Err(Error::Message("--version flag is missing".into()));
}
Some(v) => {
if v != layout.version + 1 {
return Err(Error::Message("Invalid value of --version flag".into()));
}
}
}
layout.staging.clear();
layout.staging_hash = blake2sum(&rmp_to_vec_all_named(&layout.staging).unwrap()[..]);
layout.version += 1;
send_layout(rpc_cli, rpc_host, layout).await?; send_layout(rpc_cli, rpc_host, layout).await?;

View file

@ -19,6 +19,7 @@ garage_util = { version = "0.7.0", path = "../util" }
arc-swap = "1.0" arc-swap = "1.0"
bytes = "1.0" bytes = "1.0"
gethostname = "0.2" gethostname = "0.2"
git-version = "0.3.4"
hex = "0.4" hex = "0.4"
tracing = "0.1.30" tracing = "0.1.30"
rand = "0.8" rand = "0.8"

View file

@ -5,6 +5,7 @@ use serde::{Deserialize, Serialize};
use garage_util::crdt::{AutoCrdt, Crdt, LwwMap}; use garage_util::crdt::{AutoCrdt, Crdt, LwwMap};
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::*;
use crate::ring::*; use crate::ring::*;
@ -100,6 +101,61 @@ impl ClusterLayout {
} }
} }
pub fn apply_staged_changes(mut self, version: Option<u64>) -> Result<Self, Error> {
match version {
None => {
let error = r#"
Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout.
To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes.
"#;
return Err(Error::Message(error.into()));
}
Some(v) => {
if v != self.version + 1 {
return Err(Error::Message("Invalid new layout version".into()));
}
}
}
self.roles.merge(&self.staging);
self.roles.retain(|(_, _, v)| v.0.is_some());
if !self.calculate_partition_assignation() {
return Err(Error::Message("Could not calculate new assignation of partitions to nodes. This can happen if there are less nodes than the desired number of copies of your data (see the replication_mode configuration parameter).".into()));
}
self.staging.clear();
self.staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]);
self.version += 1;
Ok(self)
}
pub fn revert_staged_changes(mut self, version: Option<u64>) -> Result<Self, Error> {
match version {
None => {
let error = r#"
Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout.
To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes.
"#;
return Err(Error::Message(error.into()));
}
Some(v) => {
if v != self.version + 1 {
return Err(Error::Message("Invalid new layout version".into()));
}
}
}
self.staging.clear();
self.staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]);
self.version += 1;
Ok(self)
}
/// Returns a list of IDs of nodes that currently have /// Returns a list of IDs of nodes that currently have
/// a role in the cluster /// a role in the cluster
pub fn node_ids(&self) -> &[Uuid] { pub fn node_ids(&self) -> &[Uuid] {

View file

@ -315,6 +315,14 @@ impl System {
// ---- Administrative operations (directly available and // ---- Administrative operations (directly available and
// also available through RPC) ---- // also available through RPC) ----
pub fn garage_version(&self) -> &'static str {
option_env!("GIT_VERSION").unwrap_or(git_version::git_version!(
prefix = "git:",
cargo_prefix = "cargo:",
fallback = "unknown"
))
}
pub fn get_known_nodes(&self) -> Vec<KnownNodeInfo> { pub fn get_known_nodes(&self) -> Vec<KnownNodeInfo> {
let node_status = self.node_status.read().unwrap(); let node_status = self.node_status.read().unwrap();
let known_nodes = self let known_nodes = self
@ -345,6 +353,14 @@ impl System {
self.ring.borrow().layout.clone() self.ring.borrow().layout.clone()
} }
pub async fn update_cluster_layout(
self: &Arc<Self>,
layout: &ClusterLayout,
) -> Result<(), Error> {
self.handle_advertise_cluster_layout(layout).await?;
Ok(())
}
pub async fn connect(&self, node: &str) -> Result<(), Error> { pub async fn connect(&self, node: &str) -> Result<(), Error> {
let (pubkey, addrs) = parse_and_resolve_peer_addr(node).ok_or_else(|| { let (pubkey, addrs) = parse_and_resolve_peer_addr(node).ok_or_else(|| {
Error::Message(format!( Error::Message(format!(
@ -495,7 +511,7 @@ impl System {
} }
async fn handle_advertise_cluster_layout( async fn handle_advertise_cluster_layout(
self: Arc<Self>, self: &Arc<Self>,
adv: &ClusterLayout, adv: &ClusterLayout,
) -> Result<SystemRpc, Error> { ) -> Result<SystemRpc, Error> {
let update_ring = self.update_ring.lock().await; let update_ring = self.update_ring.lock().await;

View file

@ -140,6 +140,11 @@ where
self.vals.clear(); self.vals.clear();
} }
/// Retain only values that match a certain predicate
pub fn retain(&mut self, pred: impl FnMut(&(K, u64, V)) -> bool) {
self.vals.retain(pred);
}
/// Get a reference to the value assigned to a key /// Get a reference to the value assigned to a key
pub fn get(&self, k: &K) -> Option<&V> { pub fn get(&self, k: &K) -> Option<&V> {
match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(k)) { match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(k)) {