From 3bf2df622a070fe8f233bec6d60bd5cca995fbfc Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 15 Mar 2021 16:21:41 +0100 Subject: [PATCH] Time and metadata improvements --- Cargo.lock | 1 + src/api/api_server.rs | 5 ++- src/api/s3_copy.rs | 69 +++++++++++++++++++++++++++++---------- src/api/s3_delete.rs | 1 + src/api/s3_list.rs | 6 ++-- src/api/s3_put.rs | 26 ++++++++++++--- src/garage/cli.rs | 2 +- src/model/block.rs | 4 +-- src/rpc/membership.rs | 1 + src/table/crdt/lww.rs | 2 +- src/table/crdt/lww_map.rs | 2 +- src/table/gc.rs | 10 +++--- src/util/Cargo.toml | 1 + src/util/data.rs | 8 ----- src/util/lib.rs | 1 + src/util/time.rs | 16 +++++++++ 16 files changed, 111 insertions(+), 44 deletions(-) create mode 100644 src/util/time.rs diff --git a/Cargo.lock b/Cargo.lock index b0633e2..9fb368e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -608,6 +608,7 @@ name = "garage_util" version = "0.1.1" dependencies = [ "blake2", + "chrono", "err-derive", "fasthash", "futures", diff --git a/src/api/api_server.rs b/src/api/api_server.rs index c6b1d48..bc98686 100644 --- a/src/api/api_server.rs +++ b/src/api/api_server.rs @@ -137,7 +137,10 @@ async fn handler_inner(garage: Arc, req: Request) -> Result, + req: &Request, dest_bucket: &str, dest_key: &str, source_bucket: &str, @@ -42,25 +44,44 @@ pub async fn handle_copy( let new_uuid = gen_uuid(); let new_timestamp = now_msec(); - let dest_object_version = ObjectVersion { - uuid: new_uuid, - timestamp: new_timestamp, - state: ObjectVersionState::Complete(source_last_state.clone()), - }; - let dest_object = Object::new( - dest_bucket.to_string(), - dest_key.to_string(), - vec![dest_object_version], - ); - match source_last_state { + // Implement x-amz-metadata-directive: REPLACE + let old_meta = match source_last_state { ObjectVersionData::DeleteMarker => { return Err(Error::NotFound); } - ObjectVersionData::Inline(_meta, _bytes) => { + ObjectVersionData::Inline(meta, _bytes) => meta, + ObjectVersionData::FirstBlock(meta, _fbh) => meta, + }; + let new_meta = match req.headers().get("x-amz-metadata-directive") { + Some(v) if v == hyper::header::HeaderValue::from_static("REPLACE") => ObjectVersionMeta { + headers: get_headers(req)?, + size: old_meta.size, + etag: old_meta.etag.clone(), + }, + _ => old_meta.clone(), + }; + + // Save object copy + match source_last_state { + ObjectVersionData::DeleteMarker => unreachable!(), + ObjectVersionData::Inline(_meta, bytes) => { + let dest_object_version = ObjectVersion { + uuid: new_uuid, + timestamp: new_timestamp, + state: ObjectVersionState::Complete(ObjectVersionData::Inline( + new_meta, + bytes.clone(), + )), + }; + let dest_object = Object::new( + dest_bucket.to_string(), + dest_key.to_string(), + vec![dest_object_version], + ); garage.object_table.insert(&dest_object).await?; } - ObjectVersionData::FirstBlock(meta, _first_block_hash) => { + ObjectVersionData::FirstBlock(_meta, first_block_hash) => { // Get block list from source version let source_version = garage .version_table @@ -74,7 +95,7 @@ pub async fn handle_copy( let tmp_dest_object_version = ObjectVersion { uuid: new_uuid, timestamp: new_timestamp, - state: ObjectVersionState::Uploading(meta.headers.clone()), + state: ObjectVersionState::Uploading(new_meta.headers.clone()), }; let tmp_dest_object = Object::new( dest_bucket.to_string(), @@ -120,12 +141,24 @@ pub async fn handle_copy( // it to update the modification timestamp for instance). If we did this concurrently // with the stuff before, the block's reference counts could be decremented before // they are incremented again for the new version, leading to data being deleted. + let dest_object_version = ObjectVersion { + uuid: new_uuid, + timestamp: new_timestamp, + state: ObjectVersionState::Complete(ObjectVersionData::FirstBlock( + new_meta, + *first_block_hash, + )), + }; + let dest_object = Object::new( + dest_bucket.to_string(), + dest_key.to_string(), + vec![dest_object_version], + ); garage.object_table.insert(&dest_object).await?; } } - let now = Utc::now(); // FIXME use the unix timestamp from above - let last_modified = now.to_rfc3339_opts(SecondsFormat::Secs, true); + let last_modified = msec_to_rfc3339(new_timestamp); let mut xml = String::new(); writeln!(&mut xml, r#""#).unwrap(); writeln!(&mut xml, r#""#).unwrap(); diff --git a/src/api/s3_delete.rs b/src/api/s3_delete.rs index 7f75256..bb42d90 100644 --- a/src/api/s3_delete.rs +++ b/src/api/s3_delete.rs @@ -4,6 +4,7 @@ use std::sync::Arc; use hyper::{Body, Request, Response}; use garage_util::data::*; +use garage_util::time::*; use garage_model::garage::Garage; use garage_model::object_table::*; diff --git a/src/api/s3_list.rs b/src/api/s3_list.rs index 16d96a4..4d6c32b 100644 --- a/src/api/s3_list.rs +++ b/src/api/s3_list.rs @@ -2,10 +2,10 @@ use std::collections::{BTreeMap, BTreeSet, HashMap}; use std::fmt::Write; use std::sync::Arc; -use chrono::{DateTime, NaiveDateTime, SecondsFormat, Utc}; use hyper::{Body, Response}; use garage_util::error::Error as GarageError; +use garage_util::time::*; use garage_model::garage::Garage; use garage_model::object_table::*; @@ -247,9 +247,7 @@ pub async fn handle_list( } for (key, info) in result_keys.iter() { - let last_modif = NaiveDateTime::from_timestamp(info.last_modified as i64 / 1000, 0); - let last_modif = DateTime::::from_utc(last_modif, Utc); - let last_modif = last_modif.to_rfc3339_opts(SecondsFormat::Millis, true); + let last_modif = msec_to_rfc3339(info.last_modified); writeln!(&mut xml, "\t").unwrap(); writeln!( &mut xml, diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs index 6f675e3..ea3664b 100644 --- a/src/api/s3_put.rs +++ b/src/api/s3_put.rs @@ -10,6 +10,7 @@ use sha2::{Digest as Sha256Digest, Sha256}; use garage_table::*; use garage_util::data::*; use garage_util::error::Error as GarageError; +use garage_util::time::*; use garage_model::block::INLINE_THRESHOLD; use garage_model::block_ref_table::*; @@ -583,17 +584,19 @@ fn get_mime_type(req: &Request) -> Result { .to_string()) } -fn get_headers(req: &Request) -> Result { +pub(crate) fn get_headers(req: &Request) -> Result { let content_type = get_mime_type(req)?; - let other_headers = vec![ + let mut other = BTreeMap::new(); + + // Preserve standard headers + let standard_header = vec![ hyper::header::CACHE_CONTROL, hyper::header::CONTENT_DISPOSITION, hyper::header::CONTENT_ENCODING, hyper::header::CONTENT_LANGUAGE, hyper::header::EXPIRES, ]; - let mut other = BTreeMap::new(); - for h in other_headers.iter() { + for h in standard_header.iter() { if let Some(v) = req.headers().get(h) { match v.to_str() { Ok(v_str) => { @@ -605,6 +608,21 @@ fn get_headers(req: &Request) -> Result { } } } + + // Preserve x-amz-meta- headers + for (k, v) in req.headers().iter() { + if k.as_str().starts_with("x-amz-meta-") { + match v.to_str() { + Ok(v_str) => { + other.insert(k.to_string(), v_str.to_string()); + } + Err(e) => { + warn!("Discarding header {}, error in .to_str(): {}", k, e); + } + } + } + } + Ok(ObjectVersionHeaders { content_type, other, diff --git a/src/garage/cli.rs b/src/garage/cli.rs index 56f03c8..b5c91ff 100644 --- a/src/garage/cli.rs +++ b/src/garage/cli.rs @@ -5,8 +5,8 @@ use std::path::PathBuf; use serde::{Deserialize, Serialize}; use structopt::StructOpt; -use garage_util::data::*; use garage_util::error::Error; +use garage_util::time::*; use garage_rpc::membership::*; use garage_rpc::ring::*; diff --git a/src/model/block.rs b/src/model/block.rs index 5934f20..36ad867 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -11,9 +11,9 @@ use tokio::fs; use tokio::prelude::*; use tokio::sync::{watch, Mutex, Notify}; -use garage_util::data; use garage_util::data::*; use garage_util::error::Error; +use garage_util::time::*; use garage_rpc::membership::System; use garage_rpc::rpc_client::*; @@ -174,7 +174,7 @@ impl BlockManager { f.read_to_end(&mut data).await?; drop(f); - if data::blake2sum(&data[..]) != *hash { + if blake2sum(&data[..]) != *hash { let _lock = self.data_dir_lock.lock().await; warn!( "Block {:?} is corrupted. Renaming to .corrupted and resyncing.", diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs index e1dc297..6636e50 100644 --- a/src/rpc/membership.rs +++ b/src/rpc/membership.rs @@ -18,6 +18,7 @@ use tokio::sync::Mutex; use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_util::error::Error; +use garage_util::time::*; use crate::consul::get_consul_nodes; use crate::ring::*; diff --git a/src/table/crdt/lww.rs b/src/table/crdt/lww.rs index 9a3ab67..25ecdb0 100644 --- a/src/table/crdt/lww.rs +++ b/src/table/crdt/lww.rs @@ -1,6 +1,6 @@ use serde::{Deserialize, Serialize}; -use garage_util::data::now_msec; +use garage_util::time::now_msec; use crate::crdt::crdt::*; diff --git a/src/table/crdt/lww_map.rs b/src/table/crdt/lww_map.rs index bd40f36..7b37219 100644 --- a/src/table/crdt/lww_map.rs +++ b/src/table/crdt/lww_map.rs @@ -1,6 +1,6 @@ use serde::{Deserialize, Serialize}; -use garage_util::data::now_msec; +use garage_util::time::now_msec; use crate::crdt::crdt::*; diff --git a/src/table/gc.rs b/src/table/gc.rs index 5b7f1ee..c13c823 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -75,17 +75,19 @@ where match self.gc_loop_iter().await { Ok(true) => { // Stuff was done, loop imediately + continue; } Ok(false) => { - select! { - _ = tokio::time::delay_for(Duration::from_secs(10)).fuse() => (), - _ = must_exit.recv().fuse() => (), - } + // Nothing was done, sleep for some time (below) } Err(e) => { warn!("({}) Error doing GC: {}", self.data.name, e); } } + select! { + _ = tokio::time::delay_for(Duration::from_secs(10)).fuse() => (), + _ = must_exit.recv().fuse() => (), + } } Ok(()) } diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml index 35130c9..7bb7cb3 100644 --- a/src/util/Cargo.toml +++ b/src/util/Cargo.toml @@ -27,6 +27,7 @@ toml = "0.5" rmp-serde = "0.14.3" serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } serde_json = "1.0" +chrono = "0.4" futures = "0.3" futures-util = "0.3" diff --git a/src/util/data.rs b/src/util/data.rs index 0dbd6df..591b760 100644 --- a/src/util/data.rs +++ b/src/util/data.rs @@ -2,7 +2,6 @@ use rand::Rng; use serde::de::{self, Visitor}; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use std::fmt; -use std::time::{SystemTime, UNIX_EPOCH}; #[derive(Default, PartialOrd, Ord, Clone, Hash, PartialEq, Copy)] pub struct FixedBytes32([u8; 32]); @@ -119,13 +118,6 @@ pub fn gen_uuid() -> UUID { rand::thread_rng().gen::<[u8; 32]>().into() } -pub fn now_msec() -> u64 { - SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("Fix your clock :o") - .as_millis() as u64 -} - // RMP serialization with names of fields and variants pub fn rmp_to_vec_all_named(val: &T) -> Result, rmp_serde::encode::Error> diff --git a/src/util/lib.rs b/src/util/lib.rs index 0bf09bf..e544a87 100644 --- a/src/util/lib.rs +++ b/src/util/lib.rs @@ -5,3 +5,4 @@ pub mod background; pub mod config; pub mod data; pub mod error; +pub mod time; diff --git a/src/util/time.rs b/src/util/time.rs new file mode 100644 index 0000000..148860e --- /dev/null +++ b/src/util/time.rs @@ -0,0 +1,16 @@ +use chrono::{SecondsFormat, TimeZone, Utc}; +use std::time::{SystemTime, UNIX_EPOCH}; + +pub fn now_msec() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Fix your clock :o") + .as_millis() as u64 +} + +pub fn msec_to_rfc3339(msecs: u64) -> String { + let secs = msecs as i64 / 1000; + let nanos = (msecs as i64 % 1000) as u32 * 1_000_000; + let timestamp = Utc.timestamp(secs, nanos); + timestamp.to_rfc3339_opts(SecondsFormat::Secs, true) +}