NLnet task 3 #667
1 changed files with 125 additions and 28 deletions
|
@ -143,7 +143,7 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
|
||||||
self.data.queue_insert(tx, e)
|
self.data.queue_insert(tx, e)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn insert_many<I, IE>(&self, entries: I) -> Result<(), Error>
|
pub async fn insert_many<I, IE>(self: &Arc<Self>, entries: I) -> Result<(), Error>
|
||||||
where
|
where
|
||||||
I: IntoIterator<Item = IE> + Send + Sync,
|
I: IntoIterator<Item = IE> + Send + Sync,
|
||||||
IE: Borrow<F::E> + Send + Sync,
|
IE: Borrow<F::E> + Send + Sync,
|
||||||
|
@ -161,52 +161,149 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn insert_many_internal<I, IE>(&self, entries: I) -> Result<(), Error>
|
async fn insert_many_internal<I, IE>(self: &Arc<Self>, entries: I) -> Result<(), Error>
|
||||||
where
|
where
|
||||||
I: IntoIterator<Item = IE> + Send + Sync,
|
I: IntoIterator<Item = IE> + Send + Sync,
|
||||||
IE: Borrow<F::E> + Send + Sync,
|
IE: Borrow<F::E> + Send + Sync,
|
||||||
{
|
{
|
||||||
let mut call_list: HashMap<_, Vec<_>> = HashMap::new();
|
// The different items will have to be stored on possibly different nodes.
|
||||||
|
// We will here batch all items into a single request for each concerned
|
||||||
|
// node, with all of the entries it must store within that request.
|
||||||
|
// Each entry has to be saved to a specific list of "write sets", i.e. a set
|
||||||
|
// of node within wich a quorum must be achieved. In normal operation, there
|
||||||
|
// is a single write set which corresponds to the quorum in the current
|
||||||
|
// cluster layout, but when the layout is updated, multiple write sets might
|
||||||
|
// have to be handled at once. Here, since we are sending many entries, we
|
||||||
|
// will have to handle many write sets in all cases. The algorihtm is thus
|
||||||
|
// to send one request to each node with all the items it must save,
|
||||||
|
// and keep track of the OK responses within each write set: if for all sets
|
||||||
|
// a quorum of nodes has answered OK, then the insert has succeeded and
|
||||||
|
// consistency properties (read-after-write) are preserved.
|
||||||
|
|
||||||
|
// Some code here might feel redundant with RpcHelper::try_write_many_sets,
|
||||||
|
// but I think deduplicating could lead to more spaghetti instead of
|
||||||
|
// improving the readability, so I'm leaving as is.
|
||||||
|
|
||||||
|
let quorum = self.data.replication.write_quorum();
|
||||||
|
|
||||||
|
// Serialize all entries and compute the write sets for each of them.
|
||||||
|
// In the case of sharded table replication, this also takes an "ack lock"
|
||||||
|
// to the layout manager to avoid ack'ing newer versions which are not
|
||||||
|
// taken into account by writes in progress (the ack can happen later, once
|
||||||
|
// all writes that didn't take the new layout into account are finished).
|
||||||
|
// These locks are released when entries_vec is dropped, i.e. when this
|
||||||
|
// function returns.
|
||||||
|
let mut entries_vec = Vec::new();
|
||||||
for entry in entries.into_iter() {
|
for entry in entries.into_iter() {
|
||||||
let entry = entry.borrow();
|
let entry = entry.borrow();
|
||||||
let hash = entry.partition_key().hash();
|
let hash = entry.partition_key().hash();
|
||||||
// TODO: use write sets
|
let write_sets = self.data.replication.write_sets(&hash);
|
||||||
let who = self.data.replication.storage_nodes(&hash);
|
|
||||||
let e_enc = Arc::new(ByteBuf::from(entry.encode()?));
|
let e_enc = Arc::new(ByteBuf::from(entry.encode()?));
|
||||||
for node in who {
|
entries_vec.push((write_sets, e_enc));
|
||||||
call_list.entry(node).or_default().push(e_enc.clone());
|
}
|
||||||
|
|
||||||
|
// Compute a deduplicated list of all of the write sets,
|
||||||
|
// and compute an index from each node to the position of the sets in which
|
||||||
|
// it takes part, to optimize the detection of a quorum.
|
||||||
|
let mut write_sets = entries_vec
|
||||||
|
.iter()
|
||||||
|
.map(|(wss, _)| wss.as_ref().iter().map(|ws| ws.as_slice()))
|
||||||
|
.flatten()
|
||||||
|
.collect::<Vec<&[Uuid]>>();
|
||||||
|
write_sets.sort();
|
||||||
|
write_sets.dedup();
|
||||||
|
let mut write_set_index = HashMap::<&Uuid, Vec<usize>>::new();
|
||||||
|
for (i, write_set) in write_sets.iter().enumerate() {
|
||||||
|
for node in write_set.iter() {
|
||||||
|
write_set_index.entry(node).or_default().push(i);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let call_futures = call_list.drain().map(|(node, entries)| async move {
|
// Build a map of all nodes to the entries that must be sent to that node.
|
||||||
let rpc = TableRpc::<F>::Update(entries);
|
let mut call_list: HashMap<Uuid, Vec<_>> = HashMap::new();
|
||||||
|
for (write_sets, entry_enc) in entries_vec.iter() {
|
||||||
|
for write_set in write_sets.as_ref().iter() {
|
||||||
|
for node in write_set.iter() {
|
||||||
|
call_list.entry(*node).or_default().push(entry_enc.clone())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
let resp = self
|
// Build futures to actually perform each of the corresponding RPC calls
|
||||||
.system
|
let call_count = call_list.len();
|
||||||
.rpc_helper()
|
let call_futures = call_list.into_iter().map(|(node, entries)| {
|
||||||
.call(
|
let this = self.clone();
|
||||||
&self.endpoint,
|
let tracer = opentelemetry::global::tracer("garage");
|
||||||
node,
|
let span = tracer.start(format!("RPC to {:?}", node));
|
||||||
rpc,
|
let fut = async move {
|
||||||
RequestStrategy::with_priority(PRIO_NORMAL),
|
let rpc = TableRpc::<F>::Update(entries);
|
||||||
)
|
let resp = this
|
||||||
.await?;
|
.system
|
||||||
Ok::<_, Error>((node, resp))
|
.rpc_helper()
|
||||||
|
.call(
|
||||||
|
&this.endpoint,
|
||||||
|
node,
|
||||||
|
rpc,
|
||||||
|
RequestStrategy::with_priority(PRIO_NORMAL).with_quorum(quorum),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
(node, resp)
|
||||||
|
};
|
||||||
|
fut.with_context(Context::current_with_span(span))
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Run all requests in parallel thanks to FuturesUnordered, and collect results.
|
||||||
let mut resps = call_futures.collect::<FuturesUnordered<_>>();
|
let mut resps = call_futures.collect::<FuturesUnordered<_>>();
|
||||||
|
let mut set_counters = vec![(0, 0); write_sets.len()];
|
||||||
|
let mut successes = 0;
|
||||||
let mut errors = vec![];
|
let mut errors = vec![];
|
||||||
|
|
||||||
while let Some(resp) = resps.next().await {
|
while let Some((node, resp)) = resps.next().await {
|
||||||
if let Err(e) = resp {
|
match resp {
|
||||||
errors.push(e);
|
Ok(_) => {
|
||||||
|
successes += 1;
|
||||||
|
for set in write_set_index.get(&node).unwrap().iter() {
|
||||||
|
set_counters[*set].0 += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
errors.push(e);
|
||||||
|
for set in write_set_index.get(&node).unwrap().iter() {
|
||||||
|
set_counters[*set].1 += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if set_counters.iter().all(|(ok_cnt, _)| *ok_cnt >= quorum) {
|
||||||
|
// Success
|
||||||
|
|
||||||
|
// Continue all other requests in background
|
||||||
|
tokio::spawn(async move {
|
||||||
|
resps.collect::<Vec<(Uuid, Result<_, _>)>>().await;
|
||||||
|
});
|
||||||
|
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
if set_counters
|
||||||
|
.iter()
|
||||||
|
.enumerate()
|
||||||
|
.any(|(i, (_, err_cnt))| err_cnt + quorum > write_sets[i].len())
|
||||||
|
{
|
||||||
|
// Too many errors in this set, we know we won't get a quorum
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if errors.len() > self.data.replication.max_write_errors() {
|
|
||||||
Err(Error::Message("Too many errors".into()))
|
// Failure, could not get quorum within at least one set
|
||||||
} else {
|
let errors = errors.iter().map(|e| format!("{}", e)).collect::<Vec<_>>();
|
||||||
Ok(())
|
Err(Error::Quorum(
|
||||||
}
|
quorum,
|
||||||
|
Some(write_sets.len()),
|
||||||
|
successes,
|
||||||
|
call_count,
|
||||||
|
errors,
|
||||||
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn get(
|
pub async fn get(
|
||||||
|
|
Loading…
Reference in a new issue