Skip to content

Commit cffd778

Browse files
Expose the WorkerClient trait (#924)
* Expose the `WorkerClient` trait # Motivation The `WorkerClient` trait encapsulates all the methods that the worker needs to make the underlying API calls. Currently, it is only exposed to the crate. However, it is useful to expose this publicly to allow other language clients to provide their native `WorkerClient` implementation e.g. by backing it with the language specific gRPC library. # Modifications This PR makes the `WorkerClient` trait and other types public. # Result Other language SDKs can now substitute in their gRPC client. * Format * Skip cloud test on forks properly --------- Co-authored-by: Spencer Judge <spencer@temporal.io>
1 parent ae85e4c commit cffd778

File tree

4 files changed

+58
-24
lines changed

4 files changed

+58
-24
lines changed

.github/workflows/per-pr.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ jobs:
107107
- run: cargo integ-test
108108

109109
cloud-tests:
110-
if: github.repository_owner == 'temporalio'
110+
if: github.event.pull_request.head.repo.full_name == '' || github.event.pull_request.head.repo.full_name == 'temporalio/sdk-core'
111111
name: Cloud tests
112112
env:
113113
TEMPORAL_CLOUD_ADDRESS: https://${{ vars.TEMPORAL_CLIENT_NAMESPACE }}.tmprl.cloud:7233

core/src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,10 @@ pub use worker::{
4747
WorkerConfigBuilder,
4848
};
4949

50+
/// Expose [WorkerClient] symbols
51+
pub use crate::worker::client::{
52+
PollActivityOptions, PollOptions, PollWorkflowOptions, WorkerClient, WorkflowTaskCompletion,
53+
};
5054
use crate::{
5155
replay::{HistoryForReplay, ReplayWorkerInput},
5256
telemetry::{

core/src/worker/client.rs

Lines changed: 48 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -120,97 +120,123 @@ impl WorkerClientBag {
120120
/// minimal mocking surface. Delegates to [WorkflowClientTrait] so see that for details.
121121
#[cfg_attr(test, mockall::automock)]
122122
#[async_trait::async_trait]
123-
pub(crate) trait WorkerClient: Sync + Send {
123+
pub trait WorkerClient: Sync + Send {
124+
/// Poll workflow tasks
124125
async fn poll_workflow_task(
125126
&self,
126127
poll_options: PollOptions,
127128
wf_options: PollWorkflowOptions,
128129
) -> Result<PollWorkflowTaskQueueResponse>;
130+
/// Poll activity tasks
129131
async fn poll_activity_task(
130132
&self,
131133
poll_options: PollOptions,
132134
act_options: PollActivityOptions,
133135
) -> Result<PollActivityTaskQueueResponse>;
136+
/// Poll Nexus tasks
134137
async fn poll_nexus_task(
135138
&self,
136139
poll_options: PollOptions,
137140
) -> Result<PollNexusTaskQueueResponse>;
141+
/// Complete a workflow task
138142
async fn complete_workflow_task(
139143
&self,
140144
request: WorkflowTaskCompletion,
141145
) -> Result<RespondWorkflowTaskCompletedResponse>;
146+
/// Complete an activity task
142147
async fn complete_activity_task(
143148
&self,
144149
task_token: TaskToken,
145150
result: Option<Payloads>,
146151
) -> Result<RespondActivityTaskCompletedResponse>;
152+
/// Complete a Nexus task
147153
async fn complete_nexus_task(
148154
&self,
149155
task_token: TaskToken,
150156
response: nexus::v1::Response,
151157
) -> Result<RespondNexusTaskCompletedResponse>;
158+
/// Record an activity heartbeat
152159
async fn record_activity_heartbeat(
153160
&self,
154161
task_token: TaskToken,
155162
details: Option<Payloads>,
156163
) -> Result<RecordActivityTaskHeartbeatResponse>;
164+
/// Cancel an activity task
157165
async fn cancel_activity_task(
158166
&self,
159167
task_token: TaskToken,
160168
details: Option<Payloads>,
161169
) -> Result<RespondActivityTaskCanceledResponse>;
170+
/// Fail an activity task
162171
async fn fail_activity_task(
163172
&self,
164173
task_token: TaskToken,
165174
failure: Option<Failure>,
166175
) -> Result<RespondActivityTaskFailedResponse>;
176+
/// Fail a workflow task
167177
async fn fail_workflow_task(
168178
&self,
169179
task_token: TaskToken,
170180
cause: WorkflowTaskFailedCause,
171181
failure: Option<Failure>,
172182
) -> Result<RespondWorkflowTaskFailedResponse>;
183+
/// Fail a Nexus task
173184
async fn fail_nexus_task(
174185
&self,
175186
task_token: TaskToken,
176187
error: nexus::v1::HandlerError,
177188
) -> Result<RespondNexusTaskFailedResponse>;
189+
/// Get the workflow execution history
178190
async fn get_workflow_execution_history(
179191
&self,
180192
workflow_id: String,
181193
run_id: Option<String>,
182194
page_token: Vec<u8>,
183195
) -> Result<GetWorkflowExecutionHistoryResponse>;
196+
/// Respond to a legacy query
184197
async fn respond_legacy_query(
185198
&self,
186199
task_token: TaskToken,
187200
query_result: QueryResult,
188201
) -> Result<RespondQueryTaskCompletedResponse>;
202+
/// Describe the namespace
189203
async fn describe_namespace(&self) -> Result<DescribeNamespaceResponse>;
204+
/// Shutdown the worker
190205
async fn shutdown_worker(&self, sticky_task_queue: String) -> Result<ShutdownWorkerResponse>;
191206

207+
/// Replace the underlying client
192208
fn replace_client(&self, new_client: RetryClient<Client>);
209+
/// Return server capabilities
193210
fn capabilities(&self) -> Option<Capabilities>;
211+
/// Return workers using this client
194212
fn workers(&self) -> Arc<SlotManager>;
213+
/// Indicates if this is a mock client
195214
fn is_mock(&self) -> bool;
196-
/// Return (sdk_name, sdk_version) from the underlying client configuration
215+
/// Return name and version of the SDK
197216
fn sdk_name_and_version(&self) -> (String, String);
198217
}
199218

219+
/// Configuration options shared by workflow, activity, and Nexus polling calls
200220
#[derive(Debug, Clone)]
201-
pub(crate) struct PollOptions {
202-
pub(crate) task_queue: String,
203-
pub(crate) no_retry: Option<NoRetryOnMatching>,
204-
pub(crate) timeout_override: Option<Duration>,
221+
pub struct PollOptions {
222+
/// The name of the task queue to poll
223+
pub task_queue: String,
224+
/// Prevents retrying on specific gRPC statuses
225+
pub no_retry: Option<NoRetryOnMatching>,
226+
/// Overrides the default RPC timeout for the poll request
227+
pub timeout_override: Option<Duration>,
205228
}
229+
/// Additional options specific to workflow task polling
206230
#[derive(Debug, Clone)]
207-
pub(crate) struct PollWorkflowOptions {
208-
// If true this will be a sticky poll
209-
pub(crate) sticky_queue_name: Option<String>,
231+
pub struct PollWorkflowOptions {
232+
/// Optional sticky queue name for session‐based workflow polling
233+
pub sticky_queue_name: Option<String>,
210234
}
235+
/// Additional options specific to activity task polling
211236
#[derive(Debug, Clone)]
212-
pub(crate) struct PollActivityOptions {
213-
pub(crate) max_tasks_per_sec: Option<f64>,
237+
pub struct PollActivityOptions {
238+
/// Optional rate limit (tasks per second) for activity polling
239+
pub max_tasks_per_sec: Option<f64>,
214240
}
215241

216242
#[async_trait::async_trait]
@@ -634,25 +660,25 @@ impl NamespacedClient for WorkerClientBag {
634660
/// A version of [RespondWorkflowTaskCompletedRequest] that will finish being filled out by the
635661
/// server client
636662
#[derive(Debug, Clone, PartialEq)]
637-
pub(crate) struct WorkflowTaskCompletion {
663+
pub struct WorkflowTaskCompletion {
638664
/// The task token that would've been received from polling for a workflow activation
639-
pub(crate) task_token: TaskToken,
665+
pub task_token: TaskToken,
640666
/// A list of new commands to send to the server, such as starting a timer.
641-
pub(crate) commands: Vec<Command>,
667+
pub commands: Vec<Command>,
642668
/// A list of protocol messages to send to the server.
643-
pub(crate) messages: Vec<ProtocolMessage>,
669+
pub messages: Vec<ProtocolMessage>,
644670
/// If set, indicate that next task should be queued on sticky queue with given attributes.
645-
pub(crate) sticky_attributes: Option<StickyExecutionAttributes>,
671+
pub sticky_attributes: Option<StickyExecutionAttributes>,
646672
/// Responses to queries in the `queries` field of the workflow task.
647-
pub(crate) query_responses: Vec<QueryResult>,
673+
pub query_responses: Vec<QueryResult>,
648674
/// Indicate that the task completion should return a new WFT if one is available
649-
pub(crate) return_new_workflow_task: bool,
675+
pub return_new_workflow_task: bool,
650676
/// Force a new WFT to be created after this completion
651-
pub(crate) force_create_new_workflow_task: bool,
677+
pub force_create_new_workflow_task: bool,
652678
/// SDK-specific metadata to send
653-
pub(crate) sdk_metadata: WorkflowTaskCompletedMetadata,
679+
pub sdk_metadata: WorkflowTaskCompletedMetadata,
654680
/// Metering info
655-
pub(crate) metering_metadata: MeteringMetadata,
681+
pub metering_metadata: MeteringMetadata,
656682
/// Versioning behavior of the workflow, if any.
657-
pub(crate) versioning_behavior: VersioningBehavior,
683+
pub versioning_behavior: VersioningBehavior,
658684
}

core/src/worker/mod.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,11 @@ impl WorkerTrait for Worker {
262262
}
263263

264264
impl Worker {
265-
pub(crate) fn new(
265+
/// Creates a new [Worker] from a [WorkerClient] instance with real task pollers and optional telemetry.
266+
///
267+
/// This is a convenience constructor that logs initialization and delegates to
268+
/// [Worker::new_with_pollers()] using [TaskPollers::Real].
269+
pub fn new(
266270
config: WorkerConfig,
267271
sticky_queue_name: Option<String>,
268272
client: Arc<dyn WorkerClient>,

0 commit comments

Comments
 (0)