Skip to content

Commit fd938c4

Browse files
authored
Update last cleanup items (#410)
* Remove interceptable poll update * Remove timeout from wf handle result
1 parent 2c15ed3 commit fd938c4

File tree

2 files changed

+35
-78
lines changed

2 files changed

+35
-78
lines changed

temporalio/client.py

Lines changed: 35 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -3979,7 +3979,6 @@ def workflow_run_id(self) -> Optional[str]:
39793979
async def result(
39803980
self,
39813981
*,
3982-
timeout: Optional[timedelta] = None,
39833982
rpc_metadata: Mapping[str, str] = {},
39843983
rpc_timeout: Optional[timedelta] = None,
39853984
) -> LocalReturnType:
@@ -3988,7 +3987,6 @@ async def result(
39883987
specified.
39893988
39903989
Args:
3991-
timeout: Optional timeout specifying maximum wait time for the result.
39923990
rpc_metadata: Headers used on the RPC call. Keys here override client-level RPC metadata keys.
39933991
rpc_timeout: Optional RPC deadline to set for the RPC call. If this elapses, the poll is retried until the
39943992
overall timeout has been reached.
@@ -4007,18 +4005,43 @@ async def result(
40074005
self._result_type,
40084006
)
40094007

4010-
return await self._client._impl.poll_workflow_update(
4011-
PollWorkflowUpdateInput(
4012-
self.workflow_id,
4013-
self.workflow_run_id,
4014-
self.id,
4015-
timeout,
4016-
self._result_type,
4017-
rpc_metadata,
4018-
rpc_timeout,
4019-
)
4008+
req = temporalio.api.workflowservice.v1.PollWorkflowExecutionUpdateRequest(
4009+
namespace=self._client.namespace,
4010+
update_ref=temporalio.api.update.v1.UpdateRef(
4011+
workflow_execution=temporalio.api.common.v1.WorkflowExecution(
4012+
workflow_id=self.workflow_id,
4013+
run_id=self.workflow_run_id or "",
4014+
),
4015+
update_id=self.id,
4016+
),
4017+
identity=self._client.identity,
4018+
wait_policy=temporalio.api.update.v1.WaitPolicy(
4019+
lifecycle_stage=temporalio.api.enums.v1.UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED
4020+
),
40204021
)
40214022

4023+
# Continue polling as long as we have either an empty response, or an *rpc* timeout
4024+
while True:
4025+
try:
4026+
res = (
4027+
await self._client.workflow_service.poll_workflow_execution_update(
4028+
req,
4029+
retry=True,
4030+
metadata=rpc_metadata,
4031+
timeout=rpc_timeout,
4032+
)
4033+
)
4034+
if res.HasField("outcome"):
4035+
return await _update_outcome_to_result(
4036+
res.outcome,
4037+
self.id,
4038+
self._client.data_converter,
4039+
self._result_type,
4040+
)
4041+
except RPCError as err:
4042+
if err.status != RPCStatusCode.DEADLINE_EXCEEDED:
4043+
raise
4044+
40224045

40234046
class WorkflowFailureError(temporalio.exceptions.TemporalError):
40244047
"""Error that occurs when a workflow is unsuccessful."""
@@ -4241,19 +4264,6 @@ class StartWorkflowUpdateInput:
42414264
rpc_timeout: Optional[timedelta]
42424265

42434266

4244-
@dataclass
4245-
class PollWorkflowUpdateInput:
4246-
"""Input for :py:meth:`OutboundInterceptor.poll_workflow_update`."""
4247-
4248-
workflow_id: str
4249-
run_id: Optional[str]
4250-
update_id: str
4251-
timeout: Optional[timedelta]
4252-
ret_type: Optional[Type]
4253-
rpc_metadata: Mapping[str, str]
4254-
rpc_timeout: Optional[timedelta]
4255-
4256-
42574267
@dataclass
42584268
class HeartbeatAsyncActivityInput:
42594269
"""Input for :py:meth:`OutboundInterceptor.heartbeat_async_activity`."""
@@ -4504,10 +4514,6 @@ async def start_workflow_update(
45044514
"""Called for every :py:meth:`WorkflowHandle.update` and :py:meth:`WorkflowHandle.start_update` call."""
45054515
return await self.next.start_workflow_update(input)
45064516

4507-
async def poll_workflow_update(self, input: PollWorkflowUpdateInput) -> Any:
4508-
"""May be called when calling :py:meth:`WorkflowUpdateHandle.result`."""
4509-
return await self.next.poll_workflow_update(input)
4510-
45114517
### Async activity calls
45124518

45134519
async def heartbeat_async_activity(
@@ -4885,48 +4891,6 @@ async def start_workflow_update(
48854891

48864892
return update_handle
48874893

4888-
async def poll_workflow_update(self, input: PollWorkflowUpdateInput) -> Any:
4889-
req = temporalio.api.workflowservice.v1.PollWorkflowExecutionUpdateRequest(
4890-
namespace=self._client.namespace,
4891-
update_ref=temporalio.api.update.v1.UpdateRef(
4892-
workflow_execution=temporalio.api.common.v1.WorkflowExecution(
4893-
workflow_id=input.workflow_id,
4894-
run_id=input.run_id or "",
4895-
),
4896-
update_id=input.update_id,
4897-
),
4898-
identity=self._client.identity,
4899-
wait_policy=temporalio.api.update.v1.WaitPolicy(
4900-
lifecycle_stage=temporalio.api.enums.v1.UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED
4901-
),
4902-
)
4903-
4904-
async def poll_loop():
4905-
# Continue polling as long as we have either an empty response, or an *rpc* timeout
4906-
while True:
4907-
try:
4908-
res = await self._client.workflow_service.poll_workflow_execution_update(
4909-
req,
4910-
retry=True,
4911-
metadata=input.rpc_metadata,
4912-
timeout=input.rpc_timeout,
4913-
)
4914-
if res.HasField("outcome"):
4915-
return await _update_outcome_to_result(
4916-
res.outcome,
4917-
input.update_id,
4918-
self._client.data_converter,
4919-
input.ret_type,
4920-
)
4921-
except RPCError as err:
4922-
if err.status != RPCStatusCode.DEADLINE_EXCEEDED:
4923-
raise
4924-
4925-
# Wait for at most the *overall* timeout
4926-
return await asyncio.wait_for(
4927-
poll_loop(), input.timeout.total_seconds() if input.timeout else None
4928-
)
4929-
49304894
### Async activity calls
49314895

49324896
async def heartbeat_async_activity(

tests/test_client.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
Client,
3737
Interceptor,
3838
OutboundInterceptor,
39-
PollWorkflowUpdateInput,
4039
QueryWorkflowInput,
4140
RPCError,
4241
RPCStatusCode,
@@ -456,12 +455,6 @@ async def start_workflow_update(
456455
self._parent.traces.append(("start_workflow_update", input))
457456
return await super().start_workflow_update(input)
458457

459-
async def poll_workflow_update(
460-
self, input: PollWorkflowUpdateInput
461-
) -> WorkflowUpdateHandle[Any]:
462-
self._parent.traces.append(("poll_workflow_update", input))
463-
return await super().poll_workflow_update(input)
464-
465458

466459
async def test_interceptor(client: Client, worker: ExternalWorker):
467460
# Create new client from existing client but with a tracing interceptor

0 commit comments

Comments
 (0)