WIP add content defined chunking #42

Closed
trinity-1686a wants to merge 42 commits from content-defined-chunking into master
3 changed files with 43 additions and 33 deletions
Showing only changes of commit f4aad8fe6e - Show all commits

View file

@ -1,14 +1,14 @@
use std::sync::Arc;
use std::fmt::Write;
use std::collections::HashMap; use std::collections::HashMap;
use std::fmt::Write;
use std::sync::Arc;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use garage_util::error::Error; use garage_util::error::Error;
use garage_table::crdt::CRDT; use garage_table::crdt::CRDT;
use garage_table::*;
use garage_table::replication::*; use garage_table::replication::*;
use garage_table::*;
use garage_rpc::rpc_client::*; use garage_rpc::rpc_client::*;
use garage_rpc::rpc_server::*; use garage_rpc::rpc_server::*;
@ -17,8 +17,8 @@ use garage_model::bucket_table::*;
use garage_model::garage::Garage; use garage_model::garage::Garage;
use garage_model::key_table::*; use garage_model::key_table::*;
use crate::repair::Repair;
use crate::cli::*; use crate::cli::*;
use crate::repair::Repair;
use crate::*; use crate::*;
pub const ADMIN_RPC_TIMEOUT: Duration = Duration::from_secs(30); pub const ADMIN_RPC_TIMEOUT: Duration = Duration::from_secs(30);
@ -366,7 +366,6 @@ impl AdminRpcHandler {
async fn handle_stats(&self, opt: StatsOpt) -> Result<AdminRPC, Error> { async fn handle_stats(&self, opt: StatsOpt) -> Result<AdminRPC, Error> {
if opt.all_nodes { if opt.all_nodes {
let mut ret = String::new(); let mut ret = String::new();
let ring = self.garage.system.ring.borrow().clone(); let ring = self.garage.system.ring.borrow().clone();
@ -378,11 +377,7 @@ impl AdminRpcHandler {
writeln!(&mut ret, "Stats for node {:?}:", node).unwrap(); writeln!(&mut ret, "Stats for node {:?}:", node).unwrap();
match self match self
.rpc_client .rpc_client
.call( .call(*node, AdminRPC::Stats(opt), ADMIN_RPC_TIMEOUT)
*node,
AdminRPC::Stats(opt),
ADMIN_RPC_TIMEOUT,
)
.await .await
{ {
Ok(AdminRPC::Ok(s)) => writeln!(&mut ret, "{}", s).unwrap(), Ok(AdminRPC::Ok(s)) => writeln!(&mut ret, "{}", s).unwrap(),
@ -398,7 +393,12 @@ impl AdminRpcHandler {
fn gather_stats_local(&self, opt: StatsOpt) -> Result<String, Error> { fn gather_stats_local(&self, opt: StatsOpt) -> Result<String, Error> {
let mut ret = String::new(); let mut ret = String::new();
writeln!(&mut ret, "\nGarage version: {}", git_version::git_version!()).unwrap(); writeln!(
&mut ret,
"\nGarage version: {}",
git_version::git_version!()
)
.unwrap();
// Gather ring statistics // Gather ring statistics
let ring = self.garage.system.ring.borrow().clone(); let ring = self.garage.system.ring.borrow().clone();
@ -423,7 +423,12 @@ impl AdminRpcHandler {
self.gather_table_stats(&mut ret, &self.garage.block_ref_table, &opt)?; self.gather_table_stats(&mut ret, &self.garage.block_ref_table, &opt)?;
writeln!(&mut ret, "\nBlock manager stats:").unwrap(); writeln!(&mut ret, "\nBlock manager stats:").unwrap();
writeln!(&mut ret, " resync queue length: {}", self.garage.block_manager.resync_queue.len()).unwrap(); writeln!(
&mut ret,
" resync queue length: {}",
self.garage.block_manager.resync_queue.len()
)
.unwrap();
if opt.detailed { if opt.detailed {
writeln!(&mut ret, "\nDetailed stats not implemented yet.").unwrap(); writeln!(&mut ret, "\nDetailed stats not implemented yet.").unwrap();
@ -432,10 +437,20 @@ impl AdminRpcHandler {
Ok(ret) Ok(ret)
} }
fn gather_table_stats<F: TableSchema, R: TableReplication>(&self, to: &mut String, t: &Arc<Table<F, R>>, _opt: &StatsOpt) -> Result<(), Error> { fn gather_table_stats<F: TableSchema, R: TableReplication>(
&self,
to: &mut String,
t: &Arc<Table<F, R>>,
_opt: &StatsOpt,
) -> Result<(), Error> {
writeln!(to, "\nTable stats for {}", t.data.name).unwrap(); writeln!(to, "\nTable stats for {}", t.data.name).unwrap();
writeln!(to, " number of items: {}", t.data.store.len()).unwrap(); writeln!(to, " number of items: {}", t.data.store.len()).unwrap();
writeln!(to, " Merkle updater todo queue length: {}", t.data.merkle_updater.todo.len()).unwrap(); writeln!(
to,
" Merkle updater todo queue length: {}",
t.data.merkle_updater.todo.len()
)
.unwrap();
Ok(()) Ok(())
} }
} }

View file

@ -17,7 +17,6 @@ use garage_model::key_table::*;
use crate::admin_rpc::*; use crate::admin_rpc::*;
#[derive(StructOpt, Debug)] #[derive(StructOpt, Debug)]
pub enum Command { pub enum Command {
/// Run Garage server /// Run Garage server
@ -270,12 +269,12 @@ pub struct StatsOpt {
pub detailed: bool, pub detailed: bool,
} }
pub async fn cli_cmd( pub async fn cli_cmd(
cmd: Command, cmd: Command,
membership_rpc_cli: RpcAddrClient<Message>, membership_rpc_cli: RpcAddrClient<Message>,
admin_rpc_cli: RpcAddrClient<AdminRPC>, admin_rpc_cli: RpcAddrClient<AdminRPC>,
rpc_host: SocketAddr) -> Result<(), Error> { rpc_host: SocketAddr,
) -> Result<(), Error> {
match cmd { match cmd {
Command::Status => cmd_status(membership_rpc_cli, rpc_host).await, Command::Status => cmd_status(membership_rpc_cli, rpc_host).await,
Command::Node(NodeOperation::Configure(configure_opt)) => { Command::Node(NodeOperation::Configure(configure_opt)) => {
@ -287,21 +286,17 @@ pub async fn cli_cmd(
Command::Bucket(bo) => { Command::Bucket(bo) => {
cmd_admin(admin_rpc_cli, rpc_host, AdminRPC::BucketOperation(bo)).await cmd_admin(admin_rpc_cli, rpc_host, AdminRPC::BucketOperation(bo)).await
} }
Command::Key(ko) => { Command::Key(ko) => cmd_admin(admin_rpc_cli, rpc_host, AdminRPC::KeyOperation(ko)).await,
cmd_admin(admin_rpc_cli, rpc_host, AdminRPC::KeyOperation(ko)).await Command::Repair(ro) => cmd_admin(admin_rpc_cli, rpc_host, AdminRPC::LaunchRepair(ro)).await,
} Command::Stats(so) => cmd_admin(admin_rpc_cli, rpc_host, AdminRPC::Stats(so)).await,
Command::Repair(ro) => {
cmd_admin(admin_rpc_cli, rpc_host, AdminRPC::LaunchRepair(ro)).await
}
Command::Stats(so) => {
cmd_admin(admin_rpc_cli, rpc_host, AdminRPC::Stats(so)).await
}
_ => unreachable!(), _ => unreachable!(),
} }
} }
pub async fn cmd_status(
pub async fn cmd_status(rpc_cli: RpcAddrClient<Message>, rpc_host: SocketAddr) -> Result<(), Error> { rpc_cli: RpcAddrClient<Message>,
rpc_host: SocketAddr,
) -> Result<(), Error> {
let status = match rpc_cli let status = match rpc_cli
.call(&rpc_host, &Message::PullStatus, ADMIN_RPC_TIMEOUT) .call(&rpc_host, &Message::PullStatus, ADMIN_RPC_TIMEOUT)
.await?? .await??

View file

@ -4,24 +4,24 @@
extern crate log; extern crate log;
mod admin_rpc; mod admin_rpc;
mod cli;
mod repair; mod repair;
mod server; mod server;
mod cli;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::time::Duration;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration;
use structopt::StructOpt; use structopt::StructOpt;
use garage_util::error::Error;
use garage_util::config::TlsConfig; use garage_util::config::TlsConfig;
use garage_util::error::Error;
use garage_rpc::rpc_client::*;
use garage_rpc::membership::*; use garage_rpc::membership::*;
use garage_rpc::rpc_client::*;
use cli::*;
use admin_rpc::*; use admin_rpc::*;
use cli::*;
#[derive(StructOpt, Debug)] #[derive(StructOpt, Debug)]
#[structopt(name = "garage")] #[structopt(name = "garage")]