This is the workflow prototype. The project would like to provide a centralized workflow management.
User can compose and monitor workflow execution. To do this, user need a set of @WorkflowOperations,
and (TODO bowei) a repository storing workflow dependency graph, and traverse state.
This is a maven project. Do it maven way.
- In application starting phase, in the logic of
WorkflowModule,OperationCompletionInterceptorintercepts completions of invocations of all the methods annotated byWorkflowOperation. Put the returned objects, methodCompletionDTOs, into a queue,FakeKinesis. In this phase, instances and bindings btw implementation and interfaces will be created. - When application is started, by supplying a list of
WorkflowArrangement, anOperationWeaveris created. The weaver stores: a mapping between operation names (java method names) and Entry<impl object instance,java.lang.reflect.Method>; a mapping between operation names (java method names) andGraphNode. Map<operation name,GraphNode> should match the corresponding Map<operation name, Entry<impl object instance,java.lang.reflect.Method>> Map sizes are the same. Keys are the same. - A list of
WorkflowArrangements is an list of edges stating that "If operation1 is completed, operation3 can be started if all requirements are met" "If operation2 is completed, operation3 can be started if all requirements are met". They are just directed edges in an acyclic graph. An acyclic graph will be created based on these workflow arrangements in theOperationWeaver - Use
OperationWeaverto create aWorkflowTraverse. - Invoke all methods whose indegrees are zero. Their returned object will be put into the queue,
by the
OperationCompletionInterceptor. - keep checking
workflowTraverse.isTraverseFinished(), if it has not finished, take anOperationCompletionMessagefrom the queue, and proceed the traverse by callingworkflowWeaver.proceedTraverse(workflowTraverse, operationCompletionMessage, workerThreadPool).
public class Demo {
public static void main(String[] args) {
// we should provide one thread pool for our workflow
ExecutorService workerThreadPool = Executors.newFixedThreadPool(3);
// we provide one timed trigger to simulate some event happened at some time
ScheduledExecutorService timedTrigger = Executors.newScheduledThreadPool(1);
try {
// create our weaver, weaver fetched workflow arrangement from server
OperationWeaver workflowWeaver =
new OperationWeaverImpl(mockFetchWorkflowArrangementFromServer());
System.out.println("finished initializing workflowWeaver. Program starts...\n\n");
// Suppose tenant will upload file in 2 seconds.
timedTrigger.schedule(() -> {
ETLOperations etlOperations = workflowWeaver.getInjector()
.getInstance(ETLOperations.class);
etlOperations.uploadFileOperation("demo-file.csv");
}, 2, TimeUnit.SECONDS);
// create one traverse for tenant : "testTenant", this is a traverse of 2017-02-19
WorkflowTraverse workflowTraverse =
workflowWeaver.createTraverse("testTenant", LocalDate.of(2017, 2, 19));
// let me prepare the message queue I am going to read
FakeKinesis fakeKinesis = workflowWeaver.getInjector()
.getInstance(FakeKinesis.class);
// start consuming message queue.
while (!workflowTraverse.isTraverseFinished()) {
// blocking take, if no message in queue, following line of code will block
OperationCompletionMessage operationCompletionMessage = fakeKinesis.take();
// now we have the message, we proceed traverse using this completion message
workflowWeaver.proceedTraverse(workflowTraverse,
operationCompletionMessage,
workerThreadPool);
}
} catch (Exception ex) {
ex.printStackTrace();
} finally {
// we have finished traverse, shutdown thread pool
workerThreadPool.shutdown();
timedTrigger.shutdownNow();
}
}
}TODO
TODO: Write history
TODO: Write credits
TODO: Write license