From f13f33e6a4c5e4758d8d1f1dc55bc3b568b567c3 Mon Sep 17 00:00:00 2001 From: Malay Dewangan Date: Fri, 10 Apr 2026 11:02:25 +0530 Subject: [PATCH 1/4] fix: plugin v2 compatibility --- .github/workflows/main.yml | 1 + build.gradle | 2 +- gradle.properties | 2 +- .../plugin/aws/cloudwatch/TriggerTest.java | 5 ++-- .../plugin/aws/kinesis/TriggerTest.java | 4 ++-- .../io/kestra/plugin/aws/s3/TriggerTest.java | 24 +++++++++---------- 6 files changed, 20 insertions(+), 18 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 2d9fc98c..d509c2d0 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -46,4 +46,5 @@ jobs: with: skip-test: ${{ github.event.inputs.skip-test == 'true' }} kestra-version: ${{ github.event.inputs.kestra-version }} + java-version: '25' secrets: inherit diff --git a/build.gradle b/build.gradle index 200b419c..c76d77fc 100644 --- a/build.gradle +++ b/build.gradle @@ -22,7 +22,7 @@ repositories { } } -final targetJavaVersion = JavaVersion.VERSION_21 +final targetJavaVersion = JavaVersion.VERSION_25 java { sourceCompatibility = targetJavaVersion diff --git a/gradle.properties b/gradle.properties index 973e8747..37e8d1a0 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,2 +1,2 @@ version=2.1.6-SNAPSHOT -kestraVersion=1.2.5 +kestraVersion=2.0.0-SNAPSHOT diff --git a/src/test/java/io/kestra/plugin/aws/cloudwatch/TriggerTest.java b/src/test/java/io/kestra/plugin/aws/cloudwatch/TriggerTest.java index 7be9cd4b..b8932f06 100644 --- a/src/test/java/io/kestra/plugin/aws/cloudwatch/TriggerTest.java +++ b/src/test/java/io/kestra/plugin/aws/cloudwatch/TriggerTest.java @@ -20,6 +20,7 @@ import static org.mockito.Mockito.mockConstruction; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import io.kestra.core.models.conditions.ConditionContext; @KestraTest class TriggerTest { @@ -59,8 +60,8 @@ void evaluate() throws Exception { Query.class, (mock, context) -> when(mock.run(any())).thenReturn(output) ) ) { - var conditionContext = io.kestra.core.utils.TestsUtils.mockTrigger(runContextFactory, trigger); - var execution = trigger.evaluate(conditionContext.getKey(), conditionContext.getValue()); + Map.Entry conditionContext = io.kestra.core.utils.TestsUtils.mockTrigger(runContextFactory, trigger); + var execution = trigger.evaluate(conditionContext.getKey(), conditionContext.getValue().context()); assertThat(execution.isPresent(), is(true)); assertThat(mockedQuery.constructed(), hasSize(1)); diff --git a/src/test/java/io/kestra/plugin/aws/kinesis/TriggerTest.java b/src/test/java/io/kestra/plugin/aws/kinesis/TriggerTest.java index fae6535d..ef4bd028 100644 --- a/src/test/java/io/kestra/plugin/aws/kinesis/TriggerTest.java +++ b/src/test/java/io/kestra/plugin/aws/kinesis/TriggerTest.java @@ -58,8 +58,8 @@ void evaluate() throws Exception { .endpointOverride(Property.ofValue(localstack.getEndpoint().toString())) .build(); - Map.Entry context = TestsUtils.mockTrigger(runContextFactory, trigger); - Optional execution = trigger.evaluate(context.getKey(), context.getValue()); + Map.Entry context = TestsUtils.mockTrigger(runContextFactory, trigger); + Optional execution = trigger.evaluate(context.getKey(), context.getValue().context()); assertThat(execution.isPresent(), is(true)); } diff --git a/src/test/java/io/kestra/plugin/aws/s3/TriggerTest.java b/src/test/java/io/kestra/plugin/aws/s3/TriggerTest.java index 95d7348a..9e4c33b4 100644 --- a/src/test/java/io/kestra/plugin/aws/s3/TriggerTest.java +++ b/src/test/java/io/kestra/plugin/aws/s3/TriggerTest.java @@ -236,9 +236,9 @@ void shouldExecuteOnCreate() throws Exception { upload("trigger/on-create", bucket); - Map.Entry context = TestsUtils.mockTrigger(runContextFactory, trigger); + Map.Entry context = TestsUtils.mockTrigger(runContextFactory, trigger); - Optional execution = trigger.evaluate(context.getKey(), context.getValue()); + Optional execution = trigger.evaluate(context.getKey(), context.getValue().context()); assertThat(execution.isPresent(), is(true)); } @@ -263,14 +263,14 @@ void shouldExecuteOnUpdate() throws Exception { .interval(Duration.ofSeconds(10)) .build(); - Map.Entry context = TestsUtils.mockTrigger(runContextFactory, trigger); + Map.Entry context = TestsUtils.mockTrigger(runContextFactory, trigger); - trigger.evaluate(context.getKey(), context.getValue()); + trigger.evaluate(context.getKey(), context.getValue().context()); update(key, bucket); Thread.sleep(2000); - Optional execution = trigger.evaluate(context.getKey(), context.getValue()); + Optional execution = trigger.evaluate(context.getKey(), context.getValue().context()); assertThat(execution.isPresent(), is(true)); } @@ -294,15 +294,15 @@ void shouldExecuteOnCreateOrUpdate() throws Exception { var key = upload("trigger/on-create-or-update", bucket); - Map.Entry context = TestsUtils.mockTrigger(runContextFactory, trigger); + Map.Entry context = TestsUtils.mockTrigger(runContextFactory, trigger); - Optional createExecution = trigger.evaluate(context.getKey(), context.getValue()); + Optional createExecution = trigger.evaluate(context.getKey(), context.getValue().context()); assertThat("Trigger should fire on CREATE", createExecution.isPresent(), is(true)); update(key, bucket); Thread.sleep(2000); - Optional updateExecution = trigger.evaluate(context.getKey(), context.getValue()); + Optional updateExecution = trigger.evaluate(context.getKey(), context.getValue().context()); assertThat(updateExecution.isPresent(), is(true)); } @@ -331,9 +331,9 @@ void maxFilesExceeded() throws Exception { .interval(Duration.ofSeconds(10)) .build(); - Map.Entry context = TestsUtils.mockTrigger(runContextFactory, trigger); + Map.Entry context = TestsUtils.mockTrigger(runContextFactory, trigger); - Optional execution = trigger.evaluate(context.getKey(), context.getValue()); + Optional execution = trigger.evaluate(context.getKey(), context.getValue().context()); // When maxFiles exceeded, List returns first 3 files, so Trigger should fire assertThat(execution.isPresent(), is(true)); } @@ -363,9 +363,9 @@ void maxFilesNotExceeded() throws Exception { .interval(Duration.ofSeconds(10)) .build(); - Map.Entry context = TestsUtils.mockTrigger(runContextFactory, trigger); + Map.Entry context = TestsUtils.mockTrigger(runContextFactory, trigger); - Optional execution = trigger.evaluate(context.getKey(), context.getValue()); + Optional execution = trigger.evaluate(context.getKey(), context.getValue().context()); assertThat(execution.isPresent(), is(true)); } } From 2709f28d61970b3395fa63a1ce397479f913bf8f Mon Sep 17 00:00:00 2001 From: Malay Dewangan Date: Sun, 12 Apr 2026 00:55:42 +0530 Subject: [PATCH 2/4] fix: v2 compatibility --- build.gradle | 1 + .../aws/kinesis/RealtimeTriggerTest.java | 19 +++++++------- .../io/kestra/plugin/aws/s3/TriggerTest.java | 26 ++++--------------- .../plugin/aws/sqs/RealtimeTriggerTest.java | 19 ++++++-------- .../io/kestra/plugin/aws/sqs/TriggerTest.java | 19 ++++++-------- src/test/resources/application.yml | 7 +++++ 6 files changed, 38 insertions(+), 53 deletions(-) diff --git a/build.gradle b/build.gradle index 354c497f..d7224dbf 100644 --- a/build.gradle +++ b/build.gradle @@ -127,6 +127,7 @@ dependencies { testImplementation group: "io.kestra", name: "repository-memory" testImplementation group: "io.kestra", name: "scheduler" testImplementation group: "io.kestra", name: "worker" + testImplementation group: "io.kestra", name: "indexer", version: kestraVersion // testcontainers testImplementation "org.testcontainers:testcontainers:2.0.4" diff --git a/src/test/java/io/kestra/plugin/aws/kinesis/RealtimeTriggerTest.java b/src/test/java/io/kestra/plugin/aws/kinesis/RealtimeTriggerTest.java index 5afc384b..592e87b3 100644 --- a/src/test/java/io/kestra/plugin/aws/kinesis/RealtimeTriggerTest.java +++ b/src/test/java/io/kestra/plugin/aws/kinesis/RealtimeTriggerTest.java @@ -6,16 +6,15 @@ import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.junit.jupiter.api.*; import io.kestra.core.models.executions.Execution; import io.kestra.core.models.property.Property; -import io.kestra.core.queues.QueueFactoryInterface; -import io.kestra.core.queues.QueueInterface; +import io.kestra.core.queues.DispatchQueueInterface; import io.kestra.core.repositories.LocalFlowRepositoryLoader; import io.kestra.core.runners.FlowListeners; -import io.kestra.core.utils.TestsUtils; import io.kestra.jdbc.runner.JdbcScheduler; import io.kestra.plugin.aws.kinesis.model.Record; import io.kestra.scheduler.AbstractScheduler; @@ -23,8 +22,6 @@ import io.micronaut.context.ApplicationContext; import jakarta.inject.Inject; -import jakarta.inject.Named; -import reactor.core.publisher.Flux; import software.amazon.awssdk.services.kinesis.model.*; import static org.hamcrest.MatcherAssert.assertThat; @@ -38,8 +35,7 @@ class RealtimeTriggerTest extends AbstractKinesisTest { FlowListeners flowListeners; @Inject - @Named(QueueFactoryInterface.EXECUTION_NAMED) - QueueInterface executionQueue; + DispatchQueueInterface executionQueue; @Inject LocalFlowRepositoryLoader repositoryLoader; @@ -48,8 +44,12 @@ class RealtimeTriggerTest extends AbstractKinesisTest { void evaluate() throws Exception { String consumerArn = registerConsumer(); CountDownLatch latch = new CountDownLatch(1); + AtomicReference lastExecution = new AtomicReference<>(); - Flux received = TestsUtils.receive(executionQueue, e -> latch.countDown()); + executionQueue.addListener(e -> { + lastExecution.set(e); + latch.countDown(); + }); DefaultWorker worker = applicationContext.createBean(DefaultWorker.class, UUID.randomUUID().toString(), 8, null); try (AbstractScheduler scheduler = new JdbcScheduler(applicationContext, flowListeners)) { @@ -103,8 +103,7 @@ void evaluate() throws Exception { boolean done = latch.await(30, TimeUnit.SECONDS); assertThat(done, is(true)); - Execution exec = received.blockLast(); - assertThat(exec.getTrigger().getVariables().get("data"), is("hello")); + assertThat(lastExecution.get().getTrigger().getVariables().get("data"), is("hello")); } } } \ No newline at end of file diff --git a/src/test/java/io/kestra/plugin/aws/s3/TriggerTest.java b/src/test/java/io/kestra/plugin/aws/s3/TriggerTest.java index 9e4c33b4..900add6d 100644 --- a/src/test/java/io/kestra/plugin/aws/s3/TriggerTest.java +++ b/src/test/java/io/kestra/plugin/aws/s3/TriggerTest.java @@ -16,8 +16,7 @@ import io.kestra.core.models.executions.Execution; import io.kestra.core.models.property.Property; import io.kestra.core.models.triggers.StatefulTriggerInterface; -import io.kestra.core.queues.QueueFactoryInterface; -import io.kestra.core.queues.QueueInterface; +import io.kestra.core.queues.DispatchQueueInterface; import io.kestra.core.repositories.LocalFlowRepositoryLoader; import io.kestra.core.runners.FlowListeners; import io.kestra.core.utils.IdUtils; @@ -29,8 +28,6 @@ import io.micronaut.context.ApplicationContext; import jakarta.inject.Inject; -import jakarta.inject.Named; -import reactor.core.publisher.Flux; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; @@ -43,8 +40,7 @@ class TriggerTest extends AbstractTest { private FlowListeners flowListenersService; @Inject - @Named(QueueFactoryInterface.EXECUTION_NAMED) - private QueueInterface executionQueue; + private DispatchQueueInterface executionQueue; @Inject protected LocalFlowRepositoryLoader repositoryLoader; @@ -69,10 +65,7 @@ void deleteAction() throws Exception { AtomicReference last = new AtomicReference<>(); // wait for execution - Flux receive = TestsUtils.receive(executionQueue, executionWithError -> - { - Execution execution = executionWithError.getLeft(); - + executionQueue.addListener(execution -> { if (execution.getFlowId().equals("s3-listen")) { last.set(execution); queueCount.countDown(); @@ -91,7 +84,6 @@ void deleteAction() throws Exception { assertThat(await, is(true)); } finally { worker.shutdown(); - receive.blockLast(); } @SuppressWarnings("unchecked") @@ -115,10 +107,7 @@ void noneAction() throws Exception { // wait for execution CountDownLatch queueCount = new CountDownLatch(1); AtomicReference last = new AtomicReference<>(); - Flux receive = TestsUtils.receive(executionQueue, executionWithError -> - { - Execution execution = executionWithError.getLeft(); - + executionQueue.addListener(execution -> { if (execution.getFlowId().equals("s3-listen-none-action")) { last.set(execution); queueCount.countDown(); @@ -145,7 +134,6 @@ void noneAction() throws Exception { assertThat(await, is(true)); } finally { worker.shutdown(); - receive.blockLast(); } @SuppressWarnings("unchecked") @@ -178,10 +166,7 @@ void forcePathStyleWithSimpleLocalhost() throws Exception { ) { AtomicReference last = new AtomicReference<>(); - Flux receive = TestsUtils.receive(executionQueue, executionWithError -> - { - Execution execution = executionWithError.getLeft(); - + executionQueue.addListener(execution -> { if (execution.getFlowId().equals("s3-listen-localhost-force-path-style")) { last.set(execution); queueCount.countDown(); @@ -200,7 +185,6 @@ void forcePathStyleWithSimpleLocalhost() throws Exception { assertThat("trigger should work with localhost endpoint + forcePathStyle", await, is(true)); } finally { worker.shutdown(); - receive.blockLast(); } @SuppressWarnings("unchecked") diff --git a/src/test/java/io/kestra/plugin/aws/sqs/RealtimeTriggerTest.java b/src/test/java/io/kestra/plugin/aws/sqs/RealtimeTriggerTest.java index f34a8c85..85c9753b 100644 --- a/src/test/java/io/kestra/plugin/aws/sqs/RealtimeTriggerTest.java +++ b/src/test/java/io/kestra/plugin/aws/sqs/RealtimeTriggerTest.java @@ -5,17 +5,16 @@ import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.junit.jupiter.api.Test; import org.testcontainers.containers.localstack.LocalStackContainer; import io.kestra.core.models.executions.Execution; import io.kestra.core.models.property.Property; -import io.kestra.core.queues.QueueFactoryInterface; -import io.kestra.core.queues.QueueInterface; +import io.kestra.core.queues.DispatchQueueInterface; import io.kestra.core.repositories.LocalFlowRepositoryLoader; import io.kestra.core.runners.FlowListeners; -import io.kestra.core.utils.TestsUtils; import io.kestra.jdbc.runner.JdbcScheduler; import io.kestra.plugin.aws.sqs.model.Message; import io.kestra.scheduler.AbstractScheduler; @@ -23,8 +22,6 @@ import io.micronaut.context.ApplicationContext; import jakarta.inject.Inject; -import jakarta.inject.Named; -import reactor.core.publisher.Flux; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; @@ -37,8 +34,7 @@ class RealtimeTriggerTest extends AbstractSqsTest { private FlowListeners flowListenersService; @Inject - @Named(QueueFactoryInterface.EXECUTION_NAMED) - private QueueInterface executionQueue; + private DispatchQueueInterface executionQueue; @Inject protected LocalFlowRepositoryLoader repositoryLoader; @@ -57,10 +53,11 @@ void flow() throws Exception { ) ) { // wait for execution - Flux receive = TestsUtils.receive(executionQueue, execution -> - { + AtomicReference lastExecution = new AtomicReference<>(); + executionQueue.addListener(execution -> { + lastExecution.set(execution); queueCount.countDown(); - assertThat(execution.getLeft().getFlowId(), is("realtime")); + assertThat(execution.getFlowId(), is("realtime")); }); worker.run(); @@ -89,7 +86,7 @@ void flow() throws Exception { boolean await = queueCount.await(1, TimeUnit.MINUTES); assertThat(await, is(true)); - Execution last = receive.blockLast(); + Execution last = lastExecution.get(); assertThat(last.getTrigger().getVariables().size(), is(1)); assertThat(last.getTrigger().getVariables().get("data"), is("Hello World")); } diff --git a/src/test/java/io/kestra/plugin/aws/sqs/TriggerTest.java b/src/test/java/io/kestra/plugin/aws/sqs/TriggerTest.java index 49439e61..043d0736 100644 --- a/src/test/java/io/kestra/plugin/aws/sqs/TriggerTest.java +++ b/src/test/java/io/kestra/plugin/aws/sqs/TriggerTest.java @@ -5,18 +5,17 @@ import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.junit.jupiter.api.Test; import org.testcontainers.containers.localstack.LocalStackContainer; import io.kestra.core.models.executions.Execution; import io.kestra.core.models.property.Property; -import io.kestra.core.queues.QueueFactoryInterface; -import io.kestra.core.queues.QueueInterface; +import io.kestra.core.queues.DispatchQueueInterface; import io.kestra.core.repositories.LocalFlowRepositoryLoader; import io.kestra.core.runners.FlowListeners; import io.kestra.core.runners.RunContextFactory; -import io.kestra.core.utils.TestsUtils; import io.kestra.jdbc.runner.JdbcScheduler; import io.kestra.plugin.aws.sqs.model.Message; import io.kestra.scheduler.AbstractScheduler; @@ -24,8 +23,6 @@ import io.micronaut.context.ApplicationContext; import jakarta.inject.Inject; -import jakarta.inject.Named; -import reactor.core.publisher.Flux; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; @@ -39,8 +36,7 @@ class TriggerTest extends AbstractSqsTest { private FlowListeners flowListenersService; @Inject - @Named(QueueFactoryInterface.EXECUTION_NAMED) - private QueueInterface executionQueue; + private DispatchQueueInterface executionQueue; @Inject protected LocalFlowRepositoryLoader repositoryLoader; @@ -62,10 +58,11 @@ void flow() throws Exception { ) ) { // wait for execution - Flux receive = TestsUtils.receive(executionQueue, execution -> - { + AtomicReference lastExecution = new AtomicReference<>(); + executionQueue.addListener(execution -> { + lastExecution.set(execution); queueCount.countDown(); - assertThat(execution.getLeft().getFlowId(), is("sqs-listen")); + assertThat(execution.getFlowId(), is("sqs-listen")); }); worker.run(); @@ -95,7 +92,7 @@ void flow() throws Exception { boolean await = queueCount.await(1, TimeUnit.MINUTES); assertThat(await, is(true)); - Execution last = receive.blockLast(); + Execution last = lastExecution.get(); var count = (Integer) last.getTrigger().getVariables().get("count"); var uri = (String) last.getTrigger().getVariables().get("uri"); assertThat(count, is(2)); diff --git a/src/test/resources/application.yml b/src/test/resources/application.yml index f1bb4dd5..32ccf5a7 100644 --- a/src/test/resources/application.yml +++ b/src/test/resources/application.yml @@ -9,3 +9,10 @@ kestra: type: memory repository: type: memory + +worker: + controllers: + type: STATIC + static: + endpoints: + - host: localhost From 03a34120a7233b92f5f13ec28af75f52f5f9a978 Mon Sep 17 00:00:00 2001 From: Malay Dewangan Date: Sun, 12 Apr 2026 01:52:42 +0530 Subject: [PATCH 3/4] fix: v2 compatibility --- build.gradle | 1 - 1 file changed, 1 deletion(-) diff --git a/build.gradle b/build.gradle index d7224dbf..354c497f 100644 --- a/build.gradle +++ b/build.gradle @@ -127,7 +127,6 @@ dependencies { testImplementation group: "io.kestra", name: "repository-memory" testImplementation group: "io.kestra", name: "scheduler" testImplementation group: "io.kestra", name: "worker" - testImplementation group: "io.kestra", name: "indexer", version: kestraVersion // testcontainers testImplementation "org.testcontainers:testcontainers:2.0.4" From 27fcd5fada5de32e1aec1e034c69bf78f75c18f9 Mon Sep 17 00:00:00 2001 From: Malay Dewangan Date: Sun, 12 Apr 2026 02:24:36 +0530 Subject: [PATCH 4/4] fix: v2 compatibility --- .../aws/kinesis/RealtimeTriggerTest.java | 24 +-- .../io/kestra/plugin/aws/s3/TriggerTest.java | 177 ++++++------------ .../plugin/aws/sqs/RealtimeTriggerTest.java | 85 +++------ .../io/kestra/plugin/aws/sqs/TriggerTest.java | 91 ++++----- 4 files changed, 122 insertions(+), 255 deletions(-) diff --git a/src/test/java/io/kestra/plugin/aws/kinesis/RealtimeTriggerTest.java b/src/test/java/io/kestra/plugin/aws/kinesis/RealtimeTriggerTest.java index 592e87b3..468354dd 100644 --- a/src/test/java/io/kestra/plugin/aws/kinesis/RealtimeTriggerTest.java +++ b/src/test/java/io/kestra/plugin/aws/kinesis/RealtimeTriggerTest.java @@ -3,37 +3,26 @@ import java.io.File; import java.nio.file.Files; import java.util.List; -import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import org.junit.jupiter.api.*; +import io.kestra.core.junit.annotations.KestraTest; import io.kestra.core.models.executions.Execution; import io.kestra.core.models.property.Property; import io.kestra.core.queues.DispatchQueueInterface; import io.kestra.core.repositories.LocalFlowRepositoryLoader; -import io.kestra.core.runners.FlowListeners; -import io.kestra.jdbc.runner.JdbcScheduler; import io.kestra.plugin.aws.kinesis.model.Record; -import io.kestra.scheduler.AbstractScheduler; -import io.kestra.worker.DefaultWorker; - -import io.micronaut.context.ApplicationContext; import jakarta.inject.Inject; import software.amazon.awssdk.services.kinesis.model.*; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.*; +@KestraTest(startRunner = true, startScheduler = true) class RealtimeTriggerTest extends AbstractKinesisTest { - @Inject - ApplicationContext applicationContext; - - @Inject - FlowListeners flowListeners; - @Inject DispatchQueueInterface executionQueue; @@ -51,12 +40,6 @@ void evaluate() throws Exception { latch.countDown(); }); - DefaultWorker worker = applicationContext.createBean(DefaultWorker.class, UUID.randomUUID().toString(), 8, null); - try (AbstractScheduler scheduler = new JdbcScheduler(applicationContext, flowListeners)) { - - worker.run(); - scheduler.run(); - String yaml = """ id: realtime namespace: company.team @@ -104,6 +87,5 @@ void evaluate() throws Exception { assertThat(done, is(true)); assertThat(lastExecution.get().getTrigger().getVariables().get("data"), is("hello")); - } } -} \ No newline at end of file +} diff --git a/src/test/java/io/kestra/plugin/aws/s3/TriggerTest.java b/src/test/java/io/kestra/plugin/aws/s3/TriggerTest.java index 900add6d..198c3441 100644 --- a/src/test/java/io/kestra/plugin/aws/s3/TriggerTest.java +++ b/src/test/java/io/kestra/plugin/aws/s3/TriggerTest.java @@ -4,7 +4,6 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -12,33 +11,23 @@ import org.junit.jupiter.api.Test; import org.testcontainers.containers.localstack.LocalStackContainer; +import io.kestra.core.junit.annotations.KestraTest; import io.kestra.core.models.conditions.ConditionContext; import io.kestra.core.models.executions.Execution; import io.kestra.core.models.property.Property; import io.kestra.core.models.triggers.StatefulTriggerInterface; import io.kestra.core.queues.DispatchQueueInterface; import io.kestra.core.repositories.LocalFlowRepositoryLoader; -import io.kestra.core.runners.FlowListeners; import io.kestra.core.utils.IdUtils; import io.kestra.core.utils.TestsUtils; -import io.kestra.jdbc.runner.JdbcScheduler; import io.kestra.plugin.aws.s3.models.S3Object; -import io.kestra.scheduler.AbstractScheduler; -import io.kestra.worker.DefaultWorker; - -import io.micronaut.context.ApplicationContext; import jakarta.inject.Inject; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; +@KestraTest(startRunner = true, startScheduler = true) class TriggerTest extends AbstractTest { - @Inject - private ApplicationContext applicationContext; - - @Inject - private FlowListeners flowListenersService; - @Inject private DispatchQueueInterface executionQueue; @@ -51,51 +40,33 @@ void deleteAction() throws Exception { this.createBucket(bucket); List listTask = list().bucket(Property.ofValue(bucket)).build(); - // mock flow listeners CountDownLatch queueCount = new CountDownLatch(1); + AtomicReference last = new AtomicReference<>(); - // scheduler - DefaultWorker worker = applicationContext.createBean(DefaultWorker.class, UUID.randomUUID().toString(), 8, null); - try ( - AbstractScheduler scheduler = new JdbcScheduler( - this.applicationContext, - this.flowListenersService - ) - ) { - AtomicReference last = new AtomicReference<>(); - - // wait for execution - executionQueue.addListener(execution -> { - if (execution.getFlowId().equals("s3-listen")) { - last.set(execution); - queueCount.countDown(); - } - }); - - upload("trigger/s3", bucket); - upload("trigger/s3", bucket); - - worker.run(); - scheduler.run(); - repositoryLoader.load(Objects.requireNonNull(TriggerTest.class.getClassLoader().getResource("flows/s3/s3-listen.yaml"))); - - boolean await = queueCount.await(10, TimeUnit.SECONDS); - try { - assertThat(await, is(true)); - } finally { - worker.shutdown(); + executionQueue.addListener(execution -> { + if (execution.getFlowId().equals("s3-listen")) { + last.set(execution); + queueCount.countDown(); } + }); - @SuppressWarnings("unchecked") - java.util.List trigger = (java.util.List) last.get().getTrigger().getVariables().get("objects"); + upload("trigger/s3", bucket); + upload("trigger/s3", bucket); - assertThat(trigger.size(), is(2)); + repositoryLoader.load(Objects.requireNonNull(TriggerTest.class.getClassLoader().getResource("flows/s3/s3-listen.yaml"))); - int remainingFilesOnBucket = listTask.run(runContext(listTask)) - .getObjects() - .size(); - assertThat(remainingFilesOnBucket, is(0)); - } + boolean await = queueCount.await(10, TimeUnit.SECONDS); + assertThat(await, is(true)); + + @SuppressWarnings("unchecked") + java.util.List trigger = (java.util.List) last.get().getTrigger().getVariables().get("objects"); + + assertThat(trigger.size(), is(2)); + + int remainingFilesOnBucket = listTask.run(runContext(listTask)) + .getObjects() + .size(); + assertThat(remainingFilesOnBucket, is(0)); } @Test @@ -104,7 +75,6 @@ void noneAction() throws Exception { this.createBucket(bucket); List listTask = list().bucket(Property.ofValue(bucket)).build(); - // wait for execution CountDownLatch queueCount = new CountDownLatch(1); AtomicReference last = new AtomicReference<>(); executionQueue.addListener(execution -> { @@ -114,38 +84,23 @@ void noneAction() throws Exception { } }); - // scheduler - DefaultWorker worker = applicationContext.createBean(DefaultWorker.class, UUID.randomUUID().toString(), 8, null); - try ( - AbstractScheduler scheduler = new JdbcScheduler( - this.applicationContext, - this.flowListenersService - ) - ) { - upload("trigger/s3", bucket); - upload("trigger/s3", bucket); - - worker.run(); - scheduler.run(); - repositoryLoader.load(Objects.requireNonNull(TriggerTest.class.getClassLoader().getResource("flows/s3/s3-listen-none-action.yaml"))); - - boolean await = queueCount.await(10, TimeUnit.SECONDS); - try { - assertThat(await, is(true)); - } finally { - worker.shutdown(); - } + upload("trigger/s3", bucket); + upload("trigger/s3", bucket); - @SuppressWarnings("unchecked") - java.util.List trigger = (java.util.List) last.get().getTrigger().getVariables().get("objects"); + repositoryLoader.load(Objects.requireNonNull(TriggerTest.class.getClassLoader().getResource("flows/s3/s3-listen-none-action.yaml"))); - assertThat(trigger.size(), is(2)); + boolean await = queueCount.await(10, TimeUnit.SECONDS); + assertThat(await, is(true)); - int remainingFilesOnBucket = listTask.run(runContext(listTask)) - .getObjects() - .size(); - assertThat(remainingFilesOnBucket, is(2)); - } + @SuppressWarnings("unchecked") + java.util.List trigger = (java.util.List) last.get().getTrigger().getVariables().get("objects"); + + assertThat(trigger.size(), is(2)); + + int remainingFilesOnBucket = listTask.run(runContext(listTask)) + .getObjects() + .size(); + assertThat(remainingFilesOnBucket, is(2)); } @Test @@ -155,48 +110,32 @@ void forcePathStyleWithSimpleLocalhost() throws Exception { List listTask = list().bucket(Property.ofValue(bucket)).build(); CountDownLatch queueCount = new CountDownLatch(1); + AtomicReference last = new AtomicReference<>(); - // scheduler - DefaultWorker worker = applicationContext.createBean(DefaultWorker.class, UUID.randomUUID().toString(), 8, null); - try ( - AbstractScheduler scheduler = new JdbcScheduler( - this.applicationContext, - this.flowListenersService - ) - ) { - AtomicReference last = new AtomicReference<>(); - - executionQueue.addListener(execution -> { - if (execution.getFlowId().equals("s3-listen-localhost-force-path-style")) { - last.set(execution); - queueCount.countDown(); - } - }); - - upload("trigger/s3", bucket); - upload("trigger/s3", bucket); - - worker.run(); - scheduler.run(); - repositoryLoader.load(Objects.requireNonNull(TriggerTest.class.getClassLoader().getResource("flows/s3/s3-listen-localhost-force-path-style.yaml"))); - - boolean await = queueCount.await(15, TimeUnit.SECONDS); - try { - assertThat("trigger should work with localhost endpoint + forcePathStyle", await, is(true)); - } finally { - worker.shutdown(); + executionQueue.addListener(execution -> { + if (execution.getFlowId().equals("s3-listen-localhost-force-path-style")) { + last.set(execution); + queueCount.countDown(); } + }); - @SuppressWarnings("unchecked") - java.util.List trigger = (java.util.List) last.get().getTrigger().getVariables().get("objects"); + upload("trigger/s3", bucket); + upload("trigger/s3", bucket); - assertThat(trigger.size(), is(2)); + repositoryLoader.load(Objects.requireNonNull(TriggerTest.class.getClassLoader().getResource("flows/s3/s3-listen-localhost-force-path-style.yaml"))); - int remainingFilesOnBucket = listTask.run(runContext(listTask)) - .getObjects() - .size(); - assertThat(remainingFilesOnBucket, is(0)); - } + boolean await = queueCount.await(15, TimeUnit.SECONDS); + assertThat("trigger should work with localhost endpoint + forcePathStyle", await, is(true)); + + @SuppressWarnings("unchecked") + java.util.List trigger = (java.util.List) last.get().getTrigger().getVariables().get("objects"); + + assertThat(trigger.size(), is(2)); + + int remainingFilesOnBucket = listTask.run(runContext(listTask)) + .getObjects() + .size(); + assertThat(remainingFilesOnBucket, is(0)); } @Test diff --git a/src/test/java/io/kestra/plugin/aws/sqs/RealtimeTriggerTest.java b/src/test/java/io/kestra/plugin/aws/sqs/RealtimeTriggerTest.java index 85c9753b..70123a24 100644 --- a/src/test/java/io/kestra/plugin/aws/sqs/RealtimeTriggerTest.java +++ b/src/test/java/io/kestra/plugin/aws/sqs/RealtimeTriggerTest.java @@ -2,7 +2,6 @@ import java.util.List; import java.util.Objects; -import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -10,29 +9,19 @@ import org.junit.jupiter.api.Test; import org.testcontainers.containers.localstack.LocalStackContainer; +import io.kestra.core.junit.annotations.KestraTest; import io.kestra.core.models.executions.Execution; import io.kestra.core.models.property.Property; import io.kestra.core.queues.DispatchQueueInterface; import io.kestra.core.repositories.LocalFlowRepositoryLoader; -import io.kestra.core.runners.FlowListeners; -import io.kestra.jdbc.runner.JdbcScheduler; import io.kestra.plugin.aws.sqs.model.Message; -import io.kestra.scheduler.AbstractScheduler; -import io.kestra.worker.DefaultWorker; - -import io.micronaut.context.ApplicationContext; import jakarta.inject.Inject; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; +@KestraTest(startRunner = true, startScheduler = true) class RealtimeTriggerTest extends AbstractSqsTest { - @Inject - private ApplicationContext applicationContext; - - @Inject - private FlowListeners flowListenersService; - @Inject private DispatchQueueInterface executionQueue; @@ -41,55 +30,39 @@ class RealtimeTriggerTest extends AbstractSqsTest { @Test void flow() throws Exception { - // mock flow listeners CountDownLatch queueCount = new CountDownLatch(1); - - // scheduler - DefaultWorker worker = applicationContext.createBean(DefaultWorker.class, UUID.randomUUID().toString(), 8, null); - try ( - AbstractScheduler scheduler = new JdbcScheduler( - this.applicationContext, - this.flowListenersService - ) - ) { - // wait for execution - AtomicReference lastExecution = new AtomicReference<>(); - executionQueue.addListener(execution -> { - lastExecution.set(execution); - queueCount.countDown(); - assertThat(execution.getFlowId(), is("realtime")); - }); - - worker.run(); - scheduler.run(); - - repositoryLoader.load(Objects.requireNonNull(RealtimeTriggerTest.class.getClassLoader().getResource("flows/sqs/realtime.yaml"))); - - // publish two messages to trigger the flow - Publish task = Publish.builder() - .endpointOverride(Property.ofValue(localstack.getEndpointOverride(LocalStackContainer.Service.SQS).toString())) - .queueUrl(Property.ofValue(queueUrl())) - .region(Property.ofValue(localstack.getRegion())) - .accessKeyId(Property.ofValue(localstack.getAccessKey())) - .secretKeyId(Property.ofValue(localstack.getSecretKey())) - .from( - List.of( - Message.builder().data("Hello World").build() - ) + AtomicReference lastExecution = new AtomicReference<>(); + executionQueue.addListener(execution -> { + lastExecution.set(execution); + queueCount.countDown(); + assertThat(execution.getFlowId(), is("realtime")); + }); + + repositoryLoader.load(Objects.requireNonNull(RealtimeTriggerTest.class.getClassLoader().getResource("flows/sqs/realtime.yaml"))); + + Publish task = Publish.builder() + .endpointOverride(Property.ofValue(localstack.getEndpointOverride(LocalStackContainer.Service.SQS).toString())) + .queueUrl(Property.ofValue(queueUrl())) + .region(Property.ofValue(localstack.getRegion())) + .accessKeyId(Property.ofValue(localstack.getAccessKey())) + .secretKeyId(Property.ofValue(localstack.getSecretKey())) + .from( + List.of( + Message.builder().data("Hello World").build() ) - .build(); + ) + .build(); - var runContext = runContextFactory.of(); + var runContext = runContextFactory.of(); - task.run(runContext); + task.run(runContext); - boolean await = queueCount.await(1, TimeUnit.MINUTES); - assertThat(await, is(true)); + boolean await = queueCount.await(1, TimeUnit.MINUTES); + assertThat(await, is(true)); - Execution last = lastExecution.get(); - assertThat(last.getTrigger().getVariables().size(), is(1)); - assertThat(last.getTrigger().getVariables().get("data"), is("Hello World")); - } + Execution last = lastExecution.get(); + assertThat(last.getTrigger().getVariables().size(), is(1)); + assertThat(last.getTrigger().getVariables().get("data"), is("Hello World")); } } diff --git a/src/test/java/io/kestra/plugin/aws/sqs/TriggerTest.java b/src/test/java/io/kestra/plugin/aws/sqs/TriggerTest.java index 043d0736..2e260d4e 100644 --- a/src/test/java/io/kestra/plugin/aws/sqs/TriggerTest.java +++ b/src/test/java/io/kestra/plugin/aws/sqs/TriggerTest.java @@ -2,7 +2,6 @@ import java.util.List; import java.util.Objects; -import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -10,31 +9,21 @@ import org.junit.jupiter.api.Test; import org.testcontainers.containers.localstack.LocalStackContainer; +import io.kestra.core.junit.annotations.KestraTest; import io.kestra.core.models.executions.Execution; import io.kestra.core.models.property.Property; import io.kestra.core.queues.DispatchQueueInterface; import io.kestra.core.repositories.LocalFlowRepositoryLoader; -import io.kestra.core.runners.FlowListeners; import io.kestra.core.runners.RunContextFactory; -import io.kestra.jdbc.runner.JdbcScheduler; import io.kestra.plugin.aws.sqs.model.Message; -import io.kestra.scheduler.AbstractScheduler; -import io.kestra.worker.DefaultWorker; - -import io.micronaut.context.ApplicationContext; import jakarta.inject.Inject; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; +@KestraTest(startRunner = true, startScheduler = true) class TriggerTest extends AbstractSqsTest { - @Inject - private ApplicationContext applicationContext; - - @Inject - private FlowListeners flowListenersService; - @Inject private DispatchQueueInterface executionQueue; @@ -46,58 +35,42 @@ class TriggerTest extends AbstractSqsTest { @Test void flow() throws Exception { - // mock flow listeners CountDownLatch queueCount = new CountDownLatch(1); - - // scheduler - DefaultWorker worker = applicationContext.createBean(DefaultWorker.class, UUID.randomUUID().toString(), 8, null); - try ( - AbstractScheduler scheduler = new JdbcScheduler( - this.applicationContext, - this.flowListenersService - ) - ) { - // wait for execution - AtomicReference lastExecution = new AtomicReference<>(); - executionQueue.addListener(execution -> { - lastExecution.set(execution); - queueCount.countDown(); - assertThat(execution.getFlowId(), is("sqs-listen")); - }); - - worker.run(); - scheduler.run(); - - repositoryLoader.load(Objects.requireNonNull(TriggerTest.class.getClassLoader().getResource("flows/sqs/sqs-listen.yaml"))); - - // publish two messages to trigger the flow - Publish task = Publish.builder() - .endpointOverride(Property.ofValue(localstack.getEndpointOverride(LocalStackContainer.Service.SQS).toString())) - .queueUrl(Property.ofValue(queueUrl())) - .region(Property.ofValue(localstack.getRegion())) - .accessKeyId(Property.ofValue(localstack.getAccessKey())) - .secretKeyId(Property.ofValue(localstack.getSecretKey())) - .from( - List.of( - Message.builder().data("Hello World").build(), - Message.builder().data("Hello Kestra").delaySeconds(5).build() - ) + AtomicReference lastExecution = new AtomicReference<>(); + executionQueue.addListener(execution -> { + lastExecution.set(execution); + queueCount.countDown(); + assertThat(execution.getFlowId(), is("sqs-listen")); + }); + + repositoryLoader.load(Objects.requireNonNull(TriggerTest.class.getClassLoader().getResource("flows/sqs/sqs-listen.yaml"))); + + Publish task = Publish.builder() + .endpointOverride(Property.ofValue(localstack.getEndpointOverride(LocalStackContainer.Service.SQS).toString())) + .queueUrl(Property.ofValue(queueUrl())) + .region(Property.ofValue(localstack.getRegion())) + .accessKeyId(Property.ofValue(localstack.getAccessKey())) + .secretKeyId(Property.ofValue(localstack.getSecretKey())) + .from( + List.of( + Message.builder().data("Hello World").build(), + Message.builder().data("Hello Kestra").delaySeconds(5).build() ) - .build(); + ) + .build(); - var runContext = runContextFactory.of(); + var runContext = runContextFactory.of(); - task.run(runContext); + task.run(runContext); - boolean await = queueCount.await(1, TimeUnit.MINUTES); - assertThat(await, is(true)); + boolean await = queueCount.await(1, TimeUnit.MINUTES); + assertThat(await, is(true)); - Execution last = lastExecution.get(); - var count = (Integer) last.getTrigger().getVariables().get("count"); - var uri = (String) last.getTrigger().getVariables().get("uri"); - assertThat(count, is(2)); - assertThat(uri, is(notNullValue())); - } + Execution last = lastExecution.get(); + var count = (Integer) last.getTrigger().getVariables().get("count"); + var uri = (String) last.getTrigger().getVariables().get("uri"); + assertThat(count, is(2)); + assertThat(uri, is(notNullValue())); } }