Skip to content

Commit 580b6fc

Browse files
authored
Skip conditions during patch and query jobs (#137)
Fixes #129
1 parent 5b0e1c8 commit 580b6fc

File tree

2 files changed

+44
-9
lines changed

2 files changed

+44
-9
lines changed

temporalio/worker/workflow_instance.py

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -256,15 +256,17 @@ def activate(
256256
job_sets[3].append(job)
257257

258258
# Apply every job set, running after each set
259-
for job_set in job_sets:
259+
for index, job_set in enumerate(job_sets):
260260
if not job_set:
261261
continue
262262
for job in job_set:
263263
# Let errors bubble out of these to the caller to fail the task
264264
self._apply(job)
265265

266-
# Run one iteration of the loop
267-
self._run_once()
266+
# Run one iteration of the loop. We do not allow conditions to
267+
# be checked in patch jobs (first index) or query jobs (last
268+
# index).
269+
self._run_once(check_conditions=index == 1 or index == 2)
268270
except Exception as err:
269271
logger.warning(
270272
f"Failed activation on workflow {self._info.workflow_type} with ID {self._info.workflow_id} and run ID {self._info.run_id}",
@@ -1153,7 +1155,7 @@ def _register_task(self, task: asyncio.Task, *, name: Optional[str]) -> None:
11531155
if hasattr(task, "_log_destroy_pending"):
11541156
setattr(task, "_log_destroy_pending", False)
11551157

1156-
def _run_once(self) -> None:
1158+
def _run_once(self, *, check_conditions: bool) -> None:
11571159
try:
11581160
asyncio._set_running_loop(self)
11591161

@@ -1178,11 +1180,12 @@ def _run_once(self) -> None:
11781180
# Check conditions which may add to the ready list. Also remove
11791181
# conditions whose futures have already cancelled (e.g. when
11801182
# timed out).
1181-
self._conditions[:] = [
1182-
t
1183-
for t in self._conditions
1184-
if not t[1].done() and not self._check_condition(*t)
1185-
]
1183+
if check_conditions:
1184+
self._conditions[:] = [
1185+
t
1186+
for t in self._conditions
1187+
if not t[1].done() and not self._check_condition(*t)
1188+
]
11861189
finally:
11871190
asyncio._set_running_loop(None)
11881191

tests/worker/test_workflow.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2349,6 +2349,38 @@ async def test_workflow_memo(client: Client):
23492349
pass
23502350

23512351

2352+
@workflow.defn
2353+
class QueryAffectConditionWorkflow:
2354+
def __init__(self) -> None:
2355+
self.seen_query = False
2356+
2357+
@workflow.run
2358+
async def run(self) -> None:
2359+
def condition_never_after_query():
2360+
assert not self.seen_query
2361+
return False
2362+
2363+
while True:
2364+
await workflow.wait_condition(condition_never_after_query)
2365+
2366+
@workflow.query
2367+
def check_condition(self) -> bool:
2368+
# This is a bad thing, to mutate a workflow during a query, this is just
2369+
# for this test
2370+
self.seen_query = True
2371+
return True
2372+
2373+
2374+
async def test_workflow_query_does_not_run_condition(client: Client):
2375+
async with new_worker(client, QueryAffectConditionWorkflow) as worker:
2376+
handle = await client.start_workflow(
2377+
QueryAffectConditionWorkflow.run,
2378+
id=f"workflow-{uuid.uuid4()}",
2379+
task_queue=worker.task_queue,
2380+
)
2381+
assert await handle.query(QueryAffectConditionWorkflow.check_condition)
2382+
2383+
23522384
def new_worker(
23532385
client: Client,
23542386
*workflows: Type,

0 commit comments

Comments
 (0)