Skip to content

Commit 8dc9546

Browse files
Joel HansonJoel-hanson
Joel Hanson
authored andcommitted
feat: Allow config for client reconnect in connector
Contributes to: event-integration/eventstreams-planning#12716 Signed-off-by: Joel Hanson <joel.hanson2@ibm.com> Signed-off-by: Joel Hanson <joelhanson025@gmail.com>
1 parent bbb1102 commit 8dc9546

File tree

8 files changed

+239
-8
lines changed

8 files changed

+239
-8
lines changed

.github/ISSUE_TEMPLATE/BUG-REPORT.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,8 @@ body:
5757
label: Version
5858
description: What version of our software are you running?
5959
options:
60-
- 2.1.0 (Default)
61-
- 1.3.5
60+
- 2.1.1 (Default)
61+
- 1.3.5
6262
- older (<1.3.5)
6363
validations:
6464
required: true

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,7 @@ The configuration options for the Kafka Connect source connector for IBM MQ are
304304
| mq.batch.size | The maximum number of messages in a batch (unit of work) | integer | 250 | 1 or greater |
305305
| mq.message.mqmd.read | Whether to enable reading of all MQMD fields | boolean | false | |
306306
| mq.max.poll.blocked.time.ms | How long the connector will wait for the previous batch of messages to be delivered to Kafka before starting a new poll | integer | 2000 | It is important that this is less than the time defined for `task.shutdown.graceful.timeout.ms` as that is how long connect will wait for the task to perform lifecycle operations. |
307+
| mq.client.reconnect.options | Options governing MQ reconnection. | string | ASDEF | ASDEF, ANY, QMGR, DISABLED |
307308

308309
### Using a CCDT file
309310

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
<groupId>com.ibm.eventstreams.connect</groupId>
2121
<artifactId>kafka-connect-mq-source</artifactId>
2222
<packaging>jar</packaging>
23-
<version>2.1.0</version>
23+
<version>2.1.1</version>
2424
<name>kafka-connect-mq-source</name>
2525
<organization>
2626
<name>IBM Corporation</name>

src/integration/java/com/ibm/eventstreams/connect/mqsource/JMSWorkerIT.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,5 +106,4 @@ public void testQueueHoldsMoreThanOneMessage_twoMessageOnQueue() throws Exceptio
106106
public void testQueueHoldsMoreThanOneMessage_queueNotFound() {
107107
assertThrows(Exception.class, ()->jmsWorker.queueHoldsMoreThanOneMessage("QUEUE_DOES_NOT_EXIST"));
108108
}
109-
110109
}

