Skip to content

Commit bac42d3

Browse files
authored
Apply modern event loop algorithm with new SDK flag (#432)
Fixes #427
1 parent fc0fd7b commit bac42d3

File tree

5 files changed

+559
-60
lines changed

5 files changed

+559
-60
lines changed

src/Temporalio/Worker/WorkflowInstance.cs

Lines changed: 92 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,8 @@ internal class WorkflowInstance : TaskScheduler, IWorkflowInstance, IWorkflowCon
8585
private WorkflowQueryDefinition? dynamicQuery;
8686
private WorkflowSignalDefinition? dynamicSignal;
8787
private WorkflowUpdateDefinition? dynamicUpdate;
88+
private bool workflowInitialized;
89+
private bool applyModernEventLoopLogic;
8890

8991
/// <summary>
9092
/// Initializes a new instance of the <see cref="WorkflowInstance"/> class.
@@ -529,6 +531,20 @@ public WorkflowActivationCompletion Activate(WorkflowActivation act)
529531
IsReplaying = act.IsReplaying;
530532
UtcNow = act.Timestamp.ToDateTime();
531533

534+
// If the workflow has not been initialized, we are in the first activation and we
535+
// need to set the modern-event-loop-logic flag
536+
if (!workflowInitialized)
537+
{
538+
// If we're not replaying or we are replaying and the flag is already set, set
539+
// to true and mark flag. Otherwise we leave false.
540+
if (!IsReplaying ||
541+
act.AvailableInternalFlags.Contains((uint)WorkflowLogicFlag.ApplyModernEventLoopLogic))
542+
{
543+
applyModernEventLoopLogic = true;
544+
completion.Successful.UsedInternalFlags.Add((uint)WorkflowLogicFlag.ApplyModernEventLoopLogic);
545+
}
546+
}
547+
532548
// Starting callback
533549
onTaskStarting(this);
534550

@@ -541,29 +557,59 @@ public WorkflowActivationCompletion Activate(WorkflowActivation act)
541557
{
542558
// We must set the sync context to null so work isn't posted there
543559
SynchronizationContext.SetSynchronizationContext(null);
544-
// TODO: Temporary workaround in lieu of https://github.com/temporalio/sdk-dotnet/issues/375
545-
var sortedJobs = act.Jobs.OrderBy(j =>
560+
561+
// We only sort jobs in legacy event loop logic, modern relies on Core
562+
List<WorkflowActivationJob> jobs;
563+
if (applyModernEventLoopLogic)
546564
{
547-
switch (j.VariantCase)
565+
jobs = act.Jobs.ToList();
566+
}
567+
else
568+
{
569+
jobs = act.Jobs.OrderBy(j =>
548570
{
549-
case WorkflowActivationJob.VariantOneofCase.NotifyHasPatch:
550-
case WorkflowActivationJob.VariantOneofCase.UpdateRandomSeed:
551-
return 1;
552-
case WorkflowActivationJob.VariantOneofCase.SignalWorkflow:
553-
case WorkflowActivationJob.VariantOneofCase.DoUpdate:
554-
return 2;
555-
case WorkflowActivationJob.VariantOneofCase.InitializeWorkflow:
556-
return 3;
557-
default:
558-
return 4;
559-
}
560-
}).ToList();
561-
// We can trust jobs are deterministically ordered by core
562-
foreach (var job in sortedJobs)
571+
switch (j.VariantCase)
572+
{
573+
case WorkflowActivationJob.VariantOneofCase.NotifyHasPatch:
574+
case WorkflowActivationJob.VariantOneofCase.UpdateRandomSeed:
575+
return 1;
576+
case WorkflowActivationJob.VariantOneofCase.SignalWorkflow:
577+
case WorkflowActivationJob.VariantOneofCase.DoUpdate:
578+
return 2;
579+
case WorkflowActivationJob.VariantOneofCase.InitializeWorkflow:
580+
return 3;
581+
default:
582+
return 4;
583+
}
584+
}).ToList();
585+
}
586+
587+
// Apply each job
588+
foreach (var job in jobs)
563589
{
564590
Apply(job);
565-
// Run scheduler once. Do not check conditions when patching or querying.
566-
var checkConditions = job.NotifyHasPatch == null && job.QueryWorkflow == null;
591+
// We only run the scheduler after each job in legacy event loop logic
592+
if (!applyModernEventLoopLogic)
593+
{
594+
// Run scheduler once. Do not check conditions when patching or
595+
// querying with legacy event loop logic.
596+
var checkConditions = job.NotifyHasPatch == null && job.QueryWorkflow == null;
597+
RunOnce(checkConditions);
598+
}
599+
}
600+
601+
// For modern event loop logic, we initialize here if not initialized
602+
// already
603+
if (applyModernEventLoopLogic && !workflowInitialized)
604+
{
605+
InitializeWorkflow();
606+
}
607+
608+
// For modern event loop logic, we run the event loop only after applying
609+
// everything, and we check conditions if there are any non-query jobs
610+
if (applyModernEventLoopLogic)
611+
{
612+
var checkConditions = jobs.Any(j => j.VariantCase != WorkflowActivationJob.VariantOneofCase.QueryWorkflow);
567613
RunOnce(checkConditions);
568614
}
569615
}
@@ -701,9 +747,20 @@ private void RunOnce(bool checkConditions)
701747
Workflow.OverrideContext.Value = this;
702748
try
703749
{
704-
foreach (var source in conditions.Where(t => t.Item1()).Select(t => t.Item2))
750+
foreach (var condition in conditions)
705751
{
706-
source.TrySetResult(null);
752+
// Check whether the condition evaluates to true
753+
if (condition.Item1())
754+
{
755+
// Set condition as resolved
756+
condition.Item2.TrySetResult(null);
757+
// When applying modern event loop logic, we want to break after the
758+
// first condition is resolved instead of checking/applying all
759+
if (applyModernEventLoopLogic)
760+
{
761+
break;
762+
}
763+
}
707764
}
708765
}
709766
finally
@@ -901,7 +958,11 @@ private void Apply(WorkflowActivationJob job)
901958
ApplySignalWorkflow(job.SignalWorkflow);
902959
break;
903960
case WorkflowActivationJob.VariantOneofCase.InitializeWorkflow:
904-
ApplyInitializeWorkflow(job.InitializeWorkflow);
961+
// We only initialize the workflow at job time on legacy event loop logic
962+
if (!applyModernEventLoopLogic)
963+
{
964+
InitializeWorkflow();
965+
}
905966
break;
906967
case WorkflowActivationJob.VariantOneofCase.UpdateRandomSeed:
907968
ApplyUpdateRandomSeed(job.UpdateRandomSeed);
@@ -1333,8 +1394,16 @@ await inbound.Value.HandleSignalAsync(new(
13331394
}));
13341395
}
13351396

