nomad-mirage-plugin/hello/driver.go
2023-02-27 16:20:30 +00:00

643 lines
21 KiB
Go

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package hello
import (
"context"
"errors"
"fmt"
"os"
"os/exec"
"path/filepath"
"regexp"
"time"
"github.com/hashicorp/consul-template/signals"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/drivers/shared/eventer"
"github.com/hashicorp/nomad/drivers/shared/executor"
"github.com/hashicorp/nomad/plugins/base"
"github.com/hashicorp/nomad/plugins/drivers"
"github.com/hashicorp/nomad/plugins/shared/hclspec"
"github.com/hashicorp/nomad/plugins/shared/structs"
)
const (
// pluginName is the name of the plugin
// this is used for logging and (along with the version) for uniquely
// identifying plugin binaries fingerprinted by the client
pluginName = "hello-world-example"
// pluginVersion allows the client to identify and use newer versions of
// an installed plugin
pluginVersion = "v0.1.0"
// fingerprintPeriod is the interval at which the plugin will send
// fingerprint responses
fingerprintPeriod = 30 * time.Second
// taskHandleVersion is the version of task handle which this plugin sets
// and understands how to decode
// this is used to allow modification and migration of the task schema
// used by the plugin
taskHandleVersion = 1
)
var (
// pluginInfo describes the plugin
pluginInfo = &base.PluginInfoResponse{
Type: base.PluginTypeDriver,
PluginApiVersions: []string{drivers.ApiVersion010},
PluginVersion: pluginVersion,
Name: pluginName,
}
// configSpec is the specification of the plugin's configuration
// this is used to validate the configuration specified for the plugin
// on the client.
// this is not global, but can be specified on a per-client basis.
configSpec = hclspec.NewObject(map[string]*hclspec.Spec{
// TODO: define plugin's agent configuration schema.
//
// The schema should be defined using HCL specs and it will be used to
// validate the agent configuration provided by the user in the
// `plugin` stanza (https://www.nomadproject.io/docs/configuration/plugin.html).
//
// For example, for the schema below a valid configuration would be:
//
// plugin "hello-driver-plugin" {
// config {
// shell = "fish"
// }
// }
"shell": hclspec.NewDefault(
hclspec.NewAttr("shell", "string", false),
hclspec.NewLiteral(`"bash"`),
),
})
// taskConfigSpec is the specification of the plugin's configuration for
// a task
// this is used to validated the configuration specified for the plugin
// when a job is submitted.
taskConfigSpec = hclspec.NewObject(map[string]*hclspec.Spec{
// TODO: define plugin's task configuration schema
//
// The schema should be defined using HCL specs and it will be used to
// validate the task configuration provided by the user when they
// submit a job.
//
// For example, for the schema below a valid task would be:
// job "example" {
// group "example" {
// task "say-hi" {
// driver = "hello-driver-plugin"
// config {
// greeting = "Hi"
// }
// }
// }
// }
"greeting": hclspec.NewDefault(
hclspec.NewAttr("greeting", "string", false),
hclspec.NewLiteral(`"Hello, World!"`),
),
})
// capabilities indicates what optional features this driver supports
// this should be set according to the target run time.
capabilities = &drivers.Capabilities{
// TODO: set plugin's capabilities
//
// The plugin's capabilities signal Nomad which extra functionalities
// are supported. For a list of available options check the docs page:
// https://godoc.org/github.com/hashicorp/nomad/plugins/drivers#Capabilities
SendSignals: true,
Exec: false,
}
)
// Config contains configuration information for the plugin
type Config struct {
// TODO: create decoded plugin configuration struct
//
// This struct is the decoded version of the schema defined in the
// configSpec variable above. It's used to convert the HCL configuration
// passed by the Nomad agent into Go contructs.
Shell string `codec:"shell"`
}
// TaskConfig contains configuration information for a task that runs with
// this plugin
type TaskConfig struct {
// TODO: create decoded plugin task configuration struct
//
// This struct is the decoded version of the schema defined in the
// taskConfigSpec variable above. It's used to convert the string
// configuration for the task into Go contructs.
Greeting string `codec:"greeting"`
}
// TaskState is the runtime state which is encoded in the handle returned to
// Nomad client.
// This information is needed to rebuild the task state and handler during
// recovery.
type TaskState struct {
ReattachConfig *structs.ReattachConfig
TaskConfig *drivers.TaskConfig
StartedAt time.Time
// TODO: add any extra important values that must be persisted in order
// to restore a task.
//
// The plugin keeps track of its running tasks in a in-memory data
// structure. If the plugin crashes, this data will be lost, so Nomad
// will respawn a new instance of the plugin and try to restore its
// in-memory representation of the running tasks using the RecoverTask()
// method below.
Pid int
}
// HelloDriverPlugin is an example driver plugin. When provisioned in a job,
// the taks will output a greet specified by the user.
type HelloDriverPlugin struct {
// eventer is used to handle multiplexing of TaskEvents calls such that an
// event can be broadcast to all callers
eventer *eventer.Eventer
// config is the plugin configuration set by the SetConfig RPC
config *Config
// nomadConfig is the client config from Nomad
nomadConfig *base.ClientDriverConfig
// tasks is the in memory datastore mapping taskIDs to driver handles
tasks *taskStore
// ctx is the context for the driver. It is passed to other subsystems to
// coordinate shutdown
ctx context.Context
// signalShutdown is called when the driver is shutting down and cancels
// the ctx passed to any subsystems
signalShutdown context.CancelFunc
// logger will log to the Nomad agent
logger hclog.Logger
}
// NewPlugin returns a new example driver plugin
func NewPlugin(logger hclog.Logger) drivers.DriverPlugin {
ctx, cancel := context.WithCancel(context.Background())
logger = logger.Named(pluginName)
return &HelloDriverPlugin{
eventer: eventer.NewEventer(ctx, logger),
config: &Config{},
tasks: newTaskStore(),
ctx: ctx,
signalShutdown: cancel,
logger: logger,
}
}
// PluginInfo returns information describing the plugin.
func (d *HelloDriverPlugin) PluginInfo() (*base.PluginInfoResponse, error) {
return pluginInfo, nil
}
// ConfigSchema returns the plugin configuration schema.
func (d *HelloDriverPlugin) ConfigSchema() (*hclspec.Spec, error) {
return configSpec, nil
}
// SetConfig is called by the client to pass the configuration for the plugin.
func (d *HelloDriverPlugin) SetConfig(cfg *base.Config) error {
var config Config
if len(cfg.PluginConfig) != 0 {
if err := base.MsgPackDecode(cfg.PluginConfig, &config); err != nil {
return err
}
}
// Save the configuration to the plugin
d.config = &config
// TODO: parse and validated any configuration value if necessary.
//
// If your driver agent configuration requires any complex validation
// (some dependency between attributes) or special data parsing (the
// string "10s" into a time.Interval) you can do it here and update the
// value in d.config.
//
// In the example below we check if the shell specified by the user is
// supported by the plugin.
shell := d.config.Shell
if shell != "bash" && shell != "fish" {
return fmt.Errorf("invalid shell %s", d.config.Shell)
}
// Save the Nomad agent configuration
if cfg.AgentConfig != nil {
d.nomadConfig = cfg.AgentConfig.Driver
}
// TODO: initialize any extra requirements if necessary.
//
// Here you can use the config values to initialize any resources that are
// shared by all tasks that use this driver, such as a daemon process.
return nil
}
// TaskConfigSchema returns the HCL schema for the configuration of a task.
func (d *HelloDriverPlugin) TaskConfigSchema() (*hclspec.Spec, error) {
return taskConfigSpec, nil
}
// Capabilities returns the features supported by the driver.
func (d *HelloDriverPlugin) Capabilities() (*drivers.Capabilities, error) {
return capabilities, nil
}
// Fingerprint returns a channel that will be used to send health information
// and other driver specific node attributes.
func (d *HelloDriverPlugin) Fingerprint(ctx context.Context) (<-chan *drivers.Fingerprint, error) {
ch := make(chan *drivers.Fingerprint)
go d.handleFingerprint(ctx, ch)
return ch, nil
}
// handleFingerprint manages the channel and the flow of fingerprint data.
func (d *HelloDriverPlugin) handleFingerprint(ctx context.Context, ch chan<- *drivers.Fingerprint) {
defer close(ch)
// Nomad expects the initial fingerprint to be sent immediately
ticker := time.NewTimer(0)
for {
select {
case <-ctx.Done():
return
case <-d.ctx.Done():
return
case <-ticker.C:
// after the initial fingerprint we can set the proper fingerprint
// period
ticker.Reset(fingerprintPeriod)
ch <- d.buildFingerprint()
}
}
}
// buildFingerprint returns the driver's fingerprint data
func (d *HelloDriverPlugin) buildFingerprint() *drivers.Fingerprint {
fp := &drivers.Fingerprint{
Attributes: map[string]*structs.Attribute{},
Health: drivers.HealthStateHealthy,
HealthDescription: drivers.DriverHealthy,
}
// TODO: implement fingerprinting logic to populate health and driver
// attributes.
//
// Fingerprinting is used by the plugin to relay two important information
// to Nomad: health state and node attributes.
//
// If the plugin reports to be unhealthy, or doesn't send any fingerprint
// data in the expected interval of time, Nomad will restart it.
//
// Node attributes can be used to report any relevant information about
// the node in which the plugin is running (specific library availability,
// installed versions of a software etc.). These attributes can then be
// used by an operator to set job constrains.
//
// In the example below we check if the shell specified by the user exists
// in the node.
shell := d.config.Shell
cmd := exec.Command("which", shell)
if err := cmd.Run(); err != nil {
return &drivers.Fingerprint{
Health: drivers.HealthStateUndetected,
HealthDescription: fmt.Sprintf("shell %s not found", shell),
}
}
// We also set the shell and its version as attributes
cmd = exec.Command(shell, "--version")
if out, err := cmd.Output(); err != nil {
d.logger.Warn("failed to find shell version: %v", err)
} else {
re := regexp.MustCompile("[0-9]\\.[0-9]\\.[0-9]")
version := re.FindString(string(out))
fp.Attributes["driver.hello.shell_version"] = structs.NewStringAttribute(version)
fp.Attributes["driver.hello.shell"] = structs.NewStringAttribute(shell)
}
return fp
}
// StartTask returns a task handle and a driver network if necessary.
func (d *HelloDriverPlugin) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drivers.DriverNetwork, error) {
if _, ok := d.tasks.Get(cfg.ID); ok {
return nil, nil, fmt.Errorf("task with ID %q already started", cfg.ID)
}
var driverConfig TaskConfig
if err := cfg.DecodeDriverConfig(&driverConfig); err != nil {
return nil, nil, fmt.Errorf("failed to decode driver config: %v", err)
}
d.logger.Info("starting task", "driver_cfg", hclog.Fmt("%+v", driverConfig))
handle := drivers.NewTaskHandle(taskHandleVersion)
handle.Config = cfg
// TODO: implement driver specific mechanism to start the task.
//
// Once the task is started you will need to store any relevant runtime
// information in a taskHandle and TaskState. The taskHandle will be
// stored in-memory in the plugin and will be used to interact with the
// task.
//
// The TaskState will be returned to the Nomad client inside a
// drivers.TaskHandle instance. This TaskHandle will be sent back to plugin
// if the task ever needs to be recovered, so the TaskState should contain
// enough information to handle that.
//
// In the example below we use an executor to fork a process to run our
// greeter. The executor is then stored in the handle so we can access it
// later and the the plugin.Client is used to generate a reattach
// configuration that can be used to recover communication with the task.
executorConfig := &executor.ExecutorConfig{
LogFile: filepath.Join(cfg.TaskDir().Dir, "executor.out"),
LogLevel: "debug",
}
exec, pluginClient, err := executor.CreateExecutor(d.logger, d.nomadConfig, executorConfig)
if err != nil {
return nil, nil, fmt.Errorf("failed to create executor: %v", err)
}
echoCmd := fmt.Sprintf(`echo "%s"`, driverConfig.Greeting)
execCmd := &executor.ExecCommand{
Cmd: d.config.Shell,
Args: []string{"-c", echoCmd},
StdoutPath: cfg.StdoutPath,
StderrPath: cfg.StderrPath,
}
ps, err := exec.Launch(execCmd)
if err != nil {
pluginClient.Kill()
return nil, nil, fmt.Errorf("failed to launch command with executor: %v", err)
}
h := &taskHandle{
exec: exec,
pid: ps.Pid,
pluginClient: pluginClient,
taskConfig: cfg,
procState: drivers.TaskStateRunning,
startedAt: time.Now().Round(time.Millisecond),
logger: d.logger,
}
driverState := TaskState{
ReattachConfig: structs.ReattachConfigFromGoPlugin(pluginClient.ReattachConfig()),
Pid: ps.Pid,
TaskConfig: cfg,
StartedAt: h.startedAt,
}
if err := handle.SetDriverState(&driverState); err != nil {
return nil, nil, fmt.Errorf("failed to set driver state: %v", err)
}
d.tasks.Set(cfg.ID, h)
go h.run()
return handle, nil, nil
}
// RecoverTask recreates the in-memory state of a task from a TaskHandle.
func (d *HelloDriverPlugin) RecoverTask(handle *drivers.TaskHandle) error {
if handle == nil {
return errors.New("error: handle cannot be nil")
}
if _, ok := d.tasks.Get(handle.Config.ID); ok {
return nil
}
var taskState TaskState
if err := handle.GetDriverState(&taskState); err != nil {
return fmt.Errorf("failed to decode task state from handle: %v", err)
}
var driverConfig TaskConfig
if err := taskState.TaskConfig.DecodeDriverConfig(&driverConfig); err != nil {
return fmt.Errorf("failed to decode driver config: %v", err)
}
// TODO: implement driver specific logic to recover a task.
//
// Recovering a task involves recreating and storing a taskHandle as if the
// task was just started.
//
// In the example below we use the executor to re-attach to the process
// that was created when the task first started.
plugRC, err := structs.ReattachConfigToGoPlugin(taskState.ReattachConfig)
if err != nil {
return fmt.Errorf("failed to build ReattachConfig from taskConfig state: %v", err)
}
execImpl, pluginClient, err := executor.ReattachToExecutor(plugRC, d.logger)
if err != nil {
return fmt.Errorf("failed to reattach to executor: %v", err)
}
h := &taskHandle{
exec: execImpl,
pid: taskState.Pid,
pluginClient: pluginClient,
taskConfig: taskState.TaskConfig,
procState: drivers.TaskStateRunning,
startedAt: taskState.StartedAt,
exitResult: &drivers.ExitResult{},
}
d.tasks.Set(taskState.TaskConfig.ID, h)
go h.run()
return nil
}
// WaitTask returns a channel used to notify Nomad when a task exits.
func (d *HelloDriverPlugin) WaitTask(ctx context.Context, taskID string) (<-chan *drivers.ExitResult, error) {
handle, ok := d.tasks.Get(taskID)
if !ok {
return nil, drivers.ErrTaskNotFound
}
ch := make(chan *drivers.ExitResult)
go d.handleWait(ctx, handle, ch)
return ch, nil
}
func (d *HelloDriverPlugin) handleWait(ctx context.Context, handle *taskHandle, ch chan *drivers.ExitResult) {
defer close(ch)
var result *drivers.ExitResult
// TODO: implement driver specific logic to notify Nomad the task has been
// completed and what was the exit result.
//
// When a result is sent in the result channel Nomad will stop the task and
// emit an event that an operator can use to get an insight on why the task
// stopped.
//
// In the example below we block and wait until the executor finishes
// running, at which point we send the exit code and signal in the result
// channel.
ps, err := handle.exec.Wait(ctx)
if err != nil {
result = &drivers.ExitResult{
Err: fmt.Errorf("executor: error waiting on process: %v", err),
}
} else {
result = &drivers.ExitResult{
ExitCode: ps.ExitCode,
Signal: ps.Signal,
}
}
for {
select {
case <-ctx.Done():
return
case <-d.ctx.Done():
return
case ch <- result:
}
}
}
// StopTask stops a running task with the given signal and within the timeout window.
func (d *HelloDriverPlugin) StopTask(taskID string, timeout time.Duration, signal string) error {
handle, ok := d.tasks.Get(taskID)
if !ok {
return drivers.ErrTaskNotFound
}
// TODO: implement driver specific logic to stop a task.
//
// The StopTask function is expected to stop a running task by sending the
// given signal to it. If the task does not stop during the given timeout,
// the driver must forcefully kill the task.
//
// In the example below we let the executor handle the task shutdown
// process for us, but you might need to customize this for your own
// implementation.
if err := handle.exec.Shutdown(signal, timeout); err != nil {
if handle.pluginClient.Exited() {
return nil
}
return fmt.Errorf("executor Shutdown failed: %v", err)
}
return nil
}
// DestroyTask cleans up and removes a task that has terminated.
func (d *HelloDriverPlugin) DestroyTask(taskID string, force bool) error {
handle, ok := d.tasks.Get(taskID)
if !ok {
return drivers.ErrTaskNotFound
}
if handle.IsRunning() && !force {
return errors.New("cannot destroy running task")
}
// TODO: implement driver specific logic to destroy a complete task.
//
// Destroying a task includes removing any resources used by task and any
// local references in the plugin. If force is set to true the task should
// be destroyed even if it's currently running.
//
// In the example below we use the executor to force shutdown the task
// (timeout equals 0).
if !handle.pluginClient.Exited() {
if err := handle.exec.Shutdown("", 0); err != nil {
handle.logger.Error("destroying executor failed", "err", err)
}
handle.pluginClient.Kill()
}
d.tasks.Delete(taskID)
return nil
}
// InspectTask returns detailed status information for the referenced taskID.
func (d *HelloDriverPlugin) InspectTask(taskID string) (*drivers.TaskStatus, error) {
handle, ok := d.tasks.Get(taskID)
if !ok {
return nil, drivers.ErrTaskNotFound
}
return handle.TaskStatus(), nil
}
// TaskStats returns a channel which the driver should send stats to at the given interval.
func (d *HelloDriverPlugin) TaskStats(ctx context.Context, taskID string, interval time.Duration) (<-chan *drivers.TaskResourceUsage, error) {
handle, ok := d.tasks.Get(taskID)
if !ok {
return nil, drivers.ErrTaskNotFound
}
// TODO: implement driver specific logic to send task stats.
//
// This function returns a channel that Nomad will use to listen for task
// stats (e.g., CPU and memory usage) in a given interval. It should send
// stats until the context is canceled or the task stops running.
//
// In the example below we use the Stats function provided by the executor,
// but you can build a set of functions similar to the fingerprint process.
return handle.exec.Stats(ctx, interval)
}
// TaskEvents returns a channel that the plugin can use to emit task related events.
func (d *HelloDriverPlugin) TaskEvents(ctx context.Context) (<-chan *drivers.TaskEvent, error) {
return d.eventer.TaskEvents(ctx)
}
// SignalTask forwards a signal to a task.
// This is an optional capability.
func (d *HelloDriverPlugin) SignalTask(taskID string, signal string) error {
handle, ok := d.tasks.Get(taskID)
if !ok {
return drivers.ErrTaskNotFound
}
// TODO: implement driver specific signal handling logic.
//
// The given signal must be forwarded to the target taskID. If this plugin
// doesn't support receiving signals (capability SendSignals is set to
// false) you can just return nil.
sig := os.Interrupt
if s, ok := signals.SignalLookup[signal]; ok {
sig = s
} else {
d.logger.Warn("unknown signal to send to task, using SIGINT instead", "signal", signal, "task_id", handle.taskConfig.ID)
}
return handle.exec.Signal(sig)
}
// ExecTask returns the result of executing the given command inside a task.
// This is an optional capability.
func (d *HelloDriverPlugin) ExecTask(taskID string, cmd []string, timeout time.Duration) (*drivers.ExecTaskResult, error) {
// TODO: implement driver specific logic to execute commands in a task.
return nil, errors.New("This driver does not support exec")
}