From 43ce5e4ab4ebe317bb9263de5d56b90dc68ea7eb Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Sun, 12 Apr 2020 23:05:53 +0200 Subject: [PATCH] Fix table RPC to not be interruptible --- TODO | 33 ++++++++++----------------------- src/rpc_server.rs | 26 +++++++++++++++----------- 2 files changed, 25 insertions(+), 34 deletions(-) diff --git a/TODO b/TODO index 8032d2e0..3294e62f 100644 --- a/TODO +++ b/TODO @@ -1,29 +1,16 @@ -Object table ------------- - - -Rename version table to object table -In value handle the different versions - -So that the table becomes bucket + Sort key = object key -> CRDT(list of versions) - -CRDT merge rule: -- keep one complete version (the one with the highest timestamp) -- keep all incomplete versions with timestamps higher than the complete version - -Cleanup rule: remove incomplete versions after a given delay (say 24h) - - -Block table +Replication ----------- -Table is version_UUID -> BTreeMap<(offset, block hash)> OR Deleted (= CRDT top) +- for each interval of tokens, we know the list of nodes that are responsible +- every node watches the current ring and state of the network +- and thus determines the interval of tokens for which they are responsible -Block reference table ---------------------- -Table is block_Hash + Sort key: version_UUID -> boolean (true when deleted) +To do list +---------- -Since the hash key is the same as for the blocks themselves, -we can simply consider the updates to this table as events that increase/decrease a reference counter. +- important: check block values on read and repare corrupted block contents +- less a priority: hinted handoff +- FIXME in rpc_server when garage shuts down and futures can be interrupted + (tokio::spawn should be replaced by a new function background::spawn_joinable) diff --git a/src/rpc_server.rs b/src/rpc_server.rs index 17da6f86..b75d67fd 100644 --- a/src/rpc_server.rs +++ b/src/rpc_server.rs @@ -76,22 +76,26 @@ async fn handler( // and the request handler simply sits there waiting for the task to finish. // (if it's cancelled, that's not an issue) // (TODO FIXME except if garage happens to shut down at that point) - let write_fut = async move { garage.block_manager.write_block(&m.hash, &m.data).await }; + let write_fut = async move { + garage.block_manager.write_block(&m.hash, &m.data).await + }; tokio::spawn(write_fut).await? } Message::GetBlock(h) => garage.block_manager.read_block(&h).await, Message::TableRPC(table, msg) => { - // For now, table RPCs use transactions that are not async so even if the future - // is canceled, the db should be in a consistent state. - if let Some(rpc_handler) = garage.table_rpc_handlers.get(&table) { - rpc_handler - .handle(&msg[..]) - .await - .map(|rep| Message::TableRPC(table.to_string(), rep)) - } else { - Ok(Message::Error(format!("Unknown table: {}", table))) - } + // Same trick for table RPCs than for PutBlock + let op_fut = async move { + if let Some(rpc_handler) = garage.table_rpc_handlers.get(&table) { + rpc_handler + .handle(&msg[..]) + .await + .map(|rep| Message::TableRPC(table.to_string(), rep)) + } else { + Ok(Message::Error(format!("Unknown table: {}", table))) + } + }; + tokio::spawn(op_fut).await? } _ => Ok(Message::Error(format!("Unexpected message: {:?}", msg))),