Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 35 additions & 10 deletions internal/cache/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net/http"
"net/textproto"
"os"
"runtime"
"time"

"github.com/alecthomas/errors"
Expand All @@ -24,12 +25,14 @@ func init() {
}

type S3Config struct {
Bucket string `hcl:"bucket" help:"S3 bucket name."`
Endpoint string `hcl:"endpoint,optional" help:"S3 endpoint URL (e.g., s3.amazonaws.com or localhost:9000)." default:"s3.amazonaws.com"`
Region string `hcl:"region,optional" help:"S3 region (defaults to us-west-2)." default:"us-west-2"`
UseSSL bool `hcl:"use-ssl,optional" help:"Use SSL for S3 connections (defaults to true)." default:"true"`
SkipSSLVerify bool `hcl:"skip-ssl-verify,optional" help:"Skip SSL certificate verification (defaults to false)." default:"false"`
MaxTTL time.Duration `hcl:"max-ttl,optional" help:"Maximum time-to-live for entries in the S3 cache (defaults to 1 hour)." default:"1h"`
Bucket string `hcl:"bucket" help:"S3 bucket name."`
Endpoint string `hcl:"endpoint,optional" help:"S3 endpoint URL (e.g., s3.amazonaws.com or localhost:9000)." default:"s3.amazonaws.com"`
Region string `hcl:"region,optional" help:"S3 region (defaults to us-west-2)." default:"us-west-2"`
UseSSL bool `hcl:"use-ssl,optional" help:"Use SSL for S3 connections (defaults to true)." default:"true"`
SkipSSLVerify bool `hcl:"skip-ssl-verify,optional" help:"Skip SSL certificate verification (defaults to false)." default:"false"`
MaxTTL time.Duration `hcl:"max-ttl,optional" help:"Maximum time-to-live for entries in the S3 cache (defaults to 1 hour)." default:"1h"`
UploadConcurrency uint `hcl:"upload-concurrency,optional" help:"Number of concurrent workers for multi-part uploads (0 = use all CPU cores, defaults to 1)." default:"1"`
UploadPartSizeMB uint `hcl:"upload-part-size-mb,optional" help:"Size of each part for multi-part uploads in megabytes (defaults to 16MB, minimum 5MB)." default:"16"`
}

type S3 struct {
Expand All @@ -53,12 +56,24 @@ var _ Cache = (*S3)(nil)
// Metadata (headers and expiration time) are stored as object user metadata. The implementation
// uses the lightweight minio-go SDK to reduce overhead compared to the AWS SDK.
func NewS3(ctx context.Context, config S3Config) (*S3, error) {
// Set defaults and validate configuration
if config.UploadConcurrency == 0 {
// #nosec G115 -- n is guaranteed >= 1. I was unable to satisfy all linters.
config.UploadConcurrency = uint(max(runtime.NumCPU(), 1))
}

if config.UploadPartSizeMB < 5 {
return nil, errors.New("upload-part-size-mb must be at least 5MB (S3 minimum part size)")
}

logging.FromContext(ctx).InfoContext(ctx, "Constructing S3 cache",
"endpoint", config.Endpoint,
"bucket", config.Bucket,
"region", config.Region,
"use-ssl", config.UseSSL,
"max-ttl", config.MaxTTL)
"max-ttl", config.MaxTTL,
"upload-concurrency", config.UploadConcurrency,
"upload-part-size-mb", config.UploadPartSizeMB)

// Create default transport for credential chain
defaultTransport, err := minio.DefaultTransport(config.UseSSL)
Expand Down Expand Up @@ -277,16 +292,26 @@ func (w *s3Writer) upload(pr *io.PipeReader) {
userMetadata["Headers"] = string(headersJSON)
}

// Configure upload options
opts := minio.PutObjectOptions{
UserMetadata: userMetadata,
}

// Enable concurrent streaming for multi-part uploads if configured
if w.s3.config.UploadConcurrency > 1 {
opts.ConcurrentStreamParts = true
opts.NumThreads = w.s3.config.UploadConcurrency
opts.PartSize = uint64(w.s3.config.UploadPartSizeMB) * 1024 * 1024 // Convert MB to bytes
}

// Upload object with streaming (size -1 means unknown size, will use chunked encoding)
_, err = w.s3.client.PutObject(
w.ctx,
w.s3.config.Bucket,
objectName,
pr,
-1,
minio.PutObjectOptions{
UserMetadata: userMetadata,
},
opts,
)
if err != nil {
w.errCh <- errors.Errorf("failed to put object: %w", err)
Expand Down
11 changes: 6 additions & 5 deletions internal/cache/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,12 @@ func TestS3Cache(t *testing.T) {
t.Setenv("AWS_SECRET_ACCESS_KEY", rustfsPassword)

c, err := cache.NewS3(ctx, cache.S3Config{
Endpoint: rustfsAddr,
Bucket: rustfsBucket,
Region: "",
UseSSL: false,
MaxTTL: 100 * time.Millisecond,
Endpoint: rustfsAddr,
Bucket: rustfsBucket,
Region: "",
UseSSL: false,
MaxTTL: 100 * time.Millisecond,
UploadPartSizeMB: 16,
})
assert.NoError(t, err)
return c
Expand Down