212 lines
5.6 KiB
Go
212 lines
5.6 KiB
Go
|
package executor
|
||
|
|
||
|
import (
|
||
|
"os"
|
||
|
"strconv"
|
||
|
"sync"
|
||
|
"time"
|
||
|
|
||
|
hclog "github.com/hashicorp/go-hclog"
|
||
|
"github.com/hashicorp/nomad/client/lib/resources"
|
||
|
"github.com/hashicorp/nomad/client/stats"
|
||
|
"github.com/hashicorp/nomad/plugins/drivers"
|
||
|
ps "github.com/mitchellh/go-ps"
|
||
|
"github.com/shirou/gopsutil/v3/process"
|
||
|
)
|
||
|
|
||
|
var (
|
||
|
// pidScanInterval is the interval at which the executor scans the process
|
||
|
// tree for finding out the pids that the executor and it's child processes
|
||
|
// have forked
|
||
|
pidScanInterval = 5 * time.Second
|
||
|
)
|
||
|
|
||
|
// pidCollector is a utility that can be embedded in an executor to collect pid
|
||
|
// stats
|
||
|
type pidCollector struct {
|
||
|
pids map[int]*resources.PID
|
||
|
pidLock sync.RWMutex
|
||
|
logger hclog.Logger
|
||
|
}
|
||
|
|
||
|
// allPidGetter is a func which is used by the pid collector to gather
|
||
|
// stats on
|
||
|
type allPidGetter func() (resources.PIDs, error)
|
||
|
|
||
|
func newPidCollector(logger hclog.Logger) *pidCollector {
|
||
|
return &pidCollector{
|
||
|
pids: make(map[int]*resources.PID),
|
||
|
logger: logger.Named("pid_collector"),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// collectPids collects the pids of the child processes that the executor is
|
||
|
// running every 5 seconds
|
||
|
func (c *pidCollector) collectPids(stopCh chan interface{}, pidGetter allPidGetter) {
|
||
|
// Fire the timer right away when the executor starts from there on the pids
|
||
|
// are collected every scan interval
|
||
|
timer := time.NewTimer(0)
|
||
|
defer timer.Stop()
|
||
|
for {
|
||
|
select {
|
||
|
case <-timer.C:
|
||
|
pids, err := pidGetter()
|
||
|
if err != nil {
|
||
|
c.logger.Debug("error collecting pids", "error", err)
|
||
|
}
|
||
|
c.pidLock.Lock()
|
||
|
|
||
|
// Adding pids which are not being tracked
|
||
|
for pid, np := range pids {
|
||
|
if _, ok := c.pids[pid]; !ok {
|
||
|
c.pids[pid] = np
|
||
|
}
|
||
|
}
|
||
|
// Removing pids which are no longer present
|
||
|
for pid := range c.pids {
|
||
|
if _, ok := pids[pid]; !ok {
|
||
|
delete(c.pids, pid)
|
||
|
}
|
||
|
}
|
||
|
c.pidLock.Unlock()
|
||
|
timer.Reset(pidScanInterval)
|
||
|
case <-stopCh:
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// scanPids scans all the pids on the machine running the current executor and
|
||
|
// returns the child processes of the executor.
|
||
|
func scanPids(parentPid int, allPids []ps.Process) (map[int]*resources.PID, error) {
|
||
|
processFamily := make(map[int]struct{})
|
||
|
processFamily[parentPid] = struct{}{}
|
||
|
|
||
|
// A mapping of pids to their parent pids. It is used to build the process
|
||
|
// tree of the executing task
|
||
|
pidsRemaining := make(map[int]int, len(allPids))
|
||
|
for _, pid := range allPids {
|
||
|
pidsRemaining[pid.Pid()] = pid.PPid()
|
||
|
}
|
||
|
|
||
|
for {
|
||
|
// flag to indicate if we have found a match
|
||
|
foundNewPid := false
|
||
|
|
||
|
for pid, ppid := range pidsRemaining {
|
||
|
_, childPid := processFamily[ppid]
|
||
|
|
||
|
// checking if the pid is a child of any of the parents
|
||
|
if childPid {
|
||
|
processFamily[pid] = struct{}{}
|
||
|
delete(pidsRemaining, pid)
|
||
|
foundNewPid = true
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// not scanning anymore if we couldn't find a single match
|
||
|
if !foundNewPid {
|
||
|
break
|
||
|
}
|
||
|
}
|
||
|
|
||
|
res := make(map[int]*resources.PID)
|
||
|
for pid := range processFamily {
|
||
|
res[pid] = &resources.PID{
|
||
|
PID: pid,
|
||
|
StatsTotalCPU: stats.NewCpuStats(),
|
||
|
StatsUserCPU: stats.NewCpuStats(),
|
||
|
StatsSysCPU: stats.NewCpuStats(),
|
||
|
}
|
||
|
}
|
||
|
return res, nil
|
||
|
}
|
||
|
|
||
|
// pidStats returns the resource usage stats per pid
|
||
|
func (c *pidCollector) pidStats() (map[string]*drivers.ResourceUsage, error) {
|
||
|
stats := make(map[string]*drivers.ResourceUsage)
|
||
|
c.pidLock.RLock()
|
||
|
pids := make(map[int]*resources.PID, len(c.pids))
|
||
|
for k, v := range c.pids {
|
||
|
pids[k] = v
|
||
|
}
|
||
|
c.pidLock.RUnlock()
|
||
|
for pid, np := range pids {
|
||
|
p, err := process.NewProcess(int32(pid))
|
||
|
if err != nil {
|
||
|
c.logger.Trace("unable to create new process", "pid", pid, "error", err)
|
||
|
continue
|
||
|
}
|
||
|
ms := &drivers.MemoryStats{}
|
||
|
if memInfo, err := p.MemoryInfo(); err == nil {
|
||
|
ms.RSS = memInfo.RSS
|
||
|
ms.Swap = memInfo.Swap
|
||
|
ms.Measured = ExecutorBasicMeasuredMemStats
|
||
|
}
|
||
|
|
||
|
cs := &drivers.CpuStats{}
|
||
|
if cpuStats, err := p.Times(); err == nil {
|
||
|
cs.SystemMode = np.StatsSysCPU.Percent(cpuStats.System * float64(time.Second))
|
||
|
cs.UserMode = np.StatsUserCPU.Percent(cpuStats.User * float64(time.Second))
|
||
|
cs.Measured = ExecutorBasicMeasuredCpuStats
|
||
|
|
||
|
// calculate cpu usage percent
|
||
|
cs.Percent = np.StatsTotalCPU.Percent(cpuStats.Total() * float64(time.Second))
|
||
|
}
|
||
|
stats[strconv.Itoa(pid)] = &drivers.ResourceUsage{MemoryStats: ms, CpuStats: cs}
|
||
|
}
|
||
|
|
||
|
return stats, nil
|
||
|
}
|
||
|
|
||
|
// aggregatedResourceUsage aggregates the resource usage of all the pids and
|
||
|
// returns a TaskResourceUsage data point
|
||
|
func aggregatedResourceUsage(systemCpuStats *stats.CpuStats, pidStats map[string]*drivers.ResourceUsage) *drivers.TaskResourceUsage {
|
||
|
ts := time.Now().UTC().UnixNano()
|
||
|
var (
|
||
|
systemModeCPU, userModeCPU, percent float64
|
||
|
totalRSS, totalSwap uint64
|
||
|
)
|
||
|
|
||
|
for _, pidStat := range pidStats {
|
||
|
systemModeCPU += pidStat.CpuStats.SystemMode
|
||
|
userModeCPU += pidStat.CpuStats.UserMode
|
||
|
percent += pidStat.CpuStats.Percent
|
||
|
|
||
|
totalRSS += pidStat.MemoryStats.RSS
|
||
|
totalSwap += pidStat.MemoryStats.Swap
|
||
|
}
|
||
|
|
||
|
totalCPU := &drivers.CpuStats{
|
||
|
SystemMode: systemModeCPU,
|
||
|
UserMode: userModeCPU,
|
||
|
Percent: percent,
|
||
|
Measured: ExecutorBasicMeasuredCpuStats,
|
||
|
TotalTicks: systemCpuStats.TicksConsumed(percent),
|
||
|
}
|
||
|
|
||
|
totalMemory := &drivers.MemoryStats{
|
||
|
RSS: totalRSS,
|
||
|
Swap: totalSwap,
|
||
|
Measured: ExecutorBasicMeasuredMemStats,
|
||
|
}
|
||
|
|
||
|
resourceUsage := drivers.ResourceUsage{
|
||
|
MemoryStats: totalMemory,
|
||
|
CpuStats: totalCPU,
|
||
|
}
|
||
|
return &drivers.TaskResourceUsage{
|
||
|
ResourceUsage: &resourceUsage,
|
||
|
Timestamp: ts,
|
||
|
Pids: pidStats,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func getAllPidsByScanning() (resources.PIDs, error) {
|
||
|
allProcesses, err := ps.Processes()
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
return scanPids(os.Getpid(), allProcesses)
|
||
|
}
|