Skip to content

Commit dae22ac

Browse files
authored
Fix wait_condition timeout issue (#90)
Fixes #89
1 parent 2288f41 commit dae22ac

File tree

2 files changed

+52
-2
lines changed

2 files changed

+52
-2
lines changed

temporalio/worker/workflow_instance.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1149,9 +1149,13 @@ def _run_once(self) -> None:
11491149
if self._current_activation_error:
11501150
raise self._current_activation_error
11511151

1152-
# Check conditions which may add to the ready list
1152+
# Check conditions which may add to the ready list. Also remove
1153+
# conditions whose futures have already cancelled (e.g. when
1154+
# timed out).
11531155
self._conditions[:] = [
1154-
t for t in self._conditions if not self._check_condition(*t)
1156+
t
1157+
for t in self._conditions
1158+
if not t[1].done() and not self._check_condition(*t)
11551159
]
11561160
finally:
11571161
asyncio._set_running_loop(None)

tests/worker/test_workflow.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2118,6 +2118,52 @@ async def test_workflow_activity_method(client: Client):
21182118
assert result == MyDataClass(field1="in worker, workflow param, in workflow")
21192119

21202120

2121+
@workflow.defn
2122+
class WaitConditionTimeoutWorkflow:
2123+
def __init__(self) -> None:
2124+
self._done = False
2125+
self._waiting = False
2126+
2127+
@workflow.run
2128+
async def run(self) -> None:
2129+
# Force timeout, ignore, wait again
2130+
try:
2131+
await workflow.wait_condition(lambda: self._done, timeout=0.01)
2132+
raise RuntimeError("Expected timeout")
2133+
except asyncio.TimeoutError:
2134+
pass
2135+
self._waiting = True
2136+
await workflow.wait_condition(lambda: self._done)
2137+
2138+
@workflow.signal
2139+
def done(self) -> None:
2140+
self._done = True
2141+
2142+
@workflow.query
2143+
def waiting(self) -> bool:
2144+
return self._waiting
2145+
2146+
2147+
async def test_workflow_wait_condition_timeout(client: Client):
2148+
async with new_worker(
2149+
client,
2150+
WaitConditionTimeoutWorkflow,
2151+
) as worker:
2152+
handle = await client.start_workflow(
2153+
WaitConditionTimeoutWorkflow.run,
2154+
id=f"workflow-{uuid.uuid4()}",
2155+
task_queue=worker.task_queue,
2156+
)
2157+
# Wait until it's waiting, then send the signal
2158+
async def waiting() -> bool:
2159+
return await handle.query(WaitConditionTimeoutWorkflow.waiting)
2160+
2161+
await assert_eq_eventually(True, waiting)
2162+
await handle.signal(WaitConditionTimeoutWorkflow.done)
2163+
# Wait for result which should succeed
2164+
await handle.result()
2165+
2166+
21212167
def new_worker(
21222168
client: Client,
21232169
*workflows: Type,

0 commit comments

Comments
 (0)