Skip to content

Commit ce2dd16

Browse files
Support for destination as key and reading MQMD
1 parent 3a73dce commit ce2dd16

File tree

6 files changed

+35
-9
lines changed

6 files changed

+35
-9
lines changed

config/mq-source.properties

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright 2017, 2018 IBM Corporation
1+
# Copyright 2017, 2018, 2019 IBM Corporation
22
#
33
# Licensed under the Apache License, Version 2.0 (the "License");
44
# you may not use this file except in compliance with the License.
@@ -56,7 +56,7 @@ mq.record.builder=com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBu
5656
# mq.message.body.jms=
5757

5858
# The JMS message header to use as the Kafka record key - optional
59-
# Valid values are JMSMessageID, JMSCorrelationID and JMSCorrelationIDAsBytes
59+
# Valid values are JMSMessageID, JMSCorrelationID, JMSCorrelationIDAsBytes and JMSDestination
6060
# Don't forget to set key.converter to a compatible converter as described in README.md
6161
# mq.record.builder.key.header=
6262

@@ -67,6 +67,9 @@ mq.record.builder=com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBu
6767
# The distinguished name pattern of the TLS (SSL) peer - optional
6868
# mq.ssl.peer.name=
6969

70+
# Whether to enable reading of all MQMD fields (default false) - optional
71+
# mq.message.mqmd.read=
72+
7073
# The name of the target Kafka topic - required
7174
topic=
7275

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
<groupId>com.ibm.eventstreams.connect</groupId>
2121
<artifactId>kafka-connect-mq-source</artifactId>
2222
<packaging>jar</packaging>
23-
<version>1.0.2</version>
23+
<version>1.1.0</version>
2424
<name>kafka-connect-mq-source</name>
2525
<organization>
2626
<name>IBM Corporation</name>

src/main/java/com/ibm/eventstreams/connect/mqsource/JMSReader.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ public void configure(Map<String, String> props) {
9393
String ccdtUrl = props.get(MQSourceConnector.CONFIG_NAME_MQ_CCDT_URL);
9494
String builderClass = props.get(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER);
9595
String mbj = props.get(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS);
96+
String mdr = props.get(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_MQMD_READ);
9697
String sslCipherSuite = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_CIPHER_SUITE);
9798
String sslPeerName = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_PEER_NAME);
9899
String topic = props.get(MQSourceConnector.CONFIG_NAME_TOPIC);
@@ -157,6 +158,12 @@ else if (connectionMode.equals(MQSourceConnector.CONFIG_VALUE_MQ_CONNECTION_MODE
157158
}
158159
}
159160

