Skip to content

Commit d4a0f55

Browse files
committed
added code for copying mq header to kafka header conversion
1 parent f2638f0 commit d4a0f55

File tree

7 files changed

+36
-1
lines changed

7 files changed

+36
-1
lines changed

src/main/java/com/ibm/eventstreams/connect/mqsource/JMSReader.java

100644100755
File mode changed.

src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java

100644100755
File mode changed.

src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java

100644100755
File mode changed.

src/main/java/com/ibm/eventstreams/connect/mqsource/builders/BaseRecordBuilder.java

100644100755
Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
import com.ibm.eventstreams.connect.mqsource.MQSourceConnector;
1919

20+
import java.util.Arrays;
21+
import java.util.Enumeration;
2022
import java.util.Map;
2123

2224
import javax.jms.JMSContext;
@@ -26,6 +28,9 @@
2628
import org.apache.kafka.connect.data.Schema;
2729
import org.apache.kafka.connect.data.SchemaAndValue;
2830
import org.apache.kafka.connect.errors.ConnectException;
31+
import org.apache.kafka.connect.header.ConnectHeaders;
32+
import org.apache.kafka.connect.header.Header;
33+
import org.apache.kafka.connect.header.Headers;
2934
import org.apache.kafka.connect.source.SourceRecord;
3035

3136
import org.slf4j.Logger;
@@ -159,6 +164,36 @@ public SchemaAndValue getKey(JMSContext context, String topic, Message message)
159164
SchemaAndValue key = this.getKey(context, topic, message);
160165
SchemaAndValue value = this.getValue(context, topic, messageBodyJms, message);
161166

162-
return new SourceRecord(null, null, topic, key.schema(), key.value(), value.schema(), value.value());
167+
//SourceRecord sourceRecord = new SourceRecord(null, null, topic, key.schema(), key.value(), value.schema(), value.value());
168+
ConnectHeaders connectHeaders = convertJmsPropertiesToKafkaHeaders(messageBodyJms, message);
169+
SourceRecord sourceRecord = new SourceRecord(null, null, topic, (Integer)null, key.schema(), key.value(), value.schema(), value.value(), message.getJMSTimestamp(), connectHeaders);
170+
171+
return sourceRecord;
172+
}
173+
174+
public ConnectHeaders convertJmsPropertiesToKafkaHeaders(boolean messageBodyJms, Message message){
175+
176+
ConnectHeaders connectHeaders = new ConnectHeaders();
177+
178+
if (messageBodyJms==true) {
179+
180+
try {
181+
Enumeration jmsPropertyNames = message.getPropertyNames();
182+
while (jmsPropertyNames.hasMoreElements()) {
183+
Object jmsPropertyKey = jmsPropertyNames.nextElement();
184+
String jmsPropertyValue = message.getStringProperty(jmsPropertyKey.toString());
185+
connectHeaders.addString(jmsPropertyKey.toString(), jmsPropertyValue);
186+
187+
}
188+
}
189+
catch (JMSException e){
190+
log.error("MQ/JMS message header could not be read", e);
191+
}
192+
193+
194+
}
195+
196+
return connectHeaders;
197+
163198
}
164199
}

src/main/java/com/ibm/eventstreams/connect/mqsource/builders/DefaultRecordBuilder.java

100644100755
File mode changed.

src/main/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilder.java

100644100755
File mode changed.

src/main/java/com/ibm/eventstreams/connect/mqsource/builders/RecordBuilder.java

100644100755
File mode changed.

0 commit comments

Comments
 (0)