Skip to content

Commit 8352edb

Browse files
committed
feat: new config to control constants in JMSWorker
- New config for JMS receive timeout - New config for JMS reconnect minimum delay - New config for JMS reconnect maximum delay Signed-off-by: Joel Hanson <joelhanson025@gmail.com>
1 parent fb50376 commit 8352edb

File tree

7 files changed

+189
-22
lines changed

7 files changed

+189
-22
lines changed

.github/ISSUE_TEMPLATE/BUG-REPORT.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ body:
5757
label: Version
5858
description: What version of our software are you running?
5959
options:
60-
- 2.2.0 (Default)
60+
- 2.3.0 (Default)
6161
- 1.3.5
6262
- older (<1.3.5)
6363
validations:

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,9 @@ The configuration options for the Kafka Connect source connector for IBM MQ are
305305
| mq.message.mqmd.read | Whether to enable reading of all MQMD fields | boolean | false | |
306306
| mq.max.poll.blocked.time.ms | How long the connector will wait for the previous batch of messages to be delivered to Kafka before starting a new poll | integer | 2000 | It is important that this is less than the time defined for `task.shutdown.graceful.timeout.ms` as that is how long connect will wait for the task to perform lifecycle operations. |
307307
| mq.client.reconnect.options | Options governing MQ reconnection. | string | ASDEF | ASDEF, ANY, QMGR, DISABLED |
308+
| mq.jms.receive.timeout | The timeout in milliseconds for receiving messages from JMS Consumer. | long | 2000 | 1 or greater |
309+
| mq.jms.reconnect.delay.min.ms | The minimum delay in milliseconds for reconnect attempts. | long | 64 | 1 or greater |
310+
| mq.jms.reconnect.delay.max.ms | The maximum delay in milliseconds for reconnect attempts. | long | 8192 | 1 or greater |
308311

309312
### Using a CCDT file
310313

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
<groupId>com.ibm.eventstreams.connect</groupId>
2121
<artifactId>kafka-connect-mq-source</artifactId>
2222
<packaging>jar</packaging>
23-
<version>2.2.0</version>
23+
<version>2.3.0</version>
2424
<name>kafka-connect-mq-source</name>
2525
<organization>
2626
<name>IBM Corporation</name>

src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,9 @@ private Map<String, String> createDefaultConnectorProperties() {
8686
props.put("mq.queue", DEFAULT_SOURCE_QUEUE);
8787
props.put("mq.user.authentication.mqcsp", "false");
8888
props.put("topic", "mytopic");
89+
props.put("mq.jms.receive.timeout", "5000");
90+
props.put("mq.jms.reconnect.delay.min.ms", "100");
91+
props.put("mq.jms.reconnect.delay.max.ms", "10000");
8992
return props;
9093
}
9194

@@ -654,4 +657,39 @@ public void verifyEmptyTextMessage() throws Exception {
654657

655658
connectTask.commitRecord(kafkaMessage);
656659
}
660+
661+
@Test
662+
public void testJmsWorkerWithCustomReciveForConsumerAndCustomReconnectValues() throws Exception {
663+
connectTask = getSourceTaskWithEmptyKafkaOffset();
664+
665+
final Map<String, String> connectorConfigProps = createExactlyOnceConnectorProperties();
666+
connectorConfigProps.put("mq.message.body.jms", "true");
667+
connectorConfigProps.put("mq.record.builder",
668+
"com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
669+
connectorConfigProps.put("mq.message.receive.timeout", "5000");
670+
connectorConfigProps.put("mq.reconnect.delay.min.ms", "100");
671+
connectorConfigProps.put("mq.reconnect.delay.max.ms", "10000");
672+
673+
final JMSWorker shared = new JMSWorker();
674+
shared.configure(getPropertiesConfig(connectorConfigProps));
675+
final JMSWorker dedicated = new JMSWorker();
676+
dedicated.configure(getPropertiesConfig(connectorConfigProps));
677+
final SequenceStateClient sequenceStateClient = new SequenceStateClient(DEFAULT_STATE_QUEUE, shared, dedicated);
678+
679+
connectTask.start(connectorConfigProps, shared, dedicated, sequenceStateClient);
680+
681+
final List<Message> messages = createAListOfMessages(getJmsContext(), 2, "message ");
682+
putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, messages);
683+
684+
connectTask.poll();
685+
686+
final List<Message> stateMsgs1 = browseAllMessagesFromQueue(DEFAULT_STATE_QUEUE);
687+
assertThat(stateMsgs1.size()).isEqualTo(1);
688+
shared.attemptRollback();
689+
assertThat(stateMsgs1.size()).isEqualTo(1);
690+
691+
assertEquals(5000L, shared.getReceiveTimeout());
692+
assertEquals(100L, shared.getReconnectDelayMillisMin());
693+
assertEquals(10000L, shared.getReconnectDelayMillisMax());
694+
}
657695
}

