diff --git a/hoptimator-api/src/main/java/com/linkedin/hoptimator/Trigger.java b/hoptimator-api/src/main/java/com/linkedin/hoptimator/Trigger.java index a24d4b91..5bf662ef 100644 --- a/hoptimator-api/src/main/java/com/linkedin/hoptimator/Trigger.java +++ b/hoptimator-api/src/main/java/com/linkedin/hoptimator/Trigger.java @@ -8,6 +8,10 @@ public class Trigger implements Deployable { public static final String PAUSED_OPTION = "paused"; + /** Marker option used by FIRE TRIGGER to signal a fire intent through DeploymentService. + * Recognised by deployers (see K8sTriggerDeployer) to short-circuit normal update. */ + public static final String FIRE_OPTION = "fire"; + private final String name; private final UserJob job; private final String cronSchedule; diff --git a/hoptimator-jdbc/src/main/codegen/includes/parserImpls.ftl b/hoptimator-jdbc/src/main/codegen/includes/parserImpls.ftl index ea7f6d63..e8151dda 100644 --- a/hoptimator-jdbc/src/main/codegen/includes/parserImpls.ftl +++ b/hoptimator-jdbc/src/main/codegen/includes/parserImpls.ftl @@ -486,11 +486,13 @@ SqlFire SqlFireTable(Span s) : SqlFire SqlFireTrigger(Span s) : { final SqlIdentifier id; + SqlNodeList options = null; } { id = CompoundIdentifier() + [ options = Options() ] { - return new SqlFireTrigger(s.end(this), id); + return new SqlFireTrigger(s.end(this), id, options); } } diff --git a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java index 47b3f871..e362a605 100644 --- a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java +++ b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java @@ -31,6 +31,7 @@ import com.linkedin.hoptimator.jdbc.ddl.SqlCreateTable; import com.linkedin.hoptimator.jdbc.ddl.SqlCreateTrigger; import com.linkedin.hoptimator.jdbc.ddl.SqlDropTrigger; +import com.linkedin.hoptimator.jdbc.ddl.SqlFireTrigger; import com.linkedin.hoptimator.jdbc.ddl.SqlPauseTrigger; import com.linkedin.hoptimator.jdbc.ddl.SqlResumeTrigger; import com.linkedin.hoptimator.util.DeploymentService; @@ -311,6 +312,41 @@ public void execute(SqlResumeTrigger resume, CalcitePrepare.Context context) { updateTriggerPausedState(resume, resume.name, false); } + /** Executes a {@code FIRE TRIGGER name [WITH (k v, ...)]} command. + * Options are merged into the trigger's job properties and the fire intent + * is passed to the deployer via {@link Trigger#FIRE_OPTION}; the deployer is + * responsible for in-flight rejection and bumping the trigger's timestamp. */ + public void execute(SqlFireTrigger fire, CalcitePrepare.Context context) { + logger.info("Validating statement: {}", fire); + try { + ValidationService.validateOrThrow(fire, connection); + } catch (SQLException e) { + throw new DdlException(fire, e.getMessage(), e); + } + + if (fire.name.names.size() > 1) { + throw new DdlException(fire, "Triggers cannot belong to a schema or database."); + } + String name = fire.name.names.get(0); + + Map options = HoptimatorDdlUtils.options(fire.options); + options.put(Trigger.FIRE_OPTION, "true"); + Trigger trigger = new Trigger(name, null, null, options, null, null); + + Collection deployers = null; + try { + logger.info("Firing trigger {} with {} option(s)", name, options.size() - 1); + deployers = DeploymentService.deployers(trigger, connection); + DeploymentService.update(deployers); + logger.info("FIRE TRIGGER {} completed", name); + } catch (Exception e) { + if (deployers != null) { + DeploymentService.restore(deployers); + } + throw new DdlException(fire, e.getMessage(), e); + } + } + /** Executes a {@code DROP TRIGGER} command. */ public void execute(SqlDropTrigger drop, CalcitePrepare.Context context) { logger.info("Validating statement: {}", drop); diff --git a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/ddl/HoptimatorDdlParserImpl.java b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/ddl/HoptimatorDdlParserImpl.java index d6b9dc53..f7355922 100644 --- a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/ddl/HoptimatorDdlParserImpl.java +++ b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/ddl/HoptimatorDdlParserImpl.java @@ -6889,9 +6889,17 @@ final public SqlFire SqlFireTable(Span s) throws ParseException { final public SqlFire SqlFireTrigger(Span s) throws ParseException { final SqlIdentifier id; + SqlNodeList optionList = null; jj_consume_token(TRIGGER); id = CompoundIdentifier(); - {if (true) return new SqlFireTrigger(s.end(this), id);} + switch ((jj_ntk==-1)?jj_ntk():jj_ntk) { + case WITH: + optionList = Options(); + break; + default: + ; + } + {if (true) return new SqlFireTrigger(s.end(this), id, optionList);} throw new Error("Missing return statement in function"); } diff --git a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/ddl/SqlFireTrigger.java b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/ddl/SqlFireTrigger.java index 2919bf96..4e962c3d 100644 --- a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/ddl/SqlFireTrigger.java +++ b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/ddl/SqlFireTrigger.java @@ -21,39 +21,50 @@ import org.apache.calcite.sql.SqlIdentifier; import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlNodeList; import org.apache.calcite.sql.SqlOperator; import org.apache.calcite.sql.SqlSpecialOperator; import org.apache.calcite.sql.SqlWriter; import org.apache.calcite.sql.parser.SqlParserPos; - -import com.google.common.collect.ImmutableList; +import org.apache.calcite.util.ImmutableNullableList; import java.util.List; /** - * Parse tree for {@code FIRE TRIGGER} statement. + * Parse tree for {@code FIRE TRIGGER} statement, optionally carrying a + * {@code WITH (key value, ...)} options list whose pairs are merged into the + * trigger's {@code spec.jobProperties} before the fire timestamp is bumped. */ public class SqlFireTrigger extends SqlFire { private static final SqlOperator OPERATOR = new SqlSpecialOperator("FIRE TRIGGER", SqlKind.OTHER_DDL); public final SqlIdentifier name; + public final SqlNodeList options; - /** - * Creates a SqlFireTrigger. - */ - public SqlFireTrigger(SqlParserPos pos, SqlIdentifier name) { + public SqlFireTrigger(SqlParserPos pos, SqlIdentifier name, SqlNodeList options) { super(OPERATOR, pos); this.name = name; + this.options = options; } + @SuppressWarnings("nullness") @Override public List getOperandList() { - return ImmutableList.of(name); + return ImmutableNullableList.of(name, options); } @Override public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { writer.keyword("FIRE"); writer.keyword("TRIGGER"); name.unparse(writer, leftPrec, rightPrec); + if (options != null) { + writer.keyword("WITH"); + SqlWriter.Frame frame = writer.startList("(", ")"); + for (SqlNode c : options) { + writer.sep(","); + c.unparse(writer, 0, 0); + } + writer.endList(frame); + } } } diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sTriggerDeployer.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sTriggerDeployer.java index b095c4d4..4e52593e 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sTriggerDeployer.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sTriggerDeployer.java @@ -7,10 +7,13 @@ import com.linkedin.hoptimator.k8s.models.V1alpha1TableTrigger; import com.linkedin.hoptimator.k8s.models.V1alpha1TableTriggerList; import com.linkedin.hoptimator.k8s.models.V1alpha1TableTriggerSpec; +import com.linkedin.hoptimator.k8s.models.V1alpha1TableTriggerStatus; import com.linkedin.hoptimator.util.Template; import io.kubernetes.client.openapi.models.V1ObjectMeta; import java.sql.SQLException; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -45,6 +48,11 @@ public void update() throws SQLException { String canonicalName = K8sUtils.canonicalizeName(trigger.name()); V1alpha1TableTrigger existingTrigger = triggerApi.getIfExists(context.namespace(), canonicalName); + if (trigger.options().containsKey(Trigger.FIRE_OPTION)) { + fire(existingTrigger); + return; + } + Boolean targetPaused = null; if (trigger.options().containsKey(Trigger.PAUSED_OPTION)) { targetPaused = Boolean.TRUE.toString().equals(trigger.options().get(Trigger.PAUSED_OPTION)); @@ -72,6 +80,48 @@ public void update() throws SQLException { super.update(); } + /** Applies a FIRE intent: rejects on in-flight, merges WITH options into spec.jobProperties + * (mirroring CREATE TRIGGER's prefix-stripping), then bumps status.timestamp so the + * TableTriggerReconciler materialises a fresh Job. */ + private void fire(V1alpha1TableTrigger existingTrigger) throws SQLException { + if (existingTrigger == null) { + throw new SQLException("Trigger " + trigger.name() + " not found."); + } + V1alpha1TableTriggerStatus status = existingTrigger.getStatus(); + if (status != null && status.getTimestamp() != null + && (status.getWatermark() == null || status.getTimestamp().isAfter(status.getWatermark()))) { + throw new SQLException("Trigger " + trigger.name() + " has an in-flight execution (timestamp=" + + status.getTimestamp() + ", watermark=" + status.getWatermark() + + "). Wait for it to complete, or pause/resume to abort."); + } + + V1alpha1TableTriggerSpec spec = existingTrigger.getSpec(); + if (spec == null) { + spec = new V1alpha1TableTriggerSpec(); + existingTrigger.spec(spec); + } + Map jobProps = spec.getJobProperties() != null + ? new HashMap<>(spec.getJobProperties()) + : new HashMap<>(); + trigger.options().forEach((key, value) -> { + if (Trigger.FIRE_OPTION.equals(key)) { + return; + } + if (key.startsWith("job.properties.")) { + jobProps.put(key.substring("job.properties.".length()), value); + } else { + jobProps.put(key, value); + } + }); + spec.jobProperties(jobProps); + triggerApi.update(existingTrigger); + + V1alpha1TableTriggerStatus newStatus = status != null ? status : new V1alpha1TableTriggerStatus(); + newStatus.setTimestamp(OffsetDateTime.now(ZoneOffset.UTC)); + existingTrigger.setStatus(newStatus); + triggerApi.updateStatus(existingTrigger, newStatus); + } + private void stampDependencyMetadata(V1alpha1TableTrigger target) { V1ObjectMeta meta = target.getMetadata(); if (meta == null) { diff --git a/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sTriggerDeployerTest.java b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sTriggerDeployerTest.java index 0dc62226..b0713f12 100644 --- a/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sTriggerDeployerTest.java +++ b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sTriggerDeployerTest.java @@ -10,7 +10,11 @@ import com.linkedin.hoptimator.k8s.models.V1alpha1TableTrigger; import com.linkedin.hoptimator.k8s.models.V1alpha1TableTriggerList; import com.linkedin.hoptimator.k8s.models.V1alpha1TableTriggerSpec; +import com.linkedin.hoptimator.k8s.models.V1alpha1TableTriggerStatus; import io.kubernetes.client.openapi.models.V1ObjectMeta; + +import java.time.OffsetDateTime; +import java.time.ZoneOffset; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -662,4 +666,130 @@ void toK8sObjectSetsSpecPausedWhenPausedOptionTrue() throws SQLException { assertTrue(specs.get(0).contains("paused: true"), "spec.paused=true must be present in rendered YAML when PAUSED_OPTION=true — got: " + specs.get(0)); } + + // ───────── update() FIRE_OPTION (fire-trigger) tests ───────── + + private Trigger fireTrigger(Map extraOptions) { + Map options = new HashMap<>(extraOptions); + options.put(Trigger.FIRE_OPTION, "true"); + return new Trigger("MY_TRIGGER", null, null, options, null, null); + } + + @Test + void updateWithFireOptionBumpsTimestampOnExistingTrigger() throws SQLException { + V1alpha1TableTrigger existing = new V1alpha1TableTrigger() + .metadata(new V1ObjectMeta().name("mytrigger")) + .spec(new V1alpha1TableTriggerSpec()); + triggers.add(existing); + + OffsetDateTime before = OffsetDateTime.now(ZoneOffset.UTC).minusSeconds(1); + K8sTriggerDeployer deployer = makeDeployer(fireTrigger(Collections.emptyMap()), mockContext); + deployer.update(); + + assertNotNull(existing.getStatus(), "status must be set after FIRE"); + assertNotNull(existing.getStatus().getTimestamp(), "status.timestamp must be set"); + assertTrue(existing.getStatus().getTimestamp().isAfter(before), + "status.timestamp must be advanced past pre-fire time"); + } + + @Test + void updateWithFireOptionMergesJobPropertiesStrippingPrefix() throws SQLException { + // Pre-existing jobProperty value must be retained, and new WITH option (with + // job.properties. prefix) must be merged in with the prefix stripped — symmetric + // with the CREATE TRIGGER deployer behaviour. + V1alpha1TableTrigger existing = new V1alpha1TableTrigger() + .metadata(new V1ObjectMeta().name("mytrigger")) + .spec(new V1alpha1TableTriggerSpec() + .jobProperties(new HashMap<>(Collections.singletonMap("existing", "v0")))); + triggers.add(existing); + + Map opts = new HashMap<>(); + opts.put("job.properties.backfill.start.time", "2026-04-01"); + opts.put("job.properties.backfill.end.time", "2026-04-08"); + + K8sTriggerDeployer deployer = makeDeployer(fireTrigger(opts), mockContext); + deployer.update(); + + Map jobProps = existing.getSpec().getJobProperties(); + assertEquals("v0", jobProps.get("existing"), "pre-existing jobProperty must be retained"); + assertEquals("2026-04-01", jobProps.get("backfill.start.time"), + "WITH option must be merged with job.properties. prefix stripped"); + assertEquals("2026-04-08", jobProps.get("backfill.end.time")); + assertFalse(jobProps.containsKey(Trigger.FIRE_OPTION), + "FIRE marker must not leak into jobProperties"); + } + + @Test + void updateWithFireOptionRejectsWhenInFlight() { + // status.timestamp set, watermark null → in-flight Job already in progress. + V1alpha1TableTrigger existing = new V1alpha1TableTrigger() + .metadata(new V1ObjectMeta().name("mytrigger")) + .spec(new V1alpha1TableTriggerSpec()) + .status(new V1alpha1TableTriggerStatus().timestamp(OffsetDateTime.now(ZoneOffset.UTC))); + triggers.add(existing); + + K8sTriggerDeployer deployer = makeDeployer(fireTrigger(Collections.emptyMap()), mockContext); + SQLException ex = assertThrows(SQLException.class, deployer::update, + "FIRE on an in-flight trigger must throw"); + assertTrue(ex.getMessage().contains("in-flight"), + "exception must mention in-flight: " + ex.getMessage()); + } + + @Test + void updateWithFireOptionAllowsRefireAfterWatermarkCaughtUp() throws SQLException { + // status.timestamp == status.watermark → previous fire completed; new FIRE allowed. + OffsetDateTime t = OffsetDateTime.now(ZoneOffset.UTC).minusMinutes(5); + V1alpha1TableTrigger existing = new V1alpha1TableTrigger() + .metadata(new V1ObjectMeta().name("mytrigger")) + .spec(new V1alpha1TableTriggerSpec()) + .status(new V1alpha1TableTriggerStatus().timestamp(t).watermark(t)); + triggers.add(existing); + + K8sTriggerDeployer deployer = makeDeployer(fireTrigger(Collections.emptyMap()), mockContext); + deployer.update(); + + assertTrue(existing.getStatus().getTimestamp().isAfter(t), + "refire must advance status.timestamp past the previous fire"); + } + + @Test + void updateWithFireOptionThrowsWhenTriggerNotFound() { + K8sTriggerDeployer deployer = makeDeployer(fireTrigger(Collections.emptyMap()), mockContext); + SQLException ex = assertThrows(SQLException.class, deployer::update); + assertTrue(ex.getMessage().contains("not found"), + "exception must mention not-found: " + ex.getMessage()); + } + + @Test + void updateWithFireOptionCreatesSpecIfNull() throws SQLException { + V1alpha1TableTrigger existing = new V1alpha1TableTrigger() + .metadata(new V1ObjectMeta().name("mytrigger")) + .spec(null); + triggers.add(existing); + + K8sTriggerDeployer deployer = makeDeployer( + fireTrigger(Collections.singletonMap("job.properties.k", "v")), mockContext); + deployer.update(); + + assertNotNull(existing.getSpec(), "spec must be initialised by FIRE when previously null"); + assertEquals("v", existing.getSpec().getJobProperties().get("k")); + } + + @Test + void updateWithFireOptionKeepsUnprefixedOptionsVerbatim() throws SQLException { + // Unprefixed keys land in spec.jobProperties as-is — symmetric with CREATE which + // accepts both forms. + V1alpha1TableTrigger existing = new V1alpha1TableTrigger() + .metadata(new V1ObjectMeta().name("mytrigger")) + .spec(new V1alpha1TableTriggerSpec()); + triggers.add(existing); + + K8sTriggerDeployer deployer = makeDeployer( + fireTrigger(Collections.singletonMap("backfill.start.time", "2026-04-01")), mockContext); + deployer.update(); + + assertEquals("2026-04-01", + existing.getSpec().getJobProperties().get("backfill.start.time"), + "unprefixed FIRE option must land in jobProperties verbatim"); + } } diff --git a/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/TestSqlScripts.java b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/TestSqlScripts.java index d5f7403b..5712537a 100644 --- a/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/TestSqlScripts.java +++ b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/TestSqlScripts.java @@ -49,6 +49,11 @@ public void k8sTriggerValidation() throws Exception { run("k8s-trigger-validation.id"); } + @Test + public void k8sTriggerFire() throws Exception { + run("k8s-trigger-fire.id"); + } + @Test public void k8sDdlCreateDatabase() throws Exception { run("k8s-ddl-create-database.id"); diff --git a/hoptimator-k8s/src/test/resources/k8s-trigger-fire.id b/hoptimator-k8s/src/test/resources/k8s-trigger-fire.id new file mode 100644 index 00000000..13dfff6a --- /dev/null +++ b/hoptimator-k8s/src/test/resources/k8s-trigger-fire.id @@ -0,0 +1,50 @@ +!set outputformat mysql +!use k8s + +# Create a trigger we can fire. +create trigger testfire on ads_catalog.ads.ad_clicks as 'my-app' in 'my-mp' with ('job.properties.foo' 'initial'); +(0 rows modified) + +!update + +select name, schema, "TABLE" from "k8s".table_triggers where name = 'testfire'; ++----------+--------+-----------+ +| NAME | SCHEMA | TABLE | ++----------+--------+-----------+ +| testfire | ADS | AD_CLICKS | ++----------+--------+-----------+ +(1 row) + +!ok + +# FIRE without options: bumps timestamp; trigger still resolvable. +fire trigger testfire; +(0 rows modified) + +!update + +# In-flight check: a second FIRE before the watermark catches up is rejected. +fire trigger testfire; +Trigger TESTFIRE has an in-flight execution +!error + +drop trigger testfire; +(0 rows modified) + +!update + +# FIRE with WITH options: options should be merged into spec.jobProperties. +create trigger testfirewith on ads_catalog.ads.ad_clicks as 'my-app' in 'my-mp' with ('job.properties.foo' 'initial'); +(0 rows modified) + +!update + +fire trigger testfirewith with ('job.properties.backfill.start.time' '2026-04-01', 'job.properties.backfill.end.time' '2026-04-08'); +(0 rows modified) + +!update + +drop trigger testfirewith; +(0 rows modified) + +!update