200 lines
6.2 KiB
Go
200 lines
6.2 KiB
Go
package pkg
|
|
|
|
import (
|
|
b64 "encoding/base64"
|
|
"encoding/json"
|
|
"fmt"
|
|
"golang.org/x/exp/slices"
|
|
"io"
|
|
"log"
|
|
"net/http"
|
|
"strings"
|
|
|
|
nomad "github.com/hashicorp/nomad/api"
|
|
)
|
|
|
|
type HookHandler struct {
|
|
Config *Config
|
|
Cluster *Cluster
|
|
}
|
|
|
|
type BuildHandler struct {
|
|
Config *Config
|
|
Cluster *Cluster
|
|
}
|
|
|
|
func (h HookHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|
if r.Method != http.MethodPost {
|
|
http.Error(w, "Hook only support POST requests", http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
q := r.URL.Query()
|
|
maybeToken, ok := q["token"]
|
|
if !ok || len(maybeToken) < 1 {
|
|
http.Error(w, "Missing query parameter 'token'. Try adding '?token=xxx'", http.StatusBadRequest)
|
|
return
|
|
}
|
|
token := maybeToken[0]
|
|
flavor := "default"
|
|
|
|
var notification GiteaNotification
|
|
dec := json.NewDecoder(r.Body)
|
|
if err := dec.Decode(¬ification); err != nil {
|
|
http.Error(w, "Can't parse your request JSON", http.StatusBadRequest)
|
|
return
|
|
}
|
|
notifInfo := notifSummary(¬ification)
|
|
|
|
log.Printf("Received gitea notification for %s\n", notifInfo)
|
|
|
|
// Fetch our repo descriptor
|
|
kv := h.Cluster.Consul.KV()
|
|
encodedRepoUrl := b64.StdEncoding.EncodeToString([]byte(notification.Repository.CloneUrl))
|
|
key := "albatros/" + encodedRepoUrl
|
|
log.Printf("Fetching key %s for %s\n", key, notifInfo)
|
|
pair, _, err := kv.Get(key, nil)
|
|
if err != nil {
|
|
http.Error(w, "Can't fetch the repo descriptor in Consul", http.StatusInternalServerError)
|
|
log.Printf("Job error for %s, can't fetch repo descriptor in consul: %+v\n", notifInfo, err)
|
|
return
|
|
}
|
|
if pair == nil || pair.Value == nil {
|
|
http.Error(w, "You must declare %s in Consul in order to build it", http.StatusForbidden)
|
|
return
|
|
}
|
|
// Parse our repo descriptor
|
|
var repoDesc ConsulSecret
|
|
if err = json.Unmarshal(pair.Value, &repoDesc); err != nil {
|
|
http.Error(w, "Can't decode your Consul configuration for this repo", http.StatusInternalServerError)
|
|
log.Printf("Job error for %s, can't unmarshal the Consul descriptor: %+v\n", notifInfo, err)
|
|
return
|
|
}
|
|
// Check token
|
|
if repoDesc.Hook.Token != token {
|
|
http.Error(w, "Your albatros token does not match the one declared in Consul", http.StatusForbidden)
|
|
return
|
|
}
|
|
|
|
var branch, tag string
|
|
// BRANCH OR TAG?
|
|
if strings.HasPrefix(notification.Ref, "refs/heads/") {
|
|
branch = strings.ReplaceAll(notification.Ref, "refs/heads/", "")
|
|
} else if strings.HasPrefix(notification.Ref, "refs/tags/") {
|
|
tag = strings.ReplaceAll(notification.Ref, "refs/tags/", "")
|
|
} else {
|
|
http.Error(w, fmt.Sprintf("ref '%s' is not a tag or a branch ref, albatros does not know what to do."), http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
// Build job parameters for Nomad
|
|
meta := map[string]string{
|
|
"REPO_URL": notification.Repository.CloneUrl,
|
|
"COMMIT": notification.After,
|
|
"FLAVOR": flavor,
|
|
"BRANCH": branch,
|
|
"TAG": tag,
|
|
}
|
|
|
|
// Check sender
|
|
payload := []byte{}
|
|
if slices.Contains(repoDesc.Trusted.Senders, notification.Sender.Username) {
|
|
log.Printf("Trusted build of %s as %s in the list of allowed senders, inject secrets\n", notifInfo, notification.Sender.Username)
|
|
// Write payload
|
|
payload = []byte(repoDesc.Inject)
|
|
}
|
|
|
|
jobs := h.Cluster.Nomad.Jobs()
|
|
dres, _, err := jobs.Dispatch("builder", meta, payload, "albatros", &nomad.WriteOptions{})
|
|
if err != nil {
|
|
http.Error(w, "Can't submit your job to Nomad", http.StatusInternalServerError)
|
|
}
|
|
log.Printf("Created job %s for %s\n", dres.DispatchedJobID, notifInfo)
|
|
|
|
// Start a lifecycle observer to update gitea status
|
|
j := Job{
|
|
Cluster: h.Cluster,
|
|
Config: h.Config,
|
|
Notification: ¬ification,
|
|
Dispatch: dres,
|
|
Creds: &repoDesc.Gitea,
|
|
Flavor: flavor,
|
|
}
|
|
go j.follow()
|
|
|
|
io.WriteString(w, dres.DispatchedJobID)
|
|
}
|
|
|
|
func (b BuildHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|
q := r.URL.Query()
|
|
jobID, ok := q["job"]
|
|
if !ok || len(jobID) < 1 {
|
|
http.Error(w, "Missing query parameter 'job'. Try adding '?job=xxx'", http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
logType, ok := q["log"]
|
|
log.Printf("Follow logs for %s\n", jobID)
|
|
if !ok || len(logType) < 1 || !(logType[0] == "stdout" || logType[0] == "stderr") {
|
|
http.Error(w, "Missing or wrong query parameter 'log'.\nTry adding '?log=stdout' or '?log=stderr'", http.StatusBadRequest)
|
|
return
|
|
}
|
|
logFilter := logType[0]
|
|
|
|
jobs := b.Cluster.Nomad.Jobs()
|
|
allocList, _, err := jobs.Allocations(jobID[0], true, &nomad.QueryOptions{})
|
|
if err != nil {
|
|
http.Error(w, "Unable to fetch your job on Nomad. Might be garbage collected.", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
if len(allocList) < 1 {
|
|
http.Error(w, "Job does not contain at least 1 allocation, can't display logs. Job might be garbage collected. ", http.StatusNotFound)
|
|
return
|
|
}
|
|
|
|
myAllocStub := allocList[0]
|
|
|
|
allocations := b.Cluster.Nomad.Allocations()
|
|
myAlloc, _, err := allocations.Info(myAllocStub.ID, &nomad.QueryOptions{})
|
|
if err != nil {
|
|
http.Error(w, "Allocation does not exist anymore. Allocation might be garbage collected", http.StatusNotFound)
|
|
return
|
|
}
|
|
log.Printf("Selected alloc %s for job %s\n", myAlloc.ID, jobID)
|
|
|
|
allocFS := b.Cluster.Nomad.AllocFS()
|
|
scancel := make(chan struct{})
|
|
sframe, serr := allocFS.Logs(myAlloc, true, "executor", logFilter, "start", 0, scancel, &nomad.QueryOptions{})
|
|
|
|
// stream logs to client's browser
|
|
io.WriteString(w, fmt.Sprintf("--- Streaming logs for %s, job can take some times to start, be patient ---\n", jobID))
|
|
build_loop:
|
|
for {
|
|
select {
|
|
case <-r.Context().Done():
|
|
// client disconnect, cleaning
|
|
break build_loop
|
|
case nomadErr := <-serr:
|
|
// an error occured in nomad, inform user and clean
|
|
_, _ = io.WriteString(w, fmt.Sprintf("\n--- Broken stream: %+v ---\n", nomadErr))
|
|
break build_loop
|
|
case chunk := <-sframe:
|
|
// we get some data from nomad, send it to the client
|
|
for i := 0; i < len(chunk.Data); {
|
|
written, err := w.Write(chunk.Data[i:])
|
|
w.(http.Flusher).Flush() // flush the buffer to send it right now
|
|
i += written
|
|
if err == io.EOF {
|
|
_, _ = io.WriteString(w, "\n--- End of file :-) ---\n")
|
|
break build_loop
|
|
} else if err != nil {
|
|
_, _ = io.WriteString(w, fmt.Sprintf("\n--- Broken stream: %+v ---\n", err))
|
|
break build_loop
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
log.Printf("Connection closed, cleaning listeners for %s\n", jobID)
|
|
scancel <- struct{}{}
|
|
}
|