Merge branch 'driver-exec2'

This commit is contained in:
Alex 2022-11-29 10:10:22 +01:00
commit 50412d4cf0
Signed by: lx
GPG Key ID: 0E496D15096376BE
2 changed files with 71 additions and 28 deletions

View File

@ -4,6 +4,7 @@ import (
@ -114,6 +115,9 @@ type Driver struct {
// config is the driver configuration set by the SetConfig RPC
config Config
// nomadConfig is the client config from nomad
nomadConfig *base.ClientDriverConfig
// tasks is the in memory datastore mapping taskIDs to driverHandles
tasks *taskStore
@ -236,9 +240,10 @@ func (tc *TaskConfig) validate() error {
// StartTask. This information is needed to rebuild the task state and handler
// during recovery.
type TaskState struct {
TaskConfig *drivers.TaskConfig
Pid int
StartedAt time.Time
ReattachConfig *pstructs.ReattachConfig
TaskConfig *drivers.TaskConfig
Pid int
StartedAt time.Time
// NewPlugin returns a new DrivePlugin implementation
@ -298,6 +303,9 @@ func (d *Driver) SetConfig(cfg *base.Config) error {
d.logger.Info("Got config", "driver_config", hclog.Fmt("%+v", config))
d.config = config
if cfg != nil && cfg.AgentConfig != nil {
d.nomadConfig = cfg.AgentConfig.Driver
return nil
@ -399,18 +407,29 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
return fmt.Errorf("failed to decode task state from handle: %v", err)
// Create new executor
exec := executor.NewExecutorWithIsolation(
// Create client for reattached executor
plugRC, err := pstructs.ReattachConfigToGoPlugin(taskState.ReattachConfig)
if err != nil {
d.logger.Error("failed to build ReattachConfig from task state", "error", err, "task_id", handle.Config.ID)
return fmt.Errorf("failed to build ReattachConfig from task state: %v", err)
exec, pluginClient, err := executor.ReattachToExecutor(plugRC,
d.logger.With("task_name", handle.Config.Name, "alloc_id", handle.Config.AllocID))
if err != nil {
d.logger.Error("failed to reattach to executor", "error", err, "task_id", handle.Config.ID)
return fmt.Errorf("failed to reattach to executor: %v", err)
h := &taskHandle{
exec: exec,
pid: taskState.Pid,
taskConfig: taskState.TaskConfig,
procState: drivers.TaskStateRunning,
startedAt: taskState.StartedAt,
exitResult: &drivers.ExitResult{},
logger: d.logger,
exec: exec,
pid: taskState.Pid,
pluginClient: pluginClient,
taskConfig: taskState.TaskConfig,
procState: drivers.TaskStateRunning,
startedAt: taskState.StartedAt,
exitResult: &drivers.ExitResult{},
logger: d.logger,
d.tasks.Set(taskState.TaskConfig.ID, h)
@ -437,8 +456,19 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive
handle := drivers.NewTaskHandle(taskHandleVersion)
handle.Config = cfg
exec := executor.NewExecutorWithIsolation(
d.logger.With("task_name", handle.Config.Name, "alloc_id", handle.Config.AllocID))
pluginLogFile := filepath.Join(cfg.TaskDir().Dir, "executor.out")
executorConfig := &executor.ExecutorConfig{
LogFile: pluginLogFile,
LogLevel: "debug",
FSIsolation: true,
exec, pluginClient, err := executor.CreateExecutor(
d.logger.With("task_name", handle.Config.Name, "alloc_id", handle.Config.AllocID),
d.nomadConfig, executorConfig)
if err != nil {
return nil, nil, fmt.Errorf("failed to create executor: %v", err)
user := cfg.User
if user == "" {
@ -518,27 +548,31 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive
ps, err := exec.Launch(execCmd)
if err != nil {
return nil, nil, fmt.Errorf("failed to launch command with executor: %v", err)
h := &taskHandle{
exec: exec,
pid: ps.Pid,
taskConfig: cfg,
procState: drivers.TaskStateRunning,
startedAt: time.Now().Round(time.Millisecond),
logger: d.logger,
exec: exec,
pid: ps.Pid,
pluginClient: pluginClient,
taskConfig: cfg,
procState: drivers.TaskStateRunning,
startedAt: time.Now().Round(time.Millisecond),
logger: d.logger,
driverState := TaskState{
Pid: ps.Pid,
TaskConfig: cfg,
StartedAt: h.startedAt,
ReattachConfig: pstructs.ReattachConfigFromGoPlugin(pluginClient.ReattachConfig()),
Pid: ps.Pid,
TaskConfig: cfg,
StartedAt: h.startedAt,
if err := handle.SetDriverState(&driverState); err != nil {
d.logger.Error("failed to start task, error setting driver state", "error", err)
_ = exec.Shutdown("", 0)
return nil, nil, fmt.Errorf("failed to set driver state: %v", err)
@ -590,6 +624,9 @@ func (d *Driver) StopTask(taskID string, timeout time.Duration, signal string) e
if err := handle.exec.Shutdown(signal, timeout); err != nil {
if handle.pluginClient.Exited() {
return nil
return fmt.Errorf("executor Shutdown failed: %v", err)
@ -625,8 +662,12 @@ func (d *Driver) DestroyTask(taskID string, force bool) error {
return fmt.Errorf("cannot destroy running task")
if err := handle.exec.Shutdown("", 0); err != nil {
handle.logger.Error("destroying executor failed", "error", err)
if !handle.pluginClient.Exited() {
if err := handle.exec.Shutdown("", 0); err != nil {
handle.logger.Error("destroying executor failed", "error", err)
// workaround for the case where DestroyTask was issued on task restart

View File

@ -8,13 +8,15 @@ import (
hclog ""
plugin ""
type taskHandle struct {
exec executor.Executor
pid int
logger hclog.Logger
exec executor.Executor
pid int
pluginClient *plugin.Client
logger hclog.Logger
// stateLock syncs access to all fields below
stateLock sync.RWMutex