Skip to content

Commit b9bc00e

Browse files
committed
Review feedback
1 parent 5503673 commit b9bc00e

File tree

8 files changed

+98
-138
lines changed

8 files changed

+98
-138
lines changed

temporalio/common.py

Lines changed: 12 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1017,59 +1017,39 @@ def __post_init__(self):
10171017
Priority.default = Priority(priority_key=None)
10181018

10191019

1020-
class VersioningBehavior(Enum):
1020+
class VersioningBehavior(IntEnum):
10211021
"""Specifies when a workflow might move from a worker of one Build Id to another.
10221022
10231023
WARNING: Experimental API.
10241024
"""
10251025

1026-
UNSPECIFIED = 1
1026+
UNSPECIFIED = (
1027+
temporalio.api.enums.v1.VersioningBehavior.VERSIONING_BEHAVIOR_UNSPECIFIED
1028+
)
10271029
""" An unspecified versioning behavior. By default, workers opting into worker versioning will
10281030
be required to specify a behavior. See :py:class:`temporalio.worker.WorkerDeploymentOptions`."""
1029-
PINNED = 2
1031+
PINNED = temporalio.api.enums.v1.VersioningBehavior.VERSIONING_BEHAVIOR_PINNED
10301032
"""The workflow will be pinned to the current Build ID unless manually moved."""
1031-
AUTO_UPGRADE = 3
1033+
AUTO_UPGRADE = (
1034+
temporalio.api.enums.v1.VersioningBehavior.VERSIONING_BEHAVIOR_AUTO_UPGRADE
1035+
)
10321036
"""The workflow will automatically move to the latest version (default Build ID of the task
10331037
queue) when the next task is dispatched."""
10341038

1035-
def _to_proto(self) -> temporalio.api.enums.v1.VersioningBehavior.ValueType:
1036-
if self == VersioningBehavior.UNSPECIFIED:
1037-
return temporalio.api.enums.v1.VersioningBehavior.VERSIONING_BEHAVIOR_UNSPECIFIED
1038-
elif self == VersioningBehavior.PINNED:
1039-
return temporalio.api.enums.v1.VersioningBehavior.VERSIONING_BEHAVIOR_PINNED
1040-
elif self == VersioningBehavior.AUTO_UPGRADE:
1041-
return temporalio.api.enums.v1.VersioningBehavior.VERSIONING_BEHAVIOR_AUTO_UPGRADE
1042-
else:
1043-
raise ValueError(f"Unknown VersioningBehavior: {self}")
1044-
10451039

1040+
@dataclass(frozen=True)
10461041
class WorkerDeploymentVersion:
10471042
"""Represents the version of a specific worker deployment.
10481043
10491044
WARNING: Experimental API.
10501045
"""
10511046

1052-
_deployment_name: str
1053-
_build_id: str
1054-
1055-
def __init__(self, deployment_name: str, build_id: str):
1056-
"""Build a WorkerDeploymentVersion from a deployment name and build ID."""
1057-
self._deployment_name = deployment_name
1058-
self._build_id = build_id
1059-
1060-
@property
1061-
def deployment_name(self) -> str:
1062-
"""The name of the deployment."""
1063-
return self._deployment_name
1064-
1065-
@property
1066-
def build_id(self) -> str:
1067-
"""The Build ID of this version."""
1068-
return self._build_id
1047+
deployment_name: str
1048+
build_id: str
10691049

10701050
def to_canonical_string(self) -> str:
10711051
"""Returns the canonical string representation of the version."""
1072-
return f"{self._deployment_name}.{self._build_id}"
1052+
return f"{self.deployment_name}.{self.build_id}"
10731053

10741054
@staticmethod
10751055
def from_canonical_string(canonical: str) -> WorkerDeploymentVersion:

