Skip to content

Commit 6540b46

Browse files
Merge pull request #4 from ibm-messaging/master
Merge with master
2 parents c021627 + 29ba037 commit 6540b46

File tree

5 files changed

+75
-40
lines changed

5 files changed

+75
-40
lines changed

Dockerfile

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
FROM cp.icr.io/cp/icp4i/es/eventstreams-kafka-icp-linux-amd64:2019.4.2-1ec6a54 as builder
2-
1+
FROM strimzi/kafka:latest-kafka-2.5.0 as builder
32

43
FROM ibmjava:8-jre
54

README.md

Lines changed: 30 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ This repository includes a Dockerfile to run Kafka Connect in distributed mode.
9090
1. `docker run -p 8083:8083 kafkaconnect-with-mq-source:0.0.1`
9191

9292
**NOTE:** To provide custom properties files create a folder called `config` containing the `connect-distributed.properties` and `connect-log4j.properties` files and use a Docker volume to make them available when running the container:
93-
`docker run -v $(pwd)/config:/opt/kafka/config -p 8083:8083 kafkaconnect:0.0.1`
93+
`docker run -v $(pwd)/config:/opt/kafka/config -p 8083:8083 kafkaconnect-with-mq-source:0.0.1`
9494

9595
## Deploying to Kubernetes
9696

@@ -213,30 +213,31 @@ For troubleshooting, or to better understand the handshake performed by the IBM
213213
## Configuration
214214
The configuration options for the Kafka Connect source connector for IBM MQ are as follows:
215215

