albatros/pkg/handler.go

201 lines
6.2 KiB
Go

package pkg
import (
b64 "encoding/base64"
"encoding/json"
"fmt"
"golang.org/x/exp/slices"
"io"
"log"
"net/http"
"strings"
nomad "github.com/hashicorp/nomad/api"
)
// HookHandler is the HTTP handler that receives Gitea webhook notifications
// and dispatches a build job for them (see its ServeHTTP method).
type HookHandler struct {
// Config is handed over to the Job that follows each dispatched build.
Config *Config
// Cluster gives access to the Consul and Nomad clients used by the handler.
Cluster *Cluster
}
// BuildHandler is the HTTP handler that streams the logs of a build job
// back to the client (see its ServeHTTP method).
type BuildHandler struct {
// Config is not read by the ServeHTTP method visible in this file.
Config *Config
// Cluster gives access to the Nomad client used to locate allocations
// and stream their logs.
Cluster *Cluster
}
// ServeHTTP handles a Gitea webhook notification and dispatches a build job.
//
// The request must be a POST with a JSON notification body and a `token`
// query parameter. The repository descriptor is fetched from Consul under
// the key "albatros/<base64(clone URL)>" and the supplied token must match
// the one stored in that descriptor. The pushed ref must be a branch
// ("refs/heads/...") or a tag ("refs/tags/...").
//
// On success the "builder" Nomad job is dispatched, a goroutine is started
// to follow it, and the dispatched job ID is written to the response.
// Secrets from the descriptor are only injected as the job payload when the
// notification sender is listed as trusted.
func (h HookHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "Hook only support POST requests", http.StatusBadRequest)
		return
	}

	q := r.URL.Query()
	maybeToken, ok := q["token"]
	if !ok || len(maybeToken) < 1 {
		http.Error(w, "Missing query parameter 'token'. Try adding '?token=xxx'", http.StatusBadRequest)
		return
	}
	token := maybeToken[0]
	flavor := "default"

	var notification GiteaNotification
	if err := json.NewDecoder(r.Body).Decode(&notification); err != nil {
		http.Error(w, "Can't parse your request JSON", http.StatusBadRequest)
		return
	}
	notifInfo := notifSummary(&notification)
	log.Printf("Received gitea notification for %s\n", notifInfo)

	// Fetch our repo descriptor
	kv := h.Cluster.Consul.KV()
	encodedRepoUrl := b64.StdEncoding.EncodeToString([]byte(notification.Repository.CloneUrl))
	key := "albatros/" + encodedRepoUrl
	log.Printf("Fetching key %s for %s\n", key, notifInfo)
	pair, _, err := kv.Get(key, nil)
	if err != nil {
		http.Error(w, "Can't fetch the repo descriptor in Consul", http.StatusInternalServerError)
		log.Printf("Job error for %s, can't fetch repo descriptor in consul: %+v\n", notifInfo, err)
		return
	}
	if pair == nil || pair.Value == nil {
		// The message used to be sent with a raw, unformatted "%s".
		http.Error(w, fmt.Sprintf("You must declare %s in Consul in order to build it", key), http.StatusForbidden)
		return
	}

	// Parse our repo descriptor
	var repoDesc ConsulSecret
	if err = json.Unmarshal(pair.Value, &repoDesc); err != nil {
		http.Error(w, "Can't decode your Consul configuration for this repo", http.StatusInternalServerError)
		log.Printf("Job error for %s, can't unmarshal the Consul descriptor: %+v\n", notifInfo, err)
		return
	}

	// Check token
	// NOTE(review): plain string comparison is not constant-time; consider
	// crypto/subtle.ConstantTimeCompare if timing attacks are a concern.
	if repoDesc.Hook.Token != token {
		http.Error(w, "Your albatros token does not match the one declared in Consul", http.StatusForbidden)
		return
	}

	// Branch or tag? Only the leading prefix is stripped, so a name that
	// happens to contain "refs/heads/" further along is left intact.
	var branch, tag string
	switch {
	case strings.HasPrefix(notification.Ref, "refs/heads/"):
		branch = strings.TrimPrefix(notification.Ref, "refs/heads/")
	case strings.HasPrefix(notification.Ref, "refs/tags/"):
		tag = strings.TrimPrefix(notification.Ref, "refs/tags/")
	default:
		http.Error(w, fmt.Sprintf("ref '%s' is not a tag or a branch ref, albatros does not know what to do.", notification.Ref), http.StatusBadRequest)
		return
	}

	// Build job parameters for Nomad
	meta := map[string]string{
		"REPO_URL": notification.Repository.CloneUrl,
		"COMMIT":   notification.After,
		"FLAVOR":   flavor,
		"BRANCH":   branch,
		"TAG":      tag,
	}

	// Check sender: secrets are only injected for trusted senders,
	// everybody else gets an empty payload.
	payload := []byte{}
	if slices.Contains(repoDesc.Trusted.Senders, notification.Sender.Username) {
		log.Printf("Trusted build of %s as %s in the list of allowed senders, inject secrets\n", notifInfo, notification.Sender.Username)
		// Write payload
		payload = []byte(repoDesc.Inject)
	}

	jobs := h.Cluster.Nomad.Jobs()
	dres, _, err := jobs.Dispatch("builder", meta, payload, "albatros", &nomad.WriteOptions{})
	if err != nil {
		http.Error(w, "Can't submit your job to Nomad", http.StatusInternalServerError)
		log.Printf("Job error for %s, can't dispatch the job on Nomad: %+v\n", notifInfo, err)
		// Stop here: dres must not be used after a failed dispatch.
		return
	}
	log.Printf("Created job %s for %s\n", dres.DispatchedJobID, notifInfo)

	// Start a lifecycle observer to update gitea status
	j := Job{
		Cluster:      h.Cluster,
		Config:       h.Config,
		Notification: &notification,
		Dispatch:     dres,
		Creds:        &repoDesc.Gitea,
		Flavor:       flavor,
	}
	go j.follow()

	// Best effort: nothing useful can be done if the client is already gone.
	_, _ = io.WriteString(w, dres.DispatchedJobID)
}
// ServeHTTP streams the logs of a Nomad job to the HTTP client.
//
// Query parameters:
//   - job: ID of the Nomad job whose logs are requested
//   - log: which stream to follow, either "stdout" or "stderr"
//
// The first allocation returned by Nomad for the job is selected and the
// logs of its "executor" task are followed from the start. Data is relayed
// until the client disconnects, Nomad reports an error, the log stream is
// closed, or writing to the client fails.
func (b BuildHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	q := r.URL.Query()
	jobID, ok := q["job"]
	if !ok || len(jobID) < 1 {
		http.Error(w, "Missing query parameter 'job'. Try adding '?job=xxx'", http.StatusBadRequest)
		return
	}

	logType, ok := q["log"]
	log.Printf("Follow logs for %s\n", jobID)
	if !ok || len(logType) < 1 || !(logType[0] == "stdout" || logType[0] == "stderr") {
		http.Error(w, "Missing or wrong query parameter 'log'.\nTry adding '?log=stdout' or '?log=stderr'", http.StatusBadRequest)
		return
	}
	logFilter := logType[0]

	jobs := b.Cluster.Nomad.Jobs()
	allocList, _, err := jobs.Allocations(jobID[0], true, &nomad.QueryOptions{})
	if err != nil {
		http.Error(w, "Unable to fetch your job on Nomad. Might be garbage collected.", http.StatusInternalServerError)
		return
	}
	if len(allocList) < 1 {
		http.Error(w, "Job does not contain at least 1 allocation, can't display logs. Job might be garbage collected. ", http.StatusNotFound)
		return
	}

	myAllocStub := allocList[0]
	allocations := b.Cluster.Nomad.Allocations()
	myAlloc, _, err := allocations.Info(myAllocStub.ID, &nomad.QueryOptions{})
	if err != nil {
		http.Error(w, "Allocation does not exist anymore. Allocation might be garbage collected", http.StatusNotFound)
		return
	}
	log.Printf("Selected alloc %s for job %s\n", myAlloc.ID, jobID)

	allocFS := b.Cluster.Nomad.AllocFS()
	scancel := make(chan struct{})
	// Cancel by closing the channel instead of sending on it: a send on an
	// unbuffered channel blocks forever when the reader has already stopped
	// (e.g. after a stream error), whereas close never blocks.
	defer close(scancel)
	sframe, serr := allocFS.Logs(myAlloc, true, "executor", logFilter, "start", 0, scancel, &nomad.QueryOptions{})

	// Not every ResponseWriter supports flushing (e.g. when wrapped by a
	// middleware); only flush when it is available instead of panicking.
	flusher, canFlush := w.(http.Flusher)

	// stream logs to client's browser
	_, _ = fmt.Fprintf(w, "--- Streaming logs for %s, job can take some times to start, be patient ---\n", jobID)

streamLoop:
	for {
		select {
		case <-r.Context().Done():
			// client disconnect, cleaning
			break streamLoop
		case nomadErr := <-serr:
			// an error occurred in nomad, inform user and clean
			_, _ = fmt.Fprintf(w, "\n--- Broken stream: %+v ---\n", nomadErr)
			break streamLoop
		case chunk, open := <-sframe:
			// A closed frame channel yields a nil frame; treat it as the
			// end of the log stream rather than dereferencing it.
			if !open || chunk == nil {
				_, _ = io.WriteString(w, "\n--- End of file :-) ---\n")
				break streamLoop
			}
			// we get some data from nomad, send it to the client
			for i := 0; i < len(chunk.Data); {
				written, err := w.Write(chunk.Data[i:])
				if canFlush {
					flusher.Flush() // flush the buffer to send it right now
				}
				i += written
				if err == io.EOF {
					_, _ = io.WriteString(w, "\n--- End of file :-) ---\n")
					break streamLoop
				} else if err != nil {
					_, _ = fmt.Fprintf(w, "\n--- Broken stream: %+v ---\n", err)
					break streamLoop
				}
			}
		}
	}

	log.Printf("Connection closed, cleaning listeners for %s\n", jobID)
}