From e8e758558ea24ae59b05a9eeb6ba3b55fc278857 Mon Sep 17 00:00:00 2001 From: fjtirado Date: Tue, 20 Jan 2026 17:16:27 +0100 Subject: [PATCH] [Fix #1107] Adding workflow status change event Signed-off-by: fjtirado --- .../impl/WorkflowApplication.java | 12 ++++++ .../impl/WorkflowMutableInstance.java | 24 +++++++---- .../lifecycle/WorkflowExecutionListener.java | 2 + .../impl/lifecycle/WorkflowStatusEvent.java | 40 +++++++++++++++++++ .../ce/AbstractLifeCyclePublisher.java | 33 ++++++++++++++- .../ce/WorkflowStatusCEDataEvent.java | 21 ++++++++++ .../impl/test/TraceExecutionListener.java | 33 +++++++++------ 7 files changed, 143 insertions(+), 22 deletions(-) create mode 100644 impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowStatusEvent.java create mode 100644 impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowStatusCEDataEvent.java diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java index 8c880537d..cd70f881a 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java @@ -70,6 +70,7 @@ public class WorkflowApplication implements AutoCloseable { private final EventConsumer eventConsumer; private final Collection eventPublishers; private final boolean lifeCycleCEPublishingEnabled; + private final boolean lifeCycleStatusChangeEnabled; private final WorkflowModelFactory modelFactory; private final WorkflowModelFactory contextFactory; private final WorkflowScheduler scheduler; @@ -95,6 +96,7 @@ private WorkflowApplication(Builder builder) { this.eventConsumer = builder.eventConsumer; this.eventPublishers = builder.eventPublishers; this.lifeCycleCEPublishingEnabled = builder.lifeCycleCEPublishingEnabled; + this.lifeCycleStatusChangeEnabled = builder.lifeCycleStatusChangeEnabled; this.modelFactory = builder.modelFactory; this.contextFactory = builder.contextFactory; this.scheduler = builder.scheduler; @@ -179,6 +181,7 @@ public SchemaValidator getValidator(SchemaInline inline) { private RuntimeDescriptorFactory descriptorFactory = () -> new RuntimeDescriptor("reference impl", "1.0.0_alpha", Collections.emptyMap()); private boolean lifeCycleCEPublishingEnabled = true; + private boolean lifeCycleStatusChangeEnabled = true; private WorkflowModelFactory modelFactory; private WorkflowModelFactory contextFactory; private Map> additionalObjects = new HashMap<>(); @@ -224,6 +227,11 @@ public Builder disableLifeCycleCEPublishing() { return this; } + public Builder disableStatusChangePublishing() { + this.lifeCycleStatusChangeEnabled = false; + return this; + } + public Builder withExecutorFactory(ExecutorServiceFactory executorFactory) { this.executorFactory = executorFactory; return this; @@ -421,6 +429,10 @@ public boolean isLifeCycleCEPublishingEnabled() { return lifeCycleCEPublishingEnabled; } + public boolean isStatusChangePublishingEnabled() { + return lifeCycleStatusChangeEnabled; + } + public WorkflowScheduler scheduler() { return scheduler; } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java index b724c0128..b02d07155 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java @@ -23,6 +23,7 @@ import io.serverlessworkflow.impl.lifecycle.WorkflowFailedEvent; import io.serverlessworkflow.impl.lifecycle.WorkflowResumedEvent; import io.serverlessworkflow.impl.lifecycle.WorkflowStartedEvent; +import io.serverlessworkflow.impl.lifecycle.WorkflowStatusEvent; import io.serverlessworkflow.impl.lifecycle.WorkflowSuspendedEvent; import java.time.Instant; import java.util.Map; @@ -38,7 +39,7 @@ public class WorkflowMutableInstance implements WorkflowInstance { - protected final AtomicReference status; + private final AtomicReference status; protected final String id; protected final WorkflowModel input; @@ -75,7 +76,7 @@ protected final CompletableFuture startExecution(Runnable runnabl if (future != null) { return future; } - status.set(WorkflowStatus.RUNNING); + status(WorkflowStatus.RUNNING); runnable.run(); future = TaskExecutorHelper.processTaskList( @@ -106,7 +107,7 @@ private void whenCompleted(WorkflowModel result, Throwable ex) { private void handleException(Throwable ex) { if (!(ex instanceof CancellationException)) { - status.set(WorkflowStatus.FAULTED); + status(WorkflowStatus.FAULTED); publishEvent( workflowContext, l -> l.onWorkflowFailed(new WorkflowFailedEvent(workflowContext, ex))); } @@ -120,7 +121,7 @@ private WorkflowModel whenSuccess(WorkflowModel node) { .map(f -> f.apply(workflowContext, null, node)) .orElse(node); workflowContext.definition().outputSchemaValidator().ifPresent(v -> v.validate(output)); - status.set(WorkflowStatus.COMPLETED); + status(WorkflowStatus.COMPLETED); publishEvent( workflowContext, l -> l.onWorkflowCompleted(new WorkflowCompletedEvent(workflowContext, output))); @@ -177,7 +178,14 @@ public T outputAs(Class clazz) { } public void status(WorkflowStatus state) { - this.status.set(state); + WorkflowStatus prevState = this.status.getAndSet(state); + if (prevState != state) { + publishEvent( + workflowContext, + l -> + l.onWorkflowStatusChanged( + new WorkflowStatusEvent(workflowContext, prevState, state))); + } } @Override @@ -213,7 +221,7 @@ public boolean suspend() { protected final void internalSuspend() { suspended = new ConcurrentHashMap<>(); - status.set(WorkflowStatus.SUSPENDED); + status(WorkflowStatus.SUSPENDED); } @Override @@ -259,7 +267,7 @@ public CompletableFuture suspendedCheck(TaskContext t) { suspended.put(suspendedTask, t); return suspendedTask; } else if (TaskExecutorHelper.isActive(status.get())) { - status.set(WorkflowStatus.RUNNING); + status(WorkflowStatus.RUNNING); } } finally { statusLock.unlock(); @@ -272,7 +280,7 @@ public boolean cancel() { try { statusLock.lock(); if (TaskExecutorHelper.isActive(status.get())) { - status.set(WorkflowStatus.CANCELLED); + status(WorkflowStatus.CANCELLED); publishEvent( workflowContext, l -> l.onWorkflowCancelled(new WorkflowCancelledEvent(workflowContext))); diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowExecutionListener.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowExecutionListener.java index e88e8cbdf..d8fd1825c 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowExecutionListener.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowExecutionListener.java @@ -43,6 +43,8 @@ default void onTaskResumed(TaskResumedEvent ev) {} default void onTaskRetried(TaskRetriedEvent ev) {} + default void onWorkflowStatusChanged(WorkflowStatusEvent ev) {} + @Override default void close() {} } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowStatusEvent.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowStatusEvent.java new file mode 100644 index 000000000..aa7ff746b --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowStatusEvent.java @@ -0,0 +1,40 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.lifecycle; + +import io.serverlessworkflow.impl.WorkflowContextData; +import io.serverlessworkflow.impl.WorkflowStatus; + +public class WorkflowStatusEvent extends WorkflowEvent { + + private final WorkflowStatus status; + private final WorkflowStatus prevStatus; + + public WorkflowStatusEvent( + WorkflowContextData workflow, WorkflowStatus prevStatus, WorkflowStatus status) { + super(workflow); + this.status = status; + this.prevStatus = prevStatus; + } + + public WorkflowStatus status() { + return status; + } + + public WorkflowStatus previousStatus() { + return prevStatus; + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/AbstractLifeCyclePublisher.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/AbstractLifeCyclePublisher.java index ca736bc8b..f7e489fb9 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/AbstractLifeCyclePublisher.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/AbstractLifeCyclePublisher.java @@ -41,6 +41,7 @@ import io.serverlessworkflow.impl.lifecycle.WorkflowFailedEvent; import io.serverlessworkflow.impl.lifecycle.WorkflowResumedEvent; import io.serverlessworkflow.impl.lifecycle.WorkflowStartedEvent; +import io.serverlessworkflow.impl.lifecycle.WorkflowStatusEvent; import io.serverlessworkflow.impl.lifecycle.WorkflowSuspendedEvent; import java.time.OffsetDateTime; import java.util.Collection; @@ -63,6 +64,8 @@ public abstract class AbstractLifeCyclePublisher implements WorkflowExecutionLis private static final String WORKFLOW_RESUMED = "io.serverlessworkflow.workflow.resumed.v1"; private static final String WORKFLOW_FAULTED = "io.serverlessworkflow.workflow.faulted.v1"; private static final String WORKFLOW_CANCELLED = "io.serverlessworkflow.workflow.cancelled.v1"; + private static final String WORKFLOW_STATUS_CHANGED = + "io.serverlessworkflow.workflow.status-changed.v1"; public static Collection getLifeCycleTypes() { return Set.of( @@ -78,7 +81,8 @@ public static Collection getLifeCycleTypes() { WORKFLOW_SUSPENDED, WORKFLOW_RESUMED, WORKFLOW_FAULTED, - WORKFLOW_CANCELLED); + WORKFLOW_CANCELLED, + WORKFLOW_STATUS_CHANGED); } @Override @@ -263,6 +267,23 @@ public void onWorkflowFailed(WorkflowFailedEvent event) { .build()); } + @Override + public void onWorkflowStatusChanged(WorkflowStatusEvent event) { + if (appl(event).isStatusChangePublishingEnabled()) { + publish( + event, + ev -> + builder() + .withData( + cloudEventData( + new WorkflowStatusCEDataEvent( + id(ev), ref(ev), ev.eventDate(), ev.status().toString()), + this::convert)) + .withType(WORKFLOW_STATUS_CHANGED) + .build()); + } + } + protected byte[] convert(WorkflowStartedCEData data) { return convertToBytes(data); } @@ -315,10 +336,14 @@ protected byte[] convert(TaskResumedCEData data) { return convertToBytes(data); } + protected byte[] convert(WorkflowStatusCEDataEvent data) { + return convertToBytes(data); + } + protected abstract byte[] convertToBytes(T data); protected void publish(T ev, Function ceFunction) { - WorkflowApplication appl = ev.workflowContext().definition().application(); + WorkflowApplication appl = appl(ev); if (appl.isLifeCycleCEPublishingEnabled()) { publish(appl, ceFunction.apply(ev)); } @@ -342,6 +367,10 @@ private static CloudEventBuilder builder() { .withTime(OffsetDateTime.now()); } + private static WorkflowApplication appl(WorkflowEvent ev) { + return ev.workflowContext().definition().application(); + } + private static String id(WorkflowEvent ev) { return ev.workflowContext().instanceData().id(); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowStatusCEDataEvent.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowStatusCEDataEvent.java new file mode 100644 index 000000000..645d2e10d --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowStatusCEDataEvent.java @@ -0,0 +1,21 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.lifecycle.ce; + +import java.time.OffsetDateTime; + +public record WorkflowStatusCEDataEvent( + String name, WorkflowDefinitionCEData definition, OffsetDateTime updatetAt, String status) {} diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/TraceExecutionListener.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/TraceExecutionListener.java index 679e8d564..9e75cd38f 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/TraceExecutionListener.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/TraceExecutionListener.java @@ -15,19 +15,16 @@ */ package io.serverlessworkflow.impl.test; -import io.serverlessworkflow.impl.lifecycle.TaskCancelledEvent; import io.serverlessworkflow.impl.lifecycle.TaskCompletedEvent; import io.serverlessworkflow.impl.lifecycle.TaskFailedEvent; -import io.serverlessworkflow.impl.lifecycle.TaskResumedEvent; import io.serverlessworkflow.impl.lifecycle.TaskRetriedEvent; import io.serverlessworkflow.impl.lifecycle.TaskStartedEvent; -import io.serverlessworkflow.impl.lifecycle.TaskSuspendedEvent; -import io.serverlessworkflow.impl.lifecycle.WorkflowCancelledEvent; import io.serverlessworkflow.impl.lifecycle.WorkflowCompletedEvent; import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionListener; import io.serverlessworkflow.impl.lifecycle.WorkflowFailedEvent; import io.serverlessworkflow.impl.lifecycle.WorkflowResumedEvent; import io.serverlessworkflow.impl.lifecycle.WorkflowStartedEvent; +import io.serverlessworkflow.impl.lifecycle.WorkflowStatusEvent; import io.serverlessworkflow.impl.lifecycle.WorkflowSuspendedEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,6 +33,7 @@ public class TraceExecutionListener implements WorkflowExecutionListener { private static final Logger logger = LoggerFactory.getLogger(TraceExecutionListener.class); + @Override public void onWorkflowStarted(WorkflowStartedEvent ev) { logger.info( "Workflow definition {} with id {} started at {} with data {}", @@ -45,6 +43,7 @@ public void onWorkflowStarted(WorkflowStartedEvent ev) { ev.workflowContext().instanceData().input()); } + @Override public void onWorkflowResumed(WorkflowResumedEvent ev) { logger.info( "Workflow definition {} with id {} resumed at {}", @@ -53,6 +52,7 @@ public void onWorkflowResumed(WorkflowResumedEvent ev) { ev.eventDate()); } + @Override public void onWorkflowSuspended(WorkflowSuspendedEvent ev) { logger.info( "Workflow definition {} with id {} suspended at {}", @@ -61,6 +61,7 @@ public void onWorkflowSuspended(WorkflowSuspendedEvent ev) { ev.eventDate()); } + @Override public void onWorkflowCompleted(WorkflowCompletedEvent ev) { logger.info( "Workflow definition {} with id {} completed at {}", @@ -69,6 +70,7 @@ public void onWorkflowCompleted(WorkflowCompletedEvent ev) { ev.eventDate()); } + @Override public void onWorkflowFailed(WorkflowFailedEvent ev) { logger.info( "Workflow definition {} with id {} failed at {}", @@ -78,8 +80,7 @@ public void onWorkflowFailed(WorkflowFailedEvent ev) { ev.cause()); } - public void onWorkflowCancelled(WorkflowCancelledEvent ev) {} - + @Override public void onTaskStarted(TaskStartedEvent ev) { logger.info( "Task {} started at {}, position {}", @@ -88,6 +89,7 @@ public void onTaskStarted(TaskStartedEvent ev) { ev.taskContext().position()); } + @Override public void onTaskCompleted(TaskCompletedEvent ev) { logger.info( "Task {} completed at {} with output {}", @@ -96,6 +98,7 @@ public void onTaskCompleted(TaskCompletedEvent ev) { ev.taskContext().output().asJavaObject()); } + @Override public void onTaskFailed(TaskFailedEvent ev) { logger.info( "Task {} failed at {}", @@ -105,12 +108,7 @@ public void onTaskFailed(TaskFailedEvent ev) { ev.cause()); } - public void onTaskCancelled(TaskCancelledEvent ev) {} - - public void onTaskSuspended(TaskSuspendedEvent ev) {} - - public void onTaskResumed(TaskResumedEvent ev) {} - + @Override public void onTaskRetried(TaskRetriedEvent ev) { logger.info( "Task {} retried at {}, position {}", @@ -118,4 +116,15 @@ public void onTaskRetried(TaskRetriedEvent ev) { ev.eventDate(), ev.taskContext().position()); } + + @Override + public void onWorkflowStatusChanged(WorkflowStatusEvent ev) { + logger.info( + "Workflow definition {} with id {} changed status from {} to {} at {}", + ev.workflowContext().definition().workflow().getDocument().getName(), + ev.workflowContext().instanceData().id(), + ev.previousStatus(), + ev.status(), + ev.eventDate()); + } }