From 39bea32a43dba5e3bc759b3687f7e29e05b2e804 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Mon, 14 Apr 2025 15:22:09 -0400 Subject: [PATCH 1/3] Add an equal number of thread-blocking workflows --- .github/workflows/ci.yml | 4 ++++ .github/workflows/run-bench.yml | 14 -------------- scripts/run_bench.py | 18 +++++++++++++++++- 3 files changed, 21 insertions(+), 15 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2b079e60b..21e498132 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -7,6 +7,10 @@ on: - "releases/*" jobs: + + nightly: + uses: ./.github/workflows/run-bench.yml + # Build and test the project build-lint-test: strategy: diff --git a/.github/workflows/run-bench.yml b/.github/workflows/run-bench.yml index 6dcad63e8..0cecd6166 100644 --- a/.github/workflows/run-bench.yml +++ b/.github/workflows/run-bench.yml @@ -55,17 +55,3 @@ jobs: - run: poe run-bench --workflow-count 100 --max-cached-workflows 100 --max-concurrent 100 ${{ inputs.sandbox-arg }} - run: poe run-bench --workflow-count 100 --max-cached-workflows 100 --max-concurrent 100 ${{ inputs.sandbox-arg }} - run: poe run-bench --workflow-count 100 --max-cached-workflows 100 --max-concurrent 100 ${{ inputs.sandbox-arg }} - - - run: poe run-bench --workflow-count 1000 --max-cached-workflows 1000 --max-concurrent 1000 ${{ inputs.sandbox-arg }} - - run: poe run-bench --workflow-count 1000 --max-cached-workflows 1000 --max-concurrent 1000 ${{ inputs.sandbox-arg }} - - run: poe run-bench --workflow-count 1000 --max-cached-workflows 1000 --max-concurrent 1000 ${{ inputs.sandbox-arg }} - - - run: poe run-bench --workflow-count 1000 --max-cached-workflows 100 --max-concurrent 100 ${{ inputs.sandbox-arg }} - - run: poe run-bench --workflow-count 1000 --max-cached-workflows 100 --max-concurrent 100 ${{ inputs.sandbox-arg }} - - run: poe run-bench --workflow-count 1000 --max-cached-workflows 100 --max-concurrent 100 ${{ inputs.sandbox-arg }} - - - run: poe run-bench --workflow-count 10000 --max-cached-workflows 10000 --max-concurrent 10000 ${{ inputs.sandbox-arg }} - - run: poe run-bench --workflow-count 10000 --max-cached-workflows 10000 --max-concurrent 10000 ${{ inputs.sandbox-arg }} - - - run: poe run-bench --workflow-count 10000 --max-cached-workflows 1000 --max-concurrent 1000 ${{ inputs.sandbox-arg }} - - run: poe run-bench --workflow-count 10000 --max-cached-workflows 1000 --max-concurrent 1000 ${{ inputs.sandbox-arg }} \ No newline at end of file diff --git a/scripts/run_bench.py b/scripts/run_bench.py index decbe4810..74008d275 100644 --- a/scripts/run_bench.py +++ b/scripts/run_bench.py @@ -31,6 +31,15 @@ async def bench_activity(name: str) -> str: return f"Hello, {name}!" +@workflow.defn +class DeadlockInterruptibleWorkflow: + @workflow.run + async def run(self) -> None: + # Infinite loop, which is interruptible via PyThreadState_SetAsyncExc + while True: + pass + + async def main(): logging.basicConfig( format="%(asctime)s.%(msecs)03d %(levelname)-8s %(message)s", @@ -86,6 +95,13 @@ async def report_mem(): logger.debug("Starting %s workflows", args.workflow_count) pre_start_seconds = time.monotonic() handles = [ + await env.client.start_workflow( + DeadlockInterruptibleWorkflow.run, + id=f"deadlock-interruptible-workflow-{i}-{uuid.uuid4()}", + task_queue=task_queue, + ) + for i in range(args.workflow_count) + ] + [ await env.client.start_workflow( BenchWorkflow.run, f"user-{i}", @@ -101,7 +117,7 @@ async def report_mem(): async with Worker( env.client, task_queue=task_queue, - workflows=[BenchWorkflow], + workflows=[BenchWorkflow, DeadlockInterruptibleWorkflow], activities=[bench_activity], workflow_runner=SandboxedWorkflowRunner() if args.sandbox From 9e1695c6ece16d2abdcd2734c622d9490a316c59 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Mon, 14 Apr 2025 15:41:41 -0400 Subject: [PATCH 2/3] Just add one deadlocking workflow --- scripts/run_bench.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/run_bench.py b/scripts/run_bench.py index 74008d275..9cd4080c9 100644 --- a/scripts/run_bench.py +++ b/scripts/run_bench.py @@ -100,7 +100,7 @@ async def report_mem(): id=f"deadlock-interruptible-workflow-{i}-{uuid.uuid4()}", task_queue=task_queue, ) - for i in range(args.workflow_count) + for i in range(1) ] + [ await env.client.start_workflow( BenchWorkflow.run, From d72e9ab8f689164e73af303f3dd56a1cb43ac37c Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Mon, 14 Apr 2025 17:38:34 -0400 Subject: [PATCH 3/3] wip --- scripts/run_bench.py | 2 +- temporalio/testing/_workflow.py | 2 +- temporalio/worker/_workflow.py | 2 +- tests/worker/test_workflow.py | 3 +++ 4 files changed, 6 insertions(+), 3 deletions(-) diff --git a/scripts/run_bench.py b/scripts/run_bench.py index 9cd4080c9..90a1db80c 100644 --- a/scripts/run_bench.py +++ b/scripts/run_bench.py @@ -70,7 +70,7 @@ async def report_mem(): nonlocal max_mem while True: try: - await asyncio.sleep(0.8) + await asyncio.sleep(0.1) finally: # TODO(cretz): "vms" appears more accurate on Windows, but # rss is more accurate on Linux diff --git a/temporalio/testing/_workflow.py b/temporalio/testing/_workflow.py index 85c5404ea..2b2104dc9 100644 --- a/temporalio/testing/_workflow.py +++ b/temporalio/testing/_workflow.py @@ -89,7 +89,7 @@ async def start_local( ip: str = "127.0.0.1", port: Optional[int] = None, download_dest_dir: Optional[str] = None, - ui: bool = False, + ui: bool = True, runtime: Optional[temporalio.runtime.Runtime] = None, search_attributes: Sequence[temporalio.common.SearchAttributeKey] = (), dev_server_existing_path: Optional[str] = None, diff --git a/temporalio/worker/_workflow.py b/temporalio/worker/_workflow.py index 37e6810c9..167f3e8b0 100644 --- a/temporalio/worker/_workflow.py +++ b/temporalio/worker/_workflow.py @@ -8,7 +8,6 @@ import os import sys import threading -from dataclasses import dataclass from datetime import timezone from types import TracebackType from typing import ( @@ -657,6 +656,7 @@ def attempt_deadlock_interruption(self) -> None: return deadlocked_thread_id = self.instance.get_thread_id() if deadlocked_thread_id: + print("🌈 _raise_in_thread: interrupting deadlock") temporalio.bridge.runtime.Runtime._raise_in_thread( deadlocked_thread_id, _InterruptDeadlockError ) diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index f57f4b9fd..a19fcea33 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -6989,6 +6989,8 @@ async def run(self) -> None: try: while True: pass + except BaseException as e: + print(f"🌈 DeadlockInterruptibleWorkflow: {e.__class__.__name__}({e})") finally: global deadlock_interruptible_completed deadlock_interruptible_completed += 1 @@ -7019,6 +7021,7 @@ async def check_completed(): await assert_eventually(check_completed) completed_sec = time.monotonic() + await handle.result() # Confirm worker shutdown didn't hang assert time.monotonic() - completed_sec < 20