Skip to content
2 changes: 1 addition & 1 deletion Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

<!-- Product dependencies -->
<ItemGroup>
<PackageVersion Include="Microsoft.Azure.DurableTask.Core" Version="3.0.0" />
<PackageVersion Include="Microsoft.Azure.DurableTask.Core" Version="3.3.0" />
<PackageVersion Include="Microsoft.Azure.Functions.Worker.Extensions.Abstractions" Version="1.3.0" />
<PackageVersion Include="Microsoft.Azure.WebJobs.Extensions.DurableTask" Version="3.0.3" />
<PackageVersion Include="Microsoft.Data.SqlClient" Version="3.1.7" />
Expand Down
12 changes: 8 additions & 4 deletions src/DurableTask.SqlServer/Scripts/logic.sql
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ IF TYPE_ID(N'__SchemaNamePlaceholder__.TaskEvents') IS NULL
[PayloadText] varchar(max) NULL,
[PayloadID] uniqueidentifier NULL,
[Version] varchar(100) NULL,
[TraceContext] varchar(800) NULL
[TraceContext] varchar(800) NULL,
[Tags] varchar(8000) NULL
)
GO

Expand Down Expand Up @@ -1031,7 +1032,8 @@ BEGIN
[LockExpiration],
[PayloadID],
[Version],
[TraceContext]
[TraceContext],
[Tags]
)
OUTPUT
INSERTED.[SequenceNumber],
Expand All @@ -1047,7 +1049,8 @@ BEGIN
[LockExpiration],
[PayloadID],
[Version],
[TraceContext]
[TraceContext],
[Tags]
FROM @NewTaskEvents

COMMIT TRANSACTION
Expand Down Expand Up @@ -1310,7 +1313,8 @@ BEGIN
P.[InstanceID] = N.[InstanceID] AND
P.[PayloadID] = N.[PayloadID]) AS [PayloadText],
DATEDIFF(SECOND, [Timestamp], @now) AS [WaitTime],
[TraceContext]
[TraceContext],
[Tags]
FROM NewTasks N
WHERE [TaskHub] = @TaskHub AND [SequenceNumber] = @SequenceNumber

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,23 @@
IF NOT EXISTS (SELECT 1 FROM sys.columns WHERE object_id = OBJECT_ID('__SchemaNamePlaceholder__.Instances') AND name = 'Tags')
ALTER TABLE __SchemaNamePlaceholder__.Instances ADD [Tags] varchar(8000) NULL

-- Add a Tags column to the OrchestrationEvents table type so that merged tags
-- flow through sub-orchestration creation events. To change a type we must first
-- drop all stored procedures that reference it, then drop the type itself.
-- The type and sprocs will be recreated by logic.sql which executes afterwards.
-- Add a new Tags column to the NewTasks table so that orchestration tags
-- propagate to activity task workers via OrchestrationExecutionContext.
IF NOT EXISTS (SELECT 1 FROM sys.columns WHERE object_id = OBJECT_ID('__SchemaNamePlaceholder__.NewTasks') AND name = 'Tags')
ALTER TABLE __SchemaNamePlaceholder__.NewTasks ADD [Tags] varchar(8000) NULL

-- Add Tags columns to the OrchestrationEvents and TaskEvents table types.
-- To change a type we must first drop all stored procedures that reference it,
-- then drop the type itself. The types and sprocs will be recreated by logic.sql
-- which executes afterwards.
IF OBJECT_ID('__SchemaNamePlaceholder__._AddOrchestrationEvents') IS NOT NULL
DROP PROCEDURE __SchemaNamePlaceholder__._AddOrchestrationEvents
IF OBJECT_ID('__SchemaNamePlaceholder__._CheckpointOrchestration') IS NOT NULL
DROP PROCEDURE __SchemaNamePlaceholder__._CheckpointOrchestration
IF OBJECT_ID('__SchemaNamePlaceholder__._CompleteTasks') IS NOT NULL
DROP PROCEDURE __SchemaNamePlaceholder__._CompleteTasks

IF TYPE_ID('__SchemaNamePlaceholder__.OrchestrationEvents') IS NOT NULL
DROP TYPE __SchemaNamePlaceholder__.OrchestrationEvents
IF TYPE_ID('__SchemaNamePlaceholder__.TaskEvents') IS NOT NULL
DROP TYPE __SchemaNamePlaceholder__.TaskEvents
5 changes: 3 additions & 2 deletions src/DurableTask.SqlServer/SqlOrchestrationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,8 @@ public override async Task CompleteTaskOrchestrationWorkItemAsync(
"@NewTaskEvents",
outboundMessages,
currentWorkItem.EventPayloadMappings,
this.settings.SchemaName);
this.settings.SchemaName,
this.traceHelper);

