Skip to content

feat(Kafka): add SdaKafkaHealthEvent to allow update of KafkaHealthIn… #711

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions docs/starter-kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,3 +136,21 @@ You need to autowire the KafkaTemplate using a Qualifier.
@Qualifier("kafkaByteArrayDltTemplate") KafkaTemplate<String, ?> recoverTemplate,
```

## Health Indicator

`KafkaHealthIndicator` is used to determine if connection to the Kafka broker is possible.
In case the library is used to consume messages, and the
[Message Listener Container](https://docs.spring.io/spring-kafka/reference/kafka/receiving-messages/message-listener-container.html)
has stopped, the health indicator can be updated using the `SdaKafkaHealthEvent`.

```java

@Autowired
private ApplicationEventPublisher applicationEventPublisher;

@EventListener
public void eventHandlerContainerStopped(ContainerStoppedEvent event) {
applicationEventPublisher.publishEvent(new SdaKafkaHealthEvent(event.getSource(), false, "Container stopped due to error"));
}

```
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.sdase.commons.spring.boot.kafka.events.SdaKafkaHealthEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.actuate.autoconfigure.health.ConditionalOnEnabledHealthIndicator;
import org.springframework.boot.actuate.endpoint.OperationResponseBody;
import org.springframework.boot.actuate.health.AbstractHealthIndicator;
import org.springframework.boot.actuate.health.Health;
import org.springframework.context.event.EventListener;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.stereotype.Component;

Expand All @@ -35,6 +37,9 @@ public class KafkaHealthIndicator extends AbstractHealthIndicator implements Ope
private final AdminClient kafkaAdminClient;
private final Duration kafkaCommandTimeout;

private boolean eventIsHealthy = true;
private String eventMessage;

public KafkaHealthIndicator(
@Value("${management.health.kafka.timeout:4s}") Duration kafkaCommandTimeout,
KafkaAdmin kafkaAdmin) {
Expand All @@ -48,6 +53,12 @@ public KafkaHealthIndicator(

@Override
protected void doHealthCheck(Health.Builder builder) throws Exception {

if (!eventIsHealthy) {
builder.down().withDetails(Map.of("info", eventMessage)).build();
return;
}

kafkaAdminClient
.listTopics(
new ListTopicsOptions()
Expand All @@ -57,4 +68,10 @@ protected void doHealthCheck(Health.Builder builder) throws Exception {

builder.up().withDetails(Map.of("info", "Kafka health check operation succeeded")).build();
}

@EventListener
public void onKafkaHealthEvent(SdaKafkaHealthEvent event) {
eventIsHealthy = event.isHealthy();
eventMessage = event.getMessage();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2022- SDA SE Open Industry Solutions (https://www.sda.se)
*
* Use of this source code is governed by an MIT-style
* license that can be found in the LICENSE file or at
* https://opensource.org/licenses/MIT.
*/
package org.sdase.commons.spring.boot.kafka.events;

import jakarta.validation.constraints.NotNull;
import org.springframework.context.ApplicationEvent;

/**
* Event that is published when the health of the Kafka cluster changes.
* Can be used in combination with <a href="https://docs.spring.io/spring-kafka/reference/kafka/events.html">Application Events</a>
* to react to health changes.
*/
public class SdaKafkaHealthEvent extends ApplicationEvent {

boolean isHealthy;
private String message;

public SdaKafkaHealthEvent(Object source, boolean isHealthy, @NotNull String message) {
super(source);
this.isHealthy = isHealthy;
this.message = message;
}

public boolean isHealthy() {
return isHealthy;
}

public String getMessage() {
return message;
}
}
Loading