Skip to content

Commit 3d4204a

Browse files
authored
fix(dogstatsd): Fix panic when using length-prefixed streams (#593)
Signed-off-by: Michel Heily <michelheily@gmail.com>
1 parent 6a3e14e commit 3d4204a

File tree

1 file changed

+17
-51
lines changed

1 file changed

+17
-51
lines changed

metrics-exporter-dogstatsd/src/writer.rs

Lines changed: 17 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,4 @@
1-
use std::{
2-
ops::{Deref, DerefMut},
3-
vec::Drain,
4-
};
1+
use std::vec::Drain;
52

63
use metrics::{Key, Label};
74

@@ -523,16 +520,26 @@ impl PayloadWriter {
523520
}
524521

525522
/// Iterator over all payloads written by a `PayloadWriter`.
523+
///
524+
/// The source payload buffer is immediately drained of consumed data during the creation of this iterator
525+
/// ("pre-pooping our pants"). This ensures that the end state - the payload buffer contains only preserved
526+
/// bytes (like length prefixes) - is established immediately.
527+
///
528+
/// https://faultlore.com/blah/everyone-poops/
526529
pub struct Payloads<'a> {
527-
payloads_buf: ConsumingBufferSwap<'a, u8>,
530+
payloads_buf: Vec<u8>,
528531
start: usize,
529532
offsets: Drain<'a, usize>,
530533
}
531534

532535
impl<'a> Payloads<'a> {
533536
fn new(payload_buf: &'a mut Vec<u8>, offsets: &'a mut Vec<usize>) -> Self {
537+
// When draining payloads, we need to preserve any bytes that come after the last offset.
538+
// These bytes (like length prefixes) are not part of the current payloads but are needed
539+
// for the next write operation.
540+
let drain_size = offsets.last().copied().unwrap_or(0);
534541
Self {
535-
payloads_buf: ConsumingBufferSwap::new(payload_buf),
542+
payloads_buf: payload_buf.drain(0..drain_size).collect(),
536543
start: 0,
537544
offsets: offsets.drain(..),
538545
}
@@ -556,51 +563,6 @@ impl<'a> Payloads<'a> {
556563
}
557564
}
558565

559-
// Helper type for "pre-pooping our pants".
560-
//
561-
// This type is used when a `Vec<T>` is meant to be drained during an operation, such that the buffer is entirely empty
562-
// after the operation is finished. Since it's safe to "forget" a value and not have its drop logic called, how do we
563-
// ensure that we don't leave the buffer in an indeterminate state without drop logic? We pre-poop our pants.
564-
//
565-
// By swapping out the buffer with an empty one, we ensure that the end state -- the buffer is cleared -- is established
566-
// as soon as we create `ConsumingBufferSwap`. When the drop logic is called, we replace the original buffer, which lets
567-
// use reuse the allocation. At worst, if the drop logic doesn't run, then the buffer is still empty.
568-
//
569-
// https://faultlore.com/blah/everyone-poops/
570-
struct ConsumingBufferSwap<'a, T> {
571-
source: &'a mut Vec<T>,
572-
original: Vec<T>,
573-
}
574-
575-
impl<'a, T> ConsumingBufferSwap<'a, T> {
576-
fn new(source: &'a mut Vec<T>) -> Self {
577-
let original = std::mem::take(source);
578-
Self { source, original }
579-
}
580-
}
581-
582-
impl<'a, T> Drop for ConsumingBufferSwap<'a, T> {
583-
fn drop(&mut self) {
584-
// Clear out the original buffer to reset it, and then return it to the source.
585-
self.original.clear();
586-
std::mem::swap(self.source, &mut self.original);
587-
}
588-
}
589-
590-
impl<'a, T> Deref for ConsumingBufferSwap<'a, T> {
591-
type Target = Vec<T>;
592-
593-
fn deref(&self) -> &Self::Target {
594-
&self.original
595-
}
596-
}
597-
598-
impl<'a, T> DerefMut for ConsumingBufferSwap<'a, T> {
599-
fn deref_mut(&mut self) -> &mut Self::Target {
600-
&mut self.original
601-
}
602-
}
603-
604566
fn write_tag(buf: &mut Vec<u8>, label: &Label) {
605567
// If the label value is empty, we treat it as a bare label. This means all we write is something like
606568
// `label_name`, instead of a more naive form, like `label_name:`.
@@ -958,6 +920,10 @@ mod tests {
958920

959921
let actual = buf_from_writer(&mut writer);
960922
assert_eq!(actual, expected);
923+
924+
// Write another payload (as a sanity check for previous panic bug)
925+
let result = writer.write_distribution(&key, values.iter().copied(), None, None);
926+
assert_eq!(result.payloads_written(), 1);
961927
}
962928
}
963929

0 commit comments

Comments
 (0)