Admin API refactoring: convert existing commands to API requests (step 3) #945
13 changed files with 325 additions and 189 deletions
|
@ -10,6 +10,7 @@ use serde::{Deserialize, Serialize};
|
||||||
use garage_rpc::*;
|
use garage_rpc::*;
|
||||||
|
|
||||||
use garage_model::garage::Garage;
|
use garage_model::garage::Garage;
|
||||||
|
use garage_util::error::Error as GarageError;
|
||||||
|
|
||||||
use garage_api_common::common_error::CommonErrorDerivative;
|
use garage_api_common::common_error::CommonErrorDerivative;
|
||||||
use garage_api_common::helpers::is_default;
|
use garage_api_common::helpers::is_default;
|
||||||
|
@ -78,11 +79,46 @@ admin_endpoints![
|
||||||
RemoveBucketAlias,
|
RemoveBucketAlias,
|
||||||
|
|
||||||
// Worker operations
|
// Worker operations
|
||||||
|
ListWorkers,
|
||||||
|
GetWorkerInfo,
|
||||||
GetWorkerVariable,
|
GetWorkerVariable,
|
||||||
SetWorkerVariable,
|
SetWorkerVariable,
|
||||||
];
|
];
|
||||||
|
|
||||||
local_admin_endpoints![GetWorkerVariable, SetWorkerVariable,];
|
local_admin_endpoints![
|
||||||
|
// Background workers
|
||||||
|
ListWorkers,
|
||||||
|
GetWorkerInfo,
|
||||||
|
GetWorkerVariable,
|
||||||
|
SetWorkerVariable,
|
||||||
|
];
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct MultiRequest<RB> {
|
||||||
|
pub node: String,
|
||||||
|
pub body: RB,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct MultiResponse<RB> {
|
||||||
|
pub success: HashMap<String, RB>,
|
||||||
|
pub error: HashMap<String, String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<RB> MultiResponse<RB> {
|
||||||
|
pub fn into_single_response(self) -> Result<RB, GarageError> {
|
||||||
|
if let Some((_, e)) = self.error.into_iter().next() {
|
||||||
|
return Err(GarageError::Message(e));
|
||||||
|
}
|
||||||
|
if self.success.len() != 1 {
|
||||||
|
return Err(GarageError::Message(format!(
|
||||||
|
"{} responses returned, expected 1",
|
||||||
|
self.success.len()
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
Ok(self.success.into_iter().next().unwrap().1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// **********************************************
|
// **********************************************
|
||||||
// Special endpoints
|
// Special endpoints
|
||||||
|
@ -596,6 +632,61 @@ pub struct RemoveBucketAliasResponse(pub GetBucketInfoResponse);
|
||||||
// Worker operations
|
// Worker operations
|
||||||
// **********************************************
|
// **********************************************
|
||||||
|
|
||||||
|
// ---- GetWorkerList ----
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
|
||||||
|
pub struct LocalListWorkersRequest {
|
||||||
|
#[serde(default)]
|
||||||
|
pub busy_only: bool,
|
||||||
|
#[serde(default)]
|
||||||
|
pub error_only: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct LocalListWorkersResponse(pub Vec<WorkerInfoResp>);
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
pub struct WorkerInfoResp {
|
||||||
|
pub id: u64,
|
||||||
|
pub name: String,
|
||||||
|
pub state: WorkerStateResp,
|
||||||
|
pub errors: u64,
|
||||||
|
pub consecutive_errors: u64,
|
||||||
|
pub last_error: Option<WorkerLastError>,
|
||||||
|
pub tranquility: Option<u32>,
|
||||||
|
pub progress: Option<String>,
|
||||||
|
pub queue_length: Option<u64>,
|
||||||
|
pub persistent_errors: Option<u64>,
|
||||||
|
pub freeform: Vec<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
pub enum WorkerStateResp {
|
||||||
|
Busy,
|
||||||
|
Throttled { duration_secs: f32 },
|
||||||
|
Idle,
|
||||||
|
Done,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
pub struct WorkerLastError {
|
||||||
|
pub message: String,
|
||||||
|
pub secs_ago: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
// ---- GetWorkerList ----
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct LocalGetWorkerInfoRequest {
|
||||||
|
pub id: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct LocalGetWorkerInfoResponse(pub WorkerInfoResp);
|
||||||
|
|
||||||
// ---- GetWorkerVariable ----
|
// ---- GetWorkerVariable ----
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
|
|
@ -25,6 +25,10 @@ pub enum Error {
|
||||||
#[error(display = "Access key not found: {}", _0)]
|
#[error(display = "Access key not found: {}", _0)]
|
||||||
NoSuchAccessKey(String),
|
NoSuchAccessKey(String),
|
||||||
|
|
||||||
|
/// The requested worker does not exist
|
||||||
|
#[error(display = "Worker not found: {}", _0)]
|
||||||
|
NoSuchWorker(u64),
|
||||||
|
|
||||||
/// In Import key, the key already exists
|
/// In Import key, the key already exists
|
||||||
#[error(
|
#[error(
|
||||||
display = "Key {} already exists in data store. Even if it is deleted, we can't let you create a new key with the same ID. Sorry.",
|
display = "Key {} already exists in data store. Even if it is deleted, we can't let you create a new key with the same ID. Sorry.",
|
||||||
|
@ -53,6 +57,7 @@ impl Error {
|
||||||
match self {
|
match self {
|
||||||
Error::Common(c) => c.aws_code(),
|
Error::Common(c) => c.aws_code(),
|
||||||
Error::NoSuchAccessKey(_) => "NoSuchAccessKey",
|
Error::NoSuchAccessKey(_) => "NoSuchAccessKey",
|
||||||
|
Error::NoSuchWorker(_) => "NoSuchWorker",
|
||||||
Error::KeyAlreadyExists(_) => "KeyAlreadyExists",
|
Error::KeyAlreadyExists(_) => "KeyAlreadyExists",
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -63,7 +68,7 @@ impl ApiError for Error {
|
||||||
fn http_status_code(&self) -> StatusCode {
|
fn http_status_code(&self) -> StatusCode {
|
||||||
match self {
|
match self {
|
||||||
Error::Common(c) => c.http_status_code(),
|
Error::Common(c) => c.http_status_code(),
|
||||||
Error::NoSuchAccessKey(_) => StatusCode::NOT_FOUND,
|
Error::NoSuchAccessKey(_) | Error::NoSuchWorker(_) => StatusCode::NOT_FOUND,
|
||||||
Error::KeyAlreadyExists(_) => StatusCode::CONFLICT,
|
Error::KeyAlreadyExists(_) => StatusCode::CONFLICT,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -111,19 +111,11 @@ macro_rules! local_admin_endpoints {
|
||||||
}
|
}
|
||||||
|
|
||||||
$(
|
$(
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
pub type [< $endpoint Request >] = MultiRequest< [< Local $endpoint Request >] >;
|
||||||
pub struct [< $endpoint Request >] {
|
|
||||||
pub node: String,
|
|
||||||
pub body: [< Local $endpoint Request >],
|
|
||||||
}
|
|
||||||
|
|
||||||
pub type [< $endpoint RequestBody >] = [< Local $endpoint Request >];
|
pub type [< $endpoint RequestBody >] = [< Local $endpoint Request >];
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
pub type [< $endpoint Response >] = MultiResponse< [< Local $endpoint Response >] >;
|
||||||
pub struct [< $endpoint Response >] {
|
|
||||||
pub success: HashMap<String, [< Local $endpoint Response >] >,
|
|
||||||
pub error: HashMap<String, String>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From< [< Local $endpoint Request >] > for LocalAdminApiRequest {
|
impl From< [< Local $endpoint Request >] > for LocalAdminApiRequest {
|
||||||
fn from(req: [< Local $endpoint Request >]) -> LocalAdminApiRequest {
|
fn from(req: [< Local $endpoint Request >]) -> LocalAdminApiRequest {
|
||||||
|
|
|
@ -60,7 +60,10 @@ impl AdminApiRequest {
|
||||||
POST AddBucketAlias (body),
|
POST AddBucketAlias (body),
|
||||||
POST RemoveBucketAlias (body),
|
POST RemoveBucketAlias (body),
|
||||||
// Worker APIs
|
// Worker APIs
|
||||||
|
POST ListWorkers (body_field, query::node),
|
||||||
|
POST GetWorkerInfo (body_field, query::node),
|
||||||
POST GetWorkerVariable (body_field, query::node),
|
POST GetWorkerVariable (body_field, query::node),
|
||||||
|
POST SetWorkerVariable (body_field, query::node),
|
||||||
]);
|
]);
|
||||||
|
|
||||||
if let Some(message) = query.nonempty_message() {
|
if let Some(message) = query.nonempty_message() {
|
||||||
|
|
|
@ -3,12 +3,59 @@ use std::sync::Arc;
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
|
|
||||||
|
use garage_util::background::*;
|
||||||
|
use garage_util::time::now_msec;
|
||||||
|
|
||||||
use garage_model::garage::Garage;
|
use garage_model::garage::Garage;
|
||||||
|
|
||||||
use crate::api::*;
|
use crate::api::*;
|
||||||
use crate::error::Error;
|
use crate::error::Error;
|
||||||
use crate::{Admin, RequestHandler};
|
use crate::{Admin, RequestHandler};
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl RequestHandler for LocalListWorkersRequest {
|
||||||
|
type Response = LocalListWorkersResponse;
|
||||||
|
|
||||||
|
async fn handle(
|
||||||
|
self,
|
||||||
|
_garage: &Arc<Garage>,
|
||||||
|
admin: &Admin,
|
||||||
|
) -> Result<LocalListWorkersResponse, Error> {
|
||||||
|
let workers = admin.background.get_worker_info();
|
||||||
|
let info = workers
|
||||||
|
.into_iter()
|
||||||
|
.filter(|(_, w)| {
|
||||||
|
(!self.busy_only
|
||||||
|
|| matches!(w.state, WorkerState::Busy | WorkerState::Throttled(_)))
|
||||||
|
&& (!self.error_only || w.errors > 0)
|
||||||
|
})
|
||||||
|
.map(|(id, w)| worker_info_to_api(id as u64, w))
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
Ok(LocalListWorkersResponse(info))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl RequestHandler for LocalGetWorkerInfoRequest {
|
||||||
|
type Response = LocalGetWorkerInfoResponse;
|
||||||
|
|
||||||
|
async fn handle(
|
||||||
|
self,
|
||||||
|
_garage: &Arc<Garage>,
|
||||||
|
admin: &Admin,
|
||||||
|
) -> Result<LocalGetWorkerInfoResponse, Error> {
|
||||||
|
let info = admin
|
||||||
|
.background
|
||||||
|
.get_worker_info()
|
||||||
|
.get(&(self.id as usize))
|
||||||
|
.ok_or(Error::NoSuchWorker(self.id))?
|
||||||
|
.clone();
|
||||||
|
Ok(LocalGetWorkerInfoResponse(worker_info_to_api(
|
||||||
|
self.id, info,
|
||||||
|
)))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl RequestHandler for LocalGetWorkerVariableRequest {
|
impl RequestHandler for LocalGetWorkerVariableRequest {
|
||||||
type Response = LocalGetWorkerVariableResponse;
|
type Response = LocalGetWorkerVariableResponse;
|
||||||
|
@ -48,3 +95,30 @@ impl RequestHandler for LocalSetWorkerVariableRequest {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ---- helper functions ----
|
||||||
|
|
||||||
|
fn worker_info_to_api(id: u64, info: WorkerInfo) -> WorkerInfoResp {
|
||||||
|
WorkerInfoResp {
|
||||||
|
id: id,
|
||||||
|
name: info.name,
|
||||||
|
state: match info.state {
|
||||||
|
WorkerState::Busy => WorkerStateResp::Busy,
|
||||||
|
WorkerState::Throttled(t) => WorkerStateResp::Throttled { duration_secs: t },
|
||||||
|
WorkerState::Idle => WorkerStateResp::Idle,
|
||||||
|
WorkerState::Done => WorkerStateResp::Done,
|
||||||
|
},
|
||||||
|
errors: info.errors as u64,
|
||||||
|
consecutive_errors: info.consecutive_errors as u64,
|
||||||
|
last_error: info.last_error.map(|(message, t)| WorkerLastError {
|
||||||
|
message,
|
||||||
|
secs_ago: (std::cmp::max(t, now_msec()) - t) / 1000,
|
||||||
|
}),
|
||||||
|
|
||||||
|
tranquility: info.status.tranquility,
|
||||||
|
progress: info.status.progress,
|
||||||
|
queue_length: info.status.queue_length,
|
||||||
|
persistent_errors: info.status.persistent_errors,
|
||||||
|
freeform: info.status.freeform,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -141,6 +141,9 @@ macro_rules! router_match {
|
||||||
}
|
}
|
||||||
}};
|
}};
|
||||||
|
|
||||||
|
(@@parse_param $query:expr, default, $param:ident) => {{
|
||||||
|
Default::default()
|
||||||
|
}};
|
||||||
(@@parse_param $query:expr, query_opt, $param:ident) => {{
|
(@@parse_param $query:expr, query_opt, $param:ident) => {{
|
||||||
// extract optional query parameter
|
// extract optional query parameter
|
||||||
$query.$param.take().map(|param| param.into_owned())
|
$query.$param.take().map(|param| param.into_owned())
|
||||||
|
|
|
@ -22,7 +22,7 @@ use garage_rpc::*;
|
||||||
use garage_block::manager::BlockResyncErrorInfo;
|
use garage_block::manager::BlockResyncErrorInfo;
|
||||||
|
|
||||||
use garage_model::garage::Garage;
|
use garage_model::garage::Garage;
|
||||||
use garage_model::helper::error::{Error, OkOrBadRequest};
|
use garage_model::helper::error::Error;
|
||||||
use garage_model::s3::mpu_table::MultipartUpload;
|
use garage_model::s3::mpu_table::MultipartUpload;
|
||||||
use garage_model::s3::version_table::Version;
|
use garage_model::s3::version_table::Version;
|
||||||
|
|
||||||
|
@ -40,17 +40,11 @@ pub const ADMIN_RPC_PATH: &str = "garage/admin_rpc.rs/Rpc";
|
||||||
pub enum AdminRpc {
|
pub enum AdminRpc {
|
||||||
LaunchRepair(RepairOpt),
|
LaunchRepair(RepairOpt),
|
||||||
Stats(StatsOpt),
|
Stats(StatsOpt),
|
||||||
Worker(WorkerOperation),
|
|
||||||
BlockOperation(BlockOperation),
|
BlockOperation(BlockOperation),
|
||||||
MetaOperation(MetaOperation),
|
MetaOperation(MetaOperation),
|
||||||
|
|
||||||
// Replies
|
// Replies
|
||||||
Ok(String),
|
Ok(String),
|
||||||
WorkerList(
|
|
||||||
HashMap<usize, garage_util::background::WorkerInfo>,
|
|
||||||
WorkerListOpt,
|
|
||||||
),
|
|
||||||
WorkerInfo(usize, garage_util::background::WorkerInfo),
|
|
||||||
BlockErrorList(Vec<BlockResyncErrorInfo>),
|
BlockErrorList(Vec<BlockResyncErrorInfo>),
|
||||||
BlockInfo {
|
BlockInfo {
|
||||||
hash: Hash,
|
hash: Hash,
|
||||||
|
@ -340,27 +334,6 @@ impl AdminRpcHandler {
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
// ================ WORKER COMMANDS ====================
|
|
||||||
|
|
||||||
async fn handle_worker_cmd(&self, cmd: &WorkerOperation) -> Result<AdminRpc, Error> {
|
|
||||||
match cmd {
|
|
||||||
WorkerOperation::List { opt } => {
|
|
||||||
let workers = self.background.get_worker_info();
|
|
||||||
Ok(AdminRpc::WorkerList(workers, *opt))
|
|
||||||
}
|
|
||||||
WorkerOperation::Info { tid } => {
|
|
||||||
let info = self
|
|
||||||
.background
|
|
||||||
.get_worker_info()
|
|
||||||
.get(tid)
|
|
||||||
.ok_or_bad_request(format!("No worker with TID {}", tid))?
|
|
||||||
.clone();
|
|
||||||
Ok(AdminRpc::WorkerInfo(*tid, info))
|
|
||||||
}
|
|
||||||
_ => unreachable!(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ================ META DB COMMANDS ====================
|
// ================ META DB COMMANDS ====================
|
||||||
|
|
||||||
async fn handle_meta_cmd(self: &Arc<Self>, mo: &MetaOperation) -> Result<AdminRpc, Error> {
|
async fn handle_meta_cmd(self: &Arc<Self>, mo: &MetaOperation) -> Result<AdminRpc, Error> {
|
||||||
|
@ -409,7 +382,6 @@ impl EndpointHandler<AdminRpc> for AdminRpcHandler {
|
||||||
match message {
|
match message {
|
||||||
AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await,
|
AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await,
|
||||||
AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await,
|
AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await,
|
||||||
AdminRpc::Worker(wo) => self.handle_worker_cmd(wo).await,
|
|
||||||
AdminRpc::BlockOperation(bo) => self.handle_block_cmd(bo).await,
|
AdminRpc::BlockOperation(bo) => self.handle_block_cmd(bo).await,
|
||||||
AdminRpc::MetaOperation(mo) => self.handle_meta_cmd(mo).await,
|
AdminRpc::MetaOperation(mo) => self.handle_meta_cmd(mo).await,
|
||||||
m => Err(GarageError::unexpected_rpc_message(m).into()),
|
m => Err(GarageError::unexpected_rpc_message(m).into()),
|
||||||
|
|
|
@ -17,12 +17,6 @@ pub async fn cmd_admin(
|
||||||
AdminRpc::Ok(msg) => {
|
AdminRpc::Ok(msg) => {
|
||||||
println!("{}", msg);
|
println!("{}", msg);
|
||||||
}
|
}
|
||||||
AdminRpc::WorkerList(wi, wlo) => {
|
|
||||||
print_worker_list(wi, wlo);
|
|
||||||
}
|
|
||||||
AdminRpc::WorkerInfo(tid, wi) => {
|
|
||||||
print_worker_info(tid, wi);
|
|
||||||
}
|
|
||||||
AdminRpc::BlockErrorList(el) => {
|
AdminRpc::BlockErrorList(el) => {
|
||||||
print_block_error_list(el);
|
print_block_error_list(el);
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,8 +1,6 @@
|
||||||
use std::collections::HashMap;
|
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use format_table::format_table;
|
use format_table::format_table;
|
||||||
use garage_util::background::*;
|
|
||||||
use garage_util::data::*;
|
use garage_util::data::*;
|
||||||
use garage_util::time::*;
|
use garage_util::time::*;
|
||||||
|
|
||||||
|
@ -11,121 +9,6 @@ use garage_block::manager::BlockResyncErrorInfo;
|
||||||
use garage_model::s3::mpu_table::MultipartUpload;
|
use garage_model::s3::mpu_table::MultipartUpload;
|
||||||
use garage_model::s3::version_table::*;
|
use garage_model::s3::version_table::*;
|
||||||
|
|
||||||
use crate::cli::structs::WorkerListOpt;
|
|
||||||
|
|
||||||
pub fn print_worker_list(wi: HashMap<usize, WorkerInfo>, wlo: WorkerListOpt) {
|
|
||||||
let mut wi = wi.into_iter().collect::<Vec<_>>();
|
|
||||||
wi.sort_by_key(|(tid, info)| {
|
|
||||||
(
|
|
||||||
match info.state {
|
|
||||||
WorkerState::Busy | WorkerState::Throttled(_) => 0,
|
|
||||||
WorkerState::Idle => 1,
|
|
||||||
WorkerState::Done => 2,
|
|
||||||
},
|
|
||||||
*tid,
|
|
||||||
)
|
|
||||||
});
|
|
||||||
|
|
||||||
let mut table = vec!["TID\tState\tName\tTranq\tDone\tQueue\tErrors\tConsec\tLast".to_string()];
|
|
||||||
for (tid, info) in wi.iter() {
|
|
||||||
if wlo.busy && !matches!(info.state, WorkerState::Busy | WorkerState::Throttled(_)) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if wlo.errors && info.errors == 0 {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
let tf = timeago::Formatter::new();
|
|
||||||
let err_ago = info
|
|
||||||
.last_error
|
|
||||||
.as_ref()
|
|
||||||
.map(|(_, t)| tf.convert(Duration::from_millis(now_msec() - t)))
|
|
||||||
.unwrap_or_default();
|
|
||||||
let (total_err, consec_err) = if info.errors > 0 {
|
|
||||||
(info.errors.to_string(), info.consecutive_errors.to_string())
|
|
||||||
} else {
|
|
||||||
("-".into(), "-".into())
|
|
||||||
};
|
|
||||||
|
|
||||||
table.push(format!(
|
|
||||||
"{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}",
|
|
||||||
tid,
|
|
||||||
info.state,
|
|
||||||
info.name,
|
|
||||||
info.status
|
|
||||||
.tranquility
|
|
||||||
.as_ref()
|
|
||||||
.map(ToString::to_string)
|
|
||||||
.unwrap_or_else(|| "-".into()),
|
|
||||||
info.status.progress.as_deref().unwrap_or("-"),
|
|
||||||
info.status
|
|
||||||
.queue_length
|
|
||||||
.as_ref()
|
|
||||||
.map(ToString::to_string)
|
|
||||||
.unwrap_or_else(|| "-".into()),
|
|
||||||
total_err,
|
|
||||||
consec_err,
|
|
||||||
err_ago,
|
|
||||||
));
|
|
||||||
}
|
|
||||||
format_table(table);
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn print_worker_info(tid: usize, info: WorkerInfo) {
|
|
||||||
let mut table = vec![];
|
|
||||||
table.push(format!("Task id:\t{}", tid));
|
|
||||||
table.push(format!("Worker name:\t{}", info.name));
|
|
||||||
match info.state {
|
|
||||||
WorkerState::Throttled(t) => {
|
|
||||||
table.push(format!(
|
|
||||||
"Worker state:\tBusy (throttled, paused for {:.3}s)",
|
|
||||||
t
|
|
||||||
));
|
|
||||||
}
|
|
||||||
s => {
|
|
||||||
table.push(format!("Worker state:\t{}", s));
|
|
||||||
}
|
|
||||||
};
|
|
||||||
if let Some(tql) = info.status.tranquility {
|
|
||||||
table.push(format!("Tranquility:\t{}", tql));
|
|
||||||
}
|
|
||||||
|
|
||||||
table.push("".into());
|
|
||||||
table.push(format!("Total errors:\t{}", info.errors));
|
|
||||||
table.push(format!("Consecutive errs:\t{}", info.consecutive_errors));
|
|
||||||
if let Some((s, t)) = info.last_error {
|
|
||||||
table.push(format!("Last error:\t{}", s));
|
|
||||||
let tf = timeago::Formatter::new();
|
|
||||||
table.push(format!(
|
|
||||||
"Last error time:\t{}",
|
|
||||||
tf.convert(Duration::from_millis(now_msec() - t))
|
|
||||||
));
|
|
||||||
}
|
|
||||||
|
|
||||||
table.push("".into());
|
|
||||||
if let Some(p) = info.status.progress {
|
|
||||||
table.push(format!("Progress:\t{}", p));
|
|
||||||
}
|
|
||||||
if let Some(ql) = info.status.queue_length {
|
|
||||||
table.push(format!("Queue length:\t{}", ql));
|
|
||||||
}
|
|
||||||
if let Some(pe) = info.status.persistent_errors {
|
|
||||||
table.push(format!("Persistent errors:\t{}", pe));
|
|
||||||
}
|
|
||||||
|
|
||||||
for (i, s) in info.status.freeform.iter().enumerate() {
|
|
||||||
if i == 0 {
|
|
||||||
if table.last() != Some(&"".into()) {
|
|
||||||
table.push("".into());
|
|
||||||
}
|
|
||||||
table.push(format!("Message:\t{}", s));
|
|
||||||
} else {
|
|
||||||
table.push(format!("\t{}", s));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
format_table(table);
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn print_block_error_list(el: Vec<BlockResyncErrorInfo>) {
|
pub fn print_block_error_list(el: Vec<BlockResyncErrorInfo>) {
|
||||||
let now = now_msec();
|
let now = now_msec();
|
||||||
let tf = timeago::Formatter::new();
|
let tf = timeago::Formatter::new();
|
||||||
|
|
|
@ -11,6 +11,8 @@ use crate::cli_v2::*;
|
||||||
impl Cli {
|
impl Cli {
|
||||||
pub async fn cmd_worker(&self, cmd: WorkerOperation) -> Result<(), Error> {
|
pub async fn cmd_worker(&self, cmd: WorkerOperation) -> Result<(), Error> {
|
||||||
match cmd {
|
match cmd {
|
||||||
|
WorkerOperation::List { opt } => self.cmd_list_workers(opt).await,
|
||||||
|
WorkerOperation::Info { tid } => self.cmd_worker_info(tid).await,
|
||||||
WorkerOperation::Get {
|
WorkerOperation::Get {
|
||||||
all_nodes,
|
all_nodes,
|
||||||
variable,
|
variable,
|
||||||
|
@ -20,16 +22,138 @@ impl Cli {
|
||||||
variable,
|
variable,
|
||||||
value,
|
value,
|
||||||
} => self.cmd_set_var(all_nodes, variable, value).await,
|
} => self.cmd_set_var(all_nodes, variable, value).await,
|
||||||
wo => cli_v1::cmd_admin(
|
|
||||||
&self.admin_rpc_endpoint,
|
|
||||||
self.rpc_host,
|
|
||||||
AdminRpc::Worker(wo),
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.ok_or_message("cli_v1"),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn cmd_list_workers(&self, opt: WorkerListOpt) -> Result<(), Error> {
|
||||||
|
let mut list = self
|
||||||
|
.api_request(ListWorkersRequest {
|
||||||
|
node: hex::encode(self.rpc_host),
|
||||||
|
body: LocalListWorkersRequest {
|
||||||
|
busy_only: opt.busy,
|
||||||
|
error_only: opt.errors,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
.await?
|
||||||
|
.into_single_response()?
|
||||||
|
.0;
|
||||||
|
|
||||||
|
list.sort_by_key(|info| {
|
||||||
|
(
|
||||||
|
match info.state {
|
||||||
|
WorkerStateResp::Busy | WorkerStateResp::Throttled { .. } => 0,
|
||||||
|
WorkerStateResp::Idle => 1,
|
||||||
|
WorkerStateResp::Done => 2,
|
||||||
|
},
|
||||||
|
info.id,
|
||||||
|
)
|
||||||
|
});
|
||||||
|
|
||||||
|
let mut table =
|
||||||
|
vec!["TID\tState\tName\tTranq\tDone\tQueue\tErrors\tConsec\tLast".to_string()];
|
||||||
|
let tf = timeago::Formatter::new();
|
||||||
|
for info in list.iter() {
|
||||||
|
let err_ago = info
|
||||||
|
.last_error
|
||||||
|
.as_ref()
|
||||||
|
.map(|x| tf.convert(Duration::from_secs(x.secs_ago)))
|
||||||
|
.unwrap_or_default();
|
||||||
|
let (total_err, consec_err) = if info.errors > 0 {
|
||||||
|
(info.errors.to_string(), info.consecutive_errors.to_string())
|
||||||
|
} else {
|
||||||
|
("-".into(), "-".into())
|
||||||
|
};
|
||||||
|
|
||||||
|
table.push(format!(
|
||||||
|
"{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}",
|
||||||
|
info.id,
|
||||||
|
format_worker_state(&info.state),
|
||||||
|
info.name,
|
||||||
|
info.tranquility
|
||||||
|
.as_ref()
|
||||||
|
.map(ToString::to_string)
|
||||||
|
.unwrap_or_else(|| "-".into()),
|
||||||
|
info.progress.as_deref().unwrap_or("-"),
|
||||||
|
info.queue_length
|
||||||
|
.as_ref()
|
||||||
|
.map(ToString::to_string)
|
||||||
|
.unwrap_or_else(|| "-".into()),
|
||||||
|
total_err,
|
||||||
|
consec_err,
|
||||||
|
err_ago,
|
||||||
|
));
|
||||||
|
}
|
||||||
|
format_table(table);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn cmd_worker_info(&self, tid: usize) -> Result<(), Error> {
|
||||||
|
let info = self
|
||||||
|
.api_request(GetWorkerInfoRequest {
|
||||||
|
node: hex::encode(self.rpc_host),
|
||||||
|
body: LocalGetWorkerInfoRequest { id: tid as u64 },
|
||||||
|
})
|
||||||
|
.await?
|
||||||
|
.into_single_response()?
|
||||||
|
.0;
|
||||||
|
|
||||||
|
let mut table = vec![];
|
||||||
|
table.push(format!("Task id:\t{}", info.id));
|
||||||
|
table.push(format!("Worker name:\t{}", info.name));
|
||||||
|
match &info.state {
|
||||||
|
WorkerStateResp::Throttled { duration_secs } => {
|
||||||
|
table.push(format!(
|
||||||
|
"Worker state:\tBusy (throttled, paused for {:.3}s)",
|
||||||
|
duration_secs
|
||||||
|
));
|
||||||
|
}
|
||||||
|
s => {
|
||||||
|
table.push(format!("Worker state:\t{}", format_worker_state(s)));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
if let Some(tql) = info.tranquility {
|
||||||
|
table.push(format!("Tranquility:\t{}", tql));
|
||||||
|
}
|
||||||
|
|
||||||
|
table.push("".into());
|
||||||
|
table.push(format!("Total errors:\t{}", info.errors));
|
||||||
|
table.push(format!("Consecutive errs:\t{}", info.consecutive_errors));
|
||||||
|
if let Some(err) = info.last_error {
|
||||||
|
table.push(format!("Last error:\t{}", err.message));
|
||||||
|
let tf = timeago::Formatter::new();
|
||||||
|
table.push(format!(
|
||||||
|
"Last error time:\t{}",
|
||||||
|
tf.convert(Duration::from_secs(err.secs_ago))
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
table.push("".into());
|
||||||
|
if let Some(p) = info.progress {
|
||||||
|
table.push(format!("Progress:\t{}", p));
|
||||||
|
}
|
||||||
|
if let Some(ql) = info.queue_length {
|
||||||
|
table.push(format!("Queue length:\t{}", ql));
|
||||||
|
}
|
||||||
|
if let Some(pe) = info.persistent_errors {
|
||||||
|
table.push(format!("Persistent errors:\t{}", pe));
|
||||||
|
}
|
||||||
|
|
||||||
|
for (i, s) in info.freeform.iter().enumerate() {
|
||||||
|
if i == 0 {
|
||||||
|
if table.last() != Some(&"".into()) {
|
||||||
|
table.push("".into());
|
||||||
|
}
|
||||||
|
table.push(format!("Message:\t{}", s));
|
||||||
|
} else {
|
||||||
|
table.push(format!("\t{}", s));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
format_table(table);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn cmd_get_var(&self, all: bool, var: Option<String>) -> Result<(), Error> {
|
pub async fn cmd_get_var(&self, all: bool, var: Option<String>) -> Result<(), Error> {
|
||||||
let res = self
|
let res = self
|
||||||
.api_request(GetWorkerVariableRequest {
|
.api_request(GetWorkerVariableRequest {
|
||||||
|
@ -87,3 +211,12 @@ impl Cli {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn format_worker_state(s: &WorkerStateResp) -> &'static str {
|
||||||
|
match s {
|
||||||
|
WorkerStateResp::Busy => "Busy",
|
||||||
|
WorkerStateResp::Throttled { .. } => "Busy*",
|
||||||
|
WorkerStateResp::Idle => "Idle",
|
||||||
|
WorkerStateResp::Done => "Done",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -1,5 +1,4 @@
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::sync::Arc;
|
|
||||||
|
|
||||||
use tokio::sync::watch;
|
use tokio::sync::watch;
|
||||||
|
|
||||||
|
@ -65,7 +64,7 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er
|
||||||
}
|
}
|
||||||
|
|
||||||
info!("Initialize Admin API server and metrics collector...");
|
info!("Initialize Admin API server and metrics collector...");
|
||||||
let admin_server: Arc<AdminApiServer> = AdminApiServer::new(
|
let admin_server = AdminApiServer::new(
|
||||||
garage.clone(),
|
garage.clone(),
|
||||||
background.clone(),
|
background.clone(),
|
||||||
#[cfg(feature = "metrics")]
|
#[cfg(feature = "metrics")]
|
||||||
|
|
|
@ -6,7 +6,6 @@ pub mod worker;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use serde::{Deserialize, Serialize};
|
|
||||||
use tokio::sync::{mpsc, watch};
|
use tokio::sync::{mpsc, watch};
|
||||||
|
|
||||||
use worker::WorkerProcessor;
|
use worker::WorkerProcessor;
|
||||||
|
@ -18,7 +17,7 @@ pub struct BackgroundRunner {
|
||||||
worker_info: Arc<std::sync::Mutex<HashMap<usize, WorkerInfo>>>,
|
worker_info: Arc<std::sync::Mutex<HashMap<usize, WorkerInfo>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Serialize, Deserialize, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct WorkerInfo {
|
pub struct WorkerInfo {
|
||||||
pub name: String,
|
pub name: String,
|
||||||
pub status: WorkerStatus,
|
pub status: WorkerStatus,
|
||||||
|
@ -30,7 +29,7 @@ pub struct WorkerInfo {
|
||||||
|
|
||||||
/// WorkerStatus is a struct returned by the worker with a bunch of canonical
|
/// WorkerStatus is a struct returned by the worker with a bunch of canonical
|
||||||
/// fields to indicate their status to CLI users. All fields are optional.
|
/// fields to indicate their status to CLI users. All fields are optional.
|
||||||
#[derive(Clone, Serialize, Deserialize, Debug, Default)]
|
#[derive(Clone, Debug, Default)]
|
||||||
pub struct WorkerStatus {
|
pub struct WorkerStatus {
|
||||||
pub tranquility: Option<u32>,
|
pub tranquility: Option<u32>,
|
||||||
pub progress: Option<String>,
|
pub progress: Option<String>,
|
||||||
|
|
|
@ -6,7 +6,6 @@ use async_trait::async_trait;
|
||||||
use futures::future::*;
|
use futures::future::*;
|
||||||
use futures::stream::FuturesUnordered;
|
use futures::stream::FuturesUnordered;
|
||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
use serde::{Deserialize, Serialize};
|
|
||||||
use tokio::select;
|
use tokio::select;
|
||||||
use tokio::sync::{mpsc, watch};
|
use tokio::sync::{mpsc, watch};
|
||||||
|
|
||||||
|
@ -18,7 +17,7 @@ use crate::time::now_msec;
|
||||||
// will be interrupted in the middle of whatever they are doing.
|
// will be interrupted in the middle of whatever they are doing.
|
||||||
const EXIT_DEADLINE: Duration = Duration::from_secs(8);
|
const EXIT_DEADLINE: Duration = Duration::from_secs(8);
|
||||||
|
|
||||||
#[derive(PartialEq, Copy, Clone, Serialize, Deserialize, Debug)]
|
#[derive(PartialEq, Copy, Clone, Debug)]
|
||||||
pub enum WorkerState {
|
pub enum WorkerState {
|
||||||
Busy,
|
Busy,
|
||||||
Throttled(f32),
|
Throttled(f32),
|
||||||
|
@ -26,17 +25,6 @@ pub enum WorkerState {
|
||||||
Done,
|
Done,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl std::fmt::Display for WorkerState {
|
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
||||||
match self {
|
|
||||||
WorkerState::Busy => write!(f, "Busy"),
|
|
||||||
WorkerState::Throttled(_) => write!(f, "Busy*"),
|
|
||||||
WorkerState::Idle => write!(f, "Idle"),
|
|
||||||
WorkerState::Done => write!(f, "Done"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
pub trait Worker: Send {
|
pub trait Worker: Send {
|
||||||
fn name(&self) -> String;
|
fn name(&self) -> String;
|
||||||
|
|
Loading…
Add table
Reference in a new issue