def _with_otel_context(otel_context):
    """Return a decorator that runs the wrapped callable under ``otel_context``.

    The context is attached right before the wrapped function is called and
    detached again afterwards, even when the function raises. When
    ``otel_context`` is ``None`` or the ``opentelemetry`` package cannot be
    imported, the decorator is a transparent pass-through.

    Args:
        otel_context: A context object captured by the caller (for example via
            ``opentelemetry.context.get_current()``), or ``None`` when there is
            nothing to propagate.

    Returns:
        A decorator. The decorated function forwards all positional and keyword
        arguments, returns the wrapped function's result, lets its exceptions
        propagate unchanged, and keeps its metadata via ``functools.wraps``.
    """
    # Imported locally so this helper does not depend on a file-level import.
    from functools import wraps

    # Resolve the optional dependency once, when the decorator is built,
    # instead of re-running the import machinery on every wrapped call.
    attach = detach = None
    # Compare against None rather than relying on truthiness: a context object
    # may legitimately be "empty" and therefore falsy, yet still be a context.
    if otel_context is not None:
        try:
            from opentelemetry.context import attach, detach
        except ImportError:
            # OpenTelemetry is an optional extra; without it there is nothing
            # to attach, so fall back to calling the function directly.
            attach = detach = None

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            if attach is None:
                return func(*args, **kwargs)

            ctx_token = attach(otel_context)
            try:
                return func(*args, **kwargs)
            finally:
                try:
                    detach(ctx_token)
                except Exception:
                    # Best effort: a failed detach must not mask the wrapped
                    # function's result or exception. Unlike a bare ``except:``
                    # this still lets KeyboardInterrupt/SystemExit through.
                    logger.debug("Failed to detach OpenTelemetry context", exc_info=True)

        return wrapper

    return decorator
@pytest.mark.extra
def test_otel_context_propagation():
    """Spans opened inside ParallelExecutor tasks must be children of the caller's span.

    A parent span is made current in the calling thread, a task that opens a
    span is run through ``ParallelExecutor``, and every span recorded by the
    task is checked for the parent's span id and trace id.
    """
    # Skip (rather than error) when the SDK is missing. This probes the module
    # the test actually needs and avoids ``importlib.util`` being used after
    # only ``import importlib``, which does not guarantee the submodule is loaded.
    pytest.importorskip("opentelemetry.sdk.trace", reason="OpenTelemetry not installed")
    from opentelemetry.sdk.trace import TracerProvider

    # Take the tracer from a private provider instead of installing a global
    # one with ``trace.set_tracer_provider``: the global provider is
    # process-wide state that would leak into every other test.
    tracer = TracerProvider().get_tracer(__name__)

    with tracer.start_as_current_span("parent_span") as parent_span:
        parent_span.set_attribute("test_attribute", "parent_value")
        parent_ctx = parent_span.get_span_context()

        seen_spans = []

        def task(item):
            with tracer.start_as_current_span("child_span") as child_span:
                child_span.set_attribute("test_attribute", "child_value")
                seen_spans.append(child_span)
                return item * 2

        executor = ParallelExecutor(num_threads=2, disable_progress_bar=True)
        executor.execute(task, [1, 2, 3])

    assert len(seen_spans) == 3

    for span in seen_spans:
        # Assert the name instead of filtering on it, so an unexpected span
        # fails the test rather than silently skipping the parent checks.
        assert span.name == "child_span"
        assert span.parent is not None, "child span lost its parent context"
        assert span.parent.span_id == parent_ctx.span_id
        assert span.get_span_context().trace_id == parent_ctx.trace_id
