Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 194 additions & 0 deletions src/main/java/io/kestra/plugin/aws/s3/files/AbstractS3Files.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
package io.kestra.plugin.aws.s3.files;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.time.Duration;

import io.kestra.core.runners.RunContext;
import io.kestra.plugin.aws.AbstractConnection;

import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.auth.signer.Aws4Signer;
import software.amazon.awssdk.auth.signer.params.Aws4SignerParams;
import software.amazon.awssdk.http.ExecutableHttpRequest;
import software.amazon.awssdk.http.HttpExecuteRequest;
import software.amazon.awssdk.http.HttpExecuteResponse;
import software.amazon.awssdk.http.SdkHttpFullRequest;
import software.amazon.awssdk.http.SdkHttpMethod;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sts.StsClient;
import software.amazon.awssdk.services.sts.StsClientBuilder;
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
import software.amazon.awssdk.services.sts.model.Credentials;

@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
public abstract class AbstractS3Files extends AbstractConnection {

static final String S3_FILES_SERVICE = "s3files";

protected AwsCredentialsProvider credentialsProvider(RunContext runContext) throws Exception {
AbstractConnection.AwsClientConfig cfg = this.awsClientConfig(runContext);

if (cfg.stsRoleArn() != null) {
return stsCredentialsProvider(cfg);
}

if (cfg.accessKeyId() != null && cfg.secretKeyId() != null) {
if (cfg.sessionToken() != null) {
return StaticCredentialsProvider.create(
AwsSessionCredentials.create(cfg.accessKeyId(), cfg.secretKeyId(), cfg.sessionToken())
);
}
return StaticCredentialsProvider.create(
AwsBasicCredentials.create(cfg.accessKeyId(), cfg.secretKeyId())
);
}

return DefaultCredentialsProvider.create();
}

private AwsCredentialsProvider stsCredentialsProvider(AbstractConnection.AwsClientConfig cfg) {
StsClientBuilder stsBuilder = StsClient.builder()
.region(Region.of(cfg.region()));

Comment on lines +66 to +69
Copy link

Copilot AI Apr 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stsCredentialsProvider performs a one-time assumeRole call and returns a StaticCredentialsProvider. This provider won’t refresh, and because credentialsProvider(runContext) is invoked during every request signing, the code can end up calling STS repeatedly (extra latency / risk of STS throttling) or using expired credentials for long-running executions. Prefer the repo’s existing ConnectionUtils.stsAssumeRoleCredentialsProvider(...) (refreshing) or otherwise cache a refreshable provider for the task execution lifecycle.

Copilot uses AI. Check for mistakes.
if (cfg.stsEndpointOverride() != null) {
stsBuilder.endpointOverride(URI.create(cfg.stsEndpointOverride()));
}

if (cfg.accessKeyId() != null && cfg.secretKeyId() != null) {
stsBuilder.credentialsProvider(
StaticCredentialsProvider.create(
AwsBasicCredentials.create(cfg.accessKeyId(), cfg.secretKeyId())
)
);
Comment on lines +75 to +79
Copy link

Copilot AI Apr 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When building the STS client credentials for AssumeRole, the code ignores sessionToken and always uses AwsBasicCredentials. If users provide temporary session credentials (access key + secret + session token) along with stsRoleArn, the assume-role call will fail. Use AwsSessionCredentials when cfg.sessionToken() is set (or reuse ConnectionUtils.staticCredentialsProvider(cfg) logic).

Suggested change
stsBuilder.credentialsProvider(
StaticCredentialsProvider.create(
AwsBasicCredentials.create(cfg.accessKeyId(), cfg.secretKeyId())
)
);
if (cfg.sessionToken() != null) {
stsBuilder.credentialsProvider(
StaticCredentialsProvider.create(
AwsSessionCredentials.create(cfg.accessKeyId(), cfg.secretKeyId(), cfg.sessionToken())
)
);
} else {
stsBuilder.credentialsProvider(
StaticCredentialsProvider.create(
AwsBasicCredentials.create(cfg.accessKeyId(), cfg.secretKeyId())
)
);
}

Copilot uses AI. Check for mistakes.
}

try (StsClient stsClient = stsBuilder.build()) {
AssumeRoleRequest.Builder req = AssumeRoleRequest.builder()
.roleArn(cfg.stsRoleArn())
.roleSessionName(
cfg.stsRoleSessionName() != null
? cfg.stsRoleSessionName()
: "kestra-s3files-session"
)
.durationSeconds(
(int) (cfg.stsRoleSessionDuration() != null
? cfg.stsRoleSessionDuration().getSeconds()
: Duration.ofMinutes(15).getSeconds())
);

if (cfg.stsRoleExternalId() != null) {
req.externalId(cfg.stsRoleExternalId());
}

Credentials c = stsClient.assumeRole(req.build()).credentials();
return StaticCredentialsProvider.create(
AwsSessionCredentials.create(c.accessKeyId(), c.secretAccessKey(), c.sessionToken())
);
}
}

// HTTP request execution

/**
*
* @param runContext Kestra run context used for rendering properties
* @param method HTTP method (GET, PUT, POST, DELETE)
* @param path API path, e.g. {@code "/file-systems"} or
* {@code "/file-systems/fs-abc/mount-targets"}
* @param body request body bytes; {@code null} for bodyless methods (GET/DELETE)
* @return {@link S3FilesService.Response} containing the HTTP status and raw JSON body
* @throws RuntimeException if the API returns a non-2xx status code
*/
protected S3FilesService.Response executeRequest(
RunContext runContext,
SdkHttpMethod method,
String path,
byte[] body) throws Exception {

AbstractConnection.AwsClientConfig cfg = this.awsClientConfig(runContext);

String regionStr = cfg.region();
if (regionStr == null) {
throw new IllegalArgumentException("region is required for S3 Files tasks");
}
Region region = Region.of(regionStr);

String serviceHost = S3_FILES_SERVICE + "." + regionStr + ".amazonaws.com";

String baseUrl = cfg.endpointOverride() != null
? cfg.endpointOverride().replaceAll("/$", "")
: "https://" + serviceHost;

URI requestUri = URI.create(baseUrl + path);

byte[] bodyBytes = (body != null) ? body : new byte[0];

SdkHttpFullRequest.Builder requestBuilder = SdkHttpFullRequest.builder()
.method(method)
.uri(requestUri)
.putHeader("Content-Type", "application/json")
.putHeader("Host", serviceHost);
Comment on lines +146 to +147
Copy link

Copilot AI Apr 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

executeRequest always sets the Host header to s3files.{region}.amazonaws.com even when endpointOverride is used. This will break endpoint overrides (e.g., LocalStack) because the HTTP Host header and the SigV4 canonical host won’t match the actual request URI. Use the request URI host (and port if present) when overriding the endpoint, or avoid setting Host explicitly and let the SDK derive it from the URI before signing.

Suggested change
.putHeader("Content-Type", "application/json")
.putHeader("Host", serviceHost);
.putHeader("Content-Type", "application/json");

Copilot uses AI. Check for mistakes.

if (bodyBytes.length > 0) {
requestBuilder.contentStreamProvider(() -> new ByteArrayInputStream(bodyBytes));
}

SdkHttpFullRequest unsignedRequest = requestBuilder.build();

Aws4SignerParams signerParams = Aws4SignerParams.builder()
.awsCredentials(credentialsProvider(runContext).resolveCredentials())
.signingName(S3_FILES_SERVICE)
.signingRegion(region)
.build();

SdkHttpFullRequest signedRequest = Aws4Signer.create().sign(unsignedRequest, signerParams);

try (ApacheHttpClient httpClient = (ApacheHttpClient) ApacheHttpClient.create()) {
ExecutableHttpRequest executableRequest = httpClient.prepareRequest(
HttpExecuteRequest.builder()
.request(signedRequest)
Comment on lines +163 to +166
Copy link

Copilot AI Apr 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

executeRequest creates and closes a new ApacheHttpClient instance for every API call. Even if most tasks call the API once, this adds avoidable overhead and makes it harder to tune connection pooling/timeouts. Consider creating a single reusable SdkHttpClient (or ApacheHttpClient.builder() with timeouts) and reusing it across requests within the task execution.

Copilot uses AI. Check for mistakes.
.contentStreamProvider(
bodyBytes.length > 0
? () -> new ByteArrayInputStream(bodyBytes)
: null
)
.build()
);

HttpExecuteResponse response = executableRequest.call();
int statusCode = response.httpResponse().statusCode();

String responseBody = "";
if (response.responseBody().isPresent()) {
try (InputStream is = response.responseBody().get()) {
responseBody = new String(is.readAllBytes(), StandardCharsets.UTF_8);
}
}

if (statusCode < 200 || statusCode >= 300) {
throw new RuntimeException(
"S3 Files API error [HTTP " + statusCode + "]: " + responseBody
);
}

return new S3FilesService.Response(statusCode, responseBody);
}
}
}
147 changes: 147 additions & 0 deletions src/main/java/io/kestra/plugin/aws/s3/files/CreateFileSystem.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package io.kestra.plugin.aws.s3.files;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.aws.s3.files.models.FileSystem;