command.Parameters.AddHistoryEventsParameter(
"@NewHistoryEvents",
Expand Down Expand Up @@ -485,7 +486,7 @@ public override async Task CompleteTaskActivityWorkItemAsync(TaskActivityWorkIte
using SqlCommand command = this.GetSprocCommand(connection, $"{this.settings.SchemaName}._CompleteTasks");

command.Parameters.AddMessageIdParameter("@CompletedTasks", workItem.TaskMessage, this.settings.SchemaName);
command.Parameters.AddTaskEventsParameter("@Results", responseMessage, this.settings.SchemaName);
command.Parameters.AddTaskEventsParameter("@Results", responseMessage, this.settings.SchemaName, this.traceHelper);

OrchestrationInstance instance = workItem.TaskMessage.OrchestrationInstance;
try
Expand Down
26 changes: 17 additions & 9 deletions src/DurableTask.SqlServer/SqlTypes/TaskEventSqlType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ namespace DurableTask.SqlServer.SqlTypes
using System.Data.SqlTypes;
using System.Linq;
using DurableTask.Core;
using DurableTask.SqlServer.Logging;
using Microsoft.Data.SqlClient;
using Microsoft.Data.SqlClient.Server;

Expand All @@ -30,6 +31,7 @@ static class TaskEventSqlType
new SqlMetaData("PayloadID", SqlDbType.UniqueIdentifier),
new SqlMetaData("Version", SqlDbType.VarChar, 100),
new SqlMetaData("TraceContext", SqlDbType.VarChar, 800),
new SqlMetaData("Tags", SqlDbType.VarChar, 8000),
};

static class ColumnOrdinals
Expand All @@ -48,36 +50,40 @@ static class ColumnOrdinals
public const int PayloadId = 10;
public const int Version = 11;
public const int TraceContext = 12;
public const int Tags = 13;
}

public static SqlParameter AddTaskEventsParameter(
this SqlParameterCollection commandParameters,
string paramName,
IList<TaskMessage> outboundMessages,
EventPayloadMap eventPayloadMap,
string schemaName)
string schemaName,
LogHelper logHelper)
{
SqlParameter param = commandParameters.Add(paramName, SqlDbType.Structured);
param.TypeName = $"{schemaName}.TaskEvents";
param.Value = ToTaskMessagesParameter(outboundMessages, eventPayloadMap);
param.Value = ToTaskMessagesParameter(outboundMessages, eventPayloadMap, logHelper);
return param;
}

public static SqlParameter AddTaskEventsParameter(
this SqlParameterCollection commandParameters,
string paramName,
TaskMessage message,
string schemaName)
string schemaName,
LogHelper logHelper)
{
SqlParameter param = commandParameters.Add(paramName, SqlDbType.Structured);
param.TypeName = $"{schemaName}.TaskEvents";
param.Value = ToTaskMessageParameter(message);
param.Value = ToTaskMessageParameter(message, logHelper);
return param;
}

static IEnumerable<SqlDataRecord>? ToTaskMessagesParameter(
this IEnumerable<TaskMessage> messages,
EventPayloadMap? eventPayloadMap)
EventPayloadMap? eventPayloadMap,
LogHelper logHelper)
{
if (!messages.Any())
{
Expand All @@ -92,21 +98,22 @@ IEnumerable<SqlDataRecord> GetTaskEventRecords()
var record = new SqlDataRecord(TaskEventSchema);
foreach (TaskMessage msg in messages)
{
yield return PopulateTaskMessageRecord(msg, record, eventPayloadMap);
yield return PopulateTaskMessageRecord(msg, record, eventPayloadMap, logHelper);
}
}
}

static IEnumerable<SqlDataRecord> ToTaskMessageParameter(TaskMessage msg)
static IEnumerable<SqlDataRecord> ToTaskMessageParameter(TaskMessage msg, LogHelper logHelper)
{
var record = new SqlDataRecord(TaskEventSchema);
yield return PopulateTaskMessageRecord(msg, record, eventPayloadMap: null);
yield return PopulateTaskMessageRecord(msg, record, eventPayloadMap: null, logHelper);
}

