diff --git a/.drone.yml b/.drone.yml index 1c2362ff..07f50771 100644 --- a/.drone.yml +++ b/.drone.yml @@ -36,8 +36,19 @@ steps: branch: - nonexistent_skip_this_step + - name: code quality + image: superboum/garage_builder_amd64:4 + volumes: + - name: cargo_home + path: /drone/cargo + environment: + CARGO_HOME: /drone/cargo + commands: + - cargo fmt -- --check + - cargo clippy -- --deny warnings + - name: build - image: superboum/garage_builder_amd64:3 + image: superboum/garage_builder_amd64:4 volumes: - name: cargo_home path: /drone/cargo @@ -45,11 +56,10 @@ steps: CARGO_HOME: /drone/cargo commands: - pwd - - cargo fmt -- --check - cargo build - name: cargo-test - image: superboum/garage_builder_amd64:3 + image: superboum/garage_builder_amd64:4 volumes: - name: cargo_home path: /drone/cargo @@ -85,7 +95,7 @@ steps: - nonexistent_skip_this_step - name: smoke-test - image: superboum/garage_builder_amd64:3 + image: superboum/garage_builder_amd64:4 volumes: - name: cargo_home path: /drone/cargo @@ -129,6 +139,6 @@ steps: --- kind: signature -hmac: d584c2a15ede6d5702fbe27ae5ae2b2bf7a04461ae7aed2d53cbda83b7fd503e +hmac: e919f8a66d20ebfeeec56b291a8a0fdd59a482601da987fcf533d96d24768744 ... diff --git a/script/builder_image/Dockerfile b/script/builder_image/Dockerfile index 00dea191..908c7e3c 100644 --- a/script/builder_image/Dockerfile +++ b/script/builder_image/Dockerfile @@ -3,5 +3,5 @@ RUN apt-get update && \ apt-get install --yes libsodium-dev awscli python-pip wget rclone openssl socat && \ rm -rf /var/lib/apt/lists/* RUN wget https://dl.min.io/client/mc/release/linux-amd64/mc -O /usr/local/bin/mc && chmod +x /usr/local/bin/mc -RUN rustup component add rustfmt +RUN rustup component add rustfmt clippy RUN pip install s3cmd diff --git a/src/api/api_server.rs b/src/api/api_server.rs index ab8bd736..8f5ccfd2 100644 --- a/src/api/api_server.rs +++ b/src/api/api_server.rs @@ -90,9 +90,9 @@ async fn handler_inner(garage: Arc, req: Request) -> Result api_key.allow_write(&bucket), }; if !allowed { - return Err(Error::Forbidden(format!( - "Operation is not allowed for this key." - ))); + return Err(Error::Forbidden( + "Operation is not allowed for this key.".to_string(), + )); } let mut params = HashMap::new(); @@ -104,16 +104,16 @@ async fn handler_inner(garage: Arc, req: Request) -> Result { + match *req.method() { + Method::HEAD => { // HeadObject query Ok(handle_head(garage, &req, &bucket, &key).await?) } - &Method::GET => { + Method::GET => { // GetObject query Ok(handle_get(garage, &req, &bucket, &key).await?) } - &Method::PUT => { + Method::PUT => { if params.contains_key(&"partnumber".to_string()) && params.contains_key(&"uploadid".to_string()) { @@ -152,7 +152,7 @@ async fn handler_inner(garage: Arc, req: Request) -> Result { + Method::DELETE => { if params.contains_key(&"uploadid".to_string()) { // AbortMultipartUpload query let upload_id = params.get("uploadid").unwrap(); @@ -162,7 +162,7 @@ async fn handler_inner(garage: Arc, req: Request) -> Result { + Method::POST => { if params.contains_key(&"uploads".to_string()) { // CreateMultipartUpload call Ok(handle_create_multipart_upload(garage, &req, &bucket, &key).await?) @@ -179,16 +179,16 @@ async fn handler_inner(garage: Arc, req: Request) -> Result Err(Error::BadRequest(format!("Invalid method"))), + _ => Err(Error::BadRequest("Invalid method".to_string())), } } else { - match req.method() { - &Method::PUT => { + match *req.method() { + Method::PUT => { // CreateBucket // If we're here, the bucket already exists, so just answer ok debug!( @@ -203,19 +203,19 @@ async fn handler_inner(garage: Arc, req: Request) -> Result { + Method::HEAD => { // HeadBucket let empty_body: Body = Body::from(vec![]); let response = Response::builder().body(empty_body).unwrap(); Ok(response) } - &Method::DELETE => { + Method::DELETE => { // DeleteBucket query Err(Error::Forbidden( "Cannot delete buckets using S3 api, please talk to Garage directly".into(), )) } - &Method::GET => { + Method::GET => { if params.contains_key("location") { // GetBucketLocation call Ok(handle_get_bucket_location(garage)?) @@ -225,7 +225,7 @@ async fn handler_inner(garage: Arc, req: Request) -> Result { + Method::POST => { if params.contains_key(&"delete".to_string()) { // DeleteObjects Ok(handle_delete_objects(garage, bucket, req, content_sha256).await?) @@ -235,10 +235,10 @@ async fn handler_inner(garage: Arc, req: Request) -> Result") ); - Err(Error::BadRequest(format!("Unsupported call"))) + Err(Error::BadRequest("Unsupported call".to_string())) } } - _ => Err(Error::BadRequest(format!("Invalid method"))), + _ => Err(Error::BadRequest("Invalid method".to_string())), } } } @@ -253,7 +253,7 @@ fn parse_bucket_key(path: &str) -> Result<(&str, Option<&str>), Error> { let (bucket, key) = match path.find('/') { Some(i) => { let key = &path[i + 1..]; - if key.len() > 0 { + if !key.is_empty() { (&path[..i], Some(key)) } else { (&path[..i], None) @@ -261,8 +261,8 @@ fn parse_bucket_key(path: &str) -> Result<(&str, Option<&str>), Error> { } None => (path, None), }; - if bucket.len() == 0 { - return Err(Error::BadRequest(format!("No bucket specified"))); + if bucket.is_empty() { + return Err(Error::BadRequest("No bucket specified".to_string())); } Ok((bucket, key)) } diff --git a/src/api/error.rs b/src/api/error.rs index a3cdfdbd..76a1211e 100644 --- a/src/api/error.rs +++ b/src/api/error.rs @@ -21,7 +21,7 @@ pub enum Error { /// Error related to HTTP #[error(display = "Internal error (HTTP error): {}", _0)] - HTTP(#[error(source)] http::Error), + Http(#[error(source)] http::Error), // Category: cannot process /// No proper api key was used, or the signature was invalid @@ -39,11 +39,11 @@ pub enum Error { // Category: bad request /// The request contained an invalid UTF-8 sequence in its path or in other parameters #[error(display = "Invalid UTF-8: {}", _0)] - InvalidUTF8Str(#[error(source)] std::str::Utf8Error), + InvalidUtf8Str(#[error(source)] std::str::Utf8Error), /// The request used an invalid path #[error(display = "Invalid UTF-8: {}", _0)] - InvalidUTF8String(#[error(source)] std::string::FromUtf8Error), + InvalidUtf8String(#[error(source)] std::string::FromUtf8Error), /// Some base64 encoded data was badly encoded #[error(display = "Invalid base64: {}", _0)] @@ -51,7 +51,7 @@ pub enum Error { /// The client sent invalid XML data #[error(display = "Invalid XML: {}", _0)] - InvalidXML(String), + InvalidXml(String), /// The client sent a header with invalid value #[error(display = "Invalid header value: {}", _0)] @@ -68,7 +68,7 @@ pub enum Error { impl From for Error { fn from(err: roxmltree::Error) -> Self { - Self::InvalidXML(format!("{}", err)) + Self::InvalidXml(format!("{}", err)) } } @@ -78,8 +78,8 @@ impl Error { match self { Error::NotFound => StatusCode::NOT_FOUND, Error::Forbidden(_) => StatusCode::FORBIDDEN, - Error::InternalError(GarageError::RPC(_)) => StatusCode::SERVICE_UNAVAILABLE, - Error::InternalError(_) | Error::Hyper(_) | Error::HTTP(_) => { + Error::InternalError(GarageError::Rpc(_)) => StatusCode::SERVICE_UNAVAILABLE, + Error::InternalError(_) | Error::Hyper(_) | Error::Http(_) => { StatusCode::INTERNAL_SERVER_ERROR } _ => StatusCode::BAD_REQUEST, @@ -91,8 +91,8 @@ impl Error { Error::NotFound => "NoSuchKey", Error::Forbidden(_) => "AccessDenied", Error::AuthorizationHeaderMalformed(_) => "AuthorizationHeaderMalformed", - Error::InternalError(GarageError::RPC(_)) => "ServiceUnavailable", - Error::InternalError(_) | Error::Hyper(_) | Error::HTTP(_) => "InternalError", + Error::InternalError(GarageError::Rpc(_)) => "ServiceUnavailable", + Error::InternalError(_) | Error::Hyper(_) | Error::Http(_) => "InternalError", _ => "InvalidRequest", } } @@ -134,7 +134,7 @@ impl OkOrBadRequest for Option { fn ok_or_bad_request(self, reason: &'static str) -> Result { match self { Some(x) => Ok(x), - None => Err(Error::BadRequest(format!("{}", reason))), + None => Err(Error::BadRequest(reason.to_string())), } } } @@ -166,10 +166,9 @@ impl OkOrInternalError for Option { fn ok_or_internal_error(self, reason: &'static str) -> Result { match self { Some(x) => Ok(x), - None => Err(Error::InternalError(GarageError::Message(format!( - "{}", - reason - )))), + None => Err(Error::InternalError(GarageError::Message( + reason.to_string(), + ))), } } } diff --git a/src/api/s3_copy.rs b/src/api/s3_copy.rs index 187fe347..7069489b 100644 --- a/src/api/s3_copy.rs +++ b/src/api/s3_copy.rs @@ -33,8 +33,7 @@ pub async fn handle_copy( .versions() .iter() .rev() - .filter(|v| v.is_complete()) - .next() + .find(|v| v.is_complete()) .ok_or(Error::NotFound)?; let source_last_state = match &source_last_v.state { diff --git a/src/api/s3_delete.rs b/src/api/s3_delete.rs index 05387403..9d2a67f5 100644 --- a/src/api/s3_delete.rs +++ b/src/api/s3_delete.rs @@ -17,17 +17,19 @@ async fn handle_delete_internal( garage: &Garage, bucket: &str, key: &str, -) -> Result<(UUID, UUID), Error> { +) -> Result<(Uuid, Uuid), Error> { let object = garage .object_table .get(&bucket.to_string(), &key.to_string()) .await? .ok_or(Error::NotFound)?; // No need to delete - let interesting_versions = object.versions().iter().filter(|v| match v.state { - ObjectVersionState::Aborted => false, - ObjectVersionState::Complete(ObjectVersionData::DeleteMarker) => false, - _ => true, + let interesting_versions = object.versions().iter().filter(|v| { + !matches!( + v.state, + ObjectVersionState::Aborted + | ObjectVersionState::Complete(ObjectVersionData::DeleteMarker) + ) }); let mut version_to_delete = None; diff --git a/src/api/s3_get.rs b/src/api/s3_get.rs index 15c0ed0a..b804d8ee 100644 --- a/src/api/s3_get.rs +++ b/src/api/s3_get.rs @@ -28,7 +28,7 @@ fn object_headers( version_meta.headers.content_type.to_string(), ) .header("Last-Modified", date_str) - .header("Accept-Ranges", format!("bytes")); + .header("Accept-Ranges", "bytes".to_string()); if !version_meta.etag.is_empty() { resp = resp.header("ETag", format!("\"{}\"", version_meta.etag)); @@ -97,8 +97,7 @@ pub async fn handle_head( .versions() .iter() .rev() - .filter(|v| v.is_data()) - .next() + .find(|v| v.is_data()) .ok_or(Error::NotFound)?; let version_meta = match &version.state { @@ -137,8 +136,7 @@ pub async fn handle_get( .versions() .iter() .rev() - .filter(|v| v.is_complete()) - .next() + .find(|v| v.is_complete()) .ok_or(Error::NotFound)?; let last_v_data = match &last_v.state { @@ -160,7 +158,9 @@ pub async fn handle_get( let range_str = range.to_str()?; let mut ranges = http_range::HttpRange::parse(range_str, last_v_meta.size)?; if ranges.len() > 1 { - return Err(Error::BadRequest(format!("Multiple ranges not supported"))); + return Err(Error::BadRequest( + "Multiple ranges not supported".to_string(), + )); } else { ranges.pop() } @@ -236,7 +236,7 @@ async fn handle_get_range( end: u64, ) -> Result, Error> { if end > version_meta.size { - return Err(Error::BadRequest(format!("Range not included in file"))); + return Err(Error::BadRequest("Range not included in file".to_string())); } let resp_builder = object_headers(version, version_meta) @@ -282,7 +282,7 @@ async fn handle_get_range( } // Keep only blocks that have an intersection with the requested range if true_offset < end && true_offset + b.size > begin { - blocks.push((b.clone(), true_offset)); + blocks.push((*b, true_offset)); } true_offset += b.size; } @@ -303,9 +303,9 @@ async fn handle_get_range( } else { end - true_offset }; - Result::::Ok(Bytes::from( + Result::::Ok( data.slice(start_in_block as usize..end_in_block as usize), - )) + ) } }) .buffered(2); diff --git a/src/api/s3_list.rs b/src/api/s3_list.rs index 4d6c32bc..80fefd56 100644 --- a/src/api/s3_list.rs +++ b/src/api/s3_list.rs @@ -50,7 +50,7 @@ pub fn parse_list_objects_query( .ok_or_bad_request("Invalid value for max-keys") }) .unwrap_or(Ok(1000))?, - prefix: params.get("prefix").cloned().unwrap_or(String::new()), + prefix: params.get("prefix").cloned().unwrap_or_default(), marker: params.get("marker").cloned(), continuation_token: params.get("continuation-token").cloned(), start_after: params.get("start-after").cloned(), @@ -72,10 +72,13 @@ pub async fn handle_list( if let Some(ct) = &query.continuation_token { String::from_utf8(base64::decode(ct.as_bytes())?)? } else { - query.start_after.clone().unwrap_or(query.prefix.clone()) + query + .start_after + .clone() + .unwrap_or_else(|| query.prefix.clone()) } } else { - query.marker.clone().unwrap_or(query.prefix.clone()) + query.marker.clone().unwrap_or_else(|| query.prefix.clone()) }; debug!( @@ -155,7 +158,7 @@ pub async fn handle_list( truncated = None; break 'query_loop; } - if objects.len() > 0 { + if !objects.is_empty() { next_chunk_start = objects[objects.len() - 1].key.clone(); } } diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs index c4e3b818..bb6cf579 100644 --- a/src/api/s3_put.rs +++ b/src/api/s3_put.rs @@ -46,7 +46,7 @@ pub async fn handle_put( let body = req.into_body(); let mut chunker = BodyChunker::new(body, garage.config.block_size); - let first_block = chunker.next().await?.unwrap_or(vec![]); + let first_block = chunker.next().await?.unwrap_or_default(); // If body is small enough, store it directly in the object table // as "inline data". We can then return immediately. @@ -160,16 +160,18 @@ fn ensure_checksum_matches( ) -> Result<(), Error> { if let Some(expected_sha256) = content_sha256 { if expected_sha256 != data_sha256sum { - return Err(Error::BadRequest(format!( - "Unable to validate x-amz-content-sha256" - ))); + return Err(Error::BadRequest( + "Unable to validate x-amz-content-sha256".to_string(), + )); } else { trace!("Successfully validated x-amz-content-sha256"); } } if let Some(expected_md5) = content_md5 { if expected_md5.trim_matches('"') != base64::encode(data_md5sum) { - return Err(Error::BadRequest(format!("Unable to validate content-md5"))); + return Err(Error::BadRequest( + "Unable to validate content-md5".to_string(), + )); } else { trace!("Successfully validated content-md5"); } @@ -291,7 +293,7 @@ impl BodyChunker { self.read_all = true; } } - if self.buf.len() == 0 { + if self.buf.is_empty() { Ok(None) } else if self.buf.len() <= self.block_size { let block = self.buf.drain(..).collect::>(); @@ -303,7 +305,7 @@ impl BodyChunker { } } -pub fn put_response(version_uuid: UUID, md5sum_hex: String) -> Response { +pub fn put_response(version_uuid: Uuid, md5sum_hex: String) -> Response { Response::builder() .header("x-amz-version-id", hex::encode(version_uuid)) .header("ETag", format!("\"{}\"", md5sum_hex)) @@ -387,8 +389,8 @@ pub async fn handle_put_part( futures::try_join!(garage.object_table.get(&bucket, &key), chunker.next(),)?; // Check object is valid and multipart block can be accepted - let first_block = first_block.ok_or(Error::BadRequest(format!("Empty body")))?; - let object = object.ok_or(Error::BadRequest(format!("Object not found")))?; + let first_block = first_block.ok_or_else(|| Error::BadRequest("Empty body".to_string()))?; + let object = object.ok_or_else(|| Error::BadRequest("Object not found".to_string()))?; if !object .versions() @@ -462,21 +464,21 @@ pub async fn handle_complete_multipart_upload( garage.version_table.get(&version_uuid, &EmptyKey), )?; - let object = object.ok_or(Error::BadRequest(format!("Object not found")))?; + let object = object.ok_or_else(|| Error::BadRequest("Object not found".to_string()))?; let mut object_version = object .versions() .iter() .find(|v| v.uuid == version_uuid && v.is_uploading()) .cloned() - .ok_or(Error::BadRequest(format!("Version not found")))?; + .ok_or_else(|| Error::BadRequest("Version not found".to_string()))?; - let version = version.ok_or(Error::BadRequest(format!("Version not found")))?; - if version.blocks.len() == 0 { - return Err(Error::BadRequest(format!("No data was uploaded"))); + let version = version.ok_or_else(|| Error::BadRequest("Version not found".to_string()))?; + if version.blocks.is_empty() { + return Err(Error::BadRequest("No data was uploaded".to_string())); } let headers = match object_version.state { - ObjectVersionState::Uploading(headers) => headers.clone(), + ObjectVersionState::Uploading(headers) => headers, _ => unreachable!(), }; @@ -493,7 +495,9 @@ pub async fn handle_complete_multipart_upload( .map(|x| (&x.part_number, &x.etag)) .eq(parts); if !same_parts { - return Err(Error::BadRequest(format!("We don't have the same parts"))); + return Err(Error::BadRequest( + "We don't have the same parts".to_string(), + )); } // Calculate etag of final object @@ -557,7 +561,7 @@ pub async fn handle_abort_multipart_upload( .object_table .get(&bucket.to_string(), &key.to_string()) .await?; - let object = object.ok_or(Error::BadRequest(format!("Object not found")))?; + let object = object.ok_or_else(|| Error::BadRequest("Object not found".to_string()))?; let object_version = object .versions() @@ -629,14 +633,14 @@ pub(crate) fn get_headers(req: &Request) -> Result Result { +fn decode_upload_id(id: &str) -> Result { let id_bin = hex::decode(id).ok_or_bad_request("Invalid upload ID")?; if id_bin.len() != 32 { return None.ok_or_bad_request("Invalid upload ID"); } let mut uuid = [0u8; 32]; uuid.copy_from_slice(&id_bin[..]); - Ok(UUID::from(uuid)) + Ok(Uuid::from(uuid)) } #[derive(Debug)] diff --git a/src/api/signature.rs b/src/api/signature.rs index 7fcab0f9..0f7a7437 100644 --- a/src/api/signature.rs +++ b/src/api/signature.rs @@ -43,13 +43,12 @@ pub async fn check_signature( let date = headers .get("x-amz-date") .ok_or_bad_request("Missing X-Amz-Date field")?; - let date: NaiveDateTime = NaiveDateTime::parse_from_str(date, LONG_DATETIME) - .ok_or_bad_request("Invalid date")? - .into(); + let date: NaiveDateTime = + NaiveDateTime::parse_from_str(date, LONG_DATETIME).ok_or_bad_request("Invalid date")?; let date: DateTime = DateTime::from_utc(date, Utc); if Utc::now() - date > Duration::hours(24) { - return Err(Error::BadRequest(format!("Date is too old"))); + return Err(Error::BadRequest("Date is too old".to_string())); } let scope = format!( @@ -66,10 +65,7 @@ pub async fn check_signature( .get(&EmptyKey, &authorization.key_id) .await? .filter(|k| !k.deleted.get()) - .ok_or(Error::Forbidden(format!( - "No such key: {}", - authorization.key_id - )))?; + .ok_or_else(|| Error::Forbidden(format!("No such key: {}", authorization.key_id)))?; let canonical_request = canonical_request( request.method(), @@ -95,7 +91,7 @@ pub async fn check_signature( trace!("Canonical request: ``{}``", canonical_request); trace!("String to sign: ``{}``", string_to_sign); trace!("Expected: {}, got: {}", signature, authorization.signature); - return Err(Error::Forbidden(format!("Invalid signature"))); + return Err(Error::Forbidden("Invalid signature".to_string())); } let content_sha256 = if authorization.content_sha256 == "UNSIGNED-PAYLOAD" @@ -107,7 +103,7 @@ pub async fn check_signature( .ok_or_bad_request("Invalid content sha256 hash")?; Some( Hash::try_from(&bytes[..]) - .ok_or(Error::BadRequest(format!("Invalid content sha256 hash")))?, + .ok_or_else(|| Error::BadRequest("Invalid content sha256 hash".to_string()))?, ) }; @@ -175,9 +171,9 @@ fn parse_query_authorization(headers: &HashMap) -> Result, body: &[u8]) -> Resul let expected_sha256 = content_sha256.ok_or_bad_request("Request content hash not signed, aborting.")?; if expected_sha256 != sha256sum(body) { - return Err(Error::BadRequest(format!( - "Request content hash does not match signed hash" - ))); + return Err(Error::BadRequest( + "Request content hash does not match signed hash".to_string(), + )); } Ok(()) } diff --git a/src/garage/admin_rpc.rs b/src/garage/admin_rpc.rs index d04dd7a1..f2d11bb3 100644 --- a/src/garage/admin_rpc.rs +++ b/src/garage/admin_rpc.rs @@ -6,7 +6,7 @@ use serde::{Deserialize, Serialize}; use garage_util::error::Error; -use garage_table::crdt::CRDT; +use garage_table::crdt::Crdt; use garage_table::replication::*; use garage_table::*; @@ -25,7 +25,7 @@ pub const ADMIN_RPC_TIMEOUT: Duration = Duration::from_secs(30); pub const ADMIN_RPC_PATH: &str = "_admin"; #[derive(Debug, Serialize, Deserialize)] -pub enum AdminRPC { +pub enum AdminRpc { BucketOperation(BucketOperation), KeyOperation(KeyOperation), LaunchRepair(RepairOpt), @@ -39,35 +39,35 @@ pub enum AdminRPC { KeyInfo(Key), } -impl RpcMessage for AdminRPC {} +impl RpcMessage for AdminRpc {} pub struct AdminRpcHandler { garage: Arc, - rpc_client: Arc>, + rpc_client: Arc>, } impl AdminRpcHandler { pub fn new(garage: Arc) -> Arc { - let rpc_client = garage.system.clone().rpc_client::(ADMIN_RPC_PATH); + let rpc_client = garage.system.clone().rpc_client::(ADMIN_RPC_PATH); Arc::new(Self { garage, rpc_client }) } pub fn register_handler(self: Arc, rpc_server: &mut RpcServer) { - rpc_server.add_handler::(ADMIN_RPC_PATH.to_string(), move |msg, _addr| { + rpc_server.add_handler::(ADMIN_RPC_PATH.to_string(), move |msg, _addr| { let self2 = self.clone(); async move { match msg { - AdminRPC::BucketOperation(bo) => self2.handle_bucket_cmd(bo).await, - AdminRPC::KeyOperation(ko) => self2.handle_key_cmd(ko).await, - AdminRPC::LaunchRepair(opt) => self2.handle_launch_repair(opt).await, - AdminRPC::Stats(opt) => self2.handle_stats(opt).await, - _ => Err(Error::BadRPC(format!("Invalid RPC"))), + AdminRpc::BucketOperation(bo) => self2.handle_bucket_cmd(bo).await, + AdminRpc::KeyOperation(ko) => self2.handle_key_cmd(ko).await, + AdminRpc::LaunchRepair(opt) => self2.handle_launch_repair(opt).await, + AdminRpc::Stats(opt) => self2.handle_stats(opt).await, + _ => Err(Error::BadRpc("Invalid RPC".to_string())), } } }); } - async fn handle_bucket_cmd(&self, cmd: BucketOperation) -> Result { + async fn handle_bucket_cmd(&self, cmd: BucketOperation) -> Result { match cmd { BucketOperation::List => { let bucket_names = self @@ -78,17 +78,17 @@ impl AdminRpcHandler { .iter() .map(|b| b.name.to_string()) .collect::>(); - Ok(AdminRPC::BucketList(bucket_names)) + Ok(AdminRpc::BucketList(bucket_names)) } BucketOperation::Info(query) => { let bucket = self.get_existing_bucket(&query.name).await?; - Ok(AdminRPC::BucketInfo(bucket)) + Ok(AdminRpc::BucketInfo(bucket)) } BucketOperation::Create(query) => { let bucket = match self.garage.bucket_table.get(&EmptyKey, &query.name).await? { Some(mut bucket) => { if !bucket.is_deleted() { - return Err(Error::BadRPC(format!( + return Err(Error::BadRpc(format!( "Bucket {} already exists", query.name ))); @@ -101,7 +101,7 @@ impl AdminRpcHandler { None => Bucket::new(query.name.clone()), }; self.garage.bucket_table.insert(&bucket).await?; - Ok(AdminRPC::Ok(format!("Bucket {} was created.", query.name))) + Ok(AdminRpc::Ok(format!("Bucket {} was created.", query.name))) } BucketOperation::Delete(query) => { let mut bucket = self.get_existing_bucket(&query.name).await?; @@ -111,12 +111,12 @@ impl AdminRpcHandler { .get_range(&query.name, None, Some(DeletedFilter::NotDeleted), 10) .await?; if !objects.is_empty() { - return Err(Error::BadRPC(format!("Bucket {} is not empty", query.name))); + return Err(Error::BadRpc(format!("Bucket {} is not empty", query.name))); } if !query.yes { - return Err(Error::BadRPC(format!( - "Add --yes flag to really perform this operation" - ))); + return Err(Error::BadRpc( + "Add --yes flag to really perform this operation".to_string(), + )); } // --- done checking, now commit --- for (key_id, _, _) in bucket.authorized_keys() { @@ -131,7 +131,7 @@ impl AdminRpcHandler { } bucket.state.update(BucketState::Deleted); self.garage.bucket_table.insert(&bucket).await?; - Ok(AdminRPC::Ok(format!("Bucket {} was deleted.", query.name))) + Ok(AdminRpc::Ok(format!("Bucket {} was deleted.", query.name))) } BucketOperation::Allow(query) => { let key = self.get_existing_key(&query.key_pattern).await?; @@ -142,7 +142,7 @@ impl AdminRpcHandler { .await?; self.update_bucket_key(bucket, &key.key_id, allow_read, allow_write) .await?; - Ok(AdminRPC::Ok(format!( + Ok(AdminRpc::Ok(format!( "New permissions for {} on {}: read {}, write {}.", &key.key_id, &query.bucket, allow_read, allow_write ))) @@ -156,7 +156,7 @@ impl AdminRpcHandler { .await?; self.update_bucket_key(bucket, &key.key_id, allow_read, allow_write) .await?; - Ok(AdminRPC::Ok(format!( + Ok(AdminRpc::Ok(format!( "New permissions for {} on {}: read {}, write {}.", &key.key_id, &query.bucket, allow_read, allow_write ))) @@ -165,9 +165,9 @@ impl AdminRpcHandler { let mut bucket = self.get_existing_bucket(&query.bucket).await?; if !(query.allow ^ query.deny) { - return Err(Error::Message(format!( - "You must specify exactly one flag, either --allow or --deny" - ))); + return Err(Error::Message( + "You must specify exactly one flag, either --allow or --deny".to_string(), + )); } if let BucketState::Present(state) = bucket.state.get_mut() { @@ -179,7 +179,7 @@ impl AdminRpcHandler { format!("Website access denied for {}", &query.bucket) }; - Ok(AdminRPC::Ok(msg.to_string())) + Ok(AdminRpc::Ok(msg)) } else { unreachable!(); } @@ -187,7 +187,7 @@ impl AdminRpcHandler { } } - async fn handle_key_cmd(&self, cmd: KeyOperation) -> Result { + async fn handle_key_cmd(&self, cmd: KeyOperation) -> Result { match cmd { KeyOperation::List => { let key_ids = self @@ -203,29 +203,29 @@ impl AdminRpcHandler { .iter() .map(|k| (k.key_id.to_string(), k.name.get().clone())) .collect::>(); - Ok(AdminRPC::KeyList(key_ids)) + Ok(AdminRpc::KeyList(key_ids)) } KeyOperation::Info(query) => { let key = self.get_existing_key(&query.key_pattern).await?; - Ok(AdminRPC::KeyInfo(key)) + Ok(AdminRpc::KeyInfo(key)) } KeyOperation::New(query) => { let key = Key::new(query.name); self.garage.key_table.insert(&key).await?; - Ok(AdminRPC::KeyInfo(key)) + Ok(AdminRpc::KeyInfo(key)) } KeyOperation::Rename(query) => { let mut key = self.get_existing_key(&query.key_pattern).await?; key.name.update(query.new_name); self.garage.key_table.insert(&key).await?; - Ok(AdminRPC::KeyInfo(key)) + Ok(AdminRpc::KeyInfo(key)) } KeyOperation::Delete(query) => { let key = self.get_existing_key(&query.key_pattern).await?; if !query.yes { - return Err(Error::BadRPC(format!( - "Add --yes flag to really perform this operation" - ))); + return Err(Error::BadRpc( + "Add --yes flag to really perform this operation".to_string(), + )); } // --- done checking, now commit --- for (ab_name, _, _) in key.authorized_buckets.items().iter() { @@ -240,7 +240,7 @@ impl AdminRpcHandler { } let del_key = Key::delete(key.key_id.to_string()); self.garage.key_table.insert(&del_key).await?; - Ok(AdminRPC::Ok(format!( + Ok(AdminRpc::Ok(format!( "Key {} was deleted successfully.", key.key_id ))) @@ -252,11 +252,12 @@ impl AdminRpcHandler { } let imported_key = Key::import(&query.key_id, &query.secret_key, &query.name); self.garage.key_table.insert(&imported_key).await?; - Ok(AdminRPC::KeyInfo(imported_key)) + Ok(AdminRpc::KeyInfo(imported_key)) } } } + #[allow(clippy::ptr_arg)] async fn get_existing_bucket(&self, bucket: &String) -> Result { self.garage .bucket_table @@ -264,10 +265,7 @@ impl AdminRpcHandler { .await? .filter(|b| !b.is_deleted()) .map(Ok) - .unwrap_or(Err(Error::BadRPC(format!( - "Bucket {} does not exist", - bucket - )))) + .unwrap_or_else(|| Err(Error::BadRpc(format!("Bucket {} does not exist", bucket)))) } async fn get_existing_key(&self, pattern: &str) -> Result { @@ -298,7 +296,7 @@ impl AdminRpcHandler { async fn update_bucket_key( &self, mut bucket: Bucket, - key_id: &String, + key_id: &str, allow_read: bool, allow_write: bool, ) -> Result<(), Error> { @@ -313,9 +311,9 @@ impl AdminRpcHandler { }, )); } else { - return Err(Error::Message(format!( - "Bucket is deleted in update_bucket_key" - ))); + return Err(Error::Message( + "Bucket is deleted in update_bucket_key".to_string(), + )); } self.garage.bucket_table.insert(&bucket).await?; Ok(()) @@ -325,14 +323,14 @@ impl AdminRpcHandler { async fn update_key_bucket( &self, key: &Key, - bucket: &String, + bucket: &str, allow_read: bool, allow_write: bool, ) -> Result<(), Error> { let mut key = key.clone(); let old_map = key.authorized_buckets.take_and_clear(); key.authorized_buckets.merge(&old_map.update_mutator( - bucket.clone(), + bucket.to_string(), PermissionSet { allow_read, allow_write, @@ -342,11 +340,11 @@ impl AdminRpcHandler { Ok(()) } - async fn handle_launch_repair(self: &Arc, opt: RepairOpt) -> Result { + async fn handle_launch_repair(self: &Arc, opt: RepairOpt) -> Result { if !opt.yes { - return Err(Error::BadRPC(format!( - "Please provide the --yes flag to initiate repair operations." - ))); + return Err(Error::BadRpc( + "Please provide the --yes flag to initiate repair operations.".to_string(), + )); } if opt.all_nodes { let mut opt_to_send = opt.clone(); @@ -359,17 +357,17 @@ impl AdminRpcHandler { .rpc_client .call( *node, - AdminRPC::LaunchRepair(opt_to_send.clone()), + AdminRpc::LaunchRepair(opt_to_send.clone()), ADMIN_RPC_TIMEOUT, ) .await .is_err() { - failures.push(node.clone()); + failures.push(*node); } } if failures.is_empty() { - Ok(AdminRPC::Ok(format!("Repair launched on all nodes"))) + Ok(AdminRpc::Ok("Repair launched on all nodes".to_string())) } else { Err(Error::Message(format!( "Could not launch repair on nodes: {:?} (launched successfully on other nodes)", @@ -386,14 +384,14 @@ impl AdminRpcHandler { .spawn_worker("Repair worker".into(), move |must_exit| async move { repair.repair_worker(opt, must_exit).await }); - Ok(AdminRPC::Ok(format!( + Ok(AdminRpc::Ok(format!( "Repair launched on {:?}", self.garage.system.id ))) } } - async fn handle_stats(&self, opt: StatsOpt) -> Result { + async fn handle_stats(&self, opt: StatsOpt) -> Result { if opt.all_nodes { let mut ret = String::new(); let ring = self.garage.system.ring.borrow().clone(); @@ -406,21 +404,21 @@ impl AdminRpcHandler { writeln!(&mut ret, "Stats for node {:?}:", node).unwrap(); match self .rpc_client - .call(*node, AdminRPC::Stats(opt), ADMIN_RPC_TIMEOUT) + .call(*node, AdminRpc::Stats(opt), ADMIN_RPC_TIMEOUT) .await { - Ok(AdminRPC::Ok(s)) => writeln!(&mut ret, "{}", s).unwrap(), + Ok(AdminRpc::Ok(s)) => writeln!(&mut ret, "{}", s).unwrap(), Ok(x) => writeln!(&mut ret, "Bad answer: {:?}", x).unwrap(), Err(e) => writeln!(&mut ret, "Error: {}", e).unwrap(), } } - Ok(AdminRPC::Ok(ret)) + Ok(AdminRpc::Ok(ret)) } else { - Ok(AdminRPC::Ok(self.gather_stats_local(opt)?)) + Ok(AdminRpc::Ok(self.gather_stats_local(opt))) } } - fn gather_stats_local(&self, opt: StatsOpt) -> Result { + fn gather_stats_local(&self, opt: StatsOpt) -> String { let mut ret = String::new(); writeln!( &mut ret, @@ -445,11 +443,11 @@ impl AdminRpcHandler { writeln!(&mut ret, " {:?} {}", n, c).unwrap(); } - self.gather_table_stats(&mut ret, &self.garage.bucket_table, &opt)?; - self.gather_table_stats(&mut ret, &self.garage.key_table, &opt)?; - self.gather_table_stats(&mut ret, &self.garage.object_table, &opt)?; - self.gather_table_stats(&mut ret, &self.garage.version_table, &opt)?; - self.gather_table_stats(&mut ret, &self.garage.block_ref_table, &opt)?; + self.gather_table_stats(&mut ret, &self.garage.bucket_table, &opt); + self.gather_table_stats(&mut ret, &self.garage.key_table, &opt); + self.gather_table_stats(&mut ret, &self.garage.object_table, &opt); + self.gather_table_stats(&mut ret, &self.garage.version_table, &opt); + self.gather_table_stats(&mut ret, &self.garage.block_ref_table, &opt); writeln!(&mut ret, "\nBlock manager stats:").unwrap(); if opt.detailed { @@ -467,15 +465,10 @@ impl AdminRpcHandler { ) .unwrap(); - Ok(ret) + ret } - fn gather_table_stats( - &self, - to: &mut String, - t: &Arc>, - opt: &StatsOpt, - ) -> Result<(), Error> + fn gather_table_stats(&self, to: &mut String, t: &Arc>, opt: &StatsOpt) where F: TableSchema + 'static, R: TableReplication + 'static, @@ -497,6 +490,5 @@ impl AdminRpcHandler { ) .unwrap(); writeln!(to, " GC todo queue length: {}", t.data.gc_todo_len()).unwrap(); - Ok(()) } } diff --git a/src/garage/cli.rs b/src/garage/cli.rs index 55cd222b..bfe7e08e 100644 --- a/src/garage/cli.rs +++ b/src/garage/cli.rs @@ -5,7 +5,7 @@ use std::path::PathBuf; use serde::{Deserialize, Serialize}; use structopt::StructOpt; -use garage_util::data::UUID; +use garage_util::data::Uuid; use garage_util::error::Error; use garage_util::time::*; @@ -294,7 +294,7 @@ pub struct StatsOpt { pub async fn cli_cmd( cmd: Command, membership_rpc_cli: RpcAddrClient, - admin_rpc_cli: RpcAddrClient, + admin_rpc_cli: RpcAddrClient, rpc_host: SocketAddr, ) -> Result<(), Error> { match cmd { @@ -306,11 +306,11 @@ pub async fn cli_cmd( cmd_remove(membership_rpc_cli, rpc_host, remove_opt).await } Command::Bucket(bo) => { - cmd_admin(admin_rpc_cli, rpc_host, AdminRPC::BucketOperation(bo)).await + cmd_admin(admin_rpc_cli, rpc_host, AdminRpc::BucketOperation(bo)).await } - Command::Key(ko) => cmd_admin(admin_rpc_cli, rpc_host, AdminRPC::KeyOperation(ko)).await, - Command::Repair(ro) => cmd_admin(admin_rpc_cli, rpc_host, AdminRPC::LaunchRepair(ro)).await, - Command::Stats(so) => cmd_admin(admin_rpc_cli, rpc_host, AdminRPC::Stats(so)).await, + Command::Key(ko) => cmd_admin(admin_rpc_cli, rpc_host, AdminRpc::KeyOperation(ko)).await, + Command::Repair(ro) => cmd_admin(admin_rpc_cli, rpc_host, AdminRpc::LaunchRepair(ro)).await, + Command::Stats(so) => cmd_admin(admin_rpc_cli, rpc_host, AdminRpc::Stats(so)).await, _ => unreachable!(), } } @@ -385,9 +385,9 @@ pub async fn cmd_status( } pub fn find_matching_node( - cand: impl std::iter::Iterator, + cand: impl std::iter::Iterator, pattern: &str, -) -> Result { +) -> Result { let mut candidates = vec![]; for c in cand { if hex::encode(&c).starts_with(&pattern) { @@ -446,12 +446,14 @@ pub async fn cmd_configure( capacity: args .capacity .expect("Please specifiy a capacity with the -c flag"), - tag: args.tag.unwrap_or("".to_string()), + tag: args.tag.unwrap_or_default(), }, Some(old) => NetworkConfigEntry { - datacenter: args.datacenter.unwrap_or(old.datacenter.to_string()), + datacenter: args + .datacenter + .unwrap_or_else(|| old.datacenter.to_string()), capacity: args.capacity.unwrap_or(old.capacity), - tag: args.tag.unwrap_or(old.tag.to_string()), + tag: args.tag.unwrap_or_else(|| old.tag.to_string()), }, }; @@ -504,30 +506,30 @@ pub async fn cmd_remove( } pub async fn cmd_admin( - rpc_cli: RpcAddrClient, + rpc_cli: RpcAddrClient, rpc_host: SocketAddr, - args: AdminRPC, + args: AdminRpc, ) -> Result<(), Error> { match rpc_cli.call(&rpc_host, args, ADMIN_RPC_TIMEOUT).await?? { - AdminRPC::Ok(msg) => { + AdminRpc::Ok(msg) => { println!("{}", msg); } - AdminRPC::BucketList(bl) => { + AdminRpc::BucketList(bl) => { println!("List of buckets:"); for bucket in bl { println!("{}", bucket); } } - AdminRPC::BucketInfo(bucket) => { + AdminRpc::BucketInfo(bucket) => { print_bucket_info(&bucket); } - AdminRPC::KeyList(kl) => { + AdminRpc::KeyList(kl) => { println!("List of keys:"); for key in kl { println!("{}\t{}", key.0, key.1); } } - AdminRPC::KeyInfo(key) => { + AdminRpc::KeyInfo(key) => { print_key_info(&key); } r => { diff --git a/src/garage/server.rs b/src/garage/server.rs index 97a9bec2..74b7f1ff 100644 --- a/src/garage/server.rs +++ b/src/garage/server.rs @@ -25,7 +25,7 @@ async fn shutdown_signal(send_cancel: watch::Sender) -> Result<(), Error> Ok(()) } -async fn wait_from(mut chan: watch::Receiver) -> () { +async fn wait_from(mut chan: watch::Receiver) { while !*chan.borrow() { if chan.changed().await.is_err() { return; @@ -43,7 +43,7 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { let db = sled::open(&db_path).expect("Unable to open sled DB"); info!("Initialize RPC server..."); - let mut rpc_server = RpcServer::new(config.rpc_bind_addr.clone(), config.rpc_tls.clone()); + let mut rpc_server = RpcServer::new(config.rpc_bind_addr, config.rpc_tls.clone()); info!("Initializing background runner..."); let (send_cancel, watch_cancel) = watch::channel(false); @@ -66,9 +66,9 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { let web_server = run_web_server(garage, wait_from(watch_cancel.clone())); futures::try_join!( - bootstrap.map(|rv| { + bootstrap.map(|()| { info!("Bootstrap done"); - Ok(rv) + Ok(()) }), run_rpc_server.map(|rv| { info!("RPC server exited"); diff --git a/src/model/block.rs b/src/model/block.rs index 5f428fe1..348f0711 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -1,4 +1,4 @@ -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Duration; @@ -137,7 +137,7 @@ impl BlockManager { Message::PutBlock(m) => self.write_block(&m.hash, &m.data).await, Message::GetBlock(h) => self.read_block(h).await, Message::NeedBlockQuery(h) => self.need_block(h).await.map(Message::NeedBlockReply), - _ => Err(Error::BadRPC(format!("Unexpected RPC message"))), + _ => Err(Error::BadRpc("Unexpected RPC message".to_string())), } } @@ -280,8 +280,8 @@ impl BlockManager { if let Err(e) = self.resync_iter(&mut must_exit).await { warn!("Error in block resync loop: {}", e); select! { - _ = tokio::time::sleep(Duration::from_secs(1)).fuse() => (), - _ = must_exit.changed().fuse() => (), + _ = tokio::time::sleep(Duration::from_secs(1)).fuse() => {}, + _ = must_exit.changed().fuse() => {}, } } } @@ -304,15 +304,15 @@ impl BlockManager { } else { let delay = tokio::time::sleep(Duration::from_millis(time_msec - now)); select! { - _ = delay.fuse() => (), - _ = self.resync_notify.notified().fuse() => (), - _ = must_exit.changed().fuse() => (), + _ = delay.fuse() => {}, + _ = self.resync_notify.notified().fuse() => {}, + _ = must_exit.changed().fuse() => {}, } } } else { select! { - _ = self.resync_notify.notified().fuse() => (), - _ = must_exit.changed().fuse() => (), + _ = self.resync_notify.notified().fuse() => {}, + _ = must_exit.changed().fuse() => {}, } } Ok(()) @@ -342,7 +342,7 @@ impl BlockManager { let mut who = self.replication.write_nodes(&hash); if who.len() < self.replication.write_quorum() { - return Err(Error::Message(format!("Not trying to offload block because we don't have a quorum of nodes to write to"))); + return Err(Error::Message("Not trying to offload block because we don't have a quorum of nodes to write to".to_string())); } who.retain(|id| *id != self.system.id); @@ -362,14 +362,14 @@ impl BlockManager { } } _ => { - return Err(Error::Message(format!( - "Unexpected response to NeedBlockQuery RPC" - ))); + return Err(Error::Message( + "Unexpected response to NeedBlockQuery RPC".to_string(), + )); } } } - if need_nodes.len() > 0 { + if !need_nodes.is_empty() { trace!( "Block {:?} needed by {} nodes, sending", hash, @@ -478,7 +478,7 @@ impl BlockManager { fn repair_aux_read_dir_rec<'a>( &'a self, - path: &'a PathBuf, + path: &'a Path, must_exit: &'a watch::Receiver, ) -> BoxFuture<'a, Result<(), Error>> { // Lists all blocks on disk and adds them to the resync queue. diff --git a/src/model/block_ref_table.rs b/src/model/block_ref_table.rs index 95451e9c..f8f529c4 100644 --- a/src/model/block_ref_table.rs +++ b/src/model/block_ref_table.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use garage_util::data::*; -use garage_table::crdt::CRDT; +use garage_table::crdt::Crdt; use garage_table::*; use crate::block::*; @@ -14,18 +14,18 @@ pub struct BlockRef { pub block: Hash, /// Id of the Version for the object containing this block, used as sorting key - pub version: UUID, + pub version: Uuid, // Keep track of deleted status /// Is the Version that contains this block deleted pub deleted: crdt::Bool, } -impl Entry for BlockRef { +impl Entry for BlockRef { fn partition_key(&self) -> &Hash { &self.block } - fn sort_key(&self) -> &UUID { + fn sort_key(&self) -> &Uuid { &self.version } fn is_tombstone(&self) -> bool { @@ -33,7 +33,7 @@ impl Entry for BlockRef { } } -impl CRDT for BlockRef { +impl Crdt for BlockRef { fn merge(&mut self, other: &Self) { self.deleted.merge(&other.deleted); } @@ -45,12 +45,12 @@ pub struct BlockRefTable { impl TableSchema for BlockRefTable { type P = Hash; - type S = UUID; + type S = Uuid; type E = BlockRef; type Filter = DeletedFilter; fn updated(&self, old: Option, new: Option) { - let block = &old.as_ref().or(new.as_ref()).unwrap().block; + let block = &old.as_ref().or_else(|| new.as_ref()).unwrap().block; let was_before = old.as_ref().map(|x| !x.deleted.get()).unwrap_or(false); let is_after = new.as_ref().map(|x| !x.deleted.get()).unwrap_or(false); if is_after && !was_before { diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs index 69901b8d..168ed713 100644 --- a/src/model/bucket_table.rs +++ b/src/model/bucket_table.rs @@ -1,6 +1,6 @@ use serde::{Deserialize, Serialize}; -use garage_table::crdt::CRDT; +use garage_table::crdt::Crdt; use garage_table::*; use crate::key_table::PermissionSet; @@ -15,7 +15,7 @@ pub struct Bucket { /// Name of the bucket pub name: String, /// State, and configuration if not deleted, of the bucket - pub state: crdt::LWW, + pub state: crdt::Lww, } /// State of a bucket @@ -27,7 +27,7 @@ pub enum BucketState { Present(BucketParams), } -impl CRDT for BucketState { +impl Crdt for BucketState { fn merge(&mut self, o: &Self) { match o { BucketState::Deleted => *self = BucketState::Deleted, @@ -44,34 +44,40 @@ impl CRDT for BucketState { #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct BucketParams { /// Map of key with access to the bucket, and what kind of access they give - pub authorized_keys: crdt::LWWMap, + pub authorized_keys: crdt::LwwMap, /// Is the bucket served as http - pub website: crdt::LWW, -} - -impl CRDT for BucketParams { - fn merge(&mut self, o: &Self) { - self.authorized_keys.merge(&o.authorized_keys); - self.website.merge(&o.website); - } + pub website: crdt::Lww, } impl BucketParams { /// Create an empty BucketParams with no authorized keys and no website accesss pub fn new() -> Self { BucketParams { - authorized_keys: crdt::LWWMap::new(), - website: crdt::LWW::new(false), + authorized_keys: crdt::LwwMap::new(), + website: crdt::Lww::new(false), } } } +impl Crdt for BucketParams { + fn merge(&mut self, o: &Self) { + self.authorized_keys.merge(&o.authorized_keys); + self.website.merge(&o.website); + } +} + +impl Default for BucketParams { + fn default() -> Self { + Self::new() + } +} + impl Bucket { /// Initializes a new instance of the Bucket struct pub fn new(name: String) -> Self { Bucket { name, - state: crdt::LWW::new(BucketState::Present(BucketParams::new())), + state: crdt::Lww::new(BucketState::Present(BucketParams::new())), } } @@ -99,7 +105,7 @@ impl Entry for Bucket { } } -impl CRDT for Bucket { +impl Crdt for Bucket { fn merge(&mut self, other: &Self) { self.state.merge(&other.state); } diff --git a/src/model/garage.rs b/src/model/garage.rs index 797a91e5..5c6c21f2 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -95,7 +95,7 @@ impl Garage { BlockRefTable { block_manager: block_manager.clone(), }, - data_rep_param.clone(), + data_rep_param, system.clone(), &db, "block_ref".to_string(), @@ -121,7 +121,7 @@ impl Garage { background: background.clone(), version_table: version_table.clone(), }, - meta_rep_param.clone(), + meta_rep_param, system.clone(), &db, "object".to_string(), @@ -141,7 +141,7 @@ impl Garage { info!("Initialize key_table_table..."); let key_table = Table::new( KeyTable, - control_rep_param.clone(), + control_rep_param, system.clone(), &db, "key".to_string(), @@ -152,9 +152,9 @@ impl Garage { let garage = Arc::new(Self { config, db, - system: system.clone(), - block_manager, background, + system, + block_manager, bucket_table, key_table, object_table, diff --git a/src/model/key_table.rs b/src/model/key_table.rs index ba1f6b81..a6186aa9 100644 --- a/src/model/key_table.rs +++ b/src/model/key_table.rs @@ -13,14 +13,14 @@ pub struct Key { pub secret_key: String, /// Name for the key - pub name: crdt::LWW, + pub name: crdt::Lww, /// Is the key deleted pub deleted: crdt::Bool, /// Buckets in which the key is authorized. Empty if `Key` is deleted // CRDT interaction: deleted implies authorized_buckets is empty - pub authorized_buckets: crdt::LWWMap, + pub authorized_buckets: crdt::LwwMap, } impl Key { @@ -31,9 +31,9 @@ impl Key { Self { key_id, secret_key, - name: crdt::LWW::new(name), + name: crdt::Lww::new(name), deleted: crdt::Bool::new(false), - authorized_buckets: crdt::LWWMap::new(), + authorized_buckets: crdt::LwwMap::new(), } } @@ -42,9 +42,9 @@ impl Key { Self { key_id: key_id.to_string(), secret_key: secret_key.to_string(), - name: crdt::LWW::new(name.to_string()), + name: crdt::Lww::new(name.to_string()), deleted: crdt::Bool::new(false), - authorized_buckets: crdt::LWWMap::new(), + authorized_buckets: crdt::LwwMap::new(), } } @@ -53,9 +53,9 @@ impl Key { Self { key_id, secret_key: "".into(), - name: crdt::LWW::new("".to_string()), + name: crdt::Lww::new("".to_string()), deleted: crdt::Bool::new(true), - authorized_buckets: crdt::LWWMap::new(), + authorized_buckets: crdt::LwwMap::new(), } } @@ -85,7 +85,7 @@ pub struct PermissionSet { pub allow_write: bool, } -impl AutoCRDT for PermissionSet { +impl AutoCrdt for PermissionSet { const WARN_IF_DIFFERENT: bool = true; } @@ -98,7 +98,7 @@ impl Entry for Key { } } -impl CRDT for Key { +impl Crdt for Key { fn merge(&mut self, other: &Self) { self.name.merge(&other.name); self.deleted.merge(&other.deleted); diff --git a/src/model/object_table.rs b/src/model/object_table.rs index b0611822..d743a2b6 100644 --- a/src/model/object_table.rs +++ b/src/model/object_table.rs @@ -40,6 +40,7 @@ impl Object { } /// Adds a version if it wasn't already present + #[allow(clippy::result_unit_err)] pub fn add_version(&mut self, new: ObjectVersion) -> Result<(), ()> { match self .versions @@ -63,7 +64,7 @@ impl Object { #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct ObjectVersion { /// Id of the version - pub uuid: UUID, + pub uuid: Uuid, /// Timestamp of when the object was created pub timestamp: u64, /// State of the version @@ -81,7 +82,7 @@ pub enum ObjectVersionState { Aborted, } -impl CRDT for ObjectVersionState { +impl Crdt for ObjectVersionState { fn merge(&mut self, other: &Self) { use ObjectVersionState::*; match other { @@ -114,7 +115,7 @@ pub enum ObjectVersionData { FirstBlock(ObjectVersionMeta, Hash), } -impl AutoCRDT for ObjectVersionData { +impl AutoCrdt for ObjectVersionData { const WARN_IF_DIFFERENT: bool = true; } @@ -139,24 +140,18 @@ pub struct ObjectVersionHeaders { } impl ObjectVersion { - fn cmp_key(&self) -> (u64, UUID) { + fn cmp_key(&self) -> (u64, Uuid) { (self.timestamp, self.uuid) } /// Is the object version currently being uploaded pub fn is_uploading(&self) -> bool { - match self.state { - ObjectVersionState::Uploading(_) => true, - _ => false, - } + matches!(self.state, ObjectVersionState::Uploading(_)) } /// Is the object version completely received pub fn is_complete(&self) -> bool { - match self.state { - ObjectVersionState::Complete(_) => true, - _ => false, - } + matches!(self.state, ObjectVersionState::Complete(_)) } /// Is the object version available (received and not a tombstone) @@ -183,7 +178,7 @@ impl Entry for Object { } } -impl CRDT for Object { +impl Crdt for Object { fn merge(&mut self, other: &Self) { // Merge versions from other into here for other_v in other.versions.iter() { @@ -207,8 +202,7 @@ impl CRDT for Object { .iter() .enumerate() .rev() - .filter(|(_, v)| v.is_complete()) - .next() + .find(|(_, v)| v.is_complete()) .map(|(vi, _)| vi); if let Some(last_vi) = last_complete { diff --git a/src/model/version_table.rs b/src/model/version_table.rs index dd088224..bff7d4bb 100644 --- a/src/model/version_table.rs +++ b/src/model/version_table.rs @@ -14,7 +14,7 @@ use crate::block_ref_table::*; #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct Version { /// UUID of the version, used as partition key - pub uuid: UUID, + pub uuid: Uuid, // Actual data: the blocks for this version // In the case of a multipart upload, also store the etags @@ -35,7 +35,7 @@ pub struct Version { } impl Version { - pub fn new(uuid: UUID, bucket: String, key: String, deleted: bool) -> Self { + pub fn new(uuid: Uuid, bucket: String, key: String, deleted: bool) -> Self { Self { uuid, deleted: deleted.into(), @@ -78,7 +78,7 @@ pub struct VersionBlock { pub size: u64, } -impl AutoCRDT for VersionBlock { +impl AutoCrdt for VersionBlock { const WARN_IF_DIFFERENT: bool = true; } @@ -94,7 +94,7 @@ impl Entry for Version { } } -impl CRDT for Version { +impl Crdt for Version { fn merge(&mut self, other: &Self) { self.deleted.merge(&other.deleted); diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs index 5f7bbc96..da7dcf8f 100644 --- a/src/rpc/membership.rs +++ b/src/rpc/membership.rs @@ -3,7 +3,7 @@ use std::collections::HashMap; use std::fmt::Write as FmtWrite; use std::io::{Read, Write}; use std::net::{IpAddr, SocketAddr}; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::time::Duration; @@ -56,7 +56,7 @@ impl RpcMessage for Message {} /// A ping, containing informations about status and config #[derive(Debug, Serialize, Deserialize)] pub struct PingMessage { - id: UUID, + id: Uuid, rpc_port: u16, status_hash: Hash, @@ -69,7 +69,7 @@ pub struct PingMessage { #[derive(Clone, Debug, Serialize, Deserialize)] pub struct AdvertisedNode { /// Id of the node this advertisement relates to - pub id: UUID, + pub id: Uuid, /// IP and port of the node pub addr: SocketAddr, @@ -84,7 +84,7 @@ pub struct AdvertisedNode { /// This node's membership manager pub struct System { /// The id of this node - pub id: UUID, + pub id: Uuid, persist_config: Persister, persist_status: Persister>, @@ -114,7 +114,7 @@ struct Updaters { #[derive(Debug, Clone)] pub struct Status { /// Mapping of each node id to its known status - pub nodes: HashMap>, + pub nodes: HashMap>, /// Hash of `nodes`, used to detect when nodes have different views of the cluster pub hash: Hash, } @@ -198,15 +198,15 @@ impl Status { } } -fn gen_node_id(metadata_dir: &PathBuf) -> Result { - let mut id_file = metadata_dir.clone(); +fn gen_node_id(metadata_dir: &Path) -> Result { + let mut id_file = metadata_dir.to_path_buf(); id_file.push("node_id"); if id_file.as_path().exists() { let mut f = std::fs::File::open(id_file.as_path())?; let mut d = vec![]; f.read_to_end(&mut d)?; if d.len() != 32 { - return Err(Error::Message(format!("Corrupt node_id file"))); + return Err(Error::Message("Corrupt node_id file".to_string())); } let mut id = [0u8; 32]; @@ -256,7 +256,7 @@ impl System { let state_info = StateInfo { hostname: gethostname::gethostname() .into_string() - .unwrap_or("".to_string()), + .unwrap_or_else(|_| "".to_string()), }; let ring = Ring::new(net_config); @@ -296,12 +296,12 @@ impl System { match msg { Message::Ping(ping) => self2.handle_ping(&addr, &ping).await, - Message::PullStatus => self2.handle_pull_status(), - Message::PullConfig => self2.handle_pull_config(), + Message::PullStatus => Ok(self2.handle_pull_status()), + Message::PullConfig => Ok(self2.handle_pull_config()), Message::AdvertiseNodesUp(adv) => self2.handle_advertise_nodes_up(&adv).await, Message::AdvertiseConfig(adv) => self2.handle_advertise_config(&adv).await, - _ => Err(Error::BadRPC(format!("Unexpected RPC message"))), + _ => Err(Error::BadRpc("Unexpected RPC message".to_string())), } } }); @@ -358,18 +358,18 @@ impl System { ) { let self2 = self.clone(); self.background - .spawn_worker(format!("discovery loop"), |stop_signal| { + .spawn_worker("discovery loop".to_string(), |stop_signal| { self2.discovery_loop(peers, consul_host, consul_service_name, stop_signal) }); let self2 = self.clone(); self.background - .spawn_worker(format!("ping loop"), |stop_signal| { + .spawn_worker("ping loop".to_string(), |stop_signal| { self2.ping_loop(stop_signal) }); } - async fn ping_nodes(self: Arc, peers: Vec<(SocketAddr, Option)>) { + async fn ping_nodes(self: Arc, peers: Vec<(SocketAddr, Option)>) { let ping_msg = self.make_ping(); let ping_resps = join_all(peers.iter().map(|(addr, id_option)| { let sys = self.clone(); @@ -424,7 +424,6 @@ impl System { warn!("Node {:?} seems to be down.", id); if !ring.config.members.contains_key(id) { info!("Removing node {:?} from status (not in config and not responding to pings anymore)", id); - drop(st); status.nodes.remove(&id); has_changes = true; } @@ -438,7 +437,7 @@ impl System { self.update_status(&update_locked, status).await; drop(update_locked); - if to_advertise.len() > 0 { + if !to_advertise.is_empty() { self.broadcast(Message::AdvertiseNodesUp(to_advertise), PING_TIMEOUT) .await; } @@ -474,15 +473,13 @@ impl System { Ok(self.make_ping()) } - fn handle_pull_status(&self) -> Result { - Ok(Message::AdvertiseNodesUp( - self.status.borrow().to_serializable_membership(self), - )) + fn handle_pull_status(&self) -> Message { + Message::AdvertiseNodesUp(self.status.borrow().to_serializable_membership(self)) } - fn handle_pull_config(&self) -> Result { + fn handle_pull_config(&self) -> Message { let ring = self.ring.borrow().clone(); - Ok(Message::AdvertiseConfig(ring.config.clone())) + Message::AdvertiseConfig(ring.config.clone()) } async fn handle_advertise_nodes_up( @@ -530,7 +527,7 @@ impl System { self.update_status(&update_lock, status).await; drop(update_lock); - if to_ping.len() > 0 { + if !to_ping.is_empty() { self.background .spawn_cancellable(self.clone().ping_nodes(to_ping).map(Ok)); } @@ -576,8 +573,8 @@ impl System { self.clone().ping_nodes(ping_addrs).await; select! { - _ = restart_at.fuse() => (), - _ = stop_signal.changed().fuse() => (), + _ = restart_at.fuse() => {}, + _ = stop_signal.changed().fuse() => {}, } } } @@ -595,7 +592,7 @@ impl System { }; while !*stop_signal.borrow() { - let not_configured = self.ring.borrow().config.members.len() == 0; + let not_configured = self.ring.borrow().config.members.is_empty(); let no_peers = self.status.borrow().nodes.len() < 3; let bad_peers = self .status @@ -613,11 +610,8 @@ impl System { .map(|ip| (*ip, None)) .collect::>(); - match self.persist_status.load_async().await { - Ok(peers) => { - ping_list.extend(peers.iter().map(|x| (x.addr, Some(x.id)))); - } - _ => (), + if let Ok(peers) = self.persist_status.load_async().await { + ping_list.extend(peers.iter().map(|x| (x.addr, Some(x.id)))); } if let Some((consul_host, consul_service_name)) = &consul_config { @@ -636,15 +630,17 @@ impl System { let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL); select! { - _ = restart_at.fuse() => (), - _ = stop_signal.changed().fuse() => (), + _ = restart_at.fuse() => {}, + _ = stop_signal.changed().fuse() => {}, } } } + // for some reason fixing this is causing compilation error, see https://github.com/rust-lang/rust-clippy/issues/7052 + #[allow(clippy::manual_async_fn)] fn pull_status( self: Arc, - peer: UUID, + peer: Uuid, ) -> impl futures::future::Future + Send + 'static { async move { let resp = self @@ -657,7 +653,7 @@ impl System { } } - async fn pull_config(self: Arc, peer: UUID) { + async fn pull_config(self: Arc, peer: Uuid) { let resp = self .rpc_client .call(peer, Message::PullConfig, PING_TIMEOUT) @@ -672,18 +668,15 @@ impl System { let mut list = status.to_serializable_membership(&self); // Combine with old peer list to make sure no peer is lost - match self.persist_status.load_async().await { - Ok(old_list) => { - for pp in old_list { - if !list.iter().any(|np| pp.id == np.id) { - list.push(pp); - } + if let Ok(old_list) = self.persist_status.load_async().await { + for pp in old_list { + if !list.iter().any(|np| pp.id == np.id) { + list.push(pp); } } - _ => (), } - if list.len() > 0 { + if !list.is_empty() { info!("Persisting new peer list ({} peers)", list.len()); self.persist_status .save_async(&list) diff --git a/src/rpc/ring.rs b/src/rpc/ring.rs index bffd7f1f..0f94d0f6 100644 --- a/src/rpc/ring.rs +++ b/src/rpc/ring.rs @@ -32,7 +32,7 @@ pub const MAX_REPLICATION: usize = 3; #[derive(Clone, Debug, Serialize, Deserialize)] pub struct NetworkConfig { /// Map of each node's id to it's configuration - pub members: HashMap, + pub members: HashMap, /// Version of this config pub version: u64, } @@ -73,7 +73,7 @@ pub struct RingEntry { /// The prefix of the Hash of object which should use this entry pub location: Hash, /// The nodes in which a matching object should get stored - pub nodes: [UUID; MAX_REPLICATION], + pub nodes: [Uuid; MAX_REPLICATION], } impl Ring { @@ -92,7 +92,7 @@ impl Ring { let n_datacenters = datacenters.len(); // Prepare ring - let mut partitions: Vec> = partitions_idx + let mut partitions: Vec> = partitions_idx .iter() .map(|_i| Vec::new()) .collect::>(); @@ -141,8 +141,7 @@ impl Ring { if i_round >= node_info.capacity { continue; } - for pos2 in *pos..q.len() { - let qv = q[pos2]; + for (pos2, &qv) in q.iter().enumerate().skip(*pos) { if partitions[qv].len() != rep { continue; } @@ -181,7 +180,7 @@ impl Ring { let top = (i as u16) << (16 - PARTITION_BITS); let mut hash = [0u8; 32]; hash[0..2].copy_from_slice(&u16::to_be_bytes(top)[..]); - let nodes = nodes.iter().map(|(id, _info)| **id).collect::>(); + let nodes = nodes.iter().map(|(id, _info)| **id).collect::>(); RingEntry { location: hash.into(), nodes: nodes.try_into().unwrap(), @@ -205,7 +204,7 @@ impl Ring { for (i, entry) in self.ring.iter().enumerate() { ret.push((i as u16, entry.location)); } - if ret.len() > 0 { + if !ret.is_empty() { assert_eq!(ret[0].1, [0u8; 32].into()); } @@ -214,7 +213,7 @@ impl Ring { // TODO rename this function as it no longer walk the ring /// Walk the ring to find the n servers in which data should be replicated - pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec { + pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec { if self.ring.len() != 1 << PARTITION_BITS { warn!("Ring not yet ready, read/writes will be lost!"); return vec![]; @@ -234,6 +233,6 @@ impl Ring { assert_eq!(partition_top & PARTITION_MASK_U16, top & PARTITION_MASK_U16); assert!(n <= partition.nodes.len()); - partition.nodes[..n].iter().cloned().collect::>() + partition.nodes[..n].to_vec() } } diff --git a/src/rpc/rpc_client.rs b/src/rpc/rpc_client.rs index 8a6cc721..5ed43d44 100644 --- a/src/rpc/rpc_client.rs +++ b/src/rpc/rpc_client.rs @@ -19,7 +19,7 @@ use tokio::sync::{watch, Semaphore}; use garage_util::background::BackgroundRunner; use garage_util::config::TlsConfig; use garage_util::data::*; -use garage_util::error::{Error, RPCError}; +use garage_util::error::{Error, RpcError}; use crate::membership::Status; use crate::rpc_server::RpcMessage; @@ -70,7 +70,7 @@ pub struct RpcClient { status: watch::Receiver>, background: Arc, - local_handler: ArcSwapOption<(UUID, LocalHandlerFn)>, + local_handler: ArcSwapOption<(Uuid, LocalHandlerFn)>, rpc_addr_client: RpcAddrClient, } @@ -91,7 +91,7 @@ impl RpcClient { } /// Set the local handler, to process RPC to this node without network usage - pub fn set_local_handler(&self, my_id: UUID, handler: F) + pub fn set_local_handler(&self, my_id: Uuid, handler: F) where F: Fn(Arc) -> Fut + Send + Sync + 'static, Fut: Future> + Send + 'static, @@ -110,12 +110,12 @@ impl RpcClient { } /// Make a RPC call - pub async fn call(&self, to: UUID, msg: M, timeout: Duration) -> Result { + pub async fn call(&self, to: Uuid, msg: M, timeout: Duration) -> Result { self.call_arc(to, Arc::new(msg), timeout).await } /// Make a RPC call from a message stored in an Arc - pub async fn call_arc(&self, to: UUID, msg: Arc, timeout: Duration) -> Result { + pub async fn call_arc(&self, to: Uuid, msg: Arc, timeout: Duration) -> Result { if let Some(lh) = self.local_handler.load_full() { let (my_id, local_handler) = lh.as_ref(); if to.borrow() == my_id { @@ -128,7 +128,7 @@ impl RpcClient { if node_status.is_up() { node_status } else { - return Err(Error::from(RPCError::NodeDown(to))); + return Err(Error::from(RpcError::NodeDown(to))); } } None => { @@ -152,7 +152,7 @@ impl RpcClient { } /// Make a RPC call to multiple servers, returning a Vec containing each result - pub async fn call_many(&self, to: &[UUID], msg: M, timeout: Duration) -> Vec> { + pub async fn call_many(&self, to: &[Uuid], msg: M, timeout: Duration) -> Vec> { let msg = Arc::new(msg); let mut resp_stream = to .iter() @@ -170,7 +170,7 @@ impl RpcClient { /// strategy could not be respected due to too many errors pub async fn try_call_many( self: &Arc, - to: &[UUID], + to: &[Uuid], msg: M, strategy: RequestStrategy, ) -> Result, Error> { @@ -222,7 +222,7 @@ impl RpcClient { Ok(results) } else { let errors = errors.iter().map(|e| format!("{}", e)).collect::>(); - Err(Error::from(RPCError::TooManyErrors(errors))) + Err(Error::from(RpcError::TooManyErrors(errors))) } } } @@ -240,7 +240,7 @@ impl RpcAddrClient { pub fn new(http_client: Arc, path: String) -> Self { Self { phantom: PhantomData::default(), - http_client: http_client, + http_client, path, } } @@ -251,7 +251,7 @@ impl RpcAddrClient { to_addr: &SocketAddr, msg: MB, timeout: Duration, - ) -> Result, RPCError> + ) -> Result, RpcError> where MB: Borrow, { @@ -268,8 +268,8 @@ pub struct RpcHttpClient { } enum ClientMethod { - HTTP(Client), - HTTPS(Client, hyper::Body>), + Http(Client), + Https(Client, hyper::Body>), } impl RpcHttpClient { @@ -294,9 +294,9 @@ impl RpcHttpClient { let connector = tls_util::HttpsConnectorFixedDnsname::::new(config, "garage"); - ClientMethod::HTTPS(Client::builder().build(connector)) + ClientMethod::Https(Client::builder().build(connector)) } else { - ClientMethod::HTTP(Client::new()) + ClientMethod::Http(Client::new()) }; Ok(RpcHttpClient { method, @@ -311,14 +311,14 @@ impl RpcHttpClient { to_addr: &SocketAddr, msg: MB, timeout: Duration, - ) -> Result, RPCError> + ) -> Result, RpcError> where MB: Borrow, M: RpcMessage, { let uri = match self.method { - ClientMethod::HTTP(_) => format!("http://{}/{}", to_addr, path), - ClientMethod::HTTPS(_) => format!("https://{}/{}", to_addr, path), + ClientMethod::Http(_) => format!("http://{}/{}", to_addr, path), + ClientMethod::Https(_) => format!("https://{}/{}", to_addr, path), }; let req = Request::builder() @@ -327,8 +327,8 @@ impl RpcHttpClient { .body(Body::from(rmp_to_vec_all_named(msg.borrow())?))?; let resp_fut = match &self.method { - ClientMethod::HTTP(client) => client.request(req).fuse(), - ClientMethod::HTTPS(client) => client.request(req).fuse(), + ClientMethod::Http(client) => client.request(req).fuse(), + ClientMethod::Https(client) => client.request(req).fuse(), }; trace!("({}) Acquiring request_limiter slot...", path); diff --git a/src/rpc/rpc_server.rs b/src/rpc/rpc_server.rs index 4419a6f0..81361ab9 100644 --- a/src/rpc/rpc_server.rs +++ b/src/rpc/rpc_server.rs @@ -57,7 +57,7 @@ where trace!( "Request message: {}", serde_json::to_string(&msg) - .unwrap_or("".into()) + .unwrap_or_else(|_| "".into()) .chars() .take(100) .collect::() @@ -77,7 +77,7 @@ where let rep_bytes = rmp_to_vec_all_named::>(&Err(err_str))?; let mut err_response = Response::new(Body::from(rep_bytes)); *err_response.status_mut() = match e { - Error::BadRPC(_) => StatusCode::BAD_REQUEST, + Error::BadRpc(_) => StatusCode::BAD_REQUEST, _ => StatusCode::INTERNAL_SERVER_ERROR, }; warn!( @@ -123,7 +123,7 @@ impl RpcServer { req: Request, addr: SocketAddr, ) -> Result, Error> { - if req.method() != &Method::POST { + if req.method() != Method::POST { let mut bad_request = Response::default(); *bad_request.status_mut() = StatusCode::BAD_REQUEST; return Ok(bad_request); @@ -201,7 +201,7 @@ impl RpcServer { .get_ref() .0 .peer_addr() - .unwrap_or(([0, 0, 0, 0], 0).into()); + .unwrap_or_else(|_| ([0, 0, 0, 0], 0).into()); let self_arc = self_arc.clone(); async move { Ok::<_, Error>(service_fn(move |req: Request| { diff --git a/src/table/crdt/bool.rs b/src/table/crdt/bool.rs index 1989c92e..53af8f82 100644 --- a/src/table/crdt/bool.rs +++ b/src/table/crdt/bool.rs @@ -27,7 +27,7 @@ impl From for Bool { } } -impl CRDT for Bool { +impl Crdt for Bool { fn merge(&mut self, other: &Self) { self.0 = self.0 || other.0; } diff --git a/src/table/crdt/crdt.rs b/src/table/crdt/crdt.rs index 636b6df6..a8f1b9aa 100644 --- a/src/table/crdt/crdt.rs +++ b/src/table/crdt/crdt.rs @@ -18,7 +18,7 @@ use garage_util::data::*; /// Moreover, the relationship `≥` defined by `a ≥ b ⇔ ∃c. a = b ⊔ c` must be a partial order. /// This implies a few properties such as: if `a ⊔ b ≠ a`, then there is no `c` such that `(a ⊔ b) ⊔ c = a`, /// as this would imply a cycle in the partial order. -pub trait CRDT { +pub trait Crdt { /// Merge the two datastructures according to the CRDT rules. /// `self` is modified to contain the merged CRDT value. `other` is not modified. /// @@ -31,16 +31,16 @@ pub trait CRDT { /// All types that implement `Ord` (a total order) can also implement a trivial CRDT /// defined by the merge rule: `a ⊔ b = max(a, b)`. Implement this trait for your type /// to enable this behavior. -pub trait AutoCRDT: Ord + Clone + std::fmt::Debug { +pub trait AutoCrdt: Ord + Clone + std::fmt::Debug { /// WARN_IF_DIFFERENT: emit a warning when values differ. Set this to true if /// different values in your application should never happen. Set this to false /// if you are actually relying on the semantics of `a ⊔ b = max(a, b)`. const WARN_IF_DIFFERENT: bool; } -impl CRDT for T +impl Crdt for T where - T: AutoCRDT, + T: AutoCrdt, { fn merge(&mut self, other: &Self) { if Self::WARN_IF_DIFFERENT && self != other { @@ -52,22 +52,20 @@ where *self = other.clone(); } warn!("Making an arbitrary choice: {:?}", self); - } else { - if other > self { - *self = other.clone(); - } + } else if other > self { + *self = other.clone(); } } } -impl AutoCRDT for String { +impl AutoCrdt for String { const WARN_IF_DIFFERENT: bool = true; } -impl AutoCRDT for bool { +impl AutoCrdt for bool { const WARN_IF_DIFFERENT: bool = true; } -impl AutoCRDT for FixedBytes32 { +impl AutoCrdt for FixedBytes32 { const WARN_IF_DIFFERENT: bool = true; } diff --git a/src/table/crdt/lww.rs b/src/table/crdt/lww.rs index 3b1b2406..be197d88 100644 --- a/src/table/crdt/lww.rs +++ b/src/table/crdt/lww.rs @@ -36,14 +36,14 @@ use crate::crdt::crdt::*; /// This scheme is used by AWS S3 or Soundcloud and often without knowing /// in enterprise when reconciliating databases with ad-hoc scripts. #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] -pub struct LWW { +pub struct Lww { ts: u64, v: T, } -impl LWW +impl Lww where - T: CRDT, + T: Crdt, { /// Creates a new CRDT /// @@ -99,9 +99,9 @@ where } } -impl CRDT for LWW +impl Crdt for Lww where - T: Clone + CRDT, + T: Clone + Crdt, { fn merge(&mut self, other: &Self) { if other.ts > self.ts { diff --git a/src/table/crdt/lww_map.rs b/src/table/crdt/lww_map.rs index 7b372191..36bbf667 100644 --- a/src/table/crdt/lww_map.rs +++ b/src/table/crdt/lww_map.rs @@ -22,14 +22,14 @@ use crate::crdt::crdt::*; /// the serialization cost `O(n)` would still have to be paid at each modification, so we are /// actually not losing anything here. #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] -pub struct LWWMap { +pub struct LwwMap { vals: Vec<(K, u64, V)>, } -impl LWWMap +impl LwwMap where K: Ord, - V: CRDT, + V: Crdt, { /// Create a new empty map CRDT pub fn new() -> Self { @@ -94,7 +94,7 @@ where /// put_my_crdt_value(a); /// ``` pub fn take_and_clear(&mut self) -> Self { - let vals = std::mem::replace(&mut self.vals, vec![]); + let vals = std::mem::take(&mut self.vals); Self { vals } } /// Removes all values from the map @@ -113,16 +113,22 @@ where pub fn items(&self) -> &[(K, u64, V)] { &self.vals[..] } + /// Returns the number of items in the map pub fn len(&self) -> usize { self.vals.len() } + + /// Returns true if the map is empty + pub fn is_empty(&self) -> bool { + self.len() == 0 + } } -impl CRDT for LWWMap +impl Crdt for LwwMap where K: Clone + Ord, - V: Clone + CRDT, + V: Clone + Crdt, { fn merge(&mut self, other: &Self) { for (k, ts2, v2) in other.vals.iter() { @@ -143,3 +149,13 @@ where } } } + +impl Default for LwwMap +where + K: Ord, + V: Crdt, +{ + fn default() -> Self { + Self::new() + } +} diff --git a/src/table/crdt/map.rs b/src/table/crdt/map.rs index c4a30a26..e2aee40a 100644 --- a/src/table/crdt/map.rs +++ b/src/table/crdt/map.rs @@ -22,7 +22,7 @@ pub struct Map { impl Map where K: Clone + Ord, - V: Clone + CRDT, + V: Clone + Crdt, { /// Create a new empty map CRDT pub fn new() -> Self { @@ -62,12 +62,17 @@ where pub fn len(&self) -> usize { self.vals.len() } + + /// Returns true if the map is empty + pub fn is_empty(&self) -> bool { + self.len() == 0 + } } -impl CRDT for Map +impl Crdt for Map where K: Clone + Ord, - V: Clone + CRDT, + V: Clone + Crdt, { fn merge(&mut self, other: &Self) { for (k, v2) in other.vals.iter() { @@ -82,3 +87,13 @@ where } } } + +impl Default for Map +where + K: Clone + Ord, + V: Clone + Crdt, +{ + fn default() -> Self { + Self::new() + } +} diff --git a/src/table/crdt/mod.rs b/src/table/crdt/mod.rs index eb75d061..9663a5a5 100644 --- a/src/table/crdt/mod.rs +++ b/src/table/crdt/mod.rs @@ -10,6 +10,7 @@ //! Learn more about CRDT [on Wikipedia](https://en.wikipedia.org/wiki/Conflict-free_replicated_data_type) mod bool; +#[allow(clippy::module_inception)] mod crdt; mod lww; mod lww_map; diff --git a/src/table/data.rs b/src/table/data.rs index 542a8481..e7e85e65 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -11,7 +11,7 @@ use garage_util::error::*; use garage_rpc::membership::System; -use crate::crdt::CRDT; +use crate::crdt::Crdt; use crate::replication::*; use crate::schema::*; @@ -151,7 +151,7 @@ where if Some(&new_entry) != old_entry.as_ref() { let new_bytes = rmp_to_vec_all_named(&new_entry) - .map_err(Error::RMPEncode) + .map_err(Error::RmpEncode) .map_err(sled::transaction::ConflictableTransactionError::Abort)?; let new_bytes_hash = blake2sum(&new_bytes[..]); mkl_todo.insert(tree_key.clone(), new_bytes_hash.as_slice())?; diff --git a/src/table/gc.rs b/src/table/gc.rs index 694a3789..73e08827 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -24,23 +24,23 @@ use crate::schema::*; const TABLE_GC_BATCH_SIZE: usize = 1024; const TABLE_GC_RPC_TIMEOUT: Duration = Duration::from_secs(30); -pub struct TableGC { +pub struct TableGc { system: Arc, data: Arc>, - rpc_client: Arc>, + rpc_client: Arc>, } #[derive(Serialize, Deserialize)] -enum GcRPC { +enum GcRpc { Update(Vec), DeleteIfEqualHash(Vec<(ByteBuf, Hash)>), Ok, } -impl RpcMessage for GcRPC {} +impl RpcMessage for GcRpc {} -impl TableGC +impl TableGc where F: TableSchema + 'static, R: TableReplication + 'static, @@ -51,7 +51,7 @@ where rpc_server: &mut RpcServer, ) -> Arc { let rpc_path = format!("table_{}/gc", data.name); - let rpc_client = system.rpc_client::(&rpc_path); + let rpc_client = system.rpc_client::(&rpc_path); let gc = Arc::new(Self { system: system.clone(), @@ -85,8 +85,8 @@ where } } select! { - _ = tokio::time::sleep(Duration::from_secs(10)).fuse() => (), - _ = must_exit.changed().fuse() => (), + _ = tokio::time::sleep(Duration::from_secs(10)).fuse() => {}, + _ = must_exit.changed().fuse() => {}, } } } @@ -120,7 +120,7 @@ where self.todo_remove_if_equal(&k[..], vhash)?; } - if entries.len() == 0 { + if entries.is_empty() { // Nothing to do in this iteration return Ok(false); } @@ -168,7 +168,7 @@ where async fn try_send_and_delete( &self, - nodes: Vec, + nodes: Vec, items: Vec<(ByteBuf, Hash, ByteBuf)>, ) -> Result<(), Error> { let n_items = items.len(); @@ -183,7 +183,7 @@ where self.rpc_client .try_call_many( &nodes[..], - GcRPC::Update(updates), + GcRpc::Update(updates), RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_GC_RPC_TIMEOUT), ) .await?; @@ -196,7 +196,7 @@ where self.rpc_client .try_call_many( &nodes[..], - GcRPC::DeleteIfEqualHash(deletes.clone()), + GcRpc::DeleteIfEqualHash(deletes.clone()), RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_GC_RPC_TIMEOUT), ) .await?; @@ -221,7 +221,7 @@ where fn register_handler(self: &Arc, rpc_server: &mut RpcServer, path: String) { let self2 = self.clone(); - rpc_server.add_handler::(path, move |msg, _addr| { + rpc_server.add_handler::(path, move |msg, _addr| { let self2 = self2.clone(); async move { self2.handle_rpc(&msg).await } }); @@ -234,20 +234,20 @@ where }); } - async fn handle_rpc(self: &Arc, message: &GcRPC) -> Result { + async fn handle_rpc(self: &Arc, message: &GcRpc) -> Result { match message { - GcRPC::Update(items) => { + GcRpc::Update(items) => { self.data.update_many(items)?; - Ok(GcRPC::Ok) + Ok(GcRpc::Ok) } - GcRPC::DeleteIfEqualHash(items) => { + GcRpc::DeleteIfEqualHash(items) => { for (key, vhash) in items.iter() { self.data.delete_if_equal_hash(&key[..], *vhash)?; self.todo_remove_if_equal(&key[..], *vhash)?; } - Ok(GcRPC::Ok) + Ok(GcRpc::Ok) } - _ => Err(Error::Message(format!("Unexpected GC RPC"))), + _ => Err(Error::Message("Unexpected GC RPC".to_string())), } } } diff --git a/src/table/lib.rs b/src/table/lib.rs index c3e14ab8..53d2c93b 100644 --- a/src/table/lib.rs +++ b/src/table/lib.rs @@ -1,4 +1,5 @@ #![recursion_limit = "1024"] +#![allow(clippy::comparison_chain)] #[macro_use] extern crate log; diff --git a/src/table/merkle.rs b/src/table/merkle.rs index 39b87aa1..5c5cbec7 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -111,8 +111,8 @@ where } } else { select! { - _ = self.data.merkle_todo_notify.notified().fuse() => (), - _ = must_exit.changed().fuse() => (), + _ = self.data.merkle_todo_notify.notified().fuse() => {}, + _ = must_exit.changed().fuse() => {}, } } } @@ -121,10 +121,10 @@ where fn update_item(&self, k: &[u8], vhash_by: &[u8]) -> Result<(), Error> { let khash = blake2sum(k); - let new_vhash = if vhash_by.len() == 0 { + let new_vhash = if vhash_by.is_empty() { None } else { - Some(Hash::try_from(&vhash_by[..]).unwrap()) + Some(Hash::try_from(vhash_by).unwrap()) }; let key = MerkleNodeKey { @@ -168,14 +168,7 @@ where // This update is an Option<_>, so that it is None if the update is a no-op // and we can thus skip recalculating and re-storing everything let mutate = match self.read_node_txn(tx, &key)? { - MerkleNode::Empty => { - if let Some(vhv) = new_vhash { - Some(MerkleNode::Leaf(k.to_vec(), vhv)) - } else { - // Nothing to do, keep empty node - None - } - } + MerkleNode::Empty => new_vhash.map(|vhv| MerkleNode::Leaf(k.to_vec(), vhv)), MerkleNode::Intermediate(mut children) => { let key2 = key.next_key(khash); if let Some(subhash) = self.update_item_rec(tx, k, khash, &key2, new_vhash)? { @@ -186,7 +179,7 @@ where intermediate_set_child(&mut children, key2.prefix[i], subhash); } - if children.len() == 0 { + if children.is_empty() { // should not happen warn!( "({}) Replacing intermediate node with empty node, should not happen.", diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs index a6b4c98c..3ce7c0bf 100644 --- a/src/table/replication/fullcopy.rs +++ b/src/table/replication/fullcopy.rs @@ -19,14 +19,14 @@ pub struct TableFullReplication { } impl TableReplication for TableFullReplication { - fn read_nodes(&self, _hash: &Hash) -> Vec { + fn read_nodes(&self, _hash: &Hash) -> Vec { vec![self.system.id] } fn read_quorum(&self) -> usize { 1 } - fn write_nodes(&self, _hash: &Hash) -> Vec { + fn write_nodes(&self, _hash: &Hash) -> Vec { let ring = self.system.ring.borrow(); ring.config.members.keys().cloned().collect::>() } diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs index c2c78c8b..64996828 100644 --- a/src/table/replication/parameters.rs +++ b/src/table/replication/parameters.rs @@ -8,12 +8,12 @@ pub trait TableReplication: Send + Sync { // To understand various replication methods /// Which nodes to send read requests to - fn read_nodes(&self, hash: &Hash) -> Vec; + fn read_nodes(&self, hash: &Hash) -> Vec; /// Responses needed to consider a read succesfull fn read_quorum(&self) -> usize; /// Which nodes to send writes to - fn write_nodes(&self, hash: &Hash) -> Vec; + fn write_nodes(&self, hash: &Hash) -> Vec; /// Responses needed to consider a write succesfull fn write_quorum(&self) -> usize; fn max_write_errors(&self) -> usize; diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs index f2d89729..93b95a38 100644 --- a/src/table/replication/sharded.rs +++ b/src/table/replication/sharded.rs @@ -25,7 +25,7 @@ pub struct TableShardedReplication { } impl TableReplication for TableShardedReplication { - fn read_nodes(&self, hash: &Hash) -> Vec { + fn read_nodes(&self, hash: &Hash) -> Vec { let ring = self.system.ring.borrow().clone(); ring.walk_ring(&hash, self.replication_factor) } @@ -33,7 +33,7 @@ impl TableReplication for TableShardedReplication { self.read_quorum } - fn write_nodes(&self, hash: &Hash) -> Vec { + fn write_nodes(&self, hash: &Hash) -> Vec { let ring = self.system.ring.borrow(); ring.walk_ring(&hash, self.replication_factor) } diff --git a/src/table/schema.rs b/src/table/schema.rs index 13517271..4d6050e8 100644 --- a/src/table/schema.rs +++ b/src/table/schema.rs @@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize}; use garage_util::data::*; -use crate::crdt::CRDT; +use crate::crdt::Crdt; /// Trait for field used to partition data pub trait PartitionKey { @@ -18,7 +18,7 @@ impl PartitionKey for String { impl PartitionKey for Hash { fn hash(&self) -> Hash { - self.clone() + *self } } @@ -42,7 +42,7 @@ impl SortKey for Hash { /// Trait for an entry in a table. It must be sortable and partitionnable. pub trait Entry: - CRDT + PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync + Crdt + PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync { /// Get the key used to partition fn partition_key(&self) -> &P; diff --git a/src/table/sync.rs b/src/table/sync.rs index 3130abe8..a3afbbba 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -34,11 +34,11 @@ pub struct TableSyncer { merkle: Arc>, todo: Mutex, - rpc_client: Arc>, + rpc_client: Arc>, } #[derive(Serialize, Deserialize)] -pub(crate) enum SyncRPC { +pub(crate) enum SyncRpc { RootCkHash(Partition, Hash), RootCkDifferent(bool), GetNode(MerkleNodeKey), @@ -47,7 +47,7 @@ pub(crate) enum SyncRPC { Ok, } -impl RpcMessage for SyncRPC {} +impl RpcMessage for SyncRpc {} struct SyncTodo { todo: Vec, @@ -75,7 +75,7 @@ where rpc_server: &mut RpcServer, ) -> Arc { let rpc_path = format!("table_{}/sync", data.name); - let rpc_client = system.rpc_client::(&rpc_path); + let rpc_client = system.rpc_client::(&rpc_path); let todo = SyncTodo { todo: vec![] }; @@ -114,7 +114,7 @@ where fn register_handler(self: &Arc, rpc_server: &mut RpcServer, path: String) { let self2 = self.clone(); - rpc_server.add_handler::(path, move |msg, _addr| { + rpc_server.add_handler::(path, move |msg, _addr| { let self2 = self2.clone(); async move { self2.handle_rpc(&msg).await } }); @@ -150,14 +150,12 @@ where if let Some(busy) = busy_opt { if busy { nothing_to_do_since = None; - } else { - if nothing_to_do_since.is_none() { - nothing_to_do_since = Some(Instant::now()); - } + } else if nothing_to_do_since.is_none() { + nothing_to_do_since = Some(Instant::now()); } } } - _ = must_exit.changed().fuse() => (), + _ = must_exit.changed().fuse() => {}, _ = tokio::time::sleep(Duration::from_secs(1)).fuse() => { if nothing_to_do_since.map(|t| Instant::now() - t >= ANTI_ENTROPY_INTERVAL).unwrap_or(false) { nothing_to_do_since = None; @@ -277,7 +275,7 @@ where } } - if items.len() > 0 { + if !items.is_empty() { let nodes = self .data .replication @@ -292,9 +290,10 @@ where break; } if nodes.len() < self.data.replication.write_quorum() { - return Err(Error::Message(format!( + return Err(Error::Message( "Not offloading as we don't have a quorum of nodes to write to." - ))); + .to_string(), + )); } counter += 1; @@ -317,15 +316,15 @@ where async fn offload_items( self: &Arc, - items: &Vec<(Vec, Arc)>, - nodes: &[UUID], + items: &[(Vec, Arc)], + nodes: &[Uuid], ) -> Result<(), Error> { let values = items.iter().map(|(_k, v)| v.clone()).collect::>(); self.rpc_client .try_call_many( - &nodes[..], - SyncRPC::Items(values), + nodes, + SyncRpc::Items(values), RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_SYNC_RPC_TIMEOUT), ) .await?; @@ -363,7 +362,7 @@ where async fn do_sync_with( self: Arc, partition: TodoPartition, - who: UUID, + who: Uuid, must_exit: watch::Receiver, ) -> Result<(), Error> { let (root_ck_key, root_ck) = self.get_root_ck(partition.partition)?; @@ -382,20 +381,20 @@ where .rpc_client .call( who, - SyncRPC::RootCkHash(partition.partition, root_ck_hash), + SyncRpc::RootCkHash(partition.partition, root_ck_hash), TABLE_SYNC_RPC_TIMEOUT, ) .await?; let mut todo = match root_resp { - SyncRPC::RootCkDifferent(false) => { + SyncRpc::RootCkDifferent(false) => { debug!( "({}) Sync {:?} with {:?}: no difference", self.data.name, partition, who ); return Ok(()); } - SyncRPC::RootCkDifferent(true) => VecDeque::from(vec![root_ck_key]), + SyncRpc::RootCkDifferent(true) => VecDeque::from(vec![root_ck_key]), x => { return Err(Error::Message(format!( "Invalid respone to RootCkHash RPC: {}", @@ -432,10 +431,10 @@ where // and compare it with local node let remote_node = match self .rpc_client - .call(who, SyncRPC::GetNode(key.clone()), TABLE_SYNC_RPC_TIMEOUT) + .call(who, SyncRpc::GetNode(key.clone()), TABLE_SYNC_RPC_TIMEOUT) .await? { - SyncRPC::Node(_, node) => node, + SyncRpc::Node(_, node) => node, x => { return Err(Error::Message(format!( "Invalid respone to GetNode RPC: {}", @@ -467,7 +466,7 @@ where } if todo_items.len() >= 256 { - self.send_items(who, std::mem::replace(&mut todo_items, vec![])) + self.send_items(who, std::mem::take(&mut todo_items)) .await?; } } @@ -479,7 +478,7 @@ where Ok(()) } - async fn send_items(&self, who: UUID, item_value_list: Vec>) -> Result<(), Error> { + async fn send_items(&self, who: Uuid, item_value_list: Vec>) -> Result<(), Error> { info!( "({}) Sending {} items to {:?}", self.data.name, @@ -494,9 +493,9 @@ where let rpc_resp = self .rpc_client - .call(who, SyncRPC::Items(values), TABLE_SYNC_RPC_TIMEOUT) + .call(who, SyncRpc::Items(values), TABLE_SYNC_RPC_TIMEOUT) .await?; - if let SyncRPC::Ok = rpc_resp { + if let SyncRpc::Ok = rpc_resp { Ok(()) } else { Err(Error::Message(format!( @@ -508,22 +507,22 @@ where // ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ====== - async fn handle_rpc(self: &Arc, message: &SyncRPC) -> Result { + async fn handle_rpc(self: &Arc, message: &SyncRpc) -> Result { match message { - SyncRPC::RootCkHash(range, h) => { + SyncRpc::RootCkHash(range, h) => { let (_root_ck_key, root_ck) = self.get_root_ck(*range)?; let hash = hash_of::(&root_ck)?; - Ok(SyncRPC::RootCkDifferent(hash != *h)) + Ok(SyncRpc::RootCkDifferent(hash != *h)) } - SyncRPC::GetNode(k) => { + SyncRpc::GetNode(k) => { let node = self.merkle.read_node(&k)?; - Ok(SyncRPC::Node(k.clone(), node)) + Ok(SyncRpc::Node(k.clone(), node)) } - SyncRPC::Items(items) => { + SyncRpc::Items(items) => { self.data.update_many(items)?; - Ok(SyncRPC::Ok) + Ok(SyncRpc::Ok) } - _ => Err(Error::Message(format!("Unexpected sync RPC"))), + _ => Err(Error::Message("Unexpected sync RPC".to_string())), } } } diff --git a/src/table/table.rs b/src/table/table.rs index e203b178..eb9bd25c 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -13,7 +13,7 @@ use garage_rpc::membership::System; use garage_rpc::rpc_client::*; use garage_rpc::rpc_server::*; -use crate::crdt::CRDT; +use crate::crdt::Crdt; use crate::data::*; use crate::gc::*; use crate::merkle::*; @@ -28,11 +28,11 @@ pub struct Table { pub data: Arc>, pub merkle_updater: Arc>, pub syncer: Arc>, - rpc_client: Arc>>, + rpc_client: Arc>>, } #[derive(Serialize, Deserialize)] -pub(crate) enum TableRPC { +pub(crate) enum TableRpc { Ok, ReadEntry(F::P, F::S), @@ -44,7 +44,7 @@ pub(crate) enum TableRPC { Update(Vec>), } -impl RpcMessage for TableRPC {} +impl RpcMessage for TableRpc {} impl Table where @@ -62,7 +62,7 @@ where rpc_server: &mut RpcServer, ) -> Arc { let rpc_path = format!("table_{}", name); - let rpc_client = system.rpc_client::>(&rpc_path); + let rpc_client = system.rpc_client::>(&rpc_path); let data = TableData::new(system.clone(), name, instance, replication, db); @@ -74,7 +74,7 @@ where merkle_updater.clone(), rpc_server, ); - TableGC::launch(system.clone(), data.clone(), rpc_server); + TableGc::launch(system.clone(), data.clone(), rpc_server); let table = Arc::new(Self { system, @@ -95,7 +95,7 @@ where //eprintln!("insert who: {:?}", who); let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?)); - let rpc = TableRPC::::Update(vec![e_enc]); + let rpc = TableRpc::::Update(vec![e_enc]); self.rpc_client .try_call_many( @@ -109,22 +109,19 @@ where } pub async fn insert_many(&self, entries: &[F::E]) -> Result<(), Error> { - let mut call_list = HashMap::new(); + let mut call_list: HashMap<_, Vec<_>> = HashMap::new(); for entry in entries.iter() { let hash = entry.partition_key().hash(); let who = self.data.replication.write_nodes(&hash); let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?)); for node in who { - if !call_list.contains_key(&node) { - call_list.insert(node, vec![]); - } - call_list.get_mut(&node).unwrap().push(e_enc.clone()); + call_list.entry(node).or_default().push(e_enc.clone()); } } let call_futures = call_list.drain().map(|(node, entries)| async move { - let rpc = TableRPC::::Update(entries); + let rpc = TableRpc::::Update(entries); let resp = self.rpc_client.call(node, rpc, TABLE_RPC_TIMEOUT).await?; Ok::<_, Error>((node, resp)) @@ -153,7 +150,7 @@ where let who = self.data.replication.read_nodes(&hash); //eprintln!("get who: {:?}", who); - let rpc = TableRPC::::ReadEntry(partition_key.clone(), sort_key.clone()); + let rpc = TableRpc::::ReadEntry(partition_key.clone(), sort_key.clone()); let resps = self .rpc_client .try_call_many( @@ -168,7 +165,7 @@ where let mut ret = None; let mut not_all_same = false; for resp in resps { - if let TableRPC::ReadEntryResponse(value) = resp { + if let TableRpc::ReadEntryResponse(value) = resp { if let Some(v_bytes) = value { let v = self.data.decode_entry(v_bytes.as_slice())?; ret = match ret { @@ -183,7 +180,7 @@ where } } } else { - return Err(Error::Message(format!("Invalid return value to read"))); + return Err(Error::Message("Invalid return value to read".to_string())); } } if let Some(ret_entry) = &ret { @@ -208,7 +205,7 @@ where let hash = partition_key.hash(); let who = self.data.replication.read_nodes(&hash); - let rpc = TableRPC::::ReadRange(partition_key.clone(), begin_sort_key, filter, limit); + let rpc = TableRpc::::ReadRange(partition_key.clone(), begin_sort_key, filter, limit); let resps = self .rpc_client @@ -224,7 +221,7 @@ where let mut ret = BTreeMap::new(); let mut to_repair = BTreeMap::new(); for resp in resps { - if let TableRPC::Update(entries) = resp { + if let TableRpc::Update(entries) = resp { for entry_bytes in entries.iter() { let entry = self.data.decode_entry(entry_bytes.as_slice())?; let entry_key = self.data.tree_key(entry.partition_key(), entry.sort_key()); @@ -264,12 +261,12 @@ where // =============== UTILITY FUNCTION FOR CLIENT OPERATIONS =============== - async fn repair_on_read(&self, who: &[UUID], what: F::E) -> Result<(), Error> { + async fn repair_on_read(&self, who: &[Uuid], what: F::E) -> Result<(), Error> { let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?)); self.rpc_client .try_call_many( - &who[..], - TableRPC::::Update(vec![what_enc]), + who, + TableRpc::::Update(vec![what_enc]), RequestStrategy::with_quorum(who.len()).with_timeout(TABLE_RPC_TIMEOUT), ) .await?; @@ -280,7 +277,7 @@ where fn register_handler(self: Arc, rpc_server: &mut RpcServer, path: String) { let self2 = self.clone(); - rpc_server.add_handler::, _, _>(path, move |msg, _addr| { + rpc_server.add_handler::, _, _>(path, move |msg, _addr| { let self2 = self2.clone(); async move { self2.handle(&msg).await } }); @@ -293,21 +290,21 @@ where }); } - async fn handle(self: &Arc, msg: &TableRPC) -> Result, Error> { + async fn handle(self: &Arc, msg: &TableRpc) -> Result, Error> { match msg { - TableRPC::ReadEntry(key, sort_key) => { + TableRpc::ReadEntry(key, sort_key) => { let value = self.data.read_entry(key, sort_key)?; - Ok(TableRPC::ReadEntryResponse(value)) + Ok(TableRpc::ReadEntryResponse(value)) } - TableRPC::ReadRange(key, begin_sort_key, filter, limit) => { + TableRpc::ReadRange(key, begin_sort_key, filter, limit) => { let values = self.data.read_range(key, begin_sort_key, filter, *limit)?; - Ok(TableRPC::Update(values)) + Ok(TableRpc::Update(values)) } - TableRPC::Update(pairs) => { + TableRpc::Update(pairs) => { self.data.update_many(pairs)?; - Ok(TableRPC::Ok) + Ok(TableRpc::Ok) } - _ => Err(Error::BadRPC(format!("Unexpected table RPC"))), + _ => Err(Error::BadRpc("Unexpected table RPC".to_string())), } } } diff --git a/src/util/data.rs b/src/util/data.rs index 34ee8a18..6df51cd0 100644 --- a/src/util/data.rs +++ b/src/util/data.rs @@ -72,7 +72,7 @@ impl FixedBytes32 { &mut self.0[..] } /// Copy to a slice - pub fn to_vec(&self) -> Vec { + pub fn to_vec(self) -> Vec { self.0.to_vec() } /// Try building a FixedBytes32 from a slice @@ -88,7 +88,7 @@ impl FixedBytes32 { } /// A 32 bytes UUID -pub type UUID = FixedBytes32; +pub type Uuid = FixedBytes32; /// A 256 bit cryptographic hash, can be sha256 or blake2 depending on provenance pub type Hash = FixedBytes32; @@ -127,7 +127,7 @@ pub fn fasthash(data: &[u8]) -> FastHash { } /// Generate a random 32 bytes UUID -pub fn gen_uuid() -> UUID { +pub fn gen_uuid() -> Uuid { rand::thread_rng().gen::<[u8; 32]>().into() } diff --git a/src/util/error.rs b/src/util/error.rs index 32dccbe6..c3d84e63 100644 --- a/src/util/error.rs +++ b/src/util/error.rs @@ -7,24 +7,24 @@ use crate::data::*; /// RPC related errors #[derive(Debug, Error)] -pub enum RPCError { +pub enum RpcError { #[error(display = "Node is down: {:?}.", _0)] - NodeDown(UUID), + NodeDown(Uuid), #[error(display = "Timeout: {}", _0)] Timeout(#[error(source)] tokio::time::error::Elapsed), #[error(display = "HTTP error: {}", _0)] - HTTP(#[error(source)] http::Error), + Http(#[error(source)] http::Error), #[error(display = "Hyper error: {}", _0)] Hyper(#[error(source)] hyper::Error), #[error(display = "Messagepack encode error: {}", _0)] - RMPEncode(#[error(source)] rmp_serde::encode::Error), + RmpEncode(#[error(source)] rmp_serde::encode::Error), #[error(display = "Messagepack decode error: {}", _0)] - RMPDecode(#[error(source)] rmp_serde::decode::Error), + RmpDecode(#[error(source)] rmp_serde::decode::Error), #[error(display = "Too many errors: {:?}", _0)] TooManyErrors(Vec), @@ -40,26 +40,26 @@ pub enum Error { Hyper(#[error(source)] hyper::Error), #[error(display = "HTTP error: {}", _0)] - HTTP(#[error(source)] http::Error), + Http(#[error(source)] http::Error), #[error(display = "Invalid HTTP header value: {}", _0)] - HTTPHeader(#[error(source)] http::header::ToStrError), + HttpHeader(#[error(source)] http::header::ToStrError), #[error(display = "TLS error: {}", _0)] - TLS(#[error(source)] rustls::TLSError), + Tls(#[error(source)] rustls::TLSError), #[error(display = "PKI error: {}", _0)] - PKI(#[error(source)] webpki::Error), + Pki(#[error(source)] webpki::Error), #[error(display = "Sled error: {}", _0)] Sled(#[error(source)] sled::Error), #[error(display = "Messagepack encode error: {}", _0)] - RMPEncode(#[error(source)] rmp_serde::encode::Error), + RmpEncode(#[error(source)] rmp_serde::encode::Error), #[error(display = "Messagepack decode error: {}", _0)] - RMPDecode(#[error(source)] rmp_serde::decode::Error), + RmpDecode(#[error(source)] rmp_serde::decode::Error), #[error(display = "JSON error: {}", _0)] - JSON(#[error(source)] serde_json::error::Error), + Json(#[error(source)] serde_json::error::Error), #[error(display = "TOML decode error: {}", _0)] TomlDecode(#[error(source)] toml::de::Error), @@ -67,13 +67,13 @@ pub enum Error { TokioJoin(#[error(source)] tokio::task::JoinError), #[error(display = "RPC call error: {}", _0)] - RPC(#[error(source)] RPCError), + Rpc(#[error(source)] RpcError), #[error(display = "Remote error: {} (status code {})", _0, _1)] RemoteError(String, StatusCode), #[error(display = "Bad RPC: {}", _0)] - BadRPC(String), + BadRpc(String), #[error(display = "Corrupt data: does not match hash {:?}", _0)] CorruptData(Hash), @@ -93,12 +93,12 @@ impl From> for Error { impl From> for Error { fn from(_e: tokio::sync::watch::error::SendError) -> Error { - Error::Message(format!("Watch send error")) + Error::Message("Watch send error".to_string()) } } impl From> for Error { fn from(_e: tokio::sync::mpsc::error::SendError) -> Error { - Error::Message(format!("MPSC send error")) + Error::Message("MPSC send error".to_string()) } } diff --git a/src/util/persister.rs b/src/util/persister.rs index 93b7cdf4..9e1a1910 100644 --- a/src/util/persister.rs +++ b/src/util/persister.rs @@ -1,5 +1,5 @@ use std::io::{Read, Write}; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; @@ -18,8 +18,8 @@ impl Persister where T: Serialize + for<'de> Deserialize<'de>, { - pub fn new(base_dir: &PathBuf, file_name: &str) -> Self { - let mut path = base_dir.clone(); + pub fn new(base_dir: &Path, file_name: &str) -> Self { + let mut path = base_dir.to_path_buf(); path.push(file_name); Self { path, diff --git a/src/web/error.rs b/src/web/error.rs index f6afbb42..08717ce1 100644 --- a/src/web/error.rs +++ b/src/web/error.rs @@ -21,7 +21,7 @@ pub enum Error { /// The request contained an invalid UTF-8 sequence in its path or in other parameters #[error(display = "Invalid UTF-8: {}", _0)] - InvalidUTF8(#[error(source)] std::str::Utf8Error), + InvalidUtf8(#[error(source)] std::str::Utf8Error), /// The client send a header with invalid value #[error(display = "Invalid header value: {}", _0)] @@ -38,7 +38,7 @@ impl Error { match self { Error::NotFound => StatusCode::NOT_FOUND, Error::ApiError(e) => e.http_status_code(), - Error::InternalError(GarageError::RPC(_)) => StatusCode::SERVICE_UNAVAILABLE, + Error::InternalError(GarageError::Rpc(_)) => StatusCode::SERVICE_UNAVAILABLE, Error::InternalError(_) => StatusCode::INTERNAL_SERVER_ERROR, _ => StatusCode::BAD_REQUEST, } diff --git a/src/web/web_server.rs b/src/web/web_server.rs index 9635eca6..babde62a 100644 --- a/src/web/web_server.rs +++ b/src/web/web_server.rs @@ -71,7 +71,7 @@ async fn serve_file(garage: Arc, req: Request) -> Result, req: Request) -> Result handle_head(garage, &req, &bucket, &key).await?, - &Method::GET => handle_get(garage, &req, bucket, &key).await?, - _ => return Err(Error::BadRequest(format!("HTTP method not supported"))), + let res = match *req.method() { + Method::HEAD => handle_head(garage, &req, &bucket, &key).await?, + Method::GET => handle_get(garage, &req, bucket, &key).await?, + _ => return Err(Error::BadRequest("HTTP method not supported".to_string())), }; Ok(res) @@ -118,7 +118,7 @@ fn authority_to_host(authority: &str) -> Result<&str, Error> { let mut iter = authority.chars().enumerate(); let (_, first_char) = iter .next() - .ok_or(Error::BadRequest(format!("Authority is empty")))?; + .ok_or_else(|| Error::BadRequest("Authority is empty".to_string()))?; let split = match first_char { '[' => { @@ -133,7 +133,7 @@ fn authority_to_host(authority: &str) -> Result<&str, Error> { } } } - _ => iter.skip_while(|(_, c)| c != &':').next(), + _ => iter.find(|(_, c)| *c == ':'), }; match split { @@ -158,7 +158,7 @@ fn host_to_bucket<'a>(host: &'a str, root: &str) -> &'a str { } let len_diff = host.len() - root.len(); - let missing_starting_dot = root.chars().next() != Some('.'); + let missing_starting_dot = !root.starts_with('.'); let cursor = if missing_starting_dot { len_diff - 1 } else { @@ -175,10 +175,10 @@ fn host_to_bucket<'a>(host: &'a str, root: &str) -> &'a str { fn path_to_key<'a>(path: &'a str, index: &str) -> Result, Error> { let path_utf8 = percent_encoding::percent_decode_str(&path).decode_utf8()?; - if path_utf8.chars().next() != Some('/') { - return Err(Error::BadRequest(format!( - "Path must start with a / (slash)" - ))); + if !path_utf8.starts_with('/') { + return Err(Error::BadRequest( + "Path must start with a / (slash)".to_string(), + )); } match path_utf8.chars().last() {