Skip to content

Commit cea3f02

Browse files
committed
_link_conversion module
1 parent 37b6134 commit cea3f02

File tree

3 files changed

+162
-149
lines changed

3 files changed

+162
-149
lines changed

temporalio/nexus/_link_conversion.py

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
from __future__ import annotations
2+
3+
import logging
4+
import re
5+
import urllib.parse
6+
from typing import (
7+
Any,
8+
Optional,
9+
)
10+
11+
import nexusrpc
12+
13+
import temporalio.api.common.v1
14+
import temporalio.api.enums.v1
15+
import temporalio.client
16+
17+
logger = logging.getLogger(__name__)
18+
19+
_LINK_URL_PATH_REGEX = re.compile(
20+
r"^/namespaces/(?P<namespace>[^/]+)/workflows/(?P<workflow_id>[^/]+)/(?P<run_id>[^/]+)/history$"
21+
)
22+
LINK_EVENT_ID_PARAM_NAME = "eventID"
23+
LINK_EVENT_TYPE_PARAM_NAME = "eventType"
24+
25+
26+
def workflow_handle_to_workflow_execution_started_event_link(
27+
handle: temporalio.client.WorkflowHandle[Any, Any],
28+
) -> temporalio.api.common.v1.Link.WorkflowEvent:
29+
if handle.first_execution_run_id is None:
30+
raise ValueError(
31+
f"Workflow handle {handle} has no first execution run ID. "
32+
f"Cannot create WorkflowExecutionStarted event link."
33+
)
34+
return temporalio.api.common.v1.Link.WorkflowEvent(
35+
namespace=handle._client.namespace,
36+
workflow_id=handle.id,
37+
run_id=handle.first_execution_run_id,
38+
event_ref=temporalio.api.common.v1.Link.WorkflowEvent.EventReference(
39+
event_id=1,
40+
event_type=temporalio.api.enums.v1.EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED,
41+
),
42+
# TODO(nexus-preview): RequestIdReference?
43+
)
44+
45+
46+
def workflow_event_to_nexus_link(
47+
workflow_event: temporalio.api.common.v1.Link.WorkflowEvent,
48+
) -> nexusrpc.Link:
49+
scheme = "temporal"
50+
namespace = urllib.parse.quote(workflow_event.namespace)
51+
workflow_id = urllib.parse.quote(workflow_event.workflow_id)
52+
run_id = urllib.parse.quote(workflow_event.run_id)
53+
path = f"/namespaces/{namespace}/workflows/{workflow_id}/{run_id}/history"
54+
query_params = _query_params_from_event_reference(workflow_event.event_ref)
55+
return nexusrpc.Link(
56+
url=urllib.parse.urlunparse((scheme, "", path, "", query_params, "")),
57+
type=workflow_event.DESCRIPTOR.full_name,
58+
)
59+
60+
61+
def nexus_link_to_workflow_event(
62+
link: nexusrpc.Link,
63+
) -> Optional[temporalio.api.common.v1.Link.WorkflowEvent]:
64+
url = urllib.parse.urlparse(link.url)
65+
match = _LINK_URL_PATH_REGEX.match(url.path)
66+
if not match:
67+
logger.warning(
68+
f"Invalid Nexus link: {link}. Expected path to match {_LINK_URL_PATH_REGEX.pattern}"
69+
)
70+
return None
71+
try:
72+
event_ref = _event_reference_from_query_params(url.query)
73+
except ValueError as err:
74+
logger.warning(
75+
f"Failed to parse event reference from Nexus link URL query parameters: {link} ({err})"
76+
)
77+
return None
78+
79+
groups = match.groupdict()
80+
return temporalio.api.common.v1.Link.WorkflowEvent(
81+
namespace=urllib.parse.unquote(groups["namespace"]),
82+
workflow_id=urllib.parse.unquote(groups["workflow_id"]),
83+
run_id=urllib.parse.unquote(groups["run_id"]),
84+
event_ref=event_ref,
85+
)
86+
87+
88+
def _query_params_from_event_reference(
89+
event_ref: temporalio.api.common.v1.Link.WorkflowEvent.EventReference,
90+
) -> str:
91+
event_type_name = temporalio.api.enums.v1.EventType.Name(event_ref.event_type)
92+
if event_type_name.startswith("EVENT_TYPE_"):
93+
event_type_name = _constant_case_to_pascal_case(
94+
event_type_name.removeprefix("EVENT_TYPE_")
95+
)
96+
return urllib.parse.urlencode(
97+
{"eventType": event_type_name, "referenceType": "EventReference"}
98+
)
99+
100+
101+
def _event_reference_from_query_params(
102+
raw_query_params: str,
103+
) -> temporalio.api.common.v1.Link.WorkflowEvent.EventReference:
104+
"""Return an EventReference from the query params or raise ValueError."""
105+
query_params = urllib.parse.parse_qs(raw_query_params)
106+
107+
[reference_type] = query_params.get("referenceType") or [""]
108+
if reference_type != "EventReference":
109+
raise ValueError(
110+
f"Expected Nexus link URL query parameter referenceType to be EventReference but got: {reference_type}"
111+
)
112+
# event type
113+
[raw_event_type_name] = query_params.get(LINK_EVENT_TYPE_PARAM_NAME) or [""]
114+
if not raw_event_type_name:
115+
raise ValueError(f"query params do not contain event type: {query_params}")
116+
if raw_event_type_name.startswith("EVENT_TYPE_"):
117+
event_type_name = raw_event_type_name
118+
elif re.match("[A-Z][a-z]", raw_event_type_name):
119+
event_type_name = "EVENT_TYPE_" + _pascal_case_to_constant_case(
120+
raw_event_type_name
121+
)
122+
else:
123+
raise ValueError(f"Invalid event type name: {raw_event_type_name}")
124+
125+
# event id
126+
event_id = 0
127+
[raw_event_id] = query_params.get(LINK_EVENT_ID_PARAM_NAME) or [""]
128+
if raw_event_id:
129+
try:
130+
event_id = int(raw_event_id)
131+
except ValueError:
132+
raise ValueError(f"Query params contain invalid event id: {raw_event_id}")
133+
134+
return temporalio.api.common.v1.Link.WorkflowEvent.EventReference(
135+
event_type=temporalio.api.enums.v1.EventType.Value(event_type_name),
136+
event_id=event_id,
137+
)
138+
139+
140+
def _constant_case_to_pascal_case(s: str) -> str:
141+
"""Convert a CONSTANT_CASE string to PascalCase.
142+
143+
>>> _constant_case_to_pascal_case("NEXUS_OPERATION_SCHEDULED")
144+
"NexusOperationScheduled"
145+
"""
146+
return re.sub(r"(\b|_)([a-z])", lambda m: m.groups()[1].upper(), s.lower())
147+
148+
149+
def _pascal_case_to_constant_case(s: str) -> str:
150+
"""Convert a PascalCase string to CONSTANT_CASE.
151+
152+
>>> _pascal_case_to_constant_case("NexusOperationScheduled")
153+
"NEXUS_OPERATION_SCHEDULED"
154+
"""
155+
return re.sub(r"([^\b])([A-Z])", lambda m: "_".join(m.groups()), s).upper()

