albatros/main.go

381 lines
11 KiB
Go
Raw Normal View History

2023-03-14 17:51:31 +00:00
package main
2023-03-15 18:59:20 +00:00
2023-03-14 17:51:31 +00:00
import (
2023-03-15 18:59:20 +00:00
"code.gitea.io/sdk/gitea"
b64 "encoding/base64"
2023-03-15 10:37:01 +00:00
"encoding/json"
"fmt"
2023-03-15 18:59:20 +00:00
"github.com/caarlos0/env/v7"
consul "github.com/hashicorp/consul/api"
2023-03-15 10:37:01 +00:00
nomad "github.com/hashicorp/nomad/api"
"io"
"log"
"net/http"
"strings"
2023-03-14 17:51:31 +00:00
)
// GitUser describes the author or the committer of a commit as found in a
// Gitea push notification (see GiteaCommit).
//
// All three fields are mapped to lowercase JSON keys. The Username tag was
// previously malformed (missing opening quote), which made the tag invalid
// and caused the field to be marshaled under the key "Username".
type GitUser struct {
	Name     string `json:"name"`
	Email    string `json:"email"`
	Username string `json:"username"`
}
type GiteaCommit struct {
2023-03-15 10:37:01 +00:00
Id string `json:"id"`
Message string `json:"message"`
Url string `json:"string"`
Author GitUser `json:"author"`
Committer GitUser `json:"committer"`
Timestamp string `json:"timestamp"`
2023-03-14 17:51:31 +00:00
}
// GiteaAccount describes a Gitea account as embedded in a push notification:
// it is used for the repository owner, the pusher and the sender.
type GiteaAccount struct {
	Id        int64  `json:"id"`
	Login     string `json:"login"`
	FullName  string `json:"full_name"`
	Email     string `json:"email"`
	AvatarUrl string `json:"avatar_url"`
	Username  string `json:"username"`
}
type GiteaRepository struct {
2023-03-15 10:37:01 +00:00
Id int64 `json:"id"`
Owner GiteaAccount `json:"owner"`
2023-03-15 18:59:20 +00:00
Name string `json:"name"`
FullName string `json:"full_name"`
2023-03-15 10:37:01 +00:00
Description string `json:"description"`
Private bool `json:"private"`
Fork bool `json:"private"`
HtmlUrl string `json:"html_url"`
SshUrl string `json:"ssh_url"`
CloneUrl string `json:"clone_url"`
Website string `json:"website"`
StarsCount int64 `json:"stars_count"`
ForksCount int64 `json:"forks_count"`
WatchersCount int64 `json:"watchers_count"`
OpenIssuesCount int64 `json:"open_issues_count"`
DefaultBranch string `json:"default_branch"`
CreatedAt string `json:"created_at"`
UpdatedAt string `json:"updated_at"`
2023-03-14 17:51:31 +00:00
}
type GiteaNotification struct {
2023-03-15 10:37:01 +00:00
Secret string `json:"secret"`
Ref string `json:"ref"`
Before string `json:"before"`
After string `json:"after"`
CompareUrl string `json:"compare_url"`
Commits []GiteaCommit `json:"commits"`
Repository GiteaRepository `json:"repository"`
Pusher GiteaAccount `json:"pusher"`
Sender GiteaAccount `json:"sender"`
2023-03-14 17:51:31 +00:00
}
2023-03-15 14:34:52 +00:00
// SecretHook is the "hook" section of a repository descriptor stored in
// Consul. Token is the value the hook handler compares against the `token`
// query parameter of an incoming webhook request.
type SecretHook struct {
	Token string `json:"token"`
}
// SecretGitea is the "gitea" section of a repository descriptor stored in
// Consul. Url and Token are the parameters used to create the Gitea API
// client that publishes commit statuses.
type SecretGitea struct {
	Url   string `json:"url"`
	Token string `json:"token"`
}
// SecretTrusted is the "trusted" section of a repository descriptor stored in
// Consul. Senders is a list of strings decoded from the "senders" key.
//
// NOTE(review): presumably the accounts allowed to receive injected secrets;
// nothing in this file reads it yet — confirm against the intended design.
type SecretTrusted struct {
	Senders []string `json:"senders"`
}
type ConsulSecret struct {
2023-03-15 18:59:20 +00:00
Hook SecretHook `json:"hook"`
Gitea SecretGitea `json:"gitea"`
Trusted SecretTrusted `json:"trusted"`
Inject string `json:"inject"`
2023-03-15 14:34:52 +00:00
}
func nomadToGiteaStatus(summary *nomad.TaskGroupSummary) gitea.StatusState {
2023-03-15 18:59:20 +00:00
if summary.Failed > 0 {
return gitea.StatusError
}
if summary.Lost > 0 || summary.Unknown > 0 {
return gitea.StatusFailure
}
if summary.Queued > 0 || summary.Starting > 0 || summary.Running > 0 {
return gitea.StatusPending
}
if summary.Complete > 0 {
return gitea.StatusSuccess
}
// When the job is just started, all the counters are = 0.
return gitea.StatusPending
}
func notifSummary(notification *GiteaNotification) string {
return fmt.Sprintf("%s/%s:%s", notification.Repository.Owner.Username, notification.Repository.Name, notification.After)
2023-03-15 14:34:52 +00:00
}
func lifecycle(notification *GiteaNotification, dispatch *nomad.JobDispatchResponse, giteaCreds *SecretGitea) {
2023-03-15 18:59:20 +00:00
notifInfo := notifSummary(notification)
log.Printf("[lifecyle] Commit to build: %s, Gitea URL: %s\n", notifInfo, giteaCreds.Url)
// init Gitea
forge, err := gitea.NewClient(giteaCreds.Url, gitea.SetToken(giteaCreds.Token))
if err != nil {
log.Printf("Unable to create gitea client for %s: %+v\n", notifInfo, err)
return
}
// get job's deployment
jobs := NomadClient.Jobs()
queryOpt := nomad.QueryOptions{
AllowStale: true,
}
safeguard := 1000
for ; safeguard > 0; safeguard-- {
// Blocking fetch on deployment info
job, meta, err := jobs.Summary(dispatch.DispatchedJobID, &queryOpt)
if err != nil {
log.Printf("[lifecycle] can't fetch job for %s: %+v\n", notifInfo, err)
break
}
queryOpt.WaitIndex = meta.LastIndex
summary, ok := job.Summary["runner"]
if !ok {
log.Printf("[lifecycle] Job %s for %s must contain a 'runner' task\n", job.JobID, notifInfo)
break
}
log.Printf("[lifecycle] Task status for job %s on %s: %+v\n", job.JobID, notifInfo, summary)
// Compute new job state
state := nomadToGiteaStatus(&summary)
// Try updating Gitea commit status
_, _, err = forge.CreateStatus(
notification.Repository.Owner.Username,
notification.Repository.Name,
notification.After,
gitea.CreateStatusOption{
State: state,
TargetURL: GlobalConfig.AlbatrosURL + "/build?log=stderr&job=" + dispatch.DispatchedJobID,
Description: "build",
Context: "Albatros",
})
if err != nil {
log.Printf("[lifecycle] can't update gitea repo %s for job %s: %+v\n", notifInfo, job.JobID, err)
}
// Continue the loop only if the job is pending
if state != gitea.StatusPending {
log.Printf("Job %s for %s terminated with status %s\n", job.JobID, notifInfo, state)
break
}
}
if safeguard == 0 {
// To avoid dangerous infinite loops, we put an upperbound here
// of 1k refresh here. Reaching this limit will allow us to know
// that something did not work as expected...
log.Printf("!!! [lifecycle] we refreshed 1k times the job of %s and it's still running, giving up...\n", notifInfo)
}
2023-03-15 14:34:52 +00:00
}
2023-03-14 17:51:31 +00:00
func hook(w http.ResponseWriter, r *http.Request) {
2023-03-15 10:37:01 +00:00
if r.Method != http.MethodPost {
http.Error(w, "Hook only support POST requests", http.StatusBadRequest)
}
q := r.URL.Query()
2023-03-15 14:34:52 +00:00
maybeToken, ok := q["token"]
if !ok || len(maybeToken) < 1 {
2023-03-15 10:37:01 +00:00
http.Error(w, "Missing query parameter 'token'. Try adding '?token=xxx'", http.StatusBadRequest)
return
}
2023-03-15 18:59:20 +00:00
token := maybeToken[0]
2023-03-15 10:37:01 +00:00
flavor := "default"
var notification GiteaNotification
dec := json.NewDecoder(r.Body)
if err := dec.Decode(&notification); err != nil {
http.Error(w, "Can't parse your request JSON", http.StatusBadRequest)
return
}
2023-03-15 18:59:20 +00:00
notifInfo := notifSummary(&notification)
log.Printf("Received gitea notification for %s\n", notifInfo)
// Fetch our repo descriptor
kv := ConsulClient.KV()
encodedRepoUrl := b64.StdEncoding.EncodeToString([]byte(notification.Repository.CloneUrl))
key := "albatros/" + encodedRepoUrl
log.Printf("Fetching key %s for %s\n", key, notifInfo)
pair, _, err := kv.Get(key, nil)
if err != nil {
http.Error(w, "Can't fetch the repo descriptor in Consul", http.StatusInternalServerError)
return
}
if pair == nil || pair.Value == nil {
http.Error(w, "You must declare %s in Consul in order to build it", http.StatusForbidden)
return
}
// Parse our repo descriptor
var repoDesc ConsulSecret
if err = json.Unmarshal(pair.Value, &repoDesc); err != nil {
http.Error(w, "Can't decode your Consul configuration for this repo", http.StatusInternalServerError)
return
}
// Check token
if repoDesc.Hook.Token != token {
http.Error(w, "Your albatros token does not match the one declared in Consul", http.StatusForbidden)
return
}
2023-03-15 10:37:01 +00:00
2023-03-15 18:59:20 +00:00
// Build job parameters for Nomad
2023-03-15 10:37:01 +00:00
meta := map[string]string{
"REPO_URL": notification.Repository.CloneUrl,
"COMMIT": notification.After,
"FLAVOR": flavor,
// @FIXME: this code is not correct, this is a hack
"BRANCH": strings.ReplaceAll(notification.Ref, "refs/heads/", ""),
}
// @FIXME logic on how to inject secrets securely
// 1. Check senders
// 2. Transform the consul object into a nomad payload
jobs := NomadClient.Jobs()
2023-03-15 18:59:20 +00:00
dres, _, err := jobs.Dispatch("builder", meta, []byte{}, "albatros", &nomad.WriteOptions{})
2023-03-15 10:37:01 +00:00
if err != nil {
http.Error(w, "Can't submit your job to Nomad", http.StatusInternalServerError)
}
2023-03-15 18:59:20 +00:00
log.Printf("Created job %s for %s\n", dres.DispatchedJobID, notifInfo)
2023-03-15 10:37:01 +00:00
2023-03-15 18:59:20 +00:00
// Start a lifecycle observer to update gitea status
// @FIXME: need to inject gitea descriptor
go lifecycle(&notification, dres, &repoDesc.Gitea)
2023-03-15 14:34:52 +00:00
2023-03-15 10:37:01 +00:00
io.WriteString(w, dres.DispatchedJobID)
2023-03-14 17:51:31 +00:00
}
func build(w http.ResponseWriter, r *http.Request) {
2023-03-15 10:37:01 +00:00
q := r.URL.Query()
jobID, ok := q["job"]
if !ok || len(jobID) < 1 {
http.Error(w, "Missing query parameter 'job'. Try adding '?job=xxx'", http.StatusBadRequest)
return
}
logType, ok := q["log"]
2023-03-15 18:59:20 +00:00
log.Printf("Follow logs for %s\n", jobID)
2023-03-15 10:37:01 +00:00
if !ok || len(logType) < 1 || !(logType[0] == "stdout" || logType[0] == "stderr") {
http.Error(w, "Missing or wrong query parameter 'log'.\nTry adding '?log=stdout' or '?log=stderr'", http.StatusBadRequest)
return
}
logFilter := logType[0]
jobs := NomadClient.Jobs()
allocList, _, err := jobs.Allocations(jobID[0], true, &nomad.QueryOptions{})
if err != nil {
http.Error(w, "Unable to fetch your job on Nomad. Might be garbage collected.", http.StatusInternalServerError)
return
}
if len(allocList) < 1 {
http.Error(w, "Job does not contain at least 1 allocation, can't display logs. Job might be garbage collected. ", http.StatusNotFound)
return
}
myAllocStub := allocList[0]
allocations := NomadClient.Allocations()
myAlloc, _, err := allocations.Info(myAllocStub.ID, &nomad.QueryOptions{})
if err != nil {
http.Error(w, "Allocation does not exist anymore. Allocation might be garbage collected", http.StatusNotFound)
return
}
2023-03-15 18:59:20 +00:00
log.Printf("Selected alloc %s for job %s\n", myAlloc.ID, jobID)
2023-03-15 10:37:01 +00:00
allocFS := NomadClient.AllocFS()
scancel := make(chan struct{})
sframe, serr := allocFS.Logs(myAlloc, true, "runner", logFilter, "start", 0, scancel, &nomad.QueryOptions{})
2023-03-15 18:59:20 +00:00
// stream logs to client's browser
build_loop:
2023-03-15 10:37:01 +00:00
for {
select {
2023-03-15 18:59:20 +00:00
case <-r.Context().Done():
// client disconnect, cleaning
break build_loop
2023-03-15 10:37:01 +00:00
case nomadErr := <-serr:
// an error occured in nomad, inform user and clean
_, _ = io.WriteString(w, fmt.Sprintf("\nBroken stream: %+v\n", nomadErr))
break build_loop
case chunk := <-sframe:
2023-03-15 18:59:20 +00:00
// we get some data from nomad, send it to the client
2023-03-15 10:37:01 +00:00
for i := 0; i < len(chunk.Data); {
written, err := w.Write(chunk.Data[i:])
2023-03-15 18:59:20 +00:00
w.(http.Flusher).Flush() // flush the buffer to send it right now
2023-03-15 10:37:01 +00:00
i += written
if err == io.EOF {
_, _ = io.WriteString(w, "End of file :-)")
break build_loop
} else if err != nil {
_, _ = io.WriteString(w, fmt.Sprintf("\nBroken stream: %+v\n", err))
break build_loop
}
}
}
}
2023-03-15 18:59:20 +00:00
log.Printf("Connection closed, cleaning listeners for %s\n", jobID)
scancel <- struct{}{}
2023-03-15 10:37:01 +00:00
}
2023-03-14 17:51:31 +00:00
2023-03-15 07:35:37 +00:00
var NomadClient *nomad.Client
2023-03-15 14:34:52 +00:00
var ConsulClient *consul.Client
// config is the set of settings read from the process environment.
//
// AlbatrosURL is read from the required ALBATROS_URL variable; it is used as
// the prefix of the "/build" links attached to Gitea commit statuses.
type config struct {
	AlbatrosURL string `env:"ALBATROS_URL,required"`
	// @TODO get nomad config from env
	// @TODO get consul config from env
}
2023-03-15 18:59:20 +00:00
2023-03-15 14:34:52 +00:00
// GlobalConfig holds the configuration parsed from the environment by main.
var GlobalConfig config
2023-03-15 07:35:37 +00:00
2023-03-14 17:51:31 +00:00
func main() {
2023-03-15 10:37:01 +00:00
var err error
2023-03-15 14:34:52 +00:00
2023-03-15 18:59:20 +00:00
// init config
if err = env.Parse(&GlobalConfig); err != nil {
log.Fatal(fmt.Sprintf("unable to parse config, error: %+v\n", err))
return
}
log.Printf("Albatros public URL: %s\n", GlobalConfig.AlbatrosURL)
2023-03-15 14:34:52 +00:00
2023-03-15 18:59:20 +00:00
// init nomad
2023-03-15 10:37:01 +00:00
nomadConfig := nomad.DefaultConfig()
nomadConfig.Namespace = "ci"
NomadClient, err = nomad.NewClient(nomadConfig)
if err != nil {
log.Fatal("Unable to connect to Nomad, check your config and setup")
2023-03-15 18:59:20 +00:00
return
2023-03-15 10:37:01 +00:00
}
2023-03-15 18:59:20 +00:00
// init consul
consulConfig := consul.DefaultConfig()
ConsulClient, err = consul.NewClient(consulConfig)
if err != nil {
2023-03-15 14:34:52 +00:00
log.Fatal("Unable to connect to Consul, check your config and setup")
2023-03-15 18:59:20 +00:00
return
}
2023-03-15 14:34:52 +00:00
2023-03-15 18:59:20 +00:00
// init webserver
2023-03-15 10:37:01 +00:00
http.HandleFunc("/hook", hook)
http.HandleFunc("/build", build)
2023-03-15 18:59:20 +00:00
// launch
2023-03-15 14:34:52 +00:00
log.Println("Listen on :8080")
2023-03-15 10:37:01 +00:00
if err = http.ListenAndServe(":8080", nil); err != nil {
log.Fatal("Can't start HTTP server")
}
2023-03-14 17:51:31 +00:00
}