Skip to content

Commit a63bcee

Browse files
Beta release of 1.0.2
1 parent e14f8d9 commit a63bcee

File tree

4 files changed

+137
-49
lines changed

4 files changed

+137
-49
lines changed

pom.xml

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
<?xml version="1.0" encoding="UTF-8"?>
22
<!--
3-
* Copyright 2017, 2018 IBM Corporation
3+
* Copyright 2017, 2018, 2019 IBM Corporation
44
*
55
* Licensed under the Apache License, Version 2.0 (the "License");
66
* you may not use this file except in compliance with the License.
@@ -20,7 +20,7 @@
2020
<groupId>com.ibm.eventstreams.connect</groupId>
2121
<artifactId>kafka-connect-mq-source</artifactId>
2222
<packaging>jar</packaging>
23-
<version>1.0.1</version>
23+
<version>1.0.2-beta</version>
2424
<name>kafka-connect-mq-source</name>
2525
<organization>
2626
<name>IBM Corporation</name>
@@ -63,7 +63,7 @@
6363
<dependency>
6464
<groupId>com.ibm.mq</groupId>
6565
<artifactId>com.ibm.mq.allclient</artifactId>
66-
<version>9.1.0.0</version>
66+
<version>9.1.1.0</version>
6767
</dependency>
6868

6969
<dependency>
@@ -78,18 +78,6 @@
7878
<version>1.7.25</version>
7979
<scope>test</scope>
8080
</dependency>
81-
82-
<dependency>
83-
<groupId>org.bouncycastle</groupId>
84-
<artifactId>bcprov-jdk15on</artifactId>
85-
<version>1.60</version>
86-
</dependency>
87-
<dependency>
88-
<groupId>org.bouncycastle</groupId>
89-
<artifactId>bcpkix-jdk15on</artifactId>
90-
<version>1.60</version>
91-
</dependency>
92-
9381
</dependencies>
9482

9583
<build>

