Skip to content

Commit e1a9ad8

Browse files
committed
fix: rollback messages to MQ if they cannot be transformed
Some JMS message types cannot be processed by the record builder. To avoid losing these messages, they should be rolled back to the MQ queue that they came from. Signed-off-by: Dale Lane <dale.lane@uk.ibm.com>
1 parent 8505dfe commit e1a9ad8

File tree

3 files changed

+120
-0
lines changed

3 files changed

+120
-0
lines changed

src/integration/java/com/ibm/eventstreams/connect/mqsource/AbstractJMSContextIT.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package com.ibm.eventstreams.connect.mqsource;
1717

18+
import java.util.ArrayList;
1819
import java.util.List;
1920
import java.util.concurrent.TimeoutException;
2021

@@ -23,6 +24,7 @@
2324
import javax.jms.JMSContext;
2425
import javax.jms.JMSException;
2526
import javax.jms.Message;
27+
import javax.jms.MessageConsumer;
2628
import javax.jms.MessageProducer;
2729
import javax.jms.Session;
2830

@@ -143,4 +145,48 @@ public void putAllMessagesToQueue(String queueName, List<Message> messages) thro
143145

144146
connection.close();
145147
}
148+
149+
150+
/**
151+
* Gets all messages from the specified MQ queue. Used in tests to
152+
* verify what is left on the test queue
153+
*/
154+
public List<Message> getAllMessagesFromQueue(String queueName) throws JMSException {
155+
Connection connection = null;
156+
Session session = null;
157+
Destination destination = null;
158+
MessageConsumer consumer = null;
159+
160+
JmsFactoryFactory ff = JmsFactoryFactory.getInstance(WMQConstants.WMQ_PROVIDER);
161+
162+
JmsConnectionFactory cf = ff.createConnectionFactory();
163+
cf.setStringProperty(WMQConstants.WMQ_HOST_NAME, "localhost");
164+
cf.setIntProperty(WMQConstants.WMQ_PORT, getMQPort());
165+
cf.setStringProperty(WMQConstants.WMQ_CHANNEL, getChannelName());
166+
cf.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_CLIENT);
167+
cf.setStringProperty(WMQConstants.WMQ_QUEUE_MANAGER, getQmgrName());
168+
cf.setBooleanProperty(WMQConstants.USER_AUTHENTICATION_MQCSP, false);
169+
170+
connection = cf.createConnection();
171+
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
172+
173+
destination = session.createQueue(queueName);
174+
consumer = session.createConsumer(destination);
175+
176+
connection.start();
177+
178+
List<Message> messages = new ArrayList<>();
179+
Message message;
180+
do {
181+
message = consumer.receiveNoWait();
182+
if (message != null) {
183+
messages.add(message);
184+
}
185+
}
186+
while (message != null);
187+
188+
connection.close();
189+
190+
return messages;
191+
}
146192
}

src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,22 @@
1919
import static org.junit.Assert.assertEquals;
2020
import static org.junit.Assert.assertNotNull;
2121
import static org.junit.Assert.assertNull;
22+
import static org.junit.Assert.assertThrows;
23+
import static org.junit.Assert.assertTrue;
2224

2325
import java.util.ArrayList;
2426
import java.util.Arrays;
2527
import java.util.HashMap;
2628
import java.util.List;
2729
import java.util.Map;
2830

31+
import javax.jms.MapMessage;
2932
import javax.jms.Message;
33+
import javax.jms.MessageFormatException;
3034
import javax.jms.TextMessage;
3135

3236
import org.apache.kafka.connect.data.Schema;
37+
import org.apache.kafka.connect.errors.ConnectException;
3338
import org.apache.kafka.connect.source.SourceRecord;
3439
import org.junit.Test;
3540

@@ -246,6 +251,57 @@ public void verifyMessageBatchGroupCommits() throws Exception {
246251

247252

248253

254+
@Test
255+
public void verifyMessageBatchRollback() throws Exception {
256+
MQSourceTask newConnectTask = new MQSourceTask();
257+
258+
Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
259+
connectorConfigProps.put("mq.message.body.jms", "true");
260+
connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
261+
connectorConfigProps.put("mq.batch.size", "10");
262+
263+
newConnectTask.start(connectorConfigProps);
264+
265+
// Test overview:
266+
//
267+
// messages 01-15 - valid messages
268+
// message 16 - a message that the builder can't process
269+
// messages 17-30 - valid messages
270+
271+
List<Message> messages = new ArrayList<>();
272+
for (int i = 1; i <= 15; i++) {
273+
messages.add(getJmsContext().createTextMessage("message " + i));
274+
}
275+
MapMessage invalidMessage = getJmsContext().createMapMessage();
276+
invalidMessage.setString("test", "builder cannot convert this");
277+
messages.add(invalidMessage);
278+
for (int i = 17; i <= 30; i++) {
279+
messages.add(getJmsContext().createTextMessage("message " + i));
280+
}
281+
putAllMessagesToQueue(MQ_QUEUE, messages);
282+
283+
List<SourceRecord> kafkaMessages;
284+
285+
// first batch should successfully retrieve messages 01-10
286+
kafkaMessages = newConnectTask.poll();
287+
assertEquals(10, kafkaMessages.size());
288+
newConnectTask.commit();
289+
newConnectTask.commit();
290+
291+
// second batch (11-20) should fail because of message 16
292+
ConnectException exc = assertThrows(ConnectException.class, () -> {
293+
newConnectTask.poll();
294+
});
295+
assertTrue(exc.getMessage().equals("Unsupported JMS message type"));
296+
297+
// there should be 20 messages left on the MQ queue (messages 11-30)
298+
newConnectTask.stop();
299+
List<Message> remainingMQMessages = getAllMessagesFromQueue(MQ_QUEUE);
300+
assertEquals(20, remainingMQMessages.size());
301+
}
302+
303+
304+
249305
@Test
250306
public void verifyMessageIdAsKey() throws Exception {
251307
MQSourceTask newConnectTask = new MQSourceTask();

src/main/java/com/ibm/eventstreams/connect/mqsource/JMSReader.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,13 +292,31 @@ public SourceRecord receive(boolean wait) {
292292
}
293293
catch (ConnectException exc) {
294294
log.error("Connect exception {}", exc);
295+
attemptRollback();
295296
throw exc;
296297
}
297298

298299
log.trace("[{}] Exit {}.receive, retval={}", Thread.currentThread().getId(), this.getClass().getName(), sr);
299300
return sr;
300301
}
301302

303+
304+
/**
305+
* Returns messages got from the MQ queue. Called if the builder has failed to transform the
306+
* messages and return them to Connect for producing to Kafka.
307+
*/
308+
private void attemptRollback() {
309+
log.trace("[{}] Entry {}.attemptRollback", Thread.currentThread().getId(), this.getClass().getName());
310+
try {
311+
jmsCtxt.rollback();
312+
}
313+
catch (JMSRuntimeException jmsExc) {
314+
log.error("rollback failed {}", jmsExc);
315+
}
316+
log.trace("[{}] Exit {}.attemptRollback", Thread.currentThread().getId(), this.getClass().getName());
317+
}
318+
319+
302320
/**
303321
* Commits the current transaction. If the current transaction contains a message that could not
304322
* be processed, the transaction is "in peril" and is rolled back instead to avoid data loss.

0 commit comments

Comments
 (0)