various fixes for v0.8.0 #380

Merged
lx merged 4 commits from various-fixes-for-0.8 into main 2022-09-13 14:49:06 +00:00
23 changed files with 81 additions and 59 deletions

View file

@ -295,7 +295,6 @@ fn parse_create_bucket_xml(xml_bytes: &[u8]) -> Option<Option<String>> {
let mut ret = None;
for item in cbc.children() {
println!("{:?}", item);
if item.has_tag_name("LocationConstraint") {
if ret != None {
return None;

View file

@ -609,7 +609,7 @@ impl<S: Stream<Item = BlockStreamItem>> Defragmenter<S> {
}
}
#[derive(Debug, Serialize, PartialEq)]
#[derive(Debug, Serialize, PartialEq, Eq)]
pub struct CopyObjectResult {
#[serde(rename = "LastModified")]
pub last_modified: s3_xml::Value,
@ -617,7 +617,7 @@ pub struct CopyObjectResult {
pub etag: s3_xml::Value,
}
#[derive(Debug, Serialize, PartialEq)]
#[derive(Debug, Serialize, PartialEq, Eq)]
pub struct CopyPartResult {
#[serde(serialize_with = "xmlns_tag")]
pub xmlns: (),
@ -662,7 +662,6 @@ mod tests {
last_modified: s3_xml::Value("2011-04-11T20:34:56.000Z".into()),
etag: s3_xml::Value("\"9b2cf535f27731c974343645a3985328\"".into()),
};
println!("{}", to_xml_with_header(&v)?);
assert_eq!(to_xml_with_header(&v)?, expected_retval);

View file

@ -25,7 +25,7 @@ impl From<&str> for Value {
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub struct IntValue(#[serde(rename = "$value")] pub i64);
#[derive(Debug, Serialize, PartialEq)]
#[derive(Debug, Serialize, PartialEq, Eq)]
pub struct Bucket {
#[serde(rename = "CreationDate")]
pub creation_date: Value,
@ -33,7 +33,7 @@ pub struct Bucket {
pub name: Value,
}
#[derive(Debug, Serialize, PartialEq)]
#[derive(Debug, Serialize, PartialEq, Eq)]
pub struct Owner {
#[serde(rename = "DisplayName")]
pub display_name: Value,
@ -41,13 +41,13 @@ pub struct Owner {
pub id: Value,
}
#[derive(Debug, Serialize, PartialEq)]
#[derive(Debug, Serialize, PartialEq, Eq)]
pub struct BucketList {
#[serde(rename = "Bucket")]
pub entries: Vec<Bucket>,
}
#[derive(Debug, Serialize, PartialEq)]
#[derive(Debug, Serialize, PartialEq, Eq)]
pub struct ListAllMyBucketsResult {
#[serde(rename = "Buckets")]
pub buckets: BucketList,
@ -55,7 +55,7 @@ pub struct ListAllMyBucketsResult {
pub owner: Owner,
}
#[derive(Debug, Serialize, PartialEq)]
#[derive(Debug, Serialize, PartialEq, Eq)]
pub struct LocationConstraint {
#[serde(serialize_with = "xmlns_tag")]
pub xmlns: (),
@ -63,7 +63,7 @@ pub struct LocationConstraint {
pub region: String,
}
#[derive(Debug, Serialize, PartialEq)]
#[derive(Debug, Serialize, PartialEq, Eq)]
pub struct Deleted {
#[serde(rename = "Key")]
pub key: Value,
@ -73,7 +73,7 @@ pub struct Deleted {
pub delete_marker_version_id: Value,
}
#[derive(Debug, Serialize, PartialEq)]
#[derive(Debug, Serialize, PartialEq, Eq)]
pub struct Error {
#[serde(rename = "Code")]
pub code: Value,
@ -85,7 +85,7 @@ pub struct Error {
pub region: Option<Value>,
}
#[derive(Debug, Serialize, PartialEq)]
#[derive(Debug, Serialize, PartialEq, Eq)]
pub struct DeleteError {
#[serde(rename = "Code")]
pub code: Value,
@ -97,7 +97,7 @@ pub struct DeleteError {
pub version_id: Option<Value>,
}
#[derive(Debug, Serialize, PartialEq)]
#[derive(Debug, Serialize, PartialEq, Eq)]
pub struct DeleteResult {
#[serde(serialize_with = "xmlns_tag")]
pub xmlns: (),
@ -107,7 +107,7 @@ pub struct DeleteResult {
pub errors: Vec<DeleteError>,
}
#[derive(Debug, Serialize, PartialEq)]
#[derive(Debug, Serialize, PartialEq, Eq)]
pub struct InitiateMultipartUploadResult {
#[serde(serialize_with = "xmlns_tag")]
pub xmlns: (),
@ -119,7 +119,7 @@ pub struct InitiateMultipartUploadResult {
pub upload_id: Value,
}
#[derive(Debug, Serialize, PartialEq)]
#[derive(Debug, Serialize, PartialEq, Eq)]
pub struct CompleteMultipartUploadResult {
#[serde(serialize_with = "xmlns_tag")]
pub xmlns: (),
@ -133,7 +133,7 @@ pub struct CompleteMultipartUploadResult {
pub etag: Value,
}
#[derive(Debug, Serialize, PartialEq)]
#[derive(Debug, Serialize, PartialEq, Eq)]
pub struct Initiator {
#[serde(rename = "DisplayName")]
pub display_name: Value,
@ -141,7 +141,7 @@ pub struct Initiator {
pub id: Value,
}
#[derive(Debug, Serialize, PartialEq)]
#[derive(Debug, Serialize, PartialEq, Eq)]
pub struct ListMultipartItem {
#[serde(rename = "Initiated")]
pub initiated: Value,
@ -157,7 +157,7 @@ pub struct ListMultipartItem {
pub storage_class: Value,
}
#[derive(Debug, Serialize, PartialEq)]
#[derive(Debug, Serialize, PartialEq, Eq)]
pub struct ListMultipartUploadsResult {
#[serde(serialize_with = "xmlns_tag")]
pub xmlns: (),
@ -187,7 +187,7 @@ pub struct ListMultipartUploadsResult {
pub encoding_type: Option<Value>,
}
#[derive(Debug, Serialize, PartialEq)]
#[derive(Debug, Serialize, PartialEq, Eq)]
pub struct PartItem {
#[serde(rename = "ETag")]
pub etag: Value,
@ -199,7 +199,7 @@ pub struct PartItem {
pub size: IntValue,
}
#[derive(Debug, Serialize, PartialEq)]
#[derive(Debug, Serialize, PartialEq, Eq)]
pub struct ListPartsResult {
#[serde(serialize_with = "xmlns_tag")]
pub xmlns: (),
@ -227,7 +227,7 @@ pub struct ListPartsResult {
pub storage_class: Value,
}
#[derive(Debug, Serialize, PartialEq)]
#[derive(Debug, Serialize, PartialEq, Eq)]
pub struct ListBucketItem {
#[serde(rename = "Key")]
pub key: Value,
@ -241,13 +241,13 @@ pub struct ListBucketItem {
pub storage_class: Value,
}
#[derive(Debug, Serialize, PartialEq)]
#[derive(Debug, Serialize, PartialEq, Eq)]
pub struct CommonPrefix {
#[serde(rename = "Prefix")]
pub prefix: Value,
}
#[derive(Debug, Serialize, PartialEq)]
#[derive(Debug, Serialize, PartialEq, Eq)]
pub struct ListBucketResult {
#[serde(serialize_with = "xmlns_tag")]
pub xmlns: (),
@ -281,7 +281,7 @@ pub struct ListBucketResult {
pub common_prefixes: Vec<CommonPrefix>,
}
#[derive(Debug, Serialize, PartialEq)]
#[derive(Debug, Serialize, PartialEq, Eq)]
pub struct VersioningConfiguration {
#[serde(serialize_with = "xmlns_tag")]
pub xmlns: (),
@ -289,7 +289,7 @@ pub struct VersioningConfiguration {
pub status: Option<Value>,
}
#[derive(Debug, Serialize, PartialEq)]
#[derive(Debug, Serialize, PartialEq, Eq)]
pub struct PostObject {
#[serde(serialize_with = "xmlns_tag")]
pub xmlns: (),

View file

@ -7,7 +7,7 @@ use garage_table::*;
/// The bucket alias table holds the names given to buckets
/// in the global namespace.
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct BucketAlias {
name: String,
pub state: crdt::Lww<Option<Uuid>>,

View file

@ -12,7 +12,7 @@ use crate::permission::BucketKeyPerm;
/// Its parameters are not directly accessible as:
/// - It must be possible to merge paramaters, hence the use of a LWW CRDT.
/// - A bucket has 2 states, Present or Deleted and parameters make sense only if present.
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct Bucket {
/// ID of the bucket
pub id: Uuid,
@ -21,7 +21,7 @@ pub struct Bucket {
}
/// Configuration for a bucket
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct BucketParams {
/// Bucket's creation date
pub creation_date: u64,

View file

@ -6,7 +6,7 @@ use garage_db as db;
use garage_util::background::*;
use garage_util::config::*;
use garage_util::error::Error;
use garage_util::error::*;
use garage_rpc::system::System;
@ -76,9 +76,14 @@ pub struct GarageK2V {
impl Garage {
/// Create and run garage
pub fn new(config: Config, background: Arc<BackgroundRunner>) -> Result<Arc<Self>, Error> {
// Create meta dir and data dir if they don't exist already
std::fs::create_dir_all(&config.metadata_dir)
.ok_or_message("Unable to create Garage metadata directory")?;
std::fs::create_dir_all(&config.data_dir)
.ok_or_message("Unable to create Garage data directory")?;
info!("Opening database...");
let mut db_path = config.metadata_dir.clone();
std::fs::create_dir_all(&db_path).expect("Unable to create Garage meta data directory");
let db = match config.db_engine.as_str() {
// ---- Sled DB ----
#[cfg(feature = "sled")]
@ -164,7 +169,7 @@ impl Garage {
background.clone(),
replication_mode.replication_factor(),
&config,
);
)?;
let data_rep_param = TableShardedReplication {
system: system.clone(),

View file

@ -81,7 +81,7 @@ impl<T: CountedItem> CounterEntry<T> {
}
/// A counter entry in the global table
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct CounterValue {
pub node_values: BTreeMap<Uuid, (u64, i64)>,
}

View file

@ -9,7 +9,7 @@ use crate::permission::BucketKeyPerm;
use crate::prev::v051::key_table as old;
/// An api key
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct Key {
/// The id of the key (immutable), used as partition key
pub key_id: String,
@ -19,7 +19,7 @@ pub struct Key {
}
/// Configuration for a key
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct KeyParams {
/// The secret_key associated (immutable)
pub secret_key: String,

View file

@ -10,7 +10,7 @@ use super::key_table::PermissionSet;
/// Its parameters are not directly accessible as:
/// - It must be possible to merge paramaters, hence the use of a LWW CRDT.
/// - A bucket has 2 states, Present or Deleted and parameters make sense only if present.
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct Bucket {
/// Name of the bucket
pub name: String,
@ -19,7 +19,7 @@ pub struct Bucket {
}
/// State of a bucket
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub enum BucketState {
/// The bucket is deleted
Deleted,
@ -41,7 +41,7 @@ impl Crdt for BucketState {
}
/// Configuration for a bucket
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct BucketParams {
/// Map of key with access to the bucket, and what kind of access they give
pub authorized_keys: crdt::LwwMap<String, PermissionSet>,

View file

@ -4,7 +4,7 @@ use garage_table::crdt::*;
use garage_table::*;
/// An api key
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct Key {
/// The id of the key (immutable), used as partition key
pub key_id: String,

View file

@ -6,7 +6,7 @@ use garage_util::data::*;
use garage_table::crdt::*;
/// An object
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct Object {
/// The bucket in which the object is stored, used as partition key
pub bucket: String,
@ -26,7 +26,7 @@ impl Object {
}
/// Informations about a version of an object
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct ObjectVersion {
/// Id of the version
pub uuid: Uuid,
@ -37,7 +37,7 @@ pub struct ObjectVersion {
}
/// State of an object version
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub enum ObjectVersionState {
/// The version is being received
Uploading(ObjectVersionHeaders),

View file

@ -6,7 +6,7 @@ use garage_table::crdt::*;
use garage_table::*;
/// A version of an object
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct Version {
/// UUID of the version, used as partition key
pub uuid: Uuid,

View file

@ -10,7 +10,7 @@ use garage_table::*;
use garage_block::manager::*;
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct BlockRef {
/// Hash (blake2 sum) of the block, used as partition key
pub block: Hash,

View file

@ -21,7 +21,7 @@ pub const UNFINISHED_UPLOADS: &str = "unfinished_uploads";
pub const BYTES: &str = "bytes";
/// An object
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct Object {
/// The bucket in which the object is stored, used as partition key
pub bucket_id: Uuid,
@ -70,7 +70,7 @@ impl Object {
}
/// Informations about a version of an object
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct ObjectVersion {
/// Id of the version
pub uuid: Uuid,
@ -81,7 +81,7 @@ pub struct ObjectVersion {
}
/// State of an object version
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub enum ObjectVersionState {
/// The version is being received
Uploading(ObjectVersionHeaders),

View file

@ -15,7 +15,7 @@ use crate::s3::block_ref_table::*;
use crate::prev::v051::version_table as old;
/// A version of an object
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct Version {
/// UUID of the version, used as partition key
pub uuid: Uuid,

View file

@ -56,7 +56,7 @@ pub async fn get_kubernetes_nodes(
let mut ret = Vec::with_capacity(nodes.items.len());
for node in nodes {
println!("Found Pod: {:?}", node.metadata.name);
info!("Found Pod: {:?}", node.metadata.name);
let pubkey = &node
.metadata

View file

@ -198,7 +198,7 @@ impl System {
background: Arc<BackgroundRunner>,
replication_factor: usize,
config: &Config,
) -> Arc<Self> {
) -> Result<Arc<Self>, Error> {
let node_key =
gen_node_key(&config.metadata_dir).expect("Unable to read or generate node ID");
info!(
@ -206,11 +206,21 @@ impl System {
hex::encode(&node_key.public_key()[..8])
);
let persist_cluster_layout = Persister::new(&config.metadata_dir, "cluster_layout");
let persist_cluster_layout: Persister<ClusterLayout> =
Persister::new(&config.metadata_dir, "cluster_layout");
let persist_peer_list = Persister::new(&config.metadata_dir, "peer_list");
let cluster_layout = match persist_cluster_layout.load() {
Ok(x) => x,
Ok(x) => {
if x.replication_factor != replication_factor {
return Err(Error::Message(format!(
"Prevous cluster layout has replication factor {}, which is different than the one specified in the config file ({}). The previous cluster layout can be purged, if you know what you are doing, simply by deleting the `cluster_layout` file in your metadata directory.",
x.replication_factor,
replication_factor
)));
}
x
}
Err(e) => {
info!(
"No valid previous cluster layout stored ({}), starting fresh.",
@ -303,7 +313,7 @@ impl System {
metadata_dir: config.metadata_dir.clone(),
});
sys.system_endpoint.set_handler(sys.clone());
sys
Ok(sys)
}
/// Perform bootstraping, starting the ping loop
@ -485,7 +495,7 @@ impl System {
let local_info = self.local_status.load();
if local_info.replication_factor < info.replication_factor {
error!("Some node have a higher replication factor ({}) than this one ({}). This is not supported and might lead to bugs",
error!("Some node have a higher replication factor ({}) than this one ({}). This is not supported and will lead to data corruption. Shutting down for safety.",
info.replication_factor,
local_info.replication_factor);
std::process::exit(1);
@ -513,6 +523,16 @@ impl System {
self: &Arc<Self>,
adv: &ClusterLayout,
) -> Result<SystemRpc, Error> {
if adv.replication_factor != self.replication_factor {
let msg = format!(
"Received a cluster layout from another node with replication factor {}, which is different from what we have in our configuration ({}). Discarding the cluster layout we received.",
adv.replication_factor,
self.replication_factor
);
error!("{}", msg);
return Err(Error::Message(msg));
}
let update_ring = self.update_ring.lock().await;
let mut layout: ClusterLayout = self.ring.borrow().layout.clone();

View file

@ -113,7 +113,6 @@ where
async fn insert_internal(&self, e: &F::E) -> Result<(), Error> {
let hash = e.partition_key().hash();
let who = self.data.replication.write_nodes(&hash);
//eprintln!("insert who: {:?}", who);
let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?));
let rpc = TableRpc::<F>::Update(vec![e_enc]);

View file

@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize};
use crate::crdt::crdt::*;
/// Boolean, where `true` is an absorbing state
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)]
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct Bool(bool);
impl Bool {

View file

@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize};
use crate::crdt::crdt::*;
/// Deletable object (once deleted, cannot go back)
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)]
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum Deletable<T> {
Present(T),
Deleted,

View file

@ -37,7 +37,7 @@ use crate::crdt::crdt::*;
///
/// This scheme is used by AWS S3 or Soundcloud and often without knowing
/// in enterprise when reconciliating databases with ad-hoc scripts.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct Lww<T> {
ts: u64,
v: T,

View file

@ -23,7 +23,7 @@ use crate::crdt::crdt::*;
/// However, note that even if we were using a more efficient data structure such as a `BTreeMap`,
/// the serialization cost `O(n)` would still have to be paid at each modification, so we are
/// actually not losing anything here.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct LwwMap<K, V> {
vals: Vec<(K, u64, V)>,
}

View file

@ -16,7 +16,7 @@ use crate::crdt::crdt::*;
/// However, note that even if we were using a more efficient data structure such as a `BTreeMap`,
/// the serialization cost `O(n)` would still have to be paid at each modification, so we are
/// actually not losing anything here.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct Map<K, V> {
vals: Vec<(K, V)>,
}