Skip to content

Nexus bug fix: accept event type name casing variant used by latest server #953

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 23 commits into from
Jul 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
ce6d196
Support pascal case in event type name received from server
dandavison Jul 9, 2025
d36dd26
Default to 1, and code review suggestions
dandavison Jul 9, 2025
b94a1d6
Default event ID to 0 if not present in query params
dandavison Jul 9, 2025
30d9737
Do not emit a link for StartWorkflowRequest if the event reference is…
dandavison Jul 9, 2025
996b3e1
Fixup
dandavison Jul 10, 2025
cbf3501
Convert outbound links to pascal case
dandavison Jul 10, 2025
8115ab1
Use empty string instead of None to satisfy mypy
dandavison Jul 10, 2025
513cd37
docstring formatting
dandavison Jul 10, 2025
7875793
_link_conversion module
dandavison Jul 10, 2025
0e64350
Rename utilities
dandavison Jul 10, 2025
d9cea0f
Fix regex
dandavison Jul 10, 2025
3751138
Add happy path tests
dandavison Jul 10, 2025
fff9147
link conversion tests
dandavison Jul 10, 2025
6e4c777
Rename
dandavison Jul 10, 2025
234f89d
link conversion tests
dandavison Jul 10, 2025
a14a1b9
Change order of params in query param string
dandavison Jul 10, 2025
02e2918
Docstrings
dandavison Jul 10, 2025
5ef603c
Make Nexus HTTP port usage in tests work with external server
dandavison Jul 10, 2025
1413f53
Compute host more correctly in nexus utility
dandavison Jul 10, 2025
3710e5f
Reorder assertions
dandavison Jul 10, 2025
0479201
Delete resolved TODO
dandavison Jul 10, 2025
f5803cd
Fix test to access links in both places on workflow event
dandavison Jul 10, 2025
7899721
Allow links to be attached to event in both places
dandavison Jul 10, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 170 additions & 0 deletions temporalio/nexus/_link_conversion.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
from __future__ import annotations

import logging
import re
import urllib.parse
from typing import (
Any,
Optional,
)

import nexusrpc

import temporalio.api.common.v1
import temporalio.api.enums.v1
import temporalio.client

logger = logging.getLogger(__name__)

_LINK_URL_PATH_REGEX = re.compile(
r"^/namespaces/(?P<namespace>[^/]+)/workflows/(?P<workflow_id>[^/]+)/(?P<run_id>[^/]+)/history$"
)
LINK_EVENT_ID_PARAM_NAME = "eventID"
LINK_EVENT_TYPE_PARAM_NAME = "eventType"


def workflow_handle_to_workflow_execution_started_event_link(
handle: temporalio.client.WorkflowHandle[Any, Any],
) -> temporalio.api.common.v1.Link.WorkflowEvent:
"""Create a WorkflowEvent link corresponding to a started workflow"""
if handle.first_execution_run_id is None:
raise ValueError(
f"Workflow handle {handle} has no first execution run ID. "
f"Cannot create WorkflowExecutionStarted event link."
)
return temporalio.api.common.v1.Link.WorkflowEvent(
namespace=handle._client.namespace,
workflow_id=handle.id,
run_id=handle.first_execution_run_id,
event_ref=temporalio.api.common.v1.Link.WorkflowEvent.EventReference(
event_id=1,
event_type=temporalio.api.enums.v1.EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED,
),
# TODO(nexus-preview): RequestIdReference
)


def workflow_event_to_nexus_link(
workflow_event: temporalio.api.common.v1.Link.WorkflowEvent,
) -> nexusrpc.Link:
"""Convert a WorkflowEvent link into a nexusrpc link

Used when propagating links from a StartWorkflow response to a Nexus start operation
response.
"""
scheme = "temporal"
namespace = urllib.parse.quote(workflow_event.namespace)
workflow_id = urllib.parse.quote(workflow_event.workflow_id)
run_id = urllib.parse.quote(workflow_event.run_id)
path = f"/namespaces/{namespace}/workflows/{workflow_id}/{run_id}/history"
query_params = _event_reference_to_query_params(workflow_event.event_ref)
return nexusrpc.Link(
url=urllib.parse.urlunparse((scheme, "", path, "", query_params, "")),
type=workflow_event.DESCRIPTOR.full_name,
)


