Skip to content

Commit 764a88b

Browse files
authored
Enable failing WF on nondeterminism errors (#702)
1 parent 8013c5d commit 764a88b

File tree

10 files changed

+219
-47
lines changed

10 files changed

+219
-47
lines changed

.github/workflows/per-pr.yml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ concurrency:
1313
jobs:
1414
build-and-test:
1515
name: "Format, docs, and lint"
16-
timeout-minutes: 20
16+
timeout-minutes: 10
1717
runs-on: ubuntu-latest
1818
steps:
1919
- uses: actions/checkout@v2
@@ -48,6 +48,7 @@ jobs:
4848

4949
test:
5050
name: Unit Tests
51+
timeout-minutes: 10
5152
runs-on: ubuntu-latest
5253
steps:
5354
- uses: actions/checkout@v2
@@ -77,6 +78,7 @@ jobs:
7778

7879
fmt:
7980
name: Integ tests
81+
timeout-minutes: 20
8082
runs-on: ubuntu-latest
8183
steps:
8284
- uses: actions/checkout@v2

core-api/src/errors.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,3 +60,11 @@ pub enum CompleteActivityError {
6060
completion: Option<ActivityExecutionResult>,
6161
},
6262
}
63+
64+
/// Errors we can encounter during workflow processing which we may treat as either WFT failures
65+
/// or whole-workflow failures depending on user preference.
66+
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
67+
pub enum WorkflowErrorType {
68+
/// A nondeterminism error
69+
Nondeterminism,
70+
}

core-api/src/worker.rs

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
1-
use std::time::Duration;
1+
use crate::errors::WorkflowErrorType;
2+
use std::{
3+
collections::{HashMap, HashSet},
4+
time::Duration,
5+
};
26

37
const MAX_OUTSTANDING_WFT_DEFAULT: usize = 100;
48
const MAX_CONCURRENT_WFT_POLLS_DEFAULT: usize = 5;
@@ -124,6 +128,16 @@ pub struct WorkerConfig {
124128
/// timeout.
125129
#[builder(default = "Duration::from_secs(5)")]
126130
pub local_timeout_buffer_for_activities: Duration,
131+
132+
/// Any error types listed here will cause any workflow being processed by this worker to fail,
133+
/// rather than simply failing the workflow task.
134+
#[builder(default)]
135+
pub workflow_failure_errors: HashSet<WorkflowErrorType>,
136+
137+
/// Like [WorkerConfig::workflow_failure_errors], but specific to certain workflow types (the
138+
/// map key).
139+
#[builder(default)]
140+
pub workflow_types_to_failure_errors: HashMap<String, HashSet<WorkflowErrorType>>,
127141
}
128142

129143
impl WorkerConfig {
@@ -136,6 +150,20 @@ impl WorkerConfig {
136150
.saturating_sub(self.max_nonsticky_polls())
137151
.max(1)
138152
}
153+
/// Returns true if the configuration specifies we should fail a workflow on a certain error
154+
/// type rather than failing the workflow task.
155+
pub fn should_fail_workflow(
156+
&self,
157+
workflow_type: &str,
158+
error_type: &WorkflowErrorType,
159+
) -> bool {
160+
self.workflow_failure_errors.contains(error_type)
161+
|| self
162+
.workflow_types_to_failure_errors
163+
.get(workflow_type)
164+
.map(|s| s.contains(error_type))
165+
.unwrap_or(false)
166+
}
139167
}
140168

