diff --git a/internal/composer/serialize.go b/internal/composer/serialize.go
index b0a615b..b474355 100644
--- a/internal/composer/serialize.go
+++ b/internal/composer/serialize.go
@@ -10,6 +10,56 @@ import (
 	"github.com/roots/wp-packages/internal/version"
 )
 
+// PackageFile represents a single R2-uploadable file for a package.
+type PackageFile struct {
+	Key  string // R2 object key, e.g. "p2/wp-plugin/akismet.json"
+	Data []byte
+}
+
+// PackageFiles returns all Composer p2 files that a package produces.
+// Plugins produce up to 2 files (tagged + dev), themes produce 1 (tagged only).
+// Returns nil if the package has no serializable versions.
+func PackageFiles(pkgType, name, versionsJSON string, meta PackageMeta) ([]PackageFile, error) {
+	composerName := ComposerName(pkgType, name)
+	var files []PackageFile
+
+	// Tagged versions (always attempted)
+	tagged, err := SerializePackage(pkgType, name, versionsJSON, meta)
+	if err != nil {
+		return nil, err
+	}
+	if tagged != nil {
+		files = append(files, PackageFile{
+			Key:  "p2/" + composerName + ".json",
+			Data: tagged,
+		})
+	}
+
+	// Dev versions (plugins only — SerializePackage returns nil for themes)
+	dev, err := SerializePackage(pkgType, name+"~dev", versionsJSON, meta)
+	if err != nil {
+		return nil, err
+	}
+	if dev != nil {
+		files = append(files, PackageFile{
+			Key:  "p2/" + composerName + "~dev.json",
+			Data: dev,
+		})
+	}
+
+	return files, nil
+}
+
+// ObjectKeys returns all possible storage keys for a package,
+// regardless of whether the files currently exist. Used for deletion.
+func ObjectKeys(pkgType, name string) []string {
+	composerName := ComposerName(pkgType, name)
+	return []string{
+		"p2/" + composerName + ".json",
+		"p2/" + composerName + "~dev.json",
+	}
+}
+
 // HashVersions computes a content hash over the normalized versions_json and
 // trunk_revision. trunk_revision is included because it affects the serialized
 // dev-trunk output (source.reference includes trunk@) even though it's
diff --git a/internal/config/config.go b/internal/config/config.go
index f479d35..cede96f 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -40,6 +40,7 @@ type R2Config struct {
 	Bucket          string `yaml:"bucket"`
 	Endpoint        string `yaml:"endpoint"`
 	Enabled         bool   `yaml:"enabled"`
+	Concurrency     int    `yaml:"concurrency"`
 	CDNBucket       string `yaml:"cdn_bucket"`
 	CDNPublicURL    string `yaml:"cdn_public_url"`
 }
@@ -74,6 +75,7 @@ func defaults() *Config {
 		Server: ServerConfig{
 			Addr: ":8080",
 		},
+		R2:      R2Config{Concurrency: 50},
 		Session: SessionConfig{LifetimeMinutes: 7200},
 		Telemetry: TelemetryConfig{
 			DedupeWindowSeconds: 3600,
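The two serialize.go helpers above are the serialization half of the new DB-driven sync, and the new `concurrency` key (default 50, matching the previously hard-coded `g.SetLimit(50)`) sizes its upload pool. A minimal sketch of how a caller might wire the helpers together; `upload` and `remove` are placeholders for the retrying R2 helpers introduced later in this diff, not functions from the PR:

```go
package example // illustrative sketch, not part of the PR

import (
	"context"

	"github.com/roots/wp-packages/internal/composer"
)

// Placeholders standing in for the retrying R2 helpers added in r2.go.
var (
	upload func(ctx context.Context, key string, data []byte) error
	remove func(ctx context.Context, key string) error
)

func pushPackage(ctx context.Context, versionsJSON string, meta composer.PackageMeta) error {
	files, err := composer.PackageFiles("plugin", "akismet", versionsJSON, meta)
	if err != nil {
		return err
	}
	for _, f := range files { // plugins: tagged + ~dev, themes: tagged only
		if err := upload(ctx, f.Key, f.Data); err != nil {
			return err
		}
	}
	return nil
}

// Deletion uses ObjectKeys rather than PackageFiles: it must enumerate every
// key that might exist, whether or not the package still serializes to it.
func dropPackage(ctx context.Context) {
	for _, key := range composer.ObjectKeys("plugin", "akismet") {
		_ = remove(ctx, key) // per-key errors ignored in this sketch
	}
}
```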
diff --git a/internal/deploy/r2.go b/internal/deploy/r2.go
index 0119a3c..b239a4a 100644
--- a/internal/deploy/r2.go
+++ b/internal/deploy/r2.go
@@ -58,7 +58,7 @@ func SyncToR2(ctx context.Context, cfg config.R2Config, buildDir, buildID, previ
 	// Upload p2/ files in parallel, packages.json last.
 	var uploaded, skipped atomic.Int64
 	g, gCtx := errgroup.WithContext(ctx)
-	g.SetLimit(50)
+	g.SetLimit(cfg.Concurrency)
 
 	for _, relPath := range filePaths {
 		relPath := relPath
@@ -103,35 +103,43 @@ func SyncToR2(ctx context.Context, cfg config.R2Config, buildDir, buildID, previ
 	return nil
 }
 
-// putObjectWithRetry uploads a single file to R2 with exponential backoff retry.
-func putObjectWithRetry(ctx context.Context, client *s3.Client, bucket, key string, data []byte, logger *slog.Logger) error {
-	contentType := "application/json"
-	cacheControl := CacheControlForPath(key)
-
+// withRetry executes fn up to r2MaxRetries times with exponential backoff.
+// The label is used in log messages to identify the operation.
+func withRetry(ctx context.Context, logger *slog.Logger, label string, fn func() error) error {
 	var lastErr error
 	for attempt := range r2MaxRetries {
 		if attempt > 0 {
 			delay := time.Duration(float64(r2RetryBaseMs)*math.Pow(2, float64(attempt-1))) * time.Millisecond
-			logger.Warn("retrying R2 upload", "key", key, "attempt", attempt+1, "delay", delay)
+			logger.Warn("retrying R2 operation", "op", label, "attempt", attempt+1, "delay", delay)
 			select {
 			case <-ctx.Done():
 				return ctx.Err()
 			case <-time.After(delay):
 			}
 		}
+		lastErr = fn()
+		if lastErr == nil {
+			return nil
+		}
+	}
+	return fmt.Errorf("%s after %d attempts: %w", label, r2MaxRetries, lastErr)
+}
+
+// putObjectWithRetry uploads a single file to R2 with exponential backoff retry.
+func putObjectWithRetry(ctx context.Context, client *s3.Client, bucket, key string, data []byte, logger *slog.Logger) error {
+	contentType := "application/json"
+	cacheControl := CacheControlForPath(key)
 
-	_, lastErr = client.PutObject(ctx, &s3.PutObjectInput{
+	return withRetry(ctx, logger, "uploading "+key, func() error {
+		_, err := client.PutObject(ctx, &s3.PutObjectInput{
 			Bucket:       aws.String(bucket),
 			Key:          aws.String(key),
 			Body:         bytes.NewReader(data),
 			ContentType:  aws.String(contentType),
 			CacheControl: aws.String(cacheControl),
 		})
-	if lastErr == nil {
-		return nil
-	}
-	}
-	return fmt.Errorf("uploading %s after %d attempts: %w", key, r2MaxRetries, lastErr)
+		return err
+	})
 }
 
 // fileUnchanged returns true if relPath exists in both directories with identical content.
@@ -162,6 +170,33 @@ func CacheControlForPath(path string) string {
 	return "public, max-age=300"
 }
 
+// deleteObjectWithRetry deletes a single object from R2 with exponential backoff retry.
+func deleteObjectWithRetry(ctx context.Context, client *s3.Client, bucket, key string, logger *slog.Logger) error {
+	return withRetry(ctx, logger, "deleting "+key, func() error {
+		_, err := client.DeleteObject(ctx, &s3.DeleteObjectInput{
+			Bucket: aws.String(bucket),
+			Key:    aws.String(key),
+		})
+		return err
+	})
+}
+
+// headObject returns the ETag of an object, or "" if the object doesn't exist.
+func headObject(ctx context.Context, client *s3.Client, bucket, key string) (string, error) {
+	resp, err := client.HeadObject(ctx, &s3.HeadObjectInput{
+		Bucket: aws.String(bucket),
+		Key:    aws.String(key),
+	})
+	if err != nil {
+		// Treat any HeadObject error (NoSuchKey or otherwise) as "absent";
+		// the caller then falls back to an unconditional upload, which is safe.
+		return "", nil
+	}
+	if resp.ETag != nil {
+		return *resp.ETag, nil
+	}
+	return "", nil
+}
+
 func newS3Client(cfg config.R2Config) *s3.Client {
 	return s3.New(s3.Options{
 		Region: "auto",
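The backoff schedule in `withRetry` is unchanged from the old inline loop: the first attempt runs immediately, and attempt n (1-indexed, n ≥ 2) first waits base·2^(n−2) milliseconds. The constants are not shown in this diff, so the values below are assumptions purely to illustrate the shape:

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// Assumed values; the real r2MaxRetries/r2RetryBaseMs live elsewhere in r2.go.
const (
	r2MaxRetries  = 4
	r2RetryBaseMs = 250
)

func main() {
	// Attempt 1 runs with no delay; later attempts back off exponentially.
	for attempt := 1; attempt < r2MaxRetries; attempt++ {
		delay := time.Duration(float64(r2RetryBaseMs)*math.Pow(2, float64(attempt-1))) * time.Millisecond
		fmt.Printf("attempt %d after %v\n", attempt+1, delay) // 250ms, 500ms, 1s
	}
}
```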
diff --git a/internal/deploy/sync.go b/internal/deploy/sync.go
new file mode 100644
index 0000000..02081e8
--- /dev/null
+++ b/internal/deploy/sync.go
@@ -0,0 +1,147 @@
+package deploy
+
+import (
+	"context"
+	"crypto/md5"
+	"database/sql"
+	"fmt"
+	"log/slog"
+	"sync/atomic"
+	"time"
+
+	"golang.org/x/sync/errgroup"
+
+	"github.com/roots/wp-packages/internal/composer"
+	"github.com/roots/wp-packages/internal/config"
+	"github.com/roots/wp-packages/internal/packages"
+)
+
+// SyncResult holds statistics from a DB-driven R2 sync.
+type SyncResult struct {
+	Uploaded int64
+	Deleted  int64
+	Skipped  int64
+	Duration time.Duration
+}
+
+// Sync uploads changed packages from the database to R2.
+//
+// It queries for packages where content_hash != deployed_hash, serializes
+// them into Composer p2 JSON files, uploads to R2 in parallel, deletes
+// p2 files for deactivated packages, conditionally uploads packages.json,
+// and stamps deployed_hash on success.
+func Sync(ctx context.Context, db *sql.DB, cfg config.R2Config, appURL string, logger *slog.Logger) (*SyncResult, error) {
+	started := time.Now()
+	client := newS3Client(cfg)
+
+	// skipped is kept for parity with the file-based SyncToR2; nothing
+	// increments it in the DB-driven path yet.
+	var uploaded, deleted, skipped atomic.Int64
+
+	// Step 1: Upload changed p2/ files
+	dirty, err := packages.GetDirtyPackages(ctx, db)
+	if err != nil {
+		return nil, fmt.Errorf("querying dirty packages: %w", err)
+	}
+
+	logger.Info("sync: dirty packages", "count", len(dirty))
+
+	g, gCtx := errgroup.WithContext(ctx)
+	g.SetLimit(cfg.Concurrency)
+
+	for _, p := range dirty {
+		p := p
+		g.Go(func() error {
+			files, err := composer.PackageFiles(p.Type, p.Name, p.VersionsJSON, p.ComposerMeta())
+			if err != nil {
+				return fmt.Errorf("serializing %s/%s: %w", p.Type, p.Name, err)
+			}
+			for _, f := range files {
+				if err := putObjectWithRetry(gCtx, client, cfg.Bucket, f.Key, f.Data, logger); err != nil {
+					return fmt.Errorf("uploading %s: %w", f.Key, err)
+				}
+				uploaded.Add(1)
+			}
+
+			n := uploaded.Load() + skipped.Load()
+			if n%500 == 0 && n > 0 {
+				logger.Info("sync: upload progress", "uploaded", uploaded.Load(), "total_dirty", len(dirty))
+			}
+			return nil
+		})
+	}
+
+	if err := g.Wait(); err != nil {
+		return nil, err
+	}
+
+	// Step 2: Delete p2/ files for deactivated packages
+	deactivated, err := packages.GetDeactivatedDeployedPackages(ctx, db)
+	if err != nil {
+		return nil, fmt.Errorf("querying deactivated packages: %w", err)
+	}
+
+	for _, p := range deactivated {
+		failed := false
+		for _, key := range composer.ObjectKeys(p.Type, p.Name) {
+			if err := deleteObjectWithRetry(ctx, client, cfg.Bucket, key, logger); err != nil {
+				logger.Warn("sync: failed to delete deactivated package file", "key", key, "error", err)
+				failed = true
+			}
+		}
+		if failed {
+			continue // don't count a package whose files may still be on R2
+		}
+		deleted.Add(1)
+		logger.Info("sync: deleted deactivated package", "type", p.Type, "name", p.Name)
+	}
+
+	// Step 3: Conditional packages.json upload
+	packagesData, err := composer.PackagesJSON(appURL)
+	if err != nil {
+		return nil, fmt.Errorf("generating packages.json: %w", err)
+	}
+
+	currentETag, _ := headObject(ctx, client, cfg.Bucket, r2IndexFile)
+	newETag := fmt.Sprintf(`"%x"`, md5.Sum(packagesData))
+	if currentETag != newETag {
+		if err := putObjectWithRetry(ctx, client, cfg.Bucket, r2IndexFile, packagesData, logger); err != nil {
+			return nil, fmt.Errorf("uploading packages.json: %w", err)
+		}
+		logger.Info("sync: uploaded packages.json")
+	} else {
+		logger.Info("sync: packages.json unchanged, skipped")
+	}
+
+	// Step 4: Stamp deployed_hash
+	if len(dirty) > 0 {
+		_, err = db.ExecContext(ctx, `
+			UPDATE packages SET deployed_hash = content_hash
+			WHERE is_active = 1 AND content_hash IS NOT NULL
+			  AND (deployed_hash IS NULL OR content_hash != deployed_hash)`)
+		if err != nil {
+			return nil, fmt.Errorf("stamping deployed_hash: %w", err)
+		}
+	}
+
+	if len(deactivated) > 0 {
+		_, err = db.ExecContext(ctx, `
+			UPDATE packages SET deployed_hash = NULL
+			WHERE is_active = 0 AND deployed_hash IS NOT NULL`)
+		if err != nil {
+			return nil, fmt.Errorf("clearing deployed_hash for deactivated: %w", err)
+		}
+	}
+
+	result := &SyncResult{
+		Uploaded: uploaded.Load(),
+		Deleted:  deleted.Load(),
+		Skipped:  skipped.Load(),
+		Duration: time.Since(started),
+	}
+
+	logger.Info("sync: complete",
+		"uploaded", result.Uploaded,
+		"deleted", result.Deleted,
+		"duration", result.Duration.String(),
+	)
+	return result, nil
+}
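Step 3 avoids re-uploading `packages.json` by leaning on an S3 convention the diff already relies on: for a single-part `PutObject`, the stored ETag is the quoted lowercase-hex MD5 of the body (true of R2 and of gofakes3; not of multipart uploads). That is why a local `md5.Sum` can be compared against `headObject`'s result without downloading anything:

```go
package main

import (
	"crypto/md5"
	"fmt"
)

func main() {
	data := []byte(`{"metadata-url":"/p2/%package%.json"}`) // sample body
	// Same formatting Sync uses: quoted, lowercase hex, no multipart suffix.
	etag := fmt.Sprintf(`"%x"`, md5.Sum(data))
	fmt.Println(etag) // comparable byte-for-byte with the remote ETag
}
```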
diff --git a/internal/integration/db_sync_test.go b/internal/integration/db_sync_test.go
new file mode 100644
index 0000000..3d9c0a1
--- /dev/null
+++ b/internal/integration/db_sync_test.go
@@ -0,0 +1,209 @@
+//go:build integration
+
+package integration
+
+import (
+	"context"
+	"database/sql"
+	"encoding/json"
+	"io"
+	"net/http/httptest"
+	"path/filepath"
+	"testing"
+
+	"github.com/aws/aws-sdk-go-v2/aws"
+	"github.com/aws/aws-sdk-go-v2/service/s3"
+	"github.com/johannesboyne/gofakes3"
+	"github.com/johannesboyne/gofakes3/backend/s3mem"
+	"github.com/roots/wp-packages/internal/composer"
+	"github.com/roots/wp-packages/internal/config"
+	"github.com/roots/wp-packages/internal/deploy"
+	"github.com/roots/wp-packages/internal/packages"
+	"github.com/roots/wp-packages/internal/testutil"
+	"github.com/roots/wp-packages/internal/wporg"
+)
+
+// backfillContentHashes computes content_hash for all packages that don't have one.
+func backfillContentHashes(t *testing.T, database *sql.DB) {
+	t.Helper()
+	ctx := context.Background()
+
+	pkgs, err := packages.GetPackagesNeedingUpdate(ctx, database, packages.UpdateQueryOpts{
+		Force: true,
+		Type:  "all",
+	})
+	if err != nil {
+		t.Fatalf("getting packages for hash backfill: %v", err)
+	}
+
+	for _, p := range pkgs {
+		hash := composer.HashVersions(p.VersionsJSON, p.TrunkRevision)
+		_, err := database.ExecContext(ctx,
+			`UPDATE packages SET content_hash = ? WHERE id = ?`, hash, p.ID)
+		if err != nil {
+			t.Fatalf("backfilling hash for %s/%s: %v", p.Type, p.Name, err)
+		}
+	}
+}
+
+func TestDBDrivenSync(t *testing.T) {
+	ctx := context.Background()
+
+	// 1. Seed DB from fixtures
+	fixtureDir := filepath.Join("..", "wporg", "testdata")
+	mock := wporg.NewMockServer(fixtureDir)
+	defer mock.Close()
+
+	db := testutil.OpenTestDB(t)
+	testutil.SeedFromFixtures(t, db, mock.URL)
+	backfillContentHashes(t, db)
+
+	// 2. Start gofakes3 in-process
+	backend := s3mem.New()
+	faker := gofakes3.New(backend)
+	ts := httptest.NewServer(faker.Server())
+	defer ts.Close()
+
+	s3Client := newTestS3Client(ts.URL)
+	_, err := s3Client.CreateBucket(ctx, &s3.CreateBucketInput{
+		Bucket: aws.String("test-bucket"),
+	})
+	if err != nil {
+		t.Fatalf("creating bucket: %v", err)
+	}
+
+	r2Cfg := config.R2Config{
+		AccessKeyID:     "test",
+		SecretAccessKey: "test",
+		Bucket:          "test-bucket",
+		Endpoint:        ts.URL,
+		Concurrency:     1,
+	}
+
+	// 3. First sync — all packages should be uploaded
+	result, err := deploy.Sync(ctx, db, r2Cfg, "http://test.local", testLogger(t))
+	if err != nil {
+		t.Fatalf("first sync failed: %v", err)
+	}
+	if result.Uploaded == 0 {
+		t.Error("expected uploads on first sync")
+	}
+	t.Logf("first sync: uploaded=%d deleted=%d", result.Uploaded, result.Deleted)
+
+	// Verify packages.json exists and is valid
+	rootObj, err := s3Client.GetObject(ctx, &s3.GetObjectInput{
+		Bucket: aws.String("test-bucket"),
+		Key:    aws.String("packages.json"),
+	})
+	if err != nil {
+		t.Fatalf("packages.json not found after sync: %v", err)
+	}
+	rootData, _ := io.ReadAll(rootObj.Body)
+	_ = rootObj.Body.Close()
+
+	var rootJSON map[string]any
+	if err := json.Unmarshal(rootData, &rootJSON); err != nil {
+		t.Fatalf("invalid packages.json: %v", err)
+	}
+	if _, ok := rootJSON["metadata-url"]; !ok {
+		t.Error("packages.json missing metadata-url")
+	}
+
+	// Verify p2/ files exist for a known plugin
+	p2Key := "p2/wp-plugin/akismet.json"
+	p2Obj, err := s3Client.GetObject(ctx, &s3.GetObjectInput{
+		Bucket: aws.String("test-bucket"),
+		Key:    aws.String(p2Key),
+	})
+	if err != nil {
+		t.Fatalf("p2 file %s not found: %v", p2Key, err)
+	}
+	p2Data, _ := io.ReadAll(p2Obj.Body)
+	_ = p2Obj.Body.Close()
+
+	// Verify p2 content is valid Composer JSON
+	var p2JSON map[string]any
+	if err := json.Unmarshal(p2Data, &p2JSON); err != nil {
+		t.Fatalf("invalid p2 JSON for %s: %v", p2Key, err)
+	}
+	pkgsField, ok := p2JSON["packages"].(map[string]any)
+	if !ok {
+		t.Fatalf("p2 file missing 'packages' key")
+	}
+	if _, ok := pkgsField["wp-plugin/akismet"]; !ok {
+		t.Error("p2 file missing wp-plugin/akismet entry")
+	}
+
+	// Verify ~dev.json exists for plugins
+	devKey := "p2/wp-plugin/akismet~dev.json"
+	devObj, err := s3Client.GetObject(ctx, &s3.GetObjectInput{
+		Bucket: aws.String("test-bucket"),
+		Key:    aws.String(devKey),
+	})
+	if err != nil {
+		t.Fatalf("dev file %s not found: %v", devKey, err)
+	}
+	_ = devObj.Body.Close()
+
+	// Verify deployed_hash is stamped (no dirty packages remain)
+	var dirtyCount int
+	err = db.QueryRowContext(ctx,
+		`SELECT COUNT(*) FROM packages WHERE is_active = 1
+		   AND content_hash IS NOT NULL
+		   AND (deployed_hash IS NULL OR content_hash != deployed_hash)`).Scan(&dirtyCount)
+	if err != nil {
+		t.Fatalf("counting dirty packages: %v", err)
+	}
+	if dirtyCount != 0 {
+		t.Errorf("expected 0 dirty packages after sync, got %d", dirtyCount)
+	}
+
+	// 4. Second sync — idempotent, nothing to upload
+	result2, err := deploy.Sync(ctx, db, r2Cfg, "http://test.local", testLogger(t))
+	if err != nil {
+		t.Fatalf("second sync failed: %v", err)
+	}
+	if result2.Uploaded != 0 {
+		t.Errorf("expected 0 uploads on idempotent sync, got %d", result2.Uploaded)
+	}
+
+	// 5. Deactivate a package, sync again, verify deletion
+	var akismetID int64
+	err = db.QueryRowContext(ctx,
+		`SELECT id FROM packages WHERE type='plugin' AND name='akismet'`).Scan(&akismetID)
+	if err != nil {
+		t.Fatalf("finding akismet: %v", err)
+	}
+
+	if err := packages.DeactivatePackage(ctx, db, akismetID); err != nil {
+		t.Fatalf("deactivating akismet: %v", err)
+	}
+
+	result3, err := deploy.Sync(ctx, db, r2Cfg, "http://test.local", testLogger(t))
+	if err != nil {
+		t.Fatalf("sync after deactivation failed: %v", err)
+	}
+	if result3.Deleted == 0 {
+		t.Error("expected deletions after deactivating akismet")
+	}
+
+	// Verify akismet p2 files are gone from R2
+	_, err = s3Client.GetObject(ctx, &s3.GetObjectInput{
+		Bucket: aws.String("test-bucket"),
+		Key:    aws.String(p2Key),
+	})
+	if err == nil {
+		t.Error("akismet.json should have been deleted from R2")
+	}
+
+	// Verify deployed_hash is cleared for deactivated package
+	var deployedHash *string
+	err = db.QueryRowContext(ctx,
+		`SELECT deployed_hash FROM packages WHERE id = ?`, akismetID).Scan(&deployedHash)
+	if err != nil {
+		t.Fatalf("checking deployed_hash: %v", err)
+	}
+	if deployedHash != nil {
+		t.Error("deployed_hash should be NULL for deactivated package")
+	}
+}
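Because of the `//go:build integration` tag, the new test is skipped by a plain `go test`; it only runs when the tag is supplied, along the lines of the following invocation (exact command assumed, the repo may wrap it in a Make target):

```
go test -tags integration -run TestDBDrivenSync ./internal/integration/
```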
diff --git a/internal/integration/sync_test.go b/internal/integration/sync_test.go
index 10dea36..6bcd2cb 100644
--- a/internal/integration/sync_test.go
+++ b/internal/integration/sync_test.go
@@ -66,6 +66,7 @@ func TestR2Sync(t *testing.T) {
 		SecretAccessKey: "test",
 		Bucket:          "test-bucket",
 		Endpoint:        ts.URL,
+		Concurrency:     1,
 	}
 
 	// 4. First sync — all packages uploaded
diff --git a/internal/packages/package.go b/internal/packages/package.go
index 0b09431..0719abe 100644
--- a/internal/packages/package.go
+++ b/internal/packages/package.go
@@ -8,9 +8,28 @@ import (
 	"strings"
 	"time"
 
+	"github.com/roots/wp-packages/internal/composer"
 	"github.com/roots/wp-packages/internal/version"
 )
 
+// ComposerMeta builds a composer.PackageMeta from a Package's nullable DB fields.
+func (p *Package) ComposerMeta() composer.PackageMeta {
+	meta := composer.PackageMeta{TrunkRevision: p.TrunkRevision}
+	if p.Description != nil {
+		meta.Description = *p.Description
+	}
+	if p.Homepage != nil {
+		meta.Homepage = *p.Homepage
+	}
+	if p.Author != nil {
+		meta.Author = *p.Author
+	}
+	if p.LastCommitted != nil {
+		meta.LastUpdated = p.LastCommitted.Format(time.RFC3339)
+	}
+	return meta
+}
+
 type Package struct {
 	ID   int64
 	Type string
@@ -339,6 +358,60 @@ func GetPackagesNeedingUpdate(ctx context.Context, db *sql.DB, opts UpdateQueryO
 	return pkgs, rows.Err()
 }
 
+// GetDirtyPackages returns active packages whose content_hash differs from deployed_hash.
+func GetDirtyPackages(ctx context.Context, db *sql.DB) ([]*Package, error) {
+	rows, err := db.QueryContext(ctx, `
+		SELECT id, type, name, versions_json, content_hash,
+		       description, homepage, author, last_committed, trunk_revision
+		FROM packages
+		WHERE is_active = 1
+		  AND content_hash IS NOT NULL
+		  AND (deployed_hash IS NULL OR content_hash != deployed_hash)`)
+	if err != nil {
+		return nil, fmt.Errorf("querying dirty packages: %w", err)
+	}
+	defer func() { _ = rows.Close() }()
+
+	var pkgs []*Package
+	for rows.Next() {
+		var p Package
+		var lastCommitted *string
+		if err := rows.Scan(&p.ID, &p.Type, &p.Name, &p.VersionsJSON, &p.ContentHash,
+			&p.Description, &p.Homepage, &p.Author, &lastCommitted, &p.TrunkRevision); err != nil {
+			return nil, fmt.Errorf("scanning dirty package: %w", err)
+		}
+		if lastCommitted != nil {
+			if t, err := time.Parse(time.RFC3339, *lastCommitted); err == nil {
+				p.LastCommitted = &t
+			}
+		}
+		pkgs = append(pkgs, &p)
+	}
+	return pkgs, rows.Err()
+}
+
+// GetDeactivatedDeployedPackages returns inactive packages that still have a deployed_hash
+// (i.e. their p2 files are still on R2 and need to be deleted).
+func GetDeactivatedDeployedPackages(ctx context.Context, db *sql.DB) ([]*Package, error) {
+	rows, err := db.QueryContext(ctx, `
+		SELECT id, type, name FROM packages
+		WHERE is_active = 0 AND deployed_hash IS NOT NULL`)
+	if err != nil {
+		return nil, fmt.Errorf("querying deactivated deployed packages: %w", err)
+	}
+	defer func() { _ = rows.Close() }()
+
+	var pkgs []*Package
+	for rows.Next() {
+		var p Package
+		if err := rows.Scan(&p.ID, &p.Type, &p.Name); err != nil {
+			return nil, fmt.Errorf("scanning deactivated package: %w", err)
+		}
+		pkgs = append(pkgs, &p)
+	}
+	return pkgs, rows.Err()
+}
+
 // DeactivatePackage sets is_active = 0 for a package.
 func DeactivatePackage(ctx context.Context, db *sql.DB, id int64) error {
 	now := time.Now().UTC().Format(time.RFC3339)
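For reference, the "dirty" predicate shared by `GetDirtyPackages` and the step-4 stamp in `sync.go`, restated as a standalone Go sketch (not code from this diff):

```go
// A package is dirty when it is active, has been hashed, and that hash
// has never been deployed or no longer matches the deployed one.
func isDirty(isActive bool, contentHash, deployedHash *string) bool {
	if !isActive || contentHash == nil {
		return false
	}
	return deployedHash == nil || *contentHash != *deployedHash
}
```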