// bagage/s3/file.go — 319 lines, 6.7 KiB, Go

package s3
import (
"errors"
"fmt"
"io"
"io/fs"
"log"
"mime"
"os"
"path"
"github.com/minio/minio-go/v7"
)
// S3File is a file-like handle on a path of an S3FS. Depending on how it is
// used it reads straight from a remote object, buffers writes in a local
// temporary file, or serves a cached directory listing.
type S3File struct {
// fs is the owning filesystem; its mc, ctx and local fields are used for
// every remote call and for the location of the write cache.
fs *S3FS
// obj is the remote object reader, opened lazily by loadObject.
obj *minio.Object
// objw is a pipe writer that Close shuts down when it is set.
// NOTE(review): nothing in this file assigns it — presumably set elsewhere or legacy; confirm.
objw *io.PipeWriter
// cache is the local temporary file holding pending writes (see initCache
// and writeFlush). When non-nil, reads and seeks are served from it.
cache *os.File
// donew yields one error that Close waits for after closing objw.
// NOTE(review): presumably the result of a background upload — confirm.
donew chan error
// pos is the cursor into entries used by the directory listing helpers;
// Seek also modifies it.
pos int64
// entries caches the directory listing; nil means "not listed yet".
entries []fs.FileInfo
// Path is the parsed form of the path given to NewS3File.
Path S3Path
}
// NewS3File builds a handle for path on the filesystem s. No remote call is
// made here: the object reader, the write cache and the directory listing are
// all created on first use. The returned error is always nil.
func NewS3File(s *S3FS, path string) (*S3File, error) {
	// pos and entries are left at their zero values (0 and nil).
	file := &S3File{
		fs:   s,
		Path: NewS3Path(path),
	}
	return file, nil
}
// Close releases whatever the file currently holds: the remote object reader,
// the pipe writer (then waits on donew), and the write cache, which is flushed
// through writeFlush. Each individual failure is logged; the returned error
// only reports how many of them occurred.
func (f *S3File) Close() error {
	var results []error

	if f.obj != nil {
		results = append(results, f.obj.Close())
		f.obj = nil
	}

	if f.objw != nil {
		// Closing the writer ends the stream; receiving on donew then waits
		// for minio to complete its background transfer.
		results = append(results, f.objw.Close())
		results = append(results, <-f.donew)
		f.donew = nil
		f.objw = nil
	}

	if f.cache != nil {
		results = append(results, f.writeFlush())
		f.cache = nil
	}

	failures := 0
	for _, res := range results {
		if res == nil {
			continue
		}
		failures++
		log.Println(res)
	}

	if failures == 0 {
		return nil
	}
	return errors.New(fmt.Sprintf("%d errors when closing this S3 File. Read previous logs to know more.", failures))
}
// loadObject opens the remote object for reading the first time it is needed
// and stores it in f.obj. Later calls are no-ops while f.obj is set.
func (f *S3File) loadObject() error {
	if f.obj != nil {
		return nil
	}

	reader, err := f.fs.mc.GetObject(f.fs.ctx, f.Path.Bucket, f.Path.Key, minio.GetObjectOptions{})
	if err != nil {
		return err
	}

	f.obj = reader
	return nil
}
// Read fills p from the write cache when one exists, otherwise from the
// remote object, which is opened on demand.
//
// FIXME: ideally reads would be rejected with os.ErrInvalid unless the path is
// an OBJECT, but that check needs a path class other than OPAQUE_KEY.
func (f *S3File) Read(p []byte) (n int, err error) {
	if f.cache == nil {
		if loadErr := f.loadObject(); loadErr != nil {
			return 0, loadErr
		}
		return f.obj.Read(p)
	}
	return f.cache.Read(p)
}
// ReadAt reads into p starting at offset off. The write cache is used when
// present. Otherwise the file is stat'ed first so that an offset at or beyond
// the reported size yields io.EOF without touching the object, and the read is
// then delegated to the lazily opened remote object.
func (f *S3File) ReadAt(p []byte, off int64) (n int, err error) {
	if f.cache != nil {
		return f.cache.ReadAt(p, off)
	}

	info, statErr := f.Stat()
	if statErr != nil {
		return 0, statErr
	}
	if info.Size() <= off {
		return 0, io.EOF
	}

	if loadErr := f.loadObject(); loadErr != nil {
		return 0, loadErr
	}
	return f.obj.ReadAt(p, off)
}
// initCache makes sure f.cache points at a local temporary file that mirrors
// the remote object, so that writes can be applied locally and uploaded in one
// go when the file is closed (see writeFlush).
//
// The temporary file is created in the folder configured on the filesystem
// (f.fs.local) rather than the default temp dir, because files can be very
// large and /tmp is often mounted in RAM.
//
// Because WriteAt must not lose the existing content, a non-empty remote
// object is downloaded into the cache first. If that download fails, the cache
// is discarded (closed, removed, f.cache reset to nil): keeping a partially
// filled cache would let a later Close overwrite the intact remote object with
// truncated data.
//
// It returns nil when the cache already exists or was fully initialized.
func (f *S3File) initCache() error {
	if f.cache != nil {
		return nil
	}

	tmp, err := os.CreateTemp(f.fs.local, "bagage-cache")
	if err != nil {
		return err
	}
	f.cache = tmp

	info, err := f.Stat()
	if err != nil {
		// The (empty) cache is deliberately kept on this path, as before:
		// the writers in this file ignore this error and go on using the cache.
		// NOTE(review): presumably Stat fails for objects that do not exist
		// yet, which is how new objects get created — confirm.
		return err
	}
	if info.Size() == 0 {
		return nil
	}

	// discard throws away a cache that could not be filled completely.
	discard := func() {
		f.cache = nil
		// Best-effort cleanup: the download error is the one worth reporting.
		_ = tmp.Close()
		_ = os.Remove(tmp.Name())
	}

	object, err := f.fs.mc.GetObject(f.fs.ctx, f.Path.Bucket, f.Path.Key, minio.GetObjectOptions{})
	if err != nil {
		discard()
		return err
	}
	// The reader is only needed for the copy below; release it afterwards.
	defer object.Close()

	if _, err := io.Copy(tmp, object); err != nil {
		discard()
		return err
	}
	return nil
}
// WriteAt writes p at offset off into the local write cache; the data reaches
// S3 when the file is closed.
//
// An initCache failure is returned only when it left no cache to write to.
// initCache can also report an error after the temporary file was created
// (when Stat fails); in that case the write still goes to the cache, which
// preserves the previous best-effort behavior.
func (f *S3File) WriteAt(p []byte, off int64) (n int, err error) {
	if initErr := f.initCache(); initErr != nil && f.cache == nil {
		return 0, initErr
	}
	return f.cache.WriteAt(p, off)
}
// Write appends p at the current position of the local write cache; the data
// reaches S3 when the file is closed.
//
// An initCache failure is returned only when it left no cache to write to.
// initCache can also report an error after the temporary file was created
// (when Stat fails); in that case the write still goes to the cache, which
// preserves the previous best-effort behavior.
func (f *S3File) Write(p []byte) (n int, err error) {
	if initErr := f.initCache(); initErr != nil && f.cache == nil {
		return 0, initErr
	}
	return f.cache.Write(p)
}
// writeFlush uploads the write cache to S3 and disposes of the temporary file.
// It is a no-op when no cache is in use.
//
// The cache file is closed and removed on every path, including failures:
// the caller drops its reference to the cache right after this call, so
// anything left open or on disk here would be leaked for good.
//
// The first error encountered is returned; a close/remove error is only
// reported when the upload itself succeeded. f.cache is not reset here, the
// caller is expected to do it.
func (f *S3File) writeFlush() (err error) {
	// Only needed if a write cache was used.
	if f.cache == nil {
		return nil
	}
	cache := f.cache

	defer func() {
		closeErr := cache.Close()
		removeErr := os.Remove(cache.Name())
		if err == nil {
			err = closeErr
		}
		if err == nil {
			err = removeErr
		}
	}()

	// Rewind so the upload starts from the first byte.
	if _, err = cache.Seek(0, io.SeekStart); err != nil {
		return err
	}

	// minio is given the exact size of the payload.
	stat, err := cache.Stat()
	if err != nil {
		return err
	}

	// The content type is derived from the key's extension.
	contentType := mime.TypeByExtension(path.Ext(f.Path.Key))
	_, err = f.fs.mc.PutObject(f.fs.ctx, f.Path.Bucket, f.Path.Key, cache, stat.Size(), minio.PutObjectOptions{
		ContentType: contentType,
	})
	return err
}
// Seek moves the read/write position. With a write cache the call is
// forwarded to the cache file; otherwise it is forwarded to the remote object,
// which is opened on demand.
//
// On a successful seek of the remote object, f.pos is set to the returned
// offset. That offset is already absolute (relative to the start of the
// file), so it must replace f.pos rather than be added to it; f.pos is left
// untouched when the seek fails.
func (f *S3File) Seek(offset int64, whence int) (int64, error) {
	if f.cache != nil {
		return f.cache.Seek(offset, whence)
	}

	if err := f.loadObject(); err != nil {
		return 0, err
	}

	pos, err := f.obj.Seek(offset, whence)
	if err == nil {
		f.pos = pos
	}
	return pos, err
}
// Readdir lists the directory this file points at and returns the entries as
// fs.FileInfo values. Successive calls on the same file continue where the
// previous one stopped.
//
// If count > 0, at most count entries are returned; otherwise all remaining
// entries are returned. The root path lists buckets (readDirRoot); any other
// path lists the objects below it (readDirChild). Those helpers also decide
// when the end of the listing is reported with io.EOF.
func (f *S3File) Readdir(count int) ([]fs.FileInfo, error) {
	switch f.Path.Class {
	case ROOT:
		return f.readDirRoot(count)
	default:
		return f.readDirChild(count)
	}
}
// min returns the smaller of the two int64 values a and b.
func min(a, b int64) int64 {
	if b <= a {
		return b
	}
	return a
}
// readDirRoot lists the root of the filesystem: one entry per bucket.
//
// The bucket list is fetched on the first call and cached in f.entries;
// f.pos is the cursor of the next entry to hand out. If count > 0, at most
// count entries are returned and io.EOF is reported once nothing is left. If
// count <= 0, all remaining entries are returned with a nil error, even when
// there are none.
func (f *S3File) readDirRoot(count int) ([]fs.FileInfo, error) {
	if f.entries == nil {
		buckets, err := f.fs.mc.ListBuckets(f.fs.ctx)
		if err != nil {
			return nil, err
		}

		// Build the listing aside and publish it only once complete, so a
		// failure halfway through is not served as a full listing later.
		entries := make([]fs.FileInfo, 0, len(buckets))
		for _, bucket := range buckets {
			info, err := NewS3Stat(f.fs, "/"+bucket.Name)
			if err != nil {
				return nil, err
			}
			entries = append(entries, info)
		}
		f.entries = entries
	}

	beg := f.pos
	end := int64(len(f.entries))
	if beg > end {
		// f.pos is also written by Seek; never slice past the listing.
		beg = end
	}
	if count > 0 {
		end = min(beg+int64(count), end)
	}
	f.pos = end

	// Only a bounded read signals exhaustion with io.EOF.
	if count > 0 && beg == end {
		return f.entries[beg:end], io.EOF
	}
	return f.entries[beg:end], nil
}
// readDirChild lists the objects directly below this path inside its bucket.
//
// The listing is fetched on the first call (non-recursive, with the key used
// as a "/"-terminated prefix) and cached in f.entries; f.pos is the cursor of
// the next entry to hand out. If count > 0, at most count entries are returned
// and io.EOF is reported once nothing is left. If count <= 0, all remaining
// entries are returned with a nil error, even when there are none.
func (f *S3File) readDirChild(count int) ([]fs.FileInfo, error) {
	if f.entries == nil {
		prefix := f.Path.Key
		if len(prefix) > 0 && prefix[len(prefix)-1] != '/' {
			prefix += "/"
		}

		listing := f.fs.mc.ListObjects(f.fs.ctx, f.Path.Bucket, minio.ListObjectsOptions{
			Prefix:    prefix,
			Recursive: false,
		})

		// Build the listing aside and publish it only once complete, so a
		// failure halfway through is not served as a full listing later.
		entries := make([]fs.FileInfo, 0)
		for object := range listing {
			if object.Err != nil {
				return nil, object.Err
			}
			info, err := NewS3StatFromObjectInfo(f.fs, f.Path.Bucket, object)
			if err != nil {
				return nil, err
			}
			entries = append(entries, info)
		}
		f.entries = entries
	}

	beg := f.pos
	end := int64(len(f.entries))
	if beg > end {
		// f.pos is also written by Seek; never slice past the listing.
		beg = end
	}
	if count > 0 {
		end = min(beg+int64(count), end)
	}
	f.pos = end

	// Only a bounded read signals exhaustion with io.EOF.
	if count > 0 && beg == end {
		return f.entries[beg:end], io.EOF
	}
	return f.entries[beg:end], nil
}
// Stat returns the file information for this file's path. It delegates to
// NewS3Stat with the full path on every call; nothing is cached by this method.
func (f *S3File) Stat() (fs.FileInfo, error) {
return NewS3Stat(f.fs, f.Path.Path)
}