Skip to content

Commit 7665bf5

Browse files
authored
Useful event ordering tests from discarded changes to event loop (#729)
1 parent 7af48a7 commit 7665bf5

5 files changed

+1423
-2
lines changed

tests/worker/test_replayer.py

Lines changed: 107 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,27 @@
44
from dataclasses import dataclass
55
from datetime import timedelta
66
from pathlib import Path
7-
from typing import Dict
7+
from typing import Any, Dict, Optional, Type
88

99
import pytest
1010

1111
from temporalio import activity, workflow
1212
from temporalio.client import Client, WorkflowFailureError, WorkflowHistory
1313
from temporalio.exceptions import ApplicationError
1414
from temporalio.testing import WorkflowEnvironment
15-
from temporalio.worker import Replayer, Worker
15+
from temporalio.worker import (
16+
ExecuteWorkflowInput,
17+
Interceptor,
18+
Replayer,
19+
Worker,
20+
WorkflowInboundInterceptor,
21+
WorkflowInterceptorClassInput,
22+
)
1623
from tests.helpers import assert_eq_eventually
24+
from tests.worker.test_workflow import (
25+
ActivityAndSignalsWhileWorkflowDown,
26+
SignalsActivitiesTimersUpdatesTracingWorkflow,
27+
)
1728

1829

1930
@activity.defn
@@ -385,3 +396,97 @@ async def test_replayer_command_reordering_backward_compatibility() -> None:
385396
await Replayer(workflows=[UpdateCompletionAfterWorkflowReturn]).replay_workflow(
386397
WorkflowHistory.from_json("fake", history)
387398
)
399+
400+
401+
test_replayer_workflow_res = None
402+
403+
404+
class WorkerWorkflowResultInterceptor(Interceptor):
405+
def workflow_interceptor_class(
406+
self, input: WorkflowInterceptorClassInput
407+
) -> Optional[Type[WorkflowInboundInterceptor]]:
408+
return WorkflowResultInterceptor
409+
410+
411+
class WorkflowResultInterceptor(WorkflowInboundInterceptor):
412+
async def execute_workflow(self, input: ExecuteWorkflowInput) -> Any:
413+
global test_replayer_workflow_res
414+
res = await super().execute_workflow(input)
415+
test_replayer_workflow_res = res
416+
return res
417+
418+
419+
async def test_replayer_async_ordering() -> None:
420+
"""
421+
This test verifies that the order that asyncio tasks/coroutines are woken up matches the
422+
order they were before changes to apply all jobs and then run the event loop, where previously
423+
the event loop was ran after each "batch" of jobs.
424+
"""
425+
histories_and_expecteds = [
426+
(
427+
"test_replayer_event_tracing.json",
428+
[
429+
"sig-before-sync",
430+
"sig-before-1",
431+
"sig-before-2",
432+
"timer-sync",
433+
"act-sync",
434+
"act-1",
435+
"act-2",
436+
"sig-1-sync",
437+
"sig-1-1",
438+
"sig-1-2",
439+
"update-1-sync",
440+
"update-1-1",
441+
"update-1-2",
442+
"timer-1",
443+
"timer-2",
444+
],
445+
),
446+
(
447+
"test_replayer_event_tracing_double_sig_at_start.json",
448+
[
449+
"sig-before-sync",
450+
"sig-before-1",
451+
"sig-1-sync",
452+
"sig-1-1",
453+
"sig-before-2",
454+
"sig-1-2",
455+
"timer-sync",
456+
"act-sync",
457+
"update-1-sync",
458+
"update-1-1",
459+
"update-1-2",
460+
"act-1",
461+
"act-2",
462+
"timer-1",
463+
"timer-2",
464+
],
465+
),
466+
]
467+
for history, expected in histories_and_expecteds:
468+
with Path(__file__).with_name(history).open() as f:
469+
history = f.read()
470+
await Replayer(
471+
workflows=[SignalsActivitiesTimersUpdatesTracingWorkflow],
472+
interceptors=[WorkerWorkflowResultInterceptor()],
473+
).replay_workflow(WorkflowHistory.from_json("fake", history))
474+
assert test_replayer_workflow_res == expected
475+
476+
477+
async def test_replayer_alternate_async_ordering() -> None:
478+
with Path(__file__).with_name(
479+
"test_replayer_event_tracing_alternate.json"
480+
).open() as f:
481+
history = f.read()
482+
await Replayer(
483+
workflows=[ActivityAndSignalsWhileWorkflowDown],
484+
interceptors=[WorkerWorkflowResultInterceptor()],
485+
).replay_workflow(WorkflowHistory.from_json("fake", history))
486+
assert test_replayer_workflow_res == [
487+
"act-start",
488+
"sig-1",
489+
"sig-2",
490+
"counter-2",
491+
"act-done",
492+
]

0 commit comments

Comments
 (0)