Compare commits

17 commits

SHA1 Message Date
859b38b0d2 Merge pull request 'fix compilation warnings' (#959) from fixes into main
Reviewed-on: Deuxfleurs/garage#959
2025-02-14 17:32:30 +00:00
2729a71d9d fix warning in garage test 2025-02-14 18:27:00 +01:00
c9d00f5f7b garage_api_s3: remove unused field in ListPartsQuery 2025-02-14 18:25:23 +01:00
89c944ebd6 Merge pull request 's3api: return Location in CompleteMultipartUpload (fix #852)' (#958) from fix-852 into main
Reviewed-on: Deuxfleurs/garage#958
2025-02-14 17:16:58 +00:00
24470377c9 garage_model: fix warning about dead code 2025-02-14 18:12:14 +01:00
5b26545abf fix deprecated uses of chrono in lifecycle worker 2025-02-14 18:08:23 +01:00
9c7e3c7bde remove cargo build options in makefile to avoid mistakes 2025-02-14 18:06:07 +01:00
165f9316e2 s3api: return Location in CompleteMultipartUpload (fix #852)
NB. The location returned is not guaranteed to work in all cases.
This already fixes the parse issue in #852.
2025-02-14 18:05:07 +01:00
a94adf804f Merge pull request 'block manager: avoid deadlock in fix_block_location (fix #845)' (#957) from fix-845 into main
Reviewed-on: Deuxfleurs/garage#957
2025-02-14 16:53:01 +00:00
e4c9a8cd53 block manager: avoid deadlock in fix_block_location (fix #845) 2025-02-14 17:41:50 +01:00
9312c6bbcb Merge pull request 'Store data blocks only on nodes in the latest cluster layout version (fix #815)' (#956) from fix-815 into main
Reviewed-on: Deuxfleurs/garage#956
2025-02-14 15:53:16 +00:00
fdf4dad728 block resync: avoid saving blocks to draining nodes 2025-02-14 16:45:55 +01:00
6820b69f30 block manager: improve read strategy to find blocks faster 2025-02-14 16:45:55 +01:00
d0104b9f9b block manager: write blocks only to currently active layout version (fix #815)
avoid wastefully writing blocks to nodes that will discard them as soon
as the layout migration is finished
2025-02-14 16:45:55 +01:00
3fe8db9e52 Merge pull request 'web_server.rs: Added bucket domain to observability' (#608) from jpds/garage:domain-web-requests into main
Reviewed-on: Deuxfleurs/garage#608
2025-02-14 14:26:08 +00:00
627a37fe9f Merge pull request 's3 api: parse x-id query parameter and warn of any inconsistency (fix #822)' (#954) from fix-822 into main
Reviewed-on: Deuxfleurs/garage#954
2025-02-14 14:07:01 +00:00
c1b39d9ba1 s3 api: parse x-id query parameter and warn of any inconsistency (fix #822) 2025-02-14 14:30:58 +01:00
13 changed files with 136 additions and 45 deletions

View file

@@ -2,9 +2,7 @@
all:
clear
cargo build \
--config 'target.x86_64-unknown-linux-gnu.linker="clang"' \
--config 'target.x86_64-unknown-linux-gnu.rustflags=["-C", "link-arg=-fuse-ld=mold"]' \
cargo build
# ----

View file

@@ -317,7 +317,6 @@ impl ApiHandler for S3ApiServer {
} => {
let query = ListPartsQuery {
bucket_name: ctx.bucket_name.clone(),
bucket_id,
key,
upload_id,
part_number_marker: part_number_marker.map(|p| p.min(10000)),

View file

@@ -54,7 +54,6 @@ pub struct ListMultipartUploadsQuery {
#[derive(Debug)]
pub struct ListPartsQuery {
pub bucket_name: String,
pub bucket_id: Uuid,
pub key: String,
pub upload_id: String,
pub part_number_marker: Option<u64>,
@@ -1245,10 +1244,8 @@ mod tests {
#[test]
fn test_fetch_part_info() -> Result<(), Error> {
let uuid = Uuid::from([0x08; 32]);
let mut query = ListPartsQuery {
bucket_name: "a".to_string(),
bucket_id: uuid,
key: "a".to_string(),
upload_id: "xx".to_string(),
part_number_marker: None,

View file

@@ -430,7 +430,16 @@ pub async fn handle_complete_multipart_upload(
// Send response saying ok we're done
let result = s3_xml::CompleteMultipartUploadResult {
xmlns: (),
location: None,
// FIXME: the location returned is not always correct:
// - we always return https, but maybe some people do http
// - if root_domain is not specified, a full URL is not returned
location: garage
.config
.s3_api
.root_domain
.as_ref()
.map(|rd| s3_xml::Value(format!("https://{}.{}/{}", bucket_name, rd, key)))
.or(Some(s3_xml::Value(format!("/{}/{}", bucket_name, key)))),
bucket: s3_xml::Value(bucket_name.to_string()),
key: s3_xml::Value(key),
etag: s3_xml::Value(format!("\"{}\"", etag)),
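The Location value above is derived directly from the s3_api.root_domain setting: a vhost-style URL when a root domain is configured, a bare path otherwise. A minimal illustrative sketch of that mapping (not part of the diff; the helper name and example domain are hypothetical):

// Mirrors the .map(...).or(...) logic above.
fn location_for(root_domain: Option<&str>, bucket: &str, key: &str) -> String {
    match root_domain {
        // e.g. "https://my-bucket.garage.example.com/path/to/key"
        Some(rd) => format!("https://{}.{}/{}", bucket, rd, key),
        // no root_domain configured: only a relative path can be returned
        None => format!("/{}/{}", bucket, key),
    }
}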

View file

@@ -352,6 +352,18 @@ impl Endpoint {
_ => return Err(Error::bad_request("Unknown method")),
};
if let Some(x_id) = query.x_id.take() {
if x_id != res.name() {
// I think AWS ignores the x-id parameter.
// Let's at least make this a warning to help debugging.
warn!(
"x-id ({}) does not match parsed endpoint ({})",
x_id,
res.name()
);
}
}
if let Some(message) = query.nonempty_message() {
debug!("Unused query parameter: {}", message)
}
@@ -696,7 +708,8 @@ generateQueryParameters! {
"uploadId" => upload_id,
"upload-id-marker" => upload_id_marker,
"versionId" => version_id,
"version-id-marker" => version_id_marker
"version-id-marker" => version_id_marker,
"x-id" => x_id
]
}
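For context, some S3 clients (notably newer AWS SDKs) append an informational x-id query parameter to certain requests; the change above consumes it and only warns on a mismatch, it never rejects the request. A minimal sketch of that behaviour (illustrative only, hypothetical function, not Garage's API):

fn check_x_id(x_id: Option<&str>, parsed_endpoint_name: &str) {
    if let Some(x_id) = x_id {
        if x_id != parsed_endpoint_name {
            // a mismatch is only logged; the parsed endpoint is still used
            eprintln!(
                "x-id ({}) does not match parsed endpoint ({})",
                x_id, parsed_endpoint_name
            );
        }
    }
}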

View file

@@ -370,7 +370,7 @@ impl BlockManager {
prevent_compression: bool,
order_tag: Option<OrderTag>,
) -> Result<(), Error> {
let who = self.replication.write_sets(&hash);
let who = self.system.cluster_layout().current_storage_nodes_of(&hash);
let compression_level = self.compression_level.filter(|_| !prevent_compression);
let (header, bytes) = DataBlock::from_buffer(data, compression_level)
@@ -396,7 +396,7 @@ impl BlockManager {
.rpc_helper()
.try_write_many_sets(
&self.endpoint,
who.as_ref(),
&[who],
put_block_rpc,
RequestStrategy::with_priority(PRIO_NORMAL | PRIO_SECONDARY)
.with_drop_on_completion(permit)
@@ -668,10 +668,12 @@ impl BlockManager {
hash: &Hash,
wrong_path: DataBlockPath,
) -> Result<usize, Error> {
let data = self.read_block_from(hash, &wrong_path).await?;
self.lock_mutate(hash)
.await
.fix_block_location(hash, wrong_path, self)
.await
.write_block_inner(hash, &data, self, Some(wrong_path))
.await?;
Ok(data.as_parts_ref().1.len())
}
async fn lock_mutate(&self, hash: &Hash) -> MutexGuard<'_, BlockManagerLocked> {
@@ -827,18 +829,6 @@ impl BlockManagerLocked {
}
Ok(())
}
async fn fix_block_location(
&self,
hash: &Hash,
wrong_path: DataBlockPath,
mgr: &BlockManager,
) -> Result<usize, Error> {
let data = mgr.read_block_from(hash, &wrong_path).await?;
self.write_block_inner(hash, &data, mgr, Some(wrong_path))
.await?;
Ok(data.as_parts_ref().1.len())
}
}
struct DeleteOnDrop(Option<PathBuf>);
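The deadlock fix above boils down to reordering I/O and locking: the block is now read from its wrong location before the per-hash mutex is taken, and only the rewrite happens under the lock. A minimal sketch of that pattern with tokio (illustrative only, hypothetical function, not Garage's API):

use tokio::sync::Mutex;

async fn relocate(
    lock: &Mutex<()>,
    read: impl std::future::Future<Output = Vec<u8>>,
) -> usize {
    let data = read.await;          // slow read happens with the lock not held
    let _guard = lock.lock().await; // the mutex is held only for the write
    // ... write `data` to its correct location here ...
    data.len()
}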

View file

@@ -377,7 +377,10 @@ impl BlockResyncManager {
info!("Resync block {:?}: offloading and deleting", hash);
let existing_path = existing_path.unwrap();
let mut who = manager.replication.storage_nodes(hash);
let mut who = manager
.system
.cluster_layout()
.current_storage_nodes_of(hash);
if who.len() < manager.replication.write_quorum() {
return Err(Error::Message("Not trying to offload block because we don't have a quorum of nodes to write to".to_string()));
}
@@ -455,6 +458,25 @@ impl BlockResyncManager {
}
if rc.is_nonzero() && !exists {
// The refcount is > 0, and the block is not present locally.
// We might need to fetch it from another node.
// First, check whether we are still supposed to store that
// block in the latest cluster layout version.
let storage_nodes = manager
.system
.cluster_layout()
.current_storage_nodes_of(&hash);
if !storage_nodes.contains(&manager.system.id) {
info!(
"Resync block {:?}: block is absent with refcount > 0, but it will drop to zero after all metadata is synced. Not fetching the block.",
hash
);
return Ok(());
}
// We know we need the block. Fetch it.
info!(
"Resync block {:?}: fetching absent but needed block (refcount > 0)",
hash

View file

@@ -13,7 +13,6 @@ static GARAGE_TEST_SECRET: &str =
#[derive(Debug, Default, Clone)]
pub struct Key {
pub name: Option<String>,
pub id: String,
pub secret: String,
}
@@ -213,10 +212,7 @@ api_bind_addr = "127.0.0.1:{admin_port}"
assert!(!key.id.is_empty(), "Invalid key: Key ID is empty");
assert!(!key.secret.is_empty(), "Invalid key: Key secret is empty");
Key {
name: maybe_name.map(String::from),
..key
}
key
}
}

View file

@@ -329,7 +329,7 @@ impl Garage {
pub async fn locked_helper(&self) -> helper::locked::LockedHelper {
let lock = self.bucket_lock.lock().await;
helper::locked::LockedHelper(self, lock)
helper::locked::LockedHelper(self, Some(lock))
}
}

View file

@@ -27,9 +27,16 @@ use crate::permission::BucketKeyPerm;
/// See issues: #649, #723
pub struct LockedHelper<'a>(
pub(crate) &'a Garage,
pub(crate) tokio::sync::MutexGuard<'a, ()>,
pub(crate) Option<tokio::sync::MutexGuard<'a, ()>>,
);
impl<'a> Drop for LockedHelper<'a> {
fn drop(&mut self) {
// make it explicit that the mutexguard lives until here
drop(self.1.take())
}
}
#[allow(clippy::ptr_arg)]
impl<'a> LockedHelper<'a> {
pub fn bucket(&self) -> BucketHelper<'a> {

View file

@@ -395,13 +395,13 @@ fn midnight_ts(date: NaiveDate, use_local_tz: bool) -> u64 {
.expect("bad local midnight")
.timestamp_millis() as u64;
}
midnight.timestamp_millis() as u64
midnight.and_utc().timestamp_millis() as u64
}
fn next_date(ts: u64) -> NaiveDate {
NaiveDateTime::from_timestamp_millis(ts as i64)
DateTime::<Utc>::from_timestamp_millis(ts as i64)
.expect("bad timestamp")
.date()
.date_naive()
.succ_opt()
.expect("no next day")
}
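Both replacements follow chrono's deprecation of the timestamp constructors and accessors on NaiveDateTime: millisecond timestamps are now converted through DateTime<Utc>. A self-contained sketch of the same calls (illustrative only; hypothetical function name):

use chrono::{DateTime, NaiveDate, Utc};

// millis since epoch -> UTC datetime -> naive date -> next calendar day
fn next_day(ts_millis: i64) -> Option<NaiveDate> {
    let dt = DateTime::<Utc>::from_timestamp_millis(ts_millis)?;
    dt.date_naive().succ_opt()
}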

View file

@@ -219,6 +219,11 @@ impl LayoutHelper {
ret
}
pub fn current_storage_nodes_of(&self, position: &Hash) -> Vec<Uuid> {
let ver = self.current();
ver.nodes_of(position, ver.replication_factor).collect()
}
pub fn trackers_hash(&self) -> Hash {
self.trackers_hash
}

View file

@@ -540,19 +540,73 @@ impl RpcHelper {
// ---- functions not related to MAKING RPCs, but just determining to what nodes
// they should be made and in which order ----
/// Determine to what nodes, and in what order, requests to read a data block
/// should be sent. All nodes in the Vec returned by this function are tried
/// one by one until there is one that returns the block (in block/manager.rs).
///
/// We want to have the best chance of finding the block in as few requests
/// as possible, and we want to avoid nodes that answer slowly.
///
/// Note that when there are several active layout versions, the block might
/// be stored only by nodes of the latest version (in case of a block that was
/// written after the layout change), or only by nodes of the oldest active
/// version (for all blocks that were written before). So we have to try nodes
/// of all layout versions. We also want to try nodes of all layout versions
/// early, so as to maximize the chance of finding the block quickly.
///
/// Therefore, the strategy is the following:
///
/// 1. ask first all nodes of all currently active layout versions
/// -> ask the preferred node in all layout versions (older to newer),
/// then the second preferred node in all versions, etc.
/// -> we start with the oldest active layout version, because a majority
/// of blocks will have been saved before the layout change
/// 2. ask all nodes of historical layout versions, for blocks which have not
/// yet been transferred to their new storage nodes
///
/// The preference order, for each layout version, is given by `request_order`,
/// based on factors such as nodes being in the same datacenter,
/// having low ping, etc.
pub fn block_read_nodes_of(&self, position: &Hash, rpc_helper: &RpcHelper) -> Vec<Uuid> {
let layout = self.0.layout.read().unwrap();
let mut ret = Vec::with_capacity(12);
let ver_iter = layout
.versions()
.iter()
.rev()
.chain(layout.inner().old_versions.iter().rev());
for ver in ver_iter {
if ver.version > layout.sync_map_min() {
continue;
// Compute, for each layout version, the set of nodes that might store
// the block, and put them in the preferred order given by `request_order`.
let mut vernodes = layout.versions().iter().map(|ver| {
let nodes = ver.nodes_of(position, ver.replication_factor);
rpc_helper.request_order(layout.current(), nodes)
});
let mut ret = if layout.versions().len() == 1 {
// If we have only one active layout version, then these are the
// only nodes we ask in step 1
vernodes.next().unwrap()
} else {
let vernodes = vernodes.collect::<Vec<_>>();
let mut nodes = Vec::<Uuid>::with_capacity(12);
for i in 0..layout.current().replication_factor {
for vn in vernodes.iter() {
if let Some(n) = vn.get(i) {
if !nodes.contains(&n) {
if *n == self.0.our_node_id {
// it's always fast (almost free) to ask locally,
// so always put that as first choice
nodes.insert(0, *n);
} else {
nodes.push(*n);
}
}
}
}
}
nodes
};
// Second step: add nodes of older layout versions
let old_ver_iter = layout.inner().old_versions.iter().rev();
for ver in old_ver_iter {
let nodes = ver.nodes_of(position, ver.replication_factor);
for node in rpc_helper.request_order(layout.current(), nodes) {
if !ret.contains(&node) {
@@ -560,6 +614,7 @@ impl RpcHelper {
}
}
}
ret
}
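To make the resulting order in step 1 concrete, here is an illustrative sketch with hypothetical node IDs (u32 stand-ins for Uuid), omitting the local-node-first special case shown above:

// Take the i-th preferred node of each active layout version,
// oldest version first, skipping duplicates.
fn interleave_preferred(vernodes: &[Vec<u32>], replication_factor: usize) -> Vec<u32> {
    let mut nodes = Vec::new();
    for i in 0..replication_factor {
        for vn in vernodes {
            if let Some(n) = vn.get(i) {
                if !nodes.contains(n) {
                    nodes.push(*n);
                }
            }
        }
    }
    nodes
}

// With preferred orders [1, 2, 3] (older active version) and [2, 4, 5]
// (newer version) and replication_factor = 3, this yields [1, 2, 4, 3, 5].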