Skip to content

Commit def7c02

Browse files
committed
feat: added new config mq.receive.max.poll.time.ms
- Maximum time (in milliseconds) to spend polling messages before returning a batch to Kafka. Signed-off-by: Joel Hanson <joelhanson025@gmail.com>
1 parent 25c102f commit def7c02

File tree

3 files changed

+109
-39
lines changed

3 files changed

+109
-39
lines changed

src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1299,4 +1299,46 @@ public void verifyLoggingErrorsWithMessageHavingDefaultRecordBuilder() throws Ex
12991299
assertThat(processedRecords.get(0).topic()).isEqualTo("__dlq.mq.source");
13001300
assertThat(processedRecords.get(1).topic()).isEqualTo("mytopic");
13011301
}
1302+
1303+
public void testMaxPollTimeTerminatesBatchEarly() throws Exception {
1304+
connectTask = getSourceTaskWithEmptyKafkaOffset();
1305+
1306+
final Map<String, String> connectorConfigProps = createExactlyOnceConnectorProperties();
1307+
connectorConfigProps.put("mq.message.body.jms", "true");
1308+
connectorConfigProps.put("mq.record.builder",
1309+
"com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
1310+
connectorConfigProps.put("mq.message.receive.timeout", "150");
1311+
connectorConfigProps.put("mq.receive.subsequent.timeout.ms", "10");
1312+
connectorConfigProps.put("mq.receive.max.poll.time.ms", "200"); // stop after 200ms
1313+
connectorConfigProps.put("mq.batch.size", "5000");
1314+
1315+
final JMSWorker shared = new JMSWorker();
1316+
shared.configure(getPropertiesConfig(connectorConfigProps));
1317+
final JMSWorker dedicated = new JMSWorker();
1318+
dedicated.configure(getPropertiesConfig(connectorConfigProps));
1319+
final SequenceStateClient sequenceStateClient = new SequenceStateClient(DEFAULT_STATE_QUEUE, shared, dedicated);
1320+
1321+
connectTask.start(connectorConfigProps, shared, dedicated, sequenceStateClient);
1322+
1323+
List<Message> messages = createAListOfMessages(getJmsContext(), 10, "msg ");
1324+
putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, messages);
1325+
messages = createAListOfMessages(getJmsContext(), 10, "msg ");
1326+
putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, messages);
1327+
messages = createAListOfMessages(getJmsContext(), 10, "msg ");
1328+
putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, messages);
1329+
1330+
final long start = System.nanoTime();
1331+
final List<SourceRecord> kafkaMessages = connectTask.poll();
1332+
final long durationMs = (System.nanoTime() - start) / 1_000_000;
1333+
1334+
System.out.println(durationMs);
1335+
// Poll should end close to 200ms
1336+
assertThat(durationMs >= 180 && durationMs <= 500).isTrue();
1337+
1338+
final List<Message> stateMsgs1 = browseAllMessagesFromQueue(DEFAULT_STATE_QUEUE);
1339+
assertThat(stateMsgs1.size()).isEqualTo(1);
1340+
final List<Message> sourceMsgs = getAllMessagesFromQueue(DEFAULT_SOURCE_QUEUE);
1341+
assertThat(sourceMsgs.size()).isEqualTo(0);
1342+
assertEquals(30, kafkaMessages.size());
1343+
}
13021344
}

src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,12 @@ public class MQSourceConnector extends SourceConnector {
187187
public static final long CONFIG_RECONNECT_DELAY_MAX_DEFAULT = 8192L;
188188
public static final long CONFIG_RECONNECT_DELAY_MAX_MINIMUM = 10L;
189189

190+
public static final String CONFIG_MAX_POLL_TIME = "mq.receive.max.poll.time.ms";
191+
public static final String CONFIG_DOCUMENTATION_MAX_POLL_TIME = "Maximum time (in milliseconds) to spend polling messages before returning a batch to Kafka. "
192+
+ "If not set or set to 0, polling continues until batch size or receive timeout conditions are met.";
193+
public static final String CONFIG_DISPLAY_MAX_POLL_TIME = "Max poll time (ms)";
194+
public static final long CONFIG_MAX_POLL_TIME_DEFAULT = 0L;
195+
190196
public static final String DLQ_PREFIX = "errors.deadletterqueue.";
191197

192198
public static final String DLQ_TOPIC_NAME_CONFIG = DLQ_PREFIX + "topic.name";
@@ -648,6 +654,15 @@ null, new ReadableFile(),
648654
CONFIG_GROUP_MQ, 31,
649655
Width.MEDIUM,
650656
DLQ_CONTEXT_HEADERS_ENABLE_DISPLAY);
657+
CONFIGDEF.define(CONFIG_MAX_POLL_TIME,
658+
ConfigDef.Type.LONG,
659+
CONFIG_MAX_POLL_TIME_DEFAULT,
660+
ConfigDef.Importance.LOW,
661+
CONFIG_DOCUMENTATION_MAX_POLL_TIME,
662+
CONFIG_GROUP_MQ,
663+
32,
664+
ConfigDef.Width.MEDIUM,
665+
CONFIG_DISPLAY_MAX_POLL_TIME);
651666

