Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pkg/dotc1z/engine/pebble/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,12 @@ func (e *Engine) Save(ctx context.Context, dest string) error {
return errors.New("pebble engine: Save requires the dotc1z.Save shim (envelope write); use CheckpointTo for direct directory access")
}

// DB returns the underlying *pebble.DB. Exported for the
// synccompactor/pebble package; callers must not Close it directly
// (use Engine.Close) and must respect the engine's lifecycle. Returns
// nil after Close.
func (e *Engine) DB() *pebble.DB { return e.db }

// CheckpointTo writes a self-contained Pebble directory snapshot to
// destDir. destDir must not exist yet. Pebble creates it and
// hard-links SSTs where possible.
Expand Down
61 changes: 47 additions & 14 deletions pkg/dotc1z/engine/pebble/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ const (
idxGrantByNeedsExpansion byte = 0x05
)

// --- Grant ---

// encodeGrantKey returns the primary key for a grant.
//
// v3 | typeGrant | sync_id_bytes | external_id
Expand Down Expand Up @@ -199,9 +201,6 @@ func encodeResourceByParentPrefix(syncIDBytes []byte, parentRT, parentID string)

// --- Entitlement ---

// encodeEntitlementKey:
//
// v3 | typeEntitlement | sync_id_bytes | external_id
func encodeEntitlementKey(syncIDBytes []byte, externalID string) []byte {
buf := make([]byte, 0, 5+len(syncIDBytes)+len(externalID))
buf = append(buf, versionV3, typeEntitlement)
Expand All @@ -218,7 +217,6 @@ func encodeEntitlementPrefix(syncIDBytes []byte) []byte {
return buf
}

// encodeEntitlementByResourceIndexKey: index of entitlements-on-resource.
func encodeEntitlementByResourceIndexKey(syncIDBytes []byte, resourceTypeID, resourceID, externalID string) []byte {
buf := make([]byte, 0, 64)
buf = append(buf, versionV3, typeIndex, idxEntitlementByResource)
Expand Down Expand Up @@ -246,9 +244,6 @@ func encodeEntitlementByResourcePrefix(syncIDBytes []byte, resourceTypeID, resou

// --- Asset ---

// encodeAssetKey:
//
// v3 | typeAsset | sync_id_bytes | external_id
func encodeAssetKey(syncIDBytes []byte, externalID string) []byte {
buf := make([]byte, 0, 5+len(syncIDBytes)+len(externalID))
buf = append(buf, versionV3, typeAsset)
Expand All @@ -267,24 +262,62 @@ func encodeAssetPrefix(syncIDBytes []byte) []byte {

// --- SyncRun ---

// encodeSyncRunKey:
//
// v3 | typeSyncRun | sync_id_bytes
//
// SyncRunRecord has only sync_id as primary key.
func encodeSyncRunKey(syncIDBytes []byte) []byte {
buf := make([]byte, 0, 2+len(syncIDBytes))
buf = append(buf, versionV3, typeSyncRun)
buf = append(buf, syncIDBytes...)
return buf
}

// encodeSyncRunFullPrefix returns the prefix for iterating ALL sync
// runs across the engine (no sync_id constraint).
func encodeSyncRunFullPrefix() []byte {
return []byte{versionV3, typeSyncRun}
}

// --- Exported bound helpers for synccompactor/pebble ---

// GrantSyncLowerBound returns the lowest key in the grant primary
// bucket for a given sync. Together with GrantSyncUpperBound it gives
// the half-open [lo, hi) range covering every grant under that sync.
// Exported for the synccompactor/pebble package; not part of the
// stable public API of the engine.
func GrantSyncLowerBound(syncIDBytes []byte) []byte {
return encodeGrantPrefix(syncIDBytes)
}

// GrantSyncUpperBound returns the exclusive upper bound for the
// grant primary bucket under a sync.
func GrantSyncUpperBound(syncIDBytes []byte) []byte {
return upperBoundOf(encodeGrantPrefix(syncIDBytes))
}

// GrantByEntitlementSyncLowerBound returns the lowest key in the
// by_entitlement index bucket for a given sync.
func GrantByEntitlementSyncLowerBound(syncIDBytes []byte) []byte {
buf := make([]byte, 0, 3+len(syncIDBytes))
buf = append(buf, versionV3, typeIndex, idxGrantByEntitlement)
buf = append(buf, syncIDBytes...)
return buf
}

// GrantByEntitlementSyncUpperBound is the exclusive upper bound.
func GrantByEntitlementSyncUpperBound(syncIDBytes []byte) []byte {
return upperBoundOf(GrantByEntitlementSyncLowerBound(syncIDBytes))
}

// GrantByPrincipalSyncLowerBound returns the lowest key in the
// by_principal index bucket for a given sync.
func GrantByPrincipalSyncLowerBound(syncIDBytes []byte) []byte {
buf := make([]byte, 0, 3+len(syncIDBytes))
buf = append(buf, versionV3, typeIndex, idxGrantByPrincipal)
buf = append(buf, syncIDBytes...)
return buf
}

// GrantByPrincipalSyncUpperBound is the exclusive upper bound.
func GrantByPrincipalSyncUpperBound(syncIDBytes []byte) []byte {
return upperBoundOf(GrantByPrincipalSyncLowerBound(syncIDBytes))
}

// upperBoundOf returns the smallest key strictly greater than every
// key with the given prefix. Used as the UpperBound in pebble.IterOptions
// for range scans. Increments the last byte; if the prefix is all
Expand Down
50 changes: 50 additions & 0 deletions pkg/synccompactor/pebble/bucket_plans.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
//go:build batonsdkv2

package pebble

import (
enginepkg "github.com/conductorone/baton-sdk/pkg/dotc1z/engine/pebble"
)

// bucketPlan describes one contiguous key range to compact. The v3 key
// layout puts the record-type byte (and, for indexes, the index
// discriminator) ahead of the sync_id, so a single sync_id occupies one
// sub-range per record-type bucket. We emit one SST + one excise span
// per bucket.
type bucketPlan struct {
name string
lower []byte
upper []byte
}

// buildBucketPlans returns the set of (lower, upper) excise spans that
// together cover every key in the engine that belongs to a single
// sync_id. The order is fixed and deterministic so logs and tests are
// stable.
//
// Stack 4 MVP covers the GrantRecord buckets implemented by Stack 3:
// - grant primary records
// - grant_by_entitlement index
// - grant_by_principal index
//
// Other record-type buckets are added in the same commit series as
// their Stack 3 implementations.
func buildBucketPlans(syncIDBytes []byte) []bucketPlan {
return []bucketPlan{
{
name: "grant",
lower: enginepkg.GrantSyncLowerBound(syncIDBytes),
upper: enginepkg.GrantSyncUpperBound(syncIDBytes),
},
{
name: "grant_by_entitlement",
lower: enginepkg.GrantByEntitlementSyncLowerBound(syncIDBytes),
upper: enginepkg.GrantByEntitlementSyncUpperBound(syncIDBytes),
},
{
name: "grant_by_principal",
lower: enginepkg.GrantByPrincipalSyncLowerBound(syncIDBytes),
upper: enginepkg.GrantByPrincipalSyncUpperBound(syncIDBytes),
},
}
}
210 changes: 210 additions & 0 deletions pkg/synccompactor/pebble/compactor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
//go:build batonsdkv2

// Package pebble implements the cross-engine compaction primitive for
// the v3 Pebble-backed storage engine (RFC 0004 §3.10, Stack 4).
//
// The compactor takes a `base` engine and one or more `applied`
// engines (each representing a sync_run worth of records) and atomically
// merges `applied`'s data into `base` using pebble.DB.IngestAndExcise:
//
// 1. For each applied engine, iterate its records in the source
// engine's key order and accumulate them into an SST file on disk.
// 2. Call base.DB.IngestAndExcise(paths, exciseSpan) with the new
// SST and the key range covering the applied sync_id. Pebble
// atomically:
// (a) excises every key in [exciseSpan.Start, exciseSpan.End)
// from base — old sync_id rows go away in one shot.
// (b) ingests the SST as a new L6 file (or flushable, depending
// on Pebble's choice) under that range.
//
// The net effect is a byte-level merge: zero proto encode/decode per
// record, zero LSM compaction churn from a record-by-record Put loop.
//
// Bound: each Compact call replaces exactly one sync_id range in the
// destination. Multi-sync rollup is a higher-level loop that Compact
// in sequence.
package pebble

import (
"context"
"errors"
"fmt"
"os"
"path/filepath"

"github.com/cockroachdb/pebble"

enginepkg "github.com/conductorone/baton-sdk/pkg/dotc1z/engine/pebble"
"github.com/conductorone/baton-sdk/pkg/dotc1z/engine/pebble/codec"
)

// ErrEmptySync is returned when Compact is called with a sync_id that
// has no records in the source engine. The caller can treat this as a
// no-op or as an error depending on context.
var ErrEmptySync = errors.New("synccompactor/pebble: source has no records under the given sync_id")

// Compactor merges sync_run data between two Pebble engines using
// IngestAndExcise. Reusable across many Compact calls; safe for
// sequential use only (not concurrent).
type Compactor struct {
base *enginepkg.Engine
tmpDir string
}

// NewCompactor builds a compactor that writes its merge into base. The
// tmpDir is where intermediate SST files are written; it must be on
// the same filesystem as the engine's data directory so Pebble can
// hard-link (a different FS would force a copy and break atomicity).
// If tmpDir is empty, os.TempDir is used (acceptable for tests but not
// production).
func NewCompactor(base *enginepkg.Engine, tmpDir string) (*Compactor, error) {
if base == nil {
return nil, errors.New("synccompactor/pebble: base engine is nil")
}
if tmpDir == "" {
tmpDir = os.TempDir()
}
if err := os.MkdirAll(tmpDir, 0o755); err != nil {
return nil, fmt.Errorf("synccompactor/pebble: mkdir tmpDir: %w", err)
}
return &Compactor{base: base, tmpDir: tmpDir}, nil
}

// Compact merges all records belonging to syncID in `source` into the
// base engine. Any pre-existing data in base under that syncID is
// excised before the new data is ingested — base ends up with exactly
// source's view of that sync.
//
// The operation is atomic from base's perspective: a concurrent reader
// either sees only the old data or only the new data, never a mixture.
//
// Caller is responsible for quiescing writes to `source` for the
// duration of the call (an active syncer would invalidate the SST as
// it's being built).
func (c *Compactor) Compact(ctx context.Context, source *enginepkg.Engine, syncID string) error {
if source == nil {
return errors.New("synccompactor/pebble.Compact: source engine is nil")
}
if syncID == "" {
return errors.New("synccompactor/pebble.Compact: syncID is required")
}

srcDB := source.DB()
if srcDB == nil {
return errors.New("synccompactor/pebble.Compact: source engine has no DB (closed?)")
}
dstDB := c.base.DB()
if dstDB == nil {
return errors.New("synccompactor/pebble.Compact: base engine has no DB (closed?)")
}

syncIDBytes, err := codec.EncodeSyncID(syncID)
if err != nil {
return fmt.Errorf("synccompactor/pebble.Compact: encode syncID: %w", err)
}

// The full key range that belongs to this sync across all record
// types and indexes. The v3 key layout puts sync_id as the third
// byte tuple component within each type bucket, so a single
// (sync_id_lower, sync_id_upper) range does not cover *all* of
// them at once. Instead we walk each record-type bucket and emit
// one SST per bucket, exciseing each bucket's [sync_lo, sync_hi).
//
// For Stack 4 MVP we cover the canonical record-type buckets
// already implemented in Stack 3 (the GrantRecord primary +
// by-entitlement + by-principal indexes); other record types ship
// alongside their Stack 3 commits.
plans := buildBucketPlans(syncIDBytes)

type pendingIngest struct {
sstPath string
exciseSpan pebble.KeyRange
keyCount int
}
var pending []pendingIngest

// Always clean up SSTs on the way out; pebble's IngestAndExcise
// links them into its own object store on success, so it's fine to
// remove the path either way (Pebble retains its own ref).
defer func() {
for _, p := range pending {
_ = os.Remove(p.sstPath)
}
}()

totalKeys := 0
for _, plan := range plans {
if err := ctx.Err(); err != nil {
return err
}
iter, err := srcDB.NewIter(&pebble.IterOptions{
LowerBound: plan.lower,
UpperBound: plan.upper,
})
if err != nil {
return fmt.Errorf("compact: source iter for %s: %w", plan.name, err)
}

sstPath := filepath.Join(c.tmpDir, fmt.Sprintf("compact-%s-%x.sst", plan.name, syncIDBytes[:4]))
b, err := buildSSTFromIter(ctx, sstPath, iter)
_ = iter.Close()
if err != nil {
return fmt.Errorf("compact: build SST for %s: %w", plan.name, err)
}

// Empty bucket: nothing to ingest, but we still excise the
// destination's range so that the resulting state matches the
// source exactly (if the source had no grants under this sync,
// the destination's pre-existing grants under this sync must
// also go).
if b.KeyCount() == 0 {
// Tiny excise-only: no SST to ingest but still need to
// clear destination. Pebble doesn't let us IngestAndExcise
// with zero SSTs (the ingest must reference at least one
// file), so we fall back to a RangeDelete + Flush.
_ = os.Remove(sstPath)
if err := dstDB.DeleteRange(plan.lower, plan.upper, pebble.Sync); err != nil {
return fmt.Errorf("compact: excise-only DeleteRange for %s: %w", plan.name, err)
}
continue
}

pending = append(pending, pendingIngest{
sstPath: sstPath,
exciseSpan: pebble.KeyRange{
Start: plan.lower,
End: plan.upper,
},
keyCount: b.KeyCount(),
})
totalKeys += b.KeyCount()
}

if len(pending) == 0 && totalKeys == 0 {
// Source had nothing for this sync. Destination's pre-existing
// data (if any) was cleared via DeleteRange in the empty
// branches above. Return ErrEmptySync so callers can
// distinguish a "true no-op" from a successful merge.
return ErrEmptySync
}

// One IngestAndExcise per bucket. We can't bundle them into a
// single call because each bucket has a distinct excise span and
// Pebble takes only one span per call.
for _, p := range pending {
if err := ctx.Err(); err != nil {
return err
}
if _, err := dstDB.IngestAndExcise(
ctx,
[]string{p.sstPath},
nil, // no shared SSTs
nil, // no external SSTs
p.exciseSpan,
); err != nil {
return fmt.Errorf("compact: IngestAndExcise: %w", err)
}
}

return nil
}
Loading
Loading