Skip to content

Commit c734ad0

Browse files
feat: Optional CCDT
Contributes to: mhub/qp-planning#1670 Signed-off-by: Andrew Dunnings <andrew.dunnings1@ibm.com>
1 parent 64077a5 commit c734ad0

File tree

4 files changed

+33
-7
lines changed

4 files changed

+33
-7
lines changed

config/mq-source.properties

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@ tasks.max=1
2121
# The name of the MQ queue manager - required
2222
mq.queue.manager=
2323

24-
# A list of one or more host(port) entries for connecting to the queue manager. Entries are separated with a comma - required
24+
# A list of one or more host(port) entries for connecting to the queue manager. Entries are separated with a comma - required (unless using CCDT)
2525
mq.connection.name.list=
2626

27-
# The name of the server-connection channel - required
27+
# The name of the server-connection channel - equired (unless using CCDT)
2828
mq.channel.name=
2929

3030
# The name of the source MQ queue - required
@@ -36,6 +36,9 @@ mq.queue=
3636
# The password for authenticating with the queue manager - optional
3737
# mq.password=
3838

39+
# The CCDT URL to use to establish a connection to the queue manager - optional
40+
# mq.ccd.url=
41+
3942
# The record builders control conversion of data between the messages in MQ and the internal Kafka Connect representation - required
4043
mq.record.builder=com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder
4144
# mq.record.builder=com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder

src/main/java/com/ibm/eventstreams/connect/mqsource/JMSReader.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import com.ibm.mq.jms.*;
2323
import com.ibm.msg.client.wmq.WMQConstants;
2424

25+
import java.net.MalformedURLException;
26+
import java.net.URL;
2527
import java.util.Map;
2628
import java.util.concurrent.atomic.AtomicBoolean;
2729

@@ -87,6 +89,7 @@ public void configure(Map<String, String> props) {
8789
String queueName = props.get(MQSourceConnector.CONFIG_NAME_MQ_QUEUE);
8890
String userName = props.get(MQSourceConnector.CONFIG_NAME_MQ_USER_NAME);
8991
String password = props.get(MQSourceConnector.CONFIG_NAME_MQ_PASSWORD);
92+
String ccdtUrl = props.get(MQSourceConnector.CONFIG_NAME_MQ_CCDT_URL);
9093
String builderClass = props.get(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER);
9194
String mbj = props.get(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS);
9295
String sslCipherSuite = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_CIPHER_SUITE);
@@ -97,9 +100,21 @@ public void configure(Map<String, String> props) {
97100
mqConnFactory = new MQConnectionFactory();
98101
mqConnFactory.setTransportType(WMQConstants.WMQ_CM_CLIENT);
99102
mqConnFactory.setQueueManager(queueManager);
100-
mqConnFactory.setConnectionNameList(connectionNameList);
101-
mqConnFactory.setChannel(channelName);
102-
mqConnFactory.setBooleanProperty(WMQConstants.USER_AUTHENTICATION_MQCSP, true);
103+
104+
if (ccdtUrl != null) {
105+
URL ccdtUrlObject;
106+
try {
107+
ccdtUrlObject = new URL(ccdtUrl);
108+
} catch (MalformedURLException e) {
109+
log.error("MalformedURLException exception {}", e);
110+
throw new ConnectException("CCDT file url invalid.");
111+
}
112+
mqConnFactory.setCCDTURL(ccdtUrlObject);
113+
} else {
114+
mqConnFactory.setConnectionNameList(connectionNameList);
115+
mqConnFactory.setChannel(channelName);
116+
// mqConnFactory.setBooleanProperty(WMQConstants.USER_AUTHENTICATION_MQCSP, true);
117+
}
103118

104119
queue = new MQQueue(queueName);
105120

src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,10 @@ public class MQSourceConnector extends SourceConnector {
5959
public static final String CONFIG_DOCUMENTATION_MQ_PASSWORD = "The password for authenticating with the queue manager.";
6060
public static final String CONFIG_DISPLAY_MQ_PASSWORD = "Password";
6161

62+
public static final String CONFIG_NAME_MQ_CCDT_URL = "mq.ccdt.url";
63+
public static final String CONFIG_DOCUMENTATION_MQ_CCDT_URL = "The CCDT URL to use to establish a connection to the queue manager.";
64+
public static final String CONFIG_DISPLAY_MQ_CCDT_URL = "CCDT URL";
65+
6266
public static final String CONFIG_NAME_MQ_RECORD_BUILDER = "mq.record.builder";
6367
public static final String CONFIG_DOCUMENTATION_MQ_RECORD_BUILDER = "The class used to build the Kafka Connect records.";
6468
public static final String CONFIG_DISPLAY_MQ_RECORD_BUILDER = "Record builder";
@@ -182,6 +186,10 @@ public class MQSourceConnector extends SourceConnector {
182186
CONFIG_DOCUMENTATION_MQ_PASSWORD, CONFIG_GROUP_MQ, 6, Width.MEDIUM,
183187
CONFIG_DISPLAY_MQ_PASSWORD);
184188

189+
config.define(CONFIG_NAME_MQ_CCDT_URL, Type.STRING, null, Importance.MEDIUM,
190+
CONFIG_DOCUMENTATION_MQ_CCDT_URL, CONFIG_GROUP_MQ, 6, Width.MEDIUM,
191+
CONFIG_DISPLAY_MQ_CCDT_URL);
192+
185193
config.define(CONFIG_NAME_MQ_RECORD_BUILDER, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, Importance.HIGH,
186194
CONFIG_DOCUMENTATION_MQ_RECORD_BUILDER, CONFIG_GROUP_MQ, 7, Width.LONG,
187195
CONFIG_DISPLAY_MQ_RECORD_BUILDER);

src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public class MQSourceTask extends SourceTask {
3131
private static final Logger log = LoggerFactory.getLogger(MQSourceTask.class);
3232

3333
private static int BATCH_SIZE = 100;
34-
private static int MAXUMSGS = 10000;
34+
private static int MAXUMSGS = 10000; // What does this stand for, better name?
3535
private static int MAXUMSGS_DELAY_MS = 500;
3636

3737
private JMSReader reader;
@@ -89,7 +89,7 @@ public MQSourceTask() {
8989
SourceRecord src;
9090
do {
9191
// For the first message in the batch, wait a while if no message
92-
src = reader.receive(messageCount == 0 ? true : false);
92+
src = reader.receive(messageCount == 0);
9393
if (src != null) {
9494
msgs.add(src);
9595
messageCount++;

0 commit comments

Comments
 (0)