141169
impl WorkerConfigBuilder {

core/src/worker/workflow/machines/workflow_machines.rs

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1293,21 +1293,12 @@ impl WorkflowMachines {
12931293
self.process_cancellation(CommandID::LocalActivity(attrs.seq))?;
12941294
}
12951295
WFCommand::CompleteWorkflow(attrs) => {
1296-
if !self.replaying {
1297-
self.metrics.wf_completed();
1298-
}
12991296
self.add_terminal_command(complete_workflow(attrs));
13001297
}
13011298
WFCommand::FailWorkflow(attrs) => {
1302-
if !self.replaying {
1303-
self.metrics.wf_failed();
1304-
}
13051299
self.add_terminal_command(fail_workflow(attrs));
13061300
}
13071301
WFCommand::ContinueAsNew(attrs) => {
1308-
if !self.replaying {
1309-
self.metrics.wf_continued_as_new();
1310-
}
13111302
let attrs = self.augment_continue_as_new_with_current_values(attrs);
13121303
let use_compat = self.determine_use_compatible_flag(
13131304
attrs.versioning_intent(),
@@ -1316,9 +1307,6 @@ impl WorkflowMachines {
13161307
self.add_terminal_command(continue_as_new(attrs, use_compat));
13171308
}
13181309
WFCommand::CancelWorkflow(attrs) => {
1319-
if !self.replaying {
1320-
self.metrics.wf_canceled();
1321-
}
13221310
self.add_terminal_command(cancel_workflow(attrs));
13231311
}
13241312
WFCommand::SetPatchMarker(attrs) => {

core/src/worker/workflow/managed_run.rs

Lines changed: 55 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,19 +23,23 @@ use std::{
2323
mem,
2424
ops::Add,
2525
rc::Rc,
26-
sync::mpsc::Sender,
26+
sync::{mpsc::Sender, Arc},
2727
time::{Duration, Instant},
2828
};
29+
use temporal_sdk_core_api::{errors::WorkflowErrorType, worker::WorkerConfig};
2930
use temporal_sdk_core_protos::{
3031
coresdk::{
3132
workflow_activation::{
3233
create_evict_activation, query_to_job, remove_from_cache::EvictionReason,
3334
workflow_activation_job, RemoveFromCache, WorkflowActivation,
3435
},
35-
workflow_commands::QueryResult,
36+
workflow_commands::{FailWorkflowExecution, QueryResult},
3637
workflow_completion,
3738
},
38-
temporal::api::{enums::v1::WorkflowTaskFailedCause, failure::v1::Failure},
39+
temporal::api::{
40+
command::v1::command::Attributes as CmdAttribs, enums::v1::WorkflowTaskFailedCause,
41+
failure::v1::Failure,
42+
},
3943
TaskToken,
4044
};
4145
use tokio::sync::oneshot;
@@ -92,6 +96,7 @@ pub(super) struct ManagedRun {
9296
/// We store the paginator used for our own run's history fetching
9397
paginator: Option<HistoryPaginator>,
9498
completion_waiting_on_page_fetch: Option<RunActivationCompletion>,
99+
config: Arc<WorkerConfig>,
95100
}
96101
impl ManagedRun {
97102
pub(super) fn new(
@@ -100,6 +105,7 @@ impl ManagedRun {
100105
local_activity_request_sink: Rc<dyn LocalActivityRequestSink>,
101106
) -> (Self, RunUpdateAct) {
102107
let metrics = basics.metrics.clone();
108+
let config = basics.worker_config.clone();
103109
let wfm = WorkflowManager::new(basics);
104110
let mut me = Self {
105111
wfm,
@@ -114,6 +120,7 @@ impl ManagedRun {
114120
metrics,
115121
paginator: None,
116122
completion_waiting_on_page_fetch: None,
123+
config,
117124
};
118125
let rua = me.incoming_wft(wft);
119126
(me, rua)
@@ -534,7 +541,6 @@ impl ManagedRun {
534541
return None;
535542
};
536543

537-
self.metrics.wf_task_failed();
538544
let message = format!("Workflow activation completion failed: {:?}", &failure);
539545
// We don't want to fail queries that could otherwise be retried
540546
let is_no_report_query_fail = self.pending_work_is_legacy_query()
@@ -570,12 +576,35 @@ impl ManagedRun {
570576
)
571577
}
572578
} else if should_report {
573-
ActivationCompleteOutcome::ReportWFTFail(FailedActivationWFTReport::Report(
574-
tt, cause, failure,
575-
))
579+
// Check if we should fail the workflow instead of the WFT because of user's preferences
580+
if matches!(cause, WorkflowTaskFailedCause::NonDeterministicError)
581+
&& self.config.should_fail_workflow(
582+
&self.wfm.machines.workflow_type,
583+
&WorkflowErrorType::Nondeterminism,
584+
)
585+
{
586+
warn!(failure=?failure, "Failing workflow due to nondeterminism error");
587+
return self
588+
.successful_completion(
589+
vec![WFCommand::FailWorkflow(FailWorkflowExecution {
590+
failure: failure.failure,
591+
})],
592+
vec![],
593+
resp_chan,
594+
)
595+
.unwrap_or_else(|e| {
596+
dbg_panic!("Got next page request when auto-failing workflow: {e:?}");
597+
None
598+
});
599+
} else {
600+
ActivationCompleteOutcome::ReportWFTFail(FailedActivationWFTReport::Report(
601+
tt, cause, failure,
602+
))
603+
}
576604
} else {
577605
ActivationCompleteOutcome::WFTFailedDontReport
578606
};
607+
self.metrics.wf_task_failed();
579608
self.reply_to_complete(outcome, resp_chan);
580609
rur
581610
}
@@ -1039,6 +1068,25 @@ impl ManagedRun {
10391068
)
10401069
};
10411070

1071+
// Record metrics for any outgoing terminal commands
1072+
for cmd in commands.iter() {
1073+
match cmd.attributes.as_ref() {
1074+
Some(CmdAttribs::CompleteWorkflowExecutionCommandAttributes(_)) => {
1075+
self.metrics.wf_completed();
1076+
}
1077+
Some(CmdAttribs::FailWorkflowExecutionCommandAttributes(_)) => {
1078+
self.metrics.wf_failed();
1079+
}
1080+
Some(CmdAttribs::ContinueAsNewWorkflowExecutionCommandAttributes(_)) => {
1081+
self.metrics.wf_continued_as_new();
1082+
}
1083+
Some(CmdAttribs::CancelWorkflowExecutionCommandAttributes(_)) => {
1084+
self.metrics.wf_canceled();
1085+
}
1086+
_ => (),
1087+
}
1088+
}
1089+
10421090
ActivationCompleteOutcome::ReportWFTSuccess(ServerCommandsWithWorkflowInfo {
10431091
task_token: data.task_token,
10441092
action: ActivationAction::WftComplete {

test-utils/src/lib.rs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ pub struct CoreWfStarter {
165165
/// Options to use when starting workflow(s)
166166
pub workflow_options: WorkflowOptions,
167167
initted_worker: OnceCell<InitializedWorker>,
168-
runtime_override: Option<CoreRuntime>,
168+
runtime_override: Option<Arc<CoreRuntime>>,
169169
}
170170
struct InitializedWorker {
171171
worker: Arc<dyn CoreWorker>,
@@ -197,7 +197,7 @@ impl CoreWfStarter {
197197
worker_config,
198198
initted_worker: OnceCell::new(),
199199
workflow_options: Default::default(),
200-
runtime_override,
200+
runtime_override: runtime_override.map(Arc::new),
201201
}
202202
}
203203

@@ -208,7 +208,7 @@ impl CoreWfStarter {
208208
task_queue_name: self.task_queue_name.clone(),
209209
worker_config: self.worker_config.clone(),
210210
workflow_options: self.workflow_options.clone(),
211-
runtime_override: None,
211+
runtime_override: self.runtime_override.clone(),
212212
initted_worker: Default::default(),
213213
}
214214
}
@@ -239,6 +239,7 @@ impl CoreWfStarter {
239239
self.start_wf_with_id(self.task_queue_name.clone()).await
240240
}
241241

242+
/// Starts the workflow using the worker, returns run id.
242243
pub async fn start_with_worker(
243244
&self,
244245
wf_name: impl Into<String>,
@@ -496,6 +497,19 @@ impl TestWorker {
496497
Ok(res)
497498
}
498499

500+
pub fn expect_workflow_completion(&self, wf_id: impl Into<String>, run_id: Option<String>) {
501+
self.started_workflows.lock().push(WorkflowExecutionInfo {
502+
namespace: self
503+
.client
504+
.as_ref()
505+
.map(|c| c.namespace())
506+
.unwrap_or(NAMESPACE)
507+
.to_owned(),
508+
workflow_id: wf_id.into(),
509+
run_id,
510+
});
511+
}
512+
499513
/// Runs until all expected workflows have completed
500514
pub async fn run_until_done(&mut self) -> Result<(), anyhow::Error> {
501515
self.run_until_done_intercepted(Option::<TestWorkerCompletionIceptor>::None)

tests/integ_tests/metrics_tests.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,11 @@ use tokio::{join, sync::Barrier, task::AbortHandle};
3434

3535
static ANY_PORT: &str = "127.0.0.1:0";
3636

37-
async fn get_text(endpoint: String) -> String {
37+
pub async fn get_text(endpoint: String) -> String {
3838
reqwest::get(endpoint).await.unwrap().text().await.unwrap()
3939
}
4040

41-
struct AbortOnDrop {
41+
pub struct AbortOnDrop {
4242
ah: AbortHandle,
4343
}
4444
impl Drop for AbortOnDrop {
@@ -47,7 +47,7 @@ impl Drop for AbortOnDrop {
4747
}
4848
}
4949

50-
fn prom_metrics() -> (TelemetryOptions, SocketAddr, AbortOnDrop) {
50+
pub fn prom_metrics() -> (TelemetryOptions, SocketAddr, AbortOnDrop) {
5151
let mut telemopts = get_integ_telem_options();
5252
let prom_info = start_prometheus_metric_exporter(
5353
PrometheusExporterOptionsBuilder::default()

0 commit comments

Comments
 (0)