WIP: POC for webhooks #340
6 changed files with 56 additions and 4 deletions
3
Cargo.lock
generated
3
Cargo.lock
generated
|
@ -1013,6 +1013,7 @@ dependencies = [
|
||||||
"http-range",
|
"http-range",
|
||||||
"httpdate 0.3.2",
|
"httpdate 0.3.2",
|
||||||
"hyper",
|
"hyper",
|
||||||
|
"hyper-tls",
|
||||||
"idna",
|
"idna",
|
||||||
"md-5",
|
"md-5",
|
||||||
"multer",
|
"multer",
|
||||||
|
@ -1714,7 +1715,7 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "k2v-client"
|
name = "k2v-client"
|
||||||
version = "0.1.0"
|
version = "0.0.1"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"base64",
|
"base64",
|
||||||
"clap 3.1.18",
|
"clap 3.1.18",
|
||||||
|
|
|
@ -44,6 +44,7 @@ http = "0.2"
|
||||||
httpdate = "0.3"
|
httpdate = "0.3"
|
||||||
http-range = "0.1"
|
http-range = "0.1"
|
||||||
hyper = { version = "0.14", features = ["server", "http1", "runtime", "tcp", "stream"] }
|
hyper = { version = "0.14", features = ["server", "http1", "runtime", "tcp", "stream"] }
|
||||||
|
hyper-tls = {version = "0.5.0"}
|
||||||
multer = "2.0"
|
multer = "2.0"
|
||||||
percent-encoding = "2.1.0"
|
percent-encoding = "2.1.0"
|
||||||
roxmltree = "0.14"
|
roxmltree = "0.14"
|
||||||
|
|
|
@ -15,3 +15,4 @@ pub mod admin;
|
||||||
#[cfg(feature = "k2v")]
|
#[cfg(feature = "k2v")]
|
||||||
pub mod k2v;
|
pub mod k2v;
|
||||||
pub mod s3;
|
pub mod s3;
|
||||||
|
pub mod webhooks;
|
|
@ -30,6 +30,7 @@ use crate::s3::post_object::handle_post_object;
|
||||||
use crate::s3::put::*;
|
use crate::s3::put::*;
|
||||||
use crate::s3::router::Endpoint;
|
use crate::s3::router::Endpoint;
|
||||||
use crate::s3::website::*;
|
use crate::s3::website::*;
|
||||||
|
use crate::webhooks::*;
|
||||||
|
|
||||||
pub struct S3ApiServer {
|
pub struct S3ApiServer {
|
||||||
garage: Arc<Garage>,
|
garage: Arc<Garage>,
|
||||||
|
@ -189,8 +190,7 @@ impl ApiHandler for S3ApiServer {
|
||||||
part_number,
|
part_number,
|
||||||
&upload_id,
|
&upload_id,
|
||||||
content_sha256,
|
content_sha256,
|
||||||
)
|
).await
|
||||||
.await
|
|
||||||
}
|
}
|
||||||
Endpoint::CopyObject { key } => {
|
Endpoint::CopyObject { key } => {
|
||||||
handle_copy(garage, &api_key, &req, bucket_id, &key).await
|
handle_copy(garage, &api_key, &req, bucket_id, &key).await
|
||||||
|
@ -212,7 +212,10 @@ impl ApiHandler for S3ApiServer {
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
Endpoint::PutObject { key } => {
|
Endpoint::PutObject { key } => {
|
||||||
handle_put(garage, req, &bucket, &key, content_sha256).await
|
let hook_awaiter = call_hook(garage.clone(), create_put_object_hook(bucket_name, &key, api_key.key_id));
|
||||||
|
let put_awaiter = handle_put(garage, req, &bucket, &key, content_sha256);
|
||||||
|
let (put_result, _hook_result) = futures::join!(put_awaiter, hook_awaiter);
|
||||||
|
put_result
|
||||||
}
|
}
|
||||||
Endpoint::AbortMultipartUpload { key, upload_id } => {
|
Endpoint::AbortMultipartUpload { key, upload_id } => {
|
||||||
handle_abort_multipart_upload(garage, bucket_id, &key, &upload_id).await
|
handle_abort_multipart_upload(garage, bucket_id, &key, &upload_id).await
|
||||||
|
|
43
src/api/webhooks.rs
Normal file
43
src/api/webhooks.rs
Normal file
|
@ -0,0 +1,43 @@
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use garage_model::garage::Garage;
|
||||||
|
use serde::{Serialize};
|
||||||
|
use hyper::{Body, Client, Method, Request};
|
||||||
|
use hyper_tls::HttpsConnector;
|
||||||
|
|
||||||
|
use crate::s3::error::*;
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, PartialEq)]
|
||||||
|
pub struct ObjectHook {
|
||||||
|
pub hook_type: String,
|
||||||
|
pub bucket: String,
|
||||||
|
pub object: String,
|
||||||
|
pub via: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn create_put_object_hook(bucket: String, obj: &String, via: String) -> ObjectHook {
|
||||||
|
return ObjectHook { hook_type: "PutObject".to_string(), bucket: bucket, object: obj.to_string(), via: via }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn call_hook<T: Serialize>(garage: Arc<Garage>, hook: T) -> Result<(), Error> {
|
||||||
|
if let Some(uri) = garage.config.webhook_uri.as_ref() {
|
||||||
|
let client = Client::builder().build(HttpsConnector::new());
|
||||||
|
let b = serde_json::to_string(&hook).unwrap();
|
||||||
|
println!("Connecting to {}", uri);
|
||||||
|
let req = Request::builder()
|
||||||
|
.method(Method::POST)
|
||||||
|
.uri(uri)
|
||||||
|
.header("Content-Type", "application/json")
|
||||||
|
.body(Body::from(b))?;
|
||||||
|
|
||||||
|
// even if there is an error with the webhook, do not cause an error
|
||||||
|
if let Err(result) = client.request(req).await {
|
||||||
|
println!("Error processing webhook to {}: {}", uri, result);
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
return Ok(());
|
||||||
|
} else {
|
||||||
|
return Ok(())
|
||||||
|
}
|
||||||
|
}
|
|
@ -84,6 +84,9 @@ pub struct Config {
|
||||||
#[cfg(feature = "k2v")]
|
#[cfg(feature = "k2v")]
|
||||||
pub k2v_api: Option<K2VApiConfig>,
|
pub k2v_api: Option<K2VApiConfig>,
|
||||||
|
|
||||||
|
/// Configuration for webhooks
|
||||||
|
pub webhook_uri: Option<String>,
|
||||||
|
|
||||||
/// Configuration for serving files as normal web server
|
/// Configuration for serving files as normal web server
|
||||||
pub s3_web: WebConfig,
|
pub s3_web: WebConfig,
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue