// Command albatros is a small HTTP service that receives Gitea push webhooks
// on /hook, dispatches a parameterized Nomad job for them, and streams the
// logs of a dispatched job back to the caller on /build.
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net/http"
	"strings"

	nomad "github.com/hashicorp/nomad/api"
	//"code.gitea.io/sdk/gitea"
)

// GitUser is the identity attached to a commit (author or committer) in the
// webhook payload.
type GitUser struct {
	Name     string `json:"name"`
	Email    string `json:"email"`
	Username string `json:"username"`
}

// GiteaCommit describes one commit listed in a push notification.
type GiteaCommit struct {
	Id        string  `json:"id"`
	Message   string  `json:"message"`
	Url       string  `json:"url"`
	Author    GitUser `json:"author"`
	Committer GitUser `json:"committer"`
	Timestamp string  `json:"timestamp"`
}

// GiteaAccount describes a Gitea account; it is used for the repository
// owner, the pusher and the sender of a notification.
type GiteaAccount struct {
	Id        int64  `json:"id"`
	Login     string `json:"login"`
	FullName  string `json:"full_name"`
	Email     string `json:"email"`
	AvatarUrl string `json:"avatar_url"`
	Username  string `json:"username"`
}

// GiteaRepository describes the repository a notification refers to.
type GiteaRepository struct {
	Id              int64        `json:"id"`
	Owner           GiteaAccount `json:"owner"`
	Description     string       `json:"description"`
	Private         bool         `json:"private"`
	Fork            bool         `json:"fork"`
	HtmlUrl         string       `json:"html_url"`
	SshUrl          string       `json:"ssh_url"`
	CloneUrl        string       `json:"clone_url"`
	Website         string       `json:"website"`
	StarsCount      int64        `json:"stars_count"`
	ForksCount      int64        `json:"forks_count"`
	WatchersCount   int64        `json:"watchers_count"`
	OpenIssuesCount int64        `json:"open_issues_count"`
	DefaultBranch   string       `json:"default_branch"`
	CreatedAt       string       `json:"created_at"`
	UpdatedAt       string       `json:"updated_at"`
}

// GiteaNotification is the JSON body decoded by the /hook endpoint.
type GiteaNotification struct {
	Secret     string          `json:"secret"`
	Ref        string          `json:"ref"`
	Before     string          `json:"before"`
	After      string          `json:"after"`
	CompareUrl string          `json:"compare_url"`
	Commits    []GiteaCommit   `json:"commits"`
	Repository GiteaRepository `json:"repository"`
	Pusher     GiteaAccount    `json:"pusher"`
	Sender     GiteaAccount    `json:"sender"`
}

// hook handles POST /hook?token=xxx. It decodes the notification from the
// request body, dispatches the "builder" Nomad job with metadata derived from
// it, and writes the dispatched job ID as the response body.
//
// Failure responses: 400 for a non-POST method, a missing token or an
// unparsable body; 500 when Nomad refuses the dispatch.
func hook(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "Hook only support POST requests", http.StatusBadRequest)
		return
	}

	q := r.URL.Query()
	token, ok := q["token"]
	if !ok || len(token) < 1 {
		http.Error(w, "Missing query parameter 'token'. Try adding '?token=xxx'", http.StatusBadRequest)
		return
	}
	flavor := "default"
	//@FIXME check for token in consul

	var notification GiteaNotification
	if err := json.NewDecoder(r.Body).Decode(&notification); err != nil {
		http.Error(w, "Can't parse your request JSON", http.StatusBadRequest)
		return
	}

	// NOTE(review): this prints the whole payload, including the Secret field.
	log.Printf("Gitea notification: %+v\n", notification)

	meta := map[string]string{
		"REPO_URL": notification.Repository.CloneUrl,
		"COMMIT":   notification.After,
		"FLAVOR":   flavor,
		// @FIXME: this code is not correct, this is a hack.
		// Only the leading "refs/heads/" is stripped; any other kind of ref
		// is passed through unchanged.
		"BRANCH": strings.TrimPrefix(notification.Ref, "refs/heads/"),
	}

	// @FIXME logic on how to inject secrets securely
	// 1. Check senders
	// 2. Transform the consul object into a nomad payload

	jobs := NomadClient.Jobs()
	dres, dmeta, err := jobs.Dispatch("builder", meta, []byte{}, "albatros", &nomad.WriteOptions{})
	if err != nil {
		log.Printf("Nomad dispatch failed: %v\n", err)
		http.Error(w, "Can't submit your job to Nomad", http.StatusInternalServerError)
		// dres is not usable when the dispatch failed, so stop here.
		return
	}
	log.Printf("Query info: %+v\n", dmeta)
	log.Printf("Job info: %+v\n", dres)

	// Best effort: nothing useful can be done if the client is already gone.
	_, _ = io.WriteString(w, dres.DispatchedJobID)
}

