Skip to content

Commit 6d2b939

Browse files
authored
refactor: delegate config validation to Kafka Connect (#130)
The connector was doing a lot of it's own processing of config options - type-checking, null-checking, casting, etc. This makes the connector-specific aspects of the code harder to follow, and it also makes it easier for us to miss validating some required config options (for example, we weren't checking that topic names were provided). This commit moves all of this out of the connector, and makes it the responsibility of the Kafka Connect framework. This removes a lot of unnecessary checking and type checking/casting from the connector code, as the source task and jms reader class can assume that they will be given a validated config. Note that I did have to leave the existing Map<String, String> parameter in the record builder as that is a public interface that users have subclassed. This should ideally be a Kafka Connect object rather than a raw map, but in the interest of not breaking existing subclasses, I've left it as a map. This does mean I'm converting the properties map to a config object twice, but as this is only at startup I think the performance cost will be minimal. Signed-off-by: Dale Lane <dale.lane@uk.ibm.com>
1 parent d93b499 commit 6d2b939

File tree

9 files changed

+346
-221
lines changed

9 files changed

+346
-221
lines changed

pom.xml

Lines changed: 7 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -187,28 +187,6 @@
187187
</executions>
188188
</plugin>
189189

190-
<!-- add the src/integration folder as a test folder, which lets us keep -->
191-
<!-- tests that have a dependency on testcontainers separate from pure -->
192-
<!-- unit tests with no external dependency -->
193-
<plugin>
194-
<groupId>org.codehaus.mojo</groupId>
195-
<artifactId>build-helper-maven-plugin</artifactId>
196-
<version>3.3.0</version>
197-
<executions>
198-
<execution>
199-
<id>add-test-source</id>
200-
<phase>process-test-sources</phase>
201-
<goals>
202-
<goal>add-test-source</goal>
203-
</goals>
204-
<configuration>
205-
<sources>
206-
<source>src/integration/java</source>
207-
</sources>
208-
</configuration>
209-
</execution>
210-
</executions>
211-
</plugin>
212190

213191
<!-- generate test code coverage report -->
214192
<plugin>
@@ -323,5 +301,12 @@
323301
</executions>
324302
</plugin>
325303
</plugins>
304+
305+
<!-- some integration tests use the custom mqsc file in the resources directory -->
306+
<testResources>
307+
<testResource>
308+
<directory>src/integration/resources</directory>
309+
</testResource>
310+
</testResources>
326311
</build>
327312
</project>

src/integration/java/com/ibm/eventstreams/connect/mqsource/AbstractJMSContextIT.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.junit.ClassRule;
3232
import org.testcontainers.containers.GenericContainer;
3333
import org.testcontainers.containers.output.WaitingConsumer;
34+
import org.testcontainers.utility.MountableFile;
3435