temporalio/worker/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
from ._worker import (
4646
Worker,
4747
WorkerConfig,
48-
WorkerDeploymentOptions,
48+
WorkerDeploymentConfig,
4949
WorkerDeploymentVersion,
5050
)
5151
from ._workflow_instance import (
@@ -59,7 +59,7 @@
5959
# Primary types
6060
"Worker",
6161
"WorkerConfig",
62-
"WorkerDeploymentOptions",
62+
"WorkerDeploymentConfig",
6363
"WorkerDeploymentVersion",
6464
"Replayer",
6565
"ReplayerConfig",

temporalio/worker/_worker.py

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ def __init__(
9393
on_fatal_error: Optional[Callable[[BaseException], Awaitable[None]]] = None,
9494
use_worker_versioning: bool = False,
9595
disable_safe_workflow_eviction: bool = False,
96-
deployment_options: Optional[WorkerDeploymentOptions] = None,
96+
deployment_config: Optional[WorkerDeploymentConfig] = None,
9797
) -> None:
9898
"""Create a worker to process workflows and/or activities.
9999
@@ -136,8 +136,8 @@ def __init__(
136136
build_id: Unique identifier for the current runtime. This is best
137137
set as a hash of all code and should change only when code does.
138138
If unset, a best-effort identifier is generated.
139-
Exclusive with `deployment_options`.
140-
WARNING: Deprecated. Use `deployment_options` instead.
139+
Exclusive with `deployment_config`.
140+
WARNING: Deprecated. Use `deployment_config` instead.
141141
identity: Identity for this worker client. If unset, the client
142142
identity is used.
143143
max_cached_workflows: If nonzero, workflows will be cached and
@@ -219,27 +219,28 @@ def __init__(
219219
workflows which it claims to be compatible with. For more
220220
information, see
221221
https://docs.temporal.io/workers#worker-versioning.
222-
Exclusive with `deployment_options`.
223-
WARNING: Deprecated. Use `deployment_options` instead.
222+
Exclusive with `deployment_config`.
223+
WARNING: Deprecated. Use `deployment_config` instead.
224224
disable_safe_workflow_eviction: If true, instead of letting the
225225
workflow collect its tasks properly, the worker will simply let
226226
the Python garbage collector collect the tasks. WARNING: Users
227227
should not set this value to true. The garbage collector will
228228
throw ``GeneratorExit`` in coroutines causing them to wake up
229229
in different threads and run ``finally`` and other code in the
230230
wrong workflow environment.
231-
deployment_options: Deployment options for the worker. Exclusive with `build_id` and
231+
deployment_config: Deployment config for the worker. Exclusive with `build_id` and
232232
`use_worker_versioning`.
233+
WARNING: This is an experimental feature and may change in the future.
233234
"""
234235
if not activities and not workflows:
235236
raise ValueError("At least one activity or workflow must be specified")
236237
if use_worker_versioning and not build_id:
237238
raise ValueError(
238239
"build_id must be specified when use_worker_versioning is True"
239240
)
240-
if deployment_options and (build_id or use_worker_versioning):
241+
if deployment_config and (build_id or use_worker_versioning):
241242
raise ValueError(
242-
"deployment_options cannot be used with build_id or use_worker_versioning"
243+
"deployment_config cannot be used with build_id or use_worker_versioning"
243244
)
244245

245246
# Prepend applicable client interceptors to the given ones
@@ -330,9 +331,9 @@ def __init__(
330331
self._workflow_worker: Optional[_WorkflowWorker] = None
331332
if workflows:
332333
should_enforce_versioning_behavior = (
333-
deployment_options is not None
334-
and deployment_options.use_worker_versioning
335-
and deployment_options.default_versioning_behavior
334+
deployment_config is not None
335+
and deployment_config.use_worker_versioning
336+
and deployment_config.default_versioning_behavior
336337
== temporalio.common.VersioningBehavior.UNSPECIFIED
337338
)
338339
self._workflow_worker = _WorkflowWorker(
@@ -375,9 +376,9 @@ def __init__(
375376
bridge_tuner = tuner._to_bridge_tuner()
376377

377378
versioning_strategy: temporalio.bridge.worker.WorkerVersioningStrategy
378-
if deployment_options:
379+
if deployment_config:
379380
versioning_strategy = (
380-
deployment_options._to_bridge_worker_deployment_options()
381+
deployment_config._to_bridge_worker_deployment_options()
381382
)
382383
elif use_worker_versioning:
383384
build_id = build_id or load_default_build_id()
@@ -712,12 +713,15 @@ class WorkerConfig(TypedDict, total=False):
712713
on_fatal_error: Optional[Callable[[BaseException], Awaitable[None]]]
713714
use_worker_versioning: bool
714715
disable_safe_workflow_eviction: bool
715-
deployment_options: Optional[WorkerDeploymentOptions]
716+
deployment_config: Optional[WorkerDeploymentConfig]
716717

717718

718719
@dataclass
719-
class WorkerDeploymentOptions:
720-
"""Options for configuring the Worker Versioning feature."""
720+
class WorkerDeploymentConfig:
721+
"""Options for configuring the Worker Versioning feature.
722+
723+
WARNING: This is an experimental feature and may change in the future.
724+
"""
721725

722726
version: WorkerDeploymentVersion
723727
use_worker_versioning: bool
@@ -732,7 +736,7 @@ def _to_bridge_worker_deployment_options(
732736
build_id=self.version.build_id,
733737
),
734738
use_worker_versioning=self.use_worker_versioning,
735-
default_versioning_behavior=self.default_versioning_behavior._to_proto(),
739+
default_versioning_behavior=self.default_versioning_behavior.value,
736740
)
737741

738742

temporalio/worker/_workflow.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -144,14 +144,14 @@ def __init__(
144144
if not_in_annotation:
145145
raise ValueError(
146146
f"Workflow {defn.name} must specify a versioning behavior using "
147-
"the `versioning_behavior` argument to `@workflow.run`."
147+
"the `versioning_behavior` argument to `@workflow.defn`."
148148
)
149149
else:
150-
if defn.dynamic_versioning_behavior is None:
150+
if not_in_annotation and defn.dynamic_versioning_behavior is None:
151151
raise ValueError(
152-
f"Dynamic Workflow {defn.name} must specify a versioning behavior "
153-
"using `@workflow.dynamic_versioning_behavior` or the "
154-
"`versioning_behavior` argument to `@workflow.run`."
152+
f"Dynamic Workflow {defn.cls.__qualname__} must specify a versioning "
153+
"behavior using `@workflow.dynamic_versioning_behavior` or the "
154+
"`versioning_behavior` argument to `@workflow.defn`."
155155
)
156156

157157
# Prepare the workflow with the runner (this will error in the

temporalio/worker/_workflow_instance.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,7 @@ def activate(
346346
)
347347
self._current_completion.successful.SetInParent()
348348
self._current_completion.successful.versioning_behavior = (
349-
self._defn.versioning_behavior._to_proto()
349+
self._defn.versioning_behavior.value
350350
if self._defn.versioning_behavior
351351
else temporalio.api.enums.v1.VersioningBehavior.VERSIONING_BEHAVIOR_UNSPECIFIED
352352
)
@@ -425,7 +425,7 @@ def activate(
425425
!= temporalio.api.enums.v1.VersioningBehavior.VERSIONING_BEHAVIOR_UNSPECIFIED
426426
):
427427
self._current_completion.successful.versioning_behavior = (
428-
vb._to_proto()
428+
vb.value
429429
)
430430

431431
# If we're deleting, there better be no more tasks. It is important for

temporalio/workflow.py

Lines changed: 25 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -90,12 +90,16 @@ def defn(
9090
name: Optional[str] = None,
9191
sandboxed: bool = True,
9292
failure_exception_types: Sequence[Type[BaseException]] = [],
93+
versioning_behavior: temporalio.common.VersioningBehavior = temporalio.common.VersioningBehavior.UNSPECIFIED,
9394
) -> Callable[[ClassType], ClassType]: ...
9495

9596

9697
@overload
9798
def defn(
98-
*, sandboxed: bool = True, dynamic: bool = False
99+
*,
100+
sandboxed: bool = True,
101+
dynamic: bool = False,
102+
versioning_behavior: temporalio.common.VersioningBehavior = temporalio.common.VersioningBehavior.UNSPECIFIED,
99103
) -> Callable[[ClassType], ClassType]: ...
100104

101105

@@ -106,6 +110,7 @@ def defn(
106110
sandboxed: bool = True,
107111
dynamic: bool = False,
108112
failure_exception_types: Sequence[Type[BaseException]] = [],
113+
versioning_behavior: temporalio.common.VersioningBehavior = temporalio.common.VersioningBehavior.UNSPECIFIED,
109114
):
110115
"""Decorator for workflow classes.
111116
@@ -127,6 +132,8 @@ def defn(
127132
applied in addition to ones set on the worker constructor. If
128133
``Exception`` is set, it effectively will fail a workflow/update in
129134
all user exception cases. WARNING: This setting is experimental.
135+
versioning_behavior: Specifies the versioning behavior to use for this workflow.
136+
WARNING: This setting is experimental.
130137
"""
131138

132139
def decorator(cls: ClassType) -> ClassType:
@@ -136,6 +143,7 @@ def decorator(cls: ClassType) -> ClassType:
136143
workflow_name=name or cls.__name__ if not dynamic else None,
137144
sandboxed=sandboxed,
138145
failure_exception_types=failure_exception_types,
146+
versioning_behavior=versioning_behavior,
139147
)
140148
return cls
141149

@@ -164,29 +172,7 @@ def init(
164172
return init_fn
165173

166174

167-
@dataclass(frozen=True)
168-
class _RunAttributes:
169-
versioning_behavior: temporalio.common.VersioningBehavior
170-
171-
172-
@overload
173-
def run(
174-
fn: CallableAsyncType,
175-
) -> CallableAsyncType: ...
176-
177-
178-
@overload
179-
def run(
180-
*,
181-
versioning_behavior: temporalio.common.VersioningBehavior = temporalio.common.VersioningBehavior.UNSPECIFIED,
182-
) -> Callable[[CallableAsyncType], CallableAsyncType]: ...
183-
184-
185-
def run(
186-
fn: Optional[CallableAsyncType] = None,
187-
*,
188-
versioning_behavior: temporalio.common.VersioningBehavior = temporalio.common.VersioningBehavior.UNSPECIFIED,
189-
):
175+
def run(fn: CallableAsyncType) -> CallableAsyncType:
190176
"""Decorator for the workflow run method.
191177
192178
This must be used on one and only one async method defined on the same class
@@ -199,36 +185,18 @@ def run(
199185
200186
Args:
201187
fn: The function to decorate.
202-
versioning_behavior: Specifies the versioning behavior to use for this workflow.
203-
WARNING: This setting is experimental.
204188
"""
205-
206-
def decorator(
207-
versioning_behavior: temporalio.common.VersioningBehavior, fn: CallableAsyncType
208-
) -> CallableAsyncType:
209-
if not inspect.iscoroutinefunction(fn):
210-
raise ValueError("Workflow run method must be an async function")
211-
# Disallow local classes because we need to have the class globally
212-
# referenceable by name
213-
if "<locals>" in fn.__qualname__:
214-
raise ValueError(
215-
"Local classes unsupported, @workflow.run cannot be on a local class"
216-
)
217-
setattr(
218-
fn,
219-
"__temporal_workflow_run",
220-
_RunAttributes(versioning_behavior=versioning_behavior),
221-
)
222-
# TODO(cretz): Why is MyPy unhappy with this return?
223-
return fn # type: ignore[return-value]
224-
225-
if fn is None:
226-
return partial(
227-
decorator,
228-
versioning_behavior,
189+
if not inspect.iscoroutinefunction(fn):
190+
raise ValueError("Workflow run method must be an async function")
191+
# Disallow local classes because we need to have the class globally
192+
# referenceable by name
193+
if "<locals>" in fn.__qualname__:
194+
raise ValueError(
195+
"Local classes unsupported, @workflow.run cannot be on a local class"
229196
)
230-
else:
231-
return decorator(versioning_behavior, fn)
197+
setattr(fn, "__temporal_workflow_run", True)
198+
# TODO(cretz): Why is MyPy unhappy with this return?
199+
return fn # type: ignore[return-value]
232200

233201

234202
class HandlerUnfinishedPolicy(Enum):
@@ -1534,6 +1502,7 @@ def _apply_to_class(
15341502
workflow_name: Optional[str],
15351503
sandboxed: bool,
15361504
failure_exception_types: Sequence[Type[BaseException]],
1505+
versioning_behavior: temporalio.common.VersioningBehavior,
15371506
) -> None:
15381507
# Check it's not being doubly applied
15391508
if _Definition.from_class(cls):
@@ -1543,14 +1512,14 @@ def _apply_to_class(
15431512
# Collect run fn and all signal/query/update fns
15441513
init_fn: Optional[Callable[..., None]] = None
15451514
run_fn: Optional[Callable[..., Awaitable[Any]]] = None
1546-
seen_run_attr: Optional[_RunAttributes] = None
1515+
seen_run_attr = False
15471516
signals: Dict[Optional[str], _SignalDefinition] = {}
15481517
queries: Dict[Optional[str], _QueryDefinition] = {}
15491518
updates: Dict[Optional[str], _UpdateDefinition] = {}
15501519
dynamic_versioning_behavior: Optional[CallableSyncNoParam] = None
15511520
for name, member in inspect.getmembers(cls):
15521521
if hasattr(member, "__temporal_workflow_run"):
1553-
seen_run_attr = getattr(member, "__temporal_workflow_run")
1522+
seen_run_attr = True
15541523
if not _is_unbound_method_on_cls(member, cls):
15551524
issues.append(
15561525
f"@workflow.run method {name} must be defined on {cls.__qualname__}"
@@ -1627,8 +1596,7 @@ def _apply_to_class(
16271596
):
16281597
continue
16291598
if hasattr(base_member, "__temporal_workflow_run"):
1630-
# TODO: Not sure this needs to exist?
1631-
# seen_run_attr = True
1599+
seen_run_attr = True
16321600
if not run_fn or base_member.__name__ != run_fn.__name__:
16331601
issues.append(
16341602
f"@workflow.run defined on {base_member.__qualname__} but not on the override"
@@ -1683,7 +1651,7 @@ def _apply_to_class(
16831651
updates=updates,
16841652
sandboxed=sandboxed,
16851653
failure_exception_types=failure_exception_types,
1686-
versioning_behavior=seen_run_attr.versioning_behavior,
1654+
versioning_behavior=versioning_behavior,
16871655
dynamic_versioning_behavior=dynamic_versioning_behavior,
16881656
)
16891657
setattr(cls, "__temporal_workflow_definition", defn)

0 commit comments

Comments
 (0)