diff --git a/changelog.d/23123_dnstap_tcp_errors.fix.md b/changelog.d/23123_dnstap_tcp_errors.fix.md new file mode 100644 index 0000000000000..c8686aa3e1f20 --- /dev/null +++ b/changelog.d/23123_dnstap_tcp_errors.fix.md @@ -0,0 +1,3 @@ +Improved backpressure and load handling of dnstap TCP source. + +authors: esensar Quad9DNS diff --git a/src/sources/util/framestream.rs b/src/sources/util/framestream.rs index 404e56c306306..3b770da195965 100644 --- a/src/sources/util/framestream.rs +++ b/src/sources/util/framestream.rs @@ -11,14 +11,13 @@ use std::{ atomic::{AtomicU32, Ordering}, Arc, Mutex, }, - thread, time::Duration, }; use bytes::{Buf, Bytes, BytesMut}; use futures::{ executor::block_on, - future, + future::{self, OptionFuture}, sink::{Sink, SinkExt}, stream::{self, StreamExt, TryStreamExt}, }; @@ -63,6 +62,12 @@ use super::net::{RequestLimiter, SocketListenAddr}; const FSTRM_CONTROL_FRAME_LENGTH_MAX: usize = 512; const FSTRM_CONTROL_FIELD_CONTENT_TYPE_LENGTH_MAX: usize = 256; +/// If a connection does not receive any data during this short timeout, +/// it should release its permit (and try to obtain a new one) allowing other connections to read. +/// It is very short because any incoming data will avoid this timeout, +/// so it mainly prevents holding permits without consuming any data +const PERMIT_HOLD_TIMEOUT_MS: u64 = 10; + pub type FrameStreamSink = Box + Send + Unpin>; pub struct FrameStreamReader { @@ -520,10 +525,10 @@ async fn handle_stream( mut frame_handler: impl TcpFrameHandler + Send + Sync + Clone + 'static, mut shutdown_signal: ShutdownSignal, mut socket: MaybeTlsIncomingStream, - _tripwire: BoxFuture<'static, ()>, + mut tripwire: BoxFuture<'static, ()>, peer_addr: SocketAddr, out: SourceSender, - _request_limiter: RequestLimiter, + request_limiter: RequestLimiter, ) { tokio::select! 
{ result = socket.handshake() => { @@ -567,23 +572,113 @@ async fn handle_stream( let span = info_span!("connection"); span.record("peer_addr", field::debug(&peer_addr)); let received_from: Option = Some(peer_addr.to_string().into()); - let active_parsing_task_nums = Arc::new(AtomicU32::new(0)); - build_framestream_source( - frame_handler, + let connection_close_timeout = OptionFuture::from( + frame_handler + .max_connection_duration_secs() + .map(|timeout_secs| tokio::time::sleep(Duration::from_secs(timeout_secs))), + ); + tokio::pin!(connection_close_timeout); + + let content_type = frame_handler.content_type(); + let mut event_sink = out.clone(); + let (sock_sink, sock_stream) = Framed::new( socket, - received_from, - out, - shutdown_signal, - span, - active_parsing_task_nums, - move |error| { + length_delimited::Builder::new() + .max_frame_length(frame_handler.max_frame_length()) + .new_codec(), + ) + .split(); + let mut reader = FrameStreamReader::new(Box::new(sock_sink), content_type); + let mut frames = sock_stream + .map_err(move |error| { emit!(TcpSocketError { error: &error, peer_addr, }); - }, - ); + }) + .filter_map(move |frame| { + future::ready(match frame { + Ok(f) => reader.handle_frame(Bytes::from(f)), + Err(_) => None, + }) + }); + + let active_parsing_task_nums = Arc::new(AtomicU32::new(0)); + loop { + let mut permit = tokio::select! { + _ = &mut tripwire => break, + Some(_) = &mut connection_close_timeout => { + break; + }, + _ = &mut shutdown_signal => { + break; + }, + permit = request_limiter.acquire() => { + Some(permit) + } + else => break, + }; + + let timeout = tokio::time::sleep(Duration::from_millis(PERMIT_HOLD_TIMEOUT_MS)); + tokio::pin!(timeout); + + tokio::select! { + _ = &mut tripwire => break, + _ = &mut shutdown_signal => break, + _ = &mut timeout => { + // This connection is currently holding a permit, but has not received data for some time. 
Release + // the permit to let another connection try + continue; + } + res = frames.next() => { + match res { + Some(frame) => { + if let Some(permit) = &mut permit { + // Note that this is intentionally not the "number of events in a single request", but rather + // the "number of events currently available". This may contain events from multiple requests, + // but it should always contain all events from each request. + permit.decoding_finished(1); + }; + handle_tcp_frame(&mut frame_handler, frame, &mut event_sink, received_from.clone(), Arc::clone(&active_parsing_task_nums)).await; + } + None => { + debug!("Connection closed."); + break + }, + } + } + else => break, + } + + drop(permit); + } +} + +async fn handle_tcp_frame( + frame_handler: &mut T, + frame: Bytes, + event_sink: &mut SourceSender, + received_from: Option, + active_parsing_task_nums: Arc, +) where + T: TcpFrameHandler + Send + Sync + Clone + 'static, +{ + if frame_handler.multithreaded() { + spawn_event_handling_tasks( + frame, + frame_handler.clone(), + event_sink.clone(), + received_from, + active_parsing_task_nums, + frame_handler.max_frame_handling_tasks(), + ) + .await; + } else if let Some(event) = frame_handler.handle_event(received_from, frame) { + if let Err(e) = event_sink.send_event(event).await { + error!("Error sending event: {e:?}."); + } + } } /** @@ -778,14 +873,13 @@ fn build_framestream_source( let handler = async move { frames .for_each(move |f| { - future::ready({ - let max_frame_handling_tasks = - frame_handler_copy.max_frame_handling_tasks(); - let f_handler = frame_handler_copy.clone(); - let received_from_copy = received_from.clone(); - let event_sink_copy = event_sink.clone(); - let active_task_nums_copy = Arc::clone(&active_task_nums); + let max_frame_handling_tasks = frame_handler_copy.max_frame_handling_tasks(); + let f_handler = frame_handler_copy.clone(); + let received_from_copy = received_from.clone(); + let event_sink_copy = event_sink.clone(); + let 
active_task_nums_copy = Arc::clone(&active_task_nums); + async move { spawn_event_handling_tasks( f, f_handler, @@ -793,8 +887,9 @@ fn build_framestream_source( received_from_copy, active_task_nums_copy, max_frame_handling_tasks, - ); - }) + ) + .await; + } }) .await; info!("Finished sending."); @@ -803,7 +898,7 @@ fn build_framestream_source( } } -fn spawn_event_handling_tasks( +async fn spawn_event_handling_tasks( event_data: Bytes, event_handler: impl FrameHandler + Send + Sync + 'static, mut event_sink: SourceSender, @@ -811,7 +906,7 @@ fn spawn_event_handling_tasks( active_task_nums: Arc, max_frame_handling_tasks: u32, ) -> JoinHandle<()> { - wait_for_task_quota(&active_task_nums, max_frame_handling_tasks); + wait_for_task_quota(&active_task_nums, max_frame_handling_tasks).await; tokio::spawn(async move { future::ready({ @@ -826,9 +921,9 @@ fn spawn_event_handling_tasks( }) } -fn wait_for_task_quota(active_task_nums: &Arc, max_tasks: u32) { +async fn wait_for_task_quota(active_task_nums: &Arc, max_tasks: u32) { while max_tasks > 0 && max_tasks < active_task_nums.load(Ordering::Acquire) { - thread::sleep(Duration::from_millis(3)); + tokio::time::sleep(Duration::from_millis(3)).await; } active_task_nums.fetch_add(1, Ordering::AcqRel); } @@ -1677,14 +1772,17 @@ mod test { })); for i in 0..total_events { - join_handles.push(spawn_event_handling_tasks( - Bytes::from(format!("event_{}", i)), - MockFrameHandler::new("test_content".to_string(), true, extra_routine.clone()), - out.clone(), - None, - Arc::clone(&active_task_nums_copy), - max_frame_handling_tasks, - )); + join_handles.push( + spawn_event_handling_tasks( + Bytes::from(format!("event_{i}")), + MockFrameHandler::new("test_content".to_string(), true, extra_routine.clone()), + out.clone(), + None, + Arc::clone(&active_task_nums_copy), + max_frame_handling_tasks, + ) + .await, + ); } future::join_all(join_handles).await;