Skip to content

Initial rough framework for plugins #952

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 44 additions & 2 deletions temporalio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ async def connect(
runtime: Optional[temporalio.runtime.Runtime] = None,
http_connect_proxy_config: Optional[HttpConnectProxyConfig] = None,
header_codec_behavior: HeaderCodecBehavior = HeaderCodecBehavior.NO_CODEC,
plugins: Sequence[Plugin] = [],
) -> Client:
"""Connect to a Temporal server.

Expand Down Expand Up @@ -178,13 +179,21 @@ async def connect(
runtime=runtime,
http_connect_proxy_config=http_connect_proxy_config,
)

root_plugin: Plugin = _RootPlugin()
for plugin in reversed(list(plugins)):
root_plugin = plugin.init_client_plugin(root_plugin)

service_client = await root_plugin.connect_service_client(connect_config)

return Client(
await temporalio.service.ServiceClient.connect(connect_config),
service_client,
namespace=namespace,
data_converter=data_converter,
interceptors=interceptors,
default_workflow_query_reject_condition=default_workflow_query_reject_condition,
header_codec_behavior=header_codec_behavior,
plugins=plugins,
)

def __init__(
Expand All @@ -198,6 +207,7 @@ def __init__(
temporalio.common.QueryRejectCondition
] = None,
header_codec_behavior: HeaderCodecBehavior = HeaderCodecBehavior.NO_CODEC,
plugins: Sequence[Plugin] = [],
):
"""Create a Temporal client from a service client.

