Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions cloudimg/cloudimg.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Package cloudimg streams OCI-packaged cloud images out of an OCI registry.
package cloudimg

import (
"cmp"
"context"
"errors"
"fmt"
"io"
"slices"

"github.com/cocoonstack/cocoon-common/manifest"
"github.com/cocoonstack/cocoon-common/ociutil"
)

// BlobReader abstracts reading a blob by digest.
type BlobReader interface {
ReadBlob(ctx context.Context, digest string) (io.ReadCloser, error)
}

// Stream concatenates disk layers sorted by title annotation.
func Stream(ctx context.Context, raw []byte, blobs BlobReader, w io.Writer) error {
m, err := manifest.Parse(raw)
if err != nil {
return fmt.Errorf("parse manifest: %w", err)
}
if kind := manifest.ClassifyParsed(m); kind != manifest.KindCloudImage {
return fmt.Errorf("manifest is %s, not a cloud image", kind)
}

return StreamParsed(ctx, m, blobs, w)
}

// StreamParsed streams disk layers from an already-parsed manifest.
func StreamParsed(ctx context.Context, m *manifest.OCIManifest, blobs BlobReader, w io.Writer) error {
disks := diskLayers(m.Layers)
if len(disks) == 0 {
return errors.New("cloud image manifest has no disk layers")
}

for _, layer := range disks {
if err := copyBlob(ctx, blobs, layer, w); err != nil {
return err
}
}
return nil
}

func diskLayers(layers []manifest.Descriptor) []manifest.Descriptor {
out := make([]manifest.Descriptor, 0, len(layers))
for _, l := range layers {
if manifest.IsDiskMediaType(l.MediaType) {
out = append(out, l)
}
}
slices.SortStableFunc(out, func(a, b manifest.Descriptor) int {
return cmp.Compare(a.Title(), b.Title())
})
return out
}

func copyBlob(ctx context.Context, blobs BlobReader, layer manifest.Descriptor, w io.Writer) error {
body, err := blobs.ReadBlob(ctx, layer.Digest)
if err != nil {
return fmt.Errorf("get blob %s: %w", layer.Digest, err)
}
defer func() { _ = body.Close() }()
return ociutil.CopyBlobExact(w, body, layer.Digest, layer.Size)
}
188 changes: 188 additions & 0 deletions cloudimg/cloudimg_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
package cloudimg

import (
"bytes"
"context"
"errors"
"io"
"strings"
"testing"

"github.com/cocoonstack/cocoon-common/manifest"
)

const (
diskBlobA = "AAAA"
diskBlobB = "BBBB"
// Real sha256 digests of the byte contents above; required by CopyBlobExact's digest check.
digestA = "sha256:63c1dd951ffedf6f7fd968ad4efa39b8ed584f162f46e715114ee184f8de9201"
digestB = "sha256:4a8d8134f29b0b7b60c126f5532bc9f5d9bb73037373cf6fb872d81f1dcefdfd"
)

var winManifest = `{
"schemaVersion": 2,
"mediaType": "application/vnd.oci.image.manifest.v1+json",
"artifactType": "application/vnd.cocoonstack.os-image.v1+json",
"config": {"mediaType":"application/vnd.oci.empty.v1+json","digest":"sha256:00","size":2},
"layers": [
{
"mediaType": "application/vnd.cocoonstack.disk.qcow2.part",
"digest": "` + digestB + `",
"size": 4,
"annotations": {"org.opencontainers.image.title": "win.qcow2.01.qcow2.part"}
},
{
"mediaType": "text/plain",
"digest": "sha256:cc",
"size": 32,
"annotations": {"org.opencontainers.image.title": "SHA256SUMS"}
},
{
"mediaType": "application/vnd.cocoonstack.disk.qcow2.part",
"digest": "` + digestA + `",
"size": 4,
"annotations": {"org.opencontainers.image.title": "win.qcow2.00.qcow2.part"}
}
]
}`

// fakeBlobs is a tiny in-memory BlobReader for tests.
type fakeBlobs map[string][]byte

