Skip to content

Commit a155eeb

Browse files
committed
Poll concurrently at will (#545)
* Unleash the pollers
* Refactor things so semaphore can live inside poller
* Enforce config requirement of pollers no more than slots
* Make unusual panic I noticed non-fatal
1 parent a2bfb96 commit a155eeb

21 files changed

+522
-352
lines changed

core-api/src/worker.rs

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
use std::time::Duration;
22
use tokio::sync::mpsc::UnboundedSender;
33

4+
const MAX_OUTSTANDING_WFT_DEFAULT: usize = 100;
5+
const MAX_CONCURRENT_WFT_POLLS_DEFAULT: usize = 5;
6+
47
/// Defines per-worker configuration options
58
#[derive(Debug, Clone, derive_builder::Builder, serde::Serialize, serde::Deserialize)]
69
#[builder(setter(into), build_fn(validate = "Self::validate"))]
@@ -31,7 +34,7 @@ pub struct WorkerConfig {
3134
/// "outstanding" until all activations it requires have been completed.
3235
///
3336
/// Cannot be larger than `max_cached_workflows`, nor smaller than `max_concurrent_wft_polls`.
34-
#[builder(default = "100")]
37+
#[builder(default = "MAX_OUTSTANDING_WFT_DEFAULT")]
3538
pub max_outstanding_workflow_tasks: usize,
3639
/// The maximum number of activity tasks that will ever be given to this worker concurrently
3740
#[builder(default = "100")]
@@ -43,7 +46,7 @@ pub struct WorkerConfig {
4346
/// Maximum number of concurrent poll workflow task requests we will perform at a time on this
4447
/// worker's task queue. See also [WorkerConfig::nonsticky_to_sticky_poll_ratio]. Must be at
4548
/// least 1, and cannot exceed [WorkerConfig::max_outstanding_workflow_tasks].
46-
#[builder(default = "5")]
49+
#[builder(default = "MAX_CONCURRENT_WFT_POLLS_DEFAULT")]
4750
pub max_concurrent_wft_polls: usize,
4851
/// [WorkerConfig::max_concurrent_wft_polls] * this number = the number of max pollers that will
4952
/// be allowed for the nonsticky queue when sticky tasks are enabled. If both defaults are used,
@@ -161,6 +164,18 @@ impl WorkerConfigBuilder {
161164
);
162165
}
163166
}
167+
if self
168+
.max_concurrent_wft_polls
169+
.unwrap_or(MAX_CONCURRENT_WFT_POLLS_DEFAULT)
170+
> self
171+
.max_outstanding_workflow_tasks
172+
.unwrap_or(MAX_OUTSTANDING_WFT_DEFAULT)
173+
{
174+
return Err(
175+
"`max_concurrent_wft_polls` cannot exceed `max_outstanding_workflow_tasks`"
176+
.to_owned(),
177+
);
178+
}
164179
Ok(())
165180
}
166181
}

core/src/abstractions.rs

Lines changed: 7 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ pub mod take_cell;
44