652667
CONFIGDEF.define(CONFIG_NAME_TOPIC,
653668
Type.STRING,

src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java

Lines changed: 52 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -15,23 +15,18 @@
1515
*/
1616
package com.ibm.eventstreams.connect.mqsource;
1717

18-
import com.ibm.eventstreams.connect.mqsource.builders.RecordBuilderException;
19-
import com.ibm.eventstreams.connect.mqsource.sequencestate.SequenceState;
20-
import com.ibm.eventstreams.connect.mqsource.sequencestate.SequenceStateClient;
21-
import com.ibm.eventstreams.connect.mqsource.sequencestate.SequenceStateException;
22-
import com.ibm.eventstreams.connect.mqsource.util.LogMessages;
23-
import com.ibm.eventstreams.connect.mqsource.util.ExceptionProcessor;
24-
import com.ibm.eventstreams.connect.mqsource.util.QueueConfig;
25-
import org.apache.kafka.common.config.AbstractConfig;
26-
import org.apache.kafka.connect.errors.ConnectException;
27-
import org.apache.kafka.connect.source.SourceRecord;
28-
import org.apache.kafka.connect.source.SourceTask;
29-
import org.slf4j.Logger;
30-
import org.slf4j.LoggerFactory;
31-
32-
import javax.jms.JMSException;
33-
import javax.jms.JMSRuntimeException;
34-
import javax.jms.Message;
18+
import static com.ibm.eventstreams.connect.mqsource.MQSourceConnector.CONFIG_MAX_POLL_BLOCKED_TIME_MS;
19+
import static com.ibm.eventstreams.connect.mqsource.MQSourceConnector.CONFIG_MAX_POLL_TIME;
20+
import static com.ibm.eventstreams.connect.mqsource.MQSourceConnector.CONFIG_NAME_MQ_BATCH_SIZE;
21+
import static com.ibm.eventstreams.connect.mqsource.MQSourceConnector.CONFIG_NAME_MQ_EXACTLY_ONCE_STATE_QUEUE;
22+
import static com.ibm.eventstreams.connect.mqsource.MQSourceConnector.CONFIG_NAME_MQ_QUEUE;
23+
import static com.ibm.eventstreams.connect.mqsource.MQSourceConnector.CONFIG_NAME_MQ_QUEUE_MANAGER;
24+
import static com.ibm.eventstreams.connect.mqsource.MQSourceConnector.CONFIG_VALUE_MQ_BATCH_SIZE_DEFAULT;
25+
import static com.ibm.eventstreams.connect.mqsource.MQSourceTaskStartUpAction.NORMAL_OPERATION;
26+
import static com.ibm.eventstreams.connect.mqsource.MQSourceTaskStartUpAction.REDELIVER_UNSENT_BATCH;
27+
import static com.ibm.eventstreams.connect.mqsource.MQSourceTaskStartUpAction.REMOVE_DELIVERED_MESSAGES_FROM_SOURCE_QUEUE;
28+
import static com.ibm.eventstreams.connect.mqsource.sequencestate.SequenceState.LastKnownState.DELIVERED;
29+
import static com.ibm.eventstreams.connect.mqsource.sequencestate.SequenceState.LastKnownState.IN_FLIGHT;
3530

3631
import java.util.ArrayList;
3732
import java.util.Collections;
@@ -47,24 +42,32 @@
4742
import java.util.function.Consumer;
4843
import java.util.stream.Collectors;
4944

50-
import static com.ibm.eventstreams.connect.mqsource.MQSourceConnector.CONFIG_MAX_POLL_BLOCKED_TIME_MS;
51-
import static com.ibm.eventstreams.connect.mqsource.MQSourceConnector.CONFIG_NAME_MQ_BATCH_SIZE;
52-
import static com.ibm.eventstreams.connect.mqsource.MQSourceConnector.CONFIG_NAME_MQ_EXACTLY_ONCE_STATE_QUEUE;
53-
import static com.ibm.eventstreams.connect.mqsource.MQSourceConnector.CONFIG_NAME_MQ_QUEUE;
54-
import static com.ibm.eventstreams.connect.mqsource.MQSourceConnector.CONFIG_NAME_MQ_QUEUE_MANAGER;
55-
import static com.ibm.eventstreams.connect.mqsource.MQSourceConnector.CONFIG_VALUE_MQ_BATCH_SIZE_DEFAULT;
45+
import javax.jms.JMSException;
46+
import javax.jms.JMSRuntimeException;
47+
import javax.jms.Message;
5648

57-
import static com.ibm.eventstreams.connect.mqsource.MQSourceTaskStartUpAction.REMOVE_DELIVERED_MESSAGES_FROM_SOURCE_QUEUE;
58-
import static com.ibm.eventstreams.connect.mqsource.MQSourceTaskStartUpAction.NORMAL_OPERATION;
59-
import static com.ibm.eventstreams.connect.mqsource.MQSourceTaskStartUpAction.REDELIVER_UNSENT_BATCH;
60-
import static com.ibm.eventstreams.connect.mqsource.sequencestate.SequenceState.LastKnownState.DELIVERED;
61-
import static com.ibm.eventstreams.connect.mqsource.sequencestate.SequenceState.LastKnownState.IN_FLIGHT;
49+
import org.apache.kafka.common.config.AbstractConfig;
50+
import org.apache.kafka.connect.errors.ConnectException;
51+
import org.apache.kafka.connect.source.SourceRecord;
52+
import org.apache.kafka.connect.source.SourceTask;
53+
import org.slf4j.Logger;
54+
import org.slf4j.LoggerFactory;
55+
56+
import com.ibm.eventstreams.connect.mqsource.builders.RecordBuilderException;
57+
import com.ibm.eventstreams.connect.mqsource.sequencestate.SequenceState;
58+
import com.ibm.eventstreams.connect.mqsource.sequencestate.SequenceStateClient;
59+
import com.ibm.eventstreams.connect.mqsource.sequencestate.SequenceStateException;
60+
import com.ibm.eventstreams.connect.mqsource.util.ExceptionProcessor;
61+
import com.ibm.eventstreams.connect.mqsource.util.LogMessages;
62+
import com.ibm.eventstreams.connect.mqsource.util.QueueConfig;
6263

6364
public class MQSourceTask extends SourceTask {
6465
private static final Logger log = LoggerFactory.getLogger(MQSourceTask.class);
6566

6667
// The maximum number of records returned per call to poll()
6768
private int batchSize = CONFIG_VALUE_MQ_BATCH_SIZE_DEFAULT;
69+
// The maximum time to spend polling messages before returning a batch
70+
private long maxPollTime = CONFIG_VALUE_MQ_BATCH_SIZE_DEFAULT;
6871

6972
// Used to signal completion of a batch
7073
// After returning a batch of messages to Connect, the SourceTask waits
@@ -174,6 +177,7 @@ protected void start(final Map<String, String> props, final JMSWorker reader, fi
174177
startUpAction = NORMAL_OPERATION;
175178

176179
batchSize = config.getInt(CONFIG_NAME_MQ_BATCH_SIZE);
180+
maxPollTime = config.getLong(CONFIG_MAX_POLL_TIME);
177181
try {
178182
reader.configure(config);
179183
reader.connect();
@@ -415,22 +419,31 @@ private void initOrResetBatchCompleteSignal(final boolean predicate, final List<
415419

416420
private List<Message> pollSourceQueue(final int numberOfMessagesToBePolled) throws JMSException {
417421
final List<Message> localList = new ArrayList<>();
418-
419-
if (!stopNow.get()) {
420-
log.debug("Polling for records");
421-
Message message;
422-
do {
423-
message = reader.receive(sourceQueue, sourceQueueConfig, localList.size() == 0);
424-
if (message != null) {
425-
localList.add(message);
426-
}
427-
} while (message != null && localList.size() < numberOfMessagesToBePolled && !stopNow.get());
428-
} else {
422+
if (stopNow.get()) {
429423
log.info("Stopping polling for records");
424+
return localList;
430425
}
426+
427+
log.debug("Polling for records");
428+
final long startTime = System.currentTimeMillis();
429+
430+
Message message;
431+
do {
432+
message = reader.receive(sourceQueue, sourceQueueConfig, localList.isEmpty());
433+
if (message != null) {
434+
localList.add(message);
435+
}
436+
} while (
437+
message != null &&
438+
localList.size() < numberOfMessagesToBePolled &&
439+
!stopNow.get() &&
440+
(maxPollTime <= 0 || (System.currentTimeMillis() - startTime) < maxPollTime)
441+
);
442+
431443
return localList;
432444
}
433445

446+
434447
private boolean isFirstMsgOnSourceQueueARequiredMsg(final List<String> msgIds) throws JMSException {
435448
final Message message = reader.browse(sourceQueue).get();
436449
return msgIds.contains(message.getJMSMessageID());

0 commit comments

Comments
 (0)