3536
import com.ibm.mq.jms.MQConnectionFactory;
3637
import com.ibm.msg.client.jms.JmsConnectionFactory;
@@ -53,6 +54,7 @@ public class AbstractJMSContextIT {
5354
.withEnv("LICENSE", "accept")
5455
.withEnv("MQ_QMGR_NAME", QMGR_NAME)
5556
.withEnv("MQ_ENABLE_EMBEDDED_WEB_SERVER", "false")
57+
.withCopyFileToContainer(MountableFile.forClasspathResource("no-auth-qmgr.mqsc"), "/etc/mqm/99-no-auth-qmgr.mqsc")
5658
.withExposedPorts(1414);
5759

5860
private JMSContext jmsContext;

src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskAuthIT.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ private Map<String, String> getConnectorProps() {
7272
connectorProps.put("mq.password", APP_PASSWORD);
7373
connectorProps.put("mq.message.body.jms", "false");
7474
connectorProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
75+
connectorProps.put("topic", "mytopic");
7576
return connectorProps;
7677
}
7778

src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ private Map<String, String> createDefaultConnectorProperties() {
6161
props.put("mq.channel.name", getChannelName());
6262
props.put("mq.queue", MQ_QUEUE);
6363
props.put("mq.user.authentication.mqcsp", "false");
64+
props.put("topic", "mytopic");
6465
return props;
6566
}
6667

@@ -82,6 +83,7 @@ public void verifyJmsTextMessages() throws Exception {
8283
final List<SourceRecord> kafkaMessages = connectTask.poll();
8384
assertEquals(2, kafkaMessages.size());
8485
for (final SourceRecord kafkaMessage : kafkaMessages) {
86+
assertEquals("mytopic", kafkaMessage.topic());
8587
assertNull(kafkaMessage.key());
8688
assertNull(kafkaMessage.valueSchema());
8789

@@ -116,6 +118,7 @@ public void verifyJmsJsonMessages() throws Exception {
116118
assertEquals(5, kafkaMessages.size());
117119
for (int i = 0; i < 5; i++) {
118120
final SourceRecord kafkaMessage = kafkaMessages.get(i);
121+
assertEquals("mytopic", kafkaMessage.topic());
119122
assertNull(kafkaMessage.key());
120123
assertNull(kafkaMessage.valueSchema());
121124

@@ -148,6 +151,7 @@ public void verifyJmsMessageHeaders() throws Exception {
148151
final List<SourceRecord> kafkaMessages = connectTask.poll();
149152
assertEquals(1, kafkaMessages.size());
150153
final SourceRecord kafkaMessage = kafkaMessages.get(0);
154+
assertEquals("mytopic", kafkaMessage.topic());
151155
assertNull(kafkaMessage.key());
152156
assertNull(kafkaMessage.valueSchema());
153157

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
ALTER QMGR CHLAUTH(DISABLED)
2+
ALTER QMGR CONNAUTH(' ')
3+
REFRESH SECURITY TYPE(CONNAUTH)

src/main/java/com/ibm/eventstreams/connect/mqsource/JMSReader.java

Lines changed: 52 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import java.security.GeneralSecurityException;
3232
import java.security.KeyStore;
3333
import java.security.SecureRandom;
34-
import java.util.Map;
3534
import java.util.concurrent.atomic.AtomicBoolean;
3635

3736
import javax.jms.JMSConsumer;
@@ -45,6 +44,8 @@
4544
import javax.net.ssl.TrustManager;
4645
import javax.net.ssl.TrustManagerFactory;
4746

47+
import org.apache.kafka.common.config.AbstractConfig;
48+
import org.apache.kafka.common.config.types.Password;
4849
import org.apache.kafka.connect.errors.ConnectException;
4950
import org.apache.kafka.connect.errors.RetriableException;
5051
import org.apache.kafka.connect.source.SourceRecord;
@@ -61,7 +62,7 @@ public class JMSReader {
6162

6263
// Configs
6364
private String userName;
64-
private String password;
65+
private Password password;
6566
private String topic;
6667
private boolean messageBodyJms;
6768

@@ -93,116 +94,80 @@ public JMSReader() {
9394
*
9495
* @throws ConnectException Operation failed and connector should stop.
9596
*/
96-
public void configure(final Map<String, String> props) {
97+
public void configure(final AbstractConfig config) {
9798
log.trace("[{}] Entry {}.configure, props={}", Thread.currentThread().getId(), this.getClass().getName(),
98-
props);
99-
100-
final String queueManager = props.get(MQSourceConnector.CONFIG_NAME_MQ_QUEUE_MANAGER);
101-
final String connectionMode = props.get(MQSourceConnector.CONFIG_NAME_MQ_CONNECTION_MODE);
102-
final String connectionNameList = props.get(MQSourceConnector.CONFIG_NAME_MQ_CONNECTION_NAME_LIST);
103-
final String channelName = props.get(MQSourceConnector.CONFIG_NAME_MQ_CHANNEL_NAME);
104-
final String queueName = props.get(MQSourceConnector.CONFIG_NAME_MQ_QUEUE);
105-
final String userName = props.get(MQSourceConnector.CONFIG_NAME_MQ_USER_NAME);
106-
final String password = props.get(MQSourceConnector.CONFIG_NAME_MQ_PASSWORD);
107-
final String ccdtUrl = props.get(MQSourceConnector.CONFIG_NAME_MQ_CCDT_URL);
108-
final String builderClass = props.get(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER);
109-
final String mbj = props.get(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS);
110-
final String mdr = props.get(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_MQMD_READ);
111-
final String sslCipherSuite = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_CIPHER_SUITE);
112-
final String sslPeerName = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_PEER_NAME);
113-
final String sslKeystoreLocation = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_KEYSTORE_LOCATION);
114-
final String sslKeystorePassword = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_KEYSTORE_PASSWORD);
115-
final String sslTruststoreLocation = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_TRUSTSTORE_LOCATION);
116-
final String sslTruststorePassword = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_TRUSTSTORE_PASSWORD);
117-
final String useMQCSP = props.get(MQSourceConnector.CONFIG_NAME_MQ_USER_AUTHENTICATION_MQCSP);
118-
final String useIBMCipherMappings = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_USE_IBM_CIPHER_MAPPINGS);
119-
final String topic = props.get(MQSourceConnector.CONFIG_NAME_TOPIC);
120-
121-
if (useIBMCipherMappings != null) {
122-
System.setProperty("com.ibm.mq.cfg.useIBMCipherMappings", useIBMCipherMappings);
123-
}
99+
config);
124100

125-
int transportType = WMQConstants.WMQ_CM_CLIENT;
126-
if (connectionMode != null) {
127-
if (connectionMode.equals(MQSourceConnector.CONFIG_VALUE_MQ_CONNECTION_MODE_CLIENT)) {
128-
transportType = WMQConstants.WMQ_CM_CLIENT;
129-
} else if (connectionMode.equals(MQSourceConnector.CONFIG_VALUE_MQ_CONNECTION_MODE_BINDINGS)) {
130-
transportType = WMQConstants.WMQ_CM_BINDINGS;
131-
} else {
132-
log.error("Unsupported MQ connection mode {}", connectionMode);
133-
throw new ConnectException("Unsupported MQ connection mode");
134-
}
135-
}
101+
System.setProperty("com.ibm.mq.cfg.useIBMCipherMappings",
102+
config.getBoolean(MQSourceConnector.CONFIG_NAME_MQ_SSL_USE_IBM_CIPHER_MAPPINGS).toString());
103+
104+
final int transportType =
105+
config.getString(MQSourceConnector.CONFIG_NAME_MQ_CONNECTION_MODE)
106+
.equals(MQSourceConnector.CONFIG_VALUE_MQ_CONNECTION_MODE_CLIENT) ?
107+
WMQConstants.WMQ_CM_CLIENT :
108+
WMQConstants.WMQ_CM_BINDINGS;
136109

137110
try {
138111
mqConnFactory = new MQConnectionFactory();
139112
mqConnFactory.setTransportType(transportType);
140-
mqConnFactory.setQueueManager(queueManager);
141-
mqConnFactory.setBooleanProperty(WMQConstants.USER_AUTHENTICATION_MQCSP, true);
142-
if (useMQCSP != null) {
143-
mqConnFactory.setBooleanProperty(WMQConstants.USER_AUTHENTICATION_MQCSP,
144-
Boolean.parseBoolean(useMQCSP));
145-
}
113+
mqConnFactory.setQueueManager(config.getString(MQSourceConnector.CONFIG_NAME_MQ_QUEUE_MANAGER));
114+
mqConnFactory.setBooleanProperty(WMQConstants.USER_AUTHENTICATION_MQCSP,
115+
config.getBoolean(MQSourceConnector.CONFIG_NAME_MQ_USER_AUTHENTICATION_MQCSP));
146116

147117
if (transportType == WMQConstants.WMQ_CM_CLIENT) {
118+
final String ccdtUrl = config.getString(MQSourceConnector.CONFIG_NAME_MQ_CCDT_URL);
119+
148120
if (ccdtUrl != null) {
149-
final URL ccdtUrlObject;
150-
try {
151-
ccdtUrlObject = new URL(ccdtUrl);
152-
} catch (final MalformedURLException e) {
153-
log.error("MalformedURLException exception {}", e);
154-
throw new ConnectException("CCDT file url invalid", e);
155-
}
156-
mqConnFactory.setCCDTURL(ccdtUrlObject);
121+
mqConnFactory.setCCDTURL(new URL(ccdtUrl));
157122
} else {
158-
mqConnFactory.setConnectionNameList(connectionNameList);
159-
mqConnFactory.setChannel(channelName);
123+
mqConnFactory.setConnectionNameList(config.getString(MQSourceConnector.CONFIG_NAME_MQ_CONNECTION_NAME_LIST));
124+
mqConnFactory.setChannel(config.getString(MQSourceConnector.CONFIG_NAME_MQ_CHANNEL_NAME));
160125
}
161126

162-
if (sslCipherSuite != null) {
163-
mqConnFactory.setSSLCipherSuite(sslCipherSuite);
164-
if (sslPeerName != null) {
165-
mqConnFactory.setSSLPeerName(sslPeerName);
166-
}
167-
}
127+
mqConnFactory.setSSLCipherSuite(config.getString(MQSourceConnector.CONFIG_NAME_MQ_SSL_CIPHER_SUITE));
128+
mqConnFactory.setSSLPeerName(config.getString(MQSourceConnector.CONFIG_NAME_MQ_SSL_PEER_NAME));
168129

130+
131+
final String sslKeystoreLocation = config.getString(MQSourceConnector.CONFIG_NAME_MQ_SSL_KEYSTORE_LOCATION);
132+
final Password sslKeystorePassword = config.getPassword(MQSourceConnector.CONFIG_NAME_MQ_SSL_KEYSTORE_PASSWORD);
133+
final String sslTruststoreLocation = config.getString(MQSourceConnector.CONFIG_NAME_MQ_SSL_TRUSTSTORE_LOCATION);
134+
final Password sslTruststorePassword = config.getPassword(MQSourceConnector.CONFIG_NAME_MQ_SSL_TRUSTSTORE_PASSWORD);
169135
if (sslKeystoreLocation != null || sslTruststoreLocation != null) {
170136
final SSLContext sslContext = buildSslContext(sslKeystoreLocation, sslKeystorePassword,
171137
sslTruststoreLocation, sslTruststorePassword);
172138
mqConnFactory.setSSLSocketFactory(sslContext.getSocketFactory());
173139
}
174140
}
175141

176-
queue = new MQQueue(queueName);
142+
queue = new MQQueue(config.getString(MQSourceConnector.CONFIG_NAME_MQ_QUEUE));
177143

178-
this.userName = userName;
179-
this.password = password;
144+
userName = config.getString(MQSourceConnector.CONFIG_NAME_MQ_USER_NAME);
145+
password = config.getPassword(MQSourceConnector.CONFIG_NAME_MQ_PASSWORD);
180146

181-
this.messageBodyJms = false;
182-
queue.setMessageBodyStyle(WMQConstants.WMQ_MESSAGE_BODY_MQ);
183-
if (mbj != null) {
184-
if (Boolean.parseBoolean(mbj)) {
185-
this.messageBodyJms = true;
186-
queue.setMessageBodyStyle(WMQConstants.WMQ_MESSAGE_BODY_JMS);
187-
}
188-
}
147+
messageBodyJms = config.getBoolean(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS);
148+
queue.setMessageBodyStyle(messageBodyJms ?
149+
WMQConstants.WMQ_MESSAGE_BODY_JMS :
150+
WMQConstants.WMQ_MESSAGE_BODY_MQ);
189151

190-
if (mdr != null) {
191-
if (Boolean.parseBoolean(mdr)) {
192-
queue.setBooleanProperty(WMQConstants.WMQ_MQMD_READ_ENABLED, true);
193-
}
194-
}
152+
queue.setBooleanProperty(WMQConstants.WMQ_MQMD_READ_ENABLED,
153+
config.getBoolean(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_MQMD_READ));
154+
155+
topic = config.getString(MQSourceConnector.CONFIG_NAME_TOPIC);
195156

196-
this.topic = topic;
197157
} catch (JMSException | JMSRuntimeException jmse) {
198158
log.error("JMS exception {}", jmse);
199159
throw new ConnectException(jmse);
160+
} catch (final MalformedURLException e) {
161+
log.error("MalformedURLException exception {}", e);
162+
throw new ConnectException("CCDT file url invalid", e);
200163
}
201164

165+
166+
final String builderClass = config.getString(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER);
202167
try {
203168
final Class<? extends RecordBuilder> c = Class.forName(builderClass).asSubclass(RecordBuilder.class);
204169
builder = c.newInstance();
205-
builder.configure(props);
170+
builder.configure(config.originalsStrings());
206171
} catch (ClassNotFoundException | ClassCastException | IllegalAccessException | InstantiationException
207172
| NullPointerException exc) {
208173
log.error("Could not instantiate message builder {}", builderClass);
@@ -220,7 +185,7 @@ public void connect() {
220185

221186
try {
222187
if (userName != null) {
223-
jmsCtxt = mqConnFactory.createContext(userName, password, JMSContext.SESSION_TRANSACTED);
188+
jmsCtxt = mqConnFactory.createContext(userName, password.value(), JMSContext.SESSION_TRANSACTED);
224189
} else {
225190
jmsCtxt = mqConnFactory.createContext(JMSContext.SESSION_TRANSACTED);
226191
}
@@ -373,7 +338,7 @@ private boolean connectInternal() {
373338
log.trace("[{}] Entry {}.connectInternal", Thread.currentThread().getId(), this.getClass().getName());
374339
try {
375340
if (userName != null) {
376-
jmsCtxt = mqConnFactory.createContext(userName, password, JMSContext.SESSION_TRANSACTED);
341+
jmsCtxt = mqConnFactory.createContext(userName, password.value(), JMSContext.SESSION_TRANSACTED);
377342
} else {
378343
jmsCtxt = mqConnFactory.createContext(JMSContext.SESSION_TRANSACTED);
379344
}
@@ -498,8 +463,8 @@ private ConnectException handleException(final Throwable exc) {
498463
return new ConnectException(exc);
499464
}
500465

501-
private SSLContext buildSslContext(final String sslKeystoreLocation, final String sslKeystorePassword,
502-
final String sslTruststoreLocation, final String sslTruststorePassword) {
466+
private SSLContext buildSslContext(final String sslKeystoreLocation, final Password sslKeystorePassword,
467+
final String sslTruststoreLocation, final Password sslTruststorePassword) {
503468
log.trace("[{}] Entry {}.buildSslContext", Thread.currentThread().getId(), this.getClass().getName());
504469

505470
try {
@@ -508,7 +473,7 @@ private SSLContext buildSslContext(final String sslKeystoreLocation, final Strin
508473

509474
if (sslKeystoreLocation != null) {
510475
final KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
511-
kmf.init(loadKeyStore(sslKeystoreLocation, sslKeystorePassword), sslKeystorePassword.toCharArray());
476+
kmf.init(loadKeyStore(sslKeystoreLocation, sslKeystorePassword), sslKeystorePassword.value().toCharArray());
512477
keyManagers = kmf.getKeyManagers();
513478
}
514479

@@ -530,12 +495,12 @@ private SSLContext buildSslContext(final String sslKeystoreLocation, final Strin
530495
}
531496
}
532497

533-
private KeyStore loadKeyStore(final String location, final String password) throws GeneralSecurityException {
498+
private KeyStore loadKeyStore(final String location, final Password password) throws GeneralSecurityException {
534499
log.trace("[{}] Entry {}.loadKeyStore", Thread.currentThread().getId(), this.getClass().getName());
535500

536501
try (final InputStream ksStr = new FileInputStream(location)) {
537502
final KeyStore ks = KeyStore.getInstance("JKS");
538-
ks.load(ksStr, password.toCharArray());
503+
ks.load(ksStr, password.value().toCharArray());
539504

540505
log.trace("[{}] Exit {}.loadKeyStore, retval={}", Thread.currentThread().getId(),
541506
this.getClass().getName(), ks);

0 commit comments

Comments
 (0)