From 915f11e3e0351acc9c7508b15614b8f67bd067ea Mon Sep 17 00:00:00 2001 From: sjmiller609 <7516283+sjmiller609@users.noreply.github.com> Date: Tue, 26 May 2026 15:15:47 +0000 Subject: [PATCH] Skip shared Firecracker standby compression --- lib/instances/firecracker_test.go | 22 ++++ lib/instances/manager.go | 1 + lib/instances/metrics.go | 10 +- lib/instances/metrics_test.go | 7 +- lib/instances/snapshot_compression.go | 52 ++++++++- lib/instances/snapshot_compression_test.go | 97 ++++++++++++++++ .../snapshot_memory_sharing_linux.go | 106 ++++++++++++++++++ .../snapshot_memory_sharing_unsupported.go | 7 ++ 8 files changed, 298 insertions(+), 4 deletions(-) create mode 100644 lib/instances/snapshot_memory_sharing_linux.go create mode 100644 lib/instances/snapshot_memory_sharing_unsupported.go diff --git a/lib/instances/firecracker_test.go b/lib/instances/firecracker_test.go index 67e4b0f0..16aebee3 100644 --- a/lib/instances/firecracker_test.go +++ b/lib/instances/firecracker_test.go @@ -565,6 +565,7 @@ func TestFirecrackerWarmForkChain(t *testing.T) { mgr, tmpDir := setupTestManagerForFirecrackerNoNetwork(t) ctx := context.Background() p := paths.New(tmpDir) + reflinkOK := probeReflinkSupport(t, tmpDir) imageManager, err := images.NewManager(p, 1, nil) require.NoError(t, err) @@ -622,6 +623,11 @@ func TestFirecrackerWarmForkChain(t *testing.T) { warm, err = waitForInstanceState(ctx, mgr, warmID, StateRunning, integrationTestTimeout(20*time.Second)) require.NoError(t, err) require.NoError(t, waitForExecAgent(ctx, mgr, warmID, 30*time.Second)) + if reflinkOK { + requireSnapshotMemorySharedExtents(t, mgr, p.InstanceSnapshotBase(warmID), "warm retained base from snapshot") + } else { + t.Log("reflink unavailable; skipping shared extent assertion for warm fork") + } child, err := mgr.ForkInstance(ctx, warmID, ForkInstanceRequest{ Name: "fc-warm-chain-child", @@ -641,12 +647,28 @@ func TestFirecrackerWarmForkChain(t *testing.T) { } require.Equal(t, StateRunning, warm.State) require.NoError(t, waitForExecAgent(ctx, mgr, warmID, 30*time.Second)) + if reflinkOK { + requireSnapshotMemorySharedExtents(t, mgr, p.InstanceSnapshotBase(warmID), "warm retained base after child fork") + } require.NoError(t, mgr.DeleteInstance(ctx, warmID)) warmDeleted = true require.NoError(t, mgr.DeleteSnapshot(ctx, snapshot.Id)) } +func requireSnapshotMemorySharedExtents(t *testing.T, mgr *manager, snapshotDir, label string) { + t.Helper() + + rawPath, ok := findRawSnapshotMemoryFile(snapshotDir) + require.True(t, ok, "%s should have a raw snapshot memory file", label) + + sharing, err := mgr.inspectSnapshotMemorySharing(rawPath) + require.NoError(t, err, "%s shared extent inspection failed", label) + require.False(t, sharing.Unknown, "%s shared extent inspection returned unknown", label) + require.Greater(t, sharing.SharedBytes, int64(0), "%s should have shared extents", label) + t.Logf("%s memory sharing: shared=%d private=%d path=%s", label, sharing.SharedBytes, sharing.PrivateBytes, rawPath) +} + // TestFirecrackerForkIsolation verifies CoW isolation between a firecracker // source's standby snapshot and a fork derived from it. A fork must end up // with its own mem-file inode (reflink-cloned, not hardlinked) so that diff --git a/lib/instances/manager.go b/lib/instances/manager.go index b0842e6a..4a02e431 100644 --- a/lib/instances/manager.go +++ b/lib/instances/manager.go @@ -131,6 +131,7 @@ type manager struct { compressionMu sync.Mutex compressionJobs map[string]*compressionJob compressionTimerFactory func(time.Duration) compressionTimer + sharingInspector func(string) (snapshotMemorySharing, error) nativeCodecMu sync.Mutex nativeCodecPaths map[string]string imageUsageRecorder ImageUsageRecorder diff --git a/lib/instances/metrics.go b/lib/instances/metrics.go index 84bf8060..ac929414 100644 --- a/lib/instances/metrics.go +++ b/lib/instances/metrics.go @@ -30,6 +30,13 @@ const ( snapshotCompressionResultFailed snapshotCompressionResult = "failed" ) +type snapshotCompressionSkipReason string + +const ( + snapshotCompressionSkipReasonNone snapshotCompressionSkipReason = "none" + snapshotCompressionSkipReasonSharedExtents snapshotCompressionSkipReason = "shared_extents" +) + type snapshotCompressionWaitOutcome string const ( @@ -569,7 +576,7 @@ func snapshotCompressionAttributes(hvType hypervisor.Type, algorithm snapshotsto return attrs } -func (m *manager) recordSnapshotCompressionJob(ctx context.Context, target compressionTarget, result snapshotCompressionResult, compressionStart *time.Time, uncompressedSize, compressedSize int64) { +func (m *manager) recordSnapshotCompressionJob(ctx context.Context, target compressionTarget, result snapshotCompressionResult, skipReason snapshotCompressionSkipReason, compressionStart *time.Time, uncompressedSize, compressedSize int64) { if m.metrics == nil { return } @@ -577,6 +584,7 @@ func (m *manager) recordSnapshotCompressionJob(ctx context.Context, target compr attrs := snapshotCompressionAttributes(target.HypervisorType, target.Policy.Algorithm, target.Source) attrsWithResult := append([]attribute.KeyValue{}, attrs...) attrsWithResult = append(attrsWithResult, attribute.String("result", string(result))) + attrsWithResult = append(attrsWithResult, attribute.String("reason", string(skipReason))) m.metrics.snapshotCompressionJobsTotal.Add(ctx, 1, metric.WithAttributes(attrsWithResult...)) if compressionStart != nil { diff --git a/lib/instances/metrics_test.go b/lib/instances/metrics_test.go index 811571e3..f8c3a3f1 100644 --- a/lib/instances/metrics_test.go +++ b/lib/instances/metrics_test.go @@ -61,8 +61,8 @@ func TestSnapshotCompressionMetrics_RecordAndObserve(t *testing.T) { target := m.compressionJobs["job-1"].target startedAt := time.Now().Add(-2 * time.Second) - m.recordSnapshotCompressionJob(t.Context(), target, snapshotCompressionResultSuccess, &startedAt, 1024, 256) - m.recordSnapshotCompressionJob(t.Context(), m.compressionJobs["job-2"].target, snapshotCompressionResultSkipped, nil, 0, 0) + m.recordSnapshotCompressionJob(t.Context(), target, snapshotCompressionResultSuccess, snapshotCompressionSkipReasonNone, &startedAt, 1024, 256) + m.recordSnapshotCompressionJob(t.Context(), m.compressionJobs["job-2"].target, snapshotCompressionResultSkipped, snapshotCompressionSkipReasonSharedExtents, nil, 0, 0) m.recordSnapshotCompressionWait(t.Context(), m.compressionJobs["job-2"].target, snapshotCompressionWaitOutcomeSkipped, time.Now().Add(-1500*time.Millisecond)) m.recordSnapshotCodecFallback(t.Context(), snapshotstore.SnapshotCompressionAlgorithmLz4, snapshotCodecOperationCompress, snapshotCodecFallbackReasonMissingBinary) m.recordSnapshotRestoreMemoryPrepare(t.Context(), hypervisor.TypeCloudHypervisor, snapshotMemoryPreparePathRaw, snapshotCompressionResultSuccess, time.Now().Add(-250*time.Millisecond)) @@ -96,11 +96,13 @@ func TestSnapshotCompressionMetrics_RecordAndObserve(t *testing.T) { assert.Equal(t, "cloud-hypervisor", metricLabel(t, point.Attributes, "hypervisor")) assert.Equal(t, "lz4", metricLabel(t, point.Attributes, "algorithm")) assert.Equal(t, "standby", metricLabel(t, point.Attributes, "source")) + assert.Equal(t, "none", metricLabel(t, point.Attributes, "reason")) case "skipped": assert.Equal(t, int64(1), point.Value) assert.Equal(t, "qemu", metricLabel(t, point.Attributes, "hypervisor")) assert.Equal(t, "zstd", metricLabel(t, point.Attributes, "algorithm")) assert.Equal(t, "standby", metricLabel(t, point.Attributes, "source")) + assert.Equal(t, "shared_extents", metricLabel(t, point.Attributes, "reason")) default: t.Fatalf("unexpected compression job result datapoint: %s", metricLabel(t, point.Attributes, "result")) } @@ -546,6 +548,7 @@ func TestEnsureSnapshotMemoryReadySkipsPendingCompressionWithoutPreemptionMetric require.True(t, ok) require.Len(t, jobs.DataPoints, 1) assert.Equal(t, "skipped", metricLabel(t, jobs.DataPoints[0].Attributes, "result")) + assert.Equal(t, "none", metricLabel(t, jobs.DataPoints[0].Attributes, "reason")) waitMetric := findMetric(t, rm, "hypeman_snapshot_compression_wait_duration_seconds") waitDurations, ok := waitMetric.Data.(metricdata.Histogram[float64]) diff --git a/lib/instances/snapshot_compression.go b/lib/instances/snapshot_compression.go index 61d1ebff..f188d271 100644 --- a/lib/instances/snapshot_compression.go +++ b/lib/instances/snapshot_compression.go @@ -85,6 +85,12 @@ type nativeCodecRuntime struct { commandContext func(ctx context.Context, name string, arg ...string) *exec.Cmd } +type snapshotMemorySharing struct { + SharedBytes int64 + PrivateBytes int64 + Unknown bool +} + func (r nativeCodecRuntime) lookPathFunc() func(string) (string, error) { if r.lookPath != nil { return r.lookPath @@ -511,6 +517,35 @@ func runWithNativeFallback(ctx context.Context, runtime nativeCodecRuntime, algo return nil } +func (m *manager) inspectSnapshotMemorySharing(rawPath string) (snapshotMemorySharing, error) { + if m != nil && m.sharingInspector != nil { + return m.sharingInspector(rawPath) + } + return inspectSnapshotMemorySharing(rawPath) +} + +func (m *manager) shouldSkipSnapshotCompressionForSharedMemory(ctx context.Context, target compressionTarget, rawPath string) (snapshotMemorySharing, bool) { + if target.HypervisorType != hypervisor.TypeFirecracker || target.Source != snapshotCompressionSourceStandby { + return snapshotMemorySharing{}, false + } + + sharing, err := m.inspectSnapshotMemorySharing(rawPath) + if err != nil { + logger.FromContext(ctx).WarnContext(ctx, "failed to inspect snapshot memory sharing; continuing compression", + "owner_id", target.OwnerID, + "snapshot_id", target.SnapshotID, + "snapshot_dir", target.SnapshotDir, + "raw_path", rawPath, + "error", err, + ) + return snapshotMemorySharing{}, false + } + if sharing.Unknown || sharing.SharedBytes <= 0 { + return sharing, false + } + return sharing, true +} + func (m *manager) startCompressionJob(ctx context.Context, target compressionTarget) { if target.Key == "" || !target.Policy.Enabled { return @@ -539,6 +574,7 @@ func (m *manager) startCompressionJob(ctx context.Context, target compressionTar go func() { result := snapshotCompressionResultSuccess + skipReason := snapshotCompressionSkipReasonNone var uncompressedSize int64 var compressedSize int64 var spanErr error @@ -547,7 +583,7 @@ func (m *manager) startCompressionJob(ctx context.Context, target compressionTar var compressionStart *time.Time defer func() { - m.recordSnapshotCompressionJob(metricsCtx, target, result, compressionStart, uncompressedSize, compressedSize) + m.recordSnapshotCompressionJob(metricsCtx, target, result, skipReason, compressionStart, uncompressedSize, compressedSize) if target.Source == snapshotCompressionSourceStandby && target.SnapshotID == "" { if err := m.clearPendingStandbyCompression(context.Background(), target.OwnerID); err != nil && !errors.Is(err, ErrNotFound) { log.WarnContext(context.Background(), "failed to clear pending standby compression plan after job completion", "instance_id", target.OwnerID, "error", err) @@ -667,6 +703,20 @@ func (m *manager) startCompressionJob(ctx context.Context, target compressionTar } return } + if sharing, skip := m.shouldSkipSnapshotCompressionForSharedMemory(compressionCtx, target, rawPath); skip { + result = snapshotCompressionResultSkipped + skipReason = snapshotCompressionSkipReasonSharedExtents + log.InfoContext(compressionCtx, "skipping standby snapshot compression because memory file has shared extents", + "owner_id", target.OwnerID, + "snapshot_id", target.SnapshotID, + "snapshot_dir", target.SnapshotDir, + "raw_path", rawPath, + "hypervisor", string(target.HypervisorType), + "shared_bytes", sharing.SharedBytes, + "private_bytes", sharing.PrivateBytes, + ) + return + } var err error uncompressedSize, compressedSize, err = compressSnapshotMemoryFileWithRuntime(jobCtx, nativeCodecRuntime{manager: m}, rawPath, target.Policy) diff --git a/lib/instances/snapshot_compression_test.go b/lib/instances/snapshot_compression_test.go index 86a61e48..b235c3e9 100644 --- a/lib/instances/snapshot_compression_test.go +++ b/lib/instances/snapshot_compression_test.go @@ -14,6 +14,8 @@ import ( snapshotstore "github.com/kernel/hypeman/lib/snapshot" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + otelmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" ) func TestNormalizeCompressionConfig(t *testing.T) { @@ -433,6 +435,101 @@ func newSnapshotCompressionTestManager(t *testing.T) *manager { } } +func TestStartCompressionJobSkipsFirecrackerStandbySharedMemory(t *testing.T) { + mgr := newSnapshotCompressionTestManager(t) + reader := otelmetric.NewManualReader() + provider := otelmetric.NewMeterProvider(otelmetric.WithReader(reader)) + metrics, err := newInstanceMetrics(provider.Meter("test"), nil, mgr) + require.NoError(t, err) + mgr.metrics = metrics + + snapshotDir := t.TempDir() + rawPath := filepath.Join(snapshotDir, "memory") + require.NoError(t, os.WriteFile(rawPath, []byte("shared firecracker standby memory"), 0o644)) + + mgr.sharingInspector = func(path string) (snapshotMemorySharing, error) { + assert.Equal(t, rawPath, path) + return snapshotMemorySharing{SharedBytes: 1024, PrivateBytes: 512}, nil + } + + target := compressionTarget{ + Key: "instance:shared-firecracker", + OwnerID: "shared-firecracker", + SnapshotDir: snapshotDir, + HypervisorType: hypervisor.TypeFirecracker, + Source: snapshotCompressionSourceStandby, + Policy: snapshotstore.SnapshotCompressionConfig{ + Enabled: true, + Algorithm: snapshotstore.SnapshotCompressionAlgorithmZstd, + Level: intPtr(1), + }, + } + + mgr.startCompressionJob(context.Background(), target) + + require.Eventually(t, func() bool { + mgr.compressionMu.Lock() + defer mgr.compressionMu.Unlock() + _, ok := mgr.compressionJobs[target.Key] + return !ok + }, time.Second, 10*time.Millisecond) + + _, err = os.Stat(rawPath) + require.NoError(t, err, "shared raw memory should remain uncompressed") + _, err = os.Stat(rawPath + ".zst") + require.Error(t, err) + assert.True(t, os.IsNotExist(err)) + + var rm metricdata.ResourceMetrics + require.NoError(t, reader.Collect(t.Context(), &rm)) + jobsMetric := findMetric(t, rm, "hypeman_snapshot_compression_jobs_total") + jobs, ok := jobsMetric.Data.(metricdata.Sum[int64]) + require.True(t, ok) + require.Len(t, jobs.DataPoints, 1) + assert.Equal(t, "skipped", metricLabel(t, jobs.DataPoints[0].Attributes, "result")) + assert.Equal(t, "shared_extents", metricLabel(t, jobs.DataPoints[0].Attributes, "reason")) +} + +func TestStartCompressionJobDoesNotSkipNonFirecrackerSharedMemory(t *testing.T) { + mgr := newSnapshotCompressionTestManager(t) + + snapshotDir := t.TempDir() + rawPath := filepath.Join(snapshotDir, "memory") + require.NoError(t, os.WriteFile(rawPath, []byte("cloud hypervisor memory"), 0o644)) + + mgr.sharingInspector = func(string) (snapshotMemorySharing, error) { + return snapshotMemorySharing{SharedBytes: 1024}, nil + } + + target := compressionTarget{ + Key: "instance:shared-cloud-hypervisor", + OwnerID: "shared-cloud-hypervisor", + SnapshotDir: snapshotDir, + HypervisorType: hypervisor.TypeCloudHypervisor, + Source: snapshotCompressionSourceStandby, + Policy: snapshotstore.SnapshotCompressionConfig{ + Enabled: true, + Algorithm: snapshotstore.SnapshotCompressionAlgorithmZstd, + Level: intPtr(1), + }, + } + + mgr.startCompressionJob(context.Background(), target) + + require.Eventually(t, func() bool { + mgr.compressionMu.Lock() + defer mgr.compressionMu.Unlock() + _, ok := mgr.compressionJobs[target.Key] + return !ok + }, time.Second, 10*time.Millisecond) + + _, err := os.Stat(rawPath) + require.Error(t, err) + assert.True(t, os.IsNotExist(err)) + _, _, compressed := findCompressedSnapshotMemoryFile(snapshotDir) + assert.True(t, compressed) +} + func TestStartCompressionJobDelayedCancellationRecordsSkipped(t *testing.T) { t.Parallel() diff --git a/lib/instances/snapshot_memory_sharing_linux.go b/lib/instances/snapshot_memory_sharing_linux.go new file mode 100644 index 00000000..4862099e --- /dev/null +++ b/lib/instances/snapshot_memory_sharing_linux.go @@ -0,0 +1,106 @@ +//go:build linux + +package instances + +import ( + "errors" + "fmt" + "os" + "unsafe" + + "golang.org/x/sys/unix" +) + +const ( + snapshotMemoryFsIocFiemap = 0xC020660B + snapshotMemoryFiemapFlagSync = 0x00000001 + snapshotMemoryFiemapExtentLast = 0x00000001 + snapshotMemoryFiemapExtentShared = 0x00002000 + snapshotMemoryMaxFiemapExtents = 512 +) + +type snapshotMemoryFiemapExtent struct { + Logical uint64 + Physical uint64 + Length uint64 + Reserved64 [2]uint64 + Flags uint32 + Reserved [3]uint32 +} + +type snapshotMemoryFiemap struct { + Start uint64 + Length uint64 + Flags uint32 + MappedExtents uint32 + ExtentCount uint32 + Reserved uint32 + Extents [snapshotMemoryMaxFiemapExtents]snapshotMemoryFiemapExtent +} + +func inspectSnapshotMemorySharing(path string) (snapshotMemorySharing, error) { + file, err := os.Open(path) + if err != nil { + return snapshotMemorySharing{}, fmt.Errorf("open snapshot memory: %w", err) + } + defer file.Close() + + info, err := file.Stat() + if err != nil { + return snapshotMemorySharing{}, fmt.Errorf("stat snapshot memory: %w", err) + } + if info.Size() == 0 { + return snapshotMemorySharing{}, nil + } + + var out snapshotMemorySharing + start := uint64(0) + for { + mapping := snapshotMemoryFiemap{ + Start: start, + Length: ^uint64(0), + Flags: snapshotMemoryFiemapFlagSync, + ExtentCount: snapshotMemoryMaxFiemapExtents, + } + + _, _, errno := unix.Syscall(unix.SYS_IOCTL, file.Fd(), uintptr(snapshotMemoryFsIocFiemap), uintptr(unsafe.Pointer(&mapping))) + if errno != 0 { + if isFiemapUnsupported(errno) { + return snapshotMemorySharing{Unknown: true}, nil + } + return snapshotMemorySharing{}, fmt.Errorf("fiemap snapshot memory: %w", errno) + } + if mapping.MappedExtents == 0 { + return out, nil + } + + last := false + nextStart := start + for i := uint32(0); i < mapping.MappedExtents; i++ { + extent := mapping.Extents[i] + length := int64(extent.Length) + if extent.Flags&snapshotMemoryFiemapExtentShared != 0 { + out.SharedBytes += length + } else { + out.PrivateBytes += length + } + nextStart = extent.Logical + extent.Length + if extent.Flags&snapshotMemoryFiemapExtentLast != 0 { + last = true + } + } + if last || nextStart <= start { + return out, nil + } + start = nextStart + } +} + +func isFiemapUnsupported(err error) bool { + return errors.Is(err, unix.ENOTTY) || + errors.Is(err, unix.EOPNOTSUPP) || + errors.Is(err, unix.ENOTSUP) || + errors.Is(err, unix.EINVAL) || + errors.Is(err, unix.ENOSYS) || + errors.Is(err, unix.EPERM) +} diff --git a/lib/instances/snapshot_memory_sharing_unsupported.go b/lib/instances/snapshot_memory_sharing_unsupported.go new file mode 100644 index 00000000..4d1c3b06 --- /dev/null +++ b/lib/instances/snapshot_memory_sharing_unsupported.go @@ -0,0 +1,7 @@ +//go:build !linux + +package instances + +func inspectSnapshotMemorySharing(string) (snapshotMemorySharing, error) { + return snapshotMemorySharing{Unknown: true}, nil +}