Skip to content

Commit 01c5278

Browse files
committed
Merge remote-tracking branch 'origin/master' into bugfix-22313-amqp-reconnect
2 parents b6180e5 + f31839b commit 01c5278

File tree

122 files changed

+754
-245
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the searchbox below for content that may be hidden.

122 files changed

+754
-245
lines changed

.github/actions/spelling/allow.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,7 @@ downsides
291291
downwardapi
292292
ede
293293
emoji
294+
enableable
294295
esbuild
295296
etld
296297
fakeintake

.github/actions/spelling/expect.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@ efgh
175175
Elhage
176176
emerg
177177
Enableable
178+
enableable
178179
endianess
179180
endler
180181
eni

Cargo.lock

Lines changed: 11 additions & 11 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ rand_distr = { version = "0.5.1", default-features = false }
163163
semver = { version = "1.0.26", default-features = false, features = ["serde", "std"] }
164164
serde_json = { version = "1.0.140", default-features = false, features = ["raw_value", "std"] }
165165
serde = { version = "1.0.219", default-features = false, features = ["alloc", "derive", "rc"] }
166-
snafu = { version = "0.7.5", default-features = false, features = ["futures", "std"] }
166+
snafu = { version = "0.8.0", default-features = false, features = ["futures", "std"] }
167167
tempfile = "3.20.0"
168168
tokio = { version = "1.45.1", default-features = false, features = ["full"] }
169169
toml = { version = "0.8.23", default-features = false, features = ["display", "parse"] }
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
The `kubernetes_logs` source now includes a new `max_merged_line_bytes` configuration option. This setting enables users to cap the size of log lines after they’ve been combined using `auto_partial_merge`. Previously, the `max_line_bytes` field only restricted line sizes *before* merging, leaving no practical way to limit the length of merged lines — unless you set a size so small that it prevents merging altogether by stopping short of the continuation character. This new option gives you better control over merged line sizes.
2+
3+
authors: ganelo
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Added `rate_limit_num` and `rate_limit_duration_secs` options to the `kafka` sink, to enable rate limiting for this sink.
2+
3+
authors: esensar Quad9DNS
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Adds support for session tokens in AWS authentication options. When using temporary credentials (access key, secret key, and session token), the session token is required. Temporary credentials can be provided by an external system and updated using the `SECRET` backend.
2+
3+
authors: anil-db

lib/codecs/src/decoding/framing/chunked_gelf.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -387,7 +387,6 @@ impl ChunkedGelfDecoder {
387387
warn!(
388388
message_id = message_id,
389389
timeout_secs = timeout.as_secs_f64(),
390-
internal_log_rate_limit = true,
391390
"Message was not fully received within the timeout window. Discarding it."
392391
);
393392
}
@@ -409,7 +408,6 @@ impl ChunkedGelfDecoder {
409408
debug!(
410409
message_id = message_id,
411410
sequence_number = sequence_number,
412-
internal_log_rate_limit = true,
413411
"Received a duplicate chunk. Ignoring it."
414412
);
415413
return Ok(None);

lib/dnstap-parser/src/internal_events.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ impl<E: std::fmt::Display> InternalEvent for DnstapParseWarning<E> {
1414
error = %self.error,
1515
stage = error_stage::PROCESSING,
1616
error_type = error_type::PARSER_FAILED,
17-
internal_log_rate_limit = true,
17+
1818
);
1919
}
2020
}

lib/file-source/src/buffer.rs

Lines changed: 39 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,18 @@
1-
use std::io::{self, BufRead};
1+
use std::{
2+
cmp::min,
3+
io::{self, BufRead},
4+
};
25

36
use bstr::Finder;
47
use bytes::BytesMut;
5-
use tracing::warn;
68

79
use crate::FilePosition;
810

