From fb981904f03f41f8690856a75de7b85c70fd4bd2 Mon Sep 17 00:00:00 2001 From: Techcable Date: Sat, 9 Aug 2025 12:37:46 -0700 Subject: [PATCH 1/4] Define a Drain::flush method This is simpler than the API in #332 , but requires more work by slog_async to implement it. --- src/lib.rs | 125 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 125 insertions(+) diff --git a/src/lib.rs b/src/lib.rs index e9de127a..37c5efae 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1079,6 +1079,16 @@ where let _ = self.drain.log(record, &self.list); } + /// Flush all pending log records, blocking until completion. + /// + /// Will call [`std::io::Write::flush`] if applicable. + /// + /// Returns [`FlushError::NotSupported`] if the underlying drain does not support [`Drain::flush`]. + #[inline] + pub fn flush(&self) -> result::Result<(), FlushError> { + self.drain.flush() + } + /// Get list of key-value pairs assigned to this `Logger` pub fn list(&self) -> &OwnedKVList { &self.list @@ -1159,6 +1169,53 @@ where fn is_enabled(&self, level: Level) -> bool { self.drain.is_enabled(level) } + + #[inline] + fn flush(&self) -> result::Result<(), FlushError> { + self.drain.flush() + } +} + +/// An error that occurs when calling [`Drain::flush`]. +#[non_exhaustive] +#[derive(Debug)] +pub enum FlushError { + /// An error that occurs doing IO. + /// + /// Often triggered by [`std::io::Write::flush`] + #[cfg(feature = "std")] + Io(std::io::Error), + /// Indicates this drain does not support flushing. + NotSupported, +} +#[cfg(feature = "std")] +impl From for FlushError { + fn from(value: std::io::Error) -> Self { + FlushError::Io(value) + } +} +#[cfg(has_std_error)] +impl StdError for FlushError { + fn source(&self) -> Option<&(dyn StdError + 'static)> { + match self { + #[cfg(feature = "std")] + FlushError::Io(cause) => Some(cause), + FlushError::NotSupported => None, + } + } +} +impl fmt::Display for FlushError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + #[cfg(feature = "std")] + FlushError::Io(_) => { + f.write_str("Encountered IO error during flushing") + } + FlushError::NotSupported => { + f.write_str("Drain does not support flushing") + } + } + } } // {{{ Drain @@ -1203,6 +1260,15 @@ pub trait Drain { values: &OwnedKVList, ) -> result::Result; + /// Flush all pending log records, blocking until completion. + /// + /// Should call [`std::io::Write::flush`] if applicable. + /// + /// Returns [`FlushError::NotSupported`] if the drain has not implemented this method. + fn flush(&self) -> result::Result<(), FlushError> { + Err(FlushError::NotSupported) + } + /// **Avoid**: Check if messages at the specified log level are **maybe** /// enabled for this logger. /// @@ -1365,6 +1431,10 @@ impl<'a, D: Drain + 'a> Drain for &'a D { fn is_enabled(&self, level: Level) -> bool { (**self).is_enabled(level) } + #[inline] + fn flush(&self) -> result::Result<(), FlushError> { + (**self).flush() + } } impl<'a, D: Drain + 'a> Drain for &'a mut D { @@ -1382,6 +1452,10 @@ impl<'a, D: Drain + 'a> Drain for &'a mut D { fn is_enabled(&self, level: Level) -> bool { (**self).is_enabled(level) } + #[inline] + fn flush(&self) -> result::Result<(), FlushError> { + (**self).flush() + } } /// Internal utility module used to "maybe" bound traits @@ -1543,6 +1617,10 @@ impl Drain for Box { fn is_enabled(&self, level: Level) -> bool { (**self).is_enabled(level) } + #[inline] + fn flush(&self) -> result::Result<(), FlushError> { + (**self).flush() + } } impl Drain for Arc { @@ -1559,6 +1637,10 @@ impl Drain for Arc { fn is_enabled(&self, level: Level) -> bool { (**self).is_enabled(level) } + #[inline] + fn flush(&self) -> result::Result<(), FlushError> { + (**self).flush() + } } /// `Drain` discarding everything @@ -1582,6 +1664,10 @@ impl Drain for Discard { fn is_enabled(&self, _level: Level) -> bool { false } + #[inline] + fn flush(&self) -> result::Result<(), FlushError> { + Ok(()) + } } /// `Drain` filtering records @@ -1630,6 +1716,10 @@ where */ self.0.is_enabled(level) } + #[inline] + fn flush(&self) -> result::Result<(), FlushError> { + self.0.flush() + } } /// `Drain` filtering records by `Record` logging level @@ -1670,6 +1760,10 @@ impl Drain for LevelFilter { fn is_enabled(&self, level: Level) -> bool { level.is_at_least(self.1) && self.0.is_enabled(level) } + #[inline] + fn flush(&self) -> result::Result<(), FlushError> { + self.0.flush() + } } /// `Drain` mapping error returned by another `Drain` @@ -1711,6 +1805,10 @@ impl Drain for MapError { fn is_enabled(&self, level: Level) -> bool { self.drain.is_enabled(level) } + #[inline] + fn flush(&self) -> result::Result<(), FlushError> { + self.drain.flush() + } } /// `Drain` duplicating records into two other `Drain`s @@ -1750,6 +1848,17 @@ impl Drain for Duplicate { fn is_enabled(&self, level: Level) -> bool { self.0.is_enabled(level) || self.1.is_enabled(level) } + /// Flush both drains. + /// + /// Will return [`FlushError::NotSupported`] if either drain does not support flushing. + /// If one drain supports flushing and the other does not, + /// it is unspecified whether or not anything will be flushed at all. + #[inline] + fn flush(&self) -> result::Result<(), FlushError> { + self.0.flush()?; + self.1.flush()?; + Ok(()) + } } /// `Drain` panicking on error @@ -1796,6 +1905,10 @@ where fn is_enabled(&self, level: Level) -> bool { self.0.is_enabled(level) } + #[inline] + fn flush(&self) -> result::Result<(), FlushError> { + self.0.flush() + } } /// `Drain` ignoring result @@ -1832,6 +1945,11 @@ impl Drain for IgnoreResult { fn is_enabled(&self, level: Level) -> bool { self.drain.is_enabled(level) } + + #[inline] + fn flush(&self) -> result::Result<(), FlushError> { + self.drain.flush() + } } /// Error returned by `Mutex` @@ -1927,6 +2045,13 @@ impl Drain for std::sync::Mutex { fn is_enabled(&self, level: Level) -> bool { self.lock().ok().map_or(true, |lock| lock.is_enabled(level)) } + #[inline] + fn flush(&self) -> result::Result<(), FlushError> { + let guard = self.lock().map_err(|_poison| { + std::io::Error::new(std::io::ErrorKind::Other, "Mutex is poisoned") + })?; + guard.flush() + } } #[cfg(feature = "parking_lot_0_12")] From 559df850ff7f20c6eeec36445eb012551157de87 Mon Sep 17 00:00:00 2001 From: Techcable Date: Sat, 27 Sep 2025 16:41:24 -0700 Subject: [PATCH 2/4] Add FlushError::Custom wrapping a Box Requires either `feature = "std"` or core::error::Error (rust >= 1.81) --- src/lib.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/lib.rs b/src/lib.rs index 37c5efae..feb578eb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1187,6 +1187,9 @@ pub enum FlushError { Io(std::io::Error), /// Indicates this drain does not support flushing. NotSupported, + /// A custom error, which is not directly caused by IO or [`FlushError::NotSupported`]. + #[cfg(has_std_error)] + Custom(Box), } #[cfg(feature = "std")] impl From for FlushError { @@ -1201,6 +1204,8 @@ impl StdError for FlushError { #[cfg(feature = "std")] FlushError::Io(cause) => Some(cause), FlushError::NotSupported => None, + #[cfg(has_std_error)] + FlushError::Custom(cause) => Some(&**cause), } } } @@ -1214,6 +1219,10 @@ impl fmt::Display for FlushError { FlushError::NotSupported => { f.write_str("Drain does not support flushing") } + #[cfg(has_std_error)] + FlushError::Custom(cause) => { + write!(f, "Encountered error during flushing: {cause}") + } } } } From 1d2d9fe7b2e375783069bdacd4cb6371fddca052 Mon Sep 17 00:00:00 2001 From: Techcable Date: Wed, 24 Sep 2025 12:21:47 -0700 Subject: [PATCH 3/4] Make Duplicate::flush more robust on failure --- src/lib.rs | 81 ++++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 75 insertions(+), 6 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index feb578eb..d8de5884 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1190,6 +1190,8 @@ pub enum FlushError { /// A custom error, which is not directly caused by IO or [`FlushError::NotSupported`]. #[cfg(has_std_error)] Custom(Box), + /// An error caused by calling [`slog::Duplicate::flush`]. + Duplicate(Box), } #[cfg(feature = "std")] impl From for FlushError { @@ -1206,6 +1208,7 @@ impl StdError for FlushError { FlushError::NotSupported => None, #[cfg(has_std_error)] FlushError::Custom(cause) => Some(&**cause), + FlushError::Duplicate(cause) => Some(cause), } } } @@ -1223,10 +1226,67 @@ impl fmt::Display for FlushError { FlushError::Custom(cause) => { write!(f, "Encountered error during flushing: {cause}") } + FlushError::Duplicate(cause) => write!(f, "{cause}"), } } } +/// An error from calling [`Duplicate::flush`]. +/// +/// This means that at least one of the drains has failed to flush. +#[derive(Debug)] +pub enum DuplicateDrainFlushError { + /// Occurs when calling [`Drain::flush`] the left (first) drain of [`Duplicate`] fails, + /// but flushing the right drain succeeds. + Left(FlushError), + /// Occurs when calling [`Drain::flush`] the right (second) drain of [`Duplicate`] fails, + /// but flushing the left drain succeeds. + Right(FlushError), + /// Occurs when calling [`Drain::flush`] fails for both the left and right + /// (first and second) drains of [`Duplicate`]. + Both(FlushError, FlushError), +} +impl DuplicateDrainFlushError { + /// If flushing the left drain triggered an error, then return it. + /// + /// Returns `None` if flushing the left drain did not cause an error. + pub fn left(&self) -> Option<&'_ FlushError> { + match self { + DuplicateDrainFlushError::Left(left) + | DuplicateDrainFlushError::Both(left, _) => Some(left), + DuplicateDrainFlushError::Right(_) => None, + } + } + + /// If flushing the right drain triggered an error, then return it. + /// + /// Returns `None` if flushing the right drain did not cause an error. + pub fn right(&self) -> Option<&'_ FlushError> { + match self { + DuplicateDrainFlushError::Right(_) => None, + DuplicateDrainFlushError::Both(left, _) => Some(left), + DuplicateDrainFlushError::Left(_) => None, + } + } +} +#[cfg(has_std_error)] +impl StdError for DuplicateDrainFlushError {} +impl fmt::Display for DuplicateDrainFlushError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let (single, name) = match self { + DuplicateDrainFlushError::Left(single) => (single, "left"), + DuplicateDrainFlushError::Right(single) => (single, "right"), + DuplicateDrainFlushError::Both(left, right) => { + return write!( + f, + "Failed to flush both drains: ({left}) and ({right})" + ); + } + }; + write!(f, "Failed to flush {name} drain: ({single})") + } +} + // {{{ Drain /// Logging drain /// @@ -1859,14 +1919,23 @@ impl Drain for Duplicate { } /// Flush both drains. /// - /// Will return [`FlushError::NotSupported`] if either drain does not support flushing. - /// If one drain supports flushing and the other does not, - /// it is unspecified whether or not anything will be flushed at all. + /// If one or both of the drains fails, this will return a [`DuplicateDrainFlushError`]. + /// Even if the first drain fails with an error, the second one will still be flushed. #[inline] fn flush(&self) -> result::Result<(), FlushError> { - self.0.flush()?; - self.1.flush()?; - Ok(()) + let first_res = self.0.flush(); + let second_res = self.1.flush(); + let err = match (first_res, second_res) { + (Ok(()), Ok(())) => { + return Ok(()); // short-circuit success + } + (Err(left), Ok(())) => DuplicateDrainFlushError::Left(left), + (Ok(()), Err(right)) => DuplicateDrainFlushError::Right(right), + (Err(left), Err(right)) => { + DuplicateDrainFlushError::Both(left, right) + } + }; + Err(FlushError::Duplicate(Box::new(err))) } } From a59a459c954a330956d2240f2a92950518c8c73f Mon Sep 17 00:00:00 2001 From: Techcable Date: Wed, 24 Sep 2025 14:33:53 -0700 Subject: [PATCH 4/4] More detailed docs for Drain::flush --- src/lib.rs | 31 ++++++++++++++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index d8de5884..17b0c115 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1081,7 +1081,8 @@ where /// Flush all pending log records, blocking until completion. /// - /// Will call [`std::io::Write::flush`] if applicable. + /// In many cases this is equivalent to calling [`std::io::Write::flush`] on the underlying stream. + /// See docs on [`Drain::flush`] for more details. /// /// Returns [`FlushError::NotSupported`] if the underlying drain does not support [`Drain::flush`]. #[inline] @@ -1331,9 +1332,37 @@ pub trait Drain { /// Flush all pending log records, blocking until completion. /// + /// This method is logically idempotent. + /// In theory, two successive flush calls are the same as one: + /// They will both flush all message up to the point of the final. + /// In practice, this is not actually the case as IO is complicated and + /// flush calls can return errors. + /// /// Should call [`std::io::Write::flush`] if applicable. + /// If this drain wraps another drain, + /// it should delegate to the flush call of the wrapped drain. + /// + /// An implementation of flush should try to avoid blocking log operations while the flush is in progress. + /// Unfortunately, there are some cases like `impl Drain for Mutex` where this is not possible. + /// + /// A flush call is only required to flush records that are queued before the start of the call. + /// In particular, consider the following interleaving of events + /// ```text + /// thread1 drain.log(record1) + /// thread1: drain.flush() begin + /// thread2: drain.log(record2) + /// thread2: drain.flush() finish + /// ``` + /// In this case, the drain is only required to flush `record1`. + /// It may or may not flush `record2`. + /// This is mainly relevant for the implementation of [`slog_async::Async`], + /// as we have no control over the implementation of [`std::io::Write::flush`]. + /// This behavior is chosen to prevent a flush call from blocking indefinitely + /// in the case of concurrent logging by other threads. /// /// Returns [`FlushError::NotSupported`] if the drain has not implemented this method. + /// + /// [`slog_async::Async`]: https://docs.rs/slog-async/latest/slog_async/struct.Async.html fn flush(&self) -> result::Result<(), FlushError> { Err(FlushError::NotSupported) }