Skip to content

Commit 55369f4

Browse files
committed
kafka:9092 using withNetworkAliases
Allow to configure kafka:9092 using withNetworkAliases, a commonly used method for testcontainers.
1 parent 2e0ef57 commit 55369f4

File tree

7 files changed

+127
-34
lines changed

7 files changed

+127
-34
lines changed

docs/modules/kafka.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,18 @@ Create a `KafkaContainer` to use it in your tests:
5454
[Creating a KafkaContainer](../../modules/kafka/src/test/java/org/testcontainers/kafka/KafkaContainerTest.java) inside_block:constructorWithVersion
5555
<!--/codeinclude-->
5656

57+
### withNetworkAliases
58+
59+
The first alias host name defined using `withNetworkAliases` configures port 9092 on the network set by `withNetwork`. Example for `kafka:9092`:
60+
61+
<!--codeinclude-->
62+
[Creating a KafkaContainer](../../modules/kafka/src/test/java/org/testcontainers/kafka/KafkaContainerTest.java) inside_block:registerAlias
63+
<!--/codeinclude-->
64+
65+
This works with `org.testcontainers.kafka.KafkaContainer` and `org.testcontainers.kafka.ConfluentKafkaContainer` but not with deprecated `org.testcontainers.containers.KafkaContainer`.
66+
67+
It only works if `withListener` is not used and `KAFKA_BROKER_ID` environment variable is not set.
68+
5769
## Options
5870
5971
### <a name="zookeeper"></a> Using external Zookeeper

