forked from Deuxfleurs/garage
Refactor
This commit is contained in:
parent
3477864142
commit
a50f07dfdc
3 changed files with 87 additions and 71 deletions
|
@ -1,12 +1,8 @@
|
||||||
use core::pin::Pin;
|
|
||||||
use core::task::{Context, Poll};
|
|
||||||
|
|
||||||
use std::collections::VecDeque;
|
use std::collections::VecDeque;
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use futures::future::Future;
|
use futures::future::Future;
|
||||||
use futures::ready;
|
|
||||||
use futures::stream::*;
|
use futures::stream::*;
|
||||||
use hyper::body::{Bytes, HttpBody};
|
use hyper::body::{Bytes, HttpBody};
|
||||||
use hyper::server::conn::AddrStream;
|
use hyper::server::conn::AddrStream;
|
||||||
|
@ -16,6 +12,7 @@ use hyper::{Body, Method, Request, Response, Server, StatusCode};
|
||||||
use crate::data;
|
use crate::data;
|
||||||
use crate::data::*;
|
use crate::data::*;
|
||||||
use crate::error::Error;
|
use crate::error::Error;
|
||||||
|
use crate::http_util::*;
|
||||||
use crate::proto::*;
|
use crate::proto::*;
|
||||||
use crate::rpc_client::*;
|
use crate::rpc_client::*;
|
||||||
use crate::server::Garage;
|
use crate::server::Garage;
|
||||||
|
@ -311,9 +308,9 @@ async fn handle_get(
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.buffered(2);
|
.buffered(2);
|
||||||
let body: BodyType = Box::new(NonSyncStreamBody {
|
let body: BodyType = Box::new(StreamBody::new(
|
||||||
stream: Box::pin(body_stream),
|
Box::pin(body_stream),
|
||||||
});
|
));
|
||||||
Ok(resp_builder.body(body)?)
|
Ok(resp_builder.body(body)?)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -344,67 +341,3 @@ async fn get_block(garage: Arc<Garage>, hash: &Hash) -> Result<Vec<u8>, Error> {
|
||||||
}
|
}
|
||||||
Err(Error::Message(format!("No valid blocks returned")))
|
Err(Error::Message(format!("No valid blocks returned")))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct NonSyncStreamBody {
|
|
||||||
pub stream: Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl HttpBody for NonSyncStreamBody {
|
|
||||||
type Data = Bytes;
|
|
||||||
type Error = Error;
|
|
||||||
|
|
||||||
fn poll_data(
|
|
||||||
mut self: Pin<&mut Self>,
|
|
||||||
cx: &mut Context,
|
|
||||||
) -> Poll<Option<Result<Bytes, Self::Error>>> {
|
|
||||||
match ready!(self.stream.as_mut().poll_next(cx)) {
|
|
||||||
Some(res) => Poll::Ready(Some(res)),
|
|
||||||
None => Poll::Ready(None),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn poll_trailers(
|
|
||||||
self: Pin<&mut Self>,
|
|
||||||
cx: &mut Context,
|
|
||||||
) -> Poll<Result<Option<hyper::HeaderMap<hyper::header::HeaderValue>>, Self::Error>> {
|
|
||||||
Poll::Ready(Ok(None))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct BytesBody {
|
|
||||||
pub bytes: Option<Bytes>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl HttpBody for BytesBody {
|
|
||||||
type Data = Bytes;
|
|
||||||
type Error = Error;
|
|
||||||
|
|
||||||
fn poll_data(
|
|
||||||
mut self: Pin<&mut Self>,
|
|
||||||
_cx: &mut Context,
|
|
||||||
) -> Poll<Option<Result<Bytes, Self::Error>>> {
|
|
||||||
Poll::Ready(self.bytes.take().map(Ok))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn poll_trailers(
|
|
||||||
self: Pin<&mut Self>,
|
|
||||||
_cx: &mut Context,
|
|
||||||
) -> Poll<Result<Option<hyper::HeaderMap<hyper::header::HeaderValue>>, Self::Error>> {
|
|
||||||
Poll::Ready(Ok(None))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<String> for BytesBody {
|
|
||||||
fn from(x: String) -> BytesBody {
|
|
||||||
BytesBody {
|
|
||||||
bytes: Some(Bytes::from(x.into_bytes())),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
impl From<Vec<u8>> for BytesBody {
|
|
||||||
fn from(x: Vec<u8>) -> BytesBody {
|
|
||||||
BytesBody {
|
|
||||||
bytes: Some(Bytes::from(x)),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
82
src/http_util.rs
Normal file
82
src/http_util.rs
Normal file
|
@ -0,0 +1,82 @@
|
||||||
|
use core::pin::Pin;
|
||||||
|
use core::task::{Context, Poll};
|
||||||
|
|
||||||
|
use futures::ready;
|
||||||
|
use futures::stream::*;
|
||||||
|
use hyper::body::{Bytes, HttpBody};
|
||||||
|
|
||||||
|
use crate::error::Error;
|
||||||
|
|
||||||
|
type StreamType = Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>;
|
||||||
|
|
||||||
|
pub struct StreamBody {
|
||||||
|
stream: StreamType,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl StreamBody {
|
||||||
|
pub fn new(stream: StreamType) -> Self {
|
||||||
|
Self{stream}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl HttpBody for StreamBody {
|
||||||
|
type Data = Bytes;
|
||||||
|
type Error = Error;
|
||||||
|
|
||||||
|
fn poll_data(
|
||||||
|
mut self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context,
|
||||||
|
) -> Poll<Option<Result<Bytes, Self::Error>>> {
|
||||||
|
match ready!(self.stream.as_mut().poll_next(cx)) {
|
||||||
|
Some(res) => Poll::Ready(Some(res)),
|
||||||
|
None => Poll::Ready(None),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_trailers(
|
||||||
|
self: Pin<&mut Self>,
|
||||||
|
_cx: &mut Context,
|
||||||
|
) -> Poll<Result<Option<hyper::HeaderMap<hyper::header::HeaderValue>>, Self::Error>> {
|
||||||
|
Poll::Ready(Ok(None))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct BytesBody {
|
||||||
|
bytes: Option<Bytes>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl BytesBody {
|
||||||
|
pub fn new(bytes: Bytes) -> Self {
|
||||||
|
Self{bytes: Some(bytes)}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl HttpBody for BytesBody {
|
||||||
|
type Data = Bytes;
|
||||||
|
type Error = Error;
|
||||||
|
|
||||||
|
fn poll_data(
|
||||||
|
mut self: Pin<&mut Self>,
|
||||||
|
_cx: &mut Context,
|
||||||
|
) -> Poll<Option<Result<Bytes, Self::Error>>> {
|
||||||
|
Poll::Ready(self.bytes.take().map(Ok))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_trailers(
|
||||||
|
self: Pin<&mut Self>,
|
||||||
|
_cx: &mut Context,
|
||||||
|
) -> Poll<Result<Option<hyper::HeaderMap<hyper::header::HeaderValue>>, Self::Error>> {
|
||||||
|
Poll::Ready(Ok(None))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<String> for BytesBody {
|
||||||
|
fn from(x: String) -> BytesBody {
|
||||||
|
Self::new(Bytes::from(x))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl From<Vec<u8>> for BytesBody {
|
||||||
|
fn from(x: Vec<u8>) -> BytesBody {
|
||||||
|
Self::new(Bytes::from(x))
|
||||||
|
}
|
||||||
|
}
|
|
@ -10,6 +10,7 @@ mod object_table;
|
||||||
mod version_table;
|
mod version_table;
|
||||||
|
|
||||||
mod api_server;
|
mod api_server;
|
||||||
|
mod http_util;
|
||||||
mod rpc_client;
|
mod rpc_client;
|
||||||
mod rpc_server;
|
mod rpc_server;
|
||||||
mod server;
|
mod server;
|
||||||
|
|
Loading…
Reference in a new issue