Skip to content

Commit bbbd128

Browse files
committed
changes based on review comments
1 parent 5d646e6 commit bbbd128

File tree

5 files changed

+7
-55
lines changed

5 files changed

+7
-55
lines changed

src/main/java/com/ibm/eventstreams/connect/mqsource/processor/JMSReader.java renamed to src/main/java/com/ibm/eventstreams/connect/mqsource/JMSReader.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,8 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package com.ibm.eventstreams.connect.mqsource.processor;
16+
package com.ibm.eventstreams.connect.mqsource;
1717

18-
import com.ibm.eventstreams.connect.mqsource.MQSourceConnector;
1918
import com.ibm.eventstreams.connect.mqsource.builders.RecordBuilder;
2019
import com.ibm.mq.MQException;
2120
import com.ibm.mq.constants.MQConstants;

src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
*/
1616
package com.ibm.eventstreams.connect.mqsource;
1717

18-
import com.ibm.eventstreams.connect.mqsource.processor.JMSReader;
1918
import org.apache.kafka.connect.source.SourceRecord;
2019
import org.apache.kafka.connect.source.SourceTask;
2120
import org.slf4j.Logger;

src/main/java/com/ibm/eventstreams/connect/mqsource/builders/BaseRecordBuilder.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,10 @@ public void configure(Map<String, String> props) {
7070
}
7171

7272
String str = props.get(MQSourceConnector.CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER);
73-
if (str != null && ("true".equalsIgnoreCase(str) || "false".equalsIgnoreCase(str))) {
73+
7474
copyJmsPropertiesFlag = Boolean.parseBoolean(str);
7575
jmsToKafkaHeaderConverter = new JmsToKafkaHeaderConverter();
76-
}
76+
7777

7878
log.trace("[{}] Exit {}.configure", Thread.currentThread().getId(), this.getClass().getName());
7979
}
@@ -160,9 +160,8 @@ public SourceRecord toSourceRecord(JMSContext context, String topic, boolean mes
160160
SchemaAndValue key = this.getKey(context, topic, message);
161161
SchemaAndValue value = this.getValue(context, topic, messageBodyJms, message);
162162

163-
if (copyJmsPropertiesFlag)
164-
return new SourceRecord(null, null, topic, (Integer) null, key.schema(), key.value(), value.schema(), value.value(), message.getJMSTimestamp(), jmsToKafkaHeaderConverter.convertJmsPropertiesToKafkaHeaders(messageBodyJms, message));
165-
163+
if (copyJmsPropertiesFlag && messageBodyJms)
164+
return new SourceRecord(null, null, topic, (Integer) null, key.schema(), key.value(), value.schema(), value.value(), message.getJMSTimestamp(), jmsToKafkaHeaderConverter.convertJmsPropertiesToKafkaHeaders(message));
166165
else
167166
return new SourceRecord(null, null, topic, key.schema(), key.value(), value.schema(), value.value());
168167

src/main/java/com/ibm/eventstreams/connect/mqsource/processor/JmsToKafkaHeaderConverter.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,13 @@ public class JmsToKafkaHeaderConverter {
2121
* Copies the JMS properties to Kafka headers.
2222
*
2323
* @param message JMS message.
24-
* @param messageBodyJms flag whether incoming message is treated as jms message.
24+
*
2525
* @return Kafka connect headers.
2626
*/
27-
public ConnectHeaders convertJmsPropertiesToKafkaHeaders(boolean messageBodyJms, Message message) {
27+
public ConnectHeaders convertJmsPropertiesToKafkaHeaders(Message message) {
2828

2929
ConnectHeaders connectHeaders = new ConnectHeaders();
3030

31-
if (messageBodyJms) {
3231

3332

3433
ArrayList jmsPropertyKeys = null;
@@ -47,8 +46,6 @@ public ConnectHeaders convertJmsPropertiesToKafkaHeaders(boolean messageBodyJms,
4746
log.error("JMS message properties could not be read", e);
4847
}
4948

50-
}
51-
5249
return connectHeaders;
5350

5451
}

src/main/resources/log4j2.xml

Lines changed: 0 additions & 42 deletions
This file was deleted.

0 commit comments

Comments
 (0)