import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;
import software.amazon.awssdk.http.SdkHttpMethod;

/**
* Creates an Amazon S3 Files file system backed by an S3 bucket.
*/
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Plugin(
examples = {
@Example(
full = true,
code = """
id: aws_s3_files_create_filesystem
namespace: company.team

tasks:
- id: create_fs
type: io.kestra.plugin.aws.s3.files.CreateFileSystem
accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}"
region: "us-east-1"
bucket: "arn:aws:s3:::my-bucket"
roleArn: "arn:aws:iam::123456789012:role/S3FilesRole"
"""
)
}
)
@Schema(
title = "Create an Amazon S3 Files file system",
description = "Creates an S3 Files file system resource that makes an S3 bucket mountable as an NFS v4.1+ file system."
)
public class CreateFileSystem extends AbstractS3Files implements RunnableTask<CreateFileSystem.Output> {
Comment on lines +53 to +57
Copy link

Copilot AI Apr 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR description states “8 new tasks”, but the added io.kestra.plugin.aws.s3.files package contains 7 task classes (Create/Get/List/DeleteFileSystem and Create/List/DeleteMountTarget). Please update the PR description (or add the missing task) to avoid confusion for reviewers and release notes.

Copilot uses AI. Check for mistakes.

@Schema(title = "S3 bucket ARN", description = "ARN of the S3 bucket to back the file system (e.g. arn:aws:s3:::my-bucket).")
@NotNull
private Property<String> bucket;

@Schema(title = "IAM role ARN", description = "ARN of the IAM role that grants the S3 Files service access to the bucket.")
@NotNull
private Property<String> roleArn;

@Schema(title = "Key prefix", description = "Optional prefix scoping the file system to a sub-path of the bucket.")
private Property<String> prefix;

@Schema(title = "KMS key ID", description = "ARN, key ID, or alias of the KMS key for encryption. When omitted the service-owned key is used.")
private Property<String> kmsKeyId;

@Schema(title = "Client token", description = "Idempotency token (up to 64 ASCII characters). Automatically generated when omitted.")
private Property<String> clientToken;

@Schema(title = "Tags", description = "Key-value tags to apply to the file system.")
@PluginProperty(dynamic = false)
private Property<Map<String, String>> tags;

@Schema(title = "Accept bucket warning", description = "Set to true to acknowledge any bucket configuration warnings and proceed with creation.")
@Builder.Default
private Property<Boolean> acceptBucketWarning = Property.ofValue(false);

@Override
public Output run(RunContext runContext) throws Exception {
Map<String, Object> body = new HashMap<>();
body.put("bucket", runContext.render(bucket).as(String.class).orElseThrow());
body.put("roleArn", runContext.render(roleArn).as(String.class).orElseThrow());

runContext.render(prefix).as(String.class).ifPresent(v -> body.put("prefix", v));
runContext.render(kmsKeyId).as(String.class).ifPresent(v -> body.put("kmsKeyId", v));
runContext.render(clientToken).as(String.class).ifPresent(v -> body.put("clientToken", v));
runContext.render(acceptBucketWarning).as(Boolean.class)
.ifPresent(v ->
{
if (Boolean.TRUE.equals(v))
body.put("acceptBucketWarning", true);
});

if (tags != null) {
Map<String, String> tagMap = runContext.render(tags).asMap(String.class, String.class);
if (!tagMap.isEmpty()) {
List<Map<String, String>> tagList = tagMap.entrySet().stream()
.map(e -> Map.of("key", e.getKey(), "value", e.getValue()))
.toList();
body.put("tags", tagList);
}
}

S3FilesService.Response response = executeRequest(
runContext,
SdkHttpMethod.POST,
"/filesystems",
S3FilesService.toJson(body)
);

FileSystem fs = S3FilesService.fromJson(response.body(), FileSystem.class);

runContext.logger().info("Created S3 Files file system: {} (status={})", fs.getFileSystemId(), fs.getStatus());

return Output.builder()
.fileSystemId(fs.getFileSystemId())
.fileSystemArn(fs.getFileSystemArn())
.status(fs.getStatus())
.creationTime(fs.getCreationTime())
.build();
}

@SuperBuilder
@Getter
@NoArgsConstructor
@Schema(title = "Output of the CreateFileSystem task")
public static class Output implements io.kestra.core.models.tasks.Output {

@Schema(title = "File system ID")
private String fileSystemId;

@Schema(title = "File system ARN")
private String fileSystemArn;

@Schema(title = "Lifecycle status")
private String status;

@Schema(title = "Creation time (Unix epoch seconds)")
private Long creationTime;
}
}
Loading
Loading