Skip to content

Commit 0095c7f

Browse files
committed
rt: overhaul task hooks
This change overhauls the entire task hooks system so that users can propagate arbitrary information between task hook invocations and pass context data between the hook "harnesses" for parent and child tasks at time of spawn. This is intended to be significantly more extensible and long-term maintainable than the current task hooks system, and should ultimately be much easier to stabilize.
1 parent a0af02a commit 0095c7f

33 files changed

+1159
-685
lines changed

tokio/src/lib.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -351,10 +351,7 @@
351351
//! - [`task::Builder`]
352352
//! - Some methods on [`task::JoinSet`]
353353
//! - [`runtime::RuntimeMetrics`]
354-
//! - [`runtime::Builder::on_task_spawn`]
355-
//! - [`runtime::Builder::on_task_terminate`]
356354
//! - [`runtime::Builder::unhandled_panic`]
357-
//! - [`runtime::TaskMeta`]
358355
//!
359356
//! This flag enables **unstable** features. The public API of these features
360357
//! may break in 1.x releases. To enable these features, the `--cfg

tokio/src/runtime/blocking/pool.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -375,10 +375,15 @@ impl Spawner {
375375
F: FnOnce() -> R + Send + 'static,
376376
R: Send + 'static,
377377
{
378+
// let parent = with_c
378379
let id = task::Id::next();
379380
let fut =
380381
blocking_task::<F, BlockingTask<F>>(BlockingTask::new(func), spawn_meta, id.as_u64());
381382

383+
#[cfg(tokio_unstable)]
384+
let (task, handle) = task::unowned(fut, BlockingSchedule::new(rt), id, None);
385+
386+
#[cfg(not(tokio_unstable))]
382387
let (task, handle) = task::unowned(fut, BlockingSchedule::new(rt), id);
383388

384389
let spawned = self.spawn_task(Task::new(task, is_mandatory), rt);

tokio/src/runtime/blocking/schedule.rs

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
#[cfg(feature = "test-util")]
22
use crate::runtime::scheduler;
3-
use crate::runtime::task::{self, Task, TaskHarnessScheduleHooks};
3+
use crate::runtime::task::{self, Task};
44
use crate::runtime::Handle;
5+
#[cfg(tokio_unstable)]
6+
use crate::runtime::{OptionalTaskHooksFactory, OptionalTaskHooksFactoryRef};
57

68
/// `task::Schedule` implementation that does nothing (except some bookkeeping
79
/// in test-util builds). This is unique to the blocking scheduler as tasks
@@ -12,7 +14,8 @@ use crate::runtime::Handle;
1214
pub(crate) struct BlockingSchedule {
1315
#[cfg(feature = "test-util")]
1416
handle: Handle,
15-
hooks: TaskHarnessScheduleHooks,
17+
#[cfg(tokio_unstable)]
18+
hooks_factory: OptionalTaskHooksFactory,
1619
}
1720

1821
impl BlockingSchedule {
@@ -31,9 +34,8 @@ impl BlockingSchedule {
3134
BlockingSchedule {
3235
#[cfg(feature = "test-util")]
3336
handle: handle.clone(),
34-
hooks: TaskHarnessScheduleHooks {
35-
task_terminate_callback: handle.inner.hooks().task_terminate_callback.clone(),
36-
},
37+
#[cfg(tokio_unstable)]
38+
hooks_factory: handle.inner.hooks_factory(),
3739
}
3840
}
3941
}
@@ -58,9 +60,13 @@ impl task::Schedule for BlockingSchedule {
5860
unreachable!();
5961
}
6062

61-
fn hooks(&self) -> TaskHarnessScheduleHooks {
62-
TaskHarnessScheduleHooks {
63-
task_terminate_callback: self.hooks.task_terminate_callback.clone(),
64-
}
63+
#[cfg(tokio_unstable)]
64+
fn hooks_factory(&self) -> OptionalTaskHooksFactory {
65+
self.hooks_factory.clone()
66+
}
67+
68+
#[cfg(tokio_unstable)]
69+
fn hooks_factory_ref(&self) -> OptionalTaskHooksFactoryRef<'_> {
70+
self.hooks_factory.as_ref().map(AsRef::as_ref)
6571
}
6672
}

tokio/src/runtime/builder.rs

Lines changed: 21 additions & 210 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,19 @@
11
#![cfg_attr(loom, allow(unused_imports))]
22

3+
use crate::runtime::blocking::BlockingPool;
34
use crate::runtime::handle::Handle;
4-
use crate::runtime::{blocking, driver, Callback, HistogramBuilder, Runtime, TaskCallback};
5+
use crate::runtime::scheduler::CurrentThread;
6+
use crate::runtime::{blocking, driver, Callback, HistogramBuilder, Runtime};
57
#[cfg(tokio_unstable)]
6-
use crate::runtime::{metrics::HistogramConfiguration, LocalOptions, LocalRuntime, TaskMeta};
8+
use crate::runtime::{
9+
metrics::HistogramConfiguration, LocalOptions, LocalRuntime, OptionalTaskHooksFactory,
10+
TaskHookHarnessFactory,
11+
};
712
use crate::util::rand::{RngSeed, RngSeedGenerator};
8-
9-
use crate::runtime::blocking::BlockingPool;
10-
use crate::runtime::scheduler::CurrentThread;
1113
use std::fmt;
1214
use std::io;
15+
#[cfg(tokio_unstable)]
16+
use std::sync::Arc;
1317
use std::thread::ThreadId;
1418
use std::time::Duration;
1519

@@ -85,19 +89,8 @@ pub struct Builder {
8589
/// To run after each thread is unparked.
8690
pub(super) after_unpark: Option<Callback>,
8791

88-
/// To run before each task is spawned.
89-
pub(super) before_spawn: Option<TaskCallback>,
90-
91-
/// To run before each poll
9292
#[cfg(tokio_unstable)]
93-
pub(super) before_poll: Option<TaskCallback>,
94-
95-
/// To run after each poll
96-
#[cfg(tokio_unstable)]
97-
pub(super) after_poll: Option<TaskCallback>,
98-
99-
/// To run after each task is terminated.
100-
pub(super) after_termination: Option<TaskCallback>,
93+
pub(super) task_hook_harness_factory: OptionalTaskHooksFactory,
10194

10295
/// Customizable keep alive timeout for `BlockingPool`
10396
pub(super) keep_alive: Option<Duration>,
@@ -287,13 +280,8 @@ impl Builder {
287280
before_park: None,
288281
after_unpark: None,
289282

290-
before_spawn: None,
291-
after_termination: None,
292-
293283
#[cfg(tokio_unstable)]
294-
before_poll: None,
295-
#[cfg(tokio_unstable)]
296-
after_poll: None,
284+
task_hook_harness_factory: None,
297285

298286
keep_alive: None,
299287

@@ -685,188 +673,19 @@ impl Builder {
685673
self
686674
}
687675

688-
/// Executes function `f` just before a task is spawned.
689-
///
690-
/// `f` is called within the Tokio context, so functions like
691-
/// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
692-
/// invoked immediately.
693-
///
694-
/// This can be used for bookkeeping or monitoring purposes.
695-
///
696-
/// Note: There can only be one spawn callback for a runtime; calling this function more
697-
/// than once replaces the last callback defined, rather than adding to it.
698-
///
699-
/// This *does not* support [`LocalSet`](crate::task::LocalSet) at this time.
700-
///
701-
/// **Note**: This is an [unstable API][unstable]. The public API of this type
702-
/// may break in 1.x releases. See [the documentation on unstable
703-
/// features][unstable] for details.
704-
///
705-
/// [unstable]: crate#unstable-features
706-
///
707-
/// # Examples
708-
///
709-
/// ```
710-
/// # use tokio::runtime;
711-
/// # pub fn main() {
712-
/// let runtime = runtime::Builder::new_current_thread()
713-
/// .on_task_spawn(|_| {
714-
/// println!("spawning task");
715-
/// })
716-
/// .build()
717-
/// .unwrap();
718-
///
719-
/// runtime.block_on(async {
720-
/// tokio::task::spawn(std::future::ready(()));
721-
///
722-
/// for _ in 0..64 {
723-
/// tokio::task::yield_now().await;
724-
/// }
725-
/// })
726-
/// # }
727-
/// ```
728-
#[cfg(all(not(loom), tokio_unstable))]
729-
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
730-
pub fn on_task_spawn<F>(&mut self, f: F) -> &mut Self
731-
where
732-
F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
733-
{
734-
self.before_spawn = Some(std::sync::Arc::new(f));
735-
self
736-
}
737-
738-
/// Executes function `f` just before a task is polled
739-
///
740-
/// `f` is called within the Tokio context, so functions like
741-
/// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
742-
/// invoked immediately.
743-
///
744-
/// **Note**: This is an [unstable API][unstable]. The public API of this type
745-
/// may break in 1.x releases. See [the documentation on unstable
746-
/// features][unstable] for details.
747-
///
748-
/// [unstable]: crate#unstable-features
749-
///
750-
/// # Examples
751-
///
752-
/// ```
753-
/// # use std::sync::{atomic::AtomicUsize, Arc};
754-
/// # use tokio::task::yield_now;
755-
/// # pub fn main() {
756-
/// let poll_start_counter = Arc::new(AtomicUsize::new(0));
757-
/// let poll_start = poll_start_counter.clone();
758-
/// let rt = tokio::runtime::Builder::new_multi_thread()
759-
/// .enable_all()
760-
/// .on_before_task_poll(move |meta| {
761-
/// println!("task {} is about to be polled", meta.id())
762-
/// })
763-
/// .build()
764-
/// .unwrap();
765-
/// let task = rt.spawn(async {
766-
/// yield_now().await;
767-
/// });
768-
/// let _ = rt.block_on(task);
769-
///
770-
/// # }
771-
/// ```
772-
#[cfg(tokio_unstable)]
773-
pub fn on_before_task_poll<F>(&mut self, f: F) -> &mut Self
774-
where
775-
F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
776-
{
777-
self.before_poll = Some(std::sync::Arc::new(f));
778-
self
779-
}
780-
781-
/// Executes function `f` just after a task is polled
782-
///
783-
/// `f` is called within the Tokio context, so functions like
784-
/// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
785-
/// invoked immediately.
786-
///
787-
/// **Note**: This is an [unstable API][unstable]. The public API of this type
788-
/// may break in 1.x releases. See [the documentation on unstable
789-
/// features][unstable] for details.
790-
///
791-
/// [unstable]: crate#unstable-features
792-
///
793-
/// # Examples
794-
///
795-
/// ```
796-
/// # use std::sync::{atomic::AtomicUsize, Arc};
797-
/// # use tokio::task::yield_now;
798-
/// # pub fn main() {
799-
/// let poll_stop_counter = Arc::new(AtomicUsize::new(0));
800-
/// let poll_stop = poll_stop_counter.clone();
801-
/// let rt = tokio::runtime::Builder::new_multi_thread()
802-
/// .enable_all()
803-
/// .on_after_task_poll(move |meta| {
804-
/// println!("task {} completed polling", meta.id());
805-
/// })
806-
/// .build()
807-
/// .unwrap();
808-
/// let task = rt.spawn(async {
809-
/// yield_now().await;
810-
/// });
811-
/// let _ = rt.block_on(task);
812-
///
813-
/// # }
814-
/// ```
815-
#[cfg(tokio_unstable)]
816-
pub fn on_after_task_poll<F>(&mut self, f: F) -> &mut Self
817-
where
818-
F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
819-
{
820-
self.after_poll = Some(std::sync::Arc::new(f));
821-
self
822-
}
823-
824-
/// Executes function `f` just after a task is terminated.
825-
///
826-
/// `f` is called within the Tokio context, so functions like
827-
/// [`tokio::spawn`](crate::spawn) can be called.
828-
///
829-
/// This can be used for bookkeeping or monitoring purposes.
830-
///
831-
/// Note: There can only be one task termination callback for a runtime; calling this
832-
/// function more than once replaces the last callback defined, rather than adding to it.
676+
/// Factory method for producing "fallback" task hook harnesses.
833677
///
834-
/// This *does not* support [`LocalSet`](crate::task::LocalSet) at this time.
835-
///
836-
/// **Note**: This is an [unstable API][unstable]. The public API of this type
837-
/// may break in 1.x releases. See [the documentation on unstable
838-
/// features][unstable] for details.
839-
///
840-
/// [unstable]: crate#unstable-features
841-
///
842-
/// # Examples
843-
///
844-
/// ```
845-
/// # use tokio::runtime;
846-
/// # pub fn main() {
847-
/// let runtime = runtime::Builder::new_current_thread()
848-
/// .on_task_terminate(|_| {
849-
/// println!("killing task");
850-
/// })
851-
/// .build()
852-
/// .unwrap();
853-
///
854-
/// runtime.block_on(async {
855-
/// tokio::task::spawn(std::future::ready(()));
856-
///
857-
/// for _ in 0..64 {
858-
/// tokio::task::yield_now().await;
859-
/// }
860-
/// })
861-
/// # }
862-
/// ```
678+
/// The order of operations for assigning the hook harness for a task are as follows:
679+
/// 1. [`crate::task::spawn_with_hooks`], if used.
680+
/// 2. [`crate::runtime::task_hooks::TaskHookHarnessFactory`], if it returns something other than [Option::None].
681+
/// 3. This function.
863682
#[cfg(all(not(loom), tokio_unstable))]
864683
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
865-
pub fn on_task_terminate<F>(&mut self, f: F) -> &mut Self
684+
pub fn hook_harness_factory<T>(&mut self, hooks: T) -> &mut Self
866685
where
867-
F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
686+
T: TaskHookHarnessFactory + Send + Sync + 'static,
868687
{
869-
self.after_termination = Some(std::sync::Arc::new(f));
688+
self.task_hook_harness_factory = Some(Arc::new(hooks));
870689
self
871690
}
872691

@@ -1475,12 +1294,8 @@ impl Builder {
14751294
Config {
14761295
before_park: self.before_park.clone(),
14771296
after_unpark: self.after_unpark.clone(),
1478-
before_spawn: self.before_spawn.clone(),
1479-
#[cfg(tokio_unstable)]
1480-
before_poll: self.before_poll.clone(),
14811297
#[cfg(tokio_unstable)]
1482-
after_poll: self.after_poll.clone(),
1483-
after_termination: self.after_termination.clone(),
1298+
task_hook_factory: self.task_hook_harness_factory.clone(),
14841299
global_queue_interval: self.global_queue_interval,
14851300
event_interval: self.event_interval,
14861301
#[cfg(tokio_unstable)]
@@ -1628,12 +1443,8 @@ cfg_rt_multi_thread! {
16281443
Config {
16291444
before_park: self.before_park.clone(),
16301445
after_unpark: self.after_unpark.clone(),
1631-
before_spawn: self.before_spawn.clone(),
1632-
#[cfg(tokio_unstable)]
1633-
before_poll: self.before_poll.clone(),
16341446
#[cfg(tokio_unstable)]
1635-
after_poll: self.after_poll.clone(),
1636-
after_termination: self.after_termination.clone(),
1447+
task_hook_factory: self.task_hook_harness_factory.clone(),
16371448
global_queue_interval: self.global_queue_interval,
16381449
event_interval: self.event_interval,
16391450
#[cfg(tokio_unstable)]

tokio/src/runtime/config.rs

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@
22
any(not(all(tokio_unstable, feature = "full")), target_family = "wasm"),
33
allow(dead_code)
44
)]
5-
use crate::runtime::{Callback, TaskCallback};
5+
6+
use crate::runtime::Callback;
7+
#[cfg(tokio_unstable)]
8+
use crate::runtime::OptionalTaskHooksFactory;
69
use crate::util::RngSeedGenerator;
710

811
pub(crate) struct Config {
@@ -18,19 +21,9 @@ pub(crate) struct Config {
1821
/// Callback for a worker unparking itself
1922
pub(crate) after_unpark: Option<Callback>,
2023

21-
/// To run before each task is spawned.
22-
pub(crate) before_spawn: Option<TaskCallback>,
23-
24-
/// To run after each task is terminated.
25-
pub(crate) after_termination: Option<TaskCallback>,
26-
27-
/// To run before each poll
28-
#[cfg(tokio_unstable)]
29-
pub(crate) before_poll: Option<TaskCallback>,
30-
31-
/// To run after each poll
24+
/// Called on task spawn to generate the attached task hook harness.
3225
#[cfg(tokio_unstable)]
33-
pub(crate) after_poll: Option<TaskCallback>,
26+
pub(crate) task_hook_factory: OptionalTaskHooksFactory,
3427

3528
/// The multi-threaded scheduler includes a per-worker LIFO slot used to
3629
/// store the last scheduled task. This can improve certain usage patterns,

0 commit comments

Comments
 (0)