Skip to content

Commit aff198b

Browse files
authored
Set result types for string-based activity, child workflow, and query calls (#334)
1 parent 07b2043 commit aff198b

File tree

4 files changed

+117
-4
lines changed

4 files changed

+117
-4
lines changed

temporalio/client.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1277,6 +1277,7 @@ async def query(
12771277
arg: Any = temporalio.common._arg_unset,
12781278
*,
12791279
args: Sequence[Any] = [],
1280+
result_type: Optional[Type] = None,
12801281
reject_condition: Optional[temporalio.common.QueryRejectCondition] = None,
12811282
rpc_metadata: Mapping[str, str] = {},
12821283
rpc_timeout: Optional[timedelta] = None,
@@ -1289,6 +1290,7 @@ async def query(
12891290
arg: Any = temporalio.common._arg_unset,
12901291
*,
12911292
args: Sequence[Any] = [],
1293+
result_type: Optional[Type] = None,
12921294
reject_condition: Optional[temporalio.common.QueryRejectCondition] = None,
12931295
rpc_metadata: Mapping[str, str] = {},
12941296
rpc_timeout: Optional[timedelta] = None,
@@ -1308,6 +1310,8 @@ async def query(
13081310
query: Query function or name on the workflow.
13091311
arg: Single argument to the query.
13101312
args: Multiple arguments to the query. Cannot be set if arg is.
1313+
result_type: For string queries, this can set the specific result
1314+
type hint to deserialize into.
13111315
reject_condition: Condition for rejecting the query. If unset/None,
13121316
defaults to the client's default (which is defaulted to None).
13131317
rpc_metadata: Headers used on the RPC call. Keys here override
@@ -1322,7 +1326,7 @@ async def query(
13221326
RPCError: Workflow details could not be fetched.
13231327
"""
13241328
query_name: str
1325-
ret_type: Optional[Type] = None
1329+
ret_type = result_type
13261330
if callable(query):
13271331
defn = temporalio.workflow._QueryDefinition.from_fn(query)
13281332
if not defn:

temporalio/worker/_workflow_instance.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -812,6 +812,7 @@ def workflow_start_activity(
812812
activity: Any,
813813
*args: Any,
814814
task_queue: Optional[str],
815+
result_type: Optional[Type],
815816
schedule_to_close_timeout: Optional[timedelta],
816817
schedule_to_start_timeout: Optional[timedelta],
817818
start_to_close_timeout: Optional[timedelta],
@@ -823,7 +824,7 @@ def workflow_start_activity(
823824
# Get activity definition if it's callable
824825
name: str
825826
arg_types: Optional[List[Type]] = None
826-
ret_type: Optional[Type] = None
827+
ret_type = result_type
827828
if isinstance(activity, str):
828829
name = activity
829830
elif callable(activity):
@@ -859,6 +860,7 @@ async def workflow_start_child_workflow(
859860
*args: Any,
860861
id: str,
861862
task_queue: Optional[str],
863+
result_type: Optional[Type],
862864
cancellation_type: temporalio.workflow.ChildWorkflowCancellationType,
863865
parent_close_policy: temporalio.workflow.ParentClosePolicy,
864866
execution_timeout: Optional[timedelta],
@@ -873,7 +875,7 @@ async def workflow_start_child_workflow(
873875
# Use definition if callable
874876
name: str
875877
arg_types: Optional[List[Type]] = None
876-
ret_type: Optional[Type] = None
878+
ret_type = result_type
877879
if isinstance(workflow, str):
878880
name = workflow
879881
elif callable(workflow):
@@ -910,6 +912,7 @@ def workflow_start_local_activity(
910912
self,
911913
activity: Any,
912914
*args: Any,
915+
result_type: Optional[Type],
913916
schedule_to_close_timeout: Optional[timedelta],
914917
schedule_to_start_timeout: Optional[timedelta],
915918
start_to_close_timeout: Optional[timedelta],
@@ -921,7 +924,7 @@ def workflow_start_local_activity(
921924
# Get activity definition if it's callable
922925
name: str
923926
arg_types: Optional[List[Type]] = None
924-
ret_type: Optional[Type] = None
927+
ret_type = result_type
925928
if isinstance(activity, str):
926929
name = activity
927930
elif callable(activity):

temporalio/workflow.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -446,6 +446,7 @@ def workflow_start_activity(
446446
activity: Any,
447447
*args: Any,
448448
task_queue: Optional[str],
449+
result_type: Optional[Type],
449450
schedule_to_close_timeout: Optional[timedelta],
450451
schedule_to_start_timeout: Optional[timedelta],
451452
start_to_close_timeout: Optional[timedelta],
@@ -463,6 +464,7 @@ async def workflow_start_child_workflow(
463464
*args: Any,
464465
id: str,
465466
task_queue: Optional[str],
467+
result_type: Optional[Type],
466468
cancellation_type: ChildWorkflowCancellationType,
467469
parent_close_policy: ParentClosePolicy,
468470
execution_timeout: Optional[timedelta],
@@ -481,6 +483,7 @@ def workflow_start_local_activity(
481483
self,
482484
activity: Any,
483485
*args: Any,
486+
result_type: Optional[Type],
484487
schedule_to_close_timeout: Optional[timedelta],
485488
schedule_to_start_timeout: Optional[timedelta],
486489
start_to_close_timeout: Optional[timedelta],
@@ -1265,6 +1268,7 @@ def start_activity(
12651268
*,
12661269
args: Sequence[Any] = [],
12671270
task_queue: Optional[str] = None,
1271+
result_type: Optional[Type] = None,
12681272
schedule_to_close_timeout: Optional[timedelta] = None,
12691273
schedule_to_start_timeout: Optional[timedelta] = None,
12701274
start_to_close_timeout: Optional[timedelta] = None,
@@ -1282,6 +1286,7 @@ def start_activity(
12821286
*,
12831287
args: Sequence[Any] = [],
12841288
task_queue: Optional[str] = None,
1289+
result_type: Optional[Type] = None,
12851290
schedule_to_close_timeout: Optional[timedelta] = None,
12861291
schedule_to_start_timeout: Optional[timedelta] = None,
12871292
start_to_close_timeout: Optional[timedelta] = None,
@@ -1301,6 +1306,8 @@ def start_activity(
13011306
args: Multiple arguments to the activity. Cannot be set if arg is.
13021307
task_queue: Task queue to run the activity on. Defaults to the current
13031308
workflow's task queue.
1309+
result_type: For string activities, this can set the specific result
1310+
type hint to deserialize into.
13041311
schedule_to_close_timeout: Max amount of time the activity can take from
13051312
first being scheduled to being completed before it times out. This
13061313
is inclusive of all retries.
@@ -1326,6 +1333,7 @@ def start_activity(
13261333
activity,
13271334
*temporalio.common._arg_or_args(arg, args),
13281335
task_queue=task_queue,
1336+
result_type=result_type,
13291337
schedule_to_close_timeout=schedule_to_close_timeout,
13301338
schedule_to_start_timeout=schedule_to_start_timeout,
13311339
start_to_close_timeout=start_to_close_timeout,
@@ -1450,6 +1458,7 @@ async def execute_activity(
14501458
*,
14511459
args: Sequence[Any] = [],
14521460
task_queue: Optional[str] = None,
1461+
result_type: Optional[Type] = None,
14531462
schedule_to_close_timeout: Optional[timedelta] = None,
14541463
schedule_to_start_timeout: Optional[timedelta] = None,
14551464
start_to_close_timeout: Optional[timedelta] = None,
@@ -1467,6 +1476,7 @@ async def execute_activity(
14671476
*,
14681477
args: Sequence[Any] = [],
14691478
task_queue: Optional[str] = None,
1479+
result_type: Optional[Type] = None,
14701480
schedule_to_close_timeout: Optional[timedelta] = None,
14711481
schedule_to_start_timeout: Optional[timedelta] = None,
14721482
start_to_close_timeout: Optional[timedelta] = None,
@@ -1485,6 +1495,7 @@ async def execute_activity(
14851495
activity,
14861496
*temporalio.common._arg_or_args(arg, args),
14871497
task_queue=task_queue,
1498+
result_type=result_type,
14881499
schedule_to_close_timeout=schedule_to_close_timeout,
14891500
schedule_to_start_timeout=schedule_to_start_timeout,
14901501
start_to_close_timeout=start_to_close_timeout,
@@ -1623,6 +1634,7 @@ def start_activity_class(
16231634
activity,
16241635
*temporalio.common._arg_or_args(arg, args),
16251636
task_queue=task_queue,
1637+
result_type=None,
16261638
schedule_to_close_timeout=schedule_to_close_timeout,
16271639
schedule_to_start_timeout=schedule_to_start_timeout,
16281640
start_to_close_timeout=start_to_close_timeout,
@@ -1761,6 +1773,7 @@ async def execute_activity_class(
17611773
activity,
17621774
*temporalio.common._arg_or_args(arg, args),
17631775
task_queue=task_queue,
1776+
result_type=None,
17641777
schedule_to_close_timeout=schedule_to_close_timeout,
17651778
schedule_to_start_timeout=schedule_to_start_timeout,
17661779
start_to_close_timeout=start_to_close_timeout,
@@ -1899,6 +1912,7 @@ def start_activity_method(
18991912
activity,
19001913
*temporalio.common._arg_or_args(arg, args),
19011914
task_queue=task_queue,
1915+
result_type=None,
19021916
schedule_to_close_timeout=schedule_to_close_timeout,
19031917
schedule_to_start_timeout=schedule_to_start_timeout,
19041918
start_to_close_timeout=start_to_close_timeout,
@@ -2039,6 +2053,7 @@ async def execute_activity_method(
20392053
activity,
20402054
*temporalio.common._arg_or_args(arg, args),
20412055
task_queue=task_queue,
2056+
result_type=None,
20422057
schedule_to_close_timeout=schedule_to_close_timeout,
20432058
schedule_to_start_timeout=schedule_to_start_timeout,
20442059
start_to_close_timeout=start_to_close_timeout,
@@ -2170,6 +2185,7 @@ def start_local_activity(
21702185
arg: Any = temporalio.common._arg_unset,
21712186
*,
21722187
args: Sequence[Any] = [],
2188+
result_type: Optional[Type] = None,
21732189
schedule_to_close_timeout: Optional[timedelta] = None,
21742190
schedule_to_start_timeout: Optional[timedelta] = None,
21752191
start_to_close_timeout: Optional[timedelta] = None,
@@ -2186,6 +2202,7 @@ def start_local_activity(
21862202
arg: Any = temporalio.common._arg_unset,
21872203
*,
21882204
args: Sequence[Any] = [],
2205+
result_type: Optional[Type] = None,
21892206
schedule_to_close_timeout: Optional[timedelta] = None,
21902207
schedule_to_start_timeout: Optional[timedelta] = None,
21912208
start_to_close_timeout: Optional[timedelta] = None,
@@ -2206,6 +2223,8 @@ def start_local_activity(
22062223
activity: Activity name or function reference.
22072224
arg: Single argument to the activity.
22082225
args: Multiple arguments to the activity. Cannot be set if arg is.
2226+
result_type: For string activities, this can set the specific result
2227+
type hint to deserialize into.
22092228
activity_id: Optional unique identifier for the activity.
22102229
schedule_to_close_timeout: Max amount of time the activity can take from
22112230
first being scheduled to being completed before it times out. This
@@ -2229,6 +2248,7 @@ def start_local_activity(
22292248
return _Runtime.current().workflow_start_local_activity(
22302249
activity,
22312250
*temporalio.common._arg_or_args(arg, args),
2251+
result_type=result_type,
22322252
schedule_to_close_timeout=schedule_to_close_timeout,
22332253
schedule_to_start_timeout=schedule_to_start_timeout,
22342254
start_to_close_timeout=start_to_close_timeout,
@@ -2346,6 +2366,7 @@ async def execute_local_activity(
23462366
arg: Any = temporalio.common._arg_unset,
23472367
*,
23482368
args: Sequence[Any] = [],
2369+
result_type: Optional[Type] = None,
23492370
schedule_to_close_timeout: Optional[timedelta] = None,
23502371
schedule_to_start_timeout: Optional[timedelta] = None,
23512372
start_to_close_timeout: Optional[timedelta] = None,
@@ -2362,6 +2383,7 @@ async def execute_local_activity(
23622383
arg: Any = temporalio.common._arg_unset,
23632384
*,
23642385
args: Sequence[Any] = [],
2386+
result_type: Optional[Type] = None,
23652387
schedule_to_close_timeout: Optional[timedelta] = None,
23662388
schedule_to_start_timeout: Optional[timedelta] = None,
23672389
start_to_close_timeout: Optional[timedelta] = None,
@@ -2382,6 +2404,7 @@ async def execute_local_activity(
23822404
return await _Runtime.current().workflow_start_local_activity(
23832405
activity,
23842406
*temporalio.common._arg_or_args(arg, args),
2407+
result_type=result_type,
23852408
schedule_to_close_timeout=schedule_to_close_timeout,
23862409
schedule_to_start_timeout=schedule_to_start_timeout,
23872410
start_to_close_timeout=start_to_close_timeout,
@@ -2515,6 +2538,7 @@ def start_local_activity_class(
25152538
return _Runtime.current().workflow_start_local_activity(
25162539
activity,
25172540
*temporalio.common._arg_or_args(arg, args),
2541+
result_type=None,
25182542
schedule_to_close_timeout=schedule_to_close_timeout,
25192543
schedule_to_start_timeout=schedule_to_start_timeout,
25202544
start_to_close_timeout=start_to_close_timeout,
@@ -2650,6 +2674,7 @@ async def execute_local_activity_class(
26502674
return await _Runtime.current().workflow_start_local_activity(
26512675
activity,
26522676
*temporalio.common._arg_or_args(arg, args),
2677+
result_type=None,
26532678
schedule_to_close_timeout=schedule_to_close_timeout,
26542679
schedule_to_start_timeout=schedule_to_start_timeout,
26552680
start_to_close_timeout=start_to_close_timeout,
@@ -2783,6 +2808,7 @@ def start_local_activity_method(
27832808
return _Runtime.current().workflow_start_local_activity(
27842809
activity,
27852810
*temporalio.common._arg_or_args(arg, args),
2811+
result_type=None,
27862812
schedule_to_close_timeout=schedule_to_close_timeout,
27872813
schedule_to_start_timeout=schedule_to_start_timeout,
27882814
start_to_close_timeout=start_to_close_timeout,
@@ -2918,6 +2944,7 @@ async def execute_local_activity_method(
29182944
return await _Runtime.current().workflow_start_local_activity(
29192945
activity,
29202946
*temporalio.common._arg_or_args(arg, args),
2947+
result_type=None,
29212948
schedule_to_close_timeout=schedule_to_close_timeout,
29222949
schedule_to_start_timeout=schedule_to_start_timeout,
29232950
start_to_close_timeout=start_to_close_timeout,
@@ -3127,6 +3154,7 @@ async def start_child_workflow(
31273154
args: Sequence[Any] = [],
31283155
id: Optional[str] = None,
31293156
task_queue: Optional[str] = None,
3157+
result_type: Optional[Type] = None,
31303158
cancellation_type: ChildWorkflowCancellationType = ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED,
31313159
parent_close_policy: ParentClosePolicy = ParentClosePolicy.TERMINATE,
31323160
execution_timeout: Optional[timedelta] = None,
@@ -3148,6 +3176,7 @@ async def start_child_workflow(
31483176
args: Sequence[Any] = [],
31493177
id: Optional[str] = None,
31503178
task_queue: Optional[str] = None,
3179+
result_type: Optional[Type] = None,
31513180
cancellation_type: ChildWorkflowCancellationType = ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED,
31523181
parent_close_policy: ParentClosePolicy = ParentClosePolicy.TERMINATE,
31533182
execution_timeout: Optional[timedelta] = None,
@@ -3170,6 +3199,8 @@ async def start_child_workflow(
31703199
defaults to :py:func:`uuid4`.
31713200
task_queue: Task queue to run the workflow on. Defaults to the current
31723201
workflow's task queue.
3202+
result_type: For string workflows, this can set the specific result type
3203+
hint to deserialize into.
31733204
cancellation_type: How the child workflow will react to cancellation.
31743205
parent_close_policy: How to handle the child workflow when the parent
31753206
workflow closes.
@@ -3191,6 +3222,7 @@ async def start_child_workflow(
31913222
*temporalio.common._arg_or_args(arg, args),
31923223
id=id or str(uuid4()),
31933224
task_queue=task_queue,
3225+
result_type=result_type,
31943226
cancellation_type=cancellation_type,
31953227
parent_close_policy=parent_close_policy,
31963228
execution_timeout=execution_timeout,
@@ -3278,6 +3310,7 @@ async def execute_child_workflow(
32783310
args: Sequence[Any] = [],
32793311
id: Optional[str] = None,
32803312
task_queue: Optional[str] = None,
3313+
result_type: Optional[Type] = None,
32813314
cancellation_type: ChildWorkflowCancellationType = ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED,
32823315
parent_close_policy: ParentClosePolicy = ParentClosePolicy.TERMINATE,
32833316
execution_timeout: Optional[timedelta] = None,
@@ -3299,6 +3332,7 @@ async def execute_child_workflow(
32993332
args: Sequence[Any] = [],
33003333
id: Optional[str] = None,
33013334
task_queue: Optional[str] = None,
3335+
result_type: Optional[Type] = None,
33023336
cancellation_type: ChildWorkflowCancellationType = ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED,
33033337
parent_close_policy: ParentClosePolicy = ParentClosePolicy.TERMINATE,
33043338
execution_timeout: Optional[timedelta] = None,
@@ -3321,6 +3355,7 @@ async def execute_child_workflow(
33213355
*temporalio.common._arg_or_args(arg, args),
33223356
id=id or str(uuid4()),
33233357
task_queue=task_queue,
3358+
result_type=result_type,
33243359
cancellation_type=cancellation_type,
33253360
parent_close_policy=parent_close_policy,
33263361
execution_timeout=execution_timeout,

0 commit comments

Comments
 (0)