-
Notifications
You must be signed in to change notification settings - Fork 98
Worker client replacement #517
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -221,23 +221,8 @@ def __init__( | |
) | ||
interceptors = interceptors_from_client + list(interceptors) | ||
|
||
# Extract the bridge service client. We try the service on the client | ||
# first, then we support a worker_service_client on the client's service | ||
# to return underlying service client we can use. | ||
bridge_client: temporalio.service._BridgeServiceClient | ||
if isinstance(client.service_client, temporalio.service._BridgeServiceClient): | ||
bridge_client = client.service_client | ||
elif hasattr(client.service_client, "worker_service_client"): | ||
bridge_client = client.service_client.worker_service_client | ||
if not isinstance(bridge_client, temporalio.service._BridgeServiceClient): | ||
raise TypeError( | ||
"Client's worker_service_client cannot be used for a worker" | ||
) | ||
else: | ||
raise TypeError( | ||
"Client cannot be used for a worker. " | ||
+ "Use the original client's service or set worker_service_client on the wrapped service with the original service client." | ||
) | ||
# Extract the bridge service client | ||
bridge_client = _extract_bridge_client_for_worker(client) | ||
|
||
# Store the config for tracking | ||
self._config = WorkerConfig( | ||
|
@@ -283,7 +268,9 @@ def __init__( | |
|
||
# Create activity and workflow worker | ||
self._activity_worker: Optional[_ActivityWorker] = None | ||
runtime = bridge_client.config.runtime or temporalio.runtime.Runtime.default() | ||
self._runtime = ( | ||
bridge_client.config.runtime or temporalio.runtime.Runtime.default() | ||
) | ||
if activities: | ||
# Issue warning here if executor max_workers is lower than max | ||
# concurrent activities. We do this here instead of in | ||
|
@@ -304,7 +291,7 @@ def __init__( | |
shared_state_manager=shared_state_manager, | ||
data_converter=client_config["data_converter"], | ||
interceptors=interceptors, | ||
metric_meter=runtime.metric_meter, | ||
metric_meter=self._runtime.metric_meter, | ||
) | ||
self._workflow_worker: Optional[_WorkflowWorker] = None | ||
if workflows: | ||
|
@@ -321,30 +308,20 @@ def __init__( | |
workflow_failure_exception_types=workflow_failure_exception_types, | ||
debug_mode=debug_mode, | ||
disable_eager_activity_execution=disable_eager_activity_execution, | ||
metric_meter=runtime.metric_meter, | ||
metric_meter=self._runtime.metric_meter, | ||
on_eviction_hook=None, | ||
disable_safe_eviction=disable_safe_workflow_eviction, | ||
) | ||
|
||
# We need an already connected client | ||
# TODO(cretz): How to connect to client inside constructor here? In the | ||
# meantime, we disallow lazy clients from being used for workers. We | ||
# could check whether the connected client is present which means | ||
# lazy-but-already-connected clients would work, but that is confusing | ||
# to users that the client only works if they already made a call on it. | ||
if bridge_client.config.lazy: | ||
raise RuntimeError("Lazy clients cannot be used for workers") | ||
raw_bridge_client = bridge_client._bridge_client | ||
assert raw_bridge_client | ||
|
||
# Create bridge worker last. We have empirically observed that if it is | ||
# created before an error is raised from the activity worker | ||
# constructor, a deadlock/hang will occur presumably while trying to | ||
# free it. | ||
# TODO(cretz): Why does this cause a test hang when an exception is | ||
# thrown after it? | ||
assert bridge_client._bridge_client | ||
self._bridge_worker = temporalio.bridge.worker.Worker.create( | ||
raw_bridge_client, | ||
bridge_client._bridge_client, | ||
temporalio.bridge.worker.WorkerConfig( | ||
namespace=client.namespace, | ||
task_queue=task_queue, | ||
|
@@ -403,6 +380,29 @@ def task_queue(self) -> str: | |
"""Task queue this worker is on.""" | ||
return self._config["task_queue"] | ||
|
||
@property | ||
def client(self) -> temporalio.client.Client: | ||
"""Client currently set on the worker.""" | ||
return self._config["client"] | ||
|
||
@client.setter | ||
def client(self, value: temporalio.client.Client) -> None: | ||
"""Update the client associated with the worker. | ||
|
||
Changing the client will make sure the worker starts using it for the | ||
next calls it makes. However, outstanding client calls will still | ||
complete with the existing client. The new client cannot be "lazy" and | ||
must be using the same runtime as the current client. | ||
""" | ||
bridge_client = _extract_bridge_client_for_worker(value) | ||
if self._runtime is not bridge_client.config.runtime: | ||
raise ValueError( | ||
"New client is not on the same runtime as the existing client" | ||
) | ||
assert bridge_client._bridge_client | ||
self._bridge_worker.replace_client(bridge_client._bridge_client) | ||
self._config["client"] = value | ||
|
||
@property | ||
def is_running(self) -> bool: | ||
"""Whether the worker is running. | ||
|
@@ -714,5 +714,37 @@ def _get_module_code(mod_name: str) -> Optional[bytes]: | |
return None | ||
|
||
|
||
def _extract_bridge_client_for_worker( | ||
client: temporalio.client.Client, | ||
) -> temporalio.service._BridgeServiceClient: | ||
# Extract the bridge service client. We try the service on the client first, | ||
# then we support a worker_service_client on the client's service to return | ||
# underlying service client we can use. | ||
bridge_client: temporalio.service._BridgeServiceClient | ||
if isinstance(client.service_client, temporalio.service._BridgeServiceClient): | ||
bridge_client = client.service_client | ||
elif hasattr(client.service_client, "worker_service_client"): | ||
bridge_client = client.service_client.worker_service_client | ||
if not isinstance(bridge_client, temporalio.service._BridgeServiceClient): | ||
raise TypeError( | ||
"Client's worker_service_client cannot be used for a worker" | ||
) | ||
else: | ||
raise TypeError( | ||
"Client cannot be used for a worker. " | ||
+ "Use the original client's service or set worker_service_client on the wrapped service with the original service client." | ||
) | ||
|
||
# We need an already connected client | ||
# TODO(cretz): How to connect to client inside Worker constructor here? In | ||
# the meantime, we disallow lazy clients from being used for workers. We | ||
# could check whether the connected client is present which means | ||
# lazy-but-already-connected clients would work, but that is confusing | ||
# to users that the client only works if they already made a call on it. | ||
if bridge_client.config.lazy: | ||
raise RuntimeError("Lazy clients cannot be used for workers") | ||
Comment on lines
+744
to
+745
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seems a little surprising for someone to run into but the structure does make this annoying to deal with. I guess you would have to asyncify everything from here and up which would also suck. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 This check and TODO has always been there, just moved to a separate methods. But yeah, technically we allow people to instantiate unconnected clients and this just prevents those from being used with workers. |
||
return bridge_client | ||
|
||
|
||
class _ShutdownRequested(RuntimeError): | ||
pass |
Uh oh!
There was an error while loading. Please reload this page.