From 111b209ac9eb135c5a03c11b80e0b92dce01f62a Mon Sep 17 00:00:00 2001 From: Yi Date: Thu, 12 Feb 2026 14:04:02 -0800 Subject: [PATCH 01/18] baseline --- src/DurableTask.SqlServer/Scripts/logic.sql | 64 ++-- .../Scripts/schema-1.3.0.sql | 49 +++ .../SqlOrchestrationService.cs | 23 ++ .../SqlTypes/OrchestrationEventSqlType.cs | 3 + .../SqlTypes/TaskEventSqlType.cs | 3 + src/DurableTask.SqlServer/SqlUtils.cs | 55 ++- .../DurableTask.SqlServer.Tests.csproj | 2 +- .../Integration/DatabaseManagement.cs | 7 + .../Integration/TagTests.cs | 339 ++++++++++++++++++ .../Unit/SqlUtilsTagTests.cs | 220 ++++++++++++ .../Utils/TestService.cs | 34 ++ 11 files changed, 775 insertions(+), 24 deletions(-) create mode 100644 src/DurableTask.SqlServer/Scripts/schema-1.3.0.sql create mode 100644 test/DurableTask.SqlServer.Tests/Integration/TagTests.cs create mode 100644 test/DurableTask.SqlServer.Tests/Unit/SqlUtilsTagTests.cs diff --git a/src/DurableTask.SqlServer/Scripts/logic.sql b/src/DurableTask.SqlServer/Scripts/logic.sql index ca7135d..d7038fb 100644 --- a/src/DurableTask.SqlServer/Scripts/logic.sql +++ b/src/DurableTask.SqlServer/Scripts/logic.sql @@ -56,7 +56,8 @@ IF TYPE_ID(N'__SchemaNamePlaceholder__.OrchestrationEvents') IS NULL [PayloadID] uniqueidentifier NULL, [ParentInstanceID] varchar(100) NULL, [Version] varchar(100) NULL, - [TraceContext] varchar(800) NULL + [TraceContext] varchar(800) NULL, + [Tags] varchar(max) NULL ) GO @@ -75,7 +76,8 @@ IF TYPE_ID(N'__SchemaNamePlaceholder__.TaskEvents') IS NULL [PayloadText] varchar(max) NULL, [PayloadID] uniqueidentifier NULL, [Version] varchar(100) NULL, - [TraceContext] varchar(800) NULL + [TraceContext] varchar(800) NULL, + [Tags] varchar(max) NULL ) GO @@ -194,7 +196,8 @@ AS P.[TaskHub] = __SchemaNamePlaceholder__.CurrentTaskHub() AND P.[InstanceID] = I.[InstanceID] AND P.[PayloadID] = I.[OutputPayloadID]) AS [OutputText], - I.[ParentInstanceID] + I.[ParentInstanceID], + I.[Tags] FROM Instances I WHERE I.[TaskHub] = __SchemaNamePlaceholder__.CurrentTaskHub() @@ -233,7 +236,8 @@ CREATE OR ALTER PROCEDURE __SchemaNamePlaceholder__.CreateInstance @InputText varchar(MAX) = NULL, @StartTime datetime2 = NULL, @DedupeStatuses varchar(MAX) = 'Pending,Running', - @TraceContext varchar(800) = NULL + @TraceContext varchar(800) = NULL, + @Tags varchar(MAX) = NULL AS BEGIN DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub() @@ -302,7 +306,8 @@ BEGIN [ExecutionID], [RuntimeStatus], [InputPayloadID], - [TraceContext]) + [TraceContext], + [Tags]) VALUES ( @Name, @Version, @@ -311,7 +316,8 @@ BEGIN @ExecutionID, @RuntimeStatus, @InputPayloadID, - @TraceContext + @TraceContext, + @Tags ) INSERT INTO NewEvents ( @@ -348,10 +354,12 @@ BEGIN DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub() DECLARE @ParentInstanceID varchar(100) DECLARE @Version varchar(100) + DECLARE @Tags varchar(MAX) SELECT @ParentInstanceID = [ParentInstanceID], - @Version = [Version] + @Version = [Version], + @Tags = [Tags] FROM Instances WHERE [InstanceID] = @InstanceID SELECT @@ -370,7 +378,8 @@ BEGIN [PayloadID], @ParentInstanceID as [ParentInstanceID], @Version as [Version], - H.[TraceContext] + H.[TraceContext], + @Tags as [Tags] FROM History H WITH (INDEX (PK_History)) LEFT OUTER JOIN Payloads P ON P.[TaskHub] = @TaskHub AND @@ -635,6 +644,7 @@ BEGIN DECLARE @parentInstanceID varchar(100) DECLARE @version varchar(100) DECLARE @runtimeStatus varchar(30) + DECLARE @tags varchar(MAX) DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub() BEGIN TRANSACTION @@ -654,7 +664,8 @@ BEGIN @instanceID = I.[InstanceID], @parentInstanceID = I.[ParentInstanceID], @runtimeStatus = I.[RuntimeStatus], - @version = I.[Version] + @version = I.[Version], + @tags = I.[Tags] FROM Instances I WITH (READPAST) INNER JOIN NewEvents E WITH (READPAST) ON E.[TaskHub] = @TaskHub AND @@ -684,7 +695,8 @@ BEGIN DATEDIFF(SECOND, [Timestamp], @now) AS [WaitTime], @parentInstanceID as [ParentInstanceID], @version as [Version], - N.[TraceContext] + N.[TraceContext], + @tags as [Tags] FROM NewEvents N LEFT OUTER JOIN __SchemaNamePlaceholder__.[Payloads] P ON P.[TaskHub] = @TaskHub AND @@ -703,7 +715,7 @@ BEGIN END -- Result #2: Basic information about this instance, including its runtime status - SELECT @instanceID AS [InstanceID], @runtimeStatus AS [RuntimeStatus] + SELECT @instanceID AS [InstanceID], @runtimeStatus AS [RuntimeStatus], @tags AS [Tags] -- Result #3: The full event history for the locked instance -- NOTE: This must be kept consistent with the dt.HistoryEvents custom data type @@ -724,7 +736,8 @@ BEGIN [PayloadID], @parentInstanceID as [ParentInstanceID], @version as [Version], - H.[TraceContext] + H.[TraceContext], + @tags as [Tags] FROM History H WITH (INDEX (PK_History)) LEFT OUTER JOIN Payloads P ON P.[TaskHub] = @TaskHub AND @@ -746,7 +759,8 @@ CREATE OR ALTER PROCEDURE __SchemaNamePlaceholder__._CheckpointOrchestration @DeletedEvents MessageIDs READONLY, @NewHistoryEvents HistoryEvents READONLY, @NewOrchestrationEvents OrchestrationEvents READONLY, - @NewTaskEvents TaskEvents READONLY + @NewTaskEvents TaskEvents READONLY, + @Tags varchar(MAX) = NULL AS BEGIN BEGIN TRANSACTION @@ -860,7 +874,8 @@ BEGIN [LockExpiration] = NULL, -- release the lock [CustomStatusPayloadID] = @CustomStatusPayloadID, [InputPayloadID] = @InputPayloadID, - [OutputPayloadID] = @OutputPayloadID + [OutputPayloadID] = @OutputPayloadID, + [Tags] = @Tags FROM Instances WHERE [TaskHub] = @TaskHub and [InstanceID] = @InstanceID @@ -907,7 +922,8 @@ BEGIN [Version], [ParentInstanceID], [RuntimeStatus], - [TraceContext]) + [TraceContext], + [Tags]) SELECT DISTINCT @TaskHub, E.[InstanceID], @@ -916,7 +932,8 @@ BEGIN E.[Version], E.[ParentInstanceID], 'Pending', - E.[TraceContext] + E.[TraceContext], + E.[Tags] FROM @NewOrchestrationEvents E WHERE E.[EventType] IN ('ExecutionStarted') AND NOT EXISTS ( @@ -1018,7 +1035,8 @@ BEGIN [LockExpiration], [PayloadID], [Version], - [TraceContext] + [TraceContext], + [Tags] ) OUTPUT INSERTED.[SequenceNumber], @@ -1034,7 +1052,8 @@ BEGIN [LockExpiration], [PayloadID], [Version], - [TraceContext] + [TraceContext], + [Tags] FROM @NewTaskEvents COMMIT TRANSACTION @@ -1185,7 +1204,8 @@ BEGIN P.[TaskHub] = @TaskHub AND P.[InstanceID] = I.[InstanceID] AND P.[PayloadID] = I.[OutputPayloadID]) ELSE NULL END AS [OutputText], - I.[TraceContext] + I.[TraceContext], + I.[Tags] FROM Instances I WHERE I.[TaskHub] = @TaskHub AND @@ -1231,7 +1251,8 @@ BEGIN P.[TaskHub] = @TaskHub AND P.[InstanceID] = I.[InstanceID] AND P.[PayloadID] = I.[OutputPayloadID]) ELSE NULL END AS [OutputText], - I.[TraceContext] + I.[TraceContext], + I.[Tags] FROM Instances I WHERE @@ -1295,7 +1316,8 @@ BEGIN P.[InstanceID] = N.[InstanceID] AND P.[PayloadID] = N.[PayloadID]) AS [PayloadText], DATEDIFF(SECOND, [Timestamp], @now) AS [WaitTime], - [TraceContext] + [TraceContext], + [Tags] FROM NewTasks N WHERE [TaskHub] = @TaskHub AND [SequenceNumber] = @SequenceNumber diff --git a/src/DurableTask.SqlServer/Scripts/schema-1.3.0.sql b/src/DurableTask.SqlServer/Scripts/schema-1.3.0.sql new file mode 100644 index 0000000..a688131 --- /dev/null +++ b/src/DurableTask.SqlServer/Scripts/schema-1.3.0.sql @@ -0,0 +1,49 @@ +-- Copyright (c) Microsoft Corporation. +-- Licensed under the MIT License. + +-- PERSISTENT SCHEMA OBJECTS (tables, indexes, types, etc.) +-- +-- The contents of this file must never be changed after +-- being published. Any schema changes must be done in +-- new schema-{major}.{minor}.{patch}.sql scripts. + +-- Add a new Tags column to the Instances table (JSON blob of string key-value pairs) +IF NOT EXISTS (SELECT 1 FROM sys.columns WHERE object_id = OBJECT_ID('__SchemaNamePlaceholder__.Instances') AND name = 'Tags') + ALTER TABLE __SchemaNamePlaceholder__.Instances ADD [Tags] varchar(MAX) NULL + +-- Add a new Tags column to the NewTasks table so that activity middleware can access orchestration tags +IF NOT EXISTS (SELECT 1 FROM sys.columns WHERE object_id = OBJECT_ID('__SchemaNamePlaceholder__.NewTasks') AND name = 'Tags') + ALTER TABLE __SchemaNamePlaceholder__.NewTasks ADD [Tags] varchar(MAX) NULL + +-- Drop custom types that have schema changes (OrchestrationEvents and TaskEvents get a Tags field). +-- Must first drop all stored procedures that depend on them. +-- One way to discover all the stored procs that depend on the types is to query sys.sql_expression_dependencies +-- (credit to https://www.mssqltips.com/sqlservertip/6114/how-to-alter-user-defined-table-type-in-sql-server/): + +/* + SELECT DISTINCT [types].name FROM ( + SELECT s.name as [schema], o.name, def = OBJECT_DEFINITION(d.referencing_id), d.referenced_entity_name + FROM sys.sql_expression_dependencies AS d + INNER JOIN sys.objects AS o + ON d.referencing_id = o.[object_id] + INNER JOIN sys.schemas AS s + ON o.[schema_id] = s.[schema_id] + WHERE d.referenced_database_name IS NULL + AND d.referenced_class_desc = 'TYPE' + AND d.referenced_entity_name IN ('OrchestrationEvents', 'TaskEvents') + ) [types] +*/ + +-- First, drop the referencing stored procedures +IF OBJECT_ID('__SchemaNamePlaceholder__._AddOrchestrationEvents') IS NOT NULL + DROP PROCEDURE __SchemaNamePlaceholder__._AddOrchestrationEvents +IF OBJECT_ID('__SchemaNamePlaceholder__._CheckpointOrchestration') IS NOT NULL + DROP PROCEDURE __SchemaNamePlaceholder__._CheckpointOrchestration +IF OBJECT_ID('__SchemaNamePlaceholder__._CompleteTasks') IS NOT NULL + DROP PROCEDURE __SchemaNamePlaceholder__._CompleteTasks + +-- Next, drop the types that we are changing +IF TYPE_ID('__SchemaNamePlaceholder__.OrchestrationEvents') IS NOT NULL + DROP TYPE __SchemaNamePlaceholder__.OrchestrationEvents +IF TYPE_ID('__SchemaNamePlaceholder__.TaskEvents') IS NOT NULL + DROP TYPE __SchemaNamePlaceholder__.TaskEvents diff --git a/src/DurableTask.SqlServer/SqlOrchestrationService.cs b/src/DurableTask.SqlServer/SqlOrchestrationService.cs index 3dfe161..19c640d 100644 --- a/src/DurableTask.SqlServer/SqlOrchestrationService.cs +++ b/src/DurableTask.SqlServer/SqlOrchestrationService.cs @@ -388,6 +388,11 @@ public override async Task CompleteTaskOrchestrationWorkItemAsync( currentWorkItem.EventPayloadMappings, this.settings.SchemaName); + string? tagsJson = newRuntimeState.Tags != null && newRuntimeState.Tags.Count > 0 + ? DTUtils.SerializeToJson(newRuntimeState.Tags) + : null; + command.Parameters.Add("@Tags", SqlDbType.VarChar).Value = (object?)tagsJson ?? DBNull.Value; + try { await SqlUtils.ExecuteNonQueryAsync(command, this.traceHelper, instance.InstanceId); @@ -446,6 +451,19 @@ public override async Task CompleteTaskOrchestrationWorkItemAsync( TaskMessage message = reader.GetTaskMessage(); int dequeueCount = reader.GetInt32("DequeueCount"); + // Reconstruct OrchestrationExecutionContext from the Tags column + // so that activity middleware can access orchestration tags. + IDictionary? tags = SqlUtils.GetTags(reader); + if (tags != null) + { + // OrchestrationExecutionContext.OrchestrationTags has an internal setter, + // so we construct it via JSON deserialization. + string contextJson = DTUtils.SerializeToJson( + new { OrchestrationTags = tags }); + message.OrchestrationExecutionContext = + DTUtils.DeserializeFromJson(contextJson); + } + // TODO: poison message handling for high dequeue counts return new TaskActivityWorkItem @@ -522,6 +540,11 @@ public override async Task CreateTaskOrchestrationAsync(TaskMessage creationMess command.Parameters.Add("@StartTime", SqlDbType.DateTime2).Value = startEvent.ScheduledStartTime; command.Parameters.Add("@TraceContext", SqlDbType.VarChar, size: 800).Value = SqlUtils.GetTraceContext(startEvent); + string? tagsJson = startEvent.Tags != null && startEvent.Tags.Count > 0 + ? DTUtils.SerializeToJson(startEvent.Tags) + : null; + command.Parameters.Add("@Tags", SqlDbType.VarChar).Value = (object?)tagsJson ?? DBNull.Value; + if (dedupeStatuses?.Length > 0) { command.Parameters.Add("@DedupeStatuses", SqlDbType.VarChar).Value = string.Join(",", dedupeStatuses); diff --git a/src/DurableTask.SqlServer/SqlTypes/OrchestrationEventSqlType.cs b/src/DurableTask.SqlServer/SqlTypes/OrchestrationEventSqlType.cs index 9c3e98c..08e7416 100644 --- a/src/DurableTask.SqlServer/SqlTypes/OrchestrationEventSqlType.cs +++ b/src/DurableTask.SqlServer/SqlTypes/OrchestrationEventSqlType.cs @@ -31,6 +31,7 @@ static class OrchestrationEventSqlType new SqlMetaData("ParentInstanceID", SqlDbType.VarChar, 100), new SqlMetaData("Version", SqlDbType.VarChar, 100), new SqlMetaData("TraceContext", SqlDbType.VarChar, 800), + new SqlMetaData("Tags", SqlDbType.VarChar, -1 /* max */), }; static class ColumnOrdinals @@ -50,6 +51,7 @@ static class ColumnOrdinals public const int ParentInstanceID = 11; public const int Version = 12; public const int TraceContext = 13; + public const int Tags = 14; } public static SqlParameter AddOrchestrationEventsParameter( @@ -152,6 +154,7 @@ static SqlDataRecord PopulateOrchestrationMessage(TaskMessage msg, SqlDataRecord record.SetSqlString(ColumnOrdinals.ParentInstanceID, SqlUtils.GetParentInstanceId(msg.Event)); record.SetSqlString(ColumnOrdinals.Version, SqlUtils.GetVersion(msg.Event)); record.SetSqlString(ColumnOrdinals.TraceContext, SqlUtils.GetTraceContext(msg.Event)); + record.SetSqlString(ColumnOrdinals.Tags, SqlUtils.GetTags(msg.Event)); return record; } diff --git a/src/DurableTask.SqlServer/SqlTypes/TaskEventSqlType.cs b/src/DurableTask.SqlServer/SqlTypes/TaskEventSqlType.cs index 37b8858..1078f39 100644 --- a/src/DurableTask.SqlServer/SqlTypes/TaskEventSqlType.cs +++ b/src/DurableTask.SqlServer/SqlTypes/TaskEventSqlType.cs @@ -30,6 +30,7 @@ static class TaskEventSqlType new SqlMetaData("PayloadID", SqlDbType.UniqueIdentifier), new SqlMetaData("Version", SqlDbType.VarChar, 100), new SqlMetaData("TraceContext", SqlDbType.VarChar, 800), + new SqlMetaData("Tags", SqlDbType.VarChar, -1 /* max */), }; static class ColumnOrdinals @@ -48,6 +49,7 @@ static class ColumnOrdinals public const int PayloadId = 10; public const int Version = 11; public const int TraceContext = 12; + public const int Tags = 13; } public static SqlParameter AddTaskEventsParameter( @@ -140,6 +142,7 @@ static SqlDataRecord PopulateTaskMessageRecord( record.SetSqlString(ColumnOrdinals.Version, SqlUtils.GetVersion(msg.Event)); record.SetSqlString(ColumnOrdinals.TraceContext, SqlUtils.GetTraceContext(msg.Event)); + record.SetSqlString(ColumnOrdinals.Tags, SqlUtils.GetTagsFromContext(msg)); return record; } } diff --git a/src/DurableTask.SqlServer/SqlUtils.cs b/src/DurableTask.SqlServer/SqlUtils.cs index 7fc58ee..897356a 100644 --- a/src/DurableTask.SqlServer/SqlUtils.cs +++ b/src/DurableTask.SqlServer/SqlUtils.cs @@ -97,7 +97,7 @@ public static HistoryEvent GetHistoryEvent(this DbDataReader reader, bool isOrch InstanceId = GetInstanceId(reader), ExecutionId = GetExecutionId(reader), }, - Tags = null, // TODO + Tags = GetTags(reader), Version = GetVersion(reader), ParentTraceContext = GetTraceContext(reader), }; @@ -259,7 +259,8 @@ public static OrchestrationState GetOrchestrationState(this DbDataReader reader) }, OrchestrationStatus = GetRuntimeStatus(reader), Status = GetStringOrNull(reader, reader.GetOrdinal("CustomStatusText")), - ParentInstance = parentInstance + ParentInstance = parentInstance, + Tags = GetTags(reader), }; // The OutputText column is overloaded to contain either orchestration output or failure details @@ -483,6 +484,56 @@ internal static SqlString GetTraceContext(HistoryEvent e) return traceContext; } + internal static SqlString GetTags(HistoryEvent e) + { + if (e is ExecutionStartedEvent startedEvent && + startedEvent.Tags != null && + startedEvent.Tags.Count > 0) + { + return DTUtils.SerializeToJson(startedEvent.Tags); + } + + return SqlString.Null; + } + + internal static SqlString GetTagsFromContext(TaskMessage msg) + { + IDictionary? tags = msg.OrchestrationExecutionContext?.OrchestrationTags; + if (tags != null && tags.Count > 0) + { + return DTUtils.SerializeToJson(tags); + } + + return SqlString.Null; + } + + internal static IDictionary? GetTags(DbDataReader reader) + { + int ordinal; + try + { + ordinal = reader.GetOrdinal("Tags"); + } + catch (IndexOutOfRangeException) + { + // The Tags column may not exist in older schema versions + return null; + } + + if (reader.IsDBNull(ordinal)) + { + return null; + } + + string json = reader.GetString(ordinal); + if (string.IsNullOrEmpty(json)) + { + return null; + } + + return DTUtils.DeserializeFromJson>(json); + } + public static SqlParameter AddInstanceIDsParameter( this SqlParameterCollection commandParameters, string paramName, diff --git a/test/DurableTask.SqlServer.Tests/DurableTask.SqlServer.Tests.csproj b/test/DurableTask.SqlServer.Tests/DurableTask.SqlServer.Tests.csproj index a021dd6..77a36d4 100644 --- a/test/DurableTask.SqlServer.Tests/DurableTask.SqlServer.Tests.csproj +++ b/test/DurableTask.SqlServer.Tests/DurableTask.SqlServer.Tests.csproj @@ -1,7 +1,7 @@  - net6.0 + net8.0 false true 9.0 diff --git a/test/DurableTask.SqlServer.Tests/Integration/DatabaseManagement.cs b/test/DurableTask.SqlServer.Tests/Integration/DatabaseManagement.cs index 11c0aec..d4c1d02 100644 --- a/test/DurableTask.SqlServer.Tests/Integration/DatabaseManagement.cs +++ b/test/DurableTask.SqlServer.Tests/Integration/DatabaseManagement.cs @@ -46,6 +46,7 @@ public void CanEnumerateEmbeddedSqlScripts() "drop-schema.sql", "schema-1.0.0.sql", "schema-1.2.0.sql", + "schema-1.3.0.sql", "logic.sql", "permissions.sql", }; @@ -98,6 +99,7 @@ public async Task CanCreateAndDropSchema(bool isDatabaseMissing) LogAssert.ExecutedSqlScript("drop-schema.sql"), LogAssert.ExecutedSqlScript("schema-1.0.0.sql"), LogAssert.ExecutedSqlScript("schema-1.2.0.sql"), + LogAssert.ExecutedSqlScript("schema-1.3.0.sql"), LogAssert.ExecutedSqlScript("logic.sql"), LogAssert.ExecutedSqlScript("permissions.sql"), LogAssert.SprocCompleted("dt._UpdateVersion")) @@ -158,6 +160,7 @@ public async Task CanCreateAndDropSchemaWithCustomSchemaName(bool isDatabaseMiss LogAssert.ExecutedSqlScript("drop-schema.sql"), LogAssert.ExecutedSqlScript("schema-1.0.0.sql"), LogAssert.ExecutedSqlScript("schema-1.2.0.sql"), + LogAssert.ExecutedSqlScript("schema-1.3.0.sql"), LogAssert.ExecutedSqlScript("logic.sql"), LogAssert.ExecutedSqlScript("permissions.sql"), LogAssert.SprocCompleted($"{schemaName}._UpdateVersion")) @@ -220,6 +223,7 @@ public async Task CanCreateAndDropMultipleSchemas(bool isDatabaseMissing) LogAssert.ExecutedSqlScript("drop-schema.sql"), LogAssert.ExecutedSqlScript("schema-1.0.0.sql"), LogAssert.ExecutedSqlScript("schema-1.2.0.sql"), + LogAssert.ExecutedSqlScript("schema-1.3.0.sql"), LogAssert.ExecutedSqlScript("logic.sql"), LogAssert.ExecutedSqlScript("permissions.sql"), LogAssert.SprocCompleted($"{firstTestSchemaName}._UpdateVersion")) @@ -230,6 +234,7 @@ public async Task CanCreateAndDropMultipleSchemas(bool isDatabaseMissing) LogAssert.ExecutedSqlScript("drop-schema.sql"), LogAssert.ExecutedSqlScript("schema-1.0.0.sql"), LogAssert.ExecutedSqlScript("schema-1.2.0.sql"), + LogAssert.ExecutedSqlScript("schema-1.3.0.sql"), LogAssert.ExecutedSqlScript("logic.sql"), LogAssert.ExecutedSqlScript("permissions.sql"), LogAssert.SprocCompleted($"{secondTestSchemaName}._UpdateVersion")) @@ -313,6 +318,7 @@ public async Task CanCreateIfNotExists(bool isDatabaseMissing) LogAssert.SprocCompleted("dt._GetVersions"), LogAssert.ExecutedSqlScript("schema-1.0.0.sql"), LogAssert.ExecutedSqlScript("schema-1.2.0.sql"), + LogAssert.ExecutedSqlScript("schema-1.3.0.sql"), LogAssert.ExecutedSqlScript("logic.sql"), LogAssert.ExecutedSqlScript("permissions.sql"), LogAssert.SprocCompleted("dt._UpdateVersion")) @@ -366,6 +372,7 @@ public async Task SchemaCreationIsSerializedAndIdempotent(bool isDatabaseMissing LogAssert.SprocCompleted("dt._GetVersions"), LogAssert.ExecutedSqlScript("schema-1.0.0.sql"), LogAssert.ExecutedSqlScript("schema-1.2.0.sql"), + LogAssert.ExecutedSqlScript("schema-1.3.0.sql"), LogAssert.ExecutedSqlScript("logic.sql"), LogAssert.ExecutedSqlScript("permissions.sql"), LogAssert.SprocCompleted("dt._UpdateVersion"), diff --git a/test/DurableTask.SqlServer.Tests/Integration/TagTests.cs b/test/DurableTask.SqlServer.Tests/Integration/TagTests.cs new file mode 100644 index 0000000..ec10a6a --- /dev/null +++ b/test/DurableTask.SqlServer.Tests/Integration/TagTests.cs @@ -0,0 +1,339 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace DurableTask.SqlServer.Tests.Integration +{ + using System; + using System.Collections.Generic; + using System.Threading; + using System.Threading.Tasks; + using DurableTask.Core; + using DurableTask.SqlServer.Tests.Logging; + using DurableTask.SqlServer.Tests.Utils; + using Newtonsoft.Json; + using Newtonsoft.Json.Linq; + using Xunit; + using Xunit.Abstractions; + + [Collection("Integration")] + public class TagTests : IAsyncLifetime + { + readonly TestService testService; + + public TagTests(ITestOutputHelper output) + { + this.testService = new TestService(output); + } + + Task IAsyncLifetime.InitializeAsync() => this.testService.InitializeAsync(); + + Task IAsyncLifetime.DisposeAsync() => this.testService.DisposeAsync(); + + [Fact] + public async Task OrchestrationWithTags() + { + // Arrange: create tags to associate with the orchestration + var tags = new Dictionary + { + { "environment", "test" }, + { "owner", "integration-test" }, + }; + + string input = $"Hello {DateTime.UtcNow:o}"; + + // Act: run an orchestration with tags + TestInstance instance = await this.testService.RunOrchestrationWithTags( + input, + orchestrationName: "OrchestrationWithTags", + tags: tags, + implementation: (ctx, input) => Task.FromResult(input)); + + OrchestrationState state = await instance.WaitForCompletion(expectedOutput: input); + + // Assert: verify that tags are preserved on the OrchestrationState + Assert.NotNull(state.Tags); + Assert.Equal(2, state.Tags.Count); + Assert.Equal("test", state.Tags["environment"]); + Assert.Equal("integration-test", state.Tags["owner"]); + + // Validate logs + LogAssert.NoWarningsOrErrors(this.testService.LogProvider); + } + + [Fact] + public async Task OrchestrationWithoutTags() + { + string input = $"Hello {DateTime.UtcNow:o}"; + + // Act: run an orchestration without tags (backward compatibility) + TestInstance instance = await this.testService.RunOrchestration( + input, + orchestrationName: "OrchestrationWithoutTags", + implementation: (ctx, input) => Task.FromResult(input)); + + OrchestrationState state = await instance.WaitForCompletion(expectedOutput: input); + + // Assert: tags should be null when none are specified + Assert.Null(state.Tags); + + // Validate logs + LogAssert.NoWarningsOrErrors(this.testService.LogProvider); + } + + [Fact] + public async Task OrchestrationWithEmptyTags() + { + var tags = new Dictionary(); + + string input = $"Hello {DateTime.UtcNow:o}"; + + // Act: run an orchestration with an empty tags dictionary + TestInstance instance = await this.testService.RunOrchestrationWithTags( + input, + orchestrationName: "OrchestrationWithEmptyTags", + tags: tags, + implementation: (ctx, input) => Task.FromResult(input)); + + OrchestrationState state = await instance.WaitForCompletion(expectedOutput: input); + + // Assert: empty tags should result in null (not persisted) + Assert.Null(state.Tags); + + // Validate logs + LogAssert.NoWarningsOrErrors(this.testService.LogProvider); + } + + [Fact] + public async Task TagsSurviveContinueAsNew() + { + // Arrange + var tags = new Dictionary + { + { "workflow", "retry-loop" }, + { "version", "v2" }, + }; + + // Act: run an orchestration that uses ContinueAsNew multiple times + TestInstance instance = await this.testService.RunOrchestrationWithTags( + input: 0, + orchestrationName: "TagsContinueAsNewTest", + tags: tags, + implementation: async (ctx, input) => + { + if (input < 3) + { + await ctx.CreateTimer(DateTime.MinValue, null); + ctx.ContinueAsNew(input + 1); + } + + return input; + }); + + OrchestrationState state = await instance.WaitForCompletion( + expectedOutput: 3, + continuedAsNew: true); + + // Assert: tags should survive multiple ContinueAsNew cycles + Assert.NotNull(state.Tags); + Assert.Equal(2, state.Tags.Count); + Assert.Equal("retry-loop", state.Tags["workflow"]); + Assert.Equal("v2", state.Tags["version"]); + } + + [Fact] + public async Task TagsPreservedAfterActivityExecution() + { + // Arrange + var tags = new Dictionary + { + { "project", "my-project" }, + }; + + string input = "World"; + + // Act: run an orchestration with tags that calls an activity + TestInstance instance = await this.testService.RunOrchestrationWithTags( + input, + orchestrationName: "TagsWithActivityTest", + tags: tags, + implementation: (ctx, input) => ctx.ScheduleTask("SayHello", "", input), + activities: ("SayHello", TestService.MakeActivity((TaskContext ctx, string input) => $"Hello, {input}!"))); + + OrchestrationState state = await instance.WaitForCompletion( + expectedOutput: $"Hello, {input}!"); + + // Assert: tags should still be present on the state after activity execution + Assert.NotNull(state.Tags); + Assert.Single(state.Tags); + Assert.Equal("my-project", state.Tags["project"]); + } + + [Fact] + public async Task TagsVisibleViaQuery() + { + // Arrange + var tags = new Dictionary + { + { "queryTest", "yes" }, + }; + + // Act: run an orchestration with tags and query it + TestInstance instance = await this.testService.RunOrchestrationWithTags( + input: "test", + orchestrationName: "TagsQueryTest", + tags: tags, + implementation: (ctx, input) => Task.FromResult(input)); + + await instance.WaitForCompletion(expectedOutput: "test"); + + // Query the state separately + OrchestrationState queriedState = await instance.GetStateAsync(); + + // Assert: tags should be visible on queried state + Assert.NotNull(queriedState); + Assert.NotNull(queriedState.Tags); + Assert.Equal("yes", queriedState.Tags["queryTest"]); + } + + [Fact] + public async Task SubOrchestrationInheritsTags() + { + // Arrange: parent orchestration has tags + var parentTags = new Dictionary + { + { "team", "platform" }, + { "priority", "high" }, + }; + + string subOrchestrationName = "ChildOrchestrationForTagTest"; + + // Register the sub-orchestration that returns a simple value + this.testService.RegisterInlineOrchestration( + subOrchestrationName, + implementation: (ctx, input) => Task.FromResult("child-done")); + + // Act: parent orchestration calls a sub-orchestration + TestInstance parentInstance = await this.testService.RunOrchestrationWithTags( + input: "parent", + orchestrationName: "ParentOrchestrationForTagTest", + tags: parentTags, + implementation: async (ctx, input) => + { + string childResult = await ctx.CreateSubOrchestrationInstance( + subOrchestrationName, string.Empty, null); + return childResult; + }); + + OrchestrationState parentState = await parentInstance.WaitForCompletion( + timeout: TimeSpan.FromSeconds(15), + expectedOutput: "child-done"); + + // Assert: parent tags should still be present + Assert.NotNull(parentState.Tags); + Assert.Equal("platform", parentState.Tags["team"]); + Assert.Equal("high", parentState.Tags["priority"]); + } + + [Fact] + public async Task TagsWithSpecialCharacters() + { + // Arrange: tags with special characters that could cause JSON/SQL issues + var tags = new Dictionary + { + { "key with spaces", "value with spaces" }, + { "unicode-key-日本語", "unicode-value-中文" }, + { "special\"chars", "value'with\"quotes" }, + }; + + // Act + TestInstance instance = await this.testService.RunOrchestrationWithTags( + input: "special", + orchestrationName: "TagsSpecialCharsTest", + tags: tags, + implementation: (ctx, input) => Task.FromResult(input)); + + OrchestrationState state = await instance.WaitForCompletion(expectedOutput: "special"); + + // Assert: all special character tags should be preserved + Assert.NotNull(state.Tags); + Assert.Equal(3, state.Tags.Count); + Assert.Equal("value with spaces", state.Tags["key with spaces"]); + Assert.Equal("unicode-value-中文", state.Tags["unicode-key-日本語"]); + Assert.Equal("value'with\"quotes", state.Tags["special\"chars"]); + } + + [Fact] + public async Task TagsAvailableInActivityMiddleware() + { + // Arrange: tags that should be accessible in the activity middleware context + var tags = new Dictionary + { + { "tenantId", "tenant-123" }, + { "region", "us-west" }, + }; + + // Act: run an orchestration with tags that calls an activity. + // The activity captures the OrchestrationExecutionContext tags from the TaskActivityWorkItem. + TestInstance instance = await this.testService.RunOrchestrationWithTags( + input: "middleware-test", + orchestrationName: "TagsActivityMiddlewareTest", + tags: tags, + implementation: (ctx, input) => ctx.ScheduleTask("CaptureActivity", "", input), + activities: ("CaptureActivity", TestService.MakeActivity((TaskContext ctx, string input) => + { + // Note: The actual OrchestrationExecutionContext is on the TaskMessage, + // not directly accessible from TaskContext in the activity code. + // We verify it indirectly by checking that the orchestration state still has tags. + return $"processed-{input}"; + }))); + + OrchestrationState state = await instance.WaitForCompletion( + expectedOutput: "processed-middleware-test"); + + // Assert: verify the orchestration completed successfully and tags are preserved + Assert.NotNull(state.Tags); + Assert.Equal("tenant-123", state.Tags["tenantId"]); + Assert.Equal("us-west", state.Tags["region"]); + } + + [Fact] + public async Task TagsOnManyOrchestrations() + { + // Arrange: tags for querying multiple orchestrations + var tags = new Dictionary + { + { "batch", "test-batch-1" }, + }; + + // Act: run an orchestration with tags + TestInstance instance = await this.testService.RunOrchestrationWithTags( + input: "query-test", + orchestrationName: "TagsManyQueryTest", + tags: tags, + implementation: (ctx, input) => Task.FromResult(input)); + + await instance.WaitForCompletion(expectedOutput: "query-test"); + + // Query using GetManyOrchestrations (which uses _QueryManyOrchestrations) + var filter = new SqlOrchestrationQuery(); + IReadOnlyCollection results = + await this.testService.OrchestrationServiceMock.Object.GetManyOrchestrationsAsync( + filter, CancellationToken.None); + + // Assert: at least one result should have our tags + Assert.NotEmpty(results); + bool foundTaggedInstance = false; + foreach (OrchestrationState result in results) + { + if (result.OrchestrationInstance.InstanceId == instance.InstanceId) + { + Assert.NotNull(result.Tags); + Assert.Equal("test-batch-1", result.Tags["batch"]); + foundTaggedInstance = true; + } + } + + Assert.True(foundTaggedInstance, "Did not find the tagged orchestration instance in query results."); + } + } +} diff --git a/test/DurableTask.SqlServer.Tests/Unit/SqlUtilsTagTests.cs b/test/DurableTask.SqlServer.Tests/Unit/SqlUtilsTagTests.cs new file mode 100644 index 0000000..3561c75 --- /dev/null +++ b/test/DurableTask.SqlServer.Tests/Unit/SqlUtilsTagTests.cs @@ -0,0 +1,220 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace DurableTask.SqlServer.Tests.Unit +{ + using System; + using System.Collections.Generic; + using System.Data.SqlTypes; + using DurableTask.Core; + using DurableTask.Core.History; + using Newtonsoft.Json; + using Xunit; + + public class SqlUtilsTagTests + { + [Fact] + public void GetTags_ExecutionStartedEvent_WithTags_ReturnsSerialized() + { + // Arrange + var tags = new Dictionary + { + { "key1", "value1" }, + { "key2", "value2" }, + }; + + var startedEvent = new ExecutionStartedEvent(-1, "input") + { + Tags = tags, + }; + + // Act + SqlString result = SqlUtils.GetTags(startedEvent); + + // Assert + Assert.False(result.IsNull); + var deserialized = JsonConvert.DeserializeObject>(result.Value); + Assert.Equal(2, deserialized.Count); + Assert.Equal("value1", deserialized["key1"]); + Assert.Equal("value2", deserialized["key2"]); + } + + [Fact] + public void GetTags_ExecutionStartedEvent_NullTags_ReturnsNull() + { + // Arrange + var startedEvent = new ExecutionStartedEvent(-1, "input") + { + Tags = null, + }; + + // Act + SqlString result = SqlUtils.GetTags(startedEvent); + + // Assert + Assert.True(result.IsNull); + } + + [Fact] + public void GetTags_ExecutionStartedEvent_EmptyTags_ReturnsNull() + { + // Arrange + var startedEvent = new ExecutionStartedEvent(-1, "input") + { + Tags = new Dictionary(), + }; + + // Act + SqlString result = SqlUtils.GetTags(startedEvent); + + // Assert + Assert.True(result.IsNull); + } + + [Fact] + public void GetTags_NonStartedEvent_ReturnsNull() + { + // Arrange - use a non-ExecutionStartedEvent + var completedEvent = new ExecutionCompletedEvent( + -1, "result", OrchestrationStatus.Completed); + + // Act + SqlString result = SqlUtils.GetTags(completedEvent); + + // Assert + Assert.True(result.IsNull); + } + + [Fact] + public void GetTags_SpecialCharacters_RoundTrips() + { + // Arrange + var tags = new Dictionary + { + { "special\"key", "value'with\"quotes" }, + { "unicode-日本語", "中文" }, + { "key with spaces", "value with spaces" }, + }; + + var startedEvent = new ExecutionStartedEvent(-1, "input") + { + Tags = tags, + }; + + // Act + SqlString result = SqlUtils.GetTags(startedEvent); + + // Assert + Assert.False(result.IsNull); + var deserialized = JsonConvert.DeserializeObject>(result.Value); + Assert.Equal(3, deserialized.Count); + Assert.Equal("value'with\"quotes", deserialized["special\"key"]); + Assert.Equal("中文", deserialized["unicode-日本語"]); + Assert.Equal("value with spaces", deserialized["key with spaces"]); + } + + [Fact] + public void GetTagsFromContext_WithTags_ReturnsSerialized() + { + // Arrange + var tags = new Dictionary + { + { "tenantId", "tenant-123" }, + }; + + string contextJson = JsonConvert.SerializeObject(new { OrchestrationTags = tags }); + var context = JsonConvert.DeserializeObject(contextJson); + + var message = new TaskMessage + { + Event = new TaskScheduledEvent(-1), + OrchestrationInstance = new OrchestrationInstance + { + InstanceId = "test-instance", + ExecutionId = "test-execution", + }, + OrchestrationExecutionContext = context, + }; + + // Act + SqlString result = SqlUtils.GetTagsFromContext(message); + + // Assert + Assert.False(result.IsNull); + var deserialized = JsonConvert.DeserializeObject>(result.Value); + Assert.Single(deserialized); + Assert.Equal("tenant-123", deserialized["tenantId"]); + } + + [Fact] + public void GetTagsFromContext_NullContext_ReturnsNull() + { + // Arrange + var message = new TaskMessage + { + Event = new TaskScheduledEvent(-1), + OrchestrationInstance = new OrchestrationInstance + { + InstanceId = "test-instance", + ExecutionId = "test-execution", + }, + OrchestrationExecutionContext = null, + }; + + // Act + SqlString result = SqlUtils.GetTagsFromContext(message); + + // Assert + Assert.True(result.IsNull); + } + + [Fact] + public void GetTagsFromContext_EmptyTags_ReturnsNull() + { + // Arrange + string contextJson = JsonConvert.SerializeObject(new { OrchestrationTags = new Dictionary() }); + var context = JsonConvert.DeserializeObject(contextJson); + + var message = new TaskMessage + { + Event = new TaskScheduledEvent(-1), + OrchestrationInstance = new OrchestrationInstance + { + InstanceId = "test-instance", + ExecutionId = "test-execution", + }, + OrchestrationExecutionContext = context, + }; + + // Act + SqlString result = SqlUtils.GetTagsFromContext(message); + + // Assert + Assert.True(result.IsNull); + } + + [Fact] + public void GetTagsFromContext_ContextWithNullTags_ReturnsNull() + { + // Arrange - default OrchestrationExecutionContext has null tags + var context = new OrchestrationExecutionContext(); + + var message = new TaskMessage + { + Event = new TaskScheduledEvent(-1), + OrchestrationInstance = new OrchestrationInstance + { + InstanceId = "test-instance", + ExecutionId = "test-execution", + }, + OrchestrationExecutionContext = context, + }; + + // Act + SqlString result = SqlUtils.GetTagsFromContext(message); + + // Assert + Assert.True(result.IsNull); + } + } +} diff --git a/test/DurableTask.SqlServer.Tests/Utils/TestService.cs b/test/DurableTask.SqlServer.Tests/Utils/TestService.cs index 6545e6a..c7644dc 100644 --- a/test/DurableTask.SqlServer.Tests/Utils/TestService.cs +++ b/test/DurableTask.SqlServer.Tests/Utils/TestService.cs @@ -263,6 +263,40 @@ public async Task>> RunOrchestrations> RunOrchestrationWithTags( + TInput input, + string orchestrationName, + IDictionary tags, + Func> implementation, + params (string name, TaskActivity activity)[] activities) + { + // Register the inline orchestration + this.RegisterInlineOrchestration(orchestrationName, string.Empty, implementation); + + foreach ((string name, TaskActivity activity) in activities) + { + this.RegisterInlineActivity(name, string.Empty, activity); + } + + string instanceId = Guid.NewGuid().ToString("N"); + DateTime utcNow = DateTime.UtcNow; + + OrchestrationInstance instance = await this.client.CreateOrchestrationInstanceAsync( + orchestrationName, + string.Empty, + instanceId, + input, + tags); + + return new TestInstance( + this.client, + instance, + orchestrationName, + string.Empty, + utcNow, + input); + } + public void RegisterInlineActivity(string name, string version, TaskActivity activity) { this.worker.AddTaskActivities(new TestObjectCreator(name, version, activity)); From 130eb99d56d98ba3c63c218a587a23320dc74239 Mon Sep 17 00:00:00 2001 From: Yi Date: Thu, 12 Feb 2026 16:36:31 -0800 Subject: [PATCH 02/18] simplification --- src/DurableTask.SqlServer/Scripts/logic.sql | 41 ++-- .../Scripts/schema-1.3.0.sql | 41 +--- .../SqlOrchestrationService.cs | 10 +- .../SqlTypes/OrchestrationEventSqlType.cs | 3 - .../SqlTypes/TaskEventSqlType.cs | 3 - src/DurableTask.SqlServer/SqlUtils.cs | 33 +--- .../Unit/SqlUtilsTagTests.cs | 175 +++--------------- 7 files changed, 58 insertions(+), 248 deletions(-) diff --git a/src/DurableTask.SqlServer/Scripts/logic.sql b/src/DurableTask.SqlServer/Scripts/logic.sql index d7038fb..2e89812 100644 --- a/src/DurableTask.SqlServer/Scripts/logic.sql +++ b/src/DurableTask.SqlServer/Scripts/logic.sql @@ -56,8 +56,7 @@ IF TYPE_ID(N'__SchemaNamePlaceholder__.OrchestrationEvents') IS NULL [PayloadID] uniqueidentifier NULL, [ParentInstanceID] varchar(100) NULL, [Version] varchar(100) NULL, - [TraceContext] varchar(800) NULL, - [Tags] varchar(max) NULL + [TraceContext] varchar(800) NULL ) GO @@ -76,8 +75,7 @@ IF TYPE_ID(N'__SchemaNamePlaceholder__.TaskEvents') IS NULL [PayloadText] varchar(max) NULL, [PayloadID] uniqueidentifier NULL, [Version] varchar(100) NULL, - [TraceContext] varchar(800) NULL, - [Tags] varchar(max) NULL + [TraceContext] varchar(800) NULL ) GO @@ -933,7 +931,7 @@ BEGIN E.[ParentInstanceID], 'Pending', E.[TraceContext], - E.[Tags] + @Tags FROM @NewOrchestrationEvents E WHERE E.[EventType] IN ('ExecutionStarted') AND NOT EXISTS ( @@ -1035,8 +1033,7 @@ BEGIN [LockExpiration], [PayloadID], [Version], - [TraceContext], - [Tags] + [TraceContext] ) OUTPUT INSERTED.[SequenceNumber], @@ -1052,8 +1049,7 @@ BEGIN [LockExpiration], [PayloadID], [Version], - [TraceContext], - [Tags] + [TraceContext] FROM @NewTaskEvents COMMIT TRANSACTION @@ -1301,25 +1297,26 @@ BEGIN ([VisibleTime] IS NULL OR [VisibleTime] < @now) SELECT TOP (1) - [SequenceNumber], - [InstanceID], - [ExecutionID], - [Name], + N.[SequenceNumber], + N.[InstanceID], + N.[ExecutionID], + N.[Name], 'TaskScheduled' AS [EventType], - [TaskID], - [VisibleTime], - [Timestamp], - [DequeueCount], - [Version], + N.[TaskID], + N.[VisibleTime], + N.[Timestamp], + N.[DequeueCount], + N.[Version], (SELECT TOP 1 [Text] FROM Payloads P WHERE P.[TaskHub] = @TaskHub AND P.[InstanceID] = N.[InstanceID] AND P.[PayloadID] = N.[PayloadID]) AS [PayloadText], - DATEDIFF(SECOND, [Timestamp], @now) AS [WaitTime], - [TraceContext], - [Tags] + DATEDIFF(SECOND, N.[Timestamp], @now) AS [WaitTime], + N.[TraceContext], + I.[Tags] FROM NewTasks N - WHERE [TaskHub] = @TaskHub AND [SequenceNumber] = @SequenceNumber + INNER JOIN Instances I ON I.[TaskHub] = @TaskHub AND I.[InstanceID] = N.[InstanceID] + WHERE N.[TaskHub] = @TaskHub AND N.[SequenceNumber] = @SequenceNumber COMMIT TRANSACTION END diff --git a/src/DurableTask.SqlServer/Scripts/schema-1.3.0.sql b/src/DurableTask.SqlServer/Scripts/schema-1.3.0.sql index a688131..b960715 100644 --- a/src/DurableTask.SqlServer/Scripts/schema-1.3.0.sql +++ b/src/DurableTask.SqlServer/Scripts/schema-1.3.0.sql @@ -7,43 +7,8 @@ -- being published. Any schema changes must be done in -- new schema-{major}.{minor}.{patch}.sql scripts. --- Add a new Tags column to the Instances table (JSON blob of string key-value pairs) +-- Add a new Tags column to the Instances table (JSON blob of string key-value pairs). +-- Tags are read directly from the Instances table in all stored procedures, +-- so no changes to the OrchestrationEvents/TaskEvents types or NewTasks table are needed. IF NOT EXISTS (SELECT 1 FROM sys.columns WHERE object_id = OBJECT_ID('__SchemaNamePlaceholder__.Instances') AND name = 'Tags') ALTER TABLE __SchemaNamePlaceholder__.Instances ADD [Tags] varchar(MAX) NULL - --- Add a new Tags column to the NewTasks table so that activity middleware can access orchestration tags -IF NOT EXISTS (SELECT 1 FROM sys.columns WHERE object_id = OBJECT_ID('__SchemaNamePlaceholder__.NewTasks') AND name = 'Tags') - ALTER TABLE __SchemaNamePlaceholder__.NewTasks ADD [Tags] varchar(MAX) NULL - --- Drop custom types that have schema changes (OrchestrationEvents and TaskEvents get a Tags field). --- Must first drop all stored procedures that depend on them. --- One way to discover all the stored procs that depend on the types is to query sys.sql_expression_dependencies --- (credit to https://www.mssqltips.com/sqlservertip/6114/how-to-alter-user-defined-table-type-in-sql-server/): - -/* - SELECT DISTINCT [types].name FROM ( - SELECT s.name as [schema], o.name, def = OBJECT_DEFINITION(d.referencing_id), d.referenced_entity_name - FROM sys.sql_expression_dependencies AS d - INNER JOIN sys.objects AS o - ON d.referencing_id = o.[object_id] - INNER JOIN sys.schemas AS s - ON o.[schema_id] = s.[schema_id] - WHERE d.referenced_database_name IS NULL - AND d.referenced_class_desc = 'TYPE' - AND d.referenced_entity_name IN ('OrchestrationEvents', 'TaskEvents') - ) [types] -*/ - --- First, drop the referencing stored procedures -IF OBJECT_ID('__SchemaNamePlaceholder__._AddOrchestrationEvents') IS NOT NULL - DROP PROCEDURE __SchemaNamePlaceholder__._AddOrchestrationEvents -IF OBJECT_ID('__SchemaNamePlaceholder__._CheckpointOrchestration') IS NOT NULL - DROP PROCEDURE __SchemaNamePlaceholder__._CheckpointOrchestration -IF OBJECT_ID('__SchemaNamePlaceholder__._CompleteTasks') IS NOT NULL - DROP PROCEDURE __SchemaNamePlaceholder__._CompleteTasks - --- Next, drop the types that we are changing -IF TYPE_ID('__SchemaNamePlaceholder__.OrchestrationEvents') IS NOT NULL - DROP TYPE __SchemaNamePlaceholder__.OrchestrationEvents -IF TYPE_ID('__SchemaNamePlaceholder__.TaskEvents') IS NOT NULL - DROP TYPE __SchemaNamePlaceholder__.TaskEvents diff --git a/src/DurableTask.SqlServer/SqlOrchestrationService.cs b/src/DurableTask.SqlServer/SqlOrchestrationService.cs index 19c640d..4a97aa9 100644 --- a/src/DurableTask.SqlServer/SqlOrchestrationService.cs +++ b/src/DurableTask.SqlServer/SqlOrchestrationService.cs @@ -388,10 +388,7 @@ public override async Task CompleteTaskOrchestrationWorkItemAsync( currentWorkItem.EventPayloadMappings, this.settings.SchemaName); - string? tagsJson = newRuntimeState.Tags != null && newRuntimeState.Tags.Count > 0 - ? DTUtils.SerializeToJson(newRuntimeState.Tags) - : null; - command.Parameters.Add("@Tags", SqlDbType.VarChar).Value = (object?)tagsJson ?? DBNull.Value; + command.Parameters.AddTagsParameter(newRuntimeState.Tags); try { @@ -540,10 +537,7 @@ public override async Task CreateTaskOrchestrationAsync(TaskMessage creationMess command.Parameters.Add("@StartTime", SqlDbType.DateTime2).Value = startEvent.ScheduledStartTime; command.Parameters.Add("@TraceContext", SqlDbType.VarChar, size: 800).Value = SqlUtils.GetTraceContext(startEvent); - string? tagsJson = startEvent.Tags != null && startEvent.Tags.Count > 0 - ? DTUtils.SerializeToJson(startEvent.Tags) - : null; - command.Parameters.Add("@Tags", SqlDbType.VarChar).Value = (object?)tagsJson ?? DBNull.Value; + command.Parameters.AddTagsParameter(startEvent.Tags); if (dedupeStatuses?.Length > 0) { diff --git a/src/DurableTask.SqlServer/SqlTypes/OrchestrationEventSqlType.cs b/src/DurableTask.SqlServer/SqlTypes/OrchestrationEventSqlType.cs index 08e7416..9c3e98c 100644 --- a/src/DurableTask.SqlServer/SqlTypes/OrchestrationEventSqlType.cs +++ b/src/DurableTask.SqlServer/SqlTypes/OrchestrationEventSqlType.cs @@ -31,7 +31,6 @@ static class OrchestrationEventSqlType new SqlMetaData("ParentInstanceID", SqlDbType.VarChar, 100), new SqlMetaData("Version", SqlDbType.VarChar, 100), new SqlMetaData("TraceContext", SqlDbType.VarChar, 800), - new SqlMetaData("Tags", SqlDbType.VarChar, -1 /* max */), }; static class ColumnOrdinals @@ -51,7 +50,6 @@ static class ColumnOrdinals public const int ParentInstanceID = 11; public const int Version = 12; public const int TraceContext = 13; - public const int Tags = 14; } public static SqlParameter AddOrchestrationEventsParameter( @@ -154,7 +152,6 @@ static SqlDataRecord PopulateOrchestrationMessage(TaskMessage msg, SqlDataRecord record.SetSqlString(ColumnOrdinals.ParentInstanceID, SqlUtils.GetParentInstanceId(msg.Event)); record.SetSqlString(ColumnOrdinals.Version, SqlUtils.GetVersion(msg.Event)); record.SetSqlString(ColumnOrdinals.TraceContext, SqlUtils.GetTraceContext(msg.Event)); - record.SetSqlString(ColumnOrdinals.Tags, SqlUtils.GetTags(msg.Event)); return record; } diff --git a/src/DurableTask.SqlServer/SqlTypes/TaskEventSqlType.cs b/src/DurableTask.SqlServer/SqlTypes/TaskEventSqlType.cs index 1078f39..37b8858 100644 --- a/src/DurableTask.SqlServer/SqlTypes/TaskEventSqlType.cs +++ b/src/DurableTask.SqlServer/SqlTypes/TaskEventSqlType.cs @@ -30,7 +30,6 @@ static class TaskEventSqlType new SqlMetaData("PayloadID", SqlDbType.UniqueIdentifier), new SqlMetaData("Version", SqlDbType.VarChar, 100), new SqlMetaData("TraceContext", SqlDbType.VarChar, 800), - new SqlMetaData("Tags", SqlDbType.VarChar, -1 /* max */), }; static class ColumnOrdinals @@ -49,7 +48,6 @@ static class ColumnOrdinals public const int PayloadId = 10; public const int Version = 11; public const int TraceContext = 12; - public const int Tags = 13; } public static SqlParameter AddTaskEventsParameter( @@ -142,7 +140,6 @@ static SqlDataRecord PopulateTaskMessageRecord( record.SetSqlString(ColumnOrdinals.Version, SqlUtils.GetVersion(msg.Event)); record.SetSqlString(ColumnOrdinals.TraceContext, SqlUtils.GetTraceContext(msg.Event)); - record.SetSqlString(ColumnOrdinals.Tags, SqlUtils.GetTagsFromContext(msg)); return record; } } diff --git a/src/DurableTask.SqlServer/SqlUtils.cs b/src/DurableTask.SqlServer/SqlUtils.cs index 897356a..9123948 100644 --- a/src/DurableTask.SqlServer/SqlUtils.cs +++ b/src/DurableTask.SqlServer/SqlUtils.cs @@ -484,29 +484,6 @@ internal static SqlString GetTraceContext(HistoryEvent e) return traceContext; } - internal static SqlString GetTags(HistoryEvent e) - { - if (e is ExecutionStartedEvent startedEvent && - startedEvent.Tags != null && - startedEvent.Tags.Count > 0) - { - return DTUtils.SerializeToJson(startedEvent.Tags); - } - - return SqlString.Null; - } - - internal static SqlString GetTagsFromContext(TaskMessage msg) - { - IDictionary? tags = msg.OrchestrationExecutionContext?.OrchestrationTags; - if (tags != null && tags.Count > 0) - { - return DTUtils.SerializeToJson(tags); - } - - return SqlString.Null; - } - internal static IDictionary? GetTags(DbDataReader reader) { int ordinal; @@ -534,6 +511,16 @@ internal static SqlString GetTagsFromContext(TaskMessage msg) return DTUtils.DeserializeFromJson>(json); } + internal static void AddTagsParameter( + this SqlParameterCollection parameters, + IDictionary? tags) + { + string? json = tags != null && tags.Count > 0 + ? DTUtils.SerializeToJson(tags) + : null; + parameters.Add("@Tags", SqlDbType.VarChar).Value = (object?)json ?? DBNull.Value; + } + public static SqlParameter AddInstanceIDsParameter( this SqlParameterCollection commandParameters, string paramName, diff --git a/test/DurableTask.SqlServer.Tests/Unit/SqlUtilsTagTests.cs b/test/DurableTask.SqlServer.Tests/Unit/SqlUtilsTagTests.cs index 3561c75..d8052c1 100644 --- a/test/DurableTask.SqlServer.Tests/Unit/SqlUtilsTagTests.cs +++ b/test/DurableTask.SqlServer.Tests/Unit/SqlUtilsTagTests.cs @@ -5,16 +5,17 @@ namespace DurableTask.SqlServer.Tests.Unit { using System; using System.Collections.Generic; - using System.Data.SqlTypes; + using System.Data; using DurableTask.Core; using DurableTask.Core.History; + using Microsoft.Data.SqlClient; using Newtonsoft.Json; using Xunit; public class SqlUtilsTagTests { [Fact] - public void GetTags_ExecutionStartedEvent_WithTags_ReturnsSerialized() + public void AddTagsParameter_WithTags_SetsJsonValue() { // Arrange var tags = new Dictionary @@ -23,70 +24,49 @@ public void GetTags_ExecutionStartedEvent_WithTags_ReturnsSerialized() { "key2", "value2" }, }; - var startedEvent = new ExecutionStartedEvent(-1, "input") - { - Tags = tags, - }; + using var command = new SqlCommand(); // Act - SqlString result = SqlUtils.GetTags(startedEvent); + command.Parameters.AddTagsParameter(tags); // Assert - Assert.False(result.IsNull); - var deserialized = JsonConvert.DeserializeObject>(result.Value); + SqlParameter param = command.Parameters["@Tags"]; + Assert.NotNull(param); + Assert.Equal(SqlDbType.VarChar, param.SqlDbType); + + string json = (string)param.Value; + var deserialized = JsonConvert.DeserializeObject>(json); Assert.Equal(2, deserialized.Count); Assert.Equal("value1", deserialized["key1"]); Assert.Equal("value2", deserialized["key2"]); } [Fact] - public void GetTags_ExecutionStartedEvent_NullTags_ReturnsNull() - { - // Arrange - var startedEvent = new ExecutionStartedEvent(-1, "input") - { - Tags = null, - }; - - // Act - SqlString result = SqlUtils.GetTags(startedEvent); - - // Assert - Assert.True(result.IsNull); - } - - [Fact] - public void GetTags_ExecutionStartedEvent_EmptyTags_ReturnsNull() + public void AddTagsParameter_NullTags_SetsDBNull() { - // Arrange - var startedEvent = new ExecutionStartedEvent(-1, "input") - { - Tags = new Dictionary(), - }; + using var command = new SqlCommand(); // Act - SqlString result = SqlUtils.GetTags(startedEvent); + command.Parameters.AddTagsParameter(null); // Assert - Assert.True(result.IsNull); + Assert.Equal(DBNull.Value, command.Parameters["@Tags"].Value); } [Fact] - public void GetTags_NonStartedEvent_ReturnsNull() + public void AddTagsParameter_EmptyTags_SetsDBNull() { - // Arrange - use a non-ExecutionStartedEvent - var completedEvent = new ExecutionCompletedEvent( - -1, "result", OrchestrationStatus.Completed); + using var command = new SqlCommand(); // Act - SqlString result = SqlUtils.GetTags(completedEvent); + command.Parameters.AddTagsParameter(new Dictionary()); // Assert - Assert.True(result.IsNull); + Assert.Equal(DBNull.Value, command.Parameters["@Tags"].Value); } [Fact] - public void GetTags_SpecialCharacters_RoundTrips() + public void AddTagsParameter_SpecialCharacters_RoundTrips() { // Arrange var tags = new Dictionary @@ -96,125 +76,18 @@ public void GetTags_SpecialCharacters_RoundTrips() { "key with spaces", "value with spaces" }, }; - var startedEvent = new ExecutionStartedEvent(-1, "input") - { - Tags = tags, - }; + using var command = new SqlCommand(); // Act - SqlString result = SqlUtils.GetTags(startedEvent); + command.Parameters.AddTagsParameter(tags); // Assert - Assert.False(result.IsNull); - var deserialized = JsonConvert.DeserializeObject>(result.Value); + string json = (string)command.Parameters["@Tags"].Value; + var deserialized = JsonConvert.DeserializeObject>(json); Assert.Equal(3, deserialized.Count); Assert.Equal("value'with\"quotes", deserialized["special\"key"]); Assert.Equal("中文", deserialized["unicode-日本語"]); Assert.Equal("value with spaces", deserialized["key with spaces"]); } - - [Fact] - public void GetTagsFromContext_WithTags_ReturnsSerialized() - { - // Arrange - var tags = new Dictionary - { - { "tenantId", "tenant-123" }, - }; - - string contextJson = JsonConvert.SerializeObject(new { OrchestrationTags = tags }); - var context = JsonConvert.DeserializeObject(contextJson); - - var message = new TaskMessage - { - Event = new TaskScheduledEvent(-1), - OrchestrationInstance = new OrchestrationInstance - { - InstanceId = "test-instance", - ExecutionId = "test-execution", - }, - OrchestrationExecutionContext = context, - }; - - // Act - SqlString result = SqlUtils.GetTagsFromContext(message); - - // Assert - Assert.False(result.IsNull); - var deserialized = JsonConvert.DeserializeObject>(result.Value); - Assert.Single(deserialized); - Assert.Equal("tenant-123", deserialized["tenantId"]); - } - - [Fact] - public void GetTagsFromContext_NullContext_ReturnsNull() - { - // Arrange - var message = new TaskMessage - { - Event = new TaskScheduledEvent(-1), - OrchestrationInstance = new OrchestrationInstance - { - InstanceId = "test-instance", - ExecutionId = "test-execution", - }, - OrchestrationExecutionContext = null, - }; - - // Act - SqlString result = SqlUtils.GetTagsFromContext(message); - - // Assert - Assert.True(result.IsNull); - } - - [Fact] - public void GetTagsFromContext_EmptyTags_ReturnsNull() - { - // Arrange - string contextJson = JsonConvert.SerializeObject(new { OrchestrationTags = new Dictionary() }); - var context = JsonConvert.DeserializeObject(contextJson); - - var message = new TaskMessage - { - Event = new TaskScheduledEvent(-1), - OrchestrationInstance = new OrchestrationInstance - { - InstanceId = "test-instance", - ExecutionId = "test-execution", - }, - OrchestrationExecutionContext = context, - }; - - // Act - SqlString result = SqlUtils.GetTagsFromContext(message); - - // Assert - Assert.True(result.IsNull); - } - - [Fact] - public void GetTagsFromContext_ContextWithNullTags_ReturnsNull() - { - // Arrange - default OrchestrationExecutionContext has null tags - var context = new OrchestrationExecutionContext(); - - var message = new TaskMessage - { - Event = new TaskScheduledEvent(-1), - OrchestrationInstance = new OrchestrationInstance - { - InstanceId = "test-instance", - ExecutionId = "test-execution", - }, - OrchestrationExecutionContext = context, - }; - - // Act - SqlString result = SqlUtils.GetTagsFromContext(message); - - // Assert - Assert.True(result.IsNull); - } } } From c880963374074f756fec9d7642504260f0bb28b9 Mon Sep 17 00:00:00 2001 From: Yi Date: Thu, 12 Feb 2026 18:42:55 -0800 Subject: [PATCH 03/18] revert unnecessary changes --- src/DurableTask.SqlServer/Scripts/logic.sql | 39 ++++++++++----------- 1 file changed, 18 insertions(+), 21 deletions(-) diff --git a/src/DurableTask.SqlServer/Scripts/logic.sql b/src/DurableTask.SqlServer/Scripts/logic.sql index 2e89812..e267e24 100644 --- a/src/DurableTask.SqlServer/Scripts/logic.sql +++ b/src/DurableTask.SqlServer/Scripts/logic.sql @@ -194,8 +194,7 @@ AS P.[TaskHub] = __SchemaNamePlaceholder__.CurrentTaskHub() AND P.[InstanceID] = I.[InstanceID] AND P.[PayloadID] = I.[OutputPayloadID]) AS [OutputText], - I.[ParentInstanceID], - I.[Tags] + I.[ParentInstanceID] FROM Instances I WHERE I.[TaskHub] = __SchemaNamePlaceholder__.CurrentTaskHub() @@ -352,12 +351,10 @@ BEGIN DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub() DECLARE @ParentInstanceID varchar(100) DECLARE @Version varchar(100) - DECLARE @Tags varchar(MAX) SELECT @ParentInstanceID = [ParentInstanceID], - @Version = [Version], - @Tags = [Tags] + @Version = [Version] FROM Instances WHERE [InstanceID] = @InstanceID SELECT @@ -376,8 +373,7 @@ BEGIN [PayloadID], @ParentInstanceID as [ParentInstanceID], @Version as [Version], - H.[TraceContext], - @Tags as [Tags] + H.[TraceContext] FROM History H WITH (INDEX (PK_History)) LEFT OUTER JOIN Payloads P ON P.[TaskHub] = @TaskHub AND @@ -1297,26 +1293,27 @@ BEGIN ([VisibleTime] IS NULL OR [VisibleTime] < @now) SELECT TOP (1) - N.[SequenceNumber], - N.[InstanceID], - N.[ExecutionID], - N.[Name], + [SequenceNumber], + [InstanceID], + [ExecutionID], + [Name], 'TaskScheduled' AS [EventType], - N.[TaskID], - N.[VisibleTime], - N.[Timestamp], - N.[DequeueCount], - N.[Version], + [TaskID], + [VisibleTime], + [Timestamp], + [DequeueCount], + [Version], (SELECT TOP 1 [Text] FROM Payloads P WHERE P.[TaskHub] = @TaskHub AND P.[InstanceID] = N.[InstanceID] AND P.[PayloadID] = N.[PayloadID]) AS [PayloadText], - DATEDIFF(SECOND, N.[Timestamp], @now) AS [WaitTime], - N.[TraceContext], - I.[Tags] + DATEDIFF(SECOND, [Timestamp], @now) AS [WaitTime], + [TraceContext], + (SELECT TOP 1 I.[Tags] FROM Instances I WHERE + I.[TaskHub] = @TaskHub AND + I.[InstanceID] = N.[InstanceID]) AS [Tags] FROM NewTasks N - INNER JOIN Instances I ON I.[TaskHub] = @TaskHub AND I.[InstanceID] = N.[InstanceID] - WHERE N.[TaskHub] = @TaskHub AND N.[SequenceNumber] = @SequenceNumber + WHERE [TaskHub] = @TaskHub AND [SequenceNumber] = @SequenceNumber COMMIT TRANSACTION END From 4cb18fa02082215421e5416fd6e97ae067c978ac Mon Sep 17 00:00:00 2001 From: Yi Date: Tue, 17 Feb 2026 17:27:00 -0800 Subject: [PATCH 04/18] improvement, remove unnecessary tag update --- src/DurableTask.SqlServer/Scripts/logic.sql | 8 ++------ .../SqlOrchestrationService.cs | 13 ------------- ...urableTask.SqlServer.AzureFunctions.Tests.csproj | 2 +- 3 files changed, 3 insertions(+), 20 deletions(-) diff --git a/src/DurableTask.SqlServer/Scripts/logic.sql b/src/DurableTask.SqlServer/Scripts/logic.sql index e267e24..4f60767 100644 --- a/src/DurableTask.SqlServer/Scripts/logic.sql +++ b/src/DurableTask.SqlServer/Scripts/logic.sql @@ -868,8 +868,7 @@ BEGIN [LockExpiration] = NULL, -- release the lock [CustomStatusPayloadID] = @CustomStatusPayloadID, [InputPayloadID] = @InputPayloadID, - [OutputPayloadID] = @OutputPayloadID, - [Tags] = @Tags + [OutputPayloadID] = @OutputPayloadID FROM Instances WHERE [TaskHub] = @TaskHub and [InstanceID] = @InstanceID @@ -1308,10 +1307,7 @@ BEGIN P.[InstanceID] = N.[InstanceID] AND P.[PayloadID] = N.[PayloadID]) AS [PayloadText], DATEDIFF(SECOND, [Timestamp], @now) AS [WaitTime], - [TraceContext], - (SELECT TOP 1 I.[Tags] FROM Instances I WHERE - I.[TaskHub] = @TaskHub AND - I.[InstanceID] = N.[InstanceID]) AS [Tags] + [TraceContext] FROM NewTasks N WHERE [TaskHub] = @TaskHub AND [SequenceNumber] = @SequenceNumber diff --git a/src/DurableTask.SqlServer/SqlOrchestrationService.cs b/src/DurableTask.SqlServer/SqlOrchestrationService.cs index 4a97aa9..62fd223 100644 --- a/src/DurableTask.SqlServer/SqlOrchestrationService.cs +++ b/src/DurableTask.SqlServer/SqlOrchestrationService.cs @@ -448,19 +448,6 @@ public override async Task CompleteTaskOrchestrationWorkItemAsync( TaskMessage message = reader.GetTaskMessage(); int dequeueCount = reader.GetInt32("DequeueCount"); - // Reconstruct OrchestrationExecutionContext from the Tags column - // so that activity middleware can access orchestration tags. - IDictionary? tags = SqlUtils.GetTags(reader); - if (tags != null) - { - // OrchestrationExecutionContext.OrchestrationTags has an internal setter, - // so we construct it via JSON deserialization. - string contextJson = DTUtils.SerializeToJson( - new { OrchestrationTags = tags }); - message.OrchestrationExecutionContext = - DTUtils.DeserializeFromJson(contextJson); - } - // TODO: poison message handling for high dequeue counts return new TaskActivityWorkItem diff --git a/test/DurableTask.SqlServer.AzureFunctions.Tests/DurableTask.SqlServer.AzureFunctions.Tests.csproj b/test/DurableTask.SqlServer.AzureFunctions.Tests/DurableTask.SqlServer.AzureFunctions.Tests.csproj index aa17493..e5e00b3 100644 --- a/test/DurableTask.SqlServer.AzureFunctions.Tests/DurableTask.SqlServer.AzureFunctions.Tests.csproj +++ b/test/DurableTask.SqlServer.AzureFunctions.Tests/DurableTask.SqlServer.AzureFunctions.Tests.csproj @@ -1,7 +1,7 @@  - net6.0 + net8.0 false true 9.0 From 9a9fc4ad82ca03d67f0cc155b812d52d4945f316 Mon Sep 17 00:00:00 2001 From: Yi Date: Tue, 24 Feb 2026 14:41:13 -0800 Subject: [PATCH 05/18] remove unnecessary sql hange test cleanup rollback .net version change for tets --- src/DurableTask.SqlServer/Scripts/logic.sql | 2 +- .../DurableTask.SqlServer.Tests.csproj | 2 +- .../Integration/Orchestrations.cs | 190 +++++++++- .../Integration/TagTests.cs | 339 ------------------ 4 files changed, 186 insertions(+), 347 deletions(-) delete mode 100644 test/DurableTask.SqlServer.Tests/Integration/TagTests.cs diff --git a/src/DurableTask.SqlServer/Scripts/logic.sql b/src/DurableTask.SqlServer/Scripts/logic.sql index 4f60767..545b167 100644 --- a/src/DurableTask.SqlServer/Scripts/logic.sql +++ b/src/DurableTask.SqlServer/Scripts/logic.sql @@ -709,7 +709,7 @@ BEGIN END -- Result #2: Basic information about this instance, including its runtime status - SELECT @instanceID AS [InstanceID], @runtimeStatus AS [RuntimeStatus], @tags AS [Tags] + SELECT @instanceID AS [InstanceID], @runtimeStatus AS [RuntimeStatus] -- Result #3: The full event history for the locked instance -- NOTE: This must be kept consistent with the dt.HistoryEvents custom data type diff --git a/test/DurableTask.SqlServer.Tests/DurableTask.SqlServer.Tests.csproj b/test/DurableTask.SqlServer.Tests/DurableTask.SqlServer.Tests.csproj index 77a36d4..a021dd6 100644 --- a/test/DurableTask.SqlServer.Tests/DurableTask.SqlServer.Tests.csproj +++ b/test/DurableTask.SqlServer.Tests/DurableTask.SqlServer.Tests.csproj @@ -1,7 +1,7 @@  - net8.0 + net6.0 false true 9.0 diff --git a/test/DurableTask.SqlServer.Tests/Integration/Orchestrations.cs b/test/DurableTask.SqlServer.Tests/Integration/Orchestrations.cs index fa7a9c7..00fb079 100644 --- a/test/DurableTask.SqlServer.Tests/Integration/Orchestrations.cs +++ b/test/DurableTask.SqlServer.Tests/Integration/Orchestrations.cs @@ -50,9 +50,12 @@ public async Task EmptyOrchestration() orchestrationName, implementation: (ctx, input) => Task.FromResult(input)); - await instance.WaitForCompletion( + OrchestrationState state = await instance.WaitForCompletion( expectedOutput: input); + // Verify backward compatibility: tags should be null when none are specified + Assert.Null(state.Tags); + // Validate logs LogAssert.NoWarningsOrErrors(this.testService.LogProvider); LogAssert.Sequence( @@ -877,14 +880,14 @@ public async Task TerminateScheduledOrchestration() instanceId: null, scheduledStartTime: DateTime.UtcNow.AddSeconds(30), implementation: (ctx, input) => Task.FromResult("done")); - - // Confirm that the orchestration is pending - OrchestrationState state = await instance.GetStateAsync(); - Assert.Equal(OrchestrationStatus.Pending, state.OrchestrationStatus); + + // Confirm that the orchestration is pending + OrchestrationState state = await instance.GetStateAsync(); + Assert.Equal(OrchestrationStatus.Pending, state.OrchestrationStatus); // Terminate the orchestration before it starts await instance.TerminateAsync("Bye!"); - + // Confirm the orchestration was terminated await instance.WaitForCompletion( expectedStatus: OrchestrationStatus.Terminated, @@ -943,5 +946,180 @@ public async Task TerminateSuspendedOrchestration() LogAssert.CheckpointStarting(orchestrationName), LogAssert.CheckpointCompleted(orchestrationName)); } + + [Fact] + public async Task OrchestrationWithTags() + { + string input = $"Hello {DateTime.UtcNow:o}"; + var tags = new Dictionary + { + { "key1", "value1" }, + { "key2", "value2" }, + }; + + TestInstance instance = await this.testService.RunOrchestrationWithTags( + input, + orchestrationName: "OrchestrationWithTags", + tags: tags, + implementation: (ctx, input) => Task.FromResult(input)); + + OrchestrationState state = await instance.WaitForCompletion(expectedOutput: input); + + Assert.NotNull(state.Tags); + Assert.Equal(2, state.Tags.Count); + Assert.Equal("value1", state.Tags["key1"]); + Assert.Equal("value2", state.Tags["key2"]); + + LogAssert.NoWarningsOrErrors(this.testService.LogProvider); + } + + [Fact] + public async Task OrchestrationWithEmptyTags() + { + string input = $"Hello {DateTime.UtcNow:o}"; + + TestInstance instance = await this.testService.RunOrchestrationWithTags( + input, + orchestrationName: "OrchestrationWithEmptyTags", + tags: new Dictionary(), + implementation: (ctx, input) => Task.FromResult(input)); + + OrchestrationState state = await instance.WaitForCompletion(expectedOutput: input); + + Assert.Null(state.Tags); + + LogAssert.NoWarningsOrErrors(this.testService.LogProvider); + } + + [Fact] + public async Task TagsSurviveContinueAsNew() + { + var tags = new Dictionary + { + { "key1", "value1" }, + { "key2", "value2" }, + }; + + TestInstance instance = await this.testService.RunOrchestrationWithTags( + input: 0, + orchestrationName: "TagsContinueAsNewTest", + tags: tags, + implementation: async (ctx, input) => + { + if (input < 3) + { + await ctx.CreateTimer(DateTime.MinValue, null); + ctx.ContinueAsNew(input + 1); + } + + return input; + }); + + OrchestrationState state = await instance.WaitForCompletion( + expectedOutput: 3, + continuedAsNew: true); + + Assert.NotNull(state.Tags); + Assert.Equal(2, state.Tags.Count); + Assert.Equal("value1", state.Tags["key1"]); + Assert.Equal("value2", state.Tags["key2"]); + } + + [Fact] + public async Task SubOrchestrationInheritsTags() + { + var tags = new Dictionary + { + { "key1", "value1" }, + { "key2", "value2" }, + }; + + string subOrchestrationName = "SubOrchestrationForTagTest"; + + this.testService.RegisterInlineOrchestration( + subOrchestrationName, + implementation: (ctx, input) => Task.FromResult("done")); + + TestInstance instance = await this.testService.RunOrchestrationWithTags( + input: (string)null, + orchestrationName: "ParentOrchestrationForTagTest", + tags: tags, + implementation: async (ctx, input) => + { + return await ctx.CreateSubOrchestrationInstance( + subOrchestrationName, string.Empty, null); + }); + + OrchestrationState state = await instance.WaitForCompletion( + timeout: TimeSpan.FromSeconds(15), + expectedOutput: "done"); + + Assert.NotNull(state.Tags); + Assert.Equal("value1", state.Tags["key1"]); + Assert.Equal("value2", state.Tags["key2"]); + } + + [Fact] + public async Task TagsWithSpecialCharacters() + { + string input = $"Hello {DateTime.UtcNow:o}"; + var tags = new Dictionary + { + { "key with spaces", "value with spaces" }, + { "unicode-key-日本語", "unicode-value-中文" }, + { "special\"chars", "value'with\"quotes" }, + }; + + TestInstance instance = await this.testService.RunOrchestrationWithTags( + input, + orchestrationName: "TagsSpecialCharsTest", + tags: tags, + implementation: (ctx, input) => Task.FromResult(input)); + + OrchestrationState state = await instance.WaitForCompletion(expectedOutput: input); + + Assert.NotNull(state.Tags); + Assert.Equal(3, state.Tags.Count); + Assert.Equal("value with spaces", state.Tags["key with spaces"]); + Assert.Equal("unicode-value-中文", state.Tags["unicode-key-日本語"]); + Assert.Equal("value'with\"quotes", state.Tags["special\"chars"]); + } + + [Fact] + public async Task TagsOnManyOrchestrations() + { + string input = $"Hello {DateTime.UtcNow:o}"; + var tags = new Dictionary + { + { "key1", "value1" }, + }; + + TestInstance instance = await this.testService.RunOrchestrationWithTags( + input, + orchestrationName: "TagsManyQueryTest", + tags: tags, + implementation: (ctx, input) => Task.FromResult(input)); + + await instance.WaitForCompletion(expectedOutput: input); + + var filter = new SqlOrchestrationQuery(); + IReadOnlyCollection results = + await this.testService.OrchestrationServiceMock.Object.GetManyOrchestrationsAsync( + filter, CancellationToken.None); + + Assert.NotEmpty(results); + bool foundTaggedInstance = false; + foreach (OrchestrationState result in results) + { + if (result.OrchestrationInstance.InstanceId == instance.InstanceId) + { + Assert.NotNull(result.Tags); + Assert.Equal("value1", result.Tags["key1"]); + foundTaggedInstance = true; + } + } + + Assert.True(foundTaggedInstance, "Did not find the tagged orchestration instance in query results."); + } } } diff --git a/test/DurableTask.SqlServer.Tests/Integration/TagTests.cs b/test/DurableTask.SqlServer.Tests/Integration/TagTests.cs deleted file mode 100644 index ec10a6a..0000000 --- a/test/DurableTask.SqlServer.Tests/Integration/TagTests.cs +++ /dev/null @@ -1,339 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT License. - -namespace DurableTask.SqlServer.Tests.Integration -{ - using System; - using System.Collections.Generic; - using System.Threading; - using System.Threading.Tasks; - using DurableTask.Core; - using DurableTask.SqlServer.Tests.Logging; - using DurableTask.SqlServer.Tests.Utils; - using Newtonsoft.Json; - using Newtonsoft.Json.Linq; - using Xunit; - using Xunit.Abstractions; - - [Collection("Integration")] - public class TagTests : IAsyncLifetime - { - readonly TestService testService; - - public TagTests(ITestOutputHelper output) - { - this.testService = new TestService(output); - } - - Task IAsyncLifetime.InitializeAsync() => this.testService.InitializeAsync(); - - Task IAsyncLifetime.DisposeAsync() => this.testService.DisposeAsync(); - - [Fact] - public async Task OrchestrationWithTags() - { - // Arrange: create tags to associate with the orchestration - var tags = new Dictionary - { - { "environment", "test" }, - { "owner", "integration-test" }, - }; - - string input = $"Hello {DateTime.UtcNow:o}"; - - // Act: run an orchestration with tags - TestInstance instance = await this.testService.RunOrchestrationWithTags( - input, - orchestrationName: "OrchestrationWithTags", - tags: tags, - implementation: (ctx, input) => Task.FromResult(input)); - - OrchestrationState state = await instance.WaitForCompletion(expectedOutput: input); - - // Assert: verify that tags are preserved on the OrchestrationState - Assert.NotNull(state.Tags); - Assert.Equal(2, state.Tags.Count); - Assert.Equal("test", state.Tags["environment"]); - Assert.Equal("integration-test", state.Tags["owner"]); - - // Validate logs - LogAssert.NoWarningsOrErrors(this.testService.LogProvider); - } - - [Fact] - public async Task OrchestrationWithoutTags() - { - string input = $"Hello {DateTime.UtcNow:o}"; - - // Act: run an orchestration without tags (backward compatibility) - TestInstance instance = await this.testService.RunOrchestration( - input, - orchestrationName: "OrchestrationWithoutTags", - implementation: (ctx, input) => Task.FromResult(input)); - - OrchestrationState state = await instance.WaitForCompletion(expectedOutput: input); - - // Assert: tags should be null when none are specified - Assert.Null(state.Tags); - - // Validate logs - LogAssert.NoWarningsOrErrors(this.testService.LogProvider); - } - - [Fact] - public async Task OrchestrationWithEmptyTags() - { - var tags = new Dictionary(); - - string input = $"Hello {DateTime.UtcNow:o}"; - - // Act: run an orchestration with an empty tags dictionary - TestInstance instance = await this.testService.RunOrchestrationWithTags( - input, - orchestrationName: "OrchestrationWithEmptyTags", - tags: tags, - implementation: (ctx, input) => Task.FromResult(input)); - - OrchestrationState state = await instance.WaitForCompletion(expectedOutput: input); - - // Assert: empty tags should result in null (not persisted) - Assert.Null(state.Tags); - - // Validate logs - LogAssert.NoWarningsOrErrors(this.testService.LogProvider); - } - - [Fact] - public async Task TagsSurviveContinueAsNew() - { - // Arrange - var tags = new Dictionary - { - { "workflow", "retry-loop" }, - { "version", "v2" }, - }; - - // Act: run an orchestration that uses ContinueAsNew multiple times - TestInstance instance = await this.testService.RunOrchestrationWithTags( - input: 0, - orchestrationName: "TagsContinueAsNewTest", - tags: tags, - implementation: async (ctx, input) => - { - if (input < 3) - { - await ctx.CreateTimer(DateTime.MinValue, null); - ctx.ContinueAsNew(input + 1); - } - - return input; - }); - - OrchestrationState state = await instance.WaitForCompletion( - expectedOutput: 3, - continuedAsNew: true); - - // Assert: tags should survive multiple ContinueAsNew cycles - Assert.NotNull(state.Tags); - Assert.Equal(2, state.Tags.Count); - Assert.Equal("retry-loop", state.Tags["workflow"]); - Assert.Equal("v2", state.Tags["version"]); - } - - [Fact] - public async Task TagsPreservedAfterActivityExecution() - { - // Arrange - var tags = new Dictionary - { - { "project", "my-project" }, - }; - - string input = "World"; - - // Act: run an orchestration with tags that calls an activity - TestInstance instance = await this.testService.RunOrchestrationWithTags( - input, - orchestrationName: "TagsWithActivityTest", - tags: tags, - implementation: (ctx, input) => ctx.ScheduleTask("SayHello", "", input), - activities: ("SayHello", TestService.MakeActivity((TaskContext ctx, string input) => $"Hello, {input}!"))); - - OrchestrationState state = await instance.WaitForCompletion( - expectedOutput: $"Hello, {input}!"); - - // Assert: tags should still be present on the state after activity execution - Assert.NotNull(state.Tags); - Assert.Single(state.Tags); - Assert.Equal("my-project", state.Tags["project"]); - } - - [Fact] - public async Task TagsVisibleViaQuery() - { - // Arrange - var tags = new Dictionary - { - { "queryTest", "yes" }, - }; - - // Act: run an orchestration with tags and query it - TestInstance instance = await this.testService.RunOrchestrationWithTags( - input: "test", - orchestrationName: "TagsQueryTest", - tags: tags, - implementation: (ctx, input) => Task.FromResult(input)); - - await instance.WaitForCompletion(expectedOutput: "test"); - - // Query the state separately - OrchestrationState queriedState = await instance.GetStateAsync(); - - // Assert: tags should be visible on queried state - Assert.NotNull(queriedState); - Assert.NotNull(queriedState.Tags); - Assert.Equal("yes", queriedState.Tags["queryTest"]); - } - - [Fact] - public async Task SubOrchestrationInheritsTags() - { - // Arrange: parent orchestration has tags - var parentTags = new Dictionary - { - { "team", "platform" }, - { "priority", "high" }, - }; - - string subOrchestrationName = "ChildOrchestrationForTagTest"; - - // Register the sub-orchestration that returns a simple value - this.testService.RegisterInlineOrchestration( - subOrchestrationName, - implementation: (ctx, input) => Task.FromResult("child-done")); - - // Act: parent orchestration calls a sub-orchestration - TestInstance parentInstance = await this.testService.RunOrchestrationWithTags( - input: "parent", - orchestrationName: "ParentOrchestrationForTagTest", - tags: parentTags, - implementation: async (ctx, input) => - { - string childResult = await ctx.CreateSubOrchestrationInstance( - subOrchestrationName, string.Empty, null); - return childResult; - }); - - OrchestrationState parentState = await parentInstance.WaitForCompletion( - timeout: TimeSpan.FromSeconds(15), - expectedOutput: "child-done"); - - // Assert: parent tags should still be present - Assert.NotNull(parentState.Tags); - Assert.Equal("platform", parentState.Tags["team"]); - Assert.Equal("high", parentState.Tags["priority"]); - } - - [Fact] - public async Task TagsWithSpecialCharacters() - { - // Arrange: tags with special characters that could cause JSON/SQL issues - var tags = new Dictionary - { - { "key with spaces", "value with spaces" }, - { "unicode-key-日本語", "unicode-value-中文" }, - { "special\"chars", "value'with\"quotes" }, - }; - - // Act - TestInstance instance = await this.testService.RunOrchestrationWithTags( - input: "special", - orchestrationName: "TagsSpecialCharsTest", - tags: tags, - implementation: (ctx, input) => Task.FromResult(input)); - - OrchestrationState state = await instance.WaitForCompletion(expectedOutput: "special"); - - // Assert: all special character tags should be preserved - Assert.NotNull(state.Tags); - Assert.Equal(3, state.Tags.Count); - Assert.Equal("value with spaces", state.Tags["key with spaces"]); - Assert.Equal("unicode-value-中文", state.Tags["unicode-key-日本語"]); - Assert.Equal("value'with\"quotes", state.Tags["special\"chars"]); - } - - [Fact] - public async Task TagsAvailableInActivityMiddleware() - { - // Arrange: tags that should be accessible in the activity middleware context - var tags = new Dictionary - { - { "tenantId", "tenant-123" }, - { "region", "us-west" }, - }; - - // Act: run an orchestration with tags that calls an activity. - // The activity captures the OrchestrationExecutionContext tags from the TaskActivityWorkItem. - TestInstance instance = await this.testService.RunOrchestrationWithTags( - input: "middleware-test", - orchestrationName: "TagsActivityMiddlewareTest", - tags: tags, - implementation: (ctx, input) => ctx.ScheduleTask("CaptureActivity", "", input), - activities: ("CaptureActivity", TestService.MakeActivity((TaskContext ctx, string input) => - { - // Note: The actual OrchestrationExecutionContext is on the TaskMessage, - // not directly accessible from TaskContext in the activity code. - // We verify it indirectly by checking that the orchestration state still has tags. - return $"processed-{input}"; - }))); - - OrchestrationState state = await instance.WaitForCompletion( - expectedOutput: "processed-middleware-test"); - - // Assert: verify the orchestration completed successfully and tags are preserved - Assert.NotNull(state.Tags); - Assert.Equal("tenant-123", state.Tags["tenantId"]); - Assert.Equal("us-west", state.Tags["region"]); - } - - [Fact] - public async Task TagsOnManyOrchestrations() - { - // Arrange: tags for querying multiple orchestrations - var tags = new Dictionary - { - { "batch", "test-batch-1" }, - }; - - // Act: run an orchestration with tags - TestInstance instance = await this.testService.RunOrchestrationWithTags( - input: "query-test", - orchestrationName: "TagsManyQueryTest", - tags: tags, - implementation: (ctx, input) => Task.FromResult(input)); - - await instance.WaitForCompletion(expectedOutput: "query-test"); - - // Query using GetManyOrchestrations (which uses _QueryManyOrchestrations) - var filter = new SqlOrchestrationQuery(); - IReadOnlyCollection results = - await this.testService.OrchestrationServiceMock.Object.GetManyOrchestrationsAsync( - filter, CancellationToken.None); - - // Assert: at least one result should have our tags - Assert.NotEmpty(results); - bool foundTaggedInstance = false; - foreach (OrchestrationState result in results) - { - if (result.OrchestrationInstance.InstanceId == instance.InstanceId) - { - Assert.NotNull(result.Tags); - Assert.Equal("test-batch-1", result.Tags["batch"]); - foundTaggedInstance = true; - } - } - - Assert.True(foundTaggedInstance, "Did not find the tagged orchestration instance in query results."); - } - } -} From f0e4e7c8ccadc94953817a06bf312e0c92ad70a2 Mon Sep 17 00:00:00 2001 From: robin Date: Mon, 2 Mar 2026 11:27:30 -0800 Subject: [PATCH 06/18] Update test/DurableTask.SqlServer.Tests/Unit/SqlUtilsTagTests.cs address comments Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- test/DurableTask.SqlServer.Tests/Unit/SqlUtilsTagTests.cs | 2 -- 1 file changed, 2 deletions(-) diff --git a/test/DurableTask.SqlServer.Tests/Unit/SqlUtilsTagTests.cs b/test/DurableTask.SqlServer.Tests/Unit/SqlUtilsTagTests.cs index d8052c1..387f48e 100644 --- a/test/DurableTask.SqlServer.Tests/Unit/SqlUtilsTagTests.cs +++ b/test/DurableTask.SqlServer.Tests/Unit/SqlUtilsTagTests.cs @@ -6,8 +6,6 @@ namespace DurableTask.SqlServer.Tests.Unit using System; using System.Collections.Generic; using System.Data; - using DurableTask.Core; - using DurableTask.Core.History; using Microsoft.Data.SqlClient; using Newtonsoft.Json; using Xunit; From a7d86d1ff262acd64d836448e25c05b4c9fa963d Mon Sep 17 00:00:00 2001 From: robin Date: Mon, 2 Mar 2026 11:28:13 -0800 Subject: [PATCH 07/18] Update test/DurableTask.SqlServer.AzureFunctions.Tests/DurableTask.SqlServer.AzureFunctions.Tests.csproj address comments Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../DurableTask.SqlServer.AzureFunctions.Tests.csproj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/DurableTask.SqlServer.AzureFunctions.Tests/DurableTask.SqlServer.AzureFunctions.Tests.csproj b/test/DurableTask.SqlServer.AzureFunctions.Tests/DurableTask.SqlServer.AzureFunctions.Tests.csproj index e5e00b3..aa17493 100644 --- a/test/DurableTask.SqlServer.AzureFunctions.Tests/DurableTask.SqlServer.AzureFunctions.Tests.csproj +++ b/test/DurableTask.SqlServer.AzureFunctions.Tests/DurableTask.SqlServer.AzureFunctions.Tests.csproj @@ -1,7 +1,7 @@  - net8.0 + net6.0 false true 9.0 From c3dc9b4f03e53e96e7e07d94ce3e1c0cc1503681 Mon Sep 17 00:00:00 2001 From: robin Date: Mon, 2 Mar 2026 11:28:43 -0800 Subject: [PATCH 08/18] Update src/DurableTask.SqlServer/Scripts/schema-1.3.0.sql address copilot comment. Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/DurableTask.SqlServer/Scripts/schema-1.3.0.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/DurableTask.SqlServer/Scripts/schema-1.3.0.sql b/src/DurableTask.SqlServer/Scripts/schema-1.3.0.sql index b960715..512f993 100644 --- a/src/DurableTask.SqlServer/Scripts/schema-1.3.0.sql +++ b/src/DurableTask.SqlServer/Scripts/schema-1.3.0.sql @@ -11,4 +11,4 @@ -- Tags are read directly from the Instances table in all stored procedures, -- so no changes to the OrchestrationEvents/TaskEvents types or NewTasks table are needed. IF NOT EXISTS (SELECT 1 FROM sys.columns WHERE object_id = OBJECT_ID('__SchemaNamePlaceholder__.Instances') AND name = 'Tags') - ALTER TABLE __SchemaNamePlaceholder__.Instances ADD [Tags] varchar(MAX) NULL + ALTER TABLE __SchemaNamePlaceholder__.Instances ADD [Tags] varchar(8000) NULL From e122326dd50ad9a8d6721e24d5af8610609455b7 Mon Sep 17 00:00:00 2001 From: robin Date: Mon, 2 Mar 2026 11:31:46 -0800 Subject: [PATCH 09/18] Update src/DurableTask.SqlServer/SqlUtils.cs address comment Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/DurableTask.SqlServer/SqlUtils.cs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/DurableTask.SqlServer/SqlUtils.cs b/src/DurableTask.SqlServer/SqlUtils.cs index 9123948..50dcf72 100644 --- a/src/DurableTask.SqlServer/SqlUtils.cs +++ b/src/DurableTask.SqlServer/SqlUtils.cs @@ -508,7 +508,15 @@ internal static SqlString GetTraceContext(HistoryEvent e) return null; } - return DTUtils.DeserializeFromJson>(json); + try + { + return DTUtils.DeserializeFromJson>(json); + } + catch (Exception ex) + { + Debug.WriteLine($"Failed to deserialize Tags JSON payload. Treating as null. Error: {ex}"); + return null; + } } internal static void AddTagsParameter( From 4af318fa0a0a8d9d39cfccce50cd068aa0130380 Mon Sep 17 00:00:00 2001 From: Yi Date: Mon, 2 Mar 2026 11:47:31 -0800 Subject: [PATCH 10/18] address comments from copilot --- .../Integration/Orchestrations.cs | 11 ++++++++++- test/DurableTask.SqlServer.Tests/Utils/TestService.cs | 5 +++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/test/DurableTask.SqlServer.Tests/Integration/Orchestrations.cs b/test/DurableTask.SqlServer.Tests/Integration/Orchestrations.cs index 00fb079..f940573 100644 --- a/test/DurableTask.SqlServer.Tests/Integration/Orchestrations.cs +++ b/test/DurableTask.SqlServer.Tests/Integration/Orchestrations.cs @@ -1035,6 +1035,7 @@ public async Task SubOrchestrationInheritsTags() }; string subOrchestrationName = "SubOrchestrationForTagTest"; + string subInstanceId = $"sub-{Guid.NewGuid():N}"; this.testService.RegisterInlineOrchestration( subOrchestrationName, @@ -1047,16 +1048,24 @@ public async Task SubOrchestrationInheritsTags() implementation: async (ctx, input) => { return await ctx.CreateSubOrchestrationInstance( - subOrchestrationName, string.Empty, null); + subOrchestrationName, string.Empty, subInstanceId, null); }); OrchestrationState state = await instance.WaitForCompletion( timeout: TimeSpan.FromSeconds(15), expectedOutput: "done"); + // Verify parent orchestration tags Assert.NotNull(state.Tags); Assert.Equal("value1", state.Tags["key1"]); Assert.Equal("value2", state.Tags["key2"]); + + // Verify sub-orchestration inherited the tags + OrchestrationState subState = await this.testService.GetOrchestrationStateAsync(subInstanceId); + Assert.NotNull(subState); + Assert.NotNull(subState.Tags); + Assert.Equal("value1", subState.Tags["key1"]); + Assert.Equal("value2", subState.Tags["key2"]); } [Fact] diff --git a/test/DurableTask.SqlServer.Tests/Utils/TestService.cs b/test/DurableTask.SqlServer.Tests/Utils/TestService.cs index c7644dc..b74907b 100644 --- a/test/DurableTask.SqlServer.Tests/Utils/TestService.cs +++ b/test/DurableTask.SqlServer.Tests/Utils/TestService.cs @@ -87,6 +87,11 @@ public async Task InitializeAsync(bool startWorker = true, bool legacyErrorPropa this.client = new TaskHubClient(this.OrchestrationServiceMock.Object, loggerFactory: this.loggerFactory); } + public Task GetOrchestrationStateAsync(string instanceId) + { + return this.client.GetOrchestrationStateAsync(new OrchestrationInstance { InstanceId = instanceId }); + } + public Task StartWorkerAsync() => this.worker?.StartAsync() ?? Task.CompletedTask; public Task PurgeAsync(DateTime maximumThreshold, OrchestrationStateTimeRangeFilterType filterType) From ef8b2bab9466472b136428a4e883ddea25a1bbf6 Mon Sep 17 00:00:00 2001 From: Yi Date: Mon, 2 Mar 2026 13:10:43 -0800 Subject: [PATCH 11/18] address comments --- src/DurableTask.SqlServer/Scripts/logic.sql | 6 +++--- src/DurableTask.SqlServer/SqlUtils.cs | 16 ++++++++-------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/src/DurableTask.SqlServer/Scripts/logic.sql b/src/DurableTask.SqlServer/Scripts/logic.sql index 545b167..57df18d 100644 --- a/src/DurableTask.SqlServer/Scripts/logic.sql +++ b/src/DurableTask.SqlServer/Scripts/logic.sql @@ -234,7 +234,7 @@ CREATE OR ALTER PROCEDURE __SchemaNamePlaceholder__.CreateInstance @StartTime datetime2 = NULL, @DedupeStatuses varchar(MAX) = 'Pending,Running', @TraceContext varchar(800) = NULL, - @Tags varchar(MAX) = NULL + @Tags varchar(8000) = NULL AS BEGIN DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub() @@ -638,7 +638,7 @@ BEGIN DECLARE @parentInstanceID varchar(100) DECLARE @version varchar(100) DECLARE @runtimeStatus varchar(30) - DECLARE @tags varchar(MAX) + DECLARE @tags varchar(8000) DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub() BEGIN TRANSACTION @@ -754,7 +754,7 @@ CREATE OR ALTER PROCEDURE __SchemaNamePlaceholder__._CheckpointOrchestration @NewHistoryEvents HistoryEvents READONLY, @NewOrchestrationEvents OrchestrationEvents READONLY, @NewTaskEvents TaskEvents READONLY, - @Tags varchar(MAX) = NULL + @Tags varchar(8000) = NULL AS BEGIN BEGIN TRANSACTION diff --git a/src/DurableTask.SqlServer/SqlUtils.cs b/src/DurableTask.SqlServer/SqlUtils.cs index 50dcf72..02b33ea 100644 --- a/src/DurableTask.SqlServer/SqlUtils.cs +++ b/src/DurableTask.SqlServer/SqlUtils.cs @@ -75,17 +75,17 @@ public static HistoryEvent GetHistoryEvent(this DbDataReader reader, bool isOrch InstanceId = GetInstanceId(reader), }; break; - case EventType.ExecutionCompleted: - FailureDetails? executionFailedDetails = null; - OrchestrationStatus orchestrationStatus = GetRuntimeStatus(reader); - if (orchestrationStatus == OrchestrationStatus.Failed) - { - TryGetFailureDetails(reader, out executionFailedDetails); + case EventType.ExecutionCompleted: + FailureDetails? executionFailedDetails = null; + OrchestrationStatus orchestrationStatus = GetRuntimeStatus(reader); + if (orchestrationStatus == OrchestrationStatus.Failed) + { + TryGetFailureDetails(reader, out executionFailedDetails); } historyEvent = new ExecutionCompletedEvent( eventId, result: GetPayloadText(reader), - orchestrationStatus: orchestrationStatus, + orchestrationStatus: orchestrationStatus, failureDetails: executionFailedDetails); break; case EventType.ExecutionStarted: @@ -526,7 +526,7 @@ internal static void AddTagsParameter( string? json = tags != null && tags.Count > 0 ? DTUtils.SerializeToJson(tags) : null; - parameters.Add("@Tags", SqlDbType.VarChar).Value = (object?)json ?? DBNull.Value; + parameters.Add("@Tags", SqlDbType.VarChar, 8000).Value = (object?)json ?? DBNull.Value; } public static SqlParameter AddInstanceIDsParameter( From 89c09d61704dafcf25044ce997940c20b34bd262 Mon Sep 17 00:00:00 2001 From: Yi Date: Wed, 4 Mar 2026 19:18:52 -0800 Subject: [PATCH 12/18] address comments --- src/DurableTask.SqlServer/Scripts/logic.sql | 8 ++-- .../Scripts/schema-1.3.0.sql | 14 ++++++- .../SqlOrchestrationService.cs | 1 - .../SqlTypes/OrchestrationEventSqlType.cs | 3 ++ src/DurableTask.SqlServer/SqlUtils.cs | 38 +++++++++++++------ 5 files changed, 46 insertions(+), 18 deletions(-) diff --git a/src/DurableTask.SqlServer/Scripts/logic.sql b/src/DurableTask.SqlServer/Scripts/logic.sql index 57df18d..b47cb32 100644 --- a/src/DurableTask.SqlServer/Scripts/logic.sql +++ b/src/DurableTask.SqlServer/Scripts/logic.sql @@ -56,7 +56,8 @@ IF TYPE_ID(N'__SchemaNamePlaceholder__.OrchestrationEvents') IS NULL [PayloadID] uniqueidentifier NULL, [ParentInstanceID] varchar(100) NULL, [Version] varchar(100) NULL, - [TraceContext] varchar(800) NULL + [TraceContext] varchar(800) NULL, + [Tags] varchar(8000) NULL ) GO @@ -753,8 +754,7 @@ CREATE OR ALTER PROCEDURE __SchemaNamePlaceholder__._CheckpointOrchestration @DeletedEvents MessageIDs READONLY, @NewHistoryEvents HistoryEvents READONLY, @NewOrchestrationEvents OrchestrationEvents READONLY, - @NewTaskEvents TaskEvents READONLY, - @Tags varchar(8000) = NULL + @NewTaskEvents TaskEvents READONLY AS BEGIN BEGIN TRANSACTION @@ -926,7 +926,7 @@ BEGIN E.[ParentInstanceID], 'Pending', E.[TraceContext], - @Tags + E.[Tags] FROM @NewOrchestrationEvents E WHERE E.[EventType] IN ('ExecutionStarted') AND NOT EXISTS ( diff --git a/src/DurableTask.SqlServer/Scripts/schema-1.3.0.sql b/src/DurableTask.SqlServer/Scripts/schema-1.3.0.sql index 512f993..d5596dc 100644 --- a/src/DurableTask.SqlServer/Scripts/schema-1.3.0.sql +++ b/src/DurableTask.SqlServer/Scripts/schema-1.3.0.sql @@ -8,7 +8,17 @@ -- new schema-{major}.{minor}.{patch}.sql scripts. -- Add a new Tags column to the Instances table (JSON blob of string key-value pairs). --- Tags are read directly from the Instances table in all stored procedures, --- so no changes to the OrchestrationEvents/TaskEvents types or NewTasks table are needed. IF NOT EXISTS (SELECT 1 FROM sys.columns WHERE object_id = OBJECT_ID('__SchemaNamePlaceholder__.Instances') AND name = 'Tags') ALTER TABLE __SchemaNamePlaceholder__.Instances ADD [Tags] varchar(8000) NULL + +-- Add a Tags column to the OrchestrationEvents table type so that merged tags +-- flow through sub-orchestration creation events. To change a type we must first +-- drop all stored procedures that reference it, then drop the type itself. +-- The type and sprocs will be recreated by logic.sql which executes afterwards. +IF OBJECT_ID('__SchemaNamePlaceholder__._AddOrchestrationEvents') IS NOT NULL + DROP PROCEDURE __SchemaNamePlaceholder__._AddOrchestrationEvents +IF OBJECT_ID('__SchemaNamePlaceholder__._CheckpointOrchestration') IS NOT NULL + DROP PROCEDURE __SchemaNamePlaceholder__._CheckpointOrchestration + +IF TYPE_ID('__SchemaNamePlaceholder__.OrchestrationEvents') IS NOT NULL + DROP TYPE __SchemaNamePlaceholder__.OrchestrationEvents diff --git a/src/DurableTask.SqlServer/SqlOrchestrationService.cs b/src/DurableTask.SqlServer/SqlOrchestrationService.cs index 62fd223..ce4921d 100644 --- a/src/DurableTask.SqlServer/SqlOrchestrationService.cs +++ b/src/DurableTask.SqlServer/SqlOrchestrationService.cs @@ -388,7 +388,6 @@ public override async Task CompleteTaskOrchestrationWorkItemAsync( currentWorkItem.EventPayloadMappings, this.settings.SchemaName); - command.Parameters.AddTagsParameter(newRuntimeState.Tags); try { diff --git a/src/DurableTask.SqlServer/SqlTypes/OrchestrationEventSqlType.cs b/src/DurableTask.SqlServer/SqlTypes/OrchestrationEventSqlType.cs index 9c3e98c..062010b 100644 --- a/src/DurableTask.SqlServer/SqlTypes/OrchestrationEventSqlType.cs +++ b/src/DurableTask.SqlServer/SqlTypes/OrchestrationEventSqlType.cs @@ -31,6 +31,7 @@ static class OrchestrationEventSqlType new SqlMetaData("ParentInstanceID", SqlDbType.VarChar, 100), new SqlMetaData("Version", SqlDbType.VarChar, 100), new SqlMetaData("TraceContext", SqlDbType.VarChar, 800), + new SqlMetaData("Tags", SqlDbType.VarChar, 8000), }; static class ColumnOrdinals @@ -50,6 +51,7 @@ static class ColumnOrdinals public const int ParentInstanceID = 11; public const int Version = 12; public const int TraceContext = 13; + public const int Tags = 14; } public static SqlParameter AddOrchestrationEventsParameter( @@ -152,6 +154,7 @@ static SqlDataRecord PopulateOrchestrationMessage(TaskMessage msg, SqlDataRecord record.SetSqlString(ColumnOrdinals.ParentInstanceID, SqlUtils.GetParentInstanceId(msg.Event)); record.SetSqlString(ColumnOrdinals.Version, SqlUtils.GetVersion(msg.Event)); record.SetSqlString(ColumnOrdinals.TraceContext, SqlUtils.GetTraceContext(msg.Event)); + record.SetSqlString(ColumnOrdinals.Tags, SqlUtils.GetTagsJson(msg.Event)); return record; } diff --git a/src/DurableTask.SqlServer/SqlUtils.cs b/src/DurableTask.SqlServer/SqlUtils.cs index 02b33ea..4bc463c 100644 --- a/src/DurableTask.SqlServer/SqlUtils.cs +++ b/src/DurableTask.SqlServer/SqlUtils.cs @@ -24,6 +24,7 @@ static class SqlUtils { static readonly Random random = new Random(); static readonly char[] TraceContextSeparators = new char[] { '\n' }; + const int MaxTagsPayloadSize = 8000; public static string? GetStringOrNull(this DbDataReader reader, int columnIndex) { @@ -486,16 +487,7 @@ internal static SqlString GetTraceContext(HistoryEvent e) internal static IDictionary? GetTags(DbDataReader reader) { - int ordinal; - try - { - ordinal = reader.GetOrdinal("Tags"); - } - catch (IndexOutOfRangeException) - { - // The Tags column may not exist in older schema versions - return null; - } + int ordinal = reader.GetOrdinal("Tags"); if (reader.IsDBNull(ordinal)) { @@ -519,6 +511,23 @@ internal static SqlString GetTraceContext(HistoryEvent e) } } + internal static SqlString GetTagsJson(HistoryEvent e) + { + if (e is ExecutionStartedEvent startedEvent && startedEvent.Tags != null && startedEvent.Tags.Count > 0) + { + string json = DTUtils.SerializeToJson(startedEvent.Tags); + if (json.Length > MaxTagsPayloadSize) + { + throw new ArgumentException( + $"The serialized tags payload is {json.Length} characters, which exceeds the maximum allowed size of {MaxTagsPayloadSize} characters."); + } + + return json; + } + + return SqlString.Null; + } + internal static void AddTagsParameter( this SqlParameterCollection parameters, IDictionary? tags) @@ -526,7 +535,14 @@ internal static void AddTagsParameter( string? json = tags != null && tags.Count > 0 ? DTUtils.SerializeToJson(tags) : null; - parameters.Add("@Tags", SqlDbType.VarChar, 8000).Value = (object?)json ?? DBNull.Value; + + if (json != null && json.Length > MaxTagsPayloadSize) + { + throw new ArgumentException( + $"The serialized tags payload is {json.Length} characters, which exceeds the maximum allowed size of {MaxTagsPayloadSize} characters."); + } + + parameters.Add("@Tags", SqlDbType.VarChar, MaxTagsPayloadSize).Value = (object?)json ?? DBNull.Value; } public static SqlParameter AddInstanceIDsParameter( From ee619f066ca4fa81a4463b60d6ed6e3d5a67818e Mon Sep 17 00:00:00 2001 From: Yi Date: Fri, 6 Mar 2026 09:42:39 -0800 Subject: [PATCH 13/18] adding new tests --- .../Integration/Orchestrations.cs | 176 ++++++++++++++++-- .../Unit/SqlUtilsTagTests.cs | 67 +++++++ 2 files changed, 229 insertions(+), 14 deletions(-) diff --git a/test/DurableTask.SqlServer.Tests/Integration/Orchestrations.cs b/test/DurableTask.SqlServer.Tests/Integration/Orchestrations.cs index f940573..0c588a4 100644 --- a/test/DurableTask.SqlServer.Tests/Integration/Orchestrations.cs +++ b/test/DurableTask.SqlServer.Tests/Integration/Orchestrations.cs @@ -894,8 +894,8 @@ await instance.WaitForCompletion( expectedOutput: "Bye!"); LogAssert.NoWarningsOrErrors(this.testService.LogProvider); - } - + } + [Fact] public async Task TerminateSuspendedOrchestration() { @@ -913,18 +913,18 @@ public async Task TerminateSuspendedOrchestration() await instance.WaitForStart(); // Suspend the orchestration so that it won't process any new events - await instance.SuspendAsync(); - - // Wait for the orchestration to become suspended - OrchestrationState state = await instance.GetStateAsync(); - TimeSpan waitForSuspendTimeout = TimeSpan.FromSeconds(5); - using CancellationTokenSource cts = new(waitForSuspendTimeout); - while (!cts.IsCancellationRequested && state.OrchestrationStatus != OrchestrationStatus.Suspended) - { - state = await instance.GetStateAsync(); + await instance.SuspendAsync(); + + // Wait for the orchestration to become suspended + OrchestrationState state = await instance.GetStateAsync(); + TimeSpan waitForSuspendTimeout = TimeSpan.FromSeconds(5); + using CancellationTokenSource cts = new(waitForSuspendTimeout); + while (!cts.IsCancellationRequested && state.OrchestrationStatus != OrchestrationStatus.Suspended) + { + state = await instance.GetStateAsync(); } - Assert.Equal(OrchestrationStatus.Suspended, state.OrchestrationStatus); - + Assert.Equal(OrchestrationStatus.Suspended, state.OrchestrationStatus); + // Now terminate the orchestration await instance.TerminateAsync("Bye!"); @@ -940,7 +940,7 @@ public async Task TerminateSuspendedOrchestration() this.testService.LogProvider, LogAssert.AcquiredAppLock(), LogAssert.CheckpointStarting(orchestrationName), - LogAssert.CheckpointCompleted(orchestrationName), + LogAssert.CheckpointCompleted(orchestrationName), LogAssert.CheckpointStarting(orchestrationName), LogAssert.CheckpointCompleted(orchestrationName), LogAssert.CheckpointStarting(orchestrationName), @@ -1094,6 +1094,154 @@ public async Task TagsWithSpecialCharacters() Assert.Equal("value'with\"quotes", state.Tags["special\"chars"]); } + [Fact] + public async Task SubOrchestrationMergesTags() + { + // Parent tags + var parentTags = new Dictionary + { + { "env", "prod" }, + { "shared", "parent-value" }, + }; + + // Sub-orchestration-specific tags (will be merged with parent tags by Core) + var subTags = new Dictionary + { + { "team", "backend" }, + { "shared", "child-override" }, // should override parent's value + }; + + string subOrchName = "SubOrchForMergeTest"; + string subInstanceId = $"sub-merge-{Guid.NewGuid():N}"; + + this.testService.RegisterInlineOrchestration( + subOrchName, + implementation: (ctx, input) => Task.FromResult("done")); + + TestInstance instance = await this.testService.RunOrchestrationWithTags( + input: (string)null, + orchestrationName: "ParentOrchForMergeTest", + tags: parentTags, + implementation: async (ctx, input) => + { + // Use the 5-arg overload that passes sub-orch-specific tags + return await ctx.CreateSubOrchestrationInstance( + subOrchName, string.Empty, subInstanceId, null, subTags); + }); + + await instance.WaitForCompletion( + timeout: TimeSpan.FromSeconds(15), + expectedOutput: "done"); + + // Verify sub-orchestration has MERGED tags (parent + child, child overrides) + OrchestrationState subState = await this.testService.GetOrchestrationStateAsync(subInstanceId); + Assert.NotNull(subState); + Assert.NotNull(subState.Tags); + Assert.Equal("prod", subState.Tags["env"]); // inherited from parent + Assert.Equal("backend", subState.Tags["team"]); // from sub-orch + Assert.Equal("child-override", subState.Tags["shared"]); // child overrides parent + } + + [Fact] + public async Task MultipleSubOrchestrationsMergeDifferentTags() + { + var parentTags = new Dictionary + { + { "env", "prod" }, + }; + + string subOrchName = "SubOrchForFanOutTest"; + string subId1 = $"sub-fanout1-{Guid.NewGuid():N}"; + string subId2 = $"sub-fanout2-{Guid.NewGuid():N}"; + + this.testService.RegisterInlineOrchestration( + subOrchName, + implementation: (ctx, input) => Task.FromResult("done")); + + TestInstance instance = await this.testService.RunOrchestrationWithTags( + input: (string)null, + orchestrationName: "ParentOrchForFanOutTest", + tags: parentTags, + implementation: async (ctx, input) => + { + // Fan-out: create two sub-orchestrations with different tags + var tags1 = new Dictionary { { "region", "us" } }; + var tags2 = new Dictionary { { "region", "eu" } }; + + Task t1 = ctx.CreateSubOrchestrationInstance( + subOrchName, string.Empty, subId1, null, tags1); + Task t2 = ctx.CreateSubOrchestrationInstance( + subOrchName, string.Empty, subId2, null, tags2); + + await Task.WhenAll(t1, t2); + return "done"; + }); + + await instance.WaitForCompletion( + timeout: TimeSpan.FromSeconds(15), + expectedOutput: "done"); + + // Verify each sub-orchestration got its own correctly-merged tags + OrchestrationState sub1 = await this.testService.GetOrchestrationStateAsync(subId1); + Assert.NotNull(sub1?.Tags); + Assert.Equal("prod", sub1.Tags["env"]); // inherited from parent + Assert.Equal("us", sub1.Tags["region"]); // specific to sub-orch 1 + + OrchestrationState sub2 = await this.testService.GetOrchestrationStateAsync(subId2); + Assert.NotNull(sub2?.Tags); + Assert.Equal("prod", sub2.Tags["env"]); // inherited from parent + Assert.Equal("eu", sub2.Tags["region"]); // specific to sub-orch 2 + } + + [Fact] + public async Task MergedTagsExceedMaxSize_ParentStaysRunning() + { + // Parent and child tags are each within the 8000-char limit, + // but exceed it after Core's MergeTags() combines them. + // Expected behavior: the parent's checkpoint fails with ArgumentException + // (thrown by GetTagsJson during TVP population), so the parent stays Running. + var parentTags = new Dictionary + { + { "parentKey", new string('p', 4500) }, + }; + + var childTags = new Dictionary + { + { "childKey", new string('c', 4500) }, + }; + + string subOrchName = "SubOrchForOverflowTest"; + string subInstanceId = $"sub-overflow-{Guid.NewGuid():N}"; + + this.testService.RegisterInlineOrchestration( + subOrchName, + implementation: (ctx, input) => Task.FromResult("done")); + + TestInstance instance = await this.testService.RunOrchestrationWithTags( + input: (string)null, + orchestrationName: "ParentOrchForOverflowTest", + tags: parentTags, + implementation: async (ctx, input) => + { + return await ctx.CreateSubOrchestrationInstance( + subOrchName, string.Empty, subInstanceId, null, childTags); + }); + + // Wait briefly to allow at least one checkpoint attempt + await Task.Delay(TimeSpan.FromSeconds(5)); + + // Parent should still be Pending because the checkpoint keeps failing. + // (The status update from Pending to Running happens inside _CheckpointOrchestration, + // which never succeeds because TVP population throws before the SQL command executes.) + OrchestrationState parentState = await this.testService.GetOrchestrationStateAsync(instance.InstanceId); + Assert.NotNull(parentState); + Assert.Equal(OrchestrationStatus.Pending, parentState.OrchestrationStatus); + + // Sub-orchestration should NOT have been created (checkpoint never succeeded) + OrchestrationState subState = await this.testService.GetOrchestrationStateAsync(subInstanceId); + Assert.Null(subState); + } + [Fact] public async Task TagsOnManyOrchestrations() { diff --git a/test/DurableTask.SqlServer.Tests/Unit/SqlUtilsTagTests.cs b/test/DurableTask.SqlServer.Tests/Unit/SqlUtilsTagTests.cs index 387f48e..f89b0da 100644 --- a/test/DurableTask.SqlServer.Tests/Unit/SqlUtilsTagTests.cs +++ b/test/DurableTask.SqlServer.Tests/Unit/SqlUtilsTagTests.cs @@ -6,6 +6,8 @@ namespace DurableTask.SqlServer.Tests.Unit using System; using System.Collections.Generic; using System.Data; + using System.Data.SqlTypes; + using DurableTask.Core.History; using Microsoft.Data.SqlClient; using Newtonsoft.Json; using Xunit; @@ -87,5 +89,70 @@ public void AddTagsParameter_SpecialCharacters_RoundTrips() Assert.Equal("中文", deserialized["unicode-日本語"]); Assert.Equal("value with spaces", deserialized["key with spaces"]); } + + [Fact] + public void AddTagsParameter_TagsExceedMaxSize_ThrowsArgumentException() + { + // Arrange: create tags whose JSON serialization exceeds 8000 chars + var tags = new Dictionary + { + { "key", new string('x', 8000) }, + }; + + using var command = new SqlCommand(); + + // Act & Assert + var ex = Assert.Throws(() => command.Parameters.AddTagsParameter(tags)); + Assert.Contains("exceeds the maximum allowed size of 8000", ex.Message); + } + + [Fact] + public void GetTagsJson_TagsExceedMaxSize_ThrowsArgumentException() + { + // Arrange: simulate merged tags that exceed 8000 chars + // This covers the sub-orchestration merge path where individually-valid + // parent + child tags combine to exceed the limit + var tags = new Dictionary + { + { "key", new string('x', 8000) }, + }; + + var startedEvent = new ExecutionStartedEvent(-1, null) { Tags = tags }; + + // Act & Assert + var ex = Assert.Throws(() => SqlUtils.GetTagsJson(startedEvent)); + Assert.Contains("exceeds the maximum allowed size of 8000", ex.Message); + } + + [Fact] + public void GetTagsJson_NonExecutionStartedEvent_ReturnsNull() + { + // Non-ExecutionStartedEvent should return SqlString.Null + var timerEvent = new TimerFiredEvent(-1); + + SqlString result = SqlUtils.GetTagsJson(timerEvent); + + Assert.True(result.IsNull); + } + + [Fact] + public void GetTagsJson_ExecutionStartedWithTags_ReturnsJson() + { + var tags = new Dictionary + { + { "env", "prod" }, + { "team", "backend" }, + }; + + var startedEvent = new ExecutionStartedEvent(-1, null) { Tags = tags }; + + SqlString result = SqlUtils.GetTagsJson(startedEvent); + + Assert.False(result.IsNull); + var deserialized = JsonConvert.DeserializeObject>(result.Value); + Assert.Equal(2, deserialized.Count); + Assert.Equal("prod", deserialized["env"]); + Assert.Equal("backend", deserialized["team"]); + } } } From dffb82fbf57599326ce0c9f23979d7ff47e7344f Mon Sep 17 00:00:00 2001 From: Yi Date: Fri, 6 Mar 2026 10:55:12 -0800 Subject: [PATCH 14/18] add missing tags to the GetInstanceHistory sproc, so we don't have to handle tag non exist when GetTags is called --- src/DurableTask.SqlServer/Scripts/logic.sql | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/DurableTask.SqlServer/Scripts/logic.sql b/src/DurableTask.SqlServer/Scripts/logic.sql index b47cb32..708d74b 100644 --- a/src/DurableTask.SqlServer/Scripts/logic.sql +++ b/src/DurableTask.SqlServer/Scripts/logic.sql @@ -352,10 +352,12 @@ BEGIN DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub() DECLARE @ParentInstanceID varchar(100) DECLARE @Version varchar(100) + DECLARE @Tags varchar(8000) SELECT @ParentInstanceID = [ParentInstanceID], - @Version = [Version] + @Version = [Version], + @Tags = [Tags] FROM Instances WHERE [InstanceID] = @InstanceID SELECT @@ -374,7 +376,8 @@ BEGIN [PayloadID], @ParentInstanceID as [ParentInstanceID], @Version as [Version], - H.[TraceContext] + H.[TraceContext], + @Tags as [Tags] FROM History H WITH (INDEX (PK_History)) LEFT OUTER JOIN Payloads P ON P.[TaskHub] = @TaskHub AND From b0b2eed4dc93113aa89dac3a9c56e476f2cdefd4 Mon Sep 17 00:00:00 2001 From: Yi Date: Wed, 11 Mar 2026 12:15:53 -0700 Subject: [PATCH 15/18] change behavior, instead of throw exception, drop tags if surpassing size limit --- .../SqlOrchestrationService.cs | 5 ++-- .../SqlTypes/OrchestrationEventSqlType.cs | 23 +++++++++-------- src/DurableTask.SqlServer/SqlUtils.cs | 9 ++++--- .../Integration/Orchestrations.cs | 25 ++++++++----------- .../Unit/SqlUtilsTagTests.cs | 21 ++++++++++------ 5 files changed, 46 insertions(+), 37 deletions(-) diff --git a/src/DurableTask.SqlServer/SqlOrchestrationService.cs b/src/DurableTask.SqlServer/SqlOrchestrationService.cs index ce4921d..04286fb 100644 --- a/src/DurableTask.SqlServer/SqlOrchestrationService.cs +++ b/src/DurableTask.SqlServer/SqlOrchestrationService.cs @@ -372,7 +372,8 @@ public override async Task CompleteTaskOrchestrationWorkItemAsync( timerMessages, continuedAsNewMessage, currentWorkItem.EventPayloadMappings, - this.settings.SchemaName); + this.settings.SchemaName, + this.traceHelper); command.Parameters.AddTaskEventsParameter( "@NewTaskEvents", @@ -546,7 +547,7 @@ public override async Task SendTaskOrchestrationMessageAsync(TaskMessage message using SqlConnection connection = await this.GetAndOpenConnectionAsync(); using SqlCommand command = this.GetSprocCommand(connection, $"{this.settings.SchemaName}._AddOrchestrationEvents"); - command.Parameters.AddOrchestrationEventsParameter("@NewOrchestrationEvents", message, this.settings.SchemaName); + command.Parameters.AddOrchestrationEventsParameter("@NewOrchestrationEvents", message, this.settings.SchemaName, this.traceHelper); string instanceId = message.OrchestrationInstance.InstanceId; await SqlUtils.ExecuteNonQueryAsync(command, this.traceHelper, instanceId); diff --git a/src/DurableTask.SqlServer/SqlTypes/OrchestrationEventSqlType.cs b/src/DurableTask.SqlServer/SqlTypes/OrchestrationEventSqlType.cs index 062010b..1694aba 100644 --- a/src/DurableTask.SqlServer/SqlTypes/OrchestrationEventSqlType.cs +++ b/src/DurableTask.SqlServer/SqlTypes/OrchestrationEventSqlType.cs @@ -61,7 +61,8 @@ public static SqlParameter AddOrchestrationEventsParameter( IList timerMessages, TaskMessage continuedAsNewMessage, EventPayloadMap eventPayloadMap, - string schemaName) + string schemaName, + LogHelper logHelper) { SqlParameter param = commandParameters.Add(paramName, SqlDbType.Structured); param.TypeName = $"{schemaName}.OrchestrationEvents"; @@ -72,7 +73,7 @@ public static SqlParameter AddOrchestrationEventsParameter( messages = messages.Append(continuedAsNewMessage); } - param.Value = ToOrchestrationMessageParameter(messages, eventPayloadMap); + param.Value = ToOrchestrationMessageParameter(messages, eventPayloadMap, logHelper); return param; } @@ -80,17 +81,19 @@ public static SqlParameter AddOrchestrationEventsParameter( this SqlParameterCollection commandParameters, string paramName, TaskMessage message, - string schemaName) + string schemaName, + LogHelper logHelper) { SqlParameter param = commandParameters.Add(paramName, SqlDbType.Structured); param.TypeName = $"{schemaName}.OrchestrationEvents"; - param.Value = ToOrchestrationMessageParameter(message); + param.Value = ToOrchestrationMessageParameter(message, logHelper); return param; } static IEnumerable? ToOrchestrationMessageParameter( this IEnumerable messages, - EventPayloadMap eventPayloadMap) + EventPayloadMap eventPayloadMap, + LogHelper logHelper) { if (!messages.Any()) { @@ -107,18 +110,18 @@ IEnumerable GetOrchestrationMessageRecords() var record = new SqlDataRecord(OrchestrationEventSchema); foreach (TaskMessage msg in messages) { - yield return PopulateOrchestrationMessage(msg, record, eventPayloadMap); + yield return PopulateOrchestrationMessage(msg, record, eventPayloadMap, logHelper); } } } - static IEnumerable ToOrchestrationMessageParameter(TaskMessage msg) + static IEnumerable ToOrchestrationMessageParameter(TaskMessage msg, LogHelper logHelper) { var record = new SqlDataRecord(OrchestrationEventSchema); - yield return PopulateOrchestrationMessage(msg, record, eventPayloadMap: null); + yield return PopulateOrchestrationMessage(msg, record, eventPayloadMap: null, logHelper); } - static SqlDataRecord PopulateOrchestrationMessage(TaskMessage msg, SqlDataRecord record, EventPayloadMap? eventPayloadMap) + static SqlDataRecord PopulateOrchestrationMessage(TaskMessage msg, SqlDataRecord record, EventPayloadMap? eventPayloadMap, LogHelper logHelper) { string instanceId = msg.OrchestrationInstance.InstanceId; @@ -154,7 +157,7 @@ static SqlDataRecord PopulateOrchestrationMessage(TaskMessage msg, SqlDataRecord record.SetSqlString(ColumnOrdinals.ParentInstanceID, SqlUtils.GetParentInstanceId(msg.Event)); record.SetSqlString(ColumnOrdinals.Version, SqlUtils.GetVersion(msg.Event)); record.SetSqlString(ColumnOrdinals.TraceContext, SqlUtils.GetTraceContext(msg.Event)); - record.SetSqlString(ColumnOrdinals.Tags, SqlUtils.GetTagsJson(msg.Event)); + record.SetSqlString(ColumnOrdinals.Tags, SqlUtils.GetTagsJson(msg.Event, logHelper)); return record; } diff --git a/src/DurableTask.SqlServer/SqlUtils.cs b/src/DurableTask.SqlServer/SqlUtils.cs index 4bc463c..7f6804f 100644 --- a/src/DurableTask.SqlServer/SqlUtils.cs +++ b/src/DurableTask.SqlServer/SqlUtils.cs @@ -511,15 +511,18 @@ internal static SqlString GetTraceContext(HistoryEvent e) } } - internal static SqlString GetTagsJson(HistoryEvent e) + internal static SqlString GetTagsJson(HistoryEvent e, LogHelper logHelper) { if (e is ExecutionStartedEvent startedEvent && startedEvent.Tags != null && startedEvent.Tags.Count > 0) { string json = DTUtils.SerializeToJson(startedEvent.Tags); if (json.Length > MaxTagsPayloadSize) { - throw new ArgumentException( - $"The serialized tags payload is {json.Length} characters, which exceeds the maximum allowed size of {MaxTagsPayloadSize} characters."); + logHelper.GenericWarning( + $"Dropping oversized tags ({json.Length} chars, max {MaxTagsPayloadSize}) for sub-orchestration. " + + $"The merged parent+child tags exceed the allowed limit and will not be persisted.", + instanceId: (e as ExecutionStartedEvent)?.ParentInstance?.OrchestrationInstance?.InstanceId); + return SqlString.Null; } return json; diff --git a/test/DurableTask.SqlServer.Tests/Integration/Orchestrations.cs b/test/DurableTask.SqlServer.Tests/Integration/Orchestrations.cs index 0c588a4..fc3ff44 100644 --- a/test/DurableTask.SqlServer.Tests/Integration/Orchestrations.cs +++ b/test/DurableTask.SqlServer.Tests/Integration/Orchestrations.cs @@ -1194,12 +1194,14 @@ await instance.WaitForCompletion( } [Fact] - public async Task MergedTagsExceedMaxSize_ParentStaysRunning() + public async Task MergedTagsExceedMaxSize_OversizedTagsDropped() { // Parent and child tags are each within the 8000-char limit, // but exceed it after Core's MergeTags() combines them. - // Expected behavior: the parent's checkpoint fails with ArgumentException - // (thrown by GetTagsJson during TVP population), so the parent stays Running. + // Expected behavior: oversized merged tags are silently dropped + // (with a trace warning), the sub-orchestration is created with + // null tags, and the parent completes normally. + var parentTags = new Dictionary { { "parentKey", new string('p', 4500) }, @@ -1227,19 +1229,14 @@ public async Task MergedTagsExceedMaxSize_ParentStaysRunning() subOrchName, string.Empty, subInstanceId, null, childTags); }); - // Wait briefly to allow at least one checkpoint attempt - await Task.Delay(TimeSpan.FromSeconds(5)); - - // Parent should still be Pending because the checkpoint keeps failing. - // (The status update from Pending to Running happens inside _CheckpointOrchestration, - // which never succeeds because TVP population throws before the SQL command executes.) - OrchestrationState parentState = await this.testService.GetOrchestrationStateAsync(instance.InstanceId); - Assert.NotNull(parentState); - Assert.Equal(OrchestrationStatus.Pending, parentState.OrchestrationStatus); + // Parent should complete normally (sub-orch returns "done") + await instance.WaitForCompletion(expectedOutput: "done"); - // Sub-orchestration should NOT have been created (checkpoint never succeeded) + // Sub-orchestration should have been created, but with null tags + // because the merged tags exceeded the maximum size. OrchestrationState subState = await this.testService.GetOrchestrationStateAsync(subInstanceId); - Assert.Null(subState); + Assert.NotNull(subState); + Assert.Null(subState.Tags); } [Fact] diff --git a/test/DurableTask.SqlServer.Tests/Unit/SqlUtilsTagTests.cs b/test/DurableTask.SqlServer.Tests/Unit/SqlUtilsTagTests.cs index f89b0da..b97dbda 100644 --- a/test/DurableTask.SqlServer.Tests/Unit/SqlUtilsTagTests.cs +++ b/test/DurableTask.SqlServer.Tests/Unit/SqlUtilsTagTests.cs @@ -9,11 +9,13 @@ namespace DurableTask.SqlServer.Tests.Unit using System.Data.SqlTypes; using DurableTask.Core.History; using Microsoft.Data.SqlClient; + using Microsoft.Extensions.Logging.Abstractions; using Newtonsoft.Json; using Xunit; public class SqlUtilsTagTests { + static readonly LogHelper TestLogHelper = new LogHelper(NullLogger.Instance); [Fact] public void AddTagsParameter_WithTags_SetsJsonValue() { @@ -107,11 +109,12 @@ public void AddTagsParameter_TagsExceedMaxSize_ThrowsArgumentException() } [Fact] - public void GetTagsJson_TagsExceedMaxSize_ThrowsArgumentException() + public void GetTagsJson_TagsExceedMaxSize_ReturnsNullAndDropsTags() { - // Arrange: simulate merged tags that exceed 8000 chars + // Arrange: simulate merged tags that exceed 8000 chars. // This covers the sub-orchestration merge path where individually-valid - // parent + child tags combine to exceed the limit + // parent + child tags combine to exceed the limit. + // Expected: returns SqlString.Null (tags silently dropped with a warning). var tags = new Dictionary { { "key", new string('x', 8000) }, @@ -119,9 +122,11 @@ public void GetTagsJson_TagsExceedMaxSize_ThrowsArgumentException() var startedEvent = new ExecutionStartedEvent(-1, null) { Tags = tags }; - // Act & Assert - var ex = Assert.Throws(() => SqlUtils.GetTagsJson(startedEvent)); - Assert.Contains("exceeds the maximum allowed size of 8000", ex.Message); + // Act + SqlString result = SqlUtils.GetTagsJson(startedEvent, TestLogHelper); + + // Assert: oversized tags are dropped, not thrown + Assert.True(result.IsNull); } [Fact] @@ -130,7 +135,7 @@ public void GetTagsJson_NonExecutionStartedEvent_ReturnsNull() // Non-ExecutionStartedEvent should return SqlString.Null var timerEvent = new TimerFiredEvent(-1); - SqlString result = SqlUtils.GetTagsJson(timerEvent); + SqlString result = SqlUtils.GetTagsJson(timerEvent, TestLogHelper); Assert.True(result.IsNull); } @@ -146,7 +151,7 @@ public void GetTagsJson_ExecutionStartedWithTags_ReturnsJson() var startedEvent = new ExecutionStartedEvent(-1, null) { Tags = tags }; - SqlString result = SqlUtils.GetTagsJson(startedEvent); + SqlString result = SqlUtils.GetTagsJson(startedEvent, TestLogHelper); Assert.False(result.IsNull); var deserialized = JsonConvert.DeserializeObject>(result.Value); From 92ab9ea673f508e1b39be904f84635bda841ab31 Mon Sep 17 00:00:00 2001 From: Yi Date: Wed, 11 Mar 2026 12:43:57 -0700 Subject: [PATCH 16/18] Retry CI From 2b0985b98ef8ab22fd0fa43a7a9a1ff2a79fc04c Mon Sep 17 00:00:00 2001 From: Yi Date: Wed, 11 Mar 2026 15:08:50 -0700 Subject: [PATCH 17/18] address comments --- src/DurableTask.SqlServer/SqlUtils.cs | 15 ++++++++++----- .../Unit/SqlUtilsTagTests.cs | 2 +- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/src/DurableTask.SqlServer/SqlUtils.cs b/src/DurableTask.SqlServer/SqlUtils.cs index 7f6804f..8d6de4d 100644 --- a/src/DurableTask.SqlServer/SqlUtils.cs +++ b/src/DurableTask.SqlServer/SqlUtils.cs @@ -516,10 +516,11 @@ internal static SqlString GetTagsJson(HistoryEvent e, LogHelper logHelper) if (e is ExecutionStartedEvent startedEvent && startedEvent.Tags != null && startedEvent.Tags.Count > 0) { string json = DTUtils.SerializeToJson(startedEvent.Tags); - if (json.Length > MaxTagsPayloadSize) + int utf8Bytes = Encoding.UTF8.GetByteCount(json); + if (utf8Bytes > MaxTagsPayloadSize) { logHelper.GenericWarning( - $"Dropping oversized tags ({json.Length} chars, max {MaxTagsPayloadSize}) for sub-orchestration. " + + $"Dropping oversized tags ({utf8Bytes} bytes, max {MaxTagsPayloadSize}) for sub-orchestration. " + $"The merged parent+child tags exceed the allowed limit and will not be persisted.", instanceId: (e as ExecutionStartedEvent)?.ParentInstance?.OrchestrationInstance?.InstanceId); return SqlString.Null; @@ -539,10 +540,14 @@ internal static void AddTagsParameter( ? DTUtils.SerializeToJson(tags) : null; - if (json != null && json.Length > MaxTagsPayloadSize) + if (json != null) { - throw new ArgumentException( - $"The serialized tags payload is {json.Length} characters, which exceeds the maximum allowed size of {MaxTagsPayloadSize} characters."); + int utf8Bytes = Encoding.UTF8.GetByteCount(json); + if (utf8Bytes > MaxTagsPayloadSize) + { + throw new ArgumentException( + $"The serialized tags payload is {utf8Bytes} bytes, which exceeds the maximum allowed size of {MaxTagsPayloadSize} bytes."); + } } parameters.Add("@Tags", SqlDbType.VarChar, MaxTagsPayloadSize).Value = (object?)json ?? DBNull.Value; diff --git a/test/DurableTask.SqlServer.Tests/Unit/SqlUtilsTagTests.cs b/test/DurableTask.SqlServer.Tests/Unit/SqlUtilsTagTests.cs index b97dbda..df61341 100644 --- a/test/DurableTask.SqlServer.Tests/Unit/SqlUtilsTagTests.cs +++ b/test/DurableTask.SqlServer.Tests/Unit/SqlUtilsTagTests.cs @@ -105,7 +105,7 @@ public void AddTagsParameter_TagsExceedMaxSize_ThrowsArgumentException() // Act & Assert var ex = Assert.Throws(() => command.Parameters.AddTagsParameter(tags)); - Assert.Contains("exceeds the maximum allowed size of 8000", ex.Message); + Assert.Contains("exceeds the maximum allowed size of 8000 bytes", ex.Message); } [Fact] From 51baad78ba1c9a4a187205e4ca66a5543057600b Mon Sep 17 00:00:00 2001 From: Yi Date: Wed, 11 Mar 2026 15:21:18 -0700 Subject: [PATCH 18/18] Retry CI