def nexus_link_to_workflow_event(
link: nexusrpc.Link,
) -> Optional[temporalio.api.common.v1.Link.WorkflowEvent]:
"""Convert a nexus link into a WorkflowEvent link

This is used when propagating links from a Nexus start operation request to a
StartWorklow request.
"""
url = urllib.parse.urlparse(link.url)
match = _LINK_URL_PATH_REGEX.match(url.path)
if not match:
logger.warning(
f"Invalid Nexus link: {link}. Expected path to match {_LINK_URL_PATH_REGEX.pattern}"
)
return None
try:
event_ref = _query_params_to_event_reference(url.query)
except ValueError as err:
logger.warning(
f"Failed to parse event reference from Nexus link URL query parameters: {link} ({err})"
)
return None

groups = match.groupdict()
return temporalio.api.common.v1.Link.WorkflowEvent(
namespace=urllib.parse.unquote(groups["namespace"]),
workflow_id=urllib.parse.unquote(groups["workflow_id"]),
run_id=urllib.parse.unquote(groups["run_id"]),
event_ref=event_ref,
)


def _event_reference_to_query_params(
event_ref: temporalio.api.common.v1.Link.WorkflowEvent.EventReference,
) -> str:
event_type_name = temporalio.api.enums.v1.EventType.Name(event_ref.event_type)
if event_type_name.startswith("EVENT_TYPE_"):
event_type_name = _event_type_constant_case_to_pascal_case(
event_type_name.removeprefix("EVENT_TYPE_")
)
return urllib.parse.urlencode(
{
"eventID": event_ref.event_id,
"eventType": event_type_name,
"referenceType": "EventReference",
}
)


def _query_params_to_event_reference(
raw_query_params: str,
) -> temporalio.api.common.v1.Link.WorkflowEvent.EventReference:
"""Return an EventReference from the query params or raise ValueError."""
query_params = urllib.parse.parse_qs(raw_query_params)

[reference_type] = query_params.get("referenceType") or [""]
if reference_type != "EventReference":
raise ValueError(
f"Expected Nexus link URL query parameter referenceType to be EventReference but got: {reference_type}"
)
# event type
[raw_event_type_name] = query_params.get(LINK_EVENT_TYPE_PARAM_NAME) or [""]
if not raw_event_type_name:
raise ValueError(f"query params do not contain event type: {query_params}")
if raw_event_type_name.startswith("EVENT_TYPE_"):
event_type_name = raw_event_type_name
elif re.match("[A-Z][a-z]", raw_event_type_name):
event_type_name = "EVENT_TYPE_" + _event_type_pascal_case_to_constant_case(
raw_event_type_name
)
else:
raise ValueError(f"Invalid event type name: {raw_event_type_name}")

# event id
event_id = 0
[raw_event_id] = query_params.get(LINK_EVENT_ID_PARAM_NAME) or [""]
if raw_event_id:
try:
event_id = int(raw_event_id)
except ValueError:
raise ValueError(f"Query params contain invalid event id: {raw_event_id}")

return temporalio.api.common.v1.Link.WorkflowEvent.EventReference(
event_type=temporalio.api.enums.v1.EventType.Value(event_type_name),
event_id=event_id,
)


def _event_type_constant_case_to_pascal_case(s: str) -> str:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just a generic helper, nothing to do with event types.

Suggested change
def _event_type_constant_case_to_pascal_case(s: str) -> str:
def _constant_case_to_pascal_case(s: str) -> str:

Copy link
Contributor Author

@dandavison dandavison Jul 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I decided to qualify their names with the event_type prefix to avoid any implication that they are suitable for use as general case conversion utilities. For example, they have undefined behavior currently if the input doesn't match the expectation that it is already in the expected input form, and the tests don't cover trailing/leading underscores, etc.

"""Convert a CONSTANT_CASE string to PascalCase.

>>> _event_type_constant_case_to_pascal_case("NEXUS_OPERATION_SCHEDULED")
"NexusOperationScheduled"
"""
return re.sub(r"(\b|_)([a-z])", lambda m: m.groups()[1].upper(), s.lower())


def _event_type_pascal_case_to_constant_case(s: str) -> str:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here.

"""Convert a PascalCase string to CONSTANT_CASE.

>>> _event_type_pascal_case_to_constant_case("NexusOperationScheduled")
"NEXUS_OPERATION_SCHEDULED"
"""
return re.sub(r"([A-Z])", r"_\1", s).lstrip("_").upper()
Copy link
Contributor Author

