refactor
This commit is contained in:
parent
d9facbb79c
commit
a0b05a6c2d
12 changed files with 502 additions and 421 deletions
0
bin/alba.go
Normal file
0
bin/alba.go
Normal file
23
bin/albatros.go
Normal file
23
bin/albatros.go
Normal file
|
@ -0,0 +1,23 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"log"
|
||||
"net/http"
|
||||
|
||||
"git.deuxfleurs.fr/deuxfleurs/albatros/pkg"
|
||||
)
|
||||
|
||||
func main() {
|
||||
config := pkg.BuildConfig()
|
||||
cluster := pkg.NewCluster(&config)
|
||||
|
||||
// init webserver
|
||||
http.Handle("/hook", pkg.HookHandler{&config, &cluster})
|
||||
http.Handle("/build", pkg.BuildHandler{&config, &cluster})
|
||||
|
||||
// launch
|
||||
log.Println("Albatros listen on :8080")
|
||||
if err := http.ListenAndServe(":8080", nil); err != nil {
|
||||
log.Fatal("Can't start HTTP server")
|
||||
}
|
||||
}
|
13
docker-compose.yml
Normal file
13
docker-compose.yml
Normal file
|
@ -0,0 +1,13 @@
|
|||
version: '3.4'
|
||||
services:
|
||||
nomad:
|
||||
image: nomad
|
||||
command: nomad
|
||||
consul:
|
||||
image: consul
|
||||
albatros:
|
||||
image: dxflrs/albatros:4c8ea6cec9a0645825d2323465784ba3fe4ebf12
|
||||
ports:
|
||||
- "8080:8080"
|
||||
environment:
|
||||
- ALBATROS_URL="http://localhost:8080"
|
421
main.go
421
main.go
|
@ -1,421 +0,0 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"code.gitea.io/sdk/gitea"
|
||||
b64 "encoding/base64"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/caarlos0/env/v7"
|
||||
consul "github.com/hashicorp/consul/api"
|
||||
nomad "github.com/hashicorp/nomad/api"
|
||||
"golang.org/x/exp/slices"
|
||||
"io"
|
||||
"log"
|
||||
"net/http"
|
||||
"strings"
|
||||
)
|
||||
|
||||
/* Albatros CI */
|
||||
|
||||
type GitUser struct {
|
||||
Name string `json:"name"`
|
||||
Email string `json:"email"`
|
||||
Username string `json:username"`
|
||||
}
|
||||
|
||||
type GiteaCommit struct {
|
||||
Id string `json:"id"`
|
||||
Message string `json:"message"`
|
||||
Url string `json:"string"`
|
||||
Author GitUser `json:"author"`
|
||||
Committer GitUser `json:"committer"`
|
||||
Timestamp string `json:"timestamp"`
|
||||
}
|
||||
|
||||
type GiteaAccount struct {
|
||||
Id int64 `json:"id"`
|
||||
Login string `json:"login"`
|
||||
FullName string `json:"full_name"`
|
||||
Email string `json:"email"`
|
||||
AvatarUrl string `json:"avatar_url"`
|
||||
Username string `json:"username"`
|
||||
}
|
||||
|
||||
type GiteaRepository struct {
|
||||
Id int64 `json:"id"`
|
||||
Owner GiteaAccount `json:"owner"`
|
||||
Name string `json:"name"`
|
||||
FullName string `json:"full_name"`
|
||||
Description string `json:"description"`
|
||||
Private bool `json:"private"`
|
||||
Fork bool `json:"private"`
|
||||
HtmlUrl string `json:"html_url"`
|
||||
SshUrl string `json:"ssh_url"`
|
||||
CloneUrl string `json:"clone_url"`
|
||||
Website string `json:"website"`
|
||||
StarsCount int64 `json:"stars_count"`
|
||||
ForksCount int64 `json:"forks_count"`
|
||||
WatchersCount int64 `json:"watchers_count"`
|
||||
OpenIssuesCount int64 `json:"open_issues_count"`
|
||||
DefaultBranch string `json:"default_branch"`
|
||||
CreatedAt string `json:"created_at"`
|
||||
UpdatedAt string `json:"updated_at"`
|
||||
}
|
||||
|
||||
type GiteaNotification struct {
|
||||
Secret string `json:"secret"`
|
||||
Ref string `json:"ref"`
|
||||
Before string `json:"before"`
|
||||
After string `json:"after"`
|
||||
CompareUrl string `json:"compare_url"`
|
||||
Commits []GiteaCommit `json:"commits"`
|
||||
Repository GiteaRepository `json:"repository"`
|
||||
Pusher GiteaAccount `json:"pusher"`
|
||||
Sender GiteaAccount `json:"sender"`
|
||||
}
|
||||
|
||||
type SecretHook struct {
|
||||
Token string `json:"token"`
|
||||
}
|
||||
|
||||
type SecretGitea struct {
|
||||
Url string `json:"url"`
|
||||
Token string `json:"token"`
|
||||
}
|
||||
|
||||
type SecretTrusted struct {
|
||||
Senders []string `json:"senders"`
|
||||
}
|
||||
|
||||
type ConsulSecret struct {
|
||||
Hook SecretHook `json:"hook"`
|
||||
Gitea SecretGitea `json:"gitea"`
|
||||
Trusted SecretTrusted `json:"trusted"`
|
||||
Inject string `json:"inject"`
|
||||
}
|
||||
|
||||
func nomadToGiteaStatus(summary *nomad.TaskGroupSummary) gitea.StatusState {
|
||||
if summary.Failed > 0 {
|
||||
return gitea.StatusError
|
||||
}
|
||||
if summary.Lost > 0 || summary.Unknown > 0 {
|
||||
return gitea.StatusFailure
|
||||
}
|
||||
if summary.Queued > 0 || summary.Starting > 0 || summary.Running > 0 {
|
||||
return gitea.StatusPending
|
||||
}
|
||||
if summary.Complete > 0 {
|
||||
return gitea.StatusSuccess
|
||||
}
|
||||
// When the job is just started, all the counters are = 0.
|
||||
return gitea.StatusPending
|
||||
}
|
||||
|
||||
func notifSummary(notification *GiteaNotification) string {
|
||||
return fmt.Sprintf("%s/%s:%s", notification.Repository.Owner.Username, notification.Repository.Name, notification.After)
|
||||
}
|
||||
|
||||
func lifecycle(notification *GiteaNotification, dispatch *nomad.JobDispatchResponse, giteaCreds *SecretGitea, flavor string) {
|
||||
notifInfo := notifSummary(notification)
|
||||
|
||||
log.Printf("[lifecycle] Commit to build: %s, Gitea URL: %s\n", notifInfo, giteaCreds.Url)
|
||||
// init Gitea
|
||||
forge, err := gitea.NewClient(giteaCreds.Url, gitea.SetToken(giteaCreds.Token))
|
||||
if err != nil {
|
||||
log.Printf("Unable to create gitea client for %s: %+v\n", notifInfo, err)
|
||||
return
|
||||
}
|
||||
|
||||
// get job's deployment
|
||||
jobs := NomadClient.Jobs()
|
||||
queryOpt := nomad.QueryOptions{
|
||||
AllowStale: false,
|
||||
}
|
||||
|
||||
safeguard := 1000
|
||||
for ; safeguard > 0; safeguard-- {
|
||||
// Blocking fetch on deployment info
|
||||
job, meta, err := jobs.Summary(dispatch.DispatchedJobID, &queryOpt)
|
||||
if err != nil {
|
||||
log.Printf("[lifecycle] can't fetch job for %s: %+v\n", notifInfo, err)
|
||||
break
|
||||
}
|
||||
queryOpt.WaitIndex = meta.LastIndex
|
||||
|
||||
summary, ok := job.Summary["runner"]
|
||||
if !ok {
|
||||
log.Printf("[lifecycle] Job %s for %s must contain a 'runner' task\n", job.JobID, notifInfo)
|
||||
break
|
||||
}
|
||||
log.Printf("[lifecycle] Task status for job %s on %s: %+v\n", job.JobID, notifInfo, summary)
|
||||
|
||||
// Compute new job state
|
||||
state := nomadToGiteaStatus(&summary)
|
||||
|
||||
// Try updating Gitea commit status
|
||||
_, _, err = forge.CreateStatus(
|
||||
notification.Repository.Owner.Username,
|
||||
notification.Repository.Name,
|
||||
notification.After,
|
||||
gitea.CreateStatusOption{
|
||||
State: state,
|
||||
TargetURL: GlobalConfig.AlbatrosURL + "/build?log=stderr&job=" + dispatch.DispatchedJobID,
|
||||
Description: flavor,
|
||||
Context: "Albatros",
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
log.Printf("[lifecycle] can't update gitea repo %s for job %s: %+v\n", notifInfo, job.JobID, err)
|
||||
}
|
||||
|
||||
// Continue the loop only if the job is pending
|
||||
if state != gitea.StatusPending {
|
||||
log.Printf("Job %s for %s terminated with status %s\n", job.JobID, notifInfo, state)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if safeguard == 0 {
|
||||
// To avoid dangerous infinite loops, we put an upperbound here
|
||||
// of 1k refresh here. Reaching this limit will allow us to know
|
||||
// that something did not work as expected...
|
||||
log.Printf("!!! [lifecycle] we refreshed 1k times the job of %s and it's still running, giving up...\n", notifInfo)
|
||||
}
|
||||
}
|
||||
|
||||
func hook(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodPost {
|
||||
http.Error(w, "Hook only support POST requests", http.StatusBadRequest)
|
||||
}
|
||||
|
||||
q := r.URL.Query()
|
||||
maybeToken, ok := q["token"]
|
||||
if !ok || len(maybeToken) < 1 {
|
||||
http.Error(w, "Missing query parameter 'token'. Try adding '?token=xxx'", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
token := maybeToken[0]
|
||||
flavor := "default"
|
||||
|
||||
var notification GiteaNotification
|
||||
dec := json.NewDecoder(r.Body)
|
||||
if err := dec.Decode(¬ification); err != nil {
|
||||
http.Error(w, "Can't parse your request JSON", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
notifInfo := notifSummary(¬ification)
|
||||
|
||||
log.Printf("Received gitea notification for %s\n", notifInfo)
|
||||
|
||||
// Fetch our repo descriptor
|
||||
kv := ConsulClient.KV()
|
||||
encodedRepoUrl := b64.StdEncoding.EncodeToString([]byte(notification.Repository.CloneUrl))
|
||||
key := "albatros/" + encodedRepoUrl
|
||||
log.Printf("Fetching key %s for %s\n", key, notifInfo)
|
||||
pair, _, err := kv.Get(key, nil)
|
||||
if err != nil {
|
||||
http.Error(w, "Can't fetch the repo descriptor in Consul", http.StatusInternalServerError)
|
||||
log.Printf("Job error for %s, can't fetch repo descriptor in consul: %+v\n", notifInfo, err)
|
||||
return
|
||||
}
|
||||
if pair == nil || pair.Value == nil {
|
||||
http.Error(w, "You must declare %s in Consul in order to build it", http.StatusForbidden)
|
||||
return
|
||||
}
|
||||
// Parse our repo descriptor
|
||||
var repoDesc ConsulSecret
|
||||
if err = json.Unmarshal(pair.Value, &repoDesc); err != nil {
|
||||
http.Error(w, "Can't decode your Consul configuration for this repo", http.StatusInternalServerError)
|
||||
log.Printf("Job error for %s, can't unmarshal the Consul descriptor: %+v\n", notifInfo, err)
|
||||
return
|
||||
}
|
||||
// Check token
|
||||
if repoDesc.Hook.Token != token {
|
||||
http.Error(w, "Your albatros token does not match the one declared in Consul", http.StatusForbidden)
|
||||
return
|
||||
}
|
||||
|
||||
// Build job parameters for Nomad
|
||||
meta := map[string]string{
|
||||
"REPO_URL": notification.Repository.CloneUrl,
|
||||
"COMMIT": notification.After,
|
||||
"FLAVOR": flavor,
|
||||
// @FIXME: this code is not correct, this is a hack
|
||||
"BRANCH": strings.ReplaceAll(notification.Ref, "refs/heads/", ""),
|
||||
}
|
||||
|
||||
// Check sender
|
||||
payload := []byte{}
|
||||
if slices.Contains(repoDesc.Trusted.Senders, notification.Sender.Username) {
|
||||
log.Printf("Trusted build of %s as %s in the list of allowed senders, inject secrets\n", notifInfo, notification.Sender.Username)
|
||||
// Write payload
|
||||
payload = []byte(repoDesc.Inject)
|
||||
}
|
||||
|
||||
jobs := NomadClient.Jobs()
|
||||
dres, _, err := jobs.Dispatch("builder", meta, payload, "albatros", &nomad.WriteOptions{})
|
||||
if err != nil {
|
||||
http.Error(w, "Can't submit your job to Nomad", http.StatusInternalServerError)
|
||||
}
|
||||
log.Printf("Created job %s for %s\n", dres.DispatchedJobID, notifInfo)
|
||||
|
||||
// Start a lifecycle observer to update gitea status
|
||||
go lifecycle(¬ification, dres, &repoDesc.Gitea, flavor)
|
||||
|
||||
io.WriteString(w, dres.DispatchedJobID)
|
||||
}
|
||||
|
||||
func build(w http.ResponseWriter, r *http.Request) {
|
||||
q := r.URL.Query()
|
||||
jobID, ok := q["job"]
|
||||
if !ok || len(jobID) < 1 {
|
||||
http.Error(w, "Missing query parameter 'job'. Try adding '?job=xxx'", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
logType, ok := q["log"]
|
||||
log.Printf("Follow logs for %s\n", jobID)
|
||||
if !ok || len(logType) < 1 || !(logType[0] == "stdout" || logType[0] == "stderr") {
|
||||
http.Error(w, "Missing or wrong query parameter 'log'.\nTry adding '?log=stdout' or '?log=stderr'", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
logFilter := logType[0]
|
||||
|
||||
jobs := NomadClient.Jobs()
|
||||
allocList, _, err := jobs.Allocations(jobID[0], true, &nomad.QueryOptions{})
|
||||
if err != nil {
|
||||
http.Error(w, "Unable to fetch your job on Nomad. Might be garbage collected.", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
if len(allocList) < 1 {
|
||||
http.Error(w, "Job does not contain at least 1 allocation, can't display logs. Job might be garbage collected. ", http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
myAllocStub := allocList[0]
|
||||
|
||||
allocations := NomadClient.Allocations()
|
||||
myAlloc, _, err := allocations.Info(myAllocStub.ID, &nomad.QueryOptions{})
|
||||
if err != nil {
|
||||
http.Error(w, "Allocation does not exist anymore. Allocation might be garbage collected", http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
log.Printf("Selected alloc %s for job %s\n", myAlloc.ID, jobID)
|
||||
|
||||
allocFS := NomadClient.AllocFS()
|
||||
scancel := make(chan struct{})
|
||||
sframe, serr := allocFS.Logs(myAlloc, true, "executor", logFilter, "start", 0, scancel, &nomad.QueryOptions{})
|
||||
|
||||
// stream logs to client's browser
|
||||
build_loop:
|
||||
for {
|
||||
select {
|
||||
case <-r.Context().Done():
|
||||
// client disconnect, cleaning
|
||||
break build_loop
|
||||
case nomadErr := <-serr:
|
||||
// an error occured in nomad, inform user and clean
|
||||
_, _ = io.WriteString(w, fmt.Sprintf("\nBroken stream: %+v\n", nomadErr))
|
||||
break build_loop
|
||||
case chunk := <-sframe:
|
||||
// we get some data from nomad, send it to the client
|
||||
for i := 0; i < len(chunk.Data); {
|
||||
written, err := w.Write(chunk.Data[i:])
|
||||
w.(http.Flusher).Flush() // flush the buffer to send it right now
|
||||
i += written
|
||||
if err == io.EOF {
|
||||
_, _ = io.WriteString(w, "End of file :-)")
|
||||
break build_loop
|
||||
} else if err != nil {
|
||||
_, _ = io.WriteString(w, fmt.Sprintf("\nBroken stream: %+v\n", err))
|
||||
break build_loop
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
log.Printf("Connection closed, cleaning listeners for %s\n", jobID)
|
||||
scancel <- struct{}{}
|
||||
}
|
||||
|
||||
var NomadClient *nomad.Client
|
||||
var ConsulClient *consul.Client
|
||||
|
||||
type config struct {
|
||||
AlbatrosURL string `env:"ALBATROS_URL,required"`
|
||||
NomadAddr string `env:"NOMAD_ADDR"`
|
||||
NomadClientCert string `env:"NOMAD_CLIENT_CERT"`
|
||||
NomadClientKey string `env:"NOMAD_CLIENT_KEY"`
|
||||
NomadCACert string `env:"NOMAD_CACERT"`
|
||||
ConsulAddr string `env:"CONSUL_HTTP_ADDR"`
|
||||
ConsulClientCert string `env:"CONSUL_CLIENT_CERT"`
|
||||
ConsulClientKey string `env:"CONSUL_CLIENT_KEY"`
|
||||
ConsulCACert string `env:"CONSUL_CACERT"`
|
||||
}
|
||||
|
||||
var GlobalConfig config
|
||||
|
||||
func initConfig() {
|
||||
if err := env.Parse(&GlobalConfig); err != nil {
|
||||
log.Fatal(fmt.Sprintf("unable to parse config, error: %+v\n", err))
|
||||
}
|
||||
log.Printf("Albatros public URL: %s\n", GlobalConfig.AlbatrosURL)
|
||||
if GlobalConfig.NomadAddr != "" {
|
||||
isTLS := GlobalConfig.NomadClientCert != "" && GlobalConfig.NomadClientKey != "" && GlobalConfig.NomadCACert != ""
|
||||
log.Printf("Nomad URL: %s, TLS: %t\n", GlobalConfig.NomadAddr, isTLS)
|
||||
} else {
|
||||
log.Println("Use Nomad default configuration")
|
||||
}
|
||||
if GlobalConfig.ConsulAddr != "" {
|
||||
isTLS := GlobalConfig.ConsulClientCert != "" && GlobalConfig.ConsulClientKey != "" && GlobalConfig.ConsulCACert != ""
|
||||
log.Printf("Consul URL: %s, TLS: %t\n", GlobalConfig.ConsulAddr, isTLS)
|
||||
} else {
|
||||
log.Println("Use Consul default configuration")
|
||||
}
|
||||
}
|
||||
|
||||
func initNomad() {
|
||||
var err error
|
||||
nomadConfig := nomad.DefaultConfig()
|
||||
nomadConfig.Namespace = "ci"
|
||||
nomadConfig.Address = GlobalConfig.NomadAddr
|
||||
nomadConfig.TLSConfig.CACert = GlobalConfig.NomadCACert
|
||||
nomadConfig.TLSConfig.ClientCert = GlobalConfig.NomadClientCert
|
||||
nomadConfig.TLSConfig.ClientKey = GlobalConfig.NomadClientKey
|
||||
NomadClient, err = nomad.NewClient(nomadConfig)
|
||||
if err != nil {
|
||||
log.Fatal("Unable to connect to Nomad, check your config and setup")
|
||||
}
|
||||
}
|
||||
|
||||
func initConsul() {
|
||||
var err error
|
||||
consulConfig := consul.DefaultConfig()
|
||||
consulConfig.Address = GlobalConfig.ConsulAddr
|
||||
consulConfig.TLSConfig.CAFile = GlobalConfig.ConsulCACert
|
||||
consulConfig.TLSConfig.CertFile = GlobalConfig.ConsulClientCert
|
||||
consulConfig.TLSConfig.KeyFile = GlobalConfig.ConsulClientKey
|
||||
ConsulClient, err = consul.NewClient(consulConfig)
|
||||
if err != nil {
|
||||
log.Fatal("Unable to connect to Consul, check your config and setup")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func main() {
|
||||
var err error
|
||||
// init components.
|
||||
initConfig()
|
||||
initNomad()
|
||||
initConsul()
|
||||
|
||||
// init webserver
|
||||
http.HandleFunc("/hook", hook)
|
||||
http.HandleFunc("/build", build)
|
||||
|
||||
// launch
|
||||
log.Println("Albatros listen on :8080")
|
||||
if err = http.ListenAndServe(":8080", nil); err != nil {
|
||||
log.Fatal("Can't start HTTP server")
|
||||
}
|
||||
}
|
46
pkg/cluster.go
Normal file
46
pkg/cluster.go
Normal file
|
@ -0,0 +1,46 @@
|
|||
package pkg
|
||||
|
||||
import (
|
||||
"log"
|
||||
consul "github.com/hashicorp/consul/api"
|
||||
nomad "github.com/hashicorp/nomad/api"
|
||||
)
|
||||
|
||||
|
||||
// Cluster bundles the API clients for the two backing systems:
// Nomad (job scheduling) and Consul (per-repo secret storage).
type Cluster struct {
	Nomad  *nomad.Client
	Consul *consul.Client
}
|
||||
|
||||
func NewCluster(conf* Config) Cluster {
|
||||
cluster := Cluster{}
|
||||
|
||||
// Init Nomad
|
||||
nomadConfig := nomad.DefaultConfig()
|
||||
nomadConfig.Namespace = "ci"
|
||||
nomadConfig.Address = conf.NomadAddr
|
||||
nomadConfig.TLSConfig.CACert = conf.NomadCACert
|
||||
nomadConfig.TLSConfig.ClientCert = conf.NomadClientCert
|
||||
nomadConfig.TLSConfig.ClientKey = conf.NomadClientKey
|
||||
|
||||
ncli, err := nomad.NewClient(nomadConfig)
|
||||
if err != nil {
|
||||
log.Fatal("Unable to connect to Nomad, check your config and setup")
|
||||
}
|
||||
cluster.Nomad = ncli
|
||||
|
||||
// Init Consul
|
||||
consulConfig := consul.DefaultConfig()
|
||||
consulConfig.Address = conf.ConsulAddr
|
||||
consulConfig.TLSConfig.CAFile = conf.ConsulCACert
|
||||
consulConfig.TLSConfig.CertFile = conf.ConsulClientCert
|
||||
consulConfig.TLSConfig.KeyFile = conf.ConsulClientKey
|
||||
|
||||
ccli, err := consul.NewClient(consulConfig)
|
||||
if err != nil {
|
||||
log.Fatal("Unable to connect to Consul, check your config and setup")
|
||||
}
|
||||
cluster.Consul = ccli
|
||||
|
||||
return cluster
|
||||
}
|
40
pkg/config.go
Normal file
40
pkg/config.go
Normal file
|
@ -0,0 +1,40 @@
|
|||
package pkg
|
||||
|
||||
import (
|
||||
"log"
|
||||
"fmt"
|
||||
"github.com/caarlos0/env/v7"
|
||||
)
|
||||
|
||||
// Config collects every runtime setting, all sourced from environment
// variables via caarlos0/env tags. Only ALBATROS_URL is required; empty
// Nomad/Consul values fall back to those clients' own defaults.
type Config struct {
	// Public base URL of this service, used to build the commit-status
	// link back to /build.
	AlbatrosURL string `env:"ALBATROS_URL,required"`

	// Nomad endpoint and optional mutual-TLS material.
	NomadAddr       string `env:"NOMAD_ADDR"`
	NomadClientCert string `env:"NOMAD_CLIENT_CERT"`
	NomadClientKey  string `env:"NOMAD_CLIENT_KEY"`
	NomadCACert     string `env:"NOMAD_CACERT"`

	// Consul endpoint and optional mutual-TLS material.
	ConsulAddr       string `env:"CONSUL_HTTP_ADDR"`
	ConsulClientCert string `env:"CONSUL_CLIENT_CERT"`
	ConsulClientKey  string `env:"CONSUL_CLIENT_KEY"`
	ConsulCACert     string `env:"CONSUL_CACERT"`
}
|
||||
|
||||
func BuildConfig() Config {
|
||||
GlobalConfig := Config{}
|
||||
if err := env.Parse(&GlobalConfig); err != nil {
|
||||
log.Fatal(fmt.Sprintf("unable to parse config, error: %+v\n", err))
|
||||
}
|
||||
log.Printf("Albatros public URL: %s\n", GlobalConfig.AlbatrosURL)
|
||||
if GlobalConfig.NomadAddr != "" {
|
||||
isTLS := GlobalConfig.NomadClientCert != "" && GlobalConfig.NomadClientKey != "" && GlobalConfig.NomadCACert != ""
|
||||
log.Printf("Nomad URL: %s, TLS: %t\n", GlobalConfig.NomadAddr, isTLS)
|
||||
} else {
|
||||
log.Println("Use Nomad default configuration")
|
||||
}
|
||||
if GlobalConfig.ConsulAddr != "" {
|
||||
isTLS := GlobalConfig.ConsulClientCert != "" && GlobalConfig.ConsulClientKey != "" && GlobalConfig.ConsulCACert != ""
|
||||
log.Printf("Consul URL: %s, TLS: %t\n", GlobalConfig.ConsulAddr, isTLS)
|
||||
} else {
|
||||
log.Println("Use Consul default configuration")
|
||||
}
|
||||
return GlobalConfig
|
||||
}
|
86
pkg/gitea.go
Normal file
86
pkg/gitea.go
Normal file
|
@ -0,0 +1,86 @@
|
|||
package pkg
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"code.gitea.io/sdk/gitea"
|
||||
nomad "github.com/hashicorp/nomad/api"
|
||||
)
|
||||
|
||||
// GitUser identifies a git author/committer as serialized inside Gitea
// push-webhook commit entries.
type GitUser struct {
	Name  string `json:"name"`
	Email string `json:"email"`
	// Bug fix: the tag was the malformed `json:username"` (missing opening
	// quote), which encoding/json treats as no tag at all.
	Username string `json:"username"`
}
|
||||
|
||||
type GiteaCommit struct {
|
||||
Id string `json:"id"`
|
||||
Message string `json:"message"`
|
||||
Url string `json:"string"`
|
||||
Author GitUser `json:"author"`
|
||||
Committer GitUser `json:"committer"`
|
||||
Timestamp string `json:"timestamp"`
|
||||
}
|
||||
|
||||
// GiteaAccount mirrors a Gitea user/organization object as it appears in
// webhook payloads (repository owner, pusher, sender).
type GiteaAccount struct {
	Id        int64  `json:"id"`
	Login     string `json:"login"`
	FullName  string `json:"full_name"`
	Email     string `json:"email"`
	AvatarUrl string `json:"avatar_url"`
	// Username is the value matched against SecretTrusted.Senders to
	// decide whether secrets may be injected into a build.
	Username string `json:"username"`
}
|
||||
|
||||
type GiteaRepository struct {
|
||||
Id int64 `json:"id"`
|
||||
Owner GiteaAccount `json:"owner"`
|
||||
Name string `json:"name"`
|
||||
FullName string `json:"full_name"`
|
||||
Description string `json:"description"`
|
||||
Private bool `json:"private"`
|
||||
Fork bool `json:"private"`
|
||||
HtmlUrl string `json:"html_url"`
|
||||
SshUrl string `json:"ssh_url"`
|
||||
CloneUrl string `json:"clone_url"`
|
||||
Website string `json:"website"`
|
||||
StarsCount int64 `json:"stars_count"`
|
||||
ForksCount int64 `json:"forks_count"`
|
||||
WatchersCount int64 `json:"watchers_count"`
|
||||
OpenIssuesCount int64 `json:"open_issues_count"`
|
||||
DefaultBranch string `json:"default_branch"`
|
||||
CreatedAt string `json:"created_at"`
|
||||
UpdatedAt string `json:"updated_at"`
|
||||
}
|
||||
|
||||
// GiteaNotification is the top-level JSON document Gitea POSTs to /hook
// on a push event.
type GiteaNotification struct {
	Secret string `json:"secret"`
	// Ref is the full git ref ("refs/heads/<branch>"); the hook handler
	// strips the prefix to derive BRANCH.
	Ref    string `json:"ref"`
	Before string `json:"before"`
	// After is the commit SHA the push ends at — the commit that gets
	// built and whose status is updated.
	After      string          `json:"after"`
	CompareUrl string          `json:"compare_url"`
	Commits    []GiteaCommit   `json:"commits"`
	Repository GiteaRepository `json:"repository"`
	Pusher     GiteaAccount    `json:"pusher"`
	Sender     GiteaAccount    `json:"sender"`
}
|
||||
|
||||
func nomadToGiteaStatus(summary *nomad.TaskGroupSummary) gitea.StatusState {
|
||||
if summary.Failed > 0 {
|
||||
return gitea.StatusError
|
||||
}
|
||||
if summary.Lost > 0 || summary.Unknown > 0 {
|
||||
return gitea.StatusFailure
|
||||
}
|
||||
if summary.Queued > 0 || summary.Starting > 0 || summary.Running > 0 {
|
||||
return gitea.StatusPending
|
||||
}
|
||||
if summary.Complete > 0 {
|
||||
return gitea.StatusSuccess
|
||||
}
|
||||
// When the job is just started, all the counters are = 0.
|
||||
return gitea.StatusPending
|
||||
}
|
||||
|
||||
func notifSummary(notification *GiteaNotification) string {
|
||||
return fmt.Sprintf("%s/%s:%s", notification.Repository.Owner.Username, notification.Repository.Name, notification.After)
|
||||
}
|
189
pkg/handler.go
Normal file
189
pkg/handler.go
Normal file
|
@ -0,0 +1,189 @@
|
|||
package pkg
|
||||
|
||||
import (
|
||||
b64 "encoding/base64"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net/http"
|
||||
"golang.org/x/exp/slices"
|
||||
"strings"
|
||||
|
||||
nomad "github.com/hashicorp/nomad/api"
|
||||
)
|
||||
|
||||
// HookHandler serves POST /hook: it receives Gitea push notifications and
// turns them into dispatched Nomad build jobs.
type HookHandler struct {
	Config  *Config
	Cluster *Cluster
}
|
||||
|
||||
// BuildHandler serves GET /build: it streams the logs of a previously
// dispatched build job back to the client.
type BuildHandler struct {
	Config  *Config
	Cluster *Cluster
}
|
||||
|
||||
func (h HookHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodPost {
|
||||
http.Error(w, "Hook only support POST requests", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
q := r.URL.Query()
|
||||
maybeToken, ok := q["token"]
|
||||
if !ok || len(maybeToken) < 1 {
|
||||
http.Error(w, "Missing query parameter 'token'. Try adding '?token=xxx'", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
token := maybeToken[0]
|
||||
flavor := "default"
|
||||
|
||||
var notification GiteaNotification
|
||||
dec := json.NewDecoder(r.Body)
|
||||
if err := dec.Decode(¬ification); err != nil {
|
||||
http.Error(w, "Can't parse your request JSON", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
notifInfo := notifSummary(¬ification)
|
||||
|
||||
log.Printf("Received gitea notification for %s\n", notifInfo)
|
||||
|
||||
// Fetch our repo descriptor
|
||||
kv := h.Cluster.Consul.KV()
|
||||
encodedRepoUrl := b64.StdEncoding.EncodeToString([]byte(notification.Repository.CloneUrl))
|
||||
key := "albatros/" + encodedRepoUrl
|
||||
log.Printf("Fetching key %s for %s\n", key, notifInfo)
|
||||
pair, _, err := kv.Get(key, nil)
|
||||
if err != nil {
|
||||
http.Error(w, "Can't fetch the repo descriptor in Consul", http.StatusInternalServerError)
|
||||
log.Printf("Job error for %s, can't fetch repo descriptor in consul: %+v\n", notifInfo, err)
|
||||
return
|
||||
}
|
||||
if pair == nil || pair.Value == nil {
|
||||
http.Error(w, "You must declare %s in Consul in order to build it", http.StatusForbidden)
|
||||
return
|
||||
}
|
||||
// Parse our repo descriptor
|
||||
var repoDesc ConsulSecret
|
||||
if err = json.Unmarshal(pair.Value, &repoDesc); err != nil {
|
||||
http.Error(w, "Can't decode your Consul configuration for this repo", http.StatusInternalServerError)
|
||||
log.Printf("Job error for %s, can't unmarshal the Consul descriptor: %+v\n", notifInfo, err)
|
||||
return
|
||||
}
|
||||
// Check token
|
||||
if repoDesc.Hook.Token != token {
|
||||
http.Error(w, "Your albatros token does not match the one declared in Consul", http.StatusForbidden)
|
||||
return
|
||||
}
|
||||
|
||||
// Build job parameters for Nomad
|
||||
meta := map[string]string{
|
||||
"REPO_URL": notification.Repository.CloneUrl,
|
||||
"COMMIT": notification.After,
|
||||
"FLAVOR": flavor,
|
||||
// @FIXME: this code is not correct, this is a hack
|
||||
"BRANCH": strings.ReplaceAll(notification.Ref, "refs/heads/", ""),
|
||||
}
|
||||
|
||||
// Check sender
|
||||
payload := []byte{}
|
||||
if slices.Contains(repoDesc.Trusted.Senders, notification.Sender.Username) {
|
||||
log.Printf("Trusted build of %s as %s in the list of allowed senders, inject secrets\n", notifInfo, notification.Sender.Username)
|
||||
// Write payload
|
||||
payload = []byte(repoDesc.Inject)
|
||||
}
|
||||
|
||||
jobs := h.Cluster.Nomad.Jobs()
|
||||
dres, _, err := jobs.Dispatch("builder", meta, payload, "albatros", &nomad.WriteOptions{})
|
||||
if err != nil {
|
||||
http.Error(w, "Can't submit your job to Nomad", http.StatusInternalServerError)
|
||||
}
|
||||
log.Printf("Created job %s for %s\n", dres.DispatchedJobID, notifInfo)
|
||||
|
||||
// Start a lifecycle observer to update gitea status
|
||||
j := Job {
|
||||
Cluster: h.Cluster,
|
||||
Config: h.Config,
|
||||
Notification: ¬ification,
|
||||
Dispatch: dres,
|
||||
Creds: &repoDesc.Gitea,
|
||||
Flavor: flavor,
|
||||
}
|
||||
go j.follow()
|
||||
|
||||
io.WriteString(w, dres.DispatchedJobID)
|
||||
}
|
||||
|
||||
func (b BuildHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
q := r.URL.Query()
|
||||
jobID, ok := q["job"]
|
||||
if !ok || len(jobID) < 1 {
|
||||
http.Error(w, "Missing query parameter 'job'. Try adding '?job=xxx'", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
logType, ok := q["log"]
|
||||
log.Printf("Follow logs for %s\n", jobID)
|
||||
if !ok || len(logType) < 1 || !(logType[0] == "stdout" || logType[0] == "stderr") {
|
||||
http.Error(w, "Missing or wrong query parameter 'log'.\nTry adding '?log=stdout' or '?log=stderr'", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
logFilter := logType[0]
|
||||
|
||||
jobs := b.Cluster.Nomad.Jobs()
|
||||
allocList, _, err := jobs.Allocations(jobID[0], true, &nomad.QueryOptions{})
|
||||
if err != nil {
|
||||
http.Error(w, "Unable to fetch your job on Nomad. Might be garbage collected.", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
if len(allocList) < 1 {
|
||||
http.Error(w, "Job does not contain at least 1 allocation, can't display logs. Job might be garbage collected. ", http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
myAllocStub := allocList[0]
|
||||
|
||||
allocations := b.Cluster.Nomad.Allocations()
|
||||
myAlloc, _, err := allocations.Info(myAllocStub.ID, &nomad.QueryOptions{})
|
||||
if err != nil {
|
||||
http.Error(w, "Allocation does not exist anymore. Allocation might be garbage collected", http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
log.Printf("Selected alloc %s for job %s\n", myAlloc.ID, jobID)
|
||||
|
||||
allocFS := b.Cluster.Nomad.AllocFS()
|
||||
scancel := make(chan struct{})
|
||||
sframe, serr := allocFS.Logs(myAlloc, true, "executor", logFilter, "start", 0, scancel, &nomad.QueryOptions{})
|
||||
|
||||
// stream logs to client's browser
|
||||
io.WriteString(w, fmt.Sprintf("--- Streaming logs for %s, job can take some times to start, be patient ---\n", jobID))
|
||||
build_loop:
|
||||
for {
|
||||
select {
|
||||
case <-r.Context().Done():
|
||||
// client disconnect, cleaning
|
||||
break build_loop
|
||||
case nomadErr := <-serr:
|
||||
// an error occured in nomad, inform user and clean
|
||||
_, _ = io.WriteString(w, fmt.Sprintf("\n--- Broken stream: %+v ---\n", nomadErr))
|
||||
break build_loop
|
||||
case chunk := <-sframe:
|
||||
// we get some data from nomad, send it to the client
|
||||
for i := 0; i < len(chunk.Data); {
|
||||
written, err := w.Write(chunk.Data[i:])
|
||||
w.(http.Flusher).Flush() // flush the buffer to send it right now
|
||||
i += written
|
||||
if err == io.EOF {
|
||||
_, _ = io.WriteString(w, "\n--- End of file :-) ---\n")
|
||||
break build_loop
|
||||
} else if err != nil {
|
||||
_, _ = io.WriteString(w, fmt.Sprintf("\n--- Broken stream: %+v ---\n", err))
|
||||
break build_loop
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
log.Printf("Connection closed, cleaning listeners for %s\n", jobID)
|
||||
scancel <- struct{}{}
|
||||
}
|
84
pkg/job.go
Normal file
84
pkg/job.go
Normal file
|
@ -0,0 +1,84 @@
|
|||
package pkg
|
||||
|
||||
import (
|
||||
"log"
|
||||
"code.gitea.io/sdk/gitea"
|
||||
nomad "github.com/hashicorp/nomad/api"
|
||||
)
|
||||
|
||||
// Job carries everything the lifecycle follower needs to track one
// dispatched build: cluster clients, global config, the originating
// webhook, the Nomad dispatch response, and the Gitea credentials used
// to push commit statuses.
type Job struct {
	Cluster      *Cluster
	Config       *Config
	Notification *GiteaNotification
	Dispatch     *nomad.JobDispatchResponse
	// Creds holds the Gitea URL/token for status updates on this repo.
	Creds  *SecretGitea
	Flavor string
}
|
||||
|
||||
// follow tracks the lifecycle of a dispatched Nomad build job and mirrors
// its state onto the Gitea commit status of the triggering commit. It is
// meant to run in its own goroutine and returns when the job reaches a
// terminal state, on a fetch error, or after a bounded number of refreshes.
func (j *Job) follow() {
	notifInfo := notifSummary(j.Notification)

	log.Printf("[lifecycle] Commit to build: %s, Gitea URL: %s\n", notifInfo, j.Creds.Url)
	// init Gitea
	forge, err := gitea.NewClient(j.Creds.Url, gitea.SetToken(j.Creds.Token))
	if err != nil {
		log.Printf("Unable to create gitea client for %s: %+v\n", notifInfo, err)
		return
	}

	// get job's deployment
	jobs := j.Cluster.Nomad.Jobs()
	queryOpt := nomad.QueryOptions{
		AllowStale: false,
	}

	// Upper bound on refreshes so a stuck job cannot pin this goroutine
	// forever (see the message below when it trips).
	safeguard := 1000
	for ; safeguard > 0; safeguard-- {
		// Blocking fetch on deployment info
		job, meta, err := jobs.Summary(j.Dispatch.DispatchedJobID, &queryOpt)
		if err != nil {
			log.Printf("[lifecycle] can't fetch job for %s: %+v\n", notifInfo, err)
			break
		}
		// Long-poll: recording LastIndex makes the next Summary call block
		// until the job changes past this point.
		queryOpt.WaitIndex = meta.LastIndex

		// Builds are expected to define their work in a "runner" task group.
		summary, ok := job.Summary["runner"]
		if !ok {
			log.Printf("[lifecycle] Job %s for %s must contain a 'runner' task\n", job.JobID, notifInfo)
			break
		}
		log.Printf("[lifecycle] Task status for job %s on %s: %+v\n", job.JobID, notifInfo, summary)

		// Compute new job state
		state := nomadToGiteaStatus(&summary)

		// Try updating Gitea commit status; the target URL points back at
		// our own /build log-streaming endpoint.
		_, _, err = forge.CreateStatus(
			j.Notification.Repository.Owner.Username,
			j.Notification.Repository.Name,
			j.Notification.After,
			gitea.CreateStatusOption{
				State:       state,
				TargetURL:   j.Config.AlbatrosURL + "/build?log=stderr&job=" + j.Dispatch.DispatchedJobID,
				Description: j.Flavor,
				Context:     "Albatros",
			})

		if err != nil {
			// Status-push failures are logged but not fatal: keep polling.
			log.Printf("[lifecycle] can't update gitea repo %s for job %s: %+v\n", notifInfo, job.JobID, err)
		}

		// Continue the loop only if the job is pending
		if state != gitea.StatusPending {
			log.Printf("Job %s for %s terminated with status %s\n", job.JobID, notifInfo, state)
			break
		}
	}

	if safeguard == 0 {
		// To avoid dangerous infinite loops, we put an upperbound here
		// of 1k refresh here. Reaching this limit will allow us to know
		// that something did not work as expected...
		log.Printf("!!! [lifecycle] we refreshed 1k times the job of %s and it's still running, giving up...\n", notifInfo)
	}
}
|
21
pkg/secret.go
Normal file
21
pkg/secret.go
Normal file
|
@ -0,0 +1,21 @@
|
|||
package pkg
|
||||
|
||||
// SecretHook holds the shared token that authenticates incoming /hook calls
// for one repository.
type SecretHook struct {
	Token string `json:"token"`
}

// SecretGitea carries the Gitea endpoint and API token used to push commit
// statuses back to the forge.
type SecretGitea struct {
	Url   string `json:"url"`
	Token string `json:"token"`
}

// SecretTrusted lists the Gitea usernames whose pushes are allowed to
// receive injected secrets in their build payload.
type SecretTrusted struct {
	Senders []string `json:"senders"`
}

// ConsulSecret is the per-repository descriptor stored in Consul under
// "albatros/<base64(clone_url)>"; Inject is the raw secret material passed
// as the dispatch payload for trusted senders.
type ConsulSecret struct {
	Hook    SecretHook    `json:"hook"`
	Gitea   SecretGitea   `json:"gitea"`
	Trusted SecretTrusted `json:"trusted"`
	Inject  string        `json:"inject"`
}
|
Loading…
Reference in a new issue