From e64b9bc91503dc9abb43fc0b54fcb99d44d4556d Mon Sep 17 00:00:00 2001 From: Bruce Guenter Date: Fri, 23 May 2025 14:59:47 -0400 Subject: [PATCH] fix(datadog_agent source): Silence the "Source send cancelled." error --- ...datadog-agent-source-sender-warning.fix.md | 5 ++ src/source_sender/mod.rs | 56 ++++++++++++++----- src/sources/datadog_agent/mod.rs | 7 ++- 3 files changed, 54 insertions(+), 14 deletions(-) create mode 100644 changelog.d/fix-datadog-agent-source-sender-warning.fix.md diff --git a/changelog.d/fix-datadog-agent-source-sender-warning.fix.md b/changelog.d/fix-datadog-agent-source-sender-warning.fix.md new file mode 100644 index 0000000000000..3cb3c6f6f88b2 --- /dev/null +++ b/changelog.d/fix-datadog-agent-source-sender-warning.fix.md @@ -0,0 +1,5 @@ +Eliminated the "Source send cancelled." error and corresponding metric for the +`datadog_agent` source, as Datadog Agent will always resend events when the +connection is dropped after a timeout. + +authors: bruceg diff --git a/src/source_sender/mod.rs b/src/source_sender/mod.rs index bf0ee0525597f..cd629bdb07bee 100644 --- a/src/source_sender/mod.rs +++ b/src/source_sender/mod.rs @@ -310,6 +310,24 @@ impl SourceSender { .send_batch(events) .await } + + /// Disable counting of unsent events when the send future is dropped. + /// + /// As described in `Output::send_event` below, it's possible that the caller drops this future + /// while it is blocked waiting on sending into its channel. This currently only happens for + /// sources that use the `warp` framework for their HTTP server when it detects that the remote + /// has dropped the connection. When that happens, we use `UnsentEventCount` to correctly emit + /// `ComponentEventsDropped` events. This method disables that behavior, eliminating the error + /// log and metric. + /// + /// The whole unsent event count metric can go away when we drop the use of the `warp` framework + /// for HTTP server sources. + pub fn silence_unsent_events(&mut self) { + self.default_output.disable_count_unsent(); + for output in self.named_outputs.values_mut() { + output.disable_count_unsent(); + } + } } /// UnsentEvents tracks the number of events yet to be sent in the buffer. This is used to @@ -320,35 +338,39 @@ impl SourceSender { /// If its internal count is greater than 0 when dropped, the appropriate [ComponentEventsDropped] /// event is emitted. struct UnsentEventCount { - count: usize, + count: Option, span: Span, } impl UnsentEventCount { - fn new(count: usize) -> Self { + fn new(count: usize, enabled: bool) -> Self { Self { - count, + count: enabled.then_some(count), span: Span::current(), } } fn decr(&mut self, count: usize) { - self.count = self.count.saturating_sub(count); + if let Some(ref mut current_count) = self.count { + *current_count = current_count.saturating_sub(count); + } } fn discard(&mut self) { - self.count = 0; + self.count = None; } } impl Drop for UnsentEventCount { fn drop(&mut self) { - if self.count > 0 { - let _enter = self.span.enter(); - emit!(ComponentEventsDropped:: { - count: self.count, - reason: "Source send cancelled." - }); + if let Some(count) = self.count { + if count > 0 { + let _enter = self.span.enter(); + emit!(ComponentEventsDropped:: { + count, + reason: "Source send cancelled." + }); + } } } } @@ -363,6 +385,8 @@ struct Output { /// The OutputId related to this source sender. This is set as the `upstream_id` in /// `EventMetadata` for all event sent through here. output_id: Arc, + /// Whether to count unsent events when dropped + count_unsent: bool, } impl fmt::Debug for Output { @@ -370,6 +394,7 @@ impl fmt::Debug for Output { fmt.debug_struct("Output") .field("sender", &self.sender) .field("output_id", &self.output_id) + .field("count_unsent", &self.count_unsent) // `metrics::Histogram` is missing `impl Debug` .finish() } @@ -393,11 +418,16 @@ impl Output { )))), log_definition, output_id: Arc::new(output_id), + count_unsent: true, }, rx, ) } + fn disable_count_unsent(&mut self) { + self.count_unsent = false; + } + async fn send( &mut self, mut events: EventArray, @@ -438,7 +468,7 @@ impl Output { // It's possible that the caller stops polling this future while it is blocked waiting // on `self.send()`. When that happens, we use `UnsentEventCount` to correctly emit // `ComponentEventsDropped` events. - let mut unsent_event_count = UnsentEventCount::new(event.len()); + let mut unsent_event_count = UnsentEventCount::new(event.len(), self.count_unsent); self.send(event, &mut unsent_event_count).await } @@ -464,7 +494,7 @@ impl Output { // on `self.send()`. When that happens, we use `UnsentEventCount` to correctly emit // `ComponentEventsDropped` events. let events = events.into_iter().map(Into::into); - let mut unsent_event_count = UnsentEventCount::new(events.len()); + let mut unsent_event_count = UnsentEventCount::new(events.len(), self.count_unsent); for events in array::events_into_arrays(events, Some(CHUNK_SIZE)) { self.send(events, &mut unsent_event_count) .await diff --git a/src/sources/datadog_agent/mod.rs b/src/sources/datadog_agent/mod.rs index abb5463608641..a60cc4748ff4e 100644 --- a/src/sources/datadog_agent/mod.rs +++ b/src/sources/datadog_agent/mod.rs @@ -408,10 +408,15 @@ impl DatadogAgentSource { fn build_warp_filters( &self, - out: SourceSender, + mut out: SourceSender, acknowledgements: bool, config: &DatadogAgentConfig, ) -> crate::Result> { + // Silence the "Source send cancelled." error log and error metric because we know that the + // sending Datadog Agent source will always resend after it drops the connection following a + // send timeout. + out.silence_unsent_events(); + let mut filters = (!config.disable_logs).then(|| { logs::build_warp_filter( acknowledgements,