Begin implement bucket management & admin commands

This commit is contained in:
Alex 2020-04-19 17:15:48 +02:00
parent 302502f4c1
commit a6129d8626
13 changed files with 433 additions and 55 deletions

146
src/admin_rpc.rs Normal file
View File

@ -0,0 +1,146 @@
use std::sync::Arc;
use serde::{Deserialize, Serialize};
use crate::data::*;
use crate::error::Error;
use crate::rpc_server::*;
use crate::server::Garage;
use crate::table::*;
use crate::*;
use crate::bucket_table::*;
pub const ADMIN_RPC_PATH: &str = "_admin";
#[derive(Debug, Serialize, Deserialize)]
pub enum AdminRPC {
BucketOperation(BucketOperation),
// Replies
Ok,
BucketList(Vec<String>),
BucketInfo(Bucket),
}
impl RpcMessage for AdminRPC {}
pub struct AdminRpcHandler {
garage: Arc<Garage>,
}
impl AdminRpcHandler {
pub fn new(garage: Arc<Garage>) -> Arc<Self> {
Arc::new(Self { garage })
}
pub fn register_handler(self: Arc<Self>, rpc_server: &mut RpcServer) {
rpc_server.add_handler::<AdminRPC, _, _>(ADMIN_RPC_PATH.to_string(), move |msg, _addr| {
let self2 = self.clone();
async move {
match msg {
AdminRPC::BucketOperation(bo) => self2.handle_bucket_cmd(bo).await,
_ => Err(Error::Message(format!("Invalid RPC"))),
}
}
});
}
async fn handle_bucket_cmd(&self, cmd: BucketOperation) -> Result<AdminRPC, Error> {
match cmd {
BucketOperation::List => {
let bucket_names = self
.garage
.bucket_table
.get_range(&EmptyKey, None, Some(()), 10000)
.await?
.iter()
.map(|b| b.name.to_string())
.collect::<Vec<_>>();
Ok(AdminRPC::BucketList(bucket_names))
}
BucketOperation::Info(query) => {
let bucket = self
.garage
.bucket_table
.get(&EmptyKey, &query.name)
.await?
.filter(|b| !b.deleted);
match bucket {
Some(b) => Ok(AdminRPC::BucketInfo(b)),
None => Err(Error::Message(format!("Bucket {} not found", query.name))),
}
}
BucketOperation::Create(query) => {
let bucket = self.garage.bucket_table.get(&EmptyKey, &query.name).await?;
if bucket.as_ref().filter(|b| !b.deleted).is_some() {
return Err(Error::Message(format!(
"Bucket {} already exists",
query.name
)));
}
let new_time = match bucket {
Some(b) => std::cmp::max(b.timestamp + 1, now_msec()),
None => now_msec(),
};
self.garage
.bucket_table
.insert(&Bucket {
name: query.name,
timestamp: new_time,
deleted: false,
authorized_keys: vec![],
})
.await?;
Ok(AdminRPC::Ok)
}
BucketOperation::Delete(query) => {
let bucket = match self
.garage
.bucket_table
.get(&EmptyKey, &query.name)
.await?
.filter(|b| !b.deleted)
{
None => {
return Err(Error::Message(format!(
"Bucket {} does not exist",
query.name
)));
}
Some(b) => b,
};
let objects = self
.garage
.object_table
.get_range(&query.name, None, Some(()), 10)
.await?;
if !objects.is_empty() {
return Err(Error::Message(format!(
"Bucket {} is not empty",
query.name
)));
}
if !query.yes {
return Err(Error::Message(format!(
"Add --yes flag to really perform this operation"
)));
}
self.garage
.bucket_table
.insert(&Bucket {
name: query.name,
timestamp: std::cmp::max(bucket.timestamp + 1, now_msec()),
deleted: true,
authorized_keys: vec![],
})
.await?;
Ok(AdminRPC::Ok)
}
_ => {
// TODO
Err(Error::Message(format!("Not implemented")))
}
}
}
}

View File

