Try to solve persistence issues #259
15 changed files with 68 additions and 66 deletions
|
@ -200,12 +200,7 @@ pub fn find_matching_cors_rule<'a>(
|
||||||
None => vec![],
|
None => vec![],
|
||||||
};
|
};
|
||||||
return Ok(cors_config.iter().find(|rule| {
|
return Ok(cors_config.iter().find(|rule| {
|
||||||
cors_rule_matches(
|
cors_rule_matches(rule, origin, req.method().as_ref(), request_headers.iter())
|
||||||
rule,
|
|
||||||
origin,
|
|
||||||
&req.method().to_string(),
|
|
||||||
request_headers.iter(),
|
|
||||||
)
|
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1042,12 +1042,12 @@ mod tests {
|
||||||
|
|
||||||
query.common.prefix = "a/".to_string();
|
query.common.prefix = "a/".to_string();
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
common_prefix(&objs.get(0).unwrap(), &query.common),
|
common_prefix(objs.get(0).unwrap(), &query.common),
|
||||||
Some("a/b/")
|
Some("a/b/")
|
||||||
);
|
);
|
||||||
|
|
||||||
query.common.prefix = "a/b/".to_string();
|
query.common.prefix = "a/b/".to_string();
|
||||||
assert_eq!(common_prefix(&objs.get(0).unwrap(), &query.common), None);
|
assert_eq!(common_prefix(objs.get(0).unwrap(), &query.common), None);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
@ -1272,7 +1272,7 @@ mod tests {
|
||||||
Version {
|
Version {
|
||||||
bucket_id: uuid,
|
bucket_id: uuid,
|
||||||
key: "a".to_string(),
|
key: "a".to_string(),
|
||||||
uuid: uuid,
|
uuid,
|
||||||
deleted: false.into(),
|
deleted: false.into(),
|
||||||
blocks: crdt::Map::<VersionBlockKey, VersionBlock>::from_iter(blocks),
|
blocks: crdt::Map::<VersionBlockKey, VersionBlock>::from_iter(blocks),
|
||||||
parts_etags: crdt::Map::<u64, String>::from_iter(etags),
|
parts_etags: crdt::Map::<u64, String>::from_iter(etags),
|
||||||
|
|
|
@ -259,8 +259,7 @@ impl RoutingRuleInner {
|
||||||
let has_prefix = self
|
let has_prefix = self
|
||||||
.condition
|
.condition
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.map(|c| c.prefix.as_ref())
|
.and_then(|c| c.prefix.as_ref())
|
||||||
.flatten()
|
|
||||||
.is_some();
|
.is_some();
|
||||||
self.redirect.validate(has_prefix)
|
self.redirect.validate(has_prefix)
|
||||||
}
|
}
|
||||||
|
|
|
@ -51,7 +51,7 @@ pub async fn check_payload_signature(
|
||||||
|
|
||||||
let canonical_request = canonical_request(
|
let canonical_request = canonical_request(
|
||||||
request.method(),
|
request.method(),
|
||||||
&request.uri().path().to_string(),
|
request.uri().path(),
|
||||||
&canonical_query_string(request.uri()),
|
&canonical_query_string(request.uri()),
|
||||||
&headers,
|
&headers,
|
||||||
&authorization.signed_headers,
|
&authorization.signed_headers,
|
||||||
|
|
|
@ -115,7 +115,7 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
|
||||||
} else {
|
} else {
|
||||||
let node_id = garage_rpc::system::read_node_id(&config.as_ref().unwrap().metadata_dir)
|
let node_id = garage_rpc::system::read_node_id(&config.as_ref().unwrap().metadata_dir)
|
||||||
.err_context(READ_KEY_ERROR)?;
|
.err_context(READ_KEY_ERROR)?;
|
||||||
if let Some(a) = config.as_ref().map(|c| c.rpc_public_addr).flatten() {
|
if let Some(a) = config.as_ref().and_then(|c| c.rpc_public_addr) {
|
||||||
(node_id, a)
|
(node_id, a)
|
||||||
} else {
|
} else {
|
||||||
let default_addr = SocketAddr::new(
|
let default_addr = SocketAddr::new(
|
||||||
|
|
|
@ -27,7 +27,7 @@ async fn test_bucket_all() {
|
||||||
.buckets
|
.buckets
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.into_iter()
|
.iter()
|
||||||
.filter(|x| x.name.as_ref().is_some())
|
.filter(|x| x.name.as_ref().is_some())
|
||||||
.find(|x| x.name.as_ref().unwrap() == "hello")
|
.find(|x| x.name.as_ref().unwrap() == "hello")
|
||||||
.is_some());
|
.is_some());
|
||||||
|
@ -79,7 +79,7 @@ async fn test_bucket_all() {
|
||||||
.buckets
|
.buckets
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.into_iter()
|
.iter()
|
||||||
.filter(|x| x.name.as_ref().is_some())
|
.filter(|x| x.name.as_ref().is_some())
|
||||||
.find(|x| x.name.as_ref().unwrap() == "hello")
|
.find(|x| x.name.as_ref().unwrap() == "hello")
|
||||||
.is_none());
|
.is_none());
|
||||||
|
|
|
@ -527,8 +527,8 @@ async fn test_listmultipart() {
|
||||||
upnext = r.next_upload_id_marker;
|
upnext = r.next_upload_id_marker;
|
||||||
|
|
||||||
loopcnt += 1;
|
loopcnt += 1;
|
||||||
upcnt += r.uploads.unwrap_or(vec![]).len();
|
upcnt += r.uploads.unwrap_or_default().len();
|
||||||
pfxcnt += r.common_prefixes.unwrap_or(vec![]).len();
|
pfxcnt += r.common_prefixes.unwrap_or_default().len();
|
||||||
|
|
||||||
if next.is_none() {
|
if next.is_none() {
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -124,7 +124,7 @@ async fn test_uploadlistpart() {
|
||||||
|
|
||||||
assert!(r.part_number_marker.is_none());
|
assert!(r.part_number_marker.is_none());
|
||||||
assert!(r.next_part_number_marker.is_some());
|
assert!(r.next_part_number_marker.is_some());
|
||||||
assert_eq!(r.max_parts, 1 as i32);
|
assert_eq!(r.max_parts, 1_i32);
|
||||||
assert!(r.is_truncated);
|
assert!(r.is_truncated);
|
||||||
assert_eq!(r.key.unwrap(), "a");
|
assert_eq!(r.key.unwrap(), "a");
|
||||||
assert_eq!(r.upload_id.unwrap().as_str(), uid.as_str());
|
assert_eq!(r.upload_id.unwrap().as_str(), uid.as_str());
|
||||||
|
@ -146,7 +146,7 @@ async fn test_uploadlistpart() {
|
||||||
r2.part_number_marker.as_ref().unwrap(),
|
r2.part_number_marker.as_ref().unwrap(),
|
||||||
r.next_part_number_marker.as_ref().unwrap()
|
r.next_part_number_marker.as_ref().unwrap()
|
||||||
);
|
);
|
||||||
assert_eq!(r2.max_parts, 1 as i32);
|
assert_eq!(r2.max_parts, 1_i32);
|
||||||
assert!(r2.is_truncated);
|
assert!(r2.is_truncated);
|
||||||
assert_eq!(r2.key.unwrap(), "a");
|
assert_eq!(r2.key.unwrap(), "a");
|
||||||
assert_eq!(r2.upload_id.unwrap().as_str(), uid.as_str());
|
assert_eq!(r2.upload_id.unwrap().as_str(), uid.as_str());
|
||||||
|
|
|
@ -38,7 +38,6 @@ use crate::garage::Garage;
|
||||||
/// Size under which data will be stored inlined in database instead of as files
|
/// Size under which data will be stored inlined in database instead of as files
|
||||||
pub const INLINE_THRESHOLD: usize = 3072;
|
pub const INLINE_THRESHOLD: usize = 3072;
|
||||||
|
|
||||||
pub const BACKGROUND_WORKERS: u64 = 1;
|
|
||||||
pub const BACKGROUND_TRANQUILITY: u32 = 2;
|
pub const BACKGROUND_TRANQUILITY: u32 = 2;
|
||||||
|
|
||||||
// Timeout for RPCs that read and write blocks to remote nodes
|
// Timeout for RPCs that read and write blocks to remote nodes
|
||||||
|
@ -512,17 +511,14 @@ impl BlockManager {
|
||||||
// ---- Resync loop ----
|
// ---- Resync loop ----
|
||||||
|
|
||||||
pub fn spawn_background_worker(self: Arc<Self>) {
|
pub fn spawn_background_worker(self: Arc<Self>) {
|
||||||
// Launch n simultaneous workers for background resync loop preprocessing
|
// Launch a background workers for background resync loop processing
|
||||||
for i in 0..BACKGROUND_WORKERS {
|
let background = self.system.background.clone();
|
||||||
let bm2 = self.clone();
|
tokio::spawn(async move {
|
||||||
let background = self.system.background.clone();
|
tokio::time::sleep(Duration::from_secs(10)).await;
|
||||||
tokio::spawn(async move {
|
background.spawn_worker("block resync worker".into(), move |must_exit| {
|
||||||
tokio::time::sleep(Duration::from_secs(10 * (i + 1))).await;
|
self.resync_loop(must_exit)
|
||||||
background.spawn_worker(format!("block resync worker {}", i), move |must_exit| {
|
|
||||||
bm2.resync_loop(must_exit)
|
|
||||||
});
|
|
||||||
});
|
});
|
||||||
}
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
fn put_to_resync(&self, hash: &Hash, delay: Duration) -> Result<(), Error> {
|
fn put_to_resync(&self, hash: &Hash, delay: Duration) -> Result<(), Error> {
|
||||||
|
@ -566,9 +562,12 @@ impl BlockManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn resync_iter(&self, must_exit: &mut watch::Receiver<bool>) -> Result<bool, Error> {
|
async fn resync_iter(&self, must_exit: &mut watch::Receiver<bool>) -> Result<bool, Error> {
|
||||||
if let Some((time_bytes, hash_bytes)) = self.resync_queue.pop_min()? {
|
if let Some(first_pair_res) = self.resync_queue.iter().next() {
|
||||||
|
let (time_bytes, hash_bytes) = first_pair_res?;
|
||||||
|
|
||||||
let time_msec = u64::from_be_bytes(time_bytes[0..8].try_into().unwrap());
|
let time_msec = u64::from_be_bytes(time_bytes[0..8].try_into().unwrap());
|
||||||
let now = now_msec();
|
let now = now_msec();
|
||||||
|
|
||||||
if now >= time_msec {
|
if now >= time_msec {
|
||||||
let hash = Hash::try_from(&hash_bytes[..]).unwrap();
|
let hash = Hash::try_from(&hash_bytes[..]).unwrap();
|
||||||
|
|
||||||
|
@ -579,6 +578,9 @@ impl BlockManager {
|
||||||
// don't do resync and return early, but still
|
// don't do resync and return early, but still
|
||||||
// make sure the item is still in queue at expected time
|
// make sure the item is still in queue at expected time
|
||||||
self.put_to_resync_at(&hash, ec.next_try())?;
|
self.put_to_resync_at(&hash, ec.next_try())?;
|
||||||
|
// ec.next_try() > now >= time_msec, so this remove
|
||||||
|
// is not removing the one we added just above
|
||||||
|
self.resync_queue.remove(time_bytes)?;
|
||||||
return Ok(false);
|
return Ok(false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -609,20 +611,25 @@ impl BlockManager {
|
||||||
warn!("Error when resyncing {:?}: {}", hash, e);
|
warn!("Error when resyncing {:?}: {}", hash, e);
|
||||||
|
|
||||||
let err_counter = match self.resync_errors.get(hash.as_slice())? {
|
let err_counter = match self.resync_errors.get(hash.as_slice())? {
|
||||||
Some(ec) => ErrorCounter::decode(ec).add1(),
|
Some(ec) => ErrorCounter::decode(ec).add1(now + 1),
|
||||||
None => ErrorCounter::new(),
|
None => ErrorCounter::new(now + 1),
|
||||||
};
|
};
|
||||||
|
|
||||||
self.put_to_resync_at(&hash, err_counter.next_try())?;
|
|
||||||
self.resync_errors
|
self.resync_errors
|
||||||
.insert(hash.as_slice(), err_counter.encode())?;
|
.insert(hash.as_slice(), err_counter.encode())?;
|
||||||
|
|
||||||
|
self.put_to_resync_at(&hash, err_counter.next_try())?;
|
||||||
|
// err_counter.next_try() >= now + 1 > now,
|
||||||
|
// the entry we remove from the queue is not
|
||||||
|
// the entry we inserted with put_to_resync_at
|
||||||
|
self.resync_queue.remove(time_bytes)?;
|
||||||
} else {
|
} else {
|
||||||
self.resync_errors.remove(hash.as_slice())?;
|
self.resync_errors.remove(hash.as_slice())?;
|
||||||
|
self.resync_queue.remove(time_bytes)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(true)
|
Ok(true)
|
||||||
} else {
|
} else {
|
||||||
self.resync_queue.insert(time_bytes, hash_bytes)?;
|
|
||||||
let delay = tokio::time::sleep(Duration::from_millis(time_msec - now));
|
let delay = tokio::time::sleep(Duration::from_millis(time_msec - now));
|
||||||
select! {
|
select! {
|
||||||
_ = delay.fuse() => {},
|
_ = delay.fuse() => {},
|
||||||
|
@ -862,9 +869,11 @@ impl BlockManagerLocked {
|
||||||
let data = data.inner_buffer();
|
let data = data.inner_buffer();
|
||||||
|
|
||||||
let mut path = mgr.block_dir(hash);
|
let mut path = mgr.block_dir(hash);
|
||||||
fs::create_dir_all(&path).await?;
|
let directory = path.clone();
|
||||||
|
|
||||||
path.push(hex::encode(hash));
|
path.push(hex::encode(hash));
|
||||||
|
|
||||||
|
fs::create_dir_all(&directory).await?;
|
||||||
|
|
||||||
let to_delete = match (mgr.is_block_compressed(hash).await, compressed) {
|
let to_delete = match (mgr.is_block_compressed(hash).await, compressed) {
|
||||||
(Ok(true), _) => return Ok(BlockRpc::Ok),
|
(Ok(true), _) => return Ok(BlockRpc::Ok),
|
||||||
(Ok(false), false) => return Ok(BlockRpc::Ok),
|
(Ok(false), false) => return Ok(BlockRpc::Ok),
|
||||||
|
@ -885,6 +894,7 @@ impl BlockManagerLocked {
|
||||||
path2.set_extension("tmp");
|
path2.set_extension("tmp");
|
||||||
let mut f = fs::File::create(&path2).await?;
|
let mut f = fs::File::create(&path2).await?;
|
||||||
f.write_all(data).await?;
|
f.write_all(data).await?;
|
||||||
|
f.sync_all().await?;
|
||||||
drop(f);
|
drop(f);
|
||||||
|
|
||||||
fs::rename(path2, path).await?;
|
fs::rename(path2, path).await?;
|
||||||
|
@ -892,6 +902,19 @@ impl BlockManagerLocked {
|
||||||
fs::remove_file(to_delete).await?;
|
fs::remove_file(to_delete).await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// We want to ensure that when this function returns, data is properly persisted
|
||||||
|
// to disk. The first step is the sync_all above that does an fsync on the data file.
|
||||||
|
// Now, we do an fsync on the containing directory, to ensure that the rename
|
||||||
|
// is persisted properly. See:
|
||||||
|
// http://thedjbway.b0llix.net/qmail/syncdir.html
|
||||||
|
let dir = fs::OpenOptions::new()
|
||||||
|
.read(true)
|
||||||
|
.mode(0)
|
||||||
|
.open(directory)
|
||||||
|
.await?;
|
||||||
|
dir.sync_all().await?;
|
||||||
|
drop(dir);
|
||||||
|
|
||||||
Ok(BlockRpc::Ok)
|
Ok(BlockRpc::Ok)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1037,19 +1060,13 @@ struct ErrorCounter {
|
||||||
last_try: u64,
|
last_try: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for ErrorCounter {
|
impl ErrorCounter {
|
||||||
fn default() -> Self {
|
fn new(now: u64) -> Self {
|
||||||
Self {
|
Self {
|
||||||
errors: 1,
|
errors: 1,
|
||||||
last_try: now_msec(),
|
last_try: now,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
impl ErrorCounter {
|
|
||||||
fn new() -> Self {
|
|
||||||
Self::default()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn decode(data: sled::IVec) -> Self {
|
fn decode(data: sled::IVec) -> Self {
|
||||||
Self {
|
Self {
|
||||||
|
@ -1065,10 +1082,10 @@ impl ErrorCounter {
|
||||||
.concat()
|
.concat()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn add1(self) -> Self {
|
fn add1(self, now: u64) -> Self {
|
||||||
Self {
|
Self {
|
||||||
errors: self.errors + 1,
|
errors: self.errors + 1,
|
||||||
last_try: now_msec(),
|
last_try: now,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -30,8 +30,7 @@ impl<'a> BucketHelper<'a> {
|
||||||
// the AWS spec, and hex-encoded UUIDs are 64 chars long.
|
// the AWS spec, and hex-encoded UUIDs are 64 chars long.
|
||||||
let hexbucket = hex::decode(bucket_name.as_str())
|
let hexbucket = hex::decode(bucket_name.as_str())
|
||||||
.ok()
|
.ok()
|
||||||
.map(|by| Uuid::try_from(&by))
|
.and_then(|by| Uuid::try_from(&by));
|
||||||
.flatten();
|
|
||||||
if let Some(bucket_id) = hexbucket {
|
if let Some(bucket_id) = hexbucket {
|
||||||
Ok(self
|
Ok(self
|
||||||
.0
|
.0
|
||||||
|
@ -46,8 +45,7 @@ impl<'a> BucketHelper<'a> {
|
||||||
.bucket_alias_table
|
.bucket_alias_table
|
||||||
.get(&EmptyKey, bucket_name)
|
.get(&EmptyKey, bucket_name)
|
||||||
.await?
|
.await?
|
||||||
.map(|x| *x.state.get())
|
.and_then(|x| *x.state.get()))
|
||||||
.flatten())
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -106,8 +106,7 @@ impl Key {
|
||||||
/// Get permissions for a bucket
|
/// Get permissions for a bucket
|
||||||
pub fn bucket_permissions(&self, bucket: &Uuid) -> BucketKeyPerm {
|
pub fn bucket_permissions(&self, bucket: &Uuid) -> BucketKeyPerm {
|
||||||
self.params()
|
self.params()
|
||||||
.map(|params| params.authorized_buckets.get(bucket))
|
.and_then(|params| params.authorized_buckets.get(bucket))
|
||||||
.flatten()
|
|
||||||
.cloned()
|
.cloned()
|
||||||
.unwrap_or(BucketKeyPerm::NO_PERMISSIONS)
|
.unwrap_or(BucketKeyPerm::NO_PERMISSIONS)
|
||||||
}
|
}
|
||||||
|
|
|
@ -51,10 +51,8 @@ pub async fn get_consul_nodes(
|
||||||
let pubkey = ent
|
let pubkey = ent
|
||||||
.node_meta
|
.node_meta
|
||||||
.get("pubkey")
|
.get("pubkey")
|
||||||
.map(|k| hex::decode(&k).ok())
|
.and_then(|k| hex::decode(&k).ok())
|
||||||
.flatten()
|
.and_then(|k| NodeID::from_slice(&k[..]));
|
||||||
.map(|k| NodeID::from_slice(&k[..]))
|
|
||||||
.flatten();
|
|
||||||
if let (Some(ip), Some(pubkey)) = (ip, pubkey) {
|
if let (Some(ip), Some(pubkey)) = (ip, pubkey) {
|
||||||
ret.push((pubkey, SocketAddr::new(ip, ent.service_port)));
|
ret.push((pubkey, SocketAddr::new(ip, ent.service_port)));
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -63,10 +63,8 @@ pub async fn get_kubernetes_nodes(
|
||||||
let pubkey = &node
|
let pubkey = &node
|
||||||
.metadata
|
.metadata
|
||||||
.name
|
.name
|
||||||
.map(|k| hex::decode(&k).ok())
|
.and_then(|k| hex::decode(&k).ok())
|
||||||
.flatten()
|
.and_then(|k| NodeID::from_slice(&k[..]));
|
||||||
.map(|k| NodeID::from_slice(&k[..]))
|
|
||||||
.flatten();
|
|
||||||
|
|
||||||
if let Some(pubkey) = pubkey {
|
if let Some(pubkey) = pubkey {
|
||||||
ret.push((*pubkey, SocketAddr::new(node.spec.address, node.spec.port)))
|
ret.push((*pubkey, SocketAddr::new(node.spec.address, node.spec.port)))
|
||||||
|
|
|
@ -322,8 +322,7 @@ impl RpcHelper {
|
||||||
let peer_avg_ping = peer_list
|
let peer_avg_ping = peer_list
|
||||||
.iter()
|
.iter()
|
||||||
.find(|x| x.id.as_ref() == to.as_slice())
|
.find(|x| x.id.as_ref() == to.as_slice())
|
||||||
.map(|pi| pi.avg_ping)
|
.and_then(|pi| pi.avg_ping)
|
||||||
.flatten()
|
|
||||||
.unwrap_or_else(|| Duration::from_secs(1));
|
.unwrap_or_else(|| Duration::from_secs(1));
|
||||||
(
|
(
|
||||||
to != self.0.our_node_id,
|
to != self.0.our_node_id,
|
||||||
|
|
|
@ -175,8 +175,7 @@ async fn serve_file(garage: Arc<Garage>, req: &Request<Body>) -> Result<Response
|
||||||
.bucket_alias_table
|
.bucket_alias_table
|
||||||
.get(&EmptyKey, &bucket_name.to_string())
|
.get(&EmptyKey, &bucket_name.to_string())
|
||||||
.await?
|
.await?
|
||||||
.map(|x| x.state.take())
|
.and_then(|x| x.state.take())
|
||||||
.flatten()
|
|
||||||
.ok_or(Error::NotFound)?;
|
.ok_or(Error::NotFound)?;
|
||||||
|
|
||||||
// Check bucket isn't deleted and has website access enabled
|
// Check bucket isn't deleted and has website access enabled
|
||||||
|
|
Loading…
Reference in a new issue