Skip to content

time: delay the Arc::clone of the scheduler::Handle until registering timer #7461

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 14 commits into from
Closed
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion tokio/src/runtime/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
)]

use crate::runtime::park::{ParkThread, UnparkThread};
use crate::util::error::TIME_DISABLED_ERROR;

use std::io;
use std::time::Duration;
Expand Down Expand Up @@ -110,7 +111,7 @@ impl Handle {
pub(crate) fn time(&self) -> &crate::runtime::time::Handle {
self.time
.as_ref()
.expect("A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers.")
.expect(TIME_DISABLED_ERROR)
}

pub(crate) fn clock(&self) -> &Clock {
Expand Down
30 changes: 30 additions & 0 deletions tokio/src/runtime/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,24 @@ cfg_rt! {
}
}

/// # Panics
///
/// Panics if the current [`Context`] is not available
/// in the current thread.
// remove this `allow(dead_code)` when this method
// is used by other other modules except the `time`.
#[cfg_attr(not(feature = "time"), allow(dead_code))]
#[track_caller]
pub(crate) fn with_current<F, R>(f: F) -> R
where
F: FnOnce(&Handle) -> R,
{
match context::with_current(|hdl| f(hdl)) {
Ok(ret) => ret,
Err(e) => panic!("{e}"),
}
}

pub(crate) fn blocking_spawner(&self) -> &blocking::Spawner {
match_flavor!(self, Handle(h) => &h.blocking_spawner)
}
Expand Down Expand Up @@ -271,5 +289,17 @@ cfg_not_rt! {
pub(crate) fn current() -> Handle {
panic!("{}", crate::util::error::CONTEXT_MISSING_ERROR)
}

// remove this `allow(dead_code)` when this method
// is used by other other modules except the `time`.
#[cfg_attr(not(feature = "time"), allow(dead_code))]
#[track_caller]
pub(crate) fn with_current<F, R>(_f: F) -> R
where
F: FnOnce(&Handle) -> R,
{
panic!("{}", crate::util::error::CONTEXT_MISSING_ERROR)
}

}
}
65 changes: 34 additions & 31 deletions tokio/src/runtime/time/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,10 @@ use crate::loom::sync::atomic::AtomicU64;
use crate::loom::sync::atomic::Ordering;

use crate::runtime::scheduler;
use crate::runtime::time;
use crate::sync::AtomicWaker;
use crate::time::Instant;
use crate::util::error::{RUNTIME_SHUTTING_DOWN_ERROR, TIME_DISABLED_ERROR};
use crate::util::linked_list;

use pin_project_lite::pin_project;
Expand Down Expand Up @@ -285,9 +287,6 @@ pin_project! {
// before polling.
#[derive(Debug)]
pub(crate) struct TimerEntry {
// Arc reference to the runtime handle. We can only free the driver after
// deregistering everything from their respective timer wheels.
driver: scheduler::Handle,
// Shared inner structure; this is part of an intrusive linked list, and
// therefore other references can exist to it while mutable references to
// Entry exist.
Expand Down Expand Up @@ -340,6 +339,10 @@ pub(crate) struct TimerShared {
/// Only accessed under the entry lock.
pointers: linked_list::Pointers<TimerShared>,

// Arc reference to the runtime handle. We can only free the driver after
// deregistering everything from their respective timer wheels.
driver: scheduler::Handle,

/// The time when the [`TimerEntry`] was registered into the Wheel,
/// [`STATE_DEREGISTERED`] means it is not registered.
///
Expand Down Expand Up @@ -384,11 +387,19 @@ generate_addr_of_methods! {

impl TimerShared {
pub(super) fn new() -> Self {
Self {
registered_when: AtomicU64::new(0),
pointers: linked_list::Pointers::new(),
state: StateCell::default(),
_p: PhantomPinned,
// ensure both scheduler handle and time driver are available,
// otherwise panic
let maybe_hdl =
scheduler::Handle::with_current(|hdl| hdl.driver().time.as_ref().map(|_| hdl.clone()));
match maybe_hdl {
None => panic!("{TIME_DISABLED_ERROR}"),
Some(hdl) => Self {
driver: hdl,
registered_when: AtomicU64::new(STATE_DEREGISTERED),
pointers: linked_list::Pointers::new(),
state: StateCell::default(),
_p: PhantomPinned,
},
}
}

Expand Down Expand Up @@ -453,6 +464,10 @@ impl TimerShared {
pub(super) fn might_be_registered(&self) -> bool {
self.state.might_be_registered()
}

fn driver(&self) -> &time::Handle {
self.driver.driver().time()
}
}

unsafe impl linked_list::Link for TimerShared {
Expand All @@ -479,12 +494,8 @@ unsafe impl linked_list::Link for TimerShared {

impl TimerEntry {
#[track_caller]
pub(crate) fn new(handle: scheduler::Handle, deadline: Instant) -> Self {
// Panic if the time driver is not enabled
let _ = handle.driver().time();

pub(crate) fn new(deadline: Instant) -> Self {
Self {
driver: handle,
inner: None,
deadline,
registered: false,
Expand Down Expand Up @@ -565,15 +576,14 @@ impl TimerEntry {
// driver did so far and happens-before everything the driver does in
// the future. While we have the lock held, we also go ahead and
// deregister the entry if necessary.
unsafe { self.driver().clear_entry(NonNull::from(inner)) };
unsafe { inner.driver().clear_entry(NonNull::from(inner)) };
}

pub(crate) fn reset(mut self: Pin<&mut Self>, new_time: Instant, reregister: bool) {
let this = self.as_mut().project();
*this.deadline = new_time;
*this.registered = reregister;

let tick = self.driver().time_source().deadline_to_tick(new_time);
let inner = match self.inner() {
Some(inner) => inner,
None => {
Expand All @@ -582,15 +592,17 @@ impl TimerEntry {
.expect("inner should already be initialized by `this.init_inner()`")
}
};
let tick = inner.driver().time_source().deadline_to_tick(new_time);

if inner.extend_expiration(tick).is_ok() {
return;
}

if reregister {
unsafe {
self.driver()
.reregister(&self.driver.driver().io, tick, inner.into());
inner
.driver()
.reregister(&inner.driver.driver().io, tick, inner.into());
}
}
}
Expand All @@ -599,12 +611,6 @@ impl TimerEntry {
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), super::Error>> {
assert!(
!self.driver().is_shutdown(),
"{}",
crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR
);

if !self.registered {
let deadline = self.deadline;
self.as_mut().reset(deadline, true);
Expand All @@ -613,16 +619,13 @@ impl TimerEntry {
let inner = self
.inner()
.expect("inner should already be initialized by `self.reset()`");
inner.state.poll(cx.waker())
}

pub(crate) fn driver(&self) -> &super::Handle {
self.driver.driver().time()
}
assert!(
!inner.driver().is_shutdown(),
"{RUNTIME_SHUTTING_DOWN_ERROR}"
);

#[cfg(all(tokio_unstable, feature = "tracing"))]
pub(crate) fn clock(&self) -> &super::Clock {
self.driver.driver().clock()
inner.state.poll(cx.waker())
}
}

Expand Down
Loading
Loading