1336-
private void ApplyInitializeWorkflow(InitializeWorkflow init)
1397+
private void ApplyUpdateRandomSeed(UpdateRandomSeed update) =>
1398+
Random = new(update.RandomnessSeed);
1399+
1400+
private void InitializeWorkflow()
13371401
{
1402+
if (workflowInitialized)
1403+
{
1404+
throw new InvalidOperationException("Workflow unexpectedly initialized");
1405+
}
1406+
workflowInitialized = true;
13381407
_ = QueueNewTaskAsync(() => RunTopLevelAsync(async () =>
13391408
{
13401409
var input = new ExecuteWorkflowInput(
@@ -1349,9 +1418,6 @@ private void ApplyInitializeWorkflow(InitializeWorkflow init)
13491418
}));
13501419
}
13511420

1352-
private void ApplyUpdateRandomSeed(UpdateRandomSeed update) =>
1353-
Random = new(update.RandomnessSeed);
1354-
13551421
private void OnQueryDefinitionAdded(string name, WorkflowQueryDefinition defn)
13561422
{
13571423
if (defn.Dynamic)

src/Temporalio/Worker/WorkflowLogicFlag.cs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,21 @@ internal enum WorkflowLogicFlag : uint
1111
/// set.
1212
/// </summary>
1313
ReorderWorkflowCompletion = 1,
14+
15+
/// <summary>
16+
/// Before this flag, workflow async logic did the following (in no particular order):
17+
/// * Reorder jobs itself.
18+
/// * Schedule primary workflow method when the initialize job was seen.
19+
/// * Tick the event loop after each job processed.
20+
/// * Check all conditions at once and resolve all that are satisfied.
21+
/// We have since learned that all of these are wrong. So if this flag is set on the first
22+
/// activation/task of a workflow, the following logic now applies respectively:
23+
/// * Leave Core's job ordering as is.
24+
/// * Do not schedule the primary workflow method until after all jobs have been applied and
25+
/// it's not already present.
26+
/// * Tick the event loop only once after everything applied.
27+
/// * Only resolve the first condition that is satisfied and re-run the entire event loop.
28+
/// </summary>
29+
ApplyModernEventLoopLogic = 2,
1430
}
1531
}

0 commit comments

Comments
 (0)