From 4233ede0b0966fc634107b9520976ec620b27a9b Mon Sep 17 00:00:00 2001 From: Scott Walkinshaw Date: Sat, 4 Apr 2026 10:50:24 -0500 Subject: [PATCH 1/5] Add DB-driven R2 sync function New deploy.Sync() replaces the filesystem-based SyncToR2 approach: - Queries packages where content_hash != deployed_hash - Serializes Composer p2 JSON from DB via composer.SerializePackage() - Uploads tagged + dev files to R2 in parallel (50 goroutines) - Deletes p2 files for deactivated packages (fixes existing gap where closures detected by check-status left orphaned files on R2) - Conditionally uploads packages.json (skips if ETag matches) - Stamps deployed_hash after successful upload Also adds deleteObjectWithRetry and headObject helpers to r2.go. Integration test (db_sync_test.go) validates full cycle against gofakes3: first sync uploads all, second sync is idempotent, deactivation triggers deletion. Co-Authored-By: Claude Opus 4.6 --- internal/deploy/r2.go | 41 +++++ internal/deploy/sync.go | 222 +++++++++++++++++++++++++++ internal/integration/db_sync_test.go | 208 +++++++++++++++++++++++++ 3 files changed, 471 insertions(+) create mode 100644 internal/deploy/sync.go create mode 100644 internal/integration/db_sync_test.go diff --git a/internal/deploy/r2.go b/internal/deploy/r2.go index 0119a3c..f531229 100644 --- a/internal/deploy/r2.go +++ b/internal/deploy/r2.go @@ -162,6 +162,47 @@ func CacheControlForPath(path string) string { return "public, max-age=300" } +// deleteObjectWithRetry deletes a single object from R2 with exponential backoff retry. 
+func deleteObjectWithRetry(ctx context.Context, client *s3.Client, bucket, key string, logger *slog.Logger) error { + var lastErr error + for attempt := range r2MaxRetries { + if attempt > 0 { + delay := time.Duration(float64(r2RetryBaseMs)*math.Pow(2, float64(attempt-1))) * time.Millisecond + logger.Warn("retrying R2 delete", "key", key, "attempt", attempt+1, "delay", delay) + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(delay): + } + } + + _, lastErr = client.DeleteObject(ctx, &s3.DeleteObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + }) + if lastErr == nil { + return nil + } + } + return fmt.Errorf("deleting %s after %d attempts: %w", key, r2MaxRetries, lastErr) +} + +// headObject returns the ETag of an object, or "" if the object doesn't exist. +func headObject(ctx context.Context, client *s3.Client, bucket, key string) (string, error) { + resp, err := client.HeadObject(ctx, &s3.HeadObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + }) + if err != nil { + // NoSuchKey or similar — object doesn't exist + return "", nil + } + if resp.ETag != nil { + return *resp.ETag, nil + } + return "", nil +} + func newS3Client(cfg config.R2Config) *s3.Client { return s3.New(s3.Options{ Region: "auto", diff --git a/internal/deploy/sync.go b/internal/deploy/sync.go new file mode 100644 index 0000000..3cd0efc --- /dev/null +++ b/internal/deploy/sync.go @@ -0,0 +1,222 @@ +package deploy + +import ( + "context" + "crypto/md5" + "database/sql" + "fmt" + "log/slog" + "sync/atomic" + "time" + + "golang.org/x/sync/errgroup" + + "github.com/roots/wp-packages/internal/composer" + "github.com/roots/wp-packages/internal/config" +) + +// SyncResult holds statistics from a DB-driven R2 sync. +type SyncResult struct { + Uploaded int64 + Deleted int64 + Skipped int64 + Duration time.Duration +} + +// Sync uploads changed packages from the database to R2. 
+// +// It queries for packages where content_hash != deployed_hash, serializes +// them into Composer p2 JSON files, uploads to R2 in parallel, deletes +// p2 files for deactivated packages, conditionally uploads packages.json, +// and stamps deployed_hash on success. +func Sync(ctx context.Context, db *sql.DB, cfg config.R2Config, appURL string, logger *slog.Logger) (*SyncResult, error) { + started := time.Now() + client := newS3Client(cfg) + + var uploaded, deleted, skipped atomic.Int64 + + // Step 1: Upload changed p2/ files + rows, err := db.QueryContext(ctx, ` + SELECT id, type, name, versions_json, content_hash, + description, homepage, author, last_committed, trunk_revision + FROM packages + WHERE is_active = 1 + AND content_hash IS NOT NULL + AND (deployed_hash IS NULL OR content_hash != deployed_hash)`) + if err != nil { + return nil, fmt.Errorf("querying dirty packages: %w", err) + } + + type dirtyPkg struct { + id int64 + pkgType, name string + versionsJSON string + contentHash string + meta composer.PackageMeta + } + + var dirty []dirtyPkg + for rows.Next() { + var p dirtyPkg + var description, homepage, author, lastCommitted *string + var trunkRevision *int64 + + if err := rows.Scan(&p.id, &p.pkgType, &p.name, &p.versionsJSON, &p.contentHash, + &description, &homepage, &author, &lastCommitted, &trunkRevision); err != nil { + _ = rows.Close() + return nil, fmt.Errorf("scanning dirty package: %w", err) + } + + if description != nil { + p.meta.Description = *description + } + if homepage != nil { + p.meta.Homepage = *homepage + } + if author != nil { + p.meta.Author = *author + } + if lastCommitted != nil { + p.meta.LastUpdated = *lastCommitted + } + p.meta.TrunkRevision = trunkRevision + dirty = append(dirty, p) + } + if err := rows.Close(); err != nil { + return nil, fmt.Errorf("closing dirty packages query: %w", err) + } + + logger.Info("sync: dirty packages", "count", len(dirty)) + + g, gCtx := errgroup.WithContext(ctx) + g.SetLimit(50) + + for _, p 
:= range dirty { + p := p + g.Go(func() error { + composerName := composer.ComposerName(p.pkgType, p.name) + + // Tagged versions file (always) + taggedData, err := composer.SerializePackage(p.pkgType, p.name, p.versionsJSON, p.meta) + if err != nil { + return fmt.Errorf("serializing %s: %w", composerName, err) + } + if taggedData != nil { + key := "p2/" + composerName + ".json" + if err := putObjectWithRetry(gCtx, client, cfg.Bucket, key, taggedData, logger); err != nil { + return fmt.Errorf("uploading %s: %w", key, err) + } + uploaded.Add(1) + } + + // Dev versions file (plugins only) + devData, err := composer.SerializePackage(p.pkgType, p.name+"~dev", p.versionsJSON, p.meta) + if err != nil { + return fmt.Errorf("serializing %s~dev: %w", composerName, err) + } + if devData != nil { + key := "p2/" + composerName + "~dev.json" + if err := putObjectWithRetry(gCtx, client, cfg.Bucket, key, devData, logger); err != nil { + return fmt.Errorf("uploading %s: %w", key, err) + } + uploaded.Add(1) + } + + n := uploaded.Load() + skipped.Load() + if n%500 == 0 && n > 0 { + logger.Info("sync: upload progress", "uploaded", uploaded.Load(), "total_dirty", len(dirty)) + } + return nil + }) + } + + if err := g.Wait(); err != nil { + return nil, err + } + + // Step 2: Delete p2/ files for deactivated packages + deactivatedRows, err := db.QueryContext(ctx, ` + SELECT type, name FROM packages + WHERE is_active = 0 AND deployed_hash IS NOT NULL`) + if err != nil { + return nil, fmt.Errorf("querying deactivated packages: %w", err) + } + + type deactivatedPkg struct { + pkgType, name string + } + var deactivated []deactivatedPkg + for deactivatedRows.Next() { + var p deactivatedPkg + if err := deactivatedRows.Scan(&p.pkgType, &p.name); err != nil { + _ = deactivatedRows.Close() + return nil, fmt.Errorf("scanning deactivated package: %w", err) + } + deactivated = append(deactivated, p) + } + _ = deactivatedRows.Close() + + for _, p := range deactivated { + composerName := 
composer.ComposerName(p.pkgType, p.name) + for _, suffix := range []string{".json", "~dev.json"} { + key := "p2/" + composerName + suffix + if err := deleteObjectWithRetry(ctx, client, cfg.Bucket, key, logger); err != nil { + logger.Warn("sync: failed to delete deactivated package file", "key", key, "error", err) + continue + } + } + deleted.Add(1) + logger.Info("sync: deleted deactivated package", "package", composerName) + } + + // Step 3: Conditional packages.json upload + packagesData, err := composer.PackagesJSON(appURL) + if err != nil { + return nil, fmt.Errorf("generating packages.json: %w", err) + } + + currentETag, _ := headObject(ctx, client, cfg.Bucket, r2IndexFile) + newETag := fmt.Sprintf(`"%x"`, md5.Sum(packagesData)) + if currentETag != newETag { + if err := putObjectWithRetry(ctx, client, cfg.Bucket, r2IndexFile, packagesData, logger); err != nil { + return nil, fmt.Errorf("uploading packages.json: %w", err) + } + logger.Info("sync: uploaded packages.json") + } else { + logger.Info("sync: packages.json unchanged, skipped") + } + + // Step 4: Stamp deployed_hash + if len(dirty) > 0 { + _, err = db.ExecContext(ctx, ` + UPDATE packages SET deployed_hash = content_hash + WHERE is_active = 1 AND content_hash IS NOT NULL + AND (deployed_hash IS NULL OR content_hash != deployed_hash)`) + if err != nil { + return nil, fmt.Errorf("stamping deployed_hash: %w", err) + } + } + + if len(deactivated) > 0 { + _, err = db.ExecContext(ctx, ` + UPDATE packages SET deployed_hash = NULL + WHERE is_active = 0 AND deployed_hash IS NOT NULL`) + if err != nil { + return nil, fmt.Errorf("clearing deployed_hash for deactivated: %w", err) + } + } + + result := &SyncResult{ + Uploaded: uploaded.Load(), + Deleted: deleted.Load(), + Skipped: skipped.Load(), + Duration: time.Since(started), + } + + logger.Info("sync: complete", + "uploaded", result.Uploaded, + "deleted", result.Deleted, + "duration", result.Duration.String(), + ) + return result, nil +} diff --git 
a/internal/integration/db_sync_test.go b/internal/integration/db_sync_test.go new file mode 100644 index 0000000..3e62d77 --- /dev/null +++ b/internal/integration/db_sync_test.go @@ -0,0 +1,208 @@ +//go:build integration + +package integration + +import ( + "context" + "database/sql" + "encoding/json" + "io" + "net/http/httptest" + "path/filepath" + "testing" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/johannesboyne/gofakes3" + "github.com/johannesboyne/gofakes3/backend/s3mem" + "github.com/roots/wp-packages/internal/composer" + "github.com/roots/wp-packages/internal/config" + "github.com/roots/wp-packages/internal/deploy" + "github.com/roots/wp-packages/internal/packages" + "github.com/roots/wp-packages/internal/testutil" + "github.com/roots/wp-packages/internal/wporg" +) + +// backfillContentHashes computes content_hash for all packages that don't have one. +func backfillContentHashes(t *testing.T, database *sql.DB) { + t.Helper() + ctx := context.Background() + + pkgs, err := packages.GetPackagesNeedingUpdate(ctx, database, packages.UpdateQueryOpts{ + Force: true, + Type: "all", + }) + if err != nil { + t.Fatalf("getting packages for hash backfill: %v", err) + } + + for _, p := range pkgs { + hash := composer.HashVersions(p.VersionsJSON, p.TrunkRevision) + _, err := database.ExecContext(ctx, + `UPDATE packages SET content_hash = ? WHERE id = ?`, hash, p.ID) + if err != nil { + t.Fatalf("backfilling hash for %s/%s: %v", p.Type, p.Name, err) + } + } +} + +func TestDBDrivenSync(t *testing.T) { + ctx := context.Background() + + // 1. Seed DB from fixtures + fixtureDir := filepath.Join("..", "wporg", "testdata") + mock := wporg.NewMockServer(fixtureDir) + defer mock.Close() + + db := testutil.OpenTestDB(t) + testutil.SeedFromFixtures(t, db, mock.URL) + backfillContentHashes(t, db) + + // 2. 
Start gofakes3 in-process + backend := s3mem.New() + faker := gofakes3.New(backend) + ts := httptest.NewServer(faker.Server()) + defer ts.Close() + + s3Client := newTestS3Client(ts.URL) + _, err := s3Client.CreateBucket(ctx, &s3.CreateBucketInput{ + Bucket: aws.String("test-bucket"), + }) + if err != nil { + t.Fatalf("creating bucket: %v", err) + } + + r2Cfg := config.R2Config{ + AccessKeyID: "test", + SecretAccessKey: "test", + Bucket: "test-bucket", + Endpoint: ts.URL, + } + + // 3. First sync — all packages should be uploaded + result, err := deploy.Sync(ctx, db, r2Cfg, "http://test.local", testLogger(t)) + if err != nil { + t.Fatalf("first sync failed: %v", err) + } + if result.Uploaded == 0 { + t.Error("expected uploads on first sync") + } + t.Logf("first sync: uploaded=%d deleted=%d", result.Uploaded, result.Deleted) + + // Verify packages.json exists and is valid + rootObj, err := s3Client.GetObject(ctx, &s3.GetObjectInput{ + Bucket: aws.String("test-bucket"), + Key: aws.String("packages.json"), + }) + if err != nil { + t.Fatalf("packages.json not found after sync: %v", err) + } + rootData, _ := io.ReadAll(rootObj.Body) + _ = rootObj.Body.Close() + + var rootJSON map[string]any + if err := json.Unmarshal(rootData, &rootJSON); err != nil { + t.Fatalf("invalid packages.json: %v", err) + } + if _, ok := rootJSON["metadata-url"]; !ok { + t.Error("packages.json missing metadata-url") + } + + // Verify p2/ files exist for a known plugin + p2Key := "p2/wp-plugin/akismet.json" + p2Obj, err := s3Client.GetObject(ctx, &s3.GetObjectInput{ + Bucket: aws.String("test-bucket"), + Key: aws.String(p2Key), + }) + if err != nil { + t.Fatalf("p2 file %s not found: %v", p2Key, err) + } + p2Data, _ := io.ReadAll(p2Obj.Body) + _ = p2Obj.Body.Close() + + // Verify p2 content is valid Composer JSON + var p2JSON map[string]any + if err := json.Unmarshal(p2Data, &p2JSON); err != nil { + t.Fatalf("invalid p2 JSON for %s: %v", p2Key, err) + } + pkgsField, ok := 
p2JSON["packages"].(map[string]any) + if !ok { + t.Fatalf("p2 file missing 'packages' key") + } + if _, ok := pkgsField["wp-plugin/akismet"]; !ok { + t.Error("p2 file missing wp-plugin/akismet entry") + } + + // Verify ~dev.json exists for plugins + devKey := "p2/wp-plugin/akismet~dev.json" + devObj, err := s3Client.GetObject(ctx, &s3.GetObjectInput{ + Bucket: aws.String("test-bucket"), + Key: aws.String(devKey), + }) + if err != nil { + t.Fatalf("dev file %s not found: %v", devKey, err) + } + _ = devObj.Body.Close() + + // Verify deployed_hash is stamped (no dirty packages remain) + var dirtyCount int + err = db.QueryRowContext(ctx, + `SELECT COUNT(*) FROM packages WHERE is_active = 1 + AND content_hash IS NOT NULL + AND (deployed_hash IS NULL OR content_hash != deployed_hash)`).Scan(&dirtyCount) + if err != nil { + t.Fatalf("counting dirty packages: %v", err) + } + if dirtyCount != 0 { + t.Errorf("expected 0 dirty packages after sync, got %d", dirtyCount) + } + + // 4. Second sync — idempotent, nothing to upload + result2, err := deploy.Sync(ctx, db, r2Cfg, "http://test.local", testLogger(t)) + if err != nil { + t.Fatalf("second sync failed: %v", err) + } + if result2.Uploaded != 0 { + t.Errorf("expected 0 uploads on idempotent sync, got %d", result2.Uploaded) + } + + // 5. 
Deactivate a package, sync again, verify deletion + var akismetID int64 + err = db.QueryRowContext(ctx, + `SELECT id FROM packages WHERE type='plugin' AND name='akismet'`).Scan(&akismetID) + if err != nil { + t.Fatalf("finding akismet: %v", err) + } + + if err := packages.DeactivatePackage(ctx, db, akismetID); err != nil { + t.Fatalf("deactivating akismet: %v", err) + } + + result3, err := deploy.Sync(ctx, db, r2Cfg, "http://test.local", testLogger(t)) + if err != nil { + t.Fatalf("sync after deactivation failed: %v", err) + } + if result3.Deleted == 0 { + t.Error("expected deletions after deactivating akismet") + } + + // Verify akismet p2 files are gone from R2 + _, err = s3Client.GetObject(ctx, &s3.GetObjectInput{ + Bucket: aws.String("test-bucket"), + Key: aws.String(p2Key), + }) + if err == nil { + t.Error("akismet.json should have been deleted from R2") + } + + // Verify deployed_hash is cleared for deactivated package + var deployedHash *string + err = db.QueryRowContext(ctx, + `SELECT deployed_hash FROM packages WHERE id = ?`, akismetID).Scan(&deployedHash) + if err != nil { + t.Fatalf("checking deployed_hash: %v", err) + } + if deployedHash != nil { + t.Error("deployed_hash should be NULL for deactivated package") + } +} From b83266821baf4fdd75828e1c3505b6ea4d81b35e Mon Sep 17 00:00:00 2001 From: Scott Walkinshaw Date: Sat, 4 Apr 2026 11:03:07 -0500 Subject: [PATCH 2/5] Extract shared withRetry helper for R2 operations Deduplicates the retry loop from putObjectWithRetry and deleteObjectWithRetry into a single withRetry function with exponential backoff and context cancellation. 
Co-Authored-By: Claude Opus 4.6 --- internal/deploy/r2.go | 54 +++++++++++++++++++------------------------ 1 file changed, 24 insertions(+), 30 deletions(-) diff --git a/internal/deploy/r2.go b/internal/deploy/r2.go index f531229..421b7f3 100644 --- a/internal/deploy/r2.go +++ b/internal/deploy/r2.go @@ -103,35 +103,43 @@ func SyncToR2(ctx context.Context, cfg config.R2Config, buildDir, buildID, previ return nil } -// putObjectWithRetry uploads a single file to R2 with exponential backoff retry. -func putObjectWithRetry(ctx context.Context, client *s3.Client, bucket, key string, data []byte, logger *slog.Logger) error { - contentType := "application/json" - cacheControl := CacheControlForPath(key) - +// withRetry executes fn up to r2MaxRetries times with exponential backoff. +// The label is used in log messages to identify the operation. +func withRetry(ctx context.Context, logger *slog.Logger, label string, fn func() error) error { var lastErr error for attempt := range r2MaxRetries { if attempt > 0 { delay := time.Duration(float64(r2RetryBaseMs)*math.Pow(2, float64(attempt-1))) * time.Millisecond - logger.Warn("retrying R2 upload", "key", key, "attempt", attempt+1, "delay", delay) + logger.Warn("retrying R2 operation", "op", label, "attempt", attempt+1, "delay", delay) select { case <-ctx.Done(): return ctx.Err() case <-time.After(delay): } } + lastErr = fn() + if lastErr == nil { + return nil + } + } + return fmt.Errorf("%s after %d attempts: %w", label, r2MaxRetries, lastErr) +} - _, lastErr = client.PutObject(ctx, &s3.PutObjectInput{ +// putObjectWithRetry uploads a single file to R2 with exponential backoff retry. 
+func putObjectWithRetry(ctx context.Context, client *s3.Client, bucket, key string, data []byte, logger *slog.Logger) error { + contentType := "application/json" + cacheControl := CacheControlForPath(key) + + return withRetry(ctx, logger, "uploading "+key, func() error { + _, err := client.PutObject(ctx, &s3.PutObjectInput{ Bucket: aws.String(bucket), Key: aws.String(key), Body: bytes.NewReader(data), ContentType: aws.String(contentType), CacheControl: aws.String(cacheControl), }) - if lastErr == nil { - return nil - } - } - return fmt.Errorf("uploading %s after %d attempts: %w", key, r2MaxRetries, lastErr) + return err + }) } // fileUnchanged returns true if relPath exists in both directories with identical content. @@ -164,27 +172,13 @@ func CacheControlForPath(path string) string { // deleteObjectWithRetry deletes a single object from R2 with exponential backoff retry. func deleteObjectWithRetry(ctx context.Context, client *s3.Client, bucket, key string, logger *slog.Logger) error { - var lastErr error - for attempt := range r2MaxRetries { - if attempt > 0 { - delay := time.Duration(float64(r2RetryBaseMs)*math.Pow(2, float64(attempt-1))) * time.Millisecond - logger.Warn("retrying R2 delete", "key", key, "attempt", attempt+1, "delay", delay) - select { - case <-ctx.Done(): - return ctx.Err() - case <-time.After(delay): - } - } - - _, lastErr = client.DeleteObject(ctx, &s3.DeleteObjectInput{ + return withRetry(ctx, logger, "deleting "+key, func() error { + _, err := client.DeleteObject(ctx, &s3.DeleteObjectInput{ Bucket: aws.String(bucket), Key: aws.String(key), }) - if lastErr == nil { - return nil - } - } - return fmt.Errorf("deleting %s after %d attempts: %w", key, r2MaxRetries, lastErr) + return err + }) } // headObject returns the ETag of an object, or "" if the object doesn't exist. 
From 27cf78f010428fd5d246431c4b63a843f0ddc9c2 Mon Sep 17 00:00:00 2001 From: Scott Walkinshaw Date: Sat, 4 Apr 2026 11:30:37 -0500 Subject: [PATCH 3/5] Make R2 upload concurrency configurable Add Concurrency field to R2Config (default 50) instead of hardcoding the errgroup limit in SyncToR2 and Sync. Follows the same pattern as DiscoveryConfig.Concurrency. Co-Authored-By: Claude Opus 4.6 --- internal/config/config.go | 2 ++ internal/deploy/r2.go | 2 +- internal/deploy/sync.go | 2 +- internal/integration/db_sync_test.go | 1 + internal/integration/sync_test.go | 1 + 5 files changed, 6 insertions(+), 2 deletions(-) diff --git a/internal/config/config.go b/internal/config/config.go index f479d35..cede96f 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -40,6 +40,7 @@ type R2Config struct { Bucket string `yaml:"bucket"` Endpoint string `yaml:"endpoint"` Enabled bool `yaml:"enabled"` + Concurrency int `yaml:"concurrency"` CDNBucket string `yaml:"cdn_bucket"` CDNPublicURL string `yaml:"cdn_public_url"` } @@ -74,6 +75,7 @@ func defaults() *Config { Server: ServerConfig{ Addr: ":8080", }, + R2: R2Config{Concurrency: 50}, Session: SessionConfig{LifetimeMinutes: 7200}, Telemetry: TelemetryConfig{ DedupeWindowSeconds: 3600, diff --git a/internal/deploy/r2.go b/internal/deploy/r2.go index 421b7f3..b239a4a 100644 --- a/internal/deploy/r2.go +++ b/internal/deploy/r2.go @@ -58,7 +58,7 @@ func SyncToR2(ctx context.Context, cfg config.R2Config, buildDir, buildID, previ // Upload p2/ files in parallel, packages.json last. 
var uploaded, skipped atomic.Int64 g, gCtx := errgroup.WithContext(ctx) - g.SetLimit(50) + g.SetLimit(cfg.Concurrency) for _, relPath := range filePaths { relPath := relPath diff --git a/internal/deploy/sync.go b/internal/deploy/sync.go index 3cd0efc..7f1c5c2 100644 --- a/internal/deploy/sync.go +++ b/internal/deploy/sync.go @@ -89,7 +89,7 @@ func Sync(ctx context.Context, db *sql.DB, cfg config.R2Config, appURL string, l logger.Info("sync: dirty packages", "count", len(dirty)) g, gCtx := errgroup.WithContext(ctx) - g.SetLimit(50) + g.SetLimit(cfg.Concurrency) for _, p := range dirty { p := p diff --git a/internal/integration/db_sync_test.go b/internal/integration/db_sync_test.go index 3e62d77..3d9c0a1 100644 --- a/internal/integration/db_sync_test.go +++ b/internal/integration/db_sync_test.go @@ -77,6 +77,7 @@ func TestDBDrivenSync(t *testing.T) { SecretAccessKey: "test", Bucket: "test-bucket", Endpoint: ts.URL, + Concurrency: 1, } // 3. First sync — all packages should be uploaded diff --git a/internal/integration/sync_test.go b/internal/integration/sync_test.go index 10dea36..6bcd2cb 100644 --- a/internal/integration/sync_test.go +++ b/internal/integration/sync_test.go @@ -66,6 +66,7 @@ func TestR2Sync(t *testing.T) { SecretAccessKey: "test", Bucket: "test-bucket", Endpoint: ts.URL, + Concurrency: 1, } // 4. First sync — all packages uploaded From 371b01a4d01781c6c90efe04706e6693af3061db Mon Sep 17 00:00:00 2001 From: Scott Walkinshaw Date: Sat, 4 Apr 2026 11:36:44 -0500 Subject: [PATCH 4/5] Add ComposerMeta constructor and extract DB queries from sync Add Package.ComposerMeta() to centralize the nullable-field-to-PackageMeta mapping that was duplicated across sync.go, composer.go, and builder.go. Move dirty package and deactivated package queries into the packages package as GetDirtyPackages() and GetDeactivatedDeployedPackages(), keeping DB access out of the deploy layer. 
Co-Authored-By: Claude Opus 4.6 --- internal/deploy/sync.go | 75 ++++-------------------------------- internal/packages/package.go | 73 +++++++++++++++++++++++++++++++++++ 2 files changed, 81 insertions(+), 67 deletions(-) diff --git a/internal/deploy/sync.go b/internal/deploy/sync.go index 7f1c5c2..a5f56b0 100644 --- a/internal/deploy/sync.go +++ b/internal/deploy/sync.go @@ -13,6 +13,7 @@ import ( "github.com/roots/wp-packages/internal/composer" "github.com/roots/wp-packages/internal/config" + "github.com/roots/wp-packages/internal/packages" ) // SyncResult holds statistics from a DB-driven R2 sync. @@ -36,56 +37,11 @@ func Sync(ctx context.Context, db *sql.DB, cfg config.R2Config, appURL string, l var uploaded, deleted, skipped atomic.Int64 // Step 1: Upload changed p2/ files - rows, err := db.QueryContext(ctx, ` - SELECT id, type, name, versions_json, content_hash, - description, homepage, author, last_committed, trunk_revision - FROM packages - WHERE is_active = 1 - AND content_hash IS NOT NULL - AND (deployed_hash IS NULL OR content_hash != deployed_hash)`) + dirty, err := packages.GetDirtyPackages(ctx, db) if err != nil { return nil, fmt.Errorf("querying dirty packages: %w", err) } - type dirtyPkg struct { - id int64 - pkgType, name string - versionsJSON string - contentHash string - meta composer.PackageMeta - } - - var dirty []dirtyPkg - for rows.Next() { - var p dirtyPkg - var description, homepage, author, lastCommitted *string - var trunkRevision *int64 - - if err := rows.Scan(&p.id, &p.pkgType, &p.name, &p.versionsJSON, &p.contentHash, - &description, &homepage, &author, &lastCommitted, &trunkRevision); err != nil { - _ = rows.Close() - return nil, fmt.Errorf("scanning dirty package: %w", err) - } - - if description != nil { - p.meta.Description = *description - } - if homepage != nil { - p.meta.Homepage = *homepage - } - if author != nil { - p.meta.Author = *author - } - if lastCommitted != nil { - p.meta.LastUpdated = *lastCommitted - } - 
p.meta.TrunkRevision = trunkRevision - dirty = append(dirty, p) - } - if err := rows.Close(); err != nil { - return nil, fmt.Errorf("closing dirty packages query: %w", err) - } - logger.Info("sync: dirty packages", "count", len(dirty)) g, gCtx := errgroup.WithContext(ctx) @@ -94,10 +50,11 @@ func Sync(ctx context.Context, db *sql.DB, cfg config.R2Config, appURL string, l for _, p := range dirty { p := p g.Go(func() error { - composerName := composer.ComposerName(p.pkgType, p.name) + composerName := composer.ComposerName(p.Type, p.Name) + meta := p.ComposerMeta() // Tagged versions file (always) - taggedData, err := composer.SerializePackage(p.pkgType, p.name, p.versionsJSON, p.meta) + taggedData, err := composer.SerializePackage(p.Type, p.Name, p.VersionsJSON, meta) if err != nil { return fmt.Errorf("serializing %s: %w", composerName, err) } @@ -110,7 +67,7 @@ func Sync(ctx context.Context, db *sql.DB, cfg config.R2Config, appURL string, l } // Dev versions file (plugins only) - devData, err := composer.SerializePackage(p.pkgType, p.name+"~dev", p.versionsJSON, p.meta) + devData, err := composer.SerializePackage(p.Type, p.Name+"~dev", p.VersionsJSON, meta) if err != nil { return fmt.Errorf("serializing %s~dev: %w", composerName, err) } @@ -135,29 +92,13 @@ func Sync(ctx context.Context, db *sql.DB, cfg config.R2Config, appURL string, l } // Step 2: Delete p2/ files for deactivated packages - deactivatedRows, err := db.QueryContext(ctx, ` - SELECT type, name FROM packages - WHERE is_active = 0 AND deployed_hash IS NOT NULL`) + deactivated, err := packages.GetDeactivatedDeployedPackages(ctx, db) if err != nil { return nil, fmt.Errorf("querying deactivated packages: %w", err) } - type deactivatedPkg struct { - pkgType, name string - } - var deactivated []deactivatedPkg - for deactivatedRows.Next() { - var p deactivatedPkg - if err := deactivatedRows.Scan(&p.pkgType, &p.name); err != nil { - _ = deactivatedRows.Close() - return nil, fmt.Errorf("scanning deactivated 
package: %w", err) - } - deactivated = append(deactivated, p) - } - _ = deactivatedRows.Close() - for _, p := range deactivated { - composerName := composer.ComposerName(p.pkgType, p.name) + composerName := composer.ComposerName(p.Type, p.Name) for _, suffix := range []string{".json", "~dev.json"} { key := "p2/" + composerName + suffix if err := deleteObjectWithRetry(ctx, client, cfg.Bucket, key, logger); err != nil { diff --git a/internal/packages/package.go b/internal/packages/package.go index 0b09431..0719abe 100644 --- a/internal/packages/package.go +++ b/internal/packages/package.go @@ -8,9 +8,28 @@ import ( "strings" "time" + "github.com/roots/wp-packages/internal/composer" "github.com/roots/wp-packages/internal/version" ) +// ComposerMeta builds a composer.PackageMeta from a Package's nullable DB fields. +func (p *Package) ComposerMeta() composer.PackageMeta { + meta := composer.PackageMeta{TrunkRevision: p.TrunkRevision} + if p.Description != nil { + meta.Description = *p.Description + } + if p.Homepage != nil { + meta.Homepage = *p.Homepage + } + if p.Author != nil { + meta.Author = *p.Author + } + if p.LastCommitted != nil { + meta.LastUpdated = p.LastCommitted.Format(time.RFC3339) + } + return meta +} + type Package struct { ID int64 Type string @@ -339,6 +358,60 @@ func GetPackagesNeedingUpdate(ctx context.Context, db *sql.DB, opts UpdateQueryO return pkgs, rows.Err() } +// GetDirtyPackages returns active packages whose content_hash differs from deployed_hash. 
+func GetDirtyPackages(ctx context.Context, db *sql.DB) ([]*Package, error) { + rows, err := db.QueryContext(ctx, ` + SELECT id, type, name, versions_json, content_hash, + description, homepage, author, last_committed, trunk_revision + FROM packages + WHERE is_active = 1 + AND content_hash IS NOT NULL + AND (deployed_hash IS NULL OR content_hash != deployed_hash)`) + if err != nil { + return nil, fmt.Errorf("querying dirty packages: %w", err) + } + defer func() { _ = rows.Close() }() + + var pkgs []*Package + for rows.Next() { + var p Package + var lastCommitted *string + if err := rows.Scan(&p.ID, &p.Type, &p.Name, &p.VersionsJSON, &p.ContentHash, + &p.Description, &p.Homepage, &p.Author, &lastCommitted, &p.TrunkRevision); err != nil { + return nil, fmt.Errorf("scanning dirty package: %w", err) + } + if lastCommitted != nil { + if t, err := time.Parse(time.RFC3339, *lastCommitted); err == nil { + p.LastCommitted = &t + } + } + pkgs = append(pkgs, &p) + } + return pkgs, rows.Err() +} + +// GetDeactivatedDeployedPackages returns inactive packages that still have a deployed_hash +// (i.e. their p2 files are still on R2 and need to be deleted). +func GetDeactivatedDeployedPackages(ctx context.Context, db *sql.DB) ([]*Package, error) { + rows, err := db.QueryContext(ctx, ` + SELECT id, type, name FROM packages + WHERE is_active = 0 AND deployed_hash IS NOT NULL`) + if err != nil { + return nil, fmt.Errorf("querying deactivated deployed packages: %w", err) + } + defer func() { _ = rows.Close() }() + + var pkgs []*Package + for rows.Next() { + var p Package + if err := rows.Scan(&p.ID, &p.Type, &p.Name); err != nil { + return nil, fmt.Errorf("scanning deactivated package: %w", err) + } + pkgs = append(pkgs, &p) + } + return pkgs, rows.Err() +} + // DeactivatePackage sets is_active = 0 for a package. 
func DeactivatePackage(ctx context.Context, db *sql.DB, id int64) error { now := time.Now().UTC().Format(time.RFC3339) From 20b5884b309b2e605e9c4353e071978227a327c5 Mon Sep 17 00:00:00 2001 From: Scott Walkinshaw Date: Sat, 4 Apr 2026 11:40:54 -0500 Subject: [PATCH 5/5] Add PackageFiles and ObjectKeys to composer package MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PackageFiles encapsulates the tagged/dev file split — callers get back a slice of {Key, Data} without needing to know about ~dev naming, the themes-have-no-dev rule, or trunk-only plugin handling. ObjectKeys returns all possible R2 keys for deletion without hardcoding suffix lists at call sites. Simplifies deploy.Sync to a loop over PackageFiles for uploads and ObjectKeys for deletions. Co-Authored-By: Claude Opus 4.6 --- internal/composer/serialize.go | 50 ++++++++++++++++++++++++++++++++++ internal/deploy/sync.go | 34 +++++------------------ 2 files changed, 57 insertions(+), 27 deletions(-) diff --git a/internal/composer/serialize.go b/internal/composer/serialize.go index b0a615b..b474355 100644 --- a/internal/composer/serialize.go +++ b/internal/composer/serialize.go @@ -10,6 +10,56 @@ import ( "github.com/roots/wp-packages/internal/version" ) +// PackageFile represents a single R2-uploadable file for a package. +type PackageFile struct { + Key string // R2 object key, e.g. "p2/wp-plugin/akismet.json" + Data []byte +} + +// PackageFiles returns all Composer p2 files that a package produces. +// Plugins produce up to 2 files (tagged + dev), themes produce 1 (tagged only). +// Returns nil if the package has no serializable versions. 
+func PackageFiles(pkgType, name, versionsJSON string, meta PackageMeta) ([]PackageFile, error) { + composerName := ComposerName(pkgType, name) + var files []PackageFile + + // Tagged versions (always attempted) + tagged, err := SerializePackage(pkgType, name, versionsJSON, meta) + if err != nil { + return nil, err + } + if tagged != nil { + files = append(files, PackageFile{ + Key: "p2/" + composerName + ".json", + Data: tagged, + }) + } + + // Dev versions (plugins only — SerializePackage returns nil for themes) + dev, err := SerializePackage(pkgType, name+"~dev", versionsJSON, meta) + if err != nil { + return nil, err + } + if dev != nil { + files = append(files, PackageFile{ + Key: "p2/" + composerName + "~dev.json", + Data: dev, + }) + } + + return files, nil +} + +// ObjectKeys returns all possible storage keys for a package, +// regardless of whether the files currently exist. Used for deletion. +func ObjectKeys(pkgType, name string) []string { + composerName := ComposerName(pkgType, name) + return []string{ + "p2/" + composerName + ".json", + "p2/" + composerName + "~dev.json", + } +} + // HashVersions computes a content hash over the normalized versions_json and // trunk_revision. 
trunk_revision is included because it affects the serialized // dev-trunk output (source.reference includes trunk@) even though it's diff --git a/internal/deploy/sync.go b/internal/deploy/sync.go index a5f56b0..02081e8 100644 --- a/internal/deploy/sync.go +++ b/internal/deploy/sync.go @@ -50,31 +50,13 @@ func Sync(ctx context.Context, db *sql.DB, cfg config.R2Config, appURL string, l for _, p := range dirty { p := p g.Go(func() error { - composerName := composer.ComposerName(p.Type, p.Name) - meta := p.ComposerMeta() - - // Tagged versions file (always) - taggedData, err := composer.SerializePackage(p.Type, p.Name, p.VersionsJSON, meta) - if err != nil { - return fmt.Errorf("serializing %s: %w", composerName, err) - } - if taggedData != nil { - key := "p2/" + composerName + ".json" - if err := putObjectWithRetry(gCtx, client, cfg.Bucket, key, taggedData, logger); err != nil { - return fmt.Errorf("uploading %s: %w", key, err) - } - uploaded.Add(1) - } - - // Dev versions file (plugins only) - devData, err := composer.SerializePackage(p.Type, p.Name+"~dev", p.VersionsJSON, meta) + files, err := composer.PackageFiles(p.Type, p.Name, p.VersionsJSON, p.ComposerMeta()) if err != nil { - return fmt.Errorf("serializing %s~dev: %w", composerName, err) + return fmt.Errorf("serializing %s/%s: %w", p.Type, p.Name, err) } - if devData != nil { - key := "p2/" + composerName + "~dev.json" - if err := putObjectWithRetry(gCtx, client, cfg.Bucket, key, devData, logger); err != nil { - return fmt.Errorf("uploading %s: %w", key, err) + for _, f := range files { + if err := putObjectWithRetry(gCtx, client, cfg.Bucket, f.Key, f.Data, logger); err != nil { + return fmt.Errorf("uploading %s: %w", f.Key, err) } uploaded.Add(1) } @@ -98,16 +80,14 @@ func Sync(ctx context.Context, db *sql.DB, cfg config.R2Config, appURL string, l } for _, p := range deactivated { - composerName := composer.ComposerName(p.Type, p.Name) - for _, suffix := range []string{".json", "~dev.json"} { - key := "p2/" + 
composerName + suffix + for _, key := range composer.ObjectKeys(p.Type, p.Name) { if err := deleteObjectWithRetry(ctx, client, cfg.Bucket, key, logger); err != nil { logger.Warn("sync: failed to delete deactivated package file", "key", key, "error", err) continue } } deleted.Add(1) - logger.Info("sync: deleted deactivated package", "package", composerName) + logger.Info("sync: deleted deactivated package", "type", p.Type, "name", p.Name) } // Step 3: Conditional packages.json upload