Skip to content

codaxy/conductor-sharp

Repository files navigation

ConductorSharp

A comprehensive .NET client library for Conductor workflow orchestration engine. Features a strongly-typed workflow builder DSL, task handlers, and quality-of-life additions for building robust workflow applications.

NuGet License: MIT

Note: This documentation has been AI generated and human reviewed.

AI Assistant Users: See SKILL.md for a condensed reference guide optimized for AI coding assistants. It provides quick-reference documentation for all task types, configuration options, and common patterns. This file follows the Agent Skills open standard for extending AI assistant capabilities.

Table of Contents

Installation

Core Packages

# API client for Conductor
dotnet add package ConductorSharp.Client

# Workflow engine with builder DSL, task handlers, and worker scheduling
dotnet add package ConductorSharp.Engine

Additional Packages

# Built-in tasks (WaitSeconds, ReadWorkflowTasks, C# Lambda Tasks)
dotnet add package ConductorSharp.Patterns

# Kafka-based task cancellation notifications
dotnet add package ConductorSharp.KafkaCancellationNotifier

# CLI tool for scaffolding task/workflow definitions
dotnet tool install --global ConductorSharp.Toolkit

Quick Start

1. Configure Services

using ConductorSharp.Engine.Extensions;
using Microsoft.Extensions.Hosting;

var builder = Host.CreateApplicationBuilder(args);
builder.Services
    .AddConductorSharp(baseUrl: "http://localhost:8080")
    .AddExecutionManager(
        maxConcurrentWorkers: 10,
        sleepInterval: 500,
        longPollInterval: 100,
        domain: null,
        typeof(Program).Assembly
    )
    .AddPipelines(pipelines =>
    {
        pipelines.AddRequestResponseLogging();
        pipelines.AddValidation();
    });

builder.Services.RegisterWorkflow<MyWorkflow>();

var host = builder.Build();
await host.RunAsync();

2. Define a Task Handler

using ConductorSharp.Engine.Builders.Metadata;
using ConductorSharp.Engine;

public class PrepareEmailRequest : IRequest<PrepareEmailResponse>
{
    public string CustomerName { get; set; }
    public string Address { get; set; }
}

public class PrepareEmailResponse
{
    public string EmailBody { get; set; }
}

[OriginalName("EMAIL_prepare")]
public class PrepareEmailHandler : TaskRequestHandler<PrepareEmailRequest, PrepareEmailResponse>
{
    public override async Task<PrepareEmailResponse> Handle(PrepareEmailRequest request, CancellationToken cancellationToken)
    {
        var body = $"Hello {request.CustomerName} at {request.Address}!";
        return new PrepareEmailResponse { EmailBody = body };
    }
}

3. Define a Workflow

using ConductorSharp.Engine.Builders;
using ConductorSharp.Engine.Builders.Metadata;

public class SendNotificationInput : WorkflowInput<SendNotificationOutput>
{
    public int CustomerId { get; set; }
}

public class SendNotificationOutput : WorkflowOutput
{
    public string EmailBody { get; set; }
}

[OriginalName("NOTIFICATION_send")]
[WorkflowMetadata(OwnerEmail = "team@example.com")]
public class SendNotificationWorkflow : Workflow<SendNotificationWorkflow, SendNotificationInput, SendNotificationOutput>
{
    public SendNotificationWorkflow(
        WorkflowDefinitionBuilder<SendNotificationWorkflow, SendNotificationInput, SendNotificationOutput> builder
    ) : base(builder) { }

    public GetCustomerHandler GetCustomer { get; set; }
    public PrepareEmailHandler PrepareEmail { get; set; }

    public override void BuildDefinition()
    {
        _builder.AddTask(
            wf => wf.GetCustomer,
            wf => new GetCustomerRequest { CustomerId = wf.WorkflowInput.CustomerId }
        );

        _builder.AddTask(
            wf => wf.PrepareEmail,
            wf => new PrepareEmailRequest 
            { 
                CustomerName = wf.GetCustomer.Output.Name,
                Address = wf.GetCustomer.Output.Address 
            }
        );

        _builder.SetOutput(wf => new SendNotificationOutput
        {
            EmailBody = wf.PrepareEmail.Output.EmailBody
        });
    }
}

