From 96c425d8c13db326f2cfda29f1895d8e191a5fe6 Mon Sep 17 00:00:00 2001 From: Haikuo Liu Date: Mon, 9 Jun 2025 15:46:24 +0800 Subject: [PATCH 1/3] Loki Sink supports setting max live time of persistent connection --- src/sinks/loki/config.rs | 3 +++ src/sinks/loki/service.rs | 29 ++++++++++++++++++++++++++--- src/sinks/loki/sink.rs | 1 + 3 files changed, 30 insertions(+), 3 deletions(-) diff --git a/src/sinks/loki/config.rs b/src/sinks/loki/config.rs index 06f827b353bab..e63422ca93637 100644 --- a/src/sinks/loki/config.rs +++ b/src/sinks/loki/config.rs @@ -112,6 +112,9 @@ pub struct LokiConfig { #[configurable(derived)] pub tls: Option, + #[configurable(derived)] + pub keep_alive_requests: Option, + #[configurable(derived)] #[serde( default, diff --git a/src/sinks/loki/service.rs b/src/sinks/loki/service.rs index 2a4c33280732f..e7cca7efafbd4 100644 --- a/src/sinks/loki/service.rs +++ b/src/sinks/loki/service.rs @@ -4,6 +4,7 @@ use bytes::Bytes; use http::StatusCode; use snafu::Snafu; use tracing::Instrument; +use std::sync::atomic::{AtomicI64, Ordering}; use crate::{ http::{Auth, HttpClient}, @@ -82,10 +83,12 @@ impl MetaDescriptive for LokiRequest { } } -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct LokiService { endpoint: UriSerde, client: HttpClient, + keep_alive_requests: i64, + request_count: AtomicI64, } impl LokiService { @@ -94,13 +97,25 @@ impl LokiService { endpoint: UriSerde, path: String, auth: Option, + keep_alive_requests: Option, ) -> crate::Result { let endpoint = endpoint.append_path(&path)?.with_auth(auth); - - Ok(Self { client, endpoint }) + let request_count = AtomicI64::new(0); + Ok(Self { client, endpoint, keep_alive_requests: keep_alive_requests.unwrap_or(0), request_count }) } } +impl Clone for LokiService { + fn clone(&self) -> Self { + Self { + endpoint: self.endpoint.clone(), + client: self.client.clone(), + keep_alive_requests: self.keep_alive_requests.clone(), + request_count: AtomicI64::new(self.request_count.load(Ordering::SeqCst)), + } + } + } + impl Service for LokiService { type Response = LokiResponse; type Error = LokiError; @@ -127,6 +142,14 @@ impl Service for LokiService { req = req.header("Content-Encoding", ce); } + if self.keep_alive_requests > 0 { + let request_count = self.request_count.fetch_add(1, Ordering::Relaxed); + if request_count >= self.keep_alive_requests { + self.request_count.store(0, Ordering::Relaxed); + req = req.header("Connection", "close"); + } + } + let body = hyper::Body::from(request.payload); let mut req = req.body(body).unwrap(); diff --git a/src/sinks/loki/sink.rs b/src/sinks/loki/sink.rs index 37be3a9ee7478..1892bba55ab57 100644 --- a/src/sinks/loki/sink.rs +++ b/src/sinks/loki/sink.rs @@ -468,6 +468,7 @@ impl LokiSink { config.endpoint, config.path, config.auth, + config.keep_alive_requests, )?); let transformer = config.encoding.transformer(); From 0684a54dc572dffbab53339060c01354edda8bac Mon Sep 17 00:00:00 2001 From: Haikuo Liu Date: Tue, 10 Jun 2025 09:25:15 +0800 Subject: [PATCH 2/3] add changelog --- ...pports_setting_requests_of_persistent_connection.feature.md | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 changelog.d/23168_loki_sink_supports_setting_requests_of_persistent_connection.feature.md diff --git a/changelog.d/23168_loki_sink_supports_setting_requests_of_persistent_connection.feature.md b/changelog.d/23168_loki_sink_supports_setting_requests_of_persistent_connection.feature.md new file mode 100644 index 0000000000000..1095741b84b58 --- /dev/null +++ b/changelog.d/23168_loki_sink_supports_setting_requests_of_persistent_connection.feature.md @@ -0,0 +1,3 @@ +Add keep_alive_requests option for Loki sink to support release persistent connection. This can help load balancing of downstream Loki cluster scaling. + +authors: tiantangkuohai \ No newline at end of file From 66950a15d6157bf047b3b271ff9cca2777cef22a Mon Sep 17 00:00:00 2001 From: Haikuo Liu Date: Tue, 10 Jun 2025 09:38:46 +0800 Subject: [PATCH 3/3] add changelog --- ...upports_setting_requests_of_persistent_connection.feature.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog.d/23168_loki_sink_supports_setting_requests_of_persistent_connection.feature.md b/changelog.d/23168_loki_sink_supports_setting_requests_of_persistent_connection.feature.md index 1095741b84b58..f827db6c9bbb3 100644 --- a/changelog.d/23168_loki_sink_supports_setting_requests_of_persistent_connection.feature.md +++ b/changelog.d/23168_loki_sink_supports_setting_requests_of_persistent_connection.feature.md @@ -1,3 +1,3 @@ -Add keep_alive_requests option for Loki sink to support release persistent connection. This can help load balancing of downstream Loki cluster scaling. +Introduce the `keep_alive_requests` option in the Loki sink to enable releasing persistent connections, thereby enhancing load balancing during downstream Loki cluster scaling. authors: tiantangkuohai \ No newline at end of file