Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
232 changes: 232 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1079,6 +1079,17 @@ where
let _ = self.drain.log(record, &self.list);
}

/// Flush all pending log records, blocking until completion.
///
/// In many cases this is equivalent to calling [`std::io::Write::flush`] on the underlying stream.
/// See docs on [`Drain::flush`] for more details.
///
/// Returns [`FlushError::NotSupported`] if the underlying drain does not support [`Drain::flush`].
#[inline]
pub fn flush(&self) -> result::Result<(), FlushError> {
self.drain.flush()
}

/// Get list of key-value pairs assigned to this `Logger`
pub fn list(&self) -> &OwnedKVList {
&self.list
Expand Down Expand Up @@ -1159,6 +1170,122 @@ where
fn is_enabled(&self, level: Level) -> bool {
self.drain.is_enabled(level)
}

#[inline]
fn flush(&self) -> result::Result<(), FlushError> {
self.drain.flush()
}
}

/// An error that occurs when calling [`Drain::flush`].
#[non_exhaustive]
#[derive(Debug)]
pub enum FlushError {
/// An error that occurs doing IO.
///
/// Often triggered by [`std::io::Write::flush`]
#[cfg(feature = "std")]
Io(std::io::Error),
/// Indicates this drain does not support flushing.
NotSupported,
/// A custom error, which is not directly caused by IO or [`FlushError::NotSupported`].
#[cfg(has_std_error)]
Custom(Box<dyn StdError + Send + Sync + 'static>),
/// An error caused by calling [`slog::Duplicate::flush`].
Duplicate(Box<DuplicateDrainFlushError>),
}
#[cfg(feature = "std")]
impl From<std::io::Error> for FlushError {
fn from(value: std::io::Error) -> Self {
FlushError::Io(value)
}
}
#[cfg(has_std_error)]
impl StdError for FlushError {
fn source(&self) -> Option<&(dyn StdError + 'static)> {
match self {
#[cfg(feature = "std")]
FlushError::Io(cause) => Some(cause),
FlushError::NotSupported => None,
#[cfg(has_std_error)]
FlushError::Custom(cause) => Some(&**cause),
FlushError::Duplicate(cause) => Some(cause),
}
}
}
impl fmt::Display for FlushError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
#[cfg(feature = "std")]
FlushError::Io(_) => {
f.write_str("Encountered IO error during flushing")
}
FlushError::NotSupported => {
f.write_str("Drain does not support flushing")
}
#[cfg(has_std_error)]
FlushError::Custom(cause) => {
write!(f, "Encountered error during flushing: {cause}")
}
FlushError::Duplicate(cause) => write!(f, "{cause}"),
}
}
}

/// An error from calling [`Duplicate::flush`].
///
/// This means that at least one of the drains has failed to flush.
#[derive(Debug)]
pub enum DuplicateDrainFlushError {
/// Occurs when calling [`Drain::flush`] the left (first) drain of [`Duplicate`] fails,
/// but flushing the right drain succeeds.
Left(FlushError),
/// Occurs when calling [`Drain::flush`] the right (second) drain of [`Duplicate`] fails,
/// but flushing the left drain succeeds.
Right(FlushError),
/// Occurs when calling [`Drain::flush`] fails for both the left and right
/// (first and second) drains of [`Duplicate`].
Both(FlushError, FlushError),
}
impl DuplicateDrainFlushError {
/// If flushing the left drain triggered an error, then return it.
///
/// Returns `None` if flushing the left drain did not cause an error.
pub fn left(&self) -> Option<&'_ FlushError> {
match self {
DuplicateDrainFlushError::Left(left)
| DuplicateDrainFlushError::Both(left, _) => Some(left),
DuplicateDrainFlushError::Right(_) => None,
}
}

