Skip to content

Commit 657a13d

Browse files
authored
Don't require timer presence when fired (#169)
Fixes #167
1 parent bc62186 commit 657a13d

File tree

2 files changed

+64
-3
lines changed

2 files changed

+64
-3
lines changed

temporalio/worker/workflow_instance.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -380,10 +380,11 @@ def _apply_cancel_workflow(
380380
def _apply_fire_timer(
381381
self, job: temporalio.bridge.proto.workflow_activation.FireTimer
382382
) -> None:
383+
# We ignore an absent handler because it may have been cancelled and
384+
# removed earlier this activation by a signal
383385
handle = self._pending_timers.pop(job.seq, None)
384-
if not handle:
385-
raise RuntimeError(f"Failed finding timer handle for sequence {job.seq}")
386-
self._ready.append(handle)
386+
if handle:
387+
self._ready.append(handle)
387388

388389
def _apply_query_workflow(
389390
self, job: temporalio.bridge.proto.workflow_activation.QueryWorkflow

tests/worker/test_workflow.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2380,6 +2380,66 @@ async def test_workflow_query_does_not_run_condition(client: Client):
23802380
assert await handle.query(QueryAffectConditionWorkflow.check_condition)
23812381

23822382

2383+
@workflow.defn
2384+
class CancelSignalAndTimerFiredInSameTaskWorkflow:
2385+
@workflow.run
2386+
async def run(self) -> None:
2387+
# Start a 1 hour timer
2388+
self.timer_task = asyncio.create_task(asyncio.sleep(60 * 60))
2389+
# Wait on it
2390+
try:
2391+
await self.timer_task
2392+
assert False
2393+
except asyncio.CancelledError:
2394+
pass
2395+
2396+
@workflow.signal
2397+
def cancel_timer(self) -> None:
2398+
self.timer_task.cancel()
2399+
2400+
2401+
async def test_workflow_cancel_signal_and_timer_fired_in_same_task(
2402+
client: Client, env: WorkflowEnvironment
2403+
):
2404+
# This test only works when we support time skipping
2405+
if not env.supports_time_skipping:
2406+
pytest.skip("Need to skip time to validate this test")
2407+
2408+
# TODO(cretz): There is a bug in the Java test server, probably
2409+
# https://github.com/temporalio/sdk-java/issues/1138 where the first
2410+
# unlock-and-sleep hangs when running this test after
2411+
# test_workflow_cancel_activity. So we create a new test environment here.
2412+
async with await WorkflowEnvironment.start_time_skipping() as env:
2413+
# Start worker for 30 mins
2414+
async with new_worker(
2415+
client, CancelSignalAndTimerFiredInSameTaskWorkflow
2416+
) as worker:
2417+
task_queue = worker.task_queue
2418+
handle = await client.start_workflow(
2419+
CancelSignalAndTimerFiredInSameTaskWorkflow.run,
2420+
id=f"workflow-{uuid.uuid4()}",
2421+
task_queue=task_queue,
2422+
)
2423+
# Wait 30 mins so the worker is waiting on timer
2424+
await env.sleep(30 * 60)
2425+
2426+
# Listen to handler result in background so the auto-skipping works
2427+
result_task = asyncio.create_task(handle.result())
2428+
2429+
# Now that worker is stopped, send a signal and wait another hour to pass
2430+
# the timer
2431+
await handle.signal(CancelSignalAndTimerFiredInSameTaskWorkflow.cancel_timer)
2432+
await env.sleep(60 * 60)
2433+
2434+
# Start worker again and wait for workflow completion
2435+
async with new_worker(
2436+
client, CancelSignalAndTimerFiredInSameTaskWorkflow, task_queue=task_queue
2437+
):
2438+
# This used to not complete because a signal cancelling the timer was
2439+
# not respected by the timer fire
2440+
await result_task
2441+
2442+
23832443
def new_worker(
23842444
client: Client,
23852445
*workflows: Type,

0 commit comments

Comments
 (0)