Core Concepts

Workflow Definition

Workflows are defined by inheriting from Workflow<TWorkflow, TInput, TOutput>:

public class MyWorkflow : Workflow<MyWorkflow, MyWorkflowInput, MyWorkflowOutput>
{
    public MyWorkflow(WorkflowDefinitionBuilder<MyWorkflow, MyWorkflowInput, MyWorkflowOutput> builder) 
        : base(builder) { }

    // Task properties - these become task references in the workflow
    public SomeTaskHandler FirstTask { get; set; }
    public AnotherTaskHandler SecondTask { get; set; }

    public override void BuildDefinition()
    {
        // Add tasks with strongly-typed input expressions
        _builder.AddTask(wf => wf.FirstTask, wf => new SomeTaskRequest { Input = wf.WorkflowInput.SomeValue });
        _builder.AddTask(wf => wf.SecondTask, wf => new AnotherTaskRequest { Input = wf.FirstTask.Output.Result });
        
        // Set workflow output
        _builder.SetOutput(wf => new MyWorkflowOutput { Result = wf.SecondTask.Output.Value });
    }
}

Task Handlers

[OriginalName("MY_TASK_name")]
public class MyTaskHandler : TaskRequestHandler<MyTaskRequest, MyTaskResponse>
{
    public override async Task<MyTaskResponse> Handle(MyTaskRequest request, CancellationToken cancellationToken)
    {
        return new MyTaskResponse { /* ... */ };
    }
}

Input/Output Models

// Workflow I/O
public class MyWorkflowInput : WorkflowInput<MyWorkflowOutput>
{
    public string CustomerId { get; set; }
}

public class MyWorkflowOutput : WorkflowOutput
{
    public string Result { get; set; }
}

// Task I/O
public class MyTaskRequest : IRequest<MyTaskResponse>
{
    [Required]
    public string InputValue { get; set; }
}

public class MyTaskResponse
{
    public string OutputValue { get; set; }
}

Task Input Specification

In Conductor, task inputs in workflows are specified using Conductor expressions with the format: ${SOURCE.input/output.JSONPath}. The SOURCE can be workflow or a task reference name in the workflow definition. input/output refers to the input of the workflow or output of the task. JSONPath is used to traverse the input/output object.

ConductorSharp generates these expressions automatically when writing workflows. Here's an example:

_builder.AddTask(
    wf => wf.PrepareEmail,
    wf => new PrepareEmailRequest
    {
        CustomerName = $"{wf.GetCustomer.Output.FirstName} {wf.GetCustomer.Output.LastName}",
        Address = wf.WorkflowInput.Address
    }
);

This is converted to the following Conductor input parameters specification:

"inputParameters": {
    "customer_name": "${get_customer.output.first_name} ${get_customer.output.last_name}",
    "address": "${workflow.input.address}"
}

Casting

When input/output parameters are of different types, casting can be used:

wf => new PrepareEmailRequest
{
    CustomerName = ((FullName)wf.GetCustomer.Output.Name).FirstName,
    Address = (string)wf.GetCustomer.Output.Address
}

This translates to:

"inputParameters": {
    "customer_name": "${get_customer.output.name.first_name}",
    "address": "${get_customer.output.address}"
}

Array Initialization

Array initialization is supported. Arrays can be typed or dynamic:

wf => new()
{
    Integers = new[] { 1, 2, 3 },
    TestModelList = new List<ArrayTaskInput.TestModel>
    {
        new ArrayTaskInput.TestModel { String = wf.Input.TestValue },
        new ArrayTaskInput.TestModel { String = "List2" }
    },
    Models = new[]
    {
        new ArrayTaskInput.TestModel { String = "Test1" },
        new ArrayTaskInput.TestModel { String = "Test2" }
    },
    Objects = new dynamic[] { new { AnonymousObjProp = "Prop" }, new { Test = "Prop" } }
}

This translates to:

"inputParameters": {
    "integers": [1, 2, 3],
    "test_model_list": [
        {
            "string": "${workflow.input.test_value}"
        },
        {
            "string": "List2"
        }
    ],
    "models": [
        {
            "string": "Test1"
        },
        {
            "string": "Test2"
        }
    ],
    "objects": [
        {
            "anonymous_obj_prop": "Prop"
        },
        {
            "test": "Prop"
        }
    ]
}

