Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions internal/composer/serialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,56 @@ import (
"github.com/roots/wp-packages/internal/version"
)

// PackageFile represents a single R2-uploadable file for a package.
// It pairs the destination object key with the serialized JSON payload.
type PackageFile struct {
	Key  string // R2 object key, e.g. "p2/wp-plugin/akismet.json"
	Data []byte // serialized Composer p2 JSON for this package flavor
}

// PackageFiles returns all Composer p2 files that a package produces.
// Plugins produce up to 2 files (tagged + dev), themes produce 1 (tagged only).
// Returns nil if the package has no serializable versions.
func PackageFiles(pkgType, name, versionsJSON string, meta PackageMeta) ([]PackageFile, error) {
	base := ComposerName(pkgType, name)

	var out []PackageFile
	// The "" suffix selects tagged versions, "~dev" selects dev versions.
	// SerializePackage returns nil data for flavors a package doesn't have
	// (e.g. themes have no dev flavor), which we simply skip.
	for _, suffix := range []string{"", "~dev"} {
		data, err := SerializePackage(pkgType, name+suffix, versionsJSON, meta)
		if err != nil {
			return nil, err
		}
		if data == nil {
			continue
		}
		out = append(out, PackageFile{
			Key:  "p2/" + base + suffix + ".json",
			Data: data,
		})
	}
	return out, nil
}

// ObjectKeys returns all possible storage keys for a package,
// regardless of whether the files currently exist. Used for deletion.
func ObjectKeys(pkgType, name string) []string {
	// Both flavors share a common prefix; only the suffix differs.
	prefix := "p2/" + ComposerName(pkgType, name)
	return []string{prefix + ".json", prefix + "~dev.json"}
}

