more complete admin API #298

Merged
lx merged 48 commits from admin-api into main 2022-05-24 10:16:40 +00:00
5 changed files with 152 additions and 49 deletions
Showing only changes of commit 7a19daafbd - Show all commits

View file

@ -3,7 +3,7 @@ use std::sync::Arc;
use async_trait::async_trait; use async_trait::async_trait;
use futures::future::Future; use futures::future::Future;
use http::header::CONTENT_TYPE; use http::header::{CONTENT_TYPE, ALLOW, ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN};
use hyper::{Body, Request, Response}; use hyper::{Body, Request, Response};
use opentelemetry::trace::{SpanRef, Tracer}; use opentelemetry::trace::{SpanRef, Tracer};
@ -17,6 +17,7 @@ use crate::error::*;
use crate::generic_server::*; use crate::generic_server::*;
use crate::admin::router::{Authorization, Endpoint}; use crate::admin::router::{Authorization, Endpoint};
use crate::admin::cluster::*;
pub struct AdminApiServer { pub struct AdminApiServer {
garage: Arc<Garage>, garage: Arc<Garage>,
@ -56,6 +57,15 @@ impl AdminApiServer {
} }
} }
fn handle_options(&self, _req: &Request<Body>) -> Result<Response<Body>, Error> {
Ok(Response::builder()
.status(204)
.header(ALLOW, "OPTIONS, GET, POST")
.header(ACCESS_CONTROL_ALLOW_METHODS, "OPTIONS, GET, POST")
.header(ACCESS_CONTROL_ALLOW_ORIGIN, "*")
.body(Body::empty())?)
}
fn handle_metrics(&self) -> Result<Response<Body>, Error> { fn handle_metrics(&self) -> Result<Response<Body>, Error> {
let mut buffer = vec![]; let mut buffer = vec![];
let encoder = TextEncoder::new(); let encoder = TextEncoder::new();
@ -110,7 +120,9 @@ impl ApiHandler for AdminApiServer {
} }
match endpoint { match endpoint {
Endpoint::Options => self.handle_options(&req),
Endpoint::Metrics => self.handle_metrics(), Endpoint::Metrics => self.handle_metrics(),
Endpoint::GetClusterStatus => handle_get_cluster_status(&self.garage).await,
_ => Err(Error::NotImplemented(format!( _ => Err(Error::NotImplemented(format!(
"Admin endpoint {} not implemented yet", "Admin endpoint {} not implemented yet",
endpoint.name() endpoint.name()

69
src/api/admin/cluster.rs Normal file
View file

@ -0,0 +1,69 @@
use std::sync::Arc;
use std::collections::HashMap;
use std::net::{IpAddr, SocketAddr};
use serde::{Serialize};
use hyper::{Body, Request, Response, StatusCode};
use garage_util::data::*;
use garage_util::error::Error as GarageError;
use garage_rpc::layout::*;
use garage_rpc::system::*;
use garage_model::garage::Garage;
use crate::error::*;
pub async fn handle_get_cluster_status(
garage: &Arc<Garage>,
) -> Result<Response<Body>, Error> {
let layout = garage.system.get_cluster_layout();
let res = GetClusterStatusResponse {
known_nodes: garage.system.get_known_nodes()
.into_iter()
.map(|i| (hex::encode(i.id), KnownNodeResp {
addr: i.addr,
is_up: i.is_up,
last_seen_secs_ago: i.last_seen_secs_ago,
hostname: i.status.hostname,
}))
.collect(),
roles: layout.roles.items()
.iter()
.filter(|(_, _, v)| v.0.is_some())
.map(|(k, _, v)| (hex::encode(k), v.0.clone()))
.collect(),
staged_role_changes: layout.staging.items()
.iter()
.filter(|(k, _, v)| layout.roles.get(k) != Some(v))
.map(|(k, _, v)| (hex::encode(k), v.0.clone()))
.collect(),
};
let resp_json = serde_json::to_string_pretty(&res).map_err(GarageError::from)?;
Ok(Response::builder()
.status(StatusCode::OK)
.body(Body::from(resp_json))?)
}
#[derive(Serialize)]
struct GetClusterStatusResponse {
#[serde(rename = "knownNodes")]
known_nodes: HashMap<String, KnownNodeResp>,
roles: HashMap<String, Option<NodeRole>>,
#[serde(rename = "stagedRoleChanges")]
staged_role_changes: HashMap<String, Option<NodeRole>>,
}
#[derive(Serialize)]
struct KnownNodeResp {
addr: SocketAddr,
is_up: bool,
last_seen_secs_ago: Option<u64>,
hostname: String,
}

View file

@ -1,2 +1,4 @@
pub mod api_server; pub mod api_server;
mod router; mod router;
mod cluster;

View file

@ -14,8 +14,8 @@ router_match! {@func
/// List of all Admin API endpoints. /// List of all Admin API endpoints.
#[derive(Debug, Clone, PartialEq, Eq)] #[derive(Debug, Clone, PartialEq, Eq)]
pub enum Endpoint { pub enum Endpoint {
Metrics,
Options, Options,
Metrics,
GetClusterStatus, GetClusterStatus,
GetClusterLayout, GetClusterLayout,
UpdateClusterLayout, UpdateClusterLayout,

View file

@ -312,6 +312,67 @@ impl System {
); );
} }
// ---- Administrative operations (directly available and
// also available through RPC) ----
pub fn get_known_nodes(&self) -> Vec<KnownNodeInfo> {
let node_status = self.node_status.read().unwrap();
let known_nodes = self
.fullmesh
.get_peer_list()
.iter()
.map(|n| KnownNodeInfo {
id: n.id.into(),
addr: n.addr,
is_up: n.is_up(),
last_seen_secs_ago: n.last_seen.map(|t| (Instant::now() - t).as_secs()),
status: node_status
.get(&n.id.into())
.cloned()
.map(|(_, st)| st)
.unwrap_or(NodeStatus {
hostname: "?".to_string(),
replication_factor: 0,
cluster_layout_version: 0,
cluster_layout_staging_hash: Hash::from([0u8; 32]),
}),
})
.collect::<Vec<_>>();
known_nodes
}
pub fn get_cluster_layout(&self) -> ClusterLayout {
self.ring.borrow().layout.clone()
}
pub async fn connect(&self, node: &str) -> Result<(), Error> {
let (pubkey, addrs) = parse_and_resolve_peer_addr(node).ok_or_else(|| {
Error::Message(format!(
"Unable to parse or resolve node specification: {}",
node
))
})?;
let mut errors = vec![];
for ip in addrs.iter() {
match self
.netapp
.clone()
.try_connect(*ip, pubkey)
.await
.err_context(CONNECT_ERROR_MESSAGE)
{
Ok(()) => return Ok(()),
Err(e) => {
errors.push((*ip, e));
}
}
}
return Err(Error::Message(format!(
"Could not connect to specified peers. Errors: {:?}",
errors
)));
}
// ---- INTERNALS ---- // ---- INTERNALS ----
async fn advertise_to_consul(self: Arc<Self>) -> Result<(), Error> { async fn advertise_to_consul(self: Arc<Self>) -> Result<(), Error> {
@ -384,32 +445,11 @@ impl System {
self.local_status.swap(Arc::new(new_si)); self.local_status.swap(Arc::new(new_si));
} }
// --- RPC HANDLERS ---
async fn handle_connect(&self, node: &str) -> Result<SystemRpc, Error> { async fn handle_connect(&self, node: &str) -> Result<SystemRpc, Error> {
let (pubkey, addrs) = parse_and_resolve_peer_addr(node).ok_or_else(|| { self.connect(node).await?;
Error::Message(format!( Ok(SystemRpc::Ok)
"Unable to parse or resolve node specification: {}",
node
))
})?;
let mut errors = vec![];
for ip in addrs.iter() {
match self
.netapp
.clone()
.try_connect(*ip, pubkey)
.await
.err_context(CONNECT_ERROR_MESSAGE)
{
Ok(()) => return Ok(SystemRpc::Ok),
Err(e) => {
errors.push((*ip, e));
}
}
}
return Err(Error::Message(format!(
"Could not connect to specified peers. Errors: {:?}",
errors
)));
} }
fn handle_pull_cluster_layout(&self) -> SystemRpc { fn handle_pull_cluster_layout(&self) -> SystemRpc {
@ -418,31 +458,11 @@ impl System {
} }
fn handle_get_known_nodes(&self) -> SystemRpc { fn handle_get_known_nodes(&self) -> SystemRpc {
let node_status = self.node_status.read().unwrap(); let known_nodes = self.get_known_nodes();
let known_nodes = self
.fullmesh
.get_peer_list()
.iter()
.map(|n| KnownNodeInfo {
id: n.id.into(),
addr: n.addr,
is_up: n.is_up(),
last_seen_secs_ago: n.last_seen.map(|t| (Instant::now() - t).as_secs()),
status: node_status
.get(&n.id.into())
.cloned()
.map(|(_, st)| st)
.unwrap_or(NodeStatus {
hostname: "?".to_string(),
replication_factor: 0,
cluster_layout_version: 0,
cluster_layout_staging_hash: Hash::from([0u8; 32]),
}),
})
.collect::<Vec<_>>();
SystemRpc::ReturnKnownNodes(known_nodes) SystemRpc::ReturnKnownNodes(known_nodes)
} }
async fn handle_advertise_status( async fn handle_advertise_status(
self: &Arc<Self>, self: &Arc<Self>,
from: Uuid, from: Uuid,