Compare commits
2 commits
28bc967c83
...
20e6e9fa20
Author | SHA1 | Date | |
---|---|---|---|
Alex | 20e6e9fa20 | ||
Alex | bf25c95fe2 |
164
Cargo.lock
generated
164
Cargo.lock
generated
|
@ -220,10 +220,24 @@ checksum = "058ed274caafc1f60c4997b5fc07bf7dc7cca454af7c6e81edffe5f33f70dace"
|
|||
dependencies = [
|
||||
"autocfg",
|
||||
"cfg-if 0.1.10",
|
||||
"crossbeam-utils",
|
||||
"crossbeam-utils 0.7.2",
|
||||
"lazy_static",
|
||||
"maybe-uninit",
|
||||
"memoffset",
|
||||
"memoffset 0.5.6",
|
||||
"scopeguard",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crossbeam-epoch"
|
||||
version = "0.9.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d60ab4a8dba064f2fbb5aa270c28da5cf4bbd0e72dae1140a6b0353a779dbe00"
|
||||
dependencies = [
|
||||
"cfg-if 1.0.0",
|
||||
"crossbeam-utils 0.8.2",
|
||||
"lazy_static",
|
||||
"loom",
|
||||
"memoffset 0.6.1",
|
||||
"scopeguard",
|
||||
]
|
||||
|
||||
|
@ -238,6 +252,18 @@ dependencies = [
|
|||
"lazy_static",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crossbeam-utils"
|
||||
version = "0.8.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bae8f328835f8f5a6ceb6a7842a7f2d0c03692adb5c889347235d59194731fe3"
|
||||
dependencies = [
|
||||
"autocfg",
|
||||
"cfg-if 1.0.0",
|
||||
"lazy_static",
|
||||
"loom",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crypto-mac"
|
||||
version = "0.7.0"
|
||||
|
@ -503,7 +529,8 @@ dependencies = [
|
|||
"rmp-serde",
|
||||
"serde",
|
||||
"sha2",
|
||||
"sled",
|
||||
"sled 0.31.0",
|
||||
"sled 0.34.6",
|
||||
"structopt",
|
||||
"tokio",
|
||||
"toml",
|
||||
|
@ -559,7 +586,7 @@ dependencies = [
|
|||
"serde",
|
||||
"serde_bytes",
|
||||
"sha2",
|
||||
"sled",
|
||||
"sled 0.34.6",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
|
@ -584,7 +611,7 @@ dependencies = [
|
|||
"serde",
|
||||
"serde_bytes",
|
||||
"sha2",
|
||||
"sled",
|
||||
"sled 0.31.0",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
|
@ -660,7 +687,7 @@ dependencies = [
|
|||
"rmp-serde",
|
||||
"serde",
|
||||
"serde_bytes",
|
||||
"sled",
|
||||
"sled 0.34.6",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
|
@ -684,7 +711,7 @@ dependencies = [
|
|||
"rmp-serde",
|
||||
"serde",
|
||||
"serde_bytes",
|
||||
"sled",
|
||||
"sled 0.31.0",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
|
@ -708,7 +735,7 @@ dependencies = [
|
|||
"serde",
|
||||
"serde_json",
|
||||
"sha2",
|
||||
"sled",
|
||||
"sled 0.34.6",
|
||||
"tokio",
|
||||
"toml",
|
||||
"webpki",
|
||||
|
@ -734,7 +761,7 @@ dependencies = [
|
|||
"serde",
|
||||
"serde_json",
|
||||
"sha2",
|
||||
"sled",
|
||||
"sled 0.31.0",
|
||||
"tokio",
|
||||
"toml",
|
||||
"webpki",
|
||||
|
@ -764,6 +791,19 @@ version = "0.3.55"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8f5f3913fa0bfe7ee1fd8248b6b9f42a5af4b9d65ec2dd2c3c26132b950ecfc2"
|
||||
|
||||
[[package]]
|
||||
name = "generator"
|
||||
version = "0.6.24"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a9fed24fd1e18827652b4d55652899a1e9da8e54d91624dc3437a5bc3a9f9a9c"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"libc",
|
||||
"log",
|
||||
"rustversion",
|
||||
"winapi 0.3.9",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "generic-array"
|
||||
version = "0.12.3"
|
||||
|
@ -983,6 +1023,15 @@ dependencies = [
|
|||
"hashbrown",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "instant"
|
||||
version = "0.1.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "61124eeebbd69b8190558df225adf7e4caafce0d743919e5d6b19652314ec5ec"
|
||||
dependencies = [
|
||||
"cfg-if 1.0.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "iovec"
|
||||
version = "0.1.4"
|
||||
|
@ -1044,6 +1093,15 @@ dependencies = [
|
|||
"scopeguard",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "lock_api"
|
||||
version = "0.4.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "dd96ffd135b2fd7b973ac026d28085defbe8983df057ced3eb4f2130b0831312"
|
||||
dependencies = [
|
||||
"scopeguard",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "log"
|
||||
version = "0.4.13"
|
||||
|
@ -1053,6 +1111,17 @@ dependencies = [
|
|||
"cfg-if 0.1.10",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "loom"
|
||||
version = "0.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d44c73b4636e497b4917eb21c33539efa3816741a2d3ff26c6316f1b529481a4"
|
||||
dependencies = [
|
||||
"cfg-if 1.0.0",
|
||||
"generator",
|
||||
"scoped-tls",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "matches"
|
||||
version = "0.1.8"
|
||||
|
@ -1091,6 +1160,15 @@ dependencies = [
|
|||
"autocfg",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "memoffset"
|
||||
version = "0.6.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "157b4208e3059a8f9e78d559edc658e13df41410cb3ae03979c83130067fdd87"
|
||||
dependencies = [
|
||||
"autocfg",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "mio"
|
||||
version = "0.6.23"
|
||||
|
@ -1209,8 +1287,19 @@ version = "0.10.2"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d3a704eb390aafdc107b0e392f56a82b668e3a71366993b5340f5833fd62505e"
|
||||
dependencies = [
|
||||
"lock_api",
|
||||
"parking_lot_core",
|
||||
"lock_api 0.3.4",
|
||||
"parking_lot_core 0.7.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "parking_lot"
|
||||
version = "0.11.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6d7744ac029df22dca6284efe4e898991d28e3085c706c972bcd7da4a27a15eb"
|
||||
dependencies = [
|
||||
"instant",
|
||||
"lock_api 0.4.2",
|
||||
"parking_lot_core 0.8.3",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -1222,7 +1311,21 @@ dependencies = [
|
|||
"cfg-if 0.1.10",
|
||||
"cloudabi",
|
||||
"libc",
|
||||
"redox_syscall",
|
||||
"redox_syscall 0.1.57",
|
||||
"smallvec",
|
||||
"winapi 0.3.9",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "parking_lot_core"
|
||||
version = "0.8.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fa7a782938e745763fe6907fc6ba86946d72f49fe7e21de074e08128a99fb018"
|
||||
dependencies = [
|
||||
"cfg-if 1.0.0",
|
||||
"instant",
|
||||
"libc",
|
||||
"redox_syscall 0.2.5",
|
||||
"smallvec",
|
||||
"winapi 0.3.9",
|
||||
]
|
||||
|
@ -1451,6 +1554,15 @@ version = "0.1.57"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "41cc0f7e4d5d4544e8861606a285bb08d3e70712ccc7d2b84d7c0ccfaf4b05ce"
|
||||
|
||||
[[package]]
|
||||
name = "redox_syscall"
|
||||
version = "0.2.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "94341e4e44e24f6b591b59e47a8a027df12e008d73fd5672dbea9cc22f4507d9"
|
||||
dependencies = [
|
||||
"bitflags",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "regex"
|
||||
version = "1.4.3"
|
||||
|
@ -1539,6 +1651,12 @@ version = "1.0.5"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e"
|
||||
|
||||
[[package]]
|
||||
name = "scoped-tls"
|
||||
version = "1.0.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ea6a9290e3c9cf0f18145ef7ffa62d68ee0bf5fcd651017e586dc7fd5da448c2"
|
||||
|
||||
[[package]]
|
||||
name = "scopeguard"
|
||||
version = "1.1.0"
|
||||
|
@ -1635,13 +1753,29 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
|||
checksum = "8fb6824dde66ad33bf20c6e8476f5b82b871bc8bc3c129a10ea2f7dae5060fa3"
|
||||
dependencies = [
|
||||
"crc32fast",
|
||||
"crossbeam-epoch",
|
||||
"crossbeam-utils",
|
||||
"crossbeam-epoch 0.8.2",
|
||||
"crossbeam-utils 0.7.2",
|
||||
"fs2",
|
||||
"fxhash",
|
||||
"libc",
|
||||
"log",
|
||||
"parking_lot",
|
||||
"parking_lot 0.10.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "sled"
|
||||
version = "0.34.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1d0132f3e393bcb7390c60bb45769498cf4550bcb7a21d7f95c02b69f6362cdc"
|
||||
dependencies = [
|
||||
"crc32fast",
|
||||
"crossbeam-epoch 0.9.2",
|
||||
"crossbeam-utils 0.8.2",
|
||||
"fs2",
|
||||
"fxhash",
|
||||
"libc",
|
||||
"log",
|
||||
"parking_lot 0.11.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
|
|
@ -28,7 +28,8 @@ sha2 = "0.8"
|
|||
log = "0.4"
|
||||
pretty_env_logger = "0.4"
|
||||
|
||||
sled = "0.31"
|
||||
sled = "0.34"
|
||||
old_sled = { package = "sled", version = "0.31" }
|
||||
|
||||
structopt = { version = "0.3", default-features = false }
|
||||
toml = "0.5"
|
||||
|
|
|
@ -40,7 +40,28 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
|
|||
info!("Opening database...");
|
||||
let mut db_path = config.metadata_dir.clone();
|
||||
db_path.push("db");
|
||||
let db = sled::open(db_path).expect("Unable to open DB");
|
||||
let db = match sled::open(&db_path) {
|
||||
Ok(db) => db,
|
||||
Err(e) => {
|
||||
warn!("Old DB could not be openned ({}), attempting migration.", e);
|
||||
let old = old_sled::open(&db_path).expect("Unable to open old DB for migration");
|
||||
let mut new_path = config.metadata_dir.clone();
|
||||
new_path.push("db2");
|
||||
let new = sled::open(&new_path).expect("Unable to open new DB for migration");
|
||||
new.import(old.export());
|
||||
if old.checksum().expect("unable to compute old db checksum")
|
||||
!= new.checksum().expect("unable to compute new db checksum")
|
||||
{
|
||||
panic!("db checksums don't match after migration");
|
||||
}
|
||||
drop(new);
|
||||
drop(old);
|
||||
std::fs::remove_dir_all(&db_path).expect("Cannot remove old DB folder");
|
||||
std::fs::rename(new_path, &db_path)
|
||||
.expect("Cannot move new DB folder to correct place");
|
||||
sled::open(db_path).expect("Unable to open new DB after migration")
|
||||
}
|
||||
};
|
||||
|
||||
info!("Initialize RPC server...");
|
||||
let mut rpc_server = RpcServer::new(config.rpc_bind_addr.clone(), config.rpc_tls.clone());
|
||||
|
|
|
@ -25,7 +25,7 @@ sha2 = "0.8"
|
|||
arc-swap = "0.4"
|
||||
log = "0.4"
|
||||
|
||||
sled = "0.31"
|
||||
sled = "0.34"
|
||||
|
||||
rmp-serde = "0.14.3"
|
||||
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
|
||||
|
|
|
@ -1,10 +1,8 @@
|
|||
use async_trait::async_trait;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::sync::Arc;
|
||||
|
||||
use garage_util::background::*;
|
||||
use garage_util::data::*;
|
||||
use garage_util::error::Error;
|
||||
|
||||
use garage_table::*;
|
||||
|
||||
|
@ -42,24 +40,26 @@ pub struct BlockRefTable {
|
|||
pub block_manager: Arc<BlockManager>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl TableSchema for BlockRefTable {
|
||||
type P = Hash;
|
||||
type S = UUID;
|
||||
type E = BlockRef;
|
||||
type Filter = DeletedFilter;
|
||||
|
||||
async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) -> Result<(), Error> {
|
||||
fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) {
|
||||
let block = &old.as_ref().or(new.as_ref()).unwrap().block;
|
||||
let was_before = old.as_ref().map(|x| !x.deleted).unwrap_or(false);
|
||||
let is_after = new.as_ref().map(|x| !x.deleted).unwrap_or(false);
|
||||
if is_after && !was_before {
|
||||
self.block_manager.block_incref(block)?;
|
||||
if let Err(e) = self.block_manager.block_incref(block) {
|
||||
warn!("block_incref failed for block {:?}: {}", block, e);
|
||||
}
|
||||
}
|
||||
if was_before && !is_after {
|
||||
self.block_manager.block_decref(block)?;
|
||||
if let Err(e) = self.block_manager.block_decref(block) {
|
||||
warn!("block_decref failed for block {:?}: {}", block, e);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
|
||||
|
|
|
@ -1,11 +1,8 @@
|
|||
use async_trait::async_trait;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use garage_table::crdt::CRDT;
|
||||
use garage_table::*;
|
||||
|
||||
use garage_util::error::Error;
|
||||
|
||||
use crate::key_table::PermissionSet;
|
||||
|
||||
// We import the same file but in its version 0.1.0.
|
||||
|
@ -100,17 +97,12 @@ impl Entry<EmptyKey, String> for Bucket {
|
|||
|
||||
pub struct BucketTable;
|
||||
|
||||
#[async_trait]
|
||||
impl TableSchema for BucketTable {
|
||||
type P = EmptyKey;
|
||||
type S = String;
|
||||
type E = Bucket;
|
||||
type Filter = DeletedFilter;
|
||||
|
||||
async fn updated(&self, _old: Option<Self::E>, _new: Option<Self::E>) -> Result<(), Error> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
|
||||
filter.apply(entry.is_deleted())
|
||||
}
|
||||
|
|
|
@ -1,11 +1,8 @@
|
|||
use async_trait::async_trait;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use garage_table::crdt::CRDT;
|
||||
use garage_table::*;
|
||||
|
||||
use garage_util::error::Error;
|
||||
|
||||
use model010::key_table as prev;
|
||||
|
||||
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
|
||||
|
@ -92,17 +89,12 @@ impl Entry<EmptyKey, String> for Key {
|
|||
|
||||
pub struct KeyTable;
|
||||
|
||||
#[async_trait]
|
||||
impl TableSchema for KeyTable {
|
||||
type P = EmptyKey;
|
||||
type S = String;
|
||||
type E = Key;
|
||||
type Filter = DeletedFilter;
|
||||
|
||||
async fn updated(&self, _old: Option<Self::E>, _new: Option<Self::E>) -> Result<(), Error> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
|
||||
filter.apply(entry.deleted.get())
|
||||
}
|
||||
|
|
|
@ -1,11 +1,9 @@
|
|||
use async_trait::async_trait;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::BTreeMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use garage_util::background::BackgroundRunner;
|
||||
use garage_util::data::*;
|
||||
use garage_util::error::Error;
|
||||
|
||||
use garage_table::table_sharded::*;
|
||||
use garage_table::*;
|
||||
|
@ -191,41 +189,42 @@ pub struct ObjectTable {
|
|||
pub version_table: Arc<Table<VersionTable, TableShardedReplication>>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl TableSchema for ObjectTable {
|
||||
type P = String;
|
||||
type S = String;
|
||||
type E = Object;
|
||||
type Filter = DeletedFilter;
|
||||
|
||||
async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) -> Result<(), Error> {
|
||||
fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) {
|
||||
let version_table = self.version_table.clone();
|
||||
if let (Some(old_v), Some(new_v)) = (old, new) {
|
||||
// Propagate deletion of old versions
|
||||
for v in old_v.versions.iter() {
|
||||
let newly_deleted = match new_v
|
||||
.versions
|
||||
.binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key()))
|
||||
{
|
||||
Err(_) => true,
|
||||
Ok(i) => {
|
||||
new_v.versions[i].state == ObjectVersionState::Aborted
|
||||
&& v.state != ObjectVersionState::Aborted
|
||||
self.background.spawn(async move {
|
||||
if let (Some(old_v), Some(new_v)) = (old, new) {
|
||||
// Propagate deletion of old versions
|
||||
for v in old_v.versions.iter() {
|
||||
let newly_deleted = match new_v
|
||||
.versions
|
||||
.binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key()))
|
||||
{
|
||||
Err(_) => true,
|
||||
Ok(i) => {
|
||||
new_v.versions[i].state == ObjectVersionState::Aborted
|
||||
&& v.state != ObjectVersionState::Aborted
|
||||
}
|
||||
};
|
||||
if newly_deleted {
|
||||
let deleted_version = Version::new(
|
||||
v.uuid,
|
||||
old_v.bucket.clone(),
|
||||
old_v.key.clone(),
|
||||
true,
|
||||
vec![],
|
||||
);
|
||||
version_table.insert(&deleted_version).await?;
|
||||
}
|
||||
};
|
||||
if newly_deleted {
|
||||
let deleted_version = Version::new(
|
||||
v.uuid,
|
||||
old_v.bucket.clone(),
|
||||
old_v.key.clone(),
|
||||
true,
|
||||
vec![],
|
||||
);
|
||||
version_table.insert(&deleted_version).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
|
||||
|
|
|
@ -1,10 +1,8 @@
|
|||
use async_trait::async_trait;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::sync::Arc;
|
||||
|
||||
use garage_util::background::BackgroundRunner;
|
||||
use garage_util::data::*;
|
||||
use garage_util::error::Error;
|
||||
|
||||
use garage_table::table_sharded::*;
|
||||
use garage_table::*;
|
||||
|
@ -112,31 +110,32 @@ pub struct VersionTable {
|
|||
pub block_ref_table: Arc<Table<BlockRefTable, TableShardedReplication>>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl TableSchema for VersionTable {
|
||||
type P = Hash;
|
||||
type S = EmptyKey;
|
||||
type E = Version;
|
||||
type Filter = DeletedFilter;
|
||||
|
||||
async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) -> Result<(), Error> {
|
||||
fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) {
|
||||
let block_ref_table = self.block_ref_table.clone();
|
||||
if let (Some(old_v), Some(new_v)) = (old, new) {
|
||||
// Propagate deletion of version blocks
|
||||
if new_v.deleted && !old_v.deleted {
|
||||
let deleted_block_refs = old_v
|
||||
.blocks
|
||||
.iter()
|
||||
.map(|vb| BlockRef {
|
||||
block: vb.hash,
|
||||
version: old_v.uuid,
|
||||
deleted: true,
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
block_ref_table.insert_many(&deleted_block_refs[..]).await?;
|
||||
self.background.spawn(async move {
|
||||
if let (Some(old_v), Some(new_v)) = (old, new) {
|
||||
// Propagate deletion of version blocks
|
||||
if new_v.deleted && !old_v.deleted {
|
||||
let deleted_block_refs = old_v
|
||||
.blocks
|
||||
.iter()
|
||||
.map(|vb| BlockRef {
|
||||
block: vb.hash,
|
||||
version: old_v.uuid,
|
||||
deleted: true,
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
block_ref_table.insert_many(&deleted_block_refs[..]).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
|
||||
|
|
|
@ -310,7 +310,9 @@ impl RpcHttpClient {
|
|||
ClientMethod::HTTPS(client) => client.request(req).fuse(),
|
||||
};
|
||||
|
||||
trace!("({}) Acquiring request_limiter slot...", path);
|
||||
let slot = self.request_limiter.acquire().await;
|
||||
trace!("({}) Got slot, doing request to {}...", path, to_addr);
|
||||
let resp = tokio::time::timeout(timeout, resp_fut)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
|
@ -330,6 +332,7 @@ impl RpcHttpClient {
|
|||
})?;
|
||||
|
||||
let status = resp.status();
|
||||
trace!("({}) Request returned, got status {}", path, status);
|
||||
let body = hyper::body::to_bytes(resp.into_body()).await?;
|
||||
drop(slot);
|
||||
|
||||
|
|
|
@ -48,6 +48,12 @@ where
|
|||
let begin_time = Instant::now();
|
||||
let whole_body = hyper::body::to_bytes(req.into_body()).await?;
|
||||
let msg = rmp_serde::decode::from_read::<_, M>(whole_body.into_buf())?;
|
||||
|
||||
trace!(
|
||||
"Request message: {}",
|
||||
serde_json::to_string(&msg).unwrap_or("<json error>".into())
|
||||
);
|
||||
|
||||
match handler(msg, sockaddr).await {
|
||||
Ok(resp) => {
|
||||
let resp_bytes = rmp_to_vec_all_named::<Result<M, String>>(&Ok(resp))?;
|
||||
|
@ -112,7 +118,8 @@ impl RpcServer {
|
|||
return Ok(bad_request);
|
||||
}
|
||||
|
||||
let path = &req.uri().path()[1..];
|
||||
let path = &req.uri().path()[1..].to_string();
|
||||
|
||||
let handler = match self.handlers.get(path) {
|
||||
Some(h) => h,
|
||||
None => {
|
||||
|
@ -122,6 +129,8 @@ impl RpcServer {
|
|||
}
|
||||
};
|
||||
|
||||
trace!("({}) Handling request", path);
|
||||
|
||||
let resp_waiter = tokio::spawn(handler(req, addr));
|
||||
match resp_waiter.await {
|
||||
Err(err) => {
|
||||
|
@ -131,11 +140,15 @@ impl RpcServer {
|
|||
Ok(ise)
|
||||
}
|
||||
Ok(Err(err)) => {
|
||||
trace!("({}) Request handler failed: {}", path, err);
|
||||
let mut bad_request = Response::new(Body::from(format!("{}", err)));
|
||||
*bad_request.status_mut() = StatusCode::BAD_REQUEST;
|
||||
Ok(bad_request)
|
||||
}
|
||||
Ok(Ok(resp)) => Ok(resp),
|
||||
Ok(Ok(resp)) => {
|
||||
trace!("({}) Request handler succeeded", path);
|
||||
Ok(resp)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -23,7 +23,7 @@ arc-swap = "0.4"
|
|||
log = "0.4"
|
||||
hexdump = "0.1"
|
||||
|
||||
sled = "0.31"
|
||||
sled = "0.34"
|
||||
|
||||
rmp-serde = "0.14.3"
|
||||
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
|
||||
|
|
|
@ -1,8 +1,6 @@
|
|||
use async_trait::async_trait;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use garage_util::data::*;
|
||||
use garage_util::error::Error;
|
||||
|
||||
pub trait PartitionKey {
|
||||
fn hash(&self) -> Hash;
|
||||
|
@ -45,7 +43,6 @@ pub trait Entry<P: PartitionKey, S: SortKey>:
|
|||
fn merge(&mut self, other: &Self);
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait TableSchema: Send + Sync {
|
||||
type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
|
||||
type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
|
||||
|
@ -58,7 +55,12 @@ pub trait TableSchema: Send + Sync {
|
|||
None
|
||||
}
|
||||
|
||||
async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) -> Result<(), Error>;
|
||||
// Updated triggers some stuff downstream, but it is not supposed to block or fail,
|
||||
// as the update itself is an unchangeable fact that will never go back
|
||||
// due to CRDT logic. Typically errors in propagation of info should be logged
|
||||
// to stderr.
|
||||
fn updated(&self, _old: Option<Self::E>, _new: Option<Self::E>) {}
|
||||
|
||||
fn matches_filter(_entry: &Self::E, _filter: &Self::Filter) -> bool {
|
||||
true
|
||||
}
|
||||
|
|
|
@ -394,7 +394,7 @@ where
|
|||
Some(prev_bytes) => {
|
||||
let old_entry = self
|
||||
.decode_entry(&prev_bytes)
|
||||
.map_err(sled::ConflictableTransactionError::Abort)?;
|
||||
.map_err(sled::transaction::ConflictableTransactionError::Abort)?;
|
||||
let mut new_entry = old_entry.clone();
|
||||
new_entry.merge(&update);
|
||||
(Some(old_entry), new_entry)
|
||||
|
@ -404,7 +404,7 @@ where
|
|||
|
||||
let new_bytes = rmp_to_vec_all_named(&new_entry)
|
||||
.map_err(Error::RMPEncode)
|
||||
.map_err(sled::ConflictableTransactionError::Abort)?;
|
||||
.map_err(sled::transaction::ConflictableTransactionError::Abort)?;
|
||||
db.insert(tree_key.clone(), new_bytes)?;
|
||||
Ok((old_entry, new_entry))
|
||||
})?;
|
||||
|
@ -414,7 +414,7 @@ where
|
|||
epidemic_propagate.push(new_entry.clone());
|
||||
}
|
||||
|
||||
self.instance.updated(old_entry, Some(new_entry)).await?;
|
||||
self.instance.updated(old_entry, Some(new_entry));
|
||||
syncer.invalidate(&tree_key[..]);
|
||||
}
|
||||
}
|
||||
|
@ -429,11 +429,7 @@ where
|
|||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn delete_if_equal(
|
||||
self: &Arc<Self>,
|
||||
k: &[u8],
|
||||
v: &[u8],
|
||||
) -> Result<bool, Error> {
|
||||
pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> {
|
||||
let removed = self.store.transaction(|txn| {
|
||||
if let Some(cur_v) = self.store.get(k)? {
|
||||
if cur_v == v {
|
||||
|
@ -445,7 +441,7 @@ where
|
|||
})?;
|
||||
if removed {
|
||||
let old_entry = self.decode_entry(v)?;
|
||||
self.instance.updated(Some(old_entry), None).await?;
|
||||
self.instance.updated(Some(old_entry), None);
|
||||
self.syncer.load_full().unwrap().invalidate(k);
|
||||
}
|
||||
Ok(removed)
|
||||
|
|
|
@ -348,14 +348,8 @@ where
|
|||
}
|
||||
|
||||
// All remote nodes have written those items, now we can delete them locally
|
||||
for was_removed in join_all(
|
||||
items
|
||||
.iter()
|
||||
.map(|(k, v)| self.table.delete_if_equal(&k[..], &v[..])),
|
||||
)
|
||||
.await
|
||||
{
|
||||
was_removed?;
|
||||
for (k, v) in items.iter() {
|
||||
self.table.delete_if_equal(&k[..], &v[..])?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
@ -391,6 +385,7 @@ where
|
|||
must_exit: &mut watch::Receiver<bool>,
|
||||
) -> Result<RangeChecksum, Error> {
|
||||
assert!(range.level != 0);
|
||||
trace!("Call range_checksum {:?}", range);
|
||||
|
||||
if range.level == 1 {
|
||||
let mut children = vec![];
|
||||
|
@ -406,6 +401,7 @@ where
|
|||
.iter()
|
||||
.all(|x| *x == 0u8)
|
||||
{
|
||||
trace!("range_checksum {:?} returning {} items", range, children.len());
|
||||
return Ok(RangeChecksum {
|
||||
bounds: range.clone(),
|
||||
children,
|
||||
|
@ -420,6 +416,7 @@ where
|
|||
};
|
||||
children.push((item_range, blake2sum(&value[..])));
|
||||
}
|
||||
trace!("range_checksum {:?} returning {} items", range, children.len());
|
||||
Ok(RangeChecksum {
|
||||
bounds: range.clone(),
|
||||
children,
|
||||
|
@ -445,6 +442,7 @@ where
|
|||
}
|
||||
|
||||
if sub_ck.found_limit.is_none() || sub_ck.hash.is_none() {
|
||||
trace!("range_checksum {:?} returning {} items", range, children.len());
|
||||
return Ok(RangeChecksum {
|
||||
bounds: range.clone(),
|
||||
children,
|
||||
|
@ -459,6 +457,7 @@ where
|
|||
.iter()
|
||||
.all(|x| *x == 0u8)
|
||||
{
|
||||
trace!("range_checksum {:?} returning {} items", range, children.len());
|
||||
return Ok(RangeChecksum {
|
||||
bounds: range.clone(),
|
||||
children,
|
||||
|
@ -469,6 +468,7 @@ where
|
|||
|
||||
sub_range.begin = found_limit;
|
||||
}
|
||||
trace!("range_checksum {:?} exiting due to must_exit", range);
|
||||
Err(Error::Message(format!("Exiting.")))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,7 +21,7 @@ err-derive = "0.2.3"
|
|||
log = "0.4"
|
||||
fasthash = "0.4"
|
||||
|
||||
sled = "0.31"
|
||||
sled = "0.34"
|
||||
|
||||
toml = "0.5"
|
||||
rmp-serde = "0.14.3"
|
||||
|
|
|
@ -73,11 +73,11 @@ pub enum Error {
|
|||
Message(String),
|
||||
}
|
||||
|
||||
impl From<sled::TransactionError<Error>> for Error {
|
||||
fn from(e: sled::TransactionError<Error>) -> Error {
|
||||
impl From<sled::transaction::TransactionError<Error>> for Error {
|
||||
fn from(e: sled::transaction::TransactionError<Error>) -> Error {
|
||||
match e {
|
||||
sled::TransactionError::Abort(x) => x,
|
||||
sled::TransactionError::Storage(x) => Error::Sled(x),
|
||||
sled::transaction::TransactionError::Abort(x) => x,
|
||||
sled::transaction::TransactionError::Storage(x) => Error::Sled(x),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue