WIP add content defined chunking #42

Closed
trinity-1686a wants to merge 42 commits from content-defined-chunking into master
5 changed files with 16 additions and 10 deletions
Showing only changes of commit 3214dd52dd

View file

@@ -411,7 +411,9 @@ pub async fn handle_put_part(
    // Store part etag in version
    let data_md5sum_hex = hex::encode(data_md5sum);
    let mut version = version;
-   version.parts_etags.put(part_number, data_md5sum_hex.clone());
+   version
+       .parts_etags
+       .put(part_number, data_md5sum_hex.clone());
    garage.version_table.insert(&version).await?;

    let response = Response::builder()
@@ -495,11 +497,7 @@ pub async fn handle_complete_multipart_upload(
    for (_, etag) in version.parts_etags.items().iter() {
        etag_md5_hasher.update(etag.as_bytes());
    }
-   let etag = format!(
-       "{}-{}",
-       hex::encode(etag_md5_hasher.finalize()),
-       num_parts
-   );
+   let etag = format!("{}-{}", hex::encode(etag_md5_hasher.finalize()), num_parts);

    // Calculate total size of final object
    let total_size = version

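For reference, a minimal standalone sketch (not code from this PR) of the composite ETag shape assembled above, assuming the `md-5` and `hex` crates:

```rust
// Minimal sketch, not Garage code: reproduce the multipart ETag format
// computed in handle_complete_multipart_upload above.
use md5::{Digest, Md5};

fn multipart_etag(part_etags: &[String]) -> String {
    let mut etag_md5_hasher = Md5::new();
    // Hash the stored per-part ETag strings, as the handler above does.
    for etag in part_etags {
        etag_md5_hasher.update(etag.as_bytes());
    }
    // Final form: "<md5-of-part-etags>-<number-of-parts>".
    format!(
        "{}-{}",
        hex::encode(etag_md5_hasher.finalize()),
        part_etags.len()
    )
}

fn main() {
    let parts = vec![
        "5d41402abc4b2a76b9719d911017c592".to_string(),
        "7d793037a0760186574b0282f2f435e7".to_string(),
    ];
    println!("{}", multipart_etag(&parts));
}
```

This mirrors the one-liner in the hunk: the hasher consumes the hex ETag strings stored per part, and the part count is appended after a dash.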
View file

@@ -225,4 +225,3 @@ impl TableSchema for ObjectTable {
        filter.apply(deleted)
    }
}

View file

@@ -172,7 +172,7 @@ impl Ring {
    pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec<UUID> {
        if self.ring.len() != 1 << PARTITION_BITS {
-           warn!("Ring not yet ready, read/writes will be lost");
+           warn!("Ring not yet ready, read/writes will be lost!");
            return vec![];
        }

View file

@@ -44,6 +44,7 @@ impl TableReplication for TableShardedReplication {
    fn split_points(&self, ring: &Ring) -> Vec<Hash> {
        let mut ret = vec![];
+       ret.push([0u8; 32].into());
        for entry in ring.ring.iter() {
            ret.push(entry.location);
        }

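The hunk above prepends the all-zero hash so the first sync range starts at the beginning of the hash space. A simplified, hypothetical sketch of that logic, with plain byte arrays standing in for Garage's `Hash` and `Ring` types:

```rust
// Hypothetical simplification, not Garage's actual types: build the list of
// split points, anchoring the first range at the all-zero hash.
fn split_points(ring_locations: &[[u8; 32]]) -> Vec<[u8; 32]> {
    let mut ret = vec![];
    // Without this, keys hashing below the first ring entry would not be
    // covered by any (begin, end) pair derived from consecutive split points.
    ret.push([0u8; 32]);
    for location in ring_locations {
        ret.push(*location);
    }
    ret
}
```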
View file

@@ -18,10 +18,14 @@ use garage_util::error::Error;
use crate::*;

const MAX_DEPTH: usize = 16;
-const SCAN_INTERVAL: Duration = Duration::from_secs(3600);
-const CHECKSUM_CACHE_TIMEOUT: Duration = Duration::from_secs(1800);
const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(30);
+// Scan & sync every 12 hours
+const SCAN_INTERVAL: Duration = Duration::from_secs(12 * 60 * 60);
+// Expire cache after 30 minutes
+const CHECKSUM_CACHE_TIMEOUT: Duration = Duration::from_secs(30 * 60);

pub struct TableSyncer<F: TableSchema, R: TableReplication> {
    table: Arc<Table<F, R>>,
    todo: Mutex<SyncTodo>,
@@ -797,6 +801,10 @@ impl SyncTodo {
    for i in 0..split_points.len() - 1 {
        let begin = split_points[i];
        let end = split_points[i + 1];
+       if begin == end {
+           continue;
+       }
        let nodes = table.replication.replication_nodes(&begin, &ring);

        let retain = nodes.contains(&my_id);
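A small self-contained sketch (hypothetical names, plain `[u8; 32]` hashes in place of Garage's types) of the loop above: consecutive split points are paired into ranges, and an empty range, such as the duplicate that appears when the first ring entry equals the zero hash just prepended in `split_points`, is skipped.

```rust
// Hypothetical sketch of the partition loop above, outside Garage's types:
// pair consecutive split points and skip empty (begin == end) ranges.
fn partitions(split_points: &[[u8; 32]]) -> Vec<([u8; 32], [u8; 32])> {
    let mut out = vec![];
    for i in 0..split_points.len() - 1 {
        let begin = split_points[i];
        let end = split_points[i + 1];
        if begin == end {
            // Empty partition: nothing to sync for this range.
            continue;
        }
        out.push((begin, end));
    }
    out
}

fn main() {
    let zero = [0u8; 32];
    let mut mid = [0u8; 32];
    mid[0] = 0x80;
    // A duplicated split point yields an empty range, which is skipped.
    let pts = [zero, zero, mid, [0xffu8; 32]];
    assert_eq!(partitions(&pts).len(), 2);
}
```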