-
Notifications
You must be signed in to change notification settings - Fork 0
feat(python): introduce StriptTrigger and CommandsTrigger #1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,241 @@ | ||
| package io.kestra.plugin.scripts.python; | ||
|
|
||
| import io.kestra.core.models.annotations.Example; | ||
| import io.kestra.core.models.annotations.Plugin; | ||
| import io.kestra.core.models.conditions.ConditionContext; | ||
| import io.kestra.core.models.executions.Execution; | ||
| import io.kestra.core.models.property.Property; | ||
| import io.kestra.core.models.tasks.RunnableTaskException; | ||
| import io.kestra.core.models.tasks.runners.TaskException; | ||
| import io.kestra.core.models.triggers.*; | ||
| import io.kestra.core.runners.RunContext; | ||
| import io.kestra.plugin.core.runner.Process; | ||
| import io.kestra.plugin.scripts.exec.scripts.models.ScriptOutput; | ||
| import io.swagger.v3.oas.annotations.media.Schema; | ||
| import jakarta.validation.constraints.NotNull; | ||
| import lombok.*; | ||
| import lombok.experimental.SuperBuilder; | ||
|
|
||
| import java.time.Duration; | ||
| import java.time.Instant; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Optional; | ||
| import java.util.concurrent.atomic.AtomicBoolean; | ||
| import java.util.regex.Matcher; | ||
| import java.util.regex.Pattern; | ||
|
|
||
| @SuperBuilder | ||
| @ToString | ||
| @EqualsAndHashCode | ||
| @Getter | ||
| @NoArgsConstructor | ||
| @Schema(title = "Trigger a flow when Python commands match a condition.") | ||
| @Plugin( | ||
| examples = { | ||
| @Example( | ||
| title = "Trigger when commands fail with an implicit error (exit 1).", | ||
| full = true, | ||
| code = """ | ||
| id: commands_trigger | ||
| namespace: company.team | ||
|
|
||
| triggers: | ||
| - id: commands_failure | ||
| type: io.kestra.plugin.scripts.python.CommandsTrigger | ||
| interval: PT10S | ||
| exitCondition: "exit 1" | ||
| edge: true | ||
| containerImage: ubuntu | ||
| commands: | ||
| - python -c "import sys; sys.exit(1)" | ||
|
|
||
| tasks: | ||
| - id: log | ||
| type: io.kestra.plugin.core.log.Log | ||
| message: "Triggered with exitCode={{ trigger.exitCode }} (condition={{ trigger.condition }})" | ||
| """ | ||
| ) | ||
| } | ||
| ) | ||
| public class CommandsTrigger extends AbstractTrigger | ||
| implements PollingTriggerInterface, TriggerOutput<CommandsTrigger.Output> { | ||
|
|
||
| private static final String DEFAULT_IMAGE = "ubuntu"; | ||
|
|
||
| @Schema( | ||
| title = "Docker image used to execute the commands.", | ||
| description = """ | ||
| Container image used by the underlying Commands task to run shell commands. | ||
| Defaults to 'ubuntu'. | ||
| """ | ||
| ) | ||
| @Builder.Default | ||
| protected Property<String> containerImage = Property.ofValue(DEFAULT_IMAGE); | ||
|
Comment on lines
+64
to
+74
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Default container image lacks Python. Same issue as 🤖 Prompt for AI Agents |
||
|
|
||
| @Schema( | ||
| title = "Python commands to execute.", | ||
| description = "Commands executed on each poll (same semantics as the Python Commands task)." | ||
| ) | ||
| @NotNull | ||
| protected Property<List<String>> commands; | ||
|
|
||
| @Schema( | ||
| title = "Condition to match.", | ||
| description = """ | ||
| Condition evaluated after each commands execution. The trigger emits an event only when this condition matches. | ||
|
|
||
| Supported forms: | ||
| - 'exit N' (example: 'exit 1'): matches when the process exit code equals N. | ||
| - Any other string: treated as a regex (or substring if regex is invalid) matched against: | ||
| - the task 'vars' (when commands emit ::{"outputs":...}::), | ||
| - and error logs when the task fails (TaskException). | ||
| """ | ||
| ) | ||
| @NotNull | ||
| protected Property<String> exitCondition; | ||
|
|
||
| @Schema( | ||
| title = "Check interval", | ||
| description = "Interval between polling evaluations." | ||
| ) | ||
| @Builder.Default | ||
| private final Duration interval = Duration.ofSeconds(60); | ||
|
|
||
| @Schema( | ||
| title = "Edge trigger mode.", | ||
| description = """ | ||
| If true, the trigger emits only on a transition from 'not matching' to 'matching' (anti-spam). | ||
| If false, the trigger emits on every poll where the condition matches. | ||
| """ | ||
| ) | ||
| @Builder.Default | ||
| protected Property<Boolean> edge = Property.ofValue(true); | ||
|
|
||
| @Builder.Default | ||
| @Getter(AccessLevel.NONE) | ||
| private final AtomicBoolean lastMatched = new AtomicBoolean(false); | ||
|
|
||
| @Override | ||
| public Optional<Execution> evaluate(ConditionContext conditionContext, TriggerContext context) throws Exception { | ||
| RunContext runContext = conditionContext.getRunContext(); | ||
| boolean rEdge = runContext.render(this.edge).as(Boolean.class).orElse(true); | ||
|
|
||
| Output out = runOnce(runContext); | ||
| boolean matched = matchesCondition(out); | ||
|
|
||
| boolean emit = rEdge | ||
| ? (!lastMatched.getAndSet(matched) && matched) | ||
| : matched; | ||
|
|
||
| if (!emit) { | ||
| return Optional.empty(); | ||
| } | ||
|
|
||
| return Optional.of(TriggerService.generateExecution(this, conditionContext, context, out)); | ||
| } | ||
|
|
||
| private Output runOnce(RunContext runContext) throws Exception { | ||
| Commands task = Commands.builder() | ||
| .taskRunner(Process.instance()) | ||
| .containerImage(this.containerImage) | ||
| .commands(this.commands) | ||
| .build(); | ||
|
|
||
| String renderedCondition = runContext.render(this.exitCondition).as(String.class).orElse(""); | ||
|
|
||
| try { | ||
| ScriptOutput taskOutput = task.run(runContext); | ||
| Integer exitCode = safeExitCode(taskOutput); | ||
| Map<String, Object> vars = safeVars(taskOutput); | ||
|
|
||
| return new Output(Instant.now(), renderedCondition, exitCode, vars, null); | ||
| } catch (RunnableTaskException e) { | ||
| ExtractedFailure failure = extractFailure(e); | ||
| return new Output(Instant.now(), renderedCondition, failure.exitCode, null, failure.logs); | ||
| } | ||
| } | ||
|
|
||
| private boolean matchesCondition(Output out) { | ||
| String cond = out.getCondition() == null ? "" : out.getCondition().trim(); | ||
|
|
||
| Matcher exitMatcher = Pattern.compile("^\\s*exit\\s+(\\d+)\\s*$", Pattern.CASE_INSENSITIVE).matcher(cond); | ||
| if (exitMatcher.matches()) { | ||
| int expected = Integer.parseInt(exitMatcher.group(1)); | ||
| return out.getExitCode() != null && out.getExitCode() == expected; | ||
| } | ||
|
Comment on lines
+162
to
+166
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧹 Nitpick | 🔵 Trivial Consider caching the exit-code pattern. Same as 🤖 Prompt for AI Agents |
||
|
|
||
| String haystack = buildHaystack(out); | ||
| if (haystack.isEmpty() || cond.isEmpty()) { | ||
| return false; | ||
| } | ||
|
|
||
| try { | ||
| return Pattern.compile(cond).matcher(haystack).find(); | ||
| } catch (Exception invalidRegex) { | ||
| return haystack.contains(cond); | ||
| } | ||
| } | ||
|
|
||
| private String buildHaystack(Output out) { | ||
| StringBuilder sb = new StringBuilder(); | ||
|
|
||
| if (out.getVars() != null && !out.getVars().isEmpty()) { | ||
| sb.append(out.getVars()).append("\n"); | ||
| } | ||
| if (out.getLogs() != null && !out.getLogs().isBlank()) { | ||
| sb.append(out.getLogs()).append("\n"); | ||
| } | ||
|
|
||
| return sb.toString(); | ||
| } | ||
|
|
||
| private Integer safeExitCode(ScriptOutput taskOutput) { | ||
| try { | ||
| return taskOutput.getExitCode(); | ||
| } catch (Exception ignored) { | ||
| return null; | ||
| } | ||
| } | ||
|
|
||
| private Map<String, Object> safeVars(ScriptOutput taskOutput) { | ||
| try { | ||
| return taskOutput.getVars(); | ||
| } catch (Exception ignored) { | ||
| return null; | ||
| } | ||
| } | ||
|
|
||
| private record ExtractedFailure(Integer exitCode, String logs) { | ||
| } | ||
|
|
||
| private ExtractedFailure extractFailure(RunnableTaskException e) { | ||
| Integer exitCode = null; | ||
| String logs = null; | ||
|
|
||
| Throwable cur = e.getCause(); | ||
| while (cur != null) { | ||
| if (cur instanceof TaskException te) { | ||
| exitCode = te.getExitCode(); | ||
| try { | ||
| logs = te.getLogConsumer() != null ? te.getLogConsumer().toString() : null; | ||
| } catch (Exception ignored) { | ||
| } | ||
| break; | ||
| } | ||
| cur = cur.getCause(); | ||
| } | ||
|
|
||
| return new ExtractedFailure(exitCode, logs); | ||
| } | ||
|
Comment on lines
+159
to
+230
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion | 🟠 Major Significant code duplication with The 🤖 Prompt for AI Agents |
||
|
|
||
| @Data | ||
| @AllArgsConstructor | ||
| public static class Output implements io.kestra.core.models.tasks.Output { | ||
| private Instant timestamp; | ||
| private String condition; | ||
| private Integer exitCode; | ||
| private Map<String, Object> vars; | ||
| private String logs; | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧹 Nitpick | 🔵 Trivial
Consider excluding
lastMatchedfrom@EqualsAndHashCode.Same concern as
ScriptTrigger: the mutablelastMatchedfield should be excluded from equality checks.♻️ Proposed fix
And on the field (Line 117):
`@Builder.Default` `@Getter`(AccessLevel.NONE) +@EqualsAndHashCode.Exclude private final AtomicBoolean lastMatched = new AtomicBoolean(false);🤖 Prompt for AI Agents