package sftp // sftp server counterpart import ( "context" "log" "encoding" "errors" "fmt" "io" "io/ioutil" "os" "strconv" "sync" "syscall" "time" "git.deuxfleurs.fr/Deuxfleurs/bagage/s3" ) const ( // SftpServerWorkerCount defines the number of workers for the SFTP server SftpServerWorkerCount = 8 ) // Server is an SSH File Transfer Protocol (sftp) server. // This is intended to provide the sftp subsystem to an ssh server daemon. // This implementation currently supports most of sftp server protocol version 3, // as specified at http://tools.ietf.org/html/draft-ietf-secsh-filexfer-02 type Server struct { *serverConn debugStream io.Writer readOnly bool pktMgr *packetManager openFiles map[string]*s3.S3File openFilesLock sync.RWMutex handleCount int fs *s3.S3FS ctx context.Context } func (svr *Server) nextHandle(f *s3.S3File) string { svr.openFilesLock.Lock() defer svr.openFilesLock.Unlock() svr.handleCount++ handle := strconv.Itoa(svr.handleCount) svr.openFiles[handle] = f return handle } func (svr *Server) closeHandle(handle string) error { svr.openFilesLock.Lock() defer svr.openFilesLock.Unlock() if f, ok := svr.openFiles[handle]; ok { delete(svr.openFiles, handle) return f.Close() } return EBADF } func (svr *Server) getHandle(handle string) (*s3.S3File, bool) { svr.openFilesLock.RLock() defer svr.openFilesLock.RUnlock() f, ok := svr.openFiles[handle] return f, ok } type serverRespondablePacket interface { encoding.BinaryUnmarshaler id() uint32 respond(svr *Server) responsePacket } // NewServer creates a new Server instance around the provided streams, serving // content from the root of the filesystem. Optionally, ServerOption // functions may be specified to further configure the Server. // // A subsequent call to Serve() is required to begin serving files over SFTP. func NewServer(ctx context.Context, rwc io.ReadWriteCloser, fs *s3.S3FS, options ...ServerOption) (*Server, error) { svrConn := &serverConn{ conn: conn{ Reader: rwc, WriteCloser: rwc, }, } s := &Server{ serverConn: svrConn, debugStream: ioutil.Discard, pktMgr: newPktMgr(svrConn), openFiles: make(map[string]*s3.S3File), fs: fs, ctx: ctx, } for _, o := range options { if err := o(s); err != nil { return nil, err } } return s, nil } // A ServerOption is a function which applies configuration to a Server. type ServerOption func(*Server) error // WithDebug enables Server debugging output to the supplied io.Writer. func WithDebug(w io.Writer) ServerOption { return func(s *Server) error { s.debugStream = w return nil } } // ReadOnly configures a Server to serve files in read-only mode. func ReadOnly() ServerOption { return func(s *Server) error { s.readOnly = true return nil } } // WithAllocator enable the allocator. // After processing a packet we keep in memory the allocated slices // and we reuse them for new packets. // The allocator is experimental func WithAllocator() ServerOption { return func(s *Server) error { alloc := newAllocator() s.pktMgr.alloc = alloc s.conn.alloc = alloc return nil } } type rxPacket struct { pktType fxp pktBytes []byte } // Up to N parallel servers func (svr *Server) sftpServerWorker(pktChan chan orderedRequest) error { for pkt := range pktChan { // readonly checks readonly := true switch pkt := pkt.requestPacket.(type) { case notReadOnly: readonly = false case *sshFxpOpenPacket: readonly = pkt.readonly() case *sshFxpExtendedPacket: readonly = pkt.readonly() } // If server is operating read-only and a write operation is requested, // return permission denied if !readonly && svr.readOnly { svr.pktMgr.readyPacket( svr.pktMgr.newOrderedResponse(statusFromError(pkt.id(), syscall.EPERM), pkt.orderID()), ) continue } if err := handlePacket(svr, pkt); err != nil { return err } } return nil } func handlePacket(s *Server, p orderedRequest) error { var rpkt responsePacket orderID := p.orderID() switch p := p.requestPacket.(type) { case *sshFxInitPacket: log.Println("pkt: init") rpkt = &sshFxVersionPacket{ Version: sftpProtocolVersion, Extensions: sftpExtensions, } case *sshFxpStatPacket: log.Println("pkt: stat: ", p.Path) // stat the requested file info, err := s.fs.Stat(s.ctx, p.Path) rpkt = &sshFxpStatResponse{ ID: p.ID, info: info, } if err != nil { rpkt = statusFromError(p.ID, err) } case *sshFxpLstatPacket: log.Println("pkt: lstat: ", p.Path) // stat the requested file info, err := os.Lstat(toLocalPath(p.Path)) rpkt = &sshFxpStatResponse{ ID: p.ID, info: info, } if err != nil { rpkt = statusFromError(p.ID, err) } case *sshFxpFstatPacket: log.Println("pkt: fstat: ", p.Handle) f, ok := s.getHandle(p.Handle) var err error = EBADF var info os.FileInfo if ok { info, err = f.Stat() rpkt = &sshFxpStatResponse{ ID: p.ID, info: info, } } if err != nil { rpkt = statusFromError(p.ID, err) } case *sshFxpMkdirPacket: log.Println("pkt: mkdir: ", p.Path) err := os.Mkdir(toLocalPath(p.Path), 0755) rpkt = statusFromError(p.ID, err) case *sshFxpRmdirPacket: log.Println("pkt: rmdir: ", p.Path) err := os.Remove(toLocalPath(p.Path)) rpkt = statusFromError(p.ID, err) case *sshFxpRemovePacket: log.Println("pkt: rm: ", p.Filename) err := os.Remove(toLocalPath(p.Filename)) rpkt = statusFromError(p.ID, err) case *sshFxpRenamePacket: log.Println("pkt: rename: ", p.Oldpath, ", ", p.Newpath) err := os.Rename(toLocalPath(p.Oldpath), toLocalPath(p.Newpath)) rpkt = statusFromError(p.ID, err) case *sshFxpSymlinkPacket: log.Println("pkt: ln -s: ", p.Targetpath, ", ", p.Linkpath) err := os.Symlink(toLocalPath(p.Targetpath), toLocalPath(p.Linkpath)) rpkt = statusFromError(p.ID, err) case *sshFxpClosePacket: log.Println("pkt: close handle: ", p.Handle) rpkt = statusFromError(p.ID, s.closeHandle(p.Handle)) case *sshFxpReadlinkPacket: log.Println("pkt: read: ", p.Path) f, err := os.Readlink(toLocalPath(p.Path)) rpkt = &sshFxpNamePacket{ ID: p.ID, NameAttrs: []*sshFxpNameAttr{ { Name: f, LongName: f, Attrs: emptyFileStat, }, }, } if err != nil { rpkt = statusFromError(p.ID, err) } case *sshFxpRealpathPacket: log.Println("pkt: absolute path: ", p.Path) f := s3.NewS3Path(p.Path).Path rpkt = &sshFxpNamePacket{ ID: p.ID, NameAttrs: []*sshFxpNameAttr{ { Name: f, LongName: f, Attrs: emptyFileStat, }, }, } case *sshFxpOpendirPacket: log.Println("pkt: open dir: ", p.Path) p.Path = s3.NewS3Path(p.Path).Path if stat, err := s.fs.Stat(s.ctx, p.Path); err != nil { rpkt = statusFromError(p.ID, err) } else if !stat.IsDir() { rpkt = statusFromError(p.ID, &os.PathError{ Path: p.Path, Err: syscall.ENOTDIR}) } else { rpkt = (&sshFxpOpenPacket{ ID: p.ID, Path: p.Path, Pflags: sshFxfRead, }).respond(s) } case *sshFxpReadPacket: var err error = EBADF f, ok := s.getHandle(p.Handle) log.Println("pkt: read handle: ", p.Handle, f.Path.Path) if ok { err = nil data := p.getDataSlice(s.pktMgr.alloc, orderID) n, _err := f.ReadAt(data, int64(p.Offset)) log.Println("DEBUG: ", n, _err, p.Offset) if _err != nil && (_err != io.EOF || n == 0) { err = _err } rpkt = &sshFxpDataPacket{ ID: p.ID, Length: uint32(n), Data: data[:n], // do not use data[:n:n] here to clamp the capacity, we allocated extra capacity above to avoid reallocations } } if err != nil { rpkt = statusFromError(p.ID, err) } case *sshFxpWritePacket: log.Println("pkt: write handle: ", p.Handle, ", Offset: ", p.Offset) f, ok := s.getHandle(p.Handle) var err error = EBADF if ok { _, err = f.WriteAt(p.Data, int64(p.Offset)) } rpkt = statusFromError(p.ID, err) case *sshFxpExtendedPacket: log.Println("pkt: extended packet") if p.SpecificPacket == nil { rpkt = statusFromError(p.ID, ErrSSHFxOpUnsupported) } else { rpkt = p.respond(s) } case serverRespondablePacket: log.Println("pkt: respondable") rpkt = p.respond(s) default: return fmt.Errorf("unexpected packet type %T", p) } s.pktMgr.readyPacket(s.pktMgr.newOrderedResponse(rpkt, orderID)) return nil } // Serve serves SFTP connections until the streams stop or the SFTP subsystem // is stopped. func (svr *Server) Serve() error { defer func() { if svr.pktMgr.alloc != nil { svr.pktMgr.alloc.Free() } }() var wg sync.WaitGroup runWorker := func(ch chan orderedRequest) { wg.Add(1) go func() { defer wg.Done() if err := svr.sftpServerWorker(ch); err != nil { svr.conn.Close() // shuts down recvPacket } }() } pktChan := svr.pktMgr.workerChan(runWorker) var err error var pkt requestPacket var pktType uint8 var pktBytes []byte for { pktType, pktBytes, err = svr.serverConn.recvPacket(svr.pktMgr.getNextOrderID()) if err != nil { // we don't care about releasing allocated pages here, the server will quit and the allocator freed break } pkt, err = makePacket(rxPacket{fxp(pktType), pktBytes}) if err != nil { switch { case errors.Is(err, errUnknownExtendedPacket): //if err := svr.serverConn.sendError(pkt, ErrSshFxOpUnsupported); err != nil { // debug("failed to send err packet: %v", err) // svr.conn.Close() // shuts down recvPacket // break //} default: debug("makePacket err: %v", err) svr.conn.Close() // shuts down recvPacket break } } pktChan <- svr.pktMgr.newOrderedRequest(pkt) } close(pktChan) // shuts down sftpServerWorkers wg.Wait() // wait for all workers to exit // close any still-open files for handle, file := range svr.openFiles { fmt.Fprintf(svr.debugStream, "sftp server file with handle %q left open: %v\n", handle, file.Path.Path) file.Close() } return err // error from recvPacket } type ider interface { id() uint32 } // The init packet has no ID, so we just return a zero-value ID func (p *sshFxInitPacket) id() uint32 { return 0 } type sshFxpStatResponse struct { ID uint32 info os.FileInfo } func (p *sshFxpStatResponse) marshalPacket() ([]byte, []byte, error) { l := 4 + 1 + 4 // uint32(length) + byte(type) + uint32(id) b := make([]byte, 4, l) b = append(b, sshFxpAttrs) b = marshalUint32(b, p.ID) var payload []byte payload = marshalFileInfo(payload, p.info) return b, payload, nil } func (p *sshFxpStatResponse) MarshalBinary() ([]byte, error) { header, payload, err := p.marshalPacket() return append(header, payload...), err } var emptyFileStat = []interface{}{uint32(0)} func (p *sshFxpOpenPacket) readonly() bool { return !p.hasPflags(sshFxfWrite) } func (p *sshFxpOpenPacket) hasPflags(flags ...uint32) bool { for _, f := range flags { if p.Pflags&f == 0 { return false } } return true } func (p *sshFxpOpenPacket) respond(svr *Server) responsePacket { log.Println("pkt: open: ", p.Path) var osFlags int if p.hasPflags(sshFxfRead, sshFxfWrite) { osFlags |= os.O_RDWR } else if p.hasPflags(sshFxfWrite) { osFlags |= os.O_WRONLY } else if p.hasPflags(sshFxfRead) { osFlags |= os.O_RDONLY } else { // how are they opening? return statusFromError(p.ID, syscall.EINVAL) } // Don't use O_APPEND flag as it conflicts with WriteAt. // The sshFxfAppend flag is a no-op here as the client sends the offsets. // @FIXME these flags are currently ignored if p.hasPflags(sshFxfCreat) { osFlags |= os.O_CREATE } if p.hasPflags(sshFxfTrunc) { osFlags |= os.O_TRUNC } if p.hasPflags(sshFxfExcl) { osFlags |= os.O_EXCL } f, err := svr.fs.OpenFile2(svr.ctx, p.Path, osFlags, 0644) if err != nil { return statusFromError(p.ID, err) } handle := svr.nextHandle(f) return &sshFxpHandlePacket{ID: p.ID, Handle: handle} } func (p *sshFxpReaddirPacket) respond(svr *Server) responsePacket { log.Println("pkt: readdir: ", p.Handle) f, ok := svr.getHandle(p.Handle) if !ok { return statusFromError(p.ID, EBADF) } dirents, err := f.Readdir(128) if err != nil { return statusFromError(p.ID, err) } idLookup := osIDLookup{} ret := &sshFxpNamePacket{ID: p.ID} for _, dirent := range dirents { ret.NameAttrs = append(ret.NameAttrs, &sshFxpNameAttr{ Name: dirent.Name(), LongName: runLs(idLookup, dirent), Attrs: []interface{}{dirent}, }) } return ret } func (p *sshFxpSetstatPacket) respond(svr *Server) responsePacket { log.Println("pkt: setstat: ", p.Path) // additional unmarshalling is required for each possibility here b := p.Attrs.([]byte) var err error p.Path = toLocalPath(p.Path) debug("setstat name \"%s\"", p.Path) if (p.Flags & sshFileXferAttrSize) != 0 { var size uint64 if size, b, err = unmarshalUint64Safe(b); err == nil { err = os.Truncate(p.Path, int64(size)) } } if (p.Flags & sshFileXferAttrPermissions) != 0 { var mode uint32 if mode, b, err = unmarshalUint32Safe(b); err == nil { err = os.Chmod(p.Path, os.FileMode(mode)) } } if (p.Flags & sshFileXferAttrACmodTime) != 0 { var atime uint32 var mtime uint32 if atime, b, err = unmarshalUint32Safe(b); err != nil { } else if mtime, b, err = unmarshalUint32Safe(b); err != nil { } else { atimeT := time.Unix(int64(atime), 0) mtimeT := time.Unix(int64(mtime), 0) err = os.Chtimes(p.Path, atimeT, mtimeT) } } if (p.Flags & sshFileXferAttrUIDGID) != 0 { var uid uint32 var gid uint32 if uid, b, err = unmarshalUint32Safe(b); err != nil { } else if gid, _, err = unmarshalUint32Safe(b); err != nil { } else { err = os.Chown(p.Path, int(uid), int(gid)) } } return statusFromError(p.ID, err) } func (p *sshFxpFsetstatPacket) respond(svr *Server) responsePacket { log.Println("pkt: fsetstat: ", p.Handle) f, ok := svr.getHandle(p.Handle) if !ok { return statusFromError(p.ID, EBADF) } // additional unmarshalling is required for each possibility here //b := p.Attrs.([]byte) var err error debug("fsetstat name \"%s\"", f.Path.Path) if (p.Flags & sshFileXferAttrSize) != 0 { /*var size uint64 if size, b, err = unmarshalUint64Safe(b); err == nil { err = f.Truncate(int64(size)) }*/ log.Println("WARN: changing size of the file is not supported") } if (p.Flags & sshFileXferAttrPermissions) != 0 { /*var mode uint32 if mode, b, err = unmarshalUint32Safe(b); err == nil { err = f.Chmod(os.FileMode(mode)) }*/ log.Println("WARN: chmod not supported") } if (p.Flags & sshFileXferAttrACmodTime) != 0 { /*var atime uint32 var mtime uint32 if atime, b, err = unmarshalUint32Safe(b); err != nil { } else if mtime, b, err = unmarshalUint32Safe(b); err != nil { } else { atimeT := time.Unix(int64(atime), 0) mtimeT := time.Unix(int64(mtime), 0) err = os.Chtimes(f.Name(), atimeT, mtimeT) }*/ log.Println("WARN: chtimes not supported") } if (p.Flags & sshFileXferAttrUIDGID) != 0 { /*var uid uint32 var gid uint32 if uid, b, err = unmarshalUint32Safe(b); err != nil { } else if gid, _, err = unmarshalUint32Safe(b); err != nil { } else { err = f.Chown(int(uid), int(gid)) }*/ log.Println("WARN: chown not supported") } return statusFromError(p.ID, err) } func statusFromError(id uint32, err error) *sshFxpStatusPacket { ret := &sshFxpStatusPacket{ ID: id, StatusError: StatusError{ // sshFXOk = 0 // sshFXEOF = 1 // sshFXNoSuchFile = 2 ENOENT // sshFXPermissionDenied = 3 // sshFXFailure = 4 // sshFXBadMessage = 5 // sshFXNoConnection = 6 // sshFXConnectionLost = 7 // sshFXOPUnsupported = 8 Code: sshFxOk, }, } if err == nil { return ret } debug("statusFromError: error is %T %#v", err, err) ret.StatusError.Code = sshFxFailure ret.StatusError.msg = err.Error() if os.IsNotExist(err) { ret.StatusError.Code = sshFxNoSuchFile return ret } if code, ok := translateSyscallError(err); ok { ret.StatusError.Code = code return ret } switch e := err.(type) { case fxerr: ret.StatusError.Code = uint32(e) default: if e == io.EOF { ret.StatusError.Code = sshFxEOF } } return ret }