Skip to content

Commit e64b9bc

Browse files
committed
fix(datadog_agent source): Silence the "Source send cancelled." error
1 parent bb6a914 commit e64b9bc

File tree

3 files changed

+54
-14
lines changed

3 files changed

+54
-14
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
Eliminated the "Source send cancelled." error and corresponding metric for the
2+
`datadog_agent` source, as Datadog Agent will always resend events when the
3+
connection is dropped after a timeout.
4+
5+
authors: bruceg

src/source_sender/mod.rs

Lines changed: 43 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,24 @@ impl SourceSender {
310310
.send_batch(events)
311311
.await
312312
}
313+
314+
/// Disable counting of unsent events when the send future is dropped.
315+
///
316+
/// As described in `Output::send_event` below, it's possible that the caller drops this future
317+
/// while it is blocked waiting on sending into its channel. This currently only happens for
318+
/// sources that use the `warp` framework for their HTTP server when it detects that the remote
319+
/// has dropped the connection. When that happens, we use `UnsentEventCount` to correctly emit
320+
/// `ComponentEventsDropped` events. This method disables that behavior, eliminating the error
321+
/// log and metric.
322+
///
323+
/// The whole unsent event count metric can go away when we drop the use of the `warp` framework
324+
/// for HTTP server sources.
325+
pub fn silence_unsent_events(&mut self) {
326+
self.default_output.disable_count_unsent();
327+
for output in self.named_outputs.values_mut() {
328+
output.disable_count_unsent();
329+
}
330+
}
313331
}
314332

315333
/// UnsentEvents tracks the number of events yet to be sent in the buffer. This is used to
@@ -320,35 +338,39 @@ impl SourceSender {
320338
/// If its internal count is greater than 0 when dropped, the appropriate [ComponentEventsDropped]
321339
/// event is emitted.
322340
struct UnsentEventCount {
323-
count: usize,
341+
count: Option<usize>,
324342
span: Span,
325343
}
326344

327345
impl UnsentEventCount {
328-
fn new(count: usize) -> Self {
346+
fn new(count: usize, enabled: bool) -> Self {
329347
Self {
330-
count,
348+
count: enabled.then_some(count),
331349
span: Span::current(),
332350
}
333351
}
334352

335353
fn decr(&mut self, count: usize) {
336-
self.count = self.count.saturating_sub(count);
354+
if let Some(ref mut current_count) = self.count {
355+
*current_count = current_count.saturating_sub(count);
356+
}
337357
}
338358

339359
fn discard(&mut self) {
340-
self.count = 0;
360+
self.count = None;
341361
}
342362
}
343363

344364
impl Drop for UnsentEventCount {
345365
fn drop(&mut self) {
346-
if self.count > 0 {
347-
let _enter = self.span.enter();
348-
emit!(ComponentEventsDropped::<UNINTENTIONAL> {
349-
count: self.count,
350-
reason: "Source send cancelled."
351-
});
366+
if let Some(count) = self.count {
367+
if count > 0 {
368+
let _enter = self.span.enter();
369+
emit!(ComponentEventsDropped::<UNINTENTIONAL> {
370+
count,
371+
reason: "Source send cancelled."
372+
});
373+
}
352374
}
353375
}
354376
}
@@ -363,13 +385,16 @@ struct Output {
363385
/// The OutputId related to this source sender. This is set as the `upstream_id` in
364386
/// `EventMetadata` for all event sent through here.
365387
output_id: Arc<OutputId>,
388+
/// Whether to count unsent events when dropped
389+
count_unsent: bool,
366390
}
367391

368392
impl fmt::Debug for Output {
369393
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
370394
fmt.debug_struct("Output")
371395
.field("sender", &self.sender)
372396
.field("output_id", &self.output_id)
397+
.field("count_unsent", &self.count_unsent)
373398
// `metrics::Histogram` is missing `impl Debug`
374399
.finish()
375400
}
@@ -393,11 +418,16 @@ impl Output {
393418
)))),
394419
log_definition,
395420
output_id: Arc::new(output_id),
421+
count_unsent: true,
396422
},
397423
rx,
398424
)
399425
}
400426

427+
fn disable_count_unsent(&mut self) {
428+
self.count_unsent = false;
429+
}
430+
401431
async fn send(
402432
&mut self,
403433
mut events: EventArray,
@@ -438,7 +468,7 @@ impl Output {
438468
// It's possible that the caller stops polling this future while it is blocked waiting
439469
// on `self.send()`. When that happens, we use `UnsentEventCount` to correctly emit
440470
// `ComponentEventsDropped` events.
441-
let mut unsent_event_count = UnsentEventCount::new(event.len());
471+
let mut unsent_event_count = UnsentEventCount::new(event.len(), self.count_unsent);
442472
self.send(event, &mut unsent_event_count).await
443473
}
444474

@@ -464,7 +494,7 @@ impl Output {
464494
// on `self.send()`. When that happens, we use `UnsentEventCount` to correctly emit
465495
// `ComponentEventsDropped` events.
466496
let events = events.into_iter().map(Into::into);
467-
let mut unsent_event_count = UnsentEventCount::new(events.len());
497+
let mut unsent_event_count = UnsentEventCount::new(events.len(), self.count_unsent);
468498
for events in array::events_into_arrays(events, Some(CHUNK_SIZE)) {
469499
self.send(events, &mut unsent_event_count)
470500
.await

src/sources/datadog_agent/mod.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -408,10 +408,15 @@ impl DatadogAgentSource {
408408

409409
fn build_warp_filters(
410410
&self,
411-
out: SourceSender,
411+
mut out: SourceSender,
412412
acknowledgements: bool,
413413
config: &DatadogAgentConfig,
414414
) -> crate::Result<BoxedFilter<(Response,)>> {
415+
// Silence the "Source send cancelled." error log and error metric because we know that the
416+
// sending Datadog Agent source will always resend after it drops the connection following a
417+
// send timeout.
418+
out.silence_unsent_events();
419+
415420
let mut filters = (!config.disable_logs).then(|| {
416421
logs::build_warp_filter(
417422
acknowledgements,

0 commit comments

Comments
 (0)