Skip to content

Commit 44ed17c

Browse files
authored
Merge pull request #99 from dalelane/jms-rollback
fix: rollback messages to MQ if they cannot be transformed
2 parents f6ca557 + 39864c5 commit 44ed17c

File tree

6 files changed

+123
-3
lines changed

6 files changed

+123
-3
lines changed

Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ COPY --chown=esuser:esgroup --from=builder /opt/kafka/libs/ /opt/kafka/libs/
1010
COPY --chown=esuser:esgroup --from=builder /opt/kafka/config/connect-distributed.properties /opt/kafka/config/
1111
COPY --chown=esuser:esgroup --from=builder /opt/kafka/config/connect-log4j.properties /opt/kafka/config/
1212
RUN mkdir /opt/kafka/logs && chown esuser:esgroup /opt/kafka/logs
13-
COPY --chown=esuser:esgroup target/kafka-connect-mq-source-1.3.1-jar-with-dependencies.jar /opt/kafka/libs/
13+
COPY --chown=esuser:esgroup target/kafka-connect-mq-source-1.3.2-jar-with-dependencies.jar /opt/kafka/libs/
1414

1515
WORKDIR /opt/kafka
1616

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
<groupId>com.ibm.eventstreams.connect</groupId>
2121
<artifactId>kafka-connect-mq-source</artifactId>
2222
<packaging>jar</packaging>
23-
<version>1.3.1</version>
23+
<version>1.3.2</version>
2424
<name>kafka-connect-mq-source</name>
2525
<organization>
2626
<name>IBM Corporation</name>

src/integration/java/com/ibm/eventstreams/connect/mqsource/AbstractJMSContextIT.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package com.ibm.eventstreams.connect.mqsource;
1717

18+
import java.util.ArrayList;
1819
import java.util.List;
1920
import java.util.concurrent.TimeoutException;
2021

@@ -23,6 +24,7 @@
2324
import javax.jms.JMSContext;
2425
import javax.jms.JMSException;
2526
import javax.jms.Message;
27+
import javax.jms.MessageConsumer;
2628
import javax.jms.MessageProducer;
2729
import javax.jms.Session;
2830

@@ -143,4 +145,48 @@ public void putAllMessagesToQueue(String queueName, List<Message> messages) thro
143145

144146
connection.close();
145147
}
148+
149+
150+
/**
151+
* Gets all messages from the specified MQ queue. Used in tests to
152+
* verify what is left on the test queue
153+
*/
154+
public List<Message> getAllMessagesFromQueue(String queueName) throws JMSException {
155+
Connection connection = null;
156+
Session session = null;
157+
Destination destination = null;
158+
MessageConsumer consumer = null;
159+
160+
JmsFactoryFactory ff = JmsFactoryFactory.getInstance(WMQConstants.WMQ_PROVIDER);
161+
162+
JmsConnectionFactory cf = ff.createConnectionFactory();
163+
cf.setStringProperty(WMQConstants.WMQ_HOST_NAME, "localhost");
164+
cf.setIntProperty(WMQConstants.WMQ_PORT, getMQPort());
165+
cf.setStringProperty(WMQConstants.WMQ_CHANNEL, getChannelName());
166+
cf.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_CLIENT);
167+
cf.setStringProperty(WMQConstants.WMQ_QUEUE_MANAGER, getQmgrName());
168+
cf.setBooleanProperty(WMQConstants.USER_AUTHENTICATION_MQCSP, false);
169+
170+
connection = cf.createConnection();
171+
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
172+
173+
destination = session.createQueue(queueName);
174+
consumer = session.createConsumer(destination);
175+
176+
connection.start();
177+
178+
List<Message> messages = new ArrayList<>();
179+
Message message;
180+
do {
181+
message = consumer.receiveNoWait();
182+
if (message != null) {
183+
messages.add(message);
184+
}
185+
}
186+
while (message != null);
187+
188+
connection.close();
189+
190+
return messages;
191+
}
146192
}

src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,22 @@
1919
import static org.junit.Assert.assertEquals;
2020
import static org.junit.Assert.assertNotNull;
2121
import static org.junit.Assert.assertNull;
22+
import static org.junit.Assert.assertThrows;
23+
import static org.junit.Assert.assertTrue;
2224

2325
import java.util.ArrayList;
2426
import java.util.Arrays;
2527
import java.util.HashMap;
2628
import java.util.List;
2729
import java.util.Map;
2830