func (f fakeBlobs) ReadBlob(_ context.Context, digest string) (io.ReadCloser, error) {
data, ok := f[digest]
if !ok {
return nil, errors.New("blob not found: " + digest)
}
return io.NopCloser(bytes.NewReader(data)), nil
}

func TestStreamConcatenatesDiskLayersInTitleOrder(t *testing.T) {
blobs := fakeBlobs{
digestA: []byte(diskBlobA),
digestB: []byte(diskBlobB),
"sha256:cc": []byte("ignored-sha256sums"),
}

var out bytes.Buffer
if err := Stream(t.Context(), []byte(winManifest), blobs, &out); err != nil {
t.Fatalf("Stream: %v", err)
}
if got, want := out.String(), "AAAABBBB"; got != want {
t.Errorf("Stream output = %q, want %q", got, want)
}
}

func TestStreamRejectsContainerImage(t *testing.T) {
containerManifest := `{
"schemaVersion": 2,
"mediaType": "application/vnd.oci.image.manifest.v1+json",
"config": {"mediaType":"application/vnd.oci.image.config.v1+json","digest":"sha256:00","size":1},
"layers": [{"mediaType":"application/vnd.oci.image.layer.v1.tar+gzip","digest":"sha256:11","size":1}]
}`
err := Stream(t.Context(), []byte(containerManifest), fakeBlobs{}, io.Discard)
if err == nil {
t.Fatal("expected error streaming container image")
}
if !strings.Contains(err.Error(), "not a cloud image") {
t.Errorf("error = %v, want %q substring", err, "not a cloud image")
}
}

func TestStreamRejectsManifestWithNoDiskLayers(t *testing.T) {
noDisk := `{
"schemaVersion": 2,
"mediaType": "application/vnd.oci.image.manifest.v1+json",
"artifactType": "application/vnd.cocoonstack.os-image.v1+json",
"config": {"mediaType":"application/vnd.oci.empty.v1+json","digest":"sha256:00","size":2},
"layers": [{"mediaType":"text/plain","digest":"sha256:11","size":1}]
}`
err := Stream(t.Context(), []byte(noDisk), fakeBlobs{}, io.Discard)
if err == nil {
t.Fatal("expected error for manifest with no disk layers")
}
}

func TestDiskLayersFiltersAndSorts(t *testing.T) {
in := []manifest.Descriptor{
{MediaType: "text/plain", Annotations: map[string]string{manifest.AnnotationTitle: "SHA256SUMS"}},
{MediaType: manifest.MediaTypeDiskQcow2Part, Annotations: map[string]string{manifest.AnnotationTitle: "x.02.part"}},
{MediaType: manifest.MediaTypeDiskQcow2Part, Annotations: map[string]string{manifest.AnnotationTitle: "x.00.part"}},
{MediaType: manifest.MediaTypeDiskQcow2Part, Annotations: map[string]string{manifest.AnnotationTitle: "x.01.part"}},
}
got := diskLayers(in)
want := []string{"x.00.part", "x.01.part", "x.02.part"}
if len(got) != len(want) {
t.Fatalf("len = %d, want %d", len(got), len(want))
}
for i, w := range want {
if got[i].Title() != w {
t.Errorf("got[%d].Title() = %q, want %q", i, got[i].Title(), w)
}
}
}

// fakeCocoon captures the cocoon image import stdin payload so puller tests
// can assert what cocoon would have received.
type fakeCocoon struct {
importPayload bytes.Buffer
importName string
}

func (f *fakeCocoon) ImportImage(_ context.Context, name string) (io.WriteCloser, func() error, error) {
f.importName = name
return nopCloser{w: &f.importPayload}, func() error { return nil }, nil
}

type nopCloser struct{ w io.Writer }

func (n nopCloser) Write(p []byte) (int, error) { return n.w.Write(p) }
func (n nopCloser) Close() error { return nil }