src/main/java/com/ibm/eventstreams/connect/mqsource/JMSReader.java

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/**
2-
* Copyright 2017, 2018 IBM Corporation
2+
* Copyright 2017, 2018, 2019 IBM Corporation
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -302,16 +302,8 @@ public void commit() {
302302
public void close() {
303303
log.trace("[{}] Entry {}.close", Thread.currentThread().getId(), this.getClass().getName());
304304

305-
try {
306-
JMSContext ctxt = jmsCtxt;
307-
closeNow.set(true);
308-
if (ctxt != null) {
309-
ctxt.close();
310-
}
311-
}
312-
catch (JMSRuntimeException jmse) {
313-
;
314-
}
305+
closeNow.set(true);
306+
closeInternal();
315307

316308
log.trace("[{}] Exit {}.close", Thread.currentThread().getId(), this.getClass().getName());
317309
}

src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/**
2-
* Copyright 2017, 2018 IBM Corporation
2+
* Copyright 2017, 2018, 2019 IBM Corporation
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -96,7 +96,7 @@ public class MQSourceConnector extends SourceConnector {
9696
public static final String CONFIG_DOCUMENTATION_TOPIC = "The name of the target Kafka topic.";
9797
public static final String CONFIG_DISPLAY_TOPIC = "Target Kafka topic";
9898

99-
public static String VERSION = "1.0.1";
99+
public static String VERSION = "1.0.2-beta";
100100

101101
private Map<String, String> configProps;
102102

src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java

Lines changed: 129 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/**
2-
* Copyright 2017, 2018 IBM Corporation
2+
* Copyright 2017, 2018, 2019 IBM Corporation
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,6 +19,8 @@
1919
import java.util.List;
2020
import java.util.Map;
2121
import java.util.Map.Entry;
22+
import java.util.concurrent.CountDownLatch;
23+
import java.util.concurrent.atomic.AtomicBoolean;
2224
import java.util.concurrent.atomic.AtomicInteger;
2325

2426
import org.apache.kafka.connect.source.SourceRecord;
@@ -30,12 +32,13 @@
3032
public class MQSourceTask extends SourceTask {
3133
private static final Logger log = LoggerFactory.getLogger(MQSourceTask.class);
3234

33-
private static int BATCH_SIZE = 100;
34-
private static int MAX_UNCOMMITTED_MSGS = 10000;
35-
private static int MAX_UNCOMMITTED_MSGS_DELAY_MS = 500;
35+
private static int BATCH_SIZE = 250; // The maximum number of records returned per call to poll()
36+
private CountDownLatch batchCompleteSignal = null; // Used to signal completion of a batch
37+
private AtomicInteger pollCycle = new AtomicInteger(1); // Incremented each time poll() is called
38+
private int lastCommitPollCycle = 0; // The value of pollCycle the last time commit() was called
39+
private AtomicBoolean stopNow = new AtomicBoolean(); // Whether stop has been requested
3640

3741
private JMSReader reader;
38-
private AtomicInteger uncommittedMessages = new AtomicInteger(0);
3942

4043
public MQSourceTask() {
4144
}
@@ -87,35 +90,51 @@ public MQSourceTask() {
8790

8891
final List<SourceRecord> msgs = new ArrayList<>();
8992
int messageCount = 0;
90-
int uncommittedMessagesInt = this.uncommittedMessages.get();
9193

92-
if (uncommittedMessagesInt < MAX_UNCOMMITTED_MSGS) {
93-
log.info("Polling for records");
94+
// Resolve any in-flight transaction, committing unless there has been an error between
95+
// receiving the message from MQ and converting it
96+
if (batchCompleteSignal != null) {
97+
log.debug("Awaiting batch completion signal");
98+
batchCompleteSignal.await();
99+
100+
log.debug("Committing records");
101+
reader.commit();
102+
}
103+
104+
// Increment the counter for the number of times poll is called so we can ensure we don't get stuck waiting for
105+
// commitRecord callbacks to trigger the batch complete signal
106+
int currentPollCycle = pollCycle.incrementAndGet();
107+
log.debug("Starting poll cycle {}", currentPollCycle);
94108

109+
if (!stopNow.get()) {
110+
log.info("Polling for records");
95111
SourceRecord src;
96112
do {
97113
// For the first message in the batch, wait a while if no message
98114
src = reader.receive(messageCount == 0);
99115
if (src != null) {
100116
msgs.add(src);
101117
messageCount++;
102-
uncommittedMessagesInt = this.uncommittedMessages.incrementAndGet();
103118
}
104-
} while ((src != null) && (messageCount < BATCH_SIZE) && (uncommittedMessagesInt < MAX_UNCOMMITTED_MSGS));
105-
106-
log.debug("Poll returning {} records", messageCount);
119+
} while ((src != null) && (messageCount < BATCH_SIZE) && !stopNow.get());
107120
}
108-
else {
109-
log.info("Uncommitted message limit reached");
110-
Thread.sleep(MAX_UNCOMMITTED_MSGS_DELAY_MS);
121+
122+
synchronized(this) {
123+
if (messageCount > 0) {
124+
batchCompleteSignal = new CountDownLatch(messageCount);
125+
}
126+
else {
127+
batchCompleteSignal = null;
128+
}
111129
}
112130

131+
log.debug("Poll returning {} records", messageCount);
132+
113133
log.trace("[{}] Exit {}.poll, retval={}", Thread.currentThread().getId(), this.getClass().getName(), messageCount);
114134
return msgs;
115135
}
116136

117-
118-
/**
137+
/**
119138
* <p>
120139
* Commit the offsets, up to the offsets that have been returned by {@link #poll()}. This
121140
* method should block until the commit is complete.
@@ -129,9 +148,49 @@ public MQSourceTask() {
129148
public void commit() throws InterruptedException {
130149
log.trace("[{}] Entry {}.commit", Thread.currentThread().getId(), this.getClass().getName());
131150

132-
log.debug("Committing records");
133-
reader.commit();
134-
this.uncommittedMessages.set(0);
151+
// This callback is simply used to ensure that the mechanism to use commitRecord callbacks
152+
// to check that all messages in a batch are complete is not getting stuck. If this callback
153+
// is being called, it means that Kafka Connect believes that all outstanding messages have
154+
// been completed. That should mean that commitRecord has been called for all of them too.
155+
// However, if too few calls to commitRecord are received, the connector could wait indefinitely.
156+
// If this commit callback is called twice without the poll cycle increasing, trigger the
157+
// batch complete signal directly.
158+
int currentPollCycle = pollCycle.get();
159+
log.debug("Commit starting in poll cycle {}", currentPollCycle);
160+
boolean willShutdown = false;
161+
162+
if (lastCommitPollCycle == currentPollCycle)
163+
{
164+
synchronized (this) {
165+
if (batchCompleteSignal != null) {
166+
log.debug("Bumping batch complete signal by {}", batchCompleteSignal.getCount());
167+
168+
// This means we're waiting for the signal in the poll() method and it's been
169+
// waiting for at least two calls to this commit callback. It's stuck.
170+
while (batchCompleteSignal.getCount() > 0) {
171+
batchCompleteSignal.countDown();
172+
}
173+
}
174+
else if (stopNow.get()) {
175+
log.debug("Shutting down with empty batch after delay");
176+
willShutdown = true;
177+
}
178+
}
179+
}
180+
else {
181+
lastCommitPollCycle = currentPollCycle;
182+
183+
synchronized (this) {
184+
if ((batchCompleteSignal == null) && stopNow.get()) {
185+
log.debug("Shutting down with empty batch");
186+
willShutdown = true;
187+
}
188+
}
189+
}
190+
191+
if (willShutdown) {
192+
shutdown();
193+
}
135194

136195
log.trace("[{}] Exit {}.commit", Thread.currentThread().getId(), this.getClass().getName());
137196
}
@@ -149,10 +208,59 @@ public void commit() throws InterruptedException {
149208
@Override public void stop() {
150209
log.trace("[{}] Entry {}.stop", Thread.currentThread().getId(), this.getClass().getName());
151210

211+
stopNow.set(true);
212+
213+
boolean willShutdown = false;
214+
215+
synchronized(this) {
216+
if (batchCompleteSignal == null) {
217+
willShutdown = true;
218+
}
219+
}
220+
221+
if (willShutdown) {
222+
shutdown();
223+
}
224+
225+
log.trace("[{}] Exit {}.stop", Thread.currentThread().getId(), this.getClass().getName());
226+
}
227+
228+
/**
229+
* <p>
230+
* Commit an individual {@link SourceRecord} when the callback from the producer client is received, or if a record is filtered by a transformation.
231+
* </p>
232+
* <p>
233+
* SourceTasks are not required to implement this functionality; Kafka Connect will record offsets
234+
* automatically. This hook is provided for systems that also need to store offsets internally
235+
* in their own system.
236+
* </p>
237+
*
238+
* @param record {@link SourceRecord} that was successfully sent via the producer.
239+
* @throws InterruptedException
240+
*/
241+
@Override public void commitRecord(SourceRecord record) throws InterruptedException {
242+
log.trace("[{}] Entry {}.commitRecord, record={}", Thread.currentThread().getId(), this.getClass().getName(), record);
243+
244+
synchronized (this) {
245+
batchCompleteSignal.countDown();
246+
}
247+
248+
log.trace("[{}] Exit {}.commitRecord", Thread.currentThread().getId(), this.getClass().getName());
249+
}
250+
251+
/**
252+
* <p>
253+
* Shuts down the task, releasing any resource held by the task.
254+
* </p>
255+
*/
256+
private void shutdown() {
257+
log.trace("[{}] Entry {}.shutdown", Thread.currentThread().getId(), this.getClass().getName());
258+
259+
// Close the connection to MQ to clean up
152260
if (reader != null) {
153261
reader.close();
154262
}
155263

156-
log.trace("[{}] Exit {}.stop", Thread.currentThread().getId(), this.getClass().getName());
264+
log.trace("[{}] Exit {}.shutdown", Thread.currentThread().getId(), this.getClass().getName());
157265
}
158266
}

0 commit comments

Comments
 (0)