@ -12,7 +12,7 @@ use hyper::{Body, Method, Request, Response, Server, StatusCode};
use crate::data::*;
use crate::error::Error;
use crate::http_util::*;
use crate::table::EmptySortKey;
use crate::table::EmptyKey;
use crate::block::INLINE_THRESHOLD;
use crate::block_ref_table::*;
@ -307,7 +307,7 @@ async fn handle_get(
}
ObjectVersionData::FirstBlock(first_block_hash) => {
let read_first_block = garage.block_manager.rpc_get_block(&first_block_hash);
let get_next_blocks = garage.version_table.get(&last_v.uuid, &EmptySortKey);
let get_next_blocks = garage.version_table.get(&last_v.uuid, &EmptyKey);
let (first_block, version) = futures::try_join!(read_first_block, get_next_blocks)?;
let version = match version {

View File

@ -98,7 +98,7 @@ impl BlockManager {
Message::NeedBlockQuery(h) => {
self2.need_block(&h).await.map(Message::NeedBlockReply)
}
_ => Err(Error::Message(format!("Invalid RPC"))),
_ => Err(Error::BadRequest(format!("Unexpected RPC message"))),
}
}
});
@ -262,7 +262,7 @@ impl BlockManager {
let garage = self.garage.load_full().unwrap();
let active_refs = garage
.block_ref_table
.get_range(&hash, &[0u8; 32].into(), Some(()), 1)
.get_range(&hash, None, Some(()), 1)
.await?;
let needed_by_others = !active_refs.is_empty();
if needed_by_others {

79
src/bucket_table.rs Normal file
View File

@ -0,0 +1,79 @@
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use crate::table::*;
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct Bucket {
// Primary key
pub name: String,
// Timestamp and deletion
// Upon version increment, all info is replaced
pub timestamp: u64,
pub deleted: bool,
// Authorized keys
pub authorized_keys: Vec<AllowedKey>,
}
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct AllowedKey {
pub access_key_id: String,
pub timestamp: u64,
pub allowed_read: bool,
pub allowed_write: bool,
}
impl Entry<EmptyKey, String> for Bucket {
fn partition_key(&self) -> &EmptyKey {
&EmptyKey
}
fn sort_key(&self) -> &String {
&self.name
}
fn merge(&mut self, other: &Self) {
if other.timestamp < self.timestamp {
*self = other.clone();
return;
}
if self.timestamp > other.timestamp {
return;
}
for ak in other.authorized_keys.iter() {
match self
.authorized_keys
.binary_search_by(|our_ak| our_ak.access_key_id.cmp(&ak.access_key_id))
{
Ok(i) => {
let our_ak = &mut self.authorized_keys[i];
if ak.timestamp > our_ak.timestamp {
our_ak.timestamp = ak.timestamp;
our_ak.allowed_read = ak.allowed_read;
our_ak.allowed_write = ak.allowed_write;
}
}
Err(i) => {
self.authorized_keys.insert(i, ak.clone());
}
}
}
}
}
pub struct BucketTable;
#[async_trait]
impl TableSchema for BucketTable {
type P = EmptyKey;
type S = String;
type E = Bucket;
type Filter = ();
async fn updated(&self, _old: Option<Self::E>, _new: Option<Self::E>) {}
fn matches_filter(entry: &Self::E, _filter: &Self::Filter) -> bool {
!entry.deleted
}
}

View File

@ -43,8 +43,8 @@ pub enum Error {
#[error(display = "Tokio join error: {}", _0)]
TokioJoin(#[error(source)] tokio::task::JoinError),
#[error(display = "RPC error: {}", _0)]
RPCError(String),
#[error(display = "RPC error: {} (status code {})", _0, _1)]
RPCError(String, StatusCode),
#[error(display = "Bad request: {}", _0)]
BadRequest(String),

View File

@ -10,9 +10,11 @@ mod table_sync;
mod block;
mod block_ref_table;
mod bucket_table;
mod object_table;
mod version_table;
mod admin_rpc;
mod api_server;
mod http_util;
mod rpc_client;
@ -20,6 +22,7 @@ mod rpc_server;
mod server;
mod tls_util;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::net::SocketAddr;
use std::path::PathBuf;
@ -32,6 +35,8 @@ use membership::*;
use rpc_client::*;
use server::TlsConfig;
use admin_rpc::*;
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
#[derive(StructOpt, Debug)]
@ -62,13 +67,13 @@ pub enum Command {
#[structopt(name = "status")]
Status,
/// Configure Garage node
#[structopt(name = "configure")]
Configure(ConfigureOpt),
/// Garage node operations
#[structopt(name = "node")]
Node(NodeOperation),
/// Remove Garage node from cluster
#[structopt(name = "remove")]
Remove(RemoveOpt),
/// Bucket operations
#[structopt(name = "bucket")]
Bucket(BucketOperation),
}
#[derive(StructOpt, Debug)]
@ -79,7 +84,18 @@ pub struct ServerOpt {
}
#[derive(StructOpt, Debug)]
pub struct ConfigureOpt {
pub enum NodeOperation {
/// Configure Garage node
#[structopt(name = "configure")]
Configure(ConfigureNodeOpt),
/// Remove Garage node from cluster
#[structopt(name = "remove")]
Remove(RemoveNodeOpt),
}
#[derive(StructOpt, Debug)]
pub struct ConfigureNodeOpt {
/// Node to configure (prefix of hexadecimal node id)
node_id: String,
@ -91,7 +107,7 @@ pub struct ConfigureOpt {
}
#[derive(StructOpt, Debug)]
pub struct RemoveOpt {
pub struct RemoveNodeOpt {
/// Node to configure (prefix of hexadecimal node id)
node_id: String,
@ -100,6 +116,67 @@ pub struct RemoveOpt {
yes: bool,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
pub enum BucketOperation {
/// List buckets
#[structopt(name = "list")]
List,
/// Get bucket info
#[structopt(name = "info")]
Info(BucketOpt),
/// Create bucket
#[structopt(name = "create")]
Create(BucketOpt),
/// Delete bucket
#[structopt(name = "delete")]
Delete(DeleteBucketOpt),
/// Allow key to read or write to bucket
#[structopt(name = "allow")]
Allow(PermBucketOpt),
/// Allow key to read or write to bucket
#[structopt(name = "deny")]
Deny(PermBucketOpt),
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
pub struct BucketOpt {
/// Bucket name
pub name: String,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
pub struct DeleteBucketOpt {
/// Bucket name
pub name: String,
/// If this flag is not given, the bucket won't be deleted
#[structopt(long = "yes")]
pub yes: bool,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
pub struct PermBucketOpt {
/// Access key ID
#[structopt(long = "key")]
pub key: String,
/// Allow/deny read operations
#[structopt(long = "read")]
pub read: bool,
/// Allow/deny write operations
#[structopt(long = "write")]
pub write: bool,
/// Bucket name
pub bucket: String,
}
#[tokio::main]
async fn main() {
let opt = Opt::from_args();
@ -119,7 +196,9 @@ async fn main() {
let rpc_http_cli =
Arc::new(RpcHttpClient::new(&tls_config).expect("Could not create RPC client"));
let rpc_cli = RpcAddrClient::new(rpc_http_cli, "_membership".into());
let membership_rpc_cli =
RpcAddrClient::new(rpc_http_cli.clone(), MEMBERSHIP_RPC_PATH.to_string());
let admin_rpc_cli = RpcAddrClient::new(rpc_http_cli.clone(), ADMIN_RPC_PATH.to_string());
let resp = match opt.cmd {
Command::Server(server_opt) => {
@ -131,11 +210,16 @@ async fn main() {
server::run_server(server_opt.config_file).await
}
Command::Status => cmd_status(rpc_cli, opt.rpc_host).await,
Command::Configure(configure_opt) => {
cmd_configure(rpc_cli, opt.rpc_host, configure_opt).await
Command::Status => cmd_status(membership_rpc_cli, opt.rpc_host).await,
Command::Node(NodeOperation::Configure(configure_opt)) => {
cmd_configure(membership_rpc_cli, opt.rpc_host, configure_opt).await
}
Command::Node(NodeOperation::Remove(remove_opt)) => {
cmd_remove(membership_rpc_cli, opt.rpc_host, remove_opt).await
}
Command::Bucket(bo) => {
cmd_admin(admin_rpc_cli, opt.rpc_host, AdminRPC::BucketOperation(bo)).await
}
Command::Remove(remove_opt) => cmd_remove(rpc_cli, opt.rpc_host, remove_opt).await,
};
if let Err(e) = resp {
@ -201,7 +285,7 @@ async fn cmd_status(rpc_cli: RpcAddrClient<Message>, rpc_host: SocketAddr) -> Re
async fn cmd_configure(
rpc_cli: RpcAddrClient<Message>,
rpc_host: SocketAddr,
args: ConfigureOpt,
args: ConfigureNodeOpt,
) -> Result<(), Error> {
let status = match rpc_cli
.call(&rpc_host, &Message::PullStatus, DEFAULT_TIMEOUT)
@ -254,7 +338,7 @@ async fn cmd_configure(
async fn cmd_remove(
rpc_cli: RpcAddrClient<Message>,
rpc_host: SocketAddr,
args: RemoveOpt,
args: RemoveNodeOpt,
) -> Result<(), Error> {
let mut config = match rpc_cli
.call(&rpc_host, &Message::PullConfig, DEFAULT_TIMEOUT)
@ -296,3 +380,28 @@ async fn cmd_remove(
.await?;
Ok(())
}
async fn cmd_admin(
rpc_cli: RpcAddrClient<AdminRPC>,
rpc_host: SocketAddr,
args: AdminRPC,
) -> Result<(), Error> {
match rpc_cli.call(&rpc_host, args, DEFAULT_TIMEOUT).await? {
AdminRPC::Ok => {
println!("Ok.");
}
AdminRPC::BucketList(bl) => {
println!("List of buckets:");
for bucket in bl {
println!("{}", bucket);
}
}
AdminRPC::BucketInfo(bucket) => {
println!("{:?}", bucket);
}
r => {
eprintln!("Unexpected response: {:?}", r);
}
}
Ok(())
}

View File

@ -27,6 +27,8 @@ const PING_INTERVAL: Duration = Duration::from_secs(10);
const PING_TIMEOUT: Duration = Duration::from_secs(2);
const MAX_FAILED_PINGS: usize = 3;
pub const MEMBERSHIP_RPC_PATH: &str = "_membership";
#[derive(Debug, Serialize, Deserialize)]
pub enum Message {
Ok,
@ -277,9 +279,9 @@ impl System {
let rpc_http_client =
Arc::new(RpcHttpClient::new(&config.rpc_tls).expect("Could not create RPC client"));
let rpc_path = "_membership";
let rpc_path = MEMBERSHIP_RPC_PATH.to_string();
let rpc_client = RpcClient::new(
RpcAddrClient::<Message>::new(rpc_http_client.clone(), rpc_path.into()),
RpcAddrClient::<Message>::new(rpc_http_client.clone(), rpc_path.clone()),
background.clone(),
status.clone(),
);
@ -294,7 +296,7 @@ impl System {
update_lock: Mutex::new((update_status, update_ring)),
background,
});
sys.clone().register_handler(rpc_server, rpc_path.into());
sys.clone().register_handler(rpc_server, rpc_path);
sys
}
@ -310,7 +312,7 @@ impl System {
Message::AdvertiseNodesUp(adv) => self2.handle_advertise_nodes_up(&adv).await,
Message::AdvertiseConfig(adv) => self2.handle_advertise_config(&adv).await,
_ => Err(Error::Message(format!("Unexpected RPC message"))),
_ => Err(Error::BadRequest(format!("Unexpected RPC message"))),
}
}
});

View File

@ -9,7 +9,7 @@ use futures::stream::futures_unordered::FuturesUnordered;
use futures::stream::StreamExt;
use futures_util::future::FutureExt;
use hyper::client::{Client, HttpConnector};
use hyper::{Body, Method, Request, StatusCode};
use hyper::{Body, Method, Request};
use tokio::sync::watch;
use crate::background::BackgroundRunner;
@ -228,12 +228,14 @@ impl RpcHttpClient {
e
})?;
if resp.status() == StatusCode::OK {
let body = hyper::body::to_bytes(resp.into_body()).await?;
let msg = rmp_serde::decode::from_read::<_, Result<M, String>>(body.into_buf())?;
msg.map_err(Error::RPCError)
} else {
Err(Error::RPCError(format!("Status code {}", resp.status())))
let status = resp.status();
let body = hyper::body::to_bytes(resp.into_body()).await?;
match rmp_serde::decode::from_read::<_, Result<M, String>>(body.into_buf()) {
Err(e) =>
Err(Error::RPCError(format!("Invalid reply"), status)),
Ok(Err(e)) =>
Err(Error::RPCError(e, status)),
Ok(Ok(x)) => Ok(x),
}
}
}

View File

@ -106,7 +106,8 @@ impl RpcServer {
let resp_waiter = tokio::spawn(handler(req, addr));
match resp_waiter.await {
Err(_err) => {
Err(err) => {
eprintln!("Handler await error: {}", err);
let mut ise = Response::default();
*ise.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
Ok(ise)

View File

@ -13,13 +13,16 @@ use crate::error::Error;
use crate::membership::System;
use crate::rpc_server::RpcServer;
use crate::table::*;
use crate::table_fullcopy::*;
use crate::table_sharded::*;
use crate::block::*;
use crate::block_ref_table::*;
use crate::bucket_table::*;
use crate::object_table::*;
use crate::version_table::*;
use crate::admin_rpc::*;
use crate::api_server;
#[derive(Deserialize, Debug, Clone)]
@ -38,12 +41,25 @@ pub struct Config {
#[serde(default = "default_replication_factor")]
pub meta_replication_factor: usize,
#[serde(default = "default_epidemic_factor")]
pub meta_epidemic_factor: usize,
#[serde(default = "default_replication_factor")]
pub data_replication_factor: usize,
pub rpc_tls: Option<TlsConfig>,
}
fn default_block_size() -> usize {
1048576
}
fn default_replication_factor() -> usize {
3
}
fn default_epidemic_factor() -> usize {
3
}
#[derive(Deserialize, Debug, Clone)]
pub struct TlsConfig {
pub ca_cert: String,
@ -57,6 +73,7 @@ pub struct Garage {
pub system: Arc<System>,
pub block_manager: Arc<BlockManager>,
pub bucket_table: Arc<Table<BucketTable, TableFullReplication>>,
pub object_table: Arc<Table<ObjectTable, TableShardedReplication>>,
pub version_table: Arc<Table<VersionTable, TableShardedReplication>>,
pub block_ref_table: Arc<Table<BlockRefTable, TableShardedReplication>>,
@ -89,6 +106,11 @@ impl Garage {
read_quorum: (system.config.meta_replication_factor + 1) / 2,
};
let control_rep_param = TableFullReplication::new(
system.config.meta_epidemic_factor,
(system.config.meta_epidemic_factor + 1) / 2,
);
println!("Initialize block_ref_table...");
let block_ref_table = Table::new(
BlockRefTable {
@ -131,17 +153,32 @@ impl Garage {
)
.await;
println!("Initialize bucket_table...");
let bucket_table = Table::new(
BucketTable,
control_rep_param.clone(),
system.clone(),
&db,
"bucket".to_string(),
rpc_server,
)
.await;
println!("Initialize Garage...");
let garage = Arc::new(Self {
db,
system: system.clone(),
block_manager,
background,
bucket_table,
object_table,
version_table,
block_ref_table,
});
println!("Crate admin RPC handler...");
AdminRpcHandler::new(garage.clone()).register_handler(rpc_server);
println!("Start block manager background thread...");
garage.block_manager.garage.swap(Some(garage.clone()));
garage.block_manager.clone().spawn_background_worker().await;
@ -150,13 +187,6 @@ impl Garage {
}
}
fn default_block_size() -> usize {
1048576
}
fn default_replication_factor() -> usize {
3
}
fn read_config(config_file: PathBuf) -> Result<Config, Error> {
let mut file = std::fs::OpenOptions::new()
.read(true)

View File

@ -36,7 +36,8 @@ pub enum TableRPC<F: TableSchema> {
ReadEntry(F::P, F::S),
ReadEntryResponse(Option<ByteBuf>),
ReadRange(F::P, F::S, Option<F::Filter>, usize),
// Read range: read all keys in partition P, possibly starting at a certain sort key offset
ReadRange(F::P, Option<F::S>, Option<F::Filter>, usize),
Update(Vec<Arc<ByteBuf>>),
@ -62,13 +63,18 @@ pub trait Entry<P: PartitionKey, S: SortKey>:
fn merge(&mut self, other: &Self);
}
#[derive(Clone, Serialize, Deserialize)]
pub struct EmptySortKey;
impl SortKey for EmptySortKey {
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct EmptyKey;
impl SortKey for EmptyKey {
fn sort_key(&self) -> &[u8] {
&[]
}
}
impl PartitionKey for EmptyKey {
fn hash(&self) -> Hash {
[0u8; 32].into()
}
}
impl<T: AsRef<str>> PartitionKey for T {
fn hash(&self) -> Hash {
@ -272,15 +278,15 @@ where
pub async fn get_range(
self: &Arc<Self>,
partition_key: &F::P,
begin_sort_key: &F::S,
begin_sort_key: Option<F::S>,
filter: Option<F::Filter>,
limit: usize,
) -> Result<Vec<F::E>, Error> {
let hash = partition_key.hash();
let who = self.replication.read_nodes(&hash, &self.system);
let rpc =
TableRPC::<F>::ReadRange(partition_key.clone(), begin_sort_key.clone(), filter, limit);
let rpc = TableRPC::<F>::ReadRange(partition_key.clone(), begin_sort_key, filter, limit);
let resps = self
.rpc_client
.try_call_many(
@ -378,7 +384,7 @@ where
.await?;
Ok(TableRPC::SyncRPC(response))
}
_ => Err(Error::RPCError(format!("Unexpected table RPC"))),
_ => Err(Error::BadRequest(format!("Unexpected table RPC"))),
}
}
@ -394,12 +400,15 @@ where
fn handle_read_range(
&self,
p: &F::P,
s: &F::S,
s: &Option<F::S>,
filter: &Option<F::Filter>,
limit: usize,
) -> Result<Vec<Arc<ByteBuf>>, Error> {
let partition_hash = p.hash();
let first_key = self.tree_key(p, s);
let first_key = match s {
None => partition_hash.to_vec(),
Some(sk) => self.tree_key(p, sk),
};
let mut ret = vec![];
for item in self.store.range(first_key..) {
let (key, value) = item?;

View File

@ -438,7 +438,7 @@ where
.spawn(self.clone().send_items(who.clone(), items_to_send));
}
} else {
return Err(Error::Message(format!(
return Err(Error::BadRequest(format!(
"Unexpected response to sync RPC checksums: {}",
debug_serialize(&rpc_resp)
)));

View File

@ -30,12 +30,12 @@ pub struct VersionBlock {
pub hash: Hash,
}
impl Entry<Hash, EmptySortKey> for Version {
impl Entry<Hash, EmptyKey> for Version {
fn partition_key(&self) -> &Hash {
&self.uuid
}
fn sort_key(&self) -> &EmptySortKey {
&EmptySortKey
fn sort_key(&self) -> &EmptyKey {
&EmptyKey
}
fn merge(&mut self, other: &Self) {
@ -63,7 +63,7 @@ pub struct VersionTable {
#[async_trait]
impl TableSchema for VersionTable {
type P = Hash;
type S = EmptySortKey;
type S = EmptyKey;
type E = Version;
type Filter = ();