Skip to content

feat(sinks): Loki Sink supports setting requests of persistent connection #23168

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Introduce the `keep_alive_requests` option in the Loki sink to enable releasing persistent connections, thereby enhancing load balancing during downstream Loki cluster scaling.

authors: tiantangkuohai
3 changes: 3 additions & 0 deletions src/sinks/loki/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ pub struct LokiConfig {
#[configurable(derived)]
pub tls: Option<TlsConfig>,

#[configurable(derived)]
pub keep_alive_requests: Option<i64>,

#[configurable(derived)]
#[serde(
default,
Expand Down
29 changes: 26 additions & 3 deletions src/sinks/loki/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use bytes::Bytes;
use http::StatusCode;
use snafu::Snafu;
use tracing::Instrument;
use std::sync::atomic::{AtomicI64, Ordering};

use crate::{
http::{Auth, HttpClient},
Expand Down Expand Up @@ -82,10 +83,12 @@ impl MetaDescriptive for LokiRequest {
}
}

#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct LokiService {
endpoint: UriSerde,
client: HttpClient,
keep_alive_requests: i64,
request_count: AtomicI64,
}

impl LokiService {
Expand All @@ -94,13 +97,25 @@ impl LokiService {
endpoint: UriSerde,
path: String,
auth: Option<Auth>,
keep_alive_requests: Option<i64>,
) -> crate::Result<Self> {
let endpoint = endpoint.append_path(&path)?.with_auth(auth);

Ok(Self { client, endpoint })
let request_count = AtomicI64::new(0);
Ok(Self { client, endpoint, keep_alive_requests: keep_alive_requests.unwrap_or(0), request_count })
}
}

impl Clone for LokiService {
fn clone(&self) -> Self {
Self {
endpoint: self.endpoint.clone(),
client: self.client.clone(),
keep_alive_requests: self.keep_alive_requests.clone(),
request_count: AtomicI64::new(self.request_count.load(Ordering::SeqCst)),
}
}
}

impl Service<LokiRequest> for LokiService {
type Response = LokiResponse;
type Error = LokiError;
Expand All @@ -127,6 +142,14 @@ impl Service<LokiRequest> for LokiService {
req = req.header("Content-Encoding", ce);
}

if self.keep_alive_requests > 0 {
let request_count = self.request_count.fetch_add(1, Ordering::Relaxed);
if request_count >= self.keep_alive_requests {
self.request_count.store(0, Ordering::Relaxed);
req = req.header("Connection", "close");
}
}

let body = hyper::Body::from(request.payload);
let mut req = req.body(body).unwrap();

Expand Down
1 change: 1 addition & 0 deletions src/sinks/loki/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,7 @@ impl LokiSink {
config.endpoint,
config.path,
config.auth,
config.keep_alive_requests,
)?);

let transformer = config.encoding.transformer();
Expand Down
Loading