static SqlDataRecord PopulateTaskMessageRecord(
TaskMessage msg,
SqlDataRecord record,
EventPayloadMap? eventPayloadMap)
EventPayloadMap? eventPayloadMap,
LogHelper logHelper)
{
record.SetSqlString(ColumnOrdinals.InstanceID, msg.OrchestrationInstance.InstanceId);
record.SetSqlString(ColumnOrdinals.ExecutionID, msg.OrchestrationInstance.ExecutionId);
Expand Down Expand Up @@ -140,6 +147,7 @@ static SqlDataRecord PopulateTaskMessageRecord(

record.SetSqlString(ColumnOrdinals.Version, SqlUtils.GetVersion(msg.Event));
record.SetSqlString(ColumnOrdinals.TraceContext, SqlUtils.GetTraceContext(msg.Event));
record.SetSqlString(ColumnOrdinals.Tags, SqlUtils.GetMergedTaskTagsJson(msg, logHelper));
return record;
}
}
Expand Down
56 changes: 46 additions & 10 deletions src/DurableTask.SqlServer/SqlUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ public static HistoryEvent GetHistoryEvent(this DbDataReader reader, bool isOrch
Input = GetPayloadText(reader),
Name = GetName(reader),
Version = GetVersion(reader),
Tags = GetTags(reader),
ParentTraceContext = GetTraceContext(reader),
};
break;
Expand Down Expand Up @@ -515,21 +516,56 @@ internal static SqlString GetTagsJson(HistoryEvent e, LogHelper logHelper)
{
if (e is ExecutionStartedEvent startedEvent && startedEvent.Tags != null && startedEvent.Tags.Count > 0)
{
string json = DTUtils.SerializeToJson(startedEvent.Tags);
int utf8Bytes = Encoding.UTF8.GetByteCount(json);
if (utf8Bytes > MaxTagsPayloadSize)
return SerializeTagsJson(startedEvent.Tags, logHelper, (e as ExecutionStartedEvent)?.ParentInstance?.OrchestrationInstance?.InstanceId);
}

return SqlString.Null;
}

internal static SqlString GetMergedTaskTagsJson(TaskMessage msg, LogHelper logHelper)
{
IDictionary<string, string>? orchestrationTags = msg.OrchestrationExecutionContext?.OrchestrationTags;
IDictionary<string, string>? activityTags = (msg.Event as TaskScheduledEvent)?.Tags;

bool hasOrchTags = orchestrationTags != null && orchestrationTags.Count > 0;
bool hasActTags = activityTags != null && activityTags.Count > 0;

if (!hasOrchTags && !hasActTags)
{
return SqlString.Null;
}

// Merge flat: orchestration tags as base, activity tags override on key collision.
if (hasOrchTags && hasActTags)
{
var merged = new Dictionary<string, string>(orchestrationTags!);
foreach (var kvp in activityTags!)
{
logHelper.GenericWarning(
$"Dropping oversized tags ({utf8Bytes} bytes, max {MaxTagsPayloadSize}) for sub-orchestration. " +
$"The merged parent+child tags exceed the allowed limit and will not be persisted.",
instanceId: (e as ExecutionStartedEvent)?.ParentInstance?.OrchestrationInstance?.InstanceId);
return SqlString.Null;
merged[kvp.Key] = kvp.Value;
}
return SerializeTagsJson(merged, logHelper, msg.OrchestrationInstance?.InstanceId);
}

return json;
return SerializeTagsJson(
hasOrchTags ? orchestrationTags! : activityTags!,
logHelper,
msg.OrchestrationInstance?.InstanceId);
}

static SqlString SerializeTagsJson(IDictionary<string, string> tags, LogHelper logHelper, string? instanceId)
{
string json = DTUtils.SerializeToJson(tags);
int utf8Bytes = Encoding.UTF8.GetByteCount(json);
if (utf8Bytes > MaxTagsPayloadSize)
{
logHelper.GenericWarning(
$"Dropping oversized tags ({utf8Bytes} bytes, max {MaxTagsPayloadSize}). " +
$"The tags exceed the allowed limit and will not be persisted.",
instanceId: instanceId);
return SqlString.Null;
}

return SqlString.Null;
return json;
}

internal static void AddTagsParameter(
Expand Down
4 changes: 2 additions & 2 deletions src/common.props
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
<!-- Version settings: https://andrewlock.net/version-vs-versionsuffix-vs-packageversion-what-do-they-all-mean/ -->
<PropertyGroup>
<MajorVersion>1</MajorVersion>
<MinorVersion>5</MinorVersion>
<PatchVersion>4</PatchVersion>
<MinorVersion>6</MinorVersion>
<PatchVersion>0</PatchVersion>
<VersionPrefix>$(MajorVersion).$(MinorVersion).$(PatchVersion)</VersionPrefix>
<VersionSuffix></VersionSuffix>
<AssemblyVersion>$(MajorVersion).$(MinorVersion).0.0</AssemblyVersion>
Expand Down
Loading
Loading