diff --git a/src/lib.rs b/src/lib.rs index e9de127a..17b0c115 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1079,6 +1079,17 @@ where let _ = self.drain.log(record, &self.list); } + /// Flush all pending log records, blocking until completion. + /// + /// In many cases this is equivalent to calling [`std::io::Write::flush`] on the underlying stream. + /// See docs on [`Drain::flush`] for more details. + /// + /// Returns [`FlushError::NotSupported`] if the underlying drain does not support [`Drain::flush`]. + #[inline] + pub fn flush(&self) -> result::Result<(), FlushError> { + self.drain.flush() + } + /// Get list of key-value pairs assigned to this `Logger` pub fn list(&self) -> &OwnedKVList { &self.list @@ -1159,6 +1170,122 @@ where fn is_enabled(&self, level: Level) -> bool { self.drain.is_enabled(level) } + + #[inline] + fn flush(&self) -> result::Result<(), FlushError> { + self.drain.flush() + } +} + +/// An error that occurs when calling [`Drain::flush`]. +#[non_exhaustive] +#[derive(Debug)] +pub enum FlushError { + /// An error that occurs doing IO. + /// + /// Often triggered by [`std::io::Write::flush`] + #[cfg(feature = "std")] + Io(std::io::Error), + /// Indicates this drain does not support flushing. + NotSupported, + /// A custom error, which is not directly caused by IO or [`FlushError::NotSupported`]. + #[cfg(has_std_error)] + Custom(Box), + /// An error caused by calling [`slog::Duplicate::flush`]. + Duplicate(Box), +} +#[cfg(feature = "std")] +impl From for FlushError { + fn from(value: std::io::Error) -> Self { + FlushError::Io(value) + } +} +#[cfg(has_std_error)] +impl StdError for FlushError { + fn source(&self) -> Option<&(dyn StdError + 'static)> { + match self { + #[cfg(feature = "std")] + FlushError::Io(cause) => Some(cause), + FlushError::NotSupported => None, + #[cfg(has_std_error)] + FlushError::Custom(cause) => Some(&**cause), + FlushError::Duplicate(cause) => Some(cause), + } + } +} +impl fmt::Display for FlushError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + #[cfg(feature = "std")] + FlushError::Io(_) => { + f.write_str("Encountered IO error during flushing") + } + FlushError::NotSupported => { + f.write_str("Drain does not support flushing") + } + #[cfg(has_std_error)] + FlushError::Custom(cause) => { + write!(f, "Encountered error during flushing: {cause}") + } + FlushError::Duplicate(cause) => write!(f, "{cause}"), + } + } +} + +/// An error from calling [`Duplicate::flush`]. +/// +/// This means that at least one of the drains has failed to flush. +#[derive(Debug)] +pub enum DuplicateDrainFlushError { + /// Occurs when calling [`Drain::flush`] the left (first) drain of [`Duplicate`] fails, + /// but flushing the right drain succeeds. + Left(FlushError), + /// Occurs when calling [`Drain::flush`] the right (second) drain of [`Duplicate`] fails, + /// but flushing the left drain succeeds. + Right(FlushError), + /// Occurs when calling [`Drain::flush`] fails for both the left and right + /// (first and second) drains of [`Duplicate`]. + Both(FlushError, FlushError), +} +impl DuplicateDrainFlushError { + /// If flushing the left drain triggered an error, then return it. + /// + /// Returns `None` if flushing the left drain did not cause an error. + pub fn left(&self) -> Option<&'_ FlushError> { + match self { + DuplicateDrainFlushError::Left(left) + | DuplicateDrainFlushError::Both(left, _) => Some(left), + DuplicateDrainFlushError::Right(_) => None, + } + } + + /// If flushing the right drain triggered an error, then return it. + /// + /// Returns `None` if flushing the right drain did not cause an error. + pub fn right(&self) -> Option<&'_ FlushError> { + match self { + DuplicateDrainFlushError::Right(_) => None, + DuplicateDrainFlushError::Both(left, _) => Some(left), + DuplicateDrainFlushError::Left(_) => None, + } + } +} +#[cfg(has_std_error)] +impl StdError for DuplicateDrainFlushError {} +impl fmt::Display for DuplicateDrainFlushError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let (single, name) = match self { + DuplicateDrainFlushError::Left(single) => (single, "left"), + DuplicateDrainFlushError::Right(single) => (single, "right"), + DuplicateDrainFlushError::Both(left, right) => { + return write!( + f, + "Failed to flush both drains: ({left}) and ({right})" + ); + } + }; + write!(f, "Failed to flush {name} drain: ({single})") + } } // {{{ Drain @@ -1203,6 +1330,43 @@ pub trait Drain { values: &OwnedKVList, ) -> result::Result; + /// Flush all pending log records, blocking until completion. + /// + /// This method is logically idempotent. + /// In theory, two successive flush calls are the same as one: + /// They will both flush all message up to the point of the final. + /// In practice, this is not actually the case as IO is complicated and + /// flush calls can return errors. + /// + /// Should call [`std::io::Write::flush`] if applicable. + /// If this drain wraps another drain, + /// it should delegate to the flush call of the wrapped drain. + /// + /// An implementation of flush should try to avoid blocking log operations while the flush is in progress. + /// Unfortunately, there are some cases like `impl Drain for Mutex` where this is not possible. + /// + /// A flush call is only required to flush records that are queued before the start of the call. + /// In particular, consider the following interleaving of events + /// ```text + /// thread1 drain.log(record1) + /// thread1: drain.flush() begin + /// thread2: drain.log(record2) + /// thread2: drain.flush() finish + /// ``` + /// In this case, the drain is only required to flush `record1`. + /// It may or may not flush `record2`. + /// This is mainly relevant for the implementation of [`slog_async::Async`], + /// as we have no control over the implementation of [`std::io::Write::flush`]. + /// This behavior is chosen to prevent a flush call from blocking indefinitely + /// in the case of concurrent logging by other threads. + /// + /// Returns [`FlushError::NotSupported`] if the drain has not implemented this method. + /// + /// [`slog_async::Async`]: https://docs.rs/slog-async/latest/slog_async/struct.Async.html + fn flush(&self) -> result::Result<(), FlushError> { + Err(FlushError::NotSupported) + } + /// **Avoid**: Check if messages at the specified log level are **maybe** /// enabled for this logger. /// @@ -1365,6 +1529,10 @@ impl<'a, D: Drain + 'a> Drain for &'a D { fn is_enabled(&self, level: Level) -> bool { (**self).is_enabled(level) } + #[inline] + fn flush(&self) -> result::Result<(), FlushError> { + (**self).flush() + } } impl<'a, D: Drain + 'a> Drain for &'a mut D { @@ -1382,6 +1550,10 @@ impl<'a, D: Drain + 'a> Drain for &'a mut D { fn is_enabled(&self, level: Level) -> bool { (**self).is_enabled(level) } + #[inline] + fn flush(&self) -> result::Result<(), FlushError> { + (**self).flush() + } } /// Internal utility module used to "maybe" bound traits @@ -1543,6 +1715,10 @@ impl Drain for Box { fn is_enabled(&self, level: Level) -> bool { (**self).is_enabled(level) } + #[inline] + fn flush(&self) -> result::Result<(), FlushError> { + (**self).flush() + } } impl Drain for Arc { @@ -1559,6 +1735,10 @@ impl Drain for Arc { fn is_enabled(&self, level: Level) -> bool { (**self).is_enabled(level) } + #[inline] + fn flush(&self) -> result::Result<(), FlushError> { + (**self).flush() + } } /// `Drain` discarding everything @@ -1582,6 +1762,10 @@ impl Drain for Discard { fn is_enabled(&self, _level: Level) -> bool { false } + #[inline] + fn flush(&self) -> result::Result<(), FlushError> { + Ok(()) + } } /// `Drain` filtering records @@ -1630,6 +1814,10 @@ where */ self.0.is_enabled(level) } + #[inline] + fn flush(&self) -> result::Result<(), FlushError> { + self.0.flush() + } } /// `Drain` filtering records by `Record` logging level @@ -1670,6 +1858,10 @@ impl Drain for LevelFilter { fn is_enabled(&self, level: Level) -> bool { level.is_at_least(self.1) && self.0.is_enabled(level) } + #[inline] + fn flush(&self) -> result::Result<(), FlushError> { + self.0.flush() + } } /// `Drain` mapping error returned by another `Drain` @@ -1711,6 +1903,10 @@ impl Drain for MapError { fn is_enabled(&self, level: Level) -> bool { self.drain.is_enabled(level) } + #[inline] + fn flush(&self) -> result::Result<(), FlushError> { + self.drain.flush() + } } /// `Drain` duplicating records into two other `Drain`s @@ -1750,6 +1946,26 @@ impl Drain for Duplicate { fn is_enabled(&self, level: Level) -> bool { self.0.is_enabled(level) || self.1.is_enabled(level) } + /// Flush both drains. + /// + /// If one or both of the drains fails, this will return a [`DuplicateDrainFlushError`]. + /// Even if the first drain fails with an error, the second one will still be flushed. + #[inline] + fn flush(&self) -> result::Result<(), FlushError> { + let first_res = self.0.flush(); + let second_res = self.1.flush(); + let err = match (first_res, second_res) { + (Ok(()), Ok(())) => { + return Ok(()); // short-circuit success + } + (Err(left), Ok(())) => DuplicateDrainFlushError::Left(left), + (Ok(()), Err(right)) => DuplicateDrainFlushError::Right(right), + (Err(left), Err(right)) => { + DuplicateDrainFlushError::Both(left, right) + } + }; + Err(FlushError::Duplicate(Box::new(err))) + } } /// `Drain` panicking on error @@ -1796,6 +2012,10 @@ where fn is_enabled(&self, level: Level) -> bool { self.0.is_enabled(level) } + #[inline] + fn flush(&self) -> result::Result<(), FlushError> { + self.0.flush() + } } /// `Drain` ignoring result @@ -1832,6 +2052,11 @@ impl Drain for IgnoreResult { fn is_enabled(&self, level: Level) -> bool { self.drain.is_enabled(level) } + + #[inline] + fn flush(&self) -> result::Result<(), FlushError> { + self.drain.flush() + } } /// Error returned by `Mutex` @@ -1927,6 +2152,13 @@ impl Drain for std::sync::Mutex { fn is_enabled(&self, level: Level) -> bool { self.lock().ok().map_or(true, |lock| lock.is_enabled(level)) } + #[inline] + fn flush(&self) -> result::Result<(), FlushError> { + let guard = self.lock().map_err(|_poison| { + std::io::Error::new(std::io::ErrorKind::Other, "Mutex is poisoned") + })?; + guard.flush() + } } #[cfg(feature = "parking_lot_0_12")]