src/main/java/com/ibm/eventstreams/connect/mqsource/JMSWorker.java

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -70,11 +70,22 @@ public class JMSWorker {
7070

7171
private boolean connected = false; // Whether connected to MQ
7272
private AtomicBoolean closeNow; // Whether close has been requested
73-
private long reconnectDelayMillis = RECONNECT_DELAY_MILLIS_MIN; // Delay between repeated reconnect attempts
73+
private AbstractConfig config;
74+
private long receiveTimeout; // Receive timeout for the jms consumer
75+
private long reconnectDelayMillisMin; // Delay between repeated reconnect attempts min
76+
private long reconnectDelayMillisMax; // Delay between repeated reconnect attempts max
7477

75-
private static final long RECEIVE_TIMEOUT = 2000L;
76-
private static final long RECONNECT_DELAY_MILLIS_MIN = 64L;
77-
private static final long RECONNECT_DELAY_MILLIS_MAX = 8192L;
78+
long getReceiveTimeout() {
79+
return receiveTimeout;
80+
}
81+
82+
long getReconnectDelayMillisMin() {
83+
return reconnectDelayMillisMin;
84+
}
85+
86+
long getReconnectDelayMillisMax() {
87+
return reconnectDelayMillisMax;
88+
}
7889

7990
/**
8091
* Configure this class.
@@ -87,6 +98,7 @@ public void configure(final AbstractConfig config) {
8798
log.trace("[{}] Entry {}.configure, props={}", Thread.currentThread().getId(), this.getClass().getName(),
8899
config);
89100

101+
this.config = config;
90102
System.setProperty("com.ibm.mq.cfg.useIBMCipherMappings",
91103
config.getBoolean(MQSourceConnector.CONFIG_NAME_MQ_SSL_USE_IBM_CIPHER_MAPPINGS).toString());
92104

@@ -132,6 +144,9 @@ public void configure(final AbstractConfig config) {
132144
userName = config.getString(MQSourceConnector.CONFIG_NAME_MQ_USER_NAME);
133145
password = config.getPassword(MQSourceConnector.CONFIG_NAME_MQ_PASSWORD);
134146
topic = config.getString(MQSourceConnector.CONFIG_NAME_TOPIC);
147+
receiveTimeout = config.getLong(MQSourceConnector.CONFIG_MAX_RECEIVE_TIMEOUT);
148+
reconnectDelayMillisMin = config.getLong(MQSourceConnector.CONFIG_RECONNECT_DELAY_MIN);
149+
reconnectDelayMillisMax = config.getLong(MQSourceConnector.CONFIG_RECONNECT_DELAY_MAX);
135150
} catch (JMSException | JMSRuntimeException jmse) {
136151
log.error("JMS exception {}", jmse);
137152
throw new JMSWorkerConnectionException("JMS connection failed", jmse);
@@ -230,9 +245,9 @@ public Message receive(final String queueName, final QueueConfig queueConfig, fi
230245

231246
Message message = null;
232247
if (wait) {
233-
log.debug("Waiting {} ms for message", RECEIVE_TIMEOUT);
248+
log.debug("Waiting {} ms for message", receiveTimeout);
234249

235-
message = internalConsumer.receive(RECEIVE_TIMEOUT);
250+
message = internalConsumer.receive(receiveTimeout);
236251

237252
if (message == null) {
238253
log.debug("No message received");
@@ -364,20 +379,23 @@ private boolean maybeReconnect() throws JMSRuntimeException {
364379
log.trace("[{}] Entry {}.maybeReconnect", Thread.currentThread().getId(), this.getClass().getName());
365380
try {
366381
connect();
367-
reconnectDelayMillis = RECONNECT_DELAY_MILLIS_MIN;
368-
log.info("Connection to MQ established");
382+
// Reset reconnect delay to initial minimum after successful connection
383+
reconnectDelayMillisMin = config.getLong(MQSourceConnector.CONFIG_RECONNECT_DELAY_MIN);
384+
log.info("Successfully reconnected to MQ.");
369385
} catch (final JMSRuntimeException jmse) {
370-
// Delay slightly so that repeated reconnect loops don't run too fast
386+
log.error("Failed to reconnect to MQ: {}", jmse);
371387
try {
372-
Thread.sleep(reconnectDelayMillis);
388+
log.debug("Waiting for {} ms before next reconnect attempt.", reconnectDelayMillisMin);
389+
Thread.sleep(reconnectDelayMillisMin);
373390
} catch (final InterruptedException ie) {
391+
log.warn("Reconnect delay interrupted.", ie);
374392
}
375393

376-
if (reconnectDelayMillis < RECONNECT_DELAY_MILLIS_MAX) {
377-
reconnectDelayMillis = reconnectDelayMillis * 2;
394+
// Exponential backoff: double the delay, but do not exceed the maximum limit
395+
if (reconnectDelayMillisMin < reconnectDelayMillisMax) {
396+
reconnectDelayMillisMin = Math.min(reconnectDelayMillisMin * 2, reconnectDelayMillisMax);
397+
log.debug("Reconnect delay increased to {} ms.", reconnectDelayMillisMin);
378398
}
379-
380-
log.error("JMS exception {}", jmse);
381399
log.trace("[{}] Exit {}.maybeReconnect, retval=JMSRuntimeException", Thread.currentThread().getId(),
382400
this.getClass().getName());
383401
throw jmse;

src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java

Lines changed: 69 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,24 @@ public class MQSourceConnector extends SourceConnector {
163163
public static final String CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_DISABLED = "DISABLED";
164164
public static final String CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_ASDEF = "ASDEF";
165165

166+
public static final String CONFIG_MAX_RECEIVE_TIMEOUT = "mq.message.receive.timeout";
167+
public static final String CONFIG_DOCUMENTATION_MAX_RECEIVE_TIMEOUT = "How long the connector should wait (in milliseconds) for a message to arrive if no message is available immediately";
168+
public static final String CONFIG_DISPLAY_MAX_RECEIVE_TIMEOUT = "message receive timeout";
169+
public static final long CONFIG_MAX_RECEIVE_TIMEOUT_DEFAULT = 2000L;
170+
public static final long CONFIG_MAX_RECEIVE_TIMEOUT_MINIMUM = 1L;
171+
172+
public static final String CONFIG_RECONNECT_DELAY_MIN = "mq.reconnect.delay.min.ms";
173+
public static final String CONFIG_DOCUMENTATION_RECONNECT_DELAY_MIN = "The minimum delay in milliseconds for reconnect attempts.";
174+
public static final String CONFIG_DISPLAY_RECONNECT_DELAY_MIN = "reconnect minimum delay";
175+
public static final long CONFIG_RECONNECT_DELAY_MIN_DEFAULT = 64L;
176+
public static final long CONFIG_RECONNECT_DELAY_MIN_MINIMUM = 1L;
177+
178+
public static final String CONFIG_RECONNECT_DELAY_MAX = "mq.reconnect.delay.max.ms";
179+
public static final String CONFIG_DOCUMENTATION_RECONNECT_DELAY_MAX = "The maximum delay in milliseconds for reconnect attempts.";
180+
public static final String CONFIG_DISPLAY_RECONNECT_DELAY_MAX = "reconnect maximum delay";
181+
public static final long CONFIG_RECONNECT_DELAY_MAX_DEFAULT = 8192L;
182+
public static final long CONFIG_RECONNECT_DELAY_MAX_MINIMUM = 10L;
183+
166184
// Define valid reconnect options
167185
public static final String[] CONFIG_VALUE_MQ_VALID_RECONNECT_OPTIONS = {
168186
CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_ASDEF,
@@ -175,7 +193,7 @@ public class MQSourceConnector extends SourceConnector {
175193
CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_DISABLED.toLowerCase(Locale.ENGLISH)
176194
};
177195

178-
public static String version = "2.2.0";
196+
public static String version = "2.3.0";
179197

180198
private Map<String, String> configProps;
181199

@@ -265,6 +283,7 @@ public Config validate(final Map<String, String> connectorConfigs) {
265283
final Config config = super.validate(connectorConfigs);
266284

267285
MQSourceConnector.validateMQClientReconnectOptions(config);
286+
MQSourceConnector.validateRetryDelayConfig(config);
268287
return config;
269288
}
270289

@@ -291,6 +310,31 @@ private static void validateMQClientReconnectOptions(final Config config) {
291310
}
292311
}
293312

313+
/**
314+
* Validates if the retry delay max value is greater than or equal to the min value.
315+
* Adds an error message if the validation fails.
316+
*/
317+
private static void validateRetryDelayConfig(final Config config) {
318+
// Collect all configuration values
319+
final Map<String, ConfigValue> configValues = config.configValues().stream()
320+
.collect(Collectors.toMap(ConfigValue::name, v -> v));
321+
322+
final ConfigValue reconnectDelayMaxConfigValue = configValues.get(MQSourceConnector.CONFIG_RECONNECT_DELAY_MAX);
323+
final ConfigValue reconnectDelayMinConfigValue = configValues.get(MQSourceConnector.CONFIG_RECONNECT_DELAY_MIN);
324+
325+
final long maxReceiveTimeout = (long) reconnectDelayMaxConfigValue.value();
326+
final long minReceiveTimeout = (long) reconnectDelayMinConfigValue.value();
327+
328+
// Validate if the max value is greater than min value
329+
if (maxReceiveTimeout < minReceiveTimeout) {
330+
reconnectDelayMaxConfigValue.addErrorMessage(String.format(
331+
"The value of '%s' must be greater than or equal to the value of '%s'.",
332+
MQSourceConnector.CONFIG_RECONNECT_DELAY_MAX,
333+
MQSourceConnector.CONFIG_RECONNECT_DELAY_MIN
334+
));
335+
}
336+
}
337+
294338
/** Null validator - indicates that any value is acceptable for this config option. */
295339
private static final ConfigDef.Validator ANY = null;
296340

@@ -531,6 +575,30 @@ null, new ReadableFile(),
531575
CONFIG_GROUP_MQ, 25,
532576
Width.SHORT,
533577
CONFIG_DISPLAY_MQ_CLIENT_RECONNECT_OPTIONS);
578+
CONFIGDEF.define(CONFIG_MAX_RECEIVE_TIMEOUT,
579+
Type.LONG,
580+
CONFIG_MAX_RECEIVE_TIMEOUT_DEFAULT, ConfigDef.Range.atLeast(CONFIG_MAX_RECEIVE_TIMEOUT_MINIMUM),
581+
Importance.MEDIUM,
582+
CONFIG_DOCUMENTATION_MAX_RECEIVE_TIMEOUT,
583+
CONFIG_GROUP_MQ, 26,
584+
Width.MEDIUM,
585+
CONFIG_DISPLAY_MAX_RECEIVE_TIMEOUT);
586+
CONFIGDEF.define(CONFIG_RECONNECT_DELAY_MIN,
587+
Type.LONG,
588+
CONFIG_RECONNECT_DELAY_MIN_DEFAULT, ConfigDef.Range.atLeast(CONFIG_RECONNECT_DELAY_MIN_MINIMUM),
589+
Importance.MEDIUM,
590+
CONFIG_DOCUMENTATION_RECONNECT_DELAY_MIN,
591+
CONFIG_GROUP_MQ, 27,
592+
Width.MEDIUM,
593+
CONFIG_DISPLAY_RECONNECT_DELAY_MIN);
594+
CONFIGDEF.define(CONFIG_RECONNECT_DELAY_MAX,
595+
Type.LONG,
596+
CONFIG_RECONNECT_DELAY_MAX_DEFAULT, ConfigDef.Range.atLeast(CONFIG_RECONNECT_DELAY_MAX_MINIMUM),
597+
Importance.MEDIUM,
598+
CONFIG_DOCUMENTATION_RECONNECT_DELAY_MAX,
599+
CONFIG_GROUP_MQ, 28,
600+
Width.MEDIUM,
601+
CONFIG_DISPLAY_RECONNECT_DELAY_MAX);
534602

