Skip to content

Commit ce0e917

Browse files
esensarpront
authored andcommitted
feat(kafka sink): add rate limit configuration for kafka sink (vectordotdev#23196)
* feature(kafka sink): add rate limit configuration for `kafka` sink Related: vectordotdev#21879 * Add changelog entry --------- Co-authored-by: Pavlos Rontidis <pavlos.rontidis@gmail.com>
1 parent ddbfbfb commit ce0e917

File tree

5 files changed

+59
-2
lines changed

5 files changed

+59
-2
lines changed
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Added `rate_limit_num` and `rate_limit_duration_secs` options to `kafka` sink, to enable rate limiting this sink.
2+
3+
authors: esensar Quad9DNS

src/sinks/kafka/config.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,18 @@ pub struct KafkaSinkConfig {
9696
#[configurable(metadata(docs::advanced))]
9797
pub message_timeout_ms: Duration,
9898

99+
/// The time window used for the `rate_limit_num` option.
100+
#[configurable(metadata(docs::type_unit = "seconds"))]
101+
#[configurable(metadata(docs::human_name = "Rate Limit Duration"))]
102+
#[serde(default = "default_rate_limit_duration_secs")]
103+
pub rate_limit_duration_secs: u64,
104+
105+
/// The maximum number of requests allowed within the `rate_limit_duration_secs` time window.
106+
#[configurable(metadata(docs::type_unit = "requests"))]
107+
#[configurable(metadata(docs::human_name = "Rate Limit Number"))]
108+
#[serde(default = "default_rate_limit_num")]
109+
pub rate_limit_num: u64,
110+
99111
/// A map of advanced options to pass directly to the underlying `librdkafka` client.
100112
///
101113
/// For more information on configuration options, see [Configuration properties][config_props_docs].
@@ -134,6 +146,14 @@ const fn default_message_timeout_ms() -> Duration {
134146
Duration::from_millis(300000) // default in librdkafka
135147
}
136148

149+
const fn default_rate_limit_duration_secs() -> u64 {
150+
1
151+
}
152+
153+
const fn default_rate_limit_num() -> u64 {
154+
i64::MAX as u64 // i64 avoids TOML deserialize issue
155+
}
156+
137157
fn example_librdkafka_options() -> HashMap<String, String> {
138158
HashMap::<_, _>::from_iter([
139159
("client.id".to_string(), "${ENV_VAR}".to_string()),
@@ -245,6 +265,8 @@ impl GenerateConfig for KafkaSinkConfig {
245265
auth: Default::default(),
246266
socket_timeout_ms: default_socket_timeout_ms(),
247267
message_timeout_ms: default_message_timeout_ms(),
268+
rate_limit_duration_secs: default_rate_limit_duration_secs(),
269+
rate_limit_num: default_rate_limit_num(),
248270
librdkafka_options: Default::default(),
249271
headers_key: None,
250272
acknowledgements: Default::default(),

src/sinks/kafka/sink.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
1+
use std::time::Duration;
2+
13
use rdkafka::{
24
error::KafkaError,
35
producer::{BaseProducer, FutureProducer, Producer},
46
ClientConfig,
57
};
68
use snafu::{ResultExt, Snafu};
9+
use tower::limit::RateLimit;
710
use tracing::Span;
811
use vrl::path::OwnedTargetPath;
912

@@ -27,7 +30,7 @@ pub(super) enum BuildError {
2730
pub struct KafkaSink {
2831
transformer: Transformer,
2932
encoder: Encoder<()>,
30-
service: KafkaService,
33+
service: RateLimit<KafkaService>,
3134
topic: Template,
3235
key_field: Option<OwnedTargetPath>,
3336
headers_key: Option<OwnedTargetPath>,
@@ -57,7 +60,12 @@ impl KafkaSink {
5760
headers_key: config.headers_key.map(|key| key.0),
5861
transformer,
5962
encoder,
60-
service: KafkaService::new(producer),
63+
service: ServiceBuilder::new()
64+
.rate_limit(
65+
config.rate_limit_num,
66+
Duration::from_secs(config.rate_limit_duration_secs),
67+
)
68+
.service(KafkaService::new(producer)),
6169
topic: config.topic,
6270
key_field: config.key_field.map(|key| key.0),
6371
})

src/sinks/kafka/tests.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ mod integration_test {
5959
auth: KafkaAuthConfig::default(),
6060
socket_timeout_ms: Duration::from_millis(60000),
6161
message_timeout_ms: Duration::from_millis(300000),
62+
rate_limit_duration_secs: 1,
63+
rate_limit_num: i64::MAX as u64,
6264
librdkafka_options: HashMap::new(),
6365
headers_key: None,
6466
acknowledgements: Default::default(),
@@ -85,6 +87,8 @@ mod integration_test {
8587
auth: KafkaAuthConfig::default(),
8688
socket_timeout_ms: Duration::from_millis(60000),
8789
message_timeout_ms: Duration::from_millis(300000),
90+
rate_limit_duration_secs: 1,
91+
rate_limit_num: i64::MAX as u64,
8892
librdkafka_options: HashMap::new(),
8993
headers_key: None,
9094
acknowledgements: Default::default(),
@@ -185,6 +189,8 @@ mod integration_test {
185189
},
186190
socket_timeout_ms: Duration::from_millis(60000),
187191
message_timeout_ms: Duration::from_millis(300000),
192+
rate_limit_duration_secs: 1,
193+
rate_limit_num: i64::MAX as u64,
188194
batch,
189195
librdkafka_options,
190196
headers_key: None,
@@ -334,6 +340,8 @@ mod integration_test {
334340
auth: kafka_auth.clone(),
335341
socket_timeout_ms: Duration::from_millis(60000),
336342
message_timeout_ms: Duration::from_millis(300000),
343+
rate_limit_duration_secs: 1,
344+
rate_limit_num: i64::MAX as u64,
337345
librdkafka_options: HashMap::new(),
338346
headers_key: Some(headers_key.clone()),
339347
acknowledgements: Default::default(),

website/cue/reference/components/sinks/base/kafka.cue

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -489,6 +489,22 @@ base: components: sinks: kafka: configuration: {
489489
unit: "milliseconds"
490490
}
491491
}
492+
rate_limit_duration_secs: {
493+
description: "The time window used for the `rate_limit_num` option."
494+
required: false
495+
type: uint: {
496+
default: 1
497+
unit: "seconds"
498+
}
499+
}
500+
rate_limit_num: {
501+
description: "The maximum number of requests allowed within the `rate_limit_duration_secs` time window."
502+
required: false
503+
type: uint: {
504+
default: 9223372036854775807
505+
unit: "requests"
506+
}
507+
}
492508
sasl: {
493509
description: "Configuration for SASL authentication when interacting with Kafka."
494510
required: false

0 commit comments

Comments
 (0)