diff --git a/src/admin_rpc.rs b/src/admin_rpc.rs new file mode 100644 index 00000000..8a274c26 --- /dev/null +++ b/src/admin_rpc.rs @@ -0,0 +1,146 @@ +use std::sync::Arc; + +use serde::{Deserialize, Serialize}; + +use crate::data::*; +use crate::error::Error; +use crate::rpc_server::*; +use crate::server::Garage; +use crate::table::*; +use crate::*; + +use crate::bucket_table::*; + +pub const ADMIN_RPC_PATH: &str = "_admin"; + +#[derive(Debug, Serialize, Deserialize)] +pub enum AdminRPC { + BucketOperation(BucketOperation), + + // Replies + Ok, + BucketList(Vec), + BucketInfo(Bucket), +} + +impl RpcMessage for AdminRPC {} + +pub struct AdminRpcHandler { + garage: Arc, +} + +impl AdminRpcHandler { + pub fn new(garage: Arc) -> Arc { + Arc::new(Self { garage }) + } + + pub fn register_handler(self: Arc, rpc_server: &mut RpcServer) { + rpc_server.add_handler::(ADMIN_RPC_PATH.to_string(), move |msg, _addr| { + let self2 = self.clone(); + async move { + match msg { + AdminRPC::BucketOperation(bo) => self2.handle_bucket_cmd(bo).await, + _ => Err(Error::BadRequest(format!("Unexpected RPC message"))), + } + } + }); + } + + async fn handle_bucket_cmd(&self, cmd: BucketOperation) -> Result { + match cmd { + BucketOperation::List => { + let bucket_names = self + .garage + .bucket_table + .get_range(&EmptyKey, None, Some(()), 10000) + .await? + .iter() + .map(|b| b.name.to_string()) + .collect::>(); + Ok(AdminRPC::BucketList(bucket_names)) + } + BucketOperation::Info(query) => { + let bucket = self + .garage + .bucket_table + .get(&EmptyKey, &query.name) + .await? 
+ .filter(|b| !b.deleted); + match bucket { + Some(b) => Ok(AdminRPC::BucketInfo(b)), + None => Err(Error::Message(format!("Bucket {} not found", query.name))), + } + } + BucketOperation::Create(query) => { + let bucket = self.garage.bucket_table.get(&EmptyKey, &query.name).await?; + if bucket.as_ref().filter(|b| !b.deleted).is_some() { + return Err(Error::Message(format!( + "Bucket {} already exists", + query.name + ))); + } + let new_time = match bucket { + Some(b) => std::cmp::max(b.timestamp + 1, now_msec()), + None => now_msec(), + }; + self.garage + .bucket_table + .insert(&Bucket { + name: query.name, + timestamp: new_time, + deleted: false, + authorized_keys: vec![], + }) + .await?; + Ok(AdminRPC::Ok) + } + BucketOperation::Delete(query) => { + let bucket = match self + .garage + .bucket_table + .get(&EmptyKey, &query.name) + .await? + .filter(|b| !b.deleted) + { + None => { + return Err(Error::Message(format!( + "Bucket {} does not exist", + query.name + ))); + } + Some(b) => b, + }; + let objects = self + .garage + .object_table + .get_range(&query.name, None, Some(()), 10) + .await?; + if !objects.is_empty() { + return Err(Error::Message(format!( + "Bucket {} is not empty", + query.name + ))); + } + if !query.yes { + return Err(Error::Message(format!( + "Add --yes flag to really perform this operation" + ))); + } + self.garage + .bucket_table + .insert(&Bucket { + name: query.name, + timestamp: std::cmp::max(bucket.timestamp + 1, now_msec()), + deleted: true, + authorized_keys: vec![], + }) + .await?; + Ok(AdminRPC::Ok) + } + _ => { + // TODO + Err(Error::Message(format!("Not implemented"))) + } + } + } +} diff --git a/src/api_server.rs b/src/api_server.rs index 4070be6c..52464f07 100644 --- a/src/api_server.rs +++ b/src/api_server.rs @@ -12,7 +12,7 @@ use hyper::{Body, Method, Request, Response, Server, StatusCode}; use crate::data::*; use crate::error::Error; use crate::http_util::*; -use crate::table::EmptySortKey; +use crate::table::EmptyKey; use 
crate::block::INLINE_THRESHOLD; use crate::block_ref_table::*; @@ -307,7 +307,7 @@ async fn handle_get( } ObjectVersionData::FirstBlock(first_block_hash) => { let read_first_block = garage.block_manager.rpc_get_block(&first_block_hash); - let get_next_blocks = garage.version_table.get(&last_v.uuid, &EmptySortKey); + let get_next_blocks = garage.version_table.get(&last_v.uuid, &EmptyKey); let (first_block, version) = futures::try_join!(read_first_block, get_next_blocks)?; let version = match version { diff --git a/src/block.rs b/src/block.rs index d7daea01..c84f193b 100644 --- a/src/block.rs +++ b/src/block.rs @@ -98,7 +98,7 @@ impl BlockManager { Message::NeedBlockQuery(h) => { self2.need_block(&h).await.map(Message::NeedBlockReply) } - _ => Err(Error::Message(format!("Invalid RPC"))), + _ => Err(Error::BadRequest(format!("Unexpected RPC message"))), } } }); @@ -262,7 +262,7 @@ impl BlockManager { let garage = self.garage.load_full().unwrap(); let active_refs = garage .block_ref_table - .get_range(&hash, &[0u8; 32].into(), Some(()), 1) + .get_range(&hash, None, Some(()), 1) .await?; let needed_by_others = !active_refs.is_empty(); if needed_by_others { diff --git a/src/bucket_table.rs b/src/bucket_table.rs new file mode 100644 index 00000000..be7dd348 --- /dev/null +++ b/src/bucket_table.rs @@ -0,0 +1,79 @@ +use async_trait::async_trait; +use serde::{Deserialize, Serialize}; + +use crate::table::*; + +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub struct Bucket { + // Primary key + pub name: String, + + // Timestamp and deletion + // Upon version increment, all info is replaced + pub timestamp: u64, + pub deleted: bool, + + // Authorized keys + pub authorized_keys: Vec, +} + +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub struct AllowedKey { + pub access_key_id: String, + pub timestamp: u64, + pub allowed_read: bool, + pub allowed_write: bool, +} + +impl Entry for Bucket { + fn partition_key(&self) -> &EmptyKey { + &EmptyKey + } + 
fn sort_key(&self) -> &String { + &self.name + } + + fn merge(&mut self, other: &Self) { + if other.timestamp > self.timestamp { // other is a newer version: it replaces all our info + *self = other.clone(); + return; + } + if self.timestamp > other.timestamp { // we hold the newer version: ignore other + return; + } + for ak in other.authorized_keys.iter() { // same version: merge keys, the most recent per-key timestamp wins + match self + .authorized_keys + .binary_search_by(|our_ak| our_ak.access_key_id.cmp(&ak.access_key_id)) + { + Ok(i) => { + let our_ak = &mut self.authorized_keys[i]; + if ak.timestamp > our_ak.timestamp { + our_ak.timestamp = ak.timestamp; + our_ak.allowed_read = ak.allowed_read; + our_ak.allowed_write = ak.allowed_write; + } + } + Err(i) => { + self.authorized_keys.insert(i, ak.clone()); + } + } + } + } +} + +pub struct BucketTable; + +#[async_trait] +impl TableSchema for BucketTable { + type P = EmptyKey; + type S = String; + type E = Bucket; + type Filter = (); + + async fn updated(&self, _old: Option, _new: Option) {} + + fn matches_filter(entry: &Self::E, _filter: &Self::Filter) -> bool { + !entry.deleted + } +} diff --git a/src/error.rs b/src/error.rs index 7c648fab..678ab72d 100644 --- a/src/error.rs +++ b/src/error.rs @@ -43,8 +43,8 @@ pub enum Error { #[error(display = "Tokio join error: {}", _0)] TokioJoin(#[error(source)] tokio::task::JoinError), - #[error(display = "RPC error: {}", _0)] - RPCError(String), + #[error(display = "RPC error: {} (status code {})", _0, _1)] + RPCError(String, StatusCode), #[error(display = "Bad request: {}", _0)] BadRequest(String), diff --git a/src/main.rs b/src/main.rs index 89953223..08f37dd5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,9 +10,11 @@ mod table_sync; mod block; mod block_ref_table; +mod bucket_table; mod object_table; mod version_table; +mod admin_rpc; mod api_server; mod http_util; mod rpc_client; @@ -20,6 +22,7 @@ mod rpc_server; mod server; mod tls_util; +use serde::{Deserialize, Serialize}; use std::collections::HashSet; use std::net::SocketAddr; use std::path::PathBuf; @@ -32,6 +35,8 @@ use membership::*; use rpc_client::*; use 
server::TlsConfig; +use admin_rpc::*; + const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10); #[derive(StructOpt, Debug)] @@ -62,13 +67,13 @@ pub enum Command { #[structopt(name = "status")] Status, - /// Configure Garage node - #[structopt(name = "configure")] - Configure(ConfigureOpt), + /// Garage node operations + #[structopt(name = "node")] + Node(NodeOperation), - /// Remove Garage node from cluster - #[structopt(name = "remove")] - Remove(RemoveOpt), + /// Bucket operations + #[structopt(name = "bucket")] + Bucket(BucketOperation), } #[derive(StructOpt, Debug)] @@ -79,7 +84,18 @@ pub struct ServerOpt { } #[derive(StructOpt, Debug)] -pub struct ConfigureOpt { +pub enum NodeOperation { + /// Configure Garage node + #[structopt(name = "configure")] + Configure(ConfigureNodeOpt), + + /// Remove Garage node from cluster + #[structopt(name = "remove")] + Remove(RemoveNodeOpt), +} + +#[derive(StructOpt, Debug)] +pub struct ConfigureNodeOpt { /// Node to configure (prefix of hexadecimal node id) node_id: String, @@ -91,7 +107,7 @@ pub struct ConfigureOpt { } #[derive(StructOpt, Debug)] -pub struct RemoveOpt { +pub struct RemoveNodeOpt { /// Node to configure (prefix of hexadecimal node id) node_id: String, @@ -100,6 +116,67 @@ pub struct RemoveOpt { yes: bool, } +#[derive(Serialize, Deserialize, StructOpt, Debug)] +pub enum BucketOperation { + /// List buckets + #[structopt(name = "list")] + List, + + /// Get bucket info + #[structopt(name = "info")] + Info(BucketOpt), + + /// Create bucket + #[structopt(name = "create")] + Create(BucketOpt), + + /// Delete bucket + #[structopt(name = "delete")] + Delete(DeleteBucketOpt), + + /// Allow key to read or write to bucket + #[structopt(name = "allow")] + Allow(PermBucketOpt), + + /// Deny key to read or write to bucket + #[structopt(name = "deny")] + Deny(PermBucketOpt), +} + +#[derive(Serialize, Deserialize, StructOpt, Debug)] +pub struct BucketOpt { + /// Bucket name + pub name: String, +} + +#[derive(Serialize, 
Deserialize, StructOpt, Debug)] +pub struct DeleteBucketOpt { + /// Bucket name + pub name: String, + + /// If this flag is not given, the bucket won't be deleted + #[structopt(long = "yes")] + pub yes: bool, +} + +#[derive(Serialize, Deserialize, StructOpt, Debug)] +pub struct PermBucketOpt { + /// Access key ID + #[structopt(long = "key")] + pub key: String, + + /// Allow/deny read operations + #[structopt(long = "read")] + pub read: bool, + + /// Allow/deny write operations + #[structopt(long = "write")] + pub write: bool, + + /// Bucket name + pub bucket: String, +} + #[tokio::main] async fn main() { let opt = Opt::from_args(); @@ -119,7 +196,9 @@ async fn main() { let rpc_http_cli = Arc::new(RpcHttpClient::new(&tls_config).expect("Could not create RPC client")); - let rpc_cli = RpcAddrClient::new(rpc_http_cli, "_membership".into()); + let membership_rpc_cli = + RpcAddrClient::new(rpc_http_cli.clone(), MEMBERSHIP_RPC_PATH.to_string()); + let admin_rpc_cli = RpcAddrClient::new(rpc_http_cli.clone(), ADMIN_RPC_PATH.to_string()); let resp = match opt.cmd { Command::Server(server_opt) => { @@ -131,11 +210,16 @@ async fn main() { server::run_server(server_opt.config_file).await } - Command::Status => cmd_status(rpc_cli, opt.rpc_host).await, - Command::Configure(configure_opt) => { - cmd_configure(rpc_cli, opt.rpc_host, configure_opt).await + Command::Status => cmd_status(membership_rpc_cli, opt.rpc_host).await, + Command::Node(NodeOperation::Configure(configure_opt)) => { + cmd_configure(membership_rpc_cli, opt.rpc_host, configure_opt).await + } + Command::Node(NodeOperation::Remove(remove_opt)) => { + cmd_remove(membership_rpc_cli, opt.rpc_host, remove_opt).await + } + Command::Bucket(bo) => { + cmd_admin(admin_rpc_cli, opt.rpc_host, AdminRPC::BucketOperation(bo)).await } - Command::Remove(remove_opt) => cmd_remove(rpc_cli, opt.rpc_host, remove_opt).await, }; if let Err(e) = resp { @@ -201,7 +285,7 @@ async fn cmd_status(rpc_cli: RpcAddrClient, rpc_host: SocketAddr) 
-> Re async fn cmd_configure( rpc_cli: RpcAddrClient, rpc_host: SocketAddr, - args: ConfigureOpt, + args: ConfigureNodeOpt, ) -> Result<(), Error> { let status = match rpc_cli .call(&rpc_host, &Message::PullStatus, DEFAULT_TIMEOUT) @@ -254,7 +338,7 @@ async fn cmd_configure( async fn cmd_remove( rpc_cli: RpcAddrClient, rpc_host: SocketAddr, - args: RemoveOpt, + args: RemoveNodeOpt, ) -> Result<(), Error> { let mut config = match rpc_cli .call(&rpc_host, &Message::PullConfig, DEFAULT_TIMEOUT) @@ -296,3 +380,28 @@ async fn cmd_remove( .await?; Ok(()) } + +async fn cmd_admin( + rpc_cli: RpcAddrClient, + rpc_host: SocketAddr, + args: AdminRPC, +) -> Result<(), Error> { + match rpc_cli.call(&rpc_host, args, DEFAULT_TIMEOUT).await? { + AdminRPC::Ok => { + println!("Ok."); + } + AdminRPC::BucketList(bl) => { + println!("List of buckets:"); + for bucket in bl { + println!("{}", bucket); + } + } + AdminRPC::BucketInfo(bucket) => { + println!("{:?}", bucket); + } + r => { + eprintln!("Unexpected response: {:?}", r); + } + } + Ok(()) +} diff --git a/src/membership.rs b/src/membership.rs index 08dd5f2f..99b0388d 100644 --- a/src/membership.rs +++ b/src/membership.rs @@ -27,6 +27,8 @@ const PING_INTERVAL: Duration = Duration::from_secs(10); const PING_TIMEOUT: Duration = Duration::from_secs(2); const MAX_FAILED_PINGS: usize = 3; +pub const MEMBERSHIP_RPC_PATH: &str = "_membership"; + #[derive(Debug, Serialize, Deserialize)] pub enum Message { Ok, @@ -277,9 +279,9 @@ impl System { let rpc_http_client = Arc::new(RpcHttpClient::new(&config.rpc_tls).expect("Could not create RPC client")); - let rpc_path = "_membership"; + let rpc_path = MEMBERSHIP_RPC_PATH.to_string(); let rpc_client = RpcClient::new( - RpcAddrClient::::new(rpc_http_client.clone(), rpc_path.into()), + RpcAddrClient::::new(rpc_http_client.clone(), rpc_path.clone()), background.clone(), status.clone(), ); @@ -294,7 +296,7 @@ impl System { update_lock: Mutex::new((update_status, update_ring)), background, }); - 
sys.clone().register_handler(rpc_server, rpc_path.into()); + sys.clone().register_handler(rpc_server, rpc_path); sys } @@ -310,7 +312,7 @@ impl System { Message::AdvertiseNodesUp(adv) => self2.handle_advertise_nodes_up(&adv).await, Message::AdvertiseConfig(adv) => self2.handle_advertise_config(&adv).await, - _ => Err(Error::Message(format!("Unexpected RPC message"))), + _ => Err(Error::BadRequest(format!("Unexpected RPC message"))), } } }); diff --git a/src/rpc_client.rs b/src/rpc_client.rs index 95288269..35debb53 100644 --- a/src/rpc_client.rs +++ b/src/rpc_client.rs @@ -9,7 +9,7 @@ use futures::stream::futures_unordered::FuturesUnordered; use futures::stream::StreamExt; use futures_util::future::FutureExt; use hyper::client::{Client, HttpConnector}; -use hyper::{Body, Method, Request, StatusCode}; +use hyper::{Body, Method, Request}; use tokio::sync::watch; use crate::background::BackgroundRunner; @@ -228,12 +228,14 @@ impl RpcHttpClient { e })?; - if resp.status() == StatusCode::OK { - let body = hyper::body::to_bytes(resp.into_body()).await?; - let msg = rmp_serde::decode::from_read::<_, Result>(body.into_buf())?; - msg.map_err(Error::RPCError) - } else { - Err(Error::RPCError(format!("Status code {}", resp.status()))) + let status = resp.status(); + let body = hyper::body::to_bytes(resp.into_body()).await?; + match rmp_serde::decode::from_read::<_, Result>(body.into_buf()) { + Err(e) => + Err(Error::RPCError(format!("Invalid reply: {}", e), status)), + Ok(Err(e)) => + Err(Error::RPCError(e, status)), + Ok(Ok(x)) => Ok(x), } } } diff --git a/src/rpc_server.rs b/src/rpc_server.rs index 83f8ddc9..7a6a57ee 100644 --- a/src/rpc_server.rs +++ b/src/rpc_server.rs @@ -106,7 +106,8 @@ impl RpcServer { let resp_waiter = tokio::spawn(handler(req, addr)); match resp_waiter.await { - Err(_err) => { + Err(err) => { + eprintln!("Handler await error: {}", err); let mut ise = Response::default(); *ise.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; Ok(ise) diff --git 
a/src/server.rs b/src/server.rs index 6b4b5b6b..979d76f9 100644 --- a/src/server.rs +++ b/src/server.rs @@ -13,13 +13,16 @@ use crate::error::Error; use crate::membership::System; use crate::rpc_server::RpcServer; use crate::table::*; +use crate::table_fullcopy::*; use crate::table_sharded::*; use crate::block::*; use crate::block_ref_table::*; +use crate::bucket_table::*; use crate::object_table::*; use crate::version_table::*; +use crate::admin_rpc::*; use crate::api_server; #[derive(Deserialize, Debug, Clone)] @@ -38,12 +41,25 @@ pub struct Config { #[serde(default = "default_replication_factor")] pub meta_replication_factor: usize, + #[serde(default = "default_epidemic_factor")] + pub meta_epidemic_factor: usize, + #[serde(default = "default_replication_factor")] pub data_replication_factor: usize, pub rpc_tls: Option, } +fn default_block_size() -> usize { + 1048576 +} +fn default_replication_factor() -> usize { + 3 +} +fn default_epidemic_factor() -> usize { + 3 +} + #[derive(Deserialize, Debug, Clone)] pub struct TlsConfig { pub ca_cert: String, @@ -57,6 +73,7 @@ pub struct Garage { pub system: Arc, pub block_manager: Arc, + pub bucket_table: Arc>, pub object_table: Arc>, pub version_table: Arc>, pub block_ref_table: Arc>, @@ -89,6 +106,11 @@ impl Garage { read_quorum: (system.config.meta_replication_factor + 1) / 2, }; + let control_rep_param = TableFullReplication::new( + system.config.meta_epidemic_factor, + (system.config.meta_epidemic_factor + 1) / 2, + ); + println!("Initialize block_ref_table..."); let block_ref_table = Table::new( BlockRefTable { @@ -131,17 +153,32 @@ impl Garage { ) .await; + println!("Initialize bucket_table..."); + let bucket_table = Table::new( + BucketTable, + control_rep_param.clone(), + system.clone(), + &db, + "bucket".to_string(), + rpc_server, + ) + .await; + println!("Initialize Garage..."); let garage = Arc::new(Self { db, system: system.clone(), block_manager, background, + bucket_table, object_table, version_table, 
block_ref_table, }); + println!("Create admin RPC handler..."); + AdminRpcHandler::new(garage.clone()).register_handler(rpc_server); + println!("Start block manager background thread..."); garage.block_manager.garage.swap(Some(garage.clone())); garage.block_manager.clone().spawn_background_worker().await; @@ -150,13 +187,6 @@ impl Garage { } } -fn default_block_size() -> usize { - 1048576 -} -fn default_replication_factor() -> usize { - 3 -} - fn read_config(config_file: PathBuf) -> Result { let mut file = std::fs::OpenOptions::new() .read(true) diff --git a/src/table.rs b/src/table.rs index 619c96d2..37fb2f51 100644 --- a/src/table.rs +++ b/src/table.rs @@ -36,7 +36,8 @@ pub enum TableRPC { ReadEntry(F::P, F::S), ReadEntryResponse(Option), - ReadRange(F::P, F::S, Option, usize), + // Read range: read all keys in partition P, possibly starting at a certain sort key offset + ReadRange(F::P, Option, Option, usize), Update(Vec>), @@ -62,13 +63,18 @@ pub trait Entry: fn merge(&mut self, other: &Self); } -#[derive(Clone, Serialize, Deserialize)] -pub struct EmptySortKey; -impl SortKey for EmptySortKey { +#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct EmptyKey; +impl SortKey for EmptyKey { fn sort_key(&self) -> &[u8] { &[] } } +impl PartitionKey for EmptyKey { + fn hash(&self) -> Hash { + [0u8; 32].into() + } +} impl> PartitionKey for T { fn hash(&self) -> Hash { @@ -272,15 +278,15 @@ where pub async fn get_range( self: &Arc, partition_key: &F::P, - begin_sort_key: &F::S, + begin_sort_key: Option, filter: Option, limit: usize, ) -> Result, Error> { let hash = partition_key.hash(); let who = self.replication.read_nodes(&hash, &self.system); - let rpc = - TableRPC::::ReadRange(partition_key.clone(), begin_sort_key.clone(), filter, limit); + let rpc = TableRPC::::ReadRange(partition_key.clone(), begin_sort_key, filter, limit); + let resps = self .rpc_client .try_call_many( @@ -378,7 +384,7 @@ where .await?; Ok(TableRPC::SyncRPC(response)) } - _ => 
Err(Error::RPCError(format!("Unexpected table RPC"))), + _ => Err(Error::BadRequest(format!("Unexpected table RPC"))), } } @@ -394,12 +400,15 @@ where fn handle_read_range( &self, p: &F::P, - s: &F::S, + s: &Option, filter: &Option, limit: usize, ) -> Result>, Error> { let partition_hash = p.hash(); - let first_key = self.tree_key(p, s); + let first_key = match s { + None => partition_hash.to_vec(), + Some(sk) => self.tree_key(p, sk), + }; let mut ret = vec![]; for item in self.store.range(first_key..) { let (key, value) = item?; diff --git a/src/table_sync.rs b/src/table_sync.rs index 550ad0f0..0f3e90d2 100644 --- a/src/table_sync.rs +++ b/src/table_sync.rs @@ -438,7 +438,7 @@ where .spawn(self.clone().send_items(who.clone(), items_to_send)); } } else { - return Err(Error::Message(format!( + return Err(Error::BadRequest(format!( "Unexpected response to sync RPC checksums: {}", debug_serialize(&rpc_resp) ))); diff --git a/src/version_table.rs b/src/version_table.rs index 24109981..dfd27812 100644 --- a/src/version_table.rs +++ b/src/version_table.rs @@ -30,12 +30,12 @@ pub struct VersionBlock { pub hash: Hash, } -impl Entry for Version { +impl Entry for Version { fn partition_key(&self) -> &Hash { &self.uuid } - fn sort_key(&self) -> &EmptySortKey { - &EmptySortKey + fn sort_key(&self) -> &EmptyKey { + &EmptyKey } fn merge(&mut self, other: &Self) { @@ -63,7 +63,7 @@ pub struct VersionTable { #[async_trait] impl TableSchema for VersionTable { type P = Hash; - type S = EmptySortKey; + type S = EmptyKey; type E = Version; type Filter = ();