/// If flushing the right drain triggered an error, then return it.
///
/// Returns `None` if flushing the right drain did not cause an error.
pub fn right(&self) -> Option<&'_ FlushError> {
match self {
DuplicateDrainFlushError::Right(_) => None,
DuplicateDrainFlushError::Both(left, _) => Some(left),
DuplicateDrainFlushError::Left(_) => None,
}
}
}
#[cfg(has_std_error)]
impl StdError for DuplicateDrainFlushError {}
impl fmt::Display for DuplicateDrainFlushError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let (single, name) = match self {
DuplicateDrainFlushError::Left(single) => (single, "left"),
DuplicateDrainFlushError::Right(single) => (single, "right"),
DuplicateDrainFlushError::Both(left, right) => {
return write!(
f,
"Failed to flush both drains: ({left}) and ({right})"
);
}
};
write!(f, "Failed to flush {name} drain: ({single})")
}
}

// {{{ Drain
Expand Down Expand Up @@ -1203,6 +1330,43 @@ pub trait Drain {
values: &OwnedKVList,
) -> result::Result<Self::Ok, Self::Err>;

/// Flush all pending log records, blocking until completion.
///
/// This method is logically idempotent.
/// In theory, two successive flush calls are the same as one:
/// They will both flush all message up to the point of the final.
/// In practice, this is not actually the case as IO is complicated and
/// flush calls can return errors.
///
/// Should call [`std::io::Write::flush`] if applicable.
/// If this drain wraps another drain,
/// it should delegate to the flush call of the wrapped drain.
///
/// An implementation of flush should try to avoid blocking log operations while the flush is in progress.
/// Unfortunately, there are some cases like `impl Drain for Mutex` where this is not possible.
///
/// A flush call is only required to flush records that are queued before the start of the call.
/// In particular, consider the following interleaving of events
/// ```text
/// thread1 drain.log(record1)
/// thread1: drain.flush() begin
/// thread2: drain.log(record2)
/// thread2: drain.flush() finish
/// ```
/// In this case, the drain is only required to flush `record1`.
/// It may or may not flush `record2`.
/// This is mainly relevant for the implementation of [`slog_async::Async`],
/// as we have no control over the implementation of [`std::io::Write::flush`].
/// This behavior is chosen to prevent a flush call from blocking indefinitely
/// in the case of concurrent logging by other threads.
///
/// Returns [`FlushError::NotSupported`] if the drain has not implemented this method.
///
/// [`slog_async::Async`]: https://docs.rs/slog-async/latest/slog_async/struct.Async.html
fn flush(&self) -> result::Result<(), FlushError> {
Err(FlushError::NotSupported)
}

/// **Avoid**: Check if messages at the specified log level are **maybe**
/// enabled for this logger.
///
Expand Down Expand Up @@ -1365,6 +1529,10 @@ impl<'a, D: Drain + 'a> Drain for &'a D {
fn is_enabled(&self, level: Level) -> bool {
(**self).is_enabled(level)
}
#[inline]
fn flush(&self) -> result::Result<(), FlushError> {
(**self).flush()
}
}

impl<'a, D: Drain + 'a> Drain for &'a mut D {
Expand All @@ -1382,6 +1550,10 @@ impl<'a, D: Drain + 'a> Drain for &'a mut D {
fn is_enabled(&self, level: Level) -> bool {
(**self).is_enabled(level)
}
#[inline]
fn flush(&self) -> result::Result<(), FlushError> {
(**self).flush()
}
}

/// Internal utility module used to "maybe" bound traits
Expand Down Expand Up @@ -1543,6 +1715,10 @@ impl<D: Drain + ?Sized> Drain for Box<D> {
fn is_enabled(&self, level: Level) -> bool {
(**self).is_enabled(level)
}
#[inline]
fn flush(&self) -> result::Result<(), FlushError> {
(**self).flush()
}
}

