Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions lib/instances/firecracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,7 @@ func TestFirecrackerWarmForkChain(t *testing.T) {
mgr, tmpDir := setupTestManagerForFirecrackerNoNetwork(t)
ctx := context.Background()
p := paths.New(tmpDir)
reflinkOK := probeReflinkSupport(t, tmpDir)

imageManager, err := images.NewManager(p, 1, nil)
require.NoError(t, err)
Expand Down Expand Up @@ -622,6 +623,11 @@ func TestFirecrackerWarmForkChain(t *testing.T) {
warm, err = waitForInstanceState(ctx, mgr, warmID, StateRunning, integrationTestTimeout(20*time.Second))
require.NoError(t, err)
require.NoError(t, waitForExecAgent(ctx, mgr, warmID, 30*time.Second))
if reflinkOK {
requireSnapshotMemorySharedExtents(t, mgr, p.InstanceSnapshotBase(warmID), "warm retained base from snapshot")
} else {
t.Log("reflink unavailable; skipping shared extent assertion for warm fork")
}

child, err := mgr.ForkInstance(ctx, warmID, ForkInstanceRequest{
Name: "fc-warm-chain-child",
Expand All @@ -641,12 +647,28 @@ func TestFirecrackerWarmForkChain(t *testing.T) {
}
require.Equal(t, StateRunning, warm.State)
require.NoError(t, waitForExecAgent(ctx, mgr, warmID, 30*time.Second))
if reflinkOK {
requireSnapshotMemorySharedExtents(t, mgr, p.InstanceSnapshotBase(warmID), "warm retained base after child fork")
}

require.NoError(t, mgr.DeleteInstance(ctx, warmID))
warmDeleted = true
require.NoError(t, mgr.DeleteSnapshot(ctx, snapshot.Id))
}

func requireSnapshotMemorySharedExtents(t *testing.T, mgr *manager, snapshotDir, label string) {
t.Helper()

rawPath, ok := findRawSnapshotMemoryFile(snapshotDir)
require.True(t, ok, "%s should have a raw snapshot memory file", label)

sharing, err := mgr.inspectSnapshotMemorySharing(rawPath)
require.NoError(t, err, "%s shared extent inspection failed", label)
require.False(t, sharing.Unknown, "%s shared extent inspection returned unknown", label)
require.Greater(t, sharing.SharedBytes, int64(0), "%s should have shared extents", label)
t.Logf("%s memory sharing: shared=%d private=%d path=%s", label, sharing.SharedBytes, sharing.PrivateBytes, rawPath)
}

// TestFirecrackerForkIsolation verifies CoW isolation between a firecracker
// source's standby snapshot and a fork derived from it. A fork must end up
// with its own mem-file inode (reflink-cloned, not hardlinked) so that
Expand Down
1 change: 1 addition & 0 deletions lib/instances/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ type manager struct {
compressionMu sync.Mutex
compressionJobs map[string]*compressionJob
compressionTimerFactory func(time.Duration) compressionTimer
sharingInspector func(string) (snapshotMemorySharing, error)
nativeCodecMu sync.Mutex
nativeCodecPaths map[string]string
imageUsageRecorder ImageUsageRecorder
Expand Down
10 changes: 9 additions & 1 deletion lib/instances/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ const (
snapshotCompressionResultFailed snapshotCompressionResult = "failed"
)

type snapshotCompressionSkipReason string

const (
snapshotCompressionSkipReasonNone snapshotCompressionSkipReason = "none"
snapshotCompressionSkipReasonSharedExtents snapshotCompressionSkipReason = "shared_extents"
)

type snapshotCompressionWaitOutcome string

const (
Expand Down Expand Up @@ -569,14 +576,15 @@ func snapshotCompressionAttributes(hvType hypervisor.Type, algorithm snapshotsto
return attrs
}

func (m *manager) recordSnapshotCompressionJob(ctx context.Context, target compressionTarget, result snapshotCompressionResult, compressionStart *time.Time, uncompressedSize, compressedSize int64) {
func (m *manager) recordSnapshotCompressionJob(ctx context.Context, target compressionTarget, result snapshotCompressionResult, skipReason snapshotCompressionSkipReason, compressionStart *time.Time, uncompressedSize, compressedSize int64) {
if m.metrics == nil {
return
}

attrs := snapshotCompressionAttributes(target.HypervisorType, target.Policy.Algorithm, target.Source)
attrsWithResult := append([]attribute.KeyValue{}, attrs...)
attrsWithResult = append(attrsWithResult, attribute.String("result", string(result)))
attrsWithResult = append(attrsWithResult, attribute.String("reason", string(skipReason)))

m.metrics.snapshotCompressionJobsTotal.Add(ctx, 1, metric.WithAttributes(attrsWithResult...))
if compressionStart != nil {
Expand Down
7 changes: 5 additions & 2 deletions lib/instances/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ func TestSnapshotCompressionMetrics_RecordAndObserve(t *testing.T) {

target := m.compressionJobs["job-1"].target
startedAt := time.Now().Add(-2 * time.Second)
m.recordSnapshotCompressionJob(t.Context(), target, snapshotCompressionResultSuccess, &startedAt, 1024, 256)
m.recordSnapshotCompressionJob(t.Context(), m.compressionJobs["job-2"].target, snapshotCompressionResultSkipped, nil, 0, 0)
m.recordSnapshotCompressionJob(t.Context(), target, snapshotCompressionResultSuccess, snapshotCompressionSkipReasonNone, &startedAt, 1024, 256)
m.recordSnapshotCompressionJob(t.Context(), m.compressionJobs["job-2"].target, snapshotCompressionResultSkipped, snapshotCompressionSkipReasonSharedExtents, nil, 0, 0)
m.recordSnapshotCompressionWait(t.Context(), m.compressionJobs["job-2"].target, snapshotCompressionWaitOutcomeSkipped, time.Now().Add(-1500*time.Millisecond))
m.recordSnapshotCodecFallback(t.Context(), snapshotstore.SnapshotCompressionAlgorithmLz4, snapshotCodecOperationCompress, snapshotCodecFallbackReasonMissingBinary)
m.recordSnapshotRestoreMemoryPrepare(t.Context(), hypervisor.TypeCloudHypervisor, snapshotMemoryPreparePathRaw, snapshotCompressionResultSuccess, time.Now().Add(-250*time.Millisecond))
Expand Down Expand Up @@ -96,11 +96,13 @@ func TestSnapshotCompressionMetrics_RecordAndObserve(t *testing.T) {
assert.Equal(t, "cloud-hypervisor", metricLabel(t, point.Attributes, "hypervisor"))
assert.Equal(t, "lz4", metricLabel(t, point.Attributes, "algorithm"))
assert.Equal(t, "standby", metricLabel(t, point.Attributes, "source"))
assert.Equal(t, "none", metricLabel(t, point.Attributes, "reason"))
case "skipped":
assert.Equal(t, int64(1), point.Value)
assert.Equal(t, "qemu", metricLabel(t, point.Attributes, "hypervisor"))
assert.Equal(t, "zstd", metricLabel(t, point.Attributes, "algorithm"))
assert.Equal(t, "standby", metricLabel(t, point.Attributes, "source"))
assert.Equal(t, "shared_extents", metricLabel(t, point.Attributes, "reason"))
default:
t.Fatalf("unexpected compression job result datapoint: %s", metricLabel(t, point.Attributes, "result"))
}
Expand Down Expand Up @@ -546,6 +548,7 @@ func TestEnsureSnapshotMemoryReadySkipsPendingCompressionWithoutPreemptionMetric
require.True(t, ok)
require.Len(t, jobs.DataPoints, 1)
assert.Equal(t, "skipped", metricLabel(t, jobs.DataPoints[0].Attributes, "result"))
assert.Equal(t, "none", metricLabel(t, jobs.DataPoints[0].Attributes, "reason"))

waitMetric := findMetric(t, rm, "hypeman_snapshot_compression_wait_duration_seconds")
waitDurations, ok := waitMetric.Data.(metricdata.Histogram[float64])
Expand Down
52 changes: 51 additions & 1 deletion lib/instances/snapshot_compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,12 @@ type nativeCodecRuntime struct {
commandContext func(ctx context.Context, name string, arg ...string) *exec.Cmd
}

type snapshotMemorySharing struct {
SharedBytes int64
PrivateBytes int64
Unknown bool
}

func (r nativeCodecRuntime) lookPathFunc() func(string) (string, error) {
if r.lookPath != nil {
return r.lookPath
Expand Down Expand Up @@ -511,6 +517,35 @@ func runWithNativeFallback(ctx context.Context, runtime nativeCodecRuntime, algo
return nil
}

func (m *manager) inspectSnapshotMemorySharing(rawPath string) (snapshotMemorySharing, error) {
if m != nil && m.sharingInspector != nil {
return m.sharingInspector(rawPath)
}
return inspectSnapshotMemorySharing(rawPath)
}

func (m *manager) shouldSkipSnapshotCompressionForSharedMemory(ctx context.Context, target compressionTarget, rawPath string) (snapshotMemorySharing, bool) {
if target.HypervisorType != hypervisor.TypeFirecracker || target.Source != snapshotCompressionSourceStandby {
return snapshotMemorySharing{}, false
}

sharing, err := m.inspectSnapshotMemorySharing(rawPath)
if err != nil {
logger.FromContext(ctx).WarnContext(ctx, "failed to inspect snapshot memory sharing; continuing compression",
"owner_id", target.OwnerID,
"snapshot_id", target.SnapshotID,
"snapshot_dir", target.SnapshotDir,
"raw_path", rawPath,
"error", err,
)
return snapshotMemorySharing{}, false
}
if sharing.Unknown || sharing.SharedBytes <= 0 {
return sharing, false
}
return sharing, true
}

func (m *manager) startCompressionJob(ctx context.Context, target compressionTarget) {
if target.Key == "" || !target.Policy.Enabled {
return
Expand Down Expand Up @@ -539,6 +574,7 @@ func (m *manager) startCompressionJob(ctx context.Context, target compressionTar

go func() {
result := snapshotCompressionResultSuccess
skipReason := snapshotCompressionSkipReasonNone
var uncompressedSize int64
var compressedSize int64
var spanErr error
Expand All @@ -547,7 +583,7 @@ func (m *manager) startCompressionJob(ctx context.Context, target compressionTar
var compressionStart *time.Time

defer func() {
m.recordSnapshotCompressionJob(metricsCtx, target, result, compressionStart, uncompressedSize, compressedSize)
m.recordSnapshotCompressionJob(metricsCtx, target, result, skipReason, compressionStart, uncompressedSize, compressedSize)
if target.Source == snapshotCompressionSourceStandby && target.SnapshotID == "" {
if err := m.clearPendingStandbyCompression(context.Background(), target.OwnerID); err != nil && !errors.Is(err, ErrNotFound) {
log.WarnContext(context.Background(), "failed to clear pending standby compression plan after job completion", "instance_id", target.OwnerID, "error", err)
Expand Down Expand Up @@ -667,6 +703,20 @@ func (m *manager) startCompressionJob(ctx context.Context, target compressionTar
}
return
}
if sharing, skip := m.shouldSkipSnapshotCompressionForSharedMemory(compressionCtx, target, rawPath); skip {
result = snapshotCompressionResultSkipped
skipReason = snapshotCompressionSkipReasonSharedExtents
log.InfoContext(compressionCtx, "skipping standby snapshot compression because memory file has shared extents",
"owner_id", target.OwnerID,
"snapshot_id", target.SnapshotID,
"snapshot_dir", target.SnapshotDir,
"raw_path", rawPath,
"hypervisor", string(target.HypervisorType),
"shared_bytes", sharing.SharedBytes,
"private_bytes", sharing.PrivateBytes,
)
return
}

var err error
uncompressedSize, compressedSize, err = compressSnapshotMemoryFileWithRuntime(jobCtx, nativeCodecRuntime{manager: m}, rawPath, target.Policy)
Expand Down
97 changes: 97 additions & 0 deletions lib/instances/snapshot_compression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
snapshotstore "github.com/kernel/hypeman/lib/snapshot"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
otelmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

func TestNormalizeCompressionConfig(t *testing.T) {
Expand Down Expand Up @@ -433,6 +435,101 @@ func newSnapshotCompressionTestManager(t *testing.T) *manager {
}
}

func TestStartCompressionJobSkipsFirecrackerStandbySharedMemory(t *testing.T) {
mgr := newSnapshotCompressionTestManager(t)
reader := otelmetric.NewManualReader()
provider := otelmetric.NewMeterProvider(otelmetric.WithReader(reader))
metrics, err := newInstanceMetrics(provider.Meter("test"), nil, mgr)
require.NoError(t, err)
mgr.metrics = metrics

snapshotDir := t.TempDir()
rawPath := filepath.Join(snapshotDir, "memory")
require.NoError(t, os.WriteFile(rawPath, []byte("shared firecracker standby memory"), 0o644))

mgr.sharingInspector = func(path string) (snapshotMemorySharing, error) {
assert.Equal(t, rawPath, path)
return snapshotMemorySharing{SharedBytes: 1024, PrivateBytes: 512}, nil
}

target := compressionTarget{
Key: "instance:shared-firecracker",
OwnerID: "shared-firecracker",
SnapshotDir: snapshotDir,
HypervisorType: hypervisor.TypeFirecracker,
Source: snapshotCompressionSourceStandby,
Policy: snapshotstore.SnapshotCompressionConfig{
Enabled: true,
Algorithm: snapshotstore.SnapshotCompressionAlgorithmZstd,
Level: intPtr(1),
},
}

mgr.startCompressionJob(context.Background(), target)

require.Eventually(t, func() bool {
mgr.compressionMu.Lock()
defer mgr.compressionMu.Unlock()
_, ok := mgr.compressionJobs[target.Key]
return !ok
}, time.Second, 10*time.Millisecond)

_, err = os.Stat(rawPath)
require.NoError(t, err, "shared raw memory should remain uncompressed")
_, err = os.Stat(rawPath + ".zst")
require.Error(t, err)
assert.True(t, os.IsNotExist(err))

var rm metricdata.ResourceMetrics
require.NoError(t, reader.Collect(t.Context(), &rm))
jobsMetric := findMetric(t, rm, "hypeman_snapshot_compression_jobs_total")
jobs, ok := jobsMetric.Data.(metricdata.Sum[int64])
require.True(t, ok)
require.Len(t, jobs.DataPoints, 1)
assert.Equal(t, "skipped", metricLabel(t, jobs.DataPoints[0].Attributes, "result"))
assert.Equal(t, "shared_extents", metricLabel(t, jobs.DataPoints[0].Attributes, "reason"))
}

func TestStartCompressionJobDoesNotSkipNonFirecrackerSharedMemory(t *testing.T) {
mgr := newSnapshotCompressionTestManager(t)

snapshotDir := t.TempDir()
rawPath := filepath.Join(snapshotDir, "memory")
require.NoError(t, os.WriteFile(rawPath, []byte("cloud hypervisor memory"), 0o644))

mgr.sharingInspector = func(string) (snapshotMemorySharing, error) {
return snapshotMemorySharing{SharedBytes: 1024}, nil
}

target := compressionTarget{
Key: "instance:shared-cloud-hypervisor",
OwnerID: "shared-cloud-hypervisor",
SnapshotDir: snapshotDir,
HypervisorType: hypervisor.TypeCloudHypervisor,
Source: snapshotCompressionSourceStandby,
Policy: snapshotstore.SnapshotCompressionConfig{
Enabled: true,
Algorithm: snapshotstore.SnapshotCompressionAlgorithmZstd,
Level: intPtr(1),
},
}

mgr.startCompressionJob(context.Background(), target)

require.Eventually(t, func() bool {
mgr.compressionMu.Lock()
defer mgr.compressionMu.Unlock()
_, ok := mgr.compressionJobs[target.Key]
return !ok
}, time.Second, 10*time.Millisecond)

_, err := os.Stat(rawPath)
require.Error(t, err)
assert.True(t, os.IsNotExist(err))
_, _, compressed := findCompressedSnapshotMemoryFile(snapshotDir)
assert.True(t, compressed)
}

func TestStartCompressionJobDelayedCancellationRecordsSkipped(t *testing.T) {
t.Parallel()

Expand Down
Loading
Loading