330 lines
8.6 KiB
Go
330 lines
8.6 KiB
Go
package skeleton
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
|
|
hclog "github.com/hashicorp/go-hclog"
|
|
"github.com/hashicorp/nomad/drivers/shared/eventer"
|
|
"github.com/hashicorp/nomad/plugins/base"
|
|
"github.com/hashicorp/nomad/plugins/drivers"
|
|
"github.com/hashicorp/nomad/plugins/shared/hclspec"
|
|
structs "github.com/hashicorp/nomad/plugins/shared/structs"
|
|
)
|
|
|
|
const (
|
|
// pluginName is the name of the plugin
|
|
pluginName = "test-plugin"
|
|
|
|
// fingerprintPeriod is the interval at which the driver will send fingerprint responses
|
|
fingerprintPeriod = 30 * time.Second
|
|
|
|
// taskHandleVersion is the version of task handle which this driver sets
|
|
// and understands how to decode driver state
|
|
taskHandleVersion = 1
|
|
)
|
|
|
|
var (
|
|
// pluginInfo is the response returned for the PluginInfo RPC
|
|
pluginInfo = &base.PluginInfoResponse{
|
|
Type: base.PluginTypeDriver,
|
|
PluginApiVersions: []string{"0.1.0"},
|
|
PluginVersion: "0.0.1",
|
|
Name: pluginName,
|
|
}
|
|
|
|
// configSpec is the hcl specification returned by the ConfigSchema RPC
|
|
configSpec = hclspec.NewObject(map[string]*hclspec.Spec{
|
|
"enabled": hclspec.NewDefault(
|
|
hclspec.NewAttr("enabled", "bool", false),
|
|
hclspec.NewLiteral("true"),
|
|
),
|
|
})
|
|
|
|
// taskConfigSpec is the hcl specification for the driver config section of
|
|
// a taskConfig within a job. It is returned in the TaskConfigSchema RPC
|
|
taskConfigSpec = hclspec.NewObject(map[string]*hclspec.Spec{
|
|
"strconfig": hclspec.NewAttr("strconfig", "string", true),
|
|
"boolcfg": hclspec.NewAttr("boolcfg", "bool", false),
|
|
})
|
|
|
|
// capabilities is returned by the Capabilities RPC and indicates what
|
|
// optional features this driver supports. This should be set according
|
|
// to the target run time
|
|
capabilities = &drivers.Capabilities{
|
|
SendSignals: true,
|
|
Exec: false,
|
|
}
|
|
)
|
|
|
|
// Driver is a skeleton driver implementation. The fields in this struct
|
|
// are to help plugin authors get started writing their runtime plugins.
|
|
type Driver struct {
|
|
// eventer is used to handle multiplexing of TaskEvents calls such that an
|
|
// event can be broadcast to all callers
|
|
eventer *eventer.Eventer
|
|
|
|
// config is the driver configuration set by the SetConfig RPC
|
|
config *Config
|
|
|
|
// nomadConfig is the client config from nomad
|
|
nomadConfig *base.ClientDriverConfig
|
|
|
|
// tasks is the in memory datastore mapping taskIDs to rawExecDriverHandles
|
|
tasks *taskStore
|
|
|
|
// ctx is the context for the driver. It is passed to other subsystems to
|
|
// coordinate shutdown
|
|
ctx context.Context
|
|
|
|
// signalShutdown is called when the driver is shutting down and cancels the
|
|
// ctx passed to any subsystems
|
|
signalShutdown context.CancelFunc
|
|
|
|
// logger will log to the Nomad agent
|
|
logger hclog.Logger
|
|
}
|
|
|
|
// Config is the driver configuration set by the SetConfig RPC call
|
|
type Config struct {
|
|
// Enabled is set to true to enable this driver
|
|
Enabled bool `codec:"enabled"`
|
|
}
|
|
|
|
// TaskConfig is the driver configuration of a task within a job
|
|
type TaskConfig struct {
|
|
StrConfig string `codec:"strconfig"`
|
|
BoolConfig bool `codec:"boolconfig"`
|
|
}
|
|
|
|
// TaskState is the state which is encoded in the handle returned in
|
|
// StartTask. This information is needed to rebuild the task state and handler
|
|
// during recovery.
|
|
type TaskState struct {
|
|
TaskConfig *drivers.TaskConfig
|
|
ContainerName string
|
|
StartedAt time.Time
|
|
PID int
|
|
}
|
|
|
|
// NewSkeletonDriver returns a new DriverPlugin implementation
|
|
func NewSkeletonDriver(logger hclog.Logger) drivers.DriverPlugin {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
logger = logger.Named(pluginName)
|
|
|
|
return &Driver{
|
|
eventer: eventer.NewEventer(ctx, logger),
|
|
config: &Config{},
|
|
tasks: newTaskStore(),
|
|
ctx: ctx,
|
|
signalShutdown: cancel,
|
|
logger: logger,
|
|
}
|
|
}
|
|
|
|
func (d *Driver) PluginInfo() (*base.PluginInfoResponse, error) {
|
|
return pluginInfo, nil
|
|
}
|
|
|
|
func (d *Driver) ConfigSchema() (*hclspec.Spec, error) {
|
|
return configSpec, nil
|
|
}
|
|
|
|
func (d *Driver) SetConfig(cfg *base.Config) error {
|
|
var config Config
|
|
if len(cfg.PluginConfig) != 0 {
|
|
if err := base.MsgPackDecode(cfg.PluginConfig, &config); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
d.config = &config
|
|
if cfg.AgentConfig != nil {
|
|
d.nomadConfig = cfg.AgentConfig.Driver
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (d *Driver) Shutdown(ctx context.Context) error {
|
|
d.signalShutdown()
|
|
return nil
|
|
}
|
|
|
|
func (d *Driver) TaskConfigSchema() (*hclspec.Spec, error) {
|
|
return taskConfigSpec, nil
|
|
}
|
|
|
|
func (d *Driver) Capabilities() (*drivers.Capabilities, error) {
|
|
return capabilities, nil
|
|
}
|
|
|
|
func (d *Driver) Fingerprint(ctx context.Context) (<-chan *drivers.Fingerprint, error) {
|
|
ch := make(chan *drivers.Fingerprint)
|
|
go d.handleFingerprint(ctx, ch)
|
|
return ch, nil
|
|
}
|
|
|
|
func (d *Driver) handleFingerprint(ctx context.Context, ch chan<- *drivers.Fingerprint) {
|
|
defer close(ch)
|
|
ticker := time.NewTimer(0)
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-d.ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
ticker.Reset(fingerprintPeriod)
|
|
ch <- d.buildFingerprint()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (d *Driver) buildFingerprint() *drivers.Fingerprint {
|
|
var health drivers.HealthState
|
|
var desc string
|
|
attrs := map[string]*structs.Attribute{}
|
|
|
|
// Implement finger printing logic here to populate health and driver attributes
|
|
|
|
return &drivers.Fingerprint{
|
|
Attributes: attrs,
|
|
Health: health,
|
|
HealthDescription: desc,
|
|
}
|
|
}
|
|
|
|
func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
|
|
if handle == nil {
|
|
return fmt.Errorf("error: handle cannot be nil")
|
|
}
|
|
|
|
if _, ok := d.tasks.Get(handle.Config.ID); ok {
|
|
return nil
|
|
}
|
|
|
|
var taskState TaskState
|
|
if err := handle.GetDriverState(&taskState); err != nil {
|
|
return fmt.Errorf("failed to decode task state from handle: %v", err)
|
|
}
|
|
|
|
var driverConfig TaskConfig
|
|
if err := taskState.TaskConfig.DecodeDriverConfig(&driverConfig); err != nil {
|
|
return fmt.Errorf("failed to decode driver config: %v", err)
|
|
}
|
|
|
|
// Implement driver specific way to reattach to a running task or restart
|
|
return nil
|
|
}
|
|
|
|
// StartTask setup the task exec and calls the container excecutor
|
|
func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drivers.DriverNetwork, error) {
|
|
if _, ok := d.tasks.Get(cfg.ID); ok {
|
|
return nil, nil, fmt.Errorf("task with ID %q already started", cfg.ID)
|
|
}
|
|
|
|
var driverConfig TaskConfig
|
|
if err := cfg.DecodeDriverConfig(&driverConfig); err != nil {
|
|
return nil, nil, fmt.Errorf("failed to decode driver config: %v", err)
|
|
}
|
|
|
|
handle := drivers.NewTaskHandle(taskHandleVersion)
|
|
handle.Config = cfg
|
|
|
|
// Implement driver specific mechanism to start the task here
|
|
|
|
return nil, nil, nil
|
|
|
|
}
|
|
|
|
func (d *Driver) WaitTask(ctx context.Context, taskID string) (<-chan *drivers.ExitResult, error) {
|
|
handle, ok := d.tasks.Get(taskID)
|
|
if !ok {
|
|
return nil, drivers.ErrTaskNotFound
|
|
}
|
|
|
|
ch := make(chan *drivers.ExitResult)
|
|
go d.handleWait(ctx, handle, ch)
|
|
|
|
return ch, nil
|
|
}
|
|
|
|
func (d *Driver) handleWait(ctx context.Context, handle *taskHandle, ch chan *drivers.ExitResult) {
|
|
defer close(ch)
|
|
|
|
ticker := time.NewTicker(1 * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-d.ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
s := handle.TaskStatus()
|
|
if s.State == drivers.TaskStateExited {
|
|
ch <- handle.exitResult
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (d *Driver) StopTask(taskID string, timeout time.Duration, signal string) error {
|
|
handle, ok := d.tasks.Get(taskID)
|
|
if !ok {
|
|
return drivers.ErrTaskNotFound
|
|
}
|
|
|
|
if err := handle.shutdown(timeout); err != nil {
|
|
return fmt.Errorf("executor Shutdown failed: %v", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (d *Driver) DestroyTask(taskID string, force bool) error {
|
|
handle, ok := d.tasks.Get(taskID)
|
|
if !ok {
|
|
return drivers.ErrTaskNotFound
|
|
}
|
|
|
|
if handle.IsRunning() && !force {
|
|
return fmt.Errorf("cannot destroy running task")
|
|
}
|
|
|
|
d.tasks.Delete(taskID)
|
|
return nil
|
|
}
|
|
|
|
func (d *Driver) InspectTask(taskID string) (*drivers.TaskStatus, error) {
|
|
handle, ok := d.tasks.Get(taskID)
|
|
if !ok {
|
|
return nil, drivers.ErrTaskNotFound
|
|
}
|
|
|
|
return handle.TaskStatus(), nil
|
|
}
|
|
|
|
func (d *Driver) TaskStats(ctx context.Context, taskID string, interval time.Duration) (<-chan *drivers.TaskResourceUsage, error) {
|
|
handle, ok := d.tasks.Get(taskID)
|
|
if !ok {
|
|
return nil, drivers.ErrTaskNotFound
|
|
}
|
|
|
|
return handle.stats(ctx, interval)
|
|
}
|
|
|
|
func (d *Driver) TaskEvents(ctx context.Context) (<-chan *drivers.TaskEvent, error) {
|
|
return d.eventer.TaskEvents(ctx)
|
|
}
|
|
|
|
func (d *Driver) SignalTask(taskID string, signal string) error {
|
|
return fmt.Errorf("This driver does not support signals")
|
|
}
|
|
|
|
func (d *Driver) ExecTask(taskID string, cmd []string, timeout time.Duration) (*drivers.ExecTaskResult, error) {
|
|
return nil, fmt.Errorf("This driver does not support exec") //TODO
|
|
}
|