impl<D: Drain + ?Sized> Drain for Arc<D> {
Expand All @@ -1559,6 +1735,10 @@ impl<D: Drain + ?Sized> Drain for Arc<D> {
fn is_enabled(&self, level: Level) -> bool {
(**self).is_enabled(level)
}
#[inline]
fn flush(&self) -> result::Result<(), FlushError> {
(**self).flush()
}
}

/// `Drain` discarding everything
Expand All @@ -1582,6 +1762,10 @@ impl Drain for Discard {
fn is_enabled(&self, _level: Level) -> bool {
false
}
#[inline]
fn flush(&self) -> result::Result<(), FlushError> {
Ok(())
}
}

/// `Drain` filtering records
Expand Down Expand Up @@ -1630,6 +1814,10 @@ where
*/
self.0.is_enabled(level)
}
#[inline]
fn flush(&self) -> result::Result<(), FlushError> {
self.0.flush()
}
}

/// `Drain` filtering records by `Record` logging level
Expand Down Expand Up @@ -1670,6 +1858,10 @@ impl<D: Drain> Drain for LevelFilter<D> {
fn is_enabled(&self, level: Level) -> bool {
level.is_at_least(self.1) && self.0.is_enabled(level)
}
#[inline]
fn flush(&self) -> result::Result<(), FlushError> {
self.0.flush()
}
}

/// `Drain` mapping error returned by another `Drain`
Expand Down Expand Up @@ -1711,6 +1903,10 @@ impl<D: Drain, E> Drain for MapError<D, E> {
fn is_enabled(&self, level: Level) -> bool {
self.drain.is_enabled(level)
}
#[inline]
fn flush(&self) -> result::Result<(), FlushError> {
self.drain.flush()
}
}

/// `Drain` duplicating records into two other `Drain`s
Expand Down Expand Up @@ -1750,6 +1946,26 @@ impl<D1: Drain, D2: Drain> Drain for Duplicate<D1, D2> {
fn is_enabled(&self, level: Level) -> bool {
self.0.is_enabled(level) || self.1.is_enabled(level)
}
/// Flush both drains.
///
/// If one or both of the drains fails, this will return a [`DuplicateDrainFlushError`].
/// Even if the first drain fails with an error, the second one will still be flushed.
#[inline]
fn flush(&self) -> result::Result<(), FlushError> {
let first_res = self.0.flush();
let second_res = self.1.flush();
let err = match (first_res, second_res) {
(Ok(()), Ok(())) => {
return Ok(()); // short-circuit success
}
(Err(left), Ok(())) => DuplicateDrainFlushError::Left(left),
(Ok(()), Err(right)) => DuplicateDrainFlushError::Right(right),
(Err(left), Err(right)) => {
DuplicateDrainFlushError::Both(left, right)
}
};
Err(FlushError::Duplicate(Box::new(err)))
}
}

/// `Drain` panicking on error
Expand Down Expand Up @@ -1796,6 +2012,10 @@ where
fn is_enabled(&self, level: Level) -> bool {
self.0.is_enabled(level)
}
#[inline]
fn flush(&self) -> result::Result<(), FlushError> {
self.0.flush()
}
}

/// `Drain` ignoring result
Expand Down Expand Up @@ -1832,6 +2052,11 @@ impl<D: Drain> Drain for IgnoreResult<D> {
fn is_enabled(&self, level: Level) -> bool {
self.drain.is_enabled(level)
}

#[inline]
fn flush(&self) -> result::Result<(), FlushError> {
self.drain.flush()
}
}

/// Error returned by `Mutex<D : Drain>`
Expand Down Expand Up @@ -1927,6 +2152,13 @@ impl<D: Drain> Drain for std::sync::Mutex<D> {
fn is_enabled(&self, level: Level) -> bool {
self.lock().ok().map_or(true, |lock| lock.is_enabled(level))
}
#[inline]
fn flush(&self) -> result::Result<(), FlushError> {
let guard = self.lock().map_err(|_poison| {
std::io::Error::new(std::io::ErrorKind::Other, "Mutex is poisoned")
})?;
guard.flush()
}
}

#[cfg(feature = "parking_lot_0_12")]
Expand Down