From dc5ec4ecf9df1664d79200dc9c41f86173ef7e07 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 1 Mar 2022 11:52:12 +0100 Subject: [PATCH 1/5] Add appropriate fsync() calls in write_block to ensure that data is persisted properly --- src/model/block.rs | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/src/model/block.rs b/src/model/block.rs index ec1890bf..c7c178af 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -862,9 +862,11 @@ impl BlockManagerLocked { let data = data.inner_buffer(); let mut path = mgr.block_dir(hash); - fs::create_dir_all(&path).await?; - + let directory = path.clone(); path.push(hex::encode(hash)); + + fs::create_dir_all(&directory).await?; + let to_delete = match (mgr.is_block_compressed(hash).await, compressed) { (Ok(true), _) => return Ok(BlockRpc::Ok), (Ok(false), false) => return Ok(BlockRpc::Ok), @@ -885,6 +887,7 @@ impl BlockManagerLocked { path2.set_extension("tmp"); let mut f = fs::File::create(&path2).await?; f.write_all(data).await?; + f.sync_all().await?; drop(f); fs::rename(path2, path).await?; @@ -892,6 +895,14 @@ impl BlockManagerLocked { fs::remove_file(to_delete).await?; } + let dir = fs::OpenOptions::new() + .read(true) + .mode(0) + .open(directory) + .await?; + dir.sync_all().await?; + drop(dir); + Ok(BlockRpc::Ok) } -- 2.43.0 From f7e6f4616f79726674d0fa5e1b19b06cacdc3a2a Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 1 Mar 2022 11:57:18 +0100 Subject: [PATCH 2/5] Spawn a single resync worker --- src/model/block.rs | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/src/model/block.rs b/src/model/block.rs index c7c178af..f0814eda 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -38,7 +38,6 @@ use crate::garage::Garage; /// Size under which data will be stored inlined in database instead of as files pub const INLINE_THRESHOLD: usize = 3072; -pub const BACKGROUND_WORKERS: u64 = 1; pub const BACKGROUND_TRANQUILITY: u32 = 2; // Timeout for RPCs that read and write blocks to remote nodes @@ -512,17 +511,14 @@ impl BlockManager { // ---- Resync loop ---- pub fn spawn_background_worker(self: Arc) { - // Launch n simultaneous workers for background resync loop preprocessing - for i in 0..BACKGROUND_WORKERS { - let bm2 = self.clone(); - let background = self.system.background.clone(); - tokio::spawn(async move { - tokio::time::sleep(Duration::from_secs(10 * (i + 1))).await; - background.spawn_worker(format!("block resync worker {}", i), move |must_exit| { - bm2.resync_loop(must_exit) - }); + // Launch a background workers for background resync loop processing + let background = self.system.background.clone(); + tokio::spawn(async move { + tokio::time::sleep(Duration::from_secs(10)).await; + background.spawn_worker("block resync worker".into(), move |must_exit| { + self.resync_loop(must_exit) }); - } + }); } fn put_to_resync(&self, hash: &Hash, delay: Duration) -> Result<(), Error> { -- 2.43.0 From d78bf379fb85c0264c9971a26724f8b933a234ee Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 1 Mar 2022 14:55:37 +0100 Subject: [PATCH 3/5] Fix resync queue to not drop items --- src/model/block.rs | 37 +++++++++++++++++++++---------------- 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/src/model/block.rs b/src/model/block.rs index f0814eda..8329bb6f 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -562,9 +562,12 @@ impl BlockManager { } async fn resync_iter(&self, must_exit: &mut watch::Receiver) -> Result { - if let 
Some((time_bytes, hash_bytes)) = self.resync_queue.pop_min()? { + if let Some(first_pair_res) = self.resync_queue.iter().next() { + let (time_bytes, hash_bytes) = first_pair_res?; + let time_msec = u64::from_be_bytes(time_bytes[0..8].try_into().unwrap()); let now = now_msec(); + if now >= time_msec { let hash = Hash::try_from(&hash_bytes[..]).unwrap(); @@ -575,6 +578,9 @@ impl BlockManager { // don't do resync and return early, but still // make sure the item is still in queue at expected time self.put_to_resync_at(&hash, ec.next_try())?; + // ec.next_try() > now >= time_msec, so this remove + // is not removing the one we added just above + self.resync_queue.remove(time_bytes)?; return Ok(false); } } @@ -605,20 +611,25 @@ impl BlockManager { warn!("Error when resyncing {:?}: {}", hash, e); let err_counter = match self.resync_errors.get(hash.as_slice())? { - Some(ec) => ErrorCounter::decode(ec).add1(), - None => ErrorCounter::new(), + Some(ec) => ErrorCounter::decode(ec).add1(now + 1), + None => ErrorCounter::new(now + 1), }; - self.put_to_resync_at(&hash, err_counter.next_try())?; self.resync_errors .insert(hash.as_slice(), err_counter.encode())?; + + self.put_to_resync_at(&hash, err_counter.next_try())?; + // err_counter.next_try() >= now + 1 > now, + // the entry we remove from the queue is not + // the entry we inserted with put_to_resync_at + self.resync_queue.remove(time_bytes)?; } else { self.resync_errors.remove(hash.as_slice())?; + self.resync_queue.remove(time_bytes)?; } Ok(true) } else { - self.resync_queue.insert(time_bytes, hash_bytes)?; let delay = tokio::time::sleep(Duration::from_millis(time_msec - now)); select! { _ = delay.fuse() => {}, @@ -1044,19 +1055,13 @@ struct ErrorCounter { last_try: u64, } -impl Default for ErrorCounter { - fn default() -> Self { +impl ErrorCounter { + fn new(now: u64) -> Self { Self { errors: 1, - last_try: now_msec(), + last_try: now, } } -} - -impl ErrorCounter { - fn new() -> Self { - Self::default() - } fn decode(data: sled::IVec) -> Self { Self { @@ -1072,10 +1077,10 @@ impl ErrorCounter { .concat() } - fn add1(self) -> Self { + fn add1(self, now: u64) -> Self { Self { errors: self.errors + 1, - last_try: now_msec(), + last_try: now, } } -- 2.43.0 From 0af314b295f70fdf107524b08063f4d36fb4eeb6 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 14 Mar 2022 11:54:00 +0100 Subject: [PATCH 4/5] Add comment for fsync --- src/model/block.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/model/block.rs b/src/model/block.rs index 8329bb6f..a41daa64 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -902,6 +902,11 @@ impl BlockManagerLocked { fs::remove_file(to_delete).await?; } + // We want to ensure that when this function returns, data is properly persisted + // to disk. The first step is the sync_all above that does an fsync on the data file. + // Now, we do an fsync on the containing directory, to ensure that the rename + // is persisted properly. 
See: + // http://thedjbway.b0llix.net/qmail/syncdir.html let dir = fs::OpenOptions::new() .read(true) .mode(0) -- 2.43.0 From ba6b56ae68d5842d814769418d484093865261aa Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 14 Mar 2022 12:00:23 +0100 Subject: [PATCH 5/5] Fix some new clippy lints --- src/api/s3_cors.rs | 7 +------ src/api/s3_list.rs | 6 +++--- src/api/s3_website.rs | 3 +-- src/api/signature/payload.rs | 2 +- src/garage/main.rs | 2 +- src/garage/tests/bucket.rs | 4 ++-- src/garage/tests/list.rs | 4 ++-- src/garage/tests/multipart.rs | 4 ++-- src/model/helper/bucket.rs | 6 ++---- src/model/key_table.rs | 3 +-- src/rpc/consul.rs | 6 ++---- src/rpc/kubernetes.rs | 6 ++---- src/rpc/rpc_helper.rs | 3 +-- src/web/web_server.rs | 3 +-- 14 files changed, 22 insertions(+), 37 deletions(-) diff --git a/src/api/s3_cors.rs b/src/api/s3_cors.rs index e3deaeda..ab77e23a 100644 --- a/src/api/s3_cors.rs +++ b/src/api/s3_cors.rs @@ -200,12 +200,7 @@ pub fn find_matching_cors_rule<'a>( None => vec![], }; return Ok(cors_config.iter().find(|rule| { - cors_rule_matches( - rule, - origin, - &req.method().to_string(), - request_headers.iter(), - ) + cors_rule_matches(rule, origin, req.method().as_ref(), request_headers.iter()) })); } } diff --git a/src/api/s3_list.rs b/src/api/s3_list.rs index 92998159..5852fc1b 100644 --- a/src/api/s3_list.rs +++ b/src/api/s3_list.rs @@ -1042,12 +1042,12 @@ mod tests { query.common.prefix = "a/".to_string(); assert_eq!( - common_prefix(&objs.get(0).unwrap(), &query.common), + common_prefix(objs.get(0).unwrap(), &query.common), Some("a/b/") ); query.common.prefix = "a/b/".to_string(); - assert_eq!(common_prefix(&objs.get(0).unwrap(), &query.common), None); + assert_eq!(common_prefix(objs.get(0).unwrap(), &query.common), None); } #[test] @@ -1272,7 +1272,7 @@ mod tests { Version { bucket_id: uuid, key: "a".to_string(), - uuid: uuid, + uuid, deleted: false.into(), blocks: crdt::Map::::from_iter(blocks), parts_etags: crdt::Map::::from_iter(etags), diff --git a/src/api/s3_website.rs b/src/api/s3_website.rs index 9d1da905..b464dd45 100644 --- a/src/api/s3_website.rs +++ b/src/api/s3_website.rs @@ -259,8 +259,7 @@ impl RoutingRuleInner { let has_prefix = self .condition .as_ref() - .map(|c| c.prefix.as_ref()) - .flatten() + .and_then(|c| c.prefix.as_ref()) .is_some(); self.redirect.validate(has_prefix) } diff --git a/src/api/signature/payload.rs b/src/api/signature/payload.rs index 85687a23..88b58922 100644 --- a/src/api/signature/payload.rs +++ b/src/api/signature/payload.rs @@ -51,7 +51,7 @@ pub async fn check_payload_signature( let canonical_request = canonical_request( request.method(), - &request.uri().path().to_string(), + request.uri().path(), &canonical_query_string(request.uri()), &headers, &authorization.signed_headers, diff --git a/src/garage/main.rs b/src/garage/main.rs index 08ec912b..e898e680 100644 --- a/src/garage/main.rs +++ b/src/garage/main.rs @@ -115,7 +115,7 @@ async fn cli_command(opt: Opt) -> Result<(), Error> { } else { let node_id = garage_rpc::system::read_node_id(&config.as_ref().unwrap().metadata_dir) .err_context(READ_KEY_ERROR)?; - if let Some(a) = config.as_ref().map(|c| c.rpc_public_addr).flatten() { + if let Some(a) = config.as_ref().and_then(|c| c.rpc_public_addr) { (node_id, a) } else { let default_addr = SocketAddr::new( diff --git a/src/garage/tests/bucket.rs b/src/garage/tests/bucket.rs index 1a534042..ff5cc8da 100644 --- a/src/garage/tests/bucket.rs +++ b/src/garage/tests/bucket.rs @@ -27,7 +27,7 @@ async fn test_bucket_all() { 
.buckets .as_ref() .unwrap() - .into_iter() + .iter() .filter(|x| x.name.as_ref().is_some()) .find(|x| x.name.as_ref().unwrap() == "hello") .is_some()); @@ -79,7 +79,7 @@ async fn test_bucket_all() { .buckets .as_ref() .unwrap() - .into_iter() + .iter() .filter(|x| x.name.as_ref().is_some()) .find(|x| x.name.as_ref().unwrap() == "hello") .is_none()); diff --git a/src/garage/tests/list.rs b/src/garage/tests/list.rs index 6e092a3f..bb03f250 100644 --- a/src/garage/tests/list.rs +++ b/src/garage/tests/list.rs @@ -527,8 +527,8 @@ async fn test_listmultipart() { upnext = r.next_upload_id_marker; loopcnt += 1; - upcnt += r.uploads.unwrap_or(vec![]).len(); - pfxcnt += r.common_prefixes.unwrap_or(vec![]).len(); + upcnt += r.uploads.unwrap_or_default().len(); + pfxcnt += r.common_prefixes.unwrap_or_default().len(); if next.is_none() { break; diff --git a/src/garage/tests/multipart.rs b/src/garage/tests/multipart.rs index 095c9d34..7fec4de6 100644 --- a/src/garage/tests/multipart.rs +++ b/src/garage/tests/multipart.rs @@ -124,7 +124,7 @@ async fn test_uploadlistpart() { assert!(r.part_number_marker.is_none()); assert!(r.next_part_number_marker.is_some()); - assert_eq!(r.max_parts, 1 as i32); + assert_eq!(r.max_parts, 1_i32); assert!(r.is_truncated); assert_eq!(r.key.unwrap(), "a"); assert_eq!(r.upload_id.unwrap().as_str(), uid.as_str()); @@ -146,7 +146,7 @@ async fn test_uploadlistpart() { r2.part_number_marker.as_ref().unwrap(), r.next_part_number_marker.as_ref().unwrap() ); - assert_eq!(r2.max_parts, 1 as i32); + assert_eq!(r2.max_parts, 1_i32); assert!(r2.is_truncated); assert_eq!(r2.key.unwrap(), "a"); assert_eq!(r2.upload_id.unwrap().as_str(), uid.as_str()); diff --git a/src/model/helper/bucket.rs b/src/model/helper/bucket.rs index 92b9f4cd..706faf26 100644 --- a/src/model/helper/bucket.rs +++ b/src/model/helper/bucket.rs @@ -30,8 +30,7 @@ impl<'a> BucketHelper<'a> { // the AWS spec, and hex-encoded UUIDs are 64 chars long. let hexbucket = hex::decode(bucket_name.as_str()) .ok() - .map(|by| Uuid::try_from(&by)) - .flatten(); + .and_then(|by| Uuid::try_from(&by)); if let Some(bucket_id) = hexbucket { Ok(self .0 @@ -46,8 +45,7 @@ impl<'a> BucketHelper<'a> { .bucket_alias_table .get(&EmptyKey, bucket_name) .await? 
- .map(|x| *x.state.get()) - .flatten()) + .and_then(|x| *x.state.get())) } } diff --git a/src/model/key_table.rs b/src/model/key_table.rs index d5e30f3f..330e83f0 100644 --- a/src/model/key_table.rs +++ b/src/model/key_table.rs @@ -106,8 +106,7 @@ impl Key { /// Get permissions for a bucket pub fn bucket_permissions(&self, bucket: &Uuid) -> BucketKeyPerm { self.params() - .map(|params| params.authorized_buckets.get(bucket)) - .flatten() + .and_then(|params| params.authorized_buckets.get(bucket)) .cloned() .unwrap_or(BucketKeyPerm::NO_PERMISSIONS) } diff --git a/src/rpc/consul.rs b/src/rpc/consul.rs index e70288dd..15acbcef 100644 --- a/src/rpc/consul.rs +++ b/src/rpc/consul.rs @@ -51,10 +51,8 @@ pub async fn get_consul_nodes( let pubkey = ent .node_meta .get("pubkey") - .map(|k| hex::decode(&k).ok()) - .flatten() - .map(|k| NodeID::from_slice(&k[..])) - .flatten(); + .and_then(|k| hex::decode(&k).ok()) + .and_then(|k| NodeID::from_slice(&k[..])); if let (Some(ip), Some(pubkey)) = (ip, pubkey) { ret.push((pubkey, SocketAddr::new(ip, ent.service_port))); } else { diff --git a/src/rpc/kubernetes.rs b/src/rpc/kubernetes.rs index 8c0d6cdf..272d9162 100644 --- a/src/rpc/kubernetes.rs +++ b/src/rpc/kubernetes.rs @@ -63,10 +63,8 @@ pub async fn get_kubernetes_nodes( let pubkey = &node .metadata .name - .map(|k| hex::decode(&k).ok()) - .flatten() - .map(|k| NodeID::from_slice(&k[..])) - .flatten(); + .and_then(|k| hex::decode(&k).ok()) + .and_then(|k| NodeID::from_slice(&k[..])); if let Some(pubkey) = pubkey { ret.push((*pubkey, SocketAddr::new(node.spec.address, node.spec.port))) diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index 1b351024..34717d3b 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -322,8 +322,7 @@ impl RpcHelper { let peer_avg_ping = peer_list .iter() .find(|x| x.id.as_ref() == to.as_slice()) - .map(|pi| pi.avg_ping) - .flatten() + .and_then(|pi| pi.avg_ping) .unwrap_or_else(|| Duration::from_secs(1)); ( to != self.0.our_node_id, diff --git a/src/web/web_server.rs b/src/web/web_server.rs index c51347a3..c3d691d0 100644 --- a/src/web/web_server.rs +++ b/src/web/web_server.rs @@ -175,8 +175,7 @@ async fn serve_file(garage: Arc, req: &Request) -> Result
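
Taken together, patches 1 and 4 implement the classic durable-write sequence: write the data into a temporary file, fsync the file, rename it into place, then fsync the containing directory so the rename itself survives a crash or power loss. Below is a minimal standalone sketch of that ordering; the function name and paths are invented for illustration, it uses synchronous std::fs rather than the tokio::fs wrappers Garage uses, and the directory fsync is Unix-specific — the point is the order of operations, not the exact API.

```rust
// Hypothetical helper (not Garage code): persist `data` under `dir/name`
// so that both the file contents and the rename survive a power failure.
use std::fs::{self, File, OpenOptions};
use std::io::Write;
use std::path::Path;

fn write_block_durably(dir: &Path, name: &str, data: &[u8]) -> std::io::Result<()> {
    fs::create_dir_all(dir)?;

    // 1. Write into a temporary file in the same directory.
    let tmp = dir.join(format!("{}.tmp", name));
    let mut f = File::create(&tmp)?;
    f.write_all(data)?;
    // 2. fsync the file so its contents reach the disk (patch 1's f.sync_all()).
    f.sync_all()?;
    drop(f);

    // 3. Atomically move the finished file into place.
    let dst = dir.join(name);
    fs::rename(&tmp, &dst)?;

    // 4. fsync the containing directory so the rename (the new directory entry)
    //    is persisted too — the step patch 4's comment explains. Unix-specific.
    let d = OpenOptions::new().read(true).open(dir)?;
    d.sync_all()?;

    Ok(())
}

fn main() -> std::io::Result<()> {
    write_block_durably(Path::new("/tmp/blocks"), "deadbeef", b"hello")
}
```

Skipping step 4 leaves a window where the data file exists but the directory entry pointing at it may be lost after a crash, which is exactly the gap the syncdir article linked in patch 4 describes.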
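
Patch 3's central idea is to stop pop_min()-ing the head of the resync queue before the work is done: the entry is only peeked at, and it is removed (or superseded by a later retry entry) once the outcome is known, so an error or crash can no longer drop it. The sketch below is a simplified stand-in built directly on sled; the tree layout, the fixed 60-second retry delay, and the process_entry stub are assumptions made for brevity — the real code derives the retry time from ErrorCounter::next_try() and also waits until the head entry's timestamp is actually due, which is omitted here.

```rust
// Simplified illustration of "peek, process, then remove" (not the real resync loop).
use std::convert::TryInto;

fn process_entry(_hash: &[u8]) -> Result<(), String> {
    // Stand-in for the actual resync work: any fallible operation.
    Ok(())
}

fn resync_step(queue: &sled::Tree) -> Result<bool, Box<dyn std::error::Error>> {
    // Peek instead of pop_min(): the entry stays in the queue while we work on it.
    let (time_bytes, hash_bytes) = match queue.iter().next() {
        Some(kv) => kv?,
        None => return Ok(false), // queue is empty
    };
    let time_msec = u64::from_be_bytes(time_bytes[0..8].try_into()?);
    // (The real loop compares time_msec to now_msec() and sleeps if it is in the future.)

    match process_entry(&hash_bytes) {
        Ok(()) => {
            // Success: only now is it safe to drop the entry we just handled.
            queue.remove(&time_bytes)?;
        }
        Err(e) => {
            // Failure: insert the retry entry *before* removing the old one, under a
            // strictly later key, so the item can never be lost in between.
            eprintln!("resync of entry scheduled at {} failed: {}", time_msec, e);
            let retry_at = (time_msec + 60_000).to_be_bytes();
            queue.insert(&retry_at, &hash_bytes[..])?;
            queue.remove(&time_bytes)?;
        }
    }
    Ok(true)
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let db = sled::open("/tmp/resync-demo")?;
    let queue = db.open_tree("resync")?;
    queue.insert(&0u64.to_be_bytes(), &b"some-block-hash"[..])?;
    while resync_step(&queue)? {}
    Ok(())
}
```

The ordering inside the error branch mirrors the comments added in patch 3: because the retry key is strictly greater than the key being removed, the remove can never delete the entry that was just re-scheduled.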