A comprehensive .NET client library for the Conductor workflow orchestration engine. Features a strongly-typed workflow builder DSL, task handlers, and quality-of-life additions for building robust workflow applications.
Note: This documentation has been AI-generated and human-reviewed.
AI Assistant Users: See SKILL.md for a condensed reference guide optimized for AI coding assistants. It provides quick-reference documentation for all task types, configuration options, and common patterns. This file follows the Agent Skills open standard for extending AI assistant capabilities.
- Installation
- Quick Start
- Core Concepts
- Task Types
- Configuration
- Pipeline Behaviors
- Health Checks
- Patterns Package
- Kafka Cancellation Notifier
- Toolkit CLI
- API Services
- Running the Examples
- General Notes
# API client for Conductor
dotnet add package ConductorSharp.Client
# Workflow engine with builder DSL, task handlers, and worker scheduling
dotnet add package ConductorSharp.Engine
# Built-in tasks (WaitSeconds, ReadWorkflowTasks, C# Lambda Tasks)
dotnet add package ConductorSharp.Patterns
# Kafka-based task cancellation notifications
dotnet add package ConductorSharp.KafkaCancellationNotifier
# CLI tool for scaffolding task/workflow definitions
dotnet tool install --global ConductorSharp.Toolkit
using ConductorSharp.Engine.Extensions;
using Microsoft.Extensions.Hosting;
var builder = Host.CreateApplicationBuilder(args);
builder.Services
.AddConductorSharp(baseUrl: "http://localhost:8080")
.AddExecutionManager(
maxConcurrentWorkers: 10,
sleepInterval: 500,
longPollInterval: 100,
domain: null,
typeof(Program).Assembly
)
.AddPipelines(pipelines =>
{
pipelines.AddRequestResponseLogging();
pipelines.AddValidation();
});
builder.Services.RegisterWorkflow<MyWorkflow>();
var host = builder.Build();
await host.RunAsync();
using ConductorSharp.Engine.Builders.Metadata;
using ConductorSharp.Engine;
public class PrepareEmailRequest : IRequest<PrepareEmailResponse>
{
public string CustomerName { get; set; }
public string Address { get; set; }
}
public class PrepareEmailResponse
{
public string EmailBody { get; set; }
}
[OriginalName("EMAIL_prepare")]
public class PrepareEmailHandler : TaskRequestHandler<PrepareEmailRequest, PrepareEmailResponse>
{
/// <summary>
/// Builds the e-mail body from the customer name and address carried by the request.
/// </summary>
/// <param name="request">Supplies <c>CustomerName</c> and <c>Address</c> for the greeting.</param>
/// <param name="cancellationToken">Not observed; the handler does no asynchronous work.</param>
/// <returns>A completed task whose result carries the generated <c>EmailBody</c>.</returns>
public override Task<PrepareEmailResponse> Handle(PrepareEmailRequest request, CancellationToken cancellationToken)
{
    // Nothing is awaited here, so return a completed task rather than marking the
    // method async (an async method without await raises compiler warning CS1998).
    var body = $"Hello {request.CustomerName} at {request.Address}!";
    return Task.FromResult(new PrepareEmailResponse { EmailBody = body });
}
}
using ConductorSharp.Engine.Builders;
using ConductorSharp.Engine.Builders.Metadata;
public class SendNotificationInput : WorkflowInput<SendNotificationOutput>
{
public int CustomerId { get; set; }
}
public class SendNotificationOutput : WorkflowOutput
{
public string EmailBody { get; set; }
}
[OriginalName("NOTIFICATION_send")]
[WorkflowMetadata(OwnerEmail = "team@example.com")]
public class SendNotificationWorkflow : Workflow<SendNotificationWorkflow, SendNotificationInput, SendNotificationOutput>
{
public SendNotificationWorkflow(
WorkflowDefinitionBuilder<SendNotificationWorkflow, SendNotificationInput, SendNotificationOutput> builder
) : base(builder) { }
public GetCustomerHandler GetCustomer { get; set; }
public PrepareEmailHandler PrepareEmail { get; set; }
// Declares the workflow: two tasks in sequence followed by the output mapping.
public override void BuildDefinition()
{
// First task: GetCustomer, fed with the CustomerId taken from the workflow input.
_builder.AddTask(
wf => wf.GetCustomer,
wf => new GetCustomerRequest { CustomerId = wf.WorkflowInput.CustomerId }
);
// Second task: PrepareEmail, whose inputs are read from the GetCustomer task output.
_builder.AddTask(
wf => wf.PrepareEmail,
wf => new PrepareEmailRequest
{
CustomerName = wf.GetCustomer.Output.Name,
Address = wf.GetCustomer.Output.Address
}
);
// Workflow output: expose the e-mail body produced by PrepareEmail.
_builder.SetOutput(wf => new SendNotificationOutput
{
EmailBody = wf.PrepareEmail.Output.EmailBody
});
}
}
Workflows are defined by inheriting from Workflow<TWorkflow, TInput, TOutput>:
public class MyWorkflow : Workflow<MyWorkflow, MyWorkflowInput, MyWorkflowOutput>
{
public MyWorkflow(WorkflowDefinitionBuilder<MyWorkflow, MyWorkflowInput, MyWorkflowOutput> builder)
: base(builder) { }
// Task properties - these become task references in the workflow
public SomeTaskHandler FirstTask { get; set; }
public AnotherTaskHandler SecondTask { get; set; }
public override void BuildDefinition()
{
// Add tasks with strongly-typed input expressions
_builder.AddTask(wf => wf.FirstTask, wf => new SomeTaskRequest { Input = wf.WorkflowInput.SomeValue });
_builder.AddTask(wf => wf.SecondTask, wf => new AnotherTaskRequest { Input = wf.FirstTask.Output.Result });
// Set workflow output
_builder.SetOutput(wf => new MyWorkflowOutput { Result = wf.SecondTask.Output.Value });
}
}
[OriginalName("MY_TASK_name")]
public class MyTaskHandler : TaskRequestHandler<MyTaskRequest, MyTaskResponse>
{
/// <summary>
/// Handles one execution of the task and produces its response.
/// </summary>
/// <param name="request">The task input.</param>
/// <param name="cancellationToken">Not observed in this minimal example.</param>
/// <returns>A completed task carrying the task response.</returns>
public override Task<MyTaskResponse> Handle(MyTaskRequest request, CancellationToken cancellationToken)
{
    // Nothing is awaited, so return a completed task instead of declaring the
    // method async (an async method without await raises compiler warning CS1998).
    return Task.FromResult(new MyTaskResponse { /* ... */ });
}
}
// Workflow I/O
public class MyWorkflowInput : WorkflowInput<MyWorkflowOutput>
{
public string CustomerId { get; set; }
}
public class MyWorkflowOutput : WorkflowOutput
{
public string Result { get; set; }
}
// Task I/O
public class MyTaskRequest : IRequest<MyTaskResponse>
{
[Required]
public string InputValue { get; set; }
}
public class MyTaskResponse
{
public string OutputValue { get; set; }
}
In Conductor, task inputs in workflows are specified using Conductor expressions with the format: ${SOURCE.input/output.JSONPath}. The SOURCE can be workflow or a task reference name in the workflow definition. input/output refers to the input of the workflow or output of the task. JSONPath is used to traverse the input/output object.
ConductorSharp generates these expressions automatically when writing workflows. Here's an example:
_builder.AddTask(
wf => wf.PrepareEmail,
wf => new PrepareEmailRequest
{
CustomerName = $"{wf.GetCustomer.Output.FirstName} {wf.GetCustomer.Output.LastName}",
Address = wf.WorkflowInput.Address
}
);
This is converted to the following Conductor input parameters specification:
"inputParameters": {
"customer_name": "${get_customer.output.first_name} ${get_customer.output.last_name}",
"address": "${workflow.input.address}"
}
When input/output parameters are of different types, casting can be used:
wf => new PrepareEmailRequest
{
CustomerName = ((FullName)wf.GetCustomer.Output.Name).FirstName,
Address = (string)wf.GetCustomer.Output.Address
}
This translates to:
"inputParameters": {
"customer_name": "${get_customer.output.name.first_name}",
"address": "${get_customer.output.address}"
}
Array initialization is supported. Arrays can be typed or dynamic:
wf => new()
{
Integers = new[] { 1, 2, 3 },
TestModelList = new List<ArrayTaskInput.TestModel>
{
new ArrayTaskInput.TestModel { String = wf.Input.TestValue },
new ArrayTaskInput.TestModel { String = "List2" }
},
Models = new[]
{
new ArrayTaskInput.TestModel { String = "Test1" },
new ArrayTaskInput.TestModel { String = "Test2" }
},
Objects = new dynamic[] { new { AnonymousObjProp = "Prop" }, new { Test = "Prop" } }
}
This translates to:
"inputParameters": {
"integers": [1, 2, 3],
"test_model_list": [
{
"string": "${workflow.input.test_value}"
},
{
"string": "List2"
}
],
"models": [
{
"string": "Test1"
},
{
"string": "Test2"
}
],
"objects": [
{
"anonymous_obj_prop": "Prop"
},
{
"test": "Prop"
}
]
}
Object initialization is supported, including anonymous objects when initializing sub-properties:
wf => new()
{
NestedObjects = new TestModel
{
Integer = 1,
String = "test",
Object = new TestModel
{
Integer = 1,
String = "string",
Object = new { NestedInput = "1" }
}
}
}
This translates to:
"inputParameters": {
"nested_objects": {
"integer": 1,
"string": "test",
"object": {
"integer": 1,
"string": "string",
"object": {
"nested_input": "1"
}
}
}
}
Dictionary indexing is supported. Indexing using an indexer on arbitrary types is currently not supported:
wf => new()
{
CustomerName = wf.WorkflowInput.Dictionary["test"].CustomerName,
Address = wf.WorkflowInput.DoubleDictionary["test"]["address"]
}
This translates to:
"inputParameters": {
"customer_name": "${workflow.input.dictionary['test'].customer_name}",
"address": "${workflow.input.double_dictionary['test']['address']}"
}
You can embed the name of any workflow in task input specification using NamingUtil.NameOf<T>():
wf => new()
{
Name = $"Workflow name: {NamingUtil.NameOf<StringInterpolation>()}",
WfName = NamingUtil.NameOf<StringInterpolation>()
}
This translates to:
"inputParameters": {
"name": "Workflow name: TEST_StringInterpolation",
"wf_name": "TEST_StringInterpolation"
}
Note: StringInterpolation has an attribute [OriginalName("TEST_StringInterpolation")] applied.
String concatenation is supported. You can concatenate strings with numbers, input/output parameters, and interpolation strings:
wf => new()
{
Input = 1
+ "Str_"
+ "2Str_"
+ wf.WorkflowInput.Input
+ $"My input: {wf.WorkflowInput.Input}"
+ NamingUtil.NameOf<StringAddition>()
+ 1
}
This translates to:
"inputParameters": {
"input": "1Str_2Str_${workflow.input.input}My input: ${workflow.input.input}string_addition1"
}
Note: StringAddition has an attribute [OriginalName("string_addition")] applied.
| Attribute | Target | Description |
|---|---|---|
| `[OriginalName("NAME")]` | Class | Custom task/workflow name in Conductor |
| `[WorkflowMetadata(...)]` | Class | Workflow metadata (OwnerEmail, OwnerApp, Description, FailureWorkflow) |
| `[Version(n)]` | Class | Version number for sub-workflow references |
| `[TaskDomain("domain")]` | Class | Assign task to specific domain |
Note: There is no task equivalent of the WorkflowMetadata attribute. The task metadata is configured when registering the task:
services.RegisterWorkerTask<MyTaskHandler>(options =>
{
options.OwnerEmail = "team@example.com";
options.Description = "My task description";
});
_builder.AddTask(wf => wf.MySimpleTask, wf => new MySimpleTaskRequest { Input = wf.WorkflowInput.Value });
public SubWorkflowTaskModel<ChildWorkflowInput, ChildWorkflowOutput> ChildWorkflow { get; set; }
_builder.AddTask(wf => wf.ChildWorkflow, wf => new ChildWorkflowInput { CustomerId = wf.WorkflowInput.CustomerId });
public SwitchTaskModel SwitchTask { get; set; }
public TaskA TaskInCaseA { get; set; }
public TaskB TaskInCaseB { get; set; }
_builder.AddTask(
wf => wf.SwitchTask,
wf => new SwitchTaskInput { SwitchCaseValue = wf.WorkflowInput.Operation },
new DecisionCases<MyWorkflow>
{
["caseA"] = builder => builder.AddTask(wf => wf.TaskInCaseA, wf => new TaskARequest { }),
["caseB"] = builder => builder.AddTask(wf => wf.TaskInCaseB, wf => new TaskBRequest { }),
DefaultCase = builder => { /* default case tasks */ }
}
);
public DynamicTaskModel<ExpectedInput, ExpectedOutput> DynamicHandler { get; set; }
_builder.AddTask(
wf => wf.DynamicHandler,
wf => new DynamicTaskInput<ExpectedInput, ExpectedOutput>
{
TaskInput = new ExpectedInput { CustomerId = wf.WorkflowInput.CustomerId },
TaskToExecute = wf.WorkflowInput.TaskName // Task name resolved at runtime
}
);
public DynamicForkJoinTaskModel DynamicFork { get; set; }
_builder.AddTask(
wf => wf.DynamicFork,
wf => new DynamicForkJoinInput
{
DynamicTasks = /* list of tasks */,
DynamicTasksInput = /* corresponding inputs */
}
);
public DoWhileTaskModel DoWhile { get; set; }
public CustomerGetHandler GetCustomer { get; set; }
_builder.AddTask(
wf => wf.DoWhile,
wf => new DoWhileInput { Value = wf.WorkflowInput.Loops },
"$.do_while.iteration < $.value", // Loop condition
builder =>
{
builder.AddTask(wf => wf.GetCustomer, wf => new CustomerGetRequest { CustomerId = "CUSTOMER-1" });
}
);
Note: ConductorSharp does not provide a strongly typed output for the DoWhile task, as can be seen from the implementation:
public class DoWhileTaskModel : TaskModel<DoWhileInput, NoOutput>
{
}
public class LambdaInput : IRequest<LambdaOutput>
{
public string Value { get; set; }
}
public class LambdaOutput
{
public string Something { get; set; }
}
public LambdaTaskModel<LambdaInput, LambdaOutput> LambdaTask { get; set; }
_builder.AddTask(
wf => wf.LambdaTask,
wf => new LambdaInput { Value = wf.WorkflowInput.Input },
script: "return { something: $.Value.toUpperCase() }" // JavaScript expression
);
For context, in the above parameterized generic class LambdaTaskModel, the LambdaOutput instance is available as Output.Result.Something. This is less than ideal, but it is how things currently work. The reasoning can be seen in the implementation:
public abstract class LambdaOutputModel<O>
{
public O Result { get; set; }
}
public abstract class LambdaTaskModel<I, O> where I : IRequest<O>
{
public I Input { get; set; }
public LambdaOutputModel<O> Output { get; set; }
}
public WaitTaskModel WaitTask { get; set; }
_builder.AddTask(
wf => wf.WaitTask,
wf => new WaitTaskInput { Duration = "1h" } // or Until = "2024-01-01T00:00:00Z"
);
public TerminateTaskModel TerminateTask { get; set; }
_builder.AddTask(
wf => wf.TerminateTask,
wf => new TerminateTaskInput
{
TerminationStatus = "COMPLETED",
WorkflowOutput = new { Result = "Done" }
}
);
public HumanTaskModel<HumanTaskOutput> HumanTask { get; set; }
_builder.AddTask(
wf => wf.HumanTask,
wf => new HumanTaskInput<HumanTaskOutput> { /* ... */ }
);
public JsonJqTransformTaskModel<JqInput, JqOutput> TransformTask { get; set; }
_builder.AddTask(
wf => wf.TransformTask,
wf => new JqInput { QueryExpression = ".data | map(.name)", Data = wf.WorkflowInput.Items }
);
For tasks not covered by the builder:
_builder.AddTasks(new WorkflowTask
{
Name = "CUSTOM_task",
TaskReferenceName = "custom_ref",
Type = "CUSTOM",
InputParameters = new Dictionary<string, object> { ["key"] = "value" }
});
Mark tasks as optional (workflow continues on failure):
_builder.AddTask(wf => wf.OptionalTask, wf => new OptionalTaskRequest { }).AsOptional();
services
.AddConductorSharp(baseUrl: "http://localhost:8080")
.AddExecutionManager(
maxConcurrentWorkers: 10, // Max concurrent task executions
sleepInterval: 500, // Base polling interval (ms)
longPollInterval: 100, // Long poll timeout (ms)
domain: "my-domain", // Optional worker domain
typeof(Program).Assembly // Assemblies containing handlers
);
services
.AddConductorSharp(baseUrl: "http://primary-conductor:8080")
.AddAlternateClient(
baseUrl: "http://secondary-conductor:8080",
key: "Secondary",
apiPath: "api",
ignoreInvalidCertificate: false
);
// Usage with keyed services
public class MyController(
IWorkflowService primaryService,
[FromKeyedServices("Secondary")] IWorkflowService secondaryService
) { }
// Default: Inverse exponential backoff
.AddExecutionManager(...)
// Constant interval polling
.AddExecutionManager(...)
.UseConstantPollTimingStrategy()
Register standalone tasks without workflow:
services.RegisterWorkerTask<MyTaskHandler>(options =>
{
options.OwnerEmail = "team@example.com";
options.Description = "My task description";
});
Behaviors form a middleware pipeline for task execution (powered by MediatR):
.AddPipelines(pipelines =>
{
// Add custom behavior (runs first)
pipelines.AddCustomBehavior(typeof(MyCustomBehavior<,>));
// Built-in behaviors
pipelines.AddExecutionTaskTracking(); // Track task execution metrics
pipelines.AddContextLogging(); // Add context to log scopes
pipelines.AddRequestResponseLogging(); // Log requests/responses
pipelines.AddValidation(); // Validate using DataAnnotations
})
public class TimingBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse>
{
/// <summary>
/// Measures how long the rest of the pipeline takes and writes the elapsed time to the console.
/// </summary>
/// <param name="request">The request flowing through the pipeline (not inspected here).</param>
/// <param name="next">Delegate that invokes the next behavior or the handler itself.</param>
/// <param name="cancellationToken">Cancellation token supplied by the pipeline.</param>
/// <returns>The response produced by the rest of the pipeline.</returns>
public async Task<TResponse> Handle(
    TRequest request,
    RequestHandlerDelegate<TResponse> next,
    CancellationToken cancellationToken)
{
    var sw = Stopwatch.StartNew();
    try
    {
        return await next();
    }
    finally
    {
        // Report the timing in a finally block so failed executions are measured too;
        // previously nothing was logged when the downstream handler threw.
        sw.Stop();
        Console.WriteLine($"Execution took {sw.ElapsedMilliseconds}ms");
    }
}
}
// In Program.cs
builder.Services.AddHealthChecks()
.AddCheck<ConductorSharpHealthCheck>("conductor-worker");
// Configure health service
.AddExecutionManager(...)
.SetHealthCheckService<FileHealthService>() // or InMemoryHealthService
| Service | Description |
|---|---|
| `InMemoryHealthService` | In-memory health state (default) |
| `FileHealthService` | Persists health to CONDUCTORSHARP_HEALTH.json file |
Access workflow/task metadata in handlers:
public class MyHandler : TaskRequestHandler<MyRequest, MyResponse>
{
private readonly ConductorSharpExecutionContext _context;
public MyHandler(ConductorSharpExecutionContext context)
{
_context = context;
}
// Example handler body: shows which identifiers can be read from the injected
// ConductorSharpExecutionContext while a task is being handled.
public override async Task<MyResponse> Handle(MyRequest request, CancellationToken cancellationToken)
{
// Identifier of the workflow, as exposed by the execution context.
var workflowId = _context.WorkflowId;
// Identifier of the task, as exposed by the execution context.
var taskId = _context.TaskId;
// Correlation id, as exposed by the execution context.
var correlationId = _context.CorrelationId;
// Placeholder: the actual task logic and the returned MyResponse are elided in this example.
// ...
}
}
Additional built-in tasks and utilities:
.AddExecutionManager(...)
.AddConductorSharpPatterns() // Adds WaitSeconds, ReadWorkflowTasks
.AddCSharpLambdaTasks() // Adds C# lambda task support
public WaitSeconds WaitTask { get; set; }
_builder.AddTask(wf => wf.WaitTask, wf => new WaitSecondsRequest { Seconds = 30 });
Read task data from another workflow:
public ReadWorkflowTasks ReadTasks { get; set; }
_builder.AddTask(
wf => wf.ReadTasks,
wf => new ReadWorkflowTasksInput
{
WorkflowId = wf.WorkflowInput.TargetWorkflowId,
TaskNames = "task1,task2" // Comma-separated reference names
}
);
Execute C# code inline in workflows:
public CSharpLambdaTaskModel<LambdaInput, LambdaOutput> InlineLambda { get; set; }
_builder.AddTask(
    wf => wf.InlineLambda,
    wf => new LambdaInput { Value = wf.WorkflowInput.Input },
    // LambdaOutput declares a `Something` property; it has no `Result` member.
    input => new LambdaOutput { Something = input.Value.ToUpperInvariant() }
);
Handle task cancellation via Kafka events:
.AddExecutionManager(...)
.AddKafkaCancellationNotifier(
kafkaBootstrapServers: "localhost:9092",
topicName: "conductor.status.task",
groupId: "my-worker-group",
createTopicOnStartup: true
)
appsettings.json:
{
"Conductor": {
"BaseUrl": "http://localhost:8080",
"MaxConcurrentWorkers": 10,
"SleepInterval": 500,
"LongPollInterval": 100,
"KafkaCancellationNotifier": {
"BootstrapServers": "localhost:9092",
"GroupId": "my-worker",
"TopicName": "conductor.status.task"
}
}
}
Generate C# models from existing Conductor task/workflow definitions.
dotnet tool install --global ConductorSharp.Toolkit --version 3.0.1-beta3
Create conductorsharp.yaml:
baseUrl: http://localhost:8080
apiPath: api
namespace: MyApp.Generated
destination: ./Generated
# Scaffold all tasks and workflows
dotnet-conductorsharp
# Use custom config file
dotnet-conductorsharp -f myconfig.yaml
# Filter by name
dotnet-conductorsharp -n CUSTOMER_get -n ORDER_create
# Filter by owner email
dotnet-conductorsharp -e team@example.com
# Filter by owner app
dotnet-conductorsharp -a my-application
# Skip tasks or workflows
dotnet-conductorsharp --no-tasks
dotnet-conductorsharp --no-workflows
# Preview without generating files
dotnet-conductorsharp --dry-run
| Option | Description |
|---|---|
| `-f, --file` | Configuration file path (default: conductorsharp.yaml) |
| `-n, --name` | Filter by task/workflow name (can specify multiple) |
| `-a, --app` | Filter by owner app |
| `-e, --email` | Filter by owner email |
| `--no-tasks` | Skip task scaffolding |
| `--no-workflows` | Skip workflow scaffolding |
| `--dry-run` | Preview what would be generated |
Inject these services to interact with Conductor programmatically:
| Service | Description |
|---|---|
| `IWorkflowService` | Start, pause, resume, terminate workflows |
| `ITaskService` | Update tasks, get logs, poll for tasks |
| `IMetadataService` | Manage workflow/task definitions |
| `IAdminService` | Admin operations, queue management |
| `IEventService` | Event handlers |
| `IQueueAdminService` | Queue administration |
| `IWorkflowBulkService` | Bulk workflow operations |
| `IHealthService` | Conductor server health |
| `IExternalPayloadService` | External payload storage |
public class WorkflowController : ControllerBase
{
private readonly IWorkflowService _workflowService;
private readonly IMetadataService _metadataService;
public WorkflowController(IWorkflowService workflowService, IMetadataService metadataService)
{
_workflowService = workflowService;
_metadataService = metadataService;
}
/// <summary>
/// Starts version 1 of the "MY_workflow" workflow for the customer named in the request body.
/// </summary>
/// <param name="request">Request body carrying the customer id passed on as workflow input.</param>
/// <returns>The value returned by <c>IWorkflowService.StartAsync</c>.</returns>
[HttpPost("start")]
public async Task<string> StartWorkflow([FromBody] StartRequest request)
{
    var startRequest = new StartWorkflowRequest
    {
        Name = "MY_workflow",
        Version = 1,
        Input = new Dictionary<string, object> { ["customerId"] = request.CustomerId }
    };

    return await _workflowService.StartAsync(startRequest);
}
/// <summary>
/// Returns the workflow definitions reported by the metadata service.
/// </summary>
[HttpGet("definitions")]
public async Task<ICollection<WorkflowDef>> GetDefinitions() =>
    await _metadataService.ListWorkflowsAsync();
}
- Clone and run Conductor:
  git clone https://github.com/conductor-oss/conductor.git
  cd conductor
  docker-compose up -d
- Conductor UI available at: http://localhost:5000 (may vary by version)
The solution includes three example projects:
| Project | Description |
|---|---|
| `ConductorSharp.Definitions` | Console app with workflow definitions |
| `ConductorSharp.ApiEnabled` | Web API with workflow execution endpoints |
| `ConductorSharp.NoApi` | Console app with Kafka cancellation support |
# Run with Docker Compose
docker-compose up
# Or run individual projects
cd examples/ConductorSharp.Definitions
dotnet run
Conductor events are currently not supported by the library.
MIT License - see LICENSE for details.
Contributions are welcome! Please feel free to submit a Pull Request.