diff --git a/exec2/driver.go b/exec2/driver.go index 312cb76..f035d15 100644 --- a/exec2/driver.go +++ b/exec2/driver.go @@ -1,276 +1,316 @@ -package hello +package exec import ( "context" - "errors" "fmt" "os" - "os/exec" "path/filepath" - "regexp" + "runtime" + "sync" "time" "github.com/hashicorp/consul-template/signals" - "github.com/hashicorp/go-hclog" + hclog "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/client/lib/cgutil" + "github.com/hashicorp/nomad/drivers/shared/capabilities" "github.com/hashicorp/nomad/drivers/shared/eventer" "github.com/hashicorp/nomad/drivers/shared/executor" + "github.com/hashicorp/nomad/drivers/shared/resolvconf" + "github.com/hashicorp/nomad/helper/pluginutils/loader" + "github.com/hashicorp/nomad/helper/pointer" "github.com/hashicorp/nomad/plugins/base" "github.com/hashicorp/nomad/plugins/drivers" + "github.com/hashicorp/nomad/plugins/drivers/utils" "github.com/hashicorp/nomad/plugins/shared/hclspec" - "github.com/hashicorp/nomad/plugins/shared/structs" + pstructs "github.com/hashicorp/nomad/plugins/shared/structs" ) const ( // pluginName is the name of the plugin - // this is used for logging and (along with the version) for uniquely - // identifying plugin binaries fingerprinted by the client - pluginName = "hello-world-example" + pluginName = "exec" - // pluginVersion allows the client to identify and use newer versions of - // an installed plugin - pluginVersion = "v0.1.0" - - // fingerprintPeriod is the interval at which the plugin will send - // fingerprint responses + // fingerprintPeriod is the interval at which the driver will send fingerprint responses fingerprintPeriod = 30 * time.Second - // taskHandleVersion is the version of task handle which this plugin sets - // and understands how to decode - // this is used to allow modification and migration of the task schema - // used by the plugin + // taskHandleVersion is the version of task handle which this driver sets + // and understands how to decode driver state taskHandleVersion = 1 ) var ( - // pluginInfo describes the plugin + // PluginID is the exec plugin metadata registered in the plugin + // catalog. + PluginID = loader.PluginID{ + Name: pluginName, + PluginType: base.PluginTypeDriver, + } + + // PluginConfig is the exec driver factory function registered in the + // plugin catalog. + PluginConfig = &loader.InternalPluginConfig{ + Config: map[string]interface{}{}, + Factory: func(ctx context.Context, l hclog.Logger) interface{} { return NewExecDriver(ctx, l) }, + } + + // pluginInfo is the response returned for the PluginInfo RPC pluginInfo = &base.PluginInfoResponse{ Type: base.PluginTypeDriver, PluginApiVersions: []string{drivers.ApiVersion010}, - PluginVersion: pluginVersion, + PluginVersion: "0.1.0", Name: pluginName, } - // configSpec is the specification of the plugin's configuration - // this is used to validate the configuration specified for the plugin - // on the client. - // this is not global, but can be specified on a per-client basis. + // configSpec is the hcl specification returned by the ConfigSchema RPC configSpec = hclspec.NewObject(map[string]*hclspec.Spec{ - // TODO: define plugin's agent configuration schema. - // - // The schema should be defined using HCL specs and it will be used to - // validate the agent configuration provided by the user in the - // `plugin` stanza (https://www.nomadproject.io/docs/configuration/plugin.html). - // - // For example, for the schema below a valid configuration would be: - // - // plugin "hello-driver-plugin" { - // config { - // shell = "fish" - // } - // } - "shell": hclspec.NewDefault( - hclspec.NewAttr("shell", "string", false), - hclspec.NewLiteral(`"bash"`), + "no_pivot_root": hclspec.NewDefault( + hclspec.NewAttr("no_pivot_root", "bool", false), + hclspec.NewLiteral("false"), + ), + "default_pid_mode": hclspec.NewDefault( + hclspec.NewAttr("default_pid_mode", "string", false), + hclspec.NewLiteral(`"private"`), + ), + "default_ipc_mode": hclspec.NewDefault( + hclspec.NewAttr("default_ipc_mode", "string", false), + hclspec.NewLiteral(`"private"`), + ), + "allow_caps": hclspec.NewDefault( + hclspec.NewAttr("allow_caps", "list(string)", false), + hclspec.NewLiteral(capabilities.HCLSpecLiteral), ), }) - // taskConfigSpec is the specification of the plugin's configuration for - // a task - // this is used to validated the configuration specified for the plugin - // when a job is submitted. + // taskConfigSpec is the hcl specification for the driver config section of + // a task within a job. It is returned in the TaskConfigSchema RPC taskConfigSpec = hclspec.NewObject(map[string]*hclspec.Spec{ - // TODO: define plugin's task configuration schema - // - // The schema should be defined using HCL specs and it will be used to - // validate the task configuration provided by the user when they - // submit a job. - // - // For example, for the schema below a valid task would be: - // job "example" { - // group "example" { - // task "say-hi" { - // driver = "hello-driver-plugin" - // config { - // greeting = "Hi" - // } - // } - // } - // } - "greeting": hclspec.NewDefault( - hclspec.NewAttr("greeting", "string", false), - hclspec.NewLiteral(`"Hello, World!"`), - ), + "command": hclspec.NewAttr("command", "string", true), + "args": hclspec.NewAttr("args", "list(string)", false), + "pid_mode": hclspec.NewAttr("pid_mode", "string", false), + "ipc_mode": hclspec.NewAttr("ipc_mode", "string", false), + "cap_add": hclspec.NewAttr("cap_add", "list(string)", false), + "cap_drop": hclspec.NewAttr("cap_drop", "list(string)", false), }) - // capabilities indicates what optional features this driver supports - // this should be set according to the target run time. - capabilities = &drivers.Capabilities{ - // TODO: set plugin's capabilities - // - // The plugin's capabilities signal Nomad which extra functionalities - // are supported. For a list of available options check the docs page: - // https://godoc.org/github.com/hashicorp/nomad/plugins/drivers#Capabilities + // driverCapabilities represents the RPC response for what features are + // implemented by the exec task driver + driverCapabilities = &drivers.Capabilities{ SendSignals: true, - Exec: false, + Exec: true, + FSIsolation: drivers.FSIsolationChroot, + NetIsolationModes: []drivers.NetIsolationMode{ + drivers.NetIsolationModeHost, + drivers.NetIsolationModeGroup, + }, + MountConfigs: drivers.MountConfigSupportAll, } ) -// Config contains configuration information for the plugin -type Config struct { - // TODO: create decoded plugin configuration struct - // - // This struct is the decoded version of the schema defined in the - // configSpec variable above. It's used to convert the HCL configuration - // passed by the Nomad agent into Go contructs. - Shell string `codec:"shell"` -} - -// TaskConfig contains configuration information for a task that runs with -// this plugin -type TaskConfig struct { - // TODO: create decoded plugin task configuration struct - // - // This struct is the decoded version of the schema defined in the - // taskConfigSpec variable above. It's used to convert the string - // configuration for the task into Go contructs. - Greeting string `codec:"greeting"` -} - -// TaskState is the runtime state which is encoded in the handle returned to -// Nomad client. -// This information is needed to rebuild the task state and handler during -// recovery. -type TaskState struct { - ReattachConfig *structs.ReattachConfig - TaskConfig *drivers.TaskConfig - StartedAt time.Time - - // TODO: add any extra important values that must be persisted in order - // to restore a task. - // - // The plugin keeps track of its running tasks in a in-memory data - // structure. If the plugin crashes, this data will be lost, so Nomad - // will respawn a new instance of the plugin and try to restore its - // in-memory representation of the running tasks using the RecoverTask() - // method below. - Pid int -} - -// HelloDriverPlugin is an example driver plugin. When provisioned in a job, -// the taks will output a greet specified by the user. -type HelloDriverPlugin struct { +// Driver fork/execs tasks using many of the underlying OS's isolation +// features where configured. +type Driver struct { // eventer is used to handle multiplexing of TaskEvents calls such that an // event can be broadcast to all callers eventer *eventer.Eventer - // config is the plugin configuration set by the SetConfig RPC - config *Config + // config is the driver configuration set by the SetConfig RPC + config Config - // nomadConfig is the client config from Nomad + // nomadConfig is the client config from nomad nomadConfig *base.ClientDriverConfig - // tasks is the in memory datastore mapping taskIDs to driver handles + // tasks is the in memory datastore mapping taskIDs to driverHandles tasks *taskStore // ctx is the context for the driver. It is passed to other subsystems to // coordinate shutdown ctx context.Context - // signalShutdown is called when the driver is shutting down and cancels - // the ctx passed to any subsystems - signalShutdown context.CancelFunc - // logger will log to the Nomad agent logger hclog.Logger + + // A tri-state boolean to know if the fingerprinting has happened and + // whether it has been successful + fingerprintSuccess *bool + fingerprintLock sync.Mutex } -// NewPlugin returns a new example driver plugin -func NewPlugin(logger hclog.Logger) drivers.DriverPlugin { - ctx, cancel := context.WithCancel(context.Background()) - logger = logger.Named(pluginName) +// Config is the driver configuration set by the SetConfig RPC call +type Config struct { + // NoPivotRoot disables the use of pivot_root, useful when the root partition + // is on ramdisk + NoPivotRoot bool `codec:"no_pivot_root"` - return &HelloDriverPlugin{ - eventer: eventer.NewEventer(ctx, logger), - config: &Config{}, - tasks: newTaskStore(), - ctx: ctx, - signalShutdown: cancel, - logger: logger, + // DefaultModePID is the default PID isolation set for all tasks using + // exec-based task drivers. + DefaultModePID string `codec:"default_pid_mode"` + + // DefaultModeIPC is the default IPC isolation set for all tasks using + // exec-based task drivers. + DefaultModeIPC string `codec:"default_ipc_mode"` + + // AllowCaps configures which Linux Capabilities are enabled for tasks + // running on this node. + AllowCaps []string `codec:"allow_caps"` +} + +func (c *Config) validate() error { + switch c.DefaultModePID { + case executor.IsolationModePrivate, executor.IsolationModeHost: + default: + return fmt.Errorf("default_pid_mode must be %q or %q, got %q", executor.IsolationModePrivate, executor.IsolationModeHost, c.DefaultModePID) + } + + switch c.DefaultModeIPC { + case executor.IsolationModePrivate, executor.IsolationModeHost: + default: + return fmt.Errorf("default_ipc_mode must be %q or %q, got %q", executor.IsolationModePrivate, executor.IsolationModeHost, c.DefaultModeIPC) + } + + badCaps := capabilities.Supported().Difference(capabilities.New(c.AllowCaps)) + if !badCaps.Empty() { + return fmt.Errorf("allow_caps configured with capabilities not supported by system: %s", badCaps) + } + + return nil +} + +// TaskConfig is the driver configuration of a task within a job +type TaskConfig struct { + // Command is the thing to exec. + Command string `codec:"command"` + + // Args are passed along to Command. + Args []string `codec:"args"` + + // ModePID indicates whether PID namespace isolation is enabled for the task. + // Must be "private" or "host" if set. + ModePID string `codec:"pid_mode"` + + // ModeIPC indicates whether IPC namespace isolation is enabled for the task. + // Must be "private" or "host" if set. + ModeIPC string `codec:"ipc_mode"` + + // CapAdd is a set of linux capabilities to enable. + CapAdd []string `codec:"cap_add"` + + // CapDrop is a set of linux capabilities to disable. + CapDrop []string `codec:"cap_drop"` +} + +func (tc *TaskConfig) validate() error { + switch tc.ModePID { + case "", executor.IsolationModePrivate, executor.IsolationModeHost: + default: + return fmt.Errorf("pid_mode must be %q or %q, got %q", executor.IsolationModePrivate, executor.IsolationModeHost, tc.ModePID) + } + + switch tc.ModeIPC { + case "", executor.IsolationModePrivate, executor.IsolationModeHost: + default: + return fmt.Errorf("ipc_mode must be %q or %q, got %q", executor.IsolationModePrivate, executor.IsolationModeHost, tc.ModeIPC) + } + + supported := capabilities.Supported() + badAdds := supported.Difference(capabilities.New(tc.CapAdd)) + if !badAdds.Empty() { + return fmt.Errorf("cap_add configured with capabilities not supported by system: %s", badAdds) + } + badDrops := supported.Difference(capabilities.New(tc.CapDrop)) + if !badDrops.Empty() { + return fmt.Errorf("cap_drop configured with capabilities not supported by system: %s", badDrops) + } + + return nil +} + +// TaskState is the state which is encoded in the handle returned in +// StartTask. This information is needed to rebuild the task state and handler +// during recovery. +type TaskState struct { + ReattachConfig *pstructs.ReattachConfig + TaskConfig *drivers.TaskConfig + Pid int + StartedAt time.Time +} + +// NewExecDriver returns a new DrivePlugin implementation +func NewExecDriver(ctx context.Context, logger hclog.Logger) drivers.DriverPlugin { + logger = logger.Named(pluginName) + return &Driver{ + eventer: eventer.NewEventer(ctx, logger), + tasks: newTaskStore(), + ctx: ctx, + logger: logger, } } -// PluginInfo returns information describing the plugin. -func (d *HelloDriverPlugin) PluginInfo() (*base.PluginInfoResponse, error) { +// setFingerprintSuccess marks the driver as having fingerprinted successfully +func (d *Driver) setFingerprintSuccess() { + d.fingerprintLock.Lock() + d.fingerprintSuccess = pointer.Of(true) + d.fingerprintLock.Unlock() +} + +// setFingerprintFailure marks the driver as having failed fingerprinting +func (d *Driver) setFingerprintFailure() { + d.fingerprintLock.Lock() + d.fingerprintSuccess = pointer.Of(false) + d.fingerprintLock.Unlock() +} + +// fingerprintSuccessful returns true if the driver has +// never fingerprinted or has successfully fingerprinted +func (d *Driver) fingerprintSuccessful() bool { + d.fingerprintLock.Lock() + defer d.fingerprintLock.Unlock() + return d.fingerprintSuccess == nil || *d.fingerprintSuccess +} + +func (d *Driver) PluginInfo() (*base.PluginInfoResponse, error) { return pluginInfo, nil } -// ConfigSchema returns the plugin configuration schema. -func (d *HelloDriverPlugin) ConfigSchema() (*hclspec.Spec, error) { +func (d *Driver) ConfigSchema() (*hclspec.Spec, error) { return configSpec, nil } -// SetConfig is called by the client to pass the configuration for the plugin. -func (d *HelloDriverPlugin) SetConfig(cfg *base.Config) error { +func (d *Driver) SetConfig(cfg *base.Config) error { + // unpack, validate, and set agent plugin config var config Config if len(cfg.PluginConfig) != 0 { if err := base.MsgPackDecode(cfg.PluginConfig, &config); err != nil { return err } } - - // Save the configuration to the plugin - d.config = &config - - // TODO: parse and validated any configuration value if necessary. - // - // If your driver agent configuration requires any complex validation - // (some dependency between attributes) or special data parsing (the - // string "10s" into a time.Interval) you can do it here and update the - // value in d.config. - // - // In the example below we check if the shell specified by the user is - // supported by the plugin. - shell := d.config.Shell - if shell != "bash" && shell != "fish" { - return fmt.Errorf("invalid shell %s", d.config.Shell) + if err := config.validate(); err != nil { + return err } + d.config = config - // Save the Nomad agent configuration - if cfg.AgentConfig != nil { + if cfg != nil && cfg.AgentConfig != nil { d.nomadConfig = cfg.AgentConfig.Driver } - - // TODO: initialize any extra requirements if necessary. - // - // Here you can use the config values to initialize any resources that are - // shared by all tasks that use this driver, such as a daemon process. - return nil } -// TaskConfigSchema returns the HCL schema for the configuration of a task. -func (d *HelloDriverPlugin) TaskConfigSchema() (*hclspec.Spec, error) { +func (d *Driver) TaskConfigSchema() (*hclspec.Spec, error) { return taskConfigSpec, nil } -// Capabilities returns the features supported by the driver. -func (d *HelloDriverPlugin) Capabilities() (*drivers.Capabilities, error) { - return capabilities, nil +// Capabilities is returned by the Capabilities RPC and indicates what +// optional features this driver supports +func (d *Driver) Capabilities() (*drivers.Capabilities, error) { + return driverCapabilities, nil } -// Fingerprint returns a channel that will be used to send health information -// and other driver specific node attributes. -func (d *HelloDriverPlugin) Fingerprint(ctx context.Context) (<-chan *drivers.Fingerprint, error) { +func (d *Driver) Fingerprint(ctx context.Context) (<-chan *drivers.Fingerprint, error) { ch := make(chan *drivers.Fingerprint) go d.handleFingerprint(ctx, ch) return ch, nil + } - -// handleFingerprint manages the channel and the flow of fingerprint data. -func (d *HelloDriverPlugin) handleFingerprint(ctx context.Context, ch chan<- *drivers.Fingerprint) { +func (d *Driver) handleFingerprint(ctx context.Context, ch chan<- *drivers.Fingerprint) { defer close(ch) - - // Nomad expects the initial fingerprint to be sent immediately ticker := time.NewTimer(0) for { select { @@ -279,65 +319,110 @@ func (d *HelloDriverPlugin) handleFingerprint(ctx context.Context, ch chan<- *dr case <-d.ctx.Done(): return case <-ticker.C: - // after the initial fingerprint we can set the proper fingerprint - // period ticker.Reset(fingerprintPeriod) ch <- d.buildFingerprint() } } } -// buildFingerprint returns the driver's fingerprint data -func (d *HelloDriverPlugin) buildFingerprint() *drivers.Fingerprint { +func (d *Driver) buildFingerprint() *drivers.Fingerprint { + if runtime.GOOS != "linux" { + d.setFingerprintFailure() + return &drivers.Fingerprint{ + Health: drivers.HealthStateUndetected, + HealthDescription: "exec driver unsupported on client OS", + } + } + fp := &drivers.Fingerprint{ - Attributes: map[string]*structs.Attribute{}, + Attributes: map[string]*pstructs.Attribute{}, Health: drivers.HealthStateHealthy, HealthDescription: drivers.DriverHealthy, } - // TODO: implement fingerprinting logic to populate health and driver - // attributes. - // - // Fingerprinting is used by the plugin to relay two important information - // to Nomad: health state and node attributes. - // - // If the plugin reports to be unhealthy, or doesn't send any fingerprint - // data in the expected interval of time, Nomad will restart it. - // - // Node attributes can be used to report any relevant information about - // the node in which the plugin is running (specific library availability, - // installed versions of a software etc.). These attributes can then be - // used by an operator to set job constrains. - // - // In the example below we check if the shell specified by the user exists - // in the node. - shell := d.config.Shell + if !utils.IsUnixRoot() { + fp.Health = drivers.HealthStateUndetected + fp.HealthDescription = drivers.DriverRequiresRootMessage + d.setFingerprintFailure() + return fp + } - cmd := exec.Command("which", shell) - if err := cmd.Run(); err != nil { - return &drivers.Fingerprint{ - Health: drivers.HealthStateUndetected, - HealthDescription: fmt.Sprintf("shell %s not found", shell), + mount, err := cgutil.FindCgroupMountpointDir() + if err != nil { + fp.Health = drivers.HealthStateUnhealthy + fp.HealthDescription = drivers.NoCgroupMountMessage + if d.fingerprintSuccessful() { + d.logger.Warn(fp.HealthDescription, "error", err) } + d.setFingerprintFailure() + return fp } - // We also set the shell and its version as attributes - cmd = exec.Command(shell, "--version") - if out, err := cmd.Output(); err != nil { - d.logger.Warn("failed to find shell version: %v", err) - } else { - re := regexp.MustCompile("[0-9]\\.[0-9]\\.[0-9]") - version := re.FindString(string(out)) - - fp.Attributes["driver.hello.shell_version"] = structs.NewStringAttribute(version) - fp.Attributes["driver.hello.shell"] = structs.NewStringAttribute(shell) + if mount == "" { + fp.Health = drivers.HealthStateUnhealthy + fp.HealthDescription = drivers.CgroupMountEmpty + d.setFingerprintFailure() + return fp } + fp.Attributes["driver.exec"] = pstructs.NewBoolAttribute(true) + d.setFingerprintSuccess() return fp } -// StartTask returns a task handle and a driver network if necessary. -func (d *HelloDriverPlugin) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drivers.DriverNetwork, error) { +func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error { + if handle == nil { + return fmt.Errorf("handle cannot be nil") + } + + // If already attached to handle there's nothing to recover. + if _, ok := d.tasks.Get(handle.Config.ID); ok { + d.logger.Trace("nothing to recover; task already exists", + "task_id", handle.Config.ID, + "task_name", handle.Config.Name, + ) + return nil + } + + // Handle doesn't already exist, try to reattach + var taskState TaskState + if err := handle.GetDriverState(&taskState); err != nil { + d.logger.Error("failed to decode task state from handle", "error", err, "task_id", handle.Config.ID) + return fmt.Errorf("failed to decode task state from handle: %v", err) + } + + // Create client for reattached executor + plugRC, err := pstructs.ReattachConfigToGoPlugin(taskState.ReattachConfig) + if err != nil { + d.logger.Error("failed to build ReattachConfig from task state", "error", err, "task_id", handle.Config.ID) + return fmt.Errorf("failed to build ReattachConfig from task state: %v", err) + } + + exec, pluginClient, err := executor.ReattachToExecutor(plugRC, + d.logger.With("task_name", handle.Config.Name, "alloc_id", handle.Config.AllocID)) + if err != nil { + d.logger.Error("failed to reattach to executor", "error", err, "task_id", handle.Config.ID) + return fmt.Errorf("failed to reattach to executor: %v", err) + } + + h := &taskHandle{ + exec: exec, + pid: taskState.Pid, + pluginClient: pluginClient, + taskConfig: taskState.TaskConfig, + procState: drivers.TaskStateRunning, + startedAt: taskState.StartedAt, + exitResult: &drivers.ExitResult{}, + logger: d.logger, + } + + d.tasks.Set(taskState.TaskConfig.ID, h) + + go h.run() + return nil +} + +func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drivers.DriverNetwork, error) { if _, ok := d.tasks.Get(cfg.ID); ok { return nil, nil, fmt.Errorf("task with ID %q already started", cfg.ID) } @@ -347,42 +432,66 @@ func (d *HelloDriverPlugin) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHan return nil, nil, fmt.Errorf("failed to decode driver config: %v", err) } + if err := driverConfig.validate(); err != nil { + return nil, nil, fmt.Errorf("failed driver config validation: %v", err) + } + d.logger.Info("starting task", "driver_cfg", hclog.Fmt("%+v", driverConfig)) handle := drivers.NewTaskHandle(taskHandleVersion) handle.Config = cfg - // TODO: implement driver specific mechanism to start the task. - // - // Once the task is started you will need to store any relevant runtime - // information in a taskHandle and TaskState. The taskHandle will be - // stored in-memory in the plugin and will be used to interact with the - // task. - // - // The TaskState will be returned to the Nomad client inside a - // drivers.TaskHandle instance. This TaskHandle will be sent back to plugin - // if the task ever needs to be recovered, so the TaskState should contain - // enough information to handle that. - // - // In the example below we use an executor to fork a process to run our - // greeter. The executor is then stored in the handle so we can access it - // later and the the plugin.Client is used to generate a reattach - // configuration that can be used to recover communication with the task. + pluginLogFile := filepath.Join(cfg.TaskDir().Dir, "executor.out") executorConfig := &executor.ExecutorConfig{ - LogFile: filepath.Join(cfg.TaskDir().Dir, "executor.out"), - LogLevel: "debug", + LogFile: pluginLogFile, + LogLevel: "debug", + FSIsolation: true, } - exec, pluginClient, err := executor.CreateExecutor(d.logger, d.nomadConfig, executorConfig) + exec, pluginClient, err := executor.CreateExecutor( + d.logger.With("task_name", handle.Config.Name, "alloc_id", handle.Config.AllocID), + d.nomadConfig, executorConfig) if err != nil { return nil, nil, fmt.Errorf("failed to create executor: %v", err) } - echoCmd := fmt.Sprintf(`echo "%s"`, driverConfig.Greeting) + user := cfg.User + if user == "" { + user = "nobody" + } + + if cfg.DNS != nil { + dnsMount, err := resolvconf.GenerateDNSMount(cfg.TaskDir().Dir, cfg.DNS) + if err != nil { + return nil, nil, fmt.Errorf("failed to build mount for resolv.conf: %v", err) + } + cfg.Mounts = append(cfg.Mounts, dnsMount) + } + + caps, err := capabilities.Calculate( + capabilities.NomadDefaults(), d.config.AllowCaps, driverConfig.CapAdd, driverConfig.CapDrop, + ) + if err != nil { + return nil, nil, err + } + d.logger.Debug("task capabilities", "capabilities", caps) + execCmd := &executor.ExecCommand{ - Cmd: d.config.Shell, - Args: []string{"-c", echoCmd}, - StdoutPath: cfg.StdoutPath, - StderrPath: cfg.StderrPath, + Cmd: driverConfig.Command, + Args: driverConfig.Args, + Env: cfg.EnvList(), + User: user, + ResourceLimits: true, + NoPivotRoot: d.config.NoPivotRoot, + Resources: cfg.Resources, + TaskDir: cfg.TaskDir().Dir, + StdoutPath: cfg.StdoutPath, + StderrPath: cfg.StderrPath, + Mounts: cfg.Mounts, + Devices: cfg.Devices, + NetworkIsolation: cfg.NetworkIsolation, + ModePID: executor.IsolationMode(d.config.DefaultModePID, driverConfig.ModePID), + ModeIPC: executor.IsolationMode(d.config.DefaultModeIPC, driverConfig.ModeIPC), + Capabilities: caps, } ps, err := exec.Launch(execCmd) @@ -402,13 +511,16 @@ func (d *HelloDriverPlugin) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHan } driverState := TaskState{ - ReattachConfig: structs.ReattachConfigFromGoPlugin(pluginClient.ReattachConfig()), + ReattachConfig: pstructs.ReattachConfigFromGoPlugin(pluginClient.ReattachConfig()), Pid: ps.Pid, TaskConfig: cfg, StartedAt: h.startedAt, } if err := handle.SetDriverState(&driverState); err != nil { + d.logger.Error("failed to start task, error setting driver state", "error", err) + _ = exec.Shutdown("", 0) + pluginClient.Kill() return nil, nil, fmt.Errorf("failed to set driver state: %v", err) } @@ -417,61 +529,7 @@ func (d *HelloDriverPlugin) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHan return handle, nil, nil } -// RecoverTask recreates the in-memory state of a task from a TaskHandle. -func (d *HelloDriverPlugin) RecoverTask(handle *drivers.TaskHandle) error { - if handle == nil { - return errors.New("error: handle cannot be nil") - } - - if _, ok := d.tasks.Get(handle.Config.ID); ok { - return nil - } - - var taskState TaskState - if err := handle.GetDriverState(&taskState); err != nil { - return fmt.Errorf("failed to decode task state from handle: %v", err) - } - - var driverConfig TaskConfig - if err := taskState.TaskConfig.DecodeDriverConfig(&driverConfig); err != nil { - return fmt.Errorf("failed to decode driver config: %v", err) - } - - // TODO: implement driver specific logic to recover a task. - // - // Recovering a task involves recreating and storing a taskHandle as if the - // task was just started. - // - // In the example below we use the executor to re-attach to the process - // that was created when the task first started. - plugRC, err := structs.ReattachConfigToGoPlugin(taskState.ReattachConfig) - if err != nil { - return fmt.Errorf("failed to build ReattachConfig from taskConfig state: %v", err) - } - - execImpl, pluginClient, err := executor.ReattachToExecutor(plugRC, d.logger) - if err != nil { - return fmt.Errorf("failed to reattach to executor: %v", err) - } - - h := &taskHandle{ - exec: execImpl, - pid: taskState.Pid, - pluginClient: pluginClient, - taskConfig: taskState.TaskConfig, - procState: drivers.TaskStateRunning, - startedAt: taskState.StartedAt, - exitResult: &drivers.ExitResult{}, - } - - d.tasks.Set(taskState.TaskConfig.ID, h) - - go h.run() - return nil -} - -// WaitTask returns a channel used to notify Nomad when a task exits. -func (d *HelloDriverPlugin) WaitTask(ctx context.Context, taskID string) (<-chan *drivers.ExitResult, error) { +func (d *Driver) WaitTask(ctx context.Context, taskID string) (<-chan *drivers.ExitResult, error) { handle, ok := d.tasks.Get(taskID) if !ok { return nil, drivers.ErrTaskNotFound @@ -479,23 +537,13 @@ func (d *HelloDriverPlugin) WaitTask(ctx context.Context, taskID string) (<-chan ch := make(chan *drivers.ExitResult) go d.handleWait(ctx, handle, ch) + return ch, nil } -func (d *HelloDriverPlugin) handleWait(ctx context.Context, handle *taskHandle, ch chan *drivers.ExitResult) { +func (d *Driver) handleWait(ctx context.Context, handle *taskHandle, ch chan *drivers.ExitResult) { defer close(ch) var result *drivers.ExitResult - - // TODO: implement driver specific logic to notify Nomad the task has been - // completed and what was the exit result. - // - // When a result is sent in the result channel Nomad will stop the task and - // emit an event that an operator can use to get an insight on why the task - // stopped. - // - // In the example below we block and wait until the executor finishes - // running, at which point we send the exit code and signal in the result - // channel. ps, err := handle.exec.Wait(ctx) if err != nil { result = &drivers.ExitResult{ @@ -508,33 +556,21 @@ func (d *HelloDriverPlugin) handleWait(ctx context.Context, handle *taskHandle, } } - for { - select { - case <-ctx.Done(): - return - case <-d.ctx.Done(): - return - case ch <- result: - } + select { + case <-ctx.Done(): + return + case <-d.ctx.Done(): + return + case ch <- result: } } -// StopTask stops a running task with the given signal and within the timeout window. -func (d *HelloDriverPlugin) StopTask(taskID string, timeout time.Duration, signal string) error { +func (d *Driver) StopTask(taskID string, timeout time.Duration, signal string) error { handle, ok := d.tasks.Get(taskID) if !ok { return drivers.ErrTaskNotFound } - // TODO: implement driver specific logic to stop a task. - // - // The StopTask function is expected to stop a running task by sending the - // given signal to it. If the task does not stop during the given timeout, - // the driver must forcefully kill the task. - // - // In the example below we let the executor handle the task shutdown - // process for us, but you might need to customize this for your own - // implementation. if err := handle.exec.Shutdown(signal, timeout); err != nil { if handle.pluginClient.Exited() { return nil @@ -545,39 +581,51 @@ func (d *HelloDriverPlugin) StopTask(taskID string, timeout time.Duration, signa return nil } -// DestroyTask cleans up and removes a task that has terminated. -func (d *HelloDriverPlugin) DestroyTask(taskID string, force bool) error { +// resetCgroup will re-create the v2 cgroup for the task after the task has been +// destroyed by libcontainer. In the case of a task restart we call DestroyTask +// which removes the cgroup - but we still need it! +// +// Ideally the cgroup management would be more unified - and we could do the creation +// on a task runner pre-start hook, eliminating the need for this hack. +func (d *Driver) resetCgroup(handle *taskHandle) { + if cgutil.UseV2 { + if handle.taskConfig.Resources != nil && + handle.taskConfig.Resources.LinuxResources != nil && + handle.taskConfig.Resources.LinuxResources.CpusetCgroupPath != "" { + err := os.Mkdir(handle.taskConfig.Resources.LinuxResources.CpusetCgroupPath, 0755) + if err != nil { + d.logger.Trace("failed to reset cgroup", "path", handle.taskConfig.Resources.LinuxResources.CpusetCgroupPath) + } + } + } +} + +func (d *Driver) DestroyTask(taskID string, force bool) error { handle, ok := d.tasks.Get(taskID) if !ok { return drivers.ErrTaskNotFound } if handle.IsRunning() && !force { - return errors.New("cannot destroy running task") + return fmt.Errorf("cannot destroy running task") } - // TODO: implement driver specific logic to destroy a complete task. - // - // Destroying a task includes removing any resources used by task and any - // local references in the plugin. If force is set to true the task should - // be destroyed even if it's currently running. - // - // In the example below we use the executor to force shutdown the task - // (timeout equals 0). if !handle.pluginClient.Exited() { if err := handle.exec.Shutdown("", 0); err != nil { - handle.logger.Error("destroying executor failed", "err", err) + handle.logger.Error("destroying executor failed", "error", err) } handle.pluginClient.Kill() } + // workaround for the case where DestroyTask was issued on task restart + d.resetCgroup(handle) + d.tasks.Delete(taskID) return nil } -// InspectTask returns detailed status information for the referenced taskID. -func (d *HelloDriverPlugin) InspectTask(taskID string) (*drivers.TaskStatus, error) { +func (d *Driver) InspectTask(taskID string) (*drivers.TaskStatus, error) { handle, ok := d.tasks.Get(taskID) if !ok { return nil, drivers.ErrTaskNotFound @@ -586,42 +634,25 @@ func (d *HelloDriverPlugin) InspectTask(taskID string) (*drivers.TaskStatus, err return handle.TaskStatus(), nil } -// TaskStats returns a channel which the driver should send stats to at the given interval. -func (d *HelloDriverPlugin) TaskStats(ctx context.Context, taskID string, interval time.Duration) (<-chan *drivers.TaskResourceUsage, error) { +func (d *Driver) TaskStats(ctx context.Context, taskID string, interval time.Duration) (<-chan *drivers.TaskResourceUsage, error) { handle, ok := d.tasks.Get(taskID) if !ok { return nil, drivers.ErrTaskNotFound } - // TODO: implement driver specific logic to send task stats. - // - // This function returns a channel that Nomad will use to listen for task - // stats (e.g., CPU and memory usage) in a given interval. It should send - // stats until the context is canceled or the task stops running. - // - // In the example below we use the Stats function provided by the executor, - // but you can build a set of functions similar to the fingerprint process. return handle.exec.Stats(ctx, interval) } -// TaskEvents returns a channel that the plugin can use to emit task related events. -func (d *HelloDriverPlugin) TaskEvents(ctx context.Context) (<-chan *drivers.TaskEvent, error) { +func (d *Driver) TaskEvents(ctx context.Context) (<-chan *drivers.TaskEvent, error) { return d.eventer.TaskEvents(ctx) } -// SignalTask forwards a signal to a task. -// This is an optional capability. -func (d *HelloDriverPlugin) SignalTask(taskID string, signal string) error { +func (d *Driver) SignalTask(taskID string, signal string) error { handle, ok := d.tasks.Get(taskID) if !ok { return drivers.ErrTaskNotFound } - // TODO: implement driver specific signal handling logic. - // - // The given signal must be forwarded to the target taskID. If this plugin - // doesn't support receiving signals (capability SendSignals is set to - // false) you can just return nil. sig := os.Interrupt if s, ok := signals.SignalLookup[signal]; ok { sig = s @@ -632,9 +663,48 @@ func (d *HelloDriverPlugin) SignalTask(taskID string, signal string) error { return handle.exec.Signal(sig) } -// ExecTask returns the result of executing the given command inside a task. -// This is an optional capability. -func (d *HelloDriverPlugin) ExecTask(taskID string, cmd []string, timeout time.Duration) (*drivers.ExecTaskResult, error) { - // TODO: implement driver specific logic to execute commands in a task. - return nil, errors.New("This driver does not support exec") +func (d *Driver) ExecTask(taskID string, cmd []string, timeout time.Duration) (*drivers.ExecTaskResult, error) { + if len(cmd) == 0 { + return nil, fmt.Errorf("error cmd must have at least one value") + } + handle, ok := d.tasks.Get(taskID) + if !ok { + return nil, drivers.ErrTaskNotFound + } + + args := []string{} + if len(cmd) > 1 { + args = cmd[1:] + } + + out, exitCode, err := handle.exec.Exec(time.Now().Add(timeout), cmd[0], args) + if err != nil { + return nil, err + } + + return &drivers.ExecTaskResult{ + Stdout: out, + ExitResult: &drivers.ExitResult{ + ExitCode: exitCode, + }, + }, nil +} + +var _ drivers.ExecTaskStreamingRawDriver = (*Driver)(nil) + +func (d *Driver) ExecTaskStreamingRaw(ctx context.Context, + taskID string, + command []string, + tty bool, + stream drivers.ExecTaskStream) error { + + if len(command) == 0 { + return fmt.Errorf("error cmd must have at least one value") + } + handle, ok := d.tasks.Get(taskID) + if !ok { + return drivers.ErrTaskNotFound + } + + return handle.exec.ExecStreaming(ctx, command, tty, stream) } diff --git a/exec2/handle.go b/exec2/handle.go index 6dfb976..606406b 100644 --- a/exec2/handle.go +++ b/exec2/handle.go @@ -1,4 +1,4 @@ -package hello +package exec import ( "context" @@ -6,30 +6,26 @@ import ( "sync" "time" - "github.com/hashicorp/go-hclog" - "github.com/hashicorp/go-plugin" + hclog "github.com/hashicorp/go-hclog" + plugin "github.com/hashicorp/go-plugin" "github.com/hashicorp/nomad/drivers/shared/executor" "github.com/hashicorp/nomad/plugins/drivers" ) -// taskHandle should store all relevant runtime information -// such as process ID if this is a local task or other meta -// data if this driver deals with external APIs type taskHandle struct { + exec executor.Executor + pid int + pluginClient *plugin.Client + logger hclog.Logger + // stateLock syncs access to all fields below stateLock sync.RWMutex - logger hclog.Logger - exec executor.Executor - pluginClient *plugin.Client - taskConfig *drivers.TaskConfig - procState drivers.TaskState - startedAt time.Time - completedAt time.Time - exitResult *drivers.ExitResult - - // TODO: add any extra relevant information about the task. - pid int + taskConfig *drivers.TaskConfig + procState drivers.TaskState + startedAt time.Time + completedAt time.Time + exitResult *drivers.ExitResult } func (h *taskHandle) TaskStatus() *drivers.TaskStatus { @@ -62,8 +58,9 @@ func (h *taskHandle) run() { } h.stateLock.Unlock() - // TODO: wait for your task to complete and upate its state. + // Block until process exits ps, err := h.exec.Wait(context.Background()) + h.stateLock.Lock() defer h.stateLock.Unlock() @@ -77,4 +74,6 @@ func (h *taskHandle) run() { h.exitResult.ExitCode = ps.ExitCode h.exitResult.Signal = ps.Signal h.completedAt = ps.Time + + // TODO: detect if the task OOMed } diff --git a/exec2/pull-upstream.sh b/exec2/pull-upstream.sh new file mode 100755 index 0000000..a797951 --- /dev/null +++ b/exec2/pull-upstream.sh @@ -0,0 +1,7 @@ +#!/usr/bin/env bash + +REF=v1.4.3 + +wget https://github.com/hashicorp/nomad/raw/${REF}/drivers/exec/driver.go -O driver.go +wget https://github.com/hashicorp/nomad/raw/${REF}/drivers/exec/handle.go -O handle.go +wget https://github.com/hashicorp/nomad/raw/${REF}/drivers/exec/state.go -O state.go diff --git a/exec2/state.go b/exec2/state.go index 30f10d7..08cdee8 100644 --- a/exec2/state.go +++ b/exec2/state.go @@ -1,12 +1,9 @@ -package hello +package exec import ( "sync" ) -// taskStore provides a mechanism to store and retrieve -// task handles given a string identifier. The ID should -// be unique per task type taskStore struct { store map[string]*taskHandle lock sync.RWMutex