temporalio/nexus/_operation_context.py

Lines changed: 4 additions & 146 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@
22

33
import dataclasses
44
import logging
5-
import re
6-
import urllib.parse
75
from contextvars import ContextVar
86
from dataclasses import dataclass
97
from datetime import timedelta
@@ -20,14 +18,13 @@
2018
overload,
2119
)
2220

23-
import nexusrpc.handler
2421
from nexusrpc.handler import CancelOperationContext, StartOperationContext
2522
from typing_extensions import Concatenate
2623

2724
import temporalio.api.common.v1
28-
import temporalio.api.enums.v1
2925
import temporalio.client
3026
import temporalio.common
27+
from temporalio.nexus import _link_conversion
3128
from temporalio.nexus._token import WorkflowHandle
3229
from temporalio.types import (
3330
MethodAsyncNoParam,
@@ -147,16 +144,16 @@ def _get_workflow_event_links(
147144
) -> list[temporalio.api.common.v1.Link.WorkflowEvent]:
148145
event_links = []
149146
for inbound_link in self.nexus_context.inbound_links:
150-
if link := _nexus_link_to_workflow_event(inbound_link):
147+
if link := _link_conversion.nexus_link_to_workflow_event(inbound_link):
151148
event_links.append(link)
152149
return event_links
153150

154151
def _add_outbound_links(
155152
self, workflow_handle: temporalio.client.WorkflowHandle[Any, Any]
156153
):
157154
try:
158-
link = _workflow_event_to_nexus_link(
159-
_workflow_handle_to_workflow_execution_started_event_link(
155+
link = _link_conversion.workflow_event_to_nexus_link(
156+
_link_conversion.workflow_handle_to_workflow_execution_started_event_link(
160157
workflow_handle
161158
)
162159
)
@@ -479,145 +476,6 @@ def set(self) -> None:
479476
_temporal_cancel_operation_context.set(self)
480477

481478

482-
def _workflow_handle_to_workflow_execution_started_event_link(
483-
handle: temporalio.client.WorkflowHandle[Any, Any],
484-
) -> temporalio.api.common.v1.Link.WorkflowEvent:
485-
if handle.first_execution_run_id is None:
486-
raise ValueError(
487-
f"Workflow handle {handle} has no first execution run ID. "
488-
"Cannot create WorkflowExecutionStarted event link."
489-
)
490-
return temporalio.api.common.v1.Link.WorkflowEvent(
491-
namespace=handle._client.namespace,
492-
workflow_id=handle.id,
493-
run_id=handle.first_execution_run_id,
494-
event_ref=temporalio.api.common.v1.Link.WorkflowEvent.EventReference(
495-
event_id=1,
496-
event_type=temporalio.api.enums.v1.EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED,
497-
),
498-
# TODO(nexus-prerelease): RequestIdReference?
499-
)
500-
501-
502-
_LINK_URL_PATH_REGEX = re.compile(
503-
r"^/namespaces/(?P<namespace>[^/]+)/workflows/(?P<workflow_id>[^/]+)/(?P<run_id>[^/]+)/history$"
504-
)
505-
LINK_EVENT_ID_PARAM_NAME = "eventID"
506-
LINK_EVENT_TYPE_PARAM_NAME = "eventType"
507-
508-
509-
def _workflow_event_to_nexus_link(
510-
workflow_event: temporalio.api.common.v1.Link.WorkflowEvent,
511-
) -> nexusrpc.Link:
512-
scheme = "temporal"
513-
namespace = urllib.parse.quote(workflow_event.namespace)
514-
workflow_id = urllib.parse.quote(workflow_event.workflow_id)
515-
run_id = urllib.parse.quote(workflow_event.run_id)
516-
path = f"/namespaces/{namespace}/workflows/{workflow_id}/{run_id}/history"
517-
query_params = _query_params_from_event_reference(workflow_event.event_ref)
518-
return nexusrpc.Link(
519-
url=urllib.parse.urlunparse((scheme, "", path, "", query_params, "")),
520-
type=workflow_event.DESCRIPTOR.full_name,
521-
)
522-
523-
524-
def _nexus_link_to_workflow_event(
525-
link: nexusrpc.Link,
526-
) -> Optional[temporalio.api.common.v1.Link.WorkflowEvent]:
527-
url = urllib.parse.urlparse(link.url)
528-
match = _LINK_URL_PATH_REGEX.match(url.path)
529-
if not match:
530-
logger.warning(
531-
f"Invalid Nexus link: {link}. Expected path to match {_LINK_URL_PATH_REGEX.pattern}"
532-
)
533-
return None
534-
try:
535-
event_ref = _event_reference_from_query_params(url.query)
536-
except ValueError as err:
537-
logger.warning(
538-
f"Failed to parse event reference from Nexus link URL query parameters: {link} ({err})"
539-
)
540-
return None
541-
542-
groups = match.groupdict()
543-
return temporalio.api.common.v1.Link.WorkflowEvent(
544-
namespace=urllib.parse.unquote(groups["namespace"]),
545-
workflow_id=urllib.parse.unquote(groups["workflow_id"]),
546-
run_id=urllib.parse.unquote(groups["run_id"]),
547-
event_ref=event_ref,
548-
)
549-
550-
551-
def _query_params_from_event_reference(
552-
event_ref: temporalio.api.common.v1.Link.WorkflowEvent.EventReference,
553-
) -> str:
554-
event_type_name = temporalio.api.enums.v1.EventType.Name(event_ref.event_type)
555-
if event_type_name.startswith("EVENT_TYPE_"):
556-
event_type_name = _constant_case_to_pascal_case(
557-
event_type_name.removeprefix("EVENT_TYPE_")
558-
)
559-
return urllib.parse.urlencode(
560-
{"eventType": event_type_name, "referenceType": "EventReference"}
561-
)
562-
563-
564-
def _event_reference_from_query_params(
565-
raw_query_params: str,
566-
) -> temporalio.api.common.v1.Link.WorkflowEvent.EventReference:
567-
"""Return an EventReference from the query params or raise ValueError."""
568-
query_params = urllib.parse.parse_qs(raw_query_params)
569-
570-
[reference_type] = query_params.get("referenceType") or [""]
571-
if reference_type != "EventReference":
572-
raise ValueError(
573-
f"Expected Nexus link URL query parameter referenceType to be EventReference but got: {reference_type}"
574-
)
575-
# event type
576-
[raw_event_type_name] = query_params.get(LINK_EVENT_TYPE_PARAM_NAME) or [""]
577-
if not raw_event_type_name:
578-
raise ValueError(f"query params do not contain event type: {query_params}")
579-
if raw_event_type_name.startswith("EVENT_TYPE_"):
580-
event_type_name = raw_event_type_name
581-
elif re.match("[A-Z][a-z]", raw_event_type_name):
582-
event_type_name = "EVENT_TYPE_" + _pascal_case_to_constant_case(
583-
raw_event_type_name
584-
)
585-
else:
586-
raise ValueError(f"Invalid event type name: {raw_event_type_name}")
587-
588-
# event id
589-
event_id = 0
590-
[raw_event_id] = query_params.get(LINK_EVENT_ID_PARAM_NAME) or [""]
591-
if raw_event_id:
592-
try:
593-
event_id = int(raw_event_id)
594-
except ValueError:
595-
raise ValueError(f"Query params contain invalid event id: {raw_event_id}")
596-
597-
return temporalio.api.common.v1.Link.WorkflowEvent.EventReference(
598-
event_type=temporalio.api.enums.v1.EventType.Value(event_type_name),
599-
event_id=event_id,
600-
)
601-
602-
603-
def _constant_case_to_pascal_case(s: str) -> str:
604-
"""Convert a CONSTANT_CASE string to PascalCase.
605-
606-
>>> _constant_case_to_pascal_case("NEXUS_OPERATION_SCHEDULED")
607-
"NexusOperationScheduled"
608-
"""
609-
return re.sub(r"(\b|_)([a-z])", lambda m: m.groups()[1].upper(), s.lower())
610-
611-
612-
def _pascal_case_to_constant_case(s: str) -> str:
613-
"""Convert a PascalCase string to CONSTANT_CASE.
614-
615-
>>> _pascal_case_to_constant_case("NexusOperationScheduled")
616-
"NEXUS_OPERATION_SCHEDULED"
617-
"""
618-
return re.sub(r"([^\b])([A-Z])", lambda m: "_".join(m.groups()), s).upper()
619-
620-
621479
class LoggerAdapter(logging.LoggerAdapter):
622480
"""Logger adapter that adds Nexus operation context information."""
623481

tests/nexus/test_workflow_caller.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
import temporalio.api.operatorservice
3232
import temporalio.api.operatorservice.v1
3333
import temporalio.exceptions
34-
import temporalio.nexus._operation_context
34+
import temporalio.nexus._link_conversion
3535
import temporalio.nexus._operation_handlers
3636
from temporalio import nexus, workflow
3737
from temporalio.client import (
@@ -1277,8 +1277,8 @@ async def test_workflow_run_operation_overloads(
12771277

12781278

12791279
def test_link_conversion_utilities():
1280-
p2c = temporalio.nexus._operation_context._pascal_case_to_constant_case
1281-
c2p = temporalio.nexus._operation_context._constant_case_to_pascal_case
1280+
p2c = temporalio.nexus._link_conversion._pascal_case_to_constant_case
1281+
c2p = temporalio.nexus._link_conversion._constant_case_to_pascal_case
12821282

12831283
for p, c in [
12841284
("", ""),

0 commit comments

Comments
 (0)