Skip to content

Commit d924e77

Browse files
committed
feat: new config to control constants in JMSWorker
- New config for JMS receive timeout - New config for JMS reconnect minimum delay - New config for JMS reconnect maximum delay Signed-off-by: Joel Hanson <joelhanson025@gmail.com>
1 parent fb50376 commit d924e77

File tree

6 files changed

+112
-17
lines changed

6 files changed

+112
-17
lines changed

.github/ISSUE_TEMPLATE/BUG-REPORT.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ body:
5757
label: Version
5858
description: What version of our software are you running?
5959
options:
60-
- 2.2.0 (Default)
60+
- 2.2.1 (Default)
6161
- 1.3.5
6262
- older (<1.3.5)
6363
validations:

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,9 @@ The configuration options for the Kafka Connect source connector for IBM MQ are
305305
| mq.message.mqmd.read | Whether to enable reading of all MQMD fields | boolean | false | |
306306
| mq.max.poll.blocked.time.ms | How long the connector will wait for the previous batch of messages to be delivered to Kafka before starting a new poll | integer | 2000 | It is important that this is less than the time defined for `task.shutdown.graceful.timeout.ms` as that is how long connect will wait for the task to perform lifecycle operations. |
307307
| mq.client.reconnect.options | Options governing MQ reconnection. | string | ASDEF | ASDEF, ANY, QMGR, DISABLED |
308+
| mq.jms.receive.timeout | The timeout in milliseconds for receiving messages from JMS Consumer. | long | 2000L | 1 or greater |
309+
| mq.jms.reconnect.delay.min.ms | The minimum delay in milliseconds for reconnect attempts. | long | 64L | 1 or greater |
310+
| mq.jms.reconnect.delay.max.ms | The maximum delay in milliseconds for reconnect attempts. | long | 8192L | 1 or greater |
308311

309312
### Using a CCDT file
310313

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
<groupId>com.ibm.eventstreams.connect</groupId>
2121
<artifactId>kafka-connect-mq-source</artifactId>
2222
<packaging>jar</packaging>
23-
<version>2.2.0</version>
23+
<version>2.2.1</version>
2424
<name>kafka-connect-mq-source</name>
2525
<organization>
2626
<name>IBM Corporation</name>

src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -654,4 +654,39 @@ public void verifyEmptyTextMessage() throws Exception {
654654

655655
connectTask.commitRecord(kafkaMessage);
656656
}
657+
658+
@Test
659+
public void testJmsWorkerWithCustomReciveForConsumerAndCustomReconnectValues() throws Exception {
660+
connectTask = getSourceTaskWithEmptyKafkaOffset();
661+
662+
final Map<String, String> connectorConfigProps = createExactlyOnceConnectorProperties();
663+
connectorConfigProps.put("mq.message.body.jms", "true");
664+
connectorConfigProps.put("mq.record.builder",
665+
"com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
666+
connectorConfigProps.put("mq.jms.receive.timeout", "5000");
667+
connectorConfigProps.put("mq.jms.reconnect.delay.min.ms", "100");
668+
connectorConfigProps.put("mq.jms.reconnect.delay.max.ms", "10000");
669+
670+
final JMSWorker shared = new JMSWorker();
671+
shared.configure(getPropertiesConfig(connectorConfigProps));
672+
final JMSWorker dedicated = new JMSWorker();
673+
dedicated.configure(getPropertiesConfig(connectorConfigProps));
674+
final SequenceStateClient sequenceStateClient = new SequenceStateClient(DEFAULT_STATE_QUEUE, shared, dedicated);
675+
676+
connectTask.start(connectorConfigProps, shared, dedicated, sequenceStateClient);
677+
678+
final List<Message> messages = createAListOfMessages(getJmsContext(), 2, "message ");
679+
putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, messages);
680+
681+
connectTask.poll();
682+
683+
final List<Message> stateMsgs1 = browseAllMessagesFromQueue(DEFAULT_STATE_QUEUE);
684+
assertThat(stateMsgs1.size()).isEqualTo(1);
685+
shared.attemptRollback();
686+
assertThat(stateMsgs1.size()).isEqualTo(1);
687+
688+
assertEquals(5000L, shared.getReceiveTimeout());
689+
assertEquals(100L, shared.getReconnectDelayMillisMin());
690+
assertEquals(10000L, shared.getReconnectDelayMillisMax());
691+
}
657692
}