11+
pub struct ReadResult {
12+
pub successfully_read: Option<usize>,
13+
pub discarded_for_size_and_truncated: Vec<BytesMut>,
14+
}
15+
916
/// Read up to `max_size` bytes from `reader`, splitting by `delim`
1017
///
1118
/// The function reads up to `max_size` bytes from `reader`, splitting the input
@@ -29,17 +36,18 @@ use crate::FilePosition;
2936
/// Benchmarks indicate that this function processes in the high single-digit
3037
/// GiB/s range for buffers of length 1KiB. For buffers any smaller than this
3138
/// the overhead of setup dominates our benchmarks.
32-
pub fn read_until_with_max_size<R: BufRead + ?Sized>(
33-
reader: &mut R,
34-
position: &mut FilePosition,
35-
delim: &[u8],
36-
buf: &mut BytesMut,
39+
pub fn read_until_with_max_size<'a, R: BufRead + ?Sized>(
40+
reader: &'a mut R,
41+
position: &'a mut FilePosition,
42+
delim: &'a [u8],
43+
buf: &'a mut BytesMut,
3744
max_size: usize,
38-
) -> io::Result<Option<usize>> {
45+
) -> io::Result<ReadResult> {
3946
let mut total_read = 0;
4047
let mut discarding = false;
4148
let delim_finder = Finder::new(delim);
4249
let delim_len = delim.len();
50+
let mut discarded_for_size_and_truncated = Vec::new();
4351
loop {
4452
let available: &[u8] = match reader.fill_buf() {
4553
Ok(n) => n,
@@ -68,16 +76,20 @@ pub fn read_until_with_max_size<R: BufRead + ?Sized>(
6876
total_read += used;
6977

7078
if !discarding && buf.len() > max_size {
71-
warn!(
72-
message = "Found line that exceeds max_line_bytes; discarding.",
73-
internal_log_rate_limit = true
74-
);
79+
// keep only the first <1k bytes to make sure we can actually emit a usable error
80+
let length_to_keep = min(1000, max_size);
81+
let mut truncated: BytesMut = BytesMut::zeroed(length_to_keep);
82+
truncated.copy_from_slice(&buf[0..length_to_keep]);
83+
discarded_for_size_and_truncated.push(truncated);
7584
discarding = true;
7685
}
7786

7887
if done {
7988
if !discarding {
80-
return Ok(Some(total_read));
89+
return Ok(ReadResult {
90+
successfully_read: Some(total_read),
91+
discarded_for_size_and_truncated,
92+
});
8193
} else {
8294
discarding = false;
8395
buf.clear();
@@ -87,7 +99,10 @@ pub fn read_until_with_max_size<R: BufRead + ?Sized>(
8799
// us to observe an incomplete write. We return None here and let the loop continue
88100
// next time the method is called. This is safe because the buffer is specific to this
89101
// FileWatcher.
90-
return Ok(None);
102+
return Ok(ReadResult {
103+
successfully_read: None,
104+
discarded_for_size_and_truncated,
105+
});
91106
}
92107
}
93108
}
@@ -99,6 +114,8 @@ mod test {
99114
use bytes::{BufMut, BytesMut};
100115
use quickcheck::{QuickCheck, TestResult};
101116

117+
use crate::buffer::ReadResult;
118+
102119
use super::read_until_with_max_size;
103120

104121
fn qc_inner(chunks: Vec<Vec<u8>>, delim: u8, max_size: NonZeroU8) -> TestResult {
@@ -181,7 +198,10 @@ mod test {
181198
)
182199
.unwrap()
183200
{
184-
None => {
201+
ReadResult {
202+
successfully_read: None,
203+
discarded_for_size_and_truncated: _,
204+
} => {
185205
// Subject only returns None if this is the last chunk _and_
186206
// the chunk did not contain a delimiter _or_ the delimiter
187207
// was outside the max_size range _or_ the current chunk is empty.
@@ -190,7 +210,10 @@ mod test {
190210
.any(|details| ((details.chunk_index == idx) && details.within_max_size));
191211
assert!(chunk.is_empty() || !has_valid_delimiter)
192212
}
193-
Some(total_read) => {
213+
ReadResult {
214+
successfully_read: Some(total_read),
215+
discarded_for_size_and_truncated: _,
216+
} => {
194217
// Now that the function has returned we confirm that the
195218
// returned details match our `first_delim` and also that
196219
// the `buffer` is populated correctly.

lib/file-source/src/file_server.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use tracing::{debug, error, info, trace};
1919

2020
use crate::{
2121
checkpointer::{Checkpointer, CheckpointsView},
22-
file_watcher::FileWatcher,
22+
file_watcher::{FileWatcher, RawLineResult},
2323
fingerprinter::{FileFingerprint, Fingerprinter},
2424
paths_provider::PathsProvider,
2525
FileSourceInternalEvents, ReadFrom,
@@ -263,7 +263,19 @@ where
263263

264264
let start = time::Instant::now();
265265
let mut bytes_read: usize = 0;
266-
while let Ok(Some(line)) = watcher.read_line() {
266+
while let Ok(RawLineResult {
267+
raw_line: Some(line),
268+
discarded_for_size_and_truncated,
269+
}) = watcher.read_line()
270+
{
271+
discarded_for_size_and_truncated.iter().for_each(|buf| {
272+
self.emitter.emit_file_line_too_long(
273+
&buf.clone(),
274+
self.max_line_bytes,
275+
buf.len(),
276+
)
277+
});
278+
267279
let sz = line.bytes.len();
268280
trace!(
269281
message = "Read bytes.",

0 commit comments

Comments
 (0)