Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public class WorkflowApplication implements AutoCloseable {
private final EventConsumer<?, ?> eventConsumer;
private final Collection<EventPublisher> eventPublishers;
private final boolean lifeCycleCEPublishingEnabled;
private final boolean lifeCycleStatusChangeEnabled;
private final WorkflowModelFactory modelFactory;
private final WorkflowModelFactory contextFactory;
private final WorkflowScheduler scheduler;
Expand All @@ -95,6 +96,7 @@ private WorkflowApplication(Builder builder) {
this.eventConsumer = builder.eventConsumer;
this.eventPublishers = builder.eventPublishers;
this.lifeCycleCEPublishingEnabled = builder.lifeCycleCEPublishingEnabled;
this.lifeCycleStatusChangeEnabled = builder.lifeCycleStatusChangeEnabled;
this.modelFactory = builder.modelFactory;
this.contextFactory = builder.contextFactory;
this.scheduler = builder.scheduler;
Expand Down Expand Up @@ -179,6 +181,7 @@ public SchemaValidator getValidator(SchemaInline inline) {
private RuntimeDescriptorFactory descriptorFactory =
() -> new RuntimeDescriptor("reference impl", "1.0.0_alpha", Collections.emptyMap());
private boolean lifeCycleCEPublishingEnabled = true;
private boolean lifeCycleStatusChangeEnabled = true;
private WorkflowModelFactory modelFactory;
private WorkflowModelFactory contextFactory;
private Map<String, WorkflowAdditionalObject<?>> additionalObjects = new HashMap<>();
Expand Down Expand Up @@ -224,6 +227,11 @@ public Builder disableLifeCycleCEPublishing() {
return this;
}

public Builder disableStatusChangePublishing() {
this.lifeCycleStatusChangeEnabled = false;
return this;
}

public Builder withExecutorFactory(ExecutorServiceFactory executorFactory) {
this.executorFactory = executorFactory;
return this;
Expand Down Expand Up @@ -421,6 +429,10 @@ public boolean isLifeCycleCEPublishingEnabled() {
return lifeCycleCEPublishingEnabled;
}

public boolean isStatusChangePublishingEnabled() {
return lifeCycleStatusChangeEnabled;
}

public WorkflowScheduler scheduler() {
return scheduler;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.serverlessworkflow.impl.lifecycle.WorkflowFailedEvent;
import io.serverlessworkflow.impl.lifecycle.WorkflowResumedEvent;
import io.serverlessworkflow.impl.lifecycle.WorkflowStartedEvent;
import io.serverlessworkflow.impl.lifecycle.WorkflowStatusEvent;
import io.serverlessworkflow.impl.lifecycle.WorkflowSuspendedEvent;
import java.time.Instant;
import java.util.Map;
Expand All @@ -38,7 +39,7 @@

public class WorkflowMutableInstance implements WorkflowInstance {

protected final AtomicReference<WorkflowStatus> status;
private final AtomicReference<WorkflowStatus> status;
protected final String id;
protected final WorkflowModel input;

Expand Down Expand Up @@ -75,7 +76,7 @@ protected final CompletableFuture<WorkflowModel> startExecution(Runnable runnabl
if (future != null) {
return future;
}
status.set(WorkflowStatus.RUNNING);
status(WorkflowStatus.RUNNING);
runnable.run();
future =
TaskExecutorHelper.processTaskList(
Expand Down Expand Up @@ -106,7 +107,7 @@ private void whenCompleted(WorkflowModel result, Throwable ex) {

private void handleException(Throwable ex) {
if (!(ex instanceof CancellationException)) {
status.set(WorkflowStatus.FAULTED);
status(WorkflowStatus.FAULTED);
publishEvent(
workflowContext, l -> l.onWorkflowFailed(new WorkflowFailedEvent(workflowContext, ex)));
}
Expand All @@ -120,7 +121,7 @@ private WorkflowModel whenSuccess(WorkflowModel node) {
.map(f -> f.apply(workflowContext, null, node))
.orElse(node);
workflowContext.definition().outputSchemaValidator().ifPresent(v -> v.validate(output));
status.set(WorkflowStatus.COMPLETED);
status(WorkflowStatus.COMPLETED);
publishEvent(
workflowContext,
l -> l.onWorkflowCompleted(new WorkflowCompletedEvent(workflowContext, output)));
Expand Down Expand Up @@ -177,7 +178,14 @@ public <T> T outputAs(Class<T> clazz) {
}

public void status(WorkflowStatus state) {
this.status.set(state);
WorkflowStatus prevState = this.status.getAndSet(state);
if (prevState != state) {
publishEvent(
workflowContext,
l ->
l.onWorkflowStatusChanged(
new WorkflowStatusEvent(workflowContext, prevState, state)));
}
}

@Override
Expand Down Expand Up @@ -213,7 +221,7 @@ public boolean suspend() {

protected final void internalSuspend() {
suspended = new ConcurrentHashMap<>();
status.set(WorkflowStatus.SUSPENDED);
status(WorkflowStatus.SUSPENDED);
}

@Override
Expand Down Expand Up @@ -259,7 +267,7 @@ public CompletableFuture<TaskContext> suspendedCheck(TaskContext t) {
suspended.put(suspendedTask, t);
return suspendedTask;
} else if (TaskExecutorHelper.isActive(status.get())) {
status.set(WorkflowStatus.RUNNING);
status(WorkflowStatus.RUNNING);
}
} finally {
statusLock.unlock();
Expand All @@ -272,7 +280,7 @@ public boolean cancel() {
try {
statusLock.lock();
if (TaskExecutorHelper.isActive(status.get())) {
status.set(WorkflowStatus.CANCELLED);
status(WorkflowStatus.CANCELLED);
publishEvent(
workflowContext,
l -> l.onWorkflowCancelled(new WorkflowCancelledEvent(workflowContext)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ default void onTaskResumed(TaskResumedEvent ev) {}

default void onTaskRetried(TaskRetriedEvent ev) {}

default void onWorkflowStatusChanged(WorkflowStatusEvent ev) {}

@Override
default void close() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.lifecycle;

import io.serverlessworkflow.impl.WorkflowContextData;
import io.serverlessworkflow.impl.WorkflowStatus;

public class WorkflowStatusEvent extends WorkflowEvent {

private final WorkflowStatus status;
private final WorkflowStatus prevStatus;

public WorkflowStatusEvent(
WorkflowContextData workflow, WorkflowStatus prevStatus, WorkflowStatus status) {
super(workflow);
this.status = status;
this.prevStatus = prevStatus;
}

public WorkflowStatus status() {
return status;
}

public WorkflowStatus previousStatus() {
return prevStatus;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import io.serverlessworkflow.impl.lifecycle.WorkflowFailedEvent;
import io.serverlessworkflow.impl.lifecycle.WorkflowResumedEvent;
import io.serverlessworkflow.impl.lifecycle.WorkflowStartedEvent;
import io.serverlessworkflow.impl.lifecycle.WorkflowStatusEvent;
import io.serverlessworkflow.impl.lifecycle.WorkflowSuspendedEvent;
import java.time.OffsetDateTime;
import java.util.Collection;
Expand All @@ -63,6 +64,8 @@ public abstract class AbstractLifeCyclePublisher implements WorkflowExecutionLis
private static final String WORKFLOW_RESUMED = "io.serverlessworkflow.workflow.resumed.v1";
private static final String WORKFLOW_FAULTED = "io.serverlessworkflow.workflow.faulted.v1";
private static final String WORKFLOW_CANCELLED = "io.serverlessworkflow.workflow.cancelled.v1";
private static final String WORKFLOW_STATUS_CHANGED =
"io.serverlessworkflow.workflow.status-changed.v1";

public static Collection<String> getLifeCycleTypes() {
return Set.of(
Expand All @@ -78,7 +81,8 @@ public static Collection<String> getLifeCycleTypes() {
WORKFLOW_SUSPENDED,
WORKFLOW_RESUMED,
WORKFLOW_FAULTED,
WORKFLOW_CANCELLED);
WORKFLOW_CANCELLED,
WORKFLOW_STATUS_CHANGED);
}

@Override
Expand Down Expand Up @@ -263,6 +267,23 @@ public void onWorkflowFailed(WorkflowFailedEvent event) {
.build());
}

@Override
public void onWorkflowStatusChanged(WorkflowStatusEvent event) {
if (appl(event).isStatusChangePublishingEnabled()) {
publish(
event,
ev ->
builder()
.withData(
cloudEventData(
new WorkflowStatusCEDataEvent(
id(ev), ref(ev), ev.eventDate(), ev.status().toString()),
this::convert))
.withType(WORKFLOW_STATUS_CHANGED)
.build());
}
}

protected byte[] convert(WorkflowStartedCEData data) {
return convertToBytes(data);
}
Expand Down Expand Up @@ -315,10 +336,14 @@ protected byte[] convert(TaskResumedCEData data) {
return convertToBytes(data);
}

protected byte[] convert(WorkflowStatusCEDataEvent data) {
return convertToBytes(data);
}

protected abstract <T> byte[] convertToBytes(T data);

protected <T extends WorkflowEvent> void publish(T ev, Function<T, CloudEvent> ceFunction) {
WorkflowApplication appl = ev.workflowContext().definition().application();
WorkflowApplication appl = appl(ev);
if (appl.isLifeCycleCEPublishingEnabled()) {
publish(appl, ceFunction.apply(ev));
}
Expand All @@ -342,6 +367,10 @@ private static CloudEventBuilder builder() {
.withTime(OffsetDateTime.now());
}

private static WorkflowApplication appl(WorkflowEvent ev) {
return ev.workflowContext().definition().application();
}

private static String id(WorkflowEvent ev) {
return ev.workflowContext().instanceData().id();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.lifecycle.ce;

import java.time.OffsetDateTime;

public record WorkflowStatusCEDataEvent(
String name, WorkflowDefinitionCEData definition, OffsetDateTime updatetAt, String status) {}
Loading