From c9c6b0dbd41e20d19b91c6615c46da6f45925bca Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 23 Apr 2020 17:05:46 +0000 Subject: [PATCH] Reorganize code --- src/admin_rpc.rs | 17 +-- src/{ => api}/api_server.rs | 18 +-- src/{ => api}/http_util.rs | 0 src/api/mod.rs | 2 + src/config.rs | 66 +++++++++++ src/error.rs | 2 +- src/main.rs | 28 ++--- src/{ => rpc}/membership.rs | 66 +++++++---- src/rpc/mod.rs | 4 + src/{ => rpc}/rpc_client.rs | 10 +- src/{ => rpc}/rpc_server.rs | 5 +- src/{ => rpc}/tls_util.rs | 0 src/server.rs | 172 +++++++++-------------------- src/{ => store}/block.rs | 28 +++-- src/{ => store}/block_ref_table.rs | 3 +- src/{ => store}/bucket_table.rs | 0 src/store/mod.rs | 5 + src/{ => store}/object_table.rs | 7 +- src/{ => store}/version_table.rs | 7 +- src/table/mod.rs | 6 + src/{ => table}/table.rs | 10 +- src/{ => table}/table_fullcopy.rs | 2 +- src/{ => table}/table_sharded.rs | 2 +- src/{ => table}/table_sync.rs | 2 +- 24 files changed, 254 insertions(+), 208 deletions(-) rename src/{ => api}/api_server.rs (96%) rename src/{ => api}/http_util.rs (100%) create mode 100644 src/api/mod.rs create mode 100644 src/config.rs rename src/{ => rpc}/membership.rs (92%) create mode 100644 src/rpc/mod.rs rename src/{ => rpc}/rpc_client.rs (98%) rename src/{ => rpc}/rpc_server.rs (99%) rename src/{ => rpc}/tls_util.rs (100%) rename src/{ => store}/block.rs (95%) rename src/{ => store}/block_ref_table.rs (97%) rename src/{ => store}/bucket_table.rs (100%) create mode 100644 src/store/mod.rs rename src/{ => store}/object_table.rs (97%) rename src/{ => store}/version_table.rs (96%) create mode 100644 src/table/mod.rs rename src/{ => table}/table.rs (98%) rename src/{ => table}/table_fullcopy.rs (98%) rename src/{ => table}/table_sharded.rs (97%) rename src/{ => table}/table_sync.rs (99%) diff --git a/src/admin_rpc.rs b/src/admin_rpc.rs index 458df3602..fe59f92e3 100644 --- a/src/admin_rpc.rs +++ b/src/admin_rpc.rs @@ -5,15 +5,18 @@ use tokio::sync::watch; use crate::data::*; use crate::error::Error; -use crate::rpc_client::*; -use crate::rpc_server::*; use crate::server::Garage; -use crate::table::*; -use crate::*; -use crate::block_ref_table::*; -use crate::bucket_table::*; -use crate::version_table::*; +use crate::table::*; + +use crate::rpc::rpc_client::*; +use crate::rpc::rpc_server::*; + +use crate::store::block_ref_table::*; +use crate::store::bucket_table::*; +use crate::store::version_table::*; + +use crate::*; pub const ADMIN_RPC_TIMEOUT: Duration = Duration::from_secs(30); pub const ADMIN_RPC_PATH: &str = "_admin"; diff --git a/src/api_server.rs b/src/api/api_server.rs similarity index 96% rename from src/api_server.rs rename to src/api/api_server.rs index f4bb4177a..a80b2ea25 100644 --- a/src/api_server.rs +++ b/src/api/api_server.rs @@ -11,14 +11,16 @@ use hyper::{Body, Method, Request, Response, Server, StatusCode}; use crate::data::*; use crate::error::Error; -use crate::http_util::*; +use crate::server::Garage; + use crate::table::EmptyKey; -use crate::block::INLINE_THRESHOLD; -use crate::block_ref_table::*; -use crate::object_table::*; -use crate::server::Garage; -use crate::version_table::*; +use crate::store::block::INLINE_THRESHOLD; +use crate::store::block_ref_table::*; +use crate::store::object_table::*; +use crate::store::version_table::*; + +use crate::api::http_util::*; type BodyType = Box + Send + Unpin>; @@ -26,7 +28,7 @@ pub async fn run_api_server( garage: Arc, shutdown_signal: impl Future, ) -> Result<(), Error> { - let addr = &garage.system.config.api_bind_addr; + let addr = &garage.config.api_bind_addr; let service = make_service_fn(|conn: &AddrStream| { let garage = garage.clone(); @@ -111,7 +113,7 @@ async fn handle_put( ) -> Result { let version_uuid = gen_uuid(); - let mut chunker = BodyChunker::new(body, garage.system.config.block_size); + let mut chunker = BodyChunker::new(body, garage.config.block_size); let first_block = match chunker.next().await? { Some(x) => x, None => return Err(Error::BadRequest(format!("Empty body"))), diff --git a/src/http_util.rs b/src/api/http_util.rs similarity index 100% rename from src/http_util.rs rename to src/api/http_util.rs diff --git a/src/api/mod.rs b/src/api/mod.rs new file mode 100644 index 000000000..8e62d1e79 --- /dev/null +++ b/src/api/mod.rs @@ -0,0 +1,2 @@ +pub mod api_server; +pub mod http_util; diff --git a/src/config.rs b/src/config.rs new file mode 100644 index 000000000..7a6ae3f29 --- /dev/null +++ b/src/config.rs @@ -0,0 +1,66 @@ +use std::io::Read; +use std::net::SocketAddr; +use std::path::PathBuf; + +use serde::Deserialize; + +use crate::error::Error; + +#[derive(Deserialize, Debug, Clone)] +pub struct Config { + pub metadata_dir: PathBuf, + pub data_dir: PathBuf, + + pub api_bind_addr: SocketAddr, + pub rpc_bind_addr: SocketAddr, + + pub bootstrap_peers: Vec, + + #[serde(default = "default_max_concurrent_rpc_requests")] + pub max_concurrent_rpc_requests: usize, + + #[serde(default = "default_block_size")] + pub block_size: usize, + + #[serde(default = "default_replication_factor")] + pub meta_replication_factor: usize, + + #[serde(default = "default_epidemic_factor")] + pub meta_epidemic_factor: usize, + + #[serde(default = "default_replication_factor")] + pub data_replication_factor: usize, + + pub rpc_tls: Option, +} + +fn default_max_concurrent_rpc_requests() -> usize { + 12 +} +fn default_block_size() -> usize { + 1048576 +} +fn default_replication_factor() -> usize { + 3 +} +fn default_epidemic_factor() -> usize { + 3 +} + +#[derive(Deserialize, Debug, Clone)] +pub struct TlsConfig { + pub ca_cert: String, + pub node_cert: String, + pub node_key: String, +} + +pub fn read_config(config_file: PathBuf) -> Result { + let mut file = std::fs::OpenOptions::new() + .read(true) + .open(config_file.as_path())?; + + let mut config = String::new(); + file.read_to_string(&mut config)?; + + Ok(toml::from_str(&config)?) +} diff --git a/src/error.rs b/src/error.rs index e217f9ae0..6290dc243 100644 --- a/src/error.rs +++ b/src/error.rs @@ -3,7 +3,7 @@ use hyper::StatusCode; use std::io; use crate::data::Hash; -use crate::rpc_client::RPCError; +use crate::rpc::rpc_client::RPCError; #[derive(Debug, Error)] pub enum Error { diff --git a/src/main.rs b/src/main.rs index 0b41805be..c693b12c6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,29 +3,18 @@ #[macro_use] extern crate log; +mod background; +mod config; mod data; mod error; -mod background; -mod membership; +mod api; +mod rpc; +mod store; mod table; -mod table_fullcopy; -mod table_sharded; -mod table_sync; - -mod block; -mod block_ref_table; -mod bucket_table; -mod object_table; -mod version_table; mod admin_rpc; -mod api_server; -mod http_util; -mod rpc_client; -mod rpc_server; mod server; -mod tls_util; use std::collections::HashSet; use std::net::SocketAddr; @@ -36,11 +25,12 @@ use std::time::Duration; use serde::{Deserialize, Serialize}; use structopt::StructOpt; +use config::TlsConfig; use data::*; use error::Error; -use membership::*; -use rpc_client::*; -use server::TlsConfig; + +use rpc::membership::*; +use rpc::rpc_client::*; use admin_rpc::*; diff --git a/src/membership.rs b/src/rpc/membership.rs similarity index 92% rename from src/membership.rs rename to src/rpc/membership.rs index 87b065a7a..e05095360 100644 --- a/src/membership.rs +++ b/src/rpc/membership.rs @@ -1,7 +1,7 @@ use std::collections::HashMap; use std::hash::Hash as StdHash; use std::hash::Hasher; -use std::io::Read; +use std::io::{Read, Write}; use std::net::{IpAddr, SocketAddr}; use std::path::PathBuf; use std::sync::atomic::{AtomicUsize, Ordering}; @@ -20,9 +20,9 @@ use tokio::sync::Mutex; use crate::background::BackgroundRunner; use crate::data::*; use crate::error::Error; -use crate::rpc_client::*; -use crate::rpc_server::*; -use crate::server::Config; + +use crate::rpc::rpc_client::*; +use crate::rpc::rpc_server::*; const PING_INTERVAL: Duration = Duration::from_secs(10); const PING_TIMEOUT: Duration = Duration::from_secs(2); @@ -78,8 +78,9 @@ pub struct NetworkConfigEntry { } pub struct System { - pub config: Config, pub id: UUID, + pub data_dir: PathBuf, + pub rpc_local_port: u16, pub state_info: StateInfo, @@ -251,6 +252,29 @@ impl Ring { } } +fn gen_node_id(metadata_dir: &PathBuf) -> Result { + let mut id_file = metadata_dir.clone(); + id_file.push("node_id"); + if id_file.as_path().exists() { + let mut f = std::fs::File::open(id_file.as_path())?; + let mut d = vec![]; + f.read_to_end(&mut d)?; + if d.len() != 32 { + return Err(Error::Message(format!("Corrupt node_id file"))); + } + + let mut id = [0u8; 32]; + id.copy_from_slice(&d[..]); + Ok(id.into()) + } else { + let id = gen_uuid(); + + let mut f = std::fs::File::create(id_file.as_path())?; + f.write_all(id.as_slice())?; + Ok(id) + } +} + fn read_network_config(metadata_dir: &PathBuf) -> Result { let mut path = metadata_dir.clone(); path.push("network_config"); @@ -270,12 +294,15 @@ fn read_network_config(metadata_dir: &PathBuf) -> Result { impl System { pub fn new( - config: Config, - id: UUID, + data_dir: PathBuf, + rpc_http_client: Arc, background: Arc, rpc_server: &mut RpcServer, ) -> Arc { - let net_config = match read_network_config(&config.metadata_dir) { + let id = gen_node_id(&data_dir).expect("Unable to read or generate node ID"); + info!("Node ID: {}", hex::encode(&id)); + + let net_config = match read_network_config(&data_dir) { Ok(x) => x, Err(e) => { info!( @@ -309,11 +336,6 @@ impl System { ring.rebuild_ring(); let (update_ring, ring) = watch::channel(Arc::new(ring)); - let rpc_http_client = Arc::new( - RpcHttpClient::new(config.max_concurrent_rpc_requests, &config.rpc_tls) - .expect("Could not create RPC client"), - ); - let rpc_path = MEMBERSHIP_RPC_PATH.to_string(); let rpc_client = RpcClient::new( RpcAddrClient::::new(rpc_http_client.clone(), rpc_path.clone()), @@ -322,8 +344,9 @@ impl System { ); let sys = Arc::new(System { - config, id, + data_dir, + rpc_local_port: rpc_server.bind_addr.port(), state_info, rpc_http_client, rpc_client, @@ -363,7 +386,7 @@ impl System { } async fn save_network_config(self: Arc) -> Result<(), Error> { - let mut path = self.config.metadata_dir.clone(); + let mut path = self.data_dir.clone(); path.push("network_config"); let ring = self.ring.borrow().clone(); @@ -379,7 +402,7 @@ impl System { let ring = self.ring.borrow().clone(); Message::Ping(PingMessage { id: self.id, - rpc_port: self.config.rpc_bind_addr.port(), + rpc_port: self.rpc_local_port, status_hash: status.hash, config_version: ring.config.version, state_info: self.state_info.clone(), @@ -397,13 +420,8 @@ impl System { self.rpc_client.call_many(&to[..], msg, timeout).await; } - pub async fn bootstrap(self: Arc) { - let bootstrap_peers = self - .config - .bootstrap_peers - .iter() - .map(|ip| (*ip, None)) - .collect::>(); + pub async fn bootstrap(self: Arc, peers: &[SocketAddr]) { + let bootstrap_peers = peers.iter().map(|ip| (*ip, None)).collect::>(); self.clone().ping_nodes(bootstrap_peers).await; self.clone() @@ -557,7 +575,7 @@ impl System { for node in adv.iter() { if node.id == self.id { // learn our own ip address - let self_addr = SocketAddr::new(node.addr.ip(), self.config.rpc_bind_addr.port()); + let self_addr = SocketAddr::new(node.addr.ip(), self.rpc_local_port); let old_self = status.nodes.insert( node.id, Arc::new(StatusEntry { diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs new file mode 100644 index 000000000..83fd0aac4 --- /dev/null +++ b/src/rpc/mod.rs @@ -0,0 +1,4 @@ +pub mod membership; +pub mod rpc_client; +pub mod rpc_server; +pub mod tls_util; diff --git a/src/rpc_client.rs b/src/rpc/rpc_client.rs similarity index 98% rename from src/rpc_client.rs rename to src/rpc/rpc_client.rs index ba036c600..027a3cde1 100644 --- a/src/rpc_client.rs +++ b/src/rpc/rpc_client.rs @@ -20,10 +20,12 @@ use tokio::sync::{watch, Semaphore}; use crate::background::BackgroundRunner; use crate::data::*; use crate::error::Error; -use crate::membership::Status; -use crate::rpc_server::RpcMessage; -use crate::server::TlsConfig; -use crate::tls_util; + +use crate::rpc::membership::Status; +use crate::rpc::rpc_server::RpcMessage; +use crate::rpc::tls_util; + +use crate::config::TlsConfig; const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10); diff --git a/src/rpc_server.rs b/src/rpc/rpc_server.rs similarity index 99% rename from src/rpc_server.rs rename to src/rpc/rpc_server.rs index bcf7496f1..4ee539091 100644 --- a/src/rpc_server.rs +++ b/src/rpc/rpc_server.rs @@ -16,10 +16,11 @@ use tokio::net::{TcpListener, TcpStream}; use tokio_rustls::server::TlsStream; use tokio_rustls::TlsAcceptor; +use crate::config::TlsConfig; use crate::data::*; use crate::error::Error; -use crate::server::TlsConfig; -use crate::tls_util; + +use crate::rpc::tls_util; pub trait RpcMessage: Serialize + for<'de> Deserialize<'de> + Send + Sync {} diff --git a/src/tls_util.rs b/src/rpc/tls_util.rs similarity index 100% rename from src/tls_util.rs rename to src/rpc/tls_util.rs diff --git a/src/server.rs b/src/server.rs index 3ea291059..de04615f6 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,79 +1,34 @@ -use std::io::{Read, Write}; -use std::net::SocketAddr; use std::path::PathBuf; use std::sync::Arc; use futures_util::future::*; -use serde::Deserialize; use tokio::sync::watch; use crate::background::*; -use crate::data::*; +use crate::config::*; use crate::error::Error; -use crate::membership::System; -use crate::rpc_server::RpcServer; -use crate::table::*; -use crate::table_fullcopy::*; -use crate::table_sharded::*; -use crate::block::*; -use crate::block_ref_table::*; -use crate::bucket_table::*; -use crate::object_table::*; -use crate::version_table::*; +use crate::rpc::membership::System; +use crate::rpc::rpc_client::RpcHttpClient; +use crate::rpc::rpc_server::RpcServer; + +use crate::table::table_fullcopy::*; +use crate::table::table_sharded::*; +use crate::table::*; + +use crate::store::block::*; +use crate::store::block_ref_table::*; +use crate::store::bucket_table::*; +use crate::store::object_table::*; +use crate::store::version_table::*; + +use crate::api::api_server; use crate::admin_rpc::*; -use crate::api_server; - -#[derive(Deserialize, Debug, Clone)] -pub struct Config { - pub metadata_dir: PathBuf, - pub data_dir: PathBuf, - - pub api_bind_addr: SocketAddr, - pub rpc_bind_addr: SocketAddr, - - pub bootstrap_peers: Vec, - - #[serde(default = "default_max_concurrent_rpc_requests")] - pub max_concurrent_rpc_requests: usize, - - #[serde(default = "default_block_size")] - pub block_size: usize, - - #[serde(default = "default_replication_factor")] - pub meta_replication_factor: usize, - - #[serde(default = "default_epidemic_factor")] - pub meta_epidemic_factor: usize, - - #[serde(default = "default_replication_factor")] - pub data_replication_factor: usize, - - pub rpc_tls: Option, -} - -fn default_max_concurrent_rpc_requests() -> usize { - 12 -} -fn default_block_size() -> usize { - 1048576 -} -fn default_replication_factor() -> usize { - 3 -} -fn default_epidemic_factor() -> usize { - 3 -} - -#[derive(Deserialize, Debug, Clone)] -pub struct TlsConfig { - pub ca_cert: String, - pub node_cert: String, - pub node_key: String, -} pub struct Garage { + pub config: Config, + pub db: sled::Db, pub background: Arc, pub system: Arc, @@ -88,33 +43,46 @@ pub struct Garage { impl Garage { pub async fn new( config: Config, - id: UUID, db: sled::Db, background: Arc, rpc_server: &mut RpcServer, ) -> Arc { info!("Initialize membership management system..."); - let system = System::new(config.clone(), id, background.clone(), rpc_server); - - info!("Initialize block manager..."); - let block_manager = - BlockManager::new(&db, config.data_dir.clone(), system.clone(), rpc_server); + let rpc_http_client = Arc::new( + RpcHttpClient::new(config.max_concurrent_rpc_requests, &config.rpc_tls) + .expect("Could not create RPC client"), + ); + let system = System::new( + config.metadata_dir.clone(), + rpc_http_client, + background.clone(), + rpc_server, + ); let data_rep_param = TableShardedReplication { - replication_factor: system.config.data_replication_factor, - write_quorum: (system.config.data_replication_factor + 1) / 2, + replication_factor: config.data_replication_factor, + write_quorum: (config.data_replication_factor + 1) / 2, read_quorum: 1, }; let meta_rep_param = TableShardedReplication { - replication_factor: system.config.meta_replication_factor, - write_quorum: (system.config.meta_replication_factor + 1) / 2, - read_quorum: (system.config.meta_replication_factor + 1) / 2, + replication_factor: config.meta_replication_factor, + write_quorum: (config.meta_replication_factor + 1) / 2, + read_quorum: (config.meta_replication_factor + 1) / 2, }; let control_rep_param = TableFullReplication::new( - system.config.meta_epidemic_factor, - (system.config.meta_epidemic_factor + 1) / 2, + config.meta_epidemic_factor, + (config.meta_epidemic_factor + 1) / 2, + ); + + info!("Initialize block manager..."); + let block_manager = BlockManager::new( + &db, + config.data_dir.clone(), + data_rep_param.clone(), + system.clone(), + rpc_server, ); info!("Initialize block_ref_table..."); @@ -172,6 +140,7 @@ impl Garage { info!("Initialize Garage..."); let garage = Arc::new(Self { + config, db, system: system.clone(), block_manager, @@ -193,40 +162,6 @@ impl Garage { } } -fn read_config(config_file: PathBuf) -> Result { - let mut file = std::fs::OpenOptions::new() - .read(true) - .open(config_file.as_path())?; - - let mut config = String::new(); - file.read_to_string(&mut config)?; - - Ok(toml::from_str(&config)?) -} - -fn gen_node_id(metadata_dir: &PathBuf) -> Result { - let mut id_file = metadata_dir.clone(); - id_file.push("node_id"); - if id_file.as_path().exists() { - let mut f = std::fs::File::open(id_file.as_path())?; - let mut d = vec![]; - f.read_to_end(&mut d)?; - if d.len() != 32 { - return Err(Error::Message(format!("Corrupt node_id file"))); - } - - let mut id = [0u8; 32]; - id.copy_from_slice(&d[..]); - Ok(id.into()) - } else { - let id = gen_uuid(); - - let mut f = std::fs::File::create(id_file.as_path())?; - f.write_all(id.as_slice())?; - Ok(id) - } -} - async fn shutdown_signal(send_cancel: watch::Sender) -> Result<(), Error> { // Wait for the CTRL+C signal tokio::signal::ctrl_c() @@ -249,9 +184,6 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { info!("Loading configuration..."); let config = read_config(config_file).expect("Unable to read config file"); - let id = gen_node_id(&config.metadata_dir).expect("Unable to read or generate node ID"); - info!("Node ID: {}", hex::encode(&id)); - info!("Opening database..."); let mut db_path = config.metadata_dir.clone(); db_path.push("db"); @@ -264,17 +196,21 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { let (send_cancel, watch_cancel) = watch::channel(false); let background = BackgroundRunner::new(16, watch_cancel.clone()); - let garage = Garage::new(config, id, db, background.clone(), &mut rpc_server).await; + let garage = Garage::new(config, db, background.clone(), &mut rpc_server).await; info!("Initializing RPC and API servers..."); let run_rpc_server = Arc::new(rpc_server).run(wait_from(watch_cancel.clone())); let api_server = api_server::run_api_server(garage.clone(), wait_from(watch_cancel.clone())); futures::try_join!( - garage.system.clone().bootstrap().map(|rv| { - info!("Bootstrap done"); - Ok(rv) - }), + garage + .system + .clone() + .bootstrap(&garage.config.bootstrap_peers[..]) + .map(|rv| { + info!("Bootstrap done"); + Ok(rv) + }), run_rpc_server.map(|rv| { info!("RPC server exited"); rv diff --git a/src/block.rs b/src/store/block.rs similarity index 95% rename from src/block.rs rename to src/store/block.rs index 23222a7f5..e2ef32e0e 100644 --- a/src/block.rs +++ b/src/store/block.rs @@ -14,11 +14,16 @@ use tokio::sync::{watch, Mutex, Notify}; use crate::data; use crate::data::*; use crate::error::Error; -use crate::membership::System; -use crate::rpc_client::*; -use crate::rpc_server::*; -use crate::block_ref_table::*; +use crate::rpc::membership::System; +use crate::rpc::rpc_client::*; +use crate::rpc::rpc_server::*; + +use crate::table::table_sharded::TableShardedReplication; +use crate::table::TableReplication; + +use crate::store::block_ref_table::*; + use crate::server::Garage; pub const INLINE_THRESHOLD: usize = 3072; @@ -47,6 +52,7 @@ pub struct PutBlockMessage { impl RpcMessage for Message {} pub struct BlockManager { + pub replication: TableShardedReplication, pub data_dir: PathBuf, pub data_dir_lock: Mutex<()>, @@ -64,6 +70,7 @@ impl BlockManager { pub fn new( db: &sled::Db, data_dir: PathBuf, + replication: TableShardedReplication, system: Arc, rpc_server: &mut RpcServer, ) -> Arc { @@ -80,6 +87,7 @@ impl BlockManager { let rpc_client = system.rpc_client::(rpc_path); let block_manager = Arc::new(Self { + replication, data_dir, data_dir_lock: Mutex::new(()), rc, @@ -302,8 +310,8 @@ impl BlockManager { .await?; let needed_by_others = !active_refs.is_empty(); if needed_by_others { - let ring = garage.system.ring.borrow().clone(); - let who = ring.walk_ring(&hash, garage.system.config.data_replication_factor); + let ring = self.system.ring.borrow().clone(); + let who = self.replication.replication_nodes(&hash, &ring); let msg = Arc::new(Message::NeedBlockQuery(*hash)); let who_needs_fut = who.iter().map(|to| { self.rpc_client @@ -361,8 +369,7 @@ impl BlockManager { } pub async fn rpc_get_block(&self, hash: &Hash) -> Result, Error> { - let ring = self.system.ring.borrow().clone(); - let who = ring.walk_ring(&hash, self.system.config.data_replication_factor); + let who = self.replication.read_nodes(&hash, &self.system); let resps = self .rpc_client .try_call_many( @@ -386,13 +393,12 @@ impl BlockManager { } pub async fn rpc_put_block(&self, hash: Hash, data: Vec) -> Result<(), Error> { - let ring = self.system.ring.borrow().clone(); - let who = ring.walk_ring(&hash, self.system.config.data_replication_factor); + let who = self.replication.write_nodes(&hash, &self.system); self.rpc_client .try_call_many( &who[..], Message::PutBlock(PutBlockMessage { hash, data }), - RequestStrategy::with_quorum((self.system.config.data_replication_factor + 1) / 2) + RequestStrategy::with_quorum(self.replication.write_quorum()) .with_timeout(BLOCK_RW_TIMEOUT), ) .await?; diff --git a/src/block_ref_table.rs b/src/store/block_ref_table.rs similarity index 97% rename from src/block_ref_table.rs rename to src/store/block_ref_table.rs index 6a256aa32..c8a2a2a10 100644 --- a/src/block_ref_table.rs +++ b/src/store/block_ref_table.rs @@ -5,9 +5,10 @@ use std::sync::Arc; use crate::background::*; use crate::data::*; use crate::error::Error; + use crate::table::*; -use crate::block::*; +use crate::store::block::*; #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct BlockRef { diff --git a/src/bucket_table.rs b/src/store/bucket_table.rs similarity index 100% rename from src/bucket_table.rs rename to src/store/bucket_table.rs diff --git a/src/store/mod.rs b/src/store/mod.rs new file mode 100644 index 000000000..afadc9bba --- /dev/null +++ b/src/store/mod.rs @@ -0,0 +1,5 @@ +pub mod block; +pub mod block_ref_table; +pub mod bucket_table; +pub mod object_table; +pub mod version_table; diff --git a/src/object_table.rs b/src/store/object_table.rs similarity index 97% rename from src/object_table.rs rename to src/store/object_table.rs index edad4925d..97de0cdb5 100644 --- a/src/object_table.rs +++ b/src/store/object_table.rs @@ -5,10 +5,11 @@ use std::sync::Arc; use crate::background::BackgroundRunner; use crate::data::*; use crate::error::Error; -use crate::table::*; -use crate::table_sharded::*; -use crate::version_table::*; +use crate::table::table_sharded::*; +use crate::table::*; + +use crate::store::version_table::*; #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct Object { diff --git a/src/version_table.rs b/src/store/version_table.rs similarity index 96% rename from src/version_table.rs rename to src/store/version_table.rs index 74174dce9..d25a56ca0 100644 --- a/src/version_table.rs +++ b/src/store/version_table.rs @@ -5,10 +5,11 @@ use std::sync::Arc; use crate::background::BackgroundRunner; use crate::data::*; use crate::error::Error; -use crate::table::*; -use crate::table_sharded::*; -use crate::block_ref_table::*; +use crate::table::table_sharded::*; +use crate::table::*; + +use crate::store::block_ref_table::*; #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct Version { diff --git a/src/table/mod.rs b/src/table/mod.rs new file mode 100644 index 000000000..e03b8d0b0 --- /dev/null +++ b/src/table/mod.rs @@ -0,0 +1,6 @@ +pub mod table; +pub mod table_fullcopy; +pub mod table_sharded; +pub mod table_sync; + +pub use table::*; diff --git a/src/table.rs b/src/table/table.rs similarity index 98% rename from src/table.rs rename to src/table/table.rs index a3d02d0c1..50e8739a1 100644 --- a/src/table.rs +++ b/src/table/table.rs @@ -10,10 +10,12 @@ use serde_bytes::ByteBuf; use crate::data::*; use crate::error::Error; -use crate::membership::{Ring, System}; -use crate::rpc_client::*; -use crate::rpc_server::*; -use crate::table_sync::*; + +use crate::rpc::membership::{Ring, System}; +use crate::rpc::rpc_client::*; +use crate::rpc::rpc_server::*; + +use crate::table::table_sync::*; const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10); diff --git a/src/table_fullcopy.rs b/src/table/table_fullcopy.rs similarity index 98% rename from src/table_fullcopy.rs rename to src/table/table_fullcopy.rs index 2fcf56db4..2cd2e464b 100644 --- a/src/table_fullcopy.rs +++ b/src/table/table_fullcopy.rs @@ -2,7 +2,7 @@ use arc_swap::ArcSwapOption; use std::sync::Arc; use crate::data::*; -use crate::membership::{Ring, System}; +use crate::rpc::membership::{Ring, System}; use crate::table::*; #[derive(Clone)] diff --git a/src/table_sharded.rs b/src/table/table_sharded.rs similarity index 97% rename from src/table_sharded.rs rename to src/table/table_sharded.rs index c17ea0d4b..5190f5d40 100644 --- a/src/table_sharded.rs +++ b/src/table/table_sharded.rs @@ -1,5 +1,5 @@ use crate::data::*; -use crate::membership::{Ring, System}; +use crate::rpc::membership::{Ring, System}; use crate::table::*; #[derive(Clone)] diff --git a/src/table_sync.rs b/src/table/table_sync.rs similarity index 99% rename from src/table_sync.rs rename to src/table/table_sync.rs index 60d5c4df5..8f6582a7a 100644 --- a/src/table_sync.rs +++ b/src/table/table_sync.rs @@ -14,7 +14,7 @@ use tokio::sync::{mpsc, watch}; use crate::data::*; use crate::error::Error; -use crate::membership::Ring; +use crate::rpc::membership::Ring; use crate::table::*; const MAX_DEPTH: usize = 16;