31+
import javax.jms.MapMessage;
2932
import javax.jms.Message;
33+
import javax.jms.MessageFormatException;
3034
import javax.jms.TextMessage;
3135

3236
import org.apache.kafka.connect.data.Schema;
37+
import org.apache.kafka.connect.errors.ConnectException;
3338
import org.apache.kafka.connect.source.SourceRecord;
3439
import org.junit.After;
3540
import org.junit.Test;
@@ -259,6 +264,57 @@ public void verifyMessageBatchGroupCommits() throws Exception {
259264

260265

261266

267+
@Test
268+
public void verifyMessageBatchRollback() throws Exception {
269+
MQSourceTask newConnectTask = new MQSourceTask();
270+
271+
Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
272+
connectorConfigProps.put("mq.message.body.jms", "true");
273+
connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
274+
connectorConfigProps.put("mq.batch.size", "10");
275+
276+
newConnectTask.start(connectorConfigProps);
277+
278+
// Test overview:
279+
//
280+
// messages 01-15 - valid messages
281+
// message 16 - a message that the builder can't process
282+
// messages 17-30 - valid messages
283+
284+
List<Message> messages = new ArrayList<>();
285+
for (int i = 1; i <= 15; i++) {
286+
messages.add(getJmsContext().createTextMessage("message " + i));
287+
}
288+
MapMessage invalidMessage = getJmsContext().createMapMessage();
289+
invalidMessage.setString("test", "builder cannot convert this");
290+
messages.add(invalidMessage);
291+
for (int i = 17; i <= 30; i++) {
292+
messages.add(getJmsContext().createTextMessage("message " + i));
293+
}
294+
putAllMessagesToQueue(MQ_QUEUE, messages);
295+
296+
List<SourceRecord> kafkaMessages;
297+
298+
// first batch should successfully retrieve messages 01-10
299+
kafkaMessages = newConnectTask.poll();
300+
assertEquals(10, kafkaMessages.size());
301+
newConnectTask.commit();
302+
newConnectTask.commit();
303+
304+
// second batch (11-20) should fail because of message 16
305+
ConnectException exc = assertThrows(ConnectException.class, () -> {
306+
newConnectTask.poll();
307+
});
308+
assertTrue(exc.getMessage().equals("Unsupported JMS message type"));
309+
310+
// there should be 20 messages left on the MQ queue (messages 11-30)
311+
newConnectTask.stop();
312+
List<Message> remainingMQMessages = getAllMessagesFromQueue(MQ_QUEUE);
313+
assertEquals(20, remainingMQMessages.size());
314+
}
315+
316+
317+
262318
@Test
263319
public void verifyMessageIdAsKey() throws Exception {
264320
connectTask = new MQSourceTask();

src/main/java/com/ibm/eventstreams/connect/mqsource/JMSReader.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,13 +292,31 @@ public SourceRecord receive(boolean wait) {
292292
}
293293
catch (ConnectException exc) {
294294
log.error("Connect exception {}", exc);
295+
attemptRollback();
295296
throw exc;
296297
}
297298

298299
log.trace("[{}] Exit {}.receive, retval={}", Thread.currentThread().getId(), this.getClass().getName(), sr);
299300
return sr;
300301
}
301302

303+
304+
/**
305+
* Returns messages got from the MQ queue. Called if the builder has failed to transform the
306+
* messages and return them to Connect for producing to Kafka.
307+
*/
308+
private void attemptRollback() {
309+
log.trace("[{}] Entry {}.attemptRollback", Thread.currentThread().getId(), this.getClass().getName());
310+
try {
311+
jmsCtxt.rollback();
312+
}
313+
catch (JMSRuntimeException jmsExc) {
314+
log.error("rollback failed {}", jmsExc);
315+
}
316+
log.trace("[{}] Exit {}.attemptRollback", Thread.currentThread().getId(), this.getClass().getName());
317+
}
318+
319+
302320
/**
303321
* Commits the current transaction. If the current transaction contains a message that could not
304322
* be processed, the transaction is "in peril" and is rolled back instead to avoid data loss.

src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ public class MQSourceConnector extends SourceConnector {
135135
public static final String CONFIG_DOCUMENTATION_TOPIC = "The name of the target Kafka topic.";
136136
public static final String CONFIG_DISPLAY_TOPIC = "Target Kafka topic";
137137

138-
public static String VERSION = "1.3.1";
138+
public static String VERSION = "1.3.2";
139139

140140
private Map<String, String> configProps;
141141

0 commit comments

Comments
 (0)