Skip to content

Commit 7871cea

Browse files
Merge pull request #45 from MatthiasBechtold/master
Added configuration for SSL keystores
2 parents 0684079 + e19089c commit 7871cea

File tree

5 files changed

+118
-24
lines changed

5 files changed

+118
-24
lines changed

Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ COPY --chown=esuser:esgroup --from=builder /opt/kafka/libs/ /opt/kafka/libs/
1111
COPY --chown=esuser:esgroup --from=builder /opt/kafka/config/connect-distributed.properties /opt/kafka/config/
1212
COPY --chown=esuser:esgroup --from=builder /opt/kafka/config/connect-log4j.properties /opt/kafka/config/
1313
RUN mkdir /opt/kafka/logs && chown esuser:esgroup /opt/kafka/logs
14-
COPY --chown=esuser:esgroup target/kafka-connect-mq-source-1.1.1-jar-with-dependencies.jar /opt/kafka/libs/
14+
COPY --chown=esuser:esgroup target/kafka-connect-mq-source-1.2.0-SNAPSHOT-jar-with-dependencies.jar /opt/kafka/libs/
1515

1616
WORKDIR /opt/kafka
1717

README.md

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -213,24 +213,28 @@ For troubleshooting, or to better understand the handshake performed by the IBM
213213
## Configuration
214214
The configuration options for the Kafka Connect source connector for IBM MQ are as follows:
215215

216-
| Name | Description | Type | Default | Valid values |
217-
| ---------------------------- | ----------------------------------------------------------- | ------- | ------------- | ------------------------------------------------------- |
218-
| mq.queue.manager | The name of the MQ queue manager | string | | MQ queue manager name |
219-
| mq.connection.mode | The connection mode - bindings or client | string | client | client, bindings |
220-
| mq.connection.name.list | List of connection names for queue manager | string | | host(port)[,host(port),...] |
221-
| mq.channel.name | The name of the server-connection channel | string | | MQ channel name |
222-
| mq.queue | The name of the source MQ queue | string | | MQ queue name |
223-
| mq.user.name | The user name for authenticating with the queue manager | string | | User name |
224-
| mq.password | The password for authenticating with the queue manager | string | | Password |
225-
| mq.ccdt.url | The URL for the CCDT file containing MQ connection details | string | | URL for obtaining a CCDT file |
226-
| mq.record.builder | The class used to build the Kafka Connect record | string | | Class implementing RecordBuilder |
227-
| mq.message.body.jms | Whether to interpret the message body as a JMS message type | boolean | false | |
228-
| mq.record.builder.key.header | The JMS message header to use as the Kafka record key | string | | JMSMessageID, JMSCorrelationID, JMSCorrelationIDAsBytes, JMSDestination |
229-
| mq.ssl.cipher.suite | The name of the cipher suite for TLS (SSL) connection | string | | Blank or valid cipher suite |
230-
| mq.ssl.peer.name | The distinguished name pattern of the TLS (SSL) peer | string | | Blank or DN pattern |
231-
| mq.batch.size | The maximum number of messages in a batch (unit of work) | integer | 250 | 1 or greater |
232-
| mq.message.mqmd.read | Whether to enable reading of all MQMD fields | boolean | false | |
233-
| topic | The name of the target Kafka topic | string | | Topic name |
216+
| Name | Description | Type | Default | Valid values |
217+
| ---------------------------- | ---------------------------------------------------------------------- | ------- | -------------- | ------------------------------------------------------- |
218+
| mq.queue.manager | The name of the MQ queue manager | string | | MQ queue manager name |
219+
| mq.connection.mode | The connection mode - bindings or client | string | client | client, bindings |
220+
| mq.connection.name.list | List of connection names for queue manager | string | | host(port)[,host(port),...] |
221+
| mq.channel.name | The name of the server-connection channel | string | | MQ channel name |
222+
| mq.queue | The name of the source MQ queue | string | | MQ queue name |
223+
| mq.user.name | The user name for authenticating with the queue manager | string | | User name |
224+
| mq.password | The password for authenticating with the queue manager | string | | Password |
225+
| mq.ccdt.url | The URL for the CCDT file containing MQ connection details | string | | URL for obtaining a CCDT file |
226+
| mq.record.builder | The class used to build the Kafka Connect record | string | | Class implementing RecordBuilder |
227+
| mq.message.body.jms | Whether to interpret the message body as a JMS message type | boolean | false | |
228+
| mq.record.builder.key.header | The JMS message header to use as the Kafka record key | string | | JMSMessageID, JMSCorrelationID, JMSCorrelationIDAsBytes, JMSDestination |
229+
| mq.ssl.cipher.suite | The name of the cipher suite for TLS (SSL) connection | string | | Blank or valid cipher suite |
230+
| mq.ssl.peer.name | The distinguished name pattern of the TLS (SSL) peer | string | | Blank or DN pattern |
231+
| mq.ssl.keystore.location | The path to the JKS keystore to use for SSL (TLS) connections | string | JVM keystore | Local path to a JKS file |
232+
| mq.ssl.keystore.password | The password of the JKS keystore to use for SSL (TLS) connections | string | | |
233+
| mq.ssl.truststore.location | The path to the JKS truststore to use for SSL (TLS) connections | string | JVM truststore | Local path to a JKS file |
234+
| mq.ssl.truststore.password | The password of the JKS truststore to use for SSL (TLS) connections | string | | |
235+
| mq.batch.size | The maximum number of messages in a batch (unit of work) | integer | 250 | 1 or greater |
236+
| mq.message.mqmd.read | Whether to enable reading of all MQMD fields | boolean | false | |
237+
| topic | The name of the target Kafka topic | string | | Topic name |
234238

