Skip to content

Commit 7bbe931

Browse files
Enhance tracing and support MQ on Cloud
1 parent b100ff8 commit 7bbe931

File tree

8 files changed

+125
-47
lines changed

8 files changed

+125
-47
lines changed

.gitignore

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,4 @@ target/
33
.classpath
44
.settings/
55
.project
6-
.vscode/
7-
# Exclude local Maven repo used to incorporate downloaded MQ client JAR into the connector
8-
local-maven-repo/
9-
# Exclude MQ client JAR in the top level of the work tree
10-
allclient*.jar
6+
.vscode/

README.md

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ The connector is supplied as source code which you can easily build into a JAR f
88
To build the connector, you must have the following installed:
99
* [git](https://git-scm.com/)
1010
* [Maven](https://maven.apache.org)
11-
* Java 7 or later
11+
* Java 8 or later
1212

1313
Clone the repository with the following command:
1414
```shell
@@ -25,15 +25,15 @@ Build the connector using Maven:
2525
mvn clean package
2626
```
2727

28-
Once built, the output is a single JAR called `target/kafka-connect-mq-source-0.6-SNAPSHOT-jar-with-dependencies.jar` which contains all of the required dependencies.
28+
Once built, the output is a single JAR called `target/kafka-connect-mq-source-1.0-SNAPSHOT-jar-with-dependencies.jar` which contains all of the required dependencies.
2929

3030

3131
## Running the connector
3232
To run the connector, you must have:
3333
* The JAR from building the connector
3434
* A properties file containing the configuration for the connector
35-
* Apache Kafka
36-
* IBM MQ v7.5 or later
35+
* Apache Kafka, either standalone or included as part of an offering such as IBM Event Streams
36+
* IBM MQ v8 or later, or the IBM MQ on Cloud service
3737

3838
The connector can be run in a Kafka Connect worker in either standalone (single process) or distributed mode. It's a good idea to start in standalone mode.
3939

@@ -139,6 +139,14 @@ You will need to put the public part of the client's certificate in the queue ma
139139
For troubleshooting, or to better understand the handshake performed by the IBM MQ Java client application in combination with your specific JSSE provider, you can enable debugging by setting `javax.net.debug=ssl` in the JVM environment.
140140

141141

142+
## Performance and syncpoint limit
143+
The connector uses a transacted JMS session to receive messages from MQ in syncpoint and periodically commits the in-flight transaction. This has the effect of batching messages together for improved efficiency. However, the frequency of committing transactions is controlled by the Kafka Connect framework rather than the connector. The connector is only able to receive up to the queue manager's maximum uncommitted message limit (typically 10000 messages) before committing.
144+
145+
By default, Kafka Connect only commits every 60 seconds (10 seconds for the standalone worker), meaning that each task is limited to a rate of about 166 messages per second. You can increase the frequency of committing by using the `offset.flush.interval.ms` configuration in the worker configuration file. For example, if you set `offset.flush.interval.ms=5000`, the connector commits every 5 seconds increasing the maximum rate per task to about 2000 messages per second.
146+
147+
If messages are being received faster than they can be committed, the connector prints a message `Uncommitted message limit reached` and sleeps for a short delay. You should use this as an indication to set the `offset.flush.interval.ms` to a lower value, or increase the number of tasks.
148+
149+
142150
## Configuration
143151
The configuration options for the MQ Source Connector are as follows:
144152

@@ -160,12 +168,14 @@ The configuration options for the MQ Source Connector are as follows:
160168

161169
## Future enhancements
162170
The connector is intentionally basic. The idea is to enhance it over time with additional features to make it more capable. Some possible future enhancements are:
163-
* Configurable schema for MQ messages
164-
* Simplification of handling message formats
165171
* JMX metrics
166172
* Separate TLS configuration for the connector so that keystore location and so on can be specified as configurations
167173

168174

175+
## Support
176+
A commercially supported version of this connector is available for customers with a support entitlement for [IBM Event Streams](https://developer.ibm.com/messaging/event-streams/).
177+
178+
169179
## Issues and contributions
170180
For issues relating specifically to this connector, please use the [GitHub issue tracker](https://github.com/ibm-messaging/kafka-connect-mq-source/issues). If you do submit a Pull Request related to this connector, please indicate in the Pull Request that you accept and agree to be bound by the terms of the [IBM Contributor License Agreement](CLA.md).
171181

pom.xml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
<groupId>com.ibm.mq.kafkaconnect</groupId>
2121
<artifactId>kafka-connect-mq-source</artifactId>
2222
<packaging>jar</packaging>
23-
<version>0.6-SNAPSHOT</version>
23+
<version>1.0-SNAPSHOT</version>
2424
<name>kafka-connect-mq-source</name>
2525
<organization>
2626
<name>IBM Corporation</name>
@@ -45,13 +45,13 @@
4545
<dependency>
4646
<groupId>org.apache.kafka</groupId>
4747
<artifactId>connect-api</artifactId>
48-
<version>0.11.0.0</version>
48+
<version>1.1.0</version>
4949
<scope>provided</scope>
5050
</dependency>
5151
<dependency>
5252
<groupId>org.apache.kafka</groupId>
5353
<artifactId>connect-json</artifactId>
54-
<version>0.11.0.0</version>
54+
<version>1.1.0</version>
5555
<scope>provided</scope>
5656
</dependency>
5757

src/main/java/com/ibm/mq/kafkaconnect/JMSReader.java

Lines changed: 39 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
import org.slf4j.Logger;
3838
import org.slf4j.LoggerFactory;
3939

40-
4140
/**
4241
* Reads messages from MQ using JMS. Uses a transacted session, adding messages to the current
4342
* transaction until told to commit. Automatically reconnects as needed.
@@ -76,6 +75,8 @@ public JMSReader() {}
7675
* @throws ConnectException Operation failed and connector should stop.
7776
*/
7877
public void configure(Map<String, String> props) {
78+
log.trace("[{}] Entry {}.configure, props={}", Thread.currentThread().getId(), this.getClass().getName(), props);
79+
7980
String queueManager = props.get(MQSourceConnector.CONFIG_NAME_MQ_QUEUE_MANAGER);
8081
String connectionNameList = props.get(MQSourceConnector.CONFIG_NAME_MQ_CONNECTION_NAME_LIST);
8182
String channelName = props.get(MQSourceConnector.CONFIG_NAME_MQ_CHANNEL_NAME);
@@ -94,6 +95,8 @@ public void configure(Map<String, String> props) {
9495
mqConnFactory.setQueueManager(queueManager);
9596
mqConnFactory.setConnectionNameList(connectionNameList);
9697
mqConnFactory.setChannel(channelName);
98+
mqConnFactory.setBooleanProperty(WMQConstants.USER_AUTHENTICATION_MQCSP, true);
99+
97100
queue = new MQQueue(queueName);
98101

99102
this.userName = userName;
@@ -119,7 +122,7 @@ public void configure(Map<String, String> props) {
119122
this.topic = topic;
120123
}
121124
catch (JMSException | JMSRuntimeException jmse) {
122-
log.debug("JMS exception {}", jmse);
125+
log.error("JMS exception {}", jmse);
123126
throw new ConnectException(jmse);
124127
}
125128

@@ -129,15 +132,19 @@ public void configure(Map<String, String> props) {
129132
builder.configure(props);
130133
}
131134
catch (ClassNotFoundException | IllegalAccessException | InstantiationException | NullPointerException exc) {
132-
log.debug("Could not instantiate message builder {}", builderClass);
135+
log.error("Could not instantiate message builder {}", builderClass);
133136
throw new ConnectException("Could not instantiate message builder", exc);
134137
}
138+
139+
log.trace("[{}] Exit {}.configure", Thread.currentThread().getId(), this.getClass().getName());
135140
}
136141

137142
/**
138143
* Connects to MQ.
139144
*/
140145
public void connect() {
146+
log.trace("[{}] Entry {}.connect", Thread.currentThread().getId(), this.getClass().getName());
147+
141148
try {
142149
if (userName != null) {
143150
jmsCtxt = mqConnFactory.createContext(userName, password, JMSContext.SESSION_TRANSACTED);
@@ -153,9 +160,11 @@ public void connect() {
153160
}
154161
catch (JMSRuntimeException jmse) {
155162
log.info("Connection to MQ could not be established");
156-
log.debug("JMS exception {}", jmse);
163+
log.error("JMS exception {}", jmse);
157164
handleException(jmse);
158165
}
166+
167+
log.trace("[{}] Exit {}.connect", Thread.currentThread().getId(), this.getClass().getName());
159168
}
160169

161170
/**
@@ -166,7 +175,10 @@ public void connect() {
166175
* @return The SourceRecord representing the message
167176
*/
168177
public SourceRecord receive(boolean wait) {
178+
log.trace("[{}] Entry {}.receive", Thread.currentThread().getId(), this.getClass().getName());
179+
169180
if (!connectInternal()) {
181+
log.trace("[{}] Exit {}.receive, retval=null", Thread.currentThread().getId(), this.getClass().getName());
170182
return null;
171183
}
172184

@@ -176,12 +188,12 @@ public SourceRecord receive(boolean wait) {
176188
if (wait) {
177189
while ((m == null) && !closeNow.get())
178190
{
179-
log.trace("Waiting {} ms for message", RECEIVE_TIMEOUT);
191+
log.debug("Waiting {} ms for message", RECEIVE_TIMEOUT);
180192
m = jmsCons.receive(RECEIVE_TIMEOUT);
181193
}
182194

183195
if (m == null) {
184-
log.trace("No message received");
196+
log.debug("No message received");
185197
}
186198
}
187199
else {
@@ -201,10 +213,11 @@ public SourceRecord receive(boolean wait) {
201213
}
202214
}
203215
catch (JMSException | JMSRuntimeException | ConnectException exc) {
204-
log.debug("JMS exception {}", exc);
216+
log.error("JMS exception {}", exc);
205217
handleException(exc);
206218
}
207219

220+
log.trace("[{}] Exit {}.receive, retval={}", Thread.currentThread().getId(), this.getClass().getName(), sr);
208221
return sr;
209222
}
210223

@@ -213,6 +226,8 @@ public SourceRecord receive(boolean wait) {
213226
* be processed, the transaction is "in peril" and is rolled back instead to avoid data loss.
214227
*/
215228
public void commit() {
229+
log.trace("[{}] Entry {}.commit", Thread.currentThread().getId(), this.getClass().getName());
230+
216231
if (!connectInternal()) {
217232
return;
218233
}
@@ -223,7 +238,7 @@ public void commit() {
223238

224239
if (inperil) {
225240
inperil = false;
226-
log.trace("Rolling back in-flight transaction");
241+
log.debug("Rolling back in-flight transaction");
227242
jmsCtxt.rollback();
228243
}
229244
else {
@@ -232,15 +247,19 @@ public void commit() {
232247
}
233248
}
234249
catch (JMSRuntimeException jmse) {
235-
log.debug("JMS exception {}", jmse);
250+
log.error("JMS exception {}", jmse);
236251
handleException(jmse);
237252
}
253+
254+
log.trace("[{}] Exit {}.commit", Thread.currentThread().getId(), this.getClass().getName());
238255
}
239256

240257
/**
241258
* Closes the connection.
242259
*/
243260
public void close() {
261+
log.trace("[{}] Entry {}.close", Thread.currentThread().getId(), this.getClass().getName());
262+
244263
try {
245264
JMSContext ctxt = jmsCtxt;
246265
closeNow.set(true);
@@ -249,6 +268,8 @@ public void close() {
249268
catch (JMSRuntimeException jmse) {
250269
;
251270
}
271+
272+
log.trace("[{}] Exit {}.close", Thread.currentThread().getId(), this.getClass().getName());
252273
}
253274

254275
/**
@@ -262,9 +283,11 @@ private boolean connectInternal() {
262283
}
263284

264285
if (closeNow.get()) {
286+
log.debug("Closing connection now");
265287
return false;
266288
}
267289

290+
log.trace("[{}] Entry {}.connectInternal", Thread.currentThread().getId(), this.getClass().getName());
268291
try {
269292
if (userName != null) {
270293
jmsCtxt = mqConnFactory.createContext(userName, password, JMSContext.SESSION_TRANSACTED);
@@ -279,18 +302,22 @@ private boolean connectInternal() {
279302
log.info("Connection to MQ established");
280303
}
281304
catch (JMSRuntimeException jmse) {
282-
log.debug("JMS exception {}", jmse);
305+
log.error("JMS exception {}", jmse);
283306
handleException(jmse);
307+
log.trace("[{}] Exit {}.connectInternal, retval=false", Thread.currentThread().getId(), this.getClass().getName());
284308
return false;
285309
}
286310

311+
log.trace("[{}] Exit {}.connectInternal, retval=true", Thread.currentThread().getId(), this.getClass().getName());
287312
return true;
288313
}
289314

290315
/**
291316
* Internal method to close the connection.
292317
*/
293318
private void closeInternal() {
319+
log.trace("[{}] Entry {}.closeInternal", Thread.currentThread().getId(), this.getClass().getName());
320+
294321
try {
295322
inflight = false;
296323
inperil = false;
@@ -308,6 +335,8 @@ private void closeInternal() {
308335
jmsCtxt = null;
309336
log.debug("Connection to MQ closed");
310337
}
338+
339+
log.trace("[{}] Exit {}.closeInternal", Thread.currentThread().getId(), this.getClass().getName());
311340
}
312341

313342
/**

src/main/java/com/ibm/mq/kafkaconnect/MQSourceConnector.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ public class MQSourceConnector extends SourceConnector {
8686
public static final String CONFIG_DOCUMENTATION_TOPIC = "The name of the target Kafka topic.";
8787
public static final String CONFIG_DISPLAY_TOPIC = "Target Kafka topic";
8888

89-
public static String VERSION = "0.6";
89+
public static String VERSION = "1.0";
9090

9191
private Map<String, String> configProps;
9292

@@ -106,10 +106,14 @@ public class MQSourceConnector extends SourceConnector {
106106
* @param props configuration settings
107107
*/
108108
@Override public void start(Map<String, String> props) {
109+
log.trace("[{}] Entry {}.start, props={}", Thread.currentThread().getId(), this.getClass().getName(), props);
110+
109111
configProps = props;
110112
for (final Entry<String, String> entry: props.entrySet()) {
111-
log.trace("Connector props entry {} : {}", entry.getKey(), entry.getValue());
113+
log.debug("Connector props entry {} : {}", entry.getKey(), entry.getValue());
112114
}
115+
116+
log.trace("[{}] Exit {}.start", Thread.currentThread().getId(), this.getClass().getName());
113117
}
114118

115119
/**
@@ -127,19 +131,24 @@ public class MQSourceConnector extends SourceConnector {
127131
* @return configurations for Tasks
128132
*/
129133
@Override public List<Map<String, String>> taskConfigs(int maxTasks) {
134+
log.trace("[{}] Entry {}.taskConfigs, maxTasks={}", Thread.currentThread().getId(), this.getClass().getName(), maxTasks);
135+
130136
List<Map<String, String>> taskConfigs = new ArrayList<>();
131137
for (int i = 0; i < maxTasks; i++)
132138
{
133139
taskConfigs.add(configProps);
134140
}
141+
142+
log.trace("[{}] Exit {}.taskConfigs, retval={}", Thread.currentThread().getId(), this.getClass().getName(), taskConfigs);
135143
return taskConfigs;
136144
}
137145

138146
/**
139147
* Stop this connector.
140148
*/
141149
@Override public void stop() {
142-
150+
log.trace("[{}] Entry {}.stop", Thread.currentThread().getId(), this.getClass().getName());
151+
log.trace("[{}] Exit {}.stop", Thread.currentThread().getId(), this.getClass().getName());
143152
}
144153

145154
/**

0 commit comments

Comments
 (0)