Skip to content

Run healthcheck connection checks in parallel #373

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Jun 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,5 @@ target/*

*.tmp
*.temp

.DS_Store
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>fi.hsl</groupId>
<artifactId>transitdata-common</artifactId>
<version>2.0.2.1</version>
<version>2.0.3</version>
<packaging>jar</packaging>
<name>Common utilities for Transitdata projects</name>
<properties>
Expand Down
72 changes: 63 additions & 9 deletions src/main/java/fi/hsl/common/health/HealthServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.*;
import java.util.function.BooleanSupplier;

public class HealthServer {
Expand All @@ -21,7 +22,9 @@ public class HealthServer {
public final int port;
public final String endpoint;
public final HttpServer httpServer;
private List<BooleanSupplier> checks = new ArrayList<>();
private final ExecutorService healthCheckExecutor =
Executors.newCachedThreadPool();
private final List<BooleanSupplier> checks = new CopyOnWriteArrayList<>();

public HealthServer(final int port, @NotNull final String endpoint) throws IOException {
this.port = port;
Expand All @@ -30,14 +33,14 @@ public HealthServer(final int port, @NotNull final String endpoint) throws IOExc
httpServer = HttpServer.create(new InetSocketAddress(port), 0);
httpServer.createContext("/", createDefaultHandler());
httpServer.createContext(endpoint, createHandler());
httpServer.setExecutor(Executors.newSingleThreadExecutor());
httpServer.setExecutor(healthCheckExecutor);
httpServer.start();
log.info("HealthServer started");
}

private void writeResponse(@NotNull final HttpExchange httpExchange, @NotNull final int responseCode, @NotNull final String responseBody) throws IOException {
final byte[] response = responseBody.getBytes("UTF-8");
httpExchange.getResponseHeaders().add("Content-Type", "text/plain; charset=UTF-8");
final byte[] response = responseBody.getBytes(StandardCharsets.UTF_8);
httpExchange.getResponseHeaders().add("Content-Type", "text/plain; charset=" + StandardCharsets.UTF_8.name());
httpExchange.sendResponseHeaders(responseCode, response.length);
final OutputStream out = httpExchange.getResponseBody();
out.write(response);
Expand Down Expand Up @@ -91,16 +94,67 @@ public void clearChecks() {
}

public boolean checkHealth() {
boolean isHealthy = true;
for (final BooleanSupplier check : checks) {
isHealthy &= check.getAsBoolean();
try {
CompletionService<Boolean> executorCompletionService
= new ExecutorCompletionService<>(healthCheckExecutor);
int n = checks.size();
List<Future<Boolean>> futures = new ArrayList<>(n);
try {
for (BooleanSupplier check : checks) {
futures.add(executorCompletionService.submit(checkToCallable(check)));
}
for (int i = 0; i < n; ++i) {
try {
Boolean result = executorCompletionService.take().get();
if (result == null || !result) {
return false; // Return false immediately if any check fails
}
} catch (ExecutionException e) {
log.error("A health check task execution failed. Marking unhealthy.", e.getCause() != null ? e.getCause() : e);
return false;
} catch (InterruptedException e) {
log.error("Health check interrupted. Marking unhealthy.", e);
Thread.currentThread().interrupt();
return false;
}
}
} finally {
for (Future<Boolean> f : futures) {
f.cancel(true);
}
}
return true; // Return true only if all checks pass
} catch (Exception e) {
log.error("Exception during health checks", e);
return false;
}
return isHealthy;
}

public void close() {
if (httpServer != null) {
httpServer.stop(0);
}
if (healthCheckExecutor != null) {
healthCheckExecutor.shutdown();
try {
if (!healthCheckExecutor.awaitTermination(3, TimeUnit.SECONDS)) {
healthCheckExecutor.shutdownNow();
}
} catch (InterruptedException ie) {
healthCheckExecutor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}

private static Callable<Boolean> checkToCallable(BooleanSupplier check) {
return () -> {
try {
return check.getAsBoolean();
} catch (Exception e) {
log.error("Exception during health check", e);
return false;
}
};
}
}
111 changes: 111 additions & 0 deletions src/test/java/fi/hsl/common/health/HealthServerTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package fi.hsl.common.health;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class HealthServerTest {

private HealthServer healthServer;
private final int testPort = 0;
private final String testEndpoint = "/healthz";

private static class CountingWrapper implements BooleanSupplier {

private final BooleanSupplier delegate;
private final AtomicInteger callCount = new AtomicInteger(0);

public CountingWrapper(BooleanSupplier delegate) {
this.delegate = delegate;
}

@Override
public boolean getAsBoolean() {
callCount.incrementAndGet();
return delegate.getAsBoolean();
}

public int getCallCount() {
return callCount.get();
}
}

@Before
public void setUp() throws IOException {
healthServer = new HealthServer(testPort, testEndpoint);
}

@After
public void tearDown() {
if (healthServer != null) {
healthServer.close();
}
}

@Test
public void singleUnhealthyCheckReturnsFalse() {
CountingWrapper unhealthyCheck = new CountingWrapper(() -> false);
healthServer.addCheck(unhealthyCheck);
boolean healthStatus = healthServer.checkHealth();
assertFalse(
"Health status should be false when one check is unhealthy.",
healthStatus
);
assertEquals(
"UnhealthyCheck should have been called once.",
1,
unhealthyCheck.getCallCount()
);
}

@Test
public void allHealthyChecksReturnsTrue() {
CountingWrapper healthyCheck1 = new CountingWrapper(() -> true);
CountingWrapper healthyCheck2 = new CountingWrapper(() -> true);
healthServer.addCheck(healthyCheck1);
healthServer.addCheck(healthyCheck2);
boolean healthStatus = healthServer.checkHealth();
assertTrue(
"Health status should be true when all checks are healthy.",
healthStatus
);
assertEquals(
"HealthyCheck1 should have been called once.",
1,
healthyCheck1.getCallCount()
);
assertEquals(
"HealthyCheck2 should have been called once.",
1,
healthyCheck2.getCallCount()
);
}

@Test
public void testCheckHealth_CheckThrowsException_ReturnsFalse() {
final RuntimeException testException = new RuntimeException(
"Simulated check failure"
);
CountingWrapper exceptionThrowingCheck = new CountingWrapper(() -> {
throw testException;
});
healthServer.addCheck(exceptionThrowingCheck);
boolean healthStatus = healthServer.checkHealth();
assertFalse(
"Health status should be false when a check throws an exception.",
healthStatus
);
assertEquals(
"ExceptionThrowingCheck should have been called once.",
1,
exceptionThrowingCheck.getCallCount()
);
}
}