src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -577,4 +577,81 @@ public void testRemoveDeliveredMessagesFromSourceQueueDoesNotThrowException() th
577577
assertThatNoException()
578578
.isThrownBy(() -> connectTask.removeDeliveredMessagesFromSourceQueue(Arrays.asList(msgIds)));
579579
}
580+
581+
582+
@Test
583+
public void testConfigureClientReconnectOptions() throws Exception {
584+
// setup test condition: put messages on source queue, poll once to read them
585+
connectTask = getSourceTaskWithEmptyKafkaOffset();
586+
587+
final Map<String, String> connectorConfigProps = createExactlyOnceConnectorProperties();
588+
connectorConfigProps.put("mq.message.body.jms", "true");
589+
connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
590+
connectorConfigProps.put("mq.client.reconnect.options", "QMGR");
591+
592+
JMSWorker shared = new JMSWorker();
593+
shared.configure(getPropertiesConfig(connectorConfigProps));
594+
JMSWorker dedicated = new JMSWorker();
595+
dedicated.configure(getPropertiesConfig(connectorConfigProps));
596+
SequenceStateClient sequenceStateClient = new SequenceStateClient(DEFAULT_STATE_QUEUE, shared, dedicated);
597+
598+
connectTask.start(connectorConfigProps, shared, dedicated, sequenceStateClient);
599+
600+
final List<Message> messages = createAListOfMessages(getJmsContext(), 2, "message ");
601+
putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, messages);
602+
603+
connectTask.poll();
604+
605+
List<Message> stateMsgs1 = browseAllMessagesFromQueue(DEFAULT_STATE_QUEUE);
606+
assertThat(stateMsgs1.size()).isEqualTo(1);
607+
shared.attemptRollback();
608+
assertThat(stateMsgs1.size()).isEqualTo(1); //state message is still there even though source message were rolled back
609+
610+
}
611+
612+
@Test
613+
public void verifyEmptyMessage() throws Exception {
614+
connectTask = new MQSourceTask();
615+
616+
final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
617+
connectorConfigProps.put("mq.message.body.jms", "true");
618+
connectorConfigProps.put("mq.record.builder",
619+
"com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
620+
621+
connectTask.start(connectorConfigProps);
622+
623+
Message emptyMessage = getJmsContext().createMessage();
624+
putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, Arrays.asList(emptyMessage));
625+
626+
final List<SourceRecord> kafkaMessages = connectTask.poll();
627+
assertEquals(1, kafkaMessages.size());
628+
629+
final SourceRecord kafkaMessage = kafkaMessages.get(0);
630+
assertNull(kafkaMessage.value());
631+
632+
connectTask.commitRecord(kafkaMessage);
633+
}
634+
635+
@Test
636+
public void verifyEmptyTextMessage() throws Exception {
637+
connectTask = new MQSourceTask();
638+
639+
final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
640+
connectorConfigProps.put("mq.message.body.jms", "true");
641+
connectorConfigProps.put("mq.record.builder",
642+
"com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
643+
644+
connectTask.start(connectorConfigProps);
645+
646+
TextMessage emptyMessage = getJmsContext().createTextMessage();
647+
putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, Arrays.asList(emptyMessage));
648+
649+
final List<SourceRecord> kafkaMessages = connectTask.poll();
650+
assertEquals(1, kafkaMessages.size());
651+
652+
final SourceRecord kafkaMessage = kafkaMessages.get(0);
653+
assertNull(kafkaMessage.value());
654+
655+
connectTask.commitRecord(kafkaMessage);
656+
}
580657
}

src/main/java/com/ibm/eventstreams/connect/mqsource/JMSWorker.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import java.net.URL;
4444
import java.util.Enumeration;
4545
import java.util.HashMap;
46+
import java.util.Locale;
4647
import java.util.Map;
4748
import java.util.Optional;
4849
import java.util.concurrent.atomic.AtomicBoolean;
@@ -126,6 +127,7 @@ public void configure(final AbstractConfig config) {
126127
mqConnFactory.setSSLSocketFactory(sslContext.getSocketFactory());
127128
}
128129
}
130+
configureClientReconnectOptions(config, mqConnFactory);
129131

130132
userName = config.getString(MQSourceConnector.CONFIG_NAME_MQ_USER_NAME);
131133
password = config.getPassword(MQSourceConnector.CONFIG_NAME_MQ_PASSWORD);
@@ -144,6 +146,31 @@ public void configure(final AbstractConfig config) {
144146
log.trace("[{}] Exit {}.configure", Thread.currentThread().getId(), this.getClass().getName());
145147
}
146148

