Skip to content

Commit 09c8305

Browse files
committed
feat: Update readme and re-order config options
Contributes to mhub/qp-planning#1670 Signed-off-by: Katherine Stanley <katheris@uk.ibm.com>
1 parent 26178fe commit 09c8305

File tree

4 files changed

+51
-27
lines changed

4 files changed

+51
-27
lines changed

README.md

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -164,20 +164,44 @@ The configuration options for the Kafka Connect source connector for IBM MQ are
164164
| topic | The name of the target Kafka topic | string | | Topic name |
165165

166166
### Using a CCDT file
167-
Some of the connection details for MQ can be provided in a [CCDT file](https://www.ibm.com/support/knowledgecenter/en/SSFKSJ_9.0.0/com.ibm.mq.con.doc/q016730_.htm) by setting `mq.ccdt.url` in the Kafka Connect source connector configuration file. If this is set the following configuration options are no longer required: `mq.connection.name.list`, `mq.channel.name`.
167+
Some of the connection details for MQ can be provided in a [CCDT file](https://www.ibm.com/support/knowledgecenter/en/SSFKSJ_9.0.0/com.ibm.mq.con.doc/q016730_.htm) by setting `mq.ccdt.url` in the Kafka Connect source connector configuration file. If using a CCDT file the `mq.connection.name.list` and `mq.channel.name` configuration options are not required.
168168

169169
### Externalizing secrets
170-
[KIP 297](https://cwiki.apache.org/confluence/display/KAFKA/KIP-297%3A+Externalizing+Secrets+for+Connect+Configurations) introduced a mechanism to externalize secrets to be used as configuration for Kafka connectors. If using this mechanism the desired config provider must be configured in the Worker configuration file:
170+
[KIP 297](https://cwiki.apache.org/confluence/display/KAFKA/KIP-297%3A+Externalizing+Secrets+for+Connect+Configurations) introduced a mechanism to externalize secrets to be used as configuration for Kafka connectors.
171171

172+
#### Example: externalizing secrets with FileConfigProvider
173+
174+
Given a file `secrets.properties` with the contents:
175+
```
176+
secret-key=password
177+
```
178+
179+
Update the worker configuration file to specify the FileConfigProvider which is included by default:
180+
181+
```
182+
# Additional properties for the worker configuration to enable use of ConfigProviders
183+
# multiple comma-separated provider types can be specified here
184+
config.providers=file
185+
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider
186+
```
187+
188+
Update the connector configuration file to reference `secret-key` in the file:
189+
190+
```
191+
mq.password=${file:mq-secret.properties:secret-key}
192+
```
193+
194+
#### Using custom Config Providers
195+
196+
Custom config providers can also be enabled in the worker configuration file:
172197
```
173198
# Additional properties for the worker configuration to enable use of ConfigProviders
174199
# multiple comma-separated provider types can be specified here
175-
# config.providers=file
176-
# config.providers=file,other-provider
177-
# config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider
200+
config.providers=file,other-provider
201+
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider
178202
# Other ConfigProvider implementations might require parameters passed in to configure() as follows:
179-
# config.providers.other-provider.param.foo=value1
180-
# config.providers.other-provider.param.bar=value2
203+
config.providers.other-provider.param.foo=value1
204+
config.providers.other-provider.param.bar=value2
181205
```
182206

183207
## Support

src/main/java/com/ibm/eventstreams/connect/mqsource/JMSReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ public void configure(Map<String, String> props) {
100100
mqConnFactory = new MQConnectionFactory();
101101
mqConnFactory.setTransportType(WMQConstants.WMQ_CM_CLIENT);
102102
mqConnFactory.setQueueManager(queueManager);
103+
mqConnFactory.setBooleanProperty(WMQConstants.USER_AUTHENTICATION_MQCSP, true);
103104

104105
if (ccdtUrl != null) {
105106
URL ccdtUrlObject;
@@ -113,7 +114,6 @@ public void configure(Map<String, String> props) {
113114
} else {
114115
mqConnFactory.setConnectionNameList(connectionNameList);
115116
mqConnFactory.setChannel(channelName);
116-
// mqConnFactory.setBooleanProperty(WMQConstants.USER_AUTHENTICATION_MQCSP, true);
117117
}
118118

119119
queue = new MQQueue(queueName);

src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -174,40 +174,40 @@ public class MQSourceConnector extends SourceConnector {
174174
CONFIG_DOCUMENTATION_MQ_CHANNEL_NAME, CONFIG_GROUP_MQ, 3, Width.MEDIUM,
175175
CONFIG_DISPLAY_MQ_CHANNEL_NAME);
176176

177+
config.define(CONFIG_NAME_MQ_CCDT_URL, Type.STRING, null, Importance.MEDIUM,
178+
CONFIG_DOCUMENTATION_MQ_CCDT_URL, CONFIG_GROUP_MQ, 4, Width.MEDIUM,
179+
CONFIG_DISPLAY_MQ_CCDT_URL);
180+
177181
config.define(CONFIG_NAME_MQ_QUEUE, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, Importance.HIGH,
178-
CONFIG_DOCUMENTATION_MQ_QUEUE, CONFIG_GROUP_MQ, 4, Width.LONG,
182+
CONFIG_DOCUMENTATION_MQ_QUEUE, CONFIG_GROUP_MQ, 5, Width.LONG,
179183
CONFIG_DISPLAY_MQ_QUEUE);
180184

181185
config.define(CONFIG_NAME_MQ_USER_NAME, Type.STRING, null, Importance.MEDIUM,
182-
CONFIG_DOCUMENTATION_MQ_USER_NAME, CONFIG_GROUP_MQ, 5, Width.MEDIUM,
186+
CONFIG_DOCUMENTATION_MQ_USER_NAME, CONFIG_GROUP_MQ, 6, Width.MEDIUM,
183187
CONFIG_DISPLAY_MQ_USER_NAME);
184188

185189
config.define(CONFIG_NAME_MQ_PASSWORD, Type.PASSWORD, null, Importance.MEDIUM,
186-
CONFIG_DOCUMENTATION_MQ_PASSWORD, CONFIG_GROUP_MQ, 6, Width.MEDIUM,
190+
CONFIG_DOCUMENTATION_MQ_PASSWORD, CONFIG_GROUP_MQ, 7, Width.MEDIUM,
187191
CONFIG_DISPLAY_MQ_PASSWORD);
188192

189-
config.define(CONFIG_NAME_MQ_CCDT_URL, Type.STRING, null, Importance.MEDIUM,
190-
CONFIG_DOCUMENTATION_MQ_CCDT_URL, CONFIG_GROUP_MQ, 6, Width.MEDIUM,
191-
CONFIG_DISPLAY_MQ_CCDT_URL);
192-
193193
config.define(CONFIG_NAME_MQ_RECORD_BUILDER, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, Importance.HIGH,
194-
CONFIG_DOCUMENTATION_MQ_RECORD_BUILDER, CONFIG_GROUP_MQ, 7, Width.LONG,
194+
CONFIG_DOCUMENTATION_MQ_RECORD_BUILDER, CONFIG_GROUP_MQ, 8, Width.LONG,
195195
CONFIG_DISPLAY_MQ_RECORD_BUILDER);
196196

197197
config.define(CONFIG_NAME_MQ_MESSAGE_BODY_JMS, Type.BOOLEAN, Boolean.FALSE, Importance.MEDIUM,
198-
CONFIG_DOCUMENTATION_MQ_MESSAGE_BODY_JMS, CONFIG_GROUP_MQ, 8, Width.SHORT,
198+
CONFIG_DOCUMENTATION_MQ_MESSAGE_BODY_JMS, CONFIG_GROUP_MQ, 9, Width.SHORT,
199199
CONFIG_DISPLAY_MQ_MESSAGE_BODY_JMS);
200200

201201
config.define(CONFIG_NAME_MQ_RECORD_BUILDER_KEY_HEADER, Type.STRING, null, Importance.MEDIUM,
202-
CONFIG_DOCUMENTATION_MQ_RECORD_BUILDER_KEY_HEADER, CONFIG_GROUP_MQ, 9, Width.MEDIUM,
202+
CONFIG_DOCUMENTATION_MQ_RECORD_BUILDER_KEY_HEADER, CONFIG_GROUP_MQ, 10, Width.MEDIUM,
203203
CONFIG_DISPLAY_MQ_RECORD_BUILDER_KEY_HEADER);
204204

205205
config.define(CONFIG_NAME_MQ_SSL_CIPHER_SUITE, Type.STRING, null, Importance.MEDIUM,
206-
CONFIG_DOCUMENTATION_MQ_SSL_CIPHER_SUITE, CONFIG_GROUP_MQ, 10, Width.MEDIUM,
206+
CONFIG_DOCUMENTATION_MQ_SSL_CIPHER_SUITE, CONFIG_GROUP_MQ, 11, Width.MEDIUM,
207207
CONFIG_DISPLAY_MQ_SSL_CIPHER_SUITE);
208208

209209
config.define(CONFIG_NAME_MQ_SSL_PEER_NAME, Type.STRING, null, Importance.MEDIUM,
210-
CONFIG_DOCUMENTATION_MQ_SSL_PEER_NAME, CONFIG_GROUP_MQ, 11, Width.MEDIUM,
210+
CONFIG_DOCUMENTATION_MQ_SSL_PEER_NAME, CONFIG_GROUP_MQ, 12, Width.MEDIUM,
211211
CONFIG_DISPLAY_MQ_SSL_PEER_NAME);
212212

213213
config.define(CONFIG_NAME_TOPIC, Type.STRING, null, Importance.HIGH,

src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ public class MQSourceTask extends SourceTask {
3131
private static final Logger log = LoggerFactory.getLogger(MQSourceTask.class);
3232

3333
private static int BATCH_SIZE = 100;
34-
private static int MAXUMSGS = 10000; // What does this stand for, better name?
35-
private static int MAXUMSGS_DELAY_MS = 500;
34+
private static int MAX_UNCOMMITTED_MSGS = 10000;
35+
private static int MAX_UNCOMMITTED_MSGS_DELAY_MS = 500;
3636

3737
private JMSReader reader;
3838
private AtomicInteger uncommittedMessages = new AtomicInteger(0);
@@ -81,9 +81,9 @@ public MQSourceTask() {
8181

8282
final List<SourceRecord> msgs = new ArrayList<>();
8383
int messageCount = 0;
84-
int uncom = this.uncommittedMessages.get();
84+
int uncommittedMessagesInt = this.uncommittedMessages.get();
8585

86-
if (uncom < MAXUMSGS) {
86+
if (uncommittedMessagesInt < MAX_UNCOMMITTED_MSGS) {
8787
log.info("Polling for records");
8888

8989
SourceRecord src;
@@ -93,15 +93,15 @@ public MQSourceTask() {
9393
if (src != null) {
9494
msgs.add(src);
9595
messageCount++;
96-
uncom = this.uncommittedMessages.incrementAndGet();
96+
uncommittedMessagesInt = this.uncommittedMessages.incrementAndGet();
9797
}
98-
} while ((src != null) && (messageCount < BATCH_SIZE) && (uncom < MAXUMSGS));
98+
} while ((src != null) && (messageCount < BATCH_SIZE) && (uncommittedMessagesInt < MAX_UNCOMMITTED_MSGS));
9999

100100
log.debug("Poll returning {} records", messageCount);
101101
}
102102
else {
103103
log.info("Uncommitted message limit reached");
104-
Thread.sleep(MAXUMSGS_DELAY_MS);
104+
Thread.sleep(MAX_UNCOMMITTED_MSGS_DELAY_MS);
105105
}
106106

107107
log.trace("[{}] Exit {}.poll, retval={}", Thread.currentThread().getId(), this.getClass().getName(), messageCount);

0 commit comments

Comments
 (0)