diff --git a/internal/cache/s3.go b/internal/cache/s3.go
index d1a0f7e..3595b5e 100644
--- a/internal/cache/s3.go
+++ b/internal/cache/s3.go
@@ -10,6 +10,7 @@ import (
 	"net/http"
 	"net/textproto"
 	"os"
+	"runtime"
 	"time"
 
 	"github.com/alecthomas/errors"
@@ -24,12 +25,14 @@ func init() {
 }
 
 type S3Config struct {
-	Bucket        string        `hcl:"bucket" help:"S3 bucket name."`
-	Endpoint      string        `hcl:"endpoint,optional" help:"S3 endpoint URL (e.g., s3.amazonaws.com or localhost:9000)." default:"s3.amazonaws.com"`
-	Region        string        `hcl:"region,optional" help:"S3 region (defaults to us-west-2)." default:"us-west-2"`
-	UseSSL        bool          `hcl:"use-ssl,optional" help:"Use SSL for S3 connections (defaults to true)." default:"true"`
-	SkipSSLVerify bool          `hcl:"skip-ssl-verify,optional" help:"Skip SSL certificate verification (defaults to false)." default:"false"`
-	MaxTTL        time.Duration `hcl:"max-ttl,optional" help:"Maximum time-to-live for entries in the S3 cache (defaults to 1 hour)." default:"1h"`
+	Bucket            string        `hcl:"bucket" help:"S3 bucket name."`
+	Endpoint          string        `hcl:"endpoint,optional" help:"S3 endpoint URL (e.g., s3.amazonaws.com or localhost:9000)." default:"s3.amazonaws.com"`
+	Region            string        `hcl:"region,optional" help:"S3 region (defaults to us-west-2)." default:"us-west-2"`
+	UseSSL            bool          `hcl:"use-ssl,optional" help:"Use SSL for S3 connections (defaults to true)." default:"true"`
+	SkipSSLVerify     bool          `hcl:"skip-ssl-verify,optional" help:"Skip SSL certificate verification (defaults to false)." default:"false"`
+	MaxTTL            time.Duration `hcl:"max-ttl,optional" help:"Maximum time-to-live for entries in the S3 cache (defaults to 1 hour)." default:"1h"`
+	UploadConcurrency uint          `hcl:"upload-concurrency,optional" help:"Number of concurrent workers for multi-part uploads (0 = use all CPU cores, defaults to 1)." default:"1"`
+	UploadPartSizeMB  uint          `hcl:"upload-part-size-mb,optional" help:"Size of each part for multi-part uploads in megabytes (defaults to 16MB, minimum 5MB)." default:"16"`
 }
 
 type S3 struct {
@@ -53,12 +56,24 @@ var _ Cache = (*S3)(nil)
 // Metadata (headers and expiration time) are stored as object user metadata. The implementation
 // uses the lightweight minio-go SDK to reduce overhead compared to the AWS SDK.
 func NewS3(ctx context.Context, config S3Config) (*S3, error) {
+	// Set defaults and validate configuration
+	if config.UploadConcurrency == 0 {
+		// #nosec G115 -- max(runtime.NumCPU(), 1) is always >= 1, so the int to uint conversion is safe.
+		config.UploadConcurrency = uint(max(runtime.NumCPU(), 1))
+	}
+
+	if config.UploadPartSizeMB < 5 {
+		return nil, errors.New("upload-part-size-mb must be at least 5MB (S3 minimum part size)")
+	}
+
 	logging.FromContext(ctx).InfoContext(ctx, "Constructing S3 cache",
 		"endpoint", config.Endpoint,
 		"bucket", config.Bucket,
 		"region", config.Region,
 		"use-ssl", config.UseSSL,
-		"max-ttl", config.MaxTTL)
+		"max-ttl", config.MaxTTL,
+		"upload-concurrency", config.UploadConcurrency,
+		"upload-part-size-mb", config.UploadPartSizeMB)
 
 	// Create default transport for credential chain
 	defaultTransport, err := minio.DefaultTransport(config.UseSSL)
@@ -277,6 +292,18 @@ func (w *s3Writer) upload(pr *io.PipeReader) {
 		userMetadata["Headers"] = string(headersJSON)
 	}
 
+	// Configure upload options
+	opts := minio.PutObjectOptions{
+		UserMetadata: userMetadata,
+	}
+
+	// Enable concurrent streaming for multi-part uploads if configured
+	if w.s3.config.UploadConcurrency > 1 {
+		opts.ConcurrentStreamParts = true
+		opts.NumThreads = w.s3.config.UploadConcurrency
+		opts.PartSize = uint64(w.s3.config.UploadPartSizeMB) * 1024 * 1024 // Convert MB to bytes
+	}
+
 	// Upload object with streaming (size -1 means unknown size, will use chunked encoding)
 	_, err = w.s3.client.PutObject(
 		w.ctx,
@@ -284,9 +311,7 @@
 		objectName,
 		pr,
 		-1,
-		minio.PutObjectOptions{
-			UserMetadata: userMetadata,
-		},
+		opts,
 	)
 	if err != nil {
 		w.errCh <- errors.Errorf("failed to put object: %w", err)
diff --git a/internal/cache/s3_test.go b/internal/cache/s3_test.go
index 2e351c1..cc4df78 100644
--- a/internal/cache/s3_test.go
+++ b/internal/cache/s3_test.go
@@ -145,11 +145,12 @@ func TestS3Cache(t *testing.T) {
 	t.Setenv("AWS_SECRET_ACCESS_KEY", rustfsPassword)
 
 	c, err := cache.NewS3(ctx, cache.S3Config{
-		Endpoint: rustfsAddr,
-		Bucket:   rustfsBucket,
-		Region:   "",
-		UseSSL:   false,
-		MaxTTL:   100 * time.Millisecond,
+		Endpoint:         rustfsAddr,
+		Bucket:           rustfsBucket,
+		Region:           "",
+		UseSSL:           false,
+		MaxTTL:           100 * time.Millisecond,
+		UploadPartSizeMB: 16,
 	})
 	assert.NoError(t, err)
 	return c
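
For reference, a minimal usage sketch of the new options, written against the S3Config fields added above. It assumes the ctx, cache, and assert identifiers already in scope in the test file; the bucket name and tuning values are placeholders, not recommendations:

	// Sketch only: bucket name and tuning values below are placeholders.
	c, err := cache.NewS3(ctx, cache.S3Config{
		Endpoint:          "s3.amazonaws.com",      // documented default endpoint
		Bucket:            "example-build-cache",   // placeholder bucket
		UseSSL:            true,
		MaxTTL:            time.Hour,
		UploadConcurrency: 0,  // 0 falls back to one worker per CPU core in NewS3
		UploadPartSizeMB:  16, // must be at least 5, the S3 minimum part size enforced by NewS3
	})
	assert.NoError(t, err)

With UploadConcurrency left at 0 on a multi-core machine, the resolved value exceeds 1, so uploads use ConcurrentStreamParts with 16MB parts; setting it explicitly to 1 keeps the previous single-stream behaviour.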