src/main/java/com/ibm/eventstreams/connect/mqsource/JMSWorker.java

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -70,11 +70,22 @@ public class JMSWorker {
7070

7171
private boolean connected = false; // Whether connected to MQ
7272
private AtomicBoolean closeNow; // Whether close has been requested
73-
private long reconnectDelayMillis = RECONNECT_DELAY_MILLIS_MIN; // Delay between repeated reconnect attempts
73+
private AbstractConfig config;
74+
private long receiveTimeout; // Receive timeout for the jms consumer
75+
private long reconnectDelayMillisMin; // Delay between repeated reconnect attempts min
76+
private long reconnectDelayMillisMax; // Delay between repeated reconnect attempts max
7477

75-
private static final long RECEIVE_TIMEOUT = 2000L;
76-
private static final long RECONNECT_DELAY_MILLIS_MIN = 64L;
77-
private static final long RECONNECT_DELAY_MILLIS_MAX = 8192L;
78+
long getReceiveTimeout() {
79+
return receiveTimeout;
80+
}
81+
82+
long getReconnectDelayMillisMin() {
83+
return reconnectDelayMillisMin;
84+
}
85+
86+
long getReconnectDelayMillisMax() {
87+
return reconnectDelayMillisMax;
88+
}
7889

7990
/**
8091
* Configure this class.
@@ -87,6 +98,7 @@ public void configure(final AbstractConfig config) {
8798
log.trace("[{}] Entry {}.configure, props={}", Thread.currentThread().getId(), this.getClass().getName(),
8899
config);
89100

101+
this.config = config;
90102
System.setProperty("com.ibm.mq.cfg.useIBMCipherMappings",
91103
config.getBoolean(MQSourceConnector.CONFIG_NAME_MQ_SSL_USE_IBM_CIPHER_MAPPINGS).toString());
92104

@@ -132,6 +144,9 @@ public void configure(final AbstractConfig config) {
132144
userName = config.getString(MQSourceConnector.CONFIG_NAME_MQ_USER_NAME);
133145
password = config.getPassword(MQSourceConnector.CONFIG_NAME_MQ_PASSWORD);
134146
topic = config.getString(MQSourceConnector.CONFIG_NAME_TOPIC);
147+
receiveTimeout = config.getLong(MQSourceConnector.CONFIG_MAX_RECEIVE_TIMEOUT);
148+
reconnectDelayMillisMin = config.getLong(MQSourceConnector.CONFIG_RECONNECT_DELAY_MIN);
149+
reconnectDelayMillisMax = config.getLong(MQSourceConnector.CONFIG_RECONNECT_DELAY_MAX);
135150
} catch (JMSException | JMSRuntimeException jmse) {
136151
log.error("JMS exception {}", jmse);
137152
throw new JMSWorkerConnectionException("JMS connection failed", jmse);
@@ -230,9 +245,9 @@ public Message receive(final String queueName, final QueueConfig queueConfig, fi
230245

231246
Message message = null;
232247
if (wait) {
233-
log.debug("Waiting {} ms for message", RECEIVE_TIMEOUT);
248+
log.debug("Waiting {} ms for message", receiveTimeout);
234249

235-
message = internalConsumer.receive(RECEIVE_TIMEOUT);
250+
message = internalConsumer.receive(receiveTimeout);
236251

237252
if (message == null) {
238253
log.debug("No message received");
@@ -364,20 +379,23 @@ private boolean maybeReconnect() throws JMSRuntimeException {
364379
log.trace("[{}] Entry {}.maybeReconnect", Thread.currentThread().getId(), this.getClass().getName());
365380
try {
366381
connect();
367-
reconnectDelayMillis = RECONNECT_DELAY_MILLIS_MIN;
368-
log.info("Connection to MQ established");
382+
// Reset reconnect delay to initial minimum after successful connection
383+
reconnectDelayMillisMin = config.getLong(MQSourceConnector.CONFIG_RECONNECT_DELAY_MIN);
384+
log.info("Successfully reconnected to MQ.");
369385
} catch (final JMSRuntimeException jmse) {
370-
// Delay slightly so that repeated reconnect loops don't run too fast
386+
log.error("Failed to reconnect to MQ: {}", jmse);
371387
try {
372-
Thread.sleep(reconnectDelayMillis);
388+
log.debug("Waiting for {} ms before next reconnect attempt.", reconnectDelayMillisMin);
389+
Thread.sleep(reconnectDelayMillisMin);
373390
} catch (final InterruptedException ie) {
391+
log.warn("Reconnect delay interrupted.", ie);
374392
}
375393

376-
if (reconnectDelayMillis < RECONNECT_DELAY_MILLIS_MAX) {
377-
reconnectDelayMillis = reconnectDelayMillis * 2;
394+
// Exponential backoff: double the delay, but do not exceed the maximum limit
395+
if (reconnectDelayMillisMin < reconnectDelayMillisMax) {
396+
reconnectDelayMillisMin = Math.min(reconnectDelayMillisMin * 2, reconnectDelayMillisMax);
397+
log.debug("Reconnect delay increased to {} ms.", reconnectDelayMillisMin);
378398
}
379-
380-
log.error("JMS exception {}", jmse);
381399
log.trace("[{}] Exit {}.maybeReconnect, retval=JMSRuntimeException", Thread.currentThread().getId(),
382400
this.getClass().getName());
383401
throw jmse;

src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,21 @@ public class MQSourceConnector extends SourceConnector {
163163
public static final String CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_DISABLED = "DISABLED";
164164
public static final String CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_ASDEF = "ASDEF";
165165

166+
public static final String CONFIG_MAX_RECEIVE_TIMEOUT = "mq.jms.receive.timeout";
167+
public static final String CONFIG_DOCUMENTATION_MAX_RECEIVE_TIMEOUT = "The timeout in milliseconds for receiving messages from JMS Consumer.";
168+
public static final String CONFIG_DISPLAY_MAX_RECEIVE_TIMEOUT = "JMS receive timeout";
169+
public static final long CONFIG_MAX_RECEIVE_TIMEOUT_DEFAULT = 2000L;
170+
171+
public static final String CONFIG_RECONNECT_DELAY_MIN = "mq.jms.reconnect.delay.min.ms";
172+
public static final String CONFIG_DOCUMENTATION_RECONNECT_DELAY_MIN = "The minimum delay in milliseconds for reconnect attempts.";
173+
public static final String CONFIG_DISPLAY_RECONNECT_DELAY_MIN = "JMS reconnect minimum delay";
174+
public static final long CONFIG_RECONNECT_DELAY_MIN_DEFAULT = 64L;
175+
176+
public static final String CONFIG_RECONNECT_DELAY_MAX = "mq.jms.reconnect.delay.max.ms";
177+
public static final String CONFIG_DOCUMENTATION_RECONNECT_DELAY_MAX = "The maximum delay in milliseconds for reconnect attempts.";
178+
public static final String CONFIG_DISPLAY_RECONNECT_DELAY_MAX = "JMS reconnect maximum delay";
179+
public static final long CONFIG_RECONNECT_DELAY_MAX_DEFAULT = 8192L;
180+
166181
// Define valid reconnect options
167182
public static final String[] CONFIG_VALUE_MQ_VALID_RECONNECT_OPTIONS = {
168183
CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_ASDEF,
@@ -175,7 +190,7 @@ public class MQSourceConnector extends SourceConnector {
175190
CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_DISABLED.toLowerCase(Locale.ENGLISH)
176191
};
177192

178-
public static String version = "2.2.0";
193+
public static String version = "2.2.1";
179194

180195
private Map<String, String> configProps;
181196

@@ -531,6 +546,30 @@ null, new ReadableFile(),
531546
CONFIG_GROUP_MQ, 25,
532547
Width.SHORT,
533548
CONFIG_DISPLAY_MQ_CLIENT_RECONNECT_OPTIONS);
549+
CONFIGDEF.define(CONFIG_MAX_RECEIVE_TIMEOUT,
550+
Type.LONG,
551+
CONFIG_MAX_RECEIVE_TIMEOUT_DEFAULT, ConfigDef.Range.atLeast(0),
552+
Importance.MEDIUM,
553+
CONFIG_DOCUMENTATION_MAX_RECEIVE_TIMEOUT,
554+
CONFIG_GROUP_MQ, 26,
555+
Width.MEDIUM,
556+
CONFIG_DISPLAY_MAX_RECEIVE_TIMEOUT);
557+
CONFIGDEF.define(CONFIG_RECONNECT_DELAY_MIN,
558+
Type.LONG,
559+
CONFIG_RECONNECT_DELAY_MIN_DEFAULT, ConfigDef.Range.atLeast(0),
560+
Importance.MEDIUM,
561+
CONFIG_DOCUMENTATION_RECONNECT_DELAY_MIN,
562+
CONFIG_GROUP_MQ, 27,
563+
Width.MEDIUM,
564+
CONFIG_DISPLAY_RECONNECT_DELAY_MIN);
565+
CONFIGDEF.define(CONFIG_RECONNECT_DELAY_MAX,
566+
Type.LONG,
567+
CONFIG_RECONNECT_DELAY_MAX_DEFAULT, ConfigDef.Range.atLeast(0),
568+
Importance.MEDIUM,
569+
CONFIG_DOCUMENTATION_RECONNECT_DELAY_MAX,
570+
CONFIG_GROUP_MQ, 28,
571+
Width.MEDIUM,
572+
CONFIG_DISPLAY_RECONNECT_DELAY_MAX);
534573

535574
CONFIGDEF.define(CONFIG_NAME_TOPIC,
536575
Type.STRING,

0 commit comments

Comments
 (0)