From 71af3c032b6cf7dc58b1684333b86c32b6822065 Mon Sep 17 00:00:00 2001 From: Mariappan Ramasamy <1221719+nappairam@users.noreply.github.com> Date: Mon, 3 Mar 2025 17:09:55 +0800 Subject: [PATCH 1/3] dogstatsd: send length_prefix only for Unix Stream Length prefix is only needed in case of Stream and not for Datagram whether it is Unix or UDP socket. Reference: https://github.com/DataDog/datadog-go/blob/bae3560e5c664d64b71c0e8ca89326afa362e12a/statsd/uds.go#L62 --- metrics-exporter-dogstatsd/src/forwarder/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metrics-exporter-dogstatsd/src/forwarder/mod.rs b/metrics-exporter-dogstatsd/src/forwarder/mod.rs index 513476a6..a4ded968 100644 --- a/metrics-exporter-dogstatsd/src/forwarder/mod.rs +++ b/metrics-exporter-dogstatsd/src/forwarder/mod.rs @@ -118,7 +118,7 @@ impl ForwarderConfiguration { #[cfg(target_os = "linux")] RemoteAddr::Unix(_) => true, #[cfg(target_os = "linux")] - RemoteAddr::Unixgram(_) => true, + RemoteAddr::Unixgram(_) => false, } } } From 9e6e01919b0f85c6b5dd89f588d530de4d1daa0b Mon Sep 17 00:00:00 2001 From: Mariappan Ramasamy <1221719+nappairam@users.noreply.github.com> Date: Mon, 3 Mar 2025 17:09:55 +0800 Subject: [PATCH 2/3] dogstatsd: remove length prefix in PayloadWriter Earlier, it was assumed that RemoteAddr::Unixgram also had to send a length prefix, so PayloadWriter reserved space for the length in front of each payload in its buffer to avoid a copy before sending, which made the code complex. Since the length prefix is no longer needed there, simplify the code by writing the length prefix only just before each send.
--- .../src/forwarder/mod.rs | 13 --- .../src/forwarder/sync.rs | 18 ++-- metrics-exporter-dogstatsd/src/writer.rs | 89 ++----------------- 3 files changed, 20 insertions(+), 100 deletions(-) diff --git a/metrics-exporter-dogstatsd/src/forwarder/mod.rs b/metrics-exporter-dogstatsd/src/forwarder/mod.rs index a4ded968..8ba93437 100644 --- a/metrics-exporter-dogstatsd/src/forwarder/mod.rs +++ b/metrics-exporter-dogstatsd/src/forwarder/mod.rs @@ -110,19 +110,6 @@ pub(crate) struct ForwarderConfiguration { pub write_timeout: Duration, } -impl ForwarderConfiguration { - /// Returns `true` if the remote address requires a length prefix to be sent before each payload. - pub fn is_length_prefixed(&self) -> bool { - match self.remote_addr { - RemoteAddr::Udp(_) => false, - #[cfg(target_os = "linux")] - RemoteAddr::Unix(_) => true, - #[cfg(target_os = "linux")] - RemoteAddr::Unixgram(_) => false, - } - } -} - #[cfg(test)] mod tests { use std::net::SocketAddrV4; diff --git a/metrics-exporter-dogstatsd/src/forwarder/sync.rs b/metrics-exporter-dogstatsd/src/forwarder/sync.rs index e81f6d95..deb3c6d4 100644 --- a/metrics-exporter-dogstatsd/src/forwarder/sync.rs +++ b/metrics-exporter-dogstatsd/src/forwarder/sync.rs @@ -60,10 +60,17 @@ impl Client { Client::Unixgram(socket) => socket.send(buf), #[cfg(target_os = "linux")] - Client::Unix(socket) => match socket.write_all(buf) { - Ok(()) => Ok(buf.len()), - Err(e) => Err(e), - }, + Client::Unix(socket) => { + match u32::try_from(buf.len()) { + Ok(len) => socket.write_all(&len.to_be_bytes())?, + Err(e) => { + use std::io::{Error, ErrorKind}; + return Err(Error::new(ErrorKind::InvalidData, e)); + } + } + + socket.write_all(buf).map(|()| buf.len()) + } } } } @@ -142,8 +149,7 @@ impl Forwarder { /// Run the forwarder, sending out payloads to the configured remote address at the configured interval. 
pub fn run(mut self) { let mut flush_state = FlushState::default(); - let mut writer = - PayloadWriter::new(self.config.max_payload_len, self.config.is_length_prefixed()); + let mut writer = PayloadWriter::new(self.config.max_payload_len); let mut telemetry_update = TelemetryUpdate::default(); let mut next_flush = Instant::now() + self.config.flush_interval; diff --git a/metrics-exporter-dogstatsd/src/writer.rs b/metrics-exporter-dogstatsd/src/writer.rs index 44812179..9bc5b05e 100644 --- a/metrics-exporter-dogstatsd/src/writer.rs +++ b/metrics-exporter-dogstatsd/src/writer.rs @@ -46,12 +46,11 @@ pub(super) struct PayloadWriter { buf: Vec, trailer_buf: Vec, offsets: Vec, - with_length_prefix: bool, } impl PayloadWriter { /// Creates a new `PayloadWriter` with the given maximum payload length. - pub fn new(max_payload_len: usize, with_length_prefix: bool) -> Self { + pub fn new(max_payload_len: usize) -> Self { // NOTE: This should also be handled in the builder, but we want to just double check here that we're getting a // properly sanitized value. assert!( @@ -59,16 +58,7 @@ impl PayloadWriter { "maximum payload length must be less than 2^32 bytes" ); - let mut writer = Self { - max_payload_len, - buf: Vec::new(), - trailer_buf: Vec::new(), - offsets: Vec::new(), - with_length_prefix, - }; - - writer.prepare_for_write(); - writer + Self { max_payload_len, buf: Vec::new(), trailer_buf: Vec::new(), offsets: Vec::new() } } fn last_offset(&self) -> usize { @@ -80,21 +70,10 @@ impl PayloadWriter { // // If there aren't any committed metrics, then the last offset is simply zero. let last_offset = self.last_offset(); - let maybe_length_prefix_len = if self.with_length_prefix { 4 } else { 0 }; - self.buf.len() - last_offset - maybe_length_prefix_len - } - - fn prepare_for_write(&mut self) { - if self.with_length_prefix { - // If we're adding length prefixes, we need to write the length of the payload first. 
- // - // We write a dummy length of zero for now, and then we'll go back and fill it in later. - self.buf.extend_from_slice(&[0, 0, 0, 0]); - } + self.buf.len() - last_offset } fn commit(&mut self) -> bool { - let current_last_offset = self.last_offset(); let current_len = self.current_len(); if current_len > self.max_payload_len { // If the current metric is too long, we need to truncate everything we just wrote to get us back to the end @@ -107,19 +86,6 @@ impl PayloadWriter { // Track the new offset. self.offsets.push(self.buf.len()); - // If we're dealing with length-delimited payloads, go back to the beginning of this payload and fill in the - // length of it. - if self.with_length_prefix { - // NOTE: We unwrap the conversion here because we know that `self.max_payload_len` is less than 2^32, and we - // check above that `current_len` is less than or equal to `self.max_payload_len`. - let current_len_buf = u32::try_from(current_len).unwrap().to_le_bytes(); - self.buf[current_last_offset..current_last_offset + 4] - .copy_from_slice(&current_len_buf[..]); - } - - // Initialize the buffer for the next payload.
- self.prepare_for_write(); - true } @@ -542,7 +508,7 @@ mod tests { ]; for (key, value, ts, prefix, global_labels, expected) in cases { - let mut writer = PayloadWriter::new(8192, false); + let mut writer = PayloadWriter::new(8192); let result = writer.write_counter(&key, value, ts, prefix, global_labels); assert_eq!(result.payloads_written(), 1); @@ -607,7 +573,7 @@ mod tests { ]; for (key, value, ts, prefix, global_labels, expected) in cases { - let mut writer = PayloadWriter::new(8192, false); + let mut writer = PayloadWriter::new(8192); let result = writer.write_gauge(&key, value, ts, prefix, global_labels); assert_eq!(result.payloads_written(), 1); @@ -666,7 +632,7 @@ mod tests { ]; for (key, values, prefix, global_labels, expected) in cases { - let mut writer = PayloadWriter::new(8192, false); + let mut writer = PayloadWriter::new(8192); let result = writer.write_histogram(&key, values.iter().copied(), None, prefix, global_labels); assert_eq!(result.payloads_written(), 1); @@ -726,7 +692,7 @@ mod tests { ]; for (key, values, prefix, global_labels, expected) in cases { - let mut writer = PayloadWriter::new(8192, false); + let mut writer = PayloadWriter::new(8192); let result = writer.write_distribution( &key, values.iter().copied(), @@ -741,51 +707,12 @@ mod tests { } } - #[test] - fn length_prefix() { - let prefixed = |buf: &str| { - let mut prefixed_buf = Vec::with_capacity(buf.len() + 4); - prefixed_buf.extend_from_slice(&(buf.len() as u32).to_le_bytes()); - prefixed_buf.extend_from_slice(buf.as_bytes()); - prefixed_buf - }; - - // Cases are defined as: metric key, metric values, metric timestamp, expected output. 
- let cases = [ - (Key::from("test_distribution"), &[22.22][..], prefixed("test_distribution:22.22|d\n")), - ( - Key::from_parts("test_distribution", &[("foo", "bar"), ("baz", "quux")]), - &[88.0][..], - prefixed("test_distribution:88.0|d|#foo:bar,baz:quux\n"), - ), - ( - Key::from("test_distribution"), - &[22.22, 33.33, 44.44][..], - prefixed("test_distribution:22.22:33.33:44.44|d\n"), - ), - ( - Key::from_parts("test_distribution", &[("foo", "bar"), ("baz", "quux")]), - &[88.0, 66.6, 123.4][..], - prefixed("test_distribution:88.0:66.6:123.4|d|#foo:bar,baz:quux\n"), - ), - ]; - - for (key, values, expected) in cases { - let mut writer = PayloadWriter::new(8192, true); - let result = writer.write_distribution(&key, values.iter().copied(), None, None, &[]); - assert_eq!(result.payloads_written(), 1); - - let actual = buf_from_writer(&mut writer); - assert_eq!(actual, expected); - } - } - proptest! { #[test] fn property_test_gauntlet(payload_limit in 0..16384usize, inputs in arb_vec(arb_metric(), 1..128)) { // TODO: Parameterize reservoir size so we can exercise the sample rate stuff.[] - let mut writer = PayloadWriter::new(payload_limit, false); + let mut writer = PayloadWriter::new(payload_limit); let mut total_input_points: u64 = 0; let mut payloads_written = 0; let mut points_dropped = 0; From 8d3d692c28787ff50f6dbf70fb0f48ec293f3435 Mon Sep 17 00:00:00 2001 From: Mariappan Ramasamy <1221719+nappairam@users.noreply.github.com> Date: Mon, 3 Mar 2025 17:23:57 +0800 Subject: [PATCH 3/3] dogstatsd: consolidate metrics in a single payload until max_payload_len This avoids sending each metric in a separate UDP packet and saves CPU cycles.
--- metrics-exporter-dogstatsd/src/writer.rs | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/metrics-exporter-dogstatsd/src/writer.rs b/metrics-exporter-dogstatsd/src/writer.rs index 9bc5b05e..e59dde73 100644 --- a/metrics-exporter-dogstatsd/src/writer.rs +++ b/metrics-exporter-dogstatsd/src/writer.rs @@ -83,8 +83,19 @@ impl PayloadWriter { return false; } - // Track the new offset. - self.offsets.push(self.buf.len()); + // Offset update + if current_len + self.last_offset() <= self.max_payload_len { + // If the current metric can be written within the max_payload_len + // replace the last offset (if there is valid offset) + if let Some(last_offset) = self.offsets.last_mut() { + *last_offset = self.buf.len(); + } else { + self.offsets.push(self.buf.len()); + } + } else { + // - else add a new offset to send current metric in a new Packet + self.offsets.push(self.buf.len()); + } true }