From 39c3656972492ff498e535215664862a930cffca Mon Sep 17 00:00:00 2001 From: Nancy <9d.24.nancy.sangani@gmail.com> Date: Sat, 18 Apr 2026 19:04:49 +0530 Subject: [PATCH 1/2] feat(s3): add Amazon S3 Files control-plane support (#707) --- .../plugin/aws/s3/files/AbstractS3Files.java | 206 +++++++++++ .../plugin/aws/s3/files/CreateFileSystem.java | 147 ++++++++ .../aws/s3/files/CreateMountTarget.java | 117 ++++++ .../plugin/aws/s3/files/DeleteFileSystem.java | 69 ++++ .../aws/s3/files/DeleteMountTarget.java | 72 ++++ .../plugin/aws/s3/files/GetFileSystem.java | 85 +++++ .../plugin/aws/s3/files/ListFileSystems.java | 131 +++++++ .../plugin/aws/s3/files/ListMountTargets.java | 130 +++++++ .../plugin/aws/s3/files/S3FilesService.java | 42 +++ .../aws/s3/files/models/FileSystem.java | 45 +++ .../aws/s3/files/models/MountTarget.java | 31 ++ .../aws/s3/files/S3FilesIntegrationTest.java | 104 ++++++ .../plugin/aws/s3/files/S3FilesTaskTest.java | 347 ++++++++++++++++++ 13 files changed, 1526 insertions(+) create mode 100644 src/main/java/io/kestra/plugin/aws/s3/files/AbstractS3Files.java create mode 100644 src/main/java/io/kestra/plugin/aws/s3/files/CreateFileSystem.java create mode 100644 src/main/java/io/kestra/plugin/aws/s3/files/CreateMountTarget.java create mode 100644 src/main/java/io/kestra/plugin/aws/s3/files/DeleteFileSystem.java create mode 100644 src/main/java/io/kestra/plugin/aws/s3/files/DeleteMountTarget.java create mode 100644 src/main/java/io/kestra/plugin/aws/s3/files/GetFileSystem.java create mode 100644 src/main/java/io/kestra/plugin/aws/s3/files/ListFileSystems.java create mode 100644 src/main/java/io/kestra/plugin/aws/s3/files/ListMountTargets.java create mode 100644 src/main/java/io/kestra/plugin/aws/s3/files/S3FilesService.java create mode 100644 src/main/java/io/kestra/plugin/aws/s3/files/models/FileSystem.java create mode 100644 src/main/java/io/kestra/plugin/aws/s3/files/models/MountTarget.java create mode 100644 src/test/java/io/kestra/plugin/aws/s3/files/S3FilesIntegrationTest.java create mode 100644 src/test/java/io/kestra/plugin/aws/s3/files/S3FilesTaskTest.java diff --git a/src/main/java/io/kestra/plugin/aws/s3/files/AbstractS3Files.java b/src/main/java/io/kestra/plugin/aws/s3/files/AbstractS3Files.java new file mode 100644 index 00000000..ae553a6a --- /dev/null +++ b/src/main/java/io/kestra/plugin/aws/s3/files/AbstractS3Files.java @@ -0,0 +1,206 @@ +package io.kestra.plugin.aws.s3.files; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.time.Duration; + +import io.kestra.core.runners.RunContext; +import io.kestra.plugin.aws.AbstractConnection; + +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.ToString; +import lombok.experimental.SuperBuilder; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.AwsSessionCredentials; +import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.auth.signer.Aws4Signer; +import software.amazon.awssdk.auth.signer.params.Aws4SignerParams; +import software.amazon.awssdk.http.ExecutableHttpRequest; +import software.amazon.awssdk.http.HttpExecuteRequest; +import software.amazon.awssdk.http.HttpExecuteResponse; +import software.amazon.awssdk.http.SdkHttpFullRequest; +import software.amazon.awssdk.http.SdkHttpMethod; +import software.amazon.awssdk.http.apache.ApacheHttpClient; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.sts.StsClient; +import software.amazon.awssdk.services.sts.StsClientBuilder; +import software.amazon.awssdk.services.sts.model.AssumeRoleRequest; +import software.amazon.awssdk.services.sts.model.Credentials; + +@SuperBuilder +@ToString +@EqualsAndHashCode +@Getter +@NoArgsConstructor +public abstract class AbstractS3Files extends AbstractConnection { + + static final String S3_FILES_SERVICE = "s3files"; + + protected AwsCredentialsProvider credentialsProvider(RunContext runContext) throws Exception { + AbstractConnection.AwsClientConfig cfg = this.awsClientConfig(runContext); + + if (cfg.stsRoleArn() != null) { + return stsCredentialsProvider(cfg); + } + + if (cfg.accessKeyId() != null && cfg.secretKeyId() != null) { + if (cfg.sessionToken() != null) { + return StaticCredentialsProvider.create( + AwsSessionCredentials.create(cfg.accessKeyId(), cfg.secretKeyId(), cfg.sessionToken()) + ); + } + return StaticCredentialsProvider.create( + AwsBasicCredentials.create(cfg.accessKeyId(), cfg.secretKeyId()) + ); + } + + return DefaultCredentialsProvider.create(); + } + + private AwsCredentialsProvider stsCredentialsProvider(AbstractConnection.AwsClientConfig cfg) { + StsClientBuilder stsBuilder = StsClient.builder() + .region(Region.of(cfg.region())); + + if (cfg.stsEndpointOverride() != null) { + stsBuilder.endpointOverride(URI.create(cfg.stsEndpointOverride())); + } + + if (cfg.accessKeyId() != null && cfg.secretKeyId() != null) { + stsBuilder.credentialsProvider( + StaticCredentialsProvider.create( + AwsBasicCredentials.create(cfg.accessKeyId(), cfg.secretKeyId()) + ) + ); + } + + try (StsClient stsClient = stsBuilder.build()) { + AssumeRoleRequest.Builder req = AssumeRoleRequest.builder() + .roleArn(cfg.stsRoleArn()) + .roleSessionName( + cfg.stsRoleSessionName() != null + ? cfg.stsRoleSessionName() + : "kestra-s3files-session" + ) + .durationSeconds( + (int) (cfg.stsRoleSessionDuration() != null + ? cfg.stsRoleSessionDuration().getSeconds() + : Duration.ofMinutes(15).getSeconds()) + ); + + if (cfg.stsRoleExternalId() != null) { + req.externalId(cfg.stsRoleExternalId()); + } + + Credentials c = stsClient.assumeRole(req.build()).credentials(); + return StaticCredentialsProvider.create( + AwsSessionCredentials.create(c.accessKeyId(), c.secretAccessKey(), c.sessionToken()) + ); + } + } + + // ------------------------------------------------------------------------- + // HTTP request execution + // ------------------------------------------------------------------------- + + /** + * Builds and executes a SigV4-signed HTTP request against the S3 Files control-plane + * endpoint. + * + *

+ * The signing hostname is always the canonical AWS endpoint + * ({@code s3files.{region}.amazonaws.com}) regardless of {@code endpointOverride}, + * ensuring the SigV4 signature covers the correct {@code Host} header. + * When {@code endpointOverride} is configured the physical TCP connection is made + * to that override URL instead (useful for LocalStack / testing). + *

+ * + * @param runContext Kestra run context used for rendering properties + * @param method HTTP method (GET, PUT, POST, DELETE) + * @param path API path, e.g. {@code "/file-systems"} or + * {@code "/file-systems/fs-abc/mount-targets"} + * @param body request body bytes; {@code null} for bodyless methods (GET/DELETE) + * @return {@link S3FilesService.Response} containing the HTTP status and raw JSON body + * @throws RuntimeException if the API returns a non-2xx status code + */ + protected S3FilesService.Response executeRequest( + RunContext runContext, + SdkHttpMethod method, + String path, + byte[] body) throws Exception { + + AbstractConnection.AwsClientConfig cfg = this.awsClientConfig(runContext); + + String regionStr = cfg.region(); + if (regionStr == null) { + throw new IllegalArgumentException("region is required for S3 Files tasks"); + } + Region region = Region.of(regionStr); + + String serviceHost = S3_FILES_SERVICE + "." + regionStr + ".amazonaws.com"; + + String baseUrl = cfg.endpointOverride() != null + ? cfg.endpointOverride().replaceAll("/$", "") + : "https://" + serviceHost; + + URI requestUri = URI.create(baseUrl + path); + + byte[] bodyBytes = (body != null) ? body : new byte[0]; + + SdkHttpFullRequest.Builder requestBuilder = SdkHttpFullRequest.builder() + .method(method) + .uri(requestUri) + .putHeader("Content-Type", "application/json") + .putHeader("Host", serviceHost); + + if (bodyBytes.length > 0) { + requestBuilder.contentStreamProvider(() -> new ByteArrayInputStream(bodyBytes)); + } + + SdkHttpFullRequest unsignedRequest = requestBuilder.build(); + + Aws4SignerParams signerParams = Aws4SignerParams.builder() + .awsCredentials(credentialsProvider(runContext).resolveCredentials()) + .signingName(S3_FILES_SERVICE) + .signingRegion(region) + .build(); + + SdkHttpFullRequest signedRequest = Aws4Signer.create().sign(unsignedRequest, signerParams); + + try (ApacheHttpClient httpClient = (ApacheHttpClient) ApacheHttpClient.create()) { + ExecutableHttpRequest executableRequest = httpClient.prepareRequest( + HttpExecuteRequest.builder() + .request(signedRequest) + .contentStreamProvider( + bodyBytes.length > 0 + ? () -> new ByteArrayInputStream(bodyBytes) + : null + ) + .build() + ); + + HttpExecuteResponse response = executableRequest.call(); + int statusCode = response.httpResponse().statusCode(); + + String responseBody = ""; + if (response.responseBody().isPresent()) { + try (InputStream is = response.responseBody().get()) { + responseBody = new String(is.readAllBytes(), StandardCharsets.UTF_8); + } + } + + if (statusCode < 200 || statusCode >= 300) { + throw new RuntimeException( + "S3 Files API error [HTTP " + statusCode + "]: " + responseBody + ); + } + + return new S3FilesService.Response(statusCode, responseBody); + } + } +} diff --git a/src/main/java/io/kestra/plugin/aws/s3/files/CreateFileSystem.java b/src/main/java/io/kestra/plugin/aws/s3/files/CreateFileSystem.java new file mode 100644 index 00000000..79d6e1ec --- /dev/null +++ b/src/main/java/io/kestra/plugin/aws/s3/files/CreateFileSystem.java @@ -0,0 +1,147 @@ +package io.kestra.plugin.aws.s3.files; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import io.kestra.core.models.annotations.Example; +import io.kestra.core.models.annotations.Plugin; +import io.kestra.core.models.annotations.PluginProperty; +import io.kestra.core.models.property.Property; +import io.kestra.core.models.tasks.RunnableTask; +import io.kestra.core.runners.RunContext; +import io.kestra.plugin.aws.s3.files.models.FileSystem; + +import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.validation.constraints.NotNull; +import lombok.Builder; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.ToString; +import lombok.experimental.SuperBuilder; +import software.amazon.awssdk.http.SdkHttpMethod; + +/** + * Creates an Amazon S3 Files file system backed by an S3 bucket. + */ +@SuperBuilder +@ToString +@EqualsAndHashCode +@Getter +@NoArgsConstructor +@Plugin( + examples = { + @Example( + full = true, + code = """ + id: aws_s3_files_create_filesystem + namespace: company.team + + tasks: + - id: create_fs + type: io.kestra.plugin.aws.s3.files.CreateFileSystem + accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}" + secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}" + region: "us-east-1" + bucket: "arn:aws:s3:::my-bucket" + roleArn: "arn:aws:iam::123456789012:role/S3FilesRole" + """ + ) + } +) +@Schema( + title = "Create an Amazon S3 Files file system", + description = "Creates an S3 Files file system resource that makes an S3 bucket mountable as an NFS v4.1+ file system." +) +public class CreateFileSystem extends AbstractS3Files implements RunnableTask { + + @Schema(title = "S3 bucket ARN", description = "ARN of the S3 bucket to back the file system (e.g. arn:aws:s3:::my-bucket).") + @NotNull + private Property bucket; + + @Schema(title = "IAM role ARN", description = "ARN of the IAM role that grants the S3 Files service access to the bucket.") + @NotNull + private Property roleArn; + + @Schema(title = "Key prefix", description = "Optional prefix scoping the file system to a sub-path of the bucket.") + private Property prefix; + + @Schema(title = "KMS key ID", description = "ARN, key ID, or alias of the KMS key for encryption. When omitted the service-owned key is used.") + private Property kmsKeyId; + + @Schema(title = "Client token", description = "Idempotency token (up to 64 ASCII characters). Automatically generated when omitted.") + private Property clientToken; + + @Schema(title = "Tags", description = "Key-value tags to apply to the file system.") + @PluginProperty(dynamic = false) + private Property> tags; + + @Schema(title = "Accept bucket warning", description = "Set to true to acknowledge any bucket configuration warnings and proceed with creation.") + @Builder.Default + private Property acceptBucketWarning = Property.ofValue(false); + + @Override + public Output run(RunContext runContext) throws Exception { + Map body = new HashMap<>(); + body.put("bucket", runContext.render(bucket).as(String.class).orElseThrow()); + body.put("roleArn", runContext.render(roleArn).as(String.class).orElseThrow()); + + runContext.render(prefix).as(String.class).ifPresent(v -> body.put("prefix", v)); + runContext.render(kmsKeyId).as(String.class).ifPresent(v -> body.put("kmsKeyId", v)); + runContext.render(clientToken).as(String.class).ifPresent(v -> body.put("clientToken", v)); + runContext.render(acceptBucketWarning).as(Boolean.class) + .ifPresent(v -> + { + if (Boolean.TRUE.equals(v)) + body.put("acceptBucketWarning", true); + }); + + if (tags != null) { + Map tagMap = runContext.render(tags).asMap(String.class, String.class); + if (!tagMap.isEmpty()) { + List> tagList = tagMap.entrySet().stream() + .map(e -> Map.of("key", e.getKey(), "value", e.getValue())) + .toList(); + body.put("tags", tagList); + } + } + + S3FilesService.Response response = executeRequest( + runContext, + SdkHttpMethod.POST, + "/filesystems", + S3FilesService.toJson(body) + ); + + FileSystem fs = S3FilesService.fromJson(response.body(), FileSystem.class); + + runContext.logger().info("Created S3 Files file system: {} (status={})", fs.getFileSystemId(), fs.getStatus()); + + return Output.builder() + .fileSystemId(fs.getFileSystemId()) + .fileSystemArn(fs.getFileSystemArn()) + .status(fs.getStatus()) + .creationTime(fs.getCreationTime()) + .build(); + } + + @SuperBuilder + @Getter + @NoArgsConstructor + @Schema(title = "Output of the CreateFileSystem task") + public static class Output implements io.kestra.core.models.tasks.Output { + + @Schema(title = "File system ID") + private String fileSystemId; + + @Schema(title = "File system ARN") + private String fileSystemArn; + + @Schema(title = "Lifecycle status") + private String status; + + @Schema(title = "Creation time (Unix epoch seconds)") + private Long creationTime; + } +} diff --git a/src/main/java/io/kestra/plugin/aws/s3/files/CreateMountTarget.java b/src/main/java/io/kestra/plugin/aws/s3/files/CreateMountTarget.java new file mode 100644 index 00000000..95f37745 --- /dev/null +++ b/src/main/java/io/kestra/plugin/aws/s3/files/CreateMountTarget.java @@ -0,0 +1,117 @@ +package io.kestra.plugin.aws.s3.files; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import io.kestra.core.models.annotations.Example; +import io.kestra.core.models.annotations.Plugin; +import io.kestra.core.models.annotations.PluginProperty; +import io.kestra.core.models.property.Property; +import io.kestra.core.models.tasks.RunnableTask; +import io.kestra.core.runners.RunContext; +import io.kestra.plugin.aws.s3.files.models.MountTarget; + +import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.validation.constraints.NotNull; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.ToString; +import lombok.experimental.SuperBuilder; +import software.amazon.awssdk.http.SdkHttpMethod; + +/** + * Creates an NFS mount target for an Amazon S3 Files file system in the + * specified subnet. + */ +@SuperBuilder +@ToString +@EqualsAndHashCode +@Getter +@NoArgsConstructor +@Plugin( + examples = { + @Example(full = true, code = """ + id: aws_s3_files_create_mount_target + namespace: company.team + + tasks: + - id: create_mt + type: io.kestra.plugin.aws.s3.files.CreateMountTarget + accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}" + secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}" + region: "us-east-1" + fileSystemId: "fs-0123456789abcdef0" + subnetId: "subnet-0123456789abcdef0" + """) + } +) +@Schema(title = "Create an Amazon S3 Files mount target", description = "Creates an NFS v4.1 mount target in a VPC subnet, enabling EC2 / Lambda / ECS workloads to mount the file system.") +public class CreateMountTarget extends AbstractS3Files implements RunnableTask { + + @Schema(title = "File system ID", description = "The ID of the S3 Files file system for which to create the mount target.") + @NotNull + private Property fileSystemId; + + @Schema(title = "Subnet ID", description = "The ID of the VPC subnet where the mount target will be created.") + @NotNull + private Property subnetId; + + @Schema(title = "IP address", description = "Optional static IPv4 address for the mount target. When omitted, AWS assigns one automatically.") + private Property ipAddress; + + @Schema(title = "Security group IDs", description = "Up to 5 security group IDs to associate with the mount target's network interface.") + @PluginProperty(dynamic = false) + private Property> securityGroups; + + @Override + public Output run(RunContext runContext) throws Exception { + String fsId = runContext.render(fileSystemId).as(String.class).orElseThrow(); + + Map body = new HashMap<>(); + body.put("subnetId", runContext.render(subnetId).as(String.class).orElseThrow()); + + runContext.render(ipAddress).as(String.class).ifPresent(v -> body.put("ipAddress", v)); + + if (securityGroups != null) { + List sgList = runContext.render(securityGroups).asList(String.class); + if (!sgList.isEmpty()) { + body.put("securityGroups", sgList); + } + } + + S3FilesService.Response response = executeRequest( + runContext, + SdkHttpMethod.POST, + "/filesystems/" + fsId + "/mounttargets", + S3FilesService.toJson(body) + ); + + MountTarget mt = S3FilesService.fromJson(response.body(), MountTarget.class); + + runContext.logger().info("Created S3 Files mount target: {} (status={})", mt.getMountTargetId(), mt.getStatus()); + + return Output.builder() + .mountTargetId(mt.getMountTargetId()) + .ipAddress(mt.getIpAddress()) + .status(mt.getStatus()) + .build(); + } + + @SuperBuilder + @Getter + @NoArgsConstructor + @Schema(title = "Output of the CreateMountTarget task") + public static class Output implements io.kestra.core.models.tasks.Output { + + @Schema(title = "Mount target ID") + private String mountTargetId; + + @Schema(title = "IPv4 address of the NFS endpoint") + private String ipAddress; + + @Schema(title = "Lifecycle status") + private String status; + } +} diff --git a/src/main/java/io/kestra/plugin/aws/s3/files/DeleteFileSystem.java b/src/main/java/io/kestra/plugin/aws/s3/files/DeleteFileSystem.java new file mode 100644 index 00000000..d267576c --- /dev/null +++ b/src/main/java/io/kestra/plugin/aws/s3/files/DeleteFileSystem.java @@ -0,0 +1,69 @@ +package io.kestra.plugin.aws.s3.files; + +import io.kestra.core.models.annotations.Example; +import io.kestra.core.models.annotations.Plugin; +import io.kestra.core.models.property.Property; +import io.kestra.core.models.tasks.RunnableTask; +import io.kestra.core.runners.RunContext; + +import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.validation.constraints.NotNull; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.ToString; +import lombok.experimental.SuperBuilder; +import software.amazon.awssdk.http.SdkHttpMethod; + +@SuperBuilder +@ToString +@EqualsAndHashCode +@Getter +@NoArgsConstructor +@Plugin( + examples = { + @Example( + full = true, + code = """ + id: aws_s3_files_delete_filesystem + namespace: company.team + + tasks: + - id: delete_fs + type: io.kestra.plugin.aws.s3.files.DeleteFileSystem + accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}" + secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}" + region: "us-east-1" + fileSystemId: "fs-0123456789abcdef0" + """ + ) + } +) +@Schema( + title = "Delete an Amazon S3 Files file system", + description = "Deletes an S3 Files file system. All mount targets must be removed before issuing this call." +) +public class DeleteFileSystem extends AbstractS3Files implements RunnableTask { + + @Schema(title = "File system ID", description = "The ID of the file system to delete.") + @NotNull + private Property fileSystemId; + + @Override + public Output run(RunContext runContext) throws Exception { + String fsId = runContext.render(fileSystemId).as(String.class).orElseThrow(); + + executeRequest(runContext, SdkHttpMethod.DELETE, "/filesystems/" + fsId, null); + + runContext.logger().info("Deleted S3 Files file system: {}", fsId); + + return Output.builder().build(); + } + + @SuperBuilder + @Getter + @NoArgsConstructor + @Schema(title = "Output of the DeleteFileSystem task") + public static class Output implements io.kestra.core.models.tasks.Output { + } +} diff --git a/src/main/java/io/kestra/plugin/aws/s3/files/DeleteMountTarget.java b/src/main/java/io/kestra/plugin/aws/s3/files/DeleteMountTarget.java new file mode 100644 index 00000000..bb84c84a --- /dev/null +++ b/src/main/java/io/kestra/plugin/aws/s3/files/DeleteMountTarget.java @@ -0,0 +1,72 @@ +package io.kestra.plugin.aws.s3.files; + +import io.kestra.core.models.annotations.Example; +import io.kestra.core.models.annotations.Plugin; +import io.kestra.core.models.property.Property; +import io.kestra.core.models.tasks.RunnableTask; +import io.kestra.core.runners.RunContext; + +import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.validation.constraints.NotNull; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.ToString; +import lombok.experimental.SuperBuilder; +import software.amazon.awssdk.http.SdkHttpMethod; + +/** + * Deletes an Amazon S3 Files mount target. + */ +@SuperBuilder +@ToString +@EqualsAndHashCode +@Getter +@NoArgsConstructor +@Plugin( + examples = { + @Example( + full = true, + code = """ + id: aws_s3_files_delete_mount_target + namespace: company.team + + tasks: + - id: delete_mt + type: io.kestra.plugin.aws.s3.files.DeleteMountTarget + accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}" + secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}" + region: "us-east-1" + mountTargetId: "mt-0123456789abcdef0" + """ + ) + } +) +@Schema( + title = "Delete an Amazon S3 Files mount target", + description = "Deletes the specified NFS mount target. The file system becomes inaccessible from the associated subnet once the mount target is deleted." +) +public class DeleteMountTarget extends AbstractS3Files implements RunnableTask { + + @Schema(title = "Mount target ID", description = "The ID of the mount target to delete.") + @NotNull + private Property mountTargetId; + + @Override + public Output run(RunContext runContext) throws Exception { + String mtId = runContext.render(mountTargetId).as(String.class).orElseThrow(); + + executeRequest(runContext, SdkHttpMethod.DELETE, "/mounttargets/" + mtId, null); + + runContext.logger().info("Deleted S3 Files mount target: {}", mtId); + + return Output.builder().build(); + } + + @SuperBuilder + @Getter + @NoArgsConstructor + @Schema(title = "Output of the DeleteMountTarget task") + public static class Output implements io.kestra.core.models.tasks.Output { + } +} diff --git a/src/main/java/io/kestra/plugin/aws/s3/files/GetFileSystem.java b/src/main/java/io/kestra/plugin/aws/s3/files/GetFileSystem.java new file mode 100644 index 00000000..93006e62 --- /dev/null +++ b/src/main/java/io/kestra/plugin/aws/s3/files/GetFileSystem.java @@ -0,0 +1,85 @@ +package io.kestra.plugin.aws.s3.files; + +import io.kestra.core.models.annotations.Example; +import io.kestra.core.models.annotations.Plugin; +import io.kestra.core.models.property.Property; +import io.kestra.core.models.tasks.RunnableTask; +import io.kestra.core.runners.RunContext; +import io.kestra.plugin.aws.s3.files.models.FileSystem; + +import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.validation.constraints.NotNull; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.ToString; +import lombok.experimental.SuperBuilder; +import software.amazon.awssdk.http.SdkHttpMethod; + +/** + * Retrieves the full details of a single Amazon S3 Files file system by ID. + */ +@SuperBuilder +@ToString +@EqualsAndHashCode +@Getter +@NoArgsConstructor +@Plugin( + examples = { + @Example( + full = true, + code = """ + id: aws_s3_files_get_filesystem + namespace: company.team + + tasks: + - id: get_fs + type: io.kestra.plugin.aws.s3.files.GetFileSystem + accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}" + secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}" + region: "us-east-1" + fileSystemId: "fs-0123456789abcdef0" + """ + ) + } +) +@Schema( + title = "Get an Amazon S3 Files file system", + description = "Retrieves the metadata and status of an existing S3 Files file system." +) +public class GetFileSystem extends AbstractS3Files implements RunnableTask { + + @Schema(title = "File system ID", description = "The ID of the file system to retrieve (e.g. fs-0123456789abcdef0).") + @NotNull + private Property fileSystemId; + + @Override + public Output run(RunContext runContext) throws Exception { + String fsId = runContext.render(fileSystemId).as(String.class).orElseThrow(); + + S3FilesService.Response response = executeRequest( + runContext, + SdkHttpMethod.GET, + "/filesystems/" + fsId, + null + ); + + FileSystem fs = S3FilesService.fromJson(response.body(), FileSystem.class); + + runContext.logger().info("Retrieved S3 Files file system: {} (status={})", fs.getFileSystemId(), fs.getStatus()); + + return Output.builder() + .fileSystem(fs) + .build(); + } + + @SuperBuilder + @Getter + @NoArgsConstructor + @Schema(title = "Output of the GetFileSystem task") + public static class Output implements io.kestra.core.models.tasks.Output { + + @Schema(title = "The full file system details") + private FileSystem fileSystem; + } +} diff --git a/src/main/java/io/kestra/plugin/aws/s3/files/ListFileSystems.java b/src/main/java/io/kestra/plugin/aws/s3/files/ListFileSystems.java new file mode 100644 index 00000000..cc6f9736 --- /dev/null +++ b/src/main/java/io/kestra/plugin/aws/s3/files/ListFileSystems.java @@ -0,0 +1,131 @@ +package io.kestra.plugin.aws.s3.files; + +import java.util.List; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; + +import io.kestra.core.models.annotations.Example; +import io.kestra.core.models.annotations.Plugin; +import io.kestra.core.models.property.Property; +import io.kestra.core.models.tasks.RunnableTask; +import io.kestra.core.runners.RunContext; +import io.kestra.plugin.aws.s3.files.models.FileSystem; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.ToString; +import lombok.experimental.SuperBuilder; +import software.amazon.awssdk.http.SdkHttpMethod; + +/** + * Lists Amazon S3 Files file systems, with optional filtering by bucket ARN and pagination. + */ +@SuperBuilder +@ToString +@EqualsAndHashCode +@Getter +@NoArgsConstructor +@Plugin( + examples = { + @Example( + full = true, + code = """ + id: aws_s3_files_list_filesystems + namespace: company.team + + tasks: + - id: list_fs + type: io.kestra.plugin.aws.s3.files.ListFileSystems + accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}" + secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}" + region: "us-east-1" + bucket: "arn:aws:s3:::my-bucket" + maxResults: 10 + """ + ) + } +) +@Schema( + title = "List Amazon S3 Files file systems", + description = "Returns a paginated list of S3 Files file systems, optionally filtered by bucket ARN." +) +public class ListFileSystems extends AbstractS3Files implements RunnableTask { + + @Schema(title = "Bucket ARN filter", description = "When set, returns only file systems backed by this bucket ARN.") + private Property bucket; + + @Schema(title = "Maximum results", description = "Maximum number of file systems to return per page.") + private Property maxResults; + + @Schema(title = "Pagination token", description = "Token returned from a previous call to retrieve the next page.") + private Property nextToken; + + @Override + public Output run(RunContext runContext) throws Exception { + StringBuilder path = new StringBuilder("/filesystems"); + String sep = "?"; + + String bucketVal = bucket != null + ? runContext.render(bucket).as(String.class).orElse(null) + : null; + if (bucketVal != null) { + path.append(sep).append("bucket=").append(java.net.URLEncoder.encode(bucketVal, java.nio.charset.StandardCharsets.UTF_8)); + sep = "&"; + } + + Integer maxVal = maxResults != null + ? runContext.render(maxResults).as(Integer.class).orElse(null) + : null; + if (maxVal != null) { + path.append(sep).append("maxResults=").append(maxVal); + sep = "&"; + } + + String tokenVal = nextToken != null + ? runContext.render(nextToken).as(String.class).orElse(null) + : null; + if (tokenVal != null) { + path.append(sep).append("nextToken=").append(java.net.URLEncoder.encode(tokenVal, java.nio.charset.StandardCharsets.UTF_8)); + } + + S3FilesService.Response response = executeRequest( + runContext, + SdkHttpMethod.GET, + path.toString(), + null + ); + + ListFileSystemsResponse parsed = S3FilesService.fromJson(response.body(), ListFileSystemsResponse.class); + + runContext.logger().info("Listed {} S3 Files file system(s)", parsed.getFileSystems() != null ? parsed.getFileSystems().size() : 0); + + return Output.builder() + .fileSystems(parsed.getFileSystems()) + .nextToken(parsed.getNextToken()) + .build(); + } + + /** Internal wrapper matching the API list response envelope. */ + @Data + @JsonIgnoreProperties(ignoreUnknown = true) + private static class ListFileSystemsResponse { + private List fileSystems; + private String nextToken; + } + + @SuperBuilder + @Getter + @NoArgsConstructor + @Schema(title = "Output of the ListFileSystems task") + public static class Output implements io.kestra.core.models.tasks.Output { + + @Schema(title = "List of file systems") + private List fileSystems; + + @Schema(title = "Pagination token for the next page") + private String nextToken; + } +} diff --git a/src/main/java/io/kestra/plugin/aws/s3/files/ListMountTargets.java b/src/main/java/io/kestra/plugin/aws/s3/files/ListMountTargets.java new file mode 100644 index 00000000..ed244eda --- /dev/null +++ b/src/main/java/io/kestra/plugin/aws/s3/files/ListMountTargets.java @@ -0,0 +1,130 @@ +package io.kestra.plugin.aws.s3.files; + +import java.util.List; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; + +import io.kestra.core.models.annotations.Example; +import io.kestra.core.models.annotations.Plugin; +import io.kestra.core.models.property.Property; +import io.kestra.core.models.tasks.RunnableTask; +import io.kestra.core.runners.RunContext; +import io.kestra.plugin.aws.s3.files.models.MountTarget; + +import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.validation.constraints.NotNull; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.ToString; +import lombok.experimental.SuperBuilder; +import software.amazon.awssdk.http.SdkHttpMethod; + +/** + * Lists all mount targets for an Amazon S3 Files file system. + */ +@SuperBuilder +@ToString +@EqualsAndHashCode +@Getter +@NoArgsConstructor +@Plugin( + examples = { + @Example( + full = true, + code = """ + id: aws_s3_files_list_mount_targets + namespace: company.team + + tasks: + - id: list_mt + type: io.kestra.plugin.aws.s3.files.ListMountTargets + accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}" + secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}" + region: "us-east-1" + fileSystemId: "fs-0123456789abcdef0" + """ + ) + } +) +@Schema( + title = "List Amazon S3 Files mount targets", + description = "Returns a paginated list of mount targets for the specified S3 Files file system." +) +public class ListMountTargets extends AbstractS3Files implements RunnableTask { + + @Schema(title = "File system ID", description = "The ID of the file system whose mount targets to list.") + @NotNull + private Property fileSystemId; + + @Schema(title = "Maximum results", description = "Maximum number of mount targets to return per page.") + private Property maxResults; + + @Schema(title = "Pagination token", description = "Token from a previous call to retrieve the next page.") + private Property nextToken; + + @Override + public Output run(RunContext runContext) throws Exception { + String fsId = runContext.render(fileSystemId).as(String.class).orElseThrow(); + + StringBuilder path = new StringBuilder("/filesystems/" + fsId + "/mounttargets"); + String sep = "?"; + + Integer maxVal = maxResults != null + ? runContext.render(maxResults).as(Integer.class).orElse(null) + : null; + if (maxVal != null) { + path.append(sep).append("maxResults=").append(maxVal); + sep = "&"; + } + + String tokenVal = nextToken != null + ? runContext.render(nextToken).as(String.class).orElse(null) + : null; + if (tokenVal != null) { + path.append(sep).append("nextToken=").append( + java.net.URLEncoder.encode(tokenVal, java.nio.charset.StandardCharsets.UTF_8) + ); + } + + S3FilesService.Response response = executeRequest( + runContext, + SdkHttpMethod.GET, + path.toString(), + null + ); + + ListMountTargetsResponse parsed = S3FilesService.fromJson(response.body(), ListMountTargetsResponse.class); + + runContext.logger().info( + "Listed {} mount target(s) for file system: {}", + parsed.getMountTargets() != null ? parsed.getMountTargets().size() : 0, fsId + ); + + return Output.builder() + .mountTargets(parsed.getMountTargets()) + .nextToken(parsed.getNextToken()) + .build(); + } + + @Data + @JsonIgnoreProperties(ignoreUnknown = true) + private static class ListMountTargetsResponse { + private List mountTargets; + private String nextToken; + } + + @SuperBuilder + @Getter + @NoArgsConstructor + @Schema(title = "Output of the ListMountTargets task") + public static class Output implements io.kestra.core.models.tasks.Output { + + @Schema(title = "List of mount targets") + private List mountTargets; + + @Schema(title = "Pagination token for the next page") + private String nextToken; + } +} diff --git a/src/main/java/io/kestra/plugin/aws/s3/files/S3FilesService.java b/src/main/java/io/kestra/plugin/aws/s3/files/S3FilesService.java new file mode 100644 index 00000000..cc8f199e --- /dev/null +++ b/src/main/java/io/kestra/plugin/aws/s3/files/S3FilesService.java @@ -0,0 +1,42 @@ +package io.kestra.plugin.aws.s3.files; + +import java.io.IOException; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; + +public final class S3FilesService { + + private S3FilesService() { + } + + static final ObjectMapper MAPPER = new ObjectMapper() + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false) + .setSerializationInclusion(JsonInclude.Include.NON_NULL); + + /** + * Serializes an object to a compact JSON byte array. + */ + public static byte[] toJson(Object value) throws IOException { + return MAPPER.writeValueAsBytes(value); + } + + /** + * Deserializes a JSON string to the given type. + */ + public static T fromJson(String json, Class type) throws IOException { + return MAPPER.readValue(json, type); + } + + /** + * Lightweight wrapper around a raw HTTP response from the S3 Files API. + * + * @param statusCode HTTP status code + * @param body raw response body (UTF-8 JSON) + */ + public record Response(int statusCode, String body) { + } +} diff --git a/src/main/java/io/kestra/plugin/aws/s3/files/models/FileSystem.java b/src/main/java/io/kestra/plugin/aws/s3/files/models/FileSystem.java new file mode 100644 index 00000000..59e7f976 --- /dev/null +++ b/src/main/java/io/kestra/plugin/aws/s3/files/models/FileSystem.java @@ -0,0 +1,45 @@ +package io.kestra.plugin.aws.s3.files.models; + +import java.util.List; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * Represents an Amazon S3 Files file system resource as returned by the control-plane API. + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +@JsonIgnoreProperties(ignoreUnknown = true) +public class FileSystem { + + private String fileSystemId; + private String fileSystemArn; + private String status; + private String statusMessage; + private String bucket; + private String prefix; + private String roleArn; + private String kmsKeyId; + private String name; + private String ownerId; + private Long creationTime; + private List tags; + private String clientToken; + + @Data + @Builder + @NoArgsConstructor + @AllArgsConstructor + @JsonIgnoreProperties(ignoreUnknown = true) + public static class Tag { + private String key; + private String value; + } +} diff --git a/src/main/java/io/kestra/plugin/aws/s3/files/models/MountTarget.java b/src/main/java/io/kestra/plugin/aws/s3/files/models/MountTarget.java new file mode 100644 index 00000000..637fecdf --- /dev/null +++ b/src/main/java/io/kestra/plugin/aws/s3/files/models/MountTarget.java @@ -0,0 +1,31 @@ +package io.kestra.plugin.aws.s3.files.models; + +import java.util.List; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * Represents an Amazon S3 Files mount target as returned by the control-plane API. + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +@JsonIgnoreProperties(ignoreUnknown = true) +public class MountTarget { + + private String mountTargetId; + private String fileSystemId; + private String subnetId; + private String ipAddress; + private String status; + private String statusMessage; + private String vpcId; + private String ownerId; + private List securityGroups; +} diff --git a/src/test/java/io/kestra/plugin/aws/s3/files/S3FilesIntegrationTest.java b/src/test/java/io/kestra/plugin/aws/s3/files/S3FilesIntegrationTest.java new file mode 100644 index 00000000..96677432 --- /dev/null +++ b/src/test/java/io/kestra/plugin/aws/s3/files/S3FilesIntegrationTest.java @@ -0,0 +1,104 @@ +package io.kestra.plugin.aws.s3.files; + +import java.util.Map; + +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +import io.kestra.core.models.property.Property; +import io.kestra.core.runners.RunContext; +import io.kestra.core.runners.RunContextFactory; +import io.kestra.plugin.aws.AbstractLocalStackTest; + +import jakarta.inject.Inject; + +import static org.assertj.core.api.Assertions.assertThat; + +@Disabled("LocalStack 3.4.0 does not yet support the S3 Files control-plane API") +class S3FilesIntegrationTest extends AbstractLocalStackTest { + + @Inject + RunContextFactory runContextFactory; + + @Test + void fullLifecycle() throws Exception { + RunContext runContext = runContextFactory.of(Map.of()); + + String endpointOverride = "http://localhost:4566"; + String region = "us-east-1"; + String bucketArn = "arn:aws:s3:::my-test-bucket"; + String roleArn = "arn:aws:iam::000000000000:role/S3FilesRole"; + String subnetId = "subnet-00000000"; + + // 1. Create file system + CreateFileSystem createFs = CreateFileSystem.builder() + .endpointOverride(Property.ofValue(endpointOverride)) + .region(Property.ofValue(region)) + .bucket(Property.ofValue(bucketArn)) + .roleArn(Property.ofValue(roleArn)) + .build(); + + CreateFileSystem.Output createFsOutput = createFs.run(runContext); + assertThat(createFsOutput.getFileSystemId()).isNotNull(); + + String fsId = createFsOutput.getFileSystemId(); + + // 2. Get file system + GetFileSystem getFs = GetFileSystem.builder() + .endpointOverride(Property.ofValue(endpointOverride)) + .region(Property.ofValue(region)) + .fileSystemId(Property.ofValue(fsId)) + .build(); + + GetFileSystem.Output getFsOutput = getFs.run(runContext); + assertThat(getFsOutput.getFileSystem().getFileSystemId()).isEqualTo(fsId); + + // 3. List file systems + ListFileSystems listFs = ListFileSystems.builder() + .endpointOverride(Property.ofValue(endpointOverride)) + .region(Property.ofValue(region)) + .build(); + + ListFileSystems.Output listFsOutput = listFs.run(runContext); + assertThat(listFsOutput.getFileSystems()).anyMatch(f -> fsId.equals(f.getFileSystemId())); + + // 4. Create mount target + CreateMountTarget createMt = CreateMountTarget.builder() + .endpointOverride(Property.ofValue(endpointOverride)) + .region(Property.ofValue(region)) + .fileSystemId(Property.ofValue(fsId)) + .subnetId(Property.ofValue(subnetId)) + .build(); + + CreateMountTarget.Output createMtOutput = createMt.run(runContext); + assertThat(createMtOutput.getMountTargetId()).isNotNull(); + + String mtId = createMtOutput.getMountTargetId(); + + // 5. List mount targets + ListMountTargets listMt = ListMountTargets.builder() + .endpointOverride(Property.ofValue(endpointOverride)) + .region(Property.ofValue(region)) + .fileSystemId(Property.ofValue(fsId)) + .build(); + + ListMountTargets.Output listMtOutput = listMt.run(runContext); + assertThat(listMtOutput.getMountTargets()).anyMatch(m -> mtId.equals(m.getMountTargetId())); + + // 6. Delete mount target + DeleteMountTarget deleteMt = DeleteMountTarget.builder() + .endpointOverride(Property.ofValue(endpointOverride)) + .region(Property.ofValue(region)) + .mountTargetId(Property.ofValue(mtId)) + .build(); + deleteMt.run(runContext); + + // 7. Delete file system + DeleteFileSystem deleteFs = DeleteFileSystem.builder() + .endpointOverride(Property.ofValue(endpointOverride)) + .region(Property.ofValue(region)) + .fileSystemId(Property.ofValue(fsId)) + .build(); + deleteFs.run(runContext); + } +} diff --git a/src/test/java/io/kestra/plugin/aws/s3/files/S3FilesTaskTest.java b/src/test/java/io/kestra/plugin/aws/s3/files/S3FilesTaskTest.java new file mode 100644 index 00000000..afc28fa1 --- /dev/null +++ b/src/test/java/io/kestra/plugin/aws/s3/files/S3FilesTaskTest.java @@ -0,0 +1,347 @@ +package io.kestra.plugin.aws.s3.files; + +import java.util.List; +import java.util.Map; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import io.kestra.core.junit.annotations.KestraTest; +import io.kestra.core.models.property.Property; +import io.kestra.core.runners.RunContext; +import io.kestra.core.runners.RunContextFactory; +import io.kestra.plugin.aws.s3.files.models.FileSystem; + +import jakarta.inject.Inject; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; + +@KestraTest +class S3FilesTaskTest { + + @Inject + RunContextFactory runContextFactory; + + private RunContext runContext; + + @BeforeEach + void setup() throws Exception { + runContext = runContextFactory.of(Map.of()); + } + + // ────────────────── + // CreateFileSystem + // ────────────────── + + @Test + void createFileSystem_parsesResponse() throws Exception { + String responseJson = """ + { + "fileSystemId": "fs-abc123", + "fileSystemArn": "arn:aws:s3files:us-east-1:123456789012:file-system/fs-abc123", + "status": "creating", + "creationTime": 1712500000 + } + """; + + CreateFileSystem task = CreateFileSystem.builder() + .region(Property.ofValue("us-east-1")) + .bucket(Property.ofValue("arn:aws:s3:::my-bucket")) + .roleArn(Property.ofValue("arn:aws:iam::123456789012:role/S3FilesRole")) + .build(); + + CreateFileSystem spy = spy(task); + doReturn(new S3FilesService.Response(201, responseJson)) + .when(spy).executeRequest(any(), any(), anyString(), any()); + + CreateFileSystem.Output output = spy.run(runContext); + + assertThat(output.getFileSystemId()).isEqualTo("fs-abc123"); + assertThat(output.getFileSystemArn()).contains("fs-abc123"); + assertThat(output.getStatus()).isEqualTo("creating"); + assertThat(output.getCreationTime()).isEqualTo(1712500000L); + } + + @Test + void createFileSystem_includesOptionalFields() throws Exception { + String responseJson = """ + { + "fileSystemId": "fs-xyz", + "fileSystemArn": "arn:aws:s3files:us-east-1:123456789012:file-system/fs-xyz", + "status": "available", + "creationTime": 1712600000 + } + """; + + CreateFileSystem task = CreateFileSystem.builder() + .region(Property.ofValue("us-east-1")) + .bucket(Property.ofValue("arn:aws:s3:::my-bucket")) + .roleArn(Property.ofValue("arn:aws:iam::123456789012:role/S3FilesRole")) + .prefix(Property.ofValue("data/")) + .kmsKeyId(Property.ofValue("alias/my-key")) + .clientToken(Property.ofValue("unique-token-42")) + .tags(Property.ofValue(Map.of("env", "prod", "team", "data"))) + .build(); + + CreateFileSystem spy = spy(task); + doReturn(new S3FilesService.Response(201, responseJson)) + .when(spy).executeRequest(any(), any(), anyString(), any()); + + CreateFileSystem.Output output = spy.run(runContext); + assertThat(output.getFileSystemId()).isEqualTo("fs-xyz"); + } + + // ──────────────── + // GetFileSystem + // ──────────────── + + @Test + void getFileSystem_returnsFullModel() throws Exception { + String responseJson = """ + { + "fileSystemId": "fs-abc123", + "fileSystemArn": "arn:aws:s3files:us-east-1:123456789012:file-system/fs-abc123", + "status": "available", + "bucket": "arn:aws:s3:::my-bucket", + "roleArn": "arn:aws:iam::123456789012:role/S3FilesRole", + "prefix": "data/", + "creationTime": 1712500000, + "ownerId": "123456789012" + } + """; + + GetFileSystem task = GetFileSystem.builder() + .region(Property.ofValue("us-east-1")) + .fileSystemId(Property.ofValue("fs-abc123")) + .build(); + + GetFileSystem spy = spy(task); + doReturn(new S3FilesService.Response(200, responseJson)) + .when(spy).executeRequest(any(), any(), eq("/filesystems/fs-abc123"), any()); + + GetFileSystem.Output output = spy.run(runContext); + + FileSystem fs = output.getFileSystem(); + assertThat(fs.getFileSystemId()).isEqualTo("fs-abc123"); + assertThat(fs.getStatus()).isEqualTo("available"); + assertThat(fs.getPrefix()).isEqualTo("data/"); + assertThat(fs.getOwnerId()).isEqualTo("123456789012"); + } + + // ──────────────── + // ListFileSystems + // ──────────────── + + @Test + void listFileSystems_parsesListAndNextToken() throws Exception { + String responseJson = """ + { + "fileSystems": [ + {"fileSystemId": "fs-001", "status": "available"}, + {"fileSystemId": "fs-002", "status": "creating"} + ], + "nextToken": "page2token" + } + """; + + ListFileSystems task = ListFileSystems.builder() + .region(Property.ofValue("us-east-1")) + .maxResults(Property.ofValue(10)) + .build(); + + ListFileSystems spy = spy(task); + doReturn(new S3FilesService.Response(200, responseJson)) + .when(spy).executeRequest(any(), any(), anyString(), any()); + + ListFileSystems.Output output = spy.run(runContext); + + assertThat(output.getFileSystems()).hasSize(2); + assertThat(output.getFileSystems().get(0).getFileSystemId()).isEqualTo("fs-001"); + assertThat(output.getFileSystems().get(1).getStatus()).isEqualTo("creating"); + assertThat(output.getNextToken()).isEqualTo("page2token"); + } + + @Test + void listFileSystems_emptyList() throws Exception { + String responseJson = """ + { + "fileSystems": [] + } + """; + + ListFileSystems task = ListFileSystems.builder() + .region(Property.ofValue("us-east-1")) + .build(); + + ListFileSystems spy = spy(task); + doReturn(new S3FilesService.Response(200, responseJson)) + .when(spy).executeRequest(any(), any(), anyString(), any()); + + ListFileSystems.Output output = spy.run(runContext); + + assertThat(output.getFileSystems()).isEmpty(); + assertThat(output.getNextToken()).isNull(); + } + + // ────────────────── + // DeleteFileSystem + // ────────────────── + + @Test + void deleteFileSystem_callsCorrectPath() throws Exception { + DeleteFileSystem task = DeleteFileSystem.builder() + .region(Property.ofValue("us-east-1")) + .fileSystemId(Property.ofValue("fs-abc123")) + .build(); + + DeleteFileSystem spy = spy(task); + doReturn(new S3FilesService.Response(204, "")) + .when(spy).executeRequest(any(), any(), eq("/filesystems/fs-abc123"), any()); + + DeleteFileSystem.Output output = spy.run(runContext); + + assertThat(output).isNotNull(); + verify(spy).executeRequest(any(), any(), eq("/filesystems/fs-abc123"), any()); + } + + // ─────────────────── + // CreateMountTarget + // ─────────────────── + + @Test + void createMountTarget_parsesResponse() throws Exception { + String responseJson = """ + { + "mountTargetId": "mt-abc123", + "fileSystemId": "fs-abc123", + "subnetId": "subnet-abc", + "ipAddress": "10.0.1.42", + "status": "creating" + } + """; + + CreateMountTarget task = CreateMountTarget.builder() + .region(Property.ofValue("us-east-1")) + .fileSystemId(Property.ofValue("fs-abc123")) + .subnetId(Property.ofValue("subnet-abc")) + .build(); + + CreateMountTarget spy = spy(task); + doReturn(new S3FilesService.Response(201, responseJson)) + .when(spy).executeRequest(any(), any(), eq("/filesystems/fs-abc123/mounttargets"), any()); + + CreateMountTarget.Output output = spy.run(runContext); + + assertThat(output.getMountTargetId()).isEqualTo("mt-abc123"); + assertThat(output.getIpAddress()).isEqualTo("10.0.1.42"); + assertThat(output.getStatus()).isEqualTo("creating"); + } + + @Test + void createMountTarget_withSecurityGroups() throws Exception { + String responseJson = """ + { + "mountTargetId": "mt-xyz", + "ipAddress": "10.0.2.11", + "status": "creating" + } + """; + + CreateMountTarget task = CreateMountTarget.builder() + .region(Property.ofValue("us-east-1")) + .fileSystemId(Property.ofValue("fs-abc123")) + .subnetId(Property.ofValue("subnet-abc")) + .ipAddress(Property.ofValue("10.0.2.11")) + .securityGroups(Property.ofValue(List.of("sg-001", "sg-002"))) + .build(); + + CreateMountTarget spy = spy(task); + doReturn(new S3FilesService.Response(201, responseJson)) + .when(spy).executeRequest(any(), any(), anyString(), any()); + + CreateMountTarget.Output output = spy.run(runContext); + assertThat(output.getMountTargetId()).isEqualTo("mt-xyz"); + } + + // ────────────────── + // ListMountTargets + // ────────────────── + + @Test + void listMountTargets_parsesListAndNextToken() throws Exception { + String responseJson = """ + { + "mountTargets": [ + {"mountTargetId": "mt-001", "status": "available", "ipAddress": "10.0.1.5"}, + {"mountTargetId": "mt-002", "status": "creating", "ipAddress": "10.0.1.6"} + ], + "nextToken": "mt-page2" + } + """; + + ListMountTargets task = ListMountTargets.builder() + .region(Property.ofValue("us-east-1")) + .fileSystemId(Property.ofValue("fs-abc123")) + .build(); + + ListMountTargets spy = spy(task); + doReturn(new S3FilesService.Response(200, responseJson)) + .when(spy).executeRequest(any(), any(), anyString(), any()); + + ListMountTargets.Output output = spy.run(runContext); + + assertThat(output.getMountTargets()).hasSize(2); + assertThat(output.getMountTargets().get(0).getMountTargetId()).isEqualTo("mt-001"); + assertThat(output.getMountTargets().get(0).getIpAddress()).isEqualTo("10.0.1.5"); + assertThat(output.getNextToken()).isEqualTo("mt-page2"); + } + + // ─────────────────── + // DeleteMountTarget + // ─────────────────── + + @Test + void deleteMountTarget_callsCorrectPath() throws Exception { + DeleteMountTarget task = DeleteMountTarget.builder() + .region(Property.ofValue("us-east-1")) + .mountTargetId(Property.ofValue("mt-abc123")) + .build(); + + DeleteMountTarget spy = spy(task); + doReturn(new S3FilesService.Response(204, "")) + .when(spy).executeRequest(any(), any(), eq("/mounttargets/mt-abc123"), any()); + + DeleteMountTarget.Output output = spy.run(runContext); + + assertThat(output).isNotNull(); + verify(spy).executeRequest(any(), any(), eq("/mounttargets/mt-abc123"), any()); + } + + // ──────────────── + // Error handling + // ──────────────── + + @Test + void task_throwsOnApiError() throws Exception { + GetFileSystem task = GetFileSystem.builder() + .region(Property.ofValue("us-east-1")) + .fileSystemId(Property.ofValue("fs-nonexistent")) + .build(); + + GetFileSystem spy = spy(task); + doThrow(new RuntimeException("S3 Files API error [HTTP 404]: {\"message\":\"FileSystem not found\"}")) + .when(spy).executeRequest(any(), any(), anyString(), any()); + + assertThatThrownBy(() -> spy.run(runContext)) + .isInstanceOf(RuntimeException.class) + .hasMessageContaining("404"); + } +} From d592827bf7875484d206bda24eab42a5e669dc1f Mon Sep 17 00:00:00 2001 From: Nancy <9d.24.nancy.sangani@gmail.com> Date: Sat, 18 Apr 2026 19:08:52 +0530 Subject: [PATCH 2/2] feat(s3): add Amazon S3 Files control-plane support (#707) --- .../kestra/plugin/aws/s3/files/AbstractS3Files.java | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/src/main/java/io/kestra/plugin/aws/s3/files/AbstractS3Files.java b/src/main/java/io/kestra/plugin/aws/s3/files/AbstractS3Files.java index ae553a6a..70e674e0 100644 --- a/src/main/java/io/kestra/plugin/aws/s3/files/AbstractS3Files.java +++ b/src/main/java/io/kestra/plugin/aws/s3/files/AbstractS3Files.java @@ -104,21 +104,9 @@ private AwsCredentialsProvider stsCredentialsProvider(AbstractConnection.AwsClie } } - // ------------------------------------------------------------------------- // HTTP request execution - // ------------------------------------------------------------------------- /** - * Builds and executes a SigV4-signed HTTP request against the S3 Files control-plane - * endpoint. - * - *

- * The signing hostname is always the canonical AWS endpoint - * ({@code s3files.{region}.amazonaws.com}) regardless of {@code endpointOverride}, - * ensuring the SigV4 signature covers the correct {@code Host} header. - * When {@code endpointOverride} is configured the physical TCP connection is made - * to that override URL instead (useful for LocalStack / testing). - *

* * @param runContext Kestra run context used for rendering properties * @param method HTTP method (GET, PUT, POST, DELETE)