Object Initialization

Object initialization is supported, including anonymous objects when initializing sub-properties:

wf => new()
{
    NestedObjects = new TestModel
    {
        Integer = 1,
        String = "test",
        Object = new TestModel
        {
            Integer = 1,
            String = "string",
            Object = new { NestedInput = "1" }
        }
    }
}

This translates to:

"inputParameters": {
    "nested_objects": {
        "integer": 1,
        "string": "test",
        "object": {
            "integer": 1,
            "string": "string",
            "object": {
                "nested_input": "1"
            }
        }
    }
}

Indexing

Dictionary indexing is supported. Indexing using an indexer on arbitrary types is currently not supported:

wf => new()
{
    CustomerName = wf.WorkflowInput.Dictionary["test"].CustomerName,
    Address = wf.WorkflowInput.DoubleDictionary["test"]["address"]
}

This translates to:

"inputParameters": {
    "customer_name": "${workflow.input.dictionary['test'].customer_name}",
    "address": "${workflow.input.double_dictionary['test']['address']}"
}

Workflow Name

You can embed the name of any workflow in task input specification using NamingUtil.NameOf<T>():

wf => new()
{
    Name = $"Workflow name: {NamingUtil.NameOf<StringInterpolation>()}",
    WfName = NamingUtil.NameOf<StringInterpolation>()
}

This translates to:

"inputParameters": {
    "name": "Workflow name: TEST_StringInterpolation",
    "wf_name": "TEST_StringInterpolation"
}

Note: StringInterpolation has an attribute [OriginalName("TEST_StringInterpolation")] applied.

String Concatenation

String concatenation is supported. You can concatenate strings with numbers, input/output parameters, and interpolation strings:

wf => new()
{
    Input = 1
        + "Str_"
        + "2Str_"
        + wf.WorkflowInput.Input
        + $"My input: {wf.WorkflowInput.Input}"
        + NamingUtil.NameOf<StringAddition>()
        + 1
}

This translates to:

"inputParameters": {
    "input": "1Str_2Str_${workflow.input.input}My input: ${workflow.input.input}string_addition1"
}

Note: StringAddition has an attribute [OriginalName("string_addition")] applied.

Metadata Attributes

Attribute Target Description
[OriginalName("NAME")] Class Custom task/workflow name in Conductor
[WorkflowMetadata(...)] Class Workflow metadata (OwnerEmail, OwnerApp, Description, FailureWorkflow)
[Version(n)] Class Version number for sub-workflow references
[TaskDomain("domain")] Class Assign task to specific domain

Note: There is no task equivalent of the WorkflowMetadata attribute. The task metadata is configured when registering the task:

services.RegisterWorkerTask<MyTaskHandler>(options =>
{
    options.OwnerEmail = "team@example.com";
    options.Description = "My task description";
});

Task Types

Simple Task

_builder.AddTask(wf => wf.MySimpleTask, wf => new MySimpleTaskRequest { Input = wf.WorkflowInput.Value });

Sub-Workflow Task

public SubWorkflowTaskModel<ChildWorkflowInput, ChildWorkflowOutput> ChildWorkflow { get; set; }

_builder.AddTask(wf => wf.ChildWorkflow, wf => new ChildWorkflowInput { CustomerId = wf.WorkflowInput.CustomerId });

Switch Task (Conditional Branching)

public SwitchTaskModel SwitchTask { get; set; }
public TaskA TaskInCaseA { get; set; }
public TaskB TaskInCaseB { get; set; }

_builder.AddTask(
    wf => wf.SwitchTask,
    wf => new SwitchTaskInput { SwitchCaseValue = wf.WorkflowInput.Operation },
    new DecisionCases<MyWorkflow>
    {
        ["caseA"] = builder => builder.AddTask(wf => wf.TaskInCaseA, wf => new TaskARequest { }),
        ["caseB"] = builder => builder.AddTask(wf => wf.TaskInCaseB, wf => new TaskBRequest { }),
        DefaultCase = builder => { /* default case tasks */ }
    }
);

Dynamic Task

public DynamicTaskModel<ExpectedInput, ExpectedOutput> DynamicHandler { get; set; }

