Many improvements on ring/replication and its configuration:
- Explicit "replication_mode" configuration parameter that takes either "none", "2" or "3" as its value, instead of letting users configure the replication factor themselves. These are presets whose corresponding replication/quorum values can be found in replication/mode.rs
- Explicit support for single-node and two-node deployments (number of nodes must be at least "replication_mode", with "none" we can have only one node)
- Ring is now stored much more compactly with 256*8 + n*32 bytes, instead of 256*32 bytes
- Support for gateway-only nodes that do not store data (these nodes still need a metadata_directory to store the list of buckets and keys since those are stored on all nodes; they also technically need a data_directory to start but it will stay empty unless we have bugs)
This commit is contained in:
parent
c8aa1eb481
commit
b490ebc7f6
10 changed files with 252 additions and 109 deletions
|
@ -31,9 +31,7 @@ bootstrap_peers = [
|
||||||
"127.0.0.1:3903"
|
"127.0.0.1:3903"
|
||||||
]
|
]
|
||||||
max_concurrent_rpc_requests = 12
|
max_concurrent_rpc_requests = 12
|
||||||
data_replication_factor = 3
|
replication_mode = "3"
|
||||||
meta_replication_factor = 3
|
|
||||||
meta_epidemic_fanout = 3
|
|
||||||
|
|
||||||
[s3_api]
|
[s3_api]
|
||||||
api_bind_addr = "0.0.0.0:$((3910+$count))" # the S3 API port, HTTP without TLS. Add a reverse proxy for the TLS part.
|
api_bind_addr = "0.0.0.0:$((3910+$count))" # the S3 API port, HTTP without TLS. Add a reverse proxy for the TLS part.
|
||||||
|
|
|
@ -430,8 +430,8 @@ impl AdminRpcHandler {
|
||||||
// Gather ring statistics
|
// Gather ring statistics
|
||||||
let ring = self.garage.system.ring.borrow().clone();
|
let ring = self.garage.system.ring.borrow().clone();
|
||||||
let mut ring_nodes = HashMap::new();
|
let mut ring_nodes = HashMap::new();
|
||||||
for r in ring.ring.iter() {
|
for (_i, loc) in ring.partitions().iter() {
|
||||||
for n in r.nodes.iter() {
|
for n in ring.get_nodes(loc, ring.replication_factor).iter() {
|
||||||
if !ring_nodes.contains_key(n) {
|
if !ring_nodes.contains_key(n) {
|
||||||
ring_nodes.insert(*n, 0usize);
|
ring_nodes.insert(*n, 0usize);
|
||||||
}
|
}
|
||||||
|
|
|
@ -80,6 +80,10 @@ pub struct ConfigureNodeOpt {
|
||||||
#[structopt(short = "c", long = "capacity")]
|
#[structopt(short = "c", long = "capacity")]
|
||||||
capacity: Option<u32>,
|
capacity: Option<u32>,
|
||||||
|
|
||||||
|
/// Gateway-only node
|
||||||
|
#[structopt(short = "g", long = "gateway")]
|
||||||
|
gateway: bool,
|
||||||
|
|
||||||
/// Optional node tag
|
/// Optional node tag
|
||||||
#[structopt(short = "t", long = "tag")]
|
#[structopt(short = "t", long = "tag")]
|
||||||
tag: Option<String>,
|
tag: Option<String>,
|
||||||
|
@ -339,7 +343,12 @@ pub async fn cmd_status(
|
||||||
if let Some(cfg) = config.members.get(&adv.id) {
|
if let Some(cfg) = config.members.get(&adv.id) {
|
||||||
println!(
|
println!(
|
||||||
"{:?}\t{}\t{}\t[{}]\t{}\t{}",
|
"{:?}\t{}\t{}\t[{}]\t{}\t{}",
|
||||||
adv.id, adv.state_info.hostname, adv.addr, cfg.tag, cfg.datacenter, cfg.capacity
|
adv.id,
|
||||||
|
adv.state_info.hostname,
|
||||||
|
adv.addr,
|
||||||
|
cfg.tag,
|
||||||
|
cfg.datacenter,
|
||||||
|
cfg.capacity_string()
|
||||||
);
|
);
|
||||||
} else {
|
} else {
|
||||||
println!(
|
println!(
|
||||||
|
@ -366,7 +375,7 @@ pub async fn cmd_status(
|
||||||
adv.addr,
|
adv.addr,
|
||||||
cfg.tag,
|
cfg.tag,
|
||||||
cfg.datacenter,
|
cfg.datacenter,
|
||||||
cfg.capacity,
|
cfg.capacity_string(),
|
||||||
(now_msec() - adv.last_seen) / 1000,
|
(now_msec() - adv.last_seen) / 1000,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
@ -375,7 +384,10 @@ pub async fn cmd_status(
|
||||||
if !status.iter().any(|x| x.id == *id) {
|
if !status.iter().any(|x| x.id == *id) {
|
||||||
println!(
|
println!(
|
||||||
"{:?}\t{}\t{}\t{}\tnever seen",
|
"{:?}\t{}\t{}\t{}\tnever seen",
|
||||||
id, cfg.tag, cfg.datacenter, cfg.capacity
|
id,
|
||||||
|
cfg.tag,
|
||||||
|
cfg.datacenter,
|
||||||
|
cfg.capacity_string(),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -438,23 +450,44 @@ pub async fn cmd_configure(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if args.capacity.is_some() && args.gateway {
|
||||||
|
return Err(Error::Message(
|
||||||
|
"-c and -g are mutually exclusive, please configure node either with c>0 to act as a storage node or with -g to act as a gateway node".into()));
|
||||||
|
}
|
||||||
|
if args.capacity == Some(0) {
|
||||||
|
return Err(Error::Message("Invalid capacity value: 0".into()));
|
||||||
|
}
|
||||||
|
|
||||||
let new_entry = match config.members.get(&added_node) {
|
let new_entry = match config.members.get(&added_node) {
|
||||||
None => NetworkConfigEntry {
|
None => {
|
||||||
|
let capacity = match args.capacity {
|
||||||
|
Some(c) => Some(c),
|
||||||
|
None if args.gateway => None,
|
||||||
|
_ => return Err(Error::Message(
|
||||||
|
"Please specify a capacity with the -c flag, or set node explicitly as gateway with -g".into())),
|
||||||
|
};
|
||||||
|
NetworkConfigEntry {
|
||||||
datacenter: args
|
datacenter: args
|
||||||
.datacenter
|
.datacenter
|
||||||
.expect("Please specifiy a datacenter with the -d flag"),
|
.expect("Please specifiy a datacenter with the -d flag"),
|
||||||
capacity: args
|
capacity,
|
||||||
.capacity
|
|
||||||
.expect("Please specifiy a capacity with the -c flag"),
|
|
||||||
tag: args.tag.unwrap_or_default(),
|
tag: args.tag.unwrap_or_default(),
|
||||||
},
|
}
|
||||||
Some(old) => NetworkConfigEntry {
|
}
|
||||||
|
Some(old) => {
|
||||||
|
let capacity = match args.capacity {
|
||||||
|
Some(c) => Some(c),
|
||||||
|
None if args.gateway => None,
|
||||||
|
_ => old.capacity,
|
||||||
|
};
|
||||||
|
NetworkConfigEntry {
|
||||||
datacenter: args
|
datacenter: args
|
||||||
.datacenter
|
.datacenter
|
||||||
.unwrap_or_else(|| old.datacenter.to_string()),
|
.unwrap_or_else(|| old.datacenter.to_string()),
|
||||||
capacity: args.capacity.unwrap_or(old.capacity),
|
capacity,
|
||||||
tag: args.tag.unwrap_or_else(|| old.tag.to_string()),
|
tag: args.tag.unwrap_or_else(|| old.tag.to_string()),
|
||||||
},
|
}
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
config.members.insert(added_node, new_entry);
|
config.members.insert(added_node, new_entry);
|
||||||
|
|
|
@ -7,6 +7,7 @@ use garage_rpc::membership::System;
|
||||||
use garage_rpc::rpc_client::RpcHttpClient;
|
use garage_rpc::rpc_client::RpcHttpClient;
|
||||||
use garage_rpc::rpc_server::RpcServer;
|
use garage_rpc::rpc_server::RpcServer;
|
||||||
|
|
||||||
|
use garage_table::replication::ReplicationMode;
|
||||||
use garage_table::replication::TableFullReplication;
|
use garage_table::replication::TableFullReplication;
|
||||||
use garage_table::replication::TableShardedReplication;
|
use garage_table::replication::TableShardedReplication;
|
||||||
use garage_table::*;
|
use garage_table::*;
|
||||||
|
@ -50,6 +51,9 @@ impl Garage {
|
||||||
background: Arc<BackgroundRunner>,
|
background: Arc<BackgroundRunner>,
|
||||||
rpc_server: &mut RpcServer,
|
rpc_server: &mut RpcServer,
|
||||||
) -> Arc<Self> {
|
) -> Arc<Self> {
|
||||||
|
let replication_mode = ReplicationMode::parse(&config.replication_mode)
|
||||||
|
.expect("Invalid replication_mode in config file.");
|
||||||
|
|
||||||
info!("Initialize membership management system...");
|
info!("Initialize membership management system...");
|
||||||
let rpc_http_client = Arc::new(
|
let rpc_http_client = Arc::new(
|
||||||
RpcHttpClient::new(config.max_concurrent_rpc_requests, &config.rpc_tls)
|
RpcHttpClient::new(config.max_concurrent_rpc_requests, &config.rpc_tls)
|
||||||
|
@ -60,32 +64,33 @@ impl Garage {
|
||||||
rpc_http_client,
|
rpc_http_client,
|
||||||
background.clone(),
|
background.clone(),
|
||||||
rpc_server,
|
rpc_server,
|
||||||
|
replication_mode.replication_factor(),
|
||||||
);
|
);
|
||||||
|
|
||||||
let data_rep_param = TableShardedReplication {
|
let data_rep_param = TableShardedReplication {
|
||||||
system: system.clone(),
|
system: system.clone(),
|
||||||
replication_factor: config.data_replication_factor,
|
replication_factor: replication_mode.replication_factor(),
|
||||||
write_quorum: (config.data_replication_factor + 1) / 2,
|
write_quorum: replication_mode.write_quorum(),
|
||||||
read_quorum: 1,
|
read_quorum: 1,
|
||||||
};
|
};
|
||||||
|
|
||||||
let meta_rep_param = TableShardedReplication {
|
let meta_rep_param = TableShardedReplication {
|
||||||
system: system.clone(),
|
system: system.clone(),
|
||||||
replication_factor: config.meta_replication_factor,
|
replication_factor: replication_mode.replication_factor(),
|
||||||
write_quorum: (config.meta_replication_factor + 1) / 2,
|
write_quorum: replication_mode.write_quorum(),
|
||||||
read_quorum: (config.meta_replication_factor + 1) / 2,
|
read_quorum: replication_mode.read_quorum(),
|
||||||
};
|
};
|
||||||
|
|
||||||
let control_rep_param = TableFullReplication {
|
let control_rep_param = TableFullReplication {
|
||||||
system: system.clone(),
|
system: system.clone(),
|
||||||
max_faults: config.control_write_max_faults,
|
max_faults: replication_mode.control_write_max_faults(),
|
||||||
};
|
};
|
||||||
|
|
||||||
info!("Initialize block manager...");
|
info!("Initialize block manager...");
|
||||||
let block_manager = BlockManager::new(
|
let block_manager = BlockManager::new(
|
||||||
&db,
|
&db,
|
||||||
config.data_dir.clone(),
|
config.data_dir.clone(),
|
||||||
data_rep_param.clone(),
|
data_rep_param,
|
||||||
system.clone(),
|
system.clone(),
|
||||||
rpc_server,
|
rpc_server,
|
||||||
);
|
);
|
||||||
|
@ -95,7 +100,7 @@ impl Garage {
|
||||||
BlockRefTable {
|
BlockRefTable {
|
||||||
block_manager: block_manager.clone(),
|
block_manager: block_manager.clone(),
|
||||||
},
|
},
|
||||||
data_rep_param,
|
meta_rep_param.clone(),
|
||||||
system.clone(),
|
system.clone(),
|
||||||
&db,
|
&db,
|
||||||
"block_ref".to_string(),
|
"block_ref".to_string(),
|
||||||
|
|
|
@ -95,6 +95,7 @@ pub struct System {
|
||||||
rpc_http_client: Arc<RpcHttpClient>,
|
rpc_http_client: Arc<RpcHttpClient>,
|
||||||
rpc_client: Arc<RpcClient<Message>>,
|
rpc_client: Arc<RpcClient<Message>>,
|
||||||
|
|
||||||
|
replication_factor: usize,
|
||||||
pub(crate) status: watch::Receiver<Arc<Status>>,
|
pub(crate) status: watch::Receiver<Arc<Status>>,
|
||||||
/// The ring
|
/// The ring
|
||||||
pub ring: watch::Receiver<Arc<Ring>>,
|
pub ring: watch::Receiver<Arc<Ring>>,
|
||||||
|
@ -228,6 +229,7 @@ impl System {
|
||||||
rpc_http_client: Arc<RpcHttpClient>,
|
rpc_http_client: Arc<RpcHttpClient>,
|
||||||
background: Arc<BackgroundRunner>,
|
background: Arc<BackgroundRunner>,
|
||||||
rpc_server: &mut RpcServer,
|
rpc_server: &mut RpcServer,
|
||||||
|
replication_factor: usize,
|
||||||
) -> Arc<Self> {
|
) -> Arc<Self> {
|
||||||
let id = gen_node_id(&metadata_dir).expect("Unable to read or generate node ID");
|
let id = gen_node_id(&metadata_dir).expect("Unable to read or generate node ID");
|
||||||
info!("Node ID: {}", hex::encode(&id));
|
info!("Node ID: {}", hex::encode(&id));
|
||||||
|
@ -259,7 +261,7 @@ impl System {
|
||||||
.unwrap_or_else(|_| "<invalid utf-8>".to_string()),
|
.unwrap_or_else(|_| "<invalid utf-8>".to_string()),
|
||||||
};
|
};
|
||||||
|
|
||||||
let ring = Ring::new(net_config);
|
let ring = Ring::new(net_config, replication_factor);
|
||||||
let (update_ring, ring) = watch::channel(Arc::new(ring));
|
let (update_ring, ring) = watch::channel(Arc::new(ring));
|
||||||
|
|
||||||
let rpc_path = MEMBERSHIP_RPC_PATH.to_string();
|
let rpc_path = MEMBERSHIP_RPC_PATH.to_string();
|
||||||
|
@ -277,6 +279,7 @@ impl System {
|
||||||
state_info,
|
state_info,
|
||||||
rpc_http_client,
|
rpc_http_client,
|
||||||
rpc_client,
|
rpc_client,
|
||||||
|
replication_factor,
|
||||||
status,
|
status,
|
||||||
ring,
|
ring,
|
||||||
update_lock: Mutex::new(Updaters {
|
update_lock: Mutex::new(Updaters {
|
||||||
|
@ -543,7 +546,7 @@ impl System {
|
||||||
let ring: Arc<Ring> = self.ring.borrow().clone();
|
let ring: Arc<Ring> = self.ring.borrow().clone();
|
||||||
|
|
||||||
if adv.version > ring.config.version {
|
if adv.version > ring.config.version {
|
||||||
let ring = Ring::new(adv.clone());
|
let ring = Ring::new(adv.clone(), self.replication_factor);
|
||||||
update_lock.update_ring.send(Arc::new(ring))?;
|
update_lock.update_ring.send(Arc::new(ring))?;
|
||||||
drop(update_lock);
|
drop(update_lock);
|
||||||
|
|
||||||
|
|
153
src/rpc/ring.rs
153
src/rpc/ring.rs
|
@ -7,10 +7,9 @@ use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
use garage_util::data::*;
|
use garage_util::data::*;
|
||||||
|
|
||||||
// A partition number is encoded on 16 bits,
|
/// A partition id, which is stored on 16 bits
|
||||||
// i.e. we have up to 2**16 partitions.
|
/// i.e. we have up to 2**16 partitions.
|
||||||
// (in practice we have exactly 2**PARTITION_BITS partitions)
|
/// (in practice we have exactly 2**PARTITION_BITS partitions)
|
||||||
/// A partition id, stored on 16 bits
|
|
||||||
pub type Partition = u16;
|
pub type Partition = u16;
|
||||||
|
|
||||||
// TODO: make this constant parametrizable in the config file
|
// TODO: make this constant parametrizable in the config file
|
||||||
|
@ -23,11 +22,6 @@ pub const PARTITION_BITS: usize = 8;
|
||||||
|
|
||||||
const PARTITION_MASK_U16: u16 = ((1 << PARTITION_BITS) - 1) << (16 - PARTITION_BITS);
|
const PARTITION_MASK_U16: u16 = ((1 << PARTITION_BITS) - 1) << (16 - PARTITION_BITS);
|
||||||
|
|
||||||
// TODO: make this constant paraetrizable in the config file
|
|
||||||
// (most deployments use a replication factor of 3, so...)
|
|
||||||
/// The maximum number of time an object might get replicated
|
|
||||||
pub const MAX_REPLICATION: usize = 3;
|
|
||||||
|
|
||||||
/// The user-defined configuration of the cluster's nodes
|
/// The user-defined configuration of the cluster's nodes
|
||||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||||
pub struct NetworkConfig {
|
pub struct NetworkConfig {
|
||||||
|
@ -53,40 +47,72 @@ pub struct NetworkConfigEntry {
|
||||||
/// geodistribution
|
/// geodistribution
|
||||||
pub datacenter: String,
|
pub datacenter: String,
|
||||||
/// The (relative) capacity of the node
|
/// The (relative) capacity of the node
|
||||||
pub capacity: u32,
|
/// If this is set to None, the node does not participate in storing data for the system
|
||||||
|
/// and is only active as an API gateway to other nodes
|
||||||
|
pub capacity: Option<u32>,
|
||||||
/// A tag to recognize the entry, not used for other things than display
|
/// A tag to recognize the entry, not used for other things than display
|
||||||
pub tag: String,
|
pub tag: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl NetworkConfigEntry {
|
||||||
|
pub fn capacity_string(&self) -> String {
|
||||||
|
match self.capacity {
|
||||||
|
Some(c) => format!("{}", c),
|
||||||
|
None => "gateway".to_string(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// A ring distributing fairly objects to nodes
|
/// A ring distributing fairly objects to nodes
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct Ring {
|
pub struct Ring {
|
||||||
|
/// The replication factor for this ring
|
||||||
|
pub replication_factor: usize,
|
||||||
|
|
||||||
/// The network configuration used to generate this ring
|
/// The network configuration used to generate this ring
|
||||||
pub config: NetworkConfig,
|
pub config: NetworkConfig,
|
||||||
/// The list of entries in the ring
|
|
||||||
pub ring: Vec<RingEntry>,
|
// Internal order of nodes used to make a more compact representation of the ring
|
||||||
|
nodes: Vec<Uuid>,
|
||||||
|
|
||||||
|
// The list of entries in the ring
|
||||||
|
ring: Vec<RingEntry>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Type to store compactly the id of a node in the system
|
||||||
|
// Change this to u16 the day we want to have more than 256 nodes in a cluster
|
||||||
|
type CompactNodeType = u8;
|
||||||
|
|
||||||
|
// The maximum number of times an object might get replicated
|
||||||
|
// This must be at least 3 because Garage supports 3-way replication
|
||||||
|
// Here we use 6 so that the size of a ring entry is 8 bytes
|
||||||
|
// (2 bytes partition id, 6 bytes node numbers as u8s)
|
||||||
|
const MAX_REPLICATION: usize = 6;
|
||||||
|
|
||||||
/// An entry in the ring
|
/// An entry in the ring
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct RingEntry {
|
struct RingEntry {
|
||||||
/// The prefix of the Hash of object which should use this entry
|
// The two first bytes of the first hash that goes in this partition
|
||||||
pub location: Hash,
|
// (the next bytes are zeroes)
|
||||||
/// The nodes in which a matching object should get stored
|
hash_prefix: u16,
|
||||||
pub nodes: [Uuid; MAX_REPLICATION],
|
// The nodes that store this partition, stored as a list of positions in the `nodes`
|
||||||
|
// field of the Ring structure
|
||||||
|
// Only items 0 up to ring.replication_factor - 1 are used, others are zeros
|
||||||
|
nodes_buf: [CompactNodeType; MAX_REPLICATION],
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Ring {
|
impl Ring {
|
||||||
// TODO this function MUST be refactored, it's 100 lines long, with a 50 lines loop, going up to 6
|
// TODO this function MUST be refactored, it's 100 lines long, with a 50 lines loop, going up to 6
|
||||||
// levels of imbrication. It is basically impossible to test, maintain, or understand for an
|
// levels of imbrication. It is basically impossible to test, maintain, or understand for an
|
||||||
// outsider.
|
// outsider.
|
||||||
pub(crate) fn new(config: NetworkConfig) -> Self {
|
pub(crate) fn new(config: NetworkConfig, replication_factor: usize) -> Self {
|
||||||
// Create a vector of partition indices (0 to 2**PARTITION_BITS-1)
|
// Create a vector of partition indices (0 to 2**PARTITION_BITS-1)
|
||||||
let partitions_idx = (0usize..(1usize << PARTITION_BITS)).collect::<Vec<_>>();
|
let partitions_idx = (0usize..(1usize << PARTITION_BITS)).collect::<Vec<_>>();
|
||||||
|
|
||||||
let datacenters = config
|
let datacenters = config
|
||||||
.members
|
.members
|
||||||
.iter()
|
.iter()
|
||||||
|
.filter(|(_id, info)| info.capacity.is_some())
|
||||||
.map(|(_id, info)| info.datacenter.as_str())
|
.map(|(_id, info)| info.datacenter.as_str())
|
||||||
.collect::<HashSet<&str>>();
|
.collect::<HashSet<&str>>();
|
||||||
let n_datacenters = datacenters.len();
|
let n_datacenters = datacenters.len();
|
||||||
|
@ -101,6 +127,7 @@ impl Ring {
|
||||||
let mut queues = config
|
let mut queues = config
|
||||||
.members
|
.members
|
||||||
.iter()
|
.iter()
|
||||||
|
.filter(|(_id, info)| info.capacity.is_some())
|
||||||
.map(|(node_id, node_info)| {
|
.map(|(node_id, node_info)| {
|
||||||
let mut parts = partitions_idx
|
let mut parts = partitions_idx
|
||||||
.iter()
|
.iter()
|
||||||
|
@ -119,11 +146,13 @@ impl Ring {
|
||||||
let max_capacity = config
|
let max_capacity = config
|
||||||
.members
|
.members
|
||||||
.iter()
|
.iter()
|
||||||
.map(|(_, node_info)| node_info.capacity)
|
.filter_map(|(_, node_info)| node_info.capacity)
|
||||||
.fold(0, std::cmp::max);
|
.fold(0, std::cmp::max);
|
||||||
|
|
||||||
|
assert!(replication_factor <= MAX_REPLICATION);
|
||||||
|
|
||||||
// Fill up ring
|
// Fill up ring
|
||||||
for rep in 0..MAX_REPLICATION {
|
for rep in 0..replication_factor {
|
||||||
queues.sort_by_key(|(ni, _np, _q, _p)| {
|
queues.sort_by_key(|(ni, _np, _q, _p)| {
|
||||||
let queue_data = [&u16::to_be_bytes(rep as u16)[..], ni.as_slice()].concat();
|
let queue_data = [&u16::to_be_bytes(rep as u16)[..], ni.as_slice()].concat();
|
||||||
fasthash(&queue_data[..])
|
fasthash(&queue_data[..])
|
||||||
|
@ -138,7 +167,7 @@ impl Ring {
|
||||||
let remaining0 = remaining;
|
let remaining0 = remaining;
|
||||||
for i_round in 0..max_capacity {
|
for i_round in 0..max_capacity {
|
||||||
for (node_id, node_info, q, pos) in queues.iter_mut() {
|
for (node_id, node_info, q, pos) in queues.iter_mut() {
|
||||||
if i_round >= node_info.capacity {
|
if i_round >= node_info.capacity.unwrap() {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
for (pos2, &qv) in q.iter().enumerate().skip(*pos) {
|
for (pos2, &qv) in q.iter().enumerate().skip(*pos) {
|
||||||
|
@ -166,34 +195,58 @@ impl Ring {
|
||||||
// No progress made, exit
|
// No progress made, exit
|
||||||
warn!("Could not build ring, not enough nodes configured.");
|
warn!("Could not build ring, not enough nodes configured.");
|
||||||
return Self {
|
return Self {
|
||||||
|
replication_factor,
|
||||||
config,
|
config,
|
||||||
|
nodes: vec![],
|
||||||
ring: vec![],
|
ring: vec![],
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Make a canonical order for nodes
|
||||||
|
let nodes = config
|
||||||
|
.members
|
||||||
|
.iter()
|
||||||
|
.filter(|(_id, info)| info.capacity.is_some())
|
||||||
|
.map(|(id, _)| *id)
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
let nodes_rev = nodes
|
||||||
|
.iter()
|
||||||
|
.enumerate()
|
||||||
|
.map(|(i, id)| (*id, i as CompactNodeType))
|
||||||
|
.collect::<HashMap<Uuid, CompactNodeType>>();
|
||||||
|
|
||||||
let ring = partitions
|
let ring = partitions
|
||||||
.iter()
|
.iter()
|
||||||
.enumerate()
|
.enumerate()
|
||||||
.map(|(i, nodes)| {
|
.map(|(i, nodes)| {
|
||||||
let top = (i as u16) << (16 - PARTITION_BITS);
|
let top = (i as u16) << (16 - PARTITION_BITS);
|
||||||
let mut hash = [0u8; 32];
|
let nodes = nodes
|
||||||
hash[0..2].copy_from_slice(&u16::to_be_bytes(top)[..]);
|
.iter()
|
||||||
let nodes = nodes.iter().map(|(id, _info)| **id).collect::<Vec<Uuid>>();
|
.map(|(id, _info)| *nodes_rev.get(id).unwrap())
|
||||||
|
.collect::<Vec<CompactNodeType>>();
|
||||||
|
assert!(nodes.len() == replication_factor);
|
||||||
|
let mut nodes_buf = [0u8; MAX_REPLICATION];
|
||||||
|
nodes_buf[..replication_factor].copy_from_slice(&nodes[..]);
|
||||||
RingEntry {
|
RingEntry {
|
||||||
location: hash.into(),
|
hash_prefix: top,
|
||||||
nodes: nodes.try_into().unwrap(),
|
nodes_buf,
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
Self { config, ring }
|
Self {
|
||||||
|
replication_factor,
|
||||||
|
config,
|
||||||
|
nodes,
|
||||||
|
ring,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get the partition in which data would fall on
|
/// Get the partition in which data would fall on
|
||||||
pub fn partition_of(&self, from: &Hash) -> Partition {
|
pub fn partition_of(&self, position: &Hash) -> Partition {
|
||||||
let top = u16::from_be_bytes(from.as_slice()[0..2].try_into().unwrap());
|
let top = u16::from_be_bytes(position.as_slice()[0..2].try_into().unwrap());
|
||||||
top >> (16 - PARTITION_BITS)
|
top >> (16 - PARTITION_BITS)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -202,7 +255,9 @@ impl Ring {
|
||||||
let mut ret = vec![];
|
let mut ret = vec![];
|
||||||
|
|
||||||
for (i, entry) in self.ring.iter().enumerate() {
|
for (i, entry) in self.ring.iter().enumerate() {
|
||||||
ret.push((i as u16, entry.location));
|
let mut location = [0u8; 32];
|
||||||
|
location[..2].copy_from_slice(&u16::to_be_bytes(entry.hash_prefix)[..]);
|
||||||
|
ret.push((i as u16, location.into()));
|
||||||
}
|
}
|
||||||
if !ret.is_empty() {
|
if !ret.is_empty() {
|
||||||
assert_eq!(ret[0].1, [0u8; 32].into());
|
assert_eq!(ret[0].1, [0u8; 32].into());
|
||||||
|
@ -211,28 +266,38 @@ impl Ring {
|
||||||
ret
|
ret
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO rename this function as it no longer walk the ring
|
|
||||||
/// Walk the ring to find the n servers in which data should be replicated
|
/// Walk the ring to find the n servers in which data should be replicated
|
||||||
pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec<Uuid> {
|
pub fn get_nodes(&self, position: &Hash, n: usize) -> Vec<Uuid> {
|
||||||
if self.ring.len() != 1 << PARTITION_BITS {
|
if self.ring.len() != 1 << PARTITION_BITS {
|
||||||
warn!("Ring not yet ready, read/writes will be lost!");
|
warn!("Ring not yet ready, read/writes will be lost!");
|
||||||
return vec![];
|
return vec![];
|
||||||
}
|
}
|
||||||
|
|
||||||
let top = u16::from_be_bytes(from.as_slice()[0..2].try_into().unwrap());
|
let partition_idx = self.partition_of(position) as usize;
|
||||||
let partition_idx = (top >> (16 - PARTITION_BITS)) as usize;
|
|
||||||
// TODO why computing two time in the same way and asserting?
|
|
||||||
assert_eq!(partition_idx, self.partition_of(from) as usize);
|
|
||||||
|
|
||||||
let partition = &self.ring[partition_idx];
|
let partition = &self.ring[partition_idx];
|
||||||
|
|
||||||
let partition_top =
|
let top = u16::from_be_bytes(position.as_slice()[0..2].try_into().unwrap());
|
||||||
u16::from_be_bytes(partition.location.as_slice()[0..2].try_into().unwrap());
|
// Check that we haven't messed up our partition table, i.e. that this partition
|
||||||
// TODO is this an assertion on the validity of PARTITION_MASK_U16? If so, it should
|
// table entry indeed corresponds to the item we are storing
|
||||||
// probably be a test more than a runtime assertion
|
assert_eq!(
|
||||||
assert_eq!(partition_top & PARTITION_MASK_U16, top & PARTITION_MASK_U16);
|
partition.hash_prefix & PARTITION_MASK_U16,
|
||||||
|
top & PARTITION_MASK_U16
|
||||||
|
);
|
||||||
|
|
||||||
assert!(n <= partition.nodes.len());
|
assert!(n <= self.replication_factor);
|
||||||
partition.nodes[..n].to_vec()
|
partition.nodes_buf[..n]
|
||||||
|
.iter()
|
||||||
|
.map(|i| self.nodes[*i as usize])
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_ring_entry_size() {
|
||||||
|
assert_eq!(std::mem::size_of::<RingEntry>(), 8);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,8 +1,10 @@
|
||||||
mod parameters;
|
mod parameters;
|
||||||
|
|
||||||
mod fullcopy;
|
mod fullcopy;
|
||||||
|
mod mode;
|
||||||
mod sharded;
|
mod sharded;
|
||||||
|
|
||||||
pub use fullcopy::TableFullReplication;
|
pub use fullcopy::TableFullReplication;
|
||||||
|
pub use mode::ReplicationMode;
|
||||||
pub use parameters::*;
|
pub use parameters::*;
|
||||||
pub use sharded::TableShardedReplication;
|
pub use sharded::TableShardedReplication;
|
||||||
|
|
47
src/table/replication/mode.rs
Normal file
47
src/table/replication/mode.rs
Normal file
|
@ -0,0 +1,47 @@
|
||||||
|
/// Replication preset selected through the `replication_mode` configuration value.
///
/// Each variant fixes a replication factor together with the read quorum,
/// write quorum and control-write fault tolerance returned by the methods below.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReplicationMode {
    /// A single copy of each item (no replication).
    None,
    /// Two copies of each item.
    TwoWay,
    /// Three copies of each item.
    ThreeWay,
}

impl ReplicationMode {
    /// Parses a configuration string into a replication mode.
    ///
    /// Accepted values are `"none"` (or its alias `"1"`), `"2"` and `"3"`.
    /// Any other string yields `Option::None`.
    pub fn parse(v: &str) -> Option<Self> {
        match v {
            "none" | "1" => Some(Self::None),
            "2" => Some(Self::TwoWay),
            "3" => Some(Self::ThreeWay),
            _ => None,
        }
    }

    /// Returns the maximum number of faults for control writes:
    /// 0 without replication, 1 for two-way and three-way replication.
    pub fn control_write_max_faults(&self) -> usize {
        match self {
            Self::None => 0,
            // Variants are listed explicitly so that adding a new mode is a
            // compile error here instead of silently inheriting a default.
            Self::TwoWay | Self::ThreeWay => 1,
        }
    }

    /// Returns the number of copies kept of each item: 1, 2 or 3.
    pub fn replication_factor(&self) -> usize {
        match self {
            Self::None => 1,
            Self::TwoWay => 2,
            Self::ThreeWay => 3,
        }
    }

    /// Returns the read quorum for this mode: 1 for `None` and `TwoWay`,
    /// 2 for `ThreeWay`.
    pub fn read_quorum(&self) -> usize {
        match self {
            Self::None => 1,
            Self::TwoWay => 1,
            Self::ThreeWay => 2,
        }
    }

    /// Returns the write quorum for this mode: 1 for `None`,
    /// 2 for `TwoWay` and `ThreeWay`.
    pub fn write_quorum(&self) -> usize {
        match self {
            Self::None => 1,
            Self::TwoWay => 2,
            Self::ThreeWay => 2,
        }
    }
}
|
|
@ -26,8 +26,8 @@ pub struct TableShardedReplication {
|
||||||
|
|
||||||
impl TableReplication for TableShardedReplication {
|
impl TableReplication for TableShardedReplication {
|
||||||
fn read_nodes(&self, hash: &Hash) -> Vec<Uuid> {
|
fn read_nodes(&self, hash: &Hash) -> Vec<Uuid> {
|
||||||
let ring = self.system.ring.borrow().clone();
|
let ring = self.system.ring.borrow();
|
||||||
ring.walk_ring(&hash, self.replication_factor)
|
ring.get_nodes(&hash, self.replication_factor)
|
||||||
}
|
}
|
||||||
fn read_quorum(&self) -> usize {
|
fn read_quorum(&self) -> usize {
|
||||||
self.read_quorum
|
self.read_quorum
|
||||||
|
@ -35,7 +35,7 @@ impl TableReplication for TableShardedReplication {
|
||||||
|
|
||||||
fn write_nodes(&self, hash: &Hash) -> Vec<Uuid> {
|
fn write_nodes(&self, hash: &Hash) -> Vec<Uuid> {
|
||||||
let ring = self.system.ring.borrow();
|
let ring = self.system.ring.borrow();
|
||||||
ring.walk_ring(&hash, self.replication_factor)
|
ring.get_nodes(&hash, self.replication_factor)
|
||||||
}
|
}
|
||||||
fn write_quorum(&self) -> usize {
|
fn write_quorum(&self) -> usize {
|
||||||
self.write_quorum
|
self.write_quorum
|
||||||
|
|
|
@ -15,6 +15,17 @@ pub struct Config {
|
||||||
/// Path where to store data. Can be slower, but need higher volume
|
/// Path where to store data. Can be slower, but need higher volume
|
||||||
pub data_dir: PathBuf,
|
pub data_dir: PathBuf,
|
||||||
|
|
||||||
|
/// Size of data blocks to save to disk
|
||||||
|
#[serde(default = "default_block_size")]
|
||||||
|
pub block_size: usize,
|
||||||
|
|
||||||
|
/// Replication mode. Supported values:
|
||||||
|
/// - none, 1 -> no replication
|
||||||
|
/// - 2 -> 2-way replication
|
||||||
|
/// - 3 -> 3-way replication
|
||||||
|
// (we can add more aliases for this later)
|
||||||
|
pub replication_mode: String,
|
||||||
|
|
||||||
/// Address to bind for RPC
|
/// Address to bind for RPC
|
||||||
pub rpc_bind_addr: SocketAddr,
|
pub rpc_bind_addr: SocketAddr,
|
||||||
|
|
||||||
|
@ -26,6 +37,13 @@ pub struct Config {
|
||||||
/// Consul service name to use
|
/// Consul service name to use
|
||||||
pub consul_service_name: Option<String>,
|
pub consul_service_name: Option<String>,
|
||||||
|
|
||||||
|
/// Configuration for RPC TLS
|
||||||
|
pub rpc_tls: Option<TlsConfig>,
|
||||||
|
|
||||||
|
/// Max number of concurrent RPC request
|
||||||
|
#[serde(default = "default_max_concurrent_rpc_requests")]
|
||||||
|
pub max_concurrent_rpc_requests: usize,
|
||||||
|
|
||||||
/// Sled cache size, in bytes
|
/// Sled cache size, in bytes
|
||||||
#[serde(default = "default_sled_cache_capacity")]
|
#[serde(default = "default_sled_cache_capacity")]
|
||||||
pub sled_cache_capacity: u64,
|
pub sled_cache_capacity: u64,
|
||||||
|
@ -34,28 +52,6 @@ pub struct Config {
|
||||||
#[serde(default = "default_sled_flush_every_ms")]
|
#[serde(default = "default_sled_flush_every_ms")]
|
||||||
pub sled_flush_every_ms: u64,
|
pub sled_flush_every_ms: u64,
|
||||||
|
|
||||||
/// Max number of concurrent RPC request
|
|
||||||
#[serde(default = "default_max_concurrent_rpc_requests")]
|
|
||||||
pub max_concurrent_rpc_requests: usize,
|
|
||||||
|
|
||||||
/// Size of data blocks to save to disk
|
|
||||||
#[serde(default = "default_block_size")]
|
|
||||||
pub block_size: usize,
|
|
||||||
|
|
||||||
#[serde(default = "default_control_write_max_faults")]
|
|
||||||
pub control_write_max_faults: usize,
|
|
||||||
|
|
||||||
/// How many nodes should hold a copy of meta data
|
|
||||||
#[serde(default = "default_replication_factor")]
|
|
||||||
pub meta_replication_factor: usize,
|
|
||||||
|
|
||||||
/// How many nodes should hold a copy of data
|
|
||||||
#[serde(default = "default_replication_factor")]
|
|
||||||
pub data_replication_factor: usize,
|
|
||||||
|
|
||||||
/// Configuration for RPC TLS
|
|
||||||
pub rpc_tls: Option<TlsConfig>,
|
|
||||||
|
|
||||||
/// Configuration for S3 api
|
/// Configuration for S3 api
|
||||||
pub s3_api: ApiConfig,
|
pub s3_api: ApiConfig,
|
||||||
|
|
||||||
|
@ -106,12 +102,6 @@ fn default_max_concurrent_rpc_requests() -> usize {
|
||||||
fn default_block_size() -> usize {
|
fn default_block_size() -> usize {
|
||||||
1048576
|
1048576
|
||||||
}
|
}
|
||||||
fn default_replication_factor() -> usize {
|
|
||||||
3
|
|
||||||
}
|
|
||||||
fn default_control_write_max_faults() -> usize {
|
|
||||||
1
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Read and parse configuration
|
/// Read and parse configuration
|
||||||
pub fn read_config(config_file: PathBuf) -> Result<Config, Error> {
|
pub fn read_config(config_file: PathBuf) -> Result<Config, Error> {
|
||||||
|
|
Loading…
Reference in a new issue