149+
// Configure client reconnect option based on the config
150+
private static void configureClientReconnectOptions(final AbstractConfig config,
151+
final MQConnectionFactory mqConnFactory) throws JMSException {
152+
String clientReconnectOptions = config.getString(MQSourceConnector.CONFIG_NAME_MQ_CLIENT_RECONNECT_OPTIONS);
153+
154+
clientReconnectOptions = clientReconnectOptions.toUpperCase(Locale.ENGLISH);
155+
156+
switch (clientReconnectOptions) {
157+
case MQSourceConnector.CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_ANY:
158+
mqConnFactory.setClientReconnectOptions(WMQConstants.WMQ_CLIENT_RECONNECT);
159+
break;
160+
161+
case MQSourceConnector.CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_QMGR:
162+
mqConnFactory.setClientReconnectOptions(WMQConstants.WMQ_CLIENT_RECONNECT_Q_MGR);
163+
break;
164+
165+
case MQSourceConnector.CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_DISABLED:
166+
mqConnFactory.setClientReconnectOptions(WMQConstants.WMQ_CLIENT_RECONNECT_DISABLED);
167+
break;
168+
169+
default:
170+
mqConnFactory.setClientReconnectOptions(WMQConstants.WMQ_CLIENT_RECONNECT_AS_DEF);
171+
break;
172+
}
173+
}
147174

148175
/**
149176
* Used for tests.
@@ -152,7 +179,7 @@ protected void setRecordBuilder(final RecordBuilder recordBuilder) {
152179
this.recordBuilder = recordBuilder;
153180
}
154181

155-
protected JMSContext getContext() { // used to enable testing
182+
protected JMSContext getContext() { // used to enable testing
156183
if (jmsCtxt == null) maybeReconnect();
157184
return jmsCtxt;
158185
}

src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,15 @@
2323
import java.util.Locale;
2424
import java.util.Map;
2525
import java.util.Map.Entry;
26+
import java.util.stream.Collectors;
2627

28+
import org.apache.kafka.common.config.Config;
2729
import org.apache.kafka.common.config.ConfigDef;
2830
import org.apache.kafka.common.config.ConfigDef.Importance;
2931
import org.apache.kafka.common.config.ConfigDef.Type;
3032
import org.apache.kafka.common.config.ConfigDef.Width;
3133
import org.apache.kafka.common.config.ConfigException;
34+
import org.apache.kafka.common.config.ConfigValue;
3235
import org.apache.kafka.connect.connector.Task;
3336
import org.apache.kafka.connect.source.ConnectorTransactionBoundaries;
3437
import org.apache.kafka.connect.source.ExactlyOnceSupport;
@@ -152,7 +155,27 @@ public class MQSourceConnector extends SourceConnector {
152155
+ "previous batch of messages to be delivered to Kafka before starting a new poll.";
153156
public static final String CONFIG_DISPLAY_MAX_POLL_BLOCKED_TIME_MS = "Max poll blocked time ms";
154157

155-
public static String version = "2.1.0";
158+
public static final String CONFIG_NAME_MQ_CLIENT_RECONNECT_OPTIONS = "mq.client.reconnect.options";
159+
public static final String CONFIG_DOCUMENTATION_MQ_CLIENT_RECONNECT_OPTIONS = "Options governing MQ reconnection.";
160+
public static final String CONFIG_DISPLAY_MQ_CLIENT_RECONNECT_OPTIONS = "MQ client reconnect options";
161+
public static final String CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_QMGR = "QMGR";
162+
public static final String CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_ANY = "ANY";
163+
public static final String CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_DISABLED = "DISABLED";
164+
public static final String CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_ASDEF = "ASDEF";
165+
166+
// Define valid reconnect options
167+
public static final String[] CONFIG_VALUE_MQ_VALID_RECONNECT_OPTIONS = {
168+
CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_ASDEF,
169+
CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_ASDEF.toLowerCase(Locale.ENGLISH),
170+
CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_ANY,
171+
CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_ANY.toLowerCase(Locale.ENGLISH),
172+
CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_QMGR,
173+
CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_QMGR.toLowerCase(Locale.ENGLISH),
174+
CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_DISABLED,
175+
CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_DISABLED.toLowerCase(Locale.ENGLISH)
176+
};
177+
178+
public static String version = "2.1.1";
156179

157180
private Map<String, String> configProps;
158181

@@ -237,6 +260,37 @@ public ConfigDef config() {
237260
return CONFIGDEF;
238261
}
239262

263+
@Override
264+
public Config validate(final Map<String, String> connectorConfigs) {
265+
final Config config = super.validate(connectorConfigs);
266+
267+
MQSourceConnector.validateMQClientReconnectOptions(config);
268+
return config;
269+
}
270+
271+
private static void validateMQClientReconnectOptions(final Config config) {
272+
// Collect all configuration values
273+
final Map<String, ConfigValue> configValues = config.configValues().stream()
274+
.collect(Collectors.toMap(ConfigValue::name, v -> v));
275+
276+
final ConfigValue clientReconnectOptionsConfigValue = configValues
277+
.get(MQSourceConnector.CONFIG_NAME_MQ_CLIENT_RECONNECT_OPTIONS);
278+
final ConfigValue exactlyOnceStateQueueConfigValue = configValues
279+
.get(MQSourceConnector.CONFIG_NAME_MQ_EXACTLY_ONCE_STATE_QUEUE);
280+
281+
// Check if the exactly once state queue is configured
282+
if (exactlyOnceStateQueueConfigValue.value() == null) {
283+
return;
284+
}
285+
286+
// Validate the client reconnect options
287+
final Boolean isClientReconnectOptionQMGR = CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_QMGR.equals(clientReconnectOptionsConfigValue.value());
288+
if (!isClientReconnectOptionQMGR) {
289+
clientReconnectOptionsConfigValue.addErrorMessage(
290+
"When running the MQ source connector with exactly once mode, the client reconnect option 'QMGR' should be provided. For example: `mq.client.reconnect.options: QMGR`");
291+
}
292+
}
293+
240294
/** Null validator - indicates that any value is acceptable for this config option. */
241295
private static final ConfigDef.Validator ANY = null;
242296

