WIP add content defined chunking #42

Closed
trinity-1686a wants to merge 42 commits from content-defined-chunking into master
2 changed files with 18 additions and 12 deletions
Showing only changes of commit bdcbdd1cd8 - Show all commits

View file

@ -42,7 +42,7 @@ pub fn parse_list_objects_query(
Ok(ListObjectsQuery { Ok(ListObjectsQuery {
is_v2: params.get("list-type").map(|x| x == "2").unwrap_or(false), is_v2: params.get("list-type").map(|x| x == "2").unwrap_or(false),
bucket: bucket.to_string(), bucket: bucket.to_string(),
delimiter: params.get("delimiter").cloned(), delimiter: params.get("delimiter").filter(|x| !x.is_empty()).cloned(),
max_keys: params max_keys: params
.get("max-keys") .get("max-keys")
.map(|x| { .map(|x| {

View file

@ -165,7 +165,7 @@ impl BlockManager {
Ok(f) => f, Ok(f) => f,
Err(e) => { Err(e) => {
// Not found but maybe we should have had it ?? // Not found but maybe we should have had it ??
self.put_to_resync(hash, 0)?; self.put_to_resync(hash, Duration::from_millis(0))?;
return Err(Into::into(e)); return Err(Into::into(e));
} }
}; };
@ -175,9 +175,11 @@ impl BlockManager {
if data::blake2sum(&data[..]) != *hash { if data::blake2sum(&data[..]) != *hash {
let _lock = self.data_dir_lock.lock().await; let _lock = self.data_dir_lock.lock().await;
warn!("Block {:?} is corrupted. Deleting and resyncing.", hash); warn!("Block {:?} is corrupted. Renaming to .corrupted and resyncing.", hash);
fs::remove_file(path).await?; let mut path2 = path.clone();
self.put_to_resync(&hash, 0)?; path2.set_extension(".corrupted");
fs::rename(path, path2).await?;
self.put_to_resync(&hash, Duration::from_millis(0))?;
return Err(Error::CorruptData(*hash)); return Err(Error::CorruptData(*hash));
} }
@ -215,7 +217,7 @@ impl BlockManager {
let old_rc = self.rc.get(&hash)?; let old_rc = self.rc.get(&hash)?;
self.rc.merge(&hash, vec![1])?; self.rc.merge(&hash, vec![1])?;
if old_rc.map(|x| u64_from_bytes(&x[..]) == 0).unwrap_or(true) { if old_rc.map(|x| u64_from_bytes(&x[..]) == 0).unwrap_or(true) {
self.put_to_resync(&hash, BLOCK_RW_TIMEOUT.as_millis() as u64)?; self.put_to_resync(&hash, BLOCK_RW_TIMEOUT)?;
} }
Ok(()) Ok(())
} }
@ -223,13 +225,13 @@ impl BlockManager {
pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> { pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> {
let new_rc = self.rc.merge(&hash, vec![0])?; let new_rc = self.rc.merge(&hash, vec![0])?;
if new_rc.map(|x| u64_from_bytes(&x[..]) == 0).unwrap_or(true) { if new_rc.map(|x| u64_from_bytes(&x[..]) == 0).unwrap_or(true) {
self.put_to_resync(&hash, 0)?; self.put_to_resync(&hash, Duration::from_secs(0))?;
} }
Ok(()) Ok(())
} }
fn put_to_resync(&self, hash: &Hash, delay_millis: u64) -> Result<(), Error> { fn put_to_resync(&self, hash: &Hash, delay: Duration) -> Result<(), Error> {
let when = now_msec() + delay_millis; let when = now_msec() + delay.as_millis() as u64;
trace!("Put resync_queue: {} {:?}", when, hash); trace!("Put resync_queue: {} {:?}", when, hash);
let mut key = u64::to_be_bytes(when).to_vec(); let mut key = u64::to_be_bytes(when).to_vec();
key.extend(hash.as_ref()); key.extend(hash.as_ref());
@ -252,7 +254,7 @@ impl BlockManager {
if let Err(e) = self.resync_iter(&hash).await { if let Err(e) = self.resync_iter(&hash).await {
warn!("Failed to resync block {:?}, retrying later: {}", hash, e); warn!("Failed to resync block {:?}, retrying later: {}", hash, e);
self.put_to_resync(&hash, RESYNC_RETRY_TIMEOUT.as_millis() as u64)?; self.put_to_resync(&hash, RESYNC_RETRY_TIMEOUT)?;
n_failures += 1; n_failures += 1;
if n_failures >= 10 { if n_failures >= 10 {
warn!("Too many resync failures, throttling."); warn!("Too many resync failures, throttling.");
@ -281,6 +283,8 @@ impl BlockManager {
} }
async fn resync_iter(&self, hash: &Hash) -> Result<(), Error> { async fn resync_iter(&self, hash: &Hash) -> Result<(), Error> {
let lock = self.data_dir_lock.lock().await;
let path = self.block_path(hash); let path = self.block_path(hash);
let exists = fs::metadata(&path).await.is_ok(); let exists = fs::metadata(&path).await.is_ok();
@ -360,6 +364,8 @@ impl BlockManager {
} }
if needed && !exists { if needed && !exists {
drop(lock);
// TODO find a way to not do this if they are sending it to us // TODO find a way to not do this if they are sending it to us
// Let's suppose this isn't an issue for now with the BLOCK_RW_TIMEOUT delay // Let's suppose this isn't an issue for now with the BLOCK_RW_TIMEOUT delay
// between the RC being incremented and this part being called. // between the RC being incremented and this part being called.
@ -420,7 +426,7 @@ impl BlockManager {
} }
if !block_ref.deleted.get() { if !block_ref.deleted.get() {
last_hash = Some(block_ref.block); last_hash = Some(block_ref.block);
self.put_to_resync(&block_ref.block, 0)?; self.put_to_resync(&block_ref.block, Duration::from_secs(0))?;
} }
i += 1; i += 1;
if i & 0xFF == 0 && *must_exit.borrow() { if i & 0xFF == 0 && *must_exit.borrow() {
@ -464,7 +470,7 @@ impl BlockManager {
}; };
let mut hash = [0u8; 32]; let mut hash = [0u8; 32];
hash.copy_from_slice(&hash_bytes[..]); hash.copy_from_slice(&hash_bytes[..]);
self.put_to_resync(&hash.into(), 0)?; self.put_to_resync(&hash.into(),Duration::from_secs(0))?;
} }
if *must_exit.borrow() { if *must_exit.borrow() {