Skip to content

Commit f345f8c

Browse files
Support for TLS with server cert
1 parent 4801a06 commit f345f8c

File tree

5 files changed

+53
-5
lines changed

5 files changed

+53
-5
lines changed

README.md

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,11 @@ Change directory into the `kafka-connect-mq-source` directory:
2222
cd kafka-connect-mq-source
2323
```
2424

25-
Copy the JAR file `allclient-9.0.0.1.jar` that you unpacked from the ZIP file earlier into the `kafka-connect-mq-source` directory.
25+
Copy the JAR file `com.ibm.mq.allclient.jar` that you unpacked from the ZIP file earlier into the `kafka-connect-mq-source` directory.
2626

2727
Run the following command to create a local Maven repository containing just this file so that it can be used to build the connector:
2828
```shell
29-
mvn deploy:deploy-file -Durl=file://local-maven-repo -Dfile=allclient-9.0.0.1.jar -DgroupId=com.ibm.mq -DartifactId=allclient -Dpackaging=jar -Dversion=9.0.0.1
29+
mvn deploy:deploy-file -Durl=file://local-maven-repo -Dfile=com.ibm.mq.allclient.jar -DgroupId=com.ibm.mq -DartifactId=allclient -Dpackaging=jar -Dversion=9.0.0.1
3030
```
3131

3232
Build the connector using Maven:
@@ -109,6 +109,15 @@ There are three basic converters built into Apache Kafka, with the likely useful
109109
In addition, there is another converter for the Avro format that is part of the Confluent Platform. This has not been tested with the MQ source connector at this time.
110110

111111

112+
## Security
113+
The connector supports authentication with user name and password and also connections secured with TLS using a server-side certificate. It does not currently support TLS mutual authentication with client-side certificates.
114+
115+
### Setting up TLS using a server-side certificate
116+
To enable use of TLS, set the configuration 'mq.ssl.cipher.suite' to the name of the cipher suite which matches the CipherSpec in the SSLCIPH attribute of the MQ server-connection channel. Use the table of supported cipher suites for MQ 9.0.x [here](https://www.ibm.com/support/knowledgecenter/en/SSFKSJ_9.0.0/com.ibm.mq.dev.doc/q113220_.htm) as a reference. Note that the names of the CipherSpecs as used in the MQ configuration are not necessarily the same as the cipher suite names that the connector uses. The connector uses the JMS interface so it follows the Java conventions.
117+
118+
You will need to put the public part of the queue manager's certificate in a JSSE truststore and then provide the location of the truststore as property to the Kafka Connect worker that you're using to run the connector.
119+
120+
112121
## Configuration
113122
The configuration options for the MQ Source Connector are as follows:
114123

@@ -121,19 +130,24 @@ The configuration options for the MQ Source Connector are as follows:
121130
| mq.user.name | The user name for authenticating with the queue manager | string | | User name |
122131
| mq.password | The password for authenticating with the queue manager | string | | Password |
123132
| mq.message.body.jms | Whether to interpret the message body as a JMS message type | boolean | false | |
133+
| mq.ssl.cipher.suite | The name of the cipher suite for TLS connection | string | | Blank or valid cipher suite |
124134
| topic | The name of the target Kafka topic | string | | Topic name |
125135

126136

127137
## Future enhancements
128-
The first version of the connector is intentionally basic. The idea is to enhance it with additional features to make it more capable. Some possible future enhancements are:
129-
* TLS connections
138+
The connector is intentionally basic. The idea is to enhance it over time with additional features to make it more capable. Some possible future enhancements are:
139+
* TLS mutual authentication
130140
* Message key support
131141
* Configurable schema for MQ messages
132142
* JMX metrics
133143
* JSON parsing so that the JSON type information is supplied to the converter
134144
* Testing with the Confluent Platform Avro converter and Schema Registry
135145

136146

147+
## Issues and contributions
148+
For issues relating specifically to this connect, please use the [GitHub issue tracker](https://github.com/ibm-messaging/kafka-connect-mq-source/issues). If you do submit a Pull Request related to this connector, please indicate in the Pull Request that you accept and agree to be bound by the terms of the [IBM Contributor License Agreement](CLA.md).
149+
150+
137151
## License
138152
Copyright 2017 IBM Corporation
139153

config/mq-source.properties

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ mq.queue=
3939
# Whether to interpret the message body as a JMS message type (default false) - optional
4040
# mq.message.body.jms=
4141

42+
# The name of the cipher suite for TLS connection (default blank, meaning do not use TLS) - optional
43+
# See https://www.ibm.com/support/knowledgecenter/en/SSFKSJ_9.0.0/com.ibm.mq.dev.doc/q113220_.htm for valid values
44+
# mq.ssl.cipher.suite=
45+
4246
# The name of the target Kafka topic - required
4347
topic=
4448

src/main/java/com/ibm/mq/kafkaconnect/JMSReader.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ public class JMSReader {
4949
// Configs
5050
private String userName;
5151
private String password;
52+
private String sslCipherSuite;
5253
private String topic;
5354
private boolean messageBodyJms;
5455

@@ -123,6 +124,20 @@ public void setMessageBodyJms(boolean messageBodyJms)
123124
}
124125
}
125126

127+
/**
128+
* Setter for SSL-related configuration.
129+
*
130+
* @param sslCipherSuite
131+
*/
132+
public void setSSLConfiguration(String sslCipherSuite)
133+
{
134+
this.sslCipherSuite = sslCipherSuite;
135+
if (this.sslCipherSuite != null)
136+
{
137+
mqConnFactory.setSSLCipherSuite(this.sslCipherSuite);
138+
}
139+
}
140+
126141
/**
127142
* Connects to MQ.
128143
*
@@ -290,7 +305,8 @@ private void handleException(Throwable exc) throws ConnectException, RetriableEx
290305
while (t != null) {
291306
if (t instanceof MQException) {
292307
MQException mqe = (MQException)t;
293-
log.error("MQ error: CompCode {}, Reason {}", mqe.getCompCode(), mqe.getReason());
308+
log.error("MQ error: CompCode {}, Reason {} {}", mqe.getCompCode(), mqe.getReason(),
309+
MQConstants.lookupReasonCode(mqe.getReason()));
294310
reason = mqe.getReason();
295311
break;
296312
}

src/main/java/com/ibm/mq/kafkaconnect/MQSourceConnector.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,10 @@ public class MQSourceConnector extends SourceConnector {
6363
public static final String CONFIG_DOCUMENTATION_MQ_MESSAGE_BODY_JMS = "Whether to interpret the message body as a JMS message type.";
6464
public static final String CONFIG_DISPLAY_MQ_MESSAGE_BODY_JMS = "Message body as JMS";
6565

66+
public static final String CONFIG_NAME_MQ_SSL_CIPHER_SUITE = "mq.ssl.cipher.suite";
67+
public static final String CONFIG_DOCUMENTATION_MQ_SSL_CIPHER_SUITE = "The name of the cipher suite for TLS (SSL) connection.";
68+
public static final String CONFIG_DISPLAY_MQ_SSL_CIPHER_SUITE = "SSL cipher suite";
69+
6670
public static final String CONFIG_NAME_TOPIC = "topic";
6771
public static final String CONFIG_DOCUMENTATION_TOPIC = "The name of the target Kafka topic.";
6872
public static final String CONFIG_DISPLAY_TOPIC = "Target Kafka topic";
@@ -158,6 +162,10 @@ public class MQSourceConnector extends SourceConnector {
158162
CONFIG_DOCUMENTATION_MQ_MESSAGE_BODY_JMS, CONFIG_GROUP_MQ, 7, Width.SHORT,
159163
CONFIG_DISPLAY_MQ_MESSAGE_BODY_JMS);
160164

165+
config.define(CONFIG_NAME_MQ_SSL_CIPHER_SUITE, Type.STRING, null, Importance.MEDIUM,
166+
CONFIG_DOCUMENTATION_MQ_SSL_CIPHER_SUITE, CONFIG_GROUP_MQ, 8, Width.MEDIUM,
167+
CONFIG_DISPLAY_MQ_SSL_CIPHER_SUITE);
168+
161169
config.define(CONFIG_NAME_TOPIC, Type.STRING, null, Importance.HIGH,
162170
CONFIG_DOCUMENTATION_TOPIC, null, 0, Width.MEDIUM,
163171
CONFIG_DISPLAY_TOPIC);

src/main/java/com/ibm/mq/kafkaconnect/MQSourceTask.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ public class MQSourceTask extends SourceTask {
3636
private String queueName;
3737
private String userName;
3838
private String password;
39+
private String sslCipherSuite;
3940
private String topic;
4041

4142
private static int BATCH_SIZE = 50;
@@ -69,11 +70,16 @@ public MQSourceTask() {
6970
queueName = props.get(MQSourceConnector.CONFIG_NAME_MQ_QUEUE);
7071
userName = props.get(MQSourceConnector.CONFIG_NAME_MQ_USER_NAME);
7172
password = props.get(MQSourceConnector.CONFIG_NAME_MQ_PASSWORD);
73+
sslCipherSuite = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_CIPHER_SUITE);
7274
topic = props.get(MQSourceConnector.CONFIG_NAME_TOPIC);
7375

7476
// Construct a reader to interface with MQ
7577
reader = new JMSReader(queueManager, connectionNameList, channelName, queueName, userName, password, topic);
7678

79+
if (sslCipherSuite != null) {
80+
reader.setSSLConfiguration(sslCipherSuite);
81+
}
82+
7783
String mbj = props.get(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS);
7884
if (mbj != null) {
7985
reader.setMessageBodyJms(Boolean.parseBoolean(mbj));

0 commit comments

Comments
 (0)