From efb35c28e5214093df09cdf2a774c8a2062c041a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Tue, 27 May 2025 13:08:19 +0200 Subject: [PATCH 1/7] fix(dnstap source): implement all TCP dnstap options to reduce error The dnstap TCP source was initially built based on the TCP socket source, but not all of the options were correctly implemented, which left the request limiter unused. This also changes the multithreaded approach to do async-aware waits, because previous implementation used thread sleep in async context. Related: #20744 --- src/sources/util/framestream.rs | 152 ++++++++++++++++++++++++-------- 1 file changed, 116 insertions(+), 36 deletions(-) diff --git a/src/sources/util/framestream.rs b/src/sources/util/framestream.rs index 404e56c306306..8d776b19798e2 100644 --- a/src/sources/util/framestream.rs +++ b/src/sources/util/framestream.rs @@ -11,14 +11,13 @@ use std::{ atomic::{AtomicU32, Ordering}, Arc, Mutex, }, - thread, time::Duration, }; use bytes::{Buf, Bytes, BytesMut}; use futures::{ executor::block_on, - future, + future::{self, OptionFuture}, sink::{Sink, SinkExt}, stream::{self, StreamExt, TryStreamExt}, }; @@ -520,10 +519,10 @@ async fn handle_stream( mut frame_handler: impl TcpFrameHandler + Send + Sync + Clone + 'static, mut shutdown_signal: ShutdownSignal, mut socket: MaybeTlsIncomingStream, - _tripwire: BoxFuture<'static, ()>, + mut tripwire: BoxFuture<'static, ()>, peer_addr: SocketAddr, out: SourceSender, - _request_limiter: RequestLimiter, + request_limiter: RequestLimiter, ) { tokio::select! { result = socket.handshake() => { @@ -567,23 +566,101 @@ async fn handle_stream( let span = info_span!("connection"); span.record("peer_addr", field::debug(&peer_addr)); let received_from: Option = Some(peer_addr.to_string().into()); - let active_parsing_task_nums = Arc::new(AtomicU32::new(0)); - build_framestream_source( - frame_handler, + let connection_close_timeout = OptionFuture::from( + frame_handler + .max_connection_duration_secs() + .map(|timeout_secs| tokio::time::sleep(Duration::from_secs(timeout_secs))), + ); + tokio::pin!(connection_close_timeout); + + let content_type = frame_handler.content_type(); + let mut event_sink = out.clone(); + let (sock_sink, sock_stream) = Framed::new( socket, - received_from, - out, - shutdown_signal, - span, - active_parsing_task_nums, - move |error| { + length_delimited::Builder::new() + .max_frame_length(frame_handler.max_frame_length()) + .new_codec(), + ) + .split(); + let mut reader = FrameStreamReader::new(Box::new(sock_sink), content_type); + let frame_handler_copy = frame_handler.clone(); + let mut frames = sock_stream + .map_err(move |error| { emit!(TcpSocketError { error: &error, peer_addr, }); - }, - ); + }) + .filter_map(move |frame| { + future::ready(match frame { + Ok(f) => reader.handle_frame(Bytes::from(f)), + Err(_) => None, + }) + }); + + let active_parsing_task_nums = Arc::new(AtomicU32::new(0)); + loop { + let mut permit = tokio::select! { + _ = &mut tripwire => break, + Some(_) = &mut connection_close_timeout => { + break; + }, + _ = &mut shutdown_signal => { + break; + }, + permit = request_limiter.acquire() => { + Some(permit) + } + else => break, + }; + + let timeout = tokio::time::sleep(Duration::from_millis(10)); + tokio::pin!(timeout); + + tokio::select! { + _ = &mut tripwire => break, + _ = &mut shutdown_signal => break, + _ = &mut timeout => { + // This connection is currently holding a permit, but has not received data for some time. Release + // the permit to let another connection try + continue; + } + res = frames.next() => { + match res { + Some(frame) => { + if let Some(permit) = &mut permit { + // Note that this is intentionally not the "number of events in a single request", but rather + // the "number of events currently available". This may contain events from multiple events, + // but it should always contain all events from each request. + permit.decoding_finished(1); + }; + if frame_handler.multithreaded() { + spawn_event_handling_tasks( + frame, + frame_handler_copy.clone(), + event_sink.clone(), + received_from.clone(), + Arc::clone(&active_parsing_task_nums), + frame_handler_copy.max_frame_handling_tasks(), + ).await; + } else if let Some(event) = frame_handler_copy.handle_event(received_from.clone(), frame) { + if let Err(e) = event_sink.send_event(event).await { + error!("Error sending event: {:?}.", e); + } + } + } + None => { + debug!("Connection closed."); + break + }, + } + } + else => break, + } + + drop(permit); + } } /** @@ -778,14 +855,13 @@ fn build_framestream_source( let handler = async move { frames .for_each(move |f| { - future::ready({ - let max_frame_handling_tasks = - frame_handler_copy.max_frame_handling_tasks(); - let f_handler = frame_handler_copy.clone(); - let received_from_copy = received_from.clone(); - let event_sink_copy = event_sink.clone(); - let active_task_nums_copy = Arc::clone(&active_task_nums); + let max_frame_handling_tasks = frame_handler_copy.max_frame_handling_tasks(); + let f_handler = frame_handler_copy.clone(); + let received_from_copy = received_from.clone(); + let event_sink_copy = event_sink.clone(); + let active_task_nums_copy = Arc::clone(&active_task_nums); + async move { spawn_event_handling_tasks( f, f_handler, @@ -793,8 +869,9 @@ fn build_framestream_source( received_from_copy, active_task_nums_copy, max_frame_handling_tasks, - ); - }) + ) + .await; + } }) .await; info!("Finished sending."); @@ -803,7 +880,7 @@ fn build_framestream_source( } } -fn spawn_event_handling_tasks( +async fn spawn_event_handling_tasks( event_data: Bytes, event_handler: impl FrameHandler + Send + Sync + 'static, mut event_sink: SourceSender, @@ -811,7 +888,7 @@ fn spawn_event_handling_tasks( active_task_nums: Arc, max_frame_handling_tasks: u32, ) -> JoinHandle<()> { - wait_for_task_quota(&active_task_nums, max_frame_handling_tasks); + wait_for_task_quota(&active_task_nums, max_frame_handling_tasks).await; tokio::spawn(async move { future::ready({ @@ -826,9 +903,9 @@ fn spawn_event_handling_tasks( }) } -fn wait_for_task_quota(active_task_nums: &Arc, max_tasks: u32) { +async fn wait_for_task_quota(active_task_nums: &Arc, max_tasks: u32) { while max_tasks > 0 && max_tasks < active_task_nums.load(Ordering::Acquire) { - thread::sleep(Duration::from_millis(3)); + tokio::time::sleep(Duration::from_millis(3)).await; } active_task_nums.fetch_add(1, Ordering::AcqRel); } @@ -1677,14 +1754,17 @@ mod test { })); for i in 0..total_events { - join_handles.push(spawn_event_handling_tasks( - Bytes::from(format!("event_{}", i)), - MockFrameHandler::new("test_content".to_string(), true, extra_routine.clone()), - out.clone(), - None, - Arc::clone(&active_task_nums_copy), - max_frame_handling_tasks, - )); + join_handles.push( + spawn_event_handling_tasks( + Bytes::from(format!("event_{}", i)), + MockFrameHandler::new("test_content".to_string(), true, extra_routine.clone()), + out.clone(), + None, + Arc::clone(&active_task_nums_copy), + max_frame_handling_tasks, + ) + .await, + ); } future::join_all(join_handles).await; From 95c5f0dba59580396eae32bba746e55fc38b829e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Fri, 30 May 2025 09:50:21 +0200 Subject: [PATCH 2/7] Add changelog entry --- changelog.d/23123_dnstap_tcp_errors.fix.md | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 changelog.d/23123_dnstap_tcp_errors.fix.md diff --git a/changelog.d/23123_dnstap_tcp_errors.fix.md b/changelog.d/23123_dnstap_tcp_errors.fix.md new file mode 100644 index 0000000000000..c8686aa3e1f20 --- /dev/null +++ b/changelog.d/23123_dnstap_tcp_errors.fix.md @@ -0,0 +1,3 @@ +Improved backpressure and load handling of dnstap TCP source. + +authors: esensar Quad9DNS From 5b92d84633111445906497c2edf1b4a247edd011 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Wed, 11 Jun 2025 10:18:09 +0200 Subject: [PATCH 3/7] Update src/sources/util/framestream.rs Co-authored-by: Pavlos Rontidis --- src/sources/util/framestream.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sources/util/framestream.rs b/src/sources/util/framestream.rs index 8d776b19798e2..cc1f0ea96c1ce 100644 --- a/src/sources/util/framestream.rs +++ b/src/sources/util/framestream.rs @@ -1756,7 +1756,7 @@ mod test { for i in 0..total_events { join_handles.push( spawn_event_handling_tasks( - Bytes::from(format!("event_{}", i)), + Bytes::from(format!("event_{i}")), MockFrameHandler::new("test_content".to_string(), true, extra_routine.clone()), out.clone(), None, From 78cce2284dd473ce3d85de878ee01fceb039ade2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Wed, 11 Jun 2025 10:18:20 +0200 Subject: [PATCH 4/7] Update src/sources/util/framestream.rs Co-authored-by: Pavlos Rontidis --- src/sources/util/framestream.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sources/util/framestream.rs b/src/sources/util/framestream.rs index cc1f0ea96c1ce..7de92de9feb93 100644 --- a/src/sources/util/framestream.rs +++ b/src/sources/util/framestream.rs @@ -646,7 +646,7 @@ async fn handle_stream( ).await; } else if let Some(event) = frame_handler_copy.handle_event(received_from.clone(), frame) { if let Err(e) = event_sink.send_event(event).await { - error!("Error sending event: {:?}.", e); + error!("Error sending event: {e:?}."); } } } From 862513f677ca5ec0098654e73a022948b9930777 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Wed, 11 Jun 2025 11:04:12 +0200 Subject: [PATCH 5/7] Move out tcp frame handle into a function --- src/sources/util/framestream.rs | 42 +++++++++++++++++++++------------ 1 file changed, 27 insertions(+), 15 deletions(-) diff --git a/src/sources/util/framestream.rs b/src/sources/util/framestream.rs index 7de92de9feb93..22c21301321b9 100644 --- a/src/sources/util/framestream.rs +++ b/src/sources/util/framestream.rs @@ -584,7 +584,6 @@ async fn handle_stream( ) .split(); let mut reader = FrameStreamReader::new(Box::new(sock_sink), content_type); - let frame_handler_copy = frame_handler.clone(); let mut frames = sock_stream .map_err(move |error| { emit!(TcpSocketError { @@ -635,20 +634,7 @@ async fn handle_stream( // but it should always contain all events from each request. permit.decoding_finished(1); }; - if frame_handler.multithreaded() { - spawn_event_handling_tasks( - frame, - frame_handler_copy.clone(), - event_sink.clone(), - received_from.clone(), - Arc::clone(&active_parsing_task_nums), - frame_handler_copy.max_frame_handling_tasks(), - ).await; - } else if let Some(event) = frame_handler_copy.handle_event(received_from.clone(), frame) { - if let Err(e) = event_sink.send_event(event).await { - error!("Error sending event: {e:?}."); - } - } + handle_tcp_frame(&mut frame_handler, frame, &mut event_sink, received_from.clone(), Arc::clone(&active_parsing_task_nums)).await; } None => { debug!("Connection closed."); @@ -663,6 +649,32 @@ async fn handle_stream( } } +async fn handle_tcp_frame( + frame_handler: &mut T, + frame: Bytes, + event_sink: &mut SourceSender, + received_from: Option, + active_parsing_task_nums: Arc, +) where + T: TcpFrameHandler + Send + Sync + Clone + 'static, +{ + if frame_handler.multithreaded() { + spawn_event_handling_tasks( + frame, + frame_handler.clone(), + event_sink.clone(), + received_from, + active_parsing_task_nums, + frame_handler.max_frame_handling_tasks(), + ) + .await; + } else if let Some(event) = frame_handler.handle_event(received_from, frame) { + if let Err(e) = event_sink.send_event(event).await { + error!("Error sending event: {e:?}."); + } + } +} + /** * Based off of the build_unix_source function. * Functions similarly, but uses the FrameStreamReader to deal with From 22c8c3ce4c1b68d6b556c208530a08fc77679593 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Wed, 11 Jun 2025 15:07:39 +0200 Subject: [PATCH 6/7] Move 10 ms timeout to a constant --- src/sources/util/framestream.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/sources/util/framestream.rs b/src/sources/util/framestream.rs index 22c21301321b9..56ab85c765da6 100644 --- a/src/sources/util/framestream.rs +++ b/src/sources/util/framestream.rs @@ -62,6 +62,12 @@ use super::net::{RequestLimiter, SocketListenAddr}; const FSTRM_CONTROL_FRAME_LENGTH_MAX: usize = 512; const FSTRM_CONTROL_FIELD_CONTENT_TYPE_LENGTH_MAX: usize = 256; +/// If a connection does not receive any data during this short timeout, +/// it should release its permit (and try to obtain a new one) allowing other connections to read. +/// It is very short because any incoming data will avoid this timeout, +/// so it mainly prevents holding permits without consuming any data +const PERMIT_HOLD_TIMEOUT_MS: usize = 10; + pub type FrameStreamSink = Box + Send + Unpin>; pub struct FrameStreamReader { @@ -614,7 +620,7 @@ async fn handle_stream( else => break, }; - let timeout = tokio::time::sleep(Duration::from_millis(10)); + let timeout = tokio::time::sleep(Duration::from_millis(PERMIT_HOLD_TIMEOUT_MS)); tokio::pin!(timeout); tokio::select! { From f27f24be528e0e93f61f17a85ec47afda71507ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Thu, 12 Jun 2025 09:18:55 +0200 Subject: [PATCH 7/7] Fix clippy warnings --- src/sources/util/framestream.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sources/util/framestream.rs b/src/sources/util/framestream.rs index 56ab85c765da6..3b770da195965 100644 --- a/src/sources/util/framestream.rs +++ b/src/sources/util/framestream.rs @@ -66,7 +66,7 @@ const FSTRM_CONTROL_FIELD_CONTENT_TYPE_LENGTH_MAX: usize = 256; /// it should release its permit (and try to obtain a new one) allowing other connections to read. /// It is very short because any incoming data will avoid this timeout, /// so it mainly prevents holding permits without consuming any data -const PERMIT_HOLD_TIMEOUT_MS: usize = 10; +const PERMIT_HOLD_TIMEOUT_MS: u64 = 10; pub type FrameStreamSink = Box + Send + Unpin>;