Skip to content

Commit db683a1

Browse files
ganelo, oganel, and pront
authored and committed
enhancement(kubernetes_logs source): Allow specification of a maximum line size to be applied after merging instead of just before (#22582)
* Add config for maximum allowed line size after merging
* Add warns when we drop partial logs for being too big; shift some comments around
* Add changelog
* Format
* Increment component_discarded_events_total on violation of max_line_size and max_merged_line_size
* Update changelog.d/22581_max_merged_line_bytes.feature.md
  Co-authored-by: Pavlos Rontidis <pavlos.rontidis@gmail.com>
* Don't emit expired events that are too big nor ones that don't appear to be partial; fix test
* Fix another test
* Update src/sources/kubernetes_logs/mod.rs
  Co-authored-by: Pavlos Rontidis <pavlos.rontidis@gmail.com>
* Update src/sources/kubernetes_logs/mod.rs
  Co-authored-by: Pavlos Rontidis <pavlos.rontidis@gmail.com>
* Remove inadvertently added file
* Include Value rather than Event in error struct
* Rename field in bucket struct
* Move max_merged_line_bytes from being a param to being a field on the state struct
* Make new config field optional, defaulting to old behavior
* Format
* Appease check-events
* docs regen
* Tweak wording of doc; emit only first 1k bytes of dropped lines in error
* Rename fields for clarity
* Per PR feedback: copy just the initial 1000 bytes rather than cloning the whole buffer and then truncating, use more idiomatic Rust for handling both configured and unconfigured cases of max_merged_line_bytes
* Allow spelling of already-merged changelog filename
* Don't try to include more characters than there actually are in the slice
* Don't just get enough capacity, make sure length matches too
* Formatting

---------

Co-authored-by: Orri Ganel <oganel@palantir.com>
Co-authored-by: Pavlos Rontidis <pavlos.rontidis@gmail.com>
1 parent f93fa08 commit db683a1

File tree

14 files changed

+358
-54
lines changed

14 files changed

+358
-54
lines changed

.github/actions/spelling/expect.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@ efgh
175175
Elhage
176176
emerg
177177
Enableable
178+
enableable
178179
endianess
179180
endler
180181
eni
changelog.d/22581_max_merged_line_bytes.feature.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
The `kubernetes_logs` source now includes a new `max_merged_line_bytes` configuration option. This setting enables users to cap the size of log lines after they’ve been combined using `auto_partial_merge`. Previously, the `max_line_bytes` field only restricted line sizes *before* merging, leaving no practical way to limit the length of merged lines—unless you set a size so tiny that it prevented merging altogether by stopping short of the continuation character. This new option gives you better control over merged line sizes.
2+
3+
authors: ganelo

lib/file-source/src/buffer.rs

Lines changed: 39 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,18 @@
1-
use std::io::{self, BufRead};
1+
use std::{
2+
cmp::min,
3+
io::{self, BufRead},
4+
};
25

36
use bstr::Finder;
47
use bytes::BytesMut;
5-
use tracing::warn;
68

79
use crate::FilePosition;
810

11+
pub struct ReadResult {
12+
pub successfully_read: Option<usize>,
13+
pub discarded_for_size_and_truncated: Vec<BytesMut>,
14+
}
15+
916
/// Read up to `max_size` bytes from `reader`, splitting by `delim`
1017
///
1118
/// The function reads up to `max_size` bytes from `reader`, splitting the input
@@ -29,17 +36,18 @@ use crate::FilePosition;
2936
/// Benchmarks indicate that this function processes in the high single-digit
3037
/// GiB/s range for buffers of length 1KiB. For buffers any smaller than this
3138
/// the overhead of setup dominates our benchmarks.
32-
pub fn read_until_with_max_size<R: BufRead + ?Sized>(
33-
reader: &mut R,
34-
position: &mut FilePosition,
35-
delim: &[u8],
36-
buf: &mut BytesMut,
39+
pub fn read_until_with_max_size<'a, R: BufRead + ?Sized>(
40+
reader: &'a mut R,
41+
position: &'a mut FilePosition,
42+
delim: &'a [u8],
43+
buf: &'a mut BytesMut,
3744
max_size: usize,
38-
) -> io::Result<Option<usize>> {
45+
) -> io::Result<ReadResult> {
3946
let mut total_read = 0;
4047
let mut discarding = false;
4148
let delim_finder = Finder::new(delim);
4249
let delim_len = delim.len();
50+
let mut discarded_for_size_and_truncated = Vec::new();
4351
loop {
4452
let available: &[u8] = match reader.fill_buf() {
4553
Ok(n) => n,
@@ -68,16 +76,20 @@ pub fn read_until_with_max_size<R: BufRead + ?Sized>(
6876
total_read += used;
6977

7078
if !discarding && buf.len() > max_size {
71-
warn!(
72-
message = "Found line that exceeds max_line_bytes; discarding.",
73-
internal_log_rate_limit = true
74-
);
79+
// keep only the first <1k bytes to make sure we can actually emit a usable error
80+
let length_to_keep = min(1000, max_size);
81+
let mut truncated: BytesMut = BytesMut::zeroed(length_to_keep);
82+
truncated.copy_from_slice(&buf[0..length_to_keep]);
83+
discarded_for_size_and_truncated.push(truncated);
7584
discarding = true;
7685
}
7786

7887
if done {
7988
if !discarding {
80-
return Ok(Some(total_read));
89+
return Ok(ReadResult {
90+
successfully_read: Some(total_read),
91+
discarded_for_size_and_truncated,
92+
});
8193
} else {
8294
discarding = false;
8395
buf.clear();
@@ -87,7 +99,10 @@ pub fn read_until_with_max_size<R: BufRead + ?Sized>(
8799
// us to observe an incomplete write. We return None here and let the loop continue
88100
// next time the method is called. This is safe because the buffer is specific to this
89101
// FileWatcher.
90-
return Ok(None);
102+
return Ok(ReadResult {
103+
successfully_read: None,
104+
discarded_for_size_and_truncated,
105+
});
91106
}
92107
}
93108
}
@@ -99,6 +114,8 @@ mod test {
99114
use bytes::{BufMut, BytesMut};
100115
use quickcheck::{QuickCheck, TestResult};
101116

117+
use crate::buffer::ReadResult;
118+
102119
use super::read_until_with_max_size;
103120

104121
fn qc_inner(chunks: Vec<Vec<u8>>, delim: u8, max_size: NonZeroU8) -> TestResult {
@@ -181,7 +198,10 @@ mod test {
181198
)
182199
.unwrap()
183200
{
184-
None => {
201+
ReadResult {
202+
successfully_read: None,
203+
discarded_for_size_and_truncated: _,
204+
} => {
185205
// Subject only returns None if this is the last chunk _and_
186206
// the chunk did not contain a delimiter _or_ the delimiter
187207
// was outside the max_size range _or_ the current chunk is empty.
@@ -190,7 +210,10 @@ mod test {
190210
.any(|details| ((details.chunk_index == idx) && details.within_max_size));
191211
assert!(chunk.is_empty() || !has_valid_delimiter)
192212
}
193-
Some(total_read) => {
213+
ReadResult {
214+
successfully_read: Some(total_read),
215+
discarded_for_size_and_truncated: _,
216+
} => {
194217
// Now that the function has returned we confirm that the
195218
// returned details match our `first_delim` and also that
196219
// the `buffer` is populated correctly.

lib/file-source/src/file_server.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use tracing::{debug, error, info, trace};
1919

2020
use crate::{
2121
checkpointer::{Checkpointer, CheckpointsView},
22-
file_watcher::FileWatcher,
22+
file_watcher::{FileWatcher, RawLineResult},
2323
fingerprinter::{FileFingerprint, Fingerprinter},
2424
paths_provider::PathsProvider,
2525
FileSourceInternalEvents, ReadFrom,
@@ -263,7 +263,19 @@ where
263263

264264
let start = time::Instant::now();
265265
let mut bytes_read: usize = 0;
266-
while let Ok(Some(line)) = watcher.read_line() {
266+
while let Ok(RawLineResult {
267+
raw_line: Some(line),
268+
discarded_for_size_and_truncated,
269+
}) = watcher.read_line()
270+
{
271+
discarded_for_size_and_truncated.iter().for_each(|buf| {
272+
self.emitter.emit_file_line_too_long(
273+
&buf.clone(),
274+
self.max_line_bytes,
275+
buf.len(),
276+
)
277+
});
278+
267279
let sz = line.bytes.len();
268280
trace!(
269281
message = "Read bytes.",

lib/file-source/src/file_watcher/mod.rs

Lines changed: 40 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@ use tracing::debug;
1212
use vector_common::constants::GZIP_MAGIC;
1313

1414
use crate::{
15-
buffer::read_until_with_max_size, metadata_ext::PortableFileExt, FilePosition, ReadFrom,
15+
buffer::{read_until_with_max_size, ReadResult},
16+
metadata_ext::PortableFileExt,
17+
FilePosition, ReadFrom,
1618
};
1719
#[cfg(test)]
1820
mod tests;
@@ -28,6 +30,12 @@ pub(super) struct RawLine {
2830
pub bytes: Bytes,
2931
}
3032

33+
#[derive(Debug)]
34+
pub struct RawLineResult {
35+
pub raw_line: Option<RawLine>,
36+
pub discarded_for_size_and_truncated: Vec<BytesMut>,
37+
}
38+
3139
/// The `FileWatcher` struct defines the polling based state machine which reads
3240
/// from a file path, transparently updating the underlying file descriptor when
3341
/// the file has been rolled over, as is common for logs.
@@ -207,7 +215,7 @@ impl FileWatcher {
207215
/// This function will attempt to read a new line from its file, blocking,
208216
/// up to some maximum but unspecified amount of time. `read_line` will open
209217
/// a new file handler as needed, transparently to the caller.
210-
pub(super) fn read_line(&mut self) -> io::Result<Option<RawLine>> {
218+
pub(super) fn read_line(&mut self) -> io::Result<RawLineResult> {
211219
self.track_read_attempt();
212220

213221
let reader = &mut self.reader;
@@ -220,14 +228,23 @@ impl FileWatcher {
220228
&mut self.buf,
221229
self.max_line_bytes,
222230
) {
223-
Ok(Some(_)) => {
231+
Ok(ReadResult {
232+
successfully_read: Some(_),
233+
discarded_for_size_and_truncated,
234+
}) => {
224235
self.track_read_success();
225-
Ok(Some(RawLine {
226-
offset: initial_position,
227-
bytes: self.buf.split().freeze(),
228-
}))
236+
Ok(RawLineResult {
237+
raw_line: Some(RawLine {
238+
offset: initial_position,
239+
bytes: self.buf.split().freeze(),
240+
}),
241+
discarded_for_size_and_truncated,
242+
})
229243
}
230-
Ok(None) => {
244+
Ok(ReadResult {
245+
successfully_read: None,
246+
discarded_for_size_and_truncated,
247+
}) => {
231248
if !self.file_findable() {
232249
self.set_dead();
233250
// File has been deleted, so return what we have in the buffer, even though it
@@ -237,16 +254,25 @@ impl FileWatcher {
237254
if buf.is_empty() {
238255
// EOF
239256
self.reached_eof = true;
240-
Ok(None)
257+
Ok(RawLineResult {
258+
raw_line: None,
259+
discarded_for_size_and_truncated,
260+
})
241261
} else {
242-
Ok(Some(RawLine {
243-
offset: initial_position,
244-
bytes: buf,
245-
}))
262+
Ok(RawLineResult {
263+
raw_line: Some(RawLine {
264+
offset: initial_position,
265+
bytes: buf,
266+
}),
267+
discarded_for_size_and_truncated,
268+
})
246269
}
247270
} else {
248271
self.reached_eof = true;
249-
Ok(None)
272+
Ok(RawLineResult {
273+
raw_line: None,
274+
discarded_for_size_and_truncated,
275+
})
250276
}
251277
}
252278
Err(e) => {

lib/file-source/src/file_watcher/tests/experiment.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use bytes::Bytes;
88
use quickcheck::{QuickCheck, TestResult};
99

1010
use crate::{
11-
file_watcher::{tests::*, FileWatcher},
11+
file_watcher::{tests::*, FileWatcher, RawLineResult},
1212
ReadFrom,
1313
};
1414

@@ -96,11 +96,14 @@ fn experiment(actions: Vec<FileWatcherAction>) {
9696
Err(_) => {
9797
unreachable!();
9898
}
99-
Ok(Some(line)) if line.bytes.is_empty() => {
99+
Ok(RawLineResult {
100+
raw_line: Some(line),
101+
..
102+
}) if line.bytes.is_empty() => {
100103
attempts -= 1;
101104
continue;
102105
}
103-
Ok(None) => {
106+
Ok(RawLineResult { raw_line: None, .. }) => {
104107
attempts -= 1;
105108
continue;
106109
}

lib/file-source/src/file_watcher/tests/experiment_no_truncations.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use bytes::Bytes;
44
use quickcheck::{QuickCheck, TestResult};
55

66
use crate::{
7-
file_watcher::{tests::*, FileWatcher},
7+
file_watcher::{tests::*, FileWatcher, RawLineResult},
88
ReadFrom,
99
};
1010

@@ -63,17 +63,23 @@ fn experiment_no_truncations(actions: Vec<FileWatcherAction>) {
6363
Err(_) => {
6464
unreachable!();
6565
}
66-
Ok(Some(line)) if line.bytes.is_empty() => {
66+
Ok(RawLineResult {
67+
raw_line: Some(line),
68+
..
69+
}) if line.bytes.is_empty() => {
6770
attempts -= 1;
6871
assert!(fwfiles[read_index].read_line().is_none());
6972
continue;
7073
}
71-
Ok(None) => {
74+
Ok(RawLineResult { raw_line: None, .. }) => {
7275
attempts -= 1;
7376
assert!(fwfiles[read_index].read_line().is_none());
7477
continue;
7578
}
76-
Ok(Some(line)) => {
79+
Ok(RawLineResult {
80+
raw_line: Some(line),
81+
..
82+
}) => {
7783
let exp = fwfiles[read_index].read_line().expect("could not readline");
7884
assert_eq!(exp.into_bytes(), line.bytes);
7985
// assert_eq!(sz, buf.len() + 1);

lib/file-source/src/fingerprinter.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -392,6 +392,7 @@ mod test {
392392
time::Duration,
393393
};
394394

395+
use bytes::BytesMut;
395396
use flate2::write::GzEncoder;
396397
use tempfile::{tempdir, TempDir};
397398

@@ -813,5 +814,7 @@ mod test {
813814
fn emit_files_open(&self, _: usize) {}
814815

815816
fn emit_path_globbing_failed(&self, _: &Path, _: &Error) {}
817+
818+
fn emit_file_line_too_long(&self, _: &BytesMut, _: usize, _: usize) {}
816819
}
817820
}

lib/file-source/src/internal_events.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
use std::{io::Error, path::Path, time::Duration};
22

3+
use bytes::BytesMut;
4+
35
/// Every internal event in this crate has a corresponding
46
/// method in this trait which should emit the event.
57
pub trait FileSourceInternalEvents: Send + Sync + Clone + 'static {
@@ -26,4 +28,11 @@ pub trait FileSourceInternalEvents: Send + Sync + Clone + 'static {
2628
fn emit_files_open(&self, count: usize);
2729

2830
fn emit_path_globbing_failed(&self, path: &Path, error: &Error);
31+
32+
fn emit_file_line_too_long(
33+
&self,
34+
truncated_bytes: &BytesMut,
35+
configured_limit: usize,
36+
encountered_size_so_far: usize,
37+
);
2938
}

0 commit comments

Comments
 (0)