// HashVersions computes a content hash over the normalized versions_json and
// trunk_revision. trunk_revision is included because it affects the serialized
// dev-trunk output (source.reference includes trunk@<rev>) even though it's
Expand Down
2 changes: 2 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type R2Config struct {
Bucket string `yaml:"bucket"`
Endpoint string `yaml:"endpoint"`
Enabled bool `yaml:"enabled"`
Concurrency int `yaml:"concurrency"`
CDNBucket string `yaml:"cdn_bucket"`
CDNPublicURL string `yaml:"cdn_public_url"`
}
Expand Down Expand Up @@ -74,6 +75,7 @@ func defaults() *Config {
Server: ServerConfig{
Addr: ":8080",
},
R2: R2Config{Concurrency: 50},
Session: SessionConfig{LifetimeMinutes: 7200},
Telemetry: TelemetryConfig{
DedupeWindowSeconds: 3600,
Expand Down
61 changes: 48 additions & 13 deletions internal/deploy/r2.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func SyncToR2(ctx context.Context, cfg config.R2Config, buildDir, buildID, previ
// Upload p2/ files in parallel, packages.json last.
var uploaded, skipped atomic.Int64
g, gCtx := errgroup.WithContext(ctx)
g.SetLimit(50)
g.SetLimit(cfg.Concurrency)

for _, relPath := range filePaths {
relPath := relPath
Expand Down Expand Up @@ -103,35 +103,43 @@ func SyncToR2(ctx context.Context, cfg config.R2Config, buildDir, buildID, previ
return nil
}

// putObjectWithRetry uploads a single file to R2 with exponential backoff retry.
func putObjectWithRetry(ctx context.Context, client *s3.Client, bucket, key string, data []byte, logger *slog.Logger) error {
contentType := "application/json"
cacheControl := CacheControlForPath(key)

// withRetry executes fn up to r2MaxRetries times with exponential backoff.
// The label is used in log messages to identify the operation. Cancellation
// of ctx during a backoff wait aborts immediately with ctx.Err().
func withRetry(ctx context.Context, logger *slog.Logger, label string, fn func() error) error {
	var lastErr error
	for attempt := 0; attempt < r2MaxRetries; attempt++ {
		if attempt > 0 {
			// Exponential backoff: r2RetryBaseMs * 2^(attempt-1) milliseconds.
			wait := time.Duration(float64(r2RetryBaseMs)*math.Pow(2, float64(attempt-1))) * time.Millisecond
			logger.Warn("retrying R2 operation", "op", label, "attempt", attempt+1, "delay", wait)
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-time.After(wait):
			}
		}
		if lastErr = fn(); lastErr == nil {
			return nil
		}
	}
	return fmt.Errorf("%s after %d attempts: %w", label, r2MaxRetries, lastErr)
}

// putObjectWithRetry uploads a single file to R2 with exponential backoff retry.
func putObjectWithRetry(ctx context.Context, client *s3.Client, bucket, key string, data []byte, logger *slog.Logger) error {
contentType := "application/json"
cacheControl := CacheControlForPath(key)

_, lastErr = client.PutObject(ctx, &s3.PutObjectInput{
return withRetry(ctx, logger, "uploading "+key, func() error {
_, err := client.PutObject(ctx, &s3.PutObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
Body: bytes.NewReader(data),
ContentType: aws.String(contentType),
CacheControl: aws.String(cacheControl),
})
if lastErr == nil {
return nil
}
}
return fmt.Errorf("uploading %s after %d attempts: %w", key, r2MaxRetries, lastErr)
return err
})
}

// fileUnchanged returns true if relPath exists in both directories with identical content.
Expand Down Expand Up @@ -162,6 +170,33 @@ func CacheControlForPath(path string) string {
return "public, max-age=300"
}

// deleteObjectWithRetry deletes a single object from R2 with exponential backoff retry.
func deleteObjectWithRetry(ctx context.Context, client *s3.Client, bucket, key string, logger *slog.Logger) error {
	// Unlike uploads, the request carries no body, so the input can be
	// built once and reused across retry attempts.
	input := &s3.DeleteObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(key),
	}
	return withRetry(ctx, logger, "deleting "+key, func() error {
		_, err := client.DeleteObject(ctx, input)
		return err
	})
}

// headObject returns the ETag of an object, or "" if the object doesn't exist.
//
// NOTE(review): every HeadObject failure — including network or auth errors,
// not just NoSuchKey — is treated as "object doesn't exist", so callers see
// "" and may re-upload. Confirm that conflation is acceptable.
func headObject(ctx context.Context, client *s3.Client, bucket, key string) (string, error) {
	out, err := client.HeadObject(ctx, &s3.HeadObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(key),
	})
	if err != nil || out.ETag == nil {
		return "", nil
	}
	return *out.ETag, nil
}

func newS3Client(cfg config.R2Config) *s3.Client {
return s3.New(s3.Options{
Region: "auto",
Expand Down
143 changes: 143 additions & 0 deletions internal/deploy/sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package deploy

import (
"context"
"crypto/md5"
"database/sql"
"fmt"
"log/slog"
"sync/atomic"
"time"

"golang.org/x/sync/errgroup"

"github.com/roots/wp-packages/internal/composer"
"github.com/roots/wp-packages/internal/config"
"github.com/roots/wp-packages/internal/packages"
)

// SyncResult holds statistics from a DB-driven R2 sync.
type SyncResult struct {
	Uploaded int64 // count of p2 JSON files uploaded
	Deleted  int64 // count of deactivated packages processed for deletion
	Skipped  int64 // reserved; not incremented anywhere in the visible Sync — TODO confirm intent
	Duration time.Duration // wall-clock time for the whole sync
}

// Sync uploads changed packages from the database to R2.
//
// It queries for packages where content_hash != deployed_hash, serializes
// them into Composer p2 JSON files, uploads to R2 in parallel, deletes
// p2 files for deactivated packages, conditionally uploads packages.json,
// and stamps deployed_hash on success.
func Sync(ctx context.Context, db *sql.DB, cfg config.R2Config, appURL string, logger *slog.Logger) (*SyncResult, error) {
	started := time.Now()
	client := newS3Client(cfg)

	var uploaded, deleted, skipped atomic.Int64

	// Step 1: Upload changed p2/ files in parallel, bounded by cfg.Concurrency.
	dirty, err := packages.GetDirtyPackages(ctx, db)
	if err != nil {
		return nil, fmt.Errorf("querying dirty packages: %w", err)
	}

	logger.Info("sync: dirty packages", "count", len(dirty))

	g, gCtx := errgroup.WithContext(ctx)
	g.SetLimit(cfg.Concurrency)

	for _, p := range dirty {
		p := p
		g.Go(func() error {
			files, err := composer.PackageFiles(p.Type, p.Name, p.VersionsJSON, p.ComposerMeta())
			if err != nil {
				return fmt.Errorf("serializing %s/%s: %w", p.Type, p.Name, err)
			}
			for _, f := range files {
				if err := putObjectWithRetry(gCtx, client, cfg.Bucket, f.Key, f.Data, logger); err != nil {
					return fmt.Errorf("uploading %s: %w", f.Key, err)
				}
				uploaded.Add(1)
			}

			// Progress heartbeat every 500 uploads. (Previously this added
			// `skipped`, which is never incremented here, to the counter.)
			if n := uploaded.Load(); n > 0 && n%500 == 0 {
				logger.Info("sync: upload progress", "uploaded", n, "total_dirty", len(dirty))
			}
			return nil
		})
	}

	if err := g.Wait(); err != nil {
		return nil, err
	}

	// Step 2: Delete p2/ files for deactivated packages. A package only
	// counts as deleted (and is only logged as such) when every one of its
	// keys was removed; previously a package was counted even if all its
	// DeleteObject calls failed.
	deactivated, err := packages.GetDeactivatedDeployedPackages(ctx, db)
	if err != nil {
		return nil, fmt.Errorf("querying deactivated packages: %w", err)
	}

	allDeleted := true
	for _, p := range deactivated {
		ok := true
		for _, key := range composer.ObjectKeys(p.Type, p.Name) {
			if err := deleteObjectWithRetry(ctx, client, cfg.Bucket, key, logger); err != nil {
				logger.Warn("sync: failed to delete deactivated package file", "key", key, "error", err)
				ok = false
			}
		}
		if !ok {
			allDeleted = false
			continue
		}
		deleted.Add(1)
		logger.Info("sync: deleted deactivated package", "type", p.Type, "name", p.Name)
	}

	// Step 3: Conditional packages.json upload. headObject treats any lookup
	// failure as "missing", so a transient HEAD error just causes a re-upload.
	packagesData, err := composer.PackagesJSON(appURL)
	if err != nil {
		return nil, fmt.Errorf("generating packages.json: %w", err)
	}

	currentETag, _ := headObject(ctx, client, cfg.Bucket, r2IndexFile)
	// S3-style ETag for a single-part upload is the quoted MD5 of the body.
	newETag := fmt.Sprintf(`"%x"`, md5.Sum(packagesData))
	if currentETag != newETag {
		if err := putObjectWithRetry(ctx, client, cfg.Bucket, r2IndexFile, packagesData, logger); err != nil {
			return nil, fmt.Errorf("uploading packages.json: %w", err)
		}
		logger.Info("sync: uploaded packages.json")
	} else {
		logger.Info("sync: packages.json unchanged, skipped")
	}

	// Step 4: Stamp deployed_hash for the uploaded packages.
	if len(dirty) > 0 {
		_, err = db.ExecContext(ctx, `
	UPDATE packages SET deployed_hash = content_hash
	WHERE is_active = 1 AND content_hash IS NOT NULL
	AND (deployed_hash IS NULL OR content_hash != deployed_hash)`)
		if err != nil {
			return nil, fmt.Errorf("stamping deployed_hash: %w", err)
		}
	}

	// Clear deployed_hash only when every deactivated package's files were
	// actually deleted. Previously the clear ran unconditionally, so a failed
	// delete was never retried on subsequent syncs; keeping the hash makes
	// the package show up in GetDeactivatedDeployedPackages again next time
	// (re-deleting an already-removed key is harmless).
	if len(deactivated) > 0 && allDeleted {
		_, err = db.ExecContext(ctx, `
	UPDATE packages SET deployed_hash = NULL
	WHERE is_active = 0 AND deployed_hash IS NOT NULL`)
		if err != nil {
			return nil, fmt.Errorf("clearing deployed_hash for deactivated: %w", err)
		}
	}

	result := &SyncResult{
		Uploaded: uploaded.Load(),
		Deleted:  deleted.Load(),
		Skipped:  skipped.Load(),
		Duration: time.Since(started),
	}

	logger.Info("sync: complete",
		"uploaded", result.Uploaded,
		"deleted", result.Deleted,
		"duration", result.Duration.String(),
	)
	return result, nil
}
Loading
Loading