fmt + logging
All checks were successful
Albatros build

This commit is contained in:
Quentin 2023-03-15 19:59:20 +01:00
parent 6cddfb9a1f
commit d5b2292474
Signed by: quentin
GPG key ID: E9602264D639FF68

309
main.go
View file

@ -1,16 +1,17 @@
package main package main
import ( import (
b64 "encoding/base64" "code.gitea.io/sdk/gitea"
b64 "encoding/base64"
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/caarlos0/env/v7"
consul "github.com/hashicorp/consul/api"
nomad "github.com/hashicorp/nomad/api" nomad "github.com/hashicorp/nomad/api"
consul "github.com/hashicorp/consul/api"
"io" "io"
"log" "log"
"net/http" "net/http"
"strings" "strings"
"github.com/caarlos0/env/v7"
"code.gitea.io/sdk/gitea"
) )
type GitUser struct { type GitUser struct {
@ -40,8 +41,8 @@ type GiteaAccount struct {
type GiteaRepository struct { type GiteaRepository struct {
Id int64 `json:"id"` Id int64 `json:"id"`
Owner GiteaAccount `json:"owner"` Owner GiteaAccount `json:"owner"`
Name string `json:"name"` Name string `json:"name"`
FullName string `json:"full_name"` FullName string `json:"full_name"`
Description string `json:"description"` Description string `json:"description"`
Private bool `json:"private"` Private bool `json:"private"`
Fork bool `json:"private"` Fork bool `json:"private"`
@ -71,111 +72,112 @@ type GiteaNotification struct {
} }
type SecretHook struct { type SecretHook struct {
Token string `json:"token"` Token string `json:"token"`
} }
type SecretGitea struct { type SecretGitea struct {
Url string `json:"url"` Url string `json:"url"`
Token string `json:"token"` Token string `json:"token"`
} }
type SecretTrusted struct { type SecretTrusted struct {
Senders []string `json:"senders"` Senders []string `json:"senders"`
} }
type ConsulSecret struct { type ConsulSecret struct {
Hook SecretHook `json:"hook"` Hook SecretHook `json:"hook"`
Gitea SecretGitea `json:"gitea"` Gitea SecretGitea `json:"gitea"`
Trusted SecretTrusted `json:"trusted"` Trusted SecretTrusted `json:"trusted"`
Inject string `json:"inject"` Inject string `json:"inject"`
} }
func nomadToGiteaStatus(summary *nomad.TaskGroupSummary) gitea.StatusState { func nomadToGiteaStatus(summary *nomad.TaskGroupSummary) gitea.StatusState {
if summary.Failed > 0 { if summary.Failed > 0 {
return gitea.StatusError return gitea.StatusError
} }
if summary.Lost > 0 || summary.Unknown > 0 { if summary.Lost > 0 || summary.Unknown > 0 {
return gitea.StatusFailure return gitea.StatusFailure
} }
if summary.Queued > 0 || summary.Starting > 0 || summary.Running > 0 { if summary.Queued > 0 || summary.Starting > 0 || summary.Running > 0 {
return gitea.StatusPending return gitea.StatusPending
} }
if summary.Complete > 0 { if summary.Complete > 0 {
return gitea.StatusSuccess return gitea.StatusSuccess
} }
// When the job is just started, all the counters are = 0. // When the job is just started, all the counters are = 0.
return gitea.StatusPending return gitea.StatusPending
}
func notifSummary(notification *GiteaNotification) string {
return fmt.Sprintf("%s/%s:%s", notification.Repository.Owner.Username, notification.Repository.Name, notification.After)
} }
func lifecycle(notification *GiteaNotification, dispatch *nomad.JobDispatchResponse, giteaCreds *SecretGitea) { func lifecycle(notification *GiteaNotification, dispatch *nomad.JobDispatchResponse, giteaCreds *SecretGitea) {
log.Printf("[lifecyle] Gitea URL: %s\n", giteaCreds.Url) notifInfo := notifSummary(notification)
// init Gitea
forge, err := gitea.NewClient(giteaCreds.Url, gitea.SetToken(giteaCreds.Token))
if err != nil {
log.Printf("Unable to create gitea client: %+v\n", err)
return
}
// get job's deployment log.Printf("[lifecyle] Commit to build: %s, Gitea URL: %s\n", notifInfo, giteaCreds.Url)
jobs := NomadClient.Jobs() // init Gitea
queryOpt := nomad.QueryOptions{ forge, err := gitea.NewClient(giteaCreds.Url, gitea.SetToken(giteaCreds.Token))
AllowStale: true, if err != nil {
} log.Printf("Unable to create gitea client for %s: %+v\n", notifInfo, err)
return
safeguard := 1000 }
for ; safeguard > 0; safeguard-- {
// Blocking fetch on deployment info
job, meta, err := jobs.Summary(dispatch.DispatchedJobID, &queryOpt)
if err != nil {
log.Printf("[lifecycle] can't fetch job: %+v\n", err)
break
}
queryOpt.WaitIndex = meta.LastIndex
summary, ok := job.Summary["runner"]; // get job's deployment
if !ok { jobs := NomadClient.Jobs()
log.Printf("[lifecycle] your job %s must contain a 'runner' task\n", job.JobID) queryOpt := nomad.QueryOptions{
break AllowStale: true,
} }
log.Printf("[lifecycle] Summary for job %s: %+v\n", job.JobID, summary)
// Compute new job state safeguard := 1000
state := nomadToGiteaStatus(&summary) for ; safeguard > 0; safeguard-- {
// Blocking fetch on deployment info
job, meta, err := jobs.Summary(dispatch.DispatchedJobID, &queryOpt)
if err != nil {
log.Printf("[lifecycle] can't fetch job for %s: %+v\n", notifInfo, err)
break
}
queryOpt.WaitIndex = meta.LastIndex
// Try updating Gitea commit status summary, ok := job.Summary["runner"]
_, _, err = forge.CreateStatus( if !ok {
notification.Repository.Owner.Username, log.Printf("[lifecycle] Job %s for %s must contain a 'runner' task\n", job.JobID, notifInfo)
notification.Repository.Name, break
notification.After, }
gitea.CreateStatusOption { log.Printf("[lifecycle] Task status for job %s on %s: %+v\n", job.JobID, notifInfo, summary)
State: state,
TargetURL: GlobalConfig.AlbatrosURL + "/build?log=stderr&job=" + dispatch.DispatchedJobID,
Description: "build",
Context: "Albatros",
})
if err != nil { // Compute new job state
log.Printf( state := nomadToGiteaStatus(&summary)
"[lifecycle] can't update gitea repo %s/%s:%s: %+v\n",
notification.Repository.Owner.Username,
notification.Repository.Name,
notification.After,
err)
}
// Continue the loop only if the job is pending // Try updating Gitea commit status
if state != gitea.StatusPending { _, _, err = forge.CreateStatus(
log.Printf("Job %s teminated with status %s\n", job.JobID, state) notification.Repository.Owner.Username,
break notification.Repository.Name,
} notification.After,
} gitea.CreateStatusOption{
State: state,
TargetURL: GlobalConfig.AlbatrosURL + "/build?log=stderr&job=" + dispatch.DispatchedJobID,
Description: "build",
Context: "Albatros",
})
if safeguard == 0 { if err != nil {
// To avoid dangerous infinite loops, we put an upperbound here log.Printf("[lifecycle] can't update gitea repo %s for job %s: %+v\n", notifInfo, job.JobID, err)
// of 1k refresh here. Reaching this limit will allow us to know }
// that something did not work as expected...
log.Println("!!! [lifecycle] we refreshed 1k times this deployment and it's still running, giving up...") // Continue the loop only if the job is pending
} if state != gitea.StatusPending {
log.Printf("Job %s for %s terminated with status %s\n", job.JobID, notifInfo, state)
break
}
}
if safeguard == 0 {
// To avoid dangerous infinite loops, we put an upperbound here
// of 1k refresh here. Reaching this limit will allow us to know
// that something did not work as expected...
log.Printf("!!! [lifecycle] we refreshed 1k times the job of %s and it's still running, giving up...\n", notifInfo)
}
} }
func hook(w http.ResponseWriter, r *http.Request) { func hook(w http.ResponseWriter, r *http.Request) {
@ -189,7 +191,7 @@ func hook(w http.ResponseWriter, r *http.Request) {
http.Error(w, "Missing query parameter 'token'. Try adding '?token=xxx'", http.StatusBadRequest) http.Error(w, "Missing query parameter 'token'. Try adding '?token=xxx'", http.StatusBadRequest)
return return
} }
token := maybeToken[0] token := maybeToken[0]
flavor := "default" flavor := "default"
var notification GiteaNotification var notification GiteaNotification
@ -198,36 +200,37 @@ func hook(w http.ResponseWriter, r *http.Request) {
http.Error(w, "Can't parse your request JSON", http.StatusBadRequest) http.Error(w, "Can't parse your request JSON", http.StatusBadRequest)
return return
} }
notifInfo := notifSummary(&notification)
log.Printf("Gitea notification: %+v\n", notification) log.Printf("Received gitea notification for %s\n", notifInfo)
// Fetch our repo descriptor // Fetch our repo descriptor
kv := ConsulClient.KV() kv := ConsulClient.KV()
encodedRepoUrl := b64.StdEncoding.EncodeToString([]byte(notification.Repository.CloneUrl)) encodedRepoUrl := b64.StdEncoding.EncodeToString([]byte(notification.Repository.CloneUrl))
key := "albatros/"+encodedRepoUrl key := "albatros/" + encodedRepoUrl
log.Printf("Fetching key %s\n", key) log.Printf("Fetching key %s for %s\n", key, notifInfo)
pair, _, err := kv.Get(key, nil) pair, _, err := kv.Get(key, nil)
if err != nil { if err != nil {
http.Error(w, "Can't fetch the repo descriptor in Consul", http.StatusInternalServerError) http.Error(w, "Can't fetch the repo descriptor in Consul", http.StatusInternalServerError)
return return
} }
if pair == nil || pair.Value == nil { if pair == nil || pair.Value == nil {
http.Error(w, "You must declare your repo in Consul in order to build it", http.StatusForbidden) http.Error(w, "You must declare %s in Consul in order to build it", http.StatusForbidden)
return return
} }
// Parse our repo descriptor // Parse our repo descriptor
var repoDesc ConsulSecret var repoDesc ConsulSecret
if err = json.Unmarshal(pair.Value, &repoDesc); err != nil { if err = json.Unmarshal(pair.Value, &repoDesc); err != nil {
http.Error(w, "Can't decode your Consul configuration for this repo", http.StatusInternalServerError) http.Error(w, "Can't decode your Consul configuration for this repo", http.StatusInternalServerError)
return return
} }
// Check token // Check token
if repoDesc.Hook.Token != token { if repoDesc.Hook.Token != token {
http.Error(w, "Your albatros token does not match the one declared in Consul", http.StatusForbidden) http.Error(w, "Your albatros token does not match the one declared in Consul", http.StatusForbidden)
return return
} }
// Build job parameters for Nomad // Build job parameters for Nomad
meta := map[string]string{ meta := map[string]string{
"REPO_URL": notification.Repository.CloneUrl, "REPO_URL": notification.Repository.CloneUrl,
"COMMIT": notification.After, "COMMIT": notification.After,
@ -241,16 +244,15 @@ func hook(w http.ResponseWriter, r *http.Request) {
// 2. Transform the consul object into a nomad payload // 2. Transform the consul object into a nomad payload
jobs := NomadClient.Jobs() jobs := NomadClient.Jobs()
dres, dmeta, err := jobs.Dispatch("builder", meta, []byte{}, "albatros", &nomad.WriteOptions{}) dres, _, err := jobs.Dispatch("builder", meta, []byte{}, "albatros", &nomad.WriteOptions{})
if err != nil { if err != nil {
http.Error(w, "Can't submit your job to Nomad", http.StatusInternalServerError) http.Error(w, "Can't submit your job to Nomad", http.StatusInternalServerError)
} }
log.Printf("Query info: %+v\n", dmeta) log.Printf("Created job %s for %s\n", dres.DispatchedJobID, notifInfo)
log.Printf("Job info: %+v\n", dres)
// Start a lifecycle observer to update gitea status // Start a lifecycle observer to update gitea status
// @FIXME: need to inject gitea descriptor // @FIXME: need to inject gitea descriptor
go lifecycle(&notification, dres, &repoDesc.Gitea) go lifecycle(&notification, dres, &repoDesc.Gitea)
io.WriteString(w, dres.DispatchedJobID) io.WriteString(w, dres.DispatchedJobID)
} }
@ -264,7 +266,7 @@ func build(w http.ResponseWriter, r *http.Request) {
} }
logType, ok := q["log"] logType, ok := q["log"]
log.Printf("%+v\n", q) log.Printf("Follow logs for %s\n", jobID)
if !ok || len(logType) < 1 || !(logType[0] == "stdout" || logType[0] == "stderr") { if !ok || len(logType) < 1 || !(logType[0] == "stdout" || logType[0] == "stderr") {
http.Error(w, "Missing or wrong query parameter 'log'.\nTry adding '?log=stdout' or '?log=stderr'", http.StatusBadRequest) http.Error(w, "Missing or wrong query parameter 'log'.\nTry adding '?log=stdout' or '?log=stderr'", http.StatusBadRequest)
return return
@ -290,28 +292,28 @@ func build(w http.ResponseWriter, r *http.Request) {
http.Error(w, "Allocation does not exist anymore. Allocation might be garbage collected", http.StatusNotFound) http.Error(w, "Allocation does not exist anymore. Allocation might be garbage collected", http.StatusNotFound)
return return
} }
log.Printf("Alloc: %+v\n", myAlloc) log.Printf("Selected alloc %s for job %s\n", myAlloc.ID, jobID)
allocFS := NomadClient.AllocFS() allocFS := NomadClient.AllocFS()
scancel := make(chan struct{}) scancel := make(chan struct{})
sframe, serr := allocFS.Logs(myAlloc, true, "runner", logFilter, "start", 0, scancel, &nomad.QueryOptions{}) sframe, serr := allocFS.Logs(myAlloc, true, "runner", logFilter, "start", 0, scancel, &nomad.QueryOptions{})
// stream logs to client's browser // stream logs to client's browser
build_loop: build_loop:
for { for {
select { select {
case <- r.Context().Done(): case <-r.Context().Done():
// client disconnect, cleaning // client disconnect, cleaning
break build_loop break build_loop
case nomadErr := <-serr: case nomadErr := <-serr:
// an error occured in nomad, inform user and clean // an error occured in nomad, inform user and clean
_, _ = io.WriteString(w, fmt.Sprintf("\nBroken stream: %+v\n", nomadErr)) _, _ = io.WriteString(w, fmt.Sprintf("\nBroken stream: %+v\n", nomadErr))
break build_loop break build_loop
case chunk := <-sframe: case chunk := <-sframe:
// we get some data from nomad, send it to the client // we get some data from nomad, send it to the client
for i := 0; i < len(chunk.Data); { for i := 0; i < len(chunk.Data); {
written, err := w.Write(chunk.Data[i:]) written, err := w.Write(chunk.Data[i:])
w.(http.Flusher).Flush() // flush the buffer to send it right now w.(http.Flusher).Flush() // flush the buffer to send it right now
i += written i += written
if err == io.EOF { if err == io.EOF {
_, _ = io.WriteString(w, "End of file :-)") _, _ = io.WriteString(w, "End of file :-)")
@ -324,52 +326,53 @@ func build(w http.ResponseWriter, r *http.Request) {
} }
} }
log.Printf("Cleaning %+v\n", myAlloc) log.Printf("Connection closed, cleaning listeners for %s\n", jobID)
scancel <- struct{}{} scancel <- struct{}{}
} }
var NomadClient *nomad.Client var NomadClient *nomad.Client
var ConsulClient *consul.Client var ConsulClient *consul.Client
type config struct { type config struct {
AlbatrosURL string `env:"ALBATROS_URL,required"` AlbatrosURL string `env:"ALBATROS_URL,required"`
// @TODO get nomad config from env // @TODO get nomad config from env
// @TODO get consul config from env // @TODO get consul config from env
} }
var GlobalConfig config var GlobalConfig config
func main() { func main() {
var err error var err error
// init config // init config
if err = env.Parse(&GlobalConfig); err != nil { if err = env.Parse(&GlobalConfig); err != nil {
log.Fatal(fmt.Sprintf("unable to parse config, error: %+v\n", err)) log.Fatal(fmt.Sprintf("unable to parse config, error: %+v\n", err))
return return
} }
log.Printf("Albatros public URL: %s\n", GlobalConfig.AlbatrosURL) log.Printf("Albatros public URL: %s\n", GlobalConfig.AlbatrosURL)
// init nomad // init nomad
nomadConfig := nomad.DefaultConfig() nomadConfig := nomad.DefaultConfig()
nomadConfig.Namespace = "ci" nomadConfig.Namespace = "ci"
NomadClient, err = nomad.NewClient(nomadConfig) NomadClient, err = nomad.NewClient(nomadConfig)
if err != nil { if err != nil {
log.Fatal("Unable to connect to Nomad, check your config and setup") log.Fatal("Unable to connect to Nomad, check your config and setup")
return return
} }
// init consul // init consul
consulConfig := consul.DefaultConfig() consulConfig := consul.DefaultConfig()
ConsulClient, err = consul.NewClient(consulConfig) ConsulClient, err = consul.NewClient(consulConfig)
if err != nil { if err != nil {
log.Fatal("Unable to connect to Consul, check your config and setup") log.Fatal("Unable to connect to Consul, check your config and setup")
return return
} }
// init webserver // init webserver
http.HandleFunc("/hook", hook) http.HandleFunc("/hook", hook)
http.HandleFunc("/build", build) http.HandleFunc("/build", build)
// launch // launch
log.Println("Listen on :8080") log.Println("Listen on :8080")
if err = http.ListenAndServe(":8080", nil); err != nil { if err = http.ListenAndServe(":8080", nil); err != nil {
log.Fatal("Can't start HTTP server") log.Fatal("Can't start HTTP server")