From 39c3656972492ff498e535215664862a930cffca Mon Sep 17 00:00:00 2001
From: Nancy <9d.24.nancy.sangani@gmail.com>
Date: Sat, 18 Apr 2026 19:04:49 +0530
Subject: [PATCH 1/2] feat(s3): add Amazon S3 Files control-plane support
(#707)
---
.../plugin/aws/s3/files/AbstractS3Files.java | 206 +++++++++++
.../plugin/aws/s3/files/CreateFileSystem.java | 147 ++++++++
.../aws/s3/files/CreateMountTarget.java | 117 ++++++
.../plugin/aws/s3/files/DeleteFileSystem.java | 69 ++++
.../aws/s3/files/DeleteMountTarget.java | 72 ++++
.../plugin/aws/s3/files/GetFileSystem.java | 85 +++++
.../plugin/aws/s3/files/ListFileSystems.java | 131 +++++++
.../plugin/aws/s3/files/ListMountTargets.java | 130 +++++++
.../plugin/aws/s3/files/S3FilesService.java | 42 +++
.../aws/s3/files/models/FileSystem.java | 45 +++
.../aws/s3/files/models/MountTarget.java | 31 ++
.../aws/s3/files/S3FilesIntegrationTest.java | 104 ++++++
.../plugin/aws/s3/files/S3FilesTaskTest.java | 347 ++++++++++++++++++
13 files changed, 1526 insertions(+)
create mode 100644 src/main/java/io/kestra/plugin/aws/s3/files/AbstractS3Files.java
create mode 100644 src/main/java/io/kestra/plugin/aws/s3/files/CreateFileSystem.java
create mode 100644 src/main/java/io/kestra/plugin/aws/s3/files/CreateMountTarget.java
create mode 100644 src/main/java/io/kestra/plugin/aws/s3/files/DeleteFileSystem.java
create mode 100644 src/main/java/io/kestra/plugin/aws/s3/files/DeleteMountTarget.java
create mode 100644 src/main/java/io/kestra/plugin/aws/s3/files/GetFileSystem.java
create mode 100644 src/main/java/io/kestra/plugin/aws/s3/files/ListFileSystems.java
create mode 100644 src/main/java/io/kestra/plugin/aws/s3/files/ListMountTargets.java
create mode 100644 src/main/java/io/kestra/plugin/aws/s3/files/S3FilesService.java
create mode 100644 src/main/java/io/kestra/plugin/aws/s3/files/models/FileSystem.java
create mode 100644 src/main/java/io/kestra/plugin/aws/s3/files/models/MountTarget.java
create mode 100644 src/test/java/io/kestra/plugin/aws/s3/files/S3FilesIntegrationTest.java
create mode 100644 src/test/java/io/kestra/plugin/aws/s3/files/S3FilesTaskTest.java
diff --git a/src/main/java/io/kestra/plugin/aws/s3/files/AbstractS3Files.java b/src/main/java/io/kestra/plugin/aws/s3/files/AbstractS3Files.java
new file mode 100644
index 00000000..ae553a6a
--- /dev/null
+++ b/src/main/java/io/kestra/plugin/aws/s3/files/AbstractS3Files.java
@@ -0,0 +1,206 @@
+package io.kestra.plugin.aws.s3.files;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+
+import io.kestra.core.runners.RunContext;
+import io.kestra.plugin.aws.AbstractConnection;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.ToString;
+import lombok.experimental.SuperBuilder;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.auth.signer.Aws4Signer;
+import software.amazon.awssdk.auth.signer.params.Aws4SignerParams;
+import software.amazon.awssdk.http.ExecutableHttpRequest;
+import software.amazon.awssdk.http.HttpExecuteRequest;
+import software.amazon.awssdk.http.HttpExecuteResponse;
+import software.amazon.awssdk.http.SdkHttpFullRequest;
+import software.amazon.awssdk.http.SdkHttpMethod;
+import software.amazon.awssdk.http.apache.ApacheHttpClient;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.sts.StsClient;
+import software.amazon.awssdk.services.sts.StsClientBuilder;
+import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
+import software.amazon.awssdk.services.sts.model.Credentials;
+
+@SuperBuilder
+@ToString
+@EqualsAndHashCode
+@Getter
+@NoArgsConstructor
+public abstract class AbstractS3Files extends AbstractConnection {
+
+ static final String S3_FILES_SERVICE = "s3files";
+
+ protected AwsCredentialsProvider credentialsProvider(RunContext runContext) throws Exception {
+ AbstractConnection.AwsClientConfig cfg = this.awsClientConfig(runContext);
+
+ if (cfg.stsRoleArn() != null) {
+ return stsCredentialsProvider(cfg);
+ }
+
+ if (cfg.accessKeyId() != null && cfg.secretKeyId() != null) {
+ if (cfg.sessionToken() != null) {
+ return StaticCredentialsProvider.create(
+ AwsSessionCredentials.create(cfg.accessKeyId(), cfg.secretKeyId(), cfg.sessionToken())
+ );
+ }
+ return StaticCredentialsProvider.create(
+ AwsBasicCredentials.create(cfg.accessKeyId(), cfg.secretKeyId())
+ );
+ }
+
+ return DefaultCredentialsProvider.create();
+ }
+
+ private AwsCredentialsProvider stsCredentialsProvider(AbstractConnection.AwsClientConfig cfg) {
+ StsClientBuilder stsBuilder = StsClient.builder()
+ .region(Region.of(cfg.region()));
+
+ if (cfg.stsEndpointOverride() != null) {
+ stsBuilder.endpointOverride(URI.create(cfg.stsEndpointOverride()));
+ }
+
+ if (cfg.accessKeyId() != null && cfg.secretKeyId() != null) {
+ stsBuilder.credentialsProvider(
+ StaticCredentialsProvider.create(
+ AwsBasicCredentials.create(cfg.accessKeyId(), cfg.secretKeyId())
+ )
+ );
+ }
+
+ try (StsClient stsClient = stsBuilder.build()) {
+ AssumeRoleRequest.Builder req = AssumeRoleRequest.builder()
+ .roleArn(cfg.stsRoleArn())
+ .roleSessionName(
+ cfg.stsRoleSessionName() != null
+ ? cfg.stsRoleSessionName()
+ : "kestra-s3files-session"
+ )
+ .durationSeconds(
+ (int) (cfg.stsRoleSessionDuration() != null
+ ? cfg.stsRoleSessionDuration().getSeconds()
+ : Duration.ofMinutes(15).getSeconds())
+ );
+
+ if (cfg.stsRoleExternalId() != null) {
+ req.externalId(cfg.stsRoleExternalId());
+ }
+
+ Credentials c = stsClient.assumeRole(req.build()).credentials();
+ return StaticCredentialsProvider.create(
+ AwsSessionCredentials.create(c.accessKeyId(), c.secretAccessKey(), c.sessionToken())
+ );
+ }
+ }
+
+ // -------------------------------------------------------------------------
+ // HTTP request execution
+ // -------------------------------------------------------------------------
+
+ /**
+ * Builds and executes a SigV4-signed HTTP request against the S3 Files control-plane
+ * endpoint.
+ *
+ *
+ * The signing hostname is always the canonical AWS endpoint
+ * ({@code s3files.{region}.amazonaws.com}) regardless of {@code endpointOverride},
+ * ensuring the SigV4 signature covers the correct {@code Host} header.
+ * When {@code endpointOverride} is configured the physical TCP connection is made
+ * to that override URL instead (useful for LocalStack / testing).
+ *
+ *
+ * @param runContext Kestra run context used for rendering properties
+ * @param method HTTP method (GET, PUT, POST, DELETE)
+ * @param path API path, e.g. {@code "/file-systems"} or
+ * {@code "/file-systems/fs-abc/mount-targets"}
+ * @param body request body bytes; {@code null} for bodyless methods (GET/DELETE)
+ * @return {@link S3FilesService.Response} containing the HTTP status and raw JSON body
+ * @throws RuntimeException if the API returns a non-2xx status code
+ */
+ protected S3FilesService.Response executeRequest(
+ RunContext runContext,
+ SdkHttpMethod method,
+ String path,
+ byte[] body) throws Exception {
+
+ AbstractConnection.AwsClientConfig cfg = this.awsClientConfig(runContext);
+
+ String regionStr = cfg.region();
+ if (regionStr == null) {
+ throw new IllegalArgumentException("region is required for S3 Files tasks");
+ }
+ Region region = Region.of(regionStr);
+
+ String serviceHost = S3_FILES_SERVICE + "." + regionStr + ".amazonaws.com";
+
+ String baseUrl = cfg.endpointOverride() != null
+ ? cfg.endpointOverride().replaceAll("/$", "")
+ : "https://" + serviceHost;
+
+ URI requestUri = URI.create(baseUrl + path);
+
+ byte[] bodyBytes = (body != null) ? body : new byte[0];
+
+ SdkHttpFullRequest.Builder requestBuilder = SdkHttpFullRequest.builder()
+ .method(method)
+ .uri(requestUri)
+ .putHeader("Content-Type", "application/json")
+ .putHeader("Host", serviceHost);
+
+ if (bodyBytes.length > 0) {
+ requestBuilder.contentStreamProvider(() -> new ByteArrayInputStream(bodyBytes));
+ }
+
+ SdkHttpFullRequest unsignedRequest = requestBuilder.build();
+
+ Aws4SignerParams signerParams = Aws4SignerParams.builder()
+ .awsCredentials(credentialsProvider(runContext).resolveCredentials())
+ .signingName(S3_FILES_SERVICE)
+ .signingRegion(region)
+ .build();
+
+ SdkHttpFullRequest signedRequest = Aws4Signer.create().sign(unsignedRequest, signerParams);
+
+ try (ApacheHttpClient httpClient = (ApacheHttpClient) ApacheHttpClient.create()) {
+ ExecutableHttpRequest executableRequest = httpClient.prepareRequest(
+ HttpExecuteRequest.builder()
+ .request(signedRequest)
+ .contentStreamProvider(
+ bodyBytes.length > 0
+ ? () -> new ByteArrayInputStream(bodyBytes)
+ : null
+ )
+ .build()
+ );
+
+ HttpExecuteResponse response = executableRequest.call();
+ int statusCode = response.httpResponse().statusCode();
+
+ String responseBody = "";
+ if (response.responseBody().isPresent()) {
+ try (InputStream is = response.responseBody().get()) {
+ responseBody = new String(is.readAllBytes(), StandardCharsets.UTF_8);
+ }
+ }
+
+ if (statusCode < 200 || statusCode >= 300) {
+ throw new RuntimeException(
+ "S3 Files API error [HTTP " + statusCode + "]: " + responseBody
+ );
+ }
+
+ return new S3FilesService.Response(statusCode, responseBody);
+ }
+ }
+}
diff --git a/src/main/java/io/kestra/plugin/aws/s3/files/CreateFileSystem.java b/src/main/java/io/kestra/plugin/aws/s3/files/CreateFileSystem.java
new file mode 100644
index 00000000..79d6e1ec
--- /dev/null
+++ b/src/main/java/io/kestra/plugin/aws/s3/files/CreateFileSystem.java
@@ -0,0 +1,147 @@
+package io.kestra.plugin.aws.s3.files;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import io.kestra.core.models.annotations.Example;
+import io.kestra.core.models.annotations.Plugin;
+import io.kestra.core.models.annotations.PluginProperty;
+import io.kestra.core.models.property.Property;
+import io.kestra.core.models.tasks.RunnableTask;
+import io.kestra.core.runners.RunContext;
+import io.kestra.plugin.aws.s3.files.models.FileSystem;
+
+import io.swagger.v3.oas.annotations.media.Schema;
+import jakarta.validation.constraints.NotNull;
+import lombok.Builder;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.ToString;
+import lombok.experimental.SuperBuilder;
+import software.amazon.awssdk.http.SdkHttpMethod;
+
+/**
+ * Creates an Amazon S3 Files file system backed by an S3 bucket.
+ */
+@SuperBuilder
+@ToString
+@EqualsAndHashCode
+@Getter
+@NoArgsConstructor
+@Plugin(
+ examples = {
+ @Example(
+ full = true,
+ code = """
+ id: aws_s3_files_create_filesystem
+ namespace: company.team
+
+ tasks:
+ - id: create_fs
+ type: io.kestra.plugin.aws.s3.files.CreateFileSystem
+ accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
+ secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}"
+ region: "us-east-1"
+ bucket: "arn:aws:s3:::my-bucket"
+ roleArn: "arn:aws:iam::123456789012:role/S3FilesRole"
+ """
+ )
+ }
+)
+@Schema(
+ title = "Create an Amazon S3 Files file system",
+ description = "Creates an S3 Files file system resource that makes an S3 bucket mountable as an NFS v4.1+ file system."
+)
+public class CreateFileSystem extends AbstractS3Files implements RunnableTask {
+
+ @Schema(title = "S3 bucket ARN", description = "ARN of the S3 bucket to back the file system (e.g. arn:aws:s3:::my-bucket).")
+ @NotNull
+ private Property bucket;
+
+ @Schema(title = "IAM role ARN", description = "ARN of the IAM role that grants the S3 Files service access to the bucket.")
+ @NotNull
+ private Property roleArn;
+
+ @Schema(title = "Key prefix", description = "Optional prefix scoping the file system to a sub-path of the bucket.")
+ private Property prefix;
+
+ @Schema(title = "KMS key ID", description = "ARN, key ID, or alias of the KMS key for encryption. When omitted the service-owned key is used.")
+ private Property kmsKeyId;
+
+ @Schema(title = "Client token", description = "Idempotency token (up to 64 ASCII characters). Automatically generated when omitted.")
+ private Property clientToken;
+
+ @Schema(title = "Tags", description = "Key-value tags to apply to the file system.")
+ @PluginProperty(dynamic = false)
+ private Property