216-
| Name | Description | Type | Default | Valid values |
217-
| ---------------------------- | ---------------------------------------------------------------------- | ------- | -------------- | ------------------------------------------------------- |
218-
| mq.queue.manager | The name of the MQ queue manager | string | | MQ queue manager name |
219-
| mq.connection.mode | The connection mode - bindings or client | string | client | client, bindings |
220-
| mq.connection.name.list | List of connection names for queue manager | string | | host(port)[,host(port),...] |
221-
| mq.channel.name | The name of the server-connection channel | string | | MQ channel name |
222-
| mq.queue | The name of the source MQ queue | string | | MQ queue name |
223-
| mq.user.name | The user name for authenticating with the queue manager | string | | User name |
224-
| mq.password | The password for authenticating with the queue manager | string | | Password |
225-
| mq.ccdt.url | The URL for the CCDT file containing MQ connection details | string | | URL for obtaining a CCDT file |
226-
| mq.record.builder | The class used to build the Kafka Connect record | string | | Class implementing RecordBuilder |
227-
| mq.message.body.jms | Whether to interpret the message body as a JMS message type | boolean | false | |
228-
| mq.record.builder.key.header | The JMS message header to use as the Kafka record key | string | | JMSMessageID, JMSCorrelationID, JMSCorrelationIDAsBytes, JMSDestination |
229-
| mq.jms.properties.copy.to.kafka.headers | Whether to copy JMS message properties to Kafka headers | boolean | false | |
230-
| mq.ssl.cipher.suite | The name of the cipher suite for TLS (SSL) connection | string | | Blank or valid cipher suite |
231-
| mq.ssl.peer.name | The distinguished name pattern of the TLS (SSL) peer | string | | Blank or DN pattern |
232-
| mq.ssl.keystore.location | The path to the JKS keystore to use for SSL (TLS) connections | string | JVM keystore | Local path to a JKS file |
233-
| mq.ssl.keystore.password | The password of the JKS keystore to use for SSL (TLS) connections | string | | |
234-
| mq.ssl.truststore.location | The path to the JKS truststore to use for SSL (TLS) connections | string | JVM truststore | Local path to a JKS file |
235-
| mq.ssl.truststore.password | The password of the JKS truststore to use for SSL (TLS) connections | string | | |
236-
| mq.batch.size | The maximum number of messages in a batch (unit of work) | integer | 250 | 1 or greater |
237-
| mq.message.mqmd.read | Whether to enable reading of all MQMD fields | boolean | false | |
238-
| mq.user.authentication.mqcsp | Whether to use MQ connection security parameters (MQCSP) | boolean | true | |
239-
| topic | The name of the target Kafka topic | string | | Topic name |
216+
| Name | Description | Type | Default | Valid values |
217+
| --------------------------------------- | ---------------------------------------------------------------------- | ------- | -------------- | ------------------------------------------------------- |
218+
| topic | The name of the target Kafka topic | string | | Topic name |
219+
| mq.queue.manager | The name of the MQ queue manager | string | | MQ queue manager name |
220+
| mq.connection.mode | The connection mode - bindings or client | string | client | client, bindings |
221+
| mq.connection.name.list | List of connection names for queue manager | string | | host(port)[,host(port),...] |
222+
| mq.channel.name | The name of the server-connection channel | string | | MQ channel name |
223+
| mq.queue | The name of the source MQ queue | string | | MQ queue name |
224+
| mq.user.name | The user name for authenticating with the queue manager | string | | User name |
225+
| mq.password | The password for authenticating with the queue manager | string | | Password |
226+
| mq.user.authentication.mqcsp | Whether to use MQ connection security parameters (MQCSP) | boolean | true | |
227+
| mq.ccdt.url | The URL for the CCDT file containing MQ connection details | string | | URL for obtaining a CCDT file |
228+
| mq.record.builder | The class used to build the Kafka Connect record | string | | Class implementing RecordBuilder |
229+
| mq.message.body.jms | Whether to interpret the message body as a JMS message type | boolean | false | |
230+
| mq.record.builder.key.header | The JMS message header to use as the Kafka record key | string | | JMSMessageID, JMSCorrelationID, JMSCorrelationIDAsBytes, JMSDestination |
231+
| mq.jms.properties.copy.to.kafka.headers | Whether to copy JMS message properties to Kafka headers | boolean | false | |
232+
| mq.ssl.cipher.suite | The name of the cipher suite for TLS (SSL) connection | string | | Blank or valid cipher suite |
233+
| mq.ssl.peer.name | The distinguished name pattern of the TLS (SSL) peer | string | | Blank or DN pattern |
234+
| mq.ssl.keystore.location | The path to the JKS keystore to use for SSL (TLS) connections | string | JVM keystore | Local path to a JKS file |
235+
| mq.ssl.keystore.password | The password of the JKS keystore to use for SSL (TLS) connections | string | | |
236+
| mq.ssl.truststore.location | The path to the JKS truststore to use for SSL (TLS) connections | string | JVM truststore | Local path to a JKS file |
237+
| mq.ssl.truststore.password | The password of the JKS truststore to use for SSL (TLS) connections | string | | |
238+
| mq.ssl.use.ibm.cipher.mappings | Whether to set system property to control use of IBM cipher mappings | boolean | | |
239+
| mq.batch.size | The maximum number of messages in a batch (unit of work) | integer | 250 | 1 or greater |
240+
| mq.message.mqmd.read | Whether to enable reading of all MQMD fields | boolean | false | |
240241

241242

242243
### Using a CCDT file
@@ -277,6 +278,10 @@ To use a file for the `mq.password` in Kubernetes, you create a Secret using the
277278

278279
You may receive an `org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed` error when trying to run the MQ source connector using SSL to connect to your Kafka cluster. In the case that the error is caused by the following exception: `Caused by: java.security.cert.CertificateException: No subject alternative DNS name matching XXXXX found.`, Java may be replacing the IP address of your cluster with the corresponding hostname in your `/etc/hosts` file. For example, to push Docker images to a custom Docker repository, you may add an entry in this file which corresponds to the IP of your repository e.g. `123.456.78.90 mycluster.icp`. To fix this, you can comment out this line in your `/etc/hosts` file.
279280

