forked from Deuxfleurs/garage
fixes
- make block_put call uninterruptible by client - used meta_replication_factor instead of data_replication_factor - listen on ipv6
This commit is contained in:
parent
9c931f5eda
commit
419c70e506
3 changed files with 27 additions and 11 deletions
|
@ -24,7 +24,7 @@ pub async fn run_api_server(
|
||||||
garage: Arc<Garage>,
|
garage: Arc<Garage>,
|
||||||
shutdown_signal: impl Future<Output = ()>,
|
shutdown_signal: impl Future<Output = ()>,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
let addr = ([0, 0, 0, 0], garage.system.config.api_port).into();
|
let addr = ([0, 0, 0, 0, 0, 0, 0, 0], garage.system.config.api_port).into();
|
||||||
|
|
||||||
let service = make_service_fn(|conn: &AddrStream| {
|
let service = make_service_fn(|conn: &AddrStream| {
|
||||||
let garage = garage.clone();
|
let garage = garage.clone();
|
||||||
|
@ -215,12 +215,12 @@ async fn put_block(garage: Arc<Garage>, hash: Hash, data: Vec<u8>) -> Result<(),
|
||||||
.ring
|
.ring
|
||||||
.borrow()
|
.borrow()
|
||||||
.clone()
|
.clone()
|
||||||
.walk_ring(&hash, garage.system.config.meta_replication_factor);
|
.walk_ring(&hash, garage.system.config.data_replication_factor);
|
||||||
rpc_try_call_many(
|
rpc_try_call_many(
|
||||||
garage.system.clone(),
|
garage.system.clone(),
|
||||||
&who[..],
|
&who[..],
|
||||||
&Message::PutBlock(PutBlockMessage { hash, data }),
|
&Message::PutBlock(PutBlockMessage { hash, data }),
|
||||||
(garage.system.config.meta_replication_factor + 1) / 2,
|
(garage.system.config.data_replication_factor + 1) / 2,
|
||||||
DEFAULT_TIMEOUT,
|
DEFAULT_TIMEOUT,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
@ -362,7 +362,7 @@ async fn get_block(garage: Arc<Garage>, hash: &Hash) -> Result<Vec<u8>, Error> {
|
||||||
.ring
|
.ring
|
||||||
.borrow()
|
.borrow()
|
||||||
.clone()
|
.clone()
|
||||||
.walk_ring(&hash, garage.system.config.meta_replication_factor);
|
.walk_ring(&hash, garage.system.config.data_replication_factor);
|
||||||
let resps = rpc_try_call_many(
|
let resps = rpc_try_call_many(
|
||||||
garage.system.clone(),
|
garage.system.clone(),
|
||||||
&who[..],
|
&who[..],
|
||||||
|
|
|
@ -32,6 +32,9 @@ pub enum Error {
|
||||||
#[error(display = "Timeout: {}", _0)]
|
#[error(display = "Timeout: {}", _0)]
|
||||||
RPCTimeout(#[error(source)] tokio::time::Elapsed),
|
RPCTimeout(#[error(source)] tokio::time::Elapsed),
|
||||||
|
|
||||||
|
#[error(display = "Tokio join error: {}", _0)]
|
||||||
|
TokioJoin(#[error(source)] tokio::task::JoinError),
|
||||||
|
|
||||||
#[error(display = "RPC error: {}", _0)]
|
#[error(display = "RPC error: {}", _0)]
|
||||||
RPCError(String),
|
RPCError(String),
|
||||||
|
|
||||||
|
|
|
@ -56,19 +56,32 @@ async fn handler(
|
||||||
);
|
);
|
||||||
|
|
||||||
let sys = garage.system.clone();
|
let sys = garage.system.clone();
|
||||||
let resp = err_to_msg(match &msg {
|
let resp = err_to_msg(match msg {
|
||||||
Message::Ping(ping) => sys.handle_ping(&addr, ping).await,
|
Message::Ping(ping) => sys.handle_ping(&addr, &ping).await,
|
||||||
|
|
||||||
Message::PullStatus => sys.handle_pull_status(),
|
Message::PullStatus => sys.handle_pull_status(),
|
||||||
Message::PullConfig => sys.handle_pull_config(),
|
Message::PullConfig => sys.handle_pull_config(),
|
||||||
Message::AdvertiseNodesUp(adv) => sys.handle_advertise_nodes_up(adv).await,
|
Message::AdvertiseNodesUp(adv) => sys.handle_advertise_nodes_up(&adv).await,
|
||||||
Message::AdvertiseConfig(adv) => sys.handle_advertise_config(adv).await,
|
Message::AdvertiseConfig(adv) => sys.handle_advertise_config(&adv).await,
|
||||||
|
|
||||||
Message::PutBlock(m) => garage.block_manager.write_block(&m.hash, &m.data).await,
|
Message::PutBlock(m) => {
|
||||||
|
// A RPC can be interrupted in the middle, however we don't want to write partial blocks,
|
||||||
|
// which might happen if the write_block() future is cancelled in the middle.
|
||||||
|
// To solve this, the write itself is in a spawned task that has its own separate lifetime,
|
||||||
|
// and the request handler simply sits there waiting for the task to finish.
|
||||||
|
// (if it's cancelled, that's not an issue)
|
||||||
|
// (TODO FIXME except if garage happens to shut down at that point)
|
||||||
|
let write_fut = async move {
|
||||||
|
garage.block_manager.write_block(&m.hash, &m.data).await
|
||||||
|
};
|
||||||
|
tokio::spawn(write_fut).await?
|
||||||
|
}
|
||||||
Message::GetBlock(h) => garage.block_manager.read_block(&h).await,
|
Message::GetBlock(h) => garage.block_manager.read_block(&h).await,
|
||||||
|
|
||||||
Message::TableRPC(table, msg) => {
|
Message::TableRPC(table, msg) => {
|
||||||
if let Some(rpc_handler) = garage.table_rpc_handlers.get(table) {
|
// For now, table RPCs use transactions that are not async so even if the future
|
||||||
|
// is canceled, the db should be in a consistent state.
|
||||||
|
if let Some(rpc_handler) = garage.table_rpc_handlers.get(&table) {
|
||||||
rpc_handler
|
rpc_handler
|
||||||
.handle(&msg[..])
|
.handle(&msg[..])
|
||||||
.await
|
.await
|
||||||
|
@ -90,7 +103,7 @@ pub async fn run_rpc_server(
|
||||||
garage: Arc<Garage>,
|
garage: Arc<Garage>,
|
||||||
shutdown_signal: impl Future<Output = ()>,
|
shutdown_signal: impl Future<Output = ()>,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
let bind_addr = ([0, 0, 0, 0], garage.system.config.rpc_port).into();
|
let bind_addr = ([0, 0, 0, 0, 0, 0, 0, 0], garage.system.config.rpc_port).into();
|
||||||
|
|
||||||
let service = make_service_fn(|conn: &AddrStream| {
|
let service = make_service_fn(|conn: &AddrStream| {
|
||||||
let client_addr = conn.remote_addr();
|
let client_addr = conn.remote_addr();
|
||||||
|
|
Loading…
Reference in a new issue