|
1 | 1 | import asyncio
|
2 | 2 | import uuid
|
3 | 3 | from dataclasses import dataclass
|
| 4 | +from datetime import timedelta |
4 | 5 | from enum import IntEnum
|
5 | 6 | from itertools import zip_longest
|
6 | 7 | from typing import Any, Callable, Literal, Union
|
|
39 | 40 | WorkflowHandle,
|
40 | 41 | )
|
41 | 42 | from temporalio.common import WorkflowIDConflictPolicy
|
42 |
| -from temporalio.exceptions import ApplicationError, CancelledError, NexusOperationError |
| 43 | +from temporalio.exceptions import ( |
| 44 | + ApplicationError, |
| 45 | + CancelledError, |
| 46 | + NexusOperationError, |
| 47 | + TimeoutError, |
| 48 | +) |
43 | 49 | from temporalio.nexus import WorkflowRunOperationContext, workflow_run_operation
|
44 | 50 | from temporalio.service import RPCError, RPCStatusCode
|
45 | 51 | from temporalio.worker import Worker
|
@@ -1505,3 +1511,52 @@ async def test_errors_raised_by_nexus_operation(
|
1505 | 1511 | id=str(uuid.uuid4()),
|
1506 | 1512 | task_queue=task_queue,
|
1507 | 1513 | )
|
| 1514 | + |
| 1515 | + |
| 1516 | +# Timeout test |
| 1517 | +@service_handler |
| 1518 | +class TimeoutTestService: |
| 1519 | + @sync_operation |
| 1520 | + async def op_handler_that_never_returns( |
| 1521 | + self, ctx: StartOperationContext, input: None |
| 1522 | + ) -> None: |
| 1523 | + await asyncio.Future() |
| 1524 | + |
| 1525 | + |
| 1526 | +@workflow.defn |
| 1527 | +class TimeoutTestCallerWorkflow: |
| 1528 | + @workflow.init |
| 1529 | + def __init__(self): |
| 1530 | + self.nexus_client = workflow.NexusClient( |
| 1531 | + service=TimeoutTestService, |
| 1532 | + endpoint=make_nexus_endpoint_name(workflow.info().task_queue), |
| 1533 | + ) |
| 1534 | + |
| 1535 | + @workflow.run |
| 1536 | + async def run(self) -> None: |
| 1537 | + await self.nexus_client.execute_operation( |
| 1538 | + TimeoutTestService.op_handler_that_never_returns, |
| 1539 | + None, |
| 1540 | + schedule_to_close_timeout=timedelta(seconds=0.1), |
| 1541 | + ) |
| 1542 | + |
| 1543 | + |
| 1544 | +async def test_timeout_error_raised_by_nexus_operation(client: Client): |
| 1545 | + task_queue = str(uuid.uuid4()) |
| 1546 | + async with Worker( |
| 1547 | + client, |
| 1548 | + nexus_service_handlers=[TimeoutTestService()], |
| 1549 | + workflows=[TimeoutTestCallerWorkflow], |
| 1550 | + task_queue=task_queue, |
| 1551 | + ): |
| 1552 | + await create_nexus_endpoint(task_queue, client) |
| 1553 | + try: |
| 1554 | + await client.execute_workflow( |
| 1555 | + TimeoutTestCallerWorkflow.run, |
| 1556 | + id=str(uuid.uuid4()), |
| 1557 | + task_queue=task_queue, |
| 1558 | + ) |
| 1559 | + except Exception as err: |
| 1560 | + assert isinstance(err, WorkflowFailureError) |
| 1561 | + assert isinstance(err.__cause__, NexusOperationError) |
| 1562 | + assert isinstance(err.__cause__.__cause__, TimeoutError) |
0 commit comments