@@ -468,6 +522,16 @@ null, new ReadableFile(),
468522
null, 24, Width.MEDIUM,
469523
CONFIG_DISPLAY_MAX_POLL_BLOCKED_TIME_MS);
470524

525+
CONFIGDEF.define(CONFIG_NAME_MQ_CLIENT_RECONNECT_OPTIONS,
526+
Type.STRING,
527+
CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_ASDEF,
528+
ConfigDef.ValidString.in(CONFIG_VALUE_MQ_VALID_RECONNECT_OPTIONS),
529+
Importance.MEDIUM,
530+
CONFIG_DOCUMENTATION_MQ_CLIENT_RECONNECT_OPTIONS,
531+
CONFIG_GROUP_MQ, 25,
532+
Width.SHORT,
533+
CONFIG_DISPLAY_MQ_CLIENT_RECONNECT_OPTIONS);
534+
471535
CONFIGDEF.define(CONFIG_NAME_TOPIC,
472536
Type.STRING,
473537
// user must specify the topic name

src/test/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnectorTest.java

Lines changed: 65 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
*/
1616
package com.ibm.eventstreams.connect.mqsource;
1717

18+
import org.apache.kafka.common.config.Config;
19+
import org.apache.kafka.common.config.ConfigValue;
1820
import org.apache.kafka.connect.connector.Connector;
1921
import org.apache.kafka.connect.source.ConnectorTransactionBoundaries;
2022
import org.apache.kafka.connect.source.ExactlyOnceSupport;
@@ -28,7 +30,11 @@
2830
import static org.junit.Assert.assertTrue;
2931

3032
import java.util.HashMap;
33+
import java.util.List;
3134
import java.util.Map;
35+
import java.util.stream.Collector;
36+
import java.util.stream.Collectors;
37+
import java.util.stream.Stream;
3238

3339
public class MQSourceConnectorTest {
3440
@Test
@@ -82,5 +88,62 @@ public void testConnectorConfigSupportsExactlyOnce() {
8288
assertFalse(MQSourceConnector.configSupportsExactlyOnce(Collections.singletonMap("mq.exactly.once.state.queue", "")));
8389
assertFalse(MQSourceConnector.configSupportsExactlyOnce(Collections.singletonMap("mq.exactly.once.state.queue", null)));
8490
}
85-
86-
}
91+
92+
@Test
93+
public void testValidateMQClientReconnectOptions() {
94+
final Map<String, String> configProps = new HashMap<String, String>();
95+
configProps.put("mq.exactly.once.state.queue", "DEV.QUEUE.2");
96+
configProps.put("tasks.max", "1");
97+
98+
final Config config = new MQSourceConnector().validate(configProps);
99+
100+
assertTrue(config.configValues().stream().anyMatch(cv -> cv.errorMessages().size() > 0));
101+
assertTrue(config.configValues().stream()
102+
.filter(cv -> cv.name().equals(MQSourceConnector.CONFIG_NAME_MQ_CLIENT_RECONNECT_OPTIONS))
103+
.flatMap(cv -> cv.errorMessages().stream())
104+
.anyMatch(msg -> msg.contains("When running the MQ source connector with exactly once mode, the client reconnect option 'QMGR' should be provided.")));
105+
}
106+
107+
@Test
108+
public void testValidateMQClientReconnectOptionsWithoutExactlyOnce() {
109+
final Map<String, String> configProps = new HashMap<String, String>();
110+
final Config config = new MQSourceConnector().validate(configProps);
111+
112+
assertFalse(config.configValues().stream()
113+
.filter(cv -> cv.name().equals(MQSourceConnector.CONFIG_NAME_MQ_CLIENT_RECONNECT_OPTIONS))
114+
.flatMap(cv -> cv.errorMessages().stream())
115+
.anyMatch(msg -> msg.contains("When running the MQ source connector with exactly once mode, the client reconnect option 'QMGR' should be provided.")));
116+
}
117+
118+
@Test
119+
public void testValidateMQClientReconnectOptionsWithQMGROption() {
120+
final Map<String, String> configProps = new HashMap<String, String>();
121+
configProps.put("mq.exactly.once.state.queue", "DEV.QUEUE.2");
122+
configProps.put("mq.client.reconnect.options", "QMGR");
123+
configProps.put("tasks.max", "1");
124+
125+
final Config config = new MQSourceConnector().validate(configProps);
126+
127+
assertTrue(config.configValues().stream().anyMatch(cv -> cv.errorMessages().size() > 0));
128+
assertFalse(config.configValues().stream()
129+
.filter(cv -> cv.name().equals(MQSourceConnector.CONFIG_NAME_MQ_CLIENT_RECONNECT_OPTIONS))
130+
.flatMap(cv -> cv.errorMessages().stream())
131+
.anyMatch(msg -> msg.contains("When running the MQ source connector with exactly once mode, the client reconnect option 'QMGR' should be provided.")));
132+
}
133+
134+
@Test
135+
public void testValidateMQClientReconnectOptionsWithANYOption() {
136+
final Map<String, String> configProps = new HashMap<String, String>();
137+
configProps.put("mq.exactly.once.state.queue", "DEV.QUEUE.2");
138+
configProps.put("mq.client.reconnect.options", "ANY");
139+
configProps.put("tasks.max", "1");
140+
141+
final Config config = new MQSourceConnector().validate(configProps);
142+
143+
assertTrue(config.configValues().stream().anyMatch(cv -> cv.errorMessages().size() > 0));
144+
assertTrue(config.configValues().stream()
145+
.filter(cv -> cv.name().equals(MQSourceConnector.CONFIG_NAME_MQ_CLIENT_RECONNECT_OPTIONS))
146+
.flatMap(cv -> cv.errorMessages().stream())
147+
.anyMatch(msg -> msg.contains("When running the MQ source connector with exactly once mode, the client reconnect option 'QMGR' should be provided.")));
148+
}
149+
}

0 commit comments

Comments
 (0)