1
1
/**
2
- * Copyright 2017, 2018 IBM Corporation
2
+ * Copyright 2017, 2018, 2019 IBM Corporation
3
3
*
4
4
* Licensed under the Apache License, Version 2.0 (the "License");
5
5
* you may not use this file except in compliance with the License.
19
19
import java .util .List ;
20
20
import java .util .Map ;
21
21
import java .util .Map .Entry ;
22
+ import java .util .concurrent .CountDownLatch ;
23
+ import java .util .concurrent .atomic .AtomicBoolean ;
22
24
import java .util .concurrent .atomic .AtomicInteger ;
23
25
24
26
import org .apache .kafka .connect .source .SourceRecord ;
30
32
public class MQSourceTask extends SourceTask {
31
33
private static final Logger log = LoggerFactory .getLogger (MQSourceTask .class );
32
34
33
- private static int BATCH_SIZE = 100 ;
34
- private static int MAX_UNCOMMITTED_MSGS = 10000 ;
35
- private static int MAX_UNCOMMITTED_MSGS_DELAY_MS = 500 ;
35
+ private static int BATCH_SIZE = 250 ; // The maximum number of records returned per call to poll()
36
+ private CountDownLatch batchCompleteSignal = null ; // Used to signal completion of a batch
37
+ private AtomicInteger pollCycle = new AtomicInteger (1 ); // Incremented each time poll() is called
38
+ private int lastCommitPollCycle = 0 ; // The value of pollCycle the last time commit() was called
39
+ private AtomicBoolean stopNow = new AtomicBoolean (); // Whether stop has been requested
36
40
37
41
private JMSReader reader ;
38
- private AtomicInteger uncommittedMessages = new AtomicInteger (0 );
39
42
40
43
public MQSourceTask () {
41
44
}
@@ -87,35 +90,51 @@ public MQSourceTask() {
87
90
88
91
final List <SourceRecord > msgs = new ArrayList <>();
89
92
int messageCount = 0 ;
90
- int uncommittedMessagesInt = this .uncommittedMessages .get ();
91
93
92
- if (uncommittedMessagesInt < MAX_UNCOMMITTED_MSGS ) {
93
- log .info ("Polling for records" );
94
+ // Resolve any in-flight transaction, committing unless there has been an error between
95
+ // receiving the message from MQ and converting it
96
+ if (batchCompleteSignal != null ) {
97
+ log .debug ("Awaiting batch completion signal" );
98
+ batchCompleteSignal .await ();
99
+
100
+ log .debug ("Committing records" );
101
+ reader .commit ();
102
+ }
103
+
104
+ // Increment the counter for the number of times poll is called so we can ensure we don't get stuck waiting for
105
+ // commitRecord callbacks to trigger the batch complete signal
106
+ int currentPollCycle = pollCycle .incrementAndGet ();
107
+ log .debug ("Starting poll cycle {}" , currentPollCycle );
94
108
109
+ if (!stopNow .get ()) {
110
+ log .info ("Polling for records" );
95
111
SourceRecord src ;
96
112
do {
97
113
// For the first message in the batch, wait a while if no message
98
114
src = reader .receive (messageCount == 0 );
99
115
if (src != null ) {
100
116
msgs .add (src );
101
117
messageCount ++;
102
- uncommittedMessagesInt = this .uncommittedMessages .incrementAndGet ();
103
118
}
104
- } while ((src != null ) && (messageCount < BATCH_SIZE ) && (uncommittedMessagesInt < MAX_UNCOMMITTED_MSGS ));
105
-
106
- log .debug ("Poll returning {} records" , messageCount );
119
+ } while ((src != null ) && (messageCount < BATCH_SIZE ) && !stopNow .get ());
107
120
}
108
- else {
109
- log .info ("Uncommitted message limit reached" );
110
- Thread .sleep (MAX_UNCOMMITTED_MSGS_DELAY_MS );
121
+
122
+ synchronized (this ) {
123
+ if (messageCount > 0 ) {
124
+ batchCompleteSignal = new CountDownLatch (messageCount );
125
+ }
126
+ else {
127
+ batchCompleteSignal = null ;
128
+ }
111
129
}
112
130
131
+ log .debug ("Poll returning {} records" , messageCount );
132
+
113
133
log .trace ("[{}] Exit {}.poll, retval={}" , Thread .currentThread ().getId (), this .getClass ().getName (), messageCount );
114
134
return msgs ;
115
135
}
116
136
117
-
118
- /**
137
+ /**
119
138
* <p>
120
139
* Commit the offsets, up to the offsets that have been returned by {@link #poll()}. This
121
140
* method should block until the commit is complete.
@@ -129,9 +148,49 @@ public MQSourceTask() {
129
148
public void commit () throws InterruptedException {
130
149
log .trace ("[{}] Entry {}.commit" , Thread .currentThread ().getId (), this .getClass ().getName ());
131
150
132
- log .debug ("Committing records" );
133
- reader .commit ();
134
- this .uncommittedMessages .set (0 );
151
+ // This callback is simply used to ensure that the mechanism to use commitRecord callbacks
152
+ // to check that all messages in a batch are complete is not getting stuck. If this callback
153
+ // is being called, it means that Kafka Connect believes that all outstanding messages have
154
+ // been completed. That should mean that commitRecord has been called for all of them too.
155
+ // However, if too few calls to commitRecord are received, the connector could wait indefinitely.
156
+ // If this commit callback is called twice without the poll cycle increasing, trigger the
157
+ // batch complete signal directly.
158
+ int currentPollCycle = pollCycle .get ();
159
+ log .debug ("Commit starting in poll cycle {}" , currentPollCycle );
160
+ boolean willShutdown = false ;
161
+
162
+ if (lastCommitPollCycle == currentPollCycle )
163
+ {
164
+ synchronized (this ) {
165
+ if (batchCompleteSignal != null ) {
166
+ log .debug ("Bumping batch complete signal by {}" , batchCompleteSignal .getCount ());
167
+
168
+ // This means we're waiting for the signal in the poll() method and it's been
169
+ // waiting for at least two calls to this commit callback. It's stuck.
170
+ while (batchCompleteSignal .getCount () > 0 ) {
171
+ batchCompleteSignal .countDown ();
172
+ }
173
+ }
174
+ else if (stopNow .get ()) {
175
+ log .debug ("Shutting down with empty batch after delay" );
176
+ willShutdown = true ;
177
+ }
178
+ }
179
+ }
180
+ else {
181
+ lastCommitPollCycle = currentPollCycle ;
182
+
183
+ synchronized (this ) {
184
+ if ((batchCompleteSignal == null ) && stopNow .get ()) {
185
+ log .debug ("Shutting down with empty batch" );
186
+ willShutdown = true ;
187
+ }
188
+ }
189
+ }
190
+
191
+ if (willShutdown ) {
192
+ shutdown ();
193
+ }
135
194
136
195
log .trace ("[{}] Exit {}.commit" , Thread .currentThread ().getId (), this .getClass ().getName ());
137
196
}
@@ -149,10 +208,59 @@ public void commit() throws InterruptedException {
149
208
@ Override public void stop () {
150
209
log .trace ("[{}] Entry {}.stop" , Thread .currentThread ().getId (), this .getClass ().getName ());
151
210
211
+ stopNow .set (true );
212
+
213
+ boolean willShutdown = false ;
214
+
215
+ synchronized (this ) {
216
+ if (batchCompleteSignal == null ) {
217
+ willShutdown = true ;
218
+ }
219
+ }
220
+
221
+ if (willShutdown ) {
222
+ shutdown ();
223
+ }
224
+
225
+ log .trace ("[{}] Exit {}.stop" , Thread .currentThread ().getId (), this .getClass ().getName ());
226
+ }
227
+
228
+ /**
229
+ * <p>
230
+ * Commit an individual {@link SourceRecord} when the callback from the producer client is received, or if a record is filtered by a transformation.
231
+ * </p>
232
+ * <p>
233
+ * SourceTasks are not required to implement this functionality; Kafka Connect will record offsets
234
+ * automatically. This hook is provided for systems that also need to store offsets internally
235
+ * in their own system.
236
+ * </p>
237
+ *
238
+ * @param record {@link SourceRecord} that was successfully sent via the producer.
239
+ * @throws InterruptedException
240
+ */
241
+ @ Override public void commitRecord (SourceRecord record ) throws InterruptedException {
242
+ log .trace ("[{}] Entry {}.commitRecord, record={}" , Thread .currentThread ().getId (), this .getClass ().getName (), record );
243
+
244
+ synchronized (this ) {
245
+ batchCompleteSignal .countDown ();
246
+ }
247
+
248
+ log .trace ("[{}] Exit {}.commitRecord" , Thread .currentThread ().getId (), this .getClass ().getName ());
249
+ }
250
+
251
+ /**
252
+ * <p>
253
+ * Shuts down the task, releasing any resource held by the task.
254
+ * </p>
255
+ */
256
+ private void shutdown () {
257
+ log .trace ("[{}] Entry {}.shutdown" , Thread .currentThread ().getId (), this .getClass ().getName ());
258
+
259
+ // Close the connection to MQ to clean up
152
260
if (reader != null ) {
153
261
reader .close ();
154
262
}
155
263
156
- log .trace ("[{}] Exit {}.stop " , Thread .currentThread ().getId (), this .getClass ().getName ());
264
+ log .trace ("[{}] Exit {}.shutdown " , Thread .currentThread ().getId (), this .getClass ().getName ());
157
265
}
158
266
}
0 commit comments