Skip to content

Commit 15163c4

Browse files
authored
Add wft fail reason to legacy query responses (#950)
1 parent 29c89f2 commit 15163c4

File tree

35 files changed

+1823
-307
lines changed

35 files changed

+1823
-307
lines changed

client/src/raw.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1336,6 +1336,24 @@ proxier! {
13361336
r.extensions_mut().insert(labels);
13371337
}
13381338
);
1339+
(
1340+
list_workers,
1341+
ListWorkersRequest,
1342+
ListWorkersResponse,
1343+
|r| {
1344+
let labels = namespaced_request!(r);
1345+
r.extensions_mut().insert(labels);
1346+
}
1347+
);
1348+
(
1349+
record_worker_heartbeat,
1350+
RecordWorkerHeartbeatRequest,
1351+
RecordWorkerHeartbeatResponse,
1352+
|r| {
1353+
let labels = namespaced_request!(r);
1354+
r.extensions_mut().insert(labels);
1355+
}
1356+
);
13391357
}
13401358

13411359
proxier! {

core/src/core_tests/queries.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@ use crate::{
33
MockPollCfg, MocksHolder, ResponseType, WorkerExt, build_mock_pollers, canned_histories,
44
hist_to_poll_resp, mock_worker, single_hist_mock_sg,
55
},
6-
worker::{LEGACY_QUERY_ID, client::mocks::mock_workflow_client},
6+
worker::{
7+
LEGACY_QUERY_ID,
8+
client::{LegacyQueryResult, mocks::mock_workflow_client},
9+
},
710
};
811
use futures_util::stream;
912
use std::{
@@ -25,7 +28,7 @@ use temporal_sdk_core_protos::{
2528
},
2629
temporal::api::{
2730
common::v1::Payload,
28-
enums::v1::{CommandType, EventType},
31+
enums::v1::{CommandType, EventType, WorkflowTaskFailedCause},
2932
failure::v1::Failure,
3033
history::v1::{ActivityTaskCancelRequestedEventAttributes, History, history_event},
3134
query::v1::WorkflowQuery,
@@ -300,6 +303,12 @@ async fn query_failure_because_nondeterminism(#[values(true, false)] legacy: boo
300303
}];
301304
let mut mock = MockPollCfg::from_resp_batches(wfid, t, tasks, mock_workflow_client());
302305
if legacy {
306+
mock.expect_legacy_query_matcher = Box::new(|_, f| match f {
307+
LegacyQueryResult::Failed(f) => {
308+
f.force_cause() == WorkflowTaskFailedCause::NonDeterministicError
309+
}
310+
_ => false,
311+
});
303312
mock.num_expected_legacy_query_resps = 1;
304313
} else {
305314
mock.num_expected_fails = 1;

core/src/core_tests/workflow_tasks.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2886,6 +2886,7 @@ async fn use_compatible_version_flag(
28862886
VersioningIntent::Compatible => true,
28872887
VersioningIntent::Default => false,
28882888
};
2889+
#[allow(deprecated)]
28892890
mock_client
28902891
.expect_complete_workflow_task()
28912892
.returning(move |mut c| {

core/src/test_help/mod.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@ use crate::{
99
worker::{
1010
TaskPollers,
1111
client::{
12-
MockWorkerClient, WorkerClient, WorkflowTaskCompletion, mocks::mock_workflow_client,
12+
LegacyQueryResult, MockWorkerClient, WorkerClient, WorkflowTaskCompletion,
13+
mocks::mock_workflow_client,
1314
},
1415
},
1516
};
@@ -406,6 +407,9 @@ pub(crate) struct MockPollCfg {
406407
/// All calls to fail WFTs must match this predicate
407408
pub(crate) expect_fail_wft_matcher:
408409
Box<dyn Fn(&TaskToken, &WorkflowTaskFailedCause, &Option<Failure>) -> bool + Send>,
410+
/// All calls to legacy query responses must match this predicate
411+
pub(crate) expect_legacy_query_matcher:
412+
Box<dyn Fn(&TaskToken, &LegacyQueryResult) -> bool + Send>,
409413
pub(crate) completion_mock_fn: Option<Box<WFTCompletionMockFn>>,
410414
pub(crate) num_expected_completions: Option<TimesRange>,
411415
/// If being used with the Rust SDK, this is set true. It ensures pollers will not error out
@@ -428,6 +432,7 @@ impl MockPollCfg {
428432
num_expected_legacy_query_resps: 0,
429433
mock_client: mock_workflow_client(),
430434
expect_fail_wft_matcher: Box::new(|_, _, _| true),
435+
expect_legacy_query_matcher: Box::new(|_, _| true),
431436
completion_mock_fn: None,
432437
num_expected_completions: None,
433438
using_rust_sdk: false,
@@ -466,6 +471,7 @@ impl MockPollCfg {
466471
num_expected_legacy_query_resps: 0,
467472
mock_client,
468473
expect_fail_wft_matcher: Box::new(|_, _, _| true),
474+
expect_legacy_query_matcher: Box::new(|_, _| true),
469475
completion_mock_fn: None,
470476
num_expected_completions: None,
471477
using_rust_sdk: false,
@@ -709,6 +715,7 @@ pub(crate) fn build_mock_pollers(mut cfg: MockPollCfg) -> MocksHolder {
709715
let outstanding = outstanding_wf_task_tokens.clone();
710716
cfg.mock_client
711717
.expect_respond_legacy_query()
718+
.withf(cfg.expect_legacy_query_matcher)
712719
.times::<TimesRange>(cfg.num_expected_legacy_query_resps.into())
713720
.returning(move |tt, _| {
714721
outstanding.release_token(&tt);

core/src/worker/client.rs

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
//! Worker-specific client needs
22
33
pub(crate) mod mocks;
4+
use crate::protosext::legacy_query_failure;
45
use parking_lot::RwLock;
56
use std::{sync::Arc, time::Duration};
67
use temporal_client::{
@@ -10,7 +11,7 @@ use temporal_client::{
1011
use temporal_sdk_core_api::worker::WorkerVersioningStrategy;
1112
use temporal_sdk_core_protos::{
1213
TaskToken,
13-
coresdk::workflow_commands::QueryResult,
14+
coresdk::{workflow_commands::QueryResult, workflow_completion},
1415
temporal::api::{
1516
command::v1::Command,
1617
common::v1::{
@@ -34,6 +35,11 @@ use tonic::IntoRequest;
3435

3536
type Result<T, E = tonic::Status> = std::result::Result<T, E>;
3637

38+
pub enum LegacyQueryResult {
39+
Succeeded(QueryResult),
40+
Failed(workflow_completion::Failure),
41+
}
42+
3743
/// Contains everything a worker needs to interact with the server
3844
pub(crate) struct WorkerClientBag {
3945
replaceable_client: RwLock<RetryClient<Client>>,
@@ -197,7 +203,7 @@ pub trait WorkerClient: Sync + Send {
197203
async fn respond_legacy_query(
198204
&self,
199205
task_token: TaskToken,
200-
query_result: QueryResult,
206+
query_result: LegacyQueryResult,
201207
) -> Result<RespondQueryTaskCompletedResponse>;
202208
/// Describe the namespace
203209
async fn describe_namespace(&self) -> Result<DescribeNamespaceResponse>;
@@ -267,6 +273,7 @@ impl WorkerClient for WorkerClientBag {
267273
binary_checksum: self.binary_checksum(),
268274
worker_version_capabilities: self.worker_version_capabilities(),
269275
deployment_options: self.deployment_options(),
276+
worker_heartbeat: None,
270277
}
271278
.into_request();
272279
request.extensions_mut().insert(IsWorkerTaskLongPoll);
@@ -303,6 +310,7 @@ impl WorkerClient for WorkerClientBag {
303310
}),
304311
worker_version_capabilities: self.worker_version_capabilities(),
305312
deployment_options: self.deployment_options(),
313+
worker_heartbeat: None,
306314
}
307315
.into_request();
308316
request.extensions_mut().insert(IsWorkerTaskLongPoll);
@@ -335,6 +343,7 @@ impl WorkerClient for WorkerClientBag {
335343
identity: self.identity.clone(),
336344
worker_version_capabilities: self.worker_version_capabilities(),
337345
deployment_options: self.deployment_options(),
346+
worker_heartbeat: None,
338347
}
339348
.into_request();
340349
request.extensions_mut().insert(IsWorkerTaskLongPoll);
@@ -578,9 +587,20 @@ impl WorkerClient for WorkerClientBag {
578587
async fn respond_legacy_query(
579588
&self,
580589
task_token: TaskToken,
581-
query_result: QueryResult,
590+
query_result: LegacyQueryResult,
582591
) -> Result<RespondQueryTaskCompletedResponse> {
592+
let mut failure = None;
593+
let (query_result, cause) = match query_result {
594+
LegacyQueryResult::Succeeded(s) => (s, WorkflowTaskFailedCause::Unspecified),
595+
#[allow(deprecated)]
596+
LegacyQueryResult::Failed(f) => {
597+
let cause = f.force_cause();
598+
failure = f.failure.clone();
599+
(legacy_query_failure(f), cause)
600+
}
601+
};
583602
let (_, completed_type, query_result, error_message) = query_result.into_components();
603+
584604
Ok(self
585605
.cloned_client()
586606
.respond_query_task_completed(RespondQueryTaskCompletedRequest {
@@ -589,8 +609,8 @@ impl WorkerClient for WorkerClientBag {
589609
query_result,
590610
error_message,
591611
namespace: self.namespace.clone(),
592-
// TODO: https://github.com/temporalio/sdk-core/issues/867
593-
failure: None,
612+
failure,
613+
cause: cause.into(),
594614
})
595615
.await?
596616
.into_inner())
@@ -612,6 +632,7 @@ impl WorkerClient for WorkerClientBag {
612632
identity: self.identity.clone(),
613633
sticky_task_queue,
614634
reason: "graceful shutdown".to_string(),
635+
worker_heartbeat: None,
615636
};
616637

617638
Ok(

core/src/worker/client/mocks.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ mockall::mock! {
135135
fn respond_legacy_query<'a, 'b>(
136136
&self,
137137
task_token: TaskToken,
138-
query_result: QueryResult,
138+
query_result: LegacyQueryResult,
139139
) -> impl Future<Output = Result<RespondQueryTaskCompletedResponse>> + Send + 'b
140140
where 'a: 'b, Self: 'b;
141141

core/src/worker/workflow/machines/cancel_external_state_machine.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,10 +69,9 @@ pub(super) fn new_external_cancel(
6969
namespace: workflow_execution.namespace,
7070
workflow_id: workflow_execution.workflow_id,
7171
run_id: workflow_execution.run_id,
72-
// Apparently this is effectively deprecated at this point
73-
control: "".to_string(),
7472
child_workflow_only: only_child,
7573
reason,
74+
..Default::default()
7675
},
7776
);
7877
NewMachineWithCommand {

core/src/worker/workflow/machines/child_workflow_state_machine.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -714,7 +714,7 @@ impl WFMachinesAdapter for ChildWorkflowMachine {
714714
run_id: self.shared_state.run_id.clone(),
715715
child_workflow_only: true,
716716
reason,
717-
control: "".to_string(),
717+
..Default::default()
718718
}
719719
.into(),
720720
))

core/src/worker/workflow/machines/nexus_operation_state_machine.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -628,8 +628,8 @@ impl NexusOperationMachine {
628628
endpoint: self.shared_state.endpoint.clone(),
629629
service: self.shared_state.service.clone(),
630630
operation: self.shared_state.operation.clone(),
631-
operation_id: "".to_string(),
632631
operation_token: operation_token.clone().unwrap_or_default(),
632+
..Default::default()
633633
},
634634
)),
635635
..Default::default()

core/src/worker/workflow/machines/signal_external_state_machine.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,9 +104,8 @@ pub(super) fn new_external_signal(
104104
},
105105
signal_name: attrs.signal_name,
106106
input: attrs.args.into_payloads(),
107-
// Is deprecated
108-
control: "".to_string(),
109107
child_workflow_only: only_child,
108+
..Default::default()
110109
},
111110
);
112111
Ok(NewMachineWithCommand {

0 commit comments

Comments
 (0)