Skip to content

Commit 0687151

Browse files
authored
Worker client replacement (#517)
Fixes #513
1 parent 0bb94f8 commit 0687151

File tree

5 files changed

+137
-34
lines changed

5 files changed

+137
-34
lines changed

temporalio/bridge/sdk-core

Submodule sdk-core updated 83 files

temporalio/bridge/src/worker.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,13 @@ impl WorkerRef {
184184
Ok(())
185185
}
186186

187+
/// Replace the client used by the underlying core worker.
///
/// Panics (via `expect`) if the worker has already been taken/finalized.
fn replace_client(&self, client: &client::ClientRef) {
    let worker = self.worker.as_ref().expect("missing worker");
    worker.replace_client(client.retry_client.clone().into_inner());
}
193+
187194
fn initiate_shutdown(&self) -> PyResult<()> {
188195
let worker = self.worker.as_ref().unwrap().clone();
189196
worker.initiate_shutdown();

temporalio/bridge/worker.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,10 @@ def request_workflow_eviction(self, run_id: str) -> None:
132132
"""Request a workflow be evicted."""
133133
self._ref.request_workflow_eviction(run_id)
134134

135+
def replace_client(self, client: temporalio.bridge.client.Client) -> None:
    """Replace the worker client.

    Hands the new client's underlying bridge reference to the Rust-side
    worker so subsequent calls use it.
    """
    self._ref.replace_client(client._ref)
138+
135139
def initiate_shutdown(self) -> None:
    """Start shutdown of the worker.

    Delegates to the Rust-side worker reference; this begins shutdown but
    does not itself wait for it to complete.
    """
    self._ref.initiate_shutdown()

temporalio/worker/_worker.py

Lines changed: 64 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -221,23 +221,8 @@ def __init__(
221221
)
222222
interceptors = interceptors_from_client + list(interceptors)
223223

224-
# Extract the bridge service client. We try the service on the client
225-
# first, then we support a worker_service_client on the client's service
226-
# to return underlying service client we can use.
227-
bridge_client: temporalio.service._BridgeServiceClient
228-
if isinstance(client.service_client, temporalio.service._BridgeServiceClient):
229-
bridge_client = client.service_client
230-
elif hasattr(client.service_client, "worker_service_client"):
231-
bridge_client = client.service_client.worker_service_client
232-
if not isinstance(bridge_client, temporalio.service._BridgeServiceClient):
233-
raise TypeError(
234-
"Client's worker_service_client cannot be used for a worker"
235-
)
236-
else:
237-
raise TypeError(
238-
"Client cannot be used for a worker. "
239-
+ "Use the original client's service or set worker_service_client on the wrapped service with the original service client."
240-
)
224+
# Extract the bridge service client
225+
bridge_client = _extract_bridge_client_for_worker(client)
241226

242227
# Store the config for tracking
243228
self._config = WorkerConfig(
@@ -283,7 +268,9 @@ def __init__(
283268

284269
# Create activity and workflow worker
285270
self._activity_worker: Optional[_ActivityWorker] = None
286-
runtime = bridge_client.config.runtime or temporalio.runtime.Runtime.default()
271+
self._runtime = (
272+
bridge_client.config.runtime or temporalio.runtime.Runtime.default()
273+
)
287274
if activities:
288275
# Issue warning here if executor max_workers is lower than max
289276
# concurrent activities. We do this here instead of in
@@ -304,7 +291,7 @@ def __init__(
304291
shared_state_manager=shared_state_manager,
305292
data_converter=client_config["data_converter"],
306293
interceptors=interceptors,
307-
metric_meter=runtime.metric_meter,
294+
metric_meter=self._runtime.metric_meter,
308295
)
309296
self._workflow_worker: Optional[_WorkflowWorker] = None
310297
if workflows:
@@ -321,30 +308,20 @@ def __init__(
321308
workflow_failure_exception_types=workflow_failure_exception_types,
322309
debug_mode=debug_mode,
323310
disable_eager_activity_execution=disable_eager_activity_execution,
324-
metric_meter=runtime.metric_meter,
311+
metric_meter=self._runtime.metric_meter,
325312
on_eviction_hook=None,
326313
disable_safe_eviction=disable_safe_workflow_eviction,
327314
)
328315

329-
# We need an already connected client
330-
# TODO(cretz): How to connect to client inside constructor here? In the
331-
# meantime, we disallow lazy clients from being used for workers. We
332-
# could check whether the connected client is present which means
333-
# lazy-but-already-connected clients would work, but that is confusing
334-
# to users that the client only works if they already made a call on it.
335-
if bridge_client.config.lazy:
336-
raise RuntimeError("Lazy clients cannot be used for workers")
337-
raw_bridge_client = bridge_client._bridge_client
338-
assert raw_bridge_client
339-
340316
# Create bridge worker last. We have empirically observed that if it is
341317
# created before an error is raised from the activity worker
342318
# constructor, a deadlock/hang will occur presumably while trying to
343319
# free it.
344320
# TODO(cretz): Why does this cause a test hang when an exception is
345321
# thrown after it?
322+
assert bridge_client._bridge_client
346323
self._bridge_worker = temporalio.bridge.worker.Worker.create(
347-
raw_bridge_client,
324+
bridge_client._bridge_client,
348325
temporalio.bridge.worker.WorkerConfig(
349326
namespace=client.namespace,
350327
task_queue=task_queue,
@@ -403,6 +380,29 @@ def task_queue(self) -> str:
403380
"""Task queue this worker is on."""
404381
return self._config["task_queue"]
405382

383+
@property
def client(self) -> temporalio.client.Client:
    """Client currently set on the worker."""
    # The config dict is the single source of truth for the active client.
    return self._config["client"]
387+
388+
@client.setter
def client(self, value: temporalio.client.Client) -> None:
    """Update the client associated with the worker.

    Changing the client will make sure the worker starts using it for the
    next calls it makes. However, outstanding client calls will still
    complete with the existing client. The new client cannot be "lazy" and
    must be using the same runtime as the current client.

    Raises:
        TypeError: If the client cannot back a worker.
        RuntimeError: If the client is lazy.
        ValueError: If the new client is not on the same runtime as the
            existing client.
    """
    bridge_client = _extract_bridge_client_for_worker(value)
    # Normalize an unset runtime to the default, mirroring how the worker
    # resolved self._runtime at construction. Without this, a new client
    # built on the default runtime (config.runtime is None) would fail the
    # identity check below even though it shares the worker's runtime.
    new_runtime = (
        bridge_client.config.runtime or temporalio.runtime.Runtime.default()
    )
    if self._runtime is not new_runtime:
        raise ValueError(
            "New client is not on the same runtime as the existing client"
        )
    assert bridge_client._bridge_client
    self._bridge_worker.replace_client(bridge_client._bridge_client)
    self._config["client"] = value
405+
406406
@property
407407
def is_running(self) -> bool:
408408
"""Whether the worker is running.
@@ -714,5 +714,37 @@ def _get_module_code(mod_name: str) -> Optional[bytes]:
714714
return None
715715

716716

717+
def _extract_bridge_client_for_worker(
    client: temporalio.client.Client,
) -> temporalio.service._BridgeServiceClient:
    """Locate the bridge service client backing the given client.

    The client's own service client is tried first; failing that, a
    ``worker_service_client`` attribute on it is consulted.

    Raises:
        TypeError: If no usable bridge service client can be found.
        RuntimeError: If the resulting bridge client is lazy.
    """
    service = client.service_client
    if isinstance(service, temporalio.service._BridgeServiceClient):
        bridge_client = service
    elif hasattr(service, "worker_service_client"):
        candidate = service.worker_service_client
        if not isinstance(candidate, temporalio.service._BridgeServiceClient):
            raise TypeError(
                "Client's worker_service_client cannot be used for a worker"
            )
        bridge_client = candidate
    else:
        raise TypeError(
            "Client cannot be used for a worker. "
            + "Use the original client's service or set worker_service_client on the wrapped service with the original service client."
        )

    # We need an already connected client
    # TODO(cretz): How to connect to client inside Worker constructor here? In
    # the meantime, we disallow lazy clients from being used for workers. We
    # could check whether the connected client is present which means
    # lazy-but-already-connected clients would work, but that is confusing
    # to users that the client only works if they already made a call on it.
    if bridge_client.config.lazy:
        raise RuntimeError("Lazy clients cannot be used for workers")
    return bridge_client
747+
748+
717749
class _ShutdownRequested(RuntimeError):
718750
pass

tests/worker/test_workflow.py

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4506,8 +4506,68 @@ async def test_workflow_fail_on_bad_input(client: Client):
45064506
await client.execute_workflow(
45074507
"FailOnBadInputWorkflow",
45084508
123,
4509-
id=f"wf-{uuid}",
4509+
id=f"wf-{uuid.uuid4()}",
45104510
task_queue=worker.task_queue,
45114511
)
45124512
assert isinstance(err.value.cause, ApplicationError)
45134513
assert "Failed decoding arguments" in err.value.cause.message
4514+
4515+
4516+
@workflow.defn
class TickingWorkflow:
    """Workflow that sleeps repeatedly so workflow tasks complete often."""

    @workflow.run
    async def run(self) -> None:
        # Just tick every 100ms for 10s
        for _ in range(100):
            await asyncio.sleep(0.1)
4523+
4524+
4525+
async def test_workflow_replace_worker_client(client: Client, env: WorkflowEnvironment):
    """Verify a running worker picks up a replaced client.

    Requires two real servers, so it is skipped under time skipping.
    """
    if env.supports_time_skipping:
        pytest.skip("Only testing against two real servers")
    # We are going to start a second ephemeral server and then replace the
    # client. So we will start a no-cache ticking workflow with the current
    # client and confirm it has accomplished at least one task. Then we will
    # start another on the other client, and confirm it gets started too. Then
    # we will terminate both. We have to use a ticking workflow with only one
    # poller to force a quick re-poll to recognize our client change quickly (as
    # opposed to just waiting the minute for poll timeout).
    async with await WorkflowEnvironment.start_local() as other_env:
        # Start both workflows on different servers, but on the same task queue
        task_queue = f"tq-{uuid.uuid4()}"
        handle1 = await client.start_workflow(
            TickingWorkflow.run, id=f"wf-{uuid.uuid4()}", task_queue=task_queue
        )
        handle2 = await other_env.client.start_workflow(
            TickingWorkflow.run, id=f"wf-{uuid.uuid4()}", task_queue=task_queue
        )

        async def any_task_completed(handle: WorkflowHandle) -> bool:
            # True iff the workflow's history shows at least one completed task
            async for e in handle.fetch_history_events():
                if e.HasField("workflow_task_completed_event_attributes"):
                    return True
            return False

        # Now start the worker on the first env
        async with Worker(
            client,
            task_queue=task_queue,
            workflows=[TickingWorkflow],
            max_cached_workflows=0,
            max_concurrent_workflow_task_polls=1,
        ) as worker:
            # Confirm the first ticking workflow has completed a task but not
            # the second
            await assert_eq_eventually(True, lambda: any_task_completed(handle1))
            assert not await any_task_completed(handle2)

            # Now replace the client, which should be used fairly quickly
            # because we should have timer-done poll completions every 100ms
            worker.client = other_env.client

            # Now confirm the other workflow has started
            await assert_eq_eventually(True, lambda: any_task_completed(handle2))

            # Terminate both
            await handle1.terminate()
            await handle2.terminate()

0 commit comments

Comments
 (0)