-
Notifications
You must be signed in to change notification settings - Fork 1.8k
fix(dnstap source): implement all TCP dnstap options to reduce error #23123
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 5 commits
efb35c2
95c5f0d
5b92d84
78cce22
862513f
22c8c3c
f27f24b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
Improved backpressure and load handling of dnstap TCP source. | ||
|
||
authors: esensar Quad9DNS |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,14 +11,13 @@ use std::{ | |
atomic::{AtomicU32, Ordering}, | ||
Arc, Mutex, | ||
}, | ||
thread, | ||
time::Duration, | ||
}; | ||
|
||
use bytes::{Buf, Bytes, BytesMut}; | ||
use futures::{ | ||
executor::block_on, | ||
future, | ||
future::{self, OptionFuture}, | ||
sink::{Sink, SinkExt}, | ||
stream::{self, StreamExt, TryStreamExt}, | ||
}; | ||
|
@@ -520,10 +519,10 @@ async fn handle_stream( | |
mut frame_handler: impl TcpFrameHandler + Send + Sync + Clone + 'static, | ||
mut shutdown_signal: ShutdownSignal, | ||
mut socket: MaybeTlsIncomingStream<TcpStream>, | ||
_tripwire: BoxFuture<'static, ()>, | ||
mut tripwire: BoxFuture<'static, ()>, | ||
peer_addr: SocketAddr, | ||
out: SourceSender, | ||
_request_limiter: RequestLimiter, | ||
request_limiter: RequestLimiter, | ||
) { | ||
tokio::select! { | ||
result = socket.handshake() => { | ||
|
@@ -567,23 +566,113 @@ async fn handle_stream( | |
let span = info_span!("connection"); | ||
span.record("peer_addr", field::debug(&peer_addr)); | ||
let received_from: Option<Bytes> = Some(peer_addr.to_string().into()); | ||
let active_parsing_task_nums = Arc::new(AtomicU32::new(0)); | ||
|
||
build_framestream_source( | ||
frame_handler, | ||
let connection_close_timeout = OptionFuture::from( | ||
frame_handler | ||
.max_connection_duration_secs() | ||
.map(|timeout_secs| tokio::time::sleep(Duration::from_secs(timeout_secs))), | ||
); | ||
tokio::pin!(connection_close_timeout); | ||
|
||
let content_type = frame_handler.content_type(); | ||
let mut event_sink = out.clone(); | ||
let (sock_sink, sock_stream) = Framed::new( | ||
socket, | ||
received_from, | ||
out, | ||
shutdown_signal, | ||
span, | ||
active_parsing_task_nums, | ||
move |error| { | ||
length_delimited::Builder::new() | ||
.max_frame_length(frame_handler.max_frame_length()) | ||
.new_codec(), | ||
) | ||
.split(); | ||
let mut reader = FrameStreamReader::new(Box::new(sock_sink), content_type); | ||
let mut frames = sock_stream | ||
.map_err(move |error| { | ||
emit!(TcpSocketError { | ||
error: &error, | ||
peer_addr, | ||
}); | ||
}, | ||
); | ||
}) | ||
.filter_map(move |frame| { | ||
future::ready(match frame { | ||
Ok(f) => reader.handle_frame(Bytes::from(f)), | ||
Err(_) => None, | ||
}) | ||
}); | ||
|
||
let active_parsing_task_nums = Arc::new(AtomicU32::new(0)); | ||
loop { | ||
let mut permit = tokio::select! { | ||
_ = &mut tripwire => break, | ||
Some(_) = &mut connection_close_timeout => { | ||
break; | ||
}, | ||
_ = &mut shutdown_signal => { | ||
break; | ||
}, | ||
permit = request_limiter.acquire() => { | ||
Some(permit) | ||
} | ||
else => break, | ||
}; | ||
pront marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
let timeout = tokio::time::sleep(Duration::from_millis(10)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's extract this into a constant and also document why this value was selected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I copied this over from There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see, it's an existing code smell. From the PR:
We can use this as a comment here. |
||
tokio::pin!(timeout); | ||
|
||
tokio::select! { | ||
_ = &mut tripwire => break, | ||
_ = &mut shutdown_signal => break, | ||
_ = &mut timeout => { | ||
// This connection is currently holding a permit, but has not received data for some time. Release | ||
// the permit to let another connection try | ||
continue; | ||
} | ||
res = frames.next() => { | ||
match res { | ||
Some(frame) => { | ||
if let Some(permit) = &mut permit { | ||
// Note that this is intentionally not the "number of events in a single request", but rather | ||
// the "number of events currently available". This may contain events from multiple events, | ||
// but it should always contain all events from each request. | ||
permit.decoding_finished(1); | ||
}; | ||
handle_tcp_frame(&mut frame_handler, frame, &mut event_sink, received_from.clone(), Arc::clone(&active_parsing_task_nums)).await; | ||
} | ||
None => { | ||
debug!("Connection closed."); | ||
break | ||
}, | ||
} | ||
} | ||
else => break, | ||
} | ||
|
||
drop(permit); | ||
} | ||
} | ||
|
||
async fn handle_tcp_frame<T>( | ||
frame_handler: &mut T, | ||
frame: Bytes, | ||
event_sink: &mut SourceSender, | ||
received_from: Option<Bytes>, | ||
active_parsing_task_nums: Arc<AtomicU32>, | ||
) where | ||
T: TcpFrameHandler + Send + Sync + Clone + 'static, | ||
{ | ||
if frame_handler.multithreaded() { | ||
spawn_event_handling_tasks( | ||
frame, | ||
frame_handler.clone(), | ||
event_sink.clone(), | ||
received_from, | ||
active_parsing_task_nums, | ||
frame_handler.max_frame_handling_tasks(), | ||
) | ||
.await; | ||
} else if let Some(event) = frame_handler.handle_event(received_from, frame) { | ||
if let Err(e) = event_sink.send_event(event).await { | ||
error!("Error sending event: {e:?}."); | ||
} | ||
} | ||
} | ||
|
||
/** | ||
|
@@ -778,23 +867,23 @@ fn build_framestream_source<T: Send + 'static>( | |
let handler = async move { | ||
frames | ||
.for_each(move |f| { | ||
future::ready({ | ||
let max_frame_handling_tasks = | ||
frame_handler_copy.max_frame_handling_tasks(); | ||
let f_handler = frame_handler_copy.clone(); | ||
let received_from_copy = received_from.clone(); | ||
let event_sink_copy = event_sink.clone(); | ||
let active_task_nums_copy = Arc::clone(&active_task_nums); | ||
let max_frame_handling_tasks = frame_handler_copy.max_frame_handling_tasks(); | ||
let f_handler = frame_handler_copy.clone(); | ||
let received_from_copy = received_from.clone(); | ||
let event_sink_copy = event_sink.clone(); | ||
let active_task_nums_copy = Arc::clone(&active_task_nums); | ||
|
||
async move { | ||
spawn_event_handling_tasks( | ||
f, | ||
f_handler, | ||
event_sink_copy, | ||
received_from_copy, | ||
active_task_nums_copy, | ||
max_frame_handling_tasks, | ||
); | ||
}) | ||
) | ||
.await; | ||
} | ||
}) | ||
.await; | ||
info!("Finished sending."); | ||
|
@@ -803,15 +892,15 @@ fn build_framestream_source<T: Send + 'static>( | |
} | ||
} | ||
|
||
fn spawn_event_handling_tasks( | ||
async fn spawn_event_handling_tasks( | ||
event_data: Bytes, | ||
event_handler: impl FrameHandler + Send + Sync + 'static, | ||
mut event_sink: SourceSender, | ||
received_from: Option<Bytes>, | ||
active_task_nums: Arc<AtomicU32>, | ||
max_frame_handling_tasks: u32, | ||
) -> JoinHandle<()> { | ||
wait_for_task_quota(&active_task_nums, max_frame_handling_tasks); | ||
wait_for_task_quota(&active_task_nums, max_frame_handling_tasks).await; | ||
|
||
tokio::spawn(async move { | ||
future::ready({ | ||
|
@@ -826,9 +915,9 @@ fn spawn_event_handling_tasks( | |
}) | ||
} | ||
|
||
fn wait_for_task_quota(active_task_nums: &Arc<AtomicU32>, max_tasks: u32) { | ||
async fn wait_for_task_quota(active_task_nums: &Arc<AtomicU32>, max_tasks: u32) { | ||
while max_tasks > 0 && max_tasks < active_task_nums.load(Ordering::Acquire) { | ||
thread::sleep(Duration::from_millis(3)); | ||
tokio::time::sleep(Duration::from_millis(3)).await; | ||
esensar marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
active_task_nums.fetch_add(1, Ordering::AcqRel); | ||
} | ||
|
@@ -1677,14 +1766,17 @@ mod test { | |
})); | ||
|
||
for i in 0..total_events { | ||
join_handles.push(spawn_event_handling_tasks( | ||
Bytes::from(format!("event_{}", i)), | ||
MockFrameHandler::new("test_content".to_string(), true, extra_routine.clone()), | ||
out.clone(), | ||
None, | ||
Arc::clone(&active_task_nums_copy), | ||
max_frame_handling_tasks, | ||
)); | ||
join_handles.push( | ||
spawn_event_handling_tasks( | ||
Bytes::from(format!("event_{i}")), | ||
MockFrameHandler::new("test_content".to_string(), true, extra_routine.clone()), | ||
out.clone(), | ||
None, | ||
Arc::clone(&active_task_nums_copy), | ||
max_frame_handling_tasks, | ||
) | ||
.await, | ||
); | ||
} | ||
|
||
future::join_all(join_handles).await; | ||
|
Uh oh!
There was an error while loading. Please reload this page.