Skip to content

Commit 4d82754

Browse files
authored
Fix bad balacing between sticky & nonsticky WFT polling with low slot/poller counts (#925)
1 parent cffd778 commit 4d82754

File tree

8 files changed

+241
-146
lines changed

8 files changed

+241
-146
lines changed

core-api/src/worker.rs

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,8 @@ pub struct WorkerConfig {
4040
#[builder(setter(into = false, strip_option), default)]
4141
pub tuner: Option<Arc<dyn WorkerTuner + Send + Sync>>,
4242
/// Maximum number of concurrent poll workflow task requests we will perform at a time on this
43-
/// worker's task queue. See also [WorkerConfig::nonsticky_to_sticky_poll_ratio]. Must be at
44-
/// least 1.
43+
/// worker's task queue. See also [WorkerConfig::nonsticky_to_sticky_poll_ratio].
44+
/// If using SimpleMaximum, Must be at least 2 when `max_cached_workflows` > 0, or is an error.
4545
#[builder(default = "PollerBehavior::SimpleMaximum(5)")]
4646
pub workflow_task_poller_behavior: PollerBehavior,
4747
/// Only applies when using [PollerBehavior::SimpleMaximum]
@@ -135,7 +135,8 @@ pub struct WorkerConfig {
135135

136136
/// The maximum allowed number of workflow tasks that will ever be given to this worker at one
137137
/// time. Note that one workflow task may require multiple activations - so the WFT counts as
138-
/// "outstanding" until all activations it requires have been completed.
138+
/// "outstanding" until all activations it requires have been completed. Must be at least 2 if
139+
/// `max_cached_workflows` is > 0, or is an error.
139140
///
140141
/// Mutually exclusive with `tuner`
141142
#[builder(setter(into, strip_option), default)]
@@ -224,6 +225,41 @@ impl WorkerConfigBuilder {
224225
}
225226
}
226227

228+
if matches!(self.max_outstanding_workflow_tasks.as_ref(), Some(Some(v)) if *v == 0) {
229+
return Err("`max_outstanding_workflow_tasks` must be > 0".to_owned());
230+
}
231+
if matches!(self.max_outstanding_activities.as_ref(), Some(Some(v)) if *v == 0) {
232+
return Err("`max_outstanding_activities` must be > 0".to_owned());
233+
}
234+
if matches!(self.max_outstanding_local_activities.as_ref(), Some(Some(v)) if *v == 0) {
235+
return Err("`max_outstanding_local_activities` must be > 0".to_owned());
236+
}
237+
if matches!(self.max_outstanding_nexus_tasks.as_ref(), Some(Some(v)) if *v == 0) {
238+
return Err("`max_outstanding_nexus_tasks` must be > 0".to_owned());
239+
}
240+
241+
if let Some(cache) = self.max_cached_workflows.as_ref() {
242+
if *cache > 0 {
243+
if let Some(Some(max_wft)) = self.max_outstanding_workflow_tasks.as_ref() {
244+
if *max_wft < 2 {
245+
return Err(
246+
"`max_cached_workflows` > 0 requires `max_outstanding_workflow_tasks` >= 2"
247+
.to_owned(),
248+
);
249+
}
250+
}
251+
if let Some(b) = self.workflow_task_poller_behavior.as_ref() {
252+
if matches!(b, PollerBehavior::SimpleMaximum(u) if *u < 2) {
253+
return Err(
254+
"`max_cached_workflows` > 0 requires `workflow_task_poller_behavior` to be at least 2"
255+
.to_owned(),
256+
);
257+
}
258+
b.validate()?
259+
}
260+
}
261+
}
262+
227263
if self.tuner.is_some()
228264
&& (self.max_outstanding_workflow_tasks.is_some()
229265
|| self.max_outstanding_activities.is_some()
@@ -264,7 +300,9 @@ impl WorkerConfigBuilder {
264300
/// This trait allows users to customize the performance characteristics of workers dynamically.
265301
/// For more, see the docstrings of the traits in the return types of its functions.
266302
pub trait WorkerTuner {
267-
/// Return a [SlotSupplier] for workflow tasks
303+
/// Return a [SlotSupplier] for workflow tasks. Note that workflow task slot suppliers must be
304+
/// willing to hand out a minimum of one non-sticky slot and one sticky slot if workflow caching
305+
/// is enabled, otherwise the worker may fail to process new tasks.
268306
fn workflow_task_slot_supplier(
269307
&self,
270308
) -> Arc<dyn SlotSupplier<SlotKind = WorkflowSlotKind> + Send + Sync>;
@@ -483,7 +521,7 @@ pub enum PollerBehavior {
483521
/// requires a slot to be available before beginning polling.
484522
Autoscaling {
485523
/// At least this many poll calls will always be attempted (assuming slots are available).
486-
/// Cannot be less than two for workflow tasks, or one for other tasks.
524+
/// Cannot be zero.
487525
minimum: usize,
488526
/// At most this many poll calls will ever be open at once. Must be >= `minimum`.
489527
maximum: usize,

core/src/abstractions.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -393,6 +393,35 @@ macro_rules! dbg_panic {
393393
}
394394
pub(crate) use dbg_panic;
395395

396+
pub(crate) struct ActiveCounter<F: Fn(usize)>(watch::Sender<usize>, Option<Arc<F>>);
397+
impl<F> ActiveCounter<F>
398+
where
399+
F: Fn(usize),
400+
{
401+
pub(crate) fn new(a: watch::Sender<usize>, change_fn: Option<Arc<F>>) -> Self {
402+
a.send_modify(|v| {
403+
*v += 1;
404+
if let Some(cfn) = change_fn.as_ref() {
405+
cfn(*v);
406+
}
407+
});
408+
Self(a, change_fn)
409+
}
410+
}
411+
impl<F> Drop for ActiveCounter<F>
412+
where
413+
F: Fn(usize),
414+
{
415+
fn drop(&mut self) {
416+
self.0.send_modify(|v| {
417+
*v -= 1;
418+
if let Some(cfn) = self.1.as_ref() {
419+
cfn(*v)
420+
};
421+
});
422+
}
423+
}
424+
396425
#[cfg(test)]
397426
pub(crate) mod tests {
398427
use super::*;

core/src/pollers/poll_buffer.rs

Lines changed: 22 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::{
2-
abstractions::{MeteredPermitDealer, OwnedMeteredSemPermit, dbg_panic},
2+
abstractions::{ActiveCounter, MeteredPermitDealer, OwnedMeteredSemPermit, dbg_panic},
33
pollers::{self, Poller},
44
worker::{
55
WFTPollerShared,
@@ -53,7 +53,7 @@ pub(crate) struct LongPollBuffer<T, SK: SlotKind> {
5353
}
5454

5555
pub(crate) struct WorkflowTaskOptions {
56-
pub(crate) wft_poller_shared: Arc<WFTPollerShared>,
56+
pub(crate) wft_poller_shared: Option<Arc<WFTPollerShared>>,
5757
}
5858

5959
pub(crate) struct ActivityTaskOptions {
@@ -77,27 +77,27 @@ impl LongPollBuffer<PollWorkflowTaskQueueResponse, WorkflowSlotKind> {
7777
) -> Self {
7878
let is_sticky = sticky_queue.is_some();
7979
let poll_scaler = PollScaler::new(poller_behavior, num_pollers_handler, shutdown.clone());
80-
if is_sticky {
81-
options
82-
.wft_poller_shared
83-
.set_sticky_active(poll_scaler.active_rx.clone());
84-
} else {
85-
options
86-
.wft_poller_shared
87-
.set_non_sticky_active(poll_scaler.active_rx.clone());
88-
};
89-
let shared = options.wft_poller_shared.clone();
90-
let pre_permit_delay = Some(move || {
91-
let shared = shared.clone();
92-
async move {
93-
shared.wait_if_needed(is_sticky).await;
80+
if let Some(wftps) = options.wft_poller_shared.as_ref() {
81+
if is_sticky {
82+
wftps.set_sticky_active(poll_scaler.active_rx.clone());
83+
} else {
84+
wftps.set_non_sticky_active(poll_scaler.active_rx.clone());
85+
};
86+
}
87+
let pre_permit_delay = options.wft_poller_shared.clone().map(|wftps| {
88+
move || {
89+
let shared = wftps.clone();
90+
async move {
91+
shared.wait_if_needed(is_sticky).await;
92+
}
9493
}
9594
});
96-
let post_poll_fn = Some(move |t: &PollWorkflowTaskQueueResponse| {
97-
if is_sticky {
98-
options
99-
.wft_poller_shared
100-
.record_sticky_backlog(t.backlog_count_hint as usize)
95+
96+
let post_poll_fn = options.wft_poller_shared.clone().map(|wftps| {
97+
move |t: &PollWorkflowTaskQueueResponse| {
98+
if is_sticky {
99+
wftps.record_sticky_backlog(t.backlog_count_hint as usize)
100+
}
101101
}
102102
});
103103
let no_retry = if matches!(poller_behavior, PollerBehavior::Autoscaling { .. }) {
@@ -391,35 +391,6 @@ async fn handle_task_panic(t: JoinHandle<()>) {
391391
}
392392
}
393393

394-
struct ActiveCounter<F: Fn(usize)>(watch::Sender<usize>, Option<Arc<F>>);
395-
impl<F> ActiveCounter<F>
396-
where
397-
F: Fn(usize),
398-
{
399-
fn new(a: watch::Sender<usize>, change_fn: Option<Arc<F>>) -> Self {
400-
a.send_modify(|v| {
401-
*v += 1;
402-
if let Some(cfn) = change_fn.as_ref() {
403-
cfn(*v);
404-
}
405-
});
406-
Self(a, change_fn)
407-
}
408-
}
409-
impl<F> Drop for ActiveCounter<F>
410-
where
411-
F: Fn(usize),
412-
{
413-
fn drop(&mut self) {
414-
self.0.send_modify(|v| {
415-
*v -= 1;
416-
if let Some(cfn) = self.1.as_ref() {
417-
cfn(*v)
418-
};
419-
});
420-
}
421-
}
422-
423394
/// The PollScaler is responsible for managing the number of pollers based on the current load.
424395
///
425396
/// It does so by receiving suggestions from the server about whether to scale up or down. It will
@@ -753,7 +724,7 @@ mod tests {
753724
CancellationToken::new(),
754725
None::<fn(usize)>,
755726
WorkflowTaskOptions {
756-
wft_poller_shared: Arc::new(WFTPollerShared::new()),
727+
wft_poller_shared: Some(Arc::new(WFTPollerShared::new(Some(10)))),
757728
},
758729
);
759730

core/src/worker/mod.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -867,9 +867,10 @@ fn wft_poller_behavior(config: &WorkerConfig, is_sticky: bool) -> PollerBehavior
867867
config.nonsticky_to_sticky_poll_ratio,
868868
))
869869
} else {
870-
PollerBehavior::SimpleMaximum(m.saturating_sub(
871-
calc_max_nonsticky(m, config.nonsticky_to_sticky_poll_ratio).max(1),
872-
))
870+
PollerBehavior::SimpleMaximum(
871+
m.saturating_sub(calc_max_nonsticky(m, config.nonsticky_to_sticky_poll_ratio))
872+
.max(1),
873+
)
873874
}
874875
} else {
875876
config.workflow_task_poller_behavior

core/src/worker/workflow/wft_poller.rs

Lines changed: 46 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,19 @@ pub(crate) fn make_wft_poller(
3131
> + Sized
3232
+ 'static {
3333
let wft_metrics = metrics.with_new_attrs([workflow_poller()]);
34-
let wft_poller_shared = Arc::new(WFTPollerShared::new());
34+
let poller_behavior = wft_poller_behavior(config, false);
35+
let wft_poller_shared = if sticky_queue_name.is_some() {
36+
Some(Arc::new(WFTPollerShared::new(
37+
wft_slots.available_permits(),
38+
)))
39+
} else {
40+
None
41+
};
3542
let wf_task_poll_buffer = LongPollBuffer::new_workflow_task(
3643
client.clone(),
3744
config.task_queue.clone(),
3845
None,
39-
wft_poller_behavior(config, false),
46+
poller_behavior,
4047
wft_slots.clone(),
4148
shutdown_token.child_token(),
4249
Some(move |np| {
@@ -74,14 +81,16 @@ pub(crate) struct WFTPollerShared {
7481
last_seen_sticky_backlog: (watch::Receiver<usize>, watch::Sender<usize>),
7582
sticky_active: OnceLock<watch::Receiver<usize>>,
7683
non_sticky_active: OnceLock<watch::Receiver<usize>>,
84+
max_slots: Option<usize>,
7785
}
7886
impl WFTPollerShared {
79-
pub(crate) fn new() -> Self {
87+
pub(crate) fn new(max_slots: Option<usize>) -> Self {
8088
let (tx, rx) = watch::channel(0);
8189
Self {
8290
last_seen_sticky_backlog: (rx, tx),
8391
sticky_active: OnceLock::new(),
8492
non_sticky_active: OnceLock::new(),
93+
max_slots,
8594
}
8695
}
8796
pub(crate) fn set_sticky_active(&self, rx: watch::Receiver<usize>) {
@@ -95,36 +104,49 @@ impl WFTPollerShared {
95104
pub(crate) async fn wait_if_needed(&self, is_sticky: bool) {
96105
// If there's a sticky backlog, prioritize it.
97106
if !is_sticky {
98-
let backlog = *self.last_seen_sticky_backlog.0.borrow();
99-
if backlog > 1 {
100-
let _ = self
101-
.last_seen_sticky_backlog
102-
.0
103-
.clone()
104-
.wait_for(|v| *v <= 1)
105-
.await;
106-
}
107+
let _ = self
108+
.last_seen_sticky_backlog
109+
.0
110+
.clone()
111+
.wait_for(|v| *v == 0)
112+
.await;
107113
}
108114

109-
// If there's no meaningful sticky backlog, balance poller counts
110-
if *self.last_seen_sticky_backlog.0.borrow() <= 1 {
115+
// We need to make sure there's at least one poller of both kinds available. So, we check
116+
// that we won't end up using every available permit with one kind of poller. In practice
117+
// this is only ever likely to be an issue with very small numbers of slots.
118+
if let Some(max_slots) = self.max_slots {
111119
if let Some((sticky_active, non_sticky_active)) =
112120
self.sticky_active.get().zip(self.non_sticky_active.get())
113121
{
114-
if is_sticky {
115-
let _ = sticky_active
116-
.clone()
117-
.wait_for(|v| *v <= *non_sticky_active.borrow())
118-
.await;
119-
} else {
120-
let _ = non_sticky_active
121-
.clone()
122-
.wait_for(|v| *v <= *sticky_active.borrow())
123-
.await;
122+
let mut sticky_active = sticky_active.clone();
123+
let mut non_sticky_active = non_sticky_active.clone();
124+
loop {
125+
let num_sticky_active = *sticky_active.borrow_and_update();
126+
let num_non_sticky_active = *non_sticky_active.borrow_and_update();
127+
let both_are_zero = num_sticky_active == 0 && num_non_sticky_active == 0;
128+
if both_are_zero {
129+
if !is_sticky {
130+
break;
131+
}
132+
} else {
133+
let would_exceed_max_slots =
134+
(num_sticky_active + num_non_sticky_active + 1) >= max_slots;
135+
let must_wait = would_exceed_max_slots
136+
&& (num_sticky_active == 0 || num_non_sticky_active == 0);
137+
if !must_wait {
138+
break;
139+
}
140+
}
141+
tokio::select! {
142+
_ = sticky_active.changed() => (),
143+
_ = non_sticky_active.changed() => (),
144+
}
124145
}
125146
}
126147
}
127148
}
149+
128150
pub(crate) fn record_sticky_backlog(&self, v: usize) {
129151
let _ = self.last_seen_sticky_backlog.1.send(v);
130152
}

tests/integ_tests/metrics_tests.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ async fn one_slot_worker_reports_available_slot() {
154154
// Need to use two for WFTs because there are a minimum of 2 pollers b/c of sticky polling
155155
.max_outstanding_workflow_tasks(2_usize)
156156
.max_outstanding_nexus_tasks(1_usize)
157-
.workflow_task_poller_behavior(PollerBehavior::SimpleMaximum(1_usize))
157+
.workflow_task_poller_behavior(PollerBehavior::SimpleMaximum(2_usize))
158158
.build()
159159
.unwrap();
160160

0 commit comments

Comments
 (0)