forked from Deuxfleurs/garage
Compare commits
17 commits
domain-web ... main

Commits:
859b38b0d2
2729a71d9d
c9d00f5f7b
89c944ebd6
24470377c9
5b26545abf
9c7e3c7bde
165f9316e2
a94adf804f
e4c9a8cd53
9312c6bbcb
fdf4dad728
6820b69f30
d0104b9f9b
3fe8db9e52
627a37fe9f
c1b39d9ba1
13 changed files with 136 additions and 45 deletions
Makefile (4 changes)

@@ -2,9 +2,7 @@
 all:
     clear
-    cargo build \
-        --config 'target.x86_64-unknown-linux-gnu.linker="clang"' \
-        --config 'target.x86_64-unknown-linux-gnu.rustflags=["-C", "link-arg=-fuse-ld=mold"]' \
+    cargo build

 # ----
@@ -317,7 +317,6 @@ impl ApiHandler for S3ApiServer {
         } => {
             let query = ListPartsQuery {
-                bucket_name: ctx.bucket_name.clone(),
                 bucket_id,
                 key,
                 upload_id,
                 part_number_marker: part_number_marker.map(|p| p.min(10000)),
@@ -54,7 +54,6 @@ pub struct ListMultipartUploadsQuery {
 #[derive(Debug)]
 pub struct ListPartsQuery {
-    pub bucket_name: String,
     pub bucket_id: Uuid,
     pub key: String,
     pub upload_id: String,
     pub part_number_marker: Option<u64>,
@@ -1245,10 +1244,8 @@ mod tests {

     #[test]
     fn test_fetch_part_info() -> Result<(), Error> {
         let uuid = Uuid::from([0x08; 32]);
         let mut query = ListPartsQuery {
-            bucket_name: "a".to_string(),
             bucket_id: uuid,
             key: "a".to_string(),
             upload_id: "xx".to_string(),
             part_number_marker: None,
@@ -430,7 +430,16 @@ pub async fn handle_complete_multipart_upload(
     // Send response saying ok we're done
     let result = s3_xml::CompleteMultipartUploadResult {
         xmlns: (),
-        location: None,
+        // FIXME: the location returned is not always correct:
+        // - we always return https, but maybe some people do http
+        // - if root_domain is not specified, a full URL is not returned
+        location: garage
+            .config
+            .s3_api
+            .root_domain
+            .as_ref()
+            .map(|rd| s3_xml::Value(format!("https://{}.{}/{}", bucket_name, rd, key)))
+            .or(Some(s3_xml::Value(format!("/{}/{}", bucket_name, key)))),
         bucket: s3_xml::Value(bucket_name.to_string()),
         key: s3_xml::Value(key),
         etag: s3_xml::Value(format!("\"{}\"", etag)),
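As a reference, here is a minimal standalone sketch of the same fallback logic, using plain strings instead of Garage's config and s3_xml types; the hard-coded https scheme is exactly the limitation the FIXME above points out.

    // Sketch: vhost-style URL when a root_domain is configured,
    // otherwise a path-style "/bucket/key" value.
    fn location(root_domain: Option<&str>, bucket: &str, key: &str) -> String {
        match root_domain {
            Some(rd) => format!("https://{}.{}/{}", bucket, rd, key),
            None => format!("/{}/{}", bucket, key),
        }
    }

    fn main() {
        assert_eq!(
            location(Some("s3.example.com"), "photos", "cat.jpg"),
            "https://photos.s3.example.com/cat.jpg"
        );
        assert_eq!(location(None, "photos", "cat.jpg"), "/photos/cat.jpg");
    }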
@@ -352,6 +352,18 @@ impl Endpoint {
             _ => return Err(Error::bad_request("Unknown method")),
         };

+        if let Some(x_id) = query.x_id.take() {
+            if x_id != res.name() {
+                // I think AWS ignores the x-id parameter.
+                // Let's make this at least be a warning to help debugging.
+                warn!(
+                    "x-id ({}) does not match parsed endpoint ({})",
+                    x_id,
+                    res.name()
+                );
+            }
+        }
+
         if let Some(message) = query.nonempty_message() {
             debug!("Unused query parameter: {}", message)
         }
@@ -696,7 +708,8 @@ generateQueryParameters! {
         "uploadId" => upload_id,
         "upload-id-marker" => upload_id_marker,
         "versionId" => version_id,
-        "version-id-marker" => version_id_marker
+        "version-id-marker" => version_id_marker,
+        "x-id" => x_id
     ]
 }
@@ -370,7 +370,7 @@ impl BlockManager {
         prevent_compression: bool,
         order_tag: Option<OrderTag>,
     ) -> Result<(), Error> {
-        let who = self.replication.write_sets(&hash);
+        let who = self.system.cluster_layout().current_storage_nodes_of(&hash);

         let compression_level = self.compression_level.filter(|_| !prevent_compression);
         let (header, bytes) = DataBlock::from_buffer(data, compression_level)
@@ -396,7 +396,7 @@ impl BlockManager {
             .rpc_helper()
             .try_write_many_sets(
                 &self.endpoint,
-                who.as_ref(),
+                &[who],
                 put_block_rpc,
                 RequestStrategy::with_priority(PRIO_NORMAL | PRIO_SECONDARY)
                     .with_drop_on_completion(permit)
@@ -668,10 +668,12 @@ impl BlockManager {
         hash: &Hash,
         wrong_path: DataBlockPath,
     ) -> Result<usize, Error> {
+        let data = self.read_block_from(hash, &wrong_path).await?;
         self.lock_mutate(hash)
             .await
-            .fix_block_location(hash, wrong_path, self)
-            .await
+            .write_block_inner(hash, &data, self, Some(wrong_path))
+            .await?;
+        Ok(data.as_parts_ref().1.len())
     }

     async fn lock_mutate(&self, hash: &Hash) -> MutexGuard<'_, BlockManagerLocked> {
@@ -827,18 +829,6 @@ impl BlockManagerLocked {
         }
         Ok(())
     }
-
-    async fn fix_block_location(
-        &self,
-        hash: &Hash,
-        wrong_path: DataBlockPath,
-        mgr: &BlockManager,
-    ) -> Result<usize, Error> {
-        let data = mgr.read_block_from(hash, &wrong_path).await?;
-        self.write_block_inner(hash, &data, mgr, Some(wrong_path))
-            .await?;
-        Ok(data.as_parts_ref().1.len())
-    }
 }

 struct DeleteOnDrop(Option<PathBuf>);
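The two hunks above move read_block_from() out of the section that holds the lock_mutate() guard, so the per-block lock now only covers write_block_inner(). A hypothetical sketch of that pattern with stand-in types (not Garage's BlockManager):

    // Do the slow read before taking the per-block mutex; keep the
    // critical section limited to the write.
    use tokio::sync::{Mutex, MutexGuard};

    struct Store {
        lock: Mutex<()>,
    }

    impl Store {
        async fn read_block(&self) -> Vec<u8> {
            // stand-in for read_block_from(): may hit the disk, no lock needed
            vec![42u8; 4]
        }

        async fn write_block_locked(&self, _guard: &MutexGuard<'_, ()>, data: &[u8]) {
            // stand-in for write_block_inner(): must run under the lock
            println!("writing {} bytes", data.len());
        }

        async fn fix_location(&self) -> usize {
            let data = self.read_block().await; // outside the critical section
            let guard = self.lock.lock().await; // lock only for the mutation
            self.write_block_locked(&guard, &data).await;
            data.len()
        }
    }

    #[tokio::main]
    async fn main() {
        let store = Store { lock: Mutex::new(()) };
        assert_eq!(store.fix_location().await, 4);
    }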
@@ -377,7 +377,10 @@ impl BlockResyncManager {
             info!("Resync block {:?}: offloading and deleting", hash);
             let existing_path = existing_path.unwrap();

-            let mut who = manager.replication.storage_nodes(hash);
+            let mut who = manager
+                .system
+                .cluster_layout()
+                .current_storage_nodes_of(hash);
             if who.len() < manager.replication.write_quorum() {
                 return Err(Error::Message("Not trying to offload block because we don't have a quorum of nodes to write to".to_string()));
             }
@@ -455,6 +458,25 @@ impl BlockResyncManager {
         }

         if rc.is_nonzero() && !exists {
+            // The refcount is > 0, and the block is not present locally.
+            // We might need to fetch it from another node.
+
+            // First, check whether we are still supposed to store that
+            // block in the latest cluster layout version.
+            let storage_nodes = manager
+                .system
+                .cluster_layout()
+                .current_storage_nodes_of(&hash);
+
+            if !storage_nodes.contains(&manager.system.id) {
+                info!(
+                    "Resync block {:?}: block is absent with refcount > 0, but it will drop to zero after all metadata is synced. Not fetching the block.",
+                    hash
+                );
+                return Ok(());
+            }
+
+            // We know we need the block. Fetch it.
             info!(
                 "Resync block {:?}: fetching absent but needed block (refcount > 0)",
                 hash
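A condensed sketch of the decision the added code makes, using hypothetical plain types instead of Garage's managers: a locally missing block is only fetched while its refcount is non-zero and the current layout still assigns it to this node.

    type NodeId = u64;

    // Fetch only if the block is needed (refcount > 0), absent locally,
    // and this node is still one of its storage nodes in the current layout.
    fn should_fetch(refcount: u64, present_locally: bool, me: NodeId, storage_nodes: &[NodeId]) -> bool {
        refcount > 0 && !present_locally && storage_nodes.contains(&me)
    }

    fn main() {
        // needed and assigned to us -> fetch
        assert!(should_fetch(2, false, 1, &[1, 2, 3]));
        // needed but no longer assigned to us -> skip, the refcount will drain
        assert!(!should_fetch(2, false, 9, &[1, 2, 3]));
        // already present -> nothing to do
        assert!(!should_fetch(2, true, 1, &[1, 2, 3]));
    }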
@@ -13,7 +13,6 @@ static GARAGE_TEST_SECRET: &str =

 #[derive(Debug, Default, Clone)]
 pub struct Key {
-    pub name: Option<String>,
     pub id: String,
     pub secret: String,
 }
@@ -213,10 +212,7 @@ api_bind_addr = "127.0.0.1:{admin_port}"
         assert!(!key.id.is_empty(), "Invalid key: Key ID is empty");
         assert!(!key.secret.is_empty(), "Invalid key: Key secret is empty");

-        Key {
-            name: maybe_name.map(String::from),
-            ..key
-        }
+        key
     }
 }
@@ -329,7 +329,7 @@ impl Garage {

     pub async fn locked_helper(&self) -> helper::locked::LockedHelper {
         let lock = self.bucket_lock.lock().await;
-        helper::locked::LockedHelper(self, lock)
+        helper::locked::LockedHelper(self, Some(lock))
     }
 }
@@ -27,9 +27,16 @@ use crate::permission::BucketKeyPerm;
 /// See issues: #649, #723
 pub struct LockedHelper<'a>(
     pub(crate) &'a Garage,
-    pub(crate) tokio::sync::MutexGuard<'a, ()>,
+    pub(crate) Option<tokio::sync::MutexGuard<'a, ()>>,
 );

+impl<'a> Drop for LockedHelper<'a> {
+    fn drop(&mut self) {
+        // make it explicit that the mutexguard lives until here
+        drop(self.1.take())
+    }
+}
+
 #[allow(clippy::ptr_arg)]
 impl<'a> LockedHelper<'a> {
     pub fn bucket(&self) -> BucketHelper<'a> {
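The guard is wrapped in Option because Drop::drop only receives &mut self, so the guard cannot be moved out of the struct directly; Option::take() lets the Drop impl release the lock explicitly and makes its lifetime visible. A standalone sketch of the same pattern, using std::sync::Mutex instead of tokio's so it runs without a runtime (the mechanics are the same):

    use std::sync::{Mutex, MutexGuard};

    struct Locked<'a>(Option<MutexGuard<'a, ()>>);

    impl<'a> Drop for Locked<'a> {
        fn drop(&mut self) {
            // make it explicit that the guard lives until here
            drop(self.0.take());
        }
    }

    fn main() {
        let lock = Mutex::new(());
        {
            let _held = Locked(Some(lock.lock().unwrap()));
            // the mutex is held for the whole scope of `_held`
        }
        // guard released by Locked::drop; the mutex can be taken again
        assert!(lock.try_lock().is_ok());
    }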
@@ -395,13 +395,13 @@ fn midnight_ts(date: NaiveDate, use_local_tz: bool) -> u64 {
             .expect("bad local midnight")
             .timestamp_millis() as u64;
     }
-    midnight.timestamp_millis() as u64
+    midnight.and_utc().timestamp_millis() as u64
 }

 fn next_date(ts: u64) -> NaiveDate {
-    NaiveDateTime::from_timestamp_millis(ts as i64)
+    DateTime::<Utc>::from_timestamp_millis(ts as i64)
         .expect("bad timestamp")
-        .date()
+        .date_naive()
         .succ_opt()
         .expect("no next day")
 }
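This hunk follows chrono's deprecation of the NaiveDateTime timestamp helpers. A small sketch of the migrated calls, assuming chrono 0.4.35 or later; midnight_ts_utc below is a simplified stand-in that skips the local-timezone branch of the real function:

    use chrono::{DateTime, NaiveDate, Utc};

    // DateTime::<Utc>::from_timestamp_millis + .date_naive() replaces the
    // deprecated NaiveDateTime::from_timestamp_millis + .date() pair.
    fn next_date(ts_ms: u64) -> NaiveDate {
        DateTime::<Utc>::from_timestamp_millis(ts_ms as i64)
            .expect("bad timestamp")
            .date_naive()
            .succ_opt()
            .expect("no next day")
    }

    // .and_utc().timestamp_millis() replaces the deprecated
    // NaiveDateTime::timestamp_millis().
    fn midnight_ts_utc(date: NaiveDate) -> u64 {
        date.and_hms_opt(0, 0, 0)
            .expect("midnight does not exist")
            .and_utc()
            .timestamp_millis() as u64
    }

    fn main() {
        let d = NaiveDate::from_ymd_opt(2024, 2, 28).unwrap();
        let ts = midnight_ts_utc(d);
        assert_eq!(next_date(ts), NaiveDate::from_ymd_opt(2024, 2, 29).unwrap());
    }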
@@ -219,6 +219,11 @@ impl LayoutHelper {
         ret
     }

+    pub fn current_storage_nodes_of(&self, position: &Hash) -> Vec<Uuid> {
+        let ver = self.current();
+        ver.nodes_of(position, ver.replication_factor).collect()
+    }
+
     pub fn trackers_hash(&self) -> Hash {
         self.trackers_hash
     }
@@ -540,19 +540,73 @@ impl RpcHelper {
     // ---- functions not related to MAKING RPCs, but just determining to what nodes
     //      they should be made and in which order ----

+    /// Determine to what nodes, and in what order, requests to read a data block
+    /// should be sent. All nodes in the Vec returned by this function are tried
+    /// one by one until there is one that returns the block (in block/manager.rs).
+    ///
+    /// We want to have the best chance of finding the block in as few requests
+    /// as possible, and we want to avoid nodes that answer slowly.
+    ///
+    /// Note that when there are several active layout versions, the block might
+    /// be stored only by nodes of the latest version (in case of a block that was
+    /// written after the layout change), or only by nodes of the oldest active
+    /// version (for all blocks that were written before). So we have to try nodes
+    /// of all layout versions. We also want to try nodes of all layout versions
+    /// fast, so as to optimize the chance of finding the block fast.
+    ///
+    /// Therefore, the strategy is the following:
+    ///
+    /// 1. ask first all nodes of all currently active layout versions
+    ///    -> ask the preferred node in all layout versions (older to newer),
+    ///       then the second preferred node in all versions, etc.
+    ///    -> we start by the oldest active layout version first, because a majority
+    ///       of blocks will have been saved before the layout change
+    /// 2. ask all nodes of historical layout versions, for blocks which have not
+    ///    yet been transferred to their new storage nodes
+    ///
+    /// The preference order, for each layout version, is given by `request_order`,
+    /// based on factors such as nodes being in the same datacenter,
+    /// having low ping, etc.
     pub fn block_read_nodes_of(&self, position: &Hash, rpc_helper: &RpcHelper) -> Vec<Uuid> {
         let layout = self.0.layout.read().unwrap();

-        let mut ret = Vec::with_capacity(12);
-        let ver_iter = layout
-            .versions()
-            .iter()
-            .rev()
-            .chain(layout.inner().old_versions.iter().rev());
-        for ver in ver_iter {
-            if ver.version > layout.sync_map_min() {
-                continue;
-            }
+        // Compute, for each layout version, the set of nodes that might store
+        // the block, and put them in their preferred order as of `request_order`.
+        let mut vernodes = layout.versions().iter().map(|ver| {
+            let nodes = ver.nodes_of(position, ver.replication_factor);
+            rpc_helper.request_order(layout.current(), nodes)
+        });
+
+        let mut ret = if layout.versions().len() == 1 {
+            // If we have only one active layout version, then these are the
+            // only nodes we ask in step 1
+            vernodes.next().unwrap()
+        } else {
+            let vernodes = vernodes.collect::<Vec<_>>();
+
+            let mut nodes = Vec::<Uuid>::with_capacity(12);
+            for i in 0..layout.current().replication_factor {
+                for vn in vernodes.iter() {
+                    if let Some(n) = vn.get(i) {
+                        if !nodes.contains(&n) {
+                            if *n == self.0.our_node_id {
+                                // it's always fast (almost free) to ask locally,
+                                // so always put that as first choice
+                                nodes.insert(0, *n);
+                            } else {
+                                nodes.push(*n);
+                            }
+                        }
+                    }
+                }
+            }
+
+            nodes
+        };
+
+        // Second step: add nodes of older layout versions
+        let old_ver_iter = layout.inner().old_versions.iter().rev();
+        for ver in old_ver_iter {
             let nodes = ver.nodes_of(position, ver.replication_factor);
             for node in rpc_helper.request_order(layout.current(), nodes) {
                 if !ret.contains(&node) {
@@ -560,6 +614,7 @@ impl RpcHelper {
                 }
             }
         }

         ret
     }
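A hypothetical, condensed sketch of the ordering strategy documented above, using plain u64 node ids instead of Garage's Uuid and layout types. Step 1 interleaves the per-version preference lists of the active layout versions (oldest first) and pulls the local node to the front; step 2 appends nodes of historical versions that are not already present. (The real code only interleaves when more than one layout version is active; this sketch always does.)

    type NodeId = u64;

    fn block_read_order(
        local: NodeId,
        active_versions: &[Vec<NodeId>], // preference list per active version, oldest first
        old_versions: &[Vec<NodeId>],    // preference list per historical version
    ) -> Vec<NodeId> {
        let mut ret: Vec<NodeId> = Vec::new();
        let max_len = active_versions.iter().map(|v| v.len()).max().unwrap_or(0);

        // Step 1: i-th preferred node of every active version, oldest version first
        for i in 0..max_len {
            for ver in active_versions {
                if let Some(&n) = ver.get(i) {
                    if !ret.contains(&n) {
                        if n == local {
                            ret.insert(0, n); // asking ourselves is almost free
                        } else {
                            ret.push(n);
                        }
                    }
                }
            }
        }

        // Step 2: nodes of historical versions, for not-yet-migrated blocks
        for ver in old_versions {
            for &n in ver {
                if !ret.contains(&n) {
                    ret.push(n);
                }
            }
        }
        ret
    }

    fn main() {
        let order = block_read_order(3, &[vec![1, 2, 3], vec![4, 2, 5]], &[vec![6, 1]]);
        assert_eq!(order, vec![3, 1, 4, 2, 5, 6]);
    }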