From defe0bbfcbb5c1fc6400439f3ba20f49d14b9e0f Mon Sep 17 00:00:00 2001 From: Tim Conley Date: Wed, 9 Jul 2025 15:40:44 -0700 Subject: [PATCH 1/8] Initial rough framework for plugins --- temporalio/client.py | 46 +++++++++++++++++++++++++++++-- temporalio/worker/__init__.py | 2 ++ temporalio/worker/_worker.py | 39 +++++++++++++++++++++++++- tests/test_client.py | 29 +++++++++++++++++++ tests/worker/test_worker.py | 52 +++++++++++++++++++++++++++++++++-- 5 files changed, 163 insertions(+), 5 deletions(-) diff --git a/temporalio/client.py b/temporalio/client.py index 0aab85465..2042c9b4d 100644 --- a/temporalio/client.py +++ b/temporalio/client.py @@ -120,6 +120,7 @@ async def connect( runtime: Optional[temporalio.runtime.Runtime] = None, http_connect_proxy_config: Optional[HttpConnectProxyConfig] = None, header_codec_behavior: HeaderCodecBehavior = HeaderCodecBehavior.NO_CODEC, + plugins: Sequence[Plugin] = [], ) -> Client: """Connect to a Temporal server. @@ -178,13 +179,21 @@ async def connect( runtime=runtime, http_connect_proxy_config=http_connect_proxy_config, ) + + root_plugin: Plugin = _RootPlugin() + for plugin in reversed(list(plugins)): + root_plugin = plugin.init_client_plugin(root_plugin) + + service_client = await root_plugin.connect_service_client(connect_config) + return Client( - await temporalio.service.ServiceClient.connect(connect_config), + service_client, namespace=namespace, data_converter=data_converter, interceptors=interceptors, default_workflow_query_reject_condition=default_workflow_query_reject_condition, header_codec_behavior=header_codec_behavior, + plugins=plugins, ) def __init__( @@ -198,6 +207,7 @@ def __init__( temporalio.common.QueryRejectCondition ] = None, header_codec_behavior: HeaderCodecBehavior = HeaderCodecBehavior.NO_CODEC, + plugins: Sequence[Plugin] = [], ): """Create a Temporal client from a service client. @@ -209,15 +219,22 @@ def __init__( self._impl = interceptor.intercept_client(self._impl) # Store the config for tracking - self._config = ClientConfig( + config = ClientConfig( service_client=service_client, namespace=namespace, data_converter=data_converter, interceptors=interceptors, default_workflow_query_reject_condition=default_workflow_query_reject_condition, header_codec_behavior=header_codec_behavior, + plugins=plugins, ) + root_plugin: Plugin = _RootPlugin() + for plugin in reversed(list(plugins)): + root_plugin = plugin.init_client_plugin(root_plugin) + + self._config = root_plugin.on_create_client(config) + def config(self) -> ClientConfig: """Config, as a dictionary, used to create this client. 
@@ -1510,6 +1527,7 @@ class ClientConfig(TypedDict, total=False): Optional[temporalio.common.QueryRejectCondition] ] header_codec_behavior: Required[HeaderCodecBehavior] + plugins: Required[Sequence[Plugin]] class WorkflowHistoryEventFilterType(IntEnum): @@ -7367,3 +7385,27 @@ async def _decode_user_metadata( if not metadata.HasField("details") else (await converter.decode([metadata.details]))[0], ) + + +class Plugin: + def init_client_plugin(self, next: Plugin) -> Plugin: + self.next_client_plugin = next + return self + + def on_create_client(self, config: ClientConfig) -> ClientConfig: + return self.next_client_plugin.on_create_client(config) + + async def connect_service_client( + self, config: temporalio.service.ConnectConfig + ) -> temporalio.service.ServiceClient: + return await self.next_client_plugin.connect_service_client(config) + + +class _RootPlugin(Plugin): + def on_create_client(self, config: ClientConfig) -> ClientConfig: + return config + + async def connect_service_client( + self, config: temporalio.service.ConnectConfig + ) -> temporalio.service.ServiceClient: + return await temporalio.service.ServiceClient.connect(config) diff --git a/temporalio/worker/__init__.py b/temporalio/worker/__init__.py index 83a66e67f..6e062afcc 100644 --- a/temporalio/worker/__init__.py +++ b/temporalio/worker/__init__.py @@ -44,6 +44,7 @@ WorkflowSlotInfo, ) from ._worker import ( + Plugin, PollerBehavior, PollerBehaviorAutoscaling, PollerBehaviorSimpleMaximum, @@ -78,6 +79,7 @@ "ActivityOutboundInterceptor", "WorkflowInboundInterceptor", "WorkflowOutboundInterceptor", + "Plugin", # Interceptor input "ContinueAsNewInput", "ExecuteActivityInput", diff --git a/temporalio/worker/_worker.py b/temporalio/worker/_worker.py index 4d77e111e..3cf1d62bc 100644 --- a/temporalio/worker/_worker.py +++ b/temporalio/worker/_worker.py @@ -96,6 +96,26 @@ def _to_bridge(self) -> temporalio.bridge.worker.PollerBehavior: ] +class Plugin: + def init_worker_plugin(self, next: Plugin) -> Plugin: + self.next_worker_plugin = next + return self + + def on_create_worker(self, config: WorkerConfig) -> WorkerConfig: + return self.next_worker_plugin.on_create_worker(config) + + async def run_worker(self, worker: Worker) -> None: + await self.next_worker_plugin.run_worker(worker) + + +class _RootPlugin(Plugin): + def on_create_worker(self, config: WorkerConfig) -> WorkerConfig: + return config + + async def run_worker(self, worker: Worker) -> None: + await worker._run() + + class Worker: """Worker to process workflows and/or activities. @@ -153,6 +173,7 @@ def __init__( nexus_task_poller_behavior: PollerBehavior = PollerBehaviorSimpleMaximum( maximum=5 ), + plugins: Sequence[Plugin] = [], ) -> None: """Create a worker to process workflows and/or activities. 
@@ -343,11 +364,17 @@ def __init__( ) interceptors = interceptors_from_client + list(interceptors) + plugins_from_client = cast( + List[Plugin], [p for p in client_config["plugins"] if isinstance(p, Plugin)] + ) + plugins = plugins_from_client + list(plugins) + print(plugins) + # Extract the bridge service client bridge_client = _extract_bridge_client_for_worker(client) # Store the config for tracking - self._config = WorkerConfig( + config = WorkerConfig( client=client, task_queue=task_queue, activities=activities, @@ -382,6 +409,13 @@ def __init__( use_worker_versioning=use_worker_versioning, disable_safe_workflow_eviction=disable_safe_workflow_eviction, ) + + root_plugin: Plugin = _RootPlugin() + for plugin in reversed(list(plugins)): + root_plugin = plugin.init_worker_plugin(root_plugin) + self._config = root_plugin.on_create_worker(config) + self._plugin = root_plugin + self._started = False self._shutdown_event = asyncio.Event() self._shutdown_complete_event = asyncio.Event() @@ -646,6 +680,9 @@ async def run(self) -> None: also cancel the shutdown process. Therefore users are encouraged to use explicit shutdown instead. """ + await self._plugin.run_worker(self) + + async def _run(self): # Eagerly validate which will do a namespace check in Core await self._bridge_worker.validate() diff --git a/tests/test_client.py b/tests/test_client.py index 418d9ff53..0e718e6be 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -41,9 +41,11 @@ BuildIdOpPromoteSetByBuildId, CancelWorkflowInput, Client, + ClientConfig, CloudOperationsClient, Interceptor, OutboundInterceptor, + Plugin, QueryWorkflowInput, RPCError, RPCStatusCode, @@ -1499,3 +1501,30 @@ async def test_cloud_client_simple(): GetNamespaceRequest(namespace=os.environ["TEMPORAL_CLIENT_CLOUD_NAMESPACE"]) ) assert os.environ["TEMPORAL_CLIENT_CLOUD_NAMESPACE"] == result.namespace.namespace + + +class MyPlugin(Plugin): + def on_create_client(self, config: ClientConfig) -> ClientConfig: + config["namespace"] = "replaced_namespace" + return super().on_create_client(config) + + async def connect_service_client( + self, config: temporalio.service.ConnectConfig + ) -> temporalio.service.ServiceClient: + config.api_key = "replaced key" + return await super().connect_service_client(config) + + +async def test_client_plugin(client: Client, env: WorkflowEnvironment): + if env.supports_time_skipping: + pytest.skip("Client connect is only designed for local") + + config = client.config() + config["plugins"] = [MyPlugin()] + new_client = Client(**config) + assert new_client.namespace == "replaced_namespace" + + new_client = await Client.connect( + client.service_client.config.target_host, plugins=[MyPlugin()] + ) + assert new_client.service_client.config.api_key == "replaced key" diff --git a/tests/worker/test_worker.py b/tests/worker/test_worker.py index f1be74b4d..7a72729c7 100644 --- a/tests/worker/test_worker.py +++ b/tests/worker/test_worker.py @@ -2,13 +2,13 @@ import asyncio import concurrent.futures -import sys import uuid from datetime import timedelta from typing import Any, Awaitable, Callable, Optional, Sequence from urllib.request import urlopen import temporalio.api.enums.v1 +import temporalio.client import temporalio.worker._worker from temporalio import activity, workflow from temporalio.api.workflowservice.v1 import ( @@ -19,7 +19,11 @@ SetWorkerDeploymentRampingVersionRequest, SetWorkerDeploymentRampingVersionResponse, ) -from temporalio.client import BuildIdOpAddNewDefault, Client, TaskReachabilityType +from 
temporalio.client import ( + BuildIdOpAddNewDefault, + Client, + TaskReachabilityType, +) from temporalio.common import PinnedVersioningOverride, RawValue, VersioningBehavior from temporalio.runtime import PrometheusConfig, Runtime, TelemetryConfig from temporalio.service import RPCError @@ -38,6 +42,7 @@ SlotReleaseContext, SlotReserveContext, Worker, + WorkerConfig, WorkerDeploymentConfig, WorkerDeploymentVersion, WorkerTuner, @@ -1184,3 +1189,46 @@ def shutdown(self) -> None: if self.next_exception_task: self.next_exception_task.cancel() setattr(self.worker._bridge_worker, self.attr, self.orig_poll_call) + + +class MyCombinedPlugin(temporalio.client.Plugin, temporalio.worker.Plugin): + def on_create_worker(self, config: WorkerConfig) -> WorkerConfig: + print("Create worker combined plugin") + config["task_queue"] = "combined" + return super().on_create_worker(config) + + +class MyWorkerPlugin(temporalio.worker.Plugin): + def on_create_worker(self, config: WorkerConfig) -> WorkerConfig: + print("Create worker worker plugin") + config["task_queue"] = "replaced_queue" + return super().on_create_worker(config) + + async def run_worker(self, worker: Worker) -> None: + await super().run_worker(worker) + + +async def test_worker_plugin(client: Client) -> None: + worker = Worker( + client, + task_queue="queue", + activities=[never_run_activity], + plugins=[MyWorkerPlugin()], + ) + assert worker.config().get("task_queue") == "replaced_queue" + + # Test client plugin propagation to worker plugins + new_config = client.config() + new_config["plugins"] = [MyCombinedPlugin()] + client = Client(**new_config) + worker = Worker(client, task_queue="queue", activities=[never_run_activity]) + assert worker.config().get("task_queue") == "combined" + + # Test both. Client propagated plugins are called first, so the worker plugin overrides in this case + worker = Worker( + client, + task_queue="queue", + activities=[never_run_activity], + plugins=[MyWorkerPlugin()], + ) + assert worker.config().get("task_queue") == "replaced_queue" From 66b031c3d29f4bf60475f1ca119b72a8729b5a38 Mon Sep 17 00:00:00 2001 From: Tim Conley Date: Thu, 10 Jul 2025 08:53:21 -0700 Subject: [PATCH 2/8] Ensure plugin modification happen before any other initialization --- temporalio/client.py | 14 +-- temporalio/worker/_worker.py | 200 +++++++++++++++++++---------------- 2 files changed, 114 insertions(+), 100 deletions(-) diff --git a/temporalio/client.py b/temporalio/client.py index 2042c9b4d..145537af7 100644 --- a/temporalio/client.py +++ b/temporalio/client.py @@ -107,6 +107,7 @@ async def connect( namespace: str = "default", api_key: Optional[str] = None, data_converter: temporalio.converter.DataConverter = temporalio.converter.DataConverter.default, + plugins: Sequence[Plugin] = [], interceptors: Sequence[Interceptor] = [], default_workflow_query_reject_condition: Optional[ temporalio.common.QueryRejectCondition @@ -120,7 +121,6 @@ async def connect( runtime: Optional[temporalio.runtime.Runtime] = None, http_connect_proxy_config: Optional[HttpConnectProxyConfig] = None, header_codec_behavior: HeaderCodecBehavior = HeaderCodecBehavior.NO_CODEC, - plugins: Sequence[Plugin] = [], ) -> Client: """Connect to a Temporal server. 
@@ -202,22 +202,17 @@ def __init__( *, namespace: str = "default", data_converter: temporalio.converter.DataConverter = temporalio.converter.DataConverter.default, + plugins: Sequence[Plugin] = [], interceptors: Sequence[Interceptor] = [], default_workflow_query_reject_condition: Optional[ temporalio.common.QueryRejectCondition ] = None, header_codec_behavior: HeaderCodecBehavior = HeaderCodecBehavior.NO_CODEC, - plugins: Sequence[Plugin] = [], ): """Create a Temporal client from a service client. See :py:meth:`connect` for details on the parameters. """ - # Iterate over interceptors in reverse building the impl - self._impl: OutboundInterceptor = _ClientImpl(self) - for interceptor in reversed(list(interceptors)): - self._impl = interceptor.intercept_client(self._impl) - # Store the config for tracking config = ClientConfig( service_client=service_client, @@ -235,6 +230,11 @@ def __init__( self._config = root_plugin.on_create_client(config) + # Iterate over interceptors in reverse building the impl + self._impl: OutboundInterceptor = _ClientImpl(self) + for interceptor in reversed(list(interceptors)): + self._impl = interceptor.intercept_client(self._impl) + def config(self) -> ClientConfig: """Config, as a dictionary, used to create this client. diff --git a/temporalio/worker/_worker.py b/temporalio/worker/_worker.py index 3cf1d62bc..bc9d07353 100644 --- a/temporalio/worker/_worker.py +++ b/temporalio/worker/_worker.py @@ -138,6 +138,7 @@ def __init__( nexus_task_executor: Optional[concurrent.futures.Executor] = None, workflow_runner: WorkflowRunner = SandboxedWorkflowRunner(), unsandboxed_workflow_runner: WorkflowRunner = UnsandboxedWorkflowRunner(), + plugins: Sequence[Plugin] = [], interceptors: Sequence[Interceptor] = [], build_id: Optional[str] = None, identity: Optional[str] = None, @@ -173,7 +174,6 @@ def __init__( nexus_task_poller_behavior: PollerBehavior = PollerBehaviorSimpleMaximum( maximum=5 ), - plugins: Sequence[Plugin] = [], ) -> None: """Create a worker to process workflows and/or activities. @@ -342,42 +342,11 @@ def __init__( nexus_task_poller_behavior: Specify the behavior of Nexus task polling. Defaults to a 5-poller maximum. 
""" - # TODO(nexus-preview): max_concurrent_nexus_tasks / tuner support - if not (activities or nexus_service_handlers or workflows): - raise ValueError( - "At least one activity, Nexus service, or workflow must be specified" - ) - if use_worker_versioning and not build_id: - raise ValueError( - "build_id must be specified when use_worker_versioning is True" - ) - if deployment_config and (build_id or use_worker_versioning): - raise ValueError( - "deployment_config cannot be used with build_id or use_worker_versioning" - ) - - # Prepend applicable client interceptors to the given ones - client_config = client.config() - interceptors_from_client = cast( - List[Interceptor], - [i for i in client_config["interceptors"] if isinstance(i, Interceptor)], - ) - interceptors = interceptors_from_client + list(interceptors) - - plugins_from_client = cast( - List[Plugin], [p for p in client_config["plugins"] if isinstance(p, Plugin)] - ) - plugins = plugins_from_client + list(plugins) - print(plugins) - - # Extract the bridge service client - bridge_client = _extract_bridge_client_for_worker(client) - - # Store the config for tracking config = WorkerConfig( client=client, task_queue=task_queue, activities=activities, + nexus_service_handlers=nexus_service_handlers, workflows=workflows, activity_executor=activity_executor, workflow_task_executor=workflow_task_executor, @@ -391,6 +360,7 @@ def __init__( max_concurrent_workflow_tasks=max_concurrent_workflow_tasks, max_concurrent_activities=max_concurrent_activities, max_concurrent_local_activities=max_concurrent_local_activities, + tuner=tuner, max_concurrent_workflow_task_polls=max_concurrent_workflow_task_polls, nonsticky_to_sticky_poll_ratio=nonsticky_to_sticky_poll_ratio, max_concurrent_activity_task_polls=max_concurrent_activity_task_polls, @@ -408,14 +378,54 @@ def __init__( on_fatal_error=on_fatal_error, use_worker_versioning=use_worker_versioning, disable_safe_workflow_eviction=disable_safe_workflow_eviction, + deployment_config=deployment_config, + workflow_task_poller_behavior=workflow_task_poller_behavior, + activity_task_poller_behavior=activity_task_poller_behavior, + nexus_task_poller_behavior=nexus_task_poller_behavior, ) + plugins_from_client = cast( + List[Plugin], [p for p in client.config()["plugins"] if isinstance(p, Plugin)] + ) + plugins = plugins_from_client + list(plugins) + root_plugin: Plugin = _RootPlugin() for plugin in reversed(list(plugins)): root_plugin = plugin.init_worker_plugin(root_plugin) - self._config = root_plugin.on_create_worker(config) + config = root_plugin.on_create_worker(config) self._plugin = root_plugin + self._init_from_config(config) + + def _init_from_config(self, config: WorkerConfig): + """Handles post plugin initialization to ensure original arguments are not used""" + self._config = config + + # TODO(nexus-preview): max_concurrent_nexus_tasks / tuner support + if not (config["activities"] or config["nexus_service_handlers"] or config["workflows"]): + raise ValueError( + "At least one activity, Nexus service, or workflow must be specified" + ) + if config["use_worker_versioning"] and not config["build_id"]: + raise ValueError( + "build_id must be specified when use_worker_versioning is True" + ) + if config["deployment_config"] and (config["build_id"] or config["use_worker_versioning"]): + raise ValueError( + "deployment_config cannot be used with build_id or use_worker_versioning" + ) + + # Prepend applicable client interceptors to the given ones + client_config = config["client"].config() + 
interceptors_from_client = cast( + List[Interceptor], + [i for i in client_config["interceptors"] if isinstance(i, Interceptor)], + ) + interceptors = interceptors_from_client + list(config["interceptors"]) + + # Extract the bridge service client + bridge_client = _extract_bridge_client_for_worker(config["client"]) + self._started = False self._shutdown_event = asyncio.Event() self._shutdown_complete_event = asyncio.Event() @@ -427,14 +437,14 @@ def __init__( self._runtime = ( bridge_client.config.runtime or temporalio.runtime.Runtime.default() ) - if activities: + if config["activities"]: # Issue warning here if executor max_workers is lower than max # concurrent activities. We do this here instead of in # _ActivityWorker so the stack level is predictable. - max_workers = getattr(activity_executor, "_max_workers", None) - concurrent_activities = max_concurrent_activities - if tuner and tuner._get_activities_max(): - concurrent_activities = tuner._get_activities_max() + max_workers = getattr(config["activity_executor"], "_max_workers", None) + concurrent_activities = config["max_concurrent_activities"] + if config["tuner"] and config["tuner"]._get_activities_max(): + concurrent_activities = config["tuner"]._get_activities_max() if isinstance(max_workers, int) and max_workers < ( concurrent_activities or 0 ): @@ -446,10 +456,10 @@ def __init__( self._activity_worker = _ActivityWorker( bridge_worker=lambda: self._bridge_worker, - task_queue=task_queue, - activities=activities, - activity_executor=activity_executor, - shared_state_manager=shared_state_manager, + task_queue=config["task_queue"], + activities=config["activities"], + activity_executor=config["activity_executor"], + shared_state_manager=config["shared_state_manager"], data_converter=client_config["data_converter"], interceptors=interceptors, metric_meter=self._runtime.metric_meter, @@ -457,23 +467,23 @@ def __init__( == HeaderCodecBehavior.CODEC, ) self._nexus_worker: Optional[_NexusWorker] = None - if nexus_service_handlers: + if config["nexus_service_handlers"]: self._nexus_worker = _NexusWorker( bridge_worker=lambda: self._bridge_worker, - client=client, - task_queue=task_queue, - service_handlers=nexus_service_handlers, + client=config["client"], + task_queue=config["task_queue"], + service_handlers=config["nexus_service_handlers"], data_converter=client_config["data_converter"], interceptors=interceptors, metric_meter=self._runtime.metric_meter, - executor=nexus_task_executor, + executor=config["nexus_task_executor"], ) self._workflow_worker: Optional[_WorkflowWorker] = None - if workflows: + if config["workflows"]: should_enforce_versioning_behavior = ( - deployment_config is not None - and deployment_config.use_worker_versioning - and deployment_config.default_versioning_behavior + config["deployment_config"] is not None + and config["deployment_config"].use_worker_versioning + and config["deployment_config"].default_versioning_behavior == temporalio.common.VersioningBehavior.UNSPECIFIED ) @@ -487,32 +497,33 @@ def check_activity(activity): self._workflow_worker = _WorkflowWorker( bridge_worker=lambda: self._bridge_worker, - namespace=client.namespace, - task_queue=task_queue, - workflows=workflows, - workflow_task_executor=workflow_task_executor, - max_concurrent_workflow_tasks=max_concurrent_workflow_tasks, - workflow_runner=workflow_runner, - unsandboxed_workflow_runner=unsandboxed_workflow_runner, + namespace=config["client"].namespace, + task_queue=config["task_queue"], + workflows=config["workflows"], + 
workflow_task_executor=config["workflow_task_executor"], + max_concurrent_workflow_tasks=config["max_concurrent_workflow_tasks"], + workflow_runner=config["workflow_runner"], + unsandboxed_workflow_runner=config["unsandboxed_workflow_runner"], data_converter=client_config["data_converter"], interceptors=interceptors, - workflow_failure_exception_types=workflow_failure_exception_types, - debug_mode=debug_mode, - disable_eager_activity_execution=disable_eager_activity_execution, + workflow_failure_exception_types=config["workflow_failure_exception_types"], + debug_mode=config["debug_mode"], + disable_eager_activity_execution=config["disable_eager_activity_execution"], metric_meter=self._runtime.metric_meter, on_eviction_hook=None, - disable_safe_eviction=disable_safe_workflow_eviction, + disable_safe_eviction=config["disable_safe_workflow_eviction"], should_enforce_versioning_behavior=should_enforce_versioning_behavior, assert_local_activity_valid=check_activity, encode_headers=client_config["header_codec_behavior"] != HeaderCodecBehavior.NO_CODEC, ) - if tuner is not None: + tuner = config["tuner"] + if config["tuner"] is not None: if ( - max_concurrent_workflow_tasks - or max_concurrent_activities - or max_concurrent_local_activities + config["max_concurrent_workflow_tasks"] + or config["max_concurrent_activities"] + or config["max_concurrent_local_activities"] ): raise ValueError( "Cannot specify max_concurrent_workflow_tasks, max_concurrent_activities, " @@ -520,38 +531,40 @@ def check_activity(activity): ) else: tuner = WorkerTuner.create_fixed( - workflow_slots=max_concurrent_workflow_tasks, - activity_slots=max_concurrent_activities, - local_activity_slots=max_concurrent_local_activities, + workflow_slots=config["max_concurrent_workflow_tasks"], + activity_slots=config["max_concurrent_activities"], + local_activity_slots=config["max_concurrent_local_activities"], ) bridge_tuner = tuner._to_bridge_tuner() versioning_strategy: temporalio.bridge.worker.WorkerVersioningStrategy - if deployment_config: + if config["deployment_config"]: versioning_strategy = ( - deployment_config._to_bridge_worker_deployment_options() + config["deployment_config"]._to_bridge_worker_deployment_options() ) - elif use_worker_versioning: - build_id = build_id or load_default_build_id() + elif config["use_worker_versioning"]: + build_id = config["build_id"] or load_default_build_id() versioning_strategy = ( temporalio.bridge.worker.WorkerVersioningStrategyLegacyBuildIdBased( build_id_with_versioning=build_id ) ) else: - build_id = build_id or load_default_build_id() + build_id = config["build_id"] or load_default_build_id() versioning_strategy = temporalio.bridge.worker.WorkerVersioningStrategyNone( build_id_no_versioning=build_id ) - if max_concurrent_workflow_task_polls: + workflow_task_poller_behavior = config["workflow_task_poller_behavior"] + if config["max_concurrent_workflow_task_polls"]: workflow_task_poller_behavior = PollerBehaviorSimpleMaximum( - maximum=max_concurrent_workflow_task_polls + maximum=config["max_concurrent_workflow_task_polls"] ) - if max_concurrent_activity_task_polls: + activity_task_poller_behavior = config["activity_task_poller_behavior"] + if config["max_concurrent_activity_task_polls"]: activity_task_poller_behavior = PollerBehaviorSimpleMaximum( - maximum=max_concurrent_activity_task_polls + maximum=config["max_concurrent_activity_task_polls"] ) # Create bridge worker last. 
We have empirically observed that if it is @@ -564,29 +577,29 @@ def check_activity(activity): self._bridge_worker = temporalio.bridge.worker.Worker.create( bridge_client._bridge_client, temporalio.bridge.worker.WorkerConfig( - namespace=client.namespace, - task_queue=task_queue, - identity_override=identity, - max_cached_workflows=max_cached_workflows, + namespace=config["client"].namespace, + task_queue=config["task_queue"], + identity_override=config["identity"], + max_cached_workflows=config["max_cached_workflows"], tuner=bridge_tuner, - nonsticky_to_sticky_poll_ratio=nonsticky_to_sticky_poll_ratio, + nonsticky_to_sticky_poll_ratio=config["nonsticky_to_sticky_poll_ratio"], # We have to disable remote activities if a user asks _or_ if we # are not running an activity worker at all. Otherwise shutdown # will not proceed properly. - no_remote_activities=no_remote_activities or not activities, + no_remote_activities=config["no_remote_activities"] or not config["activities"], sticky_queue_schedule_to_start_timeout_millis=int( - 1000 * sticky_queue_schedule_to_start_timeout.total_seconds() + 1000 * config["sticky_queue_schedule_to_start_timeout"].total_seconds() ), max_heartbeat_throttle_interval_millis=int( - 1000 * max_heartbeat_throttle_interval.total_seconds() + 1000 * config["max_heartbeat_throttle_interval"].total_seconds() ), default_heartbeat_throttle_interval_millis=int( - 1000 * default_heartbeat_throttle_interval.total_seconds() + 1000 * config["default_heartbeat_throttle_interval"].total_seconds() ), - max_activities_per_second=max_activities_per_second, - max_task_queue_activities_per_second=max_task_queue_activities_per_second, + max_activities_per_second=config["max_activities_per_second"], + max_task_queue_activities_per_second=config["max_task_queue_activities_per_second"], graceful_shutdown_period_millis=int( - 1000 * graceful_shutdown_timeout.total_seconds() + 1000 * config["graceful_shutdown_timeout"].total_seconds() ), # Need to tell core whether we want to consider all # non-determinism exceptions as workflow fail, and whether we do @@ -601,7 +614,7 @@ def check_activity(activity): versioning_strategy=versioning_strategy, workflow_task_poller_behavior=workflow_task_poller_behavior._to_bridge(), activity_task_poller_behavior=activity_task_poller_behavior._to_bridge(), - nexus_task_poller_behavior=nexus_task_poller_behavior._to_bridge(), + nexus_task_poller_behavior=config["nexus_task_poller_behavior"]._to_bridge(), ), ) @@ -852,6 +865,7 @@ class WorkerConfig(TypedDict, total=False): client: temporalio.client.Client task_queue: str activities: Sequence[Callable] + nexus_service_handlers: Sequence[Any] workflows: Sequence[Type] activity_executor: Optional[concurrent.futures.Executor] workflow_task_executor: Optional[concurrent.futures.ThreadPoolExecutor] @@ -886,7 +900,7 @@ class WorkerConfig(TypedDict, total=False): deployment_config: Optional[WorkerDeploymentConfig] workflow_task_poller_behavior: PollerBehavior activity_task_poller_behavior: PollerBehavior - + nexus_task_poller_behavior: PollerBehavior @dataclass class WorkerDeploymentConfig: From 381e0cc651aee340560f225d2ca2817bf6503ae8 Mon Sep 17 00:00:00 2001 From: Tim Conley Date: Thu, 10 Jul 2025 09:41:00 -0700 Subject: [PATCH 3/8] Format --- temporalio/worker/_worker.py | 42 +++++++++++++++++++++++++----------- 1 file changed, 30 insertions(+), 12 deletions(-) diff --git a/temporalio/worker/_worker.py b/temporalio/worker/_worker.py index bc9d07353..5b81e9186 100644 --- a/temporalio/worker/_worker.py +++ 
b/temporalio/worker/_worker.py @@ -385,7 +385,8 @@ def __init__( ) plugins_from_client = cast( - List[Plugin], [p for p in client.config()["plugins"] if isinstance(p, Plugin)] + List[Plugin], + [p for p in client.config()["plugins"] if isinstance(p, Plugin)], ) plugins = plugins_from_client + list(plugins) @@ -402,7 +403,11 @@ def _init_from_config(self, config: WorkerConfig): self._config = config # TODO(nexus-preview): max_concurrent_nexus_tasks / tuner support - if not (config["activities"] or config["nexus_service_handlers"] or config["workflows"]): + if not ( + config["activities"] + or config["nexus_service_handlers"] + or config["workflows"] + ): raise ValueError( "At least one activity, Nexus service, or workflow must be specified" ) @@ -410,7 +415,9 @@ def _init_from_config(self, config: WorkerConfig): raise ValueError( "build_id must be specified when use_worker_versioning is True" ) - if config["deployment_config"] and (config["build_id"] or config["use_worker_versioning"]): + if config["deployment_config"] and ( + config["build_id"] or config["use_worker_versioning"] + ): raise ValueError( "deployment_config cannot be used with build_id or use_worker_versioning" ) @@ -506,9 +513,13 @@ def check_activity(activity): unsandboxed_workflow_runner=config["unsandboxed_workflow_runner"], data_converter=client_config["data_converter"], interceptors=interceptors, - workflow_failure_exception_types=config["workflow_failure_exception_types"], + workflow_failure_exception_types=config[ + "workflow_failure_exception_types" + ], debug_mode=config["debug_mode"], - disable_eager_activity_execution=config["disable_eager_activity_execution"], + disable_eager_activity_execution=config[ + "disable_eager_activity_execution" + ], metric_meter=self._runtime.metric_meter, on_eviction_hook=None, disable_safe_eviction=config["disable_safe_workflow_eviction"], @@ -540,9 +551,9 @@ def check_activity(activity): versioning_strategy: temporalio.bridge.worker.WorkerVersioningStrategy if config["deployment_config"]: - versioning_strategy = ( - config["deployment_config"]._to_bridge_worker_deployment_options() - ) + versioning_strategy = config[ + "deployment_config" + ]._to_bridge_worker_deployment_options() elif config["use_worker_versioning"]: build_id = config["build_id"] or load_default_build_id() versioning_strategy = ( @@ -586,9 +597,11 @@ def check_activity(activity): # We have to disable remote activities if a user asks _or_ if we # are not running an activity worker at all. Otherwise shutdown # will not proceed properly. 
- no_remote_activities=config["no_remote_activities"] or not config["activities"], + no_remote_activities=config["no_remote_activities"] + or not config["activities"], sticky_queue_schedule_to_start_timeout_millis=int( - 1000 * config["sticky_queue_schedule_to_start_timeout"].total_seconds() + 1000 + * config["sticky_queue_schedule_to_start_timeout"].total_seconds() ), max_heartbeat_throttle_interval_millis=int( 1000 * config["max_heartbeat_throttle_interval"].total_seconds() @@ -597,7 +610,9 @@ def check_activity(activity): 1000 * config["default_heartbeat_throttle_interval"].total_seconds() ), max_activities_per_second=config["max_activities_per_second"], - max_task_queue_activities_per_second=config["max_task_queue_activities_per_second"], + max_task_queue_activities_per_second=config[ + "max_task_queue_activities_per_second" + ], graceful_shutdown_period_millis=int( 1000 * config["graceful_shutdown_timeout"].total_seconds() ), @@ -614,7 +629,9 @@ def check_activity(activity): versioning_strategy=versioning_strategy, workflow_task_poller_behavior=workflow_task_poller_behavior._to_bridge(), activity_task_poller_behavior=activity_task_poller_behavior._to_bridge(), - nexus_task_poller_behavior=config["nexus_task_poller_behavior"]._to_bridge(), + nexus_task_poller_behavior=config[ + "nexus_task_poller_behavior" + ]._to_bridge(), ), ) @@ -902,6 +919,7 @@ class WorkerConfig(TypedDict, total=False): activity_task_poller_behavior: PollerBehavior nexus_task_poller_behavior: PollerBehavior + @dataclass class WorkerDeploymentConfig: """Options for configuring the Worker Versioning feature. From 0db686d27a3075b2987e95589f5fe2e5dde972a4 Mon Sep 17 00:00:00 2001 From: Tim Conley Date: Thu, 10 Jul 2025 09:47:12 -0700 Subject: [PATCH 4/8] Use local tuner for type inference --- temporalio/worker/_worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/temporalio/worker/_worker.py b/temporalio/worker/_worker.py index 5b81e9186..fca8a35a4 100644 --- a/temporalio/worker/_worker.py +++ b/temporalio/worker/_worker.py @@ -530,7 +530,7 @@ def check_activity(activity): ) tuner = config["tuner"] - if config["tuner"] is not None: + if tuner is not None: if ( config["max_concurrent_workflow_tasks"] or config["max_concurrent_activities"] From 6bcd2fd00ac22cbc2e48b55b64a62ea3bd5baa7c Mon Sep 17 00:00:00 2001 From: Tim Conley Date: Fri, 11 Jul 2025 10:39:49 -0700 Subject: [PATCH 5/8] Format --- tests/test_plugins.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 tests/test_plugins.py diff --git a/tests/test_plugins.py b/tests/test_plugins.py new file mode 100644 index 000000000..e69de29bb From b5db95985f02145b73be2b535ef4a995d73c40bf Mon Sep 17 00:00:00 2001 From: Tim Conley Date: Fri, 11 Jul 2025 10:52:47 -0700 Subject: [PATCH 6/8] Some PR updates, refactoring tests, added name --- temporalio/client.py | 10 +++- temporalio/worker/_worker.py | 8 +++ tests/test_client.py | 27 --------- tests/test_plugins.py | 112 +++++++++++++++++++++++++++++++++++ tests/worker/test_worker.py | 43 -------------- 5 files changed, 128 insertions(+), 72 deletions(-) diff --git a/temporalio/client.py b/temporalio/client.py index 145537af7..d32973997 100644 --- a/temporalio/client.py +++ b/temporalio/client.py @@ -228,11 +228,14 @@ def __init__( for plugin in reversed(list(plugins)): root_plugin = plugin.init_client_plugin(root_plugin) - self._config = root_plugin.on_create_client(config) + self._init_from_config(root_plugin.on_create_client(config)) + + def _init_from_config(self, config: 
ClientConfig): + self._config = config # Iterate over interceptors in reverse building the impl self._impl: OutboundInterceptor = _ClientImpl(self) - for interceptor in reversed(list(interceptors)): + for interceptor in reversed(list(self._config["interceptors"])): self._impl = interceptor.intercept_client(self._impl) def config(self) -> ClientConfig: @@ -7388,6 +7391,9 @@ async def _decode_user_metadata( class Plugin: + def name(self) -> str: + return type(self).__module__ + "." + type(self).__qualname__ + def init_client_plugin(self, next: Plugin) -> Plugin: self.next_client_plugin = next return self diff --git a/temporalio/worker/_worker.py b/temporalio/worker/_worker.py index fca8a35a4..94342d0a1 100644 --- a/temporalio/worker/_worker.py +++ b/temporalio/worker/_worker.py @@ -97,6 +97,9 @@ def _to_bridge(self) -> temporalio.bridge.worker.PollerBehavior: class Plugin: + def name(self) -> str: + return type(self).__module__ + "." + type(self).__qualname__ + def init_worker_plugin(self, next: Plugin) -> Plugin: self.next_worker_plugin = next return self @@ -388,6 +391,11 @@ def __init__( List[Plugin], [p for p in client.config()["plugins"] if isinstance(p, Plugin)], ) + for client_plugin in plugins_from_client: + if type(client_plugin) in [type(p) for p in plugins]: + warnings.warn( + f"The same plugin type {type(client_plugin)} is present from both client and worker. It may run twice and may not be the intended behavior." + ) plugins = plugins_from_client + list(plugins) root_plugin: Plugin = _RootPlugin() diff --git a/tests/test_client.py b/tests/test_client.py index 0e718e6be..77e1ad5ab 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -1501,30 +1501,3 @@ async def test_cloud_client_simple(): GetNamespaceRequest(namespace=os.environ["TEMPORAL_CLIENT_CLOUD_NAMESPACE"]) ) assert os.environ["TEMPORAL_CLIENT_CLOUD_NAMESPACE"] == result.namespace.namespace - - -class MyPlugin(Plugin): - def on_create_client(self, config: ClientConfig) -> ClientConfig: - config["namespace"] = "replaced_namespace" - return super().on_create_client(config) - - async def connect_service_client( - self, config: temporalio.service.ConnectConfig - ) -> temporalio.service.ServiceClient: - config.api_key = "replaced key" - return await super().connect_service_client(config) - - -async def test_client_plugin(client: Client, env: WorkflowEnvironment): - if env.supports_time_skipping: - pytest.skip("Client connect is only designed for local") - - config = client.config() - config["plugins"] = [MyPlugin()] - new_client = Client(**config) - assert new_client.namespace == "replaced_namespace" - - new_client = await Client.connect( - client.service_client.config.target_host, plugins=[MyPlugin()] - ) - assert new_client.service_client.config.api_key == "replaced key" diff --git a/tests/test_plugins.py b/tests/test_plugins.py index e69de29bb..f2ea57f9c 100644 --- a/tests/test_plugins.py +++ b/tests/test_plugins.py @@ -0,0 +1,112 @@ +import warnings + +import pytest + +import temporalio.client +import temporalio.worker +from temporalio.client import Client, ClientConfig, OutboundInterceptor, Plugin +from temporalio.testing import WorkflowEnvironment +from temporalio.worker import Worker, WorkerConfig +from tests.worker.test_worker import never_run_activity + + +class TestClientInterceptor(temporalio.client.Interceptor): + intercepted = False + + def intercept_client(self, next: OutboundInterceptor) -> OutboundInterceptor: + self.intercepted = True + return super().intercept_client(next) + + +class 
MyClientPlugin(Plugin): + def __init__(self): + self.interceptor = TestClientInterceptor() + + def on_create_client(self, config: ClientConfig) -> ClientConfig: + config["namespace"] = "replaced_namespace" + config["interceptors"] = list(config.get("interceptors") or []) + [ + self.interceptor + ] + return super().on_create_client(config) + + async def connect_service_client( + self, config: temporalio.service.ConnectConfig + ) -> temporalio.service.ServiceClient: + config.api_key = "replaced key" + return await super().connect_service_client(config) + + +async def test_client_plugin(client: Client, env: WorkflowEnvironment): + if env.supports_time_skipping: + pytest.skip("Client connect is only designed for local") + + plugin = MyClientPlugin() + config = client.config() + config["plugins"] = [plugin] + new_client = Client(**config) + assert new_client.namespace == "replaced_namespace" + assert plugin.interceptor.intercepted + assert plugin.name() == "tests.test_plugins.MyClientPlugin" + + new_client = await Client.connect( + client.service_client.config.target_host, plugins=[MyClientPlugin()] + ) + assert new_client.service_client.config.api_key == "replaced key" + + +class MyCombinedPlugin(temporalio.client.Plugin, temporalio.worker.Plugin): + def on_create_worker(self, config: WorkerConfig) -> WorkerConfig: + config["task_queue"] = "combined" + return super().on_create_worker(config) + + +class MyWorkerPlugin(temporalio.worker.Plugin): + def on_create_worker(self, config: WorkerConfig) -> WorkerConfig: + config["task_queue"] = "replaced_queue" + return super().on_create_worker(config) + + async def run_worker(self, worker: Worker) -> None: + await super().run_worker(worker) + + +async def test_worker_plugin_basic_config(client: Client) -> None: + worker = Worker( + client, + task_queue="queue", + activities=[never_run_activity], + plugins=[MyWorkerPlugin()], + ) + assert worker.config().get("task_queue") == "replaced_queue" + + # Test client plugin propagation to worker plugins + new_config = client.config() + new_config["plugins"] = [MyCombinedPlugin()] + client = Client(**new_config) + worker = Worker(client, task_queue="queue", activities=[never_run_activity]) + assert worker.config().get("task_queue") == "combined" + + # Test both. 
Client propagated plugins are called first, so the worker plugin overrides in this case + worker = Worker( + client, + task_queue="queue", + activities=[never_run_activity], + plugins=[MyWorkerPlugin()], + ) + assert worker.config().get("task_queue") == "replaced_queue" + + +async def test_worker_duplicated_plugin(client: Client) -> None: + new_config = client.config() + new_config["plugins"] = [MyCombinedPlugin()] + client = Client(**new_config) + + with warnings.catch_warnings(record=True) as warning_list: + worker = Worker( + client, + task_queue="queue", + activities=[never_run_activity], + plugins=[MyCombinedPlugin()], + ) + + assert len(warning_list) == 1 + assert "The same plugin type" in str(warning_list[0].message) diff --git a/tests/worker/test_worker.py b/tests/worker/test_worker.py index 7a72729c7..7c08cfa37 100644 --- a/tests/worker/test_worker.py +++ b/tests/worker/test_worker.py @@ -1189,46 +1189,3 @@ def shutdown(self) -> None: if self.next_exception_task: self.next_exception_task.cancel() setattr(self.worker._bridge_worker, self.attr, self.orig_poll_call) - - -class MyCombinedPlugin(temporalio.client.Plugin, temporalio.worker.Plugin): - def on_create_worker(self, config: WorkerConfig) -> WorkerConfig: - print("Create worker combined plugin") - config["task_queue"] = "combined" - return super().on_create_worker(config) - - -class MyWorkerPlugin(temporalio.worker.Plugin): - def on_create_worker(self, config: WorkerConfig) -> WorkerConfig: - print("Create worker worker plugin") - config["task_queue"] = "replaced_queue" - return super().on_create_worker(config) - - async def run_worker(self, worker: Worker) -> None: - await super().run_worker(worker) - - -async def test_worker_plugin(client: Client) -> None: - worker = Worker( - client, - task_queue="queue", - activities=[never_run_activity], - plugins=[MyWorkerPlugin()], - ) - assert worker.config().get("task_queue") == "replaced_queue" - - # Test client plugin propagation to worker plugins - new_config = client.config() - new_config["plugins"] = [MyCombinedPlugin()] - client = Client(**new_config) - worker = Worker(client, task_queue="queue", activities=[never_run_activity]) - assert worker.config().get("task_queue") == "combined" - - # Test both. Client propagated plugins are called first, so the worker plugin overrides in this case - worker = Worker( - client, - task_queue="queue", - activities=[never_run_activity], - plugins=[MyWorkerPlugin()], - ) - assert worker.config().get("task_queue") == "replaced_queue" From 759e65048dab1302e289007fb26f1b2cb82b89fc Mon Sep 17 00:00:00 2001 From: Tim Conley Date: Fri, 11 Jul 2025 11:04:49 -0700 Subject: [PATCH 7/8] Added docstrings --- temporalio/client.py | 59 ++++++++++++++++++++++++++++++++++++ temporalio/worker/_worker.py | 50 ++++++++++++++++++++++++++++++ 2 files changed, 109 insertions(+) diff --git a/temporalio/client.py b/temporalio/client.py index d32973997..94ff35fbd 100644 --- a/temporalio/client.py +++ b/temporalio/client.py @@ -133,6 +133,14 @@ async def connect( metadata doesn't already have an "authorization" key. data_converter: Data converter to use for all data conversions to/from payloads. + plugins: Set of plugins that are chained together to allow + intercepting and modifying client creation and service connection. + The earlier plugins wrap the later ones. + + Any plugins that also implement + :py:class:`temporalio.worker.Plugin` will be used as worker + plugins too so they should not be given when creating a + worker. 
interceptors: Set of interceptors that are chained together to
                allow intercepting of client calls. The earlier interceptors
                wrap the later ones.
@@ -7391,19 +7399,70 @@ async def _decode_user_metadata(
 
 
 class Plugin:
+    """Base class for client plugins that can intercept and modify client behavior.
+
+    Plugins allow customization of client creation and service connection processes
+    through a chain of responsibility pattern. Each plugin can modify the client
+    configuration or intercept service client connections.
+
+    If the plugin is also a temporalio.worker.Plugin, it will additionally be propagated as a worker plugin.
+    You should likely not also provide it to the worker, as that will result in the plugin being applied twice.
+    """
+
     def name(self) -> str:
+        """Get the name of this plugin. Can be overridden if desired to provide a more appropriate name.
+
+        Returns:
+            The fully qualified name of the plugin class (module.classname).
+        """
         return type(self).__module__ + "." + type(self).__qualname__
 
     def init_client_plugin(self, next: Plugin) -> Plugin:
+        """Initialize this plugin in the plugin chain.
+
+        This method sets up the chain of responsibility pattern by storing a reference
+        to the next plugin in the chain. It is called during client creation to build
+        the plugin chain.
+
+        Args:
+            next: The next plugin in the chain to delegate to.
+
+        Returns:
+            This plugin instance for method chaining.
+        """
         self.next_client_plugin = next
         return self
 
     def on_create_client(self, config: ClientConfig) -> ClientConfig:
+        """Hook called when creating a client to allow modification of configuration.
+
+        This method is called during client creation and allows plugins to modify
+        the client configuration before the client is fully initialized. Plugins
+        can add interceptors, modify connection parameters, or change other settings.
+
+        Args:
+            config: The client configuration dictionary to potentially modify.
+
+        Returns:
+            The modified client configuration.
+        """
         return self.next_client_plugin.on_create_client(config)
 
     async def connect_service_client(
         self, config: temporalio.service.ConnectConfig
     ) -> temporalio.service.ServiceClient:
+        """Hook called when connecting to the Temporal service.
+
+        This method is called during service client connection and allows plugins
+        to intercept or modify the connection process. Plugins can modify connection
+        parameters, add authentication, or provide custom connection logic.
+
+        Args:
+            config: The service connection configuration.
+
+        Returns:
+            The connected service client.
+        """
         return await self.next_client_plugin.connect_service_client(config)
 
 
diff --git a/temporalio/worker/_worker.py b/temporalio/worker/_worker.py
index 94342d0a1..30774c390 100644
--- a/temporalio/worker/_worker.py
+++ b/temporalio/worker/_worker.py
@@ -97,17 +97,63 @@ def _to_bridge(self) -> temporalio.bridge.worker.PollerBehavior:
 
 
 class Plugin:
+    """Base class for worker plugins that can intercept and modify worker behavior.
+
+    Plugins allow customization of worker creation and execution processes
+    through a chain of responsibility pattern. Each plugin can modify the worker
+    configuration or intercept worker execution.
+    """
+
     def name(self) -> str:
+        """Get the qualified name of this plugin. Can be overridden if desired to provide a more appropriate name.
+
+        Returns:
+            The fully qualified name of the plugin class (module.classname).
+        """
        return type(self).__module__ + "." 
+ type(self).__qualname__ def init_worker_plugin(self, next: Plugin) -> Plugin: + """Initialize this plugin in the plugin chain. + + This method sets up the chain of responsibility pattern by storing a reference + to the next plugin in the chain. It is called during worker creation to build + the plugin chain. + + Args: + next: The next plugin in the chain to delegate to. + + Returns: + This plugin instance for method chaining. + """ self.next_worker_plugin = next return self def on_create_worker(self, config: WorkerConfig) -> WorkerConfig: + """Hook called when creating a worker to allow modification of configuration. + + This method is called during worker creation and allows plugins to modify + the worker configuration before the worker is fully initialized. Plugins + can modify task queue names, adjust concurrency settings, add interceptors, + or change other worker settings. + + Args: + config: The worker configuration dictionary to potentially modify. + + Returns: + The modified worker configuration. + """ return self.next_worker_plugin.on_create_worker(config) async def run_worker(self, worker: Worker) -> None: + """Hook called when running a worker to allow interception of execution. + + This method is called when the worker is started and allows plugins to + intercept or wrap the worker execution. Plugins can add monitoring, + custom lifecycle management, or other execution-time behavior. + + Args: + worker: The worker instance to run. + """ await self.next_worker_plugin.run_worker(worker) @@ -224,6 +270,10 @@ def __init__( workflow_runner: Runner for workflows. unsandboxed_workflow_runner: Runner for workflows that opt-out of sandboxing. + plugins: Collection of plugins for this worker. Any plugins already + on the client that also implement :py:class:`temporalio.worker.Plugin` are + prepended to this list and should not be explicitly given here + to avoid running the plugin twice. interceptors: Collection of interceptors for this worker. Any interceptors already on the client that also implement :py:class:`Interceptor` are prepended to this list and should From e1dfe5cd38aa5ae37e1793c0de5005ae148d31d0 Mon Sep 17 00:00:00 2001 From: Tim Conley Date: Fri, 11 Jul 2025 11:15:05 -0700 Subject: [PATCH 8/8] Update readme --- README.md | 135 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 135 insertions(+) diff --git a/README.md b/README.md index a2b06271a..689a9133e 100644 --- a/README.md +++ b/README.md @@ -95,6 +95,9 @@ informal introduction to the features and their implementation. - [Worker Shutdown](#worker-shutdown) - [Testing](#testing-1) - [Nexus](#nexus) + - [Plugins](#plugins) + - [Client Plugins](#client-plugins) + - [Worker Plugins](#worker-plugins) - [Workflow Replay](#workflow-replay) - [Observability](#observability) - [Metrics](#metrics) @@ -1416,6 +1419,138 @@ https://github.com/temporalio/samples-python/tree/nexus/hello_nexus). ``` +### Plugins + +Plugins provide a way to extend and customize the behavior of Temporal clients and workers through a chain of +responsibility pattern. They allow you to intercept and modify client creation, service connections, worker +configuration, and worker execution. Common customizations may include but are not limited to: + +1. DataConverter +2. Activities +3. Workflows +4. Interceptors + +A single plugin class can implement both client and worker plugin interfaces to share common logic between both +contexts. When used with a client, it will automatically be propagated to any workers created with that client. 
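+
+As a quick illustration of those customization points, here is a minimal sketch of a single plugin that swaps the
+client's data converter and registers extra workflows and activities on the worker. The `MyPayloadConverter`,
+`MyLibraryWorkflow`, and `my_library_activity` names are hypothetical placeholders; the sections below walk through
+the client and worker hooks individually.
+
+```python
+import dataclasses
+
+import temporalio.client
+import temporalio.converter
+import temporalio.worker
+
+
+class MyLibraryPlugin(temporalio.client.Plugin, temporalio.worker.Plugin):
+    def on_create_client(
+        self, config: temporalio.client.ClientConfig
+    ) -> temporalio.client.ClientConfig:
+        # Swap in a custom payload converter (MyPayloadConverter is a
+        # hypothetical converter class defined elsewhere).
+        config["data_converter"] = dataclasses.replace(
+            temporalio.converter.DataConverter.default,
+            payload_converter_class=MyPayloadConverter,
+        )
+        return super().on_create_client(config)
+
+    def on_create_worker(
+        self, config: temporalio.worker.WorkerConfig
+    ) -> temporalio.worker.WorkerConfig:
+        # Register the workflows and activities this plugin ships with,
+        # alongside whatever the user already registered.
+        config["workflows"] = list(config.get("workflows") or []) + [MyLibraryWorkflow]
+        config["activities"] = list(config.get("activities") or []) + [
+            my_library_activity
+        ]
+        return super().on_create_worker(config)
+```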
+
+#### Client Plugins
+
+Client plugins can intercept and modify client configuration and service connections. They are useful for adding
+authentication, modifying connection parameters, or adding custom behavior during client creation.
+
+Here's an example of a client plugin that adds custom authentication:
+
+```python
+from temporalio.client import Client, ClientConfig, Plugin
+import temporalio.service
+
+class AuthenticationPlugin(Plugin):
+    def __init__(self, api_key: str):
+        self.api_key = api_key
+
+    def on_create_client(self, config: ClientConfig) -> ClientConfig:
+        # Modify client configuration
+        config["namespace"] = "my-secure-namespace"
+        return super().on_create_client(config)
+
+    async def connect_service_client(
+        self, config: temporalio.service.ConnectConfig
+    ) -> temporalio.service.ServiceClient:
+        # Add authentication to the connection
+        config.api_key = self.api_key
+        return await super().connect_service_client(config)
+
+# Use the plugin when connecting
+client = await Client.connect(
+    "my-server.com:7233",
+    plugins=[AuthenticationPlugin("my-api-key")]
+)
+```
+
+#### Worker Plugins
+
+Worker plugins can modify worker configuration and intercept worker execution. They are useful for adding monitoring,
+custom lifecycle management, or modifying worker settings.
+
+Here's an example of a worker plugin that adds custom monitoring:
+
+```python
+from temporalio.worker import Plugin, WorkerConfig, Worker
+import logging
+
+class MonitoringPlugin(Plugin):
+    def __init__(self):
+        self.logger = logging.getLogger(__name__)
+
+    def on_create_worker(self, config: WorkerConfig) -> WorkerConfig:
+        # Modify worker configuration
+        original_task_queue = config["task_queue"]
+        config["task_queue"] = f"monitored-{original_task_queue}"
+        self.logger.info(f"Worker created for task queue: {config['task_queue']}")
+        return super().on_create_worker(config)
+
+    async def run_worker(self, worker: Worker) -> None:
+        self.logger.info("Starting worker execution")
+        try:
+            await super().run_worker(worker)
+        finally:
+            self.logger.info("Worker execution completed")
+
+# Use the plugin when creating a worker
+worker = Worker(
+    client,
+    task_queue="my-task-queue",
+    workflows=[MyWorkflow],
+    activities=[my_activity],
+    plugins=[MonitoringPlugin()]
+)
+```
+
+For plugins that need to work with both clients and workers, you can implement both interfaces in a single class:
+
+```python
+from temporalio.client import Client, ClientConfig, Plugin as ClientPlugin
+from temporalio.worker import Plugin as WorkerPlugin, Worker, WorkerConfig
+
+class UnifiedPlugin(ClientPlugin, WorkerPlugin):
+    def on_create_client(self, config: ClientConfig) -> ClientConfig:
+        # Client-side customization
+        config["namespace"] = "unified-namespace"
+        return super().on_create_client(config)
+
+    def on_create_worker(self, config: WorkerConfig) -> WorkerConfig:
+        # Worker-side customization
+        config["max_cached_workflows"] = 500
+        return super().on_create_worker(config)
+
+    async def run_worker(self, worker: Worker) -> None:
+        print("Starting unified worker")
+        await super().run_worker(worker)
+
+# Create client with the unified plugin
+client = await Client.connect(
+    "localhost:7233",
+    plugins=[UnifiedPlugin()]
+)
+
+# Worker will automatically inherit the plugin from the client
+worker = Worker(
+    client,
+    task_queue="my-task-queue",
+    workflows=[MyWorkflow],
+    activities=[my_activity]
+)
+```
+
+**Important Notes:**
+
+- Plugins form a chain of responsibility: earlier plugins wrap later ones, so the first plugin's hooks run first and delegate to the next
+- Client plugins that also implement worker 
plugin interfaces are automatically propagated to workers +- Avoid providing the same plugin to both client and worker to prevent double execution +- Plugin methods should call `super()` to maintain the plugin chain +- Each plugin's `name()` method returns a unique identifier for debugging purposes + + ### Workflow Replay Given a workflow's history, it can be replayed locally to check for things like non-determinism errors. For example,