package main import ( "code.gitea.io/sdk/gitea" b64 "encoding/base64" "encoding/json" "fmt" "github.com/caarlos0/env/v7" consul "github.com/hashicorp/consul/api" nomad "github.com/hashicorp/nomad/api" "golang.org/x/exp/slices" "io" "log" "net/http" "strings" ) type GitUser struct { Name string `json:"name"` Email string `json:"email"` Username string `json:username"` } type GiteaCommit struct { Id string `json:"id"` Message string `json:"message"` Url string `json:"string"` Author GitUser `json:"author"` Committer GitUser `json:"committer"` Timestamp string `json:"timestamp"` } type GiteaAccount struct { Id int64 `json:"id"` Login string `json:"login"` FullName string `json:"full_name"` Email string `json:"email"` AvatarUrl string `json:"avatar_url"` Username string `json:"username"` } type GiteaRepository struct { Id int64 `json:"id"` Owner GiteaAccount `json:"owner"` Name string `json:"name"` FullName string `json:"full_name"` Description string `json:"description"` Private bool `json:"private"` Fork bool `json:"private"` HtmlUrl string `json:"html_url"` SshUrl string `json:"ssh_url"` CloneUrl string `json:"clone_url"` Website string `json:"website"` StarsCount int64 `json:"stars_count"` ForksCount int64 `json:"forks_count"` WatchersCount int64 `json:"watchers_count"` OpenIssuesCount int64 `json:"open_issues_count"` DefaultBranch string `json:"default_branch"` CreatedAt string `json:"created_at"` UpdatedAt string `json:"updated_at"` } type GiteaNotification struct { Secret string `json:"secret"` Ref string `json:"ref"` Before string `json:"before"` After string `json:"after"` CompareUrl string `json:"compare_url"` Commits []GiteaCommit `json:"commits"` Repository GiteaRepository `json:"repository"` Pusher GiteaAccount `json:"pusher"` Sender GiteaAccount `json:"sender"` } type SecretHook struct { Token string `json:"token"` } type SecretGitea struct { Url string `json:"url"` Token string `json:"token"` } type SecretTrusted struct { Senders []string `json:"senders"` } type ConsulSecret struct { Hook SecretHook `json:"hook"` Gitea SecretGitea `json:"gitea"` Trusted SecretTrusted `json:"trusted"` Inject string `json:"inject"` } func nomadToGiteaStatus(summary *nomad.TaskGroupSummary) gitea.StatusState { if summary.Failed > 0 { return gitea.StatusError } if summary.Lost > 0 || summary.Unknown > 0 { return gitea.StatusFailure } if summary.Queued > 0 || summary.Starting > 0 || summary.Running > 0 { return gitea.StatusPending } if summary.Complete > 0 { return gitea.StatusSuccess } // When the job is just started, all the counters are = 0. return gitea.StatusPending } func notifSummary(notification *GiteaNotification) string { return fmt.Sprintf("%s/%s:%s", notification.Repository.Owner.Username, notification.Repository.Name, notification.After) } func lifecycle(notification *GiteaNotification, dispatch *nomad.JobDispatchResponse, giteaCreds *SecretGitea) { notifInfo := notifSummary(notification) log.Printf("[lifecycle] Commit to build: %s, Gitea URL: %s\n", notifInfo, giteaCreds.Url) // init Gitea forge, err := gitea.NewClient(giteaCreds.Url, gitea.SetToken(giteaCreds.Token)) if err != nil { log.Printf("Unable to create gitea client for %s: %+v\n", notifInfo, err) return } // get job's deployment jobs := NomadClient.Jobs() queryOpt := nomad.QueryOptions{ AllowStale: true, } safeguard := 1000 for ; safeguard > 0; safeguard-- { // Blocking fetch on deployment info job, meta, err := jobs.Summary(dispatch.DispatchedJobID, &queryOpt) if err != nil { log.Printf("[lifecycle] can't fetch job for %s: %+v\n", notifInfo, err) break } queryOpt.WaitIndex = meta.LastIndex summary, ok := job.Summary["runner"] if !ok { log.Printf("[lifecycle] Job %s for %s must contain a 'runner' task\n", job.JobID, notifInfo) break } log.Printf("[lifecycle] Task status for job %s on %s: %+v\n", job.JobID, notifInfo, summary) // Compute new job state state := nomadToGiteaStatus(&summary) // Try updating Gitea commit status _, _, err = forge.CreateStatus( notification.Repository.Owner.Username, notification.Repository.Name, notification.After, gitea.CreateStatusOption{ State: state, TargetURL: GlobalConfig.AlbatrosURL + "/build?log=stderr&job=" + dispatch.DispatchedJobID, Description: "build", Context: "Albatros", }) if err != nil { log.Printf("[lifecycle] can't update gitea repo %s for job %s: %+v\n", notifInfo, job.JobID, err) } // Continue the loop only if the job is pending if state != gitea.StatusPending { log.Printf("Job %s for %s terminated with status %s\n", job.JobID, notifInfo, state) break } } if safeguard == 0 { // To avoid dangerous infinite loops, we put an upperbound here // of 1k refresh here. Reaching this limit will allow us to know // that something did not work as expected... log.Printf("!!! [lifecycle] we refreshed 1k times the job of %s and it's still running, giving up...\n", notifInfo) } } func hook(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { http.Error(w, "Hook only support POST requests", http.StatusBadRequest) } q := r.URL.Query() maybeToken, ok := q["token"] if !ok || len(maybeToken) < 1 { http.Error(w, "Missing query parameter 'token'. Try adding '?token=xxx'", http.StatusBadRequest) return } token := maybeToken[0] flavor := "default" var notification GiteaNotification dec := json.NewDecoder(r.Body) if err := dec.Decode(¬ification); err != nil { http.Error(w, "Can't parse your request JSON", http.StatusBadRequest) return } notifInfo := notifSummary(¬ification) log.Printf("Received gitea notification for %s\n", notifInfo) // Fetch our repo descriptor kv := ConsulClient.KV() encodedRepoUrl := b64.StdEncoding.EncodeToString([]byte(notification.Repository.CloneUrl)) key := "albatros/" + encodedRepoUrl log.Printf("Fetching key %s for %s\n", key, notifInfo) pair, _, err := kv.Get(key, nil) if err != nil { http.Error(w, "Can't fetch the repo descriptor in Consul", http.StatusInternalServerError) return } if pair == nil || pair.Value == nil { http.Error(w, "You must declare %s in Consul in order to build it", http.StatusForbidden) return } // Parse our repo descriptor var repoDesc ConsulSecret if err = json.Unmarshal(pair.Value, &repoDesc); err != nil { http.Error(w, "Can't decode your Consul configuration for this repo", http.StatusInternalServerError) return } // Check token if repoDesc.Hook.Token != token { http.Error(w, "Your albatros token does not match the one declared in Consul", http.StatusForbidden) return } // Build job parameters for Nomad meta := map[string]string{ "REPO_URL": notification.Repository.CloneUrl, "COMMIT": notification.After, "FLAVOR": flavor, // @FIXME: this code is not correct, this is a hack "BRANCH": strings.ReplaceAll(notification.Ref, "refs/heads/", ""), } // Check sender payload := []byte{} if slices.Contains(repoDesc.Trusted.Senders, notification.Sender.Username) { log.Printf("Trusted build of %s as %s in the list of allowed senders, inject secrets\n", notifInfo, notification.Sender.Username) // Write payload payload = []byte(repoDesc.Inject) } jobs := NomadClient.Jobs() dres, _, err := jobs.Dispatch("builder", meta, payload, "albatros", &nomad.WriteOptions{}) if err != nil { http.Error(w, "Can't submit your job to Nomad", http.StatusInternalServerError) } log.Printf("Created job %s for %s\n", dres.DispatchedJobID, notifInfo) // Start a lifecycle observer to update gitea status go lifecycle(¬ification, dres, &repoDesc.Gitea) io.WriteString(w, dres.DispatchedJobID) } func build(w http.ResponseWriter, r *http.Request) { q := r.URL.Query() jobID, ok := q["job"] if !ok || len(jobID) < 1 { http.Error(w, "Missing query parameter 'job'. Try adding '?job=xxx'", http.StatusBadRequest) return } logType, ok := q["log"] log.Printf("Follow logs for %s\n", jobID) if !ok || len(logType) < 1 || !(logType[0] == "stdout" || logType[0] == "stderr") { http.Error(w, "Missing or wrong query parameter 'log'.\nTry adding '?log=stdout' or '?log=stderr'", http.StatusBadRequest) return } logFilter := logType[0] jobs := NomadClient.Jobs() allocList, _, err := jobs.Allocations(jobID[0], true, &nomad.QueryOptions{}) if err != nil { http.Error(w, "Unable to fetch your job on Nomad. Might be garbage collected.", http.StatusInternalServerError) return } if len(allocList) < 1 { http.Error(w, "Job does not contain at least 1 allocation, can't display logs. Job might be garbage collected. ", http.StatusNotFound) return } myAllocStub := allocList[0] allocations := NomadClient.Allocations() myAlloc, _, err := allocations.Info(myAllocStub.ID, &nomad.QueryOptions{}) if err != nil { http.Error(w, "Allocation does not exist anymore. Allocation might be garbage collected", http.StatusNotFound) return } log.Printf("Selected alloc %s for job %s\n", myAlloc.ID, jobID) allocFS := NomadClient.AllocFS() scancel := make(chan struct{}) sframe, serr := allocFS.Logs(myAlloc, true, "runner", logFilter, "start", 0, scancel, &nomad.QueryOptions{}) // stream logs to client's browser build_loop: for { select { case <-r.Context().Done(): // client disconnect, cleaning break build_loop case nomadErr := <-serr: // an error occured in nomad, inform user and clean _, _ = io.WriteString(w, fmt.Sprintf("\nBroken stream: %+v\n", nomadErr)) break build_loop case chunk := <-sframe: // we get some data from nomad, send it to the client for i := 0; i < len(chunk.Data); { written, err := w.Write(chunk.Data[i:]) w.(http.Flusher).Flush() // flush the buffer to send it right now i += written if err == io.EOF { _, _ = io.WriteString(w, "End of file :-)") break build_loop } else if err != nil { _, _ = io.WriteString(w, fmt.Sprintf("\nBroken stream: %+v\n", err)) break build_loop } } } } log.Printf("Connection closed, cleaning listeners for %s\n", jobID) scancel <- struct{}{} } var NomadClient *nomad.Client var ConsulClient *consul.Client type config struct { AlbatrosURL string `env:"ALBATROS_URL,required"` NomadAddr string `env:"NOMAD_ADDR"` NomadClientCert string `env:"NOMAD_CLIENT_CERT"` NomadClientKey string `env:"NOMAD_CLIENT_KEY"` NomadCACert string `env:"NOMAD_CACERT"` ConsulAddr string `env:"CONSUL_HTTP_ADDR"` ConsulClientCert string `env:"CONSUL_CLIENT_CERT"` ConsulClientKey string `env:"CONSUL_CLIENT_KEY"` ConsulCACert string `env:"CONSUL_CACERT"` } var GlobalConfig config func initConfig() { if err := env.Parse(&GlobalConfig); err != nil { log.Fatal(fmt.Sprintf("unable to parse config, error: %+v\n", err)) } log.Printf("Albatros public URL: %s\n", GlobalConfig.AlbatrosURL) if GlobalConfig.NomadAddr != "" { isTLS := GlobalConfig.NomadClientCert != "" && GlobalConfig.NomadClientKey != "" && GlobalConfig.NomadCACert != "" log.Printf("Nomad URL: %s, TLS: %t\n", GlobalConfig.NomadAddr, isTLS) } else { log.Println("Use Nomad default configuration") } if GlobalConfig.ConsulAddr != "" { isTLS := GlobalConfig.ConsulClientCert != "" && GlobalConfig.ConsulClientKey != "" && GlobalConfig.ConsulCACert != "" log.Printf("Consul URL: %s, TLS: %t\n", GlobalConfig.ConsulAddr, isTLS) } else { log.Println("Use Consul default configuration") } } func initNomad() { var err error nomadConfig := nomad.DefaultConfig() nomadConfig.Namespace = "ci" nomadConfig.Address = GlobalConfig.NomadAddr nomadConfig.TLSConfig.CACert = GlobalConfig.NomadCACert nomadConfig.TLSConfig.ClientCert = GlobalConfig.NomadClientCert nomadConfig.TLSConfig.ClientKey = GlobalConfig.NomadClientKey NomadClient, err = nomad.NewClient(nomadConfig) if err != nil { log.Fatal("Unable to connect to Nomad, check your config and setup") } } func initConsul() { var err error consulConfig := consul.DefaultConfig() consulConfig.Address = GlobalConfig.ConsulAddr consulConfig.TLSConfig.CAFile = GlobalConfig.ConsulCACert consulConfig.TLSConfig.CertFile = GlobalConfig.ConsulClientCert consulConfig.TLSConfig.KeyFile = GlobalConfig.ConsulClientKey ConsulClient, err = consul.NewClient(consulConfig) if err != nil { log.Fatal("Unable to connect to Consul, check your config and setup") return } } func main() { var err error initConfig() initNomad() initConsul() // init webserver http.HandleFunc("/hook", hook) http.HandleFunc("/build", build) // launch log.Println("Listen on :8080") if err = http.ListenAndServe(":8080", nil); err != nil { log.Fatal("Can't start HTTP server") } }