// fakeDownloader implements snapshot.Downloader from a static manifest +
// blob map.
type fakeDownloader struct {
manifest []byte
contentType string
blobs map[string][]byte
}

func (f *fakeDownloader) GetManifest(_ context.Context, _, _ string) ([]byte, string, error) {
return f.manifest, f.contentType, nil
}

func (f *fakeDownloader) GetBlob(_ context.Context, _, digest string) (io.ReadCloser, error) {
data, ok := f.blobs[digest]
if !ok {
return nil, errors.New("blob not found: " + digest)
}
return io.NopCloser(bytes.NewReader(data)), nil
}

func TestPullerPipesAssembledDiskToCocoonImport(t *testing.T) {
dl := &fakeDownloader{
manifest: []byte(winManifest),
contentType: manifest.MediaTypeOCIManifest,
blobs: map[string][]byte{
digestA: []byte(diskBlobA),
digestB: []byte(diskBlobB),
"sha256:cc": []byte("ignored"),
},
}
cocoon := &fakeCocoon{}
puller := &Puller{Downloader: dl, Cocoon: cocoon}

if err := puller.Pull(t.Context(), PullOptions{
Name: "windows/win11",
Tag: "25h2",
LocalName: "win11",
}); err != nil {
t.Fatalf("Pull: %v", err)
}
if cocoon.importName != "win11" {
t.Errorf("import name = %q, want win11", cocoon.importName)
}
if got, want := cocoon.importPayload.String(), "AAAABBBB"; got != want {
t.Errorf("import payload = %q, want %q", got, want)
}
}
81 changes: 81 additions & 0 deletions cloudimg/pull.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package cloudimg

import (
"context"
"errors"
"fmt"
"io"
)

var _ BlobReader = blobReaderAdapter{}

// Downloader abstracts OCI manifest and blob downloads.
type Downloader interface {
GetManifest(ctx context.Context, name, tag string) ([]byte, string, error)
GetBlob(ctx context.Context, name, digest string) (io.ReadCloser, error)
}

// CocoonRunner abstracts the cocoon image import subprocess.
type CocoonRunner interface {
ImportImage(ctx context.Context, name string) (io.WriteCloser, func() error, error)
}

// Puller downloads cloud-image artifacts and pipes them into cocoon image import.
type Puller struct {
Downloader Downloader
Cocoon CocoonRunner
}

// PullOptions configures a cloud-image pull operation.
type PullOptions struct {
Name string // OCI repository name. Required.
Tag string // Defaults to "latest".
LocalName string // Override the cocoon-side image name. Empty = use Name.
}

// Pull downloads a cloud-image artifact and pipes it into cocoon image import.
func (p *Puller) Pull(ctx context.Context, opts PullOptions) error {
if opts.Name == "" {
return errors.New("cloudimg pull: name is required")
}
if opts.Tag == "" {
opts.Tag = "latest"
}
localName := opts.LocalName
if localName == "" {
localName = opts.Name
}

raw, _, err := p.Downloader.GetManifest(ctx, opts.Name, opts.Tag)
if err != nil {
return fmt.Errorf("get manifest %s:%s: %w", opts.Name, opts.Tag, err)
}

stdin, wait, err := p.Cocoon.ImportImage(ctx, localName)
if err != nil {
return fmt.Errorf("start cocoon image import: %w", err)
}

streamErr := Stream(ctx, raw, blobReaderAdapter{name: opts.Name, dl: p.Downloader}, stdin)
closeErr := stdin.Close()
waitErr := wait()

switch {
case streamErr != nil:
return fmt.Errorf("stream cloudimg: %w", streamErr)
case closeErr != nil:
return fmt.Errorf("close cocoon stdin: %w", closeErr)
case waitErr != nil:
return fmt.Errorf("cocoon image import: %w", waitErr)
}
return nil
}

type blobReaderAdapter struct {
name string
dl Downloader
}

func (a blobReaderAdapter) ReadBlob(ctx context.Context, digest string) (io.ReadCloser, error) {
return a.dl.GetBlob(ctx, a.name, digest)
}
Loading