Move block manager to separate module

This commit is contained in:
Alex 2022-03-15 12:04:12 +01:00
parent 8565f7dc31
commit c1d9854d2c
Signed by: lx
GPG key ID: 0E496D15096376BE
12 changed files with 89 additions and 41 deletions

25
Cargo.lock generated
View file

@ -883,6 +883,7 @@ dependencies = [
"form_urlencoded",
"futures",
"futures-util",
"garage_block",
"garage_model 0.7.0",
"garage_table 0.7.0",
"garage_util 0.7.0",
@ -910,6 +911,29 @@ dependencies = [
"url",
]
[[package]]
name = "garage_block"
version = "0.7.0"
dependencies = [
"async-trait",
"bytes 1.1.0",
"futures",
"futures-util",
"garage_rpc 0.7.0",
"garage_table 0.7.0",
"garage_util 0.7.0",
"hex",
"opentelemetry",
"rand 0.8.5",
"rmp-serde 0.15.5",
"serde",
"serde_bytes",
"sled",
"tokio",
"tracing",
"zstd",
]
[[package]]
name = "garage_model"
version = "0.5.1"
@ -944,6 +968,7 @@ dependencies = [
"err-derive 0.3.1",
"futures",
"futures-util",
"garage_block",
"garage_model 0.5.1",
"garage_rpc 0.7.0",
"garage_table 0.7.0",

View file

@ -16,6 +16,7 @@ path = "lib.rs"
[dependencies]
garage_model = { version = "0.7.0", path = "../model" }
garage_table = { version = "0.7.0", path = "../table" }
garage_block = { version = "0.7.0", path = "../block" }
garage_util = { version = "0.7.0", path = "../util" }
base64 = "0.13"

View file

@ -13,7 +13,7 @@ use garage_util::data::*;
use garage_util::error::Error as GarageError;
use garage_util::time::*;
use garage_model::block::INLINE_THRESHOLD;
use garage_block::manager::INLINE_THRESHOLD;
use garage_model::block_ref_table::*;
use garage_model::garage::Garage;
use garage_model::object_table::*;

38
src/block/Cargo.toml Normal file
View file

@ -0,0 +1,38 @@
[package]
name = "garage_block"
version = "0.7.0"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
description = "Block manager for the Garage object store"
repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage"
readme = "../../README.md"
[lib]
path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
garage_rpc = { version = "0.7.0", path = "../rpc" }
garage_util = { version = "0.7.0", path = "../util" }
garage_table = { version = "0.7.0", path = "../table" }
opentelemetry = "0.17"
async-trait = "0.1.7"
bytes = "1.0"
hex = "0.4"
tracing = "0.1.30"
rand = "0.8"
zstd = { version = "0.9", default-features = false }
sled = "0.34"
rmp-serde = "0.15"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_bytes = "0.11"
futures = "0.3"
futures-util = "0.3"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }

6
src/block/lib.rs Normal file
View file

@ -0,0 +1,6 @@
#[macro_use]
extern crate tracing;
pub mod manager;
mod metrics;

View file

@ -3,7 +3,6 @@ use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use zstd::stream::{decode_all as zstd_decode, Encoder};
@ -31,9 +30,7 @@ use garage_rpc::*;
use garage_table::replication::{TableReplication, TableShardedReplication};
use crate::block_metrics::*;
use crate::block_ref_table::*;
use crate::garage::Garage;
use crate::metrics::*;
/// Size under which data will be stored inlined in database instead of as files
pub const INLINE_THRESHOLD: usize = 3072;
@ -151,6 +148,8 @@ pub struct BlockManager {
pub replication: TableShardedReplication,
/// Directory in which block are stored
pub data_dir: PathBuf,
/// Zstd compression level
compression_level: Option<i32>,
mutation_lock: Mutex<BlockManagerLocked>,
@ -162,7 +161,6 @@ pub struct BlockManager {
system: Arc<System>,
endpoint: Arc<Endpoint<BlockRpc, Self>>,
pub(crate) garage: ArcSwapOption<Garage>,
metrics: BlockManagerMetrics,
}
@ -176,6 +174,7 @@ impl BlockManager {
pub fn new(
db: &sled::Db,
data_dir: PathBuf,
compression_level: Option<i32>,
replication: TableShardedReplication,
system: Arc<System>,
) -> Arc<Self> {
@ -204,6 +203,7 @@ impl BlockManager {
let block_manager = Arc::new(Self {
replication,
data_dir,
compression_level,
mutation_lock: Mutex::new(manager_locked),
rc,
resync_queue,
@ -211,7 +211,6 @@ impl BlockManager {
resync_errors,
system,
endpoint,
garage: ArcSwapOption::from(None),
metrics,
});
block_manager.endpoint.set_handler(block_manager.clone());
@ -257,14 +256,7 @@ impl BlockManager {
/// Send block to nodes that should have it
pub async fn rpc_put_block(&self, hash: Hash, data: Vec<u8>) -> Result<(), Error> {
let who = self.replication.write_nodes(&hash);
let compression_level = self
.garage
.load()
.as_ref()
.unwrap()
.config
.compression_level;
let data = DataBlock::from_buffer(data, compression_level);
let data = DataBlock::from_buffer(data, self.compression_level);
self.system
.rpc
.try_call_many(
@ -286,18 +278,10 @@ impl BlockManager {
/// to fix any mismatch between the two.
pub async fn repair_data_store(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
// 1. Repair blocks from RC table.
let garage = self.garage.load_full().unwrap();
let mut last_hash = None;
for (i, entry) in garage.block_ref_table.data.store.iter().enumerate() {
let (_k, v_bytes) = entry?;
let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(v_bytes.as_ref())?;
if Some(&block_ref.block) == last_hash.as_ref() {
continue;
}
if !block_ref.deleted.get() {
last_hash = Some(block_ref.block);
self.put_to_resync(&block_ref.block, Duration::from_secs(0))?;
}
for (i, entry) in self.rc.iter().enumerate() {
let (hash, _) = entry?;
let hash = Hash::try_from(&hash[..]).unwrap();
self.put_to_resync(&hash, Duration::from_secs(0))?;
if i & 0xFF == 0 && *must_exit.borrow() {
return Ok(());
}

View file

@ -98,8 +98,7 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
// Await for netapp RPC system to end
run_system.await?;
// Break last reference cycles so that stuff can terminate properly
garage.break_reference_cycles();
// Drop all references so that stuff can terminate properly
drop(garage);
// Await for all background tasks to end

View file

@ -16,6 +16,7 @@ path = "lib.rs"
[dependencies]
garage_rpc = { version = "0.7.0", path = "../rpc" }
garage_table = { version = "0.7.0", path = "../table" }
garage_block = { version = "0.7.0", path = "../block" }
garage_util = { version = "0.7.0", path = "../util" }
garage_model_050 = { package = "garage_model", version = "0.5.1" }

View file

@ -6,7 +6,7 @@ use garage_util::data::*;
use garage_table::crdt::Crdt;
use garage_table::*;
use crate::block::*;
use garage_block::manager::*;
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct BlockRef {

View file

@ -11,8 +11,8 @@ use garage_table::replication::ReplicationMode;
use garage_table::replication::TableFullReplication;
use garage_table::replication::TableShardedReplication;
use garage_table::*;
use garage_block::manager::*;
use crate::block::*;
use crate::block_ref_table::*;
use crate::bucket_alias_table::*;
use crate::bucket_table::*;
@ -87,7 +87,10 @@ impl Garage {
info!("Initialize block manager...");
let block_manager =
BlockManager::new(&db, config.data_dir.clone(), data_rep_param, system.clone());
BlockManager::new(&db,
config.data_dir.clone(),
config.compression_level,
data_rep_param, system.clone());
info!("Initialize block_ref_table...");
let block_ref_table = Table::new(
@ -151,17 +154,11 @@ impl Garage {
});
info!("Start block manager background thread...");
garage.block_manager.garage.swap(Some(garage.clone()));
garage.block_manager.clone().spawn_background_worker();
garage
}
/// Use this for shutdown
pub fn break_reference_cycles(&self) {
self.block_manager.garage.swap(None);
}
pub fn bucket_helper(&self) -> helper::bucket::BucketHelper {
helper::bucket::BucketHelper(self)
}

View file

@ -10,9 +10,6 @@ pub mod key_table;
pub mod object_table;
pub mod version_table;
pub mod block;
mod block_metrics;
pub mod garage;
pub mod helper;
pub mod migrate;