10
10
Callable ,
11
11
Optional ,
12
12
Type ,
13
+ Union ,
13
14
)
14
15
15
- from nexusrpc .handler import (
16
- StartOperationContext ,
17
- get_start_method_input_and_output_type_annotations ,
18
- )
16
+ from nexusrpc .handler import StartOperationContext
19
17
from nexusrpc .types import (
20
18
InputT ,
21
19
OutputT ,
@@ -41,7 +39,8 @@ def get_workflow_run_start_method_input_and_output_type_annotations(
41
39
`start` must be a type-annotated start method that returns a
42
40
:py:class:`temporalio.nexus.WorkflowHandle`.
43
41
"""
44
- input_type , output_type = get_start_method_input_and_output_type_annotations (start )
42
+
43
+ input_type , output_type = _get_start_method_input_and_output_type_annotations (start )
45
44
origin_type = typing .get_origin (output_type )
46
45
if not origin_type :
47
46
output_type = None
@@ -66,6 +65,51 @@ def get_workflow_run_start_method_input_and_output_type_annotations(
66
65
return input_type , output_type
67
66
68
67
68
+ def _get_start_method_input_and_output_type_annotations (
69
+ start : Callable [
70
+ [ServiceHandlerT , WorkflowRunOperationContext , InputT ],
71
+ Union [OutputT , Awaitable [OutputT ]],
72
+ ],
73
+ ) -> tuple [
74
+ Optional [Type [InputT ]],
75
+ Optional [Type [OutputT ]],
76
+ ]:
77
+ """Return operation input and output types.
78
+
79
+ `start` must be a type-annotated start method that returns a synchronous result.
80
+ """
81
+ try :
82
+ type_annotations = typing .get_type_hints (start )
83
+ except TypeError :
84
+ # TODO(preview): stacklevel
85
+ warnings .warn (
86
+ f"Expected decorated start method { start } to have type annotations"
87
+ )
88
+ return None , None
89
+ output_type = type_annotations .pop ("return" , None )
90
+
91
+ if len (type_annotations ) != 2 :
92
+ # TODO(preview): stacklevel
93
+ suffix = f": { type_annotations } " if type_annotations else ""
94
+ warnings .warn (
95
+ f"Expected decorated start method { start } to have exactly 2 "
96
+ f"type-annotated parameters (ctx and input), but it has { len (type_annotations )} "
97
+ f"{ suffix } ."
98
+ )
99
+ input_type = None
100
+ else :
101
+ ctx_type , input_type = type_annotations .values ()
102
+ if not issubclass (ctx_type , WorkflowRunOperationContext ):
103
+ # TODO(preview): stacklevel
104
+ warnings .warn (
105
+ f"Expected first parameter of { start } to be an instance of "
106
+ f"WorkflowRunOperationContext, but is { ctx_type } ."
107
+ )
108
+ input_type = None
109
+
110
+ return input_type , output_type
111
+
112
+
69
113
def get_callable_name (fn : Callable [..., Any ]) -> str :
70
114
method_name = getattr (fn , "__name__" , None )
71
115
if not method_name and callable (fn ) and hasattr (fn , "__call__" ):
0 commit comments