Skip to content

Commit ea8a06c

Browse files
authored
feat: support for copying Kafka headers to MQ message properties (#53)
This pull request introduces a new feature to the MQ Sink Connector. If enabled, all headers on messages consumed from Kafka will be added to messages put to MQ as string properties. The feature is named to be consistent with the equivalent feature in the MQ Source Connector (`mq.kafka.headers.copy.to.jms.properties` where the Source Connector currently has `mq.jms.properties.copy.to.kafka.headers`) The new feature is disabled by default to maintain compatibility with current behaviour. Closes: #52 Signed-off-by: Dale Lane <dale.lane@uk.ibm.com>
1 parent 515baf7 commit ea8a06c

File tree

9 files changed

+241
-39
lines changed

9 files changed

+241
-39
lines changed

Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ COPY --chown=esuser:esgroup --from=builder /opt/kafka/libs/ /opt/kafka/libs/
1010
COPY --chown=esuser:esgroup --from=builder /opt/kafka/config/connect-distributed.properties /opt/kafka/config/
1111
COPY --chown=esuser:esgroup --from=builder /opt/kafka/config/connect-log4j.properties /opt/kafka/config/
1212
RUN mkdir /opt/kafka/logs && chown esuser:esgroup /opt/kafka/logs
13-
COPY --chown=esuser:esgroup target/kafka-connect-mq-sink-1.3.1-jar-with-dependencies.jar /opt/kafka/libs/
13+
COPY --chown=esuser:esgroup target/kafka-connect-mq-sink-1.4.0-jar-with-dependencies.jar /opt/kafka/libs/
1414

1515
WORKDIR /opt/kafka
1616

README.md

Lines changed: 33 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -98,13 +98,13 @@ curl -X POST -H "Content-Type: application/json" http://localhost:8083/connector
9898
This repository includes an example Dockerfile to run Kafka Connect in distributed mode. It also adds in the MQ sink connector as an available connector plugin. It uses the default `connect-distributed.properties` and `connect-log4j.properties` files.
9999

100100
1. `mvn clean package`
101-
1. `docker build -t kafkaconnect-with-mq-sink:1.3.0 .`
102-
1. `docker run -p 8083:8083 kafkaconnect-with-mq-sink:1.3.0`
101+
1. `docker build -t kafkaconnect-with-mq-sink:1.4.0 .`
102+
1. `docker run -p 8083:8083 kafkaconnect-with-mq-sink:1.4.0`
103103

104104
**NOTE:** To provide custom properties files create a folder called `config` containing the `connect-distributed.properties` and `connect-log4j.properties` files and use a Docker volume to make them available when running the container like this:
105105

106106
``` shell
107-
docker run -v $(pwd)/config:/opt/kafka/config -p 8083:8083 kafkaconnect-with-mq-sink:1.3.0
107+
docker run -v $(pwd)/config:/opt/kafka/config -p 8083:8083 kafkaconnect-with-mq-sink:1.4.0
108108
```
109109

110110
To start the MQ connector, you can use `config/mq-sink.json` in this repository after replacing all placeholders and use a command like this:
@@ -282,35 +282,36 @@ For troubleshooting, or to better understand the handshake performed by the IBM
282282
## Configuration
283283
The configuration options for the Kafka Connect sink connector for IBM MQ are as follows:
284284

285-
| Name | Description | Type | Default | Valid values |
286-
| ------------------------------------- | ---------------------------------------------------------------------- | ------- | -------------- | --------------------------------- |
287-
| topics or topics.regex | List of Kafka source topics | string | | topic1[,topic2,...] |
288-
| mq.queue.manager | The name of the MQ queue manager | string | | MQ queue manager name |
289-
| mq.connection.mode | The connection mode - bindings or client | string | client | client, bindings |
290-
| mq.connection.name.list | List of connection names for queue manager | string | | host(port)[,host(port),...] |
291-
| mq.channel.name | The name of the server-connection channel | string | | MQ channel name |
292-
| mq.queue | The name of the target MQ queue | string | | MQ queue name |
293-
| mq.user.name | The user name for authenticating with the queue manager | string | | User name |
294-
| mq.password | The password for authenticating with the queue manager | string | | Password |
295-
| mq.user.authentication.mqcsp | Whether to use MQ connection security parameters (MQCSP) | boolean | true | |
296-
| mq.ccdt.url | The URL for the CCDT file containing MQ connection details | string | | URL for obtaining a CCDT file |
297-
| mq.message.builder | The class used to build the MQ message | string | | Class implementing MessageBuilder |
298-
| mq.message.body.jms | Whether to generate the message body as a JMS message type | boolean | false | |
299-
| mq.time.to.live | Time-to-live in milliseconds for messages sent to MQ | long | 0 (unlimited) | [0,...] |
300-
| mq.persistent | Send persistent or non-persistent messages to MQ | boolean | true | |
301-
| mq.ssl.cipher.suite | The name of the cipher suite for TLS (SSL) connection | string | | Blank or valid cipher suite |
302-
| mq.ssl.peer.name | The distinguished name pattern of the TLS (SSL) peer | string | | Blank or DN pattern |
303-
| mq.ssl.keystore.location | The path to the JKS keystore to use for SSL (TLS) connections | string | JVM keystore | Local path to a JKS file |
304-
| mq.ssl.keystore.password | The password of the JKS keystore to use for SSL (TLS) connections | string | | |
305-
| mq.ssl.truststore.location | The path to the JKS truststore to use for SSL (TLS) connections | string | JVM truststore | Local path to a JKS file |
306-
| mq.ssl.truststore.password | The password of the JKS truststore to use for SSL (TLS) connections | string | | |
307-
| mq.ssl.use.ibm.cipher.mappings | Whether to set system property to control use of IBM cipher mappings | boolean | | |
308-
| mq.message.builder.key.header | The JMS message header to set from the Kafka record key | string | | JMSCorrelationID |
309-
| mq.message.builder.value.converter | The class and prefix for message builder's value converter | string | | Class implementing Converter |
310-
| mq.message.builder.topic.property | The JMS message property to set from the Kafka topic | string | | Blank or valid JMS property name |
311-
| mq.message.builder.partition.property | The JMS message property to set from the Kafka partition | string | | Blank or valid JMS property name |
312-
| mq.message.builder.offset.property | The JMS message property to set from the Kafka offset | string | | Blank or valid JMS property name |
313-
| mq.reply.queue | The name of the reply-to queue | string | | MQ queue name or queue URI |
285+
| Name | Description | Type | Default | Valid values |
286+
| --------------------------------------- | ---------------------------------------------------------------------- | ------- | -------------- | --------------------------------- |
287+
| topics or topics.regex | List of Kafka source topics | string | | topic1[,topic2,...] |
288+
| mq.queue.manager | The name of the MQ queue manager | string | | MQ queue manager name |
289+
| mq.connection.mode | The connection mode - bindings or client | string | client | client, bindings |
290+
| mq.connection.name.list | List of connection names for queue manager | string | | host(port)[,host(port),...] |
291+
| mq.channel.name | The name of the server-connection channel | string | | MQ channel name |
292+
| mq.queue | The name of the target MQ queue | string | | MQ queue name |
293+
| mq.user.name | The user name for authenticating with the queue manager | string | | User name |
294+
| mq.password | The password for authenticating with the queue manager | string | | Password |
295+
| mq.user.authentication.mqcsp | Whether to use MQ connection security parameters (MQCSP) | boolean | true | |
296+
| mq.ccdt.url | The URL for the CCDT file containing MQ connection details | string | | URL for obtaining a CCDT file |
297+
| mq.message.builder | The class used to build the MQ message | string | | Class implementing MessageBuilder |
298+
| mq.message.body.jms | Whether to generate the message body as a JMS message type | boolean | false | |
299+
| mq.time.to.live | Time-to-live in milliseconds for messages sent to MQ | long | 0 (unlimited) | [0,...] |
300+
| mq.persistent | Send persistent or non-persistent messages to MQ | boolean | true | |
301+
| mq.ssl.cipher.suite | The name of the cipher suite for TLS (SSL) connection | string | | Blank or valid cipher suite |
302+
| mq.ssl.peer.name | The distinguished name pattern of the TLS (SSL) peer | string | | Blank or DN pattern |
303+
| mq.ssl.keystore.location | The path to the JKS keystore to use for SSL (TLS) connections | string | JVM keystore | Local path to a JKS file |
304+
| mq.ssl.keystore.password | The password of the JKS keystore to use for SSL (TLS) connections | string | | |
305+
| mq.ssl.truststore.location | The path to the JKS truststore to use for SSL (TLS) connections | string | JVM truststore | Local path to a JKS file |
306+
| mq.ssl.truststore.password | The password of the JKS truststore to use for SSL (TLS) connections | string | | |
307+
| mq.ssl.use.ibm.cipher.mappings | Whether to set system property to control use of IBM cipher mappings | boolean | | |
308+
| mq.message.builder.key.header | The JMS message header to set from the Kafka record key | string | | JMSCorrelationID |
309+
| mq.kafka.headers.copy.to.jms.properties | Whether to copy Kafka headers to JMS message properties | boolean | false | |
310+
| mq.message.builder.value.converter | The class and prefix for message builder's value converter | string | | Class implementing Converter |
311+
| mq.message.builder.topic.property | The JMS message property to set from the Kafka topic | string | | Blank or valid JMS property name |
312+
| mq.message.builder.partition.property | The JMS message property to set from the Kafka partition | string | | Blank or valid JMS property name |
313+
| mq.message.builder.offset.property | The JMS message property to set from the Kafka offset | string | | Blank or valid JMS property name |
314+
| mq.reply.queue | The name of the reply-to queue | string | | MQ queue name or queue URI |
314315

315316

316317
### Using a CCDT file

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
<groupId>com.ibm.eventstreams.connect</groupId>
2121
<artifactId>kafka-connect-mq-sink</artifactId>
2222
<packaging>jar</packaging>
23-
<version>1.3.1</version>
23+
<version>1.4.0</version>
2424
<name>kafka-connect-mq-sink</name>
2525
<organization>
2626
<name>IBM Corporation</name>

src/integration/java/com/ibm/eventstreams/connect/mqsink/AbstractJMSContextIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ public String getConnectionName() {
104104
private void waitForQueueManagerStartup() throws TimeoutException {
105105
WaitingConsumer logConsumer = new WaitingConsumer();
106106
MQ_CONTAINER.followOutput(logConsumer);
107-
logConsumer.waitUntil(logline -> logline.getUtf8String().contains("AMQ5975I"));
107+
logConsumer.waitUntil(logline -> logline.getUtf8String().contains("AMQ5806I: Queued Publish/Subscribe Daemon started for queue manager"));
108108
}
109109

110110

src/integration/java/com/ibm/eventstreams/connect/mqsink/MQSinkTaskAuthIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ public void testAuthenticatedQueueManager() throws Exception {
9595
private void waitForQueueManagerStartup() throws TimeoutException {
9696
WaitingConsumer logConsumer = new WaitingConsumer();
9797
MQ_CONTAINER.followOutput(logConsumer);
98-
logConsumer.waitUntil(logline -> logline.getUtf8String().contains("AMQ5975I"));
98+
logConsumer.waitUntil(logline -> logline.getUtf8String().contains("AMQ5806I: Queued Publish/Subscribe Daemon started for queue manager"));
9999
}
100100

101101
private List<Message> getAllMessagesFromQueue() throws JMSException {

src/integration/java/com/ibm/eventstreams/connect/mqsink/builders/DefaultMessageBuilderIT.java

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,18 @@
1616
package com.ibm.eventstreams.connect.mqsink.builders;
1717

1818
import static org.junit.Assert.assertEquals;
19+
import static org.junit.Assert.assertFalse;
20+
import static org.junit.Assert.assertNull;
1921

2022
import java.nio.ByteBuffer;
2123

2224
import javax.jms.BytesMessage;
2325
import javax.jms.Message;
2426
import javax.jms.TextMessage;
2527

28+
import org.apache.kafka.common.record.TimestampType;
2629
import org.apache.kafka.connect.data.Schema;
30+
import org.apache.kafka.connect.header.ConnectHeaders;
2731
import org.apache.kafka.connect.sink.SinkRecord;
2832
import org.junit.Before;
2933
import org.junit.Test;
@@ -104,6 +108,32 @@ public void buildByteBufferMessageWithSchema() throws Exception {
104108
value.put(payload);
105109
createAndVerifyByteMessage(Schema.BYTES_SCHEMA, value, TEST_MESSAGE);
106110
}
111+
112+
@Test
113+
public void buildMessageWithTextHeader() throws Exception {
114+
final String TOPIC = "TOPIC.NAME";
115+
final int PARTITION = 0;
116+
final long OFFSET = 0;
117+
118+
final String TEST_HEADER_KEY = "TestHeader";
119+
120+
ConnectHeaders headers = new ConnectHeaders();
121+
headers.addString(TEST_HEADER_KEY, "This is a test header");
122+
123+
SinkRecord record = new SinkRecord(TOPIC, PARTITION,
124+
Schema.STRING_SCHEMA, "mykey",
125+
Schema.STRING_SCHEMA, "Test message",
126+
OFFSET,
127+
null, TimestampType.NO_TIMESTAMP_TYPE,
128+
headers);
129+
130+
// header should not have been copied across by default
131+
Message message = builder.fromSinkRecord(getJmsContext(), record);
132+
assertNull(message.getStringProperty(TEST_HEADER_KEY));
133+
134+
// no message properties should be set by default
135+
assertFalse(message.getPropertyNames().hasMoreElements());
136+
}
107137

108138

109139
private void createAndVerifyEmptyMessage(Schema valueSchema) throws Exception {
@@ -121,7 +151,8 @@ private void createAndVerifyStringMessage(Schema valueSchema, String value) thro
121151

122152
private void createAndVerifyIntegerMessage(Schema valueSchema, Integer value) throws Exception {
123153
Message message = builder.fromSinkRecord(getJmsContext(), generateSinkRecord(valueSchema, value));
124-
assertEquals(value, new Integer(message.getBody(String.class)));
154+
Integer intValue = Integer.parseInt(message.getBody(String.class));
155+
assertEquals(value, intValue);
125156
}
126157

127158
private void createAndVerifyByteMessage(Schema valueSchema, Object value, String valueAsString) throws Exception {

0 commit comments

Comments
 (0)