Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ public class Trigger implements Deployable {

public static final String PAUSED_OPTION = "paused";

/** Marker option used by FIRE TRIGGER to signal a fire intent through DeploymentService.
* Recognised by deployers (see K8sTriggerDeployer) to short-circuit normal update. */
public static final String FIRE_OPTION = "fire";

private final String name;
private final UserJob job;
private final String cronSchedule;
Expand Down
4 changes: 3 additions & 1 deletion hoptimator-jdbc/src/main/codegen/includes/parserImpls.ftl
Original file line number Diff line number Diff line change
Expand Up @@ -486,11 +486,13 @@ SqlFire SqlFireTable(Span s) :
SqlFire SqlFireTrigger(Span s) :
{
final SqlIdentifier id;
SqlNodeList options = null;
}
{
<TRIGGER> id = CompoundIdentifier()
[ options = Options() ]
{
return new SqlFireTrigger(s.end(this), id);
return new SqlFireTrigger(s.end(this), id, options);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.linkedin.hoptimator.jdbc.ddl.SqlCreateTable;
import com.linkedin.hoptimator.jdbc.ddl.SqlCreateTrigger;
import com.linkedin.hoptimator.jdbc.ddl.SqlDropTrigger;
import com.linkedin.hoptimator.jdbc.ddl.SqlFireTrigger;
import com.linkedin.hoptimator.jdbc.ddl.SqlPauseTrigger;
import com.linkedin.hoptimator.jdbc.ddl.SqlResumeTrigger;
import com.linkedin.hoptimator.util.DeploymentService;
Expand Down Expand Up @@ -311,6 +312,41 @@ public void execute(SqlResumeTrigger resume, CalcitePrepare.Context context) {
updateTriggerPausedState(resume, resume.name, false);
}

/** Executes a {@code FIRE TRIGGER name [WITH (k v, ...)]} command.
* Options are merged into the trigger's job properties and the fire intent
* is passed to the deployer via {@link Trigger#FIRE_OPTION}; the deployer is
* responsible for in-flight rejection and bumping the trigger's timestamp. */
public void execute(SqlFireTrigger fire, CalcitePrepare.Context context) {
logger.info("Validating statement: {}", fire);
try {
ValidationService.validateOrThrow(fire, connection);
} catch (SQLException e) {
throw new DdlException(fire, e.getMessage(), e);
}

if (fire.name.names.size() > 1) {
throw new DdlException(fire, "Triggers cannot belong to a schema or database.");
}
String name = fire.name.names.get(0);

Map<String, String> options = HoptimatorDdlUtils.options(fire.options);
options.put(Trigger.FIRE_OPTION, "true");
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the difference between create trigger and fire trigger is essentially this fire: true? Wonder what happens if you create trigger ... with (fire 'true')?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As of now, WITH (fire 'true') on create trigger doesnt set Trigger.FIRE_OPTION. We could wire it up easily but i think it will get little more complex,

  1. If create trigger succeeds but the fire step fails, do we roll back the CR?
  2. CREATE OR REPLACE ... WITH (fire 'true') needs to decide if we fire only on replace, only on create, or both?

So, i feel we should keep the firing a trigger as a separate grammar for itself.

Trigger trigger = new Trigger(name, null, null, options, null, null);

Collection<Deployer> deployers = null;
try {
logger.info("Firing trigger {} with {} option(s)", name, options.size() - 1);
deployers = DeploymentService.deployers(trigger, connection);
DeploymentService.update(deployers);
logger.info("FIRE TRIGGER {} completed", name);
} catch (Exception e) {
if (deployers != null) {
DeploymentService.restore(deployers);
}
throw new DdlException(fire, e.getMessage(), e);
}
}

/** Executes a {@code DROP TRIGGER} command. */
public void execute(SqlDropTrigger drop, CalcitePrepare.Context context) {
logger.info("Validating statement: {}", drop);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6889,9 +6889,17 @@ final public SqlFire SqlFireTable(Span s) throws ParseException {

final public SqlFire SqlFireTrigger(Span s) throws ParseException {
final SqlIdentifier id;
SqlNodeList optionList = null;
jj_consume_token(TRIGGER);
id = CompoundIdentifier();
{if (true) return new SqlFireTrigger(s.end(this), id);}
switch ((jj_ntk==-1)?jj_ntk():jj_ntk) {
case WITH:
optionList = Options();
break;
default:
;
}
{if (true) return new SqlFireTrigger(s.end(this), id, optionList);}
throw new Error("Missing return statement in function");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,39 +21,50 @@
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlSpecialOperator;
import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.parser.SqlParserPos;

import com.google.common.collect.ImmutableList;
import org.apache.calcite.util.ImmutableNullableList;

import java.util.List;

/**
* Parse tree for {@code FIRE TRIGGER} statement.
* Parse tree for {@code FIRE TRIGGER} statement, optionally carrying a
* {@code WITH (key value, ...)} options list whose pairs are merged into the
* trigger's {@code spec.jobProperties} before the fire timestamp is bumped.
*/
public class SqlFireTrigger extends SqlFire {

private static final SqlOperator OPERATOR =
new SqlSpecialOperator("FIRE TRIGGER", SqlKind.OTHER_DDL);
public final SqlIdentifier name;
public final SqlNodeList options;

/**
* Creates a SqlFireTrigger.
*/
public SqlFireTrigger(SqlParserPos pos, SqlIdentifier name) {
public SqlFireTrigger(SqlParserPos pos, SqlIdentifier name, SqlNodeList options) {
super(OPERATOR, pos);
this.name = name;
this.options = options;
}

@SuppressWarnings("nullness")
@Override public List<SqlNode> getOperandList() {
return ImmutableList.of(name);
return ImmutableNullableList.of(name, options);
}

@Override public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
writer.keyword("FIRE");
writer.keyword("TRIGGER");
name.unparse(writer, leftPrec, rightPrec);
if (options != null) {
writer.keyword("WITH");
SqlWriter.Frame frame = writer.startList("(", ")");
for (SqlNode c : options) {
writer.sep(",");
c.unparse(writer, 0, 0);
}
writer.endList(frame);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@
import com.linkedin.hoptimator.k8s.models.V1alpha1TableTrigger;
import com.linkedin.hoptimator.k8s.models.V1alpha1TableTriggerList;
import com.linkedin.hoptimator.k8s.models.V1alpha1TableTriggerSpec;
import com.linkedin.hoptimator.k8s.models.V1alpha1TableTriggerStatus;
import com.linkedin.hoptimator.util.Template;
import io.kubernetes.client.openapi.models.V1ObjectMeta;

import java.sql.SQLException;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -45,6 +48,11 @@ public void update() throws SQLException {
String canonicalName = K8sUtils.canonicalizeName(trigger.name());
V1alpha1TableTrigger existingTrigger = triggerApi.getIfExists(context.namespace(), canonicalName);

if (trigger.options().containsKey(Trigger.FIRE_OPTION)) {
fire(existingTrigger);
return;
}

Boolean targetPaused = null;
if (trigger.options().containsKey(Trigger.PAUSED_OPTION)) {
targetPaused = Boolean.TRUE.toString().equals(trigger.options().get(Trigger.PAUSED_OPTION));
Expand Down Expand Up @@ -72,6 +80,48 @@ public void update() throws SQLException {
super.update();
}

/** Applies a FIRE intent: rejects on in-flight, merges WITH options into spec.jobProperties
* (mirroring CREATE TRIGGER's prefix-stripping), then bumps status.timestamp so the
* TableTriggerReconciler materialises a fresh Job. */
private void fire(V1alpha1TableTrigger existingTrigger) throws SQLException {
if (existingTrigger == null) {
throw new SQLException("Trigger " + trigger.name() + " not found.");
}
V1alpha1TableTriggerStatus status = existingTrigger.getStatus();
if (status != null && status.getTimestamp() != null
&& (status.getWatermark() == null || status.getTimestamp().isAfter(status.getWatermark()))) {
throw new SQLException("Trigger " + trigger.name() + " has an in-flight execution (timestamp="
+ status.getTimestamp() + ", watermark=" + status.getWatermark()
+ "). Wait for it to complete, or pause/resume to abort.");
}

V1alpha1TableTriggerSpec spec = existingTrigger.getSpec();
if (spec == null) {
spec = new V1alpha1TableTriggerSpec();
existingTrigger.spec(spec);
}
Map<String, String> jobProps = spec.getJobProperties() != null
? new HashMap<>(spec.getJobProperties())
: new HashMap<>();
trigger.options().forEach((key, value) -> {
if (Trigger.FIRE_OPTION.equals(key)) {
return;
}
if (key.startsWith("job.properties.")) {
jobProps.put(key.substring("job.properties.".length()), value);
} else {
jobProps.put(key, value);
}
});
spec.jobProperties(jobProps);
triggerApi.update(existingTrigger);

V1alpha1TableTriggerStatus newStatus = status != null ? status : new V1alpha1TableTriggerStatus();
newStatus.setTimestamp(OffsetDateTime.now(ZoneOffset.UTC));
existingTrigger.setStatus(newStatus);
triggerApi.updateStatus(existingTrigger, newStatus);
}

private void stampDependencyMetadata(V1alpha1TableTrigger target) {
V1ObjectMeta meta = target.getMetadata();
if (meta == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@
import com.linkedin.hoptimator.k8s.models.V1alpha1TableTrigger;
import com.linkedin.hoptimator.k8s.models.V1alpha1TableTriggerList;
import com.linkedin.hoptimator.k8s.models.V1alpha1TableTriggerSpec;
import com.linkedin.hoptimator.k8s.models.V1alpha1TableTriggerStatus;
import io.kubernetes.client.openapi.models.V1ObjectMeta;

import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand Down Expand Up @@ -662,4 +666,130 @@ void toK8sObjectSetsSpecPausedWhenPausedOptionTrue() throws SQLException {
assertTrue(specs.get(0).contains("paused: true"),
"spec.paused=true must be present in rendered YAML when PAUSED_OPTION=true — got: " + specs.get(0));
}

// ───────── update() FIRE_OPTION (fire-trigger) tests ─────────

private Trigger fireTrigger(Map<String, String> extraOptions) {
Map<String, String> options = new HashMap<>(extraOptions);
options.put(Trigger.FIRE_OPTION, "true");
return new Trigger("MY_TRIGGER", null, null, options, null, null);
}

@Test
void updateWithFireOptionBumpsTimestampOnExistingTrigger() throws SQLException {
V1alpha1TableTrigger existing = new V1alpha1TableTrigger()
.metadata(new V1ObjectMeta().name("mytrigger"))
.spec(new V1alpha1TableTriggerSpec());
triggers.add(existing);

OffsetDateTime before = OffsetDateTime.now(ZoneOffset.UTC).minusSeconds(1);
K8sTriggerDeployer deployer = makeDeployer(fireTrigger(Collections.emptyMap()), mockContext);
deployer.update();

assertNotNull(existing.getStatus(), "status must be set after FIRE");
assertNotNull(existing.getStatus().getTimestamp(), "status.timestamp must be set");
assertTrue(existing.getStatus().getTimestamp().isAfter(before),
"status.timestamp must be advanced past pre-fire time");
}

@Test
void updateWithFireOptionMergesJobPropertiesStrippingPrefix() throws SQLException {
// Pre-existing jobProperty value must be retained, and new WITH option (with
// job.properties. prefix) must be merged in with the prefix stripped — symmetric
// with the CREATE TRIGGER deployer behaviour.
V1alpha1TableTrigger existing = new V1alpha1TableTrigger()
.metadata(new V1ObjectMeta().name("mytrigger"))
.spec(new V1alpha1TableTriggerSpec()
.jobProperties(new HashMap<>(Collections.singletonMap("existing", "v0"))));
triggers.add(existing);

Map<String, String> opts = new HashMap<>();
opts.put("job.properties.backfill.start.time", "2026-04-01");
opts.put("job.properties.backfill.end.time", "2026-04-08");

K8sTriggerDeployer deployer = makeDeployer(fireTrigger(opts), mockContext);
deployer.update();

Map<String, String> jobProps = existing.getSpec().getJobProperties();
assertEquals("v0", jobProps.get("existing"), "pre-existing jobProperty must be retained");
assertEquals("2026-04-01", jobProps.get("backfill.start.time"),
"WITH option must be merged with job.properties. prefix stripped");
assertEquals("2026-04-08", jobProps.get("backfill.end.time"));
assertFalse(jobProps.containsKey(Trigger.FIRE_OPTION),
"FIRE marker must not leak into jobProperties");
}

@Test
void updateWithFireOptionRejectsWhenInFlight() {
// status.timestamp set, watermark null → in-flight Job already in progress.
V1alpha1TableTrigger existing = new V1alpha1TableTrigger()
.metadata(new V1ObjectMeta().name("mytrigger"))
.spec(new V1alpha1TableTriggerSpec())
.status(new V1alpha1TableTriggerStatus().timestamp(OffsetDateTime.now(ZoneOffset.UTC)));
triggers.add(existing);

K8sTriggerDeployer deployer = makeDeployer(fireTrigger(Collections.emptyMap()), mockContext);
SQLException ex = assertThrows(SQLException.class, deployer::update,
"FIRE on an in-flight trigger must throw");
assertTrue(ex.getMessage().contains("in-flight"),
"exception must mention in-flight: " + ex.getMessage());
}

@Test
void updateWithFireOptionAllowsRefireAfterWatermarkCaughtUp() throws SQLException {
// status.timestamp == status.watermark → previous fire completed; new FIRE allowed.
OffsetDateTime t = OffsetDateTime.now(ZoneOffset.UTC).minusMinutes(5);
V1alpha1TableTrigger existing = new V1alpha1TableTrigger()
.metadata(new V1ObjectMeta().name("mytrigger"))
.spec(new V1alpha1TableTriggerSpec())
.status(new V1alpha1TableTriggerStatus().timestamp(t).watermark(t));
triggers.add(existing);

K8sTriggerDeployer deployer = makeDeployer(fireTrigger(Collections.emptyMap()), mockContext);
deployer.update();

assertTrue(existing.getStatus().getTimestamp().isAfter(t),
"refire must advance status.timestamp past the previous fire");
}

@Test
void updateWithFireOptionThrowsWhenTriggerNotFound() {
K8sTriggerDeployer deployer = makeDeployer(fireTrigger(Collections.emptyMap()), mockContext);
SQLException ex = assertThrows(SQLException.class, deployer::update);
assertTrue(ex.getMessage().contains("not found"),
"exception must mention not-found: " + ex.getMessage());
}

@Test
void updateWithFireOptionCreatesSpecIfNull() throws SQLException {
V1alpha1TableTrigger existing = new V1alpha1TableTrigger()
.metadata(new V1ObjectMeta().name("mytrigger"))
.spec(null);
triggers.add(existing);

K8sTriggerDeployer deployer = makeDeployer(
fireTrigger(Collections.singletonMap("job.properties.k", "v")), mockContext);
deployer.update();

assertNotNull(existing.getSpec(), "spec must be initialised by FIRE when previously null");
assertEquals("v", existing.getSpec().getJobProperties().get("k"));
}

@Test
void updateWithFireOptionKeepsUnprefixedOptionsVerbatim() throws SQLException {
// Unprefixed keys land in spec.jobProperties as-is — symmetric with CREATE which
// accepts both forms.
V1alpha1TableTrigger existing = new V1alpha1TableTrigger()
.metadata(new V1ObjectMeta().name("mytrigger"))
.spec(new V1alpha1TableTriggerSpec());
triggers.add(existing);

K8sTriggerDeployer deployer = makeDeployer(
fireTrigger(Collections.singletonMap("backfill.start.time", "2026-04-01")), mockContext);
deployer.update();

assertEquals("2026-04-01",
existing.getSpec().getJobProperties().get("backfill.start.time"),
"unprefixed FIRE option must land in jobProperties verbatim");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ public void k8sTriggerValidation() throws Exception {
run("k8s-trigger-validation.id");
}

@Test
public void k8sTriggerFire() throws Exception {
run("k8s-trigger-fire.id");
}

@Test
public void k8sDdlCreateDatabase() throws Exception {
run("k8s-ddl-create-database.id");
Expand Down
Loading
Loading