GET implementation

This commit is contained in:
Quentin 2024-04-23 15:20:29 +02:00
parent 4594e068db
commit 50ce8621c2
Signed by: quentin
GPG key ID: E9602264D639FF68
5 changed files with 76 additions and 27 deletions

View file

@ -6,7 +6,7 @@ use futures::stream::StreamExt;
use futures::stream::TryStreamExt; use futures::stream::TryStreamExt;
use http_body_util::BodyStream; use http_body_util::BodyStream;
use http_body_util::StreamBody; use http_body_util::StreamBody;
use http_body_util::combinators::BoxBody; use http_body_util::combinators::UnsyncBoxBody;
use hyper::body::Frame; use hyper::body::Frame;
use tokio_util::sync::PollSender; use tokio_util::sync::PollSender;
use std::io::{Error, ErrorKind}; use std::io::{Error, ErrorKind};
@ -16,6 +16,7 @@ use http_body_util::BodyExt;
use aero_dav::types as dav; use aero_dav::types as dav;
use aero_dav::xml as dxml; use aero_dav::xml as dxml;
use super::controller::HttpResponse;
pub(crate) fn depth(req: &Request<impl hyper::body::Body>) -> dav::Depth { pub(crate) fn depth(req: &Request<impl hyper::body::Body>) -> dav::Depth {
match req.headers().get("Depth").map(hyper::header::HeaderValue::to_str) { match req.headers().get("Depth").map(hyper::header::HeaderValue::to_str) {
@ -26,11 +27,11 @@ pub(crate) fn depth(req: &Request<impl hyper::body::Body>) -> dav::Depth {
} }
} }
pub(crate) fn text_body(txt: &'static str) -> BoxBody<Bytes, std::io::Error> { pub(crate) fn text_body(txt: &'static str) -> UnsyncBoxBody<Bytes, std::io::Error> {
BoxBody::new(Full::new(Bytes::from(txt)).map_err(|e| match e {})) UnsyncBoxBody::new(Full::new(Bytes::from(txt)).map_err(|e| match e {}))
} }
pub(crate) fn serialize<T: dxml::QWrite + Send + 'static>(status_ok: hyper::StatusCode, elem: T) -> Result<Response<BoxBody<Bytes, std::io::Error>>> { pub(crate) fn serialize<T: dxml::QWrite + Send + 'static>(status_ok: hyper::StatusCode, elem: T) -> Result<HttpResponse> {
let (tx, rx) = tokio::sync::mpsc::channel::<Bytes>(1); let (tx, rx) = tokio::sync::mpsc::channel::<Bytes>(1);
// Build the writer // Build the writer
@ -55,7 +56,7 @@ pub(crate) fn serialize<T: dxml::QWrite + Send + 'static>(status_ok: hyper::Stat
// Build the reader // Build the reader
let recv = tokio_stream::wrappers::ReceiverStream::new(rx); let recv = tokio_stream::wrappers::ReceiverStream::new(rx);
let stream = StreamBody::new(recv.map(|v| Ok(Frame::data(v)))); let stream = StreamBody::new(recv.map(|v| Ok(Frame::data(v))));
let boxed_body = BoxBody::new(stream); let boxed_body = UnsyncBoxBody::new(stream);
let response = Response::builder() let response = Response::builder()
.status(status_ok) .status(status_ok)

View file

@ -1,8 +1,10 @@
use anyhow::Result; use anyhow::Result;
use http_body_util::combinators::BoxBody; use http_body_util::combinators::{UnsyncBoxBody, BoxBody};
use hyper::body::Incoming; use hyper::body::Incoming;
use hyper::{Request, Response, body::Bytes}; use hyper::{Request, Response, body::Bytes};
use http_body_util::BodyStream; use http_body_util::BodyStream;
use http_body_util::StreamBody;
use hyper::body::Frame;
use futures::stream::{StreamExt, TryStreamExt}; use futures::stream::{StreamExt, TryStreamExt};
use aero_collections::user::User; use aero_collections::user::User;
@ -15,7 +17,8 @@ use crate::dav::node::{DavNode, PutPolicy};
use crate::dav::resource::RootNode; use crate::dav::resource::RootNode;
use crate::dav::codec; use crate::dav::codec;
type ArcUser = std::sync::Arc<User>; pub(super) type ArcUser = std::sync::Arc<User>;
pub(super) type HttpResponse = Response<UnsyncBoxBody<Bytes, std::io::Error>>;
const ALLPROP: [dav::PropertyRequest<All>; 10] = [ const ALLPROP: [dav::PropertyRequest<All>; 10] = [
dav::PropertyRequest::CreationDate, dav::PropertyRequest::CreationDate,
@ -36,7 +39,7 @@ pub(crate) struct Controller {
req: Request<Incoming>, req: Request<Incoming>,
} }
impl Controller { impl Controller {
pub(crate) async fn route(user: std::sync::Arc<User>, req: Request<Incoming>) -> Result<Response<BoxBody<Bytes, std::io::Error>>> { pub(crate) async fn route(user: std::sync::Arc<User>, req: Request<Incoming>) -> Result<HttpResponse> {
let path = req.uri().path().to_string(); let path = req.uri().path().to_string();
let path_segments: Vec<_> = path.split("/").filter(|s| *s != "").collect(); let path_segments: Vec<_> = path.split("/").filter(|s| *s != "").collect();
let method = req.method().as_str().to_uppercase(); let method = req.method().as_str().to_uppercase();
@ -60,12 +63,13 @@ impl Controller {
.header("DAV", "1") .header("DAV", "1")
.header("Allow", "HEAD,GET,PUT,OPTIONS,DELETE,PROPFIND,PROPPATCH,MKCOL,COPY,MOVE,LOCK,UNLOCK,MKCALENDAR,REPORT") .header("Allow", "HEAD,GET,PUT,OPTIONS,DELETE,PROPFIND,PROPPATCH,MKCOL,COPY,MOVE,LOCK,UNLOCK,MKCALENDAR,REPORT")
.body(codec::text_body(""))?), .body(codec::text_body(""))?),
"HEAD" | "GET" => { "HEAD" => {
tracing::warn!("HEAD+GET not correctly implemented"); tracing::warn!("HEAD not correctly implemented");
Ok(Response::builder() Ok(Response::builder()
.status(404) .status(404)
.body(codec::text_body(""))?) .body(codec::text_body(""))?)
}, },
"GET" => ctrl.get().await,
"PUT" => ctrl.put().await, "PUT" => ctrl.put().await,
"DELETE" => { "DELETE" => {
todo!(); todo!();
@ -87,7 +91,7 @@ impl Controller {
/// Note: current implementation is not generic at all, it is heavily tied to CalDAV. /// Note: current implementation is not generic at all, it is heavily tied to CalDAV.
/// A rewrite would be required to make it more generic (with the extension system that has /// A rewrite would be required to make it more generic (with the extension system that has
/// been introduced in aero-dav) /// been introduced in aero-dav)
async fn report(self) -> Result<Response<BoxBody<Bytes, std::io::Error>>> { async fn report(self) -> Result<HttpResponse> {
let status = hyper::StatusCode::from_u16(207)?; let status = hyper::StatusCode::from_u16(207)?;
let report = match deserialize::<cal::Report<All>>(self.req).await { let report = match deserialize::<cal::Report<All>>(self.req).await {
@ -135,7 +139,7 @@ impl Controller {
} }
/// PROPFIND is the standard way to fetch WebDAV properties /// PROPFIND is the standard way to fetch WebDAV properties
async fn propfind(self) -> Result<Response<BoxBody<Bytes, std::io::Error>>> { async fn propfind(self) -> Result<HttpResponse> {
let depth = depth(&self.req); let depth = depth(&self.req);
if matches!(depth, dav::Depth::Infinity) { if matches!(depth, dav::Depth::Infinity) {
return Ok(Response::builder() return Ok(Response::builder()
@ -175,7 +179,7 @@ impl Controller {
serialize(status, Self::multistatus(&self.user, nodes, not_found, propname)) serialize(status, Self::multistatus(&self.user, nodes, not_found, propname))
} }
async fn put(self) -> Result<Response<BoxBody<Bytes, std::io::Error>>> { async fn put(self) -> Result<HttpResponse> {
//@FIXME temporary, look at If-None-Match & If-Match headers //@FIXME temporary, look at If-None-Match & If-Match headers
let put_policy = PutPolicy::CreateOnly; let put_policy = PutPolicy::CreateOnly;
@ -199,20 +203,19 @@ impl Controller {
Ok(response) Ok(response)
} }
async fn get(self) -> Result<Response<BoxBody<Bytes, std::io::Error>>> { async fn get(self) -> Result<HttpResponse> {
todo!() let stream_body = StreamBody::new(self.node.content().await.map_ok(|v| Frame::data(v)));
/*let stream = StreamBody::new(self.node.get().map(|v| Ok(Frame::data(v)))); let boxed_body = UnsyncBoxBody::new(stream_body);
let boxed_body = BoxBody::new(stream);
let response = Response::builder() let response = Response::builder()
.status(200) .status(200)
//.header("content-type", "application/xml; charset=\"utf-8\"") //.header("content-type", "application/xml; charset=\"utf-8\"")
.body(boxed_body)?; .body(boxed_body)?;
Ok(response)*/ Ok(response)
} }
// --- Common utulity functions --- // --- Common utility functions ---
/// Build a multistatus response from a list of DavNodes /// Build a multistatus response from a list of DavNodes
fn multistatus(user: &ArcUser, nodes: Vec<Box<dyn DavNode>>, not_found: Vec<dav::Href>, props: Option<dav::PropName<All>>) -> dav::Multistatus<All> { fn multistatus(user: &ArcUser, nodes: Vec<Box<dyn DavNode>>, not_found: Vec<dav::Href>, props: Option<dav::PropName<All>>) -> dav::Multistatus<All> {
// Collect properties on existing objects // Collect properties on existing objects

View file

@ -1,21 +1,21 @@
use anyhow::{anyhow, Result}; use anyhow::{anyhow, Result};
use base64::Engine; use base64::Engine;
use hyper::{Request, Response, body::Bytes}; use hyper::{Request, Response};
use hyper::body::Incoming; use hyper::body::Incoming;
use http_body_util::combinators::BoxBody;
use aero_user::login::ArcLoginProvider; use aero_user::login::ArcLoginProvider;
use aero_collections::user::User; use aero_collections::user::User;
use super::codec::text_body; use super::codec::text_body;
use super::controller::HttpResponse;
type ArcUser = std::sync::Arc<User>; type ArcUser = std::sync::Arc<User>;
pub(super) async fn auth<'a>( pub(super) async fn auth<'a>(
login: ArcLoginProvider, login: ArcLoginProvider,
req: Request<Incoming>, req: Request<Incoming>,
next: impl Fn(ArcUser, Request<Incoming>) -> futures::future::BoxFuture<'a, Result<Response<BoxBody<Bytes, std::io::Error>>>>, next: impl Fn(ArcUser, Request<Incoming>) -> futures::future::BoxFuture<'a, Result<HttpResponse>>,
) -> Result<Response<BoxBody<Bytes, std::io::Error>>> { ) -> Result<HttpResponse> {
let auth_val = match req.headers().get(hyper::header::AUTHORIZATION) { let auth_val = match req.headers().get(hyper::header::AUTHORIZATION) {
Some(hv) => hv.to_str()?, Some(hv) => hv.to_str()?,
None => { None => {

View file

@ -5,9 +5,10 @@ use hyper::body::Bytes;
use aero_dav::types as dav; use aero_dav::types as dav;
use aero_dav::realization::All; use aero_dav::realization::All;
use aero_collections::{user::User, davdag::Etag}; use aero_collections::davdag::Etag;
use super::controller::ArcUser;
type ArcUser = std::sync::Arc<User>;
pub(crate) type Content<'a> = BoxStream<'a, std::result::Result<Bytes, std::io::Error>>; pub(crate) type Content<'a> = BoxStream<'a, std::result::Result<Bytes, std::io::Error>>;
pub(crate) enum PutPolicy { pub(crate) enum PutPolicy {
@ -34,7 +35,7 @@ pub(crate) trait DavNode: Send {
/// Put an element (create or update) /// Put an element (create or update)
fn put<'a>(&'a self, policy: PutPolicy, stream: Content<'a>) -> BoxFuture<'a, Result<Etag>>; fn put<'a>(&'a self, policy: PutPolicy, stream: Content<'a>) -> BoxFuture<'a, Result<Etag>>;
/// Get content /// Get content
//fn content(&self) -> TryStream; fn content<'a>(&'a self) -> BoxFuture<'a, Content<'static>>;
//@FIXME maybe add etag, maybe add a way to set content //@FIXME maybe add etag, maybe add a way to set content

View file

@ -64,6 +64,12 @@ impl DavNode for RootNode {
fn put<'a>(&'a self, _policy: PutPolicy, stream: Content<'a>) -> BoxFuture<'a, Result<Etag>> { fn put<'a>(&'a self, _policy: PutPolicy, stream: Content<'a>) -> BoxFuture<'a, Result<Etag>> {
todo!() todo!()
} }
fn content<'a>(&'a self) -> BoxFuture<'a, Content<'static>> {
async {
futures::stream::once(futures::future::err(std::io::Error::from(std::io::ErrorKind::Unsupported))).boxed()
}.boxed()
}
} }
#[derive(Clone)] #[derive(Clone)]
@ -127,6 +133,12 @@ impl DavNode for HomeNode {
fn put<'a>(&'a self, _policy: PutPolicy, stream: Content<'a>) -> BoxFuture<'a, Result<Etag>> { fn put<'a>(&'a self, _policy: PutPolicy, stream: Content<'a>) -> BoxFuture<'a, Result<Etag>> {
todo!() todo!()
} }
fn content<'a>(&'a self) -> BoxFuture<'a, Content<'static>> {
async {
futures::stream::once(futures::future::err(std::io::Error::from(std::io::ErrorKind::Unsupported))).boxed()
}.boxed()
}
} }
#[derive(Clone)] #[derive(Clone)]
@ -200,6 +212,12 @@ impl DavNode for CalendarListNode {
fn put<'a>(&'a self, _policy: PutPolicy, stream: Content<'a>) -> BoxFuture<'a, Result<Etag>> { fn put<'a>(&'a self, _policy: PutPolicy, stream: Content<'a>) -> BoxFuture<'a, Result<Etag>> {
todo!() todo!()
} }
fn content<'a>(&'a self) -> BoxFuture<'a, Content<'static>> {
async {
futures::stream::once(futures::future::err(std::io::Error::from(std::io::ErrorKind::Unsupported))).boxed()
}.boxed()
}
} }
#[derive(Clone)] #[derive(Clone)]
@ -290,6 +308,12 @@ impl DavNode for CalendarNode {
fn put<'a>(&'a self, _policy: PutPolicy, stream: Content<'a>) -> BoxFuture<'a, Result<Etag>> { fn put<'a>(&'a self, _policy: PutPolicy, stream: Content<'a>) -> BoxFuture<'a, Result<Etag>> {
todo!() todo!()
} }
fn content<'a>(&'a self) -> BoxFuture<'a, Content<'static>> {
async {
futures::stream::once(futures::future::err(std::io::Error::from(std::io::ErrorKind::Unsupported))).boxed()
}.boxed()
}
} }
const FAKE_ICS: &str = r#"BEGIN:VCALENDAR const FAKE_ICS: &str = r#"BEGIN:VCALENDAR
@ -386,7 +410,8 @@ impl DavNode for EventNode {
_ => () _ => ()
}; };
//@FIXME for now, our storage interface does not allow for streaming //@FIXME for now, our storage interface does not allow streaming,
// so we load everything in memory
let mut evt = Vec::new(); let mut evt = Vec::new();
let mut reader = stream.into_async_read(); let mut reader = stream.into_async_read();
reader.read_to_end(&mut evt).await.unwrap(); reader.read_to_end(&mut evt).await.unwrap();
@ -394,6 +419,19 @@ impl DavNode for EventNode {
Ok(entry.2) Ok(entry.2)
}.boxed() }.boxed()
} }
fn content<'a>(&'a self) -> BoxFuture<'a, Content<'static>> {
async {
//@FIXME for now, our storage interface does not allow streaming,
// so we load everything in memory
let content = self.col.get(self.blob_id).await.or(Err(std::io::Error::from(std::io::ErrorKind::Interrupted)));
let r = async {
Ok(hyper::body::Bytes::from(content?))
};
//tokio::pin!(r);
futures::stream::once(Box::pin(r)).boxed()
}.boxed()
}
} }
#[derive(Clone)] #[derive(Clone)]
@ -440,4 +478,10 @@ impl DavNode for CreateEventNode {
Ok(entry.2) Ok(entry.2)
}.boxed() }.boxed()
} }
fn content<'a>(&'a self) -> BoxFuture<'a, Content<'static>> {
async {
futures::stream::once(futures::future::err(std::io::Error::from(std::io::ErrorKind::Unsupported))).boxed()
}.boxed()
}
} }