package s3
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"io/fs"
|
|
"log"
|
|
"mime"
|
|
"path"
|
|
|
|
"github.com/minio/minio-go/v7"
|
|
)
|
|
|
|
// S3File is a file-like handle over a single S3 object or pseudo-directory,
// backing the WebDAV layer. Reads are served lazily from a minio.Object;
// writes stream through an io.Pipe into a background PutObject call.
type S3File struct {
	fs      *S3FS          // owning filesystem (minio client + context)
	obj     *minio.Object  // lazily-opened read handle; nil until loadObject runs
	objw    *io.PipeWriter // write side of the upload pipe; nil until first Write
	donew   chan error     // delivers the background PutObject result on Close
	pos     int64          // seek position; also the Readdir pagination cursor
	eof     bool           // NOTE(review): never written in this chunk — possibly unused, confirm against rest of file
	entries []fs.FileInfo  // cached directory listing consumed by Readdir
	Path    S3Path         // parsed bucket/key/class for this handle
}
|
|
|
|
func NewS3File(s *S3FS, path string) (*S3File, error) {
|
|
f := new(S3File)
|
|
f.fs = s
|
|
f.pos = 0
|
|
f.entries = nil
|
|
f.Path = NewS3Path(path)
|
|
return f, nil
|
|
}
|
|
|
|
func (f *S3File) Close() error {
|
|
err := make([]error, 0)
|
|
|
|
if f.obj != nil {
|
|
err = append(err, f.obj.Close())
|
|
f.obj = nil
|
|
}
|
|
|
|
if f.objw != nil {
|
|
// wait that minio completes its transfers in background
|
|
err = append(err, f.objw.Close())
|
|
err = append(err, <-f.donew)
|
|
f.donew = nil
|
|
f.objw = nil
|
|
}
|
|
|
|
count := 0
|
|
for _, e := range err {
|
|
if e != nil {
|
|
count++
|
|
log.Println(e)
|
|
}
|
|
}
|
|
if count > 0 {
|
|
return errors.New(fmt.Sprintf("%d errors when closing this WebDAV File. Read previous logs to know more.", count))
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (f *S3File) loadObject() error {
|
|
if f.obj == nil {
|
|
obj, err := f.fs.mc.GetObject(f.fs.ctx, f.Path.Bucket, f.Path.Key, minio.GetObjectOptions{})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
f.obj = obj
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (f *S3File) Read(p []byte) (n int, err error) {
|
|
log.Printf("s3 Read\n")
|
|
//if f.Stat() & OBJECT == 0 { /* @FIXME Ideally we would check against OBJECT but we need a non OPAQUE_KEY */
|
|
// return 0, os.ErrInvalid
|
|
//}
|
|
if err := f.loadObject(); err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
return f.obj.Read(p)
|
|
}
|
|
|
|
func (f *S3File) ReadAt(p []byte, off int64) (n int, err error) {
|
|
stat, err := f.Stat()
|
|
if err != nil {
|
|
return 0, err
|
|
} else if off >= stat.Size() {
|
|
return 0, io.EOF
|
|
}
|
|
|
|
log.Printf("s3 ReadAt %v\n", off)
|
|
if err := f.loadObject(); err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
return f.obj.ReadAt(p, off)
|
|
}
|
|
|
|
func (f *S3File) WriteAt(p []byte, off int64) (n int, err error) {
|
|
return 0, errors.New("not implemented")
|
|
|
|
}
|
|
|
|
// Write appends p to the object being uploaded. On the first call it
// starts a background PutObject fed by an io.Pipe; subsequent calls just
// push more bytes into the pipe. The upload is finalized (and its error
// surfaced) in Close via the donew channel. Writing after a Seek to a
// non-zero position is rejected.
func (f *S3File) Write(p []byte) (n int, err error) {
	/*if f.path.class != OBJECT {
		return 0, os.ErrInvalid
	}*/

	if f.objw == nil {
		// Only whole-object, from-the-start uploads are supported.
		if f.pos != 0 {
			return 0, errors.New("writing with an offset is not implemented")
		}

		// The pipe decouples WebDAV writes from minio's uploader: we write
		// into w here while PutObject consumes from r in the goroutine.
		r, w := io.Pipe()
		f.donew = make(chan error, 1) // buffered so the goroutine never blocks on send
		f.objw = w

		contentType := mime.TypeByExtension(path.Ext(f.Path.Key))
		go func() {
			/* @FIXME
			PutObject has a strange behaviour when used with unknown size, it supposes the final size will be 5TiB.
			Then it computes that, following the final size of the file, each part of the multipart upload should be 512MiB, which leads to big allocations.
			The culprit is OptimalPartInfo: https://github.com/minio/minio-go/blob/62beca8cd87e9960d88793320220ad2c159bb5e5/api-put-object-common.go#L70
			We set this value to the minimum allowed one, 5MiB.
			The minimum value is set here: https://github.com/minio/minio-go/blob/62beca8cd87e9960d88793320220ad2c159bb5e5/constants.go#L24
			Because Multipart uploads seems to be limited to 10 000 parts, it might be possible that we are limited to 50 GiB files, which is still good enough.
			Ref: https://github.com/minio/minio-go/blob/62beca8cd87e9960d88793320220ad2c159bb5e5/api-put-object-common.go#L110-L112
			*/
			// Size -1 = unknown length; PartSize pinned to 5MiB (see @FIXME).
			_, err := f.fs.mc.PutObject(context.Background(), f.Path.Bucket, f.Path.Key, r, -1, minio.PutObjectOptions{ContentType: contentType, PartSize: 5*1024*1024})
			f.donew <- err
		}()
	}

	return f.objw.Write(p)
}
|
|
|
|
func (f *S3File) Seek(offset int64, whence int) (int64, error) {
|
|
if err := f.loadObject(); err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
pos, err := f.obj.Seek(offset, whence)
|
|
f.pos += pos
|
|
return pos, err
|
|
}
|
|
|
|
/*
Readdir reads the contents of the directory associated with the file f and returns a slice of FileInfo values in directory order. Subsequent calls on the same file will yield later records in the directory.

If count > 0, Readdir returns at most count records. In this case, if Readdir returns an empty slice, it will return an error explaining why. At the end of a directory, the error is io.EOF.

If count <= 0, Readdir returns all the records remaining in the directory. When it succeeds, it returns a nil error (not io.EOF).
*/
|
|
func (f *S3File) Readdir(count int) ([]fs.FileInfo, error) {
|
|
if f.Path.Class == ROOT {
|
|
return f.readDirRoot(count)
|
|
} else {
|
|
return f.readDirChild(count)
|
|
}
|
|
}
|
|
|
|
// min returns the smaller of two int64 values.
func min(a, b int64) int64 {
	if b < a {
		return b
	}
	return a
}
|
|
|
|
func (f *S3File) readDirRoot(count int) ([]fs.FileInfo, error) {
|
|
var err error
|
|
if f.entries == nil {
|
|
buckets, err := f.fs.mc.ListBuckets(f.fs.ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
f.entries = make([]fs.FileInfo, 0, len(buckets))
|
|
for _, bucket := range buckets {
|
|
//log.Println("Stat from GarageFile.readDirRoot()", "/"+bucket.Name)
|
|
nf, err := NewS3Stat(f.fs, "/"+bucket.Name)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
f.entries = append(f.entries, nf)
|
|
}
|
|
}
|
|
beg := f.pos
|
|
end := int64(len(f.entries))
|
|
if count > 0 {
|
|
end = min(beg + int64(count), end)
|
|
}
|
|
f.pos = end
|
|
|
|
if end - beg == 0 {
|
|
err = io.EOF
|
|
}
|
|
|
|
return f.entries[beg:end], err
|
|
}
|
|
|
|
func (f *S3File) readDirChild(count int) ([]fs.FileInfo, error) {
|
|
var err error
|
|
if f.entries == nil {
|
|
prefix := f.Path.Key
|
|
if len(prefix) > 0 && prefix[len(prefix)-1:] != "/" {
|
|
prefix = prefix + "/"
|
|
}
|
|
|
|
objs_info := f.fs.mc.ListObjects(f.fs.ctx, f.Path.Bucket, minio.ListObjectsOptions{
|
|
Prefix: prefix,
|
|
Recursive: false,
|
|
})
|
|
|
|
f.entries = make([]fs.FileInfo, 0)
|
|
for object := range objs_info {
|
|
if object.Err != nil {
|
|
return nil, object.Err
|
|
}
|
|
//log.Println("Stat from GarageFile.readDirChild()", path.Join("/", f.path.bucket, object.Key))
|
|
nf, err := NewS3StatFromObjectInfo(f.fs, f.Path.Bucket, object)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
f.entries = append(f.entries, nf)
|
|
}
|
|
}
|
|
beg := f.pos
|
|
end := int64(len(f.entries))
|
|
if count > 0 {
|
|
end = min(beg + int64(count), end)
|
|
}
|
|
f.pos = end
|
|
|
|
if end - beg == 0 {
|
|
err = io.EOF
|
|
}
|
|
|
|
return f.entries[beg:end], err
|
|
}
|
|
|
|
// Stat returns fresh metadata for this file's path; it performs a new
// lookup on every call rather than caching.
func (f *S3File) Stat() (fs.FileInfo, error) {
	return NewS3Stat(f.fs, f.Path.Path)
}
|