Compare commits


No commits in common. "3ecd14b9f6202ad3c5513c6ad7422bd408134002" and "707442f5de416fdbed4681a33b739f0a787b7834" have entirely different histories.

2 changed files with 39 additions and 132 deletions

View file

@@ -299,7 +299,9 @@ impl RpcHelper {
 				if let Some((req_to, fut)) = requests.next() {
 					let tracer = opentelemetry::global::tracer("garage");
 					let span = tracer.start(format!("RPC to {:?}", req_to));
-					resp_stream.push(fut.with_context(Context::current_with_span(span)));
+					resp_stream.push(tokio::spawn(
+						fut.with_context(Context::current_with_span(span)),
+					));
 				} else {
 					break;
 				}
@@ -311,7 +313,7 @@ impl RpcHelper {
 			}
 			// Wait for one request to terminate
-			match resp_stream.next().await.unwrap() {
+			match resp_stream.next().await.unwrap().unwrap() {
 				Ok(msg) => {
 					successes.push(msg);
 				}
@@ -446,7 +448,7 @@ impl RpcHelper {
 			let tracer = opentelemetry::global::tracer("garage");
 			let span = tracer.start(format!("RPC to {:?}", to));
 			let fut = async move { (to, self2.call(&endpoint2, to, msg, strategy).await) };
-			fut.with_context(Context::current_with_span(span))
+			tokio::spawn(fut.with_context(Context::current_with_span(span)))
 		});
 		let mut resp_stream = requests.collect::<FuturesUnordered<_>>();
@@ -455,7 +457,9 @@ impl RpcHelper {
 		let mut set_counters = vec![(0, 0); to_sets.len()];
-		while let Some((node, resp)) = resp_stream.next().await {
+		while !resp_stream.is_empty() {
+			let (node, resp) = resp_stream.next().await.unwrap().unwrap();
 			match resp {
 				Ok(msg) => {
 					for set in peers.get(&node).unwrap().iter() {
@@ -471,12 +475,12 @@ impl RpcHelper {
 				}
 			}
-			if set_counters.iter().all(|(ok_cnt, _)| *ok_cnt >= quorum) {
+			if set_counters.iter().all(|x| x.0 >= quorum) {
 				// Success
 				// Continue all other requets in background
 				tokio::spawn(async move {
-					resp_stream.collect::<Vec<(Uuid, Result<_, _>)>>().await;
+					resp_stream.collect::<Vec<Result<_, _>>>().await;
 				});
 				return Ok(successes);
@@ -485,7 +489,7 @@ impl RpcHelper {
 			if set_counters
 				.iter()
 				.enumerate()
-				.any(|(i, (_, err_cnt))| err_cnt + quorum > to_sets[i].len())
+				.any(|(i, x)| x.1 + quorum > to_sets[i].len())
 			{
 				// Too many errors in this set, we know we won't get a quorum
 				break;
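The extra .unwrap() on the right-hand side follows from wrapping each request future in tokio::spawn: the stream now yields JoinHandle outputs, i.e. a Result<_, JoinError> around the RPC's own Result, and spawned requests keep running even if the stream is dropped early. A minimal sketch of this pattern (editor's illustration, not code from either branch; assumes the tokio and futures crates):

use futures::stream::{FuturesUnordered, StreamExt};

#[tokio::main]
async fn main() {
    // Spawned tasks keep running even if resp_stream is dropped early,
    // which is what lets the caller return as soon as a quorum is reached.
    let mut resp_stream = (0..3u32)
        .map(|i| tokio::spawn(async move { Ok::<u32, String>(i * 10) }))
        .collect::<FuturesUnordered<_>>();

    while !resp_stream.is_empty() {
        // First unwrap: the stream is non-empty, so next() yields Some(_).
        // Second unwrap: the JoinHandle result (fails only if the task panicked).
        match resp_stream.next().await.unwrap().unwrap() {
            Ok(v) => println!("ok: {}", v),
            Err(e) => eprintln!("rpc error: {}", e),
        }
    }
}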

View file

@@ -143,7 +143,7 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
 		self.data.queue_insert(tx, e)
 	}
-	pub async fn insert_many<I, IE>(self: &Arc<Self>, entries: I) -> Result<(), Error>
+	pub async fn insert_many<I, IE>(&self, entries: I) -> Result<(), Error>
 	where
 		I: IntoIterator<Item = IE> + Send + Sync,
 		IE: Borrow<F::E> + Send + Sync,
@@ -161,149 +161,52 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
 		Ok(())
 	}
-	async fn insert_many_internal<I, IE>(self: &Arc<Self>, entries: I) -> Result<(), Error>
+	async fn insert_many_internal<I, IE>(&self, entries: I) -> Result<(), Error>
 	where
 		I: IntoIterator<Item = IE> + Send + Sync,
 		IE: Borrow<F::E> + Send + Sync,
 	{
-		// The different items will have to be stored on possibly different nodes.
-		// We will here batch all items into a single request for each concerned
-		// node, with all of the entries it must store within that request.
-		// Each entry has to be saved to a specific list of "write sets", i.e. a set
-		// of node within wich a quorum must be achieved. In normal operation, there
-		// is a single write set which corresponds to the quorum in the current
-		// cluster layout, but when the layout is updated, multiple write sets might
-		// have to be handled at once. Here, since we are sending many entries, we
-		// will have to handle many write sets in all cases. The algorihtm is thus
-		// to send one request to each node with all the items it must save,
-		// and keep track of the OK responses within each write set: if for all sets
-		// a quorum of nodes has answered OK, then the insert has succeeded and
-		// consistency properties (read-after-write) are preserved.
-		// Some code here might feel redundant with RpcHelper::try_write_many_sets,
-		// but I think deduplicating could lead to more spaghetti instead of
-		// improving the readability, so I'm leaving as is.
-		let quorum = self.data.replication.write_quorum();
-		// Serialize all entries and compute the write sets for each of them.
-		// In the case of sharded table replication, this also takes an "ack lock"
-		// to the layout manager to avoid ack'ing newer versions which are not
-		// taken into account by writes in progress (the ack can happen later, once
-		// all writes that didn't take the new layout into account are finished).
-		// These locks are released when entries_vec is dropped, i.e. when this
-		// function returns.
-		let mut entries_vec = Vec::new();
+		let mut call_list: HashMap<_, Vec<_>> = HashMap::new();
 		for entry in entries.into_iter() {
 			let entry = entry.borrow();
 			let hash = entry.partition_key().hash();
-			let write_sets = self.data.replication.write_sets(&hash);
+			// TODO: use write sets
+			let who = self.data.replication.storage_nodes(&hash);
 			let e_enc = Arc::new(ByteBuf::from(entry.encode()?));
-			entries_vec.push((write_sets, e_enc));
-		}
-		// Compute a deduplicated list of all of the write sets,
-		// and compute an index from each node to the position of the sets in which
-		// it takes part, to optimize the detection of a quorum.
-		let mut write_sets = entries_vec
-			.iter()
-			.map(|(wss, _)| wss.as_ref().iter().map(|ws| ws.as_slice()))
-			.flatten()
-			.collect::<Vec<&[Uuid]>>();
-		write_sets.sort();
-		write_sets.dedup();
-		let mut write_set_index = HashMap::<&Uuid, Vec<usize>>::new();
-		for (i, write_set) in write_sets.iter().enumerate() {
-			for node in write_set.iter() {
-				write_set_index.entry(node).or_default().push(i);
+			for node in who {
+				call_list.entry(node).or_default().push(e_enc.clone());
 			}
 		}
-		// Build a map of all nodes to the entries that must be sent to that node.
-		let mut call_list: HashMap<Uuid, Vec<_>> = HashMap::new();
-		for (write_sets, entry_enc) in entries_vec.iter() {
-			for write_set in write_sets.as_ref().iter() {
-				for node in write_set.iter() {
-					call_list.entry(*node).or_default().push(entry_enc.clone())
-				}
-			}
-		}
-		// Build futures to actually perform each of the corresponding RPC calls
-		let call_count = call_list.len();
-		let call_futures = call_list.into_iter().map(|(node, entries)| {
-			let this = self.clone();
-			let tracer = opentelemetry::global::tracer("garage");
-			let span = tracer.start(format!("RPC to {:?}", node));
-			let fut = async move {
+		let call_futures = call_list.drain().map(|(node, entries)| async move {
 			let rpc = TableRpc::<F>::Update(entries);
-			let resp = this
+			let resp = self
 				.system
 				.rpc_helper()
 				.call(
-					&this.endpoint,
+					&self.endpoint,
 					node,
 					rpc,
-					RequestStrategy::with_priority(PRIO_NORMAL).with_quorum(quorum),
+					RequestStrategy::with_priority(PRIO_NORMAL),
 				)
-				.await;
-			(node, resp)
-			};
-			fut.with_context(Context::current_with_span(span))
+				.await?;
+			Ok::<_, Error>((node, resp))
 		});
-		// Run all requests in parallel thanks to FuturesUnordered, and collect results.
 		let mut resps = call_futures.collect::<FuturesUnordered<_>>();
-		let mut set_counters = vec![(0, 0); write_sets.len()];
-		let mut successes = 0;
 		let mut errors = vec![];
-		while let Some((node, resp)) = resps.next().await {
-			match resp {
-				Ok(_) => {
-					successes += 1;
-					for set in write_set_index.get(&node).unwrap().iter() {
-						set_counters[*set].0 += 1;
-					}
-				}
-				Err(e) => {
+		while let Some(resp) = resps.next().await {
+			if let Err(e) = resp {
 				errors.push(e);
-					for set in write_set_index.get(&node).unwrap().iter() {
-						set_counters[*set].1 += 1;
 			}
 		}
+		if errors.len() > self.data.replication.max_write_errors() {
+			Err(Error::Message("Too many errors".into()))
+		} else {
+			Ok(())
 		}
-			if set_counters.iter().all(|(ok_cnt, _)| *ok_cnt >= quorum) {
-				// Success
-				// Continue all other requests in background
-				tokio::spawn(async move {
-					resps.collect::<Vec<(Uuid, Result<_, _>)>>().await;
-				});
-				return Ok(());
-			}
-			if set_counters
-				.iter()
-				.enumerate()
-				.any(|(i, (_, err_cnt))| err_cnt + quorum > write_sets[i].len())
-			{
-				// Too many errors in this set, we know we won't get a quorum
-				break;
-			}
-		}
-		// Failure, could not get quorum within at least one set
-		let errors = errors.iter().map(|e| format!("{}", e)).collect::<Vec<_>>();
-		Err(Error::Quorum(
-			quorum,
-			Some(write_sets.len()),
-			successes,
-			call_count,
-			errors,
-		))
 	}
 	pub async fn get(
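For reference, the per-write-set bookkeeping that the longer left-hand version performs (and that mirrors RpcHelper::try_write_many_sets) reduces to two checks after each response: overall success once every write set holds a quorum of OK answers, and early failure as soon as any set has accumulated too many errors to ever reach its quorum. A condensed sketch of just that decision (editor's illustration with hypothetical names, not code from either branch):

// One (ok, err) counter pair per write set; set_sizes[i] is the size of write set i.
fn quorum_status(set_sizes: &[usize], counters: &[(usize, usize)], quorum: usize) -> Option<bool> {
    if counters.iter().all(|(ok, _)| *ok >= quorum) {
        return Some(true); // a quorum of OK responses in every write set: success
    }
    if counters
        .iter()
        .enumerate()
        .any(|(i, (_, err))| err + quorum > set_sizes[i])
    {
        return Some(false); // some set can no longer reach its quorum: definite failure
    }
    None // still undecided: keep waiting for more responses
}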