Skip to content

Commit bff21ab

Browse files
authored
rt: set task budget after block_in_place call (tokio-rs#2502)
In some cases, when a call to `block_in_place` completes, the runtime is reinstated on the thread. In this case, the task budget must also be set in order to avoid starving other tasks on the worker.
1 parent 07533a5 commit bff21ab

File tree

3 files changed

+95
-26
lines changed

3 files changed

+95
-26
lines changed

tokio/src/coop.rs

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -92,10 +92,20 @@ cfg_rt_threaded! {
9292
/// Run the given closure with a cooperative task budget. When the function
9393
/// returns, the budget is reset to the value prior to calling the function.
9494
#[inline(always)]
95-
pub(crate) fn budget<F, R>(f: F) -> R
96-
where
97-
F: FnOnce() -> R,
98-
{
95+
pub(crate) fn budget<R>(f: impl FnOnce() -> R) -> R {
96+
with_budget(Budget::initial(), f)
97+
}
98+
99+
cfg_rt_threaded! {
100+
/// Set the current task's budget
101+
#[cfg(feature = "blocking")]
102+
pub(crate) fn set(budget: Budget) {
103+
CURRENT.with(|cell| cell.set(budget))
104+
}
105+
}
106+
107+
#[inline(always)]
108+
fn with_budget<R>(budget: Budget, f: impl FnOnce() -> R) -> R {
99109
struct ResetGuard<'a> {
100110
cell: &'a Cell<Budget>,
101111
prev: Budget,
@@ -110,7 +120,7 @@ where
110120
CURRENT.with(move |cell| {
111121
let prev = cell.get();
112122

113-
cell.set(Budget::initial());
123+
cell.set(budget);
114124

115125
let _guard = ResetGuard { cell, prev };
116126

@@ -127,10 +137,14 @@ cfg_rt_threaded! {
127137

128138
cfg_blocking_impl! {
129139
/// Forcibly remove the budgeting constraints early.
130-
pub(crate) fn stop() {
140+
///
141+
/// Returns the remaining budget
142+
pub(crate) fn stop() -> Budget {
131143
CURRENT.with(|cell| {
144+
let prev = cell.get();
132145
cell.set(Budget::unconstrained());
133-
});
146+
prev
147+
})
134148
}
135149
}
136150

tokio/src/runtime/thread_pool/worker.rs

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
//! "core" is handed off to a new thread allowing the scheduler to continue to
55
//! make progress while the originating thread blocks.
66
7+
use crate::coop;
78
use crate::loom::rand::seed;
89
use crate::loom::sync::{Arc, Mutex};
910
use crate::park::{Park, Unpark};
@@ -179,31 +180,27 @@ cfg_blocking! {
179180
F: FnOnce() -> R,
180181
{
181182
// Try to steal the worker core back
182-
struct Reset(bool);
183+
struct Reset(coop::Budget);
183184

184185
impl Drop for Reset {
185186
fn drop(&mut self) {
186187
CURRENT.with(|maybe_cx| {
187-
if !self.0 {
188-
// We were not the ones to give away the core,
189-
// so we do not get to restore it either.
190-
// This is necessary so that with a nested
191-
// block_in_place, the inner block_in_place
192-
// does not restore the core.
193-
return;
194-
}
195-
196188
if let Some(cx) = maybe_cx {
197189
let core = cx.worker.core.take();
198190
let mut cx_core = cx.core.borrow_mut();
199191
assert!(cx_core.is_none());
200192
*cx_core = core;
193+
194+
// Reset the task budget as we are re-entering the
195+
// runtime.
196+
coop::set(self.0);
201197
}
202198
});
203199
}
204200
}
205201

206202
let mut had_core = false;
203+
207204
CURRENT.with(|maybe_cx| {
208205
match (crate::runtime::enter::context(), maybe_cx.is_some()) {
209206
(EnterContext::Entered { .. }, true) => {
@@ -231,16 +228,12 @@ cfg_blocking! {
231228
return;
232229
}
233230
}
231+
234232
let cx = maybe_cx.expect("no .is_some() == false cases above should lead here");
235233

236234
// Get the worker core. If none is set, then blocking is fine!
237235
let core = match cx.core.borrow_mut().take() {
238-
Some(core) => {
239-
// We are effectively leaving the executor, so we need to
240-
// forcibly end budgeting.
241-
crate::coop::stop();
242-
core
243-
},
236+
Some(core) => core,
244237
None => return,
245238
};
246239

@@ -263,9 +256,12 @@ cfg_blocking! {
263256
runtime::spawn_blocking(move || run(worker));
264257
});
265258

266-
let _reset = Reset(had_core);
267259

268260
if had_core {
261+
// Unset the current task's budget. Blocking sections are not
262+
// constrained by task budgets.
263+
let _reset = Reset(coop::stop());
264+
269265
crate::runtime::enter::exit(f)
270266
} else {
271267
f()
@@ -349,7 +345,7 @@ impl Context {
349345
*self.core.borrow_mut() = Some(core);
350346

351347
// Run the task
352-
crate::coop::budget(|| {
348+
coop::budget(|| {
353349
task.run();
354350

355351
// As long as there is budget remaining and a task exists in the
@@ -368,7 +364,7 @@ impl Context {
368364
None => return Ok(core),
369365
};
370366

371-
if crate::coop::has_budget_remaining() {
367+
if coop::has_budget_remaining() {
372368
// Run the LIFO task, then loop
373369
*self.core.borrow_mut() = Some(core);
374370
task.run();

tokio/tests/rt_threaded.rs

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use tokio::runtime::{self, Runtime};
77
use tokio::sync::oneshot;
88
use tokio_test::{assert_err, assert_ok};
99

10+
use futures::future::poll_fn;
1011
use std::future::Future;
1112
use std::pin::Pin;
1213
use std::sync::atomic::AtomicUsize;
@@ -322,6 +323,64 @@ fn multi_threadpool() {
322323
done_rx.recv().unwrap();
323324
}
324325

326+
// When `block_in_place` returns, it attempts to reclaim the yielded runtime
327+
// worker. In this case, the remainder of the task is on the runtime worker and
328+
// must take part in the cooperative task budgeting system.
329+
//
330+
// The test ensures that, when this happens, attempting to consume from a
331+
// channel yields occasionally even if there are values ready to receive.
332+
#[test]
333+
fn coop_and_block_in_place() {
334+
use tokio::sync::mpsc;
335+
336+
let mut rt = tokio::runtime::Builder::new()
337+
.threaded_scheduler()
338+
// Setting max threads to 1 prevents another thread from claiming the
339+
// runtime worker yielded as part of `block_in_place` and guarantees the
340+
// same thread will reclaim the worker at the end of the
341+
// `block_in_place` call.
342+
.max_threads(1)
343+
.build()
344+
.unwrap();
345+
346+
rt.block_on(async move {
347+
let (mut tx, mut rx) = mpsc::channel(1024);
348+
349+
// Fill the channel
350+
for _ in 0..1024 {
351+
tx.send(()).await.unwrap();
352+
}
353+
354+
drop(tx);
355+
356+
tokio::spawn(async move {
357+
// Block in place without doing anything
358+
tokio::task::block_in_place(|| {});
359+
360+
// Receive all the values, this should trigger a `Pending` as the
361+
// coop limit will be reached.
362+
poll_fn(|cx| {
363+
while let Poll::Ready(v) = {
364+
tokio::pin! {
365+
let fut = rx.recv();
366+
}
367+
368+
Pin::new(&mut fut).poll(cx)
369+
} {
370+
if v.is_none() {
371+
panic!("did not yield");
372+
}
373+
}
374+
375+
Poll::Ready(())
376+
})
377+
.await
378+
})
379+
.await
380+
.unwrap();
381+
});
382+
}
383+
325384
// Testing this does not panic
326385
#[test]
327386
fn max_threads() {

0 commit comments

Comments
 (0)