From e10f04c5e36109c2e58d667c4b6ec054cbdd51be Mon Sep 17 00:00:00 2001 From: Quentin Date: Sat, 20 Nov 2021 13:42:20 +0100 Subject: [PATCH] It seems to work --- config.go | 1 + main.go | 9 ++- s3/file.go | 176 ++++++++++++++++++++++++++++------------- s3/fs.go | 4 +- s3/stat.go | 2 - sftp/allocator.go | 3 +- sftp/packet_manager.go | 3 +- sftp/server.go | 34 ++++---- webdav.go | 2 +- 9 files changed, 151 insertions(+), 83 deletions(-) diff --git a/config.go b/config.go index b660e68..c5aff5b 100644 --- a/config.go +++ b/config.go @@ -14,6 +14,7 @@ type Config struct { UserNameAttr string `env:"BAGAGE_LDAP_USERNAME_ATTR" default:"cn"` Endpoint string `env:"BAGAGE_S3_ENDPOINT" default:"garage.deuxfleurs.fr"` UseSSL bool `env:"BAGAGE_S3_SSL" default:"true"` + S3Cache string `env:"BAGAGE_S3_CACHE" default:"./s3_cache"` SSHKey string `env:"BAGAGE_SSH_KEY" default:"id_rsa"` } diff --git a/main.go b/main.go index bf7dc5e..c70dd4d 100644 --- a/main.go +++ b/main.go @@ -13,16 +13,19 @@ import ( "log" "net" "net/http" + "os" ) func main() { log.Println("=== Starting Bagage ===") config := (&Config{}).LoadWithDefault().LoadWithEnv() - log.Println(config) - done := make(chan error) + // Some init + os.MkdirAll(config.S3Cache, 0755) + // Launch our submodules + done := make(chan error) go httpServer(config, done) go sshServer(config, done) @@ -148,7 +151,7 @@ func handleSSHConn(nConn net.Conn, dconfig *Config, config *ssh.ServerConfig) { return } - fs := s3.NewS3FS(mc) + fs := s3.NewS3FS(mc, dconfig.S3Cache) server, err := sftp.NewServer(ctx, channel, &fs) if err != nil { diff --git a/s3/file.go b/s3/file.go index b20d247..1d6fced 100644 --- a/s3/file.go +++ b/s3/file.go @@ -1,27 +1,27 @@ package s3 import ( - "context" "errors" "fmt" "io" "io/fs" "log" "mime" + "os" "path" "github.com/minio/minio-go/v7" ) type S3File struct { - fs *S3FS - obj *minio.Object - objw *io.PipeWriter - donew chan error - pos int64 - eof bool + fs *S3FS + obj *minio.Object + objw *io.PipeWriter + cache *os.File + donew chan error + pos int64 entries []fs.FileInfo - Path S3Path + Path S3Path } func NewS3File(s *S3FS, path string) (*S3File, error) { @@ -49,6 +49,11 @@ func (f *S3File) Close() error { f.objw = nil } + if f.cache != nil { + err = append(err, f.writeFlush()) + f.cache = nil + } + count := 0 for _, e := range err { if e != nil { @@ -57,7 +62,7 @@ func (f *S3File) Close() error { } } if count > 0 { - return errors.New(fmt.Sprintf("%d errors when closing this WebDAV File. Read previous logs to know more.", count)) + return errors.New(fmt.Sprintf("%d errors when closing this S3 File. Read previous logs to know more.", count)) } return nil } @@ -74,10 +79,15 @@ func (f *S3File) loadObject() error { } func (f *S3File) Read(p []byte) (n int, err error) { - log.Printf("s3 Read\n") + //log.Printf("s3 Read\n") //if f.Stat() & OBJECT == 0 { /* @FIXME Ideally we would check against OBJECT but we need a non OPAQUE_KEY */ // return 0, os.ErrInvalid //} + + if f.cache != nil { + return f.cache.Read(p) + } + if err := f.loadObject(); err != nil { return 0, err } @@ -86,60 +96,120 @@ func (f *S3File) Read(p []byte) (n int, err error) { } func (f *S3File) ReadAt(p []byte, off int64) (n int, err error) { - stat, err := f.Stat() - if err != nil { - return 0, err - } else if off >= stat.Size() { - return 0, io.EOF - } + if f.cache != nil { + return f.cache.ReadAt(p, off) + } - log.Printf("s3 ReadAt %v\n", off) + stat, err := f.Stat() + if err != nil { + return 0, err + } else if off >= stat.Size() { + return 0, io.EOF + } + + //log.Printf("s3 ReadAt %v\n", off) if err := f.loadObject(); err != nil { return 0, err } - return f.obj.ReadAt(p, off) + return f.obj.ReadAt(p, off) +} + +func (f *S3File) initCache() error { + // We use a locally cached file instead of writing directly to S3 + // When the user calls close, the file is flushed on S3. + // Check writeFlush below. + if f.cache == nil { + // We create a temp file in the configured folder + // We do not use the default tmp file as files can be very large + // and could fillup the RAM (often /tmp is mounted in RAM) + tmp, err := os.CreateTemp(f.fs.local, "bagage-cache") + if err != nil { + return err + } + f.cache = tmp + + // Problem: WriteAt override the existing file, if it exists + // So if when we stat the file, its size is greater than zero, + // we download it in our cache + file, err := f.Stat() + if err != nil { + return err + } else if file.Size() != 0 { + // We get a Reader on our object + object, err := f.fs.mc.GetObject(f.fs.ctx, f.Path.Bucket, f.Path.Key, minio.GetObjectOptions{}) + if err != nil { + return err + } + // We inject it in our cache file + if _, err = io.Copy(f.cache, object); err != nil { + return err + } + } + } + + return nil } func (f *S3File) WriteAt(p []byte, off int64) (n int, err error) { - return 0, errors.New("not implemented") + f.initCache() + // And now we simply apply the command on our cache + return f.cache.WriteAt(p, off) } func (f *S3File) Write(p []byte) (n int, err error) { - /*if f.path.class != OBJECT { - return 0, os.ErrInvalid - }*/ + f.initCache() - if f.objw == nil { - if f.pos != 0 { - return 0, errors.New("writing with an offset is not implemented") - } + return f.cache.Write(p) +} - r, w := io.Pipe() - f.donew = make(chan error, 1) - f.objw = w - - contentType := mime.TypeByExtension(path.Ext(f.Path.Key)) - go func() { - /* @FIXME - PutObject has a strange behaviour when used with unknown size, it supposes the final size will be 5TiB. - Then it computes that, following the final size of the file, each part of the multipart upload should be 512MiB, which leads to big allocations. - The culprit is OptimalPartInfo: https://github.com/minio/minio-go/blob/62beca8cd87e9960d88793320220ad2c159bb5e5/api-put-object-common.go#L70 - We set this value to the minimum allowed one, 5MiB. - The minimum value is set here: https://github.com/minio/minio-go/blob/62beca8cd87e9960d88793320220ad2c159bb5e5/constants.go#L24 - Because Multipart uploads seems to be limited to 10 000 parts, it might be possible that we are limited to 50 GiB files, which is still good enough. - Ref: https://github.com/minio/minio-go/blob/62beca8cd87e9960d88793320220ad2c159bb5e5/api-put-object-common.go#L110-L112 - */ - _, err := f.fs.mc.PutObject(context.Background(), f.Path.Bucket, f.Path.Key, r, -1, minio.PutObjectOptions{ContentType: contentType, PartSize: 5*1024*1024}) - f.donew <- err - }() +func (f *S3File) writeFlush() error { + // Only needed if we used a write cache + if f.cache == nil { + return nil } - return f.objw.Write(p) + // Rewind the file to copy from the start + _, err := f.cache.Seek(0, 0) + if err != nil { + return err + } + + // Get a FileInfo object as minio needs its size (ideally) + stat, err := f.cache.Stat() + if err != nil { + return err + } + + // Send the file to minio + contentType := mime.TypeByExtension(path.Ext(f.Path.Key)) + _, err = f.fs.mc.PutObject(f.fs.ctx, f.Path.Bucket, f.Path.Key, f.cache, stat.Size(), minio.PutObjectOptions{ + ContentType: contentType, + }) + if err != nil { + return err + } + + // Close the cache file and remove it + err = f.cache.Close() + if err != nil { + return err + } + + err = os.Remove(f.cache.Name()) + if err != nil { + return err + } + + return nil } func (f *S3File) Seek(offset int64, whence int) (int64, error) { + if f.cache != nil { + return f.cache.Seek(offset, whence) + } + if err := f.loadObject(); err != nil { return 0, err } @@ -165,10 +235,10 @@ func (f *S3File) Readdir(count int) ([]fs.FileInfo, error) { } func min(a, b int64) int64 { - if a < b { - return a - } - return b + if a < b { + return a + } + return b } func (f *S3File) readDirRoot(count int) ([]fs.FileInfo, error) { @@ -192,11 +262,11 @@ func (f *S3File) readDirRoot(count int) ([]fs.FileInfo, error) { beg := f.pos end := int64(len(f.entries)) if count > 0 { - end = min(beg + int64(count), end) + end = min(beg+int64(count), end) } f.pos = end - if end - beg == 0 { + if end-beg == 0 { err = io.EOF } @@ -232,11 +302,11 @@ func (f *S3File) readDirChild(count int) ([]fs.FileInfo, error) { beg := f.pos end := int64(len(f.entries)) if count > 0 { - end = min(beg + int64(count), end) + end = min(beg+int64(count), end) } f.pos = end - if end - beg == 0 { + if end-beg == 0 { err = io.EOF } diff --git a/s3/fs.go b/s3/fs.go index c5ae6a0..a8199d7 100644 --- a/s3/fs.go +++ b/s3/fs.go @@ -22,13 +22,15 @@ import ( type S3FS struct { cache map[string]*S3Stat mc *minio.Client + local string ctx context.Context } -func NewS3FS(mc *minio.Client) S3FS { +func NewS3FS(mc *minio.Client, local string) S3FS { return S3FS{ cache: make(map[string]*S3Stat), mc: mc, + local: local, } } diff --git a/s3/stat.go b/s3/stat.go index c91a757..96b0c24 100644 --- a/s3/stat.go +++ b/s3/stat.go @@ -1,7 +1,6 @@ package s3 import ( - "log" "errors" "io/fs" "path" @@ -105,7 +104,6 @@ func (s *S3Stat) Name() string { } func (s *S3Stat) Size() int64 { - log.Println("stat size: ", s.obj.Size) return s.obj.Size } diff --git a/sftp/allocator.go b/sftp/allocator.go index fc1b6f0..5ae2145 100644 --- a/sftp/allocator.go +++ b/sftp/allocator.go @@ -2,7 +2,7 @@ package sftp /* Imported from: https://github.com/pkg/sftp - */ +*/ import ( "sync" @@ -98,4 +98,3 @@ func (a *allocator) isRequestOrderIDUsed(requestOrderID uint32) bool { _, ok := a.used[requestOrderID] return ok } - diff --git a/sftp/packet_manager.go b/sftp/packet_manager.go index 5aeb72b..59b1ed1 100644 --- a/sftp/packet_manager.go +++ b/sftp/packet_manager.go @@ -2,7 +2,7 @@ package sftp /* Imported from: https://github.com/pkg/sftp - */ +*/ import ( "encoding" @@ -218,4 +218,3 @@ func (s *packetManager) maybeSendPackets() { // } // return res // } - diff --git a/sftp/server.go b/sftp/server.go index 51db31a..be9f70a 100644 --- a/sftp/server.go +++ b/sftp/server.go @@ -4,12 +4,12 @@ package sftp import ( "context" - "log" "encoding" "errors" "fmt" "io" "io/ioutil" + "log" "os" "strconv" "sync" @@ -21,7 +21,7 @@ import ( const ( // SftpServerWorkerCount defines the number of workers for the SFTP server - SftpServerWorkerCount = 8 + SftpServerWorkerCount = 1 ) // Server is an SSH File Transfer Protocol (sftp) server. @@ -194,7 +194,7 @@ func handlePacket(s *Server, p orderedRequest) error { case *sshFxpLstatPacket: log.Println("pkt: lstat: ", p.Path) // stat the requested file - info, err := os.Lstat(toLocalPath(p.Path)) + info, err := s.fs.Stat(s.ctx, p.Path) rpkt = &sshFxpStatResponse{ ID: p.ID, info: info, @@ -219,43 +219,39 @@ func handlePacket(s *Server, p orderedRequest) error { } case *sshFxpMkdirPacket: log.Println("pkt: mkdir: ", p.Path) - err := os.Mkdir(toLocalPath(p.Path), 0755) + err := s.fs.Mkdir(s.ctx, p.Path, 0755) rpkt = statusFromError(p.ID, err) case *sshFxpRmdirPacket: log.Println("pkt: rmdir: ", p.Path) - err := os.Remove(toLocalPath(p.Path)) + err := s.fs.RemoveAll(s.ctx, p.Path) rpkt = statusFromError(p.ID, err) case *sshFxpRemovePacket: log.Println("pkt: rm: ", p.Filename) - err := os.Remove(toLocalPath(p.Filename)) + err := s.fs.RemoveAll(s.ctx, p.Filename) rpkt = statusFromError(p.ID, err) case *sshFxpRenamePacket: log.Println("pkt: rename: ", p.Oldpath, ", ", p.Newpath) - err := os.Rename(toLocalPath(p.Oldpath), toLocalPath(p.Newpath)) + err := s.fs.Rename(s.ctx, p.Oldpath, p.Newpath) rpkt = statusFromError(p.ID, err) case *sshFxpSymlinkPacket: log.Println("pkt: ln -s: ", p.Targetpath, ", ", p.Linkpath) - err := os.Symlink(toLocalPath(p.Targetpath), toLocalPath(p.Linkpath)) + err := s.fs.Rename(s.ctx, p.Targetpath, p.Linkpath) rpkt = statusFromError(p.ID, err) case *sshFxpClosePacket: log.Println("pkt: close handle: ", p.Handle) rpkt = statusFromError(p.ID, s.closeHandle(p.Handle)) case *sshFxpReadlinkPacket: - log.Println("pkt: read: ", p.Path) - f, err := os.Readlink(toLocalPath(p.Path)) + log.Println("pkt: readlink: ", p.Path) rpkt = &sshFxpNamePacket{ ID: p.ID, NameAttrs: []*sshFxpNameAttr{ { - Name: f, - LongName: f, + Name: p.Path, + LongName: p.Path, Attrs: emptyFileStat, }, }, } - if err != nil { - rpkt = statusFromError(p.ID, err) - } case *sshFxpRealpathPacket: log.Println("pkt: absolute path: ", p.Path) f := s3.NewS3Path(p.Path).Path @@ -288,7 +284,7 @@ func handlePacket(s *Server, p orderedRequest) error { case *sshFxpReadPacket: var err error = EBADF f, ok := s.getHandle(p.Handle) - log.Println("pkt: read handle: ", p.Handle, f.Path.Path) + //log.Println("pkt: read handle: ", p.Handle, f.Path.Path) if ok { err = nil data := p.getDataSlice(s.pktMgr.alloc, orderID) @@ -309,7 +305,7 @@ func handlePacket(s *Server, p orderedRequest) error { } case *sshFxpWritePacket: - log.Println("pkt: write handle: ", p.Handle, ", Offset: ", p.Offset) + //log.Println("pkt: write handle: ", p.Handle, ", Offset: ", p.Offset) f, ok := s.getHandle(p.Handle) var err error = EBADF if ok { @@ -324,7 +320,7 @@ func handlePacket(s *Server, p orderedRequest) error { rpkt = p.respond(s) } case serverRespondablePacket: - log.Println("pkt: respondable") + //log.Println("pkt: respondable") rpkt = p.respond(s) default: return fmt.Errorf("unexpected packet type %T", p) @@ -477,7 +473,7 @@ func (p *sshFxpOpenPacket) respond(svr *Server) responsePacket { } func (p *sshFxpReaddirPacket) respond(svr *Server) responsePacket { - log.Println("pkt: readdir: ", p.Handle) + //log.Println("pkt: readdir: ", p.Handle) f, ok := svr.getHandle(p.Handle) if !ok { return statusFromError(p.ID, EBADF) diff --git a/webdav.go b/webdav.go index 8901166..8e2ce95 100644 --- a/webdav.go +++ b/webdav.go @@ -16,7 +16,7 @@ func (wd WebDav) WithMC(mc *minio.Client) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { (&webdav.Handler{ Prefix: wd.WithConfig.DavPath, - FileSystem: s3.NewS3FS(mc), + FileSystem: s3.NewS3FS(mc, wd.WithConfig.S3Cache), LockSystem: webdav.NewMemLS(), Logger: func(r *http.Request, err error) { log.Printf("INFO: %s %s %s\n", r.RemoteAddr, r.Method, r.URL)