235239
### Using a CCDT file
236240
Some of the connection details for MQ can be provided in a [CCDT file](https://www.ibm.com/support/knowledgecenter/en/SSFKSJ_9.1.0/com.ibm.mq.con.doc/q016730_.htm) by setting `mq.ccdt.url` in the MQ source connector configuration file. If using a CCDT file the `mq.connection.name.list` and `mq.channel.name` configuration options are not required.

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
<groupId>com.ibm.eventstreams.connect</groupId>
2121
<artifactId>kafka-connect-mq-source</artifactId>
2222
<packaging>jar</packaging>
23-
<version>1.1.1</version>
23+
<version>1.2.0-SNAPSHOT</version>
2424
<name>kafka-connect-mq-source</name>
2525
<organization>
2626
<name>IBM Corporation</name>

src/main/java/com/ibm/eventstreams/connect/mqsource/JMSReader.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,13 @@
2222
import com.ibm.mq.jms.*;
2323
import com.ibm.msg.client.wmq.WMQConstants;
2424

25+
import java.io.FileInputStream;
26+
import java.io.FileNotFoundException;
27+
import java.io.IOException;
28+
import java.io.InputStream;
2529
import java.net.MalformedURLException;
2630
import java.net.URL;
31+
import java.security.*;
2732
import java.util.Map;
2833
import java.util.concurrent.atomic.AtomicBoolean;
2934

@@ -32,6 +37,9 @@
3237
import javax.jms.JMSException;
3338
import javax.jms.JMSRuntimeException;
3439
import javax.jms.Message;
40+
import javax.net.ssl.KeyManagerFactory;
41+
import javax.net.ssl.SSLContext;
42+
import javax.net.ssl.TrustManagerFactory;
3543

3644
import org.apache.kafka.connect.errors.ConnectException;
3745
import org.apache.kafka.connect.errors.RetriableException;
@@ -96,6 +104,10 @@ public void configure(Map<String, String> props) {
96104
String mdr = props.get(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_MQMD_READ);
97105
String sslCipherSuite = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_CIPHER_SUITE);
98106
String sslPeerName = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_PEER_NAME);
107+
String sslKeystoreLocation = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_KEYSTORE_LOCATION);
108+
String sslKeystorePassword = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_KEYSTORE_PASSWORD);
109+
String sslTruststoreLocation = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_TRUSTSTORE_LOCATION);
110+
String sslTruststorePassword = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_TRUSTSTORE_PASSWORD);
99111
String topic = props.get(MQSourceConnector.CONFIG_NAME_TOPIC);
100112

101113
int transportType = WMQConstants.WMQ_CM_CLIENT;
@@ -142,6 +154,11 @@ else if (connectionMode.equals(MQSourceConnector.CONFIG_VALUE_MQ_CONNECTION_MODE
142154
mqConnFactory.setSSLPeerName(sslPeerName);
143155
}
144156
}
157+
158+
if (sslKeystoreLocation != null || sslTruststoreLocation != null) {
159+
final SSLContext sslContext = buildSslContext(sslKeystoreLocation, sslKeystorePassword, sslTruststoreLocation, sslTruststorePassword);
160+
mqConnFactory.setSSLSocketFactory(sslContext.getSocketFactory());
161+
}
145162
}
146163

