It seems to work

This commit is contained in:
Quentin 2021-11-20 13:42:20 +01:00
parent 87fff9843d
commit e10f04c5e3
9 changed files with 151 additions and 83 deletions

View file

@ -14,6 +14,7 @@ type Config struct {
UserNameAttr string `env:"BAGAGE_LDAP_USERNAME_ATTR" default:"cn"`
Endpoint string `env:"BAGAGE_S3_ENDPOINT" default:"garage.deuxfleurs.fr"`
UseSSL bool `env:"BAGAGE_S3_SSL" default:"true"`
S3Cache string `env:"BAGAGE_S3_CACHE" default:"./s3_cache"`
SSHKey string `env:"BAGAGE_SSH_KEY" default:"id_rsa"`
}

View file

@ -13,16 +13,19 @@ import (
"log"
"net"
"net/http"
"os"
)
func main() {
log.Println("=== Starting Bagage ===")
config := (&Config{}).LoadWithDefault().LoadWithEnv()
log.Println(config)
done := make(chan error)
// Some init
os.MkdirAll(config.S3Cache, 0755)
// Launch our submodules
done := make(chan error)
go httpServer(config, done)
go sshServer(config, done)
@ -148,7 +151,7 @@ func handleSSHConn(nConn net.Conn, dconfig *Config, config *ssh.ServerConfig) {
return
}
fs := s3.NewS3FS(mc)
fs := s3.NewS3FS(mc, dconfig.S3Cache)
server, err := sftp.NewServer(ctx, channel, &fs)
if err != nil {

View file

@ -1,13 +1,13 @@
package s3
import (
"context"
"errors"
"fmt"
"io"
"io/fs"
"log"
"mime"
"os"
"path"
"github.com/minio/minio-go/v7"
@ -17,9 +17,9 @@ type S3File struct {
fs *S3FS
obj *minio.Object
objw *io.PipeWriter
cache *os.File
donew chan error
pos int64
eof bool
entries []fs.FileInfo
Path S3Path
}
@ -49,6 +49,11 @@ func (f *S3File) Close() error {
f.objw = nil
}
if f.cache != nil {
err = append(err, f.writeFlush())
f.cache = nil
}
count := 0
for _, e := range err {
if e != nil {
@ -57,7 +62,7 @@ func (f *S3File) Close() error {
}
}
if count > 0 {
return errors.New(fmt.Sprintf("%d errors when closing this WebDAV File. Read previous logs to know more.", count))
return errors.New(fmt.Sprintf("%d errors when closing this S3 File. Read previous logs to know more.", count))
}
return nil
}
@ -74,10 +79,15 @@ func (f *S3File) loadObject() error {
}
func (f *S3File) Read(p []byte) (n int, err error) {
log.Printf("s3 Read\n")
//log.Printf("s3 Read\n")
//if f.Stat() & OBJECT == 0 { /* @FIXME Ideally we would check against OBJECT but we need a non OPAQUE_KEY */
// return 0, os.ErrInvalid
//}
if f.cache != nil {
return f.cache.Read(p)
}
if err := f.loadObject(); err != nil {
return 0, err
}
@ -86,6 +96,10 @@ func (f *S3File) Read(p []byte) (n int, err error) {
}
func (f *S3File) ReadAt(p []byte, off int64) (n int, err error) {
if f.cache != nil {
return f.cache.ReadAt(p, off)
}
stat, err := f.Stat()
if err != nil {
return 0, err
@ -93,7 +107,7 @@ func (f *S3File) ReadAt(p []byte, off int64) (n int, err error) {
return 0, io.EOF
}
log.Printf("s3 ReadAt %v\n", off)
//log.Printf("s3 ReadAt %v\n", off)
if err := f.loadObject(); err != nil {
return 0, err
}
@ -101,45 +115,101 @@ func (f *S3File) ReadAt(p []byte, off int64) (n int, err error) {
return f.obj.ReadAt(p, off)
}
func (f *S3File) WriteAt(p []byte, off int64) (n int, err error) {
return 0, errors.New("not implemented")
func (f *S3File) initCache() error {
// We use a locally cached file instead of writing directly to S3
// When the user calls close, the file is flushed on S3.
// Check writeFlush below.
if f.cache == nil {
// We create a temp file in the configured folder
// We do not use the default tmp file as files can be very large
// and could fillup the RAM (often /tmp is mounted in RAM)
tmp, err := os.CreateTemp(f.fs.local, "bagage-cache")
if err != nil {
return err
}
f.cache = tmp
// Problem: WriteAt override the existing file, if it exists
// So if when we stat the file, its size is greater than zero,
// we download it in our cache
file, err := f.Stat()
if err != nil {
return err
} else if file.Size() != 0 {
// We get a Reader on our object
object, err := f.fs.mc.GetObject(f.fs.ctx, f.Path.Bucket, f.Path.Key, minio.GetObjectOptions{})
if err != nil {
return err
}
// We inject it in our cache file
if _, err = io.Copy(f.cache, object); err != nil {
return err
}
}
}
return nil
}
func (f *S3File) WriteAt(p []byte, off int64) (n int, err error) {
f.initCache()
// And now we simply apply the command on our cache
return f.cache.WriteAt(p, off)
}
func (f *S3File) Write(p []byte) (n int, err error) {
/*if f.path.class != OBJECT {
return 0, os.ErrInvalid
}*/
f.initCache()
if f.objw == nil {
if f.pos != 0 {
return 0, errors.New("writing with an offset is not implemented")
return f.cache.Write(p)
}
func (f *S3File) writeFlush() error {
// Only needed if we used a write cache
if f.cache == nil {
return nil
}
r, w := io.Pipe()
f.donew = make(chan error, 1)
f.objw = w
// Rewind the file to copy from the start
_, err := f.cache.Seek(0, 0)
if err != nil {
return err
}
// Get a FileInfo object as minio needs its size (ideally)
stat, err := f.cache.Stat()
if err != nil {
return err
}
// Send the file to minio
contentType := mime.TypeByExtension(path.Ext(f.Path.Key))
go func() {
/* @FIXME
PutObject has a strange behaviour when used with unknown size, it supposes the final size will be 5TiB.
Then it computes that, following the final size of the file, each part of the multipart upload should be 512MiB, which leads to big allocations.
The culprit is OptimalPartInfo: https://github.com/minio/minio-go/blob/62beca8cd87e9960d88793320220ad2c159bb5e5/api-put-object-common.go#L70
We set this value to the minimum allowed one, 5MiB.
The minimum value is set here: https://github.com/minio/minio-go/blob/62beca8cd87e9960d88793320220ad2c159bb5e5/constants.go#L24
Because Multipart uploads seems to be limited to 10 000 parts, it might be possible that we are limited to 50 GiB files, which is still good enough.
Ref: https://github.com/minio/minio-go/blob/62beca8cd87e9960d88793320220ad2c159bb5e5/api-put-object-common.go#L110-L112
*/
_, err := f.fs.mc.PutObject(context.Background(), f.Path.Bucket, f.Path.Key, r, -1, minio.PutObjectOptions{ContentType: contentType, PartSize: 5*1024*1024})
f.donew <- err
}()
_, err = f.fs.mc.PutObject(f.fs.ctx, f.Path.Bucket, f.Path.Key, f.cache, stat.Size(), minio.PutObjectOptions{
ContentType: contentType,
})
if err != nil {
return err
}
return f.objw.Write(p)
// Close the cache file and remove it
err = f.cache.Close()
if err != nil {
return err
}
err = os.Remove(f.cache.Name())
if err != nil {
return err
}
return nil
}
func (f *S3File) Seek(offset int64, whence int) (int64, error) {
if f.cache != nil {
return f.cache.Seek(offset, whence)
}
if err := f.loadObject(); err != nil {
return 0, err
}
@ -192,11 +262,11 @@ func (f *S3File) readDirRoot(count int) ([]fs.FileInfo, error) {
beg := f.pos
end := int64(len(f.entries))
if count > 0 {
end = min(beg + int64(count), end)
end = min(beg+int64(count), end)
}
f.pos = end
if end - beg == 0 {
if end-beg == 0 {
err = io.EOF
}
@ -232,11 +302,11 @@ func (f *S3File) readDirChild(count int) ([]fs.FileInfo, error) {
beg := f.pos
end := int64(len(f.entries))
if count > 0 {
end = min(beg + int64(count), end)
end = min(beg+int64(count), end)
}
f.pos = end
if end - beg == 0 {
if end-beg == 0 {
err = io.EOF
}

View file

@ -22,13 +22,15 @@ import (
type S3FS struct {
cache map[string]*S3Stat
mc *minio.Client
local string
ctx context.Context
}
func NewS3FS(mc *minio.Client) S3FS {
func NewS3FS(mc *minio.Client, local string) S3FS {
return S3FS{
cache: make(map[string]*S3Stat),
mc: mc,
local: local,
}
}

View file

@ -1,7 +1,6 @@
package s3
import (
"log"
"errors"
"io/fs"
"path"
@ -105,7 +104,6 @@ func (s *S3Stat) Name() string {
}
func (s *S3Stat) Size() int64 {
log.Println("stat size: ", s.obj.Size)
return s.obj.Size
}

View file

@ -2,7 +2,7 @@ package sftp
/*
Imported from: https://github.com/pkg/sftp
*/
*/
import (
"sync"
@ -98,4 +98,3 @@ func (a *allocator) isRequestOrderIDUsed(requestOrderID uint32) bool {
_, ok := a.used[requestOrderID]
return ok
}

View file

@ -2,7 +2,7 @@ package sftp
/*
Imported from: https://github.com/pkg/sftp
*/
*/
import (
"encoding"
@ -218,4 +218,3 @@ func (s *packetManager) maybeSendPackets() {
// }
// return res
// }

View file

@ -4,12 +4,12 @@ package sftp
import (
"context"
"log"
"encoding"
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"os"
"strconv"
"sync"
@ -21,7 +21,7 @@ import (
const (
// SftpServerWorkerCount defines the number of workers for the SFTP server
SftpServerWorkerCount = 8
SftpServerWorkerCount = 1
)
// Server is an SSH File Transfer Protocol (sftp) server.
@ -194,7 +194,7 @@ func handlePacket(s *Server, p orderedRequest) error {
case *sshFxpLstatPacket:
log.Println("pkt: lstat: ", p.Path)
// stat the requested file
info, err := os.Lstat(toLocalPath(p.Path))
info, err := s.fs.Stat(s.ctx, p.Path)
rpkt = &sshFxpStatResponse{
ID: p.ID,
info: info,
@ -219,43 +219,39 @@ func handlePacket(s *Server, p orderedRequest) error {
}
case *sshFxpMkdirPacket:
log.Println("pkt: mkdir: ", p.Path)
err := os.Mkdir(toLocalPath(p.Path), 0755)
err := s.fs.Mkdir(s.ctx, p.Path, 0755)
rpkt = statusFromError(p.ID, err)
case *sshFxpRmdirPacket:
log.Println("pkt: rmdir: ", p.Path)
err := os.Remove(toLocalPath(p.Path))
err := s.fs.RemoveAll(s.ctx, p.Path)
rpkt = statusFromError(p.ID, err)
case *sshFxpRemovePacket:
log.Println("pkt: rm: ", p.Filename)
err := os.Remove(toLocalPath(p.Filename))
err := s.fs.RemoveAll(s.ctx, p.Filename)
rpkt = statusFromError(p.ID, err)
case *sshFxpRenamePacket:
log.Println("pkt: rename: ", p.Oldpath, ", ", p.Newpath)
err := os.Rename(toLocalPath(p.Oldpath), toLocalPath(p.Newpath))
err := s.fs.Rename(s.ctx, p.Oldpath, p.Newpath)
rpkt = statusFromError(p.ID, err)
case *sshFxpSymlinkPacket:
log.Println("pkt: ln -s: ", p.Targetpath, ", ", p.Linkpath)
err := os.Symlink(toLocalPath(p.Targetpath), toLocalPath(p.Linkpath))
err := s.fs.Rename(s.ctx, p.Targetpath, p.Linkpath)
rpkt = statusFromError(p.ID, err)
case *sshFxpClosePacket:
log.Println("pkt: close handle: ", p.Handle)
rpkt = statusFromError(p.ID, s.closeHandle(p.Handle))
case *sshFxpReadlinkPacket:
log.Println("pkt: read: ", p.Path)
f, err := os.Readlink(toLocalPath(p.Path))
log.Println("pkt: readlink: ", p.Path)
rpkt = &sshFxpNamePacket{
ID: p.ID,
NameAttrs: []*sshFxpNameAttr{
{
Name: f,
LongName: f,
Name: p.Path,
LongName: p.Path,
Attrs: emptyFileStat,
},
},
}
if err != nil {
rpkt = statusFromError(p.ID, err)
}
case *sshFxpRealpathPacket:
log.Println("pkt: absolute path: ", p.Path)
f := s3.NewS3Path(p.Path).Path
@ -288,7 +284,7 @@ func handlePacket(s *Server, p orderedRequest) error {
case *sshFxpReadPacket:
var err error = EBADF
f, ok := s.getHandle(p.Handle)
log.Println("pkt: read handle: ", p.Handle, f.Path.Path)
//log.Println("pkt: read handle: ", p.Handle, f.Path.Path)
if ok {
err = nil
data := p.getDataSlice(s.pktMgr.alloc, orderID)
@ -309,7 +305,7 @@ func handlePacket(s *Server, p orderedRequest) error {
}
case *sshFxpWritePacket:
log.Println("pkt: write handle: ", p.Handle, ", Offset: ", p.Offset)
//log.Println("pkt: write handle: ", p.Handle, ", Offset: ", p.Offset)
f, ok := s.getHandle(p.Handle)
var err error = EBADF
if ok {
@ -324,7 +320,7 @@ func handlePacket(s *Server, p orderedRequest) error {
rpkt = p.respond(s)
}
case serverRespondablePacket:
log.Println("pkt: respondable")
//log.Println("pkt: respondable")
rpkt = p.respond(s)
default:
return fmt.Errorf("unexpected packet type %T", p)
@ -477,7 +473,7 @@ func (p *sshFxpOpenPacket) respond(svr *Server) responsePacket {
}
func (p *sshFxpReaddirPacket) respond(svr *Server) responsePacket {
log.Println("pkt: readdir: ", p.Handle)
//log.Println("pkt: readdir: ", p.Handle)
f, ok := svr.getHandle(p.Handle)
if !ok {
return statusFromError(p.ID, EBADF)

View file

@ -16,7 +16,7 @@ func (wd WebDav) WithMC(mc *minio.Client) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
(&webdav.Handler{
Prefix: wd.WithConfig.DavPath,
FileSystem: s3.NewS3FS(mc),
FileSystem: s3.NewS3FS(mc, wd.WithConfig.S3Cache),
LockSystem: webdav.NewMemLS(),
Logger: func(r *http.Request, err error) {
log.Printf("INFO: %s %s %s\n", r.RemoteAddr, r.Method, r.URL)