Skip to content

Commit 2025f07

Browse files
authored
Add dynamic config function (#842)
1 parent 6f31556 commit 2025f07

File tree

5 files changed

+223
-19
lines changed

5 files changed

+223
-19
lines changed

temporalio/worker/_workflow.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -136,13 +136,18 @@ def __init__(
136136
if defn.name in self._workflows:
137137
raise ValueError(f"More than one workflow named {defn.name}")
138138
if should_enforce_versioning_behavior:
139-
if defn.versioning_behavior in [
140-
None,
141-
temporalio.common.VersioningBehavior.UNSPECIFIED,
142-
]:
139+
if (
140+
defn.versioning_behavior
141+
in [
142+
None,
143+
temporalio.common.VersioningBehavior.UNSPECIFIED,
144+
]
145+
and not defn.dynamic_config_fn
146+
):
143147
raise ValueError(
144148
f"Workflow {defn.name} must specify a versioning behavior using "
145-
"the `versioning_behavior` argument to `@workflow.defn`."
149+
"the `versioning_behavior` argument to `@workflow.defn` or by "
150+
"defining a function decorated with `@workflow.dynamic_config`."
146151
)
147152

148153
# Prepare the workflow with the runner (this will error in the

temporalio/worker/_workflow_instance.py

Lines changed: 53 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,15 @@ def __init__(self, det: WorkflowInstanceDetails) -> None:
332332
# metadata query
333333
self._current_details = ""
334334

335+
# The versioning behavior of this workflow, as established by annotation or by the dynamic
336+
# config function. Is only set once upon initialization.
337+
self._versioning_behavior: Optional[temporalio.common.VersioningBehavior] = None
338+
339+
# Dynamic failure exception types as overridden by the dynamic config function
340+
self._dynamic_failure_exception_types: Optional[
341+
Sequence[type[BaseException]]
342+
] = None
343+
335344
def get_thread_id(self) -> Optional[int]:
336345
return self._current_thread_id
337346

@@ -348,11 +357,7 @@ def activate(
348357
temporalio.bridge.proto.workflow_completion.WorkflowActivationCompletion()
349358
)
350359
self._current_completion.successful.SetInParent()
351-
self._current_completion.successful.versioning_behavior = (
352-
self._defn.versioning_behavior.value
353-
if self._defn.versioning_behavior
354-
else temporalio.api.enums.v1.VersioningBehavior.VERSIONING_BEHAVIOR_UNSPECIFIED
355-
)
360+
356361
self._current_activation_error: Optional[Exception] = None
357362
self._deployment_version_for_current_task = (
358363
act.deployment_version_for_current_task
@@ -419,6 +424,12 @@ def activate(
419424
)
420425
activation_err = None
421426

427+
# Apply versioning behavior if one was established
428+
if self._versioning_behavior:
429+
self._current_completion.successful.versioning_behavior = (
430+
self._versioning_behavior.value
431+
)
432+
422433
# If we're deleting, there better be no more tasks. It is important for
423434
# the integrity of the system that we check this. If there are tasks
424435
# remaining, they and any associated coroutines will get garbage
@@ -439,7 +450,6 @@ def activate(
439450
)
440451
# Set completion failure
441452
self._current_completion.failed.failure.SetInParent()
442-
# TODO: Review - odd that we don't un-set success here?
443453
try:
444454
self._failure_converter.to_failure(
445455
activation_err,
@@ -1728,19 +1738,53 @@ def _convert_payloads(
17281738
def _instantiate_workflow_object(self) -> Any:
17291739
if not self._workflow_input:
17301740
raise RuntimeError("Expected workflow input. This is a Python SDK bug.")
1741+
17311742
if hasattr(self._defn.cls.__init__, "__temporal_workflow_init"):
1732-
return self._defn.cls(*self._workflow_input.args)
1743+
workflow_instance = self._defn.cls(*self._workflow_input.args)
17331744
else:
1734-
return self._defn.cls()
1745+
workflow_instance = self._defn.cls()
1746+
1747+
if self._defn.versioning_behavior:
1748+
self._versioning_behavior = self._defn.versioning_behavior
1749+
# If there's a dynamic config function, call it now after we've instantiated the object
1750+
# but before we start executing the workflow
1751+
if self._defn.name is None and self._defn.dynamic_config_fn is not None:
1752+
dynamic_config = None
1753+
try:
1754+
with self._as_read_only():
1755+
dynamic_config = self._defn.dynamic_config_fn(workflow_instance)
1756+
except Exception as err:
1757+
logger.exception(
1758+
f"Failed to run dynamic config function in workflow {self._info.workflow_type}"
1759+
)
1760+
# Treat as a task failure
1761+
self._current_activation_error = err
1762+
raise self._current_activation_error
1763+
1764+
if dynamic_config:
1765+
if dynamic_config.failure_exception_types is not None:
1766+
self._dynamic_failure_exception_types = (
1767+
dynamic_config.failure_exception_types
1768+
)
1769+
if (
1770+
dynamic_config.versioning_behavior
1771+
!= temporalio.common.VersioningBehavior.UNSPECIFIED
1772+
):
1773+
self._versioning_behavior = dynamic_config.versioning_behavior
1774+
1775+
return workflow_instance
17351776

17361777
def _is_workflow_failure_exception(self, err: BaseException) -> bool:
17371778
# An exception is a failure instead of a task fail if it's already a
17381779
# failure error or if it is a timeout error or if it is an instance of
17391780
# any of the failure types in the worker or workflow-level setting
1781+
wf_failure_exception_types = self._defn.failure_exception_types
1782+
if self._dynamic_failure_exception_types is not None:
1783+
wf_failure_exception_types = self._dynamic_failure_exception_types
17401784
return (
17411785
isinstance(err, temporalio.exceptions.FailureError)
17421786
or isinstance(err, asyncio.TimeoutError)
1743-
or any(isinstance(err, typ) for typ in self._defn.failure_exception_types)
1787+
or any(isinstance(err, typ) for typ in wf_failure_exception_types)
17441788
or any(
17451789
isinstance(err, typ)
17461790
for typ in self._worker_level_failure_exception_types

temporalio/workflow.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import asyncio
66
import contextvars
7+
import dataclasses
78
import inspect
89
import logging
910
import threading
@@ -415,6 +416,61 @@ def decorator(
415416
return decorator(fn.__name__, description, fn, bypass_async_check=True)
416417

417418

419+
@dataclass(frozen=True)
420+
class DynamicWorkflowConfig:
421+
"""Returned by functions using the :py:func:`dynamic_config` decorator, see it for more."""
422+
423+
failure_exception_types: Optional[Sequence[Type[BaseException]]] = None
424+
"""The types of exceptions that, if a workflow-thrown exception extends, will cause the
425+
workflow/update to fail instead of suspending the workflow via task failure. These are applied
426+
in addition to ones set on the worker constructor. If ``Exception`` is set, it effectively will
427+
fail a workflow/update in all user exception cases.
428+
429+
Always overrides the equivalent parameter on :py:func:`defn` if set not-None.
430+
431+
WARNING: This setting is experimental.
432+
"""
433+
versioning_behavior: temporalio.common.VersioningBehavior = (
434+
temporalio.common.VersioningBehavior.UNSPECIFIED
435+
)
436+
"""Specifies the versioning behavior to use for this workflow.
437+
438+
Always overrides the equivalent parameter on :py:func:`defn`.
439+
440+
WARNING: This setting is experimental.
441+
"""
442+
443+
444+
def dynamic_config(
445+
fn: MethodSyncNoParam[SelfType, DynamicWorkflowConfig],
446+
) -> MethodSyncNoParam[SelfType, DynamicWorkflowConfig]:
447+
"""Decorator to allow configuring a dynamic workflow's behavior.
448+
449+
Because dynamic workflows may conceptually represent more than one workflow type, it may be
450+
desirable to have different settings for fields that would normally be passed to
451+
:py:func:`defn`, but vary based on the workflow type name or other information available in
452+
the workflow's context. This function will be called after the workflow's :py:func:`init`,
453+
if it has one, but before the workflow's :py:func:`run` method.
454+
455+
The method must only take self as a parameter, and any values set in the class it returns will
456+
override those provided to :py:func:`defn`.
457+
458+
Cannot be specified on non-dynamic workflows.
459+
460+
Args:
461+
fn: The function to decorate.
462+
"""
463+
if inspect.iscoroutinefunction(fn):
464+
raise ValueError("Workflow dynamic_config method must be synchronous")
465+
params = list(inspect.signature(fn).parameters.values())
466+
if len(params) != 1:
467+
raise ValueError("Workflow dynamic_config method must only take self parameter")
468+
469+
# Add marker attribute
470+
setattr(fn, "__temporal_workflow_dynamic_config", True)
471+
return fn
472+
473+
418474
@dataclass(frozen=True)
419475
class Info:
420476
"""Information about the running workflow.
@@ -1449,6 +1505,7 @@ class _Definition:
14491505
arg_types: Optional[List[Type]] = None
14501506
ret_type: Optional[Type] = None
14511507
versioning_behavior: Optional[temporalio.common.VersioningBehavior] = None
1508+
dynamic_config_fn: Optional[Callable[..., DynamicWorkflowConfig]] = None
14521509

14531510
@staticmethod
14541511
def from_class(cls: Type) -> Optional[_Definition]:
@@ -1513,6 +1570,7 @@ def _apply_to_class(
15131570
# Collect run fn and all signal/query/update fns
15141571
init_fn: Optional[Callable[..., None]] = None
15151572
run_fn: Optional[Callable[..., Awaitable[Any]]] = None
1573+
dynamic_config_fn: Optional[Callable[..., DynamicWorkflowConfig]] = None
15161574
seen_run_attr = False
15171575
signals: Dict[Optional[str], _SignalDefinition] = {}
15181576
queries: Dict[Optional[str], _QueryDefinition] = {}
@@ -1560,6 +1618,17 @@ def _apply_to_class(
15601618
queries[query_defn.name] = query_defn
15611619
elif name == "__init__" and hasattr(member, "__temporal_workflow_init"):
15621620
init_fn = member
1621+
elif hasattr(member, "__temporal_workflow_dynamic_config"):
1622+
if workflow_name:
1623+
issues.append(
1624+
"@workflow.dynamic_config can only be used in dynamic workflows, but "
1625+
f"workflow class {workflow_name} ({cls.__name__}) is not dynamic"
1626+
)
1627+
if dynamic_config_fn:
1628+
issues.append(
1629+
"@workflow.dynamic_config can only be defined once per workflow"
1630+
)
1631+
dynamic_config_fn = member
15631632
elif isinstance(member, UpdateMethodMultiParam):
15641633
update_defn = member._defn
15651634
if update_defn.name in updates:
@@ -1643,6 +1712,7 @@ def _apply_to_class(
16431712
sandboxed=sandboxed,
16441713
failure_exception_types=failure_exception_types,
16451714
versioning_behavior=versioning_behavior,
1715+
dynamic_config_fn=dynamic_config_fn,
16461716
)
16471717
setattr(cls, "__temporal_workflow_definition", defn)
16481718
setattr(run_fn, "__temporal_workflow_definition", defn)

tests/worker/test_worker.py

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
WorkerTuner,
4444
WorkflowSlotInfo,
4545
)
46-
from temporalio.workflow import VersioningIntent
46+
from temporalio.workflow import DynamicWorkflowConfig, VersioningIntent
4747
from tests.helpers import (
4848
assert_eventually,
4949
find_free_port,
@@ -795,8 +795,24 @@ async def run(self, args: Sequence[RawValue]) -> str:
795795
return "dynamic"
796796

797797

798-
async def test_worker_deployment_dynamic_workflow_on_run(
799-
client: Client, env: WorkflowEnvironment
798+
@workflow.defn(dynamic=True, versioning_behavior=VersioningBehavior.PINNED)
799+
class DynamicWorkflowVersioningOnConfigMethod:
800+
@workflow.dynamic_config
801+
def dynamic_config(self) -> DynamicWorkflowConfig:
802+
return DynamicWorkflowConfig(
803+
versioning_behavior=VersioningBehavior.AUTO_UPGRADE
804+
)
805+
806+
@workflow.run
807+
async def run(self, args: Sequence[RawValue]) -> str:
808+
return "dynamic"
809+
810+
811+
async def _test_worker_deployment_dynamic_workflow(
812+
client: Client,
813+
env: WorkflowEnvironment,
814+
workflow_class,
815+
expected_versioning_behavior: temporalio.api.enums.v1.VersioningBehavior.ValueType,
800816
):
801817
if env.supports_time_skipping:
802818
pytest.skip("Test Server doesn't support worker deployments")
@@ -806,7 +822,7 @@ async def test_worker_deployment_dynamic_workflow_on_run(
806822

807823
async with new_worker(
808824
client,
809-
DynamicWorkflowVersioningOnDefn,
825+
workflow_class,
810826
deployment_config=WorkerDeploymentConfig(
811827
version=worker_v1,
812828
use_worker_versioning=True,
@@ -832,11 +848,33 @@ async def test_worker_deployment_dynamic_workflow_on_run(
832848
assert any(
833849
event.HasField("workflow_task_completed_event_attributes")
834850
and event.workflow_task_completed_event_attributes.versioning_behavior
835-
== temporalio.api.enums.v1.VersioningBehavior.VERSIONING_BEHAVIOR_PINNED
851+
== expected_versioning_behavior
836852
for event in history.events
837853
)
838854

839855

856+
async def test_worker_deployment_dynamic_workflow_with_pinned_versioning(
857+
client: Client, env: WorkflowEnvironment
858+
):
859+
await _test_worker_deployment_dynamic_workflow(
860+
client,
861+
env,
862+
DynamicWorkflowVersioningOnDefn,
863+
temporalio.api.enums.v1.VersioningBehavior.VERSIONING_BEHAVIOR_PINNED,
864+
)
865+
866+
867+
async def test_worker_deployment_dynamic_workflow_with_auto_upgrade_versioning(
868+
client: Client, env: WorkflowEnvironment
869+
):
870+
await _test_worker_deployment_dynamic_workflow(
871+
client,
872+
env,
873+
DynamicWorkflowVersioningOnConfigMethod,
874+
temporalio.api.enums.v1.VersioningBehavior.VERSIONING_BEHAVIOR_AUTO_UPGRADE,
875+
)
876+
877+
840878
@workflow.defn
841879
class NoVersioningAnnotationWorkflow:
842880
@workflow.run

tests/worker/test_workflow.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4878,6 +4878,18 @@ async def run(self, scenario: FailureTypesScenario) -> None:
48784878
await super().run(scenario)
48794879

48804880

4881+
class FailureTypesConfiguredDynamicConfig(FailureTypesWorkflowBase):
4882+
@workflow.dynamic_config
4883+
def dynamic_config(self) -> temporalio.workflow.DynamicWorkflowConfig:
4884+
return temporalio.workflow.DynamicWorkflowConfig(
4885+
failure_exception_types=[Exception]
4886+
)
4887+
4888+
@workflow.run
4889+
async def run(self, scenario: FailureTypesScenario) -> None:
4890+
await super().run(scenario)
4891+
4892+
48814893
async def test_workflow_failure_types_configured(client: Client):
48824894
# Asserter for a single scenario
48834895
async def assert_scenario(
@@ -5047,6 +5059,15 @@ async def run_scenario(
50475059
FailureTypesConfiguredInheritedWorkflow,
50485060
FailureTypesScenario.CAUSE_NON_DETERMINISM,
50495061
)
5062+
# When configured at the workflow level dynamically
5063+
await run_scenario(
5064+
FailureTypesConfiguredDynamicConfig,
5065+
FailureTypesScenario.THROW_CUSTOM_EXCEPTION,
5066+
)
5067+
await run_scenario(
5068+
FailureTypesConfiguredDynamicConfig,
5069+
FailureTypesScenario.CAUSE_NON_DETERMINISM,
5070+
)
50505071

50515072

50525073
@workflow.defn(failure_exception_types=[Exception])
@@ -7374,3 +7395,29 @@ async def test_expose_root_execution(client: Client, env: WorkflowEnvironment):
73747395
assert child_wf_info_root is not None
73757396
assert child_wf_info_root.workflow_id == parent_desc.id
73767397
assert child_wf_info_root.run_id == parent_desc.run_id
7398+
7399+
7400+
@workflow.defn(dynamic=True)
7401+
class WorkflowDynamicConfigFnFailure:
7402+
@workflow.dynamic_config
7403+
def dynamic_config(self) -> temporalio.workflow.DynamicWorkflowConfig:
7404+
raise Exception("Dynamic config failure")
7405+
7406+
@workflow.run
7407+
async def run(self, args: Sequence[RawValue]) -> None:
7408+
raise RuntimeError("Should never actually run")
7409+
7410+
7411+
async def test_workflow_dynamic_config_failure(client: Client):
7412+
async with new_worker(client, WorkflowDynamicConfigFnFailure) as worker:
7413+
handle = await client.start_workflow(
7414+
"verycooldynamicworkflow",
7415+
id=f"dynamic-config-failure-{uuid.uuid4()}",
7416+
task_queue=worker.task_queue,
7417+
execution_timeout=timedelta(seconds=5),
7418+
)
7419+
7420+
# Assert workflow task fails with our expected error message
7421+
await assert_task_fail_eventually(
7422+
handle, message_contains="Dynamic config failure"
7423+
)

0 commit comments

Comments
 (0)