albatros/cmd/container.go

724 lines
17 KiB
Go

package cmd
import (
"context"
"crypto/sha256"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"os"
"path"
"path/filepath"
"sort"
"strconv"
"strings"
"time"
"github.com/docker/cli/cli/config"
"github.com/google/go-containerregistry/pkg/authn"
"github.com/google/go-containerregistry/pkg/name"
"github.com/google/go-containerregistry/pkg/v1/layout"
"github.com/google/go-containerregistry/pkg/v1/remote"
"github.com/containers/image/v5/copy"
"github.com/containers/image/v5/signature"
"github.com/containers/image/v5/transports/alltransports"
"github.com/spf13/cobra"
"gocloud.dev/blob"
_ "gocloud.dev/blob/s3blob"
)
var pctx *signature.PolicyContext
const distributionPrefix = "v2"
//---
//--- vendored from crane
// headerTransport sets headers on outgoing requests.
type headerTransport struct {
httpHeaders map[string]string
inner http.RoundTripper
}
// RoundTrip implements http.RoundTripper.
func (ht *headerTransport) RoundTrip(in *http.Request) (*http.Response, error) {
for k, v := range ht.httpHeaders {
if http.CanonicalHeaderKey(k) == "User-Agent" {
// Docker sets this, which is annoying, since we're not docker.
// We might want to revisit completely ignoring this.
continue
}
in.Header.Set(k, v)
}
return ht.inner.RoundTrip(in)
}
//---
//--- Image converter
type OCIImageManifest struct {
SchemaVersion int `json:"schemaVersion"`
MediaType string `json:"mediaType"`
Config OCIRef `json:"config"`
Layers []OCIRef `json:layers"`
}
func LoadOCIImageManifest(path string) (*OCIImageManifest, error) {
fd, err := os.Open(path)
if err != nil {
return nil, err
}
defer fd.Close()
b, err := io.ReadAll(fd)
if err != nil {
return nil, err
}
oim := &OCIImageManifest{}
err = json.Unmarshal(b, oim)
if err != nil {
return nil, err
}
return oim, nil
}
type OCIPlatform struct {
Architecture string `json:"architecture"`
OS string `json:"os"`
}
type OCIRef struct {
MediaType string `json:"mediaType"`
Digest string `json:"digest"`
Size int `json:"size"`
Platform *OCIPlatform `json:"platform,omitempty"`
}
type OCIImageIndex struct {
SchemaVersion int `json:"schemaVersion"`
MediaType string `json:"mediaType"`
Manifests []OCIRef `json:"manifests"`
}
func NewOCIImageIndex(o *OCIMultiArch) (OCIImageIndex, error) {
idx := OCIImageIndex{
SchemaVersion: 2,
MediaType: "application/vnd.oci.image.index.v1+json",
Manifests: []OCIRef{},
}
for _, syst := range o.images {
if syst.index == nil {
return idx, errors.New(fmt.Sprintf("Missing index content for %s. Check that the file exists and that it can be parsed", syst.path))
}
if len(syst.index.Manifests) != 1 {
return idx, errors.New(fmt.Sprintf("%s is not a system image as it does not contain exactly one manifest in its index", syst.path))
}
if syst.index.Manifests[0].Platform == nil {
return idx, errors.New(fmt.Sprintf("Manifest for %s has not been enriched with its platform (os+arch). This is a logic bug.", syst.path))
}
idx.Manifests = append(idx.Manifests, syst.index.Manifests[0])
}
return idx, nil
}
func LoadOCIImageIndex(path string) (*OCIImageIndex, error) {
fd, err := os.Open(filepath.Join(path, "index.json"))
if err != nil {
return nil, err
}
defer fd.Close()
b, err := io.ReadAll(fd)
if err != nil {
return nil, err
}
var idx OCIImageIndex
err = json.Unmarshal(b, &idx)
if err != nil {
return nil, err
}
return &idx, nil
}
func (i OCIImageIndex) WriteTo(root string) error {
txt, err := json.Marshal(i)
if err != nil {
return err
}
txtPath := filepath.Join(root, "index.json")
err = os.WriteFile(txtPath, txt, 0644)
return err
}
type OCILayout struct {
ImageLayoutVersion string `json:"imageLayoutVersion"`
}
func NewOCILayout() OCILayout {
return OCILayout{"1.0.0"}
}
func (o OCILayout) WriteTo(root string) error {
txt, err := json.Marshal(o)
if err != nil {
return err
}
txtPath := filepath.Join(root, "oci-layout")
err = os.WriteFile(txtPath, txt, 0644)
return err
}
//---- Alba logic
type OCISystemImage struct {
path string
os string
arch string
index *OCIImageIndex
}
func NewOCISystemImage(path, os, arch string) (OCISystemImage, error) {
si := OCISystemImage{
path: path,
os: os,
arch: arch,
index: nil,
}
man, err := LoadOCIImageIndex(path)
if err != nil {
return si, err
}
si.index = man
if len(si.index.Manifests) != 1 {
return si, errors.New(fmt.Sprintf("%s index has not exactly one manifest, this can't be a system image\n", path))
}
// Enrich manifest
si.index.Manifests[0].Platform = &OCIPlatform{
Architecture: arch,
OS: os,
}
return si, nil
}
func (o OCISystemImage) String() string {
return fmt.Sprintf("[oci system image; os:%s, arch:%s, path:%s]", o.os, o.arch, o.path)
}
type OCIMultiArch struct {
Name string
Tag string
path string
multi *OCIImageIndex
images []OCISystemImage
}
func NewOCIMultiArch(nametag string) (*OCIMultiArch, error) {
ntspl := strings.Split(nametag, ":")
if len(ntspl) != 2 {
return nil, errors.New("nametag must be of the form 'name:tag'")
}
tmp, err := os.MkdirTemp("", "alba-oci")
if err != nil {
return nil, err
}
return &OCIMultiArch{
Name: ntspl[0],
Tag: ntspl[1],
path: tmp,
images: []OCISystemImage{},
}, nil
}
func (o *OCIMultiArch) Close() error {
return os.RemoveAll(o.path)
}
func (o *OCIMultiArch) LoadFromDockerArchives(path string) error {
fmt.Printf("-- load docker archives --\n")
imgDirList, err := os.ReadDir(path)
if err != nil {
return err
}
// Convert all docker archives to oci images
for idx, imgFile := range imgDirList {
imgFilename := imgFile.Name()
// Check extension
archivePath := filepath.Join(path, imgFilename)
if !(strings.HasSuffix(imgFilename, ".tar.gz") || strings.HasSuffix(imgFilename, ".tgz")) {
fmt.Printf("skipping %s: not a tar.gz archive\n", archivePath)
continue
}
// Check we can extract info from the filename
infos := strings.Split(imgFilename, ".")
if len(infos) != 3 && len(infos) != 4 {
fmt.Printf("skipping %s: format is <goos>.<goarch>.tar.gz\n", archivePath)
continue
}
goos := infos[0]
goarch := infos[1]
// Prepare the image conversion
srcRef, err := alltransports.ParseImageName(fmt.Sprintf("docker-archive:%s", archivePath))
if err != nil {
return err
}
ociPath := filepath.Join(o.path, strconv.FormatInt(int64(idx), 10))
dstRef, err := alltransports.ParseImageName(fmt.Sprintf("oci:%s", ociPath))
if err != nil {
return err
}
// Convert the docker archive to an oci image
_, err = copy.Image(context.Background(), pctx, dstRef, srcRef, &copy.Options{})
if err != nil {
return err
}
img, err := NewOCISystemImage(ociPath, goos, goarch)
if err != nil {
return err
}
fmt.Printf("%s -> %s\n", archivePath, img)
o.images = append(o.images, img)
}
return nil
}
func (o *OCIMultiArch) MergeSystemImages() error {
fmt.Printf("-- merge system images --\n")
multiArchRoot := filepath.Join(o.path, "multi")
// Create the root folder for the OCI multi arch image
err := os.Mkdir(multiArchRoot, 0750)
if err != nil {
return err
}
// Create the blob folder for the multi arch image
multiBlobPath := filepath.Join(multiArchRoot, "blobs", "sha256")
err = os.MkdirAll(multiBlobPath, 0750)
if err != nil {
return err
}
// Create the oci-layout file
err = NewOCILayout().WriteTo(multiArchRoot)
if err != nil {
return err
}
fmt.Printf("-> oci-layout\n")
// Create the index.json
idx, err := NewOCIImageIndex(o)
if err != nil {
return err
}
err = idx.WriteTo(multiArchRoot)
if err != nil {
return err
}
o.multi = &idx
fmt.Printf("-> index.json\n")
// Copy blobs
for _, img := range o.images {
blobCounter := 0
blobPath := filepath.Join(img.path, "blobs", "sha256")
blobList, err := os.ReadDir(blobPath)
if err != nil {
return err
}
for _, blobFile := range blobList {
src, err := os.Open(filepath.Join(blobPath, blobFile.Name()))
if err != nil {
return err
}
defer src.Close()
dst, err := os.Create(filepath.Join(multiBlobPath, blobFile.Name()))
if err != nil {
return err
}
defer dst.Close()
_, err = io.Copy(dst, src)
if err != nil {
return err
}
blobCounter += 1
}
fmt.Printf("%s -> %s (%d items)\n", blobPath, multiBlobPath, blobCounter)
}
return nil
}
func (o *OCIMultiArch) UploadImageRegistry(tag string) error {
fmt.Println("--- push to registry ---")
multiArchRoot := filepath.Join(o.path, "multi")
img, err := layout.ImageIndexFromPath(multiArchRoot)
if err != nil {
return fmt.Errorf("loading %s as OCI layout: %w", multiArchRoot, err)
}
ref, err := name.ParseReference(tag)
if err != nil {
return err
}
// Build transport
transport := remote.DefaultTransport.(*http.Transport).Clone()
var rt http.RoundTripper = transport
cf, err := config.Load(os.Getenv("DOCKER_CONFIG"))
if err != nil {
fmt.Printf("failed to read config file: %v", err)
} else if len(cf.HTTPHeaders) != 0 {
rt = &headerTransport{
inner: rt,
httpHeaders: cf.HTTPHeaders,
}
}
if err := remote.WriteIndex(ref, img, remote.WithTransport(rt), remote.WithAuthFromKeychain(authn.DefaultKeychain)); err != nil {
return err
}
return nil
}
func (o *OCIMultiArch) UploadImageS3(buck *blob.Bucket) error {
fmt.Printf("-- push to the s3 target --\n")
// FS paths
multiArchRoot := filepath.Join(o.path, "multi")
multiBlobPath := filepath.Join(multiArchRoot, "blobs", "sha256")
// Registry URLs
urlPrefix := filepath.Join("v2", o.Name)
manifestPrefix := filepath.Join(urlPrefix, "manifests")
manifestUrl := filepath.Join(manifestPrefix, o.Tag)
blobPrefix := filepath.Join(urlPrefix, "blobs")
// Utils
cutDigestPrefix := len("sha256:")
// Checks
if o.multi == nil {
return errors.New("You try to upload a multiarch image that has not been built yet...")
}
// Upload index
src := filepath.Join(multiArchRoot, "index.json")
up, err := NewUploadFromFS(buck, src)
if err != nil {
return err
}
err = up.ContentType("application/vnd.oci.image.index.v1+json").UploadTo(manifestUrl)
if err != nil {
return err
}
fmt.Printf("[index] index.json -> %s\n", manifestUrl)
// Upload the same index but with its sha256
fd, err := os.Open(src)
if err != nil {
return err
}
defer fd.Close()
h := sha256.New()
if _, err := io.Copy(h, fd); err != nil {
return err
}
digest := fmt.Sprintf("%x", h.Sum(nil))
src = filepath.Join(multiArchRoot, "index.json")
upSha, err := NewUploadFromFS(buck, src)
if err != nil {
return err
}
dst := path.Join(manifestPrefix, "sha256:"+digest)
err = upSha.ContentType("application/vnd.oci.image.index.v1+json").UploadTo(dst)
if err != nil {
return err
}
fmt.Printf("[index] index.json -> %s\n", dst)
// Upload manifest of each system image
for _, m := range o.multi.Manifests {
src := filepath.Join(multiBlobPath, m.Digest[cutDigestPrefix:])
upDigest, err := NewUploadFromFS(buck, src)
if err != nil {
return err
}
dst := path.Join(manifestPrefix, m.Digest)
err = upDigest.ContentType("application/vnd.oci.image.manifest.v1+json").UploadTo(dst)
if err != nil {
return err
}
fmt.Printf("[manifest %s %s] %s -> %s\n", m.Platform.OS, m.Platform.Architecture, src, dst)
}
// Upload blobs from each system image
for _, ref := range o.multi.Manifests {
fullManifest := filepath.Join(multiBlobPath, ref.Digest[cutDigestPrefix:])
m, err := LoadOCIImageManifest(fullManifest)
if err != nil {
return err
}
// Upload config's blob
src := filepath.Join(multiBlobPath, m.Config.Digest[cutDigestPrefix:])
upConf, err := NewUploadFromFS(buck, src)
if err != nil {
return err
}
dst := path.Join(blobPrefix, m.Config.Digest)
err = upConf.UploadTo(dst)
if err != nil {
return err
}
fmt.Printf("[config %s %s] %s -> %s\n", ref.Platform.OS, ref.Platform.Architecture, src, dst)
// Upload layers' blob
counter := 0
for _, lref := range m.Layers {
src := filepath.Join(multiBlobPath, lref.Digest[cutDigestPrefix:])
upLayer, err := NewUploadFromFS(buck, src)
if err != nil {
return err
}
dst := path.Join(blobPrefix, lref.Digest)
err = upLayer.UploadTo(dst)
if err != nil {
return err
}
counter += 1
}
fmt.Printf("[blob %s %s] %d items sent\n", ref.Platform.OS, ref.Platform.Architecture, counter)
}
return nil
}
type StaticRegistryManager struct {
name string
buck *blob.Bucket
images []ImageDescriptor
}
type ImageDescriptor struct {
Tag string
Date time.Time
}
func NewStaticRegistryManager(buck *blob.Bucket, name string) *StaticRegistryManager {
return &StaticRegistryManager{
buck: buck,
name: name,
}
}
type TagList struct {
Name string `json:"name"`
Tags []string `json:"tags"`
}
func (l *StaticRegistryManager) Scan() error {
digestPrefix := "sha256:"
cutDigestPrefix := len(digestPrefix)
iter := l.buck.List(&blob.ListOptions{
Prefix: fmt.Sprintf("v2/%s/manifests/", l.name),
Delimiter: "/",
})
for {
obj, err := iter.Next(context.Background())
if err == io.EOF {
break
}
if err != nil {
return err
}
ksplit := strings.Split(obj.Key, "/")
if len(ksplit) < 1 {
return errors.New(fmt.Sprintf("Invalid key name %s", obj.Key))
}
fname := ksplit[len(ksplit)-1]
if len(fname) >= cutDigestPrefix && fname[:cutDigestPrefix] == digestPrefix {
// we ignore sha256 addressed manifests
continue
}
id := ImageDescriptor{
Tag: fname,
Date: obj.ModTime,
}
l.images = append(l.images, id)
}
return nil
}
func (l *StaticRegistryManager) TagList() TagList {
// Sort by date desc
sort.Slice(l.images, func(i, j int) bool {
return l.images[i].Date.After(l.images[j].Date)
})
// Build tagList
tagList := TagList{
Name: l.name,
}
for _, img := range l.images {
tagList.Tags = append(tagList.Tags, img.Tag)
}
return tagList
}
func (l *StaticRegistryManager) UpdateTagList() error {
fmt.Printf("--- update taglist ---\n")
err := l.Scan()
if err != nil {
return err
}
tagList := l.TagList()
fmt.Printf("computed tag list: %v\n", tagList)
txt, err := json.Marshal(tagList)
if err != nil {
return err
}
dst := path.Join("v2", l.name, "tags", "list")
err = NewUploadFromByte(l.buck, txt).ContentType("application/json").UploadTo(dst)
if err != nil {
return err
}
fmt.Printf("taglist -> %s\n", dst)
return nil
}
//---
//--- Command logic
var containerCmd = &cobra.Command{
Use: "container",
Short: "Manage container images",
Long: "Publish software on an S3 target following the OCI specification",
}
var containerTag string
var containerPublishCmd = &cobra.Command{
Use: "push [folder] [remote]", // https://gocloud.dev/howto/blob/#s3-compatible
Short: "Publish a container image",
Long: "Copy .tar.gz files in the specified folder on the S3 target so that they match the OCI distribution specification",
Args: cobra.ExactArgs(2),
Run: func(cmd *cobra.Command, args []string) {
localPath := args[0]
remotePath := args[1]
dockerProto := "docker://"
dockerProtoCut := len(dockerProto)
oi, err := NewOCIMultiArch(containerTag)
if err != nil {
fmt.Println(err)
os.Exit(1)
}
defer oi.Close()
if err = oi.LoadFromDockerArchives(localPath); err != nil {
fmt.Println(err)
os.Exit(1)
}
if err = oi.MergeSystemImages(); err != nil {
fmt.Println(err)
os.Exit(1)
}
if strings.HasPrefix(remotePath, "s3://") {
// open bucket
bucket, err := blob.OpenBucket(context.Background(), remotePath)
if err != nil {
fmt.Println(err)
os.Exit(1)
}
defer bucket.Close()
// upload image
if err = oi.UploadImageS3(bucket); err != nil {
fmt.Println(err)
os.Exit(1)
}
// update tag
if err = NewStaticRegistryManager(bucket, oi.Name).UpdateTagList(); err != nil {
fmt.Println(err)
os.Exit(1)
}
} else if strings.HasPrefix(remotePath, dockerProto) {
if err = oi.UploadImageRegistry(remotePath[dockerProtoCut:]); err != nil {
fmt.Println(err)
os.Exit(1)
}
} else {
fmt.Printf("Protocol not supported for remote path %s. Supported transports are s3:// and docker://\n", remotePath)
os.Exit(1)
}
fmt.Printf("✅ push succeeded\n")
},
}
func init() {
var err error
// @FIXME: this policy feature is probably here for something, so we should not bypass it
// but as there is no documentation around the error on how to easily and quickly doing it the right way,
// I am just disabling it. Thanks again security people for yet another convoluted incomprehensible undocumented stuff.
policy := &signature.Policy{Default: []signature.PolicyRequirement{signature.NewPRInsecureAcceptAnything()}}
pctx, err = signature.NewPolicyContext(policy)
if err != nil {
fmt.Println(err)
os.Exit(1)
}
containerPublishCmd.Flags().StringVarP(&containerTag, "tag", "t", "", "Tag of the project, eg. albatros:0.9")
containerPublishCmd.MarkFlagRequired("tag")
containerCmd.AddCommand(containerPublishCmd)
RootCmd.AddCommand(containerCmd)
}