improve internal item counter mechanisms and implement bucket quotas #326
9 changed files with 302 additions and 89 deletions
|
@ -24,7 +24,7 @@ use garage_model::migrate::Migrate;
|
||||||
use garage_model::permission::*;
|
use garage_model::permission::*;
|
||||||
|
|
||||||
use crate::cli::*;
|
use crate::cli::*;
|
||||||
use crate::repair::Repair;
|
use crate::repair::online::OnlineRepair;
|
||||||
|
|
||||||
pub const ADMIN_RPC_PATH: &str = "garage/admin_rpc.rs/Rpc";
|
pub const ADMIN_RPC_PATH: &str = "garage/admin_rpc.rs/Rpc";
|
||||||
|
|
||||||
|
@ -619,7 +619,7 @@ impl AdminRpcHandler {
|
||||||
)))
|
)))
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
let repair = Repair {
|
let repair = OnlineRepair {
|
||||||
garage: self.garage.clone(),
|
garage: self.garage.clone(),
|
||||||
};
|
};
|
||||||
self.garage
|
self.garage
|
||||||
|
|
|
@ -33,10 +33,15 @@ pub enum Command {
|
||||||
#[structopt(name = "migrate")]
|
#[structopt(name = "migrate")]
|
||||||
Migrate(MigrateOpt),
|
Migrate(MigrateOpt),
|
||||||
|
|
||||||
/// Start repair of node data
|
/// Start repair of node data on remote node
|
||||||
#[structopt(name = "repair")]
|
#[structopt(name = "repair")]
|
||||||
Repair(RepairOpt),
|
Repair(RepairOpt),
|
||||||
|
|
||||||
|
/// Offline reparation of node data (these repairs must be run offline
|
||||||
|
/// directly on the server node)
|
||||||
|
#[structopt(name = "offline-repair")]
|
||||||
|
OfflineRepair(OfflineRepairOpt),
|
||||||
|
|
||||||
/// Gather node statistics
|
/// Gather node statistics
|
||||||
#[structopt(name = "stats")]
|
#[structopt(name = "stats")]
|
||||||
Stats(StatsOpt),
|
Stats(StatsOpt),
|
||||||
|
@ -405,6 +410,23 @@ pub enum RepairWhat {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
|
||||||
|
pub struct OfflineRepairOpt {
|
||||||
|
/// Confirm the launch of the repair operation
|
||||||
|
#[structopt(long = "yes")]
|
||||||
|
pub yes: bool,
|
||||||
|
|
||||||
|
#[structopt(subcommand)]
|
||||||
|
pub what: OfflineRepairWhat,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
|
||||||
|
pub enum OfflineRepairWhat {
|
||||||
|
/// Repair K2V item counters
|
||||||
|
#[structopt(name = "k2v_item_counters")]
|
||||||
|
K2VItemCounters,
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
|
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
|
||||||
pub struct StatsOpt {
|
pub struct StatsOpt {
|
||||||
/// Gather statistics from all nodes
|
/// Gather statistics from all nodes
|
||||||
|
|
|
@ -61,17 +61,17 @@ async fn main() {
|
||||||
pretty_env_logger::init();
|
pretty_env_logger::init();
|
||||||
sodiumoxide::init().expect("Unable to init sodiumoxide");
|
sodiumoxide::init().expect("Unable to init sodiumoxide");
|
||||||
|
|
||||||
let opt = Opt::from_args();
|
|
||||||
|
|
||||||
let res = match opt.cmd {
|
|
||||||
Command::Server => {
|
|
||||||
// Abort on panic (same behavior as in Go)
|
// Abort on panic (same behavior as in Go)
|
||||||
std::panic::set_hook(Box::new(|panic_info| {
|
std::panic::set_hook(Box::new(|panic_info| {
|
||||||
error!("{}", panic_info.to_string());
|
error!("{}", panic_info.to_string());
|
||||||
std::process::abort();
|
std::process::abort();
|
||||||
}));
|
}));
|
||||||
|
|
||||||
server::run_server(opt.config_file).await
|
let opt = Opt::from_args();
|
||||||
|
let res = match opt.cmd {
|
||||||
|
Command::Server => server::run_server(opt.config_file).await,
|
||||||
|
Command::OfflineRepair(repair_opt) => {
|
||||||
|
repair::offline::offline_repair(opt.config_file, repair_opt).await
|
||||||
}
|
}
|
||||||
Command::Node(NodeOperation::NodeId(node_id_opt)) => {
|
Command::Node(NodeOperation::NodeId(node_id_opt)) => {
|
||||||
node_id_command(opt.config_file, node_id_opt.quiet)
|
node_id_command(opt.config_file, node_id_opt.quiet)
|
||||||
|
|
2
src/garage/repair/mod.rs
Normal file
2
src/garage/repair/mod.rs
Normal file
|
@ -0,0 +1,2 @@
|
||||||
|
pub mod offline;
|
||||||
|
pub mod online;
|
52
src/garage/repair/offline.rs
Normal file
52
src/garage/repair/offline.rs
Normal file
|
@ -0,0 +1,52 @@
|
||||||
|
use std::path::PathBuf;
|
||||||
|
|
||||||
|
use tokio::sync::watch;
|
||||||
|
|
||||||
|
use garage_util::background::*;
|
||||||
|
use garage_util::config::*;
|
||||||
|
use garage_util::error::*;
|
||||||
|
|
||||||
|
use garage_model::garage::Garage;
|
||||||
|
|
||||||
|
use crate::cli::structs::*;
|
||||||
|
|
||||||
|
pub async fn offline_repair(config_file: PathBuf, opt: OfflineRepairOpt) -> Result<(), Error> {
|
||||||
|
if !opt.yes {
|
||||||
|
return Err(Error::Message(
|
||||||
|
"Please add the --yes flag to launch repair operation".into(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
info!("Loading configuration...");
|
||||||
|
let config = read_config(config_file)?;
|
||||||
|
|
||||||
|
info!("Initializing background runner...");
|
||||||
|
let (done_tx, done_rx) = watch::channel(false);
|
||||||
|
let (background, await_background_done) = BackgroundRunner::new(16, done_rx);
|
||||||
|
|
||||||
|
info!("Initializing Garage main data store...");
|
||||||
|
let garage = Garage::new(config.clone(), background)?;
|
||||||
|
|
||||||
|
info!("Launching repair operation...");
|
||||||
|
match opt.what {
|
||||||
|
OfflineRepairWhat::K2VItemCounters => {
|
||||||
|
#[cfg(feature = "k2v")]
|
||||||
|
garage
|
||||||
|
.k2v
|
||||||
|
.counter_table
|
||||||
|
.offline_recount_all(&garage.k2v.item_table)?;
|
||||||
|
#[cfg(not(feature = "k2v"))]
|
||||||
|
error!("K2V not enabled in this build.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
info!("Repair operation finished, shutting down Garage internals...");
|
||||||
|
done_tx.send(true).unwrap();
|
||||||
|
drop(garage);
|
||||||
|
|
||||||
|
await_background_done.await?;
|
||||||
|
|
||||||
|
info!("Cleaning up...");
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
|
@ -11,11 +11,11 @@ use garage_util::error::Error;
|
||||||
|
|
||||||
use crate::*;
|
use crate::*;
|
||||||
|
|
||||||
pub struct Repair {
|
pub struct OnlineRepair {
|
||||||
pub garage: Arc<Garage>,
|
pub garage: Arc<Garage>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Repair {
|
impl OnlineRepair {
|
||||||
pub async fn repair_worker(&self, opt: RepairOpt, must_exit: watch::Receiver<bool>) {
|
pub async fn repair_worker(&self, opt: RepairOpt, must_exit: watch::Receiver<bool>) {
|
||||||
if let Err(e) = self.repair_worker_aux(opt, must_exit).await {
|
if let Err(e) = self.repair_worker_aux(opt, must_exit).await {
|
||||||
warn!("Repair worker failed with error: {}", e);
|
warn!("Repair worker failed with error: {}", e);
|
|
@ -2,8 +2,6 @@ use std::path::PathBuf;
|
||||||
|
|
||||||
use tokio::sync::watch;
|
use tokio::sync::watch;
|
||||||
|
|
||||||
use garage_db as db;
|
|
||||||
|
|
||||||
use garage_util::background::*;
|
use garage_util::background::*;
|
||||||
use garage_util::config::*;
|
use garage_util::config::*;
|
||||||
use garage_util::error::Error;
|
use garage_util::error::Error;
|
||||||
|
@ -29,62 +27,14 @@ async fn wait_from(mut chan: watch::Receiver<bool>) {
|
||||||
|
|
||||||
pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
|
pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
|
||||||
info!("Loading configuration...");
|
info!("Loading configuration...");
|
||||||
let config = read_config(config_file).expect("Unable to read config file");
|
let config = read_config(config_file)?;
|
||||||
|
|
||||||
info!("Opening database...");
|
|
||||||
let mut db_path = config.metadata_dir.clone();
|
|
||||||
std::fs::create_dir_all(&db_path).expect("Unable to create Garage meta data directory");
|
|
||||||
let db = match config.db_engine.as_str() {
|
|
||||||
"sled" => {
|
|
||||||
db_path.push("db");
|
|
||||||
info!("Opening Sled database at: {}", db_path.display());
|
|
||||||
let db = db::sled_adapter::sled::Config::default()
|
|
||||||
.path(&db_path)
|
|
||||||
.cache_capacity(config.sled_cache_capacity)
|
|
||||||
.flush_every_ms(Some(config.sled_flush_every_ms))
|
|
||||||
.open()
|
|
||||||
.expect("Unable to open sled DB");
|
|
||||||
db::sled_adapter::SledDb::init(db)
|
|
||||||
}
|
|
||||||
"sqlite" | "sqlite3" | "rusqlite" => {
|
|
||||||
db_path.push("db.sqlite");
|
|
||||||
info!("Opening Sqlite database at: {}", db_path.display());
|
|
||||||
let db = db::sqlite_adapter::rusqlite::Connection::open(db_path)
|
|
||||||
.expect("Unable to open sqlite DB");
|
|
||||||
db::sqlite_adapter::SqliteDb::init(db)
|
|
||||||
}
|
|
||||||
"lmdb" | "heed" => {
|
|
||||||
db_path.push("db.lmdb");
|
|
||||||
info!("Opening LMDB database at: {}", db_path.display());
|
|
||||||
std::fs::create_dir_all(&db_path).expect("Unable to create LMDB data directory");
|
|
||||||
let map_size = if u32::MAX as usize == usize::MAX {
|
|
||||||
warn!("LMDB is not recommended on 32-bit systems, database size will be limited");
|
|
||||||
1usize << 30 // 1GB for 32-bit systems
|
|
||||||
} else {
|
|
||||||
1usize << 40 // 1TB for 64-bit systems
|
|
||||||
};
|
|
||||||
|
|
||||||
let db = db::lmdb_adapter::heed::EnvOpenOptions::new()
|
|
||||||
.max_dbs(100)
|
|
||||||
.map_size(map_size)
|
|
||||||
.open(&db_path)
|
|
||||||
.expect("Unable to open LMDB DB");
|
|
||||||
db::lmdb_adapter::LmdbDb::init(db)
|
|
||||||
}
|
|
||||||
e => {
|
|
||||||
return Err(Error::Message(format!(
|
|
||||||
"Unsupported DB engine: {} (options: sled, sqlite, lmdb)",
|
|
||||||
e
|
|
||||||
)));
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
info!("Initializing background runner...");
|
info!("Initializing background runner...");
|
||||||
let watch_cancel = netapp::util::watch_ctrl_c();
|
let watch_cancel = netapp::util::watch_ctrl_c();
|
||||||
let (background, await_background_done) = BackgroundRunner::new(16, watch_cancel.clone());
|
let (background, await_background_done) = BackgroundRunner::new(16, watch_cancel.clone());
|
||||||
|
|
||||||
info!("Initializing Garage main data store...");
|
info!("Initializing Garage main data store...");
|
||||||
let garage = Garage::new(config.clone(), db, background);
|
let garage = Garage::new(config.clone(), background)?;
|
||||||
|
|
||||||
info!("Initialize tracing...");
|
info!("Initialize tracing...");
|
||||||
if let Some(export_to) = config.admin.trace_sink {
|
if let Some(export_to) = config.admin.trace_sink {
|
||||||
|
@ -94,6 +44,7 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
|
||||||
info!("Initialize Admin API server and metrics collector...");
|
info!("Initialize Admin API server and metrics collector...");
|
||||||
let admin_server = AdminApiServer::new(garage.clone());
|
let admin_server = AdminApiServer::new(garage.clone());
|
||||||
|
|
||||||
|
info!("Launching internal Garage cluster communications...");
|
||||||
let run_system = tokio::spawn(garage.system.clone().run(watch_cancel.clone()));
|
let run_system = tokio::spawn(garage.system.clone().run(watch_cancel.clone()));
|
||||||
|
|
||||||
info!("Create admin RPC handler...");
|
info!("Create admin RPC handler...");
|
||||||
|
|
|
@ -6,6 +6,7 @@ use garage_db as db;
|
||||||
|
|
||||||
use garage_util::background::*;
|
use garage_util::background::*;
|
||||||
use garage_util::config::*;
|
use garage_util::config::*;
|
||||||
|
use garage_util::error::Error;
|
||||||
|
|
||||||
use garage_rpc::system::System;
|
use garage_rpc::system::System;
|
||||||
|
|
||||||
|
@ -73,7 +74,56 @@ pub struct GarageK2V {
|
||||||
|
|
||||||
impl Garage {
|
impl Garage {
|
||||||
/// Create and run garage
|
/// Create and run garage
|
||||||
pub fn new(config: Config, db: db::Db, background: Arc<BackgroundRunner>) -> Arc<Self> {
|
pub fn new(config: Config, background: Arc<BackgroundRunner>) -> Result<Arc<Self>, Error> {
|
||||||
|
info!("Opening database...");
|
||||||
|
let mut db_path = config.metadata_dir.clone();
|
||||||
|
std::fs::create_dir_all(&db_path).expect("Unable to create Garage meta data directory");
|
||||||
|
let db = match config.db_engine.as_str() {
|
||||||
|
"sled" => {
|
||||||
|
db_path.push("db");
|
||||||
|
info!("Opening Sled database at: {}", db_path.display());
|
||||||
|
let db = db::sled_adapter::sled::Config::default()
|
||||||
|
.path(&db_path)
|
||||||
|
.cache_capacity(config.sled_cache_capacity)
|
||||||
|
.flush_every_ms(Some(config.sled_flush_every_ms))
|
||||||
|
.open()
|
||||||
|
.expect("Unable to open sled DB");
|
||||||
|
db::sled_adapter::SledDb::init(db)
|
||||||
|
}
|
||||||
|
"sqlite" | "sqlite3" | "rusqlite" => {
|
||||||
|
db_path.push("db.sqlite");
|
||||||
|
info!("Opening Sqlite database at: {}", db_path.display());
|
||||||
|
let db = db::sqlite_adapter::rusqlite::Connection::open(db_path)
|
||||||
|
.expect("Unable to open sqlite DB");
|
||||||
|
db::sqlite_adapter::SqliteDb::init(db)
|
||||||
|
}
|
||||||
|
"lmdb" | "heed" => {
|
||||||
|
db_path.push("db.lmdb");
|
||||||
|
info!("Opening LMDB database at: {}", db_path.display());
|
||||||
|
std::fs::create_dir_all(&db_path).expect("Unable to create LMDB data directory");
|
||||||
|
let map_size =
|
||||||
|
if u32::MAX as usize == usize::MAX {
|
||||||
|
warn!("LMDB is not recommended on 32-bit systems, database size will be limited");
|
||||||
|
1usize << 30 // 1GB for 32-bit systems
|
||||||
|
} else {
|
||||||
|
1usize << 40 // 1TB for 64-bit systems
|
||||||
|
};
|
||||||
|
|
||||||
|
let db = db::lmdb_adapter::heed::EnvOpenOptions::new()
|
||||||
|
.max_dbs(100)
|
||||||
|
.map_size(map_size)
|
||||||
|
.open(&db_path)
|
||||||
|
.expect("Unable to open LMDB DB");
|
||||||
|
db::lmdb_adapter::LmdbDb::init(db)
|
||||||
|
}
|
||||||
|
e => {
|
||||||
|
return Err(Error::Message(format!(
|
||||||
|
"Unsupported DB engine: {} (options: sled, sqlite, lmdb)",
|
||||||
|
e
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
let network_key = NetworkKey::from_slice(
|
let network_key = NetworkKey::from_slice(
|
||||||
&hex::decode(&config.rpc_secret).expect("Invalid RPC secret key")[..],
|
&hex::decode(&config.rpc_secret).expect("Invalid RPC secret key")[..],
|
||||||
)
|
)
|
||||||
|
@ -171,9 +221,8 @@ impl Garage {
|
||||||
#[cfg(feature = "k2v")]
|
#[cfg(feature = "k2v")]
|
||||||
let k2v = GarageK2V::new(system.clone(), &db, meta_rep_param);
|
let k2v = GarageK2V::new(system.clone(), &db, meta_rep_param);
|
||||||
|
|
||||||
info!("Initialize Garage...");
|
// -- done --
|
||||||
|
Ok(Arc::new(Self {
|
||||||
Arc::new(Self {
|
|
||||||
config,
|
config,
|
||||||
db,
|
db,
|
||||||
background,
|
background,
|
||||||
|
@ -187,7 +236,7 @@ impl Garage {
|
||||||
block_ref_table,
|
block_ref_table,
|
||||||
#[cfg(feature = "k2v")]
|
#[cfg(feature = "k2v")]
|
||||||
k2v,
|
k2v,
|
||||||
})
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn bucket_helper(&self) -> helper::bucket::BucketHelper {
|
pub fn bucket_helper(&self) -> helper::bucket::BucketHelper {
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
use core::ops::Bound;
|
||||||
use std::collections::{hash_map, BTreeMap, HashMap};
|
use std::collections::{hash_map, BTreeMap, HashMap};
|
||||||
use std::marker::PhantomData;
|
use std::marker::PhantomData;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
@ -12,9 +13,10 @@ use garage_rpc::ring::Ring;
|
||||||
use garage_rpc::system::System;
|
use garage_rpc::system::System;
|
||||||
use garage_util::data::*;
|
use garage_util::data::*;
|
||||||
use garage_util::error::*;
|
use garage_util::error::*;
|
||||||
|
use garage_util::time::*;
|
||||||
|
|
||||||
use garage_table::crdt::*;
|
use garage_table::crdt::*;
|
||||||
use garage_table::replication::TableShardedReplication;
|
use garage_table::replication::*;
|
||||||
use garage_table::*;
|
use garage_table::*;
|
||||||
|
|
||||||
pub trait CountedItem: Clone + PartialEq + Send + Sync + 'static {
|
pub trait CountedItem: Clone + PartialEq + Send + Sync + 'static {
|
||||||
|
@ -139,7 +141,7 @@ impl<T: CountedItem> TableSchema for CounterTable<T> {
|
||||||
pub struct IndexCounter<T: CountedItem> {
|
pub struct IndexCounter<T: CountedItem> {
|
||||||
this_node: Uuid,
|
this_node: Uuid,
|
||||||
local_counter: db::Tree,
|
local_counter: db::Tree,
|
||||||
propagate_tx: mpsc::UnboundedSender<(T::CP, T::CS, LocalCounterEntry)>,
|
propagate_tx: mpsc::UnboundedSender<(T::CP, T::CS, LocalCounterEntry<T>)>,
|
||||||
pub table: Arc<Table<CounterTable<T>, TableShardedReplication>>,
|
pub table: Arc<Table<CounterTable<T>, TableShardedReplication>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -203,17 +205,22 @@ impl<T: CountedItem> IndexCounter<T> {
|
||||||
let tree_key = self.table.data.tree_key(pk, sk);
|
let tree_key = self.table.data.tree_key(pk, sk);
|
||||||
|
|
||||||
let mut entry = match tx.get(&self.local_counter, &tree_key[..])? {
|
let mut entry = match tx.get(&self.local_counter, &tree_key[..])? {
|
||||||
Some(old_bytes) => rmp_serde::decode::from_read_ref::<_, LocalCounterEntry>(&old_bytes)
|
Some(old_bytes) => {
|
||||||
|
rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>(&old_bytes)
|
||||||
.map_err(Error::RmpDecode)
|
.map_err(Error::RmpDecode)
|
||||||
.map_err(db::TxError::Abort)?,
|
.map_err(db::TxError::Abort)?
|
||||||
|
}
|
||||||
None => LocalCounterEntry {
|
None => LocalCounterEntry {
|
||||||
|
pk: pk.clone(),
|
||||||
|
sk: sk.clone(),
|
||||||
values: BTreeMap::new(),
|
values: BTreeMap::new(),
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let now = now_msec();
|
||||||
for (s, inc) in counts.iter() {
|
for (s, inc) in counts.iter() {
|
||||||
let mut ent = entry.values.entry(s.to_string()).or_insert((0, 0));
|
let mut ent = entry.values.entry(s.to_string()).or_insert((0, 0));
|
||||||
ent.0 += 1;
|
ent.0 = std::cmp::max(ent.0 + 1, now);
|
||||||
ent.1 += *inc;
|
ent.1 += *inc;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -234,7 +241,7 @@ impl<T: CountedItem> IndexCounter<T> {
|
||||||
|
|
||||||
async fn propagate_loop(
|
async fn propagate_loop(
|
||||||
self: Arc<Self>,
|
self: Arc<Self>,
|
||||||
mut propagate_rx: mpsc::UnboundedReceiver<(T::CP, T::CS, LocalCounterEntry)>,
|
mut propagate_rx: mpsc::UnboundedReceiver<(T::CP, T::CS, LocalCounterEntry<T>)>,
|
||||||
must_exit: watch::Receiver<bool>,
|
must_exit: watch::Receiver<bool>,
|
||||||
) {
|
) {
|
||||||
// This loop batches updates to counters to be sent all at once.
|
// This loop batches updates to counters to be sent all at once.
|
||||||
|
@ -257,7 +264,7 @@ impl<T: CountedItem> IndexCounter<T> {
|
||||||
|
|
||||||
if let Some((pk, sk, counters)) = ent {
|
if let Some((pk, sk, counters)) = ent {
|
||||||
let tree_key = self.table.data.tree_key(&pk, &sk);
|
let tree_key = self.table.data.tree_key(&pk, &sk);
|
||||||
let dist_entry = counters.into_counter_entry::<T>(self.this_node, pk, sk);
|
let dist_entry = counters.into_counter_entry(self.this_node);
|
||||||
match buf.entry(tree_key) {
|
match buf.entry(tree_key) {
|
||||||
hash_map::Entry::Vacant(e) => {
|
hash_map::Entry::Vacant(e) => {
|
||||||
e.insert(dist_entry);
|
e.insert(dist_entry);
|
||||||
|
@ -293,23 +300,153 @@ impl<T: CountedItem> IndexCounter<T> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn offline_recount_all<TS, TR>(
|
||||||
|
&self,
|
||||||
|
counted_table: &Arc<Table<TS, TR>>,
|
||||||
|
) -> Result<(), Error>
|
||||||
|
where
|
||||||
|
TS: TableSchema<E = T>,
|
||||||
|
TR: TableReplication,
|
||||||
|
{
|
||||||
|
let save_counter_entry = |entry: CounterEntry<T>| -> Result<(), Error> {
|
||||||
|
let entry_k = self
|
||||||
|
.table
|
||||||
|
.data
|
||||||
|
.tree_key(entry.partition_key(), entry.sort_key());
|
||||||
|
self.table
|
||||||
|
.data
|
||||||
|
.update_entry_with(&entry_k, |ent| match ent {
|
||||||
|
Some(mut ent) => {
|
||||||
|
ent.merge(&entry);
|
||||||
|
ent
|
||||||
|
}
|
||||||
|
None => entry.clone(),
|
||||||
|
})?;
|
||||||
|
Ok(())
|
||||||
|
};
|
||||||
|
|
||||||
|
// 1. Set all old local counters to zero
|
||||||
|
let now = now_msec();
|
||||||
|
let mut next_start: Option<Vec<u8>> = None;
|
||||||
|
loop {
|
||||||
|
let low_bound = match next_start.take() {
|
||||||
|
Some(v) => Bound::Excluded(v),
|
||||||
|
None => Bound::Unbounded,
|
||||||
|
};
|
||||||
|
let mut batch = vec![];
|
||||||
|
for item in self.local_counter.range((low_bound, Bound::Unbounded))? {
|
||||||
|
batch.push(item?);
|
||||||
|
if batch.len() > 1000 {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if batch.is_empty() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (local_counter_k, local_counter) in batch {
|
||||||
|
let mut local_counter =
|
||||||
|
rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>(&local_counter)?;
|
||||||
|
|
||||||
|
for (_, tv) in local_counter.values.iter_mut() {
|
||||||
|
tv.0 = std::cmp::max(tv.0 + 1, now);
|
||||||
|
tv.1 = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
let local_counter_bytes = rmp_to_vec_all_named(&local_counter)?;
|
||||||
|
self.local_counter
|
||||||
|
.insert(&local_counter_k, &local_counter_bytes)?;
|
||||||
|
|
||||||
|
let counter_entry = local_counter.into_counter_entry(self.this_node);
|
||||||
|
save_counter_entry(counter_entry)?;
|
||||||
|
|
||||||
|
next_start = Some(local_counter_k);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. Recount all table entries
|
||||||
|
let now = now_msec();
|
||||||
|
let mut next_start: Option<Vec<u8>> = None;
|
||||||
|
loop {
|
||||||
|
let low_bound = match next_start.take() {
|
||||||
|
Some(v) => Bound::Excluded(v),
|
||||||
|
None => Bound::Unbounded,
|
||||||
|
};
|
||||||
|
let mut batch = vec![];
|
||||||
|
for item in counted_table
|
||||||
|
.data
|
||||||
|
.store
|
||||||
|
.range((low_bound, Bound::Unbounded))?
|
||||||
|
{
|
||||||
|
batch.push(item?);
|
||||||
|
if batch.len() > 1000 {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if batch.is_empty() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (counted_entry_k, counted_entry) in batch {
|
||||||
|
let counted_entry = counted_table.data.decode_entry(&counted_entry)?;
|
||||||
|
|
||||||
|
let pk = counted_entry.counter_partition_key();
|
||||||
|
let sk = counted_entry.counter_sort_key();
|
||||||
|
let counts = counted_entry.counts();
|
||||||
|
|
||||||
|
let local_counter_key = self.table.data.tree_key(pk, sk);
|
||||||
|
let mut local_counter = match self.local_counter.get(&local_counter_key)? {
|
||||||
|
Some(old_bytes) => {
|
||||||
|
let ent = rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>(
|
||||||
|
&old_bytes,
|
||||||
|
)?;
|
||||||
|
assert!(ent.pk == *pk);
|
||||||
|
assert!(ent.sk == *sk);
|
||||||
|
ent
|
||||||
|
}
|
||||||
|
None => LocalCounterEntry {
|
||||||
|
pk: pk.clone(),
|
||||||
|
sk: sk.clone(),
|
||||||
|
values: BTreeMap::new(),
|
||||||
|
},
|
||||||
|
};
|
||||||
|
for (s, v) in counts.iter() {
|
||||||
|
let mut tv = local_counter.values.entry(s.to_string()).or_insert((0, 0));
|
||||||
|
tv.0 = std::cmp::max(tv.0 + 1, now);
|
||||||
|
tv.1 += v;
|
||||||
|
}
|
||||||
|
|
||||||
|
let local_counter_bytes = rmp_to_vec_all_named(&local_counter)?;
|
||||||
|
self.local_counter
|
||||||
|
.insert(&local_counter_key, local_counter_bytes)?;
|
||||||
|
|
||||||
|
let counter_entry = local_counter.into_counter_entry(self.this_node);
|
||||||
|
save_counter_entry(counter_entry)?;
|
||||||
|
|
||||||
|
next_start = Some(counted_entry_k);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Done
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
|
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
|
||||||
struct LocalCounterEntry {
|
struct LocalCounterEntry<T: CountedItem> {
|
||||||
|
pk: T::CP,
|
||||||
|
sk: T::CS,
|
||||||
values: BTreeMap<String, (u64, i64)>,
|
values: BTreeMap<String, (u64, i64)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl LocalCounterEntry {
|
impl<T: CountedItem> LocalCounterEntry<T> {
|
||||||
fn into_counter_entry<T: CountedItem>(
|
fn into_counter_entry(self, this_node: Uuid) -> CounterEntry<T> {
|
||||||
self,
|
|
||||||
this_node: Uuid,
|
|
||||||
pk: T::CP,
|
|
||||||
sk: T::CS,
|
|
||||||
) -> CounterEntry<T> {
|
|
||||||
CounterEntry {
|
CounterEntry {
|
||||||
pk,
|
pk: self.pk,
|
||||||
sk,
|
sk: self.sk,
|
||||||
values: self
|
values: self
|
||||||
.values
|
.values
|
||||||
.into_iter()
|
.into_iter()
|
||||||
|
|
Loading…
Reference in a new issue