281+
### Unsupported cipher suite
282+
283+
When configuring TLS connection to MQ, you may find that the queue manager rejects the cipher suite, in spite of the name looking correct. There are two different naming conventions for cipher suites (https://www.ibm.com/support/knowledgecenter/SSFKSJ_9.1.0/com.ibm.mq.dev.doc/q113220_.htm). Setting the configuration option `mq.ssl.use.ibm.cipher.mappings=false` often resolves cipher suite problems.
284+
280285

281286
## Support
282287
A commercially supported version of this connector is available for customers with a support entitlement for [IBM Event Streams](https://www.ibm.com/cloud/event-streams).

config/mq-source.properties

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright 2017, 2018, 2019 IBM Corporation
1+
# Copyright 2017, 2020 IBM Corporation
22
#
33
# Licensed under the Apache License, Version 2.0 (the "License");
44
# you may not use this file except in compliance with the License.
@@ -18,6 +18,9 @@ connector.class=com.ibm.eventstreams.connect.mqsource.MQSourceConnector
1818
# You can increase this for higher throughput, but message ordering will be lost
1919
tasks.max=1
2020

21+
# The name of the target Kafka topic - required
22+
topic=
23+
2124
# The name of the MQ queue manager - required
2225
mq.queue.manager=
2326

@@ -73,17 +76,26 @@ mq.record.builder=com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBu
7376
# The distinguished name pattern of the TLS (SSL) peer - optional
7477
# mq.ssl.peer.name=
7578

79+
# Location and password for the keystore and truststore for SSL (TLS) connections
80+
# mq.ssl.keystore.location=
81+
# mq.ssl.keystore.password=
82+
# mq.ssl.truststore.location=
83+
# mq.ssl.truststore.password=
84+
85+
# Whether to set system property to control use of IBM cipher mappings - optional
86+
# mq.ssl.use.ibm.cipher.mappings=false
87+
7688
# Whether to enable reading of all MQMD fields (default false) - optional
7789
# mq.message.mqmd.read=
7890

79-
# The name of the target Kafka topic - required
80-
topic=
91+
# The maximum number of messages in a batch (unit of work) - optional
92+
# mq.batch.size=250
8193

8294
# The converters control conversion of data between the internal Kafka Connect representation and the messages in Kafka.
8395
# key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
84-
# key.converter=org.apache.kafka.connect.storage.StringConverter
96+
key.converter=org.apache.kafka.connect.storage.StringConverter
8597
# key.converter=org.apache.kafka.connect.json.JsonConverter
8698

87-
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
88-
# value.converter=org.apache.kafka.connect.storage.StringConverter
99+
# value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
100+
value.converter=org.apache.kafka.connect.storage.StringConverter
89101
# value.converter=org.apache.kafka.connect.json.JsonConverter

src/main/java/com/ibm/eventstreams/connect/mqsource/JMSReader.java

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/**
2-
* Copyright 2017, 2018, 2019 IBM Corporation
2+
* Copyright 2017, 2020 IBM Corporation
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -27,7 +27,9 @@
2727
import java.io.InputStream;
2828
import java.net.MalformedURLException;
2929
import java.net.URL;
30-
import java.security.*;
30+
import java.security.GeneralSecurityException;
31+
import java.security.KeyStore;
32+
import java.security.SecureRandom;
3133
import java.util.Map;
3234
import java.util.concurrent.atomic.AtomicBoolean;
3335

@@ -36,8 +38,10 @@
3638
import javax.jms.JMSException;
3739
import javax.jms.JMSRuntimeException;
3840
import javax.jms.Message;
41+
import javax.net.ssl.KeyManager;
3942
import javax.net.ssl.KeyManagerFactory;
4043
import javax.net.ssl.SSLContext;
44+
import javax.net.ssl.TrustManager;
4145
import javax.net.ssl.TrustManagerFactory;
4246

4347
import org.apache.kafka.connect.errors.ConnectException;
@@ -108,8 +112,13 @@ public void configure(Map<String, String> props) {
108112
String sslTruststoreLocation = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_TRUSTSTORE_LOCATION);
109113
String sslTruststorePassword = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_TRUSTSTORE_PASSWORD);
110114
String useMQCSP = props.get(MQSourceConnector.CONFIG_NAME_MQ_USER_AUTHENTICATION_MQCSP);
115+
String useIBMCipherMappings = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_USE_IBM_CIPHER_MAPPINGS);
111116
String topic = props.get(MQSourceConnector.CONFIG_NAME_TOPIC);
112117

118+
if (useIBMCipherMappings != null) {
119+
System.setProperty("com.ibm.mq.cfg.useIBMCipherMappings", useIBMCipherMappings);
120+
}
121+
113122
int transportType = WMQConstants.WMQ_CM_CLIENT;
114123
if (connectionMode != null) {
115124
if (connectionMode.equals(MQSourceConnector.CONFIG_VALUE_MQ_CONNECTION_MODE_CLIENT)) {
@@ -489,23 +498,26 @@ private SSLContext buildSslContext(String sslKeystoreLocation, String sslKeystor
489498
log.trace("[{}] Entry {}.buildSslContext", Thread.currentThread().getId(), this.getClass().getName());
490499

491500
try {
492-
final KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
493-
final TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
501+
KeyManager[] keyManagers = null;
502+
TrustManager[] trustManagers = null;
494503

495504
if (sslKeystoreLocation != null) {
505+
KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
496506
kmf.init(loadKeyStore(sslKeystoreLocation, sslKeystorePassword), sslKeystorePassword.toCharArray());
507+
keyManagers = kmf.getKeyManagers();
497508
}
498509

499510
if (sslTruststoreLocation != null) {
511+
TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
500512
tmf.init(loadKeyStore(sslTruststoreLocation, sslTruststorePassword));
513+
trustManagers = tmf.getTrustManagers();
501514
}
502515

503516
final SSLContext sslContext = SSLContext.getInstance("TLS");
504-
sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), new SecureRandom());
517+
sslContext.init(keyManagers, trustManagers, new SecureRandom());
505518

506519
log.trace("[{}] Exit {}.buildSslContext, retval={}", Thread.currentThread().getId(), this.getClass().getName(), sslContext);
507520
return sslContext;
508-
509521
} catch (GeneralSecurityException e) {
510522
throw new ConnectException("Error creating SSLContext", e);
511523
}
@@ -520,7 +532,6 @@ private KeyStore loadKeyStore(String location, String password) throws GeneralSe
520532

521533
log.trace("[{}] Exit {}.loadKeyStore, retval={}", Thread.currentThread().getId(), this.getClass().getName(), ks);
522534
return ks;
523-
524535
} catch (IOException e) {
525536
throw new ConnectException("Error reading keystore " + location, e);
526537
}

src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,10 @@ public class MQSourceConnector extends SourceConnector {
113113
public static final String CONFIG_DOCUMENTATION_MQ_SSL_TRUSTSTORE_PASSWORD = "The password of the JKS truststore to use for the TLS (SSL) connection.";
114114
public static final String CONFIG_DISPLAY_MQ_SSL_TRUSTSTORE_PASSWORD = "SSL truststore password";
115115

116+
public static final String CONFIG_NAME_MQ_SSL_USE_IBM_CIPHER_MAPPINGS = "mq.ssl.use.ibm.cipher.mappings";
117+
public static final String CONFIG_DOCUMENTATION_MQ_SSL_USE_IBM_CIPHER_MAPPINGS = "Whether to set system property to control use of IBM cipher mappings.";
118+
public static final String CONFIG_DISPLAY_MQ_SSL_USE_IBM_CIPHER_MAPPINGS = "Use IBM cipher mappings";
119+
116120
public static final String CONFIG_NAME_MQ_BATCH_SIZE = "mq.batch.size";
117121
public static final String CONFIG_DOCUMENTATION_MQ_BATCH_SIZE = "The maximum number of messages in a batch. A batch uses a single unit of work.";
118122
public static final String CONFIG_DISPLAY_MQ_BATCH_SIZE = "Batch size";
@@ -297,6 +301,10 @@ public class MQSourceConnector extends SourceConnector {
297301
CONFIG_DOCUMENTATION_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER, CONFIG_GROUP_MQ, 21, Width.MEDIUM,
298302
CONFIG_DISPLAY_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER);
299303

304+
config.define(CONFIG_NAME_MQ_SSL_USE_IBM_CIPHER_MAPPINGS, Type.BOOLEAN, null, Importance.LOW,
305+
CONFIG_DOCUMENTATION_MQ_SSL_USE_IBM_CIPHER_MAPPINGS, CONFIG_GROUP_MQ, 22, Width.SHORT,
306+
CONFIG_DISPLAY_MQ_SSL_USE_IBM_CIPHER_MAPPINGS);
307+
300308
config.define(CONFIG_NAME_TOPIC, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, Importance.HIGH,
301309
CONFIG_DOCUMENTATION_TOPIC, null, 0, Width.MEDIUM,
302310
CONFIG_DISPLAY_TOPIC);

0 commit comments

Comments
 (0)