Skip to content

Commit 35ace10

Browse files
Exponential back-off for MQ reconnect
1 parent e3d17e6 commit 35ace10

File tree

1 file changed

+21
-4
lines changed

1 file changed

+21
-4
lines changed

src/main/java/com/ibm/eventstreams/connect/mqsource/JMSReader.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,12 +58,15 @@ public class JMSReader {
5858

5959
private RecordBuilder builder;
6060

61-
private boolean connected = false; // Whether connected to MQ
62-
private boolean inflight = false; // Whether messages in-flight in current transaction
63-
private boolean inperil = false; // Whether current transaction must be forced to roll back
64-
private AtomicBoolean closeNow = new AtomicBoolean(); // Whether close has been requested
61+
private boolean connected = false; // Whether connected to MQ
62+
private boolean inflight = false; // Whether messages in-flight in current transaction
63+
private boolean inperil = false; // Whether current transaction must be forced to roll back
64+
private AtomicBoolean closeNow = new AtomicBoolean(); // Whether close has been requested
65+
private long reconnectDelayMillis = RECONNECT_DELAY_MILLIS_MIN; // Delay between repeated reconnect attempts
6566

6667
private static long RECEIVE_TIMEOUT = 30000l;
68+
private static long RECONNECT_DELAY_MILLIS_MIN = 64l;
69+
private static long RECONNECT_DELAY_MILLIS_MAX = 8192l;
6770

6871
public JMSReader() {}
6972

@@ -297,11 +300,25 @@ private boolean connectInternal() {
297300
}
298301

299302
jmsCons = jmsCtxt.createConsumer(queue);
303+
reconnectDelayMillis = RECONNECT_DELAY_MILLIS_MIN;
300304
connected = true;
301305

302306
log.info("Connection to MQ established");
303307
}
304308
catch (JMSRuntimeException jmse) {
309+
// Delay slightly so that repeated reconnect loops don't run too fast
310+
try {
311+
Thread.sleep(reconnectDelayMillis);
312+
}
313+
catch (InterruptedException ie) {
314+
;
315+
}
316+
317+
if (reconnectDelayMillis < RECONNECT_DELAY_MILLIS_MAX)
318+
{
319+
reconnectDelayMillis = reconnectDelayMillis * 2;
320+
}
321+
305322
log.error("JMS exception {}", jmse);
306323
handleException(jmse);
307324
log.trace("[{}] Exit {}.connectInternal, retval=false", Thread.currentThread().getId(), this.getClass().getName());

0 commit comments

Comments
 (0)