535603
CONFIGDEF.define(CONFIG_NAME_TOPIC,
536604
Type.STRING,

src/test/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnectorTest.java

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
package com.ibm.eventstreams.connect.mqsource;
1717

1818
import org.apache.kafka.common.config.Config;
19-
import org.apache.kafka.common.config.ConfigValue;
2019
import org.apache.kafka.connect.connector.Connector;
2120
import org.apache.kafka.connect.source.ConnectorTransactionBoundaries;
2221
import org.apache.kafka.connect.source.ExactlyOnceSupport;
@@ -30,11 +29,7 @@
3029
import static org.junit.Assert.assertTrue;
3130

3231
import java.util.HashMap;
33-
import java.util.List;
3432
import java.util.Map;
35-
import java.util.stream.Collector;
36-
import java.util.stream.Collectors;
37-
import java.util.stream.Stream;
3833

3934
public class MQSourceConnectorTest {
4035
@Test
@@ -146,4 +141,49 @@ public void testValidateMQClientReconnectOptionsWithANYOption() {
146141
.flatMap(cv -> cv.errorMessages().stream())
147142
.anyMatch(msg -> msg.contains("When running the MQ source connector with exactly once mode, the client reconnect option 'QMGR' should be provided.")));
148143
}
144+
145+
@Test
146+
public void testValidateRetryDelayConfig() {
147+
final Map<String, String> configProps = new HashMap<String, String>();
148+
configProps.put("mq.reconnect.delay.max.ms", "10");
149+
configProps.put("mq.reconnect.delay.min.ms", "100");
150+
configProps.put("tasks.max", "1");
151+
152+
final Config config = new MQSourceConnector().validate(configProps);
153+
154+
assertTrue(config.configValues().stream().anyMatch(cv -> cv.errorMessages().size() > 0));
155+
assertTrue(config.configValues().stream()
156+
.filter(cv -> cv.name().equals(MQSourceConnector.CONFIG_RECONNECT_DELAY_MAX))
157+
.flatMap(cv -> cv.errorMessages().stream())
158+
.anyMatch(msg -> msg.contains("The value of 'mq.reconnect.delay.max.ms' must be greater than or equal to the value of 'mq.reconnect.delay.min.ms'.")));
159+
}
160+
161+
@Test
162+
public void testValidateRetryDelayConfigWithNoReconnectValues() {
163+
final Map<String, String> configProps = new HashMap<String, String>();
164+
configProps.put("tasks.max", "1");
165+
166+
final Config config = new MQSourceConnector().validate(configProps);
167+
168+
assertTrue(config.configValues().stream().anyMatch(cv -> cv.errorMessages().size() > 0));
169+
assertTrue(config.configValues().stream()
170+
.filter(cv -> cv.name().equals(MQSourceConnector.CONFIG_RECONNECT_DELAY_MAX))
171+
.flatMap(cv -> cv.errorMessages().stream())
172+
.allMatch(msg -> msg == null));
173+
}
174+
175+
@Test
176+
public void testValidateRetryDelayConfigWithDefaultValues() {
177+
final Map<String, String> configProps = new HashMap<String, String>();
178+
configProps.put("mq.reconnect.delay.min.ms", "1000000");
179+
configProps.put("tasks.max", "1");
180+
181+
final Config config = new MQSourceConnector().validate(configProps);
182+
183+
assertTrue(config.configValues().stream().anyMatch(cv -> cv.errorMessages().size() > 0));
184+
assertTrue(config.configValues().stream()
185+
.filter(cv -> cv.name().equals(MQSourceConnector.CONFIG_RECONNECT_DELAY_MAX))
186+
.flatMap(cv -> cv.errorMessages().stream())
187+
.anyMatch(msg -> msg.contains("The value of 'mq.reconnect.delay.max.ms' must be greater than or equal to the value of 'mq.reconnect.delay.min.ms'.")));
188+
}
149189
}

0 commit comments

Comments
 (0)