Skip to content

Commit ad412a9

Browse files
authored
util: add track_caller to public APIs (#4785)
* util: add track_caller to public APIs Functions that may panic can be annotated with `#[track_caller]` so that in the event of a panic, the function where the user called the panicking function is shown instead of the file and line within Tokio source. This change adds `#[track_caller]` to all the non-unstable public APIs in tokio-util where the documentation describes how the function may panic due to incorrect context or inputs. In one place, an assert was added where the described behavior appeared not to be implemented. The documentation for `DelayQueue::reserve` states that the function will panic if the new capacity exceeds the maximum number of entries the queue can contain. However, the function didn't panic until a higher number caused by an allocation failure. This is inconsistent with `DelayQueue::insert_at` which will panic if the number of entries were to go over MAX_ENTRIES. Tests are included to cover each potentially panicking function. Refs: #4413 * fix tests on FreeBSD 32-bit (I hope) Some tests were failing on FreeBSD 32-bit because the "times too far in the future" for DelayQueue were also too far in the future for the OS. Fixed by copying the MAX_DURATION value from where it's defined and using it to create a duration that is just 1 more than the maximum. This will start to break once we get close (within 2 and a bit years) of the Epochalypse (19 Jan, 2038) - but a lot of other things are going to be breaking on FreeBSD 32-bit by then anyway.
1 parent 55078ff commit ad412a9

File tree

7 files changed

+249
-1
lines changed

7 files changed

+249
-1
lines changed

tokio-util/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ tokio-stream = { version = "0.1", path = "../tokio-stream" }
5555
async-stream = "0.3.0"
5656
futures = "0.3.0"
5757
futures-test = "0.3.5"
58+
parking_lot = "0.12.0"
5859

5960
[package.metadata.docs.rs]
6061
all-features = true

tokio-util/src/io/sync_bridge.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,9 +85,10 @@ impl<T: Unpin> SyncIoBridge<T> {
8585
///
8686
/// Use e.g. `SyncIoBridge::new(Box::pin(src))`.
8787
///
88-
/// # Panic
88+
/// # Panics
8989
///
9090
/// This will panic if called outside the context of a Tokio runtime.
91+
#[track_caller]
9192
pub fn new(src: T) -> Self {
9293
Self::new_with_handle(src, tokio::runtime::Handle::current())
9394
}

tokio-util/src/sync/mpsc.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ impl<T: Send + 'static> PollSender<T> {
136136
///
137137
/// If `poll_reserve` was not successfully called prior to calling `send_item`, then this method
138138
/// will panic.
139+
#[track_caller]
139140
pub fn send_item(&mut self, value: T) -> Result<(), PollSendError<T>> {
140141
let (result, next_state) = match self.take_state() {
141142
State::Idle(_) | State::Acquiring => {

tokio-util/src/task/spawn_pinned.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,9 @@ impl LocalPoolHandle {
5757
/// pool via [`LocalPoolHandle::spawn_pinned`].
5858
///
5959
/// # Panics
60+
///
6061
/// Panics if the pool size is less than one.
62+
#[track_caller]
6163
pub fn new(pool_size: usize) -> LocalPoolHandle {
6264
assert!(pool_size > 0);
6365

@@ -167,6 +169,7 @@ impl LocalPoolHandle {
167169
/// }
168170
/// ```
169171
///
172+
#[track_caller]
170173
pub fn spawn_pinned_by_idx<F, Fut>(&self, create_task: F, idx: usize) -> JoinHandle<Fut::Output>
171174
where
172175
F: FnOnce() -> Fut,
@@ -196,6 +199,7 @@ struct LocalPool {
196199

197200
impl LocalPool {
198201
/// Spawn a `?Send` future onto a worker
202+
#[track_caller]
199203
fn spawn_pinned<F, Fut>(
200204
&self,
201205
create_task: F,
@@ -324,6 +328,7 @@ impl LocalPool {
324328
}
325329
}
326330

331+
#[track_caller]
327332
fn find_worker_by_idx(&self, idx: usize) -> (&LocalWorkerHandle, JobCountGuard) {
328333
let worker = &self.workers[idx];
329334
worker.task_count.fetch_add(1, Ordering::SeqCst);

tokio-util/src/time/delay_queue.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -531,6 +531,7 @@ impl<T> DelayQueue<T> {
531531
/// [`reset`]: method@Self::reset
532532
/// [`Key`]: struct@Key
533533
/// [type]: #
534+
#[track_caller]
534535
pub fn insert_at(&mut self, value: T, when: Instant) -> Key {
535536
assert!(self.slab.len() < MAX_ENTRIES, "max entries exceeded");
536537

@@ -649,10 +650,12 @@ impl<T> DelayQueue<T> {
649650
/// [`reset`]: method@Self::reset
650651
/// [`Key`]: struct@Key
651652
/// [type]: #
653+
#[track_caller]
652654
pub fn insert(&mut self, value: T, timeout: Duration) -> Key {
653655
self.insert_at(value, Instant::now() + timeout)
654656
}
655657

658+
#[track_caller]
656659
fn insert_idx(&mut self, when: u64, key: Key) {
657660
use self::wheel::{InsertError, Stack};
658661

@@ -674,6 +677,7 @@ impl<T> DelayQueue<T> {
674677
/// # Panics
675678
///
676679
/// Panics if the key is not contained in the expired queue or the wheel.
680+
#[track_caller]
677681
fn remove_key(&mut self, key: &Key) {
678682
use crate::time::wheel::Stack;
679683

@@ -713,6 +717,7 @@ impl<T> DelayQueue<T> {
713717
/// assert_eq!(*item.get_ref(), "foo");
714718
/// # }
715719
/// ```
720+
#[track_caller]
716721
pub fn remove(&mut self, key: &Key) -> Expired<T> {
717722
let prev_deadline = self.next_deadline();
718723

@@ -769,6 +774,7 @@ impl<T> DelayQueue<T> {
769774
/// // "foo" is now scheduled to be returned in 10 seconds
770775
/// # }
771776
/// ```
777+
#[track_caller]
772778
pub fn reset_at(&mut self, key: &Key, when: Instant) {
773779
self.remove_key(key);
774780

@@ -873,6 +879,7 @@ impl<T> DelayQueue<T> {
873879
/// // "foo"is now scheduled to be returned in 10 seconds
874880
/// # }
875881
/// ```
882+
#[track_caller]
876883
pub fn reset(&mut self, key: &Key, timeout: Duration) {
877884
self.reset_at(key, Instant::now() + timeout);
878885
}
@@ -978,7 +985,12 @@ impl<T> DelayQueue<T> {
978985
/// assert!(delay_queue.capacity() >= 11);
979986
/// # }
980987
/// ```
988+
#[track_caller]
981989
pub fn reserve(&mut self, additional: usize) {
990+
assert!(
991+
self.slab.capacity() + additional <= MAX_ENTRIES,
992+
"max queue capacity exceeded"
993+
);
982994
self.slab.reserve(additional);
983995
}
984996

@@ -1117,6 +1129,7 @@ impl<T> wheel::Stack for Stack<T> {
11171129
}
11181130
}
11191131

1132+
#[track_caller]
11201133
fn remove(&mut self, item: &Self::Borrowed, store: &mut Self::Store) {
11211134
let key = *item;
11221135
assert!(store.contains(item));

tokio-util/src/time/wheel/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ where
118118
}
119119

120120
/// Remove `item` from the timing wheel.
121+
#[track_caller]
121122
pub(crate) fn remove(&mut self, item: &T::Borrowed, store: &mut T::Store) {
122123
let when = T::when(item, store);
123124

tokio-util/tests/panic.rs

Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
#![warn(rust_2018_idioms)]
2+
#![cfg(feature = "full")]
3+
4+
use parking_lot::{const_mutex, Mutex};
5+
use std::error::Error;
6+
use std::panic;
7+
use std::sync::Arc;
8+
use tokio::runtime::Runtime;
9+
use tokio::sync::mpsc::channel;
10+
use tokio::time::{Duration, Instant};
11+
use tokio_test::task;
12+
use tokio_util::io::SyncIoBridge;
13+
use tokio_util::sync::PollSender;
14+
use tokio_util::task::LocalPoolHandle;
15+
use tokio_util::time::DelayQueue;
16+
17+
// Taken from tokio-util::time::wheel, if that changes then
18+
const MAX_DURATION_MS: u64 = (1 << (36)) - 1;
19+
20+
fn test_panic<Func: FnOnce() + panic::UnwindSafe>(func: Func) -> Option<String> {
21+
static PANIC_MUTEX: Mutex<()> = const_mutex(());
22+
23+
{
24+
let _guard = PANIC_MUTEX.lock();
25+
let panic_file: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));
26+
27+
let prev_hook = panic::take_hook();
28+
{
29+
let panic_file = panic_file.clone();
30+
panic::set_hook(Box::new(move |panic_info| {
31+
let panic_location = panic_info.location().unwrap();
32+
panic_file
33+
.lock()
34+
.clone_from(&Some(panic_location.file().to_string()));
35+
}));
36+
}
37+
38+
let result = panic::catch_unwind(func);
39+
// Return to the previously set panic hook (maybe default) so that we get nice error
40+
// messages in the tests.
41+
panic::set_hook(prev_hook);
42+
43+
if result.is_err() {
44+
panic_file.lock().clone()
45+
} else {
46+
None
47+
}
48+
}
49+
}
50+
51+
#[test]
52+
fn sync_bridge_new_panic_caller() -> Result<(), Box<dyn Error>> {
53+
let panic_location_file = test_panic(|| {
54+
let _ = SyncIoBridge::new(tokio::io::empty());
55+
});
56+
57+
// The panic location should be in this file
58+
assert_eq!(&panic_location_file.unwrap(), file!());
59+
60+
Ok(())
61+
}
62+
63+
#[test]
64+
fn poll_sender_send_item_panic_caller() -> Result<(), Box<dyn Error>> {
65+
let panic_location_file = test_panic(|| {
66+
let (send, _) = channel::<u32>(3);
67+
let mut send = PollSender::new(send);
68+
69+
let _ = send.send_item(42);
70+
});
71+
72+
// The panic location should be in this file
73+
assert_eq!(&panic_location_file.unwrap(), file!());
74+
75+
Ok(())
76+
}
77+
78+
#[test]
79+
80+
fn local_pool_handle_new_panic_caller() -> Result<(), Box<dyn Error>> {
81+
let panic_location_file = test_panic(|| {
82+
let _ = LocalPoolHandle::new(0);
83+
});
84+
85+
// The panic location should be in this file
86+
assert_eq!(&panic_location_file.unwrap(), file!());
87+
88+
Ok(())
89+
}
90+
91+
#[test]
92+
93+
fn local_pool_handle_spawn_pinned_by_idx_panic_caller() -> Result<(), Box<dyn Error>> {
94+
let panic_location_file = test_panic(|| {
95+
let rt = basic();
96+
97+
rt.block_on(async {
98+
let handle = LocalPoolHandle::new(2);
99+
handle.spawn_pinned_by_idx(|| async { "test" }, 3);
100+
});
101+
});
102+
103+
// The panic location should be in this file
104+
assert_eq!(&panic_location_file.unwrap(), file!());
105+
106+
Ok(())
107+
}
108+
#[test]
109+
fn delay_queue_insert_at_panic_caller() -> Result<(), Box<dyn Error>> {
110+
let panic_location_file = test_panic(|| {
111+
let rt = basic();
112+
rt.block_on(async {
113+
let mut queue = task::spawn(DelayQueue::with_capacity(3));
114+
115+
//let st = std::time::Instant::from(SystemTime::UNIX_EPOCH);
116+
let _k = queue.insert_at(
117+
"1",
118+
Instant::now() + Duration::from_millis(MAX_DURATION_MS + 1),
119+
);
120+
});
121+
});
122+
123+
// The panic location should be in this file
124+
assert_eq!(&panic_location_file.unwrap(), file!());
125+
126+
Ok(())
127+
}
128+
129+
#[test]
130+
fn delay_queue_insert_panic_caller() -> Result<(), Box<dyn Error>> {
131+
let panic_location_file = test_panic(|| {
132+
let rt = basic();
133+
rt.block_on(async {
134+
let mut queue = task::spawn(DelayQueue::with_capacity(3));
135+
136+
let _k = queue.insert("1", Duration::from_millis(MAX_DURATION_MS + 1));
137+
});
138+
});
139+
140+
// The panic location should be in this file
141+
assert_eq!(&panic_location_file.unwrap(), file!());
142+
143+
Ok(())
144+
}
145+
146+
#[test]
147+
fn delay_queue_remove_panic_caller() -> Result<(), Box<dyn Error>> {
148+
let panic_location_file = test_panic(|| {
149+
let rt = basic();
150+
rt.block_on(async {
151+
let mut queue = task::spawn(DelayQueue::with_capacity(3));
152+
153+
let key = queue.insert_at("1", Instant::now());
154+
queue.remove(&key);
155+
queue.remove(&key);
156+
});
157+
});
158+
159+
// The panic location should be in this file
160+
assert_eq!(&panic_location_file.unwrap(), file!());
161+
162+
Ok(())
163+
}
164+
165+
#[test]
166+
fn delay_queue_reset_at_panic_caller() -> Result<(), Box<dyn Error>> {
167+
let panic_location_file = test_panic(|| {
168+
let rt = basic();
169+
rt.block_on(async {
170+
let mut queue = task::spawn(DelayQueue::with_capacity(3));
171+
172+
let key = queue.insert_at("1", Instant::now());
173+
queue.reset_at(
174+
&key,
175+
Instant::now() + Duration::from_millis(MAX_DURATION_MS + 1),
176+
);
177+
});
178+
});
179+
180+
// The panic location should be in this file
181+
assert_eq!(&panic_location_file.unwrap(), file!());
182+
183+
Ok(())
184+
}
185+
186+
#[test]
187+
fn delay_queue_reset_panic_caller() -> Result<(), Box<dyn Error>> {
188+
let panic_location_file = test_panic(|| {
189+
let rt = basic();
190+
rt.block_on(async {
191+
let mut queue = task::spawn(DelayQueue::with_capacity(3));
192+
193+
let key = queue.insert_at("1", Instant::now());
194+
queue.reset(&key, Duration::from_millis(MAX_DURATION_MS + 1));
195+
});
196+
});
197+
198+
// The panic location should be in this file
199+
assert_eq!(&panic_location_file.unwrap(), file!());
200+
201+
Ok(())
202+
}
203+
204+
#[test]
205+
fn delay_queue_reserve_panic_caller() -> Result<(), Box<dyn Error>> {
206+
let panic_location_file = test_panic(|| {
207+
let rt = basic();
208+
rt.block_on(async {
209+
let mut queue = task::spawn(DelayQueue::<u32>::with_capacity(3));
210+
211+
queue.reserve((1 << 30) as usize);
212+
});
213+
});
214+
215+
// The panic location should be in this file
216+
assert_eq!(&panic_location_file.unwrap(), file!());
217+
218+
Ok(())
219+
}
220+
221+
fn basic() -> Runtime {
222+
tokio::runtime::Builder::new_current_thread()
223+
.enable_all()
224+
.build()
225+
.unwrap()
226+
}

0 commit comments

Comments
 (0)