Skip to content

Commit e443557

Browse files
Support SSL peer name
1 parent 536833b commit e443557

File tree

5 files changed

+39
-8
lines changed

5 files changed

+39
-8
lines changed

README.md

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -110,13 +110,19 @@ In addition, there is another converter for the Avro format that is part of the
110110

111111

112112
## Security
113-
The connector supports authentication with user name and password and also connections secured with TLS using a server-side certificate. It does not currently support TLS mutual authentication with client-side certificates.
113+
The connector supports authentication with user name and password and also connections secured with TLS using a server-side certificate and mutual authentication with client-side certificates.
114114

115115
### Setting up TLS using a server-side certificate
116116
To enable use of TLS, set the configuration 'mq.ssl.cipher.suite' to the name of the cipher suite which matches the CipherSpec in the SSLCIPH attribute of the MQ server-connection channel. Use the table of supported cipher suites for MQ 9.0.x [here](https://www.ibm.com/support/knowledgecenter/en/SSFKSJ_9.0.0/com.ibm.mq.dev.doc/q113220_.htm) as a reference. Note that the names of the CipherSpecs as used in the MQ configuration are not necessarily the same as the cipher suite names that the connector uses. The connector uses the JMS interface so it follows the Java conventions.
117117

118118
You will need to put the public part of the queue manager's certificate in the JSSE truststore used by the Kafka Connect worker that you're using to run the connector. If you need to specify extra arguments to the worker's JVM, you can use the EXTRA_ARGS environment variable.
119119

120+
### Setting up TLS for mutual authentication
121+
You will need to put the public part of the client's certificate in the queue manager's key repository. You will also need to configure the worker's JVM with the location and password for the keystore containing the client's certificate.
122+
123+
### Troubleshooting
124+
For troubleshooting, or to better understand the handshake performed by the IBM MQ Java client application in combination with your specific JSSE provider, you can enable debugging by setting `javax.net.debug=ssl` in the JVM environment.
125+
120126

121127
## Configuration
122128
The configuration options for the MQ Source Connector are as follows:
@@ -130,18 +136,19 @@ The configuration options for the MQ Source Connector are as follows:
130136
| mq.user.name | The user name for authenticating with the queue manager | string | | User name |
131137
| mq.password | The password for authenticating with the queue manager | string | | Password |
132138
| mq.message.body.jms | Whether to interpret the message body as a JMS message type | boolean | false | |
133-
| mq.ssl.cipher.suite | The name of the cipher suite for TLS connection | string | | Blank or valid cipher suite |
139+
| mq.ssl.cipher.suite | The name of the cipher suite for TLS (SSL) connection | string | | Blank or valid cipher suite |
140+
| mq.ssl.peer.name | The distinguished name pattern of the TLS (SSL) peer | string | | Blank or DN pattern |
134141
| topic | The name of the target Kafka topic | string | | Topic name |
135142

136143

137144
## Future enhancements
138145
The connector is intentionally basic. The idea is to enhance it over time with additional features to make it more capable. Some possible future enhancements are:
139-
* TLS mutual authentication
140146
* Message key support
141147
* Configurable schema for MQ messages
142148
* JMX metrics
143149
* JSON parsing so that the JSON type information is supplied to the converter
144150
* Testing with the Confluent Platform Avro converter and Schema Registry
151+
* Separate TLS configuration for the connector so that keystore location and so on can be specified as configurations
145152

146153

147154
## Issues and contributions

config/mq-source.properties

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,13 @@ mq.queue=
3939
# Whether to interpret the message body as a JMS message type (default false) - optional
4040
# mq.message.body.jms=
4141

42-
# The name of the cipher suite for TLS connection (default blank, meaning do not use TLS) - optional
42+
# The name of the cipher suite for TLS (SSL) connection (default blank, meaning do not use TLS) - optional
4343
# See https://www.ibm.com/support/knowledgecenter/en/SSFKSJ_9.0.0/com.ibm.mq.dev.doc/q113220_.htm for valid values
4444
# mq.ssl.cipher.suite=
4545

46+
# The distinguished name pattern of the TLS (SSL) peer - optional
47+
# mq.ssl.peer.name=
48+
4649
# The name of the target Kafka topic - required
4750
topic=
4851

src/main/java/com/ibm/mq/kafkaconnect/JMSReader.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ public class JMSReader {
5050
private String userName;
5151
private String password;
5252
private String sslCipherSuite;
53+
private String sslPeerName;
5354
private String topic;
5455
private boolean messageBodyJms;
5556

@@ -127,14 +128,24 @@ public void setMessageBodyJms(boolean messageBodyJms)
127128
/**
128129
* Setter for SSL-related configuration.
129130
*
130-
* @param sslCipherSuite
131+
* @param sslCipherSuite The name of the cipher suite for TLS (SSL) connection
132+
* @param sslPeerName The distinguished name pattern of the TLS (SSL) peer
131133
*/
132-
public void setSSLConfiguration(String sslCipherSuite)
134+
public void setSSLConfiguration(String sslCipherSuite, String sslPeerName)
133135
{
134136
this.sslCipherSuite = sslCipherSuite;
135137
if (this.sslCipherSuite != null)
136138
{
137139
mqConnFactory.setSSLCipherSuite(this.sslCipherSuite);
140+
if (this.sslPeerName != null)
141+
{
142+
try {
143+
mqConnFactory.setSSLPeerName(sslPeerName);
144+
}
145+
catch (JMSException jmse) {
146+
;
147+
}
148+
}
138149
}
139150
}
140151

src/main/java/com/ibm/mq/kafkaconnect/MQSourceConnector.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,13 @@ public class MQSourceConnector extends SourceConnector {
6464
public static final String CONFIG_DISPLAY_MQ_MESSAGE_BODY_JMS = "Message body as JMS";
6565

6666
public static final String CONFIG_NAME_MQ_SSL_CIPHER_SUITE = "mq.ssl.cipher.suite";
67-
public static final String CONFIG_DOCUMENTATION_MQ_SSL_CIPHER_SUITE = "The name of the cipher suite for TLS (SSL) connection.";
67+
public static final String CONFIG_DOCUMENTATION_MQ_SSL_CIPHER_SUITE = "The name of the cipher suite for the TLS (SSL) connection.";
6868
public static final String CONFIG_DISPLAY_MQ_SSL_CIPHER_SUITE = "SSL cipher suite";
6969

70+
public static final String CONFIG_NAME_MQ_SSL_PEER_NAME = "mq.ssl.peer.name";
71+
public static final String CONFIG_DOCUMENTATION_MQ_SSL_PEER_NAME = "The distinguished name pattern of the TLS (SSL) peer.";
72+
public static final String CONFIG_DISPLAY_MQ_SSL_PEER_NAME = "SSL peer name";
73+
7074
public static final String CONFIG_NAME_TOPIC = "topic";
7175
public static final String CONFIG_DOCUMENTATION_TOPIC = "The name of the target Kafka topic.";
7276
public static final String CONFIG_DISPLAY_TOPIC = "Target Kafka topic";
@@ -166,6 +170,10 @@ public class MQSourceConnector extends SourceConnector {
166170
CONFIG_DOCUMENTATION_MQ_SSL_CIPHER_SUITE, CONFIG_GROUP_MQ, 8, Width.MEDIUM,
167171
CONFIG_DISPLAY_MQ_SSL_CIPHER_SUITE);
168172

173+
config.define(CONFIG_NAME_MQ_SSL_PEER_NAME, Type.STRING, null, Importance.MEDIUM,
174+
CONFIG_DOCUMENTATION_MQ_SSL_PEER_NAME, CONFIG_GROUP_MQ, 9, Width.MEDIUM,
175+
CONFIG_DISPLAY_MQ_SSL_PEER_NAME);
176+
169177
config.define(CONFIG_NAME_TOPIC, Type.STRING, null, Importance.HIGH,
170178
CONFIG_DOCUMENTATION_TOPIC, null, 0, Width.MEDIUM,
171179
CONFIG_DISPLAY_TOPIC);

src/main/java/com/ibm/mq/kafkaconnect/MQSourceTask.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ public class MQSourceTask extends SourceTask {
3737
private String userName;
3838
private String password;
3939
private String sslCipherSuite;
40+
private String sslPeerName;
4041
private String topic;
4142

4243
private static int BATCH_SIZE = 50;
@@ -71,13 +72,14 @@ public MQSourceTask() {
7172
userName = props.get(MQSourceConnector.CONFIG_NAME_MQ_USER_NAME);
7273
password = props.get(MQSourceConnector.CONFIG_NAME_MQ_PASSWORD);
7374
sslCipherSuite = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_CIPHER_SUITE);
75+
sslPeerName = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_PEER_NAME);
7476
topic = props.get(MQSourceConnector.CONFIG_NAME_TOPIC);
7577

7678
// Construct a reader to interface with MQ
7779
reader = new JMSReader(queueManager, connectionNameList, channelName, queueName, userName, password, topic);
7880

7981
if (sslCipherSuite != null) {
80-
reader.setSSLConfiguration(sslCipherSuite);
82+
reader.setSSLConfiguration(sslCipherSuite, sslPeerName);
8183
}
8284

8385
String mbj = props.get(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS);

0 commit comments

Comments
 (0)