diff --git a/changelog.d/19417_splunk_hec_sink_accept_cookies_acks.enhancement.md b/changelog.d/19417_splunk_hec_sink_accept_cookies_acks.enhancement.md new file mode 100644 index 0000000000000..260f502fb8caf --- /dev/null +++ b/changelog.d/19417_splunk_hec_sink_accept_cookies_acks.enhancement.md @@ -0,0 +1,3 @@ +Added support for a returned cookie from a Splunk HEC response to be used when querying for acknowledgement statuses. + +authors: jvperrin diff --git a/src/sinks/splunk_hec/common/acknowledgements.rs b/src/sinks/splunk_hec/common/acknowledgements.rs index dfd5ea23b5264..a0c8c7d47fe2d 100644 --- a/src/sinks/splunk_hec/common/acknowledgements.rs +++ b/src/sinks/splunk_hec/common/acknowledgements.rs @@ -44,6 +44,15 @@ pub struct HecClientAcknowledgementsConfig { /// Once reached, the sink begins applying backpressure. pub max_pending_acks: NonZeroU64, + /// Specifies the name of a cookie to extract from the Splunk HEC response and use when querying for acknowledgements. + /// + /// This is useful when using a load balancer in front of multiple Splunk indexers in a cluster because the + /// request to check for acknowledgements needs to go to the same indexer that originally received the data, + /// and the cookie can help with that routing. + /// + /// If empty, no cookie will be extracted. + pub cookie_name: String, + #[serde( default, deserialize_with = "crate::serde::bool_or_struct", @@ -60,6 +69,7 @@ impl Default for HecClientAcknowledgementsConfig { query_interval: NonZeroU8::new(10).unwrap(), retry_limit: NonZeroU8::new(30).unwrap(), max_pending_acks: NonZeroU64::new(1_000_000).unwrap(), + cookie_name: String::new(), inner: Default::default(), } } @@ -84,7 +94,8 @@ pub enum HecAckApiError { } struct HecAckClient { - acks: HashMap)>, + // Maps (ack_id, cookie_string) to (retry_count, status_sender) + acks: HashMap<(u64, String), (u8, Sender)>, retry_limit: u8, client: HttpClient, http_request_builder: Arc, @@ -104,69 +115,91 @@ impl HecAckClient { } } - /// Adds an ack id to be queried - fn add(&mut self, ack_id: u64, ack_event_status_sender: Sender) { - self.acks - .insert(ack_id, (self.retry_limit, ack_event_status_sender)); + /// Adds an ack id to be queried with a cookie string + fn add(&mut self, ack_id: u64, cookie: String, ack_event_status_sender: Sender) { + self.acks.insert( + (ack_id, cookie), + (self.retry_limit, ack_event_status_sender), + ); emit!(SplunkIndexerAcknowledgementAckAdded); } /// Queries Splunk HEC with stored ack ids and finalizes events that are successfully acked async fn run(&mut self) { - let ack_query_body = self.get_ack_query_body(); - if !ack_query_body.acks.is_empty() { - let ack_query_response = self.send_ack_query_request(&ack_query_body).await; - - match ack_query_response { - Ok(ack_query_response) => { - debug!(message = "Received ack statuses.", ?ack_query_response); - let acked_ack_ids = ack_query_response - .acks - .iter() - .filter(|&(_ack_id, ack_status)| *ack_status) - .map(|(ack_id, _ack_status)| *ack_id) - .collect::>(); - self.finalize_delivered_ack_ids(acked_ack_ids.as_slice()); - self.expire_ack_ids_with_status(EventStatus::Rejected); - } - Err(error) => { - match error { - HecAckApiError::ClientParseResponse | HecAckApiError::ClientSendQuery => { - // If we are permanently unable to interact with - // Splunk HEC indexer acknowledgements (e.g. due to - // request/response format changes in future - // versions), log an error and fall back to default - // behavior. - emit!(SplunkIndexerAcknowledgementAPIError { - message: "Unable to use indexer acknowledgements. Acknowledging based on initial 200 OK.", - error, - }); - self.finalize_delivered_ack_ids( - self.acks.keys().copied().collect::>().as_slice(), - ); - } - _ => { - emit!(SplunkIndexerAcknowledgementAPIError { - message: - "Unable to send acknowledgement query request. Will retry.", - error, - }); - self.expire_ack_ids_with_status(EventStatus::Errored); + // Group ack IDs by cookie string (cookie may be an empty string for single-indexer clusters) + let ack_groups = self.get_ack_groups_by_cookie(); + let mut error_sending_ack = false; + + // Decrement retries once every loop through all acks + self.decrement_retries(); + + for (cookie, ack_ids) in ack_groups { + if !ack_ids.is_empty() { + let ack_query_body = HecAckStatusRequest { + acks: ack_ids.clone(), + }; + let ack_query_response = + self.send_ack_query_request(&ack_query_body, &cookie).await; + + match ack_query_response { + Ok(ack_query_response) => { + debug!( + message = "Received ack statuses for cookie.", + ?ack_query_response, + ?cookie, + ); + let acked_ack_ids = ack_query_response + .acks + .iter() + .filter(|&(_ack_id, ack_status)| *ack_status) + .map(|(ack_id, _ack_status)| *ack_id) + .collect::>(); + self.finalize_delivered_ack_ids(cookie, acked_ack_ids.as_slice()); + } + Err(error) => { + match error { + HecAckApiError::ClientParseResponse + | HecAckApiError::ClientSendQuery => { + // If we are permanently unable to interact with + // Splunk HEC indexer acknowledgements (e.g. due to + // request/response format changes in future + // versions), log an error and fall back to default + // behavior. + emit!(SplunkIndexerAcknowledgementAPIError { + message: "Unable to use indexer acknowledgements. Acknowledging based on initial 200 OK.", + error, + }); + self.finalize_delivered_ack_ids(cookie, &ack_ids); + } + _ => { + emit!(SplunkIndexerAcknowledgementAPIError { + message: + "Unable to send acknowledgement query request. Will retry.", + error, + }); + error_sending_ack = true; + } } } - } - }; + }; + } } + + if error_sending_ack { + self.expire_ack_ids_with_status(EventStatus::Errored); + } + self.expire_ack_ids_with_status(EventStatus::Rejected); } /// Removes successfully acked ack ids and finalizes associated events - fn finalize_delivered_ack_ids(&mut self, ack_ids: &[u64]) { + fn finalize_delivered_ack_ids(&mut self, cookie: String, ack_ids: &[u64]) { let mut removed_count = 0.0; for ack_id in ack_ids { - if let Some((_, ack_event_status_sender)) = self.acks.remove(ack_id) { + if let Some((_, ack_event_status_sender)) = self.acks.remove(&(*ack_id, cookie.clone())) + { _ = ack_event_status_sender.send(EventStatus::Delivered); removed_count += 1.0; - debug!(message = "Finalized ack id.", ?ack_id); + debug!(message = "Finalized ack id.", ?ack_id, ?cookie); } } emit!(SplunkIndexerAcknowledgementAcksRemoved { @@ -174,11 +207,15 @@ impl HecAckClient { }); } - /// Builds an ack query body with stored ack ids - fn get_ack_query_body(&mut self) -> HecAckStatusRequest { - HecAckStatusRequest { - acks: self.acks.keys().copied().collect::>(), + /// Groups ack IDs by cookie string + fn get_ack_groups_by_cookie(&self) -> HashMap> { + let mut groups: HashMap> = HashMap::new(); + + for (ack_id, cookie) in self.acks.keys() { + groups.entry(cookie.clone()).or_default().push(*ack_id); } + + groups } /// Decrements retry count on all stored ack ids by 1 @@ -191,14 +228,17 @@ impl HecAckClient { /// Removes all expired ack ids (those with a retry count of 0) and /// finalizes associated events with the given status fn expire_ack_ids_with_status(&mut self, status: EventStatus) { - let expired_ack_ids = self + let expired_ack_keys = self .acks .iter() - .filter_map(|(ack_id, (retries, _))| (*retries == 0).then_some(*ack_id)) + .filter_map(|((ack_id, cookie), (retries, _))| { + (*retries == 0).then_some((*ack_id, cookie.clone())) + }) .collect::>(); let mut removed_count = 0.0; - for ack_id in expired_ack_ids { - if let Some((_, ack_event_status_sender)) = self.acks.remove(&ack_id) { + for key in expired_ack_keys { + debug!(message = "Expired ack with status.", ?key, ?status); + if let Some((_, ack_event_status_sender)) = self.acks.remove(&key) { _ = ack_event_status_sender.send(status); removed_count += 1.0; } @@ -208,16 +248,17 @@ impl HecAckClient { }); } - // Sends an ack status query request to Splunk HEC + // Sends an ack status query request to Splunk HEC with the specified cookie async fn send_ack_query_request( &mut self, request_body: &HecAckStatusRequest, + cookie: &str, ) -> Result { - self.decrement_retries(); let request_body_bytes = crate::serde::json::to_bytes(request_body) .map_err(|_| HecAckApiError::ClientBuildRequest)? .freeze(); - let request = self + + let mut request = self .http_request_builder .build_request( request_body_bytes, @@ -228,6 +269,15 @@ impl HecAckClient { ) .map_err(|_| HecAckApiError::ClientBuildRequest)?; + // Add the cookie header if it's not empty + if !cookie.is_empty() { + request.headers_mut().insert( + http::header::COOKIE, + http::header::HeaderValue::from_str(cookie) + .map_err(|_| HecAckApiError::ClientBuildRequest)?, + ); + } + let response = self .client .send(request.map(Body::from)) @@ -250,7 +300,7 @@ impl HecAckClient { } pub async fn run_acknowledgements( - mut receiver: Receiver<(u64, Sender)>, + mut receiver: Receiver<(u64, String, Sender)>, client: HttpClient, http_request_builder: Arc, indexer_acknowledgements: HecClientAcknowledgementsConfig, @@ -271,9 +321,10 @@ pub async fn run_acknowledgements( }, ack_info = receiver.recv() => { match ack_info { - Some((ack_id, tx)) => { - ack_client.add(ack_id, tx); - debug!(message = "Stored ack id.", ?ack_id); + Some((ack_id, cookie, tx)) => { + let cookie_str = cookie.clone(); + ack_client.add(ack_id, cookie_str, tx); + debug!(message = "Stored ack id with cookie.", ?ack_id, ?cookie); }, None => break, } @@ -294,9 +345,7 @@ mod tests { use crate::{ http::HttpClient, sinks::{ - splunk_hec::common::{ - acknowledgements::HecAckStatusRequest, service::HttpRequestBuilder, EndpointTarget, - }, + splunk_hec::common::{service::HttpRequestBuilder, EndpointTarget}, util::Compression, }, }; @@ -319,22 +368,53 @@ mod tests { let mut ack_status_rxs = Vec::new(); for ack_id in ack_ids { let (tx, rx) = oneshot::channel(); - ack_client.add(*ack_id, tx); + ack_client.add(*ack_id, String::new(), tx); ack_status_rxs.push(rx); } ack_status_rxs } #[test] - fn test_get_ack_query_body() { + fn test_get_ack_groups_by_cookie_multiple_groups() { let mut ack_client = get_ack_client(1); - let ack_ids = (0..100).collect::>(); - _ = populate_ack_client(&mut ack_client, &ack_ids); - let expected_ack_body = HecAckStatusRequest { acks: ack_ids }; - let mut ack_request_body = ack_client.get_ack_query_body(); - ack_request_body.acks.sort_unstable(); - assert_eq!(expected_ack_body, ack_request_body); + let (tx1, _) = oneshot::channel(); + ack_client.add(1, "cookie1".to_string(), tx1); + + let (tx2, _) = oneshot::channel(); + ack_client.add(2, "cookie1".to_string(), tx2); + + let (tx3, _) = oneshot::channel(); + ack_client.add(3, "cookie2".to_string(), tx3); + + let groups = ack_client.get_ack_groups_by_cookie(); + + assert_eq!(groups.len(), 2); + assert!(groups.contains_key("cookie1")); + assert!(groups.contains_key("cookie2")); + + let mut cookie1_acks = groups.get("cookie1").unwrap().clone(); + cookie1_acks.sort_unstable(); + assert_eq!(cookie1_acks, vec![1, 2]); + + assert_eq!(groups.get("cookie2").unwrap(), &vec![3]); + } + + #[test] + fn test_get_ack_groups_by_cookie_single_group() { + let mut ack_client = get_ack_client(1); + let expected_ack_ids = (0..100).collect::>(); + _ = populate_ack_client(&mut ack_client, &expected_ack_ids); + + let groups = ack_client.get_ack_groups_by_cookie(); + assert_eq!(groups.len(), 1); + let mut ack_ids = Vec::new(); + for (_, ids) in groups { + ack_ids.extend(ids); + } + ack_ids.sort_unstable(); + + assert_eq!(expected_ack_ids, ack_ids); } #[test] @@ -343,14 +423,23 @@ mod tests { let ack_ids = (0..100).collect::>(); _ = populate_ack_client(&mut ack_client, &ack_ids); - let mut ack_request_body = ack_client.get_ack_query_body(); - ack_request_body.acks.sort_unstable(); - assert_eq!(ack_ids, ack_request_body.acks); + let groups = ack_client.get_ack_groups_by_cookie(); + let mut initial_ack_ids = Vec::new(); + for (_, ids) in groups { + initial_ack_ids.extend(ids); + } + initial_ack_ids.sort_unstable(); + assert_eq!(ack_ids, initial_ack_ids); + ack_client.decrement_retries(); ack_client.expire_ack_ids_with_status(EventStatus::Rejected); - let ack_request_body = ack_client.get_ack_query_body(); - assert!(ack_request_body.acks.is_empty()) + let groups = ack_client.get_ack_groups_by_cookie(); + let mut final_ack_ids = Vec::new(); + for (_, ids) in groups { + final_ack_ids.extend(ids); + } + assert!(final_ack_ids.is_empty()) } #[tokio::test] @@ -359,7 +448,7 @@ mod tests { let ack_ids = (0..100).collect::>(); let ack_status_rxs = populate_ack_client(&mut ack_client, &ack_ids); - ack_client.finalize_delivered_ack_ids(ack_ids.as_slice()); + ack_client.finalize_delivered_ack_ids(String::new(), ack_ids.as_slice()); let mut statuses = ack_status_rxs.into_iter().collect::>(); while let Some(status) = statuses.next().await { assert_eq!(EventStatus::Delivered, status.unwrap()); @@ -379,4 +468,79 @@ mod tests { assert_eq!(EventStatus::Rejected, status.unwrap()); } } + + #[tokio::test] + async fn test_run_with_cookies() { + use super::{HecAckStatusRequest, HecAckStatusResponse}; + use std::collections::HashMap; + use wiremock::{ + matchers::{header, method, path}, + Mock, MockServer, Request, ResponseTemplate, + }; + + let mock_server = MockServer::start().await; + + Mock::given(method("POST")) + .and(path("/services/collector/ack")) + .and(header("Cookie", "cookie1")) + .respond_with(|req: &Request| { + let req_body = + serde_json::from_slice::(req.body.as_slice()).unwrap(); + ResponseTemplate::new(200).set_body_json(HecAckStatusResponse { + acks: req_body + .acks + .into_iter() + .map(|ack_id| (ack_id, true)) + .collect::>(), + }) + }) + .mount(&mock_server) + .await; + + Mock::given(method("POST")) + .and(path("/services/collector/ack")) + .and(header("Cookie", "cookie2")) + .respond_with(|req: &Request| { + let req_body = + serde_json::from_slice::(req.body.as_slice()).unwrap(); + + // Set this to false for cookie2 to simulate the indexer not processing this ack yet + ResponseTemplate::new(200).set_body_json(HecAckStatusResponse { + acks: req_body + .acks + .into_iter() + .map(|ack_id| (ack_id, false)) + .collect::>(), + }) + }) + .mount(&mock_server) + .await; + + let client = HttpClient::new(None, &ProxyConfig::default()).unwrap(); + let http_request_builder = HttpRequestBuilder::new( + mock_server.uri(), + EndpointTarget::default(), + String::from("token"), + Compression::default(), + ); + let mut ack_client = HecAckClient::new(1, client, Arc::new(http_request_builder)); + + let (tx1, rx1) = oneshot::channel(); + ack_client.add(1, "cookie1".to_string(), tx1); + let (tx2, rx2) = oneshot::channel(); + ack_client.add(2, "cookie1".to_string(), tx2); + let (tx3, rx3) = oneshot::channel(); + ack_client.add(3, "cookie2".to_string(), tx3); + + ack_client.run().await; + + // Verify that all acks with cookie1 were marked as delivered, but cookie2 was not + let cookie1_status1 = rx1.await.unwrap(); + let cookie1_status2 = rx2.await.unwrap(); + let cookie2_status = rx3.await.unwrap(); + + assert_eq!(EventStatus::Delivered, cookie1_status1); + assert_eq!(EventStatus::Delivered, cookie1_status2); + assert_eq!(EventStatus::Rejected, cookie2_status); + } } diff --git a/src/sinks/splunk_hec/common/service.rs b/src/sinks/splunk_hec/common/service.rs index 592ec7e0293f4..2fa88c0e61d74 100644 --- a/src/sinks/splunk_hec/common/service.rs +++ b/src/sinks/splunk_hec/common/service.rs @@ -32,9 +32,10 @@ use crate::{ pub struct HecService { pub inner: S, - ack_finalizer_tx: Option)>>, + ack_finalizer_tx: Option)>>, ack_slots: PollSemaphore, current_ack_slot: Option, + indexer_acknowledgements: HecClientAcknowledgementsConfig, } #[derive(Deserialize, Serialize, Debug)] @@ -63,7 +64,7 @@ where rx, ack_client, Arc::clone(&http_request_builder), - indexer_acknowledgements, + indexer_acknowledgements.clone(), )); Some(tx) } else { @@ -76,6 +77,7 @@ where ack_finalizer_tx: tx, ack_slots, current_ack_slot: None, + indexer_acknowledgements, } } } @@ -112,6 +114,7 @@ where fn call(&mut self, mut req: HecRequest) -> Self::Future { let ack_finalizer_tx = self.ack_finalizer_tx.clone(); let ack_slot = self.current_ack_slot.take(); + let cookie_name = self.indexer_acknowledgements.cookie_name.clone(); let metadata = std::mem::take(req.metadata_mut()); let events_count = metadata.event_count(); @@ -128,7 +131,44 @@ where Ok(body) => { if let Some(ack_id) = body.ack_id { let (tx, rx) = oneshot::channel(); - match ack_finalizer_tx.send((ack_id, tx)).await { + + // Extract a single cookie from response headers if available + let cookie_string = if let Some(headers) = response.headers() { + if cookie_name.is_empty() { + String::new() + } else { + headers + .get_all(http::header::SET_COOKIE) + .iter() + .filter_map(|v| v.to_str().ok()) + .find_map(|cookie_header| { + if cookie_header + .starts_with(&format!("{}=", cookie_name)) + { + // Extract just the name + value part (before any semicolon) + // We don't want any attributes like `Path` or `Expires` + // that may be present in the header alongside the value + let value = cookie_header + .split(';') + .next() + .unwrap_or(cookie_header); + Some(value.trim().to_string()) + } else { + None + } + }) + .unwrap_or_default() + } + } else { + // For other response types, use an empty cookie string + // This effectively groups together all the acks into a single bucket, + // which is the same as the previous behavior without any cookies + String::new() + }; + + debug!(message = "Cookie string.", cookie_string = %cookie_string); + + match ack_finalizer_tx.send((ack_id, cookie_string, tx)).await { Ok(_) => rx.await.unwrap_or(EventStatus::Rejected), // If we cannot send ack ids to the ack client, fall back to default behavior Err(error) => { @@ -170,12 +210,17 @@ where pub trait ResponseExt { fn body(&self) -> &Bytes; + fn headers(&self) -> Option<&http::HeaderMap>; } impl ResponseExt for http::Response { fn body(&self) -> &Bytes { self.body() } + + fn headers(&self) -> Option<&http::HeaderMap> { + Some(self.headers()) + } } pub struct HttpRequestBuilder { @@ -378,7 +423,9 @@ mod tests { .respond_with(move |_: &Request| { let ack_id = acknowledgements_enabled.then(|| ACK_ID.fetch_add(1, Ordering::Relaxed)); - ResponseTemplate::new(200).set_body_json(HecAckResponseBody { ack_id }) + ResponseTemplate::new(200) + .set_body_json(HecAckResponseBody { ack_id }) + .insert_header("Set-Cookie", "splunkd_cookie=test-cookie-value") }) .mount(&mock_server) .await; @@ -600,4 +647,24 @@ mod tests { Poll::Ready(Ok(_)) )); } + + #[tokio::test] + async fn acknowledgements_with_cookie_tracking_successful() { + let mock_server = get_hec_mock_server(true, ack_response_always_succeed).await; + + let acknowledgements_config = HecClientAcknowledgementsConfig { + query_interval: NonZeroU8::new(1).unwrap(), + cookie_name: "splunkd_cookie".to_string(), + ..Default::default() + }; + let mut service = get_hec_service(mock_server.uri(), acknowledgements_config); + + let mut responses = FuturesUnordered::new(); + responses.push(service.ready().await.unwrap().call(get_hec_request())); + responses.push(service.ready().await.unwrap().call(get_hec_request())); + responses.push(service.ready().await.unwrap().call(get_hec_request())); + while let Some(response) = responses.next().await { + assert_eq!(EventStatus::Delivered, response.unwrap().event_status) + } + } } diff --git a/website/cue/reference/components/sinks/base/splunk_hec_logs.cue b/website/cue/reference/components/sinks/base/splunk_hec_logs.cue index 0c8187c067592..a0714557b48d3 100644 --- a/website/cue/reference/components/sinks/base/splunk_hec_logs.cue +++ b/website/cue/reference/components/sinks/base/splunk_hec_logs.cue @@ -5,6 +5,19 @@ base: components: sinks: splunk_hec_logs: configuration: { description: "Splunk HEC acknowledgement configuration." required: false type: object: options: { + cookie_name: { + description: """ + Specifies the name of a cookie to extract from the Splunk HEC response and use when querying for acknowledgements. + + This is useful when using a load balancer in front of multiple Splunk indexers in a cluster because the + request to check for acknowledgements needs to go to the same indexer that originally received the data, + and the cookie can help with that routing. + + If empty, no cookie will be extracted. + """ + required: false + type: string: default: "" + } enabled: { description: """ Whether or not end-to-end acknowledgements are enabled. diff --git a/website/cue/reference/components/sinks/base/splunk_hec_metrics.cue b/website/cue/reference/components/sinks/base/splunk_hec_metrics.cue index 80376f7cfe0c0..a8a691ba8c887 100644 --- a/website/cue/reference/components/sinks/base/splunk_hec_metrics.cue +++ b/website/cue/reference/components/sinks/base/splunk_hec_metrics.cue @@ -5,6 +5,19 @@ base: components: sinks: splunk_hec_metrics: configuration: { description: "Splunk HEC acknowledgement configuration." required: false type: object: options: { + cookie_name: { + description: """ + Specifies the name of a cookie to extract from the Splunk HEC response and use when querying for acknowledgements. + + This is useful when using a load balancer in front of multiple Splunk indexers in a cluster because the + request to check for acknowledgements needs to go to the same indexer that originally received the data, + and the cookie can help with that routing. + + If empty, no cookie will be extracted. + """ + required: false + type: string: default: "" + } enabled: { description: """ Whether or not end-to-end acknowledgements are enabled.