Skip to content

Commit 2b6613c

Browse files
author
Andrew Schofield
committed
Reformatting and documentation tweaks
1 parent 6960a50 commit 2b6613c

File tree

6 files changed

+28
-42
lines changed

6 files changed

+28
-42
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,7 @@ The configuration options for the Kafka Connect source connector for IBM MQ are
226226
| mq.record.builder | The class used to build the Kafka Connect record | string | | Class implementing RecordBuilder |
227227
| mq.message.body.jms | Whether to interpret the message body as a JMS message type | boolean | false | |
228228
| mq.record.builder.key.header | The JMS message header to use as the Kafka record key | string | | JMSMessageID, JMSCorrelationID, JMSCorrelationIDAsBytes, JMSDestination |
229+
| mq.jms.properties.copy.to.kafka.headers | Whether to copy JMS message properties to Kafka headers | boolean | false | |
229230
| mq.ssl.cipher.suite | The name of the cipher suite for TLS (SSL) connection | string | | Blank or valid cipher suite |
230231
| mq.ssl.peer.name | The distinguished name pattern of the TLS (SSL) peer | string | | Blank or DN pattern |
231232
| mq.ssl.keystore.location | The path to the JKS keystore to use for SSL (TLS) connections | string | JVM keystore | Local path to a JKS file |

config/mq-source.properties

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,9 @@ mq.record.builder=com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBu
6363
# Don't forget to set key.converter to a compatible converter as described in README.md
6464
# mq.record.builder.key.header=
6565

66+
# Whether to copy JMS message properties to Kafka headers
67+
# mq.jms.properties.copy.to.kafka.headers=
68+
6669
# The name of the cipher suite for TLS (SSL) connection (default blank, meaning do not use TLS) - optional
6770
# See https://www.ibm.com/support/knowledgecenter/en/SSFKSJ_9.1.0/com.ibm.mq.dev.doc/q113220_.htm for valid values
6871
# mq.ssl.cipher.suite=

