package executor import ( "context" "fmt" "io" "os" "os/exec" "sync" "syscall" hclog "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/plugins/drivers" dproto "github.com/hashicorp/nomad/plugins/drivers/proto" ) // execHelper is a convenient wrapper for starting and executing commands, and handling their output type execHelper struct { logger hclog.Logger // newTerminal function creates a tty appropriate for the command // The returned pty end of tty function is to be called after process start. newTerminal func() (pty func() (*os.File, error), tty *os.File, err error) // setTTY is a callback to configure the command with slave end of the tty of the terminal, when tty is enabled setTTY func(tty *os.File) error // setTTY is a callback to configure the command with std{in|out|err}, when tty is disabled setIO func(stdin io.Reader, stdout, stderr io.Writer) error // processStart starts the process, like `exec.Cmd.Start()` processStart func() error // processWait blocks until command terminates and returns its final state processWait func() (*os.ProcessState, error) } func (e *execHelper) run(ctx context.Context, tty bool, stream drivers.ExecTaskStream) error { if tty { return e.runTTY(ctx, stream) } return e.runNoTTY(ctx, stream) } func (e *execHelper) runTTY(ctx context.Context, stream drivers.ExecTaskStream) error { ptyF, tty, err := e.newTerminal() if err != nil { return fmt.Errorf("failed to open a tty: %v", err) } defer tty.Close() if err := e.setTTY(tty); err != nil { return fmt.Errorf("failed to set command tty: %v", err) } if err := e.processStart(); err != nil { return fmt.Errorf("failed to start command: %v", err) } var wg sync.WaitGroup errCh := make(chan error, 3) pty, err := ptyF() if err != nil { return fmt.Errorf("failed to get pty: %v", err) } defer pty.Close() wg.Add(1) go handleStdin(e.logger, pty, stream, errCh) // when tty is on, stdout and stderr point to the same pty so only read once go handleStdout(e.logger, pty, &wg, stream.Send, errCh) ps, err := e.processWait() // force close streams to close out the stream copying goroutines tty.Close() // wait until we get all process output wg.Wait() // wait to flush out output stream.Send(cmdExitResult(ps, err)) select { case cerr := <-errCh: return cerr default: return nil } } func (e *execHelper) runNoTTY(ctx context.Context, stream drivers.ExecTaskStream) error { var sendLock sync.Mutex send := func(v *drivers.ExecTaskStreamingResponseMsg) error { sendLock.Lock() defer sendLock.Unlock() return stream.Send(v) } stdinPr, stdinPw := io.Pipe() stdoutPr, stdoutPw := io.Pipe() stderrPr, stderrPw := io.Pipe() defer stdoutPw.Close() defer stderrPw.Close() if err := e.setIO(stdinPr, stdoutPw, stderrPw); err != nil { return fmt.Errorf("failed to set command io: %v", err) } if err := e.processStart(); err != nil { return fmt.Errorf("failed to start command: %v", err) } var wg sync.WaitGroup errCh := make(chan error, 3) wg.Add(2) go handleStdin(e.logger, stdinPw, stream, errCh) go handleStdout(e.logger, stdoutPr, &wg, send, errCh) go handleStderr(e.logger, stderrPr, &wg, send, errCh) ps, err := e.processWait() // force close streams to close out the stream copying goroutines stdinPr.Close() stdoutPw.Close() stderrPw.Close() // wait until we get all process output wg.Wait() // wait to flush out output stream.Send(cmdExitResult(ps, err)) select { case cerr := <-errCh: return cerr default: return nil } } func cmdExitResult(ps *os.ProcessState, err error) *drivers.ExecTaskStreamingResponseMsg { exitCode := -1 if ps == nil { if ee, ok := err.(*exec.ExitError); ok { ps = ee.ProcessState } } if ps == nil { exitCode = -2 } else if status, ok := ps.Sys().(syscall.WaitStatus); ok { exitCode = status.ExitStatus() if status.Signaled() { const exitSignalBase = 128 signal := int(status.Signal()) exitCode = exitSignalBase + signal } } return &drivers.ExecTaskStreamingResponseMsg{ Exited: true, Result: &dproto.ExitResult{ ExitCode: int32(exitCode), }, } } func handleStdin(logger hclog.Logger, stdin io.WriteCloser, stream drivers.ExecTaskStream, errCh chan<- error) { for { m, err := stream.Recv() if isClosedError(err) { return } else if err != nil { errCh <- err return } if m.Stdin != nil { if len(m.Stdin.Data) != 0 { _, err := stdin.Write(m.Stdin.Data) if err != nil { errCh <- err return } } if m.Stdin.Close { stdin.Close() } } else if m.TtySize != nil { err := setTTYSize(stdin, m.TtySize.Height, m.TtySize.Width) if err != nil { errCh <- fmt.Errorf("failed to resize tty: %v", err) return } } } } func handleStdout(logger hclog.Logger, reader io.Reader, wg *sync.WaitGroup, send func(*drivers.ExecTaskStreamingResponseMsg) error, errCh chan<- error) { defer wg.Done() buf := make([]byte, 4096) for { n, err := reader.Read(buf) // always send output first if we read something if n > 0 { if err := send(&drivers.ExecTaskStreamingResponseMsg{ Stdout: &dproto.ExecTaskStreamingIOOperation{ Data: buf[:n], }, }); err != nil { errCh <- err return } } // then process error if isClosedError(err) { if err := send(&drivers.ExecTaskStreamingResponseMsg{ Stdout: &dproto.ExecTaskStreamingIOOperation{ Close: true, }, }); err != nil { errCh <- err return } return } else if err != nil { errCh <- err return } } } func handleStderr(logger hclog.Logger, reader io.Reader, wg *sync.WaitGroup, send func(*drivers.ExecTaskStreamingResponseMsg) error, errCh chan<- error) { defer wg.Done() buf := make([]byte, 4096) for { n, err := reader.Read(buf) // always send output first if we read something if n > 0 { if err := send(&drivers.ExecTaskStreamingResponseMsg{ Stderr: &dproto.ExecTaskStreamingIOOperation{ Data: buf[:n], }, }); err != nil { errCh <- err return } } // then process error if isClosedError(err) { if err := send(&drivers.ExecTaskStreamingResponseMsg{ Stderr: &dproto.ExecTaskStreamingIOOperation{ Close: true, }, }); err != nil { errCh <- err return } return } else if err != nil { errCh <- err return } } } func isClosedError(err error) bool { if err == nil { return false } return err == io.EOF || err == io.ErrClosedPipe || isUnixEIOErr(err) }