147164
queue = new MQQueue(queueName);
@@ -464,4 +481,45 @@ else if (t instanceof JMSException) {
464481

465482
return new ConnectException(exc);
466483
}
484+
485+
private SSLContext buildSslContext(String sslKeystoreLocation, String sslKeystorePassword, String sslTruststoreLocation, String sslTruststorePassword) {
486+
log.trace("[{}] Entry {}.buildSslContext", Thread.currentThread().getId(), this.getClass().getName());
487+
488+
try {
489+
final KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
490+
final TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
491+
492+
if (sslKeystoreLocation != null) {
493+
kmf.init(loadKeyStore(sslKeystoreLocation, sslKeystorePassword), sslKeystorePassword.toCharArray());
494+
}
495+
496+
if (sslTruststoreLocation != null) {
497+
tmf.init(loadKeyStore(sslTruststoreLocation, sslTruststorePassword));
498+
}
499+
500+
final SSLContext sslContext = SSLContext.getInstance("TLS");
501+
sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), new SecureRandom());
502+
503+
log.trace("[{}] Exit {}.buildSslContext, retval={}", Thread.currentThread().getId(), this.getClass().getName(), sslContext);
504+
return sslContext;
505+
506+
} catch (GeneralSecurityException e) {
507+
throw new ConnectException("Error creating SSLContext", e);
508+
}
509+
}
510+
511+
private KeyStore loadKeyStore(String location, String password) throws GeneralSecurityException {
512+
log.trace("[{}] Entry {}.loadKeyStore", Thread.currentThread().getId(), this.getClass().getName());
513+
514+
try (final InputStream ksStr = new FileInputStream(location)) {
515+
final KeyStore ks = KeyStore.getInstance("JKS");
516+
ks.load(ksStr, password.toCharArray());
517+
518+
log.trace("[{}] Exit {}.loadKeyStore, retval={}", Thread.currentThread().getId(), this.getClass().getName(), ks);
519+
return ks;
520+
521+
} catch (IOException e) {
522+
throw new ConnectException("Error reading keystore " + location, e);
523+
}
524+
}
467525
}