_builder.AddTask(
    wf => wf.DynamicHandler,
    wf => new DynamicTaskInput<ExpectedInput, ExpectedOutput>
    {
        TaskInput = new ExpectedInput { CustomerId = wf.WorkflowInput.CustomerId },
        TaskToExecute = wf.WorkflowInput.TaskName  // Task name resolved at runtime
    }
);

Dynamic Fork-Join Task

public DynamicForkJoinTaskModel DynamicFork { get; set; }

_builder.AddTask(
    wf => wf.DynamicFork,
    wf => new DynamicForkJoinInput
    {
        DynamicTasks = /* list of tasks */,
        DynamicTasksInput = /* corresponding inputs */
    }
);

Do-While Loop Task

public DoWhileTaskModel DoWhile { get; set; }
public CustomerGetHandler GetCustomer { get; set; }

_builder.AddTask(
    wf => wf.DoWhile,
    wf => new DoWhileInput { Value = wf.WorkflowInput.Loops },
    "$.do_while.iteration < $.value",  // Loop condition
    builder =>
    {
        builder.AddTask(wf => wf.GetCustomer, wf => new CustomerGetRequest { CustomerId = "CUSTOMER-1" });
    }
);

Note: ConductorSharp does not provide a strongly typed output for the DoWhile task, as can be seen from the implementation:

public class DoWhileTaskModel : TaskModel<DoWhileInput, NoOutput>
{
}

Lambda Task (JavaScript)

public class LambdaInput : IRequest<LambdaOutput>
{
    public string Value { get; set; }
}

public class LambdaOutput
{
    public string Something { get; set; }
}

public LambdaTaskModel<LambdaInput, LambdaOutput> LambdaTask { get; set; }


_builder.AddTask(
    wf => wf.LambdaTask,
    wf => new LambdaInput { Value = wf.WorkflowInput.Input },
    script: "return { something: $.Value.toUpperCase() }"  // JavaScript expression
);

For context, in the above parameterized generic class LambdaTaskModel, the LambdaOutput instance is available as Output.Result.Something. This is less than ideal, but is the current way of things. Reasoning can be seen in the implementation:

public abstract class LambdaOutputModel<O>
{
  public O Result { get; set; }
}

public abstract class LambdaTaskModel<I, O> where I : IRequest<O>
{
  public I Input { get; set; }

  public LambdaOutputModel<O> Output { get; set; }
}

Wait Task

public WaitTaskModel WaitTask { get; set; }

_builder.AddTask(
    wf => wf.WaitTask,
    wf => new WaitTaskInput { Duration = "1h" }  // or Until = "2024-01-01T00:00:00Z"
);

Terminate Task

public TerminateTaskModel TerminateTask { get; set; }

_builder.AddTask(
    wf => wf.TerminateTask,
    wf => new TerminateTaskInput
    {
        TerminationStatus = "COMPLETED",
        WorkflowOutput = new { Result = "Done" }
    }
);

Human Task

public HumanTaskModel<HumanTaskOutput> HumanTask { get; set; }

_builder.AddTask(
    wf => wf.HumanTask,
    wf => new HumanTaskInput<HumanTaskOutput> { /* ... */ }
);

JSON JQ Transform Task

public JsonJqTransformTaskModel<JqInput, JqOutput> TransformTask { get; set; }

_builder.AddTask(
    wf => wf.TransformTask,
    wf => new JqInput { QueryExpression = ".data | map(.name)", Data = wf.WorkflowInput.Items }
);

PassThrough Task (Raw Definition)

For tasks not covered by the builder:

_builder.AddTasks(new WorkflowTask
{
    Name = "CUSTOM_task",
    TaskReferenceName = "custom_ref",
    Type = "CUSTOM",
    InputParameters = new Dictionary<string, object> { ["key"] = "value" }
});

Optional Tasks

Mark tasks as optional (workflow continues on failure):

_builder.AddTask(wf => wf.OptionalTask, wf => new OptionalTaskRequest { }).AsOptional();

Configuration

Execution Manager

