Skip to content

Commit 00b5507

Browse files
authored
Evictions in their own activations (#712)
1 parent d0a5949 commit 00b5507

File tree

8 files changed

+69
-130
lines changed

8 files changed

+69
-130
lines changed

.github/workflows/per-pr.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ jobs:
2222
- uses: actions-rs/toolchain@v1
2323
with:
2424
profile: minimal
25-
toolchain: 1.74.0
25+
toolchain: 1.77.0
2626
override: true
2727
- name: Install protoc
2828
uses: arduino/setup-protoc@v1
@@ -55,7 +55,7 @@ jobs:
5555
- uses: actions-rs/toolchain@v1
5656
with:
5757
profile: minimal
58-
toolchain: 1.74.0
58+
toolchain: 1.77.0
5959
override: true
6060
- name: Install protoc
6161
uses: arduino/setup-protoc@v1
@@ -85,7 +85,7 @@ jobs:
8585
- uses: actions-rs/toolchain@v1
8686
with:
8787
profile: minimal
88-
toolchain: 1.74.0
88+
toolchain: 1.77.0
8989
override: true
9090
- name: Install protoc
9191
uses: arduino/setup-protoc@v1

core/src/telemetry/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,8 @@ impl TelemetryInstance {
121121
}
122122

123123
thread_local! {
124-
static SUB_GUARD: RefCell<Option<tracing::subscriber::DefaultGuard>> = RefCell::new(None);
124+
static SUB_GUARD: RefCell<Option<tracing::subscriber::DefaultGuard>> =
125+
const { RefCell::new(None) };
125126
}
126127
/// Set the trace subscriber for the current thread. This must be done in every thread which uses
127128
/// core stuff, otherwise traces/logs will not be collected on that thread. For example, if using

core/src/test_help/mod.rs

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ use temporal_sdk_core_api::{
3737
};
3838
use temporal_sdk_core_protos::{
3939
coresdk::{
40-
workflow_activation::WorkflowActivation,
40+
workflow_activation::{workflow_activation_job, WorkflowActivation},
4141
workflow_commands::workflow_command,
4242
workflow_completion::{self, workflow_activation_completion, WorkflowActivationCompletion},
4343
},
@@ -831,18 +831,23 @@ pub(crate) async fn poll_and_reply_clears_outstanding_evicts<'a>(
831831
}
832832

833833
let mut res = worker.poll_workflow_activation().await.unwrap();
834-
let contains_eviction = res.eviction_index();
834+
if res.jobs.iter().any(|j| {
835+
matches!(
836+
j.variant,
837+
Some(workflow_activation_job::Variant::RemoveFromCache(_))
838+
)
839+
}) && res.jobs.len() > 1
840+
{
841+
panic!("Saw an activation with an eviction & other work! {res:?}");
842+
}
843+
let is_eviction = res.is_only_eviction();
835844

836845
let mut do_release = false;
837-
if let Some(eviction_job_ix) = contains_eviction {
838-
// If the job list has an eviction, make sure it was the last item in the list
839-
// then remove it, since in the tests we don't explicitly specify evict assertions
840-
assert_eq!(
841-
eviction_job_ix,
842-
res.jobs.len() - 1,
843-
"Eviction job was not last job in job list"
844-
);
845-
res.jobs.remove(eviction_job_ix);
846+
847+
if is_eviction {
848+
// If the job is an eviction, clear it, since in the tests we don't explicitly
849+
// specify evict assertions
850+
res.jobs.clear();
846851
do_release = true;
847852
}
848853

@@ -873,7 +878,7 @@ pub(crate) async fn poll_and_reply_clears_outstanding_evicts<'a>(
873878
}
874879
// Restart assertions from the beginning if it was an eviction (and workflow execution
875880
// isn't over)
876-
if contains_eviction.is_some() && !ends_execution {
881+
if is_eviction && !ends_execution {
877882
continue 'outer;
878883
}
879884

core/src/worker/workflow/managed_run.rs

Lines changed: 25 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use temporal_sdk_core_protos::{
3131
coresdk::{
3232
workflow_activation::{
3333
create_evict_activation, query_to_job, remove_from_cache::EvictionReason,
34-
workflow_activation_job, RemoveFromCache, WorkflowActivation,
34+
workflow_activation_job, WorkflowActivation,
3535
},
3636
workflow_commands::{FailWorkflowExecution, QueryResult},
3737
workflow_completion,
@@ -347,15 +347,8 @@ impl ManagedRun {
347347
}
348348
}
349349
if let Some(wte) = self.trying_to_evict.clone() {
350-
let mut act = self.wfm.machines.get_wf_activation();
351-
// No other jobs make any sense to send if we encountered an error.
352-
if self.am_broken {
353-
act.jobs = vec![];
354-
}
355-
act.append_evict_job(RemoveFromCache {
356-
message: wte.message,
357-
reason: wte.reason as i32,
358-
});
350+
let act =
351+
create_evict_activation(self.run_id().to_string(), wte.message, wte.reason);
359352
Ok(Some(ActivationOrAuto::LangActivation(act)))
360353
} else {
361354
Ok(None)
@@ -375,7 +368,7 @@ impl ManagedRun {
375368
used_flags: Vec<u32>,
376369
resp_chan: Option<oneshot::Sender<ActivationCompleteResult>>,
377370
) -> Result<RunUpdateAct, NextPageReq> {
378-
let activation_was_only_eviction = self.activation_has_only_eviction();
371+
let activation_was_only_eviction = self.activation_is_eviction();
379372
let (task_token, has_pending_query, start_time) = if let Some(entry) = self.wft.as_ref() {
380373
(
381374
entry.info.task_token.clone(),
@@ -444,15 +437,14 @@ impl ManagedRun {
444437
.collect();
445438

446439
if activation_was_only_eviction && !commands.is_empty() {
447-
dbg_panic!("Reply to an eviction only containing an eviction included commands");
440+
dbg_panic!("Reply to an eviction included commands");
448441
}
449442

450443
let rac = RunActivationCompletion {
451444
task_token,
452445
start_time,
453446
commands,
454-
activation_was_eviction: self.activation_has_eviction(),
455-
activation_was_only_eviction,
447+
activation_was_eviction: self.activation_is_eviction(),
456448
has_pending_query,
457449
query_responses,
458450
used_flags,
@@ -623,7 +615,8 @@ impl ManagedRun {
623615
) -> (bool, BufferedTasks) {
624616
let evict = if self.activation().map(pred).unwrap_or_default() {
625617
let act = self.activation.take();
626-
act.map(|a| a.has_eviction()).unwrap_or_default()
618+
act.map(|a| matches!(a, OutstandingActivation::Eviction))
619+
.unwrap_or_default()
627620
} else {
628621
false
629622
};
@@ -655,14 +648,14 @@ impl ManagedRun {
655648
task_token: completion.task_token,
656649
query_responses: completion.query_responses,
657650
has_pending_query: completion.has_pending_query,
658-
activation_was_only_eviction: completion.activation_was_only_eviction,
651+
activation_was_eviction: completion.activation_was_eviction,
659652
};
660653

661654
self.wfm.machines.add_lang_used_flags(completion.used_flags);
662655

663-
// If this is just bookkeeping after a reply to an only-eviction activation, we can bypass
656+
// If this is just bookkeeping after a reply to an eviction activation, we can bypass
664657
// everything, since there is no reason to continue trying to update machines.
665-
if completion.activation_was_only_eviction {
658+
if completion.activation_was_eviction {
666659
return Ok(Some(self.prepare_complete_resp(
667660
completion.resp_chan,
668661
data,
@@ -793,11 +786,9 @@ impl ManagedRun {
793786
self.trying_to_evict.is_some()
794787
};
795788
let act_work = if ignore_evicts {
796-
if let Some(ref act) = self.activation {
797-
!act.has_only_eviction()
798-
} else {
799-
false
800-
}
789+
self.activation
790+
.map(|a| !matches!(a, OutstandingActivation::Eviction))
791+
.unwrap_or_default()
801792
} else {
802793
self.activation.is_some()
803794
};
@@ -854,7 +845,7 @@ impl ManagedRun {
854845
return EvictionRequestResult::EvictionRequested(attempts, run_upd);
855846
}
856847

857-
if !self.activation_has_eviction() && self.trying_to_evict.is_none() {
848+
if !self.activation_is_eviction() && self.trying_to_evict.is_none() {
858849
debug!(run_id=%info.run_id, reason=%info.message, "Eviction requested");
859850
self.trying_to_evict = Some(info);
860851
EvictionRequestResult::EvictionRequested(attempts, self.check_more_activations())
@@ -990,13 +981,12 @@ impl ManagedRun {
990981
fn insert_outstanding_activation(&mut self, act: &ActivationOrAuto) {
991982
let act_type = match &act {
992983
ActivationOrAuto::LangActivation(act) | ActivationOrAuto::ReadyForQueries(act) => {
993-
if act.is_legacy_query() {
984+
if act.is_only_eviction() {
985+
OutstandingActivation::Eviction
986+
} else if act.is_legacy_query() {
994987
OutstandingActivation::LegacyQuery
995988
} else {
996-
OutstandingActivation::Normal {
997-
contains_eviction: act.eviction_index().is_some(),
998-
num_jobs: act.jobs.len(),
999-
}
989+
OutstandingActivation::Normal
1000990
}
1001991
}
1002992
ActivationOrAuto::Autocomplete { .. } | ActivationOrAuto::AutoFail { .. } => {
@@ -1021,7 +1011,7 @@ impl ManagedRun {
10211011
due_to_heartbeat_timeout: bool,
10221012
) -> FulfillableActivationComplete {
10231013
let mut machines_wft_response = self.wfm.prepare_for_wft_response();
1024-
if data.activation_was_only_eviction
1014+
if data.activation_was_eviction
10251015
&& (machines_wft_response.commands().peek().is_some()
10261016
|| machines_wft_response.has_messages())
10271017
&& !self.am_broken
@@ -1045,7 +1035,7 @@ impl ManagedRun {
10451035
let should_respond = !(machines_wft_response.has_pending_jobs
10461036
|| machines_wft_response.replaying
10471037
|| is_query_playback
1048-
|| data.activation_was_only_eviction
1038+
|| data.activation_was_eviction
10491039
|| machines_wft_response.have_seen_terminal_event);
10501040
// If there are pending LA resolutions, and we're responding to a query here,
10511041
// we want to make sure to force a new task, as otherwise once we tell lang about
@@ -1058,7 +1048,7 @@ impl ManagedRun {
10581048
let outcome = if should_respond || has_query_responses {
10591049
// If we broke there could be commands or messages in the pipe that we didn't
10601050
// get a chance to handle properly during replay. Don't send them.
1061-
let (commands, messages) = if self.am_broken && data.activation_was_only_eviction {
1051+
let (commands, messages) = if self.am_broken && data.activation_was_eviction {
10621052
(vec![], vec![])
10631053
} else {
10641054
(
@@ -1168,15 +1158,9 @@ impl ManagedRun {
11681158
self.wfm.machines.last_processed_event
11691159
}
11701160

1171-
fn activation_has_eviction(&mut self) -> bool {
1172-
self.activation
1173-
.map(OutstandingActivation::has_eviction)
1174-
.unwrap_or_default()
1175-
}
1176-
1177-
fn activation_has_only_eviction(&mut self) -> bool {
1161+
fn activation_is_eviction(&mut self) -> bool {
11781162
self.activation
1179-
.map(OutstandingActivation::has_only_eviction)
1163+
.map(|a| matches!(a, OutstandingActivation::Eviction))
11801164
.unwrap_or_default()
11811165
}
11821166

@@ -1245,7 +1229,7 @@ struct CompletionDataForWFT {
12451229
task_token: TaskToken,
12461230
query_responses: Vec<QueryResult>,
12471231
has_pending_query: bool,
1248-
activation_was_only_eviction: bool,
1232+
activation_was_eviction: bool,
12491233
}
12501234

12511235
/// Manages an instance of a [WorkflowMachines], which is not thread-safe, as well as other data
@@ -1385,7 +1369,6 @@ struct RunActivationCompletion {
13851369
start_time: Instant,
13861370
commands: Vec<WFCommand>,
13871371
activation_was_eviction: bool,
1388-
activation_was_only_eviction: bool,
13891372
has_pending_query: bool,
13901373
query_responses: Vec<QueryResult>,
13911374
used_flags: Vec<u32>,

core/src/worker/workflow/mod.rs

Lines changed: 3 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -817,39 +817,15 @@ impl OutstandingTask {
817817
#[derive(Copy, Clone, Debug)]
818818
pub(crate) enum OutstandingActivation {
819819
/// A normal activation with a joblist
820-
Normal {
821-
/// True if there is an eviction in the joblist
822-
contains_eviction: bool,
823-
/// Number of jobs in the activation
824-
num_jobs: usize,
825-
},
820+
Normal,
821+
/// An eviction job
822+
Eviction,
826823
/// An activation for a legacy query
827824
LegacyQuery,
828825
/// A fake activation which is never sent to lang, but used internally
829826
Autocomplete,
830827
}
831828

832-
impl OutstandingActivation {
833-
pub(crate) const fn has_only_eviction(self) -> bool {
834-
matches!(
835-
self,
836-
OutstandingActivation::Normal {
837-
contains_eviction: true,
838-
num_jobs: nj
839-
}
840-
if nj == 1)
841-
}
842-
pub(crate) const fn has_eviction(self) -> bool {
843-
matches!(
844-
self,
845-
OutstandingActivation::Normal {
846-
contains_eviction: true,
847-
..
848-
}
849-
)
850-
}
851-
}
852-
853829
/// Contains important information about a given workflow task that we need to memorize while
854830
/// lang handles it.
855831
#[derive(Clone, Debug)]

fsm/rustfsm_procmacro/tests/trybuild/no_handle_conversions_require_into_fail.stderr

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ error[E0277]: the trait bound `One: From<Two>` is not satisfied
22
--> tests/trybuild/no_handle_conversions_require_into_fail.rs:11:5
33
|
44
11 | Two --(B)--> One;
5-
| ^^^ the trait `From<Two>` is not implemented for `One`
5+
| ^^^ the trait `From<Two>` is not implemented for `One`, which is required by `Two: Into<One>`
66
|
77
= note: required for `Two` to implement `Into<One>`
88
note: required by a bound in `TransitionResult::<Sm, Ds>::from`

sdk-core-protos/protos/local/temporal/sdk/core/workflow_activation/workflow_activation.proto

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import "temporal/sdk/core/common/common.proto";
3030
// * Signal and update handlers should be invoked before workflow routines are iterated. That is to
3131
// say before the users' main workflow function and anything spawned by it is allowed to continue.
3232
// * Queries always go last (and, in fact, always come in their own activation)
33+
// * Evictions also always come in their own activation
3334
//
3435
// The downside of this reordering is that a signal or update handler may not observe that some
3536
// other event had already happened (ex: an activity completed) when it is first invoked, though it
@@ -41,11 +42,9 @@ import "temporal/sdk/core/common/common.proto";
4142
//
4243
// ## Evictions
4344
//
44-
// Activations that contain only a `remove_from_cache` job should not cause the workflow code
45-
// to be invoked and may be responded to with an empty command list. Eviction jobs may also
46-
// appear with other jobs, but will always appear last in the job list. In this case it is
47-
// expected that the workflow code will be invoked, and the response produced as normal, but
48-
// the caller should evict the run after doing so.
45+
// Evictions appear as an activations that contains only a `remove_from_cache` job. Such activations
46+
// should not cause the workflow code to be invoked and may be responded to with an empty command
47+
// list.
4948
message WorkflowActivation {
5049
// The id of the currently active run of the workflow. Also used as a cache key. There may
5150
// only ever be one active workflow task (and hence activation) of a run at one time.
@@ -81,7 +80,8 @@ message WorkflowActivationJob {
8180
FireTimer fire_timer = 2;
8281
// Workflow was reset. The randomness seed must be updated.
8382
UpdateRandomSeed update_random_seed = 4;
84-
// A request to query the workflow was received.
83+
// A request to query the workflow was received. It is guaranteed that queries (one or more)
84+
// always come in their own activation after other mutating jobs.
8585
QueryWorkflow query_workflow = 5;
8686
// A request to cancel the workflow was received.
8787
CancelWorkflow cancel_workflow = 6;
@@ -103,11 +103,9 @@ message WorkflowActivationJob {
103103
ResolveRequestCancelExternalWorkflow resolve_request_cancel_external_workflow = 13;
104104
// A request to handle a workflow update.
105105
DoUpdate do_update = 14;
106-
// Remove the workflow identified by the [WorkflowActivation] containing this job from the cache
107-
// after performing the activation.
108-
//
109-
// If other job variant are present in the list, this variant will be the last job in the
110-
// job list. The string value is a reason for eviction.
106+
// Remove the workflow identified by the [WorkflowActivation] containing this job from the
107+
// cache after performing the activation. It is guaranteed that this will be the only job
108+
// in the activation if present.
111109
RemoveFromCache remove_from_cache = 50;
112110
}
113111
}
@@ -208,7 +206,7 @@ message ResolveChildWorkflowExecutionStartFailure {
208206
// `failure` should be ChildWorkflowFailure with cause set to CancelledFailure.
209207
// The failure is constructed in core for lang's convenience.
210208
message ResolveChildWorkflowExecutionStartCancelled {
211-
temporal.api.failure.v1.Failure failure = 1;
209+
temporal.api.failure.v1.Failure failure = 1;
212210
}
213211

214212
// Notify a workflow that a child workflow execution has been resolved

0 commit comments

Comments
 (0)