161+
if (mdr != null) {
162+
if (Boolean.parseBoolean(mdr)) {
163+
queue.setBooleanProperty(WMQConstants.WMQ_MQMD_READ_ENABLED, true);
164+
}
165+
}
166+
160167
this.topic = topic;
161168
}
162169
catch (JMSException | JMSRuntimeException jmse) {

src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ public class MQSourceConnector extends SourceConnector {
8383
public static final String CONFIG_VALUE_MQ_RECORD_BUILDER_KEY_HEADER_JMSMESSAGEID = "JMSMessageID";
8484
public static final String CONFIG_VALUE_MQ_RECORD_BUILDER_KEY_HEADER_JMSCORRELATIONID = "JMSCorrelationID";
8585
public static final String CONFIG_VALUE_MQ_RECORD_BUILDER_KEY_HEADER_JMSCORRELATIONIDASBYTES = "JMSCorrelationIDAsBytes";
86+
public static final String CONFIG_VALUE_MQ_RECORD_BUILDER_KEY_HEADER_JMSDESTINATION = "JMSDestination";
8687

8788
public static final String CONFIG_NAME_MQ_SSL_CIPHER_SUITE = "mq.ssl.cipher.suite";
8889
public static final String CONFIG_DOCUMENTATION_MQ_SSL_CIPHER_SUITE = "The name of the cipher suite for the TLS (SSL) connection.";
@@ -98,11 +99,15 @@ public class MQSourceConnector extends SourceConnector {
9899
public static final int CONFIG_VALUE_MQ_BATCH_SIZE_DEFAULT = 250;
99100
public static final int CONFIG_VALUE_MQ_BATCH_SIZE_MINIMUM = 1;
100101

102+
public static final String CONFIG_NAME_MQ_MESSAGE_MQMD_READ = "mq.message.mqmd.read";
103+
public static final String CONFIG_DOCUMENTATION_MQ_MESSAGE_MQMD_READ = "Whether to enable reading of all MQMD fields.";
104+
public static final String CONFIG_DISPLAY_MQ_MESSAGE_MQMD_READ = "Enable reading of MQMD fields";
105+
101106
public static final String CONFIG_NAME_TOPIC = "topic";
102107
public static final String CONFIG_DOCUMENTATION_TOPIC = "The name of the target Kafka topic.";
103108
public static final String CONFIG_DISPLAY_TOPIC = "Target Kafka topic";
104109

105-
public static String VERSION = "1.0.2";
110+
public static String VERSION = "1.1.0";
106111

107112
private Map<String, String> configProps;
108113

@@ -240,7 +245,11 @@ public class MQSourceConnector extends SourceConnector {
240245
CONFIG_DOCUMENTATION_MQ_BATCH_SIZE, CONFIG_GROUP_MQ, 14, Width.MEDIUM,
241246
CONFIG_DISPLAY_MQ_BATCH_SIZE);
242247

243-
config.define(CONFIG_NAME_TOPIC, Type.STRING, null, Importance.HIGH,
248+
config.define(CONFIG_NAME_MQ_MESSAGE_MQMD_READ, Type.BOOLEAN, Boolean.FALSE, Importance.LOW,
249+
CONFIG_DOCUMENTATION_MQ_MESSAGE_MQMD_READ, CONFIG_GROUP_MQ, 15, Width.SHORT,
250+
CONFIG_DISPLAY_MQ_MESSAGE_MQMD_READ);
251+
252+
config.define(CONFIG_NAME_TOPIC, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, Importance.HIGH,
244253
CONFIG_DOCUMENTATION_TOPIC, null, 0, Width.MEDIUM,
245254
CONFIG_DISPLAY_TOPIC);
246255

src/main/java/com/ibm/eventstreams/connect/mqsource/builders/BaseRecordBuilder.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
public abstract class BaseRecordBuilder implements RecordBuilder {
3838
private static final Logger log = LoggerFactory.getLogger(BaseRecordBuilder.class);
3939

40-
public enum KeyHeader {NONE, MESSAGE_ID, CORRELATION_ID, CORRELATION_ID_AS_BYTES};
40+
public enum KeyHeader {NONE, MESSAGE_ID, CORRELATION_ID, CORRELATION_ID_AS_BYTES, DESTINATION};
4141
protected KeyHeader keyheader = KeyHeader.NONE;
4242

4343
/**
@@ -64,8 +64,11 @@ else if (kh.equals(MQSourceConnector.CONFIG_VALUE_MQ_RECORD_BUILDER_KEY_HEADER_J
6464
keyheader = KeyHeader.CORRELATION_ID_AS_BYTES;
6565
log.debug("Setting Kafka record key from JMSCorrelationIDAsBytes header field");
6666
}
67-
else
68-
{
67+
else if (kh.equals(MQSourceConnector.CONFIG_VALUE_MQ_RECORD_BUILDER_KEY_HEADER_JMSDESTINATION)) {
68+
keyheader = KeyHeader.DESTINATION;
69+
log.debug("Setting Kafka record key from JMSDestination header field");
70+
}
71+
else {
6972
log.error("Unsupported MQ record builder key header value {}", kh);
7073
throw new ConnectException("Unsupported MQ record builder key header value");
7174
}
@@ -115,6 +118,10 @@ public SchemaAndValue getKey(JMSContext context, String topic, Message message)
115118
keySchema = Schema.OPTIONAL_BYTES_SCHEMA;
116119
key = message.getJMSCorrelationIDAsBytes();
117120
break;
121+
case DESTINATION:
122+
keySchema = Schema.OPTIONAL_STRING_SCHEMA;
123+
key = message.getJMSDestination().toString();
124+
break;
118125
default:
119126
break;
120127
}

src/main/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,4 +81,4 @@ else if (message instanceof TextMessage) {
8181

8282
return converter.toConnectData(topic, payload);
8383
}
84-
}
84+
}

0 commit comments

Comments
 (0)