services
    .AddConductorSharp(baseUrl: "http://localhost:8080")
    .AddExecutionManager(
        maxConcurrentWorkers: 10,    // Max concurrent task executions
        sleepInterval: 500,          // Base polling interval (ms)
        longPollInterval: 100,       // Long poll timeout (ms)
        domain: "my-domain",         // Optional worker domain
        typeof(Program).Assembly     // Assemblies containing handlers
    );

Multiple Conductor Instances

services
    .AddConductorSharp(baseUrl: "http://primary-conductor:8080")
    .AddAlternateClient(
        baseUrl: "http://secondary-conductor:8080",
        key: "Secondary",
        apiPath: "api",
        ignoreInvalidCertificate: false
    );

// Usage with keyed services
public class MyController(
    IWorkflowService primaryService,
    [FromKeyedServices("Secondary")] IWorkflowService secondaryService
) { }

Poll Timing Strategies

// Default: Inverse exponential backoff
.AddExecutionManager(...)

// Constant interval polling
.AddExecutionManager(...)
.UseConstantPollTimingStrategy()

Worker Task Registration

Register standalone tasks without workflow:

services.RegisterWorkerTask<MyTaskHandler>(options =>
{
    options.OwnerEmail = "team@example.com";
    options.Description = "My task description";
});

Pipeline Behaviors

Behaviors form a middleware pipeline for task execution (powered by MediatR):

.AddPipelines(pipelines =>
{
    // Add custom behavior (runs first)
    pipelines.AddCustomBehavior(typeof(MyCustomBehavior<,>));
    
    // Built-in behaviors
    pipelines.AddExecutionTaskTracking();  // Track task execution metrics
    pipelines.AddContextLogging();         // Add context to log scopes
    pipelines.AddRequestResponseLogging(); // Log requests/responses
    pipelines.AddValidation();             // Validate using DataAnnotations
})

Custom Behavior Example

public class TimingBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse>
{
    public async Task<TResponse> Handle(
        TRequest request, 
        RequestHandlerDelegate<TResponse> next, 
        CancellationToken cancellationToken)
    {
        var sw = Stopwatch.StartNew();
        var response = await next();
        Console.WriteLine($"Execution took {sw.ElapsedMilliseconds}ms");
        return response;
    }
}

Health Checks

ASP.NET Core Integration

// In Program.cs
builder.Services.AddHealthChecks()
    .AddCheck<ConductorSharpHealthCheck>("conductor-worker");

// Configure health service
.AddExecutionManager(...)
.SetHealthCheckService<FileHealthService>()  // or InMemoryHealthService

Available Health Services

Service Description
InMemoryHealthService In-memory health state (default)
FileHealthService Persists health to CONDUCTORSHARP_HEALTH.json file

Execution Context

Access workflow/task metadata in handlers:

public class MyHandler : TaskRequestHandler<MyRequest, MyResponse>
{
    private readonly ConductorSharpExecutionContext _context;

    public MyHandler(ConductorSharpExecutionContext context)
    {
        _context = context;
    }

    public override async Task<MyResponse> Handle(MyRequest request, CancellationToken cancellationToken)
    {
        var workflowId = _context.WorkflowId;
        var taskId = _context.TaskId;
        var correlationId = _context.CorrelationId;
        // ...
    }
}

Patterns Package

Additional built-in tasks and utilities:

.AddExecutionManager(...)
.AddConductorSharpPatterns()      // Adds WaitSeconds, ReadWorkflowTasks
.AddCSharpLambdaTasks()           // Adds C# lambda task support

WaitSeconds Task

public WaitSeconds WaitTask { get; set; }

_builder.AddTask(wf => wf.WaitTask, wf => new WaitSecondsRequest { Seconds = 30 });

ReadWorkflowTasks Task

Read task data from another workflow:

public ReadWorkflowTasks ReadTasks { get; set; }

_builder.AddTask(
    wf => wf.ReadTasks,
    wf => new ReadWorkflowTasksInput 
    { 
        WorkflowId = wf.WorkflowInput.TargetWorkflowId,
        TaskNames = "task1,task2"  // Comma-separated reference names
    }
);

C# Lambda Tasks

Execute C# code inline in workflows:

public CSharpLambdaTaskModel<LambdaInput, LambdaOutput> InlineLambda { get; set; }

