add kv_get_prefix

This commit is contained in:
Alex 2023-04-21 13:10:41 +02:00
parent a11ed03031
commit 45e12c3bcd
5 changed files with 82 additions and 25 deletions

View file

@ -2,7 +2,7 @@
name = "df-consul" name = "df-consul"
description = "Deuxfleurs' async Rust bindings for (a subset of) the Consul HTTP API" description = "Deuxfleurs' async Rust bindings for (a subset of) the Consul HTTP API"
authors = [ "Alex Auvolat <alex@adnab.me>" ] authors = [ "Alex Auvolat <alex@adnab.me>" ]
version = "0.3.3" version = "0.3.4"
edition = "2021" edition = "2021"
license = "MIT" license = "MIT"
repository = "https://git.deuxfleurs.fr/Deuxfleurs/df-consul" repository = "https://git.deuxfleurs.fr/Deuxfleurs/df-consul"
@ -14,6 +14,7 @@ anyhow = "1.0.66"
serde = { version = "1.0.149", features = ["derive"] } serde = { version = "1.0.149", features = ["derive"] }
log = "0.4" log = "0.4"
bytes = "1" bytes = "1"
base64 = "0.13"
reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls-manual-roots" ] } reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls-manual-roots" ] }
tokio = { version = "1.23", default-features = false, features = [ "macros" ] } tokio = { version = "1.23", default-features = false, features = [ "macros" ] }
futures = "0.3.25" futures = "0.3.25"

View file

@ -45,4 +45,19 @@ async fn main() {
.unwrap() .unwrap()
); );
} }
println!("== LIST PREFIX ==");
let prefix = consul
.kv_get_prefix("diplonat/autodiscovery", None)
.await
.unwrap();
println!("{:?}", prefix);
for i in 0..3 {
println!("-- wait for update... --");
let prefix = consul
.kv_get_prefix("diplonat/autodiscovery", Some(prefix.index()))
.await
.unwrap();
println!("{:?}", prefix);
}
} }

View file

@ -5,7 +5,6 @@
use std::cmp; use std::cmp;
use std::collections::HashMap; use std::collections::HashMap;
use std::fmt::Write;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
@ -178,25 +177,6 @@ impl Consul {
rx rx
} }
async fn get_with_index<T: for<'de> Deserialize<'de>>(
&self,
mut url: String,
last_index: Option<usize>,
) -> Result<WithIndex<T>> {
if let Some(i) = last_index {
if url.contains('?') {
write!(&mut url, "&index={}", i).unwrap();
} else {
write!(&mut url, "?index={}", i).unwrap();
}
}
debug!("GET {} as {}", url, std::any::type_name::<T>());
let http = self.client.get(&url).send().await?;
Ok(WithIndex::<T>::index_from(&http)?.value(http.json().await?))
}
} }
async fn do_watch_all_service_health( async fn do_watch_all_service_health(

View file

@ -1,10 +1,19 @@
use std::collections::HashMap;
use anyhow::{anyhow, Result}; use anyhow::{anyhow, Result};
use bytes::Bytes; use bytes::Bytes;
use log::*; use log::*;
use reqwest::StatusCode; use reqwest::StatusCode;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use crate::Consul; use crate::{Consul, WithIndex};
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
#[serde(rename_all = "PascalCase")]
pub struct KvGetPrefixEntry {
pub key: String,
pub value: String,
}
impl Consul { impl Consul {
pub async fn kv_get(&self, key: &str) -> Result<Option<Bytes>> { pub async fn kv_get(&self, key: &str) -> Result<Option<Bytes>> {
@ -37,6 +46,33 @@ impl Consul {
} }
} }
pub async fn kv_get_prefix(
&self,
key_prefix: &str,
last_index: Option<usize>,
) -> Result<WithIndex<HashMap<String, Bytes>>> {
debug!("kv_get_prefix {} index={:?}", key_prefix, last_index);
let results: WithIndex<Vec<KvGetPrefixEntry>> = self
.get_with_index(
format!(
"{}/v1/kv/{}{}?recurse",
self.url, self.kv_prefix, key_prefix
),
last_index,
)
.await?;
let mut res = HashMap::new();
for ent in results.value {
res.insert(ent.key, Bytes::from(base64::decode(&ent.value)?));
}
Ok(WithIndex {
value: res,
index: results.index,
})
}
pub async fn kv_put(&self, key: &str, bytes: Bytes) -> Result<()> { pub async fn kv_put(&self, key: &str, bytes: Bytes) -> Result<()> {
debug!("kv_put {}", key); debug!("kv_put {}", key);

View file

@ -1,14 +1,39 @@
use std::fmt::{Debug, Display}; use std::fmt::{Debug, Display, Write};
use anyhow::{bail, Result}; use anyhow::{bail, Result};
use log::*;
use reqwest::Response; use reqwest::Response;
use serde::Deserialize;
use crate::Consul;
impl Consul {
pub(crate) async fn get_with_index<T: for<'de> Deserialize<'de>>(
&self,
mut url: String,
last_index: Option<usize>,
) -> Result<WithIndex<T>> {
if let Some(i) = last_index {
if url.contains('?') {
write!(&mut url, "&index={}", i).unwrap();
} else {
write!(&mut url, "?index={}", i).unwrap();
}
}
debug!("GET {} as {}", url, std::any::type_name::<T>());
let http = self.client.get(&url).send().await?;
Ok(WithIndex::<T>::index_from(&http)?.value(http.json().await?))
}
}
/// Wraps the returned value of an [API call with blocking /// Wraps the returned value of an [API call with blocking
/// possibility](https://developer.hashicorp.com/consul/api-docs/features/blocking) with the /// possibility](https://developer.hashicorp.com/consul/api-docs/features/blocking) with the
/// returned Consul index /// returned Consul index
pub struct WithIndex<T> { pub struct WithIndex<T> {
value: T, pub(crate) value: T,
index: usize, pub(crate) index: usize,
} }
impl<T> WithIndex<T> { impl<T> WithIndex<T> {