Compare commits
No commits in common. "main" and "domain-web-requests" have entirely different histories.
main ... domain-web-requests
13 changed files with 45 additions and 136 deletions
Makefile

@@ -2,7 +2,9 @@
 
 all:
     clear
-    cargo build
+    cargo build \
+        --config 'target.x86_64-unknown-linux-gnu.linker="clang"' \
+        --config 'target.x86_64-unknown-linux-gnu.rustflags=["-C", "link-arg=-fuse-ld=mold"]' \
 
 # ----
 
@@ -317,6 +317,7 @@ impl ApiHandler for S3ApiServer {
            } => {
                let query = ListPartsQuery {
                    bucket_name: ctx.bucket_name.clone(),
+                   bucket_id,
                    key,
                    upload_id,
                    part_number_marker: part_number_marker.map(|p| p.min(10000)),
@@ -54,6 +54,7 @@ pub struct ListMultipartUploadsQuery {
 #[derive(Debug)]
 pub struct ListPartsQuery {
     pub bucket_name: String,
+    pub bucket_id: Uuid,
     pub key: String,
     pub upload_id: String,
     pub part_number_marker: Option<u64>,

@@ -1244,8 +1245,10 @@ mod tests {
 
     #[test]
     fn test_fetch_part_info() -> Result<(), Error> {
+        let uuid = Uuid::from([0x08; 32]);
        let mut query = ListPartsQuery {
            bucket_name: "a".to_string(),
+           bucket_id: uuid,
            key: "a".to_string(),
            upload_id: "xx".to_string(),
            part_number_marker: None,
@@ -430,16 +430,7 @@ pub async fn handle_complete_multipart_upload(
    // Send response saying ok we're done
    let result = s3_xml::CompleteMultipartUploadResult {
        xmlns: (),
-       // FIXME: the location returned is not always correct:
-       // - we always return https, but maybe some people do http
-       // - if root_domain is not specified, a full URL is not returned
-       location: garage
-           .config
-           .s3_api
-           .root_domain
-           .as_ref()
-           .map(|rd| s3_xml::Value(format!("https://{}.{}/{}", bucket_name, rd, key)))
-           .or(Some(s3_xml::Value(format!("/{}/{}", bucket_name, key)))),
+       location: None,
        bucket: s3_xml::Value(bucket_name.to_string()),
        key: s3_xml::Value(key),
        etag: s3_xml::Value(format!("\"{}\"", etag)),
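Note on the removed lines above: they derive the CompleteMultipartUpload Location from the configured S3 root domain, falling back to a path-style value when no root domain is set. A minimal standalone sketch of that fallback logic (the function name and inputs are illustrative, not Garage's API):

// Sketch: build the Location value the way the removed lines do it: prefer a
// virtual-hosted HTTPS URL when a root domain is configured, otherwise fall
// back to a path-style "/bucket/key" value.
fn location_for(root_domain: Option<&str>, bucket_name: &str, key: &str) -> String {
    root_domain
        .map(|rd| format!("https://{}.{}/{}", bucket_name, rd, key))
        .unwrap_or_else(|| format!("/{}/{}", bucket_name, key))
}

fn main() {
    assert_eq!(
        location_for(Some("s3.example.com"), "photos", "cat.jpg"),
        "https://photos.s3.example.com/cat.jpg"
    );
    assert_eq!(location_for(None, "photos", "cat.jpg"), "/photos/cat.jpg");
}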
|
@ -352,18 +352,6 @@ impl Endpoint {
|
||||||
_ => return Err(Error::bad_request("Unknown method")),
|
_ => return Err(Error::bad_request("Unknown method")),
|
||||||
};
|
};
|
||||||
|
|
||||||
if let Some(x_id) = query.x_id.take() {
|
|
||||||
if x_id != res.name() {
|
|
||||||
// I think AWS ignores the x-id parameter.
|
|
||||||
// Let's make this at least be a warnin to help debugging.
|
|
||||||
warn!(
|
|
||||||
"x-id ({}) does not match parsed endpoint ({})",
|
|
||||||
x_id,
|
|
||||||
res.name()
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(message) = query.nonempty_message() {
|
if let Some(message) = query.nonempty_message() {
|
||||||
debug!("Unused query parameter: {}", message)
|
debug!("Unused query parameter: {}", message)
|
||||||
}
|
}
|
||||||
|
@ -708,8 +696,7 @@ generateQueryParameters! {
|
||||||
"uploadId" => upload_id,
|
"uploadId" => upload_id,
|
||||||
"upload-id-marker" => upload_id_marker,
|
"upload-id-marker" => upload_id_marker,
|
||||||
"versionId" => version_id,
|
"versionId" => version_id,
|
||||||
"version-id-marker" => version_id_marker,
|
"version-id-marker" => version_id_marker
|
||||||
"x-id" => x_id
|
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -370,7 +370,7 @@ impl BlockManager {
        prevent_compression: bool,
        order_tag: Option<OrderTag>,
    ) -> Result<(), Error> {
-       let who = self.system.cluster_layout().current_storage_nodes_of(&hash);
+       let who = self.replication.write_sets(&hash);
 
        let compression_level = self.compression_level.filter(|_| !prevent_compression);
        let (header, bytes) = DataBlock::from_buffer(data, compression_level)

@@ -396,7 +396,7 @@ impl BlockManager {
            .rpc_helper()
            .try_write_many_sets(
                &self.endpoint,
-               &[who],
+               who.as_ref(),
                put_block_rpc,
                RequestStrategy::with_priority(PRIO_NORMAL | PRIO_SECONDARY)
                    .with_drop_on_completion(permit)
@@ -668,12 +668,10 @@ impl BlockManager {
        hash: &Hash,
        wrong_path: DataBlockPath,
    ) -> Result<usize, Error> {
-       let data = self.read_block_from(hash, &wrong_path).await?;
        self.lock_mutate(hash)
            .await
-           .write_block_inner(hash, &data, self, Some(wrong_path))
-           .await?;
-       Ok(data.as_parts_ref().1.len())
+           .fix_block_location(hash, wrong_path, self)
+           .await
    }
 
    async fn lock_mutate(&self, hash: &Hash) -> MutexGuard<'_, BlockManagerLocked> {

@@ -829,6 +827,18 @@ impl BlockManagerLocked {
        }
        Ok(())
    }
+
+   async fn fix_block_location(
+       &self,
+       hash: &Hash,
+       wrong_path: DataBlockPath,
+       mgr: &BlockManager,
+   ) -> Result<usize, Error> {
+       let data = mgr.read_block_from(hash, &wrong_path).await?;
+       self.write_block_inner(hash, &data, mgr, Some(wrong_path))
+           .await?;
+       Ok(data.as_parts_ref().1.len())
+   }
 }
 
 struct DeleteOnDrop(Option<PathBuf>);
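Note on the added fix_block_location above: the read of the misplaced block and its re-write are now both performed on the value returned by lock_mutate, i.e. while the lock is held, whereas the removed lines read the block before taking the lock. A minimal sketch of that pattern with a hypothetical tokio-based store (Store, Locked and fix_location are illustrative names, not Garage's types):

// Sketch of the locking pattern: take the lock first, then do both the read
// from the wrong location and the write to the correct one under it, so the
// two steps are atomic with respect to other holders of the same lock.
use std::collections::HashMap;
use tokio::sync::Mutex;

struct Locked {
    misplaced: HashMap<u64, Vec<u8>>, // data stored under a wrong path, keyed by item id
    correct: HashMap<u64, Vec<u8>>,   // data stored under its correct path
}

impl Locked {
    // Move the data to its correct location and return its length.
    fn fix_location(&mut self, id: u64) -> Option<usize> {
        let data = self.misplaced.remove(&id)?;
        let len = data.len();
        self.correct.insert(id, data);
        Some(len)
    }
}

struct Store {
    inner: Mutex<Locked>,
}

impl Store {
    async fn fix_block_location(&self, id: u64) -> Option<usize> {
        // Lock, then read and rewrite inside the critical section.
        self.inner.lock().await.fix_location(id)
    }
}

#[tokio::main]
async fn main() {
    let store = Store {
        inner: Mutex::new(Locked {
            misplaced: HashMap::from([(1, b"block".to_vec())]),
            correct: HashMap::new(),
        }),
    };
    assert_eq!(store.fix_block_location(1).await, Some(5));
}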
@@ -377,10 +377,7 @@ impl BlockResyncManager {
        info!("Resync block {:?}: offloading and deleting", hash);
        let existing_path = existing_path.unwrap();
 
-       let mut who = manager
-           .system
-           .cluster_layout()
-           .current_storage_nodes_of(hash);
+       let mut who = manager.replication.storage_nodes(hash);
        if who.len() < manager.replication.write_quorum() {
            return Err(Error::Message("Not trying to offload block because we don't have a quorum of nodes to write to".to_string()));
        }

@@ -458,25 +455,6 @@ impl BlockResyncManager {
        }
 
        if rc.is_nonzero() && !exists {
-           // The refcount is > 0, and the block is not present locally.
-           // We might need to fetch it from another node.
-
-           // First, check whether we are still supposed to store that
-           // block in the latest cluster layout version.
-           let storage_nodes = manager
-               .system
-               .cluster_layout()
-               .current_storage_nodes_of(&hash);
-
-           if !storage_nodes.contains(&manager.system.id) {
-               info!(
-                   "Resync block {:?}: block is absent with refcount > 0, but it will drop to zero after all metadata is synced. Not fetching the block.",
-                   hash
-               );
-               return Ok(());
-           }
-
-           // We know we need the block. Fetch it.
            info!(
                "Resync block {:?}: fetching absent but needed block (refcount > 0)",
                hash
@@ -13,6 +13,7 @@ static GARAGE_TEST_SECRET: &str =
 
 #[derive(Debug, Default, Clone)]
 pub struct Key {
+    pub name: Option<String>,
     pub id: String,
     pub secret: String,
 }

@@ -212,7 +213,10 @@ api_bind_addr = "127.0.0.1:{admin_port}"
        assert!(!key.id.is_empty(), "Invalid key: Key ID is empty");
        assert!(!key.secret.is_empty(), "Invalid key: Key secret is empty");
 
-       key
+       Key {
+           name: maybe_name.map(String::from),
+           ..key
+       }
    }
 }
 
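Note on the added lines above: they use Rust's struct update syntax to override only the name field while keeping the parsed id and secret. A minimal illustration of that `..` syntax, with a local stand-in mirroring the Key shape shown in the diff:

// Build a new value that overrides one field and copies the rest from `key`.
#[derive(Debug, Default, Clone)]
struct Key {
    name: Option<String>,
    id: String,
    secret: String,
}

fn main() {
    let key = Key {
        id: "GK123".to_string(),
        secret: "s3cr3t".to_string(),
        ..Default::default()
    };
    let maybe_name: Option<&str> = Some("test-key");

    // Same pattern as the diff: set `name`, keep `id` and `secret` from `key`.
    let named = Key {
        name: maybe_name.map(String::from),
        ..key
    };
    assert_eq!(named.name.as_deref(), Some("test-key"));
    assert_eq!(named.id, "GK123");
}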
@@ -329,7 +329,7 @@ impl Garage {
 
    pub async fn locked_helper(&self) -> helper::locked::LockedHelper {
        let lock = self.bucket_lock.lock().await;
-       helper::locked::LockedHelper(self, Some(lock))
+       helper::locked::LockedHelper(self, lock)
    }
 }
 
@@ -27,16 +27,9 @@ use crate::permission::BucketKeyPerm;
 /// See issues: #649, #723
 pub struct LockedHelper<'a>(
    pub(crate) &'a Garage,
-   pub(crate) Option<tokio::sync::MutexGuard<'a, ()>>,
+   pub(crate) tokio::sync::MutexGuard<'a, ()>,
 );
 
-impl<'a> Drop for LockedHelper<'a> {
-   fn drop(&mut self) {
-       // make it explicit that the mutexguard lives until here
-       drop(self.1.take())
-   }
-}
-
 #[allow(clippy::ptr_arg)]
 impl<'a> LockedHelper<'a> {
    pub fn bucket(&self) -> BucketHelper<'a> {
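Note on this hunk: the added line stores the tokio::sync::MutexGuard directly in the helper, so the lock is released when the helper is dropped and the removed Option-plus-Drop dance becomes unnecessary. A small sketch of that behaviour (the helper name and payload are illustrative):

// A helper struct that owns a tokio MutexGuard directly: the lock stays held
// for as long as the helper lives and is released when the helper (and thus
// the guard field) is dropped, with no manual Drop impl needed.
use tokio::sync::{Mutex, MutexGuard};

struct LockedHelper<'a>(&'a str, MutexGuard<'a, ()>);

#[tokio::main]
async fn main() {
    let lock = Mutex::new(());

    {
        let guard = lock.lock().await;
        let _helper = LockedHelper("bucket ops", guard);
        // While `_helper` is alive, the lock cannot be taken again.
        assert!(lock.try_lock().is_err());
    } // `_helper` dropped here -> guard dropped -> lock released

    assert!(lock.try_lock().is_ok());
}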
@@ -395,13 +395,13 @@ fn midnight_ts(date: NaiveDate, use_local_tz: bool) -> u64 {
            .expect("bad local midnight")
            .timestamp_millis() as u64;
    }
-   midnight.and_utc().timestamp_millis() as u64
+   midnight.timestamp_millis() as u64
 }
 
 fn next_date(ts: u64) -> NaiveDate {
-   DateTime::<Utc>::from_timestamp_millis(ts as i64)
+   NaiveDateTime::from_timestamp_millis(ts as i64)
        .expect("bad timestamp")
-       .date_naive()
+       .date()
        .succ_opt()
        .expect("no next day")
 }
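Note on this hunk: the removed lines use chrono's current constructors (DateTime::<Utc>::from_timestamp_millis, date_naive, and_utc), while the added lines use the NaiveDateTime-based calls that recent chrono releases mark as deprecated. A small sketch of the timestamp/date round trip with the newer API, assuming chrono 0.4.31 or later (function names here are illustrative):

// Convert a millisecond timestamp to the next calendar date, and a date back
// to its UTC midnight timestamp, using the non-deprecated chrono API.
use chrono::{DateTime, NaiveDate, Utc};

fn next_date(ts_ms: u64) -> NaiveDate {
    DateTime::<Utc>::from_timestamp_millis(ts_ms as i64)
        .expect("bad timestamp")
        .date_naive()
        .succ_opt()
        .expect("no next day")
}

fn utc_midnight_ts(date: NaiveDate) -> u64 {
    date.and_hms_opt(0, 0, 0)
        .expect("bad midnight")
        .and_utc()
        .timestamp_millis() as u64
}

fn main() {
    let d = next_date(0); // 1970-01-01T00:00:00Z -> next day
    assert_eq!(d, NaiveDate::from_ymd_opt(1970, 1, 2).unwrap());
    assert_eq!(utc_midnight_ts(d), 86_400_000);
}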
@@ -219,11 +219,6 @@ impl LayoutHelper {
        ret
    }
 
-   pub fn current_storage_nodes_of(&self, position: &Hash) -> Vec<Uuid> {
-       let ver = self.current();
-       ver.nodes_of(position, ver.replication_factor).collect()
-   }
-
    pub fn trackers_hash(&self) -> Hash {
        self.trackers_hash
    }
@@ -540,73 +540,19 @@ impl RpcHelper {
    // ---- functions not related to MAKING RPCs, but just determining to what nodes
    // they should be made and in which order ----
 
-   /// Determine to what nodes, and in what order, requests to read a data block
-   /// should be sent. All nodes in the Vec returned by this function are tried
-   /// one by one until there is one that returns the block (in block/manager.rs).
-   ///
-   /// We want to have the best chance of finding the block in as few requests
-   /// as possible, and we want to avoid nodes that answer slowly.
-   ///
-   /// Note that when there are several active layout versions, the block might
-   /// be stored only by nodes of the latest version (in case of a block that was
-   /// written after the layout change), or only by nodes of the oldest active
-   /// version (for all blocks that were written before). So we have to try nodes
-   /// of all layout versions. We also want to try nodes of all layout versions
-   /// fast, so as to optimize the chance of finding the block fast.
-   ///
-   /// Therefore, the strategy is the following:
-   ///
-   /// 1. ask first all nodes of all currently active layout versions
-   ///    -> ask the preferred node in all layout versions (older to newer),
-   ///    then the second preferred onde in all verions, etc.
-   ///    -> we start by the oldest active layout version first, because a majority
-   ///    of blocks will have been saved before the layout change
-   /// 2. ask all nodes of historical layout versions, for blocks which have not
-   ///    yet been transferred to their new storage nodes
-   ///
-   /// The preference order, for each layout version, is given by `request_order`,
-   /// based on factors such as nodes being in the same datacenter,
-   /// having low ping, etc.
    pub fn block_read_nodes_of(&self, position: &Hash, rpc_helper: &RpcHelper) -> Vec<Uuid> {
        let layout = self.0.layout.read().unwrap();
 
-       // Compute, for each layout version, the set of nodes that might store
-       // the block, and put them in their preferred order as of `request_order`.
-       let mut vernodes = layout.versions().iter().map(|ver| {
-           let nodes = ver.nodes_of(position, ver.replication_factor);
-           rpc_helper.request_order(layout.current(), nodes)
-       });
-
-       let mut ret = if layout.versions().len() == 1 {
-           // If we have only one active layout version, then these are the
-           // only nodes we ask in step 1
-           vernodes.next().unwrap()
-       } else {
-           let vernodes = vernodes.collect::<Vec<_>>();
-
-           let mut nodes = Vec::<Uuid>::with_capacity(12);
-           for i in 0..layout.current().replication_factor {
-               for vn in vernodes.iter() {
-                   if let Some(n) = vn.get(i) {
-                       if !nodes.contains(&n) {
-                           if *n == self.0.our_node_id {
-                               // it's always fast (almost free) to ask locally,
-                               // so always put that as first choice
-                               nodes.insert(0, *n);
-                           } else {
-                               nodes.push(*n);
-                           }
-                       }
-                   }
-               }
+       let mut ret = Vec::with_capacity(12);
+       let ver_iter = layout
+           .versions()
+           .iter()
+           .rev()
+           .chain(layout.inner().old_versions.iter().rev());
+       for ver in ver_iter {
+           if ver.version > layout.sync_map_min() {
+               continue;
            }
-
-           nodes
-       };
-
-       // Second step: add nodes of older layout versions
-       let old_ver_iter = layout.inner().old_versions.iter().rev();
-       for ver in old_ver_iter {
            let nodes = ver.nodes_of(position, ver.replication_factor);
            for node in rpc_helper.request_order(layout.current(), nodes) {
                if !ret.contains(&node) {

@@ -614,7 +560,6 @@ impl RpcHelper {
                }
            }
        }
-
        ret
    }
 
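Note on the doc comment removed above: it describes interleaving the per-layout-version preference lists (first choice of every version, then second choice, and so on), deduplicating, and putting the local node first. A minimal sketch of that interleaving over plain vectors, independent of Garage's layout types (node ids are plain u32s here):

// Visit one candidate list per layout version round-robin, skip duplicates,
// and put the local node at the front of the result.
fn interleave_preferences(per_version: &[Vec<u32>], local: u32) -> Vec<u32> {
    let max_len = per_version.iter().map(|v| v.len()).max().unwrap_or(0);
    let mut out: Vec<u32> = Vec::new();
    for i in 0..max_len {
        for list in per_version {
            if let Some(&n) = list.get(i) {
                if !out.contains(&n) {
                    if n == local {
                        // asking ourselves is nearly free, so try that first
                        out.insert(0, n);
                    } else {
                        out.push(n);
                    }
                }
            }
        }
    }
    out
}

fn main() {
    // Two active layout versions (older first), local node id 7.
    let v_old = vec![3, 7, 9];
    let v_new = vec![5, 9, 2];
    assert_eq!(interleave_preferences(&[v_old, v_new], 7), vec![7, 3, 5, 9, 2]);
}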