src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,22 @@ public class MQSourceConnector extends SourceConnector {
9393
public static final String CONFIG_DOCUMENTATION_MQ_SSL_PEER_NAME = "The distinguished name pattern of the TLS (SSL) peer.";
9494
public static final String CONFIG_DISPLAY_MQ_SSL_PEER_NAME = "SSL peer name";
9595

96+
public static final String CONFIG_NAME_MQ_SSL_KEYSTORE_LOCATION = "mq.ssl.keystore.location";
97+
public static final String CONFIG_DOCUMENTATION_MQ_SSL_KEYSTORE_LOCATION = "The path to the JKS keystore to use for the TLS (SSL) connection.";
98+
public static final String CONFIG_DISPLAY_MQ_SSL_KEYSTORE_LOCATION = "SSL keystore location";
99+
100+
public static final String CONFIG_NAME_MQ_SSL_KEYSTORE_PASSWORD = "mq.ssl.keystore.password";
101+
public static final String CONFIG_DOCUMENTATION_MQ_SSL_KEYSTORE_PASSWORD = "The password of the JKS keystore to use for the TLS (SSL) connection.";
102+
public static final String CONFIG_DISPLAY_MQ_SSL_KEYSTORE_PASSWORD = "SSL keystore password";
103+
104+
public static final String CONFIG_NAME_MQ_SSL_TRUSTSTORE_LOCATION = "mq.ssl.truststore.location";
105+
public static final String CONFIG_DOCUMENTATION_MQ_SSL_TRUSTSTORE_LOCATION = "The path to the JKS truststore to use for the TLS (SSL) connection.";
106+
public static final String CONFIG_DISPLAY_MQ_SSL_TRUSTSTORE_LOCATION = "SSL truststore location";
107+
108+
public static final String CONFIG_NAME_MQ_SSL_TRUSTSTORE_PASSWORD = "mq.ssl.truststore.password";
109+
public static final String CONFIG_DOCUMENTATION_MQ_SSL_TRUSTSTORE_PASSWORD = "The password of the JKS truststore to use for the TLS (SSL) connection.";
110+
public static final String CONFIG_DISPLAY_MQ_SSL_TRUSTSTORE_PASSWORD = "SSL truststore password";
111+
96112
public static final String CONFIG_NAME_MQ_BATCH_SIZE = "mq.batch.size";
97113
public static final String CONFIG_DOCUMENTATION_MQ_BATCH_SIZE = "The maximum number of messages in a batch. A batch uses a single unit of work.";
98114
public static final String CONFIG_DISPLAY_MQ_BATCH_SIZE = "Batch size";
@@ -107,7 +123,7 @@ public class MQSourceConnector extends SourceConnector {
107123
public static final String CONFIG_DOCUMENTATION_TOPIC = "The name of the target Kafka topic.";
108124
public static final String CONFIG_DISPLAY_TOPIC = "Target Kafka topic";
109125

110-
public static String VERSION = "1.1.1";
126+
public static String VERSION = "1.2.0-SNAPSHOT";
111127

112128
private Map<String, String> configProps;
113129

@@ -240,15 +256,31 @@ public class MQSourceConnector extends SourceConnector {
240256
CONFIG_DOCUMENTATION_MQ_SSL_PEER_NAME, CONFIG_GROUP_MQ, 13, Width.MEDIUM,
241257
CONFIG_DISPLAY_MQ_SSL_PEER_NAME);
242258

259+
config.define(CONFIG_NAME_MQ_SSL_KEYSTORE_LOCATION, Type.STRING, null, Importance.MEDIUM,
260+
CONFIG_DOCUMENTATION_MQ_SSL_KEYSTORE_LOCATION, CONFIG_GROUP_MQ, 14, Width.MEDIUM,
261+
CONFIG_DISPLAY_MQ_SSL_KEYSTORE_LOCATION);
262+
263+
config.define(CONFIG_NAME_MQ_SSL_KEYSTORE_PASSWORD, Type.PASSWORD, null, Importance.MEDIUM,
264+
CONFIG_DOCUMENTATION_MQ_SSL_KEYSTORE_PASSWORD, CONFIG_GROUP_MQ, 15, Width.MEDIUM,
265+
CONFIG_DISPLAY_MQ_SSL_KEYSTORE_PASSWORD);
266+
267+
config.define(CONFIG_NAME_MQ_SSL_TRUSTSTORE_LOCATION, Type.STRING, null, Importance.MEDIUM,
268+
CONFIG_DOCUMENTATION_MQ_SSL_TRUSTSTORE_LOCATION, CONFIG_GROUP_MQ, 16, Width.MEDIUM,
269+
CONFIG_DISPLAY_MQ_SSL_TRUSTSTORE_LOCATION);
270+
271+
config.define(CONFIG_NAME_MQ_SSL_TRUSTSTORE_PASSWORD, Type.PASSWORD, null, Importance.MEDIUM,
272+
CONFIG_DOCUMENTATION_MQ_SSL_TRUSTSTORE_PASSWORD, CONFIG_GROUP_MQ, 17, Width.MEDIUM,
273+
CONFIG_DISPLAY_MQ_SSL_TRUSTSTORE_PASSWORD);
274+
243275
config.define(CONFIG_NAME_MQ_BATCH_SIZE, Type.INT, CONFIG_VALUE_MQ_BATCH_SIZE_DEFAULT,
244276
ConfigDef.Range.atLeast(CONFIG_VALUE_MQ_BATCH_SIZE_MINIMUM), Importance.LOW,
245-
CONFIG_DOCUMENTATION_MQ_BATCH_SIZE, CONFIG_GROUP_MQ, 14, Width.MEDIUM,
277+
CONFIG_DOCUMENTATION_MQ_BATCH_SIZE, CONFIG_GROUP_MQ, 18, Width.MEDIUM,
246278
CONFIG_DISPLAY_MQ_BATCH_SIZE);
247279

248280
config.define(CONFIG_NAME_MQ_MESSAGE_MQMD_READ, Type.BOOLEAN, Boolean.FALSE, Importance.LOW,
249-
CONFIG_DOCUMENTATION_MQ_MESSAGE_MQMD_READ, CONFIG_GROUP_MQ, 15, Width.SHORT,
281+
CONFIG_DOCUMENTATION_MQ_MESSAGE_MQMD_READ, CONFIG_GROUP_MQ, 19, Width.SHORT,
250282
CONFIG_DISPLAY_MQ_MESSAGE_MQMD_READ);
251-
283+
252284
config.define(CONFIG_NAME_TOPIC, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, Importance.HIGH,
253285
CONFIG_DOCUMENTATION_TOPIC, null, 0, Width.MEDIUM,
254286
CONFIG_DISPLAY_TOPIC);

0 commit comments

Comments
 (0)