Skip to content

Commit c5f2023

Browse files
katherisGitHub Enterprise
authored andcommitted
Merge pull request #2 from mhub/feat/1670-authentication
Feat/1670 authentication
2 parents 7c4376c + f38ef10 commit c5f2023

File tree

5 files changed

+94
-22
lines changed

5 files changed

+94
-22
lines changed

README.md

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,13 +155,54 @@ The configuration options for the Kafka Connect source connector for IBM MQ are
155155
| mq.queue | The name of the source MQ queue | string | | MQ queue name |
156156
| mq.user.name | The user name for authenticating with the queue manager | string | | User name |
157157
| mq.password | The password for authenticating with the queue manager | string | | Password |
158+
| mq.ccdt.url | The URL for the CCDT file containing MQ connection details | string | | URL for obtaining a CCDT file |
158159
| mq.record.builder | The class used to build the Kafka Connect record | string | | Class implementing RecordBuilder |
159160
| mq.message.body.jms | Whether to interpret the message body as a JMS message type | boolean | false | |
160161
| mq.record.builder.key.header | The JMS message header to use as the Kafka record key | string | | JMSMessageID, JMSCorrelationID, JMSCorrelationIDAsBytes |
161162
| mq.ssl.cipher.suite | The name of the cipher suite for TLS (SSL) connection | string | | Blank or valid cipher suite |
162163
| mq.ssl.peer.name | The distinguished name pattern of the TLS (SSL) peer | string | | Blank or DN pattern |
163164
| topic | The name of the target Kafka topic | string | | Topic name |
164165

