Skip to content

Commit b100ff8

Browse files
Add support for Kafka keys
1 parent 37f3359 commit b100ff8

File tree

10 files changed

+232
-41
lines changed

10 files changed

+232
-41
lines changed

README.md

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ Build the connector using Maven:
2525
mvn clean package
2626
```
2727

28-
Once built, the output is a single JAR called `target/kafka-connect-mq-source-0.5-SNAPSHOT-jar-with-dependencies.jar` which contains all of the required dependencies.
28+
Once built, the output is a single JAR called `target/kafka-connect-mq-source-0.6-SNAPSHOT-jar-with-dependencies.jar` which contains all of the required dependencies.
2929

3030

3131
## Running the connector
@@ -91,7 +91,6 @@ mq.message.body.jms=true
9191
value.converter=org.apache.kafka.connect.storage.StringConverter
9292
```
9393

94-
9594
### The gory detail
9695
The messages received from MQ are processed by a record builder which builds a Kafka Connect record to represent the message. There are two record builders supplied with the MQ source connector. The connector has a configuration option *mq.message.body.jms* that controls whether it interprets the MQ messages as JMS messages or regular MQ messages.
9796

@@ -113,6 +112,17 @@ You must then choose a converter than can handle the value schema and class. The
113112

114113
In addition, there is another converter for the Avro format that is part of the Confluent Platform. This has not been tested with the MQ source connector at this time.
115114

115+
### Key support and partitioning
116+
By default, the connector does not use keys for the Kafka messages it publishes. It can be configured to use the JMS message headers to set the key of the Kafka records. You could use this, for example, to use the MQMD correlation identifier as the partitioning key when the messages are published to Kafka. There are three valid values for the `mq.record.builder.key.header` that controls this behavior.
117+
118+
| mq.record.builder.key.header | Key schema | Key class | Recommended value for key.converter |
119+
| ---------------------------- |---------------- | --------- | ------------------------------------------------------ |
120+
| JMSMessageID | OPTIONAL_STRING | String | org.apache.kafka.connect.storage.StringConverter |
121+
| JMSCorrelationID | OPTIONAL_STRING | String | org.apache.kafka.connect.storage.StringConverter |
122+
| JMSCorrelationIDAsBytes | OPTIONAL_BYTES | byte[] | org.apache.kafka.connect.converters.ByteArrayConverter |
123+
124+
In MQ, the message ID and correlation ID are both 24-byte arrays. As strings, the connector represents them using a sequence of 48 hexadecimal characters.
125+
116126

117127
## Security
118128
The connector supports authentication with user name and password and also connections secured with TLS using a server-side certificate and mutual authentication with client-side certificates.
@@ -132,32 +142,32 @@ For troubleshooting, or to better understand the handshake performed by the IBM
132142
## Configuration
133143
The configuration options for the MQ Source Connector are as follows:
134144

135-
| Name | Description | Type | Default | Valid values |
136-
| ----------------------- | ----------------------------------------------------------- | ------- | ------------- | -------------------------------- |
137-
| mq.queue.manager | The name of the MQ queue manager | string | | MQ queue manager name |
138-
| mq.connection.name.list | List of connection names for queue manager | string | | host(port)[,host(port),...] |
139-
| mq.channel.name | The name of the server-connection channel | string | | MQ channel name |
140-
| mq.queue | The name of the source MQ queue | string | | MQ queue name |
141-
| mq.user.name | The user name for authenticating with the queue manager | string | | User name |
142-
| mq.password | The password for authenticating with the queue manager | string | | Password |
143-
| mq.record.builder | The class used to build the Kafka Connect record | string | | Class implementing RecordBuilder |
144-
| mq.message.body.jms | Whether to interpret the message body as a JMS message type | boolean | false | |
145-
| mq.ssl.cipher.suite | The name of the cipher suite for TLS (SSL) connection | string | | Blank or valid cipher suite |
146-
| mq.ssl.peer.name | The distinguished name pattern of the TLS (SSL) peer | string | | Blank or DN pattern |
147-
| topic | The name of the target Kafka topic | string | | Topic name |
145+
| Name | Description | Type | Default | Valid values |
146+
| ---------------------------- | ----------------------------------------------------------- | ------- | ------------- | ------------------------------------------------------- |
147+
| mq.queue.manager | The name of the MQ queue manager | string | | MQ queue manager name |
148+
| mq.connection.name.list | List of connection names for queue manager | string | | host(port)[,host(port),...] |
149+
| mq.channel.name | The name of the server-connection channel | string | | MQ channel name |
150+
| mq.queue | The name of the source MQ queue | string | | MQ queue name |
151+
| mq.user.name | The user name for authenticating with the queue manager | string | | User name |
152+
| mq.password | The password for authenticating with the queue manager | string | | Password |
153+
| mq.record.builder | The class used to build the Kafka Connect record | string | | Class implementing RecordBuilder |
154+
| mq.message.body.jms | Whether to interpret the message body as a JMS message type | boolean | false | |
155+
| mq.record.builder.key.header | The JMS message header to use as the Kafka record key | string | | JMSMessageID, JMSCorrelationID, JMSCorrelationIDAsBytes |
156+
| mq.ssl.cipher.suite | The name of the cipher suite for TLS (SSL) connection | string | | Blank or valid cipher suite |
157+
| mq.ssl.peer.name | The distinguished name pattern of the TLS (SSL) peer | string | | Blank or DN pattern |
158+
| topic | The name of the target Kafka topic | string | | Topic name |
148159

149160

150161
## Future enhancements
151162
The connector is intentionally basic. The idea is to enhance it over time with additional features to make it more capable. Some possible future enhancements are:
152163
* Configurable schema for MQ messages
153164
* Simplification of handling message formats
154-
* Message key support
155165
* JMX metrics
156166
* Separate TLS configuration for the connector so that keystore location and so on can be specified as configurations
157167

158168

159169
## Issues and contributions
160-
For issues relating specifically to this connect, please use the [GitHub issue tracker](https://github.com/ibm-messaging/kafka-connect-mq-source/issues). If you do submit a Pull Request related to this connector, please indicate in the Pull Request that you accept and agree to be bound by the terms of the [IBM Contributor License Agreement](CLA.md).
170+
For issues relating specifically to this connector, please use the [GitHub issue tracker](https://github.com/ibm-messaging/kafka-connect-mq-source/issues). If you do submit a Pull Request related to this connector, please indicate in the Pull Request that you accept and agree to be bound by the terms of the [IBM Contributor License Agreement](CLA.md).
161171

162172

163173
## License

UsingMQwithKafkaConnect.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ topic=TSOURCE
154154

155155
Change directory to the Kafka root directory. Start the connector worker replacing `<connector-root-directory>`:
156156
``` shell
157-
CLASSPATH=<connector-root-directory>/target/kafka-connect-mq-source-0.5-SNAPSHOT-jar-with-dependencies.jar bin/connect-standalone.sh config/connect-standalone.properties ~/mq-source.properties
157+
CLASSPATH=<connector-root-directory>/target/kafka-connect-mq-source-0.6-SNAPSHOT-jar-with-dependencies.jar bin/connect-standalone.sh config/connect-standalone.properties ~/mq-source.properties
158158
```
159159

160160
Wait while the worker starts and then prints:
@@ -212,7 +212,7 @@ mq.password=passw0rd
212212

213213
Change directory to the Kafka root directory. Start the connector worker replacing `<connector-root-directory>`:
214214
``` shell
215-
CLASSPATH=<connector-root-directory>/target/kafka-connect-mq-sink-0.5-SNAPSHOT-jar-with-dependencies.jar bin/kafka-connect-standalone config/connect-standalone.properties ~/mq-sink.properties
215+
CLASSPATH=<connector-root-directory>/target/kafka-connect-mq-sink-0.6-SNAPSHOT-jar-with-dependencies.jar bin/kafka-connect-standalone config/connect-standalone.properties ~/mq-sink.properties
216216
```
217217

218218
Wait while the worker starts and then prints:

config/mq-source.properties

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright 2017 IBM Corporation
1+
# Copyright 2017, 2018 IBM Corporation
22
#
33
# Licensed under the Apache License, Version 2.0 (the "License");
44
# you may not use this file except in compliance with the License.
@@ -43,6 +43,11 @@ mq.record.builder=com.ibm.mq.kafkaconnect.builders.DefaultRecordBuilder
4343
# Whether to interpret the message body as a JMS message type (default false) - optional
4444
# mq.message.body.jms=
4545

46+
# The JMS message header to use as the Kafka record key - optional
47+
# Valid values are JMSMessageID, JMSCorrelationID and JMSCorrelationIDAsBytes
48+
# Don't forget to set key.converter to a compatible converter as described in README.md
49+
# mq.record.builder.key.header=
50+
4651
# The name of the cipher suite for TLS (SSL) connection (default blank, meaning do not use TLS) - optional
4752
# See https://www.ibm.com/support/knowledgecenter/en/SSFKSJ_9.0.0/com.ibm.mq.dev.doc/q113220_.htm for valid values
4853
# mq.ssl.cipher.suite=
@@ -54,7 +59,6 @@ mq.record.builder=com.ibm.mq.kafkaconnect.builders.DefaultRecordBuilder
5459
topic=
5560

5661
# The converters control conversion of data between the internal Kafka Connect representation and the messages in Kafka.
57-
# Note that ByteArrayConverter is new in Kafka 0.11.
5862
# key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
5963
# key.converter=org.apache.kafka.connect.storage.StringConverter
6064
# key.converter=org.apache.kafka.connect.json.JsonConverter

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
<groupId>com.ibm.mq.kafkaconnect</groupId>
2121
<artifactId>kafka-connect-mq-source</artifactId>
2222
<packaging>jar</packaging>
23-
<version>0.5-SNAPSHOT</version>
23+
<version>0.6-SNAPSHOT</version>
2424
<name>kafka-connect-mq-source</name>
2525
<organization>
2626
<name>IBM Corporation</name>

src/main/java/com/ibm/mq/kafkaconnect/JMSReader.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ public void configure(Map<String, String> props) {
126126
try {
127127
Class<? extends RecordBuilder> c = Class.forName(builderClass).asSubclass(RecordBuilder.class);
128128
builder = c.newInstance();
129+
builder.configure(props);
129130
}
130131
catch (ClassNotFoundException | IllegalAccessException | InstantiationException | NullPointerException exc) {
131132
log.debug("Could not instantiate message builder {}", builderClass);

src/main/java/com/ibm/mq/kafkaconnect/MQSourceConnector.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,13 @@ public class MQSourceConnector extends SourceConnector {
6767
public static final String CONFIG_DOCUMENTATION_MQ_MESSAGE_BODY_JMS = "Whether to interpret the message body as a JMS message type.";
6868
public static final String CONFIG_DISPLAY_MQ_MESSAGE_BODY_JMS = "Message body as JMS";
6969

70+
public static final String CONFIG_NAME_MQ_RECORD_BUILDER_KEY_HEADER = "mq.record.builder.key.header";
71+
public static final String CONFIG_DOCUMENTATION_MQ_RECORD_BUILDER_KEY_HEADER = "The JMS message header to use as the Kafka record key.";
72+
public static final String CONFIG_DISPLAY_MQ_RECORD_BUILDER_KEY_HEADER = "Record builder key header";
73+
public static final String CONFIG_VALUE_MQ_RECORD_BUILDER_KEY_HEADER_JMSMESSAGEID = "JMSMessageID";
74+
public static final String CONFIG_VALUE_MQ_RECORD_BUILDER_KEY_HEADER_JMSCORRELATIONID = "JMSCorrelationID";
75+
public static final String CONFIG_VALUE_MQ_RECORD_BUILDER_KEY_HEADER_JMSCORRELATIONIDASBYTES = "JMSCorrelationIDAsBytes";
76+
7077
public static final String CONFIG_NAME_MQ_SSL_CIPHER_SUITE = "mq.ssl.cipher.suite";
7178
public static final String CONFIG_DOCUMENTATION_MQ_SSL_CIPHER_SUITE = "The name of the cipher suite for the TLS (SSL) connection.";
7279
public static final String CONFIG_DISPLAY_MQ_SSL_CIPHER_SUITE = "SSL cipher suite";
@@ -79,7 +86,7 @@ public class MQSourceConnector extends SourceConnector {
7986
public static final String CONFIG_DOCUMENTATION_TOPIC = "The name of the target Kafka topic.";
8087
public static final String CONFIG_DISPLAY_TOPIC = "Target Kafka topic";
8188

82-
public static String VERSION = "0.5";
89+
public static String VERSION = "0.6";
8390

8491
private Map<String, String> configProps;
8592

@@ -174,12 +181,16 @@ public class MQSourceConnector extends SourceConnector {
174181
CONFIG_DOCUMENTATION_MQ_MESSAGE_BODY_JMS, CONFIG_GROUP_MQ, 8, Width.SHORT,
175182
CONFIG_DISPLAY_MQ_MESSAGE_BODY_JMS);
176183

184+
config.define(CONFIG_NAME_MQ_RECORD_BUILDER_KEY_HEADER, Type.STRING, null, Importance.MEDIUM,
185+
CONFIG_DOCUMENTATION_MQ_RECORD_BUILDER_KEY_HEADER, CONFIG_GROUP_MQ, 9, Width.MEDIUM,
186+
CONFIG_DISPLAY_MQ_RECORD_BUILDER_KEY_HEADER);
187+
177188
config.define(CONFIG_NAME_MQ_SSL_CIPHER_SUITE, Type.STRING, null, Importance.MEDIUM,
178-
CONFIG_DOCUMENTATION_MQ_SSL_CIPHER_SUITE, CONFIG_GROUP_MQ, 9, Width.MEDIUM,
189+
CONFIG_DOCUMENTATION_MQ_SSL_CIPHER_SUITE, CONFIG_GROUP_MQ, 10, Width.MEDIUM,
179190
CONFIG_DISPLAY_MQ_SSL_CIPHER_SUITE);
180191

181192
config.define(CONFIG_NAME_MQ_SSL_PEER_NAME, Type.STRING, null, Importance.MEDIUM,
182-
CONFIG_DOCUMENTATION_MQ_SSL_PEER_NAME, CONFIG_GROUP_MQ, 10, Width.MEDIUM,
193+
CONFIG_DOCUMENTATION_MQ_SSL_PEER_NAME, CONFIG_GROUP_MQ, 11, Width.MEDIUM,
183194
CONFIG_DISPLAY_MQ_SSL_PEER_NAME);
184195

185196
config.define(CONFIG_NAME_TOPIC, Type.STRING, null, Importance.HIGH,

0 commit comments

Comments
 (0)