From 374494cdeeaf0a34125bc1622ec0fc92cfe80230 Mon Sep 17 00:00:00 2001 From: Amberley Su Date: Mon, 24 Mar 2025 04:59:57 +0000 Subject: [PATCH 1/4] made shutdown timeout configurable, included integration tests and unit tests --- tracing-appender/src/non_blocking.rs | 179 +++++++++++++++++- .../tests/non-blocking-timeout.rs | 62 ++++++ 2 files changed, 235 insertions(+), 6 deletions(-) create mode 100644 tracing-appender/tests/non-blocking-timeout.rs diff --git a/tracing-appender/src/non_blocking.rs b/tracing-appender/src/non_blocking.rs index fe43ecc0af..f63ab0e285 100644 --- a/tracing-appender/src/non_blocking.rs +++ b/tracing-appender/src/non_blocking.rs @@ -106,6 +106,7 @@ pub struct WorkerGuard { _guard: Option>, sender: Sender, shutdown: Sender<()>, + shutdown_timeout: Duration, } /// A non-blocking writer. @@ -144,7 +145,7 @@ impl NonBlocking { /// The returned `NonBlocking` writer will have the [default configuration][default] values. /// Other configurations can be specified using the [builder] interface. /// - /// [default]: NonBlockingBuilder::default + /// [default]: NonBlockingBuilder::default() /// [builder]: NonBlockingBuilder pub fn new(writer: T) -> (NonBlocking, WorkerGuard) { NonBlockingBuilder::default().finish(writer) @@ -155,6 +156,7 @@ impl NonBlocking { buffered_lines_limit: usize, is_lossy: bool, thread_name: String, + shutdown_timeout: Duration, ) -> (NonBlocking, WorkerGuard) { let (sender, receiver) = bounded(buffered_lines_limit); @@ -165,6 +167,7 @@ impl NonBlocking { worker.worker_thread(thread_name), sender.clone(), shutdown_sender, + shutdown_timeout, ); ( @@ -192,6 +195,7 @@ pub struct NonBlockingBuilder { buffered_lines_limit: usize, is_lossy: bool, thread_name: String, + shutdown_timeout: Duration, } impl NonBlockingBuilder { @@ -227,8 +231,20 @@ impl NonBlockingBuilder { self.buffered_lines_limit, self.is_lossy, self.thread_name, + self.shutdown_timeout, ) } + + /// Sets the timeout for shutdown of the worker thread. + /// + /// This is the maximum amount of time the main thread will wait + /// for the worker thread to finish proccessing pending logs during shutdown + /// + /// The default timeout is 1 second. + pub fn shutdown_timeout(mut self, timeout: Duration) -> NonBlockingBuilder { + self.shutdown_timeout = timeout; + self + } } impl Default for NonBlockingBuilder { @@ -237,6 +253,7 @@ impl Default for NonBlockingBuilder { buffered_lines_limit: DEFAULT_BUFFERED_LINES_LIMIT, is_lossy: true, thread_name: "tracing-appender".to_string(), + shutdown_timeout: Duration::from_secs(1), } } } @@ -276,11 +293,17 @@ impl<'a> MakeWriter<'a> for NonBlocking { } impl WorkerGuard { - fn new(handle: JoinHandle<()>, sender: Sender, shutdown: Sender<()>) -> Self { + fn new( + handle: JoinHandle<()>, + sender: Sender, + shutdown: Sender<()>, + shutdown_timeout: Duration, + ) -> Self { WorkerGuard { _guard: Some(handle), sender, shutdown, + shutdown_timeout, } } } @@ -295,14 +318,27 @@ impl Drop for WorkerGuard { // Attempt to wait for `Worker` to flush all messages before dropping. This happens // when the `Worker` calls `recv()` on a zero-capacity channel. Use `send_timeout` // so that drop is not blocked indefinitely. - // TODO: Make timeout configurable. - let _ = self.shutdown.send_timeout((), Duration::from_millis(1000)); + // The shutdown timeout now is configurable + match self.shutdown.send_timeout((), self.shutdown_timeout) { + Ok(_) => (), + Err(SendTimeoutError::Timeout(_)) => { + eprintln!( + "Shutting down logging worker timed out after {:?}.", + self.shutdown_timeout + ); + } + Err(SendTimeoutError::Disconnected(_)) => { + eprintln!("Shutdown failed because logging worker was disconnected"); + } + } } - Err(SendTimeoutError::Disconnected(_)) => (), - Err(SendTimeoutError::Timeout(e)) => println!( + Err(SendTimeoutError::Timeout(e)) => eprintln!( "Failed to send shutdown signal to logging worker. Error: {:?}", e ), + Err(SendTimeoutError::Disconnected(_)) => { + eprintln!("Logging worker disconnected before shutdown signal"); + } } } } @@ -493,4 +529,135 @@ mod test { assert_eq!(10, hello_count); assert_eq!(0, error_count.dropped_lines()); } + + use std::{ + io::{self, Write}, + sync::atomic::{AtomicUsize, Ordering}, + sync::Arc, + }; + + struct ControlledWriter { + counter: Arc, + ready_tx: mpsc::Sender<()>, + proceed_rx: mpsc::Receiver<()>, + } + + impl ControlledWriter { + fn new() -> (Self, mpsc::Sender<()>, mpsc::Receiver<()>) { + let (ready_tx, ready_rx) = mpsc::channel(); + let (proceed_tx, proceed_rx) = mpsc::channel(); + let writer = ControlledWriter { + counter: Arc::new(AtomicUsize::new(0)), + ready_tx, + proceed_rx, + }; + (writer, proceed_tx, ready_rx) + } + } + + impl Write for ControlledWriter { + fn write(&mut self, buf: &[u8]) -> io::Result { + self.ready_tx.send(()).unwrap(); + self.proceed_rx.recv().unwrap(); + self.counter.fetch_add(1, Ordering::SeqCst); + Ok(buf.len()) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } + } + + #[test] + fn test_complete_message_processing() { + let (writer, proceed_tx, ready_rx) = ControlledWriter::new(); + let counter = writer.counter.clone(); + + let (mut non_blocking, guard) = NonBlockingBuilder::default().finish(writer); + + for i in 0..3 { + non_blocking + .write_all(format!("msg{}\n", i).as_bytes()) + .unwrap(); + } + + for _ in 0..3 { + ready_rx.recv().unwrap(); + proceed_tx.send(()).unwrap(); + } + + drop(guard); + + assert_eq!( + counter.load(Ordering::SeqCst), + 3, + "All messages should be processed" + ); + } + + #[test] + fn test_partial_message_processing() { + let (writer, proceed_tx, ready_rx) = ControlledWriter::new(); + let counter = writer.counter.clone(); + + let (mut non_blocking, guard) = NonBlockingBuilder::default().finish(writer); + + for i in 0..3 { + non_blocking + .write_all(format!("msg{}\n", i).as_bytes()) + .unwrap(); + } + + ready_rx.recv().unwrap(); + proceed_tx.send(()).unwrap(); + + drop(guard); + + let processed = counter.load(Ordering::SeqCst); + assert!(processed >= 1, "At least one message should be processed"); + assert!(processed < 3, "Not all messages should be processed"); + } + + #[test] + fn test_no_message_processing() { + let (writer, _proceed_tx, _ready_rx) = ControlledWriter::new(); + let counter = writer.counter.clone(); + + let (mut non_blocking, guard) = NonBlockingBuilder::default().finish(writer); + + for i in 0..3 { + non_blocking + .write_all(format!("msg{}\n", i).as_bytes()) + .unwrap(); + } + + drop(guard); + + assert_eq!( + counter.load(Ordering::SeqCst), + 0, + "No messages should be processed" + ); + } + + #[test] + fn test_single_message_processing() { + let (writer, proceed_tx, ready_rx) = ControlledWriter::new(); + let counter = writer.counter.clone(); + + let (mut non_blocking, guard) = NonBlockingBuilder::default().finish(writer); + + non_blocking.write_all(b"single message\n").unwrap(); + + ready_rx.recv().unwrap(); + proceed_tx.send(()).unwrap(); + + drop(guard); + + assert_eq!( + counter.load(Ordering::SeqCst), + 1, + "Single message should be processed" + ); + } } diff --git a/tracing-appender/tests/non-blocking-timeout.rs b/tracing-appender/tests/non-blocking-timeout.rs new file mode 100644 index 0000000000..7c63ec870c --- /dev/null +++ b/tracing-appender/tests/non-blocking-timeout.rs @@ -0,0 +1,62 @@ +use std::{ + io::{self, Write}, + sync::atomic::{AtomicBool, AtomicU64, Ordering}, + thread, + time::{Duration, Instant}, +}; +use tracing_appender::non_blocking::NonBlockingBuilder; + +static BLOCK_IN_WORKER: AtomicBool = AtomicBool::new(false); +static BLOCK_DURATION_SECS: AtomicU64 = AtomicU64::new(3); + +struct BlockingMemoryWriter { + buffer: Vec, +} + +impl BlockingMemoryWriter { + fn new() -> Self { + Self { buffer: Vec::new() } + } +} + +impl Write for BlockingMemoryWriter { + fn write(&mut self, buf: &[u8]) -> io::Result { + if BLOCK_IN_WORKER.load(Ordering::Relaxed) { + let block_secs = BLOCK_DURATION_SECS.load(Ordering::Relaxed); + thread::sleep(Duration::from_secs(block_secs)); + } + self.buffer.extend_from_slice(buf); + Ok(buf.len()) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + +#[test] +fn test_shutdown_timeout_behavior() { + let timeout = Duration::from_millis(300); + let blocking_writer = BlockingMemoryWriter::new(); + + let (mut non_blocking, guard) = NonBlockingBuilder::default() + .shutdown_timeout(timeout) + .finish(blocking_writer); + + non_blocking.write_all(b"test data\n").unwrap(); + + thread::sleep(Duration::from_millis(50)); + BLOCK_IN_WORKER.store(true, Ordering::Relaxed); + non_blocking.write_all(b"blocking data\n").unwrap(); + + let start = Instant::now(); + drop(guard); + let elapsed = start.elapsed(); + + assert!( + elapsed >= timeout, + "Shutdown completed before timeout: {:?}, expected at least {:?}", + elapsed, + timeout + ); +} From 144982d06b2855bcdae26ab2919ce557ff3fb24b Mon Sep 17 00:00:00 2001 From: Amberley Su Date: Mon, 24 Mar 2025 05:01:13 +0000 Subject: [PATCH 2/4] added in lib file to pass clippy checks --- tracing-journald/src/lib.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tracing-journald/src/lib.rs b/tracing-journald/src/lib.rs index 3f0e561284..842c4acd2e 100644 --- a/tracing-journald/src/lib.rs +++ b/tracing-journald/src/lib.rs @@ -515,11 +515,11 @@ pub struct PriorityMappings { impl PriorityMappings { /// Returns the default priority mappings: /// - /// - [`tracing::Level::ERROR`]: [`Priority::Error`] (3) - /// - [`tracing::Level::WARN`]: [`Priority::Warning`] (4) - /// - [`tracing::Level::INFO`]: [`Priority::Notice`] (5) - /// - [`tracing::Level::DEBUG`]: [`Priority::Informational`] (6) - /// - [`tracing::Level::TRACE`]: [`Priority::Debug`] (7) + /// - [`tracing::Level::ERROR`][]: [`Priority::Error`] (3) + /// - [`tracing::Level::WARN`][]: [`Priority::Warning`] (4) + /// - [`tracing::Level::INFO`][]: [`Priority::Notice`] (5) + /// - [`tracing::Level::DEBUG`][]: [`Priority::Informational`] (6) + /// - [`tracing::Level::TRACE`][]: [`Priority::Debug`] (7) pub fn new() -> PriorityMappings { Self { error: Priority::Error, From d83c01cf771b7ad2a398edc914db8f43ea587ad3 Mon Sep 17 00:00:00 2001 From: Amberley Su Date: Tue, 8 Apr 2025 05:41:39 +0000 Subject: [PATCH 3/4] fixing clippy checks on overindentation and on `.next_back()` replacing `.last()` --- tracing-attributes/src/expand.rs | 2 +- tracing-futures/src/lib.rs | 16 ++++++------- tracing-subscriber/src/filter/env/mod.rs | 14 +++++------ .../src/filter/layer_filters/mod.rs | 2 +- tracing-subscriber/src/fmt/fmt_layer.rs | 20 ++++++++-------- tracing-subscriber/src/fmt/mod.rs | 24 +++++++++---------- tracing/src/lib.rs | 2 +- 7 files changed, 40 insertions(+), 40 deletions(-) diff --git a/tracing-attributes/src/expand.rs b/tracing-attributes/src/expand.rs index ec4bc28181..6481de758e 100644 --- a/tracing-attributes/src/expand.rs +++ b/tracing-attributes/src/expand.rs @@ -453,7 +453,7 @@ impl RecordType { if path .segments .iter() - .last() + .next_back() .map(|path_segment| { let ident = path_segment.ident.to_string(); Self::TYPES_FOR_VALUE.iter().any(|&t| t == ident) diff --git a/tracing-futures/src/lib.rs b/tracing-futures/src/lib.rs index 335e7aa5cc..9b92959dda 100644 --- a/tracing-futures/src/lib.rs +++ b/tracing-futures/src/lib.rs @@ -25,15 +25,15 @@ //! features with other crates in the asynchronous ecosystem: //! //! - `tokio`: Enables compatibility with the `tokio` crate, including -//! [`Instrument`] and [`WithSubscriber`] implementations for -//! `tokio::executor::Executor`, `tokio::runtime::Runtime`, and -//! `tokio::runtime::current_thread`. Enabled by default. +//! [`Instrument`] and [`WithSubscriber`] implementations for +//! `tokio::executor::Executor`, `tokio::runtime::Runtime`, and +//! `tokio::runtime::current_thread`. Enabled by default. //! - `tokio-executor`: Enables compatibility with the `tokio-executor` -//! crate, including [`Instrument`] and [`WithSubscriber`] -//! implementations for types implementing `tokio_executor::Executor`. -//! This is intended primarily for use in crates which depend on -//! `tokio-executor` rather than `tokio`; in general the `tokio` feature -//! should be used instead. +//! crate, including [`Instrument`] and [`WithSubscriber`] +//! implementations for types implementing `tokio_executor::Executor`. +//! This is intended primarily for use in crates which depend on +//! `tokio-executor` rather than `tokio`; in general the `tokio` feature +//! should be used instead. //! - `std-future`: Enables compatibility with `std::future::Future`. //! - `futures-01`: Enables compatibility with version 0.1.x of the [`futures`] //! crate. diff --git a/tracing-subscriber/src/filter/env/mod.rs b/tracing-subscriber/src/filter/env/mod.rs index 813e32b447..314e065ca2 100644 --- a/tracing-subscriber/src/filter/env/mod.rs +++ b/tracing-subscriber/src/filter/env/mod.rs @@ -56,16 +56,16 @@ use tracing_core::{ /// Each component (`target`, `span`, `field`, `value`, and `level`) will be covered in turn. /// /// - `target` matches the event or span's target. In general, this is the module path and/or crate name. -/// Examples of targets `h2`, `tokio::net`, or `tide::server`. For more information on targets, -/// please refer to [`Metadata`]'s documentation. +/// Examples of targets `h2`, `tokio::net`, or `tide::server`. For more information on targets, +/// please refer to [`Metadata`]'s documentation. /// - `span` matches on the span's name. If a `span` directive is provided alongside a `target`, -/// the `span` directive will match on spans _within_ the `target`. +/// the `span` directive will match on spans _within_ the `target`. /// - `field` matches on [fields] within spans. Field names can also be supplied without a `value` -/// and will match on any [`Span`] or [`Event`] that has a field with that name. -/// For example: `[span{field=\"value\"}]=debug`, `[{field}]=trace`. +/// and will match on any [`Span`] or [`Event`] that has a field with that name. +/// For example: `[span{field=\"value\"}]=debug`, `[{field}]=trace`. /// - `value` matches on the value of a span's field. If a value is a numeric literal or a bool, -/// it will match _only_ on that value. Otherwise, this filter matches the -/// [`std::fmt::Debug`] output from the value. +/// it will match _only_ on that value. Otherwise, this filter matches the +/// [`std::fmt::Debug`] output from the value. /// - `level` sets a maximum verbosity level accepted by this directive. /// /// When a field value directive (`[{=}]=...`) matches a diff --git a/tracing-subscriber/src/filter/layer_filters/mod.rs b/tracing-subscriber/src/filter/layer_filters/mod.rs index f349d4ce6a..078051c783 100644 --- a/tracing-subscriber/src/filter/layer_filters/mod.rs +++ b/tracing-subscriber/src/filter/layer_filters/mod.rs @@ -135,7 +135,7 @@ impl FilterMap { /// 2. If all the bits are set, then every per-layer filter has decided it /// doesn't want to enable that span or event. In that case, the /// `Registry`'s `enabled` method will return `false`, so that -/// recording a span or event can be skipped entirely. +/// recording a span or event can be skipped entirely. #[derive(Debug)] pub(crate) struct FilterState { enabled: Cell, diff --git a/tracing-subscriber/src/fmt/fmt_layer.rs b/tracing-subscriber/src/fmt/fmt_layer.rs index 15351767e0..25c0f3c3a1 100644 --- a/tracing-subscriber/src/fmt/fmt_layer.rs +++ b/tracing-subscriber/src/fmt/fmt_layer.rs @@ -442,22 +442,22 @@ where /// The following options are available: /// /// - `FmtSpan::NONE`: No events will be synthesized when spans are - /// created, entered, exited, or closed. Data from spans will still be - /// included as the context for formatted events. This is the default. + /// created, entered, exited, or closed. Data from spans will still be + /// included as the context for formatted events. This is the default. /// - `FmtSpan::NEW`: An event will be synthesized when spans are created. /// - `FmtSpan::ENTER`: An event will be synthesized when spans are entered. /// - `FmtSpan::EXIT`: An event will be synthesized when spans are exited. /// - `FmtSpan::CLOSE`: An event will be synthesized when a span closes. If - /// [timestamps are enabled][time] for this formatter, the generated - /// event will contain fields with the span's _busy time_ (the total - /// time for which it was entered) and _idle time_ (the total time that - /// the span existed but was not entered). + /// [timestamps are enabled][time] for this formatter, the generated + /// event will contain fields with the span's _busy time_ (the total + /// time for which it was entered) and _idle time_ (the total time that + /// the span existed but was not entered). /// - `FmtSpan::ACTIVE`: Events will be synthesized when spans are entered - /// or exited. + /// or exited. /// - `FmtSpan::FULL`: Events will be synthesized whenever a span is - /// created, entered, exited, or closed. If timestamps are enabled, the - /// close event will contain the span's busy and idle time, as - /// described above. + /// created, entered, exited, or closed. If timestamps are enabled, the + /// close event will contain the span's busy and idle time, as + /// described above. /// /// The options can be enabled in any combination. For instance, the following /// will synthesize events whenever spans are created and closed: diff --git a/tracing-subscriber/src/fmt/mod.rs b/tracing-subscriber/src/fmt/mod.rs index 6a80c0e346..ef04bd6a6b 100644 --- a/tracing-subscriber/src/fmt/mod.rs +++ b/tracing-subscriber/src/fmt/mod.rs @@ -36,9 +36,9 @@ //! //! For example: //! - Setting `RUST_LOG=debug` enables all `Span`s and `Event`s -//! set to the log level `DEBUG` or higher +//! set to the log level `DEBUG` or higher //! - Setting `RUST_LOG=my_crate=trace` enables `Span`s and `Event`s -//! in `my_crate` at all log levels +//! in `my_crate` at all log levels //! //! **Note**: This should **not** be called by libraries. Libraries should use //! [`tracing`] to publish `tracing` `Event`s. @@ -570,22 +570,22 @@ where /// The following options are available: /// /// - `FmtSpan::NONE`: No events will be synthesized when spans are - /// created, entered, exited, or closed. Data from spans will still be - /// included as the context for formatted events. This is the default. + /// created, entered, exited, or closed. Data from spans will still be + /// included as the context for formatted events. This is the default. /// - `FmtSpan::NEW`: An event will be synthesized when spans are created. /// - `FmtSpan::ENTER`: An event will be synthesized when spans are entered. /// - `FmtSpan::EXIT`: An event will be synthesized when spans are exited. /// - `FmtSpan::CLOSE`: An event will be synthesized when a span closes. If - /// [timestamps are enabled][time] for this formatter, the generated - /// event will contain fields with the span's _busy time_ (the total - /// time for which it was entered) and _idle time_ (the total time that - /// the span existed but was not entered). + /// [timestamps are enabled][time] for this formatter, the generated + /// event will contain fields with the span's _busy time_ (the total + /// time for which it was entered) and _idle time_ (the total time that + /// the span existed but was not entered). /// - `FmtSpan::ACTIVE`: An event will be synthesized when spans are entered - /// or exited. + /// or exited. /// - `FmtSpan::FULL`: Events will be synthesized whenever a span is - /// created, entered, exited, or closed. If timestamps are enabled, the - /// close event will contain the span's busy and idle time, as - /// described above. + /// created, entered, exited, or closed. If timestamps are enabled, the + /// close event will contain the span's busy and idle time, as + /// described above. /// /// The options can be enabled in any combination. For instance, the following /// will synthesize events whenever spans are created and closed: diff --git a/tracing/src/lib.rs b/tracing/src/lib.rs index 8b6c7f0b95..9b8fdde16a 100644 --- a/tracing/src/lib.rs +++ b/tracing/src/lib.rs @@ -710,7 +710,7 @@ //! `tracing-subscriber`'s `FmtSubscriber`, you don't need to depend on //! `tracing-log` directly. //! - [`tracing-appender`] provides utilities for outputting tracing data, -//! including a file appender and non blocking writer. +//! including a file appender and non blocking writer. //! //! Additionally, there are also several third-party crates which are not //! maintained by the `tokio` project. These include: From 7272fea17a02c4208a4ceb50b99d229cdf9f7081 Mon Sep 17 00:00:00 2001 From: Amberley Su Date: Fri, 11 Apr 2025 20:13:41 +0000 Subject: [PATCH 4/4] updated the broken doc links --- tracing-journald/src/lib.rs | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/tracing-journald/src/lib.rs b/tracing-journald/src/lib.rs index 842c4acd2e..4fbe5ac67f 100644 --- a/tracing-journald/src/lib.rs +++ b/tracing-journald/src/lib.rs @@ -515,11 +515,17 @@ pub struct PriorityMappings { impl PriorityMappings { /// Returns the default priority mappings: /// - /// - [`tracing::Level::ERROR`][]: [`Priority::Error`] (3) - /// - [`tracing::Level::WARN`][]: [`Priority::Warning`] (4) - /// - [`tracing::Level::INFO`][]: [`Priority::Notice`] (5) - /// - [`tracing::Level::DEBUG`][]: [`Priority::Informational`] (6) - /// - [`tracing::Level::TRACE`][]: [`Priority::Debug`] (7) + /// - [`Level::ERROR`][]: [`Priority::Error`] (3) + /// - [`Level::WARN`][]: [`Priority::Warning`] (4) + /// - [`Level::INFO`][]: [`Priority::Notice`] (5) + /// - [`Level::DEBUG`][]: [`Priority::Informational`] (6) + /// - [`Level::TRACE`][]: [`Priority::Debug`] (7) + /// + /// [`Level::ERROR`]: tracing_core::Level::ERROR + /// [`Level::WARN`]: tracing_core::Level::WARN + /// [`Level::INFO`]: tracing_core::Level::INFO + /// [`Level::DEBUG`]: tracing_core::Level::DEBUG + /// [`Level::TRACE`]: tracing_core::Level::TRACE pub fn new() -> PriorityMappings { Self { error: Priority::Error,