Skip to content

Commit 993bfd3

Browse files
committed
Workflow OperationError / HandlerError test
1 parent bdc40d9 commit 993bfd3

File tree

1 file changed

+93
-1
lines changed

1 file changed

+93
-1
lines changed

tests/nexus/test_workflow_caller.py

Lines changed: 93 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import uuid
33
from dataclasses import dataclass
44
from enum import IntEnum
5-
from typing import Any, Callable, Union
5+
from typing import Any, Callable, Literal, Union
66

77
import nexusrpc
88
import nexusrpc.handler
@@ -28,6 +28,7 @@
2828
import temporalio.api.nexus.v1
2929
import temporalio.api.operatorservice
3030
import temporalio.api.operatorservice.v1
31+
import temporalio.exceptions
3132
from temporalio import nexus, workflow
3233
from temporalio.client import (
3334
Client,
@@ -1082,3 +1083,94 @@ async def assert_handler_workflow_has_link_to_caller_workflow(
10821083
# f"{self._result_fut} "
10831084
# f"Task[{self._task._state}] fut_waiter = {self._task._fut_waiter}) ({self._task._must_cancel})"
10841085
# )
1086+
1087+
1088+
# Handler
1089+
1090+
ActionInSyncOp = Literal["raise_handler_error", "raise_operation_error"]
1091+
1092+
1093+
@dataclass
1094+
class ErrorTestInput:
1095+
task_queue: str
1096+
action_in_sync_op: ActionInSyncOp
1097+
1098+
1099+
@nexusrpc.handler.service_handler
1100+
class ErrorTestService:
1101+
@sync_operation
1102+
async def op(self, ctx: StartOperationContext, input: ErrorTestInput) -> None:
1103+
if input.action_in_sync_op == "raise_handler_error":
1104+
raise nexusrpc.handler.HandlerError(
1105+
"test",
1106+
type=nexusrpc.handler.HandlerErrorType.INTERNAL,
1107+
)
1108+
elif input.action_in_sync_op == "raise_operation_error":
1109+
raise nexusrpc.OperationError(
1110+
"test", state=nexusrpc.OperationErrorState.FAILED
1111+
)
1112+
else:
1113+
raise NotImplementedError(
1114+
f"Unhandled action_in_sync_op: {input.action_in_sync_op}"
1115+
)
1116+
1117+
1118+
# Caller
1119+
1120+
1121+
@workflow.defn(sandboxed=False)
1122+
class ErrorTestCallerWorkflow:
1123+
@workflow.init
1124+
def __init__(self, input: ErrorTestInput):
1125+
self.nexus_client = workflow.NexusClient(
1126+
service=ErrorTestService,
1127+
endpoint=make_nexus_endpoint_name(input.task_queue),
1128+
)
1129+
1130+
@workflow.run
1131+
async def run(self, input: ErrorTestInput) -> list[str]:
1132+
try:
1133+
await self.nexus_client.execute_operation(
1134+
# TODO(nexus-preview): why wasn't this a type error?
1135+
# ErrorTestService.op, ErrorTestCallerWfInput()
1136+
ErrorTestService.op,
1137+
# TODO(nexus-preview): why wasn't this a type error?
1138+
# None
1139+
input,
1140+
)
1141+
except Exception as err:
1142+
return [str(type(err).__name__), str(type(err.__cause__).__name__)]
1143+
assert False, "Unreachable"
1144+
1145+
1146+
@pytest.mark.parametrize(
1147+
"action_in_sync_op", ["raise_handler_error", "raise_operation_error"]
1148+
)
1149+
async def test_errors_raised_by_nexus_operation(
1150+
client: Client, action_in_sync_op: ActionInSyncOp
1151+
):
1152+
task_queue = str(uuid.uuid4())
1153+
async with Worker(
1154+
client,
1155+
nexus_service_handlers=[ErrorTestService()],
1156+
workflows=[ErrorTestCallerWorkflow],
1157+
task_queue=task_queue,
1158+
):
1159+
await create_nexus_endpoint(task_queue, client)
1160+
result = await client.execute_workflow(
1161+
ErrorTestCallerWorkflow.run,
1162+
ErrorTestInput(
1163+
task_queue=task_queue,
1164+
action_in_sync_op=action_in_sync_op,
1165+
),
1166+
id=str(uuid.uuid4()),
1167+
task_queue=task_queue,
1168+
)
1169+
if action_in_sync_op == "raise_handler_error":
1170+
assert result == ["NexusOperationError", "NexusHandlerError"]
1171+
elif action_in_sync_op == "raise_operation_error":
1172+
assert result == ["NexusOperationError", "ApplicationError"]
1173+
else:
1174+
raise NotImplementedError(
1175+
f"Unhandled action_in_sync_op: {action_in_sync_op}"
1176+
)

0 commit comments

Comments
 (0)