Expand All @@ -209,15 +219,22 @@ def __init__(
self._impl = interceptor.intercept_client(self._impl)

# Store the config for tracking
self._config = ClientConfig(
config = ClientConfig(
service_client=service_client,
namespace=namespace,
data_converter=data_converter,
interceptors=interceptors,
default_workflow_query_reject_condition=default_workflow_query_reject_condition,
header_codec_behavior=header_codec_behavior,
plugins=plugins,
)

root_plugin: Plugin = _RootPlugin()
for plugin in reversed(list(plugins)):
root_plugin = plugin.init_client_plugin(root_plugin)

self._config = root_plugin.on_create_client(config)

def config(self) -> ClientConfig:
"""Config, as a dictionary, used to create this client.

Expand Down Expand Up @@ -1510,6 +1527,7 @@ class ClientConfig(TypedDict, total=False):
Optional[temporalio.common.QueryRejectCondition]
]
header_codec_behavior: Required[HeaderCodecBehavior]
plugins: Required[Sequence[Plugin]]


class WorkflowHistoryEventFilterType(IntEnum):
Expand Down Expand Up @@ -7367,3 +7385,27 @@ async def _decode_user_metadata(
if not metadata.HasField("details")
else (await converter.decode([metadata.details]))[0],
)


class Plugin:
def init_client_plugin(self, next: Plugin) -> Plugin:
self.next_client_plugin = next
return self

def on_create_client(self, config: ClientConfig) -> ClientConfig:
return self.next_client_plugin.on_create_client(config)

async def connect_service_client(
self, config: temporalio.service.ConnectConfig
) -> temporalio.service.ServiceClient:
return await self.next_client_plugin.connect_service_client(config)


class _RootPlugin(Plugin):
def on_create_client(self, config: ClientConfig) -> ClientConfig:
return config

async def connect_service_client(
self, config: temporalio.service.ConnectConfig
) -> temporalio.service.ServiceClient:
return await temporalio.service.ServiceClient.connect(config)
2 changes: 2 additions & 0 deletions temporalio/worker/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
WorkflowSlotInfo,
)
from ._worker import (
Plugin,
PollerBehavior,
PollerBehaviorAutoscaling,
PollerBehaviorSimpleMaximum,
Expand Down Expand Up @@ -78,6 +79,7 @@
"ActivityOutboundInterceptor",
"WorkflowInboundInterceptor",
"WorkflowOutboundInterceptor",
"Plugin",
# Interceptor input
"ContinueAsNewInput",
"ExecuteActivityInput",
Expand Down
39 changes: 38 additions & 1 deletion temporalio/worker/_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,26 @@ def _to_bridge(self) -> temporalio.bridge.worker.PollerBehavior:
]


class Plugin:
def init_worker_plugin(self, next: Plugin) -> Plugin:
self.next_worker_plugin = next
return self

def on_create_worker(self, config: WorkerConfig) -> WorkerConfig:
return self.next_worker_plugin.on_create_worker(config)

async def run_worker(self, worker: Worker) -> None:
await self.next_worker_plugin.run_worker(worker)


class _RootPlugin(Plugin):
def on_create_worker(self, config: WorkerConfig) -> WorkerConfig:
return config

async def run_worker(self, worker: Worker) -> None:
await worker._run()


class Worker:
"""Worker to process workflows and/or activities.

Expand Down Expand Up @@ -153,6 +173,7 @@ def __init__(
nexus_task_poller_behavior: PollerBehavior = PollerBehaviorSimpleMaximum(
maximum=5
),
plugins: Sequence[Plugin] = [],
) -> None:
"""Create a worker to process workflows and/or activities.

Expand Down Expand Up @@ -343,11 +364,17 @@ def __init__(
)
interceptors = interceptors_from_client + list(interceptors)

plugins_from_client = cast(
List[Plugin], [p for p in client_config["plugins"] if isinstance(p, Plugin)]
)
plugins = plugins_from_client + list(plugins)
print(plugins)

# Extract the bridge service client
bridge_client = _extract_bridge_client_for_worker(client)

# Store the config for tracking
self._config = WorkerConfig(
config = WorkerConfig(
client=client,
task_queue=task_queue,
activities=activities,
Expand Down Expand Up @@ -382,6 +409,13 @@ def __init__(
use_worker_versioning=use_worker_versioning,
disable_safe_workflow_eviction=disable_safe_workflow_eviction,
)

root_plugin: Plugin = _RootPlugin()
for plugin in reversed(list(plugins)):
root_plugin = plugin.init_worker_plugin(root_plugin)
self._config = root_plugin.on_create_worker(config)
self._plugin = root_plugin

self._started = False
self._shutdown_event = asyncio.Event()
self._shutdown_complete_event = asyncio.Event()
Expand Down Expand Up @@ -646,6 +680,9 @@ async def run(self) -> None:
also cancel the shutdown process. Therefore users are encouraged to use
explicit shutdown instead.
"""
await self._plugin.run_worker(self)

async def _run(self):
# Eagerly validate which will do a namespace check in Core
await self._bridge_worker.validate()

Expand Down
29 changes: 29 additions & 0 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,11 @@
BuildIdOpPromoteSetByBuildId,
CancelWorkflowInput,
Client,
ClientConfig,
CloudOperationsClient,
Interceptor,
OutboundInterceptor,
Plugin,
QueryWorkflowInput,
RPCError,
RPCStatusCode,
Expand Down Expand Up @@ -1499,3 +1501,30 @@ async def test_cloud_client_simple():
GetNamespaceRequest(namespace=os.environ["TEMPORAL_CLIENT_CLOUD_NAMESPACE"])
)
assert os.environ["TEMPORAL_CLIENT_CLOUD_NAMESPACE"] == result.namespace.namespace


class MyPlugin(Plugin):
def on_create_client(self, config: ClientConfig) -> ClientConfig:
config["namespace"] = "replaced_namespace"
return super().on_create_client(config)

async def connect_service_client(
self, config: temporalio.service.ConnectConfig
) -> temporalio.service.ServiceClient:
config.api_key = "replaced key"
return await super().connect_service_client(config)


async def test_client_plugin(client: Client, env: WorkflowEnvironment):
if env.supports_time_skipping:
pytest.skip("Client connect is only designed for local")

config = client.config()
config["plugins"] = [MyPlugin()]
new_client = Client(**config)
assert new_client.namespace == "replaced_namespace"

new_client = await Client.connect(
client.service_client.config.target_host, plugins=[MyPlugin()]
)
assert new_client.service_client.config.api_key == "replaced key"
52 changes: 50 additions & 2 deletions tests/worker/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@

import asyncio
import concurrent.futures
import sys
import uuid
from datetime import timedelta
from typing import Any, Awaitable, Callable, Optional, Sequence
from urllib.request import urlopen

import temporalio.api.enums.v1
import temporalio.client
import temporalio.worker._worker
from temporalio import activity, workflow
from temporalio.api.workflowservice.v1 import (
Expand All @@ -19,7 +19,11 @@
SetWorkerDeploymentRampingVersionRequest,
SetWorkerDeploymentRampingVersionResponse,
)
from temporalio.client import BuildIdOpAddNewDefault, Client, TaskReachabilityType
from temporalio.client import (
BuildIdOpAddNewDefault,
Client,
TaskReachabilityType,
)
from temporalio.common import PinnedVersioningOverride, RawValue, VersioningBehavior
from temporalio.runtime import PrometheusConfig, Runtime, TelemetryConfig
from temporalio.service import RPCError
Expand All @@ -38,6 +42,7 @@
SlotReleaseContext,
SlotReserveContext,
Worker,
WorkerConfig,
WorkerDeploymentConfig,
WorkerDeploymentVersion,
WorkerTuner,
Expand Down Expand Up @@ -1184,3 +1189,46 @@ def shutdown(self) -> None:
if self.next_exception_task:
self.next_exception_task.cancel()
setattr(self.worker._bridge_worker, self.attr, self.orig_poll_call)


class MyCombinedPlugin(temporalio.client.Plugin, temporalio.worker.Plugin):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the value to users of this multiple inheritance approach? It seems to be a discoverability loss--knowing you can add a second base class is not a leap I'd expect most people to make.
I'm also wondering how a plugin author will register the name/version of their plugin in this scheme.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would say a few things:

  1. It does inform a user of the plugin where it is intended to be used. This could be handled via documentation, but so can the discoverability loss.
  2. It's consistent with our existing layout for Interceptors. Should there also only be one kind of intercepter which you use everywhere? So that you can discover what is interceptable. If you think so, is it worth differing our design between them since that ship has sailed.

@cretz probably has thoughts here.

Name/version aren't present, but easy to add a function(s) to the Plugin classes which has to be implemented. I'll bring this up to the team as well.

Copy link
Member

@cretz cretz Jul 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Basically what @tconley1428 said. The main benefit is separating what you're plugging into, same as interceptors (and has consistency with interceptors on this front for those familiar). But it's not a big deal, we can make temporalio.common.Plugin that combines client and worker concerns if we want, we've just traditionally tried to separate client and worker abstractions. We don't do worker stuff outside of the worker module for example, which is a logical approach.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does inform a user of the plugin where it is intended to be used.

This is redundantly shown by the names of the methods and the types of those methods, e.g. on_create_worker.

It's consistent with our existing layout for Interceptors

The purpose of a plugin is different from an interceptor. The whole point of a plugin is to bring together disparate things (interceptors, activities, etc) under one roof. The two purposes of this are (a) to help users easily add and (b) to give a roadmap for framework integrators to know what they can/need to do to. It's counterproductive to try to separate them out at this layer.

But it's not a big deal, we can make temporalio.common.Plugin that combines client and worker concerns if we want

👍, I would be in favor of temporalio.common.Plugin. If you want to make that multiply inherit, I don't mind if there's a good reason and as long as the samples all use the common.Plugin.

The main benefit is separating what you're plugging into

I'm not following the root reason here.

def on_create_worker(self, config: WorkerConfig) -> WorkerConfig:
print("Create worker combined plugin")
config["task_queue"] = "combined"
return super().on_create_worker(config)


class MyWorkerPlugin(temporalio.worker.Plugin):
def on_create_worker(self, config: WorkerConfig) -> WorkerConfig:
print("Create worker worker plugin")
config["task_queue"] = "replaced_queue"
return super().on_create_worker(config)

async def run_worker(self, worker: Worker) -> None:
await super().run_worker(worker)


async def test_worker_plugin(client: Client) -> None:
worker = Worker(
client,
task_queue="queue",
activities=[never_run_activity],
plugins=[MyWorkerPlugin()],
)
assert worker.config().get("task_queue") == "replaced_queue"

# Test client plugin propagation to worker plugins
new_config = client.config()
new_config["plugins"] = [MyCombinedPlugin()]
client = Client(**new_config)
worker = Worker(client, task_queue="queue", activities=[never_run_activity])
assert worker.config().get("task_queue") == "combined"

# Test both. Client propagated plugins are called first, so the worker plugin overrides in this case
worker = Worker(
client,
task_queue="queue",
activities=[never_run_activity],
plugins=[MyWorkerPlugin()],
)
assert worker.config().get("task_queue") == "replaced_queue"
Loading