// build handles /build?job=xxx&log=stdout|stderr. It looks up the first
// allocation of the given job and streams the selected log of its "runner"
// task to the client until the client disconnects, Nomad reports an error,
// the log stream ends, or writing to the client fails.
func build(w http.ResponseWriter, r *http.Request) {
	q := r.URL.Query()
	jobID, ok := q["job"]
	if !ok || len(jobID) < 1 {
		http.Error(w, "Missing query parameter 'job'. Try adding '?job=xxx'", http.StatusBadRequest)
		return
	}

	logType, ok := q["log"]
	log.Printf("%+v\n", q)
	if !ok || len(logType) < 1 || !(logType[0] == "stdout" || logType[0] == "stderr") {
		http.Error(w, "Missing or wrong query parameter 'log'.\nTry adding '?log=stdout' or '?log=stderr'", http.StatusBadRequest)
		return
	}
	logFilter := logType[0]

	jobs := NomadClient.Jobs()
	allocList, _, err := jobs.Allocations(jobID[0], true, &nomad.QueryOptions{})
	if err != nil {
		http.Error(w, "Unable to fetch your job on Nomad. Might be garbage collected.", http.StatusInternalServerError)
		return
	}
	if len(allocList) < 1 {
		http.Error(w, "Job does not contain at least 1 allocation, can't display logs. Job might be garbage collected. ", http.StatusNotFound)
		return
	}

	myAllocStub := allocList[0]
	allocations := NomadClient.Allocations()
	myAlloc, _, err := allocations.Info(myAllocStub.ID, &nomad.QueryOptions{})
	if err != nil {
		http.Error(w, "Allocation does not exist anymore. Allocation might be garbage collected", http.StatusNotFound)
		return
	}
	log.Printf("Alloc: %+v\n", myAlloc)

	allocFS := NomadClient.AllocFS()
	scancel := make(chan struct{})
	// Closing the cancel channel never blocks, whereas sending on it would
	// hang this handler forever if the streaming side has already stopped
	// listening (e.g. after it reported an error).
	defer close(scancel)
	sframe, serr := allocFS.Logs(myAlloc, true, "runner", logFilter, "start", 0, scancel, &nomad.QueryOptions{})

	// Not every ResponseWriter supports flushing; check once instead of
	// panicking on a failed type assertion for every chunk.
	flusher, canFlush := w.(http.Flusher)

buildLoop:
	for {
		select {
		case <-r.Context().Done():
			// client disconnect, cleaning
			break buildLoop

		case nomadErr := <-serr:
			// an error occurred in nomad, inform user and clean
			_, _ = io.WriteString(w, fmt.Sprintf("\nBroken stream: %+v\n", nomadErr))
			break buildLoop

		case chunk, open := <-sframe:
			// A closed channel yields a nil frame; treat it as end of stream.
			// NOTE(review): presumably the Nomad client closes this channel
			// when the log stream ends - confirm for the vendored version.
			if !open || chunk == nil {
				break buildLoop
			}

			// we get some data from nomad, send it to the client;
			// io.Writer guarantees a non-nil error on a short write, so a
			// single call is enough.
			_, err := w.Write(chunk.Data)
			if canFlush {
				flusher.Flush() // flush the buffer to send it right now
			}
			if err == io.EOF {
				_, _ = io.WriteString(w, "End of file :-)")
				break buildLoop
			}
			if err != nil {
				_, _ = io.WriteString(w, fmt.Sprintf("\nBroken stream: %+v\n", err))
				break buildLoop
			}
		}
	}

	log.Printf("Cleaning %+v\n", myAlloc)
}

// NomadClient is the process-wide Nomad API client, initialised in main and
// used by both HTTP handlers.
var NomadClient *nomad.Client

// main creates the Nomad client for the "ci" namespace, registers the /hook
// and /build handlers and serves HTTP on :8080. It exits the process if the
// client cannot be created or the server stops.
func main() {
	var err error
	nomadConfig := nomad.DefaultConfig()
	nomadConfig.Namespace = "ci"
	// @TODO read env for encrypted parameters
	NomadClient, err = nomad.NewClient(nomadConfig)
	if err != nil {
		log.Fatalf("Unable to connect to Nomad, check your config and setup: %v", err)
	}

	http.HandleFunc("/hook", hook)
	http.HandleFunc("/build", build)

	fmt.Println("albatros (:8080)")
	if err = http.ListenAndServe(":8080", nil); err != nil {
		log.Fatalf("Can't start HTTP server: %v", err)
	}
}