diff --git a/src/api_server.rs b/src/api_server.rs index 52f33969..13fd5038 100644 --- a/src/api_server.rs +++ b/src/api_server.rs @@ -24,7 +24,7 @@ pub async fn run_api_server( garage: Arc, shutdown_signal: impl Future, ) -> Result<(), Error> { - let addr = ([0, 0, 0, 0], garage.system.config.api_port).into(); + let addr = ([0, 0, 0, 0, 0, 0, 0, 0], garage.system.config.api_port).into(); let service = make_service_fn(|conn: &AddrStream| { let garage = garage.clone(); @@ -215,12 +215,12 @@ async fn put_block(garage: Arc, hash: Hash, data: Vec) -> Result<(), .ring .borrow() .clone() - .walk_ring(&hash, garage.system.config.meta_replication_factor); + .walk_ring(&hash, garage.system.config.data_replication_factor); rpc_try_call_many( garage.system.clone(), &who[..], &Message::PutBlock(PutBlockMessage { hash, data }), - (garage.system.config.meta_replication_factor + 1) / 2, + (garage.system.config.data_replication_factor + 1) / 2, DEFAULT_TIMEOUT, ) .await?; @@ -362,7 +362,7 @@ async fn get_block(garage: Arc, hash: &Hash) -> Result, Error> { .ring .borrow() .clone() - .walk_ring(&hash, garage.system.config.meta_replication_factor); + .walk_ring(&hash, garage.system.config.data_replication_factor); let resps = rpc_try_call_many( garage.system.clone(), &who[..], diff --git a/src/error.rs b/src/error.rs index 7a116954..661621c9 100644 --- a/src/error.rs +++ b/src/error.rs @@ -32,6 +32,9 @@ pub enum Error { #[error(display = "Timeout: {}", _0)] RPCTimeout(#[error(source)] tokio::time::Elapsed), + #[error(display = "Tokio join error: {}", _0)] + TokioJoin(#[error(source)] tokio::task::JoinError), + #[error(display = "RPC error: {}", _0)] RPCError(String), diff --git a/src/rpc_server.rs b/src/rpc_server.rs index ddfc5e04..f54b5099 100644 --- a/src/rpc_server.rs +++ b/src/rpc_server.rs @@ -56,19 +56,32 @@ async fn handler( ); let sys = garage.system.clone(); - let resp = err_to_msg(match &msg { - Message::Ping(ping) => sys.handle_ping(&addr, ping).await, + let resp = err_to_msg(match msg { + Message::Ping(ping) => sys.handle_ping(&addr, &ping).await, Message::PullStatus => sys.handle_pull_status(), Message::PullConfig => sys.handle_pull_config(), - Message::AdvertiseNodesUp(adv) => sys.handle_advertise_nodes_up(adv).await, - Message::AdvertiseConfig(adv) => sys.handle_advertise_config(adv).await, + Message::AdvertiseNodesUp(adv) => sys.handle_advertise_nodes_up(&adv).await, + Message::AdvertiseConfig(adv) => sys.handle_advertise_config(&adv).await, - Message::PutBlock(m) => garage.block_manager.write_block(&m.hash, &m.data).await, + Message::PutBlock(m) => { + // A RPC can be interrupted in the middle, however we don't want to write partial blocks, + // which might happen if the write_block() future is cancelled in the middle. + // To solve this, the write itself is in a spawned task that has its own separate lifetime, + // and the request handler simply sits there waiting for the task to finish. + // (if it's cancelled, that's not an issue) + // (TODO FIXME except if garage happens to shut down at that point) + let write_fut = async move { + garage.block_manager.write_block(&m.hash, &m.data).await + }; + tokio::spawn(write_fut).await? + } Message::GetBlock(h) => garage.block_manager.read_block(&h).await, Message::TableRPC(table, msg) => { - if let Some(rpc_handler) = garage.table_rpc_handlers.get(table) { + // For now, table RPCs use transactions that are not async so even if the future + // is canceled, the db should be in a consistent state. + if let Some(rpc_handler) = garage.table_rpc_handlers.get(&table) { rpc_handler .handle(&msg[..]) .await @@ -90,7 +103,7 @@ pub async fn run_rpc_server( garage: Arc, shutdown_signal: impl Future, ) -> Result<(), Error> { - let bind_addr = ([0, 0, 0, 0], garage.system.config.rpc_port).into(); + let bind_addr = ([0, 0, 0, 0, 0, 0, 0, 0], garage.system.config.rpc_port).into(); let service = make_service_fn(|conn: &AddrStream| { let client_addr = conn.remote_addr();