From 9abbd72c2888311841eba84a1d8438ea07dd187b Mon Sep 17 00:00:00 2001 From: Steve Liesche Date: Mon, 13 May 2024 09:18:04 +0200 Subject: [PATCH] feat(Kafka): add SdaKafkaHealthEvent to allow update of KafkaHealthIndicator --- docs/starter-kafka.md | 18 ++++++++++ .../boot/kafka/KafkaHealthIndicator.java | 17 +++++++++ .../kafka/events/SdaKafkaHealthEvent.java | 36 +++++++++++++++++++ 3 files changed, 71 insertions(+) create mode 100644 sda-commons-starter-kafka/src/main/java/org/sdase/commons/spring/boot/kafka/events/SdaKafkaHealthEvent.java diff --git a/docs/starter-kafka.md b/docs/starter-kafka.md index afbd3803f..0edd49616 100644 --- a/docs/starter-kafka.md +++ b/docs/starter-kafka.md @@ -136,3 +136,21 @@ You need to autowire the KafkaTemplate using a Qualifier. @Qualifier("kafkaByteArrayDltTemplate") KafkaTemplate recoverTemplate, ``` +## Health Indicator + +`KafkaHealthIndicator` is used to determine if connection to the Kafka broker is possible. +In case the library is used to consume messages, and the +[Message Listener Container](https://docs.spring.io/spring-kafka/reference/kafka/receiving-messages/message-listener-container.html) +has stopped, the health indicator can be updated using the `SdaKafkaHealthEvent`. + +```java + +@Autowired +private ApplicationEventPublisher applicationEventPublisher; + +@EventListener +public void eventHandlerContainerStopped(ContainerStoppedEvent event) { + applicationEventPublisher.publishEvent(new SdaKafkaHealthEvent(event.getSource(), false, "Container stopped due to error")); +} + +``` \ No newline at end of file diff --git a/sda-commons-starter-kafka/src/main/java/org/sdase/commons/spring/boot/kafka/KafkaHealthIndicator.java b/sda-commons-starter-kafka/src/main/java/org/sdase/commons/spring/boot/kafka/KafkaHealthIndicator.java index 205ddf46a..a0162d35c 100644 --- a/sda-commons-starter-kafka/src/main/java/org/sdase/commons/spring/boot/kafka/KafkaHealthIndicator.java +++ b/sda-commons-starter-kafka/src/main/java/org/sdase/commons/spring/boot/kafka/KafkaHealthIndicator.java @@ -12,6 +12,7 @@ import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.ListTopicsOptions; +import org.sdase.commons.spring.boot.kafka.events.SdaKafkaHealthEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; @@ -19,6 +20,7 @@ import org.springframework.boot.actuate.endpoint.OperationResponseBody; import org.springframework.boot.actuate.health.AbstractHealthIndicator; import org.springframework.boot.actuate.health.Health; +import org.springframework.context.event.EventListener; import org.springframework.kafka.core.KafkaAdmin; import org.springframework.stereotype.Component; @@ -35,6 +37,9 @@ public class KafkaHealthIndicator extends AbstractHealthIndicator implements Ope private final AdminClient kafkaAdminClient; private final Duration kafkaCommandTimeout; + private boolean eventIsHealthy = true; + private String eventMessage; + public KafkaHealthIndicator( @Value("${management.health.kafka.timeout:4s}") Duration kafkaCommandTimeout, KafkaAdmin kafkaAdmin) { @@ -48,6 +53,12 @@ public KafkaHealthIndicator( @Override protected void doHealthCheck(Health.Builder builder) throws Exception { + + if (!eventIsHealthy) { + builder.down().withDetails(Map.of("info", eventMessage)).build(); + return; + } + kafkaAdminClient .listTopics( new ListTopicsOptions() @@ -57,4 +68,10 @@ protected void doHealthCheck(Health.Builder builder) throws Exception { builder.up().withDetails(Map.of("info", "Kafka health check operation succeeded")).build(); } + + @EventListener + public void onKafkaHealthEvent(SdaKafkaHealthEvent event) { + eventIsHealthy = event.isHealthy(); + eventMessage = event.getMessage(); + } } diff --git a/sda-commons-starter-kafka/src/main/java/org/sdase/commons/spring/boot/kafka/events/SdaKafkaHealthEvent.java b/sda-commons-starter-kafka/src/main/java/org/sdase/commons/spring/boot/kafka/events/SdaKafkaHealthEvent.java new file mode 100644 index 000000000..4b39d90d7 --- /dev/null +++ b/sda-commons-starter-kafka/src/main/java/org/sdase/commons/spring/boot/kafka/events/SdaKafkaHealthEvent.java @@ -0,0 +1,36 @@ +/* + * Copyright 2022- SDA SE Open Industry Solutions (https://www.sda.se) + * + * Use of this source code is governed by an MIT-style + * license that can be found in the LICENSE file or at + * https://opensource.org/licenses/MIT. + */ +package org.sdase.commons.spring.boot.kafka.events; + +import jakarta.validation.constraints.NotNull; +import org.springframework.context.ApplicationEvent; + +/** + * Event that is published when the health of the Kafka cluster changes. + * Can be used in combination with Application Events + * to react to health changes. + */ +public class SdaKafkaHealthEvent extends ApplicationEvent { + + boolean isHealthy; + private String message; + + public SdaKafkaHealthEvent(Object source, boolean isHealthy, @NotNull String message) { + super(source); + this.isHealthy = isHealthy; + this.message = message; + } + + public boolean isHealthy() { + return isHealthy; + } + + public String getMessage() { + return message; + } +}