src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,8 @@ public class MQSourceConnector extends SourceConnector {
7878
public static final String CONFIG_DISPLAY_MQ_MESSAGE_BODY_JMS = "Message body as JMS";
7979

8080
public static final String CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER = "mq.jms.properties.copy.to.kafka.headers";
81-
public static final String CONFIG_DOCUMENTATION_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER = "Whether to copy JMS Message properties to Kafka headers";
82-
public static final String CONFIG_DISPLAY_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER = "Copy JMS Message Property to Kafka Headers";
81+
public static final String CONFIG_DOCUMENTATION_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER = "Whether to copy JMS message properties to Kafka headers.";
82+
public static final String CONFIG_DISPLAY_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER = "Copy JMS message properties to Kafka headers";
8383

8484
public static final String CONFIG_NAME_MQ_RECORD_BUILDER_KEY_HEADER = "mq.record.builder.key.header";
8585
public static final String CONFIG_DOCUMENTATION_MQ_RECORD_BUILDER_KEY_HEADER = "The JMS message header to use as the Kafka record key.";
@@ -123,13 +123,11 @@ public class MQSourceConnector extends SourceConnector {
123123
public static final String CONFIG_DOCUMENTATION_MQ_MESSAGE_MQMD_READ = "Whether to enable reading of all MQMD fields.";
124124
public static final String CONFIG_DISPLAY_MQ_MESSAGE_MQMD_READ = "Enable reading of MQMD fields";
125125

126-
127126
public static final String CONFIG_NAME_MQ_USER_AUTHENTICATION_MQCSP = "mq.user.authentication.mqcsp";
128127
public static final String CONFIG_DOCUMENTATION_MQ_USER_AUTHENTICATION_MQCSP = "Whether to use MQ connection security parameters (MQCSP).";
129128
public static final String CONFIG_DISPLAY_MQ_USER_AUTHENTICATION_MQCSP = "User authentication using MQCSP";
130129

131130
public static final String CONFIG_NAME_TOPIC = "topic";
132-
133131
public static final String CONFIG_DOCUMENTATION_TOPIC = "The name of the target Kafka topic.";
134132
public static final String CONFIG_DISPLAY_TOPIC = "Target Kafka topic";
135133

@@ -292,15 +290,13 @@ public class MQSourceConnector extends SourceConnector {
292290
CONFIG_DISPLAY_MQ_MESSAGE_MQMD_READ);
293291

294292
config.define(CONFIG_NAME_MQ_USER_AUTHENTICATION_MQCSP, Type.BOOLEAN, Boolean.TRUE, Importance.LOW,
295-
CONFIG_DOCUMENTATION_MQ_USER_AUTHENTICATION_MQCSP, CONFIG_GROUP_MQ, 20, Width.SHORT,
296-
CONFIG_DISPLAY_MQ_USER_AUTHENTICATION_MQCSP);
297-
293+
CONFIG_DOCUMENTATION_MQ_USER_AUTHENTICATION_MQCSP, CONFIG_GROUP_MQ, 20, Width.SHORT,
294+
CONFIG_DISPLAY_MQ_USER_AUTHENTICATION_MQCSP);
298295

299296
config.define(CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER, Type.BOOLEAN, Boolean.FALSE, Importance.LOW,
300297
CONFIG_DOCUMENTATION_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER, CONFIG_GROUP_MQ, 21, Width.MEDIUM,
301298
CONFIG_DISPLAY_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER);
302299

303-
304300
config.define(CONFIG_NAME_TOPIC, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, Importance.HIGH,
305301
CONFIG_DOCUMENTATION_TOPIC, null, 0, Width.MEDIUM,
306302
CONFIG_DISPLAY_TOPIC);

src/main/java/com/ibm/eventstreams/connect/mqsource/builders/BaseRecordBuilder.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -79,12 +79,9 @@ else if (kh.equals(MQSourceConnector.CONFIG_VALUE_MQ_RECORD_BUILDER_KEY_HEADER_J
7979
}
8080

8181
String str = props.get(MQSourceConnector.CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER);
82-
8382
copyJmsPropertiesFlag = Boolean.parseBoolean(Optional.ofNullable(str).orElse("false"));
84-
8583
jmsToKafkaHeaderConverter = new JmsToKafkaHeaderConverter();
8684

87-
8885
log.trace("[{}] Exit {}.configure", Thread.currentThread().getId(), this.getClass().getName());
8986
}
9087

@@ -174,10 +171,5 @@ public SchemaAndValue getKey(JMSContext context, String topic, Message message)
174171
return new SourceRecord(null, null, topic, (Integer) null, key.schema(), key.value(), value.schema(), value.value(), message.getJMSTimestamp(), jmsToKafkaHeaderConverter.convertJmsPropertiesToKafkaHeaders(message));
175172
else
176173
return new SourceRecord(null, null, topic, key.schema(), key.value(), value.schema(), value.value());
177-
178-
179174
}
180-
181-
182-
183175
}
Lines changed: 19 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/**
2-
* Copyright 2017, 2018, 2019 IBM Corporation
2+
* Copyright 2019 IBM Corporation
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -21,18 +21,14 @@
2121

2222
import javax.jms.JMSException;
2323
import javax.jms.Message;
24-
import java.util.ArrayList;
2524
import java.util.Collections;
2625
import java.util.Enumeration;
2726
import java.util.List;
2827

29-
/*
28+
/**
3029
* Single responsibility class to copy JMS properties to Kafka headers.
31-
*
32-
* */
33-
30+
*/
3431
public class JmsToKafkaHeaderConverter {
35-
3632
private static final Logger log = LoggerFactory.getLogger(JmsToKafkaHeaderConverter.class);
3733

3834
/**
@@ -43,30 +39,28 @@ public class JmsToKafkaHeaderConverter {
4339
* @return Kafka connect headers.
4440
*/
4541
public ConnectHeaders convertJmsPropertiesToKafkaHeaders(Message message) {
46-
4742
ConnectHeaders connectHeaders = new ConnectHeaders();
4843

44+
try {
45+
@SuppressWarnings("unchecked")
46+
Enumeration<String> propertyNames = (Enumeration<String>)message.getPropertyNames();
47+
List<String> jmsPropertyKeys = Collections.list(propertyNames);
4948

50-
try {
51-
@SuppressWarnings("unchecked")
52-
//com.ibm.msg.client.jms.internal.JmsMessageImpl.getPropertyNames returns Enumeration<String>
53-
Enumeration<String> propertyNames = (Enumeration<String>)message.getPropertyNames();
54-
List<String> jmsPropertyKeys = Collections.list(propertyNames);
55-
56-
jmsPropertyKeys.forEach(key -> {
49+
jmsPropertyKeys.forEach(key -> {
5750
try {
5851
connectHeaders.addString(key.toString(), message.getObjectProperty(key.toString()).toString());
59-
} catch (JMSException e) {
60-
//Not failing the message processing if JMS properties cannot be read for some reason.
61-
log.error("JMS message properties could not be read", e);
6252
}
63-
});
64-
} catch (JMSException e) {
65-
//Not failing the message processing if JMS properties cannot be read for some reason.
66-
log.error("JMS message properties could not be read", e);
67-
}
68-
69-
return connectHeaders;
53+
catch (JMSException e) {
54+
// Not failing the message processing if JMS properties cannot be read for some reason.
55+
log.warn("JMS exception {}", e);
56+
}
57+
});
58+
}
59+
catch (JMSException e) {
60+
// Not failing the message processing if JMS properties cannot be read for some reason.
61+
log.warn("JMS exception {}", e);
62+
}
7063

64+
return connectHeaders;
7165
}
7266
}

src/test/java/com/ibm/eventstreams/connect/mqsource/JmsToKafkaHeaderConverterTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/**
2-
* Copyright 2017, 2018, 2019 IBM Corporation
2+
* Copyright 2019 IBM Corporation
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.

0 commit comments

Comments
 (0)