Skip to content

Commit 6560876

Browse files
Fix wft poll balancing with maxpoll=2 (#941)
Co-authored-by: Spencer Judge <spencer@temporal.io>
1 parent d7f2273 commit 6560876

File tree

3 files changed

+207
-21
lines changed

3 files changed

+207
-21
lines changed

.github/workflows/per-pr.yml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,15 @@ jobs:
6363
# TODO: Upgrade proto once https://github.com/arduino/setup-protoc/issues/99 is fixed
6464
version: '23.x'
6565
repo-token: ${{ secrets.GITHUB_TOKEN }}
66+
# Workaround for https://github.com/actions/runner-images/issues/12432
67+
# from https://github.com/rust-lang/rust/issues/141626#issuecomment-2919419236
68+
# Visual Studio bug tracker https://developercommunity.visualstudio.com/t/Regression-from-1943:-linkexe-crashes/10912960
69+
- name: Setup RUSTFLAGS (Windows)
70+
if: runner.os == 'Windows'
71+
uses: actions/github-script@v7
72+
with:
73+
script: |
74+
core.exportVariable('RUSTFLAGS', '-Csymbol-mangling-version=v0');
6675
- uses: Swatinem/rust-cache@v2
6776
- run: cargo test -- --include-ignored --nocapture
6877
- uses: actions/upload-artifact@v4

core/src/core_tests/workflow_tasks.rs

Lines changed: 151 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::{
2-
Worker, advance_fut,
2+
PollWorkflowOptions, Worker, advance_fut,
33
internal_flags::CoreInternalFlags,
44
job_assert,
55
replay::TestHistoryBuilder,
@@ -3150,3 +3150,153 @@ async fn pass_timer_summary_to_metadata() {
31503150
.unwrap();
31513151
worker.run_until_done().await.unwrap();
31523152
}
3153+
3154+
#[tokio::test]
3155+
async fn both_normal_and_sticky_pollers_poll_concurrently() {
3156+
struct Counters {
3157+
// How many time PollWorkflowTaskQueue has been called
3158+
normal_poll_count: AtomicUsize,
3159+
sticky_poll_count: AtomicUsize,
3160+
3161+
// How many pollers are currently active (i.e. PollWorkflowTaskQueue
3162+
// has been called, but not the corresponding CompleteWorkflowTask)
3163+
normal_slots_active_count: AtomicUsize,
3164+
sticky_slots_active_count: AtomicUsize,
3165+
3166+
// Max number of pollers that were active at the same time
3167+
max_total_slots_active_count: AtomicUsize,
3168+
max_normal_slots_active_count: AtomicUsize,
3169+
max_sticky_slots_active_count: AtomicUsize,
3170+
}
3171+
3172+
let counters = Arc::new(Counters {
3173+
normal_poll_count: AtomicUsize::new(0),
3174+
sticky_poll_count: AtomicUsize::new(0),
3175+
normal_slots_active_count: AtomicUsize::new(0),
3176+
sticky_slots_active_count: AtomicUsize::new(0),
3177+
max_total_slots_active_count: AtomicUsize::new(0),
3178+
max_normal_slots_active_count: AtomicUsize::new(0),
3179+
max_sticky_slots_active_count: AtomicUsize::new(0),
3180+
});
3181+
3182+
// Create actual workflow task responses to return from polls
3183+
let mut task_responses = (1..100).map(|i| {
3184+
hist_to_poll_resp(
3185+
&canned_histories::single_timer(&format!("timer-{i}")),
3186+
format!("wf-{i}"),
3187+
1.into(),
3188+
)
3189+
.resp
3190+
});
3191+
3192+
let mut mock_client = mock_workflow_client();
3193+
3194+
// Track normal vs sticky poll requests and return actual workflow tasks
3195+
let cc = Arc::clone(&counters);
3196+
mock_client
3197+
.expect_poll_workflow_task()
3198+
.returning(move |_, opts: PollWorkflowOptions| {
3199+
let mut task_response = task_responses.next().unwrap_or_default();
3200+
3201+
// FIXME: Atomics initially made sense, but this has grown ugly, and there's probably
3202+
// cases where this may produce incorrect results due to race in operation ordering
3203+
// (really didn't put any thought into this). We also can't have
3204+
if opts.sticky_queue_name.is_none() {
3205+
// Normal queue poll
3206+
cc.normal_poll_count.fetch_add(1, Ordering::Relaxed);
3207+
cc.normal_slots_active_count.fetch_add(1, Ordering::Relaxed);
3208+
cc.max_normal_slots_active_count.fetch_max(
3209+
cc.normal_slots_active_count.load(Ordering::Relaxed),
3210+
Ordering::AcqRel,
3211+
);
3212+
cc.max_total_slots_active_count.fetch_max(
3213+
cc.normal_slots_active_count.load(Ordering::Relaxed)
3214+
+ cc.sticky_slots_active_count.load(Ordering::Relaxed),
3215+
Ordering::AcqRel,
3216+
);
3217+
3218+
task_response.task_token = [task_response.task_token, b"normal".to_vec()].concat();
3219+
} else {
3220+
// Sticky queue poll
3221+
cc.sticky_poll_count.fetch_add(1, Ordering::Relaxed);
3222+
cc.sticky_slots_active_count.fetch_add(1, Ordering::Relaxed);
3223+
cc.max_sticky_slots_active_count.fetch_max(
3224+
cc.sticky_slots_active_count.load(Ordering::Acquire),
3225+
Ordering::AcqRel,
3226+
);
3227+
cc.max_total_slots_active_count.fetch_max(
3228+
cc.normal_slots_active_count.load(Ordering::Relaxed)
3229+
+ cc.sticky_slots_active_count.load(Ordering::Relaxed),
3230+
Ordering::AcqRel,
3231+
);
3232+
3233+
task_response.task_token = [task_response.task_token, b"sticky".to_vec()].concat();
3234+
}
3235+
3236+
// Return actual workflow task responses
3237+
Ok(task_response)
3238+
});
3239+
3240+
let cc = Arc::clone(&counters);
3241+
mock_client
3242+
.expect_complete_workflow_task()
3243+
.returning(move |completion| {
3244+
if completion.task_token.0.ends_with(b"normal") {
3245+
cc.normal_slots_active_count.fetch_sub(1, Ordering::Relaxed);
3246+
} else {
3247+
cc.sticky_slots_active_count.fetch_sub(1, Ordering::Relaxed);
3248+
}
3249+
Ok(Default::default())
3250+
});
3251+
3252+
let worker = Worker::new(
3253+
test_worker_cfg()
3254+
.max_cached_workflows(500_usize) // We need cache, but don't want to deal with evictions
3255+
.max_outstanding_workflow_tasks(2_usize)
3256+
.workflow_task_poller_behavior(PollerBehavior::SimpleMaximum(2_usize))
3257+
.nonsticky_to_sticky_poll_ratio(0.2)
3258+
.no_remote_activities(true)
3259+
.build()
3260+
.unwrap(),
3261+
Some("stickytq".to_string()),
3262+
Arc::new(mock_client),
3263+
None,
3264+
);
3265+
3266+
for _ in 1..50 {
3267+
let activation = worker.poll_workflow_activation().await.unwrap();
3268+
let _ = worker
3269+
.complete_workflow_activation(WorkflowActivationCompletion::empty(activation.run_id))
3270+
.await;
3271+
}
3272+
3273+
assert!(
3274+
counters.normal_poll_count.load(Ordering::Relaxed) > 0,
3275+
"Normal poller should have been called at least once"
3276+
);
3277+
assert!(
3278+
counters.sticky_poll_count.load(Ordering::Relaxed) > 0,
3279+
"Sticky poller should have been called at least once"
3280+
);
3281+
assert!(
3282+
counters
3283+
.max_normal_slots_active_count
3284+
.load(Ordering::Relaxed)
3285+
>= 1,
3286+
"Normal poller should have been active at least once"
3287+
);
3288+
assert!(
3289+
counters
3290+
.max_sticky_slots_active_count
3291+
.load(Ordering::Relaxed)
3292+
>= 1,
3293+
"Sticky poller should have been active at least once"
3294+
);
3295+
assert_eq!(
3296+
counters
3297+
.max_total_slots_active_count
3298+
.load(Ordering::Relaxed),
3299+
2,
3300+
"At peak, there should be exactly 2 pollers active at the same time"
3301+
);
3302+
}

core/src/worker/workflow/wft_poller.rs

Lines changed: 47 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -102,16 +102,6 @@ impl WFTPollerShared {
102102
/// Makes either the sticky or non-sticky poller wait pre-permit-acquisition so that we can
103103
/// balance which kind of queue we poll appropriately.
104104
pub(crate) async fn wait_if_needed(&self, is_sticky: bool) {
105-
// If there's a sticky backlog, prioritize it.
106-
if !is_sticky {
107-
let _ = self
108-
.last_seen_sticky_backlog
109-
.0
110-
.clone()
111-
.wait_for(|v| *v == 0)
112-
.await;
113-
}
114-
115105
// We need to make sure there's at least one poller of both kinds available. So, we check
116106
// that we won't end up using every available permit with one kind of poller. In practice
117107
// this is only ever likely to be an issue with very small numbers of slots.
@@ -121,26 +111,63 @@ impl WFTPollerShared {
121111
{
122112
let mut sticky_active = sticky_active.clone();
123113
let mut non_sticky_active = non_sticky_active.clone();
114+
let mut sticky_backlog = self.last_seen_sticky_backlog.0.clone();
115+
124116
loop {
125117
let num_sticky_active = *sticky_active.borrow_and_update();
126118
let num_non_sticky_active = *non_sticky_active.borrow_and_update();
127-
let both_are_zero = num_sticky_active == 0 && num_non_sticky_active == 0;
128-
if both_are_zero {
119+
let num_sticky_backlog = *sticky_backlog.borrow_and_update();
120+
121+
let allow = || {
129122
if !is_sticky {
130-
break;
123+
// There should always be at least one non-sticky poller.
124+
if num_non_sticky_active == 0 {
125+
return true;
126+
}
127+
128+
// Do not allow an additional non-sticky poller to prevent starting a first sticky poller.
129+
if num_sticky_active == 0 && num_non_sticky_active + 1 >= max_slots {
130+
return false;
131+
}
132+
133+
// If there's a meaningful sticky backlog, prioritize sticky.
134+
if num_sticky_backlog > 1 && num_sticky_backlog > num_sticky_active {
135+
return false;
136+
}
137+
} else {
138+
// There should always be at least one sticky poller.
139+
if num_sticky_active == 0 {
140+
return true;
141+
}
142+
143+
// Do not allow an additional sticky poller to prevent starting a first non-sticky poller.
144+
if num_non_sticky_active == 0 && num_sticky_active + 1 >= max_slots {
145+
return false;
146+
}
147+
148+
// If there's a meaningful sticky backlog, prioritize sticky.
149+
if num_sticky_backlog > 1 && num_sticky_backlog > num_sticky_active {
150+
return true;
151+
}
131152
}
132-
} else {
133-
let would_exceed_max_slots =
134-
(num_sticky_active + num_non_sticky_active + 1) >= max_slots;
135-
let must_wait = would_exceed_max_slots
136-
&& (num_sticky_active == 0 || num_non_sticky_active == 0);
137-
if !must_wait {
138-
break;
153+
154+
// Just balance the two poller types.
155+
// FIXME: Do we need anything more here, to ensure proper balancing?
156+
if num_sticky_active + num_non_sticky_active < max_slots {
157+
return true;
139158
}
159+
160+
false
161+
};
162+
163+
if allow() {
164+
return;
140165
}
166+
141167
tokio::select! {
142168
_ = sticky_active.changed() => (),
143169
_ = non_sticky_active.changed() => (),
170+
_ = sticky_backlog.changed() => (),
144171
}
145172
}
146173
}

0 commit comments

Comments
 (0)