_builder.AddTask(
    wf => wf.InlineLambda,
    wf => new LambdaInput { Value = wf.WorkflowInput.Input },
    input => new LambdaOutput { Result = input.Value.ToUpperInvariant() }
);

Kafka Cancellation Notifier

Handle task cancellation via Kafka events:

.AddExecutionManager(...)
.AddKafkaCancellationNotifier(
    kafkaBootstrapServers: "localhost:9092",
    topicName: "conductor.status.task",
    groupId: "my-worker-group",
    createTopicOnStartup: true
)

appsettings.json:

{
  "Conductor": {
    "BaseUrl": "http://localhost:8080",
    "MaxConcurrentWorkers": 10,
    "SleepInterval": 500,
    "LongPollInterval": 100,
    "KafkaCancellationNotifier": {
      "BootstrapServers": "localhost:9092",
      "GroupId": "my-worker",
      "TopicName": "conductor.status.task"
    }
  }
}

Toolkit CLI

Generate C# models from existing Conductor task/workflow definitions.

Installation

dotnet tool install --global ConductorSharp.Toolkit --version 3.0.1-beta3

Configuration

Create conductorsharp.yaml:

baseUrl: http://localhost:8080
apiPath: api
namespace: MyApp.Generated
destination: ./Generated

Usage

# Scaffold all tasks and workflows
dotnet-conductorsharp

# Use custom config file
dotnet-conductorsharp -f myconfig.yaml

# Filter by name
dotnet-conductorsharp -n CUSTOMER_get -n ORDER_create

# Filter by owner email
dotnet-conductorsharp -e team@example.com

# Filter by owner app
dotnet-conductorsharp -a my-application

# Skip tasks or workflows
dotnet-conductorsharp --no-tasks
dotnet-conductorsharp --no-workflows

# Preview without generating files
dotnet-conductorsharp --dry-run

Command Options

Option Description
-f, --file Configuration file path (default: conductorsharp.yaml)
-n, --name Filter by task/workflow name (can specify multiple)
-a, --app Filter by owner app
-e, --email Filter by owner email
--no-tasks Skip task scaffolding
--no-workflows Skip workflow scaffolding
--dry-run Preview what would be generated

API Services

Inject these services to interact with Conductor programmatically:

Service Description
IWorkflowService Start, pause, resume, terminate workflows
ITaskService Update tasks, get logs, poll for tasks
IMetadataService Manage workflow/task definitions
IAdminService Admin operations, queue management
IEventService Event handlers
IQueueAdminService Queue administration
IWorkflowBulkService Bulk workflow operations
IHealthService Conductor server health
IExternalPayloadService External payload storage

Example Usage

public class WorkflowController : ControllerBase
{
    private readonly IWorkflowService _workflowService;
    private readonly IMetadataService _metadataService;

    public WorkflowController(IWorkflowService workflowService, IMetadataService metadataService)
    {
        _workflowService = workflowService;
        _metadataService = metadataService;
    }

    [HttpPost("start")]
    public async Task<string> StartWorkflow([FromBody] StartRequest request)
    {
        return await _workflowService.StartAsync(new StartWorkflowRequest
        {
            Name = "MY_workflow",
            Version = 1,
            Input = new Dictionary<string, object> { ["customerId"] = request.CustomerId }
        });
    }

    [HttpGet("definitions")]
    public async Task<ICollection<WorkflowDef>> GetDefinitions()
    {
        return await _metadataService.ListWorkflowsAsync();
    }
}

Running the Examples

Prerequisites

  1. Clone and run Conductor:

    git clone https://github.com/conductor-oss/conductor.git
    cd conductor
    docker-compose up -d
  2. Conductor UI available at: http://localhost:5000 (may vary by version)

Starting the Examples

The solution includes three example projects:

Project Description
ConductorSharp.Definitions Console app with workflow definitions
ConductorSharp.ApiEnabled Web API with workflow execution endpoints
ConductorSharp.NoApi Console app with Kafka cancellation support
# Run with Docker Compose
docker-compose up

# Or run individual projects
cd examples/ConductorSharp.Definitions
dotnet run

General Notes

Events Not Supported

The Conductor events are currently not supported by the library.

License

MIT License - see LICENSE for details.

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

About

No description, website, or topics provided.

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Contributors 8