Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/Temporalio/Activities/ActivityExecutionContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ public CancellationToken CancellationToken
/// <summary>
/// Gets the payload converter in use by this activity worker.
/// </summary>
/// <remarks>
/// If the original converter supported serialization contexts, this is the converter with
/// the activity serialization context applied.
/// </remarks>
public IPayloadConverter PayloadConverter { get; private init; }

/// <summary>
Expand Down
287 changes: 174 additions & 113 deletions src/Temporalio/Worker/WorkflowCodecHelper.cs

Large diffs are not rendered by default.

99 changes: 60 additions & 39 deletions src/Temporalio/Worker/WorkflowInstance.cs

Large diffs are not rendered by default.

12 changes: 8 additions & 4 deletions src/Temporalio/Worker/WorkflowInstanceDetails.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ namespace Temporalio.Worker
/// <param name="InitialActivation">Initial activation for the workflow.</param>
/// <param name="Init">Start attributes for the workflow.</param>
/// <param name="Interceptors">Interceptors.</param>
/// <param name="PayloadConverter">Payload converter.</param>
/// <param name="FailureConverter">Failure converter.</param>
/// <param name="PayloadConverterNoContext">Payload converter with no context.</param>
/// <param name="PayloadConverterWorkflowContext">Payload converter with workflow context.</param>
/// <param name="FailureConverterNoContext">Failure converter with no context.</param>
/// <param name="FailureConverterWorkflowContext">Failure converter with workflow context.</param>
/// <param name="LoggerFactory">Logger factory.</param>
/// <param name="DisableTracingEvents">Whether tracing events are disabled.</param>
/// <param name="WorkflowStackTrace">Option for workflow stack trace.</param>
Expand All @@ -35,8 +37,10 @@ internal record WorkflowInstanceDetails(
WorkflowActivation InitialActivation,
InitializeWorkflow Init,
IReadOnlyCollection<Interceptors.IWorkerInterceptor> Interceptors,
IPayloadConverter PayloadConverter,
IFailureConverter FailureConverter,
IPayloadConverter PayloadConverterNoContext,
IPayloadConverter PayloadConverterWorkflowContext,
IFailureConverter FailureConverterNoContext,
IFailureConverter FailureConverterWorkflowContext,
ILoggerFactory LoggerFactory,
bool DisableTracingEvents,
WorkflowStackTrace WorkflowStackTrace,
Expand Down
79 changes: 48 additions & 31 deletions src/Temporalio/Worker/WorkflowWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -140,43 +140,50 @@ private async Task HandleActivationAsync(WorkflowActivation act)
}

WorkflowActivationCompletion comp;
DataConverter dataConverter = options.DataConverter;
DataConverter dataConverterNoContext = options.DataConverter;
DataConverter? dataConverterWorkflowContext = null;
WorkflowCodecHelper.WorkflowCodecContext? codecContext = null;