166+
### Using a CCDT file
167+
Some of the connection details for MQ can be provided in a [CCDT file](https://www.ibm.com/support/knowledgecenter/en/SSFKSJ_9.0.0/com.ibm.mq.con.doc/q016730_.htm) by setting `mq.ccdt.url` in the Kafka Connect source connector configuration file. If using a CCDT file the `mq.connection.name.list` and `mq.channel.name` configuration options are not required.
168+
169+
### Externalizing secrets
170+
[KIP 297](https://cwiki.apache.org/confluence/display/KAFKA/KIP-297%3A+Externalizing+Secrets+for+Connect+Configurations) introduced a mechanism to externalize secrets to be used as configuration for Kafka connectors.
171+
172+
#### Example: externalizing secrets with FileConfigProvider
173+
174+
Given a file `secrets.properties` with the contents:
175+
```
176+
secret-key=password
177+
```
178+
179+
Update the worker configuration file to specify the FileConfigProvider which is included by default:
180+
181+
```
182+
# Additional properties for the worker configuration to enable use of ConfigProviders
183+
# multiple comma-separated provider types can be specified here
184+
config.providers=file
185+
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider
186+
```
187+
188+
Update the connector configuration file to reference `secret-key` in the file:
189+
190+
```
191+
mq.password=${file:mq-secret.properties:secret-key}
192+
```
193+
194+
#### Using custom Config Providers
195+
196+
Custom config providers can also be enabled in the worker configuration file:
197+
```
198+
# Additional properties for the worker configuration to enable use of ConfigProviders
199+
# multiple comma-separated provider types can be specified here
200+
config.providers=file,other-provider
201+
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider
202+
# Other ConfigProvider implementations might require parameters passed in to configure() as follows:
203+
config.providers.other-provider.param.foo=value1
204+
config.providers.other-provider.param.bar=value2
205+
```
165206

166207
## Troubleshooting
167208

config/mq-source.properties

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@ tasks.max=1
2121
# The name of the MQ queue manager - required
2222
mq.queue.manager=
2323

24-
# A list of one or more host(port) entries for connecting to the queue manager. Entries are separated with a comma - required
24+
# A list of one or more host(port) entries for connecting to the queue manager. Entries are separated with a comma - required (unless using CCDT)
2525
mq.connection.name.list=
2626

27-
# The name of the server-connection channel - required
27+
# The name of the server-connection channel - required (unless using CCDT)
2828
mq.channel.name=
2929

3030
# The name of the source MQ queue - required
@@ -36,6 +36,14 @@ mq.queue=
3636
# The password for authenticating with the queue manager - optional
3737
# mq.password=
3838

39+
# Alternatively can use a ConfigProvider for externalising secrets (see README.md for more details)
40+
# Variable references are of the form ${provider:[path:]key} where the path is optional,
41+
# depending on the ConfigProvider implementation.
42+
# mq.password=${file:/var/run/secret.properties:secret-key}
43+
44+
# The CCDT URL to use to establish a connection to the queue manager - optional
45+
# mq.ccdt.url=
46+
3947
# The record builders control conversion of data between the messages in MQ and the internal Kafka Connect representation - required
4048
mq.record.builder=com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder
4149
# mq.record.builder=com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder

src/main/java/com/ibm/eventstreams/connect/mqsource/JMSReader.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import com.ibm.mq.jms.*;
2323
import com.ibm.msg.client.wmq.WMQConstants;
2424

25+
import java.net.MalformedURLException;
26+
import java.net.URL;
2527
import java.util.Map;
2628
import java.util.concurrent.atomic.AtomicBoolean;
2729

@@ -87,6 +89,7 @@ public void configure(Map<String, String> props) {
8789
String queueName = props.get(MQSourceConnector.CONFIG_NAME_MQ_QUEUE);
8890
String userName = props.get(MQSourceConnector.CONFIG_NAME_MQ_USER_NAME);
8991
String password = props.get(MQSourceConnector.CONFIG_NAME_MQ_PASSWORD);
92+
String ccdtUrl = props.get(MQSourceConnector.CONFIG_NAME_MQ_CCDT_URL);
9093
String builderClass = props.get(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER);
9194
String mbj = props.get(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS);
9295
String sslCipherSuite = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_CIPHER_SUITE);
@@ -97,10 +100,22 @@ public void configure(Map<String, String> props) {
97100
mqConnFactory = new MQConnectionFactory();
98101
mqConnFactory.setTransportType(WMQConstants.WMQ_CM_CLIENT);
99102
mqConnFactory.setQueueManager(queueManager);
100-
mqConnFactory.setConnectionNameList(connectionNameList);
101-
mqConnFactory.setChannel(channelName);
102103
mqConnFactory.setBooleanProperty(WMQConstants.USER_AUTHENTICATION_MQCSP, true);
103104

105+
if (ccdtUrl != null) {
106+
URL ccdtUrlObject;
107+
try {
108+
ccdtUrlObject = new URL(ccdtUrl);
109+
} catch (MalformedURLException e) {
110+
log.error("MalformedURLException exception {}", e);
111+
throw new ConnectException("CCDT file url invalid.");
112+
}
113+
mqConnFactory.setCCDTURL(ccdtUrlObject);
114+
} else {
115+
mqConnFactory.setConnectionNameList(connectionNameList);
116+
mqConnFactory.setChannel(channelName);
117+
}
118+
104119
queue = new MQQueue(queueName);
105120

106121
this.userName = userName;

src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,10 @@ public class MQSourceConnector extends SourceConnector {
5959
public static final String CONFIG_DOCUMENTATION_MQ_PASSWORD = "The password for authenticating with the queue manager.";
6060
public static final String CONFIG_DISPLAY_MQ_PASSWORD = "Password";
6161

62+
public static final String CONFIG_NAME_MQ_CCDT_URL = "mq.ccdt.url";
63+
public static final String CONFIG_DOCUMENTATION_MQ_CCDT_URL = "The CCDT URL to use to establish a connection to the queue manager.";
64+
public static final String CONFIG_DISPLAY_MQ_CCDT_URL = "CCDT URL";
65+
6266
public static final String CONFIG_NAME_MQ_RECORD_BUILDER = "mq.record.builder";
6367
public static final String CONFIG_DOCUMENTATION_MQ_RECORD_BUILDER = "The class used to build the Kafka Connect records.";
6468
public static final String CONFIG_DISPLAY_MQ_RECORD_BUILDER = "Record builder";
@@ -162,44 +166,48 @@ public class MQSourceConnector extends SourceConnector {
162166
CONFIG_DOCUMENTATION_MQ_QUEUE_MANAGER, CONFIG_GROUP_MQ, 1, Width.MEDIUM,
163167
CONFIG_DISPLAY_MQ_QUEUE_MANAGER);
164168

165-
config.define(CONFIG_NAME_MQ_CONNECTION_NAME_LIST, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, Importance.HIGH,
169+
config.define(CONFIG_NAME_MQ_CONNECTION_NAME_LIST, Type.STRING, null, Importance.MEDIUM,
166170
CONFIG_DOCUMENTATION_MQ_CONNNECTION_NAME_LIST, CONFIG_GROUP_MQ, 2, Width.LONG,
167171
CONFIG_DISPLAY_MQ_CONNECTION_NAME_LIST);
168172

169-
config.define(CONFIG_NAME_MQ_CHANNEL_NAME, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, Importance.HIGH,
173+
config.define(CONFIG_NAME_MQ_CHANNEL_NAME, Type.STRING, null, Importance.MEDIUM,
170174
CONFIG_DOCUMENTATION_MQ_CHANNEL_NAME, CONFIG_GROUP_MQ, 3, Width.MEDIUM,
171175
CONFIG_DISPLAY_MQ_CHANNEL_NAME);
172176

177+
config.define(CONFIG_NAME_MQ_CCDT_URL, Type.STRING, null, Importance.MEDIUM,
178+
CONFIG_DOCUMENTATION_MQ_CCDT_URL, CONFIG_GROUP_MQ, 4, Width.MEDIUM,
179+
CONFIG_DISPLAY_MQ_CCDT_URL);
180+
173181
config.define(CONFIG_NAME_MQ_QUEUE, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, Importance.HIGH,
174-
CONFIG_DOCUMENTATION_MQ_QUEUE, CONFIG_GROUP_MQ, 4, Width.LONG,
182+
CONFIG_DOCUMENTATION_MQ_QUEUE, CONFIG_GROUP_MQ, 5, Width.LONG,
175183
CONFIG_DISPLAY_MQ_QUEUE);
176184

177185
config.define(CONFIG_NAME_MQ_USER_NAME, Type.STRING, null, Importance.MEDIUM,
178-
CONFIG_DOCUMENTATION_MQ_USER_NAME, CONFIG_GROUP_MQ, 5, Width.MEDIUM,
186+
CONFIG_DOCUMENTATION_MQ_USER_NAME, CONFIG_GROUP_MQ, 6, Width.MEDIUM,
179187
CONFIG_DISPLAY_MQ_USER_NAME);
180188

181189
config.define(CONFIG_NAME_MQ_PASSWORD, Type.PASSWORD, null, Importance.MEDIUM,
182-
CONFIG_DOCUMENTATION_MQ_PASSWORD, CONFIG_GROUP_MQ, 6, Width.MEDIUM,
190+
CONFIG_DOCUMENTATION_MQ_PASSWORD, CONFIG_GROUP_MQ, 7, Width.MEDIUM,
183191
CONFIG_DISPLAY_MQ_PASSWORD);
184192

185193
config.define(CONFIG_NAME_MQ_RECORD_BUILDER, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, Importance.HIGH,
186-
CONFIG_DOCUMENTATION_MQ_RECORD_BUILDER, CONFIG_GROUP_MQ, 7, Width.LONG,
194+
CONFIG_DOCUMENTATION_MQ_RECORD_BUILDER, CONFIG_GROUP_MQ, 8, Width.LONG,
187195
CONFIG_DISPLAY_MQ_RECORD_BUILDER);
188196

189197
config.define(CONFIG_NAME_MQ_MESSAGE_BODY_JMS, Type.BOOLEAN, Boolean.FALSE, Importance.MEDIUM,
190-
CONFIG_DOCUMENTATION_MQ_MESSAGE_BODY_JMS, CONFIG_GROUP_MQ, 8, Width.SHORT,
198+
CONFIG_DOCUMENTATION_MQ_MESSAGE_BODY_JMS, CONFIG_GROUP_MQ, 9, Width.SHORT,
191199
CONFIG_DISPLAY_MQ_MESSAGE_BODY_JMS);
192200

193201
config.define(CONFIG_NAME_MQ_RECORD_BUILDER_KEY_HEADER, Type.STRING, null, Importance.MEDIUM,
194-
CONFIG_DOCUMENTATION_MQ_RECORD_BUILDER_KEY_HEADER, CONFIG_GROUP_MQ, 9, Width.MEDIUM,
202+
CONFIG_DOCUMENTATION_MQ_RECORD_BUILDER_KEY_HEADER, CONFIG_GROUP_MQ, 10, Width.MEDIUM,
195203
CONFIG_DISPLAY_MQ_RECORD_BUILDER_KEY_HEADER);
196204

197205
config.define(CONFIG_NAME_MQ_SSL_CIPHER_SUITE, Type.STRING, null, Importance.MEDIUM,
198-
CONFIG_DOCUMENTATION_MQ_SSL_CIPHER_SUITE, CONFIG_GROUP_MQ, 10, Width.MEDIUM,
206+
CONFIG_DOCUMENTATION_MQ_SSL_CIPHER_SUITE, CONFIG_GROUP_MQ, 11, Width.MEDIUM,
199207
CONFIG_DISPLAY_MQ_SSL_CIPHER_SUITE);
200208

201209
config.define(CONFIG_NAME_MQ_SSL_PEER_NAME, Type.STRING, null, Importance.MEDIUM,
202-
CONFIG_DOCUMENTATION_MQ_SSL_PEER_NAME, CONFIG_GROUP_MQ, 11, Width.MEDIUM,
210+
CONFIG_DOCUMENTATION_MQ_SSL_PEER_NAME, CONFIG_GROUP_MQ, 12, Width.MEDIUM,
203211
CONFIG_DISPLAY_MQ_SSL_PEER_NAME);
204212

205213
config.define(CONFIG_NAME_TOPIC, Type.STRING, null, Importance.HIGH,

src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ public class MQSourceTask extends SourceTask {
3131
private static final Logger log = LoggerFactory.getLogger(MQSourceTask.class);
3232

3333
private static int BATCH_SIZE = 100;
34-
private static int MAXUMSGS = 10000;
35-
private static int MAXUMSGS_DELAY_MS = 500;
34+
private static int MAX_UNCOMMITTED_MSGS = 10000;
35+
private static int MAX_UNCOMMITTED_MSGS_DELAY_MS = 500;
3636

3737
private JMSReader reader;
3838
private AtomicInteger uncommittedMessages = new AtomicInteger(0);
@@ -81,27 +81,27 @@ public MQSourceTask() {
8181

8282
final List<SourceRecord> msgs = new ArrayList<>();
8383
int messageCount = 0;
84-
int uncom = this.uncommittedMessages.get();
84+
int uncommittedMessagesInt = this.uncommittedMessages.get();
8585

86-
if (uncom < MAXUMSGS) {
86+
if (uncommittedMessagesInt < MAX_UNCOMMITTED_MSGS) {
8787
log.info("Polling for records");
8888

8989
SourceRecord src;
9090
do {
9191
// For the first message in the batch, wait a while if no message
92-
src = reader.receive(messageCount == 0 ? true : false);
92+
src = reader.receive(messageCount == 0);
9393
if (src != null) {
9494
msgs.add(src);
9595
messageCount++;
96-
uncom = this.uncommittedMessages.incrementAndGet();
96+
uncommittedMessagesInt = this.uncommittedMessages.incrementAndGet();
9797
}
98-
} while ((src != null) && (messageCount < BATCH_SIZE) && (uncom < MAXUMSGS));
98+
} while ((src != null) && (messageCount < BATCH_SIZE) && (uncommittedMessagesInt < MAX_UNCOMMITTED_MSGS));
9999

100100
log.debug("Poll returning {} records", messageCount);
101101
}
102102
else {
103103
log.info("Uncommitted message limit reached");
104-
Thread.sleep(MAXUMSGS_DELAY_MS);
104+
Thread.sleep(MAX_UNCOMMITTED_MSGS_DELAY_MS);
105105
}
106106

107107
log.trace("[{}] Exit {}.poll, retval={}", Thread.currentThread().getId(), this.getClass().getName(), messageCount);

0 commit comments

Comments
 (0)