@dandavison dandavison Jul 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Incidentally, there are a couple of bugs in the TS versions of these functions (e.g. \b in a character class is backspace I believe, not word boundary, and the utilities would fail on ContainsAOneLetterWord). Probably doesn't affect their actual usage in practice on event type names. cc @mjameswh

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

\b is a word boundary.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But I do see how this case isn't covered in TS.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This works though and is more compact:

function pascalCaseToConstantCase(s: string) {
  return s.replace(/\B[A-Z]/g, (m) => `_${m[0]}`).toUpperCase();
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

\b in a character class is backspace I believe, not word boundary

\b is a word boundary.

Not in a character class, which is how it's being used in sdk-typescript

console.log('a'.match(/\b/))   // ""
console.log('a'.match(/[\b]/)) // null

101 changes: 4 additions & 97 deletions temporalio/nexus/_operation_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

import dataclasses
import logging
import re
import urllib.parse
from contextvars import ContextVar
from dataclasses import dataclass
from datetime import timedelta
Expand All @@ -20,14 +18,13 @@
overload,
)

import nexusrpc.handler
from nexusrpc.handler import CancelOperationContext, StartOperationContext
from typing_extensions import Concatenate

import temporalio.api.common.v1
import temporalio.api.enums.v1
import temporalio.client
import temporalio.common
from temporalio.nexus import _link_conversion
from temporalio.nexus._token import WorkflowHandle
from temporalio.types import (
MethodAsyncNoParam,
Expand Down Expand Up @@ -128,11 +125,6 @@ def _get_callbacks(
ctx = self.nexus_context
return (
[
# TODO(nexus-prerelease): For WorkflowRunOperation, when it handles the Nexus
# request, it needs to copy the links to the callback in
# StartWorkflowRequest.CompletionCallbacks and to StartWorkflowRequest.Links
# (for backwards compatibility). PR reference in Go SDK:
# https://github.com/temporalio/sdk-go/pull/1945
temporalio.client.NexusCallback(
url=ctx.callback_url,
headers=ctx.callback_headers,
Expand All @@ -147,16 +139,16 @@ def _get_workflow_event_links(
) -> list[temporalio.api.common.v1.Link.WorkflowEvent]:
event_links = []
for inbound_link in self.nexus_context.inbound_links:
if link := _nexus_link_to_workflow_event(inbound_link):
if link := _link_conversion.nexus_link_to_workflow_event(inbound_link):
event_links.append(link)
return event_links

def _add_outbound_links(
self, workflow_handle: temporalio.client.WorkflowHandle[Any, Any]
):
try:
link = _workflow_event_to_nexus_link(
_workflow_handle_to_workflow_execution_started_event_link(
link = _link_conversion.workflow_event_to_nexus_link(
_link_conversion.workflow_handle_to_workflow_execution_started_event_link(
workflow_handle
)
)
Expand Down Expand Up @@ -479,91 +471,6 @@ def set(self) -> None:
_temporal_cancel_operation_context.set(self)


def _workflow_handle_to_workflow_execution_started_event_link(
handle: temporalio.client.WorkflowHandle[Any, Any],
) -> temporalio.api.common.v1.Link.WorkflowEvent:
if handle.first_execution_run_id is None:
raise ValueError(
f"Workflow handle {handle} has no first execution run ID. "
"Cannot create WorkflowExecutionStarted event link."
)
return temporalio.api.common.v1.Link.WorkflowEvent(
namespace=handle._client.namespace,
workflow_id=handle.id,
run_id=handle.first_execution_run_id,
event_ref=temporalio.api.common.v1.Link.WorkflowEvent.EventReference(
event_id=1,
event_type=temporalio.api.enums.v1.EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED,
),
# TODO(nexus-prerelease): RequestIdReference?
)


def _workflow_event_to_nexus_link(
workflow_event: temporalio.api.common.v1.Link.WorkflowEvent,
) -> nexusrpc.Link:
scheme = "temporal"
namespace = urllib.parse.quote(workflow_event.namespace)
workflow_id = urllib.parse.quote(workflow_event.workflow_id)
run_id = urllib.parse.quote(workflow_event.run_id)
path = f"/namespaces/{namespace}/workflows/{workflow_id}/{run_id}/history"
query_params = urllib.parse.urlencode(
{
"eventType": temporalio.api.enums.v1.EventType.Name(
workflow_event.event_ref.event_type
),
"referenceType": "EventReference",
}
)
return nexusrpc.Link(
url=urllib.parse.urlunparse((scheme, "", path, "", query_params, "")),
type=workflow_event.DESCRIPTOR.full_name,
)


_LINK_URL_PATH_REGEX = re.compile(
r"^/namespaces/(?P<namespace>[^/]+)/workflows/(?P<workflow_id>[^/]+)/(?P<run_id>[^/]+)/history$"
)


def _nexus_link_to_workflow_event(
link: nexusrpc.Link,
) -> Optional[temporalio.api.common.v1.Link.WorkflowEvent]:
url = urllib.parse.urlparse(link.url)
match = _LINK_URL_PATH_REGEX.match(url.path)
if not match:
logger.warning(
f"Invalid Nexus link: {link}. Expected path to match {_LINK_URL_PATH_REGEX.pattern}"
)
return None
try:
query_params = urllib.parse.parse_qs(url.query)
[reference_type] = query_params.get("referenceType", [])
if reference_type != "EventReference":
raise ValueError(
f"Expected Nexus link URL query parameter referenceType to be EventReference but got: {reference_type}"
)
[event_type_name] = query_params.get("eventType", [])
event_ref = temporalio.api.common.v1.Link.WorkflowEvent.EventReference(
# TODO(nexus-prerelease): confirm that it is correct not to use event_id.
# Should the proto say explicitly that it's optional or how it behaves when it's missing?
event_type=temporalio.api.enums.v1.EventType.Value(event_type_name)
)
except ValueError as err:
logger.warning(
f"Failed to parse event type from Nexus link URL query parameters: {link} ({err})"
)
event_ref = None

groups = match.groupdict()
return temporalio.api.common.v1.Link.WorkflowEvent(
namespace=urllib.parse.unquote(groups["namespace"]),
workflow_id=urllib.parse.unquote(groups["workflow_id"]),
run_id=urllib.parse.unquote(groups["run_id"]),
event_ref=event_ref,
)


class LoggerAdapter(logging.LoggerAdapter):
"""Logger adapter that adds Nexus operation context information."""

Expand Down
15 changes: 13 additions & 2 deletions tests/helpers/nexus.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import dataclasses
from dataclasses import dataclass
from typing import Any, Mapping, Optional
from urllib.parse import urlparse

import temporalio.api.failure.v1
import temporalio.api.nexus.v1
import temporalio.api.operatorservice.v1
import temporalio.workflow
from temporalio.client import Client
from temporalio.converter import FailureConverter, PayloadConverter
from temporalio.testing import WorkflowEnvironment

with temporalio.workflow.unsafe.imports_passed_through():
import httpx
Expand Down Expand Up @@ -58,7 +60,7 @@ async def start_operation(
# TODO(nexus-preview): Support callback URL as query param
async with httpx.AsyncClient() as http_client:
return await http_client.post(
f"{self.server_address}/nexus/endpoints/{self.endpoint}/services/{self.service}/{operation}",
f"http://{self.server_address}/nexus/endpoints/{self.endpoint}/services/{self.service}/{operation}",
json=body,
headers=headers,
)
Expand All @@ -70,11 +72,20 @@ async def cancel_operation(
) -> httpx.Response:
async with httpx.AsyncClient() as http_client:
return await http_client.post(
f"{self.server_address}/nexus/endpoints/{self.endpoint}/services/{self.service}/{operation}/cancel",
f"http://{self.server_address}/nexus/endpoints/{self.endpoint}/services/{self.service}/{operation}/cancel",
# Token can also be sent as "Nexus-Operation-Token" header
params={"token": token},
)

@staticmethod
def default_server_address(env: WorkflowEnvironment) -> str:
# TODO(nexus-preview): nexus tests are making http requests directly but this is
# not officially supported.
parsed = urlparse(env.client.service_client.config.target_host)
host = parsed.hostname or "127.0.0.1"
http_port = getattr(env, "_http_port", 7243)
return f"{host}:{http_port}"


def dataclass_as_dict(dataclass: Any) -> dict[str, Any]:
"""
Expand Down
Loading