modules/kafka/src/main/java/org/testcontainers/kafka/ConfluentKafkaContainer.java

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,22 @@ protected void configure() {
4848

4949
@Override
5050
protected void containerIsStarting(InspectContainerResponse containerInfo) {
51+
List<String> networkAliases = getNetworkAliases();
52+
String plaintextAdvertisedListener;
53+
if (!getEnvMap().containsKey("KAFKA_BROKER_ID") && listeners.isEmpty() && networkAliases.size() > 1) {
54+
// 0 is the random network alias generated by GenericContainer
55+
plaintextAdvertisedListener = "PLAINTEXT://" + networkAliases.get(1) + ":" + KafkaHelper.KAFKA_PORT;
56+
} else {
57+
plaintextAdvertisedListener = "PLAINTEXT://" + getBootstrapServers();
58+
}
59+
5160
String brokerAdvertisedListener = String.format(
5261
"BROKER://%s:%s",
5362
containerInfo.getConfig().getHostName(),
5463
"9093"
5564
);
5665
List<String> advertisedListeners = new ArrayList<>();
57-
advertisedListeners.add("PLAINTEXT://" + getBootstrapServers());
66+
advertisedListeners.add(plaintextAdvertisedListener);
5867
advertisedListeners.add(brokerAdvertisedListener);
5968

6069
advertisedListeners.addAll(KafkaHelper.resolveAdvertisedListeners(this.advertisedListeners));
@@ -87,8 +96,15 @@ protected void containerIsStarting(InspectContainerResponse containerInfo) {
8796
* <p>
8897
* Default advertised listeners:
8998
* <ul>
90-
* <li>{@code container.getConfig().getHostName():9092}</li>
91-
* <li>{@code container.getHost():container.getMappedPort(9093)}</li>
99+
* <li>{@code container.getConfig().getHostName():container.getMappedPort(9092)}</li>
100+
* <li>{@code container.getHost():9093}</li>
101+
* </ul>
102+
* <p>
103+
* Default advertised listeners if {@code withNetworkAliases} is used, {@code withListener} is not used,
104+
* and {@code KAFKA_BROKER_ID} environment variable is not set:
105+
* <ul>
106+
* <li>{@code first network alias:9092}</li>
107+
* <li>{@code container.getHost():9093}</li>
92108
* </ul>
93109
* @param listener a listener with format {@code host:port}
94110
* @return this {@link ConfluentKafkaContainer} instance
@@ -118,8 +134,15 @@ public ConfluentKafkaContainer withListener(String listener) {
118134
* <p>
119135
* Default advertised listeners:
120136
* <ul>
121-
* <li>{@code container.getConfig().getHostName():9092}</li>
122-
* <li>{@code container.getHost():container.getMappedPort(9093)}</li>
137+
* <li>{@code container.getConfig().getHostName():container.getMappedPort(9092)}</li>
138+
* <li>{@code container.getHost():9093}</li>
139+
* </ul>
140+
* <p>
141+
* Default advertised listeners if {@code withNetworkAliases} is used, {@code withListener} is not used,
142+
* and {@code KAFKA_BROKER_ID} environment variable is not set:
143+
* <ul>
144+
* <li>{@code first network alias:9092}</li>
145+
* <li>{@code container.getHost():9093}</li>
123146
* </ul>
124147
* @param listener a supplier that will provide a listener
125148
* @param advertisedListener a supplier that will provide a listener

modules/kafka/src/main/java/org/testcontainers/kafka/KafkaContainer.java

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -54,18 +54,27 @@ protected void configure() {
5454

5555
@Override
5656
protected void containerIsStarting(InspectContainerResponse containerInfo) {
57+
List<String> networkAliases = getNetworkAliases();
58+
String plaintextAdvertisedListener;
59+
if (!getEnvMap().containsKey("KAFKA_BROKER_ID") && listeners.isEmpty() && networkAliases.size() > 1) {
60+
// 0 is the random network alias generated by GenericContainer
61+
plaintextAdvertisedListener = "PLAINTEXT://" + networkAliases.get(1) + ":" + KafkaHelper.KAFKA_PORT;
62+
} else {
63+
plaintextAdvertisedListener = "PLAINTEXT://" + getBootstrapServers();
64+
}
65+
5766
String brokerAdvertisedListener = String.format(
5867
"BROKER://%s:%s",
5968
containerInfo.getConfig().getHostName(),
6069
"9093"
6170
);
71+
6272
List<String> advertisedListeners = new ArrayList<>();
63-
advertisedListeners.add("PLAINTEXT://" + getBootstrapServers());
73+
advertisedListeners.add(plaintextAdvertisedListener);
6474
advertisedListeners.add(brokerAdvertisedListener);
6575

6676
advertisedListeners.addAll(KafkaHelper.resolveAdvertisedListeners(this.advertisedListeners));
6777
String kafkaAdvertisedListeners = String.join(",", advertisedListeners);
68-
6978
String command = "#!/bin/bash\n";
7079
// exporting KAFKA_ADVERTISED_LISTENERS with the container hostname
7180
command += String.format("export KAFKA_ADVERTISED_LISTENERS=%s\n", kafkaAdvertisedListeners);
@@ -93,8 +102,15 @@ protected void containerIsStarting(InspectContainerResponse containerInfo) {
93102
* <p>
94103
* Default advertised listeners:
95104
* <ul>
96-
* <li>{@code container.getConfig().getHostName():9092}</li>
97-
* <li>{@code container.getHost():container.getMappedPort(9093)}</li>
105+
* <li>{@code container.getConfig().getHostName():container.getMappedPort(9092)}</li>
106+
* <li>{@code container.getHost():9093}</li>
107+
* </ul>
108+
* <p>
109+
* Default advertised listeners if {@code withNetworkAliases} is used, {@code withListener} is not used,
110+
* and {@code KAFKA_BROKER_ID} environment variable is not set:
111+
* <ul>
112+
* <li>{@code first network alias:9092}</li>
113+
* <li>{@code container.getHost():9093}</li>
98114
* </ul>
99115
* @param listener a listener with format {@code host:port}
100116
* @return this {@link KafkaContainer} instance
@@ -124,8 +140,15 @@ public KafkaContainer withListener(String listener) {
124140
* <p>
125141
* Default advertised listeners:
126142
* <ul>
127-
* <li>{@code container.getConfig().getHostName():9092}</li>
128-
* <li>{@code container.getHost():container.getMappedPort(9093)}</li>
143+
* <li>{@code container.getConfig().getHostName():container.getMappedPort(9092)}</li>
144+
* <li>{@code container.getHost():9093}</li>
145+
* </ul>
146+
* <p>
147+
* Default advertised listeners if {@code withNetworkAliases} is used, {@code withListener} is not used,
148+
* and {@code KAFKA_BROKER_ID} environment variable is not set:
149+
* <ul>
150+
* <li>{@code first network alias:9092}</li>
151+
* <li>{@code container.getHost():9093}</li>
129152
* </ul>
130153
* @param listener a supplier that will provide a listener
131154
* @param advertisedListener a supplier that will provide a listener

modules/kafka/src/test/java/org/testcontainers/AbstractKafka.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package org.testcontainers;
22

33
import com.google.common.collect.ImmutableMap;
4+
import lombok.SneakyThrows;
45
import org.apache.kafka.clients.admin.AdminClient;
56
import org.apache.kafka.clients.admin.AdminClientConfig;
67
import org.apache.kafka.clients.admin.NewTopic;
@@ -15,6 +16,7 @@
1516
import org.apache.kafka.common.serialization.StringDeserializer;
1617
import org.apache.kafka.common.serialization.StringSerializer;
1718
import org.awaitility.Awaitility;
19+
import org.testcontainers.containers.Network;
1820

1921
import java.time.Duration;
2022
import java.util.Collection;
@@ -150,4 +152,16 @@ protected static String getJaasConfig() {
150152
"user_test=\"secret\";";
151153
return jaasConfig;
152154
}
155+
156+
@SneakyThrows
157+
protected void assertKafka(String listener, Network network) {
158+
try (KCatContainer kcat = new KCatContainer().withNetwork(network)) {
159+
kcat.start();
160+
161+
kcat.execInContainer("kcat", "-b", listener, "-t", "msgs", "-P", "-l", "/data/msgs.txt");
162+
String stdout = kcat.execInContainer("kcat", "-b", listener, "-C", "-t", "msgs", "-c", "1").getStdout();
163+
164+
assertThat(stdout).contains("Message produced by kcat");
165+
}
166+
}
153167
}

modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,19 @@ public void testKraftPrecedenceOverEmbeddedZookeeper() throws Exception {
185185
}
186186
}
187187

188+
@Test
189+
public void testUsageWithNetworkAlias() {
190+
try (
191+
Network network = Network.newNetwork();
192+
KafkaContainer kafka = new KafkaContainer(KAFKA_KRAFT_TEST_IMAGE)
193+
.withNetworkAliases("mykafka")
194+
.withNetwork(network)
195+
) {
196+
kafka.start();
197+
assertKafka("mykafka:9092", network);
198+
}
199+
}
200+
188201
@Test
189202
public void testUsageWithListener() throws Exception {
190203
try (

modules/kafka/src/test/java/org/testcontainers/kafka/ConfluentKafkaContainerTest.java

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@
99
import org.testcontainers.containers.SocatContainer;
1010
import org.testcontainers.utility.MountableFile;
1111

12-
import static org.assertj.core.api.Assertions.assertThat;
13-
1412
public class ConfluentKafkaContainerTest extends AbstractKafka {
1513

1614
@Test
@@ -24,6 +22,19 @@ public void testUsage() throws Exception {
2422
}
2523
}
2624

25+
@Test
26+
public void testUsageWithNetworkAlias() {
27+
try (
28+
Network network = Network.newNetwork();
29+
ConfluentKafkaContainer kafka = new ConfluentKafkaContainer("confluentinc/cp-kafka:7.4.0")
30+
.withNetworkAliases("mykafka")
31+
.withNetwork(network)
32+
) {
33+
kafka.start();
34+
assertKafka("mykafka:9092", network);
35+
}
36+
}
37+
2738
@Test
2839
public void testUsageWithListener() throws Exception {
2940
try (
@@ -36,14 +47,7 @@ public void testUsageWithListener() throws Exception {
3647
KCatContainer kcat = new KCatContainer().withNetwork(network)
3748
) {
3849
kafka.start();
39-
kcat.start();
40-
41-
kcat.execInContainer("kcat", "-b", "kafka:19092", "-t", "msgs", "-P", "-l", "/data/msgs.txt");
42-
String stdout = kcat
43-
.execInContainer("kcat", "-b", "kafka:19092", "-C", "-t", "msgs", "-c", "1")
44-
.getStdout();
45-
46-
assertThat(stdout).contains("Message produced by kcat");
50+
assertKafka("kafka:19092", network);
4751
}
4852
}
4953

modules/kafka/src/test/java/org/testcontainers/kafka/KafkaContainerTest.java

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,9 @@
22

33
import org.junit.Test;
44
import org.testcontainers.AbstractKafka;
5-
import org.testcontainers.KCatContainer;
65
import org.testcontainers.containers.Network;
76
import org.testcontainers.containers.SocatContainer;
87

9-
import static org.assertj.core.api.Assertions.assertThat;
10-
118
public class KafkaContainerTest extends AbstractKafka {
129

1310
@Test
@@ -22,25 +19,32 @@ public void testUsage() throws Exception {
2219
}
2320

2421
@Test
25-
public void testUsageWithListener() throws Exception {
22+
public void testUsageWithNetworkAlias() {
23+
try (
24+
// registerAlias {
25+
Network network = Network.newNetwork();
26+
KafkaContainer kafka = new KafkaContainer("apache/kafka-native:3.8.0")
27+
.withNetworkAliases("kafka")
28+
.withNetwork(network);
29+
// }
30+
) {
31+
kafka.start();
32+
assertKafka("kafka:9092", network);
33+
}
34+
}
35+
36+
@Test
37+
public void testUsageWithListener() {
2638
try (
2739
Network network = Network.newNetwork();
2840
// registerListener {
2941
KafkaContainer kafka = new KafkaContainer("apache/kafka-native:3.8.0")
3042
.withListener("kafka:19092")
3143
.withNetwork(network);
3244
// }
33-
KCatContainer kcat = new KCatContainer().withNetwork(network)
3445
) {
3546
kafka.start();
36-
kcat.start();
37-
38-
kcat.execInContainer("kcat", "-b", "kafka:19092", "-t", "msgs", "-P", "-l", "/data/msgs.txt");
39-
String stdout = kcat
40-
.execInContainer("kcat", "-b", "kafka:19092", "-C", "-t", "msgs", "-c", "1")
41-
.getStdout();
42-
43-
assertThat(stdout).contains("Message produced by kcat");
47+
assertKafka("kafka:19092", network);
4448
}
4549
}
4650

0 commit comments

Comments
 (0)