forked from Deuxfleurs/garage
Merge pull request 'Many clippy lints fixed' (#488) from k2v-watch-range-2 into main
Reviewed-on: Deuxfleurs/garage#488
This commit is contained in:
commit
f2c256cac4
19 changed files with 40 additions and 37 deletions
|
@ -103,7 +103,7 @@ impl AdminApiServer {
|
||||||
.bucket_helper()
|
.bucket_helper()
|
||||||
.resolve_global_bucket_name(&domain_string)
|
.resolve_global_bucket_name(&domain_string)
|
||||||
.await?
|
.await?
|
||||||
.ok_or_else(|| HelperError::NoSuchBucket(domain_string))?;
|
.ok_or(HelperError::NoSuchBucket(domain_string))?;
|
||||||
|
|
||||||
let bucket = self
|
let bucket = self
|
||||||
.garage
|
.garage
|
||||||
|
|
|
@ -167,7 +167,7 @@ async fn bucket_info_results(
|
||||||
let quotas = state.quotas.get();
|
let quotas = state.quotas.get();
|
||||||
let res =
|
let res =
|
||||||
GetBucketInfoResult {
|
GetBucketInfoResult {
|
||||||
id: hex::encode(&bucket.id),
|
id: hex::encode(bucket.id),
|
||||||
global_aliases: state
|
global_aliases: state
|
||||||
.aliases
|
.aliases
|
||||||
.items()
|
.items()
|
||||||
|
@ -575,6 +575,6 @@ pub async fn handle_local_unalias_bucket(
|
||||||
// ---- HELPER ----
|
// ---- HELPER ----
|
||||||
|
|
||||||
fn parse_bucket_id(id: &str) -> Result<Uuid, Error> {
|
fn parse_bucket_id(id: &str) -> Result<Uuid, Error> {
|
||||||
let id_hex = hex::decode(&id).ok_or_bad_request("Invalid bucket id")?;
|
let id_hex = hex::decode(id).ok_or_bad_request("Invalid bucket id")?;
|
||||||
Ok(Uuid::try_from(&id_hex).ok_or_bad_request("Invalid bucket id")?)
|
Ok(Uuid::try_from(&id_hex).ok_or_bad_request("Invalid bucket id")?)
|
||||||
}
|
}
|
||||||
|
|
|
@ -305,7 +305,7 @@ fn parse_create_bucket_xml(xml_bytes: &[u8]) -> Option<Option<String>> {
|
||||||
let mut ret = None;
|
let mut ret = None;
|
||||||
for item in cbc.children() {
|
for item in cbc.children() {
|
||||||
if item.has_tag_name("LocationConstraint") {
|
if item.has_tag_name("LocationConstraint") {
|
||||||
if ret != None {
|
if ret.is_some() {
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
ret = Some(item.text()?.to_string());
|
ret = Some(item.text()?.to_string());
|
||||||
|
|
|
@ -140,7 +140,7 @@ pub async fn handle_post_object(
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let decoded_policy = BASE64_STANDARD
|
let decoded_policy = BASE64_STANDARD
|
||||||
.decode(&policy)
|
.decode(policy)
|
||||||
.ok_or_bad_request("Invalid policy")?;
|
.ok_or_bad_request("Invalid policy")?;
|
||||||
let decoded_policy: Policy =
|
let decoded_policy: Policy =
|
||||||
serde_json::from_slice(&decoded_policy).ok_or_bad_request("Invalid policy")?;
|
serde_json::from_slice(&decoded_policy).ok_or_bad_request("Invalid policy")?;
|
||||||
|
|
|
@ -38,7 +38,7 @@ impl BlockManagerMetrics {
|
||||||
.u64_value_observer("block.compression_level", move |observer| {
|
.u64_value_observer("block.compression_level", move |observer| {
|
||||||
match compression_level {
|
match compression_level {
|
||||||
Some(v) => observer.observe(v as u64, &[]),
|
Some(v) => observer.observe(v as u64, &[]),
|
||||||
None => observer.observe(0 as u64, &[]),
|
None => observer.observe(0_u64, &[]),
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.with_description("Garage compression level for node")
|
.with_description("Garage compression level for node")
|
||||||
|
|
|
@ -24,9 +24,9 @@ impl BlockRc {
|
||||||
tx: &mut db::Transaction,
|
tx: &mut db::Transaction,
|
||||||
hash: &Hash,
|
hash: &Hash,
|
||||||
) -> db::TxOpResult<bool> {
|
) -> db::TxOpResult<bool> {
|
||||||
let old_rc = RcEntry::parse_opt(tx.get(&self.rc, &hash)?);
|
let old_rc = RcEntry::parse_opt(tx.get(&self.rc, hash)?);
|
||||||
match old_rc.increment().serialize() {
|
match old_rc.increment().serialize() {
|
||||||
Some(x) => tx.insert(&self.rc, &hash, x)?,
|
Some(x) => tx.insert(&self.rc, hash, x)?,
|
||||||
None => unreachable!(),
|
None => unreachable!(),
|
||||||
};
|
};
|
||||||
Ok(old_rc.is_zero())
|
Ok(old_rc.is_zero())
|
||||||
|
@ -39,10 +39,10 @@ impl BlockRc {
|
||||||
tx: &mut db::Transaction,
|
tx: &mut db::Transaction,
|
||||||
hash: &Hash,
|
hash: &Hash,
|
||||||
) -> db::TxOpResult<bool> {
|
) -> db::TxOpResult<bool> {
|
||||||
let new_rc = RcEntry::parse_opt(tx.get(&self.rc, &hash)?).decrement();
|
let new_rc = RcEntry::parse_opt(tx.get(&self.rc, hash)?).decrement();
|
||||||
match new_rc.serialize() {
|
match new_rc.serialize() {
|
||||||
Some(x) => tx.insert(&self.rc, &hash, x)?,
|
Some(x) => tx.insert(&self.rc, hash, x)?,
|
||||||
None => tx.remove(&self.rc, &hash)?,
|
None => tx.remove(&self.rc, hash)?,
|
||||||
};
|
};
|
||||||
Ok(matches!(new_rc, RcEntry::Deletable { .. }))
|
Ok(matches!(new_rc, RcEntry::Deletable { .. }))
|
||||||
}
|
}
|
||||||
|
@ -57,10 +57,10 @@ impl BlockRc {
|
||||||
pub(crate) fn clear_deleted_block_rc(&self, hash: &Hash) -> Result<(), Error> {
|
pub(crate) fn clear_deleted_block_rc(&self, hash: &Hash) -> Result<(), Error> {
|
||||||
let now = now_msec();
|
let now = now_msec();
|
||||||
self.rc.db().transaction(|mut tx| {
|
self.rc.db().transaction(|mut tx| {
|
||||||
let rcval = RcEntry::parse_opt(tx.get(&self.rc, &hash)?);
|
let rcval = RcEntry::parse_opt(tx.get(&self.rc, hash)?);
|
||||||
match rcval {
|
match rcval {
|
||||||
RcEntry::Deletable { at_time } if now > at_time => {
|
RcEntry::Deletable { at_time } if now > at_time => {
|
||||||
tx.remove(&self.rc, &hash)?;
|
tx.remove(&self.rc, hash)?;
|
||||||
}
|
}
|
||||||
_ => (),
|
_ => (),
|
||||||
};
|
};
|
||||||
|
|
|
@ -466,11 +466,11 @@ impl BlockStoreIterator {
|
||||||
let ent_type = data_dir_ent.file_type().await?;
|
let ent_type = data_dir_ent.file_type().await?;
|
||||||
|
|
||||||
let name = name.strip_suffix(".zst").unwrap_or(&name);
|
let name = name.strip_suffix(".zst").unwrap_or(&name);
|
||||||
if name.len() == 2 && hex::decode(&name).is_ok() && ent_type.is_dir() {
|
if name.len() == 2 && hex::decode(name).is_ok() && ent_type.is_dir() {
|
||||||
let path = data_dir_ent.path();
|
let path = data_dir_ent.path();
|
||||||
self.path.push(ReadingDir::Pending(path));
|
self.path.push(ReadingDir::Pending(path));
|
||||||
} else if name.len() == 64 {
|
} else if name.len() == 64 {
|
||||||
if let Ok(h) = hex::decode(&name) {
|
if let Ok(h) = hex::decode(name) {
|
||||||
let mut hash = [0u8; 32];
|
let mut hash = [0u8; 32];
|
||||||
hash.copy_from_slice(&h);
|
hash.copy_from_slice(&h);
|
||||||
return Ok(Some(hash.into()));
|
return Ok(Some(hash.into()));
|
||||||
|
|
|
@ -60,7 +60,7 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) ->
|
||||||
|
|
||||||
println!("==== HEALTHY NODES ====");
|
println!("==== HEALTHY NODES ====");
|
||||||
let mut healthy_nodes =
|
let mut healthy_nodes =
|
||||||
vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity\tDataAvail\tMetaAvail".to_string()];
|
vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity\tDataAvail".to_string()];
|
||||||
for adv in status.iter().filter(|adv| adv.is_up) {
|
for adv in status.iter().filter(|adv| adv.is_up) {
|
||||||
match layout.roles.get(&adv.id) {
|
match layout.roles.get(&adv.id) {
|
||||||
Some(NodeRoleV(Some(cfg))) => {
|
Some(NodeRoleV(Some(cfg))) => {
|
||||||
|
|
|
@ -14,11 +14,11 @@ pub fn node_id_command(config_file: PathBuf, quiet: bool) -> Result<(), Error> {
|
||||||
garage_rpc::system::read_node_id(&config.metadata_dir).err_context(READ_KEY_ERROR)?;
|
garage_rpc::system::read_node_id(&config.metadata_dir).err_context(READ_KEY_ERROR)?;
|
||||||
|
|
||||||
let idstr = if let Some(addr) = config.rpc_public_addr {
|
let idstr = if let Some(addr) = config.rpc_public_addr {
|
||||||
let idstr = format!("{}@{}", hex::encode(&node_id), addr);
|
let idstr = format!("{}@{}", hex::encode(node_id), addr);
|
||||||
println!("{}", idstr);
|
println!("{}", idstr);
|
||||||
idstr
|
idstr
|
||||||
} else {
|
} else {
|
||||||
let idstr = hex::encode(&node_id);
|
let idstr = hex::encode(node_id);
|
||||||
println!("{}", idstr);
|
println!("{}", idstr);
|
||||||
|
|
||||||
if !quiet {
|
if !quiet {
|
||||||
|
|
|
@ -229,7 +229,7 @@ pub fn find_matching_node(
|
||||||
) -> Result<Uuid, Error> {
|
) -> Result<Uuid, Error> {
|
||||||
let mut candidates = vec![];
|
let mut candidates = vec![];
|
||||||
for c in cand {
|
for c in cand {
|
||||||
if hex::encode(&c).starts_with(&pattern) && !candidates.contains(&c) {
|
if hex::encode(c).starts_with(pattern) && !candidates.contains(&c) {
|
||||||
candidates.push(c);
|
candidates.push(c);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
use base64::prelude::*;
|
||||||
use hyper::{Method, StatusCode};
|
use hyper::{Method, StatusCode};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
|
@ -160,7 +161,7 @@ async fn test_poll_range() {
|
||||||
json!(
|
json!(
|
||||||
{
|
{
|
||||||
"items": [
|
"items": [
|
||||||
{"sk": "test1", "ct": ct, "v": [base64::encode(b"Initial value")]},
|
{"sk": "test1", "ct": ct, "v": [BASE64_STANDARD.encode(b"Initial value")]},
|
||||||
],
|
],
|
||||||
"seenMarker": seen_marker,
|
"seenMarker": seen_marker,
|
||||||
}
|
}
|
||||||
|
@ -212,7 +213,7 @@ async fn test_poll_range() {
|
||||||
assert_json_eq!(&json_res["items"][0]["sk"], json!("test1"));
|
assert_json_eq!(&json_res["items"][0]["sk"], json!("test1"));
|
||||||
assert_json_eq!(
|
assert_json_eq!(
|
||||||
&json_res["items"][0]["v"],
|
&json_res["items"][0]["v"],
|
||||||
json!([base64::encode(b"New value")])
|
json!([BASE64_STANDARD.encode(b"New value")])
|
||||||
);
|
);
|
||||||
|
|
||||||
// Now we will add a value on a different key
|
// Now we will add a value on a different key
|
||||||
|
@ -259,6 +260,6 @@ async fn test_poll_range() {
|
||||||
assert_json_eq!(&json_res["items"][0]["sk"], json!("test2"));
|
assert_json_eq!(&json_res["items"][0]["sk"], json!("test2"));
|
||||||
assert_json_eq!(
|
assert_json_eq!(
|
||||||
&json_res["items"][0]["v"],
|
&json_res["items"][0]["v"],
|
||||||
json!([base64::encode(b"Other value")])
|
json!([BASE64_STANDARD.encode(b"Other value")])
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
|
@ -57,7 +57,7 @@ async fn test_website() {
|
||||||
Request::builder()
|
Request::builder()
|
||||||
.method("GET")
|
.method("GET")
|
||||||
.uri(format!("http://127.0.0.1:{}/check", ctx.garage.admin_port))
|
.uri(format!("http://127.0.0.1:{}/check", ctx.garage.admin_port))
|
||||||
.header("domain", format!("{}", BCKT_NAME))
|
.header("domain", BCKT_NAME.to_string())
|
||||||
.body(Body::empty())
|
.body(Body::empty())
|
||||||
.unwrap()
|
.unwrap()
|
||||||
};
|
};
|
||||||
|
@ -92,7 +92,7 @@ async fn test_website() {
|
||||||
Request::builder()
|
Request::builder()
|
||||||
.method("GET")
|
.method("GET")
|
||||||
.uri(format!("http://127.0.0.1:{}/check", ctx.garage.admin_port))
|
.uri(format!("http://127.0.0.1:{}/check", ctx.garage.admin_port))
|
||||||
.header("domain", format!("{}", BCKT_NAME))
|
.header("domain", BCKT_NAME.to_string())
|
||||||
.body(Body::empty())
|
.body(Body::empty())
|
||||||
.unwrap()
|
.unwrap()
|
||||||
};
|
};
|
||||||
|
@ -121,7 +121,7 @@ async fn test_website() {
|
||||||
Request::builder()
|
Request::builder()
|
||||||
.method("GET")
|
.method("GET")
|
||||||
.uri(format!("http://127.0.0.1:{}/check", ctx.garage.admin_port))
|
.uri(format!("http://127.0.0.1:{}/check", ctx.garage.admin_port))
|
||||||
.header("domain", format!("{}", BCKT_NAME))
|
.header("domain", BCKT_NAME.to_string())
|
||||||
.body(Body::empty())
|
.body(Body::empty())
|
||||||
.unwrap()
|
.unwrap()
|
||||||
};
|
};
|
||||||
|
|
|
@ -159,7 +159,7 @@ impl Garage {
|
||||||
};
|
};
|
||||||
|
|
||||||
let network_key = NetworkKey::from_slice(
|
let network_key = NetworkKey::from_slice(
|
||||||
&hex::decode(&config.rpc_secret.as_ref().unwrap()).expect("Invalid RPC secret key")[..],
|
&hex::decode(config.rpc_secret.as_ref().unwrap()).expect("Invalid RPC secret key")[..],
|
||||||
)
|
)
|
||||||
.expect("Invalid RPC secret key");
|
.expect("Invalid RPC secret key");
|
||||||
|
|
||||||
|
|
|
@ -269,6 +269,7 @@ impl CountedItem for K2VItem {
|
||||||
&self.partition.partition_key
|
&self.partition.partition_key
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[allow(clippy::bool_to_int_with_if)]
|
||||||
fn counts(&self) -> Vec<(&'static str, i64)> {
|
fn counts(&self) -> Vec<(&'static str, i64)> {
|
||||||
let values = self.values();
|
let values = self.values();
|
||||||
|
|
||||||
|
@ -313,7 +314,7 @@ mod tests {
|
||||||
values: vec![(6, DvvsValue::Value(vec![16])), (7, DvvsValue::Deleted)],
|
values: vec![(6, DvvsValue::Value(vec![16])), (7, DvvsValue::Deleted)],
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut e3 = e1.clone();
|
let mut e3 = e1;
|
||||||
e3.merge(&e2);
|
e3.merge(&e2);
|
||||||
assert_eq!(e2, e3);
|
assert_eq!(e2, e3);
|
||||||
}
|
}
|
||||||
|
|
|
@ -37,7 +37,7 @@ use crate::k2v::sub::*;
|
||||||
|
|
||||||
const POLL_RANGE_EXTRA_DELAY: Duration = Duration::from_millis(200);
|
const POLL_RANGE_EXTRA_DELAY: Duration = Duration::from_millis(200);
|
||||||
|
|
||||||
const TIMESTAMP_KEY: &'static [u8] = b"timestamp";
|
const TIMESTAMP_KEY: &[u8] = b"timestamp";
|
||||||
|
|
||||||
/// RPC messages for K2V
|
/// RPC messages for K2V
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
|
@ -418,7 +418,7 @@ impl K2VRpcHandler {
|
||||||
.data
|
.data
|
||||||
.update_entry_with(&item.partition, &item.sort_key, |tx, ent| {
|
.update_entry_with(&item.partition, &item.sort_key, |tx, ent| {
|
||||||
let old_local_timestamp = tx
|
let old_local_timestamp = tx
|
||||||
.get(&local_timestamp_tree, TIMESTAMP_KEY)?
|
.get(local_timestamp_tree, TIMESTAMP_KEY)?
|
||||||
.and_then(|x| x.try_into().ok())
|
.and_then(|x| x.try_into().ok())
|
||||||
.map(u64::from_be_bytes)
|
.map(u64::from_be_bytes)
|
||||||
.unwrap_or_default();
|
.unwrap_or_default();
|
||||||
|
@ -438,7 +438,7 @@ impl K2VRpcHandler {
|
||||||
);
|
);
|
||||||
|
|
||||||
tx.insert(
|
tx.insert(
|
||||||
&local_timestamp_tree,
|
local_timestamp_tree,
|
||||||
TIMESTAMP_KEY,
|
TIMESTAMP_KEY,
|
||||||
u64::to_be_bytes(new_local_timestamp),
|
u64::to_be_bytes(new_local_timestamp),
|
||||||
)?;
|
)?;
|
||||||
|
|
|
@ -63,7 +63,7 @@ impl RangeSeenMarker {
|
||||||
None => {
|
None => {
|
||||||
self.items.insert(item.sort_key.clone(), cc.vector_clock);
|
self.items.insert(item.sort_key.clone(), cc.vector_clock);
|
||||||
}
|
}
|
||||||
Some(ent) => *ent = vclock_max(&ent, &cc.vector_clock),
|
Some(ent) => *ent = vclock_max(ent, &cc.vector_clock),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -71,7 +71,7 @@ impl RangeSeenMarker {
|
||||||
|
|
||||||
pub fn canonicalize(&mut self) {
|
pub fn canonicalize(&mut self) {
|
||||||
let self_vc = &self.vector_clock;
|
let self_vc = &self.vector_clock;
|
||||||
self.items.retain(|_sk, vc| vclock_gt(&vc, self_vc))
|
self.items.retain(|_sk, vc| vclock_gt(vc, self_vc))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn encode(&mut self) -> Result<String, Error> {
|
pub fn encode(&mut self) -> Result<String, Error> {
|
||||||
|
@ -84,7 +84,7 @@ impl RangeSeenMarker {
|
||||||
|
|
||||||
/// Decode from msgpack+zstd+b64 representation, returns None on error.
|
/// Decode from msgpack+zstd+b64 representation, returns None on error.
|
||||||
pub fn decode(s: &str) -> Option<Self> {
|
pub fn decode(s: &str) -> Option<Self> {
|
||||||
let bytes = BASE64_STANDARD.decode(&s).ok()?;
|
let bytes = BASE64_STANDARD.decode(s).ok()?;
|
||||||
let bytes = zstd::stream::decode_all(&mut &bytes[..]).ok()?;
|
let bytes = zstd::stream::decode_all(&mut &bytes[..]).ok()?;
|
||||||
nonversioned_decode(&bytes).ok()
|
nonversioned_decode(&bytes).ok()
|
||||||
}
|
}
|
||||||
|
@ -99,7 +99,7 @@ impl RangeSeenMarker {
|
||||||
&& self
|
&& self
|
||||||
.items
|
.items
|
||||||
.get(&item.sort_key)
|
.get(&item.sort_key)
|
||||||
.map(|vc| vclock_gt(&cc.vector_clock, &vc))
|
.map(|vc| vclock_gt(&cc.vector_clock, vc))
|
||||||
.unwrap_or(true)
|
.unwrap_or(true)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -355,6 +355,7 @@ impl CountedItem for Object {
|
||||||
|
|
||||||
fn counts(&self) -> Vec<(&'static str, i64)> {
|
fn counts(&self) -> Vec<(&'static str, i64)> {
|
||||||
let versions = self.versions();
|
let versions = self.versions();
|
||||||
|
#[allow(clippy::bool_to_int_with_if)]
|
||||||
let n_objects = if versions.iter().any(|v| v.is_data()) {
|
let n_objects = if versions.iter().any(|v| v.is_data()) {
|
||||||
1
|
1
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -113,7 +113,7 @@ impl ConsulDiscovery {
|
||||||
let pubkey = ent
|
let pubkey = ent
|
||||||
.node_meta
|
.node_meta
|
||||||
.get("pubkey")
|
.get("pubkey")
|
||||||
.and_then(|k| hex::decode(&k).ok())
|
.and_then(|k| hex::decode(k).ok())
|
||||||
.and_then(|k| NodeID::from_slice(&k[..]));
|
.and_then(|k| NodeID::from_slice(&k[..]));
|
||||||
if let (Some(ip), Some(pubkey)) = (ip, pubkey) {
|
if let (Some(ip), Some(pubkey)) = (ip, pubkey) {
|
||||||
ret.push((pubkey, SocketAddr::new(ip, ent.service_port)));
|
ret.push((pubkey, SocketAddr::new(ip, ent.service_port)));
|
||||||
|
|
|
@ -215,7 +215,7 @@ pub fn gen_node_key(metadata_dir: &Path) -> Result<NodeKey, Error> {
|
||||||
} else {
|
} else {
|
||||||
if !metadata_dir.exists() {
|
if !metadata_dir.exists() {
|
||||||
info!("Metadata directory does not exist, creating it.");
|
info!("Metadata directory does not exist, creating it.");
|
||||||
std::fs::create_dir(&metadata_dir)?;
|
std::fs::create_dir(metadata_dir)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
info!("Generating new node key pair.");
|
info!("Generating new node key pair.");
|
||||||
|
@ -419,7 +419,7 @@ impl System {
|
||||||
.get(&n.id.into())
|
.get(&n.id.into())
|
||||||
.cloned()
|
.cloned()
|
||||||
.map(|(_, st)| st)
|
.map(|(_, st)| st)
|
||||||
.unwrap_or(NodeStatus::unknown()),
|
.unwrap_or_else(NodeStatus::unknown),
|
||||||
})
|
})
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
known_nodes
|
known_nodes
|
||||||
|
|
Loading…
Reference in a new issue