"exceptiongroup" version = "1.3.0" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "typing-extensions", marker = "python_full_version < '3.13'" }, + { name = "typing-extensions", marker = "python_full_version < '3.11'" }, ] sdist = { url = "https://files.pythonhosted.org/packages/0b/9f/a65090624ecf468cdca03533906e7c69ed7588582240cfe7cc9e770b50eb/exceptiongroup-1.3.0.tar.gz", hash = "sha256:b241f5885f560bc56a59ee63ca4c6a8bfa46ae4ad651af316d4e81817bb9fd88", size = 29749, upload-time = "2025-05-10T17:42:51.123Z" } wheels = [ @@ -1963,6 +1967,46 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/f4/03/ef68d77a38dd383cbed7fc898857d394d5a8b0520a35f054e7fe05dc3ac1/openai-1.88.0-py3-none-any.whl", hash = "sha256:7edd7826b3b83f5846562a6f310f040c79576278bf8e3687b30ba05bb5dff978", size = 734293, upload-time = "2025-06-17T05:04:43.858Z" }, ] +[[package]] +name = "opentelemetry-api" +version = "1.34.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "importlib-metadata" }, + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/4d/5e/94a8cb759e4e409022229418294e098ca7feca00eb3c467bb20cbd329bda/opentelemetry_api-1.34.1.tar.gz", hash = "sha256:64f0bd06d42824843731d05beea88d4d4b6ae59f9fe347ff7dfa2cc14233bbb3", size = 64987, upload-time = "2025-06-10T08:55:19.818Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/a5/3a/2ba85557e8dc024c0842ad22c570418dc02c36cbd1ab4b832a93edf071b8/opentelemetry_api-1.34.1-py3-none-any.whl", hash = "sha256:b7df4cb0830d5a6c29ad0c0691dbae874d8daefa934b8b1d642de48323d32a8c", size = 65767, upload-time = "2025-06-10T08:54:56.717Z" }, +] + +[[package]] +name = "opentelemetry-sdk" +version = "1.34.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "opentelemetry-api" }, + { name = "opentelemetry-semantic-conventions" }, + { name = "typing-extensions" }, +] +sdist = { url = 
"https://files.pythonhosted.org/packages/6f/41/fe20f9036433da8e0fcef568984da4c1d1c771fa072ecd1a4d98779dccdd/opentelemetry_sdk-1.34.1.tar.gz", hash = "sha256:8091db0d763fcd6098d4781bbc80ff0971f94e260739aa6afe6fd379cdf3aa4d", size = 159441, upload-time = "2025-06-10T08:55:33.028Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/07/1b/def4fe6aa73f483cabf4c748f4c25070d5f7604dcc8b52e962983491b29e/opentelemetry_sdk-1.34.1-py3-none-any.whl", hash = "sha256:308effad4059562f1d92163c61c8141df649da24ce361827812c40abb2a1e96e", size = 118477, upload-time = "2025-06-10T08:55:16.02Z" }, +] + +[[package]] +name = "opentelemetry-semantic-conventions" +version = "0.55b1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "opentelemetry-api" }, + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/5d/f0/f33458486da911f47c4aa6db9bda308bb80f3236c111bf848bd870c16b16/opentelemetry_semantic_conventions-0.55b1.tar.gz", hash = "sha256:ef95b1f009159c28d7a7849f5cbc71c4c34c845bb514d66adfdf1b3fff3598b3", size = 119829, upload-time = "2025-06-10T08:55:33.881Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/1a/89/267b0af1b1d0ba828f0e60642b6a5116ac1fd917cde7fc02821627029bd1/opentelemetry_semantic_conventions-0.55b1-py3-none-any.whl", hash = "sha256:5da81dfdf7d52e3d37f8fe88d5e771e191de924cfff5f550ab0b8f7b2409baed", size = 196223, upload-time = "2025-06-10T08:55:17.638Z" }, +] + [[package]] name = "optuna" version = "4.4.0" From bc7abf030b31ebd4764c95c59cf4f912e0353242 Mon Sep 17 00:00:00 2001 From: TomuHirata Date: Tue, 8 Jul 2025 15:54:17 +0900 Subject: [PATCH 2/2] use @wraps --- dspy/utils/parallelizer.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dspy/utils/parallelizer.py b/dspy/utils/parallelizer.py index 36d40d3e3d..a9c412d83f 100644 --- a/dspy/utils/parallelizer.py +++ b/dspy/utils/parallelizer.py @@ -7,6 +7,7 @@ import time import traceback from concurrent.futures import 
FIRST_COMPLETED, ThreadPoolExecutor, wait +from functools import wraps import tqdm @@ -16,6 +17,7 @@ def _with_otel_context(otel_context): """Decorator to attach OpenTelemetry context to a function.""" def decorator(func): + @wraps(func) def wrapper(*args, **kwargs): ctx_token = None if otel_context: