Skip to content

Commit 6960a50

Browse files
Merge pull request #46 from aadu-coder/feature/mqheader-to-kafkaheader-conversion
added code for copying JMS String Properties to kafka header conversion
2 parents bff4081 + 8115f31 commit 6960a50

File tree

11 files changed

+238
-63
lines changed

11 files changed

+238
-63
lines changed

kafka-connect.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ apiVersion: apps/v1
33
kind: Deployment
44
metadata:
55
name: kafkaconnect-deploy
6-
labels:
6+
labels:
77
app: kafkaconnect-with-mq-source
88
spec:
99
replicas: 1
@@ -45,7 +45,7 @@ spec:
4545
periodSeconds: 20
4646
ports:
4747
- containerPort: 8083
48-
volumeMounts:
48+
volumeMounts:
4949
- name: connect-config
5050
mountPath: /opt/kafka/config/connect-distributed.properties
5151
subPath: connect-distributed.properties

pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,12 @@
7272
<version>4.12</version>
7373
<scope>test</scope>
7474
</dependency>
75+
<dependency>
76+
<groupId>org.mockito</groupId>
77+
<artifactId>mockito-core</artifactId>
78+
<version>3.1.0</version>
79+
<scope>test</scope>
80+
</dependency>
7581
<dependency>
7682
<groupId>org.slf4j</groupId>
7783
<artifactId>slf4j-log4j12</artifactId>
@@ -88,6 +94,7 @@
8894
<configuration>
8995
<source>1.8</source>
9096
<target>1.8</target>
97+
<compilerArgument>-Xlint:unchecked</compilerArgument>
9198
</configuration>
9299
</plugin>
93100
<plugin>

src/main/java/com/ibm/eventstreams/connect/mqsource/JMSReader.java

100644100755
Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
/**
22
* Copyright 2017, 2018, 2019 IBM Corporation
3-
*
3+
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
66
* You may obtain a copy of the License at
@@ -67,7 +67,7 @@ public class JMSReader {
6767
private MQQueue queue;
6868

6969
private RecordBuilder builder;
70-
70+
7171
private boolean connected = false; // Whether connected to MQ
7272
private boolean inflight = false; // Whether messages in-flight in current transaction
7373
private boolean inperil = false; // Whether current transaction must be forced to roll back
@@ -82,7 +82,7 @@ public JMSReader() {}
8282

8383
/**
8484
* Configure this class.
85-
*
85+
*
8686
* @param props initial configuration
8787
*
8888
* @throws ConnectException Operation failed and connector should stop.
@@ -114,14 +114,14 @@ public void configure(Map<String, String> props) {
114114
if (connectionMode != null) {
115115
if (connectionMode.equals(MQSourceConnector.CONFIG_VALUE_MQ_CONNECTION_MODE_CLIENT)) {
116116
transportType = WMQConstants.WMQ_CM_CLIENT;
117-
}
117+
}
118118
else if (connectionMode.equals(MQSourceConnector.CONFIG_VALUE_MQ_CONNECTION_MODE_BINDINGS)) {
119119
transportType = WMQConstants.WMQ_CM_BINDINGS;
120-
}
120+
}
121121
else {
122122
log.error("Unsupported MQ connection mode {}", connectionMode);
123123
throw new ConnectException("Unsupported MQ connection mode");
124-
}
124+
}
125125
}
126126

127127
try {
@@ -144,7 +144,7 @@ else if (connectionMode.equals(MQSourceConnector.CONFIG_VALUE_MQ_CONNECTION_MODE
144144
throw new ConnectException("CCDT file url invalid", e);
145145
}
146146
mqConnFactory.setCCDTURL(ccdtUrlObject);
147-
}
147+
}
148148
else {
149149
mqConnFactory.setConnectionNameList(connectionNameList);
150150
mqConnFactory.setChannel(channelName);
@@ -165,10 +165,10 @@ else if (connectionMode.equals(MQSourceConnector.CONFIG_VALUE_MQ_CONNECTION_MODE
165165
}
166166

167167
queue = new MQQueue(queueName);
168-
168+
169169
this.userName = userName;
170170
this.password = password;
171-
171+
172172
this.messageBodyJms = false;
173173
queue.setMessageBodyStyle(WMQConstants.WMQ_MESSAGE_BODY_MQ);
174174
if (mbj != null) {
@@ -216,11 +216,11 @@ public void connect() {
216216
}
217217
else {
218218
jmsCtxt = mqConnFactory.createContext(JMSContext.SESSION_TRANSACTED);
219-
}
219+
}
220220

221221
jmsCons = jmsCtxt.createConsumer(queue);
222222
connected = true;
223-
223+
224224
log.info("Connection to MQ established");
225225
}
226226
catch (JMSRuntimeException jmse) {
@@ -272,7 +272,7 @@ public SourceRecord receive(boolean wait) {
272272
// to commit once we've passed it on to Kafka. Temporarily mark the transaction as "in-peril"
273273
// so that any exception thrown will result in the transaction rolling back instead of committing.
274274
inperil = true;
275-
275+
276276
sr = builder.toSourceRecord(jmsCtxt, topic, messageBodyJms, m);
277277
inperil = false;
278278
}
@@ -357,12 +357,12 @@ private boolean connectInternal() {
357357
}
358358
else {
359359
jmsCtxt = mqConnFactory.createContext(JMSContext.SESSION_TRANSACTED);
360-
}
360+
}
361361

362362
jmsCons = jmsCtxt.createConsumer(queue);
363363
reconnectDelayMillis = RECONNECT_DELAY_MILLIS_MIN;
364364
connected = true;
365-
365+
366366
log.info("Connection to MQ established");
367367
}
368368
catch (JMSRuntimeException jmse) {

src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java

100644100755
Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
/**
22
* Copyright 2017, 2018, 2019 IBM Corporation
3-
*
3+
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
66
* You may obtain a copy of the License at
@@ -61,22 +61,26 @@ public class MQSourceConnector extends SourceConnector {
6161
public static final String CONFIG_DOCUMENTATION_MQ_USER_NAME = "The user name for authenticating with the queue manager.";
6262
public static final String CONFIG_DISPLAY_MQ_USER_NAME = "User name";
6363

64-
public static final String CONFIG_NAME_MQ_PASSWORD = "mq.password";
64+
public static final String CONFIG_NAME_MQ_PASSWORD = "mq.password";
6565
public static final String CONFIG_DOCUMENTATION_MQ_PASSWORD = "The password for authenticating with the queue manager.";
6666
public static final String CONFIG_DISPLAY_MQ_PASSWORD = "Password";
6767

68-
public static final String CONFIG_NAME_MQ_CCDT_URL = "mq.ccdt.url";
68+
public static final String CONFIG_NAME_MQ_CCDT_URL = "mq.ccdt.url";
6969
public static final String CONFIG_DOCUMENTATION_MQ_CCDT_URL = "The CCDT URL to use to establish a connection to the queue manager.";
7070
public static final String CONFIG_DISPLAY_MQ_CCDT_URL = "CCDT URL";
7171

7272
public static final String CONFIG_NAME_MQ_RECORD_BUILDER = "mq.record.builder";
7373
public static final String CONFIG_DOCUMENTATION_MQ_RECORD_BUILDER = "The class used to build the Kafka Connect records.";
7474
public static final String CONFIG_DISPLAY_MQ_RECORD_BUILDER = "Record builder";
7575

76-
public static final String CONFIG_NAME_MQ_MESSAGE_BODY_JMS = "mq.message.body.jms";
76+
public static final String CONFIG_NAME_MQ_MESSAGE_BODY_JMS = "mq.message.body.jms";
7777
public static final String CONFIG_DOCUMENTATION_MQ_MESSAGE_BODY_JMS = "Whether to interpret the message body as a JMS message type.";
7878
public static final String CONFIG_DISPLAY_MQ_MESSAGE_BODY_JMS = "Message body as JMS";
7979

80+
public static final String CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER = "mq.jms.properties.copy.to.kafka.headers";
81+
public static final String CONFIG_DOCUMENTATION_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER = "Whether to copy JMS Message properties to Kafka headers";
82+
public static final String CONFIG_DISPLAY_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER = "Copy JMS Message Property to Kafka Headers";
83+
8084
public static final String CONFIG_NAME_MQ_RECORD_BUILDER_KEY_HEADER = "mq.record.builder.key.header";
8185
public static final String CONFIG_DOCUMENTATION_MQ_RECORD_BUILDER_KEY_HEADER = "The JMS message header to use as the Kafka record key.";
8286
public static final String CONFIG_DISPLAY_MQ_RECORD_BUILDER_KEY_HEADER = "Record builder key header";
@@ -85,11 +89,11 @@ public class MQSourceConnector extends SourceConnector {
8589
public static final String CONFIG_VALUE_MQ_RECORD_BUILDER_KEY_HEADER_JMSCORRELATIONIDASBYTES = "JMSCorrelationIDAsBytes";
8690
public static final String CONFIG_VALUE_MQ_RECORD_BUILDER_KEY_HEADER_JMSDESTINATION = "JMSDestination";
8791

88-
public static final String CONFIG_NAME_MQ_SSL_CIPHER_SUITE = "mq.ssl.cipher.suite";
92+
public static final String CONFIG_NAME_MQ_SSL_CIPHER_SUITE = "mq.ssl.cipher.suite";
8993
public static final String CONFIG_DOCUMENTATION_MQ_SSL_CIPHER_SUITE = "The name of the cipher suite for the TLS (SSL) connection.";
9094
public static final String CONFIG_DISPLAY_MQ_SSL_CIPHER_SUITE = "SSL cipher suite";
9195

92-
public static final String CONFIG_NAME_MQ_SSL_PEER_NAME = "mq.ssl.peer.name";
96+
public static final String CONFIG_NAME_MQ_SSL_PEER_NAME = "mq.ssl.peer.name";
9397
public static final String CONFIG_DOCUMENTATION_MQ_SSL_PEER_NAME = "The distinguished name pattern of the TLS (SSL) peer.";
9498
public static final String CONFIG_DISPLAY_MQ_SSL_PEER_NAME = "SSL peer name";
9599

@@ -119,11 +123,13 @@ public class MQSourceConnector extends SourceConnector {
119123
public static final String CONFIG_DOCUMENTATION_MQ_MESSAGE_MQMD_READ = "Whether to enable reading of all MQMD fields.";
120124
public static final String CONFIG_DISPLAY_MQ_MESSAGE_MQMD_READ = "Enable reading of MQMD fields";
121125

126+
122127
public static final String CONFIG_NAME_MQ_USER_AUTHENTICATION_MQCSP = "mq.user.authentication.mqcsp";
123128
public static final String CONFIG_DOCUMENTATION_MQ_USER_AUTHENTICATION_MQCSP = "Whether to use MQ connection security parameters (MQCSP).";
124129
public static final String CONFIG_DISPLAY_MQ_USER_AUTHENTICATION_MQCSP = "User authentication using MQCSP";
125130

126131
public static final String CONFIG_NAME_TOPIC = "topic";
132+
127133
public static final String CONFIG_DOCUMENTATION_TOPIC = "The name of the target Kafka topic.";
128134
public static final String CONFIG_DISPLAY_TOPIC = "Target Kafka topic";
129135

@@ -168,7 +174,7 @@ public class MQSourceConnector extends SourceConnector {
168174
*/
169175
@Override public Class<? extends Task> taskClass() {
170176
return MQSourceTask.class;
171-
}
177+
}
172178

173179
/**
174180
* Returns a set of configurations for Tasks based on the current configuration,
@@ -209,7 +215,7 @@ public class MQSourceConnector extends SourceConnector {
209215
CONFIG_DOCUMENTATION_MQ_QUEUE_MANAGER, CONFIG_GROUP_MQ, 1, Width.MEDIUM,
210216
CONFIG_DISPLAY_MQ_QUEUE_MANAGER);
211217

212-
config.define(CONFIG_NAME_MQ_CONNECTION_MODE, Type.STRING, CONFIG_VALUE_MQ_CONNECTION_MODE_CLIENT,
218+
config.define(CONFIG_NAME_MQ_CONNECTION_MODE, Type.STRING, CONFIG_VALUE_MQ_CONNECTION_MODE_CLIENT,
213219
ConfigDef.ValidString.in(CONFIG_VALUE_MQ_CONNECTION_MODE_CLIENT,
214220
CONFIG_VALUE_MQ_CONNECTION_MODE_BINDINGS),
215221
Importance.MEDIUM,
@@ -286,8 +292,14 @@ public class MQSourceConnector extends SourceConnector {
286292
CONFIG_DISPLAY_MQ_MESSAGE_MQMD_READ);
287293

288294
config.define(CONFIG_NAME_MQ_USER_AUTHENTICATION_MQCSP, Type.BOOLEAN, Boolean.TRUE, Importance.LOW,
289-
CONFIG_DOCUMENTATION_MQ_USER_AUTHENTICATION_MQCSP, CONFIG_GROUP_MQ, 20, Width.SHORT,
290-
CONFIG_DISPLAY_MQ_USER_AUTHENTICATION_MQCSP);
295+
CONFIG_DOCUMENTATION_MQ_USER_AUTHENTICATION_MQCSP, CONFIG_GROUP_MQ, 20, Width.SHORT,
296+
CONFIG_DISPLAY_MQ_USER_AUTHENTICATION_MQCSP);
297+
298+
299+
config.define(CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER, Type.BOOLEAN, Boolean.FALSE, Importance.LOW,
300+
CONFIG_DOCUMENTATION_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER, CONFIG_GROUP_MQ, 21, Width.MEDIUM,
301+
CONFIG_DISPLAY_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER);
302+
291303

292304
config.define(CONFIG_NAME_TOPIC, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, Importance.HIGH,
293305
CONFIG_DOCUMENTATION_TOPIC, null, 0, Width.MEDIUM,

src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java

100644100755
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
/**
22
* Copyright 2017, 2018, 2019 IBM Corporation
3-
*
3+
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
66
* You may obtain a copy of the License at
@@ -227,7 +227,7 @@ public void commit() throws InterruptedException {
227227
willClose = true;
228228
}
229229
}
230-
230+
231231
if (willClose) {
232232
// Close the connection to MQ to clean up
233233
if (reader != null) {

src/main/java/com/ibm/eventstreams/connect/mqsource/builders/BaseRecordBuilder.java

100644100755
Lines changed: 40 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,7 @@
1616
package com.ibm.eventstreams.connect.mqsource.builders;
1717

1818
import com.ibm.eventstreams.connect.mqsource.MQSourceConnector;
19-
20-
import java.util.Map;
21-
22-
import javax.jms.JMSContext;
23-
import javax.jms.JMSException;
24-
import javax.jms.Message;
25-
19+
import com.ibm.eventstreams.connect.mqsource.processor.JmsToKafkaHeaderConverter;
2620
import org.apache.kafka.connect.data.Schema;
2721
import org.apache.kafka.connect.data.SchemaAndValue;
2822
import org.apache.kafka.connect.errors.ConnectException;
@@ -31,6 +25,12 @@
3125
import org.slf4j.Logger;
3226
import org.slf4j.LoggerFactory;
3327

28+
import javax.jms.JMSContext;
29+
import javax.jms.JMSException;
30+
import javax.jms.Message;
31+
import java.util.Map;
32+
import java.util.Optional;
33+
3434
/**
3535
* Builds Kafka Connect SourceRecords from messages.
3636
*/
@@ -40,9 +40,13 @@ public abstract class BaseRecordBuilder implements RecordBuilder {
4040
public enum KeyHeader {NONE, MESSAGE_ID, CORRELATION_ID, CORRELATION_ID_AS_BYTES, DESTINATION};
4141
protected KeyHeader keyheader = KeyHeader.NONE;
4242

43+
44+
private boolean copyJmsPropertiesFlag = Boolean.FALSE;
45+
private JmsToKafkaHeaderConverter jmsToKafkaHeaderConverter;
46+
4347
/**
4448
* Configure this class.
45-
*
49+
*
4650
* @param props initial configuration
4751
*
4852
* @throws ConnectException Operation failed and connector should stop.
@@ -74,18 +78,25 @@ else if (kh.equals(MQSourceConnector.CONFIG_VALUE_MQ_RECORD_BUILDER_KEY_HEADER_J
7478
}
7579
}
7680

77-
log.trace("[{}] Exit {}.configure", Thread.currentThread().getId(), this.getClass().getName());
78-
}
81+
String str = props.get(MQSourceConnector.CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER);
82+
83+
copyJmsPropertiesFlag = Boolean.parseBoolean(Optional.ofNullable(str).orElse("false"));
84+
85+
jmsToKafkaHeaderConverter = new JmsToKafkaHeaderConverter();
86+
87+
88+
log.trace("[{}] Exit {}.configure", Thread.currentThread().getId(), this.getClass().getName());
89+
}
7990

8091
/**
8192
* Gets the key to use for the Kafka Connect SourceRecord.
82-
*
93+
*
8394
* @param context the JMS context to use for building messages
8495
* @param topic the Kafka topic
8596
* @param message the message
86-
*
97+
*
8798
* @return the Kafka Connect SourceRecord's key
88-
*
99+
*
89100
* @throws JMSException Message could not be converted
90101
*/
91102
public SchemaAndValue getKey(JMSContext context, String topic, Message message) throws JMSException {
@@ -131,34 +142,42 @@ public SchemaAndValue getKey(JMSContext context, String topic, Message message)
131142

132143
/**
133144
* Gets the value to use for the Kafka Connect SourceRecord.
134-
*
145+
*
135146
* @param context the JMS context to use for building messages
136147
* @param topic the Kafka topic
137148
* @param messageBodyJms whether to interpret MQ messages as JMS messages
138149
* @param message the message
139-
*
150+
*
140151
* @return the Kafka Connect SourceRecord's value
141-
*
152+
*
142153
* @throws JMSException Message could not be converted
143154
*/
144155
public abstract SchemaAndValue getValue(JMSContext context, String topic, boolean messageBodyJms, Message message) throws JMSException;
145156

146157
/**
147158
* Convert a message into a Kafka Connect SourceRecord.
148-
*
159+
*
149160
* @param context the JMS context to use for building messages
150161
* @param topic the Kafka topic
151162
* @param messageBodyJms whether to interpret MQ messages as JMS messages
152163
* @param message the message
153-
*
164+
*
154165
* @return the Kafka Connect SourceRecord
155-
*
166+
*
156167
* @throws JMSException Message could not be converted
157168
*/
158169
@Override public SourceRecord toSourceRecord(JMSContext context, String topic, boolean messageBodyJms, Message message) throws JMSException {
159170
SchemaAndValue key = this.getKey(context, topic, message);
160171
SchemaAndValue value = this.getValue(context, topic, messageBodyJms, message);
161172

162-
return new SourceRecord(null, null, topic, key.schema(), key.value(), value.schema(), value.value());
163-
}
173+
if (copyJmsPropertiesFlag && messageBodyJms)
174+
return new SourceRecord(null, null, topic, (Integer) null, key.schema(), key.value(), value.schema(), value.value(), message.getJMSTimestamp(), jmsToKafkaHeaderConverter.convertJmsPropertiesToKafkaHeaders(message));
175+
else
176+
return new SourceRecord(null, null, topic, key.schema(), key.value(), value.schema(), value.value());
177+
178+
179+
}
180+
181+
182+
164183
}

0 commit comments

Comments
 (0)