Skip to content

Commit bd8fc53

Browse files
Change exception handling to enable reconnection
1 parent 5cbd3cd commit bd8fc53

File tree

1 file changed

+43
-23
lines changed

1 file changed

+43
-23
lines changed

src/main/java/com/ibm/mq/kafkaconnect/JMSReader.java

Lines changed: 43 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/**
2-
* Copyright 2017 IBM Corporation
2+
* Copyright 2017, 2018 IBM Corporation
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -139,8 +139,24 @@ public void configure(Map<String, String> props) {
139139
* @throws ConnectException Operation failed and connector should stop.
140140
*/
141141
public void connect() throws ConnectException, RetriableException {
142-
connectInternal();
143-
log.info("Connection to MQ established");
142+
try {
143+
if (userName != null) {
144+
jmsCtxt = mqConnFactory.createContext(userName, password, JMSContext.SESSION_TRANSACTED);
145+
}
146+
else {
147+
jmsCtxt = mqConnFactory.createContext(JMSContext.SESSION_TRANSACTED);
148+
}
149+
150+
jmsCons = jmsCtxt.createConsumer(queue);
151+
connected = true;
152+
153+
log.info("Connection to MQ established");
154+
}
155+
catch (JMSRuntimeException jmse) {
156+
log.info("Connection to MQ could not be established");
157+
log.debug("JMS exception {}", jmse);
158+
handleException(jmse);
159+
}
144160
}
145161

146162
/**
@@ -149,12 +165,11 @@ public void connect() throws ConnectException, RetriableException {
149165
* @param wait Whether to wait indefinitely for a message
150166
*
151167
* @return The SourceRecord representing the message
152-
*
153-
* @throws RetriableException Operation failed, but connector should continue to retry.
154-
* @throws ConnectException Operation failed and connector should stop.
155168
*/
156-
public SourceRecord receive(boolean wait) throws ConnectException, RetriableException {
157-
connectInternal();
169+
public SourceRecord receive(boolean wait) {
170+
if (!connectInternal()) {
171+
return null;
172+
}
158173

159174
Message m = null;
160175
SourceRecord sr = null;
@@ -197,12 +212,12 @@ public SourceRecord receive(boolean wait) throws ConnectException, RetriableExce
197212
/**
198213
* Commits the current transaction. If the current transaction contains a message that could not
199214
* be processed, the transaction is "in peril" and is rolled back instead to avoid data loss.
200-
*
201-
* @throws RetriableException Operation failed, but connector should continue to retry.
202-
* @throws ConnectException Operation failed and connector should stop.
203215
*/
204-
public void commit() throws ConnectException, RetriableException {
205-
connectInternal();
216+
public void commit() {
217+
if (!connectInternal()) {
218+
return;
219+
}
220+
206221
try {
207222
if (inflight) {
208223
inflight = false;
@@ -211,7 +226,6 @@ public void commit() throws ConnectException, RetriableException {
211226
inperil = false;
212227
log.trace("Rolling back in-flight transaction");
213228
jmsCtxt.rollback();
214-
throw new RetriableException("Transaction rolled back");
215229
}
216230
else {
217231
jmsCtxt.commit();
@@ -241,16 +255,15 @@ public void close() {
241255
/**
242256
* Internal method to connect to MQ.
243257
*
244-
* @throws RetriableException Operation failed, but connector should continue to retry.
245-
* @throws ConnectException Operation failed and connector should stop.
258+
* @return true if connection can be used, false otherwise
246259
*/
247-
private void connectInternal() throws ConnectException, RetriableException {
260+
private boolean connectInternal() {
248261
if (connected) {
249-
return;
262+
return true;
250263
}
251264

252265
if (closeNow.get()) {
253-
throw new ConnectException("Connection closing");
266+
return false;
254267
}
255268

256269
try {
@@ -263,11 +276,16 @@ private void connectInternal() throws ConnectException, RetriableException {
263276

264277
jmsCons = jmsCtxt.createConsumer(queue);
265278
connected = true;
279+
280+
log.info("Connection to MQ established");
266281
}
267282
catch (JMSRuntimeException jmse) {
268283
log.debug("JMS exception {}", jmse);
269284
handleException(jmse);
285+
return false;
270286
}
287+
288+
return true;
271289
}
272290

273291
/**
@@ -289,14 +307,15 @@ private void closeInternal() {
289307
finally
290308
{
291309
jmsCtxt = null;
310+
log.debug("Connection to MQ closed");
292311
}
293312
}
294313

295314
/**
296315
* Handles exceptions from MQ. Some JMS exceptions are treated as retriable meaning that the
297316
* connector can keep running and just trying again is likely to fix things.
298317
*/
299-
private void handleException(Throwable exc) throws ConnectException, RetriableException {
318+
private ConnectException handleException(Throwable exc) {
300319
boolean isRetriable = false;
301320
boolean mustClose = true;
302321
int reason = -1;
@@ -330,7 +349,7 @@ private void handleException(Throwable exc) throws ConnectException, RetriableEx
330349
isRetriable = true;
331350
break;
332351

333-
// These reason codes indicates that the connect is still OK, but just retrying later
352+
// These reason codes indicate that the connect is still OK, but just retrying later
334353
// will probably recover - possibly with administrative action on the queue manager
335354
case MQConstants.MQRC_GET_INHIBITED:
336355
isRetriable = true;
@@ -343,8 +362,9 @@ private void handleException(Throwable exc) throws ConnectException, RetriableEx
343362
}
344363

345364
if (isRetriable) {
346-
throw new RetriableException(exc);
365+
return new RetriableException(exc);
347366
}
348-
throw new ConnectException(exc);
367+
368+
return new ConnectException(exc);
349369
}
350370
}

0 commit comments

Comments
 (0)