WIP add content defined chunking #42
3 changed files with 28 additions and 27 deletions
23
Cargo.lock
generated
23
Cargo.lock
generated
|
@ -233,6 +233,12 @@ dependencies = [
|
||||||
"synstructure",
|
"synstructure",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "fastcdc"
|
||||||
|
version = "1.0.5"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "5afa29be46b12c8c380b997def8d1ac77c2665da93eb0a768fab0bf4db79333f"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "fasthash"
|
name = "fasthash"
|
||||||
version = "0.4.0"
|
version = "0.4.0"
|
||||||
|
@ -255,12 +261,6 @@ dependencies = [
|
||||||
"gcc",
|
"gcc",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "fmt-extra"
|
|
||||||
version = "0.2.1"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "07f11f71b1f9be830047fbb1899d90601c3b21a471dc99fe1057303eee37f2b9"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "fnv"
|
name = "fnv"
|
||||||
version = "1.0.7"
|
version = "1.0.7"
|
||||||
|
@ -432,12 +432,12 @@ dependencies = [
|
||||||
"chrono",
|
"chrono",
|
||||||
"crypto-mac 0.10.0",
|
"crypto-mac 0.10.0",
|
||||||
"err-derive",
|
"err-derive",
|
||||||
|
"fastcdc",
|
||||||
"futures",
|
"futures",
|
||||||
"futures-util",
|
"futures-util",
|
||||||
"garage_model",
|
"garage_model",
|
||||||
"garage_table",
|
"garage_table",
|
||||||
"garage_util",
|
"garage_util",
|
||||||
"hash-roll",
|
|
||||||
"hex",
|
"hex",
|
||||||
"hmac",
|
"hmac",
|
||||||
"http",
|
"http",
|
||||||
|
@ -662,15 +662,6 @@ dependencies = [
|
||||||
"tracing",
|
"tracing",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "hash-roll"
|
|
||||||
version = "0.3.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "a9e27803a4b526df90ed2a3f60523eeec6b5ace6ba7530f9920fbee82027fa11"
|
|
||||||
dependencies = [
|
|
||||||
"fmt-extra",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "hashbrown"
|
name = "hashbrown"
|
||||||
version = "0.9.1"
|
version = "0.9.1"
|
||||||
|
|
|
@ -22,7 +22,7 @@ bytes = "1.0"
|
||||||
chrono = "0.4"
|
chrono = "0.4"
|
||||||
crypto-mac = "0.10"
|
crypto-mac = "0.10"
|
||||||
err-derive = "0.3"
|
err-derive = "0.3"
|
||||||
hash-roll = "0.3.0"
|
fastcdc = "1.0.5"
|
||||||
hex = "0.4"
|
hex = "0.4"
|
||||||
hmac = "0.10"
|
hmac = "0.10"
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
|
|
|
@ -2,8 +2,8 @@ use std::collections::{BTreeMap, VecDeque};
|
||||||
use std::fmt::Write;
|
use std::fmt::Write;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use fastcdc::{Chunk, FastCDC};
|
||||||
use futures::stream::*;
|
use futures::stream::*;
|
||||||
use hash_roll::{ChunkIncr, fastcdc::{FastCdc, FastCdcIncr}, gear_table::GEAR_64};
|
|
||||||
use hyper::{Body, Request, Response};
|
use hyper::{Body, Request, Response};
|
||||||
use md5::{digest::generic_array::*, Digest as Md5Digest, Md5};
|
use md5::{digest::generic_array::*, Digest as Md5Digest, Md5};
|
||||||
use sha2::Sha256;
|
use sha2::Sha256;
|
||||||
|
@ -269,25 +269,33 @@ async fn put_block_meta(
|
||||||
struct BodyChunker {
|
struct BodyChunker {
|
||||||
body: Body,
|
body: Body,
|
||||||
read_all: bool,
|
read_all: bool,
|
||||||
|
min_block_size: usize,
|
||||||
|
avg_block_size: usize,
|
||||||
max_block_size: usize,
|
max_block_size: usize,
|
||||||
buf: VecDeque<u8>,
|
buf: VecDeque<u8>,
|
||||||
chunker: FastCdcIncr<'static>,
|
chunk_buf: VecDeque<Chunk>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BodyChunker {
|
impl BodyChunker {
|
||||||
fn new(body: Body, block_size: usize) -> Self {
|
fn new(body: Body, block_size: usize) -> Self {
|
||||||
|
let min_block_size = block_size / 4 * 3;
|
||||||
|
let avg_block_size = block_size;
|
||||||
let max_block_size = block_size * 2;
|
let max_block_size = block_size * 2;
|
||||||
let chunker = FastCdc::new(&GEAR_64, block_size as u64 / 2, block_size as u64, max_block_size as u64);
|
|
||||||
let chunker = (&chunker).into();
|
|
||||||
Self {
|
Self {
|
||||||
body,
|
body,
|
||||||
read_all: false,
|
read_all: false,
|
||||||
|
min_block_size,
|
||||||
|
avg_block_size,
|
||||||
max_block_size,
|
max_block_size,
|
||||||
buf: VecDeque::with_capacity(2 * max_block_size),
|
buf: VecDeque::with_capacity(2 * max_block_size),
|
||||||
chunker,
|
chunk_buf: VecDeque::new(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
async fn next(&mut self) -> Result<Option<Vec<u8>>, GarageError> {
|
async fn next(&mut self) -> Result<Option<Vec<u8>>, GarageError> {
|
||||||
|
if let Some(Chunk {length, ..}) = self.chunk_buf.pop_front() {
|
||||||
|
let block = self.buf.drain(..length).collect::<Vec<u8>>();
|
||||||
|
return Ok(Some(block));
|
||||||
|
}
|
||||||
while !self.read_all && self.buf.len() < self.max_block_size {
|
while !self.read_all && self.buf.len() < self.max_block_size {
|
||||||
if let Some(block) = self.body.next().await {
|
if let Some(block) = self.body.next().await {
|
||||||
let bytes = block?;
|
let bytes = block?;
|
||||||
|
@ -299,12 +307,14 @@ impl BodyChunker {
|
||||||
}
|
}
|
||||||
if self.buf.len() == 0 {
|
if self.buf.len() == 0 {
|
||||||
Ok(None)
|
Ok(None)
|
||||||
} else if let Some(index) = self.chunker.push(self.buf.make_contiguous()) {
|
|
||||||
let block = self.buf.drain(..index).collect::<Vec<u8>>();
|
|
||||||
Ok(Some(block))
|
|
||||||
} else {
|
} else {
|
||||||
let block = self.buf.drain(..).collect::<Vec<u8>>();
|
let mut iter = FastCDC::with_eof(self.buf.make_contiguous(), self.min_block_size, self.avg_block_size, self.max_block_size, self.read_all);
|
||||||
Ok(Some(block))
|
if let Some(Chunk {length, ..}) = iter.next() {
|
||||||
|
let block = self.buf.drain(..length).collect::<Vec<u8>>();
|
||||||
|
Ok(Some(block))
|
||||||
|
} else {
|
||||||
|
Ok(None)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue