diff --git a/main.go b/main.go index 4eac0b6..3d79ef8 100644 --- a/main.go +++ b/main.go @@ -1,16 +1,17 @@ package main + import ( - b64 "encoding/base64" + "code.gitea.io/sdk/gitea" + b64 "encoding/base64" "encoding/json" "fmt" + "github.com/caarlos0/env/v7" + consul "github.com/hashicorp/consul/api" nomad "github.com/hashicorp/nomad/api" - consul "github.com/hashicorp/consul/api" "io" "log" "net/http" "strings" - "github.com/caarlos0/env/v7" - "code.gitea.io/sdk/gitea" ) type GitUser struct { @@ -40,8 +41,8 @@ type GiteaAccount struct { type GiteaRepository struct { Id int64 `json:"id"` Owner GiteaAccount `json:"owner"` - Name string `json:"name"` - FullName string `json:"full_name"` + Name string `json:"name"` + FullName string `json:"full_name"` Description string `json:"description"` Private bool `json:"private"` Fork bool `json:"private"` @@ -71,111 +72,112 @@ type GiteaNotification struct { } type SecretHook struct { - Token string `json:"token"` + Token string `json:"token"` } type SecretGitea struct { - Url string `json:"url"` - Token string `json:"token"` + Url string `json:"url"` + Token string `json:"token"` } type SecretTrusted struct { - Senders []string `json:"senders"` + Senders []string `json:"senders"` } type ConsulSecret struct { - Hook SecretHook `json:"hook"` - Gitea SecretGitea `json:"gitea"` - Trusted SecretTrusted `json:"trusted"` - Inject string `json:"inject"` + Hook SecretHook `json:"hook"` + Gitea SecretGitea `json:"gitea"` + Trusted SecretTrusted `json:"trusted"` + Inject string `json:"inject"` } func nomadToGiteaStatus(summary *nomad.TaskGroupSummary) gitea.StatusState { - if summary.Failed > 0 { - return gitea.StatusError - } - if summary.Lost > 0 || summary.Unknown > 0 { - return gitea.StatusFailure - } - if summary.Queued > 0 || summary.Starting > 0 || summary.Running > 0 { - return gitea.StatusPending - } - if summary.Complete > 0 { - return gitea.StatusSuccess - } - // When the job is just started, all the counters are = 0. - return gitea.StatusPending + if summary.Failed > 0 { + return gitea.StatusError + } + if summary.Lost > 0 || summary.Unknown > 0 { + return gitea.StatusFailure + } + if summary.Queued > 0 || summary.Starting > 0 || summary.Running > 0 { + return gitea.StatusPending + } + if summary.Complete > 0 { + return gitea.StatusSuccess + } + // When the job is just started, all the counters are = 0. + return gitea.StatusPending +} + +func notifSummary(notification *GiteaNotification) string { + return fmt.Sprintf("%s/%s:%s", notification.Repository.Owner.Username, notification.Repository.Name, notification.After) } func lifecycle(notification *GiteaNotification, dispatch *nomad.JobDispatchResponse, giteaCreds *SecretGitea) { - log.Printf("[lifecyle] Gitea URL: %s\n", giteaCreds.Url) - // init Gitea - forge, err := gitea.NewClient(giteaCreds.Url, gitea.SetToken(giteaCreds.Token)) - if err != nil { - log.Printf("Unable to create gitea client: %+v\n", err) - return - } + notifInfo := notifSummary(notification) - // get job's deployment - jobs := NomadClient.Jobs() - queryOpt := nomad.QueryOptions{ - AllowStale: true, - } - - safeguard := 1000 - for ; safeguard > 0; safeguard-- { - // Blocking fetch on deployment info - job, meta, err := jobs.Summary(dispatch.DispatchedJobID, &queryOpt) - if err != nil { - log.Printf("[lifecycle] can't fetch job: %+v\n", err) - break - } - queryOpt.WaitIndex = meta.LastIndex + log.Printf("[lifecyle] Commit to build: %s, Gitea URL: %s\n", notifInfo, giteaCreds.Url) + // init Gitea + forge, err := gitea.NewClient(giteaCreds.Url, gitea.SetToken(giteaCreds.Token)) + if err != nil { + log.Printf("Unable to create gitea client for %s: %+v\n", notifInfo, err) + return + } - summary, ok := job.Summary["runner"]; - if !ok { - log.Printf("[lifecycle] your job %s must contain a 'runner' task\n", job.JobID) - break - } - log.Printf("[lifecycle] Summary for job %s: %+v\n", job.JobID, summary) + // get job's deployment + jobs := NomadClient.Jobs() + queryOpt := nomad.QueryOptions{ + AllowStale: true, + } - // Compute new job state - state := nomadToGiteaStatus(&summary) + safeguard := 1000 + for ; safeguard > 0; safeguard-- { + // Blocking fetch on deployment info + job, meta, err := jobs.Summary(dispatch.DispatchedJobID, &queryOpt) + if err != nil { + log.Printf("[lifecycle] can't fetch job for %s: %+v\n", notifInfo, err) + break + } + queryOpt.WaitIndex = meta.LastIndex - // Try updating Gitea commit status - _, _, err = forge.CreateStatus( - notification.Repository.Owner.Username, - notification.Repository.Name, - notification.After, - gitea.CreateStatusOption { - State: state, - TargetURL: GlobalConfig.AlbatrosURL + "/build?log=stderr&job=" + dispatch.DispatchedJobID, - Description: "build", - Context: "Albatros", - }) + summary, ok := job.Summary["runner"] + if !ok { + log.Printf("[lifecycle] Job %s for %s must contain a 'runner' task\n", job.JobID, notifInfo) + break + } + log.Printf("[lifecycle] Task status for job %s on %s: %+v\n", job.JobID, notifInfo, summary) - if err != nil { - log.Printf( - "[lifecycle] can't update gitea repo %s/%s:%s: %+v\n", - notification.Repository.Owner.Username, - notification.Repository.Name, - notification.After, - err) - } + // Compute new job state + state := nomadToGiteaStatus(&summary) - // Continue the loop only if the job is pending - if state != gitea.StatusPending { - log.Printf("Job %s teminated with status %s\n", job.JobID, state) - break - } - } + // Try updating Gitea commit status + _, _, err = forge.CreateStatus( + notification.Repository.Owner.Username, + notification.Repository.Name, + notification.After, + gitea.CreateStatusOption{ + State: state, + TargetURL: GlobalConfig.AlbatrosURL + "/build?log=stderr&job=" + dispatch.DispatchedJobID, + Description: "build", + Context: "Albatros", + }) - if safeguard == 0 { - // To avoid dangerous infinite loops, we put an upperbound here - // of 1k refresh here. Reaching this limit will allow us to know - // that something did not work as expected... - log.Println("!!! [lifecycle] we refreshed 1k times this deployment and it's still running, giving up...") - } + if err != nil { + log.Printf("[lifecycle] can't update gitea repo %s for job %s: %+v\n", notifInfo, job.JobID, err) + } + + // Continue the loop only if the job is pending + if state != gitea.StatusPending { + log.Printf("Job %s for %s terminated with status %s\n", job.JobID, notifInfo, state) + break + } + } + + if safeguard == 0 { + // To avoid dangerous infinite loops, we put an upperbound here + // of 1k refresh here. Reaching this limit will allow us to know + // that something did not work as expected... + log.Printf("!!! [lifecycle] we refreshed 1k times the job of %s and it's still running, giving up...\n", notifInfo) + } } func hook(w http.ResponseWriter, r *http.Request) { @@ -189,7 +191,7 @@ func hook(w http.ResponseWriter, r *http.Request) { http.Error(w, "Missing query parameter 'token'. Try adding '?token=xxx'", http.StatusBadRequest) return } - token := maybeToken[0] + token := maybeToken[0] flavor := "default" var notification GiteaNotification @@ -198,36 +200,37 @@ func hook(w http.ResponseWriter, r *http.Request) { http.Error(w, "Can't parse your request JSON", http.StatusBadRequest) return } + notifInfo := notifSummary(¬ification) - log.Printf("Gitea notification: %+v\n", notification) + log.Printf("Received gitea notification for %s\n", notifInfo) - // Fetch our repo descriptor - kv := ConsulClient.KV() - encodedRepoUrl := b64.StdEncoding.EncodeToString([]byte(notification.Repository.CloneUrl)) - key := "albatros/"+encodedRepoUrl - log.Printf("Fetching key %s\n", key) - pair, _, err := kv.Get(key, nil) - if err != nil { - http.Error(w, "Can't fetch the repo descriptor in Consul", http.StatusInternalServerError) - return - } - if pair == nil || pair.Value == nil { - http.Error(w, "You must declare your repo in Consul in order to build it", http.StatusForbidden) - return - } - // Parse our repo descriptor - var repoDesc ConsulSecret - if err = json.Unmarshal(pair.Value, &repoDesc); err != nil { - http.Error(w, "Can't decode your Consul configuration for this repo", http.StatusInternalServerError) - return - } - // Check token - if repoDesc.Hook.Token != token { - http.Error(w, "Your albatros token does not match the one declared in Consul", http.StatusForbidden) - return - } - - // Build job parameters for Nomad + // Fetch our repo descriptor + kv := ConsulClient.KV() + encodedRepoUrl := b64.StdEncoding.EncodeToString([]byte(notification.Repository.CloneUrl)) + key := "albatros/" + encodedRepoUrl + log.Printf("Fetching key %s for %s\n", key, notifInfo) + pair, _, err := kv.Get(key, nil) + if err != nil { + http.Error(w, "Can't fetch the repo descriptor in Consul", http.StatusInternalServerError) + return + } + if pair == nil || pair.Value == nil { + http.Error(w, "You must declare %s in Consul in order to build it", http.StatusForbidden) + return + } + // Parse our repo descriptor + var repoDesc ConsulSecret + if err = json.Unmarshal(pair.Value, &repoDesc); err != nil { + http.Error(w, "Can't decode your Consul configuration for this repo", http.StatusInternalServerError) + return + } + // Check token + if repoDesc.Hook.Token != token { + http.Error(w, "Your albatros token does not match the one declared in Consul", http.StatusForbidden) + return + } + + // Build job parameters for Nomad meta := map[string]string{ "REPO_URL": notification.Repository.CloneUrl, "COMMIT": notification.After, @@ -241,16 +244,15 @@ func hook(w http.ResponseWriter, r *http.Request) { // 2. Transform the consul object into a nomad payload jobs := NomadClient.Jobs() - dres, dmeta, err := jobs.Dispatch("builder", meta, []byte{}, "albatros", &nomad.WriteOptions{}) + dres, _, err := jobs.Dispatch("builder", meta, []byte{}, "albatros", &nomad.WriteOptions{}) if err != nil { http.Error(w, "Can't submit your job to Nomad", http.StatusInternalServerError) } - log.Printf("Query info: %+v\n", dmeta) - log.Printf("Job info: %+v\n", dres) + log.Printf("Created job %s for %s\n", dres.DispatchedJobID, notifInfo) - // Start a lifecycle observer to update gitea status - // @FIXME: need to inject gitea descriptor - go lifecycle(¬ification, dres, &repoDesc.Gitea) + // Start a lifecycle observer to update gitea status + // @FIXME: need to inject gitea descriptor + go lifecycle(¬ification, dres, &repoDesc.Gitea) io.WriteString(w, dres.DispatchedJobID) } @@ -264,7 +266,7 @@ func build(w http.ResponseWriter, r *http.Request) { } logType, ok := q["log"] - log.Printf("%+v\n", q) + log.Printf("Follow logs for %s\n", jobID) if !ok || len(logType) < 1 || !(logType[0] == "stdout" || logType[0] == "stderr") { http.Error(w, "Missing or wrong query parameter 'log'.\nTry adding '?log=stdout' or '?log=stderr'", http.StatusBadRequest) return @@ -290,28 +292,28 @@ func build(w http.ResponseWriter, r *http.Request) { http.Error(w, "Allocation does not exist anymore. Allocation might be garbage collected", http.StatusNotFound) return } - log.Printf("Alloc: %+v\n", myAlloc) + log.Printf("Selected alloc %s for job %s\n", myAlloc.ID, jobID) allocFS := NomadClient.AllocFS() scancel := make(chan struct{}) sframe, serr := allocFS.Logs(myAlloc, true, "runner", logFilter, "start", 0, scancel, &nomad.QueryOptions{}) - // stream logs to client's browser - build_loop: + // stream logs to client's browser +build_loop: for { select { - case <- r.Context().Done(): - // client disconnect, cleaning - break build_loop + case <-r.Context().Done(): + // client disconnect, cleaning + break build_loop case nomadErr := <-serr: // an error occured in nomad, inform user and clean _, _ = io.WriteString(w, fmt.Sprintf("\nBroken stream: %+v\n", nomadErr)) break build_loop case chunk := <-sframe: - // we get some data from nomad, send it to the client + // we get some data from nomad, send it to the client for i := 0; i < len(chunk.Data); { written, err := w.Write(chunk.Data[i:]) - w.(http.Flusher).Flush() // flush the buffer to send it right now + w.(http.Flusher).Flush() // flush the buffer to send it right now i += written if err == io.EOF { _, _ = io.WriteString(w, "End of file :-)") @@ -324,52 +326,53 @@ func build(w http.ResponseWriter, r *http.Request) { } } - log.Printf("Cleaning %+v\n", myAlloc) - scancel <- struct{}{} + log.Printf("Connection closed, cleaning listeners for %s\n", jobID) + scancel <- struct{}{} } var NomadClient *nomad.Client var ConsulClient *consul.Client type config struct { - AlbatrosURL string `env:"ALBATROS_URL,required"` + AlbatrosURL string `env:"ALBATROS_URL,required"` // @TODO get nomad config from env - // @TODO get consul config from env + // @TODO get consul config from env } + var GlobalConfig config func main() { var err error - // init config - if err = env.Parse(&GlobalConfig); err != nil { - log.Fatal(fmt.Sprintf("unable to parse config, error: %+v\n", err)) - return - } - log.Printf("Albatros public URL: %s\n", GlobalConfig.AlbatrosURL) + // init config + if err = env.Parse(&GlobalConfig); err != nil { + log.Fatal(fmt.Sprintf("unable to parse config, error: %+v\n", err)) + return + } + log.Printf("Albatros public URL: %s\n", GlobalConfig.AlbatrosURL) - // init nomad + // init nomad nomadConfig := nomad.DefaultConfig() nomadConfig.Namespace = "ci" NomadClient, err = nomad.NewClient(nomadConfig) if err != nil { log.Fatal("Unable to connect to Nomad, check your config and setup") - return + return } - // init consul - consulConfig := consul.DefaultConfig() - ConsulClient, err = consul.NewClient(consulConfig) - if err != nil { + // init consul + consulConfig := consul.DefaultConfig() + ConsulClient, err = consul.NewClient(consulConfig) + if err != nil { log.Fatal("Unable to connect to Consul, check your config and setup") - return - } + return + } - // init webserver + // init webserver http.HandleFunc("/hook", hook) http.HandleFunc("/build", build) - // launch + // launch log.Println("Listen on :8080") if err = http.ListenAndServe(":8080", nil); err != nil { log.Fatal("Can't start HTTP server")