Skip to content

Commit 9e6e019

Browse files
committed
dogstatsd: remove length prefix in PayloadWriter
Earlier, it was assumed that RemoteAddr::Unixgram has to also prefix length, which made the code complex to pad sent buffer with length, to avoid copy before sending. Since length prefix is no longer needed, make the code simple by pushing the length padding appending only before send.
1 parent 71af3c0 commit 9e6e019

File tree

3 files changed

+20
-100
lines changed

3 files changed

+20
-100
lines changed

metrics-exporter-dogstatsd/src/forwarder/mod.rs

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -110,19 +110,6 @@ pub(crate) struct ForwarderConfiguration {
110110
pub write_timeout: Duration,
111111
}
112112

113-
impl ForwarderConfiguration {
114-
/// Returns `true` if the remote address requires a length prefix to be sent before each payload.
115-
pub fn is_length_prefixed(&self) -> bool {
116-
match self.remote_addr {
117-
RemoteAddr::Udp(_) => false,
118-
#[cfg(target_os = "linux")]
119-
RemoteAddr::Unix(_) => true,
120-
#[cfg(target_os = "linux")]
121-
RemoteAddr::Unixgram(_) => false,
122-
}
123-
}
124-
}
125-
126113
#[cfg(test)]
127114
mod tests {
128115
use std::net::SocketAddrV4;

metrics-exporter-dogstatsd/src/forwarder/sync.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,17 @@ impl Client {
6060
Client::Unixgram(socket) => socket.send(buf),
6161

6262
#[cfg(target_os = "linux")]
63-
Client::Unix(socket) => match socket.write_all(buf) {
64-
Ok(()) => Ok(buf.len()),
65-
Err(e) => Err(e),
66-
},
63+
Client::Unix(socket) => {
64+
match u32::try_from(buf.len()) {
65+
Ok(len) => socket.write_all(&len.to_be_bytes())?,
66+
Err(e) => {
67+
use std::io::{Error, ErrorKind};
68+
return Err(Error::new(ErrorKind::InvalidData, e));
69+
}
70+
}
71+
72+
socket.write_all(buf).map(|()| buf.len())
73+
}
6774
}
6875
}
6976
}
@@ -142,8 +149,7 @@ impl Forwarder {
142149
/// Run the forwarder, sending out payloads to the configured remote address at the configured interval.
143150
pub fn run(mut self) {
144151
let mut flush_state = FlushState::default();
145-
let mut writer =
146-
PayloadWriter::new(self.config.max_payload_len, self.config.is_length_prefixed());
152+
let mut writer = PayloadWriter::new(self.config.max_payload_len);
147153
let mut telemetry_update = TelemetryUpdate::default();
148154

149155
let mut next_flush = Instant::now() + self.config.flush_interval;

metrics-exporter-dogstatsd/src/writer.rs

Lines changed: 8 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -46,29 +46,19 @@ pub(super) struct PayloadWriter {
4646
buf: Vec<u8>,
4747
trailer_buf: Vec<u8>,
4848
offsets: Vec<usize>,
49-
with_length_prefix: bool,
5049
}
5150

5251
impl PayloadWriter {
5352
/// Creates a new `PayloadWriter` with the given maximum payload length.
54-
pub fn new(max_payload_len: usize, with_length_prefix: bool) -> Self {
53+
pub fn new(max_payload_len: usize) -> Self {
5554
// NOTE: This should also be handled in the builder, but we want to just double check here that we're getting a
5655
// properly sanitized value.
5756
assert!(
5857
u32::try_from(max_payload_len).is_ok(),
5958
"maximum payload length must be less than 2^32 bytes"
6059
);
6160

62-
let mut writer = Self {
63-
max_payload_len,
64-
buf: Vec::new(),
65-
trailer_buf: Vec::new(),
66-
offsets: Vec::new(),
67-
with_length_prefix,
68-
};
69-
70-
writer.prepare_for_write();
71-
writer
61+
Self { max_payload_len, buf: Vec::new(), trailer_buf: Vec::new(), offsets: Vec::new() }
7262
}
7363

7464
fn last_offset(&self) -> usize {
@@ -80,21 +70,10 @@ impl PayloadWriter {
8070
//
8171
// If there aren't any committed metrics, then the last offset is simply zero.
8272
let last_offset = self.last_offset();
83-
let maybe_length_prefix_len = if self.with_length_prefix { 4 } else { 0 };
84-
self.buf.len() - last_offset - maybe_length_prefix_len
85-
}
86-
87-
fn prepare_for_write(&mut self) {
88-
if self.with_length_prefix {
89-
// If we're adding length prefixes, we need to write the length of the payload first.
90-
//
91-
// We write a dummy length of zero for now, and then we'll go back and fill it in later.
92-
self.buf.extend_from_slice(&[0, 0, 0, 0]);
93-
}
73+
self.buf.len() - last_offset
9474
}
9575

9676
fn commit(&mut self) -> bool {
97-
let current_last_offset = self.last_offset();
9877
let current_len = self.current_len();
9978
if current_len > self.max_payload_len {
10079
// If the current metric is too long, we need to truncate everything we just wrote to get us back to the end
@@ -107,19 +86,6 @@ impl PayloadWriter {
10786
// Track the new offset.
10887
self.offsets.push(self.buf.len());
10988

110-
// If we're dealing with length-delimited payloads, go back to the beginning of this payload and fill in the
111-
// length of it.
112-
if self.with_length_prefix {
113-
// NOTE: We unwrap the conversion here because we know that `self.max_payload_len` is less than 2^32, and we
114-
// check above that `current_len` is less than or equal to `self.max_payload_len`.
115-
let current_len_buf = u32::try_from(current_len).unwrap().to_le_bytes();
116-
self.buf[current_last_offset..current_last_offset + 4]
117-
.copy_from_slice(&current_len_buf[..]);
118-
}
119-
120-
// Initialize the buffer for the next payload.
121-
self.prepare_for_write();
122-
12389
true
12490
}
12591

@@ -542,7 +508,7 @@ mod tests {
542508
];
543509

544510
for (key, value, ts, prefix, global_labels, expected) in cases {
545-
let mut writer = PayloadWriter::new(8192, false);
511+
let mut writer = PayloadWriter::new(8192);
546512
let result = writer.write_counter(&key, value, ts, prefix, global_labels);
547513
assert_eq!(result.payloads_written(), 1);
548514

@@ -607,7 +573,7 @@ mod tests {
607573
];
608574

609575
for (key, value, ts, prefix, global_labels, expected) in cases {
610-
let mut writer = PayloadWriter::new(8192, false);
576+
let mut writer = PayloadWriter::new(8192);
611577
let result = writer.write_gauge(&key, value, ts, prefix, global_labels);
612578
assert_eq!(result.payloads_written(), 1);
613579

@@ -666,7 +632,7 @@ mod tests {
666632
];
667633

668634
for (key, values, prefix, global_labels, expected) in cases {
669-
let mut writer = PayloadWriter::new(8192, false);
635+
let mut writer = PayloadWriter::new(8192);
670636
let result =
671637
writer.write_histogram(&key, values.iter().copied(), None, prefix, global_labels);
672638
assert_eq!(result.payloads_written(), 1);
@@ -726,7 +692,7 @@ mod tests {
726692
];
727693

728694
for (key, values, prefix, global_labels, expected) in cases {
729-
let mut writer = PayloadWriter::new(8192, false);
695+
let mut writer = PayloadWriter::new(8192);
730696
let result = writer.write_distribution(
731697
&key,
732698
values.iter().copied(),
@@ -741,51 +707,12 @@ mod tests {
741707
}
742708
}
743709

744-
#[test]
745-
fn length_prefix() {
746-
let prefixed = |buf: &str| {
747-
let mut prefixed_buf = Vec::with_capacity(buf.len() + 4);
748-
prefixed_buf.extend_from_slice(&(buf.len() as u32).to_le_bytes());
749-
prefixed_buf.extend_from_slice(buf.as_bytes());
750-
prefixed_buf
751-
};
752-
753-
// Cases are defined as: metric key, metric values, metric timestamp, expected output.
754-
let cases = [
755-
(Key::from("test_distribution"), &[22.22][..], prefixed("test_distribution:22.22|d\n")),
756-
(
757-
Key::from_parts("test_distribution", &[("foo", "bar"), ("baz", "quux")]),
758-
&[88.0][..],
759-
prefixed("test_distribution:88.0|d|#foo:bar,baz:quux\n"),
760-
),
761-
(
762-
Key::from("test_distribution"),
763-
&[22.22, 33.33, 44.44][..],
764-
prefixed("test_distribution:22.22:33.33:44.44|d\n"),
765-
),
766-
(
767-
Key::from_parts("test_distribution", &[("foo", "bar"), ("baz", "quux")]),
768-
&[88.0, 66.6, 123.4][..],
769-
prefixed("test_distribution:88.0:66.6:123.4|d|#foo:bar,baz:quux\n"),
770-
),
771-
];
772-
773-
for (key, values, expected) in cases {
774-
let mut writer = PayloadWriter::new(8192, true);
775-
let result = writer.write_distribution(&key, values.iter().copied(), None, None, &[]);
776-
assert_eq!(result.payloads_written(), 1);
777-
778-
let actual = buf_from_writer(&mut writer);
779-
assert_eq!(actual, expected);
780-
}
781-
}
782-
783710
proptest! {
784711
#[test]
785712
fn property_test_gauntlet(payload_limit in 0..16384usize, inputs in arb_vec(arb_metric(), 1..128)) {
786713
// TODO: Parameterize reservoir size so we can exercise the sample rate stuff.[]
787714

788-
let mut writer = PayloadWriter::new(payload_limit, false);
715+
let mut writer = PayloadWriter::new(payload_limit);
789716
let mut total_input_points: u64 = 0;
790717
let mut payloads_written = 0;
791718
let mut points_dropped = 0;

0 commit comments

Comments
 (0)