-
Notifications
You must be signed in to change notification settings - Fork 28
feat(s3): add Amazon S3 Files control-plane support (#707) #709
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,194 @@ | ||||||||||||||||||||||||||||||||||||||
| package io.kestra.plugin.aws.s3.files; | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| import java.io.ByteArrayInputStream; | ||||||||||||||||||||||||||||||||||||||
| import java.io.InputStream; | ||||||||||||||||||||||||||||||||||||||
| import java.net.URI; | ||||||||||||||||||||||||||||||||||||||
| import java.nio.charset.StandardCharsets; | ||||||||||||||||||||||||||||||||||||||
| import java.time.Duration; | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| import io.kestra.core.runners.RunContext; | ||||||||||||||||||||||||||||||||||||||
| import io.kestra.plugin.aws.AbstractConnection; | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| import lombok.EqualsAndHashCode; | ||||||||||||||||||||||||||||||||||||||
| import lombok.Getter; | ||||||||||||||||||||||||||||||||||||||
| import lombok.NoArgsConstructor; | ||||||||||||||||||||||||||||||||||||||
| import lombok.ToString; | ||||||||||||||||||||||||||||||||||||||
| import lombok.experimental.SuperBuilder; | ||||||||||||||||||||||||||||||||||||||
| import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; | ||||||||||||||||||||||||||||||||||||||
| import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; | ||||||||||||||||||||||||||||||||||||||
| import software.amazon.awssdk.auth.credentials.AwsSessionCredentials; | ||||||||||||||||||||||||||||||||||||||
| import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; | ||||||||||||||||||||||||||||||||||||||
| import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; | ||||||||||||||||||||||||||||||||||||||
| import software.amazon.awssdk.auth.signer.Aws4Signer; | ||||||||||||||||||||||||||||||||||||||
| import software.amazon.awssdk.auth.signer.params.Aws4SignerParams; | ||||||||||||||||||||||||||||||||||||||
| import software.amazon.awssdk.http.ExecutableHttpRequest; | ||||||||||||||||||||||||||||||||||||||
| import software.amazon.awssdk.http.HttpExecuteRequest; | ||||||||||||||||||||||||||||||||||||||
| import software.amazon.awssdk.http.HttpExecuteResponse; | ||||||||||||||||||||||||||||||||||||||
| import software.amazon.awssdk.http.SdkHttpFullRequest; | ||||||||||||||||||||||||||||||||||||||
| import software.amazon.awssdk.http.SdkHttpMethod; | ||||||||||||||||||||||||||||||||||||||
| import software.amazon.awssdk.http.apache.ApacheHttpClient; | ||||||||||||||||||||||||||||||||||||||
| import software.amazon.awssdk.regions.Region; | ||||||||||||||||||||||||||||||||||||||
| import software.amazon.awssdk.services.sts.StsClient; | ||||||||||||||||||||||||||||||||||||||
| import software.amazon.awssdk.services.sts.StsClientBuilder; | ||||||||||||||||||||||||||||||||||||||
| import software.amazon.awssdk.services.sts.model.AssumeRoleRequest; | ||||||||||||||||||||||||||||||||||||||
| import software.amazon.awssdk.services.sts.model.Credentials; | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| @SuperBuilder | ||||||||||||||||||||||||||||||||||||||
| @ToString | ||||||||||||||||||||||||||||||||||||||
| @EqualsAndHashCode | ||||||||||||||||||||||||||||||||||||||
| @Getter | ||||||||||||||||||||||||||||||||||||||
| @NoArgsConstructor | ||||||||||||||||||||||||||||||||||||||
| public abstract class AbstractS3Files extends AbstractConnection { | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| static final String S3_FILES_SERVICE = "s3files"; | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| protected AwsCredentialsProvider credentialsProvider(RunContext runContext) throws Exception { | ||||||||||||||||||||||||||||||||||||||
| AbstractConnection.AwsClientConfig cfg = this.awsClientConfig(runContext); | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| if (cfg.stsRoleArn() != null) { | ||||||||||||||||||||||||||||||||||||||
| return stsCredentialsProvider(cfg); | ||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| if (cfg.accessKeyId() != null && cfg.secretKeyId() != null) { | ||||||||||||||||||||||||||||||||||||||
| if (cfg.sessionToken() != null) { | ||||||||||||||||||||||||||||||||||||||
| return StaticCredentialsProvider.create( | ||||||||||||||||||||||||||||||||||||||
| AwsSessionCredentials.create(cfg.accessKeyId(), cfg.secretKeyId(), cfg.sessionToken()) | ||||||||||||||||||||||||||||||||||||||
| ); | ||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||
| return StaticCredentialsProvider.create( | ||||||||||||||||||||||||||||||||||||||
| AwsBasicCredentials.create(cfg.accessKeyId(), cfg.secretKeyId()) | ||||||||||||||||||||||||||||||||||||||
| ); | ||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| return DefaultCredentialsProvider.create(); | ||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| private AwsCredentialsProvider stsCredentialsProvider(AbstractConnection.AwsClientConfig cfg) { | ||||||||||||||||||||||||||||||||||||||
| StsClientBuilder stsBuilder = StsClient.builder() | ||||||||||||||||||||||||||||||||||||||
| .region(Region.of(cfg.region())); | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| if (cfg.stsEndpointOverride() != null) { | ||||||||||||||||||||||||||||||||||||||
| stsBuilder.endpointOverride(URI.create(cfg.stsEndpointOverride())); | ||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| if (cfg.accessKeyId() != null && cfg.secretKeyId() != null) { | ||||||||||||||||||||||||||||||||||||||
| stsBuilder.credentialsProvider( | ||||||||||||||||||||||||||||||||||||||
| StaticCredentialsProvider.create( | ||||||||||||||||||||||||||||||||||||||
| AwsBasicCredentials.create(cfg.accessKeyId(), cfg.secretKeyId()) | ||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||
| ); | ||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+75
to
+79
|
||||||||||||||||||||||||||||||||||||||
| stsBuilder.credentialsProvider( | |
| StaticCredentialsProvider.create( | |
| AwsBasicCredentials.create(cfg.accessKeyId(), cfg.secretKeyId()) | |
| ) | |
| ); | |
| if (cfg.sessionToken() != null) { | |
| stsBuilder.credentialsProvider( | |
| StaticCredentialsProvider.create( | |
| AwsSessionCredentials.create(cfg.accessKeyId(), cfg.secretKeyId(), cfg.sessionToken()) | |
| ) | |
| ); | |
| } else { | |
| stsBuilder.credentialsProvider( | |
| StaticCredentialsProvider.create( | |
| AwsBasicCredentials.create(cfg.accessKeyId(), cfg.secretKeyId()) | |
| ) | |
| ); | |
| } |
Copilot
AI
Apr 18, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
executeRequest always sets the Host header to s3files.{region}.amazonaws.com even when endpointOverride is used. This will break endpoint overrides (e.g., LocalStack) because the HTTP Host header and the SigV4 canonical host won’t match the actual request URI. Use the request URI host (and port if present) when overriding the endpoint, or avoid setting Host explicitly and let the SDK derive it from the URI before signing.
| .putHeader("Content-Type", "application/json") | |
| .putHeader("Host", serviceHost); | |
| .putHeader("Content-Type", "application/json"); |
Copilot
AI
Apr 18, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
executeRequest creates and closes a new ApacheHttpClient instance for every API call. Even if most tasks call the API once, this adds avoidable overhead and makes it harder to tune connection pooling/timeouts. Consider creating a single reusable SdkHttpClient (or ApacheHttpClient.builder() with timeouts) and reusing it across requests within the task execution.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,147 @@ | ||
| package io.kestra.plugin.aws.s3.files; | ||
|
|
||
| import java.util.HashMap; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
|
|
||
| import io.kestra.core.models.annotations.Example; | ||
| import io.kestra.core.models.annotations.Plugin; | ||
| import io.kestra.core.models.annotations.PluginProperty; | ||
| import io.kestra.core.models.property.Property; | ||
| import io.kestra.core.models.tasks.RunnableTask; | ||
| import io.kestra.core.runners.RunContext; | ||
| import io.kestra.plugin.aws.s3.files.models.FileSystem; | ||
|
|
||
| import io.swagger.v3.oas.annotations.media.Schema; | ||
| import jakarta.validation.constraints.NotNull; | ||
| import lombok.Builder; | ||
| import lombok.EqualsAndHashCode; | ||
| import lombok.Getter; | ||
| import lombok.NoArgsConstructor; | ||
| import lombok.ToString; | ||
| import lombok.experimental.SuperBuilder; | ||
| import software.amazon.awssdk.http.SdkHttpMethod; | ||
|
|
||
| /** | ||
| * Creates an Amazon S3 Files file system backed by an S3 bucket. | ||
| */ | ||
| @SuperBuilder | ||
| @ToString | ||
| @EqualsAndHashCode | ||
| @Getter | ||
| @NoArgsConstructor | ||
| @Plugin( | ||
| examples = { | ||
| @Example( | ||
| full = true, | ||
| code = """ | ||
| id: aws_s3_files_create_filesystem | ||
| namespace: company.team | ||
|
|
||
| tasks: | ||
| - id: create_fs | ||
| type: io.kestra.plugin.aws.s3.files.CreateFileSystem | ||
| accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}" | ||
| secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}" | ||
| region: "us-east-1" | ||
| bucket: "arn:aws:s3:::my-bucket" | ||
| roleArn: "arn:aws:iam::123456789012:role/S3FilesRole" | ||
| """ | ||
| ) | ||
| } | ||
| ) | ||
| @Schema( | ||
| title = "Create an Amazon S3 Files file system", | ||
| description = "Creates an S3 Files file system resource that makes an S3 bucket mountable as an NFS v4.1+ file system." | ||
| ) | ||
| public class CreateFileSystem extends AbstractS3Files implements RunnableTask<CreateFileSystem.Output> { | ||
|
Comment on lines
+53
to
+57
|
||
|
|
||
| @Schema(title = "S3 bucket ARN", description = "ARN of the S3 bucket to back the file system (e.g. arn:aws:s3:::my-bucket).") | ||
| @NotNull | ||
| private Property<String> bucket; | ||
|
|
||
| @Schema(title = "IAM role ARN", description = "ARN of the IAM role that grants the S3 Files service access to the bucket.") | ||
| @NotNull | ||
| private Property<String> roleArn; | ||
|
|
||
| @Schema(title = "Key prefix", description = "Optional prefix scoping the file system to a sub-path of the bucket.") | ||
| private Property<String> prefix; | ||
|
|
||
| @Schema(title = "KMS key ID", description = "ARN, key ID, or alias of the KMS key for encryption. When omitted the service-owned key is used.") | ||
| private Property<String> kmsKeyId; | ||
|
|
||
| @Schema(title = "Client token", description = "Idempotency token (up to 64 ASCII characters). Automatically generated when omitted.") | ||
| private Property<String> clientToken; | ||
|
|
||
| @Schema(title = "Tags", description = "Key-value tags to apply to the file system.") | ||
| @PluginProperty(dynamic = false) | ||
| private Property<Map<String, String>> tags; | ||
|
|
||
| @Schema(title = "Accept bucket warning", description = "Set to true to acknowledge any bucket configuration warnings and proceed with creation.") | ||
| @Builder.Default | ||
| private Property<Boolean> acceptBucketWarning = Property.ofValue(false); | ||
|
|
||
| @Override | ||
| public Output run(RunContext runContext) throws Exception { | ||
| Map<String, Object> body = new HashMap<>(); | ||
| body.put("bucket", runContext.render(bucket).as(String.class).orElseThrow()); | ||
| body.put("roleArn", runContext.render(roleArn).as(String.class).orElseThrow()); | ||
|
|
||
| runContext.render(prefix).as(String.class).ifPresent(v -> body.put("prefix", v)); | ||
| runContext.render(kmsKeyId).as(String.class).ifPresent(v -> body.put("kmsKeyId", v)); | ||
| runContext.render(clientToken).as(String.class).ifPresent(v -> body.put("clientToken", v)); | ||
| runContext.render(acceptBucketWarning).as(Boolean.class) | ||
| .ifPresent(v -> | ||
| { | ||
| if (Boolean.TRUE.equals(v)) | ||
| body.put("acceptBucketWarning", true); | ||
| }); | ||
|
|
||
| if (tags != null) { | ||
| Map<String, String> tagMap = runContext.render(tags).asMap(String.class, String.class); | ||
| if (!tagMap.isEmpty()) { | ||
| List<Map<String, String>> tagList = tagMap.entrySet().stream() | ||
| .map(e -> Map.of("key", e.getKey(), "value", e.getValue())) | ||
| .toList(); | ||
| body.put("tags", tagList); | ||
| } | ||
| } | ||
|
|
||
| S3FilesService.Response response = executeRequest( | ||
| runContext, | ||
| SdkHttpMethod.POST, | ||
| "/filesystems", | ||
| S3FilesService.toJson(body) | ||
| ); | ||
|
|
||
| FileSystem fs = S3FilesService.fromJson(response.body(), FileSystem.class); | ||
|
|
||
| runContext.logger().info("Created S3 Files file system: {} (status={})", fs.getFileSystemId(), fs.getStatus()); | ||
|
|
||
| return Output.builder() | ||
| .fileSystemId(fs.getFileSystemId()) | ||
| .fileSystemArn(fs.getFileSystemArn()) | ||
| .status(fs.getStatus()) | ||
| .creationTime(fs.getCreationTime()) | ||
| .build(); | ||
| } | ||
|
|
||
| @SuperBuilder | ||
| @Getter | ||
| @NoArgsConstructor | ||
| @Schema(title = "Output of the CreateFileSystem task") | ||
| public static class Output implements io.kestra.core.models.tasks.Output { | ||
|
|
||
| @Schema(title = "File system ID") | ||
| private String fileSystemId; | ||
|
|
||
| @Schema(title = "File system ARN") | ||
| private String fileSystemArn; | ||
|
|
||
| @Schema(title = "Lifecycle status") | ||
| private String status; | ||
|
|
||
| @Schema(title = "Creation time (Unix epoch seconds)") | ||
| private Long creationTime; | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
stsCredentialsProviderperforms a one-timeassumeRolecall and returns aStaticCredentialsProvider. This provider won’t refresh, and becausecredentialsProvider(runContext)is invoked during every request signing, the code can end up calling STS repeatedly (extra latency / risk of STS throttling) or using expired credentials for long-running executions. Prefer the repo’s existingConnectionUtils.stsAssumeRoleCredentialsProvider(...)(refreshing) or otherwise cache a refreshable provider for the task execution lifecycle.