Skip to content

Commit 12572b0

Browse files
committed
Dynamic workflows
1 parent 29870b1 commit 12572b0

File tree

5 files changed

+163
-15
lines changed

5 files changed

+163
-15
lines changed

README.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -510,7 +510,7 @@ class GreetingWorkflow:
510510
start_to_close_timeout=timedelta(seconds=5),
511511
)
512512
workflow.logger.debug("Greeting set to %s", self._current_greeting)
513-
513+
514514
# Wait for salutation update or complete signal (this can be
515515
# cancelled)
516516
await asyncio.wait(
@@ -536,7 +536,7 @@ class GreetingWorkflow:
536536
@workflow.query
537537
def current_greeting(self) -> str:
538538
return self._current_greeting
539-
539+
540540
@workflow.update
541541
def set_and_get_greeting(self, greeting: str) -> str:
542542
old = self._current_greeting
@@ -622,7 +622,7 @@ Here are the decorators that can be applied:
622622
* May mutate workflow state, and make calls to other workflow APIs like starting activities, etc.
623623
* Also accepts the `name` and `dynamic` parameters like signal, with the same semantics.
624624
* Update handlers may optionally define a validator method by decorating it with `@update_handler_method.validator`.
625-
To reject an update before any events are written to history, throw an exception in a validator. Validators cannot
625+
To reject an update before any events are written to history, throw an exception in a validator. Validators cannot
626626
be `async`, cannot mutate workflow state, and return nothing.
627627
* See [Signal and update handlers](#signal-and-update-handlers) below
628628
* `@workflow.query` - Defines a method as a query
@@ -994,7 +994,7 @@ To run an entire workflow outside of a sandbox, set `sandboxed=False` on the `@w
994994
it. This will run the entire workflow outside of the workflow which means it can share global state and other bad
995995
things.
996996

997-
To disable the sandbox entirely for a worker, set the `Worker` init's `workflow_runner` keyword argument to
997+
To disable the sandbox entirely for a worker, set the `Worker` init's `workflow_runner` keyword argument to
998998
`temporalio.worker.UnsandboxedWorkflowRunner()`. This value is defaulted to
999999
`temporalio.worker.workflow_sandbox.SandboxedWorkflowRunner()` so by changing it to the unsandboxed runner, the sandbox
10001000
will not be used at all.

temporalio/worker/_workflow_instance.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -414,6 +414,20 @@ def activate(
414414
)
415415
activation_err = None
416416

417+
# If this workflow is dynamic, try calling the dynamic versioning behavior
418+
if self._defn.name is None:
419+
dvb = self.workflow_get_dynamic_versioning_behavior()
420+
if dvb:
421+
with self._as_read_only():
422+
vb = dvb()
423+
if (
424+
vb
425+
!= temporalio.api.enums.v1.VersioningBehavior.VERSIONING_BEHAVIOR_UNSPECIFIED
426+
):
427+
self._current_completion.successful.versioning_behavior = (
428+
vb._to_proto()
429+
)
430+
417431
# If we're deleting, there better be no more tasks. It is important for
418432
# the integrity of the system that we check this. If there are tasks
419433
# remaining, they and any associated coroutines will get garbage
@@ -434,6 +448,7 @@ def activate(
434448
)
435449
# Set completion failure
436450
self._current_completion.failed.failure.SetInParent()
451+
# TODO: Review - odd that we don't un-set success here?
437452
try:
438453
self._failure_converter.to_failure(
439454
activation_err,
@@ -1025,6 +1040,14 @@ def workflow_get_update_validator(self, name: Optional[str]) -> Optional[Callabl
10251040
# Bind if a method
10261041
return defn.bind_validator(self._object) if defn.is_method else defn.validator
10271042

1043+
def workflow_get_dynamic_versioning_behavior(
1044+
self,
1045+
) -> Optional[Callable[[], temporalio.common.VersioningBehavior]]:
1046+
defn = self._defn.dynamic_versioning_behavior
1047+
if not defn:
1048+
return None
1049+
return temporalio.workflow._bind_method(self._object, defn)
1050+
10281051
def workflow_info(self) -> temporalio.workflow.Info:
10291052
return self._outbound.info()
10301053

temporalio/workflow.py

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,8 +130,6 @@ def defn(
130130
applied in addition to ones set on the worker constructor. If
131131
``Exception`` is set, it effectively will fail a workflow/update in
132132
all user exception cases. WARNING: This setting is experimental.
133-
versioning_behavior: Specifies when this workflow might move from a worker
134-
of one Build Id to another. WARNING: This setting is experimental.
135133
"""
136134

137135
def decorator(cls: ClassType) -> ClassType:
@@ -205,6 +203,7 @@ def run(
205203
Args:
206204
fn: The function to decorate.
207205
versioning_behavior: Specifies the versioning behavior to use for this workflow.
206+
WARNING: This setting is experimental.
208207
"""
209208

210209
def decorator(
@@ -259,6 +258,25 @@ class UnfinishedSignalHandlersWarning(RuntimeWarning):
259258
"""The workflow exited before all signal handlers had finished executing."""
260259

261260

261+
def dynamic_versioning_behavior(
262+
fn: MethodSyncNoParam[SelfType, temporalio.common.VersioningBehavior],
263+
) -> MethodSyncNoParam[SelfType, temporalio.common.VersioningBehavior]:
264+
"""Decorator for specifying versioning behavior in dynamic workflows.
265+
266+
This function must be read-only and not mutate workflow state. Any mutation could lead to
267+
nondeterministic behavior. If the workflow function specifies a `versioning_behavior` on
268+
both ``@workflow.run`` and with a getter, the getter's value will be used whenever it is not
269+
``VersioningBehavior.UNSPECIFIED``.
270+
271+
WARNING: This setting is experimental.
272+
273+
Args:
274+
fn: The method to decorate
275+
"""
276+
setattr(fn, "__temporal_dynamic_versioning_behavior", True)
277+
return fn
278+
279+
262280
@overload
263281
def signal(
264282
fn: CallableSyncOrAsyncReturnNoneType,
@@ -1464,6 +1482,7 @@ class _Definition:
14641482
arg_types: Optional[List[Type]] = None
14651483
ret_type: Optional[Type] = None
14661484
versioning_behavior: Optional[temporalio.common.VersioningBehavior] = None
1485+
dynamic_versioning_behavior: Optional[CallableSyncNoParam] = None
14671486

14681487
@staticmethod
14691488
def from_class(cls: Type) -> Optional[_Definition]:
@@ -1531,6 +1550,7 @@ def _apply_to_class(
15311550
signals: Dict[Optional[str], _SignalDefinition] = {}
15321551
queries: Dict[Optional[str], _QueryDefinition] = {}
15331552
updates: Dict[Optional[str], _UpdateDefinition] = {}
1553+
dynamic_versioning_behavior: Optional[CallableSyncNoParam] = None
15341554
for name, member in inspect.getmembers(cls):
15351555
if hasattr(member, "__temporal_workflow_run"):
15361556
seen_run_attr = getattr(member, "__temporal_workflow_run")
@@ -1574,6 +1594,15 @@ def _apply_to_class(
15741594
queries[query_defn.name] = query_defn
15751595
elif name == "__init__" and hasattr(member, "__temporal_workflow_init"):
15761596
init_fn = member
1597+
elif hasattr(member, "__temporal_dynamic_versioning_behavior"):
1598+
if workflow_name:
1599+
issues.append(
1600+
"Non-dynamic workflows should not specify "
1601+
"@workflow.dynamic_versioning_behavior, which was found on "
1602+
f"{cls.__qualname__}.{name}. Use the versioning_behavior "
1603+
"argument to @workflow.run instead."
1604+
)
1605+
dynamic_versioning_behavior = member
15771606
elif isinstance(member, UpdateMethodMultiParam):
15781607
update_defn = member._defn
15791608
if update_defn.name in updates:
@@ -1658,6 +1687,7 @@ def _apply_to_class(
16581687
sandboxed=sandboxed,
16591688
failure_exception_types=failure_exception_types,
16601689
versioning_behavior=seen_run_attr.versioning_behavior,
1690+
dynamic_versioning_behavior=dynamic_versioning_behavior,
16611691
)
16621692
setattr(cls, "__temporal_workflow_definition", defn)
16631693
setattr(run_fn, "__temporal_workflow_definition", defn)

tests/test_workflow.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -225,12 +225,16 @@ def update2(self, arg1: str):
225225
def base_update(self):
226226
pass
227227

228+
@workflow.dynamic_versioning_behavior
229+
def i_shouldnt_exist(self) -> VersioningBehavior:
230+
return VersioningBehavior.PINNED
231+
228232

229233
def test_workflow_defn_bad():
230234
with pytest.raises(ValueError) as err:
231235
workflow.defn(BadDefn)
232236

233-
assert "Invalid workflow class for 9 reasons" in str(err.value)
237+
assert "Invalid workflow class for 10 reasons" in str(err.value)
234238
assert "Missing @workflow.run method" in str(err.value)
235239
assert (
236240
"Multiple signal methods found for signal1 (at least on signal2 and signal1)"
@@ -264,6 +268,10 @@ def test_workflow_defn_bad():
264268
"@workflow.update defined on BadDefnBase.base_update but not on the override"
265269
in str(err.value)
266270
)
271+
assert (
272+
"Non-dynamic workflows should not specify @workflow.dynamic_versioning_behavior, which "
273+
"was found on BadDefn.i_shouldnt_exist" in str(err.value)
274+
)
267275

268276

269277
def test_workflow_defn_local_class():
@@ -422,9 +430,9 @@ def test_parameters_identical_up_to_naming():
422430
for f1, f2 in itertools.combinations(fns, 2):
423431
name1, name2 = f1.__name__, f2.__name__
424432
expect_equal = name1[0] == name2[0]
425-
assert (
426-
workflow._parameters_identical_up_to_naming(f1, f2) == (expect_equal)
427-
), f"expected {name1} and {name2} parameters{' ' if expect_equal else ' not '}to compare equal"
433+
assert workflow._parameters_identical_up_to_naming(f1, f2) == (expect_equal), (
434+
f"expected {name1} and {name2} parameters{' ' if expect_equal else ' not '}to compare equal"
435+
)
428436

429437

430438
@workflow.defn

tests/worker/test_worker.py

Lines changed: 92 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,11 @@
44
import concurrent.futures
55
import uuid
66
from datetime import timedelta
7-
from typing import Any, Awaitable, Callable, Optional
7+
from typing import Any, Awaitable, Callable, Optional, Sequence
88

99
import pytest
1010

11+
import temporalio.api.enums.v1
1112
import temporalio.worker._worker
1213
from temporalio import activity, workflow
1314
from temporalio.api.workflowservice.v1 import (
@@ -19,7 +20,7 @@
1920
SetWorkerDeploymentRampingVersionResponse,
2021
)
2122
from temporalio.client import BuildIdOpAddNewDefault, Client, TaskReachabilityType
22-
from temporalio.common import VersioningBehavior
23+
from temporalio.common import RawValue, VersioningBehavior
2324
from temporalio.service import RPCError
2425
from temporalio.testing import WorkflowEnvironment
2526
from temporalio.worker import (
@@ -729,13 +730,13 @@ async def test_worker_deployment_ramp(client: Client, env: WorkflowEnvironment):
729730
await set_ramping_version(client, conflict_token, v2, 0)
730731
).conflict_token
731732
for i in range(3):
732-
wf = await client.start_workflow(
733+
wfa = await client.start_workflow(
733734
DeploymentVersioningWorkflowV1AutoUpgrade.run,
734735
id=f"versioning-ramp-0-{i}-{uuid.uuid4()}",
735736
task_queue=w1.task_queue,
736737
)
737-
await wf.signal(DeploymentVersioningWorkflowV1AutoUpgrade.do_finish)
738-
res = await wf.result()
738+
await wfa.signal(DeploymentVersioningWorkflowV1AutoUpgrade.do_finish)
739+
res = await wfa.result()
739740
assert res == "version-v1"
740741

741742
# Set ramp to 50 and eventually verify workflows run on both versions
@@ -759,6 +760,92 @@ async def check_results():
759760
await assert_eventually(check_results)
760761

761762

763+
@workflow.defn(dynamic=True)
764+
class DynamicWorkflowVersioningOnRun:
765+
@workflow.run(versioning_behavior=VersioningBehavior.PINNED)
766+
async def run(self, args: Sequence[RawValue]) -> str:
767+
return "dynamic"
768+
769+
770+
@workflow.defn(dynamic=True)
771+
class DynamicWorkflowVersioningWithGetter:
772+
@workflow.run
773+
async def run(self, args: Sequence[RawValue]) -> str:
774+
return "dynamic"
775+
776+
@workflow.dynamic_versioning_behavior
777+
def huh(self) -> VersioningBehavior:
778+
return VersioningBehavior.PINNED
779+
780+
781+
async def _test_dynamic_workflow_versioning(
782+
client: Client, workflow_class, expected_versioning_behavior
783+
):
784+
deployment_name = f"deployment-dynamic-{uuid.uuid4()}"
785+
worker_v1 = WorkerDeploymentVersion(deployment_name=deployment_name, build_id="1.0")
786+
787+
async with new_worker(
788+
client,
789+
workflow_class,
790+
deployment_options=WorkerDeploymentOptions(
791+
version=worker_v1,
792+
use_worker_versioning=True,
793+
),
794+
) as w:
795+
describe_resp = await wait_until_worker_deployment_visible(
796+
client,
797+
worker_v1,
798+
)
799+
await set_current_deployment_version(
800+
client, describe_resp.conflict_token, worker_v1
801+
)
802+
803+
wf = await client.start_workflow(
804+
"cooldynamicworkflow",
805+
id=f"dynamic-workflow-versioning-{uuid.uuid4()}",
806+
task_queue=w.task_queue,
807+
)
808+
result = await wf.result()
809+
assert result == "dynamic"
810+
811+
history = await wf.fetch_history()
812+
assert any(
813+
event.HasField("workflow_task_completed_event_attributes")
814+
and event.workflow_task_completed_event_attributes.versioning_behavior
815+
== expected_versioning_behavior
816+
for event in history.events
817+
)
818+
819+
820+
async def test_worker_deployment_dynamic_workflow_on_run(
821+
client: Client, env: WorkflowEnvironment
822+
):
823+
if env.supports_time_skipping:
824+
pytest.skip("Test Server doesn't support worker deployments")
825+
826+
await _test_dynamic_workflow_versioning(
827+
client,
828+
DynamicWorkflowVersioningOnRun,
829+
temporalio.api.enums.v1.VersioningBehavior.VERSIONING_BEHAVIOR_PINNED,
830+
)
831+
832+
833+
async def test_worker_deployment_dynamic_workflow_getter(
834+
client: Client, env: WorkflowEnvironment
835+
):
836+
if env.supports_time_skipping:
837+
pytest.skip("Test Server doesn't support worker deployments")
838+
839+
await _test_dynamic_workflow_versioning(
840+
client,
841+
DynamicWorkflowVersioningWithGetter,
842+
temporalio.api.enums.v1.VersioningBehavior.VERSIONING_BEHAVIOR_PINNED,
843+
)
844+
845+
846+
# TODO: Test for fail at registration time if deployment versioning on, no default, no behavior
847+
848+
762849
async def wait_until_worker_deployment_visible(
763850
client: Client, version: WorkerDeploymentVersion
764851
) -> DescribeWorkerDeploymentResponse:

0 commit comments

Comments
 (0)