// Catch any exception as a completion failure
try
{
// Create data converter with context before doing any work
string workflowId, workflowType;
IWorkflowCodecHelperInstance? instanceForCodec;
if (runningWorkflows.TryGetValue(act.RunId, out var inst))
{
codecContext = new(
Namespace: options.Namespace,
WorkflowId: inst.Info.WorkflowId,
WorkflowType: inst.Info.WorkflowType,
TaskQueue: options.TaskQueue,
Instance: inst);
workflowId = inst.Info.WorkflowId;
workflowType = inst.Info.WorkflowType;
instanceForCodec = inst;
}
else if (act.Jobs.Select(j => j.InitializeWorkflow).FirstOrDefault(s => s != null) is { } initJob)
{
codecContext = new(
Namespace: options.Namespace,
WorkflowId: initJob.WorkflowId,
WorkflowType: initJob.WorkflowType,
TaskQueue: options.TaskQueue,
Instance: null);
workflowId = initJob.WorkflowId;
workflowType = initJob.WorkflowType;
instanceForCodec = null;
}
else
{
throw new InvalidOperationException("Missing workflow start (unexpectedly evicted?)");
}
dataConverter = dataConverter.WithSerializationContext(
new ISerializationContext.Workflow(
Namespace: codecContext.Namespace, WorkflowId: codecContext.WorkflowId));

// Decode the activation if there is a codec
if (dataConverter.PayloadCodec is { } decodeCodec)
dataConverterWorkflowContext = dataConverterNoContext.WithSerializationContext(
new ISerializationContext.Workflow(options.Namespace, WorkflowId: workflowId));
// We'll only apply codec if one of the two converters has one
if (dataConverterNoContext.PayloadCodec != null ||
dataConverterWorkflowContext.PayloadCodec != null)
{
await WorkflowCodecHelper.DecodeAsync(decodeCodec, codecContext, act).ConfigureAwait(false);
codecContext = new(
CodecNoContext: dataConverterNoContext.PayloadCodec,
CodecWorkflowContext: dataConverterWorkflowContext.PayloadCodec,
Namespace: options.Namespace,
WorkflowId: workflowId,
WorkflowType: workflowType,
TaskQueue: options.TaskQueue,
Instance: instanceForCodec);
}
if (codecContext != null)
{
await WorkflowCodecHelper.DecodeAsync(codecContext, act).ConfigureAwait(false);
}

// Log proto at trace level
Expand All @@ -191,8 +198,12 @@ private async Task HandleActivationAsync(WorkflowActivation act)

// If the workflow is not yet running, create it. We know that we will only get
// one activation per workflow at a time, so GetOrAdd is safe for our use.
var workflow = runningWorkflows.GetOrAdd(act.RunId, _ => CreateInstance(act, dataConverter));
codecContext = codecContext with { Instance = workflow };
var workflow = runningWorkflows.GetOrAdd(act.RunId, _ => CreateInstance(
act, dataConverterNoContext, dataConverterWorkflowContext));
if (codecContext != null)
{
codecContext = codecContext with { Instance = workflow };
}

// Activate or timeout with deadlock timeout
// TODO(cretz): Any reason for users to need to customize factory here?
Expand Down Expand Up @@ -230,9 +241,10 @@ private async Task HandleActivationAsync(WorkflowActivation act)
comp = new() { Failed = new() };
try
{
// Failure converter needs to be in workflow context
comp.Failed.Failure_ = dataConverter.FailureConverter.ToFailure(
e, dataConverter.PayloadConverter);
// Failure converter needs to be in workflow context if available
var dataConverterForFailure = dataConverterWorkflowContext ?? dataConverterNoContext;
comp.Failed.Failure_ = dataConverterForFailure.FailureConverter.ToFailure(
e, dataConverterForFailure.PayloadConverter);
}
catch (Exception inner)
{
Expand All @@ -244,12 +256,12 @@ private async Task HandleActivationAsync(WorkflowActivation act)
// Always set the run ID of the completion
comp.RunId = act.RunId;

// Encode the completion if there is a codec
if (dataConverter.PayloadCodec is { } encodeCodec && codecContext is { } encodeContext)
// Encode the completion if there is a codec context
if (codecContext != null)
{
try
{
await WorkflowCodecHelper.EncodeAsync(encodeCodec, encodeContext, comp).ConfigureAwait(false);
await WorkflowCodecHelper.EncodeAsync(codecContext, comp).ConfigureAwait(false);
}
catch (Exception e)
{
Expand Down Expand Up @@ -313,7 +325,10 @@ private async Task HandleCacheEvictionAsync(WorkflowActivation act, RemoveFromCa
}
}

private IWorkflowInstance CreateInstance(WorkflowActivation act, DataConverter dataConverter)
private IWorkflowInstance CreateInstance(
WorkflowActivation act,
DataConverter dataConverterNoContext,
DataConverter dataConverterWorkflowContext)
{
var init = act.Jobs.Select(j => j.InitializeWorkflow).FirstOrDefault(s => s != null) ??
throw new InvalidOperationException("Missing workflow start (unexpectedly evicted?)");
Expand All @@ -336,8 +351,10 @@ private IWorkflowInstance CreateInstance(WorkflowActivation act, DataConverter d
InitialActivation: act,
Init: init,
Interceptors: options.Interceptors,
PayloadConverter: dataConverter.PayloadConverter,
FailureConverter: dataConverter.FailureConverter,
PayloadConverterNoContext: dataConverterNoContext.PayloadConverter,
PayloadConverterWorkflowContext: dataConverterWorkflowContext.PayloadConverter,
FailureConverterNoContext: dataConverterNoContext.FailureConverter,
FailureConverterWorkflowContext: dataConverterWorkflowContext.FailureConverter,
LoggerFactory: options.LoggerFactory,
DisableTracingEvents: options.DisableWorkflowTracingEventListener,
WorkflowStackTrace: options.WorkflowStackTrace,
Expand Down
4 changes: 4 additions & 0 deletions src/Temporalio/Workflows/Workflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,10 @@ public static WorkflowUpdateDefinition? DynamicUpdate
/// <summary>
/// Gets the payload converter for the workflow.
/// </summary>
/// <remarks>
/// If the original converter supported serialization contexts, this is the converter with
/// the workflow serialization context applied.
/// </remarks>
public static IPayloadConverter PayloadConverter => Context.PayloadConverter;

/// <summary>
Expand Down
22 changes: 12 additions & 10 deletions tests/Temporalio.Tests/Worker/WorkflowCodecHelperTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,6 @@ public WorkflowCodecHelperTests(ITestOutputHelper output)
{
}

internal static WorkflowCodecHelper.WorkflowCodecContext SimpleCodecContext { get; } = new(
Namespace: "my-namespace",
WorkflowId: "my-workflow-id",
WorkflowType: "my-workflow-type",
TaskQueue: "my-task-queue",
Instance: null);

[Fact]
public async Task CreateAndVisitPayload_Visiting_ReachesAllExpectedValues()
{
Expand Down Expand Up @@ -58,7 +51,7 @@ await CreateAndVisitPayload(new(), comp, async (ctx, payload) =>
Assert.DoesNotContain("encoded", payload().Metadata.Keys);
foreach (var codec in codecs)
{
await WorkflowCodecHelper.EncodeAsync(codec, SimpleCodecContext, comp);
await WorkflowCodecHelper.EncodeAsync(CreateSimpleCodecContext(codec), comp);
if (!payload().Metadata.ContainsKey("encoded"))
{
Assert.Fail($"Payload at path {ctx.Path} not encoded with codec {codec}");
Expand All @@ -82,7 +75,7 @@ await CreateAndVisitPayload(new(), act, async (ctx, payload) =>
Assert.DoesNotContain("decoded", payload().Metadata.Keys);
foreach (var codec in codecs)
{
await WorkflowCodecHelper.DecodeAsync(codec, SimpleCodecContext, act);
await WorkflowCodecHelper.DecodeAsync(CreateSimpleCodecContext(codec), act);
if (!payload().Metadata.ContainsKey("decoded"))
{
Assert.Fail($"Payload at path {ctx.Path} not decoded with codec {codec}");
Expand All @@ -106,11 +99,20 @@ await CreateAndVisitPayload(new(), comp, async (ctx, payload) =>
if (propInfo?.PropertyType == typeof(Payload))
{
propInfo.SetValue(msg, null);
await WorkflowCodecHelper.EncodeAsync(codec, SimpleCodecContext, comp);
await WorkflowCodecHelper.EncodeAsync(CreateSimpleCodecContext(codec), comp);
}
});
}

private static WorkflowCodecHelper.WorkflowCodecContext CreateSimpleCodecContext(IPayloadCodec codec) => new(
CodecNoContext: codec,
CodecWorkflowContext: codec,
Namespace: "my-namespace",
WorkflowId: "my-workflow-id",
WorkflowType: "my-workflow-type",
TaskQueue: "my-task-queue",
Instance: null);

// Creates payloads as needed, null context if already seen
private static async Task CreateAndVisitPayload(
PayloadVisitContext ctx, IMessage current, Func<PayloadVisitContext, Func<Payload>, Task> visitor)
Expand Down
Loading
Loading