WIP: Compile on windows #891

Draft
michael wants to merge 3 commits from michael/garage:mzhang/windows-build into main
11 changed files with 107 additions and 22 deletions

1
.gitignore vendored
View file

@ -4,3 +4,4 @@
**/*.rs.bk **/*.rs.bk
*.swp *.swp
/.direnv /.direnv
Packet.lib

1
Cargo.lock generated
View file

@ -1544,6 +1544,7 @@ dependencies = [
"tokio", "tokio",
"tokio-stream", "tokio-stream",
"tracing", "tracing",
"winapi",
] ]
[[package]] [[package]]

View file

@ -1,6 +1,5 @@
use std::convert::Infallible; use std::convert::Infallible;
use std::fs::{self, Permissions}; use std::fs::{self, Permissions};
use std::os::unix::fs::PermissionsExt;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
@ -18,7 +17,7 @@ use hyper::{HeaderMap, StatusCode};
use hyper_util::rt::TokioIo; use hyper_util::rt::TokioIo;
use tokio::io::{AsyncRead, AsyncWrite}; use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::{TcpListener, TcpStream, UnixListener, UnixStream}; use tokio::net::{TcpListener, TcpStream};
use tokio::sync::watch; use tokio::sync::watch;
use tokio::time::{sleep_until, Instant}; use tokio::time::{sleep_until, Instant};
@ -36,6 +35,9 @@ use garage_util::socket_address::UnixOrTCPSocketAddress;
use crate::helpers::{BoxBody, ErrorBody}; use crate::helpers::{BoxBody, ErrorBody};
#[cfg(unix)]
use tokio::net::{UnixListener, UnixStream};
pub(crate) trait ApiEndpoint: Send + Sync + 'static { pub(crate) trait ApiEndpoint: Send + Sync + 'static {
fn name(&self) -> &'static str; fn name(&self) -> &'static str;
fn add_span_attributes(&self, span: SpanRef<'_>); fn add_span_attributes(&self, span: SpanRef<'_>);
@ -119,7 +121,9 @@ impl<A: ApiHandler> ApiServer<A> {
let handler = move |request, socketaddr| self.clone().handler(request, socketaddr); let handler = move |request, socketaddr| self.clone().handler(request, socketaddr);
server_loop(server_name, listener, handler, must_exit).await server_loop(server_name, listener, handler, must_exit).await
} }
#[cfg(unix)]
UnixOrTCPSocketAddress::UnixSocket(ref path) => { UnixOrTCPSocketAddress::UnixSocket(ref path) => {
use std::os::unix::fs::PermissionsExt;
if path.exists() { if path.exists() {
fs::remove_file(path)? fs::remove_file(path)?
} }
@ -264,8 +268,10 @@ impl Accept for TcpListener {
} }
} }
#[cfg(unix)]
pub struct UnixListenerOn(pub UnixListener, pub String); pub struct UnixListenerOn(pub UnixListener, pub String);
#[cfg(unix)]
#[async_trait] #[async_trait]
impl Accept for UnixListenerOn { impl Accept for UnixListenerOn {
type Stream = UnixStream; type Stream = UnixStream;

View file

@ -279,7 +279,8 @@ impl DataLayout {
u16::from_be_bytes([ u16::from_be_bytes([
hash.as_slice()[HASH_DRIVE_BYTES.0], hash.as_slice()[HASH_DRIVE_BYTES.0],
hash.as_slice()[HASH_DRIVE_BYTES.1], hash.as_slice()[HASH_DRIVE_BYTES.1],
]) as usize % DRIVE_NPART ]) as usize
% DRIVE_NPART
} }
fn block_dir_from(&self, hash: &Hash, dir: &PathBuf) -> PathBuf { fn block_dir_from(&self, hash: &Hash, dir: &PathBuf) -> PathBuf {

View file

@ -792,11 +792,16 @@ impl BlockManagerLocked {
// Now, we do an fsync on the containing directory, to ensure that the rename // Now, we do an fsync on the containing directory, to ensure that the rename
// is persisted properly. See: // is persisted properly. See:
// http://thedjbway.b0llix.net/qmail/syncdir.html // http://thedjbway.b0llix.net/qmail/syncdir.html
let dir = fs::OpenOptions::new() let mut dir = fs::OpenOptions::new();
.read(true) dir.read(true);
.mode(0)
.open(directory) // TODO: Windows open options?
.await?; #[cfg(unix)]
{
dir.mode(0);
}
let dir = dir.open(directory).await?;
dir.sync_all().await?; dir.sync_all().await?;
drop(dir); drop(dir);
} }

View file

@ -52,10 +52,14 @@ pub struct Secrets {
/// from config or CLI param or env variable or read from a file specified in config or CLI /// from config or CLI param or env variable or read from a file specified in config or CLI
/// param or env variable) /// param or env variable)
pub fn fill_secrets(mut config: Config, secrets: Secrets) -> Result<Config, Error> { pub fn fill_secrets(mut config: Config, secrets: Secrets) -> Result<Config, Error> {
#[cfg(unix)]
let allow_world_readable = secrets let allow_world_readable = secrets
.allow_world_readable_secrets .allow_world_readable_secrets
.unwrap_or(config.allow_world_readable_secrets); .unwrap_or(config.allow_world_readable_secrets);
#[cfg(not(unix))]
let allow_world_readable = config.allow_world_readable_secrets;
fill_secret( fill_secret(
&mut config.rpc_secret, &mut config.rpc_secret,
&config.rpc_secret_file, &config.rpc_secret_file,

View file

@ -279,7 +279,8 @@ impl<'a> LockedHelper<'a> {
.local_aliases .local_aliases
.get(alias_name) .get(alias_name)
.cloned() .cloned()
.flatten() != Some(bucket_id) .flatten()
!= Some(bucket_id)
{ {
return Err(GarageError::Message(format!( return Err(GarageError::Message(format!(
"Bucket {:?} does not have alias {} in namespace of key {}", "Bucket {:?} does not have alias {} in namespace of key {}",

View file

@ -51,6 +51,9 @@ tokio.workspace = true
tokio-stream.workspace = true tokio-stream.workspace = true
opentelemetry.workspace = true opentelemetry.workspace = true
[target.'cfg(windows)'.dependencies]
winapi = { version = "0.3.9", features = ["fileapi", "impl-default"] }
[features] [features]
kubernetes-discovery = ["kube", "k8s-openapi", "schemars"] kubernetes-discovery = ["kube", "k8s-openapi", "schemars"]
consul-discovery = ["reqwest", "err-derive"] consul-discovery = ["reqwest", "err-derive"]

View file

@ -218,6 +218,7 @@ pub fn gen_node_key(metadata_dir: &Path) -> Result<NodeKey, Error> {
info!("Generating new node key pair."); info!("Generating new node key pair.");
let (pubkey, key) = ed25519::gen_keypair(); let (pubkey, key) = ed25519::gen_keypair();
#[cfg(unix)]
{ {
use std::os::unix::fs::PermissionsExt; use std::os::unix::fs::PermissionsExt;
let mut f = std::fs::File::create(key_file.as_path())?; let mut f = std::fs::File::create(key_file.as_path())?;
@ -227,6 +228,17 @@ pub fn gen_node_key(metadata_dir: &Path) -> Result<NodeKey, Error> {
f.write_all(&key[..])?; f.write_all(&key[..])?;
} }
#[cfg(windows)]
{
// use std::os::windows::io::AsHandle;
let mut f = std::fs::File::create(key_file.as_path())?;
// TODO: Set ACL here?
let mut perm = f.metadata()?.permissions();
perm.set_readonly(true);
std::fs::set_permissions(key_file.as_path(), perm)?;
f.write_all(&key[..])?;
}
{ {
let mut pubkey_file = metadata_dir.to_path_buf(); let mut pubkey_file = metadata_dir.to_path_buf();
pubkey_file.push("node_key.pub"); pubkey_file.push("node_key.pub");
@ -805,6 +817,48 @@ impl NodeStatus {
} }
} }
#[cfg(windows)]
fn update_disk_usage(&mut self, meta_dir: &Path, data_dir: &DataDirEnum) {
use winapi::um::fileapi::GetDiskFreeSpaceExA;
use winapi::um::winnt::ULARGE_INTEGER;
let mount_avail = |path: &Path| -> Option<(u64, u64)> {
let mut path = path.to_path_buf();
path.push(""); // Ensure trailing slash
let mut a: ULARGE_INTEGER = Default::default();
let mut total: ULARGE_INTEGER = Default::default();
let mut free: ULARGE_INTEGER = Default::default();
let path_ptr = path.as_os_str().as_encoded_bytes().as_ptr();
let result = unsafe {
GetDiskFreeSpaceExA(path_ptr as *const i8, &mut a, &mut total, &mut free)
};
if result == 0 {
return None;
}
let free = unsafe { *free.QuadPart() };
let total = unsafe { *total.QuadPart() };
Some((free, total))
};
self.meta_disk_avail = mount_avail(meta_dir);
self.data_disk_avail = match data_dir {
DataDirEnum::Single(path_buf) => mount_avail(path_buf),
// TODO: THIS IS WRONG!! Does not take into account multiple dirs on the same partition
// Will have to deduplicate by the filesystem mount path
DataDirEnum::Multiple(dirs) => dirs
.into_iter()
.filter_map(|dir| mount_avail(&dir.path))
.reduce(|(a1, b1), (a2, b2)| (a1 + a2, b1 + b2)),
};
}
#[cfg(unix)]
fn update_disk_usage(&mut self, meta_dir: &Path, data_dir: &DataDirEnum) { fn update_disk_usage(&mut self, meta_dir: &Path, data_dir: &DataDirEnum) {
use nix::sys::statvfs::statvfs; use nix::sys::statvfs::statvfs;
let mount_avail = |path: &Path| match statvfs(path) { let mount_avail = |path: &Path| match statvfs(path) {

View file

@ -9,6 +9,7 @@ use serde::{Deserialize, Deserializer};
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum UnixOrTCPSocketAddress { pub enum UnixOrTCPSocketAddress {
TCPSocket(SocketAddr), TCPSocket(SocketAddr),
#[cfg(unix)]
UnixSocket(PathBuf), UnixSocket(PathBuf),
} }
@ -16,6 +17,7 @@ impl Display for UnixOrTCPSocketAddress {
fn fmt(&self, formatter: &mut Formatter<'_>) -> std::fmt::Result { fn fmt(&self, formatter: &mut Formatter<'_>) -> std::fmt::Result {
match self { match self {
UnixOrTCPSocketAddress::TCPSocket(address) => write!(formatter, "http://{}", address), UnixOrTCPSocketAddress::TCPSocket(address) => write!(formatter, "http://{}", address),
#[cfg(unix)]
UnixOrTCPSocketAddress::UnixSocket(path) => { UnixOrTCPSocketAddress::UnixSocket(path) => {
write!(formatter, "http+unix://{}", path.to_string_lossy()) write!(formatter, "http+unix://{}", path.to_string_lossy())
} }
@ -31,14 +33,15 @@ impl<'de> Deserialize<'de> for UnixOrTCPSocketAddress {
let string = String::deserialize(deserializer)?; let string = String::deserialize(deserializer)?;
let string = string.as_str(); let string = string.as_str();
#[cfg(unix)]
if string.starts_with("/") { if string.starts_with("/") {
Ok(UnixOrTCPSocketAddress::UnixSocket( return Ok(UnixOrTCPSocketAddress::UnixSocket(
PathBuf::from_str(string).map_err(Error::custom)?, PathBuf::from_str(string).map_err(Error::custom)?,
)) ));
} else { }
Ok(UnixOrTCPSocketAddress::TCPSocket( Ok(UnixOrTCPSocketAddress::TCPSocket(
SocketAddr::from_str(string).map_err(Error::custom)?, SocketAddr::from_str(string).map_err(Error::custom)?,
)) ))
} }
} }
}

View file

@ -1,10 +1,12 @@
use std::fs::{self, Permissions}; use std::fs::{self, Permissions};
use std::os::unix::prelude::PermissionsExt;
use std::{convert::Infallible, sync::Arc}; use std::{convert::Infallible, sync::Arc};
use tokio::net::{TcpListener, UnixListener}; use tokio::net::TcpListener;
use tokio::sync::watch; use tokio::sync::watch;
#[cfg(unix)]
use tokio::net::UnixListener;
use hyper::{ use hyper::{
body::Incoming as IncomingBody, body::Incoming as IncomingBody,
header::{HeaderValue, HOST}, header::{HeaderValue, HOST},
@ -20,7 +22,7 @@ use opentelemetry::{
use crate::error::*; use crate::error::*;
use garage_api::generic_server::{server_loop, UnixListenerOn}; use garage_api::generic_server::server_loop;
use garage_api::helpers::*; use garage_api::helpers::*;
use garage_api::s3::cors::{add_cors_headers, find_matching_cors_rule, handle_options_for_bucket}; use garage_api::s3::cors::{add_cors_headers, find_matching_cors_rule, handle_options_for_bucket};
use garage_api::s3::error::{ use garage_api::s3::error::{
@ -96,7 +98,11 @@ impl WebServer {
move |stream, socketaddr| self.clone().handle_request(stream, socketaddr); move |stream, socketaddr| self.clone().handle_request(stream, socketaddr);
server_loop(server_name, listener, handler, must_exit).await server_loop(server_name, listener, handler, must_exit).await
} }
#[cfg(unix)]
UnixOrTCPSocketAddress::UnixSocket(ref path) => { UnixOrTCPSocketAddress::UnixSocket(ref path) => {
use garage_api::generic_server::UnixListenerOn;
use std::os::unix::fs::PermissionsExt;
if path.exists() { if path.exists() {
fs::remove_file(path)? fs::remove_file(path)?
} }