55
use crate::MetricsContext;
66
use derive_more::DebugCustom;
7-
use futures::{stream, Stream, StreamExt};
87
use std::{
98
fmt::{Debug, Formatter},
109
sync::{
@@ -47,6 +46,11 @@ impl MeteredSemaphore {
4746
self.sem.available_permits()
4847
}
4948

49+
/// Test-only accessor: returns the semaphore's currently available permits plus the current
/// value of the `unused_claimants` counter.
///
/// NOTE(review): `unused_claimants` presumably counts permits that have been handed out but not
/// yet marked as used -- confirm against where the counter is incremented/decremented.
#[cfg(test)]
50+
pub fn unused_permits(&self) -> usize {
51+
self.sem.available_permits() + self.unused_claimants.load(Ordering::Acquire)
52+
}
53+
5054
pub async fn acquire_owned(&self) -> Result<OwnedMeteredSemPermit, AcquireError> {
5155
let res = self.sem.clone().acquire_owned().await?;
5256
Ok(self.build_owned(res))
@@ -114,8 +118,8 @@ impl ClosableMeteredSemaphore {
114118

115119
impl ClosableMeteredSemaphore {
116120
#[cfg(test)]
117-
pub fn available_permits(&self) -> usize {
118-
self.inner.available_permits()
121+
pub fn unused_permits(&self) -> usize {
122+
self.inner.unused_permits()
119123
}
120124

121125
/// Request to close the semaphore and prevent new permits from being acquired.
@@ -231,35 +235,6 @@ impl UsedMeteredSemPermit {
231235
}
232236
}
233237

234-
/// From the input stream, create a new stream which only pulls from the input stream when allowed.
235-
/// When allowed is determined by the passed in `proceeder` emitting an item. The input stream is
236-
/// only pulled from when that future resolves.
237-
///
238-
/// This is *almost* identical to `zip`, but does not terminate early if the input stream closes.
239-
/// The proceeder must allow the poll before the returned stream closes. If the proceeder terminates
240-
/// the overall stream will terminate.
241-
pub(crate) fn stream_when_allowed<S, AS>(
242-
input: S,
243-
proceeder: AS,
244-
) -> impl Stream<Item = (S::Item, AS::Item)>
245-
where
246-
S: Stream + Send + 'static,
247-
AS: Stream + Send + 'static,
248-
{
249-
let stream = stream::unfold(
250-
(proceeder.boxed(), input.boxed()),
251-
|(mut proceeder, mut input)| async {
252-
let v = proceeder.next().await;
253-
if let Some(v) = v {
254-
input.next().await.map(|i| ((i, v), (proceeder, input)))
255-
} else {
256-
None
257-
}
258-
},
259-
);
260-
stream
261-
}
262-
263238
macro_rules! dbg_panic {
264239
($($arg:tt)*) => {
265240
error!($($arg)*);
@@ -271,49 +246,6 @@ pub(crate) use dbg_panic;
271246
#[cfg(test)]
272247
mod tests {
273248
use super::*;
274-
use futures::pin_mut;
275-
use std::task::Poll;
276-
use tokio::sync::mpsc::unbounded_channel;
277-
use tokio_stream::wrappers::UnboundedReceiverStream;
278-
279-
#[test]
280-
fn stream_when_allowed_works() {
281-
let inputs = stream::iter([1, 2, 3]);
282-
let (allow_tx, allow_rx) = unbounded_channel();
283-
let when_allowed = stream_when_allowed(inputs, UnboundedReceiverStream::new(allow_rx));
284-
285-
let waker = futures::task::noop_waker_ref();
286-
let mut cx = std::task::Context::from_waker(waker);
287-
pin_mut!(when_allowed);
288-
289-
allow_tx.send(()).unwrap();
290-
assert_eq!(
291-
when_allowed.poll_next_unpin(&mut cx),
292-
Poll::Ready(Some((1, ())))
293-
);
294-
// Now, it won't be ready
295-
for _ in 1..10 {
296-
assert_eq!(when_allowed.poll_next_unpin(&mut cx), Poll::Pending);
297-
}
298-
allow_tx.send(()).unwrap();
299-
assert_eq!(
300-
when_allowed.poll_next_unpin(&mut cx),
301-
Poll::Ready(Some((2, ())))
302-
);
303-
for _ in 1..10 {
304-
assert_eq!(when_allowed.poll_next_unpin(&mut cx), Poll::Pending);
305-
}
306-
allow_tx.send(()).unwrap();
307-
assert_eq!(
308-
when_allowed.poll_next_unpin(&mut cx),
309-
Poll::Ready(Some((3, ())))
310-
);
311-
for _ in 1..10 {
312-
assert_eq!(when_allowed.poll_next_unpin(&mut cx), Poll::Pending);
313-
}
314-
allow_tx.send(()).unwrap();
315-
assert_eq!(when_allowed.poll_next_unpin(&mut cx), Poll::Ready(None));
316-
}
317249

318250
#[tokio::test]
319251
async fn closable_semaphore_permit_drop_returns_permit() {

core/src/core_tests/activity_tasks.rs

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1058,26 +1058,34 @@ async fn graceful_shutdown(#[values(true, false)] at_max_outstanding: bool) {
10581058
let _task_q = "q";
10591059
let grace_period = Duration::from_millis(200);
10601060
let mut tasks = three_tasks();
1061-
let mut mock_client = mock_workflow_client();
1062-
mock_client
1063-
.expect_poll_activity_task()
1061+
let mut mock_act_poller = mock_poller();
1062+
mock_act_poller
1063+
.expect_poll()
10641064
.times(3)
1065-
.returning(move |_, _| Ok(tasks.pop_front().unwrap()));
1065+
.returning(move || Some(Ok(tasks.pop_front().unwrap())));
1066+
mock_act_poller
1067+
.expect_poll()
1068+
.times(1)
1069+
.returning(move || None);
10661070
// They shall all be reported as failed
1071+
let mut mock_client = mock_workflow_client();
10671072
mock_client
10681073
.expect_fail_activity_task()
10691074
.times(3)
10701075
.returning(|_, _| Ok(Default::default()));
10711076

10721077
let max_outstanding = if at_max_outstanding { 3_usize } else { 100 };
1073-
let worker = Worker::new_test(
1074-
test_worker_cfg()
1078+
let mw = MockWorkerInputs {
1079+
act_poller: Some(Box::from(mock_act_poller)),
1080+
config: test_worker_cfg()
10751081
.graceful_shutdown_period(grace_period)
10761082
.max_outstanding_activities(max_outstanding)
1083+
.max_concurrent_at_polls(1_usize) // Makes test logic simple
10771084
.build()
10781085
.unwrap(),
1079-
mock_client,
1080-
);
1086+
..Default::default()
1087+
};
1088+
let worker = mock_worker(MocksHolder::from_mock_worker(mock_client, mw));
10811089

10821090
let _1 = worker.poll_activity_task().await.unwrap();
10831091

core/src/core_tests/workers.rs

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
use crate::{
22
prost_dur,
33
test_help::{
4-
build_fake_worker, build_mock_pollers, canned_histories, mock_manual_poller, mock_worker,
5-
MockPollCfg, MockWorkerInputs, MocksHolder, ResponseType, WorkerExt,
4+
build_fake_worker, build_mock_pollers, canned_histories, mock_worker, MockPollCfg,
5+
MockWorkerInputs, MocksHolder, ResponseType, WorkerExt,
66
},
77
worker::client::mocks::mock_workflow_client,
88
PollActivityError, PollWfError,
99
};
10-
use futures::FutureExt;
10+
use futures_util::{stream, stream::StreamExt};
1111
use std::{cell::RefCell, time::Duration};
1212
use temporal_sdk_core_api::Worker;
1313
use temporal_sdk_core_protos::{
@@ -16,7 +16,9 @@ use temporal_sdk_core_protos::{
1616
workflow_commands::{workflow_command, CompleteWorkflowExecution, StartTimer},
1717
workflow_completion::WorkflowActivationCompletion,
1818
},
19-
temporal::api::workflowservice::v1::RespondWorkflowTaskCompletedResponse,
19+
temporal::api::workflowservice::v1::{
20+
PollWorkflowTaskQueueResponse, RespondWorkflowTaskCompletedResponse,
21+
},
2022
};
2123
use temporal_sdk_core_test_utils::start_timer_cmd;
2224
use tokio::sync::{watch, Barrier};
@@ -87,20 +89,18 @@ async fn shutdown_worker_can_complete_pending_activation() {
8789
#[tokio::test]
8890
async fn worker_shutdown_during_poll_doesnt_deadlock() {
8991
let (tx, rx) = watch::channel(false);
90-
let mut mock_poller = mock_manual_poller();
9192
let rx = rx.clone();
92-
mock_poller.expect_poll().returning(move || {
93-
let mut rx = rx.clone();
94-
async move {
95-
// Don't resolve polls until worker shuts down
96-
rx.changed().await.unwrap();
97-
// We don't want to return a real response here because it would get buffered and
98-
// then we'd have real work to do to be able to finish shutdown.
99-
Some(Ok(Default::default()))
100-
}
101-
.boxed()
93+
let stream = stream::unfold(rx, |mut rx| async move {
94+
// Don't resolve polls until worker shuts down
95+
rx.changed().await.unwrap();
96+
// We don't want to return a real response here because it would get buffered and
97+
// then we'd have real work to do to be able to finish shutdown.
98+
Some((
99+
Ok(PollWorkflowTaskQueueResponse::default().try_into().unwrap()),
100+
rx,
101+
))
102102
});
103-
let mw = MockWorkerInputs::new_from_poller(Box::new(mock_poller));
103+
let mw = MockWorkerInputs::new(stream.boxed());
104104
let mut mock_client = mock_workflow_client();
105105
mock_client
106106
.expect_complete_workflow_task()

core/src/core_tests/workflow_tasks.rs

Lines changed: 74 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use rstest::{fixture, rstest};
1818
use std::{
1919
collections::{HashMap, HashSet, VecDeque},
2020
sync::{
21-
atomic::{AtomicU64, Ordering},
21+
atomic::{AtomicU64, AtomicUsize, Ordering},
2222
Arc,
2323
},
2424
time::Duration,
@@ -60,6 +60,7 @@ use temporal_sdk_core_test_utils::{fanout_tasks, start_timer_cmd, WorkerTestHelp
6060
use tokio::{
6161
join,
6262
sync::{Barrier, Semaphore},
63+
time,
6364
};
6465

6566
#[fixture(hist_batches = &[])]
@@ -1535,8 +1536,6 @@ async fn failing_wft_doesnt_eat_permit_forever() {
15351536
.unwrap();
15361537
}
15371538
assert_eq!(worker.outstanding_workflow_tasks().await, 0);
1538-
// 1 permit is in use because the next task is buffered and has re-used the permit
1539-
assert_eq!(worker.available_wft_permits().await, 1);
15401539
// We should be "out of work" because the mock service thinks we didn't complete the last task,
15411540
// which we didn't, because we don't spam failures. The real server would eventually time out
15421541
// the task. Mock doesn't understand that, so the WFT permit is released because eventually a
@@ -1545,7 +1544,7 @@ async fn failing_wft_doesnt_eat_permit_forever() {
15451544
outstanding_mock_tasks.unwrap().release_run(&run_id);
15461545
let activation = worker.poll_workflow_activation().await.unwrap();
15471546
// There should be no change in permits, since this just unbuffered the buffered task
1548-
assert_eq!(worker.available_wft_permits().await, 1);
1547+
assert_eq!(worker.available_wft_permits(), 1);
15491548
worker
15501549
.complete_workflow_activation(WorkflowActivationCompletion::from_cmd(
15511550
activation.run_id,
@@ -1554,7 +1553,7 @@ async fn failing_wft_doesnt_eat_permit_forever() {
15541553
.await
15551554
.unwrap();
15561555
worker.shutdown().await;
1557-
assert_eq!(worker.available_wft_permits().await, 2);
1556+
assert_eq!(worker.available_wft_permits(), 2);
15581557
}
15591558

15601559
#[tokio::test]
@@ -1875,7 +1874,7 @@ async fn poll_faster_than_complete_wont_overflow_cache() {
18751874

18761875
join!(blocking_poll, complete_evict);
18771876
// p5 outstanding and final poll outstanding -- hence one permit available
1878-
assert_eq!(core.available_wft_permits().await, 1);
1877+
assert_eq!(core.available_wft_permits(), 1);
18791878
assert_eq!(core.cached_workflows().await, 3);
18801879
}
18811880

@@ -2588,3 +2587,72 @@ async fn history_length_with_fail_and_timeout(
25882587
.unwrap();
25892588
worker.run_until_done().await.unwrap();
25902589
}
2590+
2591+
/// Verifies that concurrent workflow task polling never takes more tasks from the (mock) client
/// than there are outstanding workflow task slots.
///
/// The worker is configured with 10 cached workflows, 10 outstanding WFT slots and 10 concurrent
/// WFT polls. Ten activations are polled and left incomplete, an eleventh poll is started while
/// all slots are taken, and after shutdown the test asserts that exactly 10 poll responses were
/// ever produced by the mock client.
#[tokio::test]
2592+
async fn poller_wont_run_ahead_of_task_slots() {
2593+
let popped_tasks = Arc::new(AtomicUsize::new(0));
2594+
let ptc = popped_tasks.clone();
2595+
let mut bunch_of_first_tasks = (1..50).map(move |i| {
2596+
// The iterator is lazy: this counter only advances when the mock client's poll handler
// calls `next()`, so it counts the poll responses actually handed out.
ptc.fetch_add(1, Ordering::Relaxed);
2597+
hist_to_poll_resp(
2598+
&canned_histories::single_timer(&format!("{i}")),
2599+
format!("wf-{i}"),
2600+
1.into(),
2601+
)
2602+
.resp
2603+
});
2604+
let mut mock_client = mock_workflow_client();
2605+
mock_client
2606+
.expect_poll_workflow_task()
2607+
.returning(move |_, _| Ok(bunch_of_first_tasks.next().unwrap()));
2608+
mock_client
2609+
.expect_complete_workflow_task()
2610+
.returning(|_| Ok(Default::default()));
2611+
2612+
let worker = Worker::new_test(
2613+
test_worker_cfg()
2614+
.max_cached_workflows(10_usize)
2615+
.max_outstanding_workflow_tasks(10_usize)
2616+
// As many concurrent pollers as task slots -- the largest poller count the slot limit allows
.max_concurrent_wft_polls(10_usize)
2617+
.no_remote_activities(true)
2618+
.build()
2619+
.unwrap(),
2620+
mock_client,
2621+
);
2622+
2623+
// Should be able to get up to 10 tasks
2624+
let mut tasks = vec![];
2625+
for _ in 0..10 {
2626+
tasks.push(worker.poll_workflow_activation().await.unwrap());
2627+
}
2628+
2629+
assert_eq!(worker.outstanding_workflow_tasks().await, 10);
2630+
assert_eq!(worker.available_wft_permits(), 0);
2631+
assert_eq!(worker.unused_wft_permits(), 0);
2632+
2633+
// This one should hang until we complete some tasks since we're at the limit
2634+
let hung_poll = async {
2635+
// This should end up getting shut down after the other routine finishes tasks
2636+
assert_matches!(
2637+
worker.poll_workflow_activation().await.unwrap_err(),
2638+
PollWfError::ShutDown
2639+
);
2640+
};
2641+
// Wait for a bit concurrently with above, verify no extra tasks got taken, shutdown
2642+
let ender = async {
2643+
time::sleep(Duration::from_millis(300)).await;
2644+
// initiate shutdown, then complete open tasks
2645+
worker.initiate_shutdown();
2646+
for t in tasks {
2647+
worker
2648+
.complete_workflow_activation(WorkflowActivationCompletion::empty(t.run_id))
2649+
.await
2650+
.unwrap();
2651+
}
2652+
worker.shutdown().await;
2653+
};
2654+
join!(hung_poll, ender);
2655+
// We shouldn't have got more than the 10 tasks from the poller -- verifying that the concurrent
2656+
// polling is not exceeding the task limit
2657+
assert_eq!(popped_tasks.load(Ordering::Relaxed), 10);
2658+
}

0 commit comments

Comments
 (0)