Merge pull request 'Fix all typos' (#928) from majst01/garage:fix-typos into main

Reviewed-on: Deuxfleurs/garage#928
Reviewed-by: maximilien <me@mricher.fr>

commit 294cb99409
60 changed files with 116 additions and 116 deletions
@@ -349,7 +349,7 @@ Check [our s3 compatibility list](@/documentation/reference-manual/s3-compatibil
 
 ### Other tools for interacting with Garage
 
-The following tools can also be used to send and recieve files from/to Garage:
+The following tools can also be used to send and receive files from/to Garage:
 
 - [minio-client](@/documentation/connect/cli.md#minio-client)
 - [s3cmd](@/documentation/connect/cli.md#s3cmd)
@@ -61,7 +61,7 @@ directed to a Garage cluster can be handled independently of one another instead
 of going through a central bottleneck (the leader node).
 As a consequence, requests can be handled much faster, even in cases where latency
 between cluster nodes is important (see our [benchmarks](@/documentation/design/benchmarks/index.md) for data on this).
-This is particularly usefull when nodes are far from one another and talk to one other through standard Internet connections.
+This is particularly useful when nodes are far from one another and talk to one other through standard Internet connections.
 
 ### Web server for static websites
 
@@ -392,7 +392,7 @@ table_merkle_updater_todo_queue_length{table_name="block_ref"} 0
 
 #### `table_sync_items_received`, `table_sync_items_sent` (counters)
 
-Number of data items sent to/recieved from other nodes during resync procedures
+Number of data items sent to/received from other nodes during resync procedures
 
 ```
 table_sync_items_received{from="<remote node>",table_name="bucket_v2"} 3
@@ -42,7 +42,7 @@ The general principle are similar, but details have not been updated.**
 A version is defined by the existence of at least one entry in the blocks table for a certain version UUID.
 We must keep the following invariant: if a version exists in the blocks table, it has to be referenced in the objects table.
 We explicitly manage concurrent versions of an object: the version timestamp and version UUID columns are index columns, thus we may have several concurrent versions of an object.
-Important: before deleting an older version from the objects table, we must make sure that we did a successfull delete of the blocks of that version from the blocks table.
+Important: before deleting an older version from the objects table, we must make sure that we did a successful delete of the blocks of that version from the blocks table.
 
 Thus, the workflow for reading an object is as follows:
 
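The design note touched by this hunk fixes a deletion order: blocks first, then the version entry. A rough, self-contained sketch of that ordering (the types and names below are made up for the example and are not Garage's actual schema) could look like this:

```rust
use std::collections::{HashMap, HashSet};

// Illustrative stand-ins for the two tables discussed above.
struct Tables {
    // blocks table: version UUID -> block hashes of that version
    blocks: HashMap<u64, Vec<[u8; 32]>>,
    // objects table: version UUIDs currently referenced by an object entry
    objects: HashSet<u64>,
}

impl Tables {
    // Deletion order required by the invariant: first the blocks of the
    // version, and only once that has succeeded, the version itself.
    // A crash between the two steps leaves a referenced-but-empty version,
    // never a version present in the blocks table without a reference.
    fn delete_old_version(&mut self, version: u64) {
        self.blocks.remove(&version);  // step 1: blocks table
        self.objects.remove(&version); // step 2: objects table
    }
}
```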
@@ -95,7 +95,7 @@ Known issue: if someone is reading from a version that we want to delete and the
 Usefull metadata:
 
 - list of versions that reference this block in the Casandra table, so that we can do GC by checking in Cassandra that the lines still exist
-- list of other nodes that we know have acknowledged a write of this block, usefull in the rebalancing algorithm
+- list of other nodes that we know have acknowledged a write of this block, useful in the rebalancing algorithm
 
 Write strategy: have a single thread that does all write IO so that it is serialized (or have several threads that manage independent parts of the hash space). When writing a blob, write it to a temporary file, close, then rename so that a concurrent read gets a consistent result (either not found or found with whole content).
 
@@ -68,7 +68,7 @@ The migration steps are as follows:
 5. Turn off Garage 0.3
 
 6. Backup metadata folders if you can (i.e. if you have space to do it
-   somewhere). Backuping data folders could also be usefull but that's much
+   somewhere). Backuping data folders could also be useful but that's much
    harder to do. If your filesystem supports snapshots, this could be a good
    time to use them.
 
@@ -37,7 +37,7 @@ There are two reasons for this:
 
 Reminder: rules of simplicity, concerning changes to Garage's source code.
 Always question what we are doing.
-Never do anything just because it looks nice or because we "think" it might be usefull at some later point but without knowing precisely why/when.
+Never do anything just because it looks nice or because we "think" it might be useful at some later point but without knowing precisely why/when.
 Only do things that make perfect sense in the context of what we currently know.
 
 ## References
@@ -562,7 +562,7 @@ token>", v: ["<value1>", ...] }`, with the following fields:
 - in case of concurrent update and deletion, a `null` is added to the list of concurrent values
 
 - if the `tombstones` query parameter is set to `true`, tombstones are returned
-  for items that have been deleted (this can be usefull for inserting after an
+  for items that have been deleted (this can be useful for inserting after an
   item that has been deleted, so that the insert is not considered
   concurrent with the delete). Tombstones are returned as tuples in the
   same format with only `null` values
@@ -77,7 +77,7 @@ impl ApiHandler for K2VApiServer {
 } = endpoint;
 let garage = self.garage.clone();
 
-// The OPTIONS method is procesed early, before we even check for an API key
+// The OPTIONS method is processed early, before we even check for an API key
 if let Endpoint::Options = endpoint {
 let options_res = handle_options_api(garage, &req, Some(bucket_name))
 .await
@@ -204,7 +204,7 @@ macro_rules! generateQueryParameters {
 }
 
 /// Get an error message in case not all parameters where used when extracting them to
-/// build an Enpoint variant
+/// build an Endpoint variant
 fn nonempty_message(&self) -> Option<&str> {
 if self.keyword.is_some() {
 Some("Keyword not used")
@@ -340,8 +340,8 @@ pub(crate) fn request_checksum_value(
 Ok(ret.pop())
 }
 
-/// Checks for the presense of x-amz-checksum-algorithm
-/// if so extract the corrseponding x-amz-checksum-* value
+/// Checks for the presence of x-amz-checksum-algorithm
+/// if so extract the corresponding x-amz-checksum-* value
 pub(crate) fn request_checksum_algorithm_value(
 headers: &HeaderMap<HeaderValue>,
 ) -> Result<Option<ChecksumValue>, Error> {
@@ -63,7 +63,7 @@ pub async fn handle_copy(
 let source_checksum_algorithm = source_checksum.map(|x| x.algorithm());
 
 // If source object has a checksum, the destination object must as well.
-// The x-amz-checksum-algorihtm header allows to change that algorithm,
+// The x-amz-checksum-algorithm header allows to change that algorithm,
 // but if it is absent, we must use the same as before
 let checksum_algorithm = checksum_algorithm.or(source_checksum_algorithm);
 
@@ -398,7 +398,7 @@ enum ExtractionResult {
 key: String,
 },
 // Fallback key is used for legacy APIs that only support
-// exlusive pagination (and not inclusive one).
+// exclusive pagination (and not inclusive one).
 SkipTo {
 key: String,
 fallback_key: Option<String>,
@@ -408,7 +408,7 @@ enum ExtractionResult {
 #[derive(PartialEq, Clone, Debug)]
 enum RangeBegin {
 // Fallback key is used for legacy APIs that only support
-// exlusive pagination (and not inclusive one).
+// exclusive pagination (and not inclusive one).
 IncludingKey {
 key: String,
 fallback_key: Option<String>,
@@ -213,7 +213,7 @@ pub async fn handle_post_object(
 }
 
 // if we ever start supporting ACLs, we likely want to map "acl" to x-amz-acl" somewhere
-// arround here to make sure the rest of the machinery takes our acl into account.
+// around here to make sure the rest of the machinery takes our acl into account.
 let headers = get_headers(&params)?;
 
 let expected_checksums = ExpectedChecksums {
@@ -276,7 +276,7 @@ impl Redirect {
 return Err(Error::bad_request("Bad XML: invalid protocol"));
 }
 }
-// TODO there are probably more invalide cases, but which ones?
+// TODO there are probably more invalid cases, but which ones?
 Ok(())
 }
 }
@@ -47,8 +47,8 @@ pub async fn check_payload_signature(
 let query = parse_query_map(request.uri())?;
 
 if query.contains_key(&X_AMZ_ALGORITHM) {
-// We check for presigned-URL-style authentification first, because
-// the browser or someting else could inject an Authorization header
+// We check for presigned-URL-style authentication first, because
+// the browser or something else could inject an Authorization header
 // that is totally unrelated to AWS signatures.
 check_presigned_signature(garage, service, request, query).await
 } else if request.headers().contains_key(AUTHORIZATION) {
@@ -132,7 +132,7 @@ async fn check_presigned_signature(
 let authorization = Authorization::parse_presigned(&algorithm.value, &query)?;
 
 // Verify that all necessary request headers are included in signed_headers
-// For AWSv4 pre-signed URLs, the following must be incldued:
+// For AWSv4 pre-signed URLs, the following must be included:
 // - the Host header (mandatory)
 // - all x-amz-* headers used in the request
 let signed_headers = split_signed_headers(&authorization)?;
@@ -306,7 +306,7 @@ pub fn canonical_request(
 // Note that there is also the issue of path normalization, which I hope is unrelated to the
 // one of URI-encoding. At least in aws-sigv4 both parameters can be set independently,
 // and rusoto_signature does not seem to do any effective path normalization, even though
-// it mentions it in the comments (same link to the souce code as above).
+// it mentions it in the comments (same link to the source code as above).
 // We make the explicit choice of NOT normalizing paths in the K2V API because doing so
 // would make non-normalized paths invalid K2V partition keys, and we don't want that.
 let canonical_uri: std::borrow::Cow<str> = if service != "s3" {
@@ -105,7 +105,7 @@ impl BlockResyncManager {
 }
 }
 
-/// Get lenght of resync queue
+/// Get length of resync queue
 pub fn queue_len(&self) -> Result<usize, Error> {
 Ok(self.queue.len()?)
 }
@@ -185,10 +185,10 @@ impl BlockResyncManager {
 //
 // - resync.errors: a tree that indicates for each block
 // if the last resync resulted in an error, and if so,
-// the following two informations (see the ErrorCounter struct):
+// the following two information (see the ErrorCounter struct):
 // - how many consecutive resync errors for this block?
 // - when was the last try?
-// These two informations are used to implement an
+// These two information are used to implement an
 // exponential backoff retry strategy.
 // The key in this tree is the 32-byte hash of the block,
 // and the value is the encoded ErrorCounter value.
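The comment fixed here describes the two values kept per block (consecutive error count and time of the last try) that drive the exponential-backoff retry. A minimal sketch of how such a schedule can be derived from those two values (the constants and names below are illustrative assumptions, not Garage's actual implementation):

```rust
use std::time::{Duration, SystemTime};

// Illustrative version of the per-block error bookkeeping described above.
struct ErrorCounter {
    errors: u64,          // how many consecutive resync errors for this block
    last_try: SystemTime, // when the last try was
}

impl ErrorCounter {
    // Exponential backoff: wait twice as long after each consecutive error,
    // with an arbitrary cap so the delay does not grow without bound.
    fn next_try(&self) -> SystemTime {
        let base = Duration::from_secs(60);
        let exponent = self.errors.saturating_sub(1).min(6) as u32;
        self.last_try + base * 2u32.pow(exponent)
    }
}
```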
@@ -122,7 +122,7 @@ impl Db {
 _ => unreachable!(),
 },
 Err(TxError::Db(e2)) => match ret {
-// Ok was stored -> the error occured when finalizing
+// Ok was stored -> the error occurred when finalizing
 // transaction
 Ok(_) => Err(TxError::Db(e2)),
 // An error was already stored: that's the one we want to
@@ -233,7 +233,7 @@ impl<'a> LmdbTx<'a> {
 fn get_tree(&self, i: usize) -> TxOpResult<&Database> {
 self.trees.get(i).ok_or_else(|| {
 TxOpError(Error(
-"invalid tree id (it might have been openned after the transaction started)".into(),
+"invalid tree id (it might have been opened after the transaction started)".into(),
 ))
 })
 }
@@ -142,7 +142,7 @@ impl IDb for SqliteDb {
 fn snapshot(&self, to: &PathBuf) -> Result<()> {
 fn progress(p: rusqlite::backup::Progress) {
 let percent = (p.pagecount - p.remaining) * 100 / p.pagecount;
-info!("Sqlite snapshot progres: {}%", percent);
+info!("Sqlite snapshot progress: {}%", percent);
 }
 self.db
 .get()?
@@ -304,7 +304,7 @@ impl<'a> SqliteTx<'a> {
 fn get_tree(&self, i: usize) -> TxOpResult<&'_ str> {
 self.trees.get(i).map(Arc::as_ref).ok_or_else(|| {
 TxOpError(Error(
-"invalid tree id (it might have been openned after the transaction started)".into(),
+"invalid tree id (it might have been opened after the transaction started)".into(),
 ))
 })
 }
@@ -129,7 +129,7 @@ pub async fn cmd_assign_role(
 zone: args
 .zone
 .clone()
-.ok_or("Please specifiy a zone with the -z flag")?,
+.ok_or("Please specify a zone with the -z flag")?,
 capacity,
 tags: args.tags.clone(),
 }
@@ -145,7 +145,7 @@ pub async fn cmd_assign_role(
 
 send_layout(rpc_cli, rpc_host, layout).await?;
 
-println!("Role changes are staged but not yet commited.");
+println!("Role changes are staged but not yet committed.");
 println!("Use `garage layout show` to view staged role changes,");
 println!("and `garage layout apply` to enact staged changes.");
 Ok(())
@@ -172,7 +172,7 @@ pub async fn cmd_remove_role(
 
 send_layout(rpc_cli, rpc_host, layout).await?;
 
-println!("Role removal is staged but not yet commited.");
+println!("Role removal is staged but not yet committed.");
 println!("Use `garage layout show` to view staged role changes,");
 println!("and `garage layout apply` to enact staged changes.");
 Ok(())
@@ -184,7 +184,7 @@ pub struct SkipDeadNodesOpt {
 /// This will generally be the current layout version.
 #[structopt(long = "version")]
 pub(crate) version: u64,
-/// Allow the skip even if a quorum of ndoes could not be found for
+/// Allow the skip even if a quorum of nodes could not be found for
 /// the data among the remaining nodes
 #[structopt(long = "allow-missing-data")]
 pub(crate) allow_missing_data: bool,
@@ -107,7 +107,7 @@ async fn main() {
 );
 
 // Initialize panic handler that aborts on panic and shows a nice message.
-// By default, Tokio continues runing normally when a task panics. We want
+// By default, Tokio continues running normally when a task panics. We want
 // to avoid this behavior in Garage as this would risk putting the process in an
 // unknown/uncontrollable state. We prefer to exit the process and restart it
 // from scratch, so that it boots back into a fresh, known state.
@@ -104,7 +104,7 @@ pub(crate) fn fill_secret(
 
 if let Some(val) = cli_value {
 if config_secret.is_some() || config_secret_file.is_some() {
-debug!("Overriding secret `{}` using value specified using CLI argument or environnement variable.", name);
+debug!("Overriding secret `{}` using value specified using CLI argument or environment variable.", name);
 }
 
 *config_secret = Some(val);
@@ -153,7 +153,7 @@ impl<'a> RequestBuilder<'a> {
 
 pub async fn send(&mut self) -> Result<Response<Body>, String> {
 // TODO this is a bit incorrect in that path and query params should be url-encoded and
-// aren't, but this is good enought for now.
+// aren't, but this is good enough for now.
 
 let query = query_param_to_string(&self.query_params);
 let (host, path) = if self.vhost_style {
@@ -210,9 +210,9 @@ impl<'a> RequestBuilder<'a> {
 HeaderName::from_static("x-amz-decoded-content-length"),
 HeaderValue::from_str(&self.body.len().to_string()).unwrap(),
 );
-// Get lenght of body by doing the conversion to a streaming body with an
+// Get length of body by doing the conversion to a streaming body with an
 // invalid signature (we don't know the seed) just to get its length. This
-// is a pretty lazy and inefficient way to do it, but it's enought for test
+// is a pretty lazy and inefficient way to do it, but it's enough for test
 // code.
 all_headers.insert(
 CONTENT_LENGTH,
@@ -54,7 +54,7 @@ enum Command {
 partition_key: String,
 /// Sort key to read from
 sort_key: String,
-/// Output formating
+/// Output formatting
 #[clap(flatten)]
 output_kind: ReadOutputKind,
 },
@@ -70,7 +70,7 @@ enum Command {
 /// Timeout, in seconds
 #[clap(short = 'T', long)]
 timeout: Option<u64>,
-/// Output formating
+/// Output formatting
 #[clap(flatten)]
 output_kind: ReadOutputKind,
 },
@@ -87,7 +87,7 @@ enum Command {
 /// Timeout, in seconds
 #[clap(short = 'T', long)]
 timeout: Option<u64>,
-/// Output formating
+/// Output formatting
 #[clap(flatten)]
 output_kind: BatchOutputKind,
 },
@@ -103,7 +103,7 @@ enum Command {
 },
 /// List partition keys
 ReadIndex {
-/// Output formating
+/// Output formatting
 #[clap(flatten)]
 output_kind: BatchOutputKind,
 /// Output only partition keys matching this filter
@@ -114,7 +114,7 @@ enum Command {
 ReadRange {
 /// Partition key to read from
 partition_key: String,
-/// Output formating
+/// Output formatting
 #[clap(flatten)]
 output_kind: BatchOutputKind,
 /// Output only sort keys matching this filter
@@ -125,7 +125,7 @@ enum Command {
 DeleteRange {
 /// Partition key to delete from
 partition_key: String,
-/// Output formating
+/// Output formatting
 #[clap(flatten)]
 output_kind: BatchOutputKind,
 /// Delete only sort keys matching this filter
@@ -185,10 +185,10 @@ struct ReadOutputKind {
 /// Raw output. Conflicts generate error, causality token is not returned
 #[clap(short, long, group = "output-kind")]
 raw: bool,
-/// Human formated output
+/// Human formatted output
 #[clap(short = 'H', long, group = "output-kind")]
 human: bool,
-/// JSON formated output
+/// JSON formatted output
 #[clap(short, long, group = "output-kind")]
 json: bool,
 }
@@ -207,7 +207,7 @@ impl ReadOutputKind {
 let mut val = val.value;
 if val.len() != 1 {
 eprintln!(
-"Raw mode can only read non-concurent values, found {} values, expected 1",
+"Raw mode can only read non-concurrent values, found {} values, expected 1",
 val.len()
 );
 exit(1);
@@ -265,10 +265,10 @@ impl ReadOutputKind {
 #[derive(Parser, Debug)]
 #[clap(group = clap::ArgGroup::new("output-kind").multiple(false).required(false))]
 struct BatchOutputKind {
-/// Human formated output
+/// Human formatted output
 #[clap(short = 'H', long, group = "output-kind")]
 human: bool,
-/// JSON formated output
+/// JSON formatted output
 #[clap(short, long, group = "output-kind")]
 json: bool,
 }
@@ -336,7 +336,7 @@ impl K2vClient {
 .collect())
 }
 
-/// Perform a DeleteBatch request, deleting mutiple values or range of values at once, without
+/// Perform a DeleteBatch request, deleting multiple values or range of values at once, without
 /// providing causality information.
 pub async fn delete_batch(&self, operations: &[BatchDeleteOp<'_>]) -> Result<Vec<u64>, Error> {
 let url = self.build_url(None, &[("delete", "")]);
@@ -89,9 +89,9 @@ pub fn is_valid_bucket_name(n: &str) -> bool {
 // Bucket names must start and end with a letter or a number
 && !n.starts_with(&['-', '.'][..])
 && !n.ends_with(&['-', '.'][..])
-// Bucket names must not be formated as an IP address
+// Bucket names must not be formatted as an IP address
 && n.parse::<std::net::IpAddr>().is_err()
-// Bucket names must not start wih "xn--"
+// Bucket names must not start with "xn--"
 && !n.starts_with("xn--")
 // Bucket names must not end with "-s3alias"
 && !n.ends_with("-s3alias")
@@ -14,7 +14,7 @@ mod v08 {
 /// A bucket is a collection of objects
 ///
 /// Its parameters are not directly accessible as:
-///  - It must be possible to merge paramaters, hence the use of a LWW CRDT.
+///  - It must be possible to merge parameters, hence the use of a LWW CRDT.
 ///  - A bucket has 2 states, Present or Deleted and parameters make sense only if present.
 #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
 pub struct Bucket {
@@ -126,7 +126,7 @@ impl AutoCrdt for BucketQuotas {
 }
 
 impl BucketParams {
-/// Create an empty BucketParams with no authorized keys and no website accesss
+/// Create an empty BucketParams with no authorized keys and no website access
 fn new() -> Self {
 BucketParams {
 creation_date: now_msec(),
@@ -231,7 +231,7 @@ impl<'a> LockedHelper<'a> {
 let bucket_p_local_alias_key = (key.key_id.clone(), alias_name.clone());
 
 // Calculate the timestamp to assign to this aliasing in the two local_aliases maps
-// (the one from key to bucket, and the reverse one stored in the bucket iself)
+// (the one from key to bucket, and the reverse one stored in the bucket itself)
 // so that merges on both maps in case of a concurrent operation resolve
 // to the same alias being set
 let alias_ts = increment_logical_clock_2(
@@ -310,7 +310,7 @@ impl K2VRpcHandler {
 // - we have a response to a read quorum of requests (e.g. 2/3), and an extra delay
 // has passed since the quorum was achieved
 // - a global RPC timeout expired
-// The extra delay after a quorum was received is usefull if the third response was to
+// The extra delay after a quorum was received is useful if the third response was to
 // arrive during this short interval: this would allow us to consider all the data seen
 // by that last node in the response we produce, and would likely help reduce the
 // size of the seen marker that we will return (because we would have an info of the
@@ -500,7 +500,7 @@ impl K2VRpcHandler {
 } else {
 // If no seen marker was specified, we do not poll for anything.
 // We return immediately with the set of known items (even if
-// it is empty), which will give the client an inital view of
+// it is empty), which will give the client an initial view of
 // the dataset and an initial seen marker for further
 // PollRange calls.
 self.poll_range_read_range(range, &RangeSeenMarker::default())
@@ -31,11 +31,11 @@ mod v08 {
 /// The key at which the object is stored in its bucket, used as sorting key
 pub key: String,
 
-/// The list of currenty stored versions of the object
+/// The list of currently stored versions of the object
 pub(super) versions: Vec<ObjectVersion>,
 }
 
-/// Informations about a version of an object
+/// Information about a version of an object
 #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
 pub struct ObjectVersion {
 /// Id of the version
@@ -109,11 +109,11 @@ mod v09 {
 /// The key at which the object is stored in its bucket, used as sorting key
 pub key: String,
 
-/// The list of currenty stored versions of the object
+/// The list of currently stored versions of the object
 pub(super) versions: Vec<ObjectVersion>,
 }
 
-/// Informations about a version of an object
+/// Information about a version of an object
 #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
 pub struct ObjectVersion {
 /// Id of the version
@@ -186,11 +186,11 @@ mod v010 {
 /// The key at which the object is stored in its bucket, used as sorting key
 pub key: String,
 
-/// The list of currenty stored versions of the object
+/// The list of currently stored versions of the object
 pub(super) versions: Vec<ObjectVersion>,
 }
 
-/// Informations about a version of an object
+/// Information about a version of an object
 #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
 pub struct ObjectVersion {
 /// Id of the version
@@ -49,7 +49,7 @@ mod v08 {
 pub offset: u64,
 }
 
-/// Informations about a single block
+/// Information about a single block
 #[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)]
 pub struct VersionBlock {
 /// Blake2 sum of the block
@@ -20,7 +20,7 @@ static SNAPSHOT_MUTEX: Mutex<()> = Mutex::new(());
 
 // ================ snapshotting logic =====================
 
-/// Run snashot_metadata in a blocking thread and async await on it
+/// Run snapshot_metadata in a blocking thread and async await on it
 pub async fn async_snapshot_metadata(garage: &Arc<Garage>) -> Result<(), Error> {
 let garage = garage.clone();
 let worker = tokio::task::spawn_blocking(move || snapshot_metadata(&garage));
@@ -59,7 +59,7 @@ impl<T> From<tokio::sync::mpsc::error::SendError<T>> for Error {
 }
 }
 
-/// Ths trait adds a `.log_err()` method on `Result<(), E>` types,
+/// The trait adds a `.log_err()` method on `Result<(), E>` types,
 /// which dismisses the error by logging it to stderr.
 pub trait LogError {
 fn log_err(self, msg: &'static str);
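The doc comment fixed above describes an extension-trait pattern: adding `.log_err()` to `Result<(), E>` so an error can be dismissed after logging. A stand-alone sketch of that pattern (a simplified illustration assuming a `Display` error type, not Garage's actual util crate):

```rust
// Extension trait adding `.log_err()` to `Result<(), E>`: the error is
// dropped after being printed to stderr.
pub trait LogError {
    fn log_err(self, msg: &'static str);
}

impl<E: std::fmt::Display> LogError for Result<(), E> {
    fn log_err(self, msg: &'static str) {
        if let Err(e) = self {
            eprintln!("Error: {}: {}", msg, e); // dismiss the error, just log it
        }
    }
}

fn main() {
    let res: Result<(), std::io::Error> =
        Err(std::io::Error::new(std::io::ErrorKind::Other, "disk full"));
    res.log_err("Background task failed");
}
```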
@@ -18,7 +18,7 @@ use crate::util::*;
 /// in the send queue of the client, and their responses in the send queue of the
 /// server. Lower values mean higher priority.
 ///
-/// This mechanism is usefull for messages bigger than the maximum chunk size
+/// This mechanism is useful for messages bigger than the maximum chunk size
 /// (set at `0x4000` bytes), such as large file transfers.
 /// In such case, all of the messages in the send queue with the highest priority
 /// will take turns to send individual chunks, in a round-robin fashion.
@@ -102,7 +102,7 @@ pub trait Message: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static
 
 /// The Req<M> is a helper object used to create requests and attach them
 /// a stream of data. If the stream is a fixed Bytes and not a ByteStream,
-/// Req<M> is cheaply clonable to allow the request to be sent to different
+/// Req<M> is cheaply cloneable to allow the request to be sent to different
 /// peers (Clone will panic if the stream is a ByteStream).
 pub struct Req<M: Message> {
 pub(crate) msg: Arc<M>,
@@ -41,7 +41,7 @@ pub(crate) type VersionTag = [u8; 16];
 pub(crate) const NETAPP_VERSION_TAG: u64 = 0x6772676e65740010; // grgnet 0x0010 (1.0)
 
 /// HelloMessage is sent by the client on a Netapp connection to indicate
-/// that they are also a server and ready to recieve incoming connections
+/// that they are also a server and ready to receive incoming connections
 /// at the specified address and port. If the client doesn't know their
 /// public address, they don't need to specify it and we look at the
 /// remote address of the socket is used instead.
@@ -290,7 +290,7 @@ impl NetApp {
 /// Attempt to connect to a peer, given by its ip:port and its public key.
 /// The public key will be checked during the secret handshake process.
 /// This function returns once the connection has been established and a
-/// successfull handshake was made. At this point we can send messages to
+/// successful handshake was made. At this point we can send messages to
 /// the other node with `Netapp::request`
 pub async fn try_connect(self: Arc<Self>, ip: SocketAddr, id: NodeID) -> Result<(), Error> {
 // Don't connect to ourself, we don't care
@@ -138,7 +138,7 @@ pub enum PeerConnState {
 /// A connection tentative is in progress (the nth, where n is the value stored)
 Trying(usize),
 
-/// We abandonned trying to connect to this peer (too many failed attempts)
+/// We abandoned trying to connect to this peer (too many failed attempts)
 Abandonned,
 }
 
@@ -28,7 +28,7 @@ use crate::stream::*;
 // - if error:
 //   - u8: error kind, encoded using error::io_errorkind_to_u8
 //   - rest: error message
-// - absent for cancel messag
+// - absent for cancel message
 
 pub(crate) type RequestID = u32;
 pub(crate) type ChunkLength = u16;
@@ -217,7 +217,7 @@ impl<'a> futures::Future for SendQueuePollNextReady<'a> {
 
 enum DataFrame {
 /// a fixed size buffer containing some data + a boolean indicating whether
-/// there may be more data comming from this stream. Can be used for some
+/// there may be more data coming from this stream. Can be used for some
 /// optimization. It's an error to set it to false if there is more data, but it is correct
 /// (albeit sub-optimal) to set it to true if there is nothing coming after
 Data(Bytes, bool),
@@ -310,7 +310,7 @@ pub(crate) trait SendLoop: Sync {
 // recv_fut is cancellation-safe according to tokio doc,
 // send_fut is cancellation-safe as implemented above?
 tokio::select! {
-biased;	// always read incomming channel first if it has data
+biased;	// always read incoming channel first if it has data
 sth = recv_fut => {
 match sth {
 Some(SendItem::Stream(id, prio, order_tag, data)) => {
@@ -16,7 +16,7 @@ use crate::bytes_buf::BytesBuf;
 ///
 /// Items sent in the ByteStream may be errors of type `std::io::Error`.
 /// An error indicates the end of the ByteStream: a reader should no longer read
-/// after recieving an error, and a writer should stop writing after sending an error.
+/// after receiving an error, and a writer should stop writing after sending an error.
 pub type ByteStream = Pin<Box<dyn Stream<Item = Packet> + Send + Sync>>;
 
 /// A packet sent in a ByteStream, which may contain either
@@ -66,7 +66,7 @@ async fn run_test_inner(port_base: u16) {
 println!("A pl2: {:?}", pl2);
 assert_eq!(pl2.len(), 2);
 
-// Connect third ndoe and check it peers with everyone
+// Connect third node and check it peers with everyone
 let (thread3, _netapp3, peering3) =
 run_netapp(netid, pk3, sk3, addr3, vec![(pk2, addr2)], stop_rx.clone());
 tokio::time::sleep(Duration::from_secs(3)).await;
@@ -25,7 +25,7 @@ where
 /// This async function returns only when a true signal was received
 /// from a watcher that tells us when to exit.
 ///
-/// Usefull in a select statement to interrupt another
+/// Useful in a select statement to interrupt another
 /// future:
 /// ```ignore
 /// select!(
@@ -133,7 +133,7 @@ impl Graph<FlowEdge> {
 /// This function shuffles the order of the edge lists. It keeps the ids of the
 /// reversed edges consistent.
 fn shuffle_edges(&mut self) {
-// We use deterministic randomness so that the layout calculation algorihtm
+// We use deterministic randomness so that the layout calculation algorithm
 // will output the same thing every time it is run. This way, the results
 // pre-calculated in `garage layout show` will match exactly those used
 // in practice with `garage layout apply`
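The deterministic-randomness point in this hunk can be illustrated with a small stand-alone sketch: seeding the RNG from a fixed value makes the shuffle reproducible across runs. This uses the `rand` crate with a hand-picked seed purely as an example; it is not Garage's actual seeding scheme.

```rust
use rand::rngs::StdRng;
use rand::seq::SliceRandom;
use rand::SeedableRng;

// Shuffle with an explicit seed: the same seed always yields the same order.
fn shuffled(mut edges: Vec<u32>, seed: u64) -> Vec<u32> {
    let mut rng = StdRng::seed_from_u64(seed);
    edges.shuffle(&mut rng);
    edges
}

fn main() {
    // Two runs with the same seed give the same "random" order, so a result
    // pre-computed earlier (e.g. by `garage layout show`) can be reproduced
    // exactly later (e.g. by `garage layout apply`).
    assert_eq!(shuffled(vec![1, 2, 3, 4], 42), shuffled(vec![1, 2, 3, 4], 42));
    println!("{:?}", shuffled(vec![1, 2, 3, 4], 42));
}
```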
@@ -90,7 +90,7 @@ impl LayoutHelper {
 // sync_map_min is the minimum value of sync_map among storage nodes
 // in the cluster (non-gateway nodes only, current and previous layouts).
 // It is the highest layout version for which we know that all relevant
-// storage nodes have fullfilled a sync, and therefore it is safe to
+// storage nodes have fulfilled a sync, and therefore it is safe to
 // use a read quorum within that layout to ensure consistency.
 // Gateway nodes are excluded here because they hold no relevant data
 // (they store the bucket and access key tables, but we don't have
@@ -48,7 +48,7 @@ impl LayoutManager {
 Ok(x) => {
 if x.current().replication_factor != replication_factor.replication_factor() {
 return Err(Error::Message(format!(
-"Prevous cluster layout has replication factor {}, which is different than the one specified in the config file ({}). The previous cluster layout can be purged, if you know what you are doing, simply by deleting the `cluster_layout` file in your metadata directory.",
+"Previous cluster layout has replication factor {}, which is different than the one specified in the config file ({}). The previous cluster layout can be purged, if you know what you are doing, simply by deleting the `cluster_layout` file in your metadata directory.",
 x.current().replication_factor,
 replication_factor.replication_factor()
 )));
@@ -241,7 +241,7 @@ mod v010 {
 /// The versions currently in use in the cluster
 pub versions: Vec<LayoutVersion>,
 /// At most 5 of the previous versions, not used by the garage_table
-/// module, but usefull for the garage_block module to find data blocks
+/// module, but useful for the garage_block module to find data blocks
 /// that have not yet been moved
 pub old_versions: Vec<LayoutVersion>,
 
@@ -9,7 +9,7 @@ use crate::replication_mode::ReplicationFactor;
 
 // This function checks that the partition size S computed is at least better than the
 // one given by a very naive algorithm. To do so, we try to run the naive algorithm
-// assuming a partion size of S+1. If we succed, it means that the optimal assignment
+// assuming a partition size of S+1. If we succeed, it means that the optimal assignment
 // was not optimal. The naive algorithm is the following :
 // - we compute the max number of partitions associated to every node, capped at the
 // partition number. It gives the number of tokens of every node.
@@ -471,7 +471,7 @@ impl LayoutVersion {
 }
 }
 
-// We clear the ring assignemnt data
+// We clear the ring assignment data
 self.ring_assignment_data = Vec::<CompactNodeType>::new();
 
 Ok(Some(old_assignment))
@@ -413,7 +413,7 @@ impl RpcHelper {
 /// Make a RPC call to multiple servers, returning either a Vec of responses,
 /// or an error if quorum could not be reached due to too many errors
 ///
-/// Contrary to try_call_many, this fuction is especially made for broadcast
+/// Contrary to try_call_many, this function is especially made for broadcast
 /// write operations. In particular:
 ///
 /// - The request are sent to all specified nodes as soon as `try_write_many_sets`
@@ -506,7 +506,7 @@ impl RpcHelper {
 
 // If we have a quorum of ok in all quorum sets, then it's a success!
 if result_tracker.all_quorums_ok() {
-// Continue all other requets in background
+// Continue all other requests in background
 tokio::spawn(async move {
 resp_stream.collect::<Vec<(Uuid, Result<_, _>)>>().await;
 drop(drop_on_complete);
@@ -54,7 +54,7 @@ pub const SYSTEM_RPC_PATH: &str = "garage_rpc/system.rs/SystemRpc";
 /// RPC messages related to membership
 #[derive(Debug, Serialize, Deserialize, Clone)]
 pub enum SystemRpc {
-/// Response to successfull advertisements
+/// Response to successful advertisements
 Ok,
 /// Request to connect to a specific node (in <pubkey>@<host>:<port> format, pubkey = full-length node ID)
 Connect(String),
@@ -172,7 +172,7 @@ pub struct ClusterHealth {
 pub enum ClusterHealthStatus {
 /// All nodes are available
 Healthy,
-/// Some storage nodes are unavailable, but quorum is stil
+/// Some storage nodes are unavailable, but quorum is still
 /// achieved for all partitions
 Degraded,
 /// Quorum is not available for some partitions
@@ -286,7 +286,7 @@ impl System {
 let mut local_status = NodeStatus::initial(replication_factor, &layout_manager);
 local_status.update_disk_usage(&config.metadata_dir, &config.data_dir);
 
-// ---- if enabled, set up additionnal peer discovery methods ----
+// ---- if enabled, set up additional peer discovery methods ----
 #[cfg(feature = "consul-discovery")]
 let consul_discovery = match &config.consul_discovery {
 Some(cfg) => Some(
@@ -337,7 +337,7 @@ impl System {
 Ok(sys)
 }

-/// Perform bootstraping, starting the ping loop
+/// Perform bootstrapping, starting the ping loop
 pub async fn run(self: Arc<Self>, must_exit: watch::Receiver<bool>) {
 join!(
 self.netapp.clone().listen(
@@ -258,14 +258,14 @@ impl<F: TableSchema, R: TableReplication> TableGc<F, R> {
 .await
 .err_context("GC: remote delete tombstones")?;

-// GC has been successfull for all of these entries.
+// GC has been successful for all of these entries.
 // We now remove them all from our local table and from the GC todo list.
 for item in items {
 self.data
 .delete_if_equal_hash(&item.key[..], item.value_hash)
 .err_context("GC: local delete tombstones")?;
 item.remove_if_equal(&self.data.gc_todo)
-.err_context("GC: remove from todo list after successfull GC")?;
+.err_context("GC: remove from todo list after successful GC")?;
 }

 Ok(())
@@ -383,7 +383,7 @@ impl GcTodoEntry {

 /// Removes the GcTodoEntry from the gc_todo tree if the
 /// hash of the serialized value is the same here as in the tree.
-/// This is usefull to remove a todo entry only under the condition
+/// This is useful to remove a todo entry only under the condition
 /// that it has not changed since the time it was read, i.e.
 /// what we have to do is still the same
 pub(crate) fn remove_if_equal(&self, gc_todo_tree: &db::Tree) -> Result<(), Error> {
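The doc comment corrected above describes a conditional removal: the todo entry is deleted only if its value hash has not changed since it was read. A sketch of that compare-then-delete pattern with a plain `HashMap` standing in for the real `gc_todo` tree (hypothetical helper, not the `db::Tree` API):

```rust
use std::collections::HashMap;

/// Remove `key` only if its stored hash is still the one we observed earlier,
/// so that a concurrent update to the entry is never lost.
fn remove_if_equal(tree: &mut HashMap<Vec<u8>, u64>, key: &[u8], expected_hash: u64) -> bool {
    let unchanged = tree.get(key) == Some(&expected_hash);
    if unchanged {
        tree.remove(key);
    }
    unchanged
}

fn main() {
    let mut tree = HashMap::new();
    tree.insert(b"tombstone".to_vec(), 0xdead_beef_u64);

    assert!(remove_if_equal(&mut tree, b"tombstone", 0xdead_beef)); // hash matches: removed
    assert!(!remove_if_equal(&mut tree, b"tombstone", 0xdead_beef)); // already gone: no-op
}
```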
@@ -13,12 +13,12 @@ pub trait TableReplication: Send + Sync + 'static {

 /// Which nodes to send read requests to
 fn read_nodes(&self, hash: &Hash) -> Vec<Uuid>;
-/// Responses needed to consider a read succesfull
+/// Responses needed to consider a read successful
 fn read_quorum(&self) -> usize;

 /// Which nodes to send writes to
 fn write_sets(&self, hash: &Hash) -> Self::WriteSets;
-/// Responses needed to consider a write succesfull in each set
+/// Responses needed to consider a write successful in each set
 fn write_quorum(&self) -> usize;

 // Accessing partitions, for Merkle tree & sync
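The trait in this hunk parameterizes reads and writes by a node list and a quorum. A toy stand-in showing what an implementation of those four methods can look like for 3-way replication (hypothetical trait and values, simplified from the real `TableReplication`):

```rust
type NodeId = u32;

trait Replication {
    fn read_nodes(&self, hash: u64) -> Vec<NodeId>;
    fn read_quorum(&self) -> usize;
    fn write_sets(&self, hash: u64) -> Vec<Vec<NodeId>>;
    fn write_quorum(&self) -> usize;
}

struct ThreeWay {
    nodes: Vec<NodeId>,
}

impl Replication for ThreeWay {
    fn read_nodes(&self, hash: u64) -> Vec<NodeId> {
        // Pick three consecutive nodes starting at the slot the hash maps to.
        let start = hash as usize % self.nodes.len();
        (0..3).map(|i| self.nodes[(start + i) % self.nodes.len()]).collect()
    }
    fn read_quorum(&self) -> usize {
        1 // a single answer is enough for a read
    }
    fn write_sets(&self, hash: u64) -> Vec<Vec<NodeId>> {
        // In steady state there is one write set; during a layout change
        // there may be several, each needing its own quorum.
        vec![self.read_nodes(hash)]
    }
    fn write_quorum(&self) -> usize {
        2 // two of the three replicas must acknowledge a write
    }
}

fn main() {
    let r = ThreeWay { nodes: vec![1, 2, 3, 4, 5] };
    println!("read from {:?}, need {}", r.read_nodes(42), r.read_quorum());
    println!("write to {:?}, need {} per set", r.write_sets(42), r.write_quorum());
}
```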
@@ -316,7 +316,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
 SyncRpc::RootCkDifferent(true) => VecDeque::from(vec![root_ck_key]),
 x => {
 return Err(Error::Message(format!(
-"Invalid respone to RootCkHash RPC: {}",
+"Invalid response to RootCkHash RPC: {}",
 debug_serialize(x)
 )));
 }
@@ -362,7 +362,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
 SyncRpc::Node(_, node) => node,
 x => {
 return Err(Error::Message(format!(
-"Invalid respone to GetNode RPC: {}",
+"Invalid response to GetNode RPC: {}",
 debug_serialize(x)
 )));
 }
@@ -171,11 +171,11 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
 // We will here batch all items into a single request for each concerned
 // node, with all of the entries it must store within that request.
 // Each entry has to be saved to a specific list of "write sets", i.e. a set
-// of node within wich a quorum must be achieved. In normal operation, there
+// of node within which a quorum must be achieved. In normal operation, there
 // is a single write set which corresponds to the quorum in the current
 // cluster layout, but when the layout is updated, multiple write sets might
 // have to be handled at once. Here, since we are sending many entries, we
-// will have to handle many write sets in all cases. The algorihtm is thus
+// will have to handle many write sets in all cases. The algorithm is thus
 // to send one request to each node with all the items it must save,
 // and keep track of the OK responses within each write set: if for all sets
 // a quorum of nodes has answered OK, then the insert has succeeded and
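Typo fixes aside, the comment block above spells out the insert algorithm: send each node one batched request, then check that every write set independently reaches its quorum of OK answers. A small sketch of that bookkeeping (hypothetical `QuorumTracker`, not Garage's actual result-tracking type):

```rust
use std::collections::HashSet;

struct QuorumTracker {
    // One entry per write set: (members of the set, members that answered OK).
    sets: Vec<(HashSet<u32>, HashSet<u32>)>,
    quorum: usize,
}

impl QuorumTracker {
    fn new(write_sets: Vec<Vec<u32>>, quorum: usize) -> Self {
        let sets = write_sets
            .into_iter()
            .map(|s| (s.into_iter().collect(), HashSet::new()))
            .collect();
        Self { sets, quorum }
    }

    // Record a successful response from `node` in every set it belongs to.
    fn mark_ok(&mut self, node: u32) {
        for (members, ok) in &mut self.sets {
            if members.contains(&node) {
                ok.insert(node);
            }
        }
    }

    // The insert succeeds only once *every* write set has reached its quorum.
    fn all_quorums_ok(&self) -> bool {
        self.sets.iter().all(|(_, ok)| ok.len() >= self.quorum)
    }
}

fn main() {
    // Two overlapping write sets, as can happen while the layout is being updated.
    let mut tracker = QuorumTracker::new(vec![vec![1, 2, 3], vec![3, 4, 5]], 2);
    for node in [1, 3, 4] {
        tracker.mark_ok(node);
    }
    assert!(tracker.all_quorums_ok()); // set 1 has {1, 3}, set 2 has {3, 4}
    println!("insert acknowledged");
}
```

With overlapping sets, a single node's OK can count towards several sets at once, which is exactly the situation while a layout change is in progress.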
@@ -14,7 +14,7 @@ use crate::background::{WorkerInfo, WorkerStatus};
 use crate::error::Error;
 use crate::time::now_msec;

-// All workers that haven't exited for this time after an exit signal was recieved
+// All workers that haven't exited for this time after an exit signal was received
 // will be interrupted in the middle of whatever they are doing.
 const EXIT_DEADLINE: Duration = Duration::from_secs(8);
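`EXIT_DEADLINE` above bounds how long a worker may keep running after the exit signal. A sketch of how such a deadline is typically enforced with `tokio::time::timeout` (hypothetical worker task, assuming the tokio crate; not Garage's background runner):

```rust
use std::time::Duration;
use tokio::time::timeout;

const EXIT_DEADLINE: Duration = Duration::from_secs(8);

// Stand-in for whatever the worker still has to do after the exit signal.
async fn finish_current_work() {
    tokio::time::sleep(Duration::from_millis(100)).await;
}

#[tokio::main]
async fn main() {
    match timeout(EXIT_DEADLINE, finish_current_work()).await {
        Ok(()) => println!("worker exited cleanly before the deadline"),
        Err(_elapsed) => println!("deadline exceeded, worker interrupted mid-task"),
    }
}
```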
@@ -54,7 +54,7 @@ pub trait Worker: Send {
 async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error>;

 /// Wait for work: await for some task to become available. This future can be interrupted in
-/// the middle for any reason, for example if an interrupt signal was recieved.
+/// the middle for any reason, for example if an interrupt signal was received.
 async fn wait_for_work(&mut self) -> WorkerState;
 }
@@ -93,12 +93,12 @@ pub struct Config {
 /// the addresses announced to other peers to a specific subnet.
 pub rpc_public_addr_subnet: Option<String>,

-/// Timeout for Netapp's ping messagess
+/// Timeout for Netapp's ping messages
 pub rpc_ping_timeout_msec: Option<u64>,
 /// Timeout for Netapp RPC calls
 pub rpc_timeout_msec: Option<u64>,

-// -- Bootstraping and discovery
+// -- Bootstrapping and discovery
 /// Bootstrap peers RPC address
 #[serde(default)]
 pub bootstrap_peers: Vec<String>,
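The two `*_msec` fields in this hunk are optional; in practice such settings are resolved into concrete `Duration`s with a fallback when absent. A small sketch of that resolution (the default values here are purely illustrative, not Garage's documented defaults):

```rust
use std::time::Duration;

struct RpcTimeouts {
    ping_timeout: Duration,
    rpc_timeout: Duration,
}

fn resolve(rpc_ping_timeout_msec: Option<u64>, rpc_timeout_msec: Option<u64>) -> RpcTimeouts {
    RpcTimeouts {
        // Use the configured value when present, otherwise an illustrative default.
        ping_timeout: Duration::from_millis(rpc_ping_timeout_msec.unwrap_or(10_000)),
        rpc_timeout: Duration::from_millis(rpc_timeout_msec.unwrap_or(30_000)),
    }
}

fn main() {
    let t = resolve(None, Some(5_000));
    assert_eq!(t.ping_timeout, Duration::from_secs(10));
    assert_eq!(t.rpc_timeout, Duration::from_secs(5));
}
```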
@@ -33,8 +33,8 @@ pub trait Crdt {
 /// arises very often, for example with a Lww or a LwwMap: the value type has to be a CRDT so that
 /// we have a rule for what to do when timestamps aren't enough to disambiguate (in a distributed
 /// system, anything can happen!), and with AutoCrdt the rule is to make an arbitrary (but
-/// determinstic) choice between the two. When using an Option<T> instead with this impl, ambiguity
-/// cases are explicitely stored as None, which allows us to detect the ambiguity and handle it in
+/// deterministic) choice between the two. When using an Option<T> instead with this impl, ambiguity
+/// cases are explicitly stored as None, which allows us to detect the ambiguity and handle it in
 /// the way we want. (this can only work if we are happy with losing the value when an ambiguity
 /// arises)
 impl<T> Crdt for Option<T>
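The doc comment fixed above explains that, unlike `AutoCrdt`'s arbitrary-but-deterministic choice, wrapping values in `Option<T>` lets a merge record an unresolvable conflict explicitly as `None`. A minimal illustration of that rule (hypothetical simplified `Merge` trait, not the exact Garage impl):

```rust
trait Merge {
    fn merge(&mut self, other: &Self);
}

impl<T: PartialEq> Merge for Option<T> {
    fn merge(&mut self, other: &Self) {
        if *self != *other {
            // The two replicas disagree and there is no rule to pick a winner:
            // store the ambiguity explicitly as None so callers can detect it.
            *self = None;
        }
    }
}

fn main() {
    let mut a = Some(1);
    a.merge(&Some(1));
    assert_eq!(a, Some(1)); // identical values: no ambiguity

    let mut b = Some(1);
    b.merge(&Some(2));
    assert_eq!(b, None); // conflicting values: ambiguity made visible
}
```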
@@ -16,7 +16,7 @@ use crate::crdt::crdt::*;
 /// In our case, we add the constraint that the value that is wrapped inside the LWW CRDT must
 /// itself be a CRDT: in the case when the timestamp does not allow us to decide on which value to
 /// keep, the merge rule of the inner CRDT is applied on the wrapped values. (Note that all types
-/// that implement the `Ord` trait get a default CRDT implemetnation that keeps the maximum value.
+/// that implement the `Ord` trait get a default CRDT implementation that keeps the maximum value.
 /// This enables us to use LWW directly with primitive data types such as numbers or strings. It is
 /// generally desirable in this case to never explicitly produce LWW values with the same timestamp
 /// but different inner values, as the rule to keep the maximum value isn't generally the desired
@@ -28,9 +28,9 @@ use crate::crdt::crdt::*;
 ///
 /// Given that clocks are not too desynchronized, this assumption
 /// is enough for most cases, as there is few chance that two humans
-/// coordonate themself faster than the time difference between two NTP servers.
+/// coordinate themself faster than the time difference between two NTP servers.
 ///
-/// As a more concret example, let's suppose you want to upload a file
+/// As a more concrete example, let's suppose you want to upload a file
 /// with the same key (path) in the same bucket at the very same time.
 /// For each request, the file will be timestamped by the receiving server
 /// and may differ from what you observed with your atomic clock!
@@ -84,16 +84,16 @@ where
 &self.v
 }

-/// Take the value inside the CRDT (discards the timesamp)
+/// Take the value inside the CRDT (discards the timestamp)
 pub fn take(self) -> T {
 self.v
 }

 /// Get a mutable reference to the CRDT's value
 ///
-/// This is usefull to mutate the inside value without changing the LWW timestamp.
+/// This is useful to mutate the inside value without changing the LWW timestamp.
 /// When such mutation is done, the merge between two LWW values is done using the inner
-/// CRDT's merge operation. This is usefull in the case where the inner CRDT is a large
+/// CRDT's merge operation. This is useful in the case where the inner CRDT is a large
 /// data type, such as a map, and we only want to change a single item in the map.
 /// To do this, we can produce a "CRDT delta", i.e. a LWW that contains only the modification.
 /// This delta consists in a LWW with the same timestamp, and the map
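The methods touched above belong to the `Lww` wrapper. A toy last-write-wins register sketched from the behaviour those doc comments describe (hypothetical type, not the real `Lww`): the highest timestamp wins, and a timestamp tie falls back to the "keep the maximum value" rule mentioned for `Ord` types:

```rust
#[derive(Debug, Clone, PartialEq)]
struct Lww<T> {
    ts: u64,
    v: T,
}

impl<T: Ord + Clone> Lww<T> {
    fn new(ts: u64, v: T) -> Self {
        Self { ts, v }
    }

    // Keep the value with the newest timestamp; on a tie, keep the larger value.
    fn merge(&mut self, other: &Self) {
        if other.ts > self.ts || (other.ts == self.ts && other.v > self.v) {
            self.ts = other.ts;
            self.v = other.v.clone();
        }
    }

    /// Take the value inside the register (discards the timestamp).
    fn take(self) -> T {
        self.v
    }
}

fn main() {
    let mut a = Lww::new(10, "v1");
    a.merge(&Lww::new(12, "v2")); // newer timestamp wins
    assert_eq!(a.take(), "v2");

    let mut b = Lww::new(10, "x");
    b.merge(&Lww::new(10, "y")); // tie: the maximum value is kept
    assert_eq!(b.take(), "y");
}
```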
@@ -109,7 +109,7 @@ where
 }

 /// Takes all of the values of the map and returns them. The current map is reset to the
-/// empty map. This is very usefull to produce in-place a new map that contains only a delta
+/// empty map. This is very useful to produce in-place a new map that contains only a delta
 /// that modifies a certain value:
 ///
 /// ```ignore
@@ -162,7 +162,7 @@ where
 }
 }

-/// Gets a reference to all of the items, as a slice. Usefull to iterate on all map values.
+/// Gets a reference to all of the items, as a slice. Useful to iterate on all map values.
 /// In most case you will want to ignore the timestamp (second item of the tuple).
 pub fn items(&self) -> &[(K, u64, V)] {
 &self.vals[..]
@@ -57,7 +57,7 @@ where
 Err(_) => None,
 }
 }
-/// Gets a reference to all of the items, as a slice. Usefull to iterate on all map values.
+/// Gets a reference to all of the items, as a slice. Useful to iterate on all map values.
 pub fn items(&self) -> &[(K, V)] {
 &self.vals[..]
 }
@@ -1,7 +1,7 @@
 use serde::{Deserialize, Serialize};

-/// Serialize to MessagePacki, without versionning
-/// (see garage_util::migrate for functions that manage versionned
+/// Serialize to MessagePack, without versioning
+/// (see garage_util::migrate for functions that manage versioned
 /// data formats)
 pub fn nonversioned_encode<T>(val: &T) -> Result<Vec<u8>, rmp_serde::encode::Error>
 where
@@ -13,8 +13,8 @@ where
 Ok(wr)
 }

-/// Deserialize from MessagePacki, without versionning
-/// (see garage_util::migrate for functions that manage versionned
+/// Deserialize from MessagePack, without versioning
+/// (see garage_util::migrate for functions that manage versioned
 /// data formats)
 pub fn nonversioned_decode<T>(bytes: &[u8]) -> Result<T, rmp_serde::decode::Error>
 where
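`nonversioned_encode` and `nonversioned_decode` above are thin wrappers around MessagePack with no version tag. A round-trip sketch with `rmp_serde` (assuming a recent `rmp_serde` and serde derive; the struct is illustrative, not Garage's code):

```rust
use serde::{Deserialize, Serialize};

#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct Entry {
    key: String,
    value: u64,
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let e = Entry { key: "hello".into(), value: 42 };

    // Encode to MessagePack bytes; no version information is written.
    let bytes = rmp_serde::to_vec(&e)?;
    // Decode them back; the caller must already know which type was written.
    let back: Entry = rmp_serde::from_slice(&bytes)?;

    assert_eq!(e, back);
    Ok(())
}
```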