From a0b05a6c2dfd04e0351822931011c514244a82f5 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Mon, 17 Apr 2023 11:25:26 +0200 Subject: [PATCH] refactor --- bin/alba.go | 0 bin/albatros.go | 23 + docker-compose.yml | 13 + .../albatros.hook.json | 0 consul/gitea.json => example/gitea.hook.json | 0 main.go | 421 ------------------ pkg/cluster.go | 46 ++ pkg/config.go | 40 ++ pkg/gitea.go | 86 ++++ pkg/handler.go | 189 ++++++++ pkg/job.go | 84 ++++ pkg/secret.go | 21 + 12 files changed, 502 insertions(+), 421 deletions(-) create mode 100644 bin/alba.go create mode 100644 bin/albatros.go create mode 100644 docker-compose.yml rename consul/albatros.json => example/albatros.hook.json (100%) rename consul/gitea.json => example/gitea.hook.json (100%) delete mode 100644 main.go create mode 100644 pkg/cluster.go create mode 100644 pkg/config.go create mode 100644 pkg/gitea.go create mode 100644 pkg/handler.go create mode 100644 pkg/job.go create mode 100644 pkg/secret.go diff --git a/bin/alba.go b/bin/alba.go new file mode 100644 index 0000000..e69de29 diff --git a/bin/albatros.go b/bin/albatros.go new file mode 100644 index 0000000..8e0c82c --- /dev/null +++ b/bin/albatros.go @@ -0,0 +1,23 @@ +package main + +import ( + "log" + "net/http" + + "git.deuxfleurs.fr/deuxfleurs/albatros/pkg" +) + +func main() { + config := pkg.BuildConfig() + cluster := pkg.NewCluster(&config) + + // init webserver + http.Handle("/hook", pkg.HookHandler{&config, &cluster}) + http.Handle("/build", pkg.BuildHandler{&config, &cluster}) + + // launch + log.Println("Albatros listen on :8080") + if err := http.ListenAndServe(":8080", nil); err != nil { + log.Fatal("Can't start HTTP server") + } +} diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..60c296a --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,13 @@ +version: '3.4' +services: + nomad: + image: nomad + command: nomad + consul: + image: consul + albatros: + image: 
dxflrs/albatros:4c8ea6cec9a0645825d2323465784ba3fe4ebf12 + ports: + - "8080:8080" + environment: + - ALBATROS_URL="http://localhost:8080" diff --git a/consul/albatros.json b/example/albatros.hook.json similarity index 100% rename from consul/albatros.json rename to example/albatros.hook.json diff --git a/consul/gitea.json b/example/gitea.hook.json similarity index 100% rename from consul/gitea.json rename to example/gitea.hook.json diff --git a/main.go b/main.go deleted file mode 100644 index 405c02f..0000000 --- a/main.go +++ /dev/null @@ -1,421 +0,0 @@ -package main - -import ( - "code.gitea.io/sdk/gitea" - b64 "encoding/base64" - "encoding/json" - "fmt" - "github.com/caarlos0/env/v7" - consul "github.com/hashicorp/consul/api" - nomad "github.com/hashicorp/nomad/api" - "golang.org/x/exp/slices" - "io" - "log" - "net/http" - "strings" -) - -/* Albatros CI */ - -type GitUser struct { - Name string `json:"name"` - Email string `json:"email"` - Username string `json:username"` -} - -type GiteaCommit struct { - Id string `json:"id"` - Message string `json:"message"` - Url string `json:"string"` - Author GitUser `json:"author"` - Committer GitUser `json:"committer"` - Timestamp string `json:"timestamp"` -} - -type GiteaAccount struct { - Id int64 `json:"id"` - Login string `json:"login"` - FullName string `json:"full_name"` - Email string `json:"email"` - AvatarUrl string `json:"avatar_url"` - Username string `json:"username"` -} - -type GiteaRepository struct { - Id int64 `json:"id"` - Owner GiteaAccount `json:"owner"` - Name string `json:"name"` - FullName string `json:"full_name"` - Description string `json:"description"` - Private bool `json:"private"` - Fork bool `json:"private"` - HtmlUrl string `json:"html_url"` - SshUrl string `json:"ssh_url"` - CloneUrl string `json:"clone_url"` - Website string `json:"website"` - StarsCount int64 `json:"stars_count"` - ForksCount int64 `json:"forks_count"` - WatchersCount int64 `json:"watchers_count"` - OpenIssuesCount int64 
`json:"open_issues_count"` - DefaultBranch string `json:"default_branch"` - CreatedAt string `json:"created_at"` - UpdatedAt string `json:"updated_at"` -} - -type GiteaNotification struct { - Secret string `json:"secret"` - Ref string `json:"ref"` - Before string `json:"before"` - After string `json:"after"` - CompareUrl string `json:"compare_url"` - Commits []GiteaCommit `json:"commits"` - Repository GiteaRepository `json:"repository"` - Pusher GiteaAccount `json:"pusher"` - Sender GiteaAccount `json:"sender"` -} - -type SecretHook struct { - Token string `json:"token"` -} - -type SecretGitea struct { - Url string `json:"url"` - Token string `json:"token"` -} - -type SecretTrusted struct { - Senders []string `json:"senders"` -} - -type ConsulSecret struct { - Hook SecretHook `json:"hook"` - Gitea SecretGitea `json:"gitea"` - Trusted SecretTrusted `json:"trusted"` - Inject string `json:"inject"` -} - -func nomadToGiteaStatus(summary *nomad.TaskGroupSummary) gitea.StatusState { - if summary.Failed > 0 { - return gitea.StatusError - } - if summary.Lost > 0 || summary.Unknown > 0 { - return gitea.StatusFailure - } - if summary.Queued > 0 || summary.Starting > 0 || summary.Running > 0 { - return gitea.StatusPending - } - if summary.Complete > 0 { - return gitea.StatusSuccess - } - // When the job is just started, all the counters are = 0. 
- return gitea.StatusPending -} - -func notifSummary(notification *GiteaNotification) string { - return fmt.Sprintf("%s/%s:%s", notification.Repository.Owner.Username, notification.Repository.Name, notification.After) -} - -func lifecycle(notification *GiteaNotification, dispatch *nomad.JobDispatchResponse, giteaCreds *SecretGitea, flavor string) { - notifInfo := notifSummary(notification) - - log.Printf("[lifecycle] Commit to build: %s, Gitea URL: %s\n", notifInfo, giteaCreds.Url) - // init Gitea - forge, err := gitea.NewClient(giteaCreds.Url, gitea.SetToken(giteaCreds.Token)) - if err != nil { - log.Printf("Unable to create gitea client for %s: %+v\n", notifInfo, err) - return - } - - // get job's deployment - jobs := NomadClient.Jobs() - queryOpt := nomad.QueryOptions{ - AllowStale: false, - } - - safeguard := 1000 - for ; safeguard > 0; safeguard-- { - // Blocking fetch on deployment info - job, meta, err := jobs.Summary(dispatch.DispatchedJobID, &queryOpt) - if err != nil { - log.Printf("[lifecycle] can't fetch job for %s: %+v\n", notifInfo, err) - break - } - queryOpt.WaitIndex = meta.LastIndex - - summary, ok := job.Summary["runner"] - if !ok { - log.Printf("[lifecycle] Job %s for %s must contain a 'runner' task\n", job.JobID, notifInfo) - break - } - log.Printf("[lifecycle] Task status for job %s on %s: %+v\n", job.JobID, notifInfo, summary) - - // Compute new job state - state := nomadToGiteaStatus(&summary) - - // Try updating Gitea commit status - _, _, err = forge.CreateStatus( - notification.Repository.Owner.Username, - notification.Repository.Name, - notification.After, - gitea.CreateStatusOption{ - State: state, - TargetURL: GlobalConfig.AlbatrosURL + "/build?log=stderr&job=" + dispatch.DispatchedJobID, - Description: flavor, - Context: "Albatros", - }) - - if err != nil { - log.Printf("[lifecycle] can't update gitea repo %s for job %s: %+v\n", notifInfo, job.JobID, err) - } - - // Continue the loop only if the job is pending - if state != 
gitea.StatusPending { - log.Printf("Job %s for %s terminated with status %s\n", job.JobID, notifInfo, state) - break - } - } - - if safeguard == 0 { - // To avoid dangerous infinite loops, we put an upperbound here - // of 1k refresh here. Reaching this limit will allow us to know - // that something did not work as expected... - log.Printf("!!! [lifecycle] we refreshed 1k times the job of %s and it's still running, giving up...\n", notifInfo) - } -} - -func hook(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodPost { - http.Error(w, "Hook only support POST requests", http.StatusBadRequest) - } - - q := r.URL.Query() - maybeToken, ok := q["token"] - if !ok || len(maybeToken) < 1 { - http.Error(w, "Missing query parameter 'token'. Try adding '?token=xxx'", http.StatusBadRequest) - return - } - token := maybeToken[0] - flavor := "default" - - var notification GiteaNotification - dec := json.NewDecoder(r.Body) - if err := dec.Decode(¬ification); err != nil { - http.Error(w, "Can't parse your request JSON", http.StatusBadRequest) - return - } - notifInfo := notifSummary(¬ification) - - log.Printf("Received gitea notification for %s\n", notifInfo) - - // Fetch our repo descriptor - kv := ConsulClient.KV() - encodedRepoUrl := b64.StdEncoding.EncodeToString([]byte(notification.Repository.CloneUrl)) - key := "albatros/" + encodedRepoUrl - log.Printf("Fetching key %s for %s\n", key, notifInfo) - pair, _, err := kv.Get(key, nil) - if err != nil { - http.Error(w, "Can't fetch the repo descriptor in Consul", http.StatusInternalServerError) - log.Printf("Job error for %s, can't fetch repo descriptor in consul: %+v\n", notifInfo, err) - return - } - if pair == nil || pair.Value == nil { - http.Error(w, "You must declare %s in Consul in order to build it", http.StatusForbidden) - return - } - // Parse our repo descriptor - var repoDesc ConsulSecret - if err = json.Unmarshal(pair.Value, &repoDesc); err != nil { - http.Error(w, "Can't decode your Consul 
configuration for this repo", http.StatusInternalServerError) - log.Printf("Job error for %s, can't unmarshal the Consul descriptor: %+v\n", notifInfo, err) - return - } - // Check token - if repoDesc.Hook.Token != token { - http.Error(w, "Your albatros token does not match the one declared in Consul", http.StatusForbidden) - return - } - - // Build job parameters for Nomad - meta := map[string]string{ - "REPO_URL": notification.Repository.CloneUrl, - "COMMIT": notification.After, - "FLAVOR": flavor, - // @FIXME: this code is not correct, this is a hack - "BRANCH": strings.ReplaceAll(notification.Ref, "refs/heads/", ""), - } - - // Check sender - payload := []byte{} - if slices.Contains(repoDesc.Trusted.Senders, notification.Sender.Username) { - log.Printf("Trusted build of %s as %s in the list of allowed senders, inject secrets\n", notifInfo, notification.Sender.Username) - // Write payload - payload = []byte(repoDesc.Inject) - } - - jobs := NomadClient.Jobs() - dres, _, err := jobs.Dispatch("builder", meta, payload, "albatros", &nomad.WriteOptions{}) - if err != nil { - http.Error(w, "Can't submit your job to Nomad", http.StatusInternalServerError) - } - log.Printf("Created job %s for %s\n", dres.DispatchedJobID, notifInfo) - - // Start a lifecycle observer to update gitea status - go lifecycle(¬ification, dres, &repoDesc.Gitea, flavor) - - io.WriteString(w, dres.DispatchedJobID) -} - -func build(w http.ResponseWriter, r *http.Request) { - q := r.URL.Query() - jobID, ok := q["job"] - if !ok || len(jobID) < 1 { - http.Error(w, "Missing query parameter 'job'. 
Try adding '?job=xxx'", http.StatusBadRequest) - return - } - - logType, ok := q["log"] - log.Printf("Follow logs for %s\n", jobID) - if !ok || len(logType) < 1 || !(logType[0] == "stdout" || logType[0] == "stderr") { - http.Error(w, "Missing or wrong query parameter 'log'.\nTry adding '?log=stdout' or '?log=stderr'", http.StatusBadRequest) - return - } - logFilter := logType[0] - - jobs := NomadClient.Jobs() - allocList, _, err := jobs.Allocations(jobID[0], true, &nomad.QueryOptions{}) - if err != nil { - http.Error(w, "Unable to fetch your job on Nomad. Might be garbage collected.", http.StatusInternalServerError) - return - } - if len(allocList) < 1 { - http.Error(w, "Job does not contain at least 1 allocation, can't display logs. Job might be garbage collected. ", http.StatusNotFound) - return - } - - myAllocStub := allocList[0] - - allocations := NomadClient.Allocations() - myAlloc, _, err := allocations.Info(myAllocStub.ID, &nomad.QueryOptions{}) - if err != nil { - http.Error(w, "Allocation does not exist anymore. 
Allocation might be garbage collected", http.StatusNotFound) - return - } - log.Printf("Selected alloc %s for job %s\n", myAlloc.ID, jobID) - - allocFS := NomadClient.AllocFS() - scancel := make(chan struct{}) - sframe, serr := allocFS.Logs(myAlloc, true, "executor", logFilter, "start", 0, scancel, &nomad.QueryOptions{}) - - // stream logs to client's browser -build_loop: - for { - select { - case <-r.Context().Done(): - // client disconnect, cleaning - break build_loop - case nomadErr := <-serr: - // an error occured in nomad, inform user and clean - _, _ = io.WriteString(w, fmt.Sprintf("\nBroken stream: %+v\n", nomadErr)) - break build_loop - case chunk := <-sframe: - // we get some data from nomad, send it to the client - for i := 0; i < len(chunk.Data); { - written, err := w.Write(chunk.Data[i:]) - w.(http.Flusher).Flush() // flush the buffer to send it right now - i += written - if err == io.EOF { - _, _ = io.WriteString(w, "End of file :-)") - break build_loop - } else if err != nil { - _, _ = io.WriteString(w, fmt.Sprintf("\nBroken stream: %+v\n", err)) - break build_loop - } - } - } - } - - log.Printf("Connection closed, cleaning listeners for %s\n", jobID) - scancel <- struct{}{} -} - -var NomadClient *nomad.Client -var ConsulClient *consul.Client - -type config struct { - AlbatrosURL string `env:"ALBATROS_URL,required"` - NomadAddr string `env:"NOMAD_ADDR"` - NomadClientCert string `env:"NOMAD_CLIENT_CERT"` - NomadClientKey string `env:"NOMAD_CLIENT_KEY"` - NomadCACert string `env:"NOMAD_CACERT"` - ConsulAddr string `env:"CONSUL_HTTP_ADDR"` - ConsulClientCert string `env:"CONSUL_CLIENT_CERT"` - ConsulClientKey string `env:"CONSUL_CLIENT_KEY"` - ConsulCACert string `env:"CONSUL_CACERT"` -} - -var GlobalConfig config - -func initConfig() { - if err := env.Parse(&GlobalConfig); err != nil { - log.Fatal(fmt.Sprintf("unable to parse config, error: %+v\n", err)) - } - log.Printf("Albatros public URL: %s\n", GlobalConfig.AlbatrosURL) - if GlobalConfig.NomadAddr 
!= "" { - isTLS := GlobalConfig.NomadClientCert != "" && GlobalConfig.NomadClientKey != "" && GlobalConfig.NomadCACert != "" - log.Printf("Nomad URL: %s, TLS: %t\n", GlobalConfig.NomadAddr, isTLS) - } else { - log.Println("Use Nomad default configuration") - } - if GlobalConfig.ConsulAddr != "" { - isTLS := GlobalConfig.ConsulClientCert != "" && GlobalConfig.ConsulClientKey != "" && GlobalConfig.ConsulCACert != "" - log.Printf("Consul URL: %s, TLS: %t\n", GlobalConfig.ConsulAddr, isTLS) - } else { - log.Println("Use Consul default configuration") - } -} - -func initNomad() { - var err error - nomadConfig := nomad.DefaultConfig() - nomadConfig.Namespace = "ci" - nomadConfig.Address = GlobalConfig.NomadAddr - nomadConfig.TLSConfig.CACert = GlobalConfig.NomadCACert - nomadConfig.TLSConfig.ClientCert = GlobalConfig.NomadClientCert - nomadConfig.TLSConfig.ClientKey = GlobalConfig.NomadClientKey - NomadClient, err = nomad.NewClient(nomadConfig) - if err != nil { - log.Fatal("Unable to connect to Nomad, check your config and setup") - } -} - -func initConsul() { - var err error - consulConfig := consul.DefaultConfig() - consulConfig.Address = GlobalConfig.ConsulAddr - consulConfig.TLSConfig.CAFile = GlobalConfig.ConsulCACert - consulConfig.TLSConfig.CertFile = GlobalConfig.ConsulClientCert - consulConfig.TLSConfig.KeyFile = GlobalConfig.ConsulClientKey - ConsulClient, err = consul.NewClient(consulConfig) - if err != nil { - log.Fatal("Unable to connect to Consul, check your config and setup") - return - } -} - -func main() { - var err error - // init components. 
- initConfig() - initNomad() - initConsul() - - // init webserver - http.HandleFunc("/hook", hook) - http.HandleFunc("/build", build) - - // launch - log.Println("Albatros listen on :8080") - if err = http.ListenAndServe(":8080", nil); err != nil { - log.Fatal("Can't start HTTP server") - } -} diff --git a/pkg/cluster.go b/pkg/cluster.go new file mode 100644 index 0000000..aef1d8a --- /dev/null +++ b/pkg/cluster.go @@ -0,0 +1,46 @@ +package pkg + +import ( + "log" + consul "github.com/hashicorp/consul/api" + nomad "github.com/hashicorp/nomad/api" +) + + +type Cluster struct { + Nomad *nomad.Client + Consul *consul.Client +} + +func NewCluster(conf* Config) Cluster { + cluster := Cluster{} + + // Init Nomad + nomadConfig := nomad.DefaultConfig() + nomadConfig.Namespace = "ci" + nomadConfig.Address = conf.NomadAddr + nomadConfig.TLSConfig.CACert = conf.NomadCACert + nomadConfig.TLSConfig.ClientCert = conf.NomadClientCert + nomadConfig.TLSConfig.ClientKey = conf.NomadClientKey + + ncli, err := nomad.NewClient(nomadConfig) + if err != nil { + log.Fatal("Unable to connect to Nomad, check your config and setup") + } + cluster.Nomad = ncli + + // Init Consul + consulConfig := consul.DefaultConfig() + consulConfig.Address = conf.ConsulAddr + consulConfig.TLSConfig.CAFile = conf.ConsulCACert + consulConfig.TLSConfig.CertFile = conf.ConsulClientCert + consulConfig.TLSConfig.KeyFile = conf.ConsulClientKey + + ccli, err := consul.NewClient(consulConfig) + if err != nil { + log.Fatal("Unable to connect to Consul, check your config and setup") + } + cluster.Consul = ccli + + return cluster +} diff --git a/pkg/config.go b/pkg/config.go new file mode 100644 index 0000000..ec7775f --- /dev/null +++ b/pkg/config.go @@ -0,0 +1,40 @@ +package pkg + +import ( + "log" + "fmt" + "github.com/caarlos0/env/v7" +) + +type Config struct { + AlbatrosURL string `env:"ALBATROS_URL,required"` + NomadAddr string `env:"NOMAD_ADDR"` + NomadClientCert string `env:"NOMAD_CLIENT_CERT"` + NomadClientKey 
string `env:"NOMAD_CLIENT_KEY"` + NomadCACert string `env:"NOMAD_CACERT"` + ConsulAddr string `env:"CONSUL_HTTP_ADDR"` + ConsulClientCert string `env:"CONSUL_CLIENT_CERT"` + ConsulClientKey string `env:"CONSUL_CLIENT_KEY"` + ConsulCACert string `env:"CONSUL_CACERT"` +} + +func BuildConfig() Config { + GlobalConfig := Config{} + if err := env.Parse(&GlobalConfig); err != nil { + log.Fatal(fmt.Sprintf("unable to parse config, error: %+v\n", err)) + } + log.Printf("Albatros public URL: %s\n", GlobalConfig.AlbatrosURL) + if GlobalConfig.NomadAddr != "" { + isTLS := GlobalConfig.NomadClientCert != "" && GlobalConfig.NomadClientKey != "" && GlobalConfig.NomadCACert != "" + log.Printf("Nomad URL: %s, TLS: %t\n", GlobalConfig.NomadAddr, isTLS) + } else { + log.Println("Use Nomad default configuration") + } + if GlobalConfig.ConsulAddr != "" { + isTLS := GlobalConfig.ConsulClientCert != "" && GlobalConfig.ConsulClientKey != "" && GlobalConfig.ConsulCACert != "" + log.Printf("Consul URL: %s, TLS: %t\n", GlobalConfig.ConsulAddr, isTLS) + } else { + log.Println("Use Consul default configuration") + } + return GlobalConfig +} diff --git a/pkg/gitea.go b/pkg/gitea.go new file mode 100644 index 0000000..6a4fe53 --- /dev/null +++ b/pkg/gitea.go @@ -0,0 +1,86 @@ +package pkg + +import ( + "fmt" + + "code.gitea.io/sdk/gitea" + nomad "github.com/hashicorp/nomad/api" +) + +type GitUser struct { + Name string `json:"name"` + Email string `json:"email"` + Username string `json:"username"` +} + +type GiteaCommit struct { + Id string `json:"id"` + Message string `json:"message"` + Url string `json:"url"` + Author GitUser `json:"author"` + Committer GitUser `json:"committer"` + Timestamp string `json:"timestamp"` +} + +type GiteaAccount struct { + Id int64 `json:"id"` + Login string `json:"login"` + FullName string `json:"full_name"` + Email string `json:"email"` + AvatarUrl string `json:"avatar_url"` + Username string `json:"username"` +} + +type GiteaRepository struct { + Id int64 
`json:"id"` + Owner GiteaAccount `json:"owner"` + Name string `json:"name"` + FullName string `json:"full_name"` + Description string `json:"description"` + Private bool `json:"private"` + Fork bool `json:"fork"` + HtmlUrl string `json:"html_url"` + SshUrl string `json:"ssh_url"` + CloneUrl string `json:"clone_url"` + Website string `json:"website"` + StarsCount int64 `json:"stars_count"` + ForksCount int64 `json:"forks_count"` + WatchersCount int64 `json:"watchers_count"` + OpenIssuesCount int64 `json:"open_issues_count"` + DefaultBranch string `json:"default_branch"` + CreatedAt string `json:"created_at"` + UpdatedAt string `json:"updated_at"` +} + +type GiteaNotification struct { + Secret string `json:"secret"` + Ref string `json:"ref"` + Before string `json:"before"` + After string `json:"after"` + CompareUrl string `json:"compare_url"` + Commits []GiteaCommit `json:"commits"` + Repository GiteaRepository `json:"repository"` + Pusher GiteaAccount `json:"pusher"` + Sender GiteaAccount `json:"sender"` +} + +func nomadToGiteaStatus(summary *nomad.TaskGroupSummary) gitea.StatusState { + if summary.Failed > 0 { + return gitea.StatusError + } + if summary.Lost > 0 || summary.Unknown > 0 { + return gitea.StatusFailure + } + if summary.Queued > 0 || summary.Starting > 0 || summary.Running > 0 { + return gitea.StatusPending + } + if summary.Complete > 0 { + return gitea.StatusSuccess + } + // When the job is just started, all the counters are = 0. 
+ return gitea.StatusPending +} + +func notifSummary(notification *GiteaNotification) string { + return fmt.Sprintf("%s/%s:%s", notification.Repository.Owner.Username, notification.Repository.Name, notification.After) +} diff --git a/pkg/handler.go b/pkg/handler.go new file mode 100644 index 0000000..833fe5c --- /dev/null +++ b/pkg/handler.go @@ -0,0 +1,189 @@ +package pkg + +import ( + b64 "encoding/base64" + "encoding/json" + "fmt" + "io" + "log" + "net/http" + "golang.org/x/exp/slices" + "strings" + + nomad "github.com/hashicorp/nomad/api" +) + +type HookHandler struct { + Config *Config + Cluster *Cluster +} + +type BuildHandler struct { + Config *Config + Cluster *Cluster +} + +func (h HookHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "Hook only support POST requests", http.StatusBadRequest) + return + } + + q := r.URL.Query() + maybeToken, ok := q["token"] + if !ok || len(maybeToken) < 1 { + http.Error(w, "Missing query parameter 'token'. 
Try adding '?token=xxx'", http.StatusBadRequest) + return + } + token := maybeToken[0] + flavor := "default" + + var notification GiteaNotification + dec := json.NewDecoder(r.Body) + if err := dec.Decode(&notification); err != nil { + http.Error(w, "Can't parse your request JSON", http.StatusBadRequest) + return + } + notifInfo := notifSummary(&notification) + + log.Printf("Received gitea notification for %s\n", notifInfo) + + // Fetch our repo descriptor + kv := h.Cluster.Consul.KV() + encodedRepoUrl := b64.StdEncoding.EncodeToString([]byte(notification.Repository.CloneUrl)) + key := "albatros/" + encodedRepoUrl + log.Printf("Fetching key %s for %s\n", key, notifInfo) + pair, _, err := kv.Get(key, nil) + if err != nil { + http.Error(w, "Can't fetch the repo descriptor in Consul", http.StatusInternalServerError) + log.Printf("Job error for %s, can't fetch repo descriptor in consul: %+v\n", notifInfo, err) + return + } + if pair == nil || pair.Value == nil { + http.Error(w, fmt.Sprintf("You must declare %s in Consul in order to build it", notifInfo), http.StatusForbidden) + return + } + // Parse our repo descriptor + var repoDesc ConsulSecret + if err = json.Unmarshal(pair.Value, &repoDesc); err != nil { + http.Error(w, "Can't decode your Consul configuration for this repo", http.StatusInternalServerError) + log.Printf("Job error for %s, can't unmarshal the Consul descriptor: %+v\n", notifInfo, err) + return + } + // Check token + if repoDesc.Hook.Token != token { + http.Error(w, "Your albatros token does not match the one declared in Consul", http.StatusForbidden) + return + } + + // Build job parameters for Nomad + meta := map[string]string{ + "REPO_URL": notification.Repository.CloneUrl, + "COMMIT": notification.After, + "FLAVOR": flavor, + // @FIXME: this code is not correct, this is a hack + "BRANCH": strings.ReplaceAll(notification.Ref, "refs/heads/", ""), + } + + // Check sender + payload := []byte{} + if slices.Contains(repoDesc.Trusted.Senders, notification.Sender.Username) { + 
log.Printf("Trusted build of %s as %s in the list of allowed senders, inject secrets\n", notifInfo, notification.Sender.Username) + // Write payload + payload = []byte(repoDesc.Inject) + } + + jobs := h.Cluster.Nomad.Jobs() + dres, _, err := jobs.Dispatch("builder", meta, payload, "albatros", &nomad.WriteOptions{}) + if err != nil { + http.Error(w, "Can't submit your job to Nomad", http.StatusInternalServerError) + return + } + log.Printf("Created job %s for %s\n", dres.DispatchedJobID, notifInfo) + + // Start a lifecycle observer to update gitea status + j := Job { + Cluster: h.Cluster, + Config: h.Config, + Notification: &notification, + Dispatch: dres, + Creds: &repoDesc.Gitea, + Flavor: flavor, + } + go j.follow() + + io.WriteString(w, dres.DispatchedJobID) +} + +func (b BuildHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + q := r.URL.Query() + jobID, ok := q["job"] + if !ok || len(jobID) < 1 { + http.Error(w, "Missing query parameter 'job'. Try adding '?job=xxx'", http.StatusBadRequest) + return + } + + logType, ok := q["log"] + log.Printf("Follow logs for %s\n", jobID) + if !ok || len(logType) < 1 || !(logType[0] == "stdout" || logType[0] == "stderr") { + http.Error(w, "Missing or wrong query parameter 'log'.\nTry adding '?log=stdout' or '?log=stderr'", http.StatusBadRequest) + return + } + logFilter := logType[0] + + jobs := b.Cluster.Nomad.Jobs() + allocList, _, err := jobs.Allocations(jobID[0], true, &nomad.QueryOptions{}) + if err != nil { + http.Error(w, "Unable to fetch your job on Nomad. Might be garbage collected.", http.StatusInternalServerError) + return + } + if len(allocList) < 1 { + http.Error(w, "Job does not contain at least 1 allocation, can't display logs. Job might be garbage collected. 
", http.StatusNotFound) + return + } + + myAllocStub := allocList[0] + + allocations := b.Cluster.Nomad.Allocations() + myAlloc, _, err := allocations.Info(myAllocStub.ID, &nomad.QueryOptions{}) + if err != nil { + http.Error(w, "Allocation does not exist anymore. Allocation might be garbage collected", http.StatusNotFound) + return + } + log.Printf("Selected alloc %s for job %s\n", myAlloc.ID, jobID) + + allocFS := b.Cluster.Nomad.AllocFS() + scancel := make(chan struct{}) + sframe, serr := allocFS.Logs(myAlloc, true, "executor", logFilter, "start", 0, scancel, &nomad.QueryOptions{}) + + // stream logs to client's browser + io.WriteString(w, fmt.Sprintf("--- Streaming logs for %s, job can take some times to start, be patient ---\n", jobID)) +build_loop: + for { + select { + case <-r.Context().Done(): + // client disconnect, cleaning + break build_loop + case nomadErr := <-serr: + // an error occured in nomad, inform user and clean + _, _ = io.WriteString(w, fmt.Sprintf("\n--- Broken stream: %+v ---\n", nomadErr)) + break build_loop + case chunk := <-sframe: + // we get some data from nomad, send it to the client + for i := 0; i < len(chunk.Data); { + written, err := w.Write(chunk.Data[i:]) + w.(http.Flusher).Flush() // flush the buffer to send it right now + i += written + if err == io.EOF { + _, _ = io.WriteString(w, "\n--- End of file :-) ---\n") + break build_loop + } else if err != nil { + _, _ = io.WriteString(w, fmt.Sprintf("\n--- Broken stream: %+v ---\n", err)) + break build_loop + } + } + } + } + + log.Printf("Connection closed, cleaning listeners for %s\n", jobID) + scancel <- struct{}{} +} diff --git a/pkg/job.go b/pkg/job.go new file mode 100644 index 0000000..af43e96 --- /dev/null +++ b/pkg/job.go @@ -0,0 +1,84 @@ +package pkg + +import ( + "log" + "code.gitea.io/sdk/gitea" + nomad "github.com/hashicorp/nomad/api" +) + +type Job struct { + Cluster *Cluster + Config *Config + Notification *GiteaNotification + Dispatch *nomad.JobDispatchResponse + Creds 
*SecretGitea + Flavor string +} + +func (j *Job) follow() { + notifInfo := notifSummary(j.Notification) + + log.Printf("[lifecycle] Commit to build: %s, Gitea URL: %s\n", notifInfo, j.Creds.Url) + // init Gitea + forge, err := gitea.NewClient(j.Creds.Url, gitea.SetToken(j.Creds.Token)) + if err != nil { + log.Printf("Unable to create gitea client for %s: %+v\n", notifInfo, err) + return + } + + // get job's deployment + jobs := j.Cluster.Nomad.Jobs() + queryOpt := nomad.QueryOptions{ + AllowStale: false, + } + + safeguard := 1000 + for ; safeguard > 0; safeguard-- { + // Blocking fetch on deployment info + job, meta, err := jobs.Summary(j.Dispatch.DispatchedJobID, &queryOpt) + if err != nil { + log.Printf("[lifecycle] can't fetch job for %s: %+v\n", notifInfo, err) + break + } + queryOpt.WaitIndex = meta.LastIndex + + summary, ok := job.Summary["runner"] + if !ok { + log.Printf("[lifecycle] Job %s for %s must contain a 'runner' task\n", job.JobID, notifInfo) + break + } + log.Printf("[lifecycle] Task status for job %s on %s: %+v\n", job.JobID, notifInfo, summary) + + // Compute new job state + state := nomadToGiteaStatus(&summary) + + // Try updating Gitea commit status + _, _, err = forge.CreateStatus( + j.Notification.Repository.Owner.Username, + j.Notification.Repository.Name, + j.Notification.After, + gitea.CreateStatusOption{ + State: state, + TargetURL: j.Config.AlbatrosURL + "/build?log=stderr&job=" + j.Dispatch.DispatchedJobID, + Description: j.Flavor, + Context: "Albatros", + }) + + if err != nil { + log.Printf("[lifecycle] can't update gitea repo %s for job %s: %+v\n", notifInfo, job.JobID, err) + } + + // Continue the loop only if the job is pending + if state != gitea.StatusPending { + log.Printf("Job %s for %s terminated with status %s\n", job.JobID, notifInfo, state) + break + } + } + + if safeguard == 0 { + // To avoid dangerous infinite loops, we put an upperbound here + // of 1k refresh here. 
Reaching this limit will allow us to know + // that something did not work as expected... + log.Printf("!!! [lifecycle] we refreshed 1k times the job of %s and it's still running, giving up...\n", notifInfo) + } +} diff --git a/pkg/secret.go b/pkg/secret.go new file mode 100644 index 0000000..7a9e1e3 --- /dev/null +++ b/pkg/secret.go @@ -0,0 +1,21 @@ +package pkg + +type SecretHook struct { + Token string `json:"token"` +} + +type SecretGitea struct { + Url string `json:"url"` + Token string `json:"token"` +} + +type SecretTrusted struct { + Senders []string `json:"senders"` +} + +type ConsulSecret struct { + Hook SecretHook `json:"hook"` + Gitea SecretGitea `json:"gitea"` + Trusted SecretTrusted `json:"trusted"` + Inject string `json:"inject"` +}