Skip to content

Commit 956db2d

Browse files
Joel HansonGitHub Enterprise
authored andcommitted
refactor: delegate config validation to Kafka Connect (#50)
* feat: pull in changes from origin Contributes to: event-integration/eventstreams-planning#12467 Signed-off-by: Joel Hanson <joel.hanson2@ibm.com> * feat: Fix suggested review comments Contributes to: event-integration/eventstreams-planning#12467 Signed-off-by: Joel Hanson <joel.hanson2@ibm.com> --------- Signed-off-by: Joel Hanson <joel.hanson2@ibm.com>
1 parent a595a15 commit 956db2d

File tree

14 files changed

+394
-252
lines changed

14 files changed

+394
-252
lines changed

pom.xml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@
126126
<outputDirectory>${project.build.outputDirectory}/META-INF/LICENSES</outputDirectory>
127127
</configuration>
128128
</plugin>
129-
129+
130130
<plugin>
131131
<artifactId>maven-compiler-plugin</artifactId>
132132
<version>3.1</version>
@@ -324,7 +324,6 @@
324324
</execution>
325325
</executions>
326326
</plugin>
327-
328327
</plugins>
329328
</build>
330329
</project>

src/integration/java/com/ibm/eventstreams/connect/mqsource/AbstractJMSContextIT.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import com.ibm.eventstreams.connect.mqsource.utils.MQTestUtil;
2323
import com.ibm.mq.jms.MQConnectionFactory;
2424
import com.ibm.msg.client.wmq.WMQConstants;
25+
26+
import org.apache.kafka.common.config.AbstractConfig;
2527
import org.junit.ClassRule;
2628
import org.testcontainers.containers.GenericContainer;
2729
import org.testcontainers.containers.wait.strategy.Wait;
@@ -100,7 +102,12 @@ protected Map<String, String> getDefaultConnectorProperties() {
100102
props.put("mq.channel.name", CHANNEL_NAME);
101103
props.put("mq.queue", DEFAULT_SOURCE_QUEUE);
102104
props.put("mq.user.authentication.mqcsp", "false");
105+
props.put("topic", "mytopic");
103106
return props;
104107
}
105108

109+
public AbstractConfig getPropertiesConfig(Map<String, String> props) {
110+
final AbstractConfig connectorConfig = new AbstractConfig(MQSourceConnector.CONFIGDEF, props);
111+
return connectorConfig;
112+
}
106113
}

src/integration/java/com/ibm/eventstreams/connect/mqsource/JMSWorkerIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public void setUpJMSWorker() {
4545
defaultConnectorProperties.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
4646

4747
jmsWorker = new JMSWorker();
48-
jmsWorker.configure(defaultConnectorProperties);
48+
jmsWorker.configure(getPropertiesConfig(defaultConnectorProperties));
4949
jmsWorker.connect();
5050
}
5151

src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskAuthIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@ public class MQSourceTaskAuthIT {
5252

5353
private static final String QMGR_NAME = "MYAUTHQMGR";
5454
private static final String QUEUE_NAME = "DEV.QUEUE.1";
55-
private static final String STATE_QUEUE_NAME = "DEV.QUEUE.2";
5655
private static final String CHANNEL_NAME = "DEV.APP.SVRCONN";
5756
private static final String APP_PASSWORD = "MySuperSecretPassword";
5857
private static final String ADMIN_PASSWORD = "MyAdminPassword";
@@ -89,6 +88,7 @@ private Map<String, String> getConnectorProps() {
8988
connectorProps.put("mq.password", APP_PASSWORD);
9089
connectorProps.put("mq.message.body.jms", "false");
9190
connectorProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
91+
connectorProps.put("topic", "mytopic");
9292
return connectorProps;
9393
}
9494

src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskExceptionHandlingIT.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
import static org.mockito.ArgumentMatchers.any;
3232
import static org.mockito.ArgumentMatchers.anyBoolean;
3333
import static org.mockito.ArgumentMatchers.anyString;
34-
import static org.mockito.ArgumentMatchers.isNull;
34+
import static org.mockito.ArgumentMatchers.eq;
3535
import static org.mockito.Mockito.doCallRealMethod;
3636

3737
import java.util.ArrayList;
@@ -68,6 +68,8 @@ public class MQSourceTaskExceptionHandlingIT extends AbstractJMSContextIT {
6868
private MQSourceTask connectTask;
6969
private SequenceStateClient sequenceStateClient;
7070

71+
private static final String TOPIC_NAME = "mytopic";
72+
7173
private SequenceStateClient createSequenceStateClient(final JMSWorker shared, final JMSWorker dedicated, final Map<String,String> connectorProps) {
7274
sequenceStateClient = new SequenceStateClient(DEFAULT_STATE_QUEUE, shared, dedicated);
7375
return sequenceStateClient;
@@ -79,10 +81,10 @@ public void startup() throws Exception {
7981
final Map<String, String> connectorConfigProps = getConnectorProps();
8082

8183
JMSWorker shared = new JMSWorker();
82-
shared.configure(connectorConfigProps);
84+
shared.configure(getPropertiesConfig(connectorConfigProps));
8385

8486
JMSWorker dedicated = new JMSWorker();
85-
dedicated.configure(connectorConfigProps);
87+
dedicated.configure(getPropertiesConfig(connectorConfigProps));
8688

8789
connectTask.start(connectorConfigProps);
8890
createSequenceStateClient(shared, dedicated, connectorConfigProps);
@@ -108,6 +110,7 @@ private Map<String, String> getConnectorProps() {
108110
connectorProps.put("mq.user.authentication.mqcsp", "false");
109111
connectorProps.put("mq.message.body.jms", "true");
110112
connectorProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
113+
connectorProps.put("topic", TOPIC_NAME);
111114
return connectorProps;
112115
}
113116

@@ -176,10 +179,10 @@ public void testPollDoesThrowExceptionDueToMQConnectionError() throws Exception
176179
@Test
177180
public void testIfErrorsOccurWithPollRollbackAndContinues() throws Exception {
178181
JMSWorker spyJMSWorker = Mockito.spy(new JMSWorker());
179-
spyJMSWorker.configure(getConnectorProps());
182+
spyJMSWorker.configure(getPropertiesConfig(getConnectorProps()));
180183

181184
JMSWorker dedicated = new JMSWorker();
182-
dedicated.configure(getConnectorProps());
185+
dedicated.configure(getPropertiesConfig(getConnectorProps()));
183186

184187
MQSourceTask connectTask = getSourceTaskWithEmptyKafkaOffset();
185188

@@ -240,10 +243,10 @@ public void testIfErrorsOccurWithPollRollbackAndContinues() throws Exception {
240243
@Test
241244
public void testNoExceptionThrownWhenJMSExceptionThrownByBuilder_AndRecordsAreRollbackAndThenProcessedNextPoll() throws Exception {
242245
JMSWorker sharedJMSWorker = new JMSWorker();
243-
sharedJMSWorker.configure(getConnectorProps());
246+
sharedJMSWorker.configure(getPropertiesConfig(getConnectorProps()));
244247

245248
JMSWorker dedicated = new JMSWorker();
246-
dedicated.configure(getConnectorProps());
249+
dedicated.configure(getPropertiesConfig(getConnectorProps()));
247250

248251
DefaultRecordBuilder spyRecordBuilder = Mockito.spy(new DefaultRecordBuilder());
249252

@@ -274,7 +277,7 @@ public void testNoExceptionThrownWhenJMSExceptionThrownByBuilder_AndRecordsAreRo
274277
.doThrow(new JMSException("This is a JMSException caused by a spy!!")) // 8
275278
.doCallRealMethod()
276279
// Be careful with these, any() does not cover null hence null is used
277-
.when(spyRecordBuilder).toSourceRecord(any(JMSContext.class), isNull(), anyBoolean(), any(Message.class), any(), any());
280+
.when(spyRecordBuilder).toSourceRecord(any(JMSContext.class), eq(TOPIC_NAME), anyBoolean(), any(Message.class), any(), any());
278281

279282
// Needs to be done here, after the doCallRealMethods have been setup.
280283
sharedJMSWorker.setRecordBuilder(spyRecordBuilder);

src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ private Map<String, String> createDefaultConnectorProperties() {
8585
props.put("mq.channel.name", CHANNEL_NAME);
8686
props.put("mq.queue", DEFAULT_SOURCE_QUEUE);
8787
props.put("mq.user.authentication.mqcsp", "false");
88+
props.put("topic", "mytopic");
8889
return props;
8990
}
9091

@@ -430,9 +431,9 @@ public void testSequenceStateMsgReadUnderMQTx() throws Exception {
430431
connectorConfigProps.put("mq.message.body.jms", "true");
431432
connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
432433

433-
spyJMSWorker.configure(connectorConfigProps);
434+
spyJMSWorker.configure(getPropertiesConfig(connectorConfigProps));
434435
JMSWorker dedicated = new JMSWorker();
435-
dedicated.configure(connectorConfigProps);
436+
dedicated.configure(getPropertiesConfig(connectorConfigProps));
436437
SequenceStateClient sequenceStateClient = Mockito.spy(new SequenceStateClient(DEFAULT_STATE_QUEUE, spyJMSWorker, dedicated));
437438

438439
connectTask.start(connectorConfigProps, spyJMSWorker, dedicated, sequenceStateClient);
@@ -484,9 +485,9 @@ public void testSequenceStateMsgWrittenIndependentFromGetSource() throws Excepti
484485
connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
485486

486487
JMSWorker shared = new JMSWorker();
487-
shared.configure(connectorConfigProps);
488+
shared.configure(getPropertiesConfig(connectorConfigProps));
488489
JMSWorker dedicated = new JMSWorker();
489-
dedicated.configure(connectorConfigProps);
490+
dedicated.configure(getPropertiesConfig(connectorConfigProps));
490491
SequenceStateClient sequenceStateClient = new SequenceStateClient(DEFAULT_STATE_QUEUE, shared, dedicated);
491492

492493
connectTask.start(connectorConfigProps, shared, dedicated, sequenceStateClient);
@@ -514,9 +515,9 @@ public void testRemoveDeliveredMessagesFromSourceQueueThrowsException() throws E
514515
JMSWorker spyDedicated = Mockito.spy(new JMSWorker());
515516
JMSWorker spyShared = Mockito.spy(new JMSWorker());
516517

517-
spyJMSWorker.configure(connectorConfigProps);
518-
spyDedicated.configure(connectorConfigProps);
519-
spyShared.configure(connectorConfigProps);
518+
spyJMSWorker.configure(getPropertiesConfig(connectorConfigProps));
519+
spyDedicated.configure(getPropertiesConfig(connectorConfigProps));
520+
spyShared.configure(getPropertiesConfig(connectorConfigProps));
520521

521522
Message messageSpy = Mockito.spy(getJmsContext().createTextMessage("Spy Injected Message"));
522523

@@ -552,9 +553,9 @@ public void testRemoveDeliveredMessagesFromSourceQueueDoesNotThrowException() th
552553
JMSWorker spyDedicated = Mockito.spy(new JMSWorker());
553554
JMSWorker spyShared = Mockito.spy(new JMSWorker());
554555

555-
spyJMSWorker.configure(connectorConfigProps);
556-
spyDedicated.configure(connectorConfigProps);
557-
spyShared.configure(connectorConfigProps);
556+
spyJMSWorker.configure(getPropertiesConfig(connectorConfigProps));
557+
spyDedicated.configure(getPropertiesConfig(connectorConfigProps));
558+
spyShared.configure(getPropertiesConfig(connectorConfigProps));
558559

559560
Message messageSpy = Mockito.spy(getJmsContext().createTextMessage("Spy Injected Message"));
560561

src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskOnlyOnceIT.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import com.ibm.eventstreams.connect.mqsource.utils.SourceTaskStopper;
2323

2424
import org.apache.kafka.connect.errors.ConnectException;
25-
import org.apache.kafka.connect.errors.RetriableException;
2625
import org.apache.kafka.connect.source.SourceRecord;
2726
import org.jetbrains.annotations.NotNull;
2827
import org.junit.After;
@@ -61,9 +60,9 @@ public void startup() throws Exception {
6160
final Map<String, String> connectorConfigProps = connectionProperties();
6261

6362
final JMSWorker shared = new JMSWorker();
64-
shared.configure(connectorConfigProps);
63+
shared.configure(getPropertiesConfig(connectorConfigProps));
6564
final JMSWorker dedicated = new JMSWorker();
66-
dedicated.configure(connectorConfigProps);
65+
dedicated.configure(getPropertiesConfig(connectorConfigProps));
6766
sequenceStateClient = new SequenceStateClient(DEFAULT_STATE_QUEUE, shared, dedicated);
6867

6968
connectTask.start(connectorConfigProps, shared, dedicated, sequenceStateClient);
@@ -89,12 +88,13 @@ private Map<String, String> connectionProperties() {
8988
props.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
9089
props.put("mq.exactly.once.state.queue", DEFAULT_STATE_QUEUE);
9190
props.put("tasks.max", "1");
91+
props.put("topic", "mytopic");
9292
return props;
9393
}
9494

9595
private Map<String, String> connectionPropertiesWithOnlyOnceDisabled() {
9696
final Map<String, String> props = connectionProperties();
97-
props.put("mq.exactly.once.state.queue", null);
97+
props.remove("mq.exactly.once.state.queue");
9898
return props;
9999
}
100100

src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskOnlyOnceStartBehaviourIT.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,10 @@ public class MQSourceTaskOnlyOnceStartBehaviourIT extends AbstractJMSContextIT {
5555
public void startup() throws Exception {
5656

5757
JMSWorker shared = new JMSWorker();
58-
shared.configure(connectionProperties());
58+
shared.configure(getPropertiesConfig(connectionProperties()));
5959
shared.connect();
6060
JMSWorker dedicated = new JMSWorker();
61-
dedicated.configure(connectionProperties());
61+
dedicated.configure(getPropertiesConfig(connectionProperties()));
6262
dedicated.connect();
6363
sequenceStateClient = new SequenceStateClient(DEFAULT_STATE_QUEUE, shared, dedicated);
6464

@@ -86,6 +86,7 @@ private Map<String, String> connectionProperties() {
8686
props.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
8787
props.put("mq.exactly.once.state.queue", DEFAULT_STATE_QUEUE);
8888
props.put("tasks.max", "1");
89+
props.put("topic", "mytopic");
8990
return props;
9091
}
9192

@@ -132,8 +133,8 @@ public void testOnlyOnceStartBehaviour_GivenNoSequenceStateIsPresentOnQueueOrKaf
132133

133134
// Check that MQ has been read from
134135
List<Message> mqMessages = browseAllMessagesFromQueue(DEFAULT_SOURCE_QUEUE);
135-
assertThat(mqMessages).isEmpty();
136-
136+
assertThat(mqMessages).isEmpty();
137+
137138
// Check that the messages have been created as source records correctly
138139
assertThat(kafkaMessages)
139140
.hasSize(2)
@@ -173,8 +174,8 @@ public void testOnlyOnceStartBehaviour_GivenNoSequenceStateIsPresentOnQueueButOf
173174

174175
// Check that MQ has been read from
175176
List<Message> mqMessages = browseAllMessagesFromQueue(DEFAULT_SOURCE_QUEUE);
176-
assertThat(mqMessages).isEmpty();
177-
177+
assertThat(mqMessages).isEmpty();
178+
178179
// Check that the messages have been created as source records correctly
179180
assertThat(kafkaMessages)
180181
.hasSize(2)
@@ -192,7 +193,7 @@ public void testOnlyOnceStartBehaviour_GivenNoSequenceStateIsPresentOnQueueButOf
192193
assertThat(mqSequenceState.get().getSequenceId()).isEqualTo(6L);
193194
}
194195

195-
@Test // this one needs to take in to account the message ids that should be re deilvered from MQ Also need assertion on state defined inthe start command
196+
@Test // this one needs to take in to account the message ids that should be re deilvered from MQ Also need assertion on state defined inthe start command
196197
public void testOnlyOnceStartBehaviour_GivenSequenceStateIsPresentOnQueue_AndStateIsInFlight_AndStoredOffsetNotInKafka() throws Exception {
197198

198199
connectTask = getSourceTaskWithEmptyKafkaOffset(); // Kafka has no state
@@ -219,7 +220,7 @@ public void testOnlyOnceStartBehaviour_GivenSequenceStateIsPresentOnQueue_AndSta
219220
assertThat(connectTask.startUpAction).isEqualTo(MQSourceTaskStartUpAction.REDELIVER_UNSENT_BATCH);
220221
assertThat(connectTask.getSequenceId().get()).isEqualTo(23L);
221222
assertThat(connectTask.getMsgIds()).isEqualTo(firstBatchOfMessageIds);
222-
223+
223224
final List<SourceRecord> kafkaMessagesInitialPoll = connectTask.poll();
224225

225226
// Check that no record have been returned since the inflight records transaction has ent been rolled back yet
@@ -247,7 +248,7 @@ public void testOnlyOnceStartBehaviour_GivenSequenceStateIsPresentOnQueue_AndSta
247248

248249
// Check that MQ has been read from
249250
List<Message> mqMessages = browseAllMessagesFromQueue(DEFAULT_SOURCE_QUEUE);
250-
assertThat(mqMessages.size()).isEqualTo(3);
251+
assertThat(mqMessages.size()).isEqualTo(3);
251252

252253
// Check that the internal sequence id has been set correctly
253254
assertThat(connectTask.getSequenceId().get()).isEqualTo(23L);

src/integration/java/com/ibm/eventstreams/connect/mqsource/SequenceStateClientIT.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,9 @@ public void createSequenceStateClient() {
5757
Map<String, String> props = getDefaultConnectorProperties();
5858
props.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
5959
shared = new JMSWorker();
60-
shared.configure(props);
60+
shared.configure(getPropertiesConfig(props));
6161
JMSWorker dedicated = new JMSWorker();
62-
dedicated.configure(props);
62+
dedicated.configure(getPropertiesConfig(props));
6363
sequenceStateClient = new SequenceStateClient(DEFAULT_STATE_QUEUE, shared, dedicated);
6464
}
6565

@@ -332,5 +332,5 @@ private List<Message> aListOfOneStringMessage() throws Exception {
332332
String text = "GenericTextMessage";
333333
return Arrays.asList(getJmsContext().createTextMessage(text));
334334
}
335-
335+
336336
}

0 commit comments

Comments
 (0)