From def7c02f16ebc1a4bd45ae652b7e01e5e223c143 Mon Sep 17 00:00:00 2001
From: Joel Hanson
Date: Mon, 23 Jun 2025 14:18:36 +0530
Subject: [PATCH 1/2] feat: added new config mq.receive.max.poll.time.ms -
 Maximum time (in milliseconds) to spend polling messages before returning a
 batch to Kafka.

Signed-off-by: Joel Hanson
---
 .../connect/mqsource/MQSourceTaskIT.java      | 42 +++++++++
 .../connect/mqsource/MQSourceConnector.java   | 15 +++
 .../connect/mqsource/MQSourceTask.java        | 91 +++++++++++--------
 3 files changed, 109 insertions(+), 39 deletions(-)

diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java
index 4f4fe32..93e33b4 100644
--- a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java
+++ b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java
@@ -1299,4 +1299,46 @@ public void verifyLoggingErrorsWithMessageHavingDefaultRecordBuilder() throws Ex
         assertThat(processedRecords.get(0).topic()).isEqualTo("__dlq.mq.source");
         assertThat(processedRecords.get(1).topic()).isEqualTo("mytopic");
     }
+
+    public void testMaxPollTimeTerminatesBatchEarly() throws Exception {
+        connectTask = getSourceTaskWithEmptyKafkaOffset();
+
+        final Map<String, String> connectorConfigProps = createExactlyOnceConnectorProperties();
+        connectorConfigProps.put("mq.message.body.jms", "true");
+        connectorConfigProps.put("mq.record.builder",
+                "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
+        connectorConfigProps.put("mq.message.receive.timeout", "150");
+        connectorConfigProps.put("mq.receive.subsequent.timeout.ms", "10");
+        connectorConfigProps.put("mq.receive.max.poll.time.ms", "200"); // stop after 200ms
+        connectorConfigProps.put("mq.batch.size", "5000");
+
+        final JMSWorker shared = new JMSWorker();
+        shared.configure(getPropertiesConfig(connectorConfigProps));
+        final JMSWorker dedicated = new JMSWorker();
+        dedicated.configure(getPropertiesConfig(connectorConfigProps));
+        final SequenceStateClient sequenceStateClient = new SequenceStateClient(DEFAULT_STATE_QUEUE, shared, dedicated);
+
+        connectTask.start(connectorConfigProps, shared, dedicated, sequenceStateClient);
+
+        List<Message> messages = createAListOfMessages(getJmsContext(), 10, "msg ");
+        putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, messages);
+        messages = createAListOfMessages(getJmsContext(), 10, "msg ");
+        putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, messages);
+        messages = createAListOfMessages(getJmsContext(), 10, "msg ");
+        putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, messages);
+
+        final long start = System.nanoTime();
+        final List<SourceRecord> kafkaMessages = connectTask.poll();
+        final long durationMs = (System.nanoTime() - start) / 1_000_000;
+
+        System.out.println(durationMs);
+        // Poll should end close to 200ms
+        assertThat(durationMs >= 180 && durationMs <= 500).isTrue();
+
+        final List<Message> stateMsgs1 = browseAllMessagesFromQueue(DEFAULT_STATE_QUEUE);
+        assertThat(stateMsgs1.size()).isEqualTo(1);
+        final List<Message> sourceMsgs = getAllMessagesFromQueue(DEFAULT_SOURCE_QUEUE);
+        assertThat(sourceMsgs.size()).isEqualTo(0);
+        assertEquals(30, kafkaMessages.size());
+    }
 }
diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java
index 4d5b994..a1209de 100644
--- a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java
+++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java
@@ -187,6 +187,12 @@ public class MQSourceConnector extends SourceConnector {
     public static final long CONFIG_RECONNECT_DELAY_MAX_DEFAULT = 8192L;
     public static final long CONFIG_RECONNECT_DELAY_MAX_MINIMUM = 10L;
 
+    public static final String CONFIG_MAX_POLL_TIME = "mq.receive.max.poll.time.ms";
+    public static final String CONFIG_DOCUMENTATION_MAX_POLL_TIME = "Maximum time (in milliseconds) to spend polling messages before returning a batch to Kafka. "
+            + "If not set or set to 0, polling continues until batch size or receive timeout conditions are met.";
+    public static final String CONFIG_DISPLAY_MAX_POLL_TIME = "Max poll time (ms)";
+    public static final long CONFIG_MAX_POLL_TIME_DEFAULT = 0L;
+
     public static final String DLQ_PREFIX = "errors.deadletterqueue.";
 
     public static final String DLQ_TOPIC_NAME_CONFIG = DLQ_PREFIX + "topic.name";
@@ -648,6 +654,15 @@ null, new ReadableFile(),
                 CONFIG_GROUP_MQ,
                 31,
                 Width.MEDIUM,
                 DLQ_CONTEXT_HEADERS_ENABLE_DISPLAY);
+        CONFIGDEF.define(CONFIG_MAX_POLL_TIME,
+                ConfigDef.Type.LONG,
+                CONFIG_MAX_POLL_TIME_DEFAULT,
+                ConfigDef.Importance.LOW,
+                CONFIG_DOCUMENTATION_MAX_POLL_TIME,
+                CONFIG_GROUP_MQ,
+                32,
+                ConfigDef.Width.MEDIUM,
+                CONFIG_DISPLAY_MAX_POLL_TIME);
         CONFIGDEF.define(CONFIG_NAME_TOPIC,
                 Type.STRING,
diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java
index 9a876d5..2b8c814 100755
--- a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java
+++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java
@@ -15,23 +15,18 @@
  */
 package com.ibm.eventstreams.connect.mqsource;
 
-import com.ibm.eventstreams.connect.mqsource.builders.RecordBuilderException;
-import com.ibm.eventstreams.connect.mqsource.sequencestate.SequenceState;
-import com.ibm.eventstreams.connect.mqsource.sequencestate.SequenceStateClient;
-import com.ibm.eventstreams.connect.mqsource.sequencestate.SequenceStateException;
-import com.ibm.eventstreams.connect.mqsource.util.LogMessages;
-import com.ibm.eventstreams.connect.mqsource.util.ExceptionProcessor;
-import com.ibm.eventstreams.connect.mqsource.util.QueueConfig;
-import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.connect.errors.ConnectException;
-import org.apache.kafka.connect.source.SourceRecord;
-import org.apache.kafka.connect.source.SourceTask;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.jms.JMSException;
-import javax.jms.JMSRuntimeException;
-import javax.jms.Message;
+import static com.ibm.eventstreams.connect.mqsource.MQSourceConnector.CONFIG_MAX_POLL_BLOCKED_TIME_MS;
+import static com.ibm.eventstreams.connect.mqsource.MQSourceConnector.CONFIG_MAX_POLL_TIME;
+import static com.ibm.eventstreams.connect.mqsource.MQSourceConnector.CONFIG_NAME_MQ_BATCH_SIZE;
+import static com.ibm.eventstreams.connect.mqsource.MQSourceConnector.CONFIG_NAME_MQ_EXACTLY_ONCE_STATE_QUEUE;
+import static com.ibm.eventstreams.connect.mqsource.MQSourceConnector.CONFIG_NAME_MQ_QUEUE;
+import static com.ibm.eventstreams.connect.mqsource.MQSourceConnector.CONFIG_NAME_MQ_QUEUE_MANAGER;
+import static com.ibm.eventstreams.connect.mqsource.MQSourceConnector.CONFIG_VALUE_MQ_BATCH_SIZE_DEFAULT;
+import static com.ibm.eventstreams.connect.mqsource.MQSourceTaskStartUpAction.NORMAL_OPERATION;
+import static com.ibm.eventstreams.connect.mqsource.MQSourceTaskStartUpAction.REDELIVER_UNSENT_BATCH;
+import static com.ibm.eventstreams.connect.mqsource.MQSourceTaskStartUpAction.REMOVE_DELIVERED_MESSAGES_FROM_SOURCE_QUEUE;
+import static com.ibm.eventstreams.connect.mqsource.sequencestate.SequenceState.LastKnownState.DELIVERED;
+import static com.ibm.eventstreams.connect.mqsource.sequencestate.SequenceState.LastKnownState.IN_FLIGHT;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -47,24 +42,32 @@ import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
-import static com.ibm.eventstreams.connect.mqsource.MQSourceConnector.CONFIG_MAX_POLL_BLOCKED_TIME_MS;
-import static com.ibm.eventstreams.connect.mqsource.MQSourceConnector.CONFIG_NAME_MQ_BATCH_SIZE;
-import static com.ibm.eventstreams.connect.mqsource.MQSourceConnector.CONFIG_NAME_MQ_EXACTLY_ONCE_STATE_QUEUE;
-import static com.ibm.eventstreams.connect.mqsource.MQSourceConnector.CONFIG_NAME_MQ_QUEUE;
-import static com.ibm.eventstreams.connect.mqsource.MQSourceConnector.CONFIG_NAME_MQ_QUEUE_MANAGER;
-import static com.ibm.eventstreams.connect.mqsource.MQSourceConnector.CONFIG_VALUE_MQ_BATCH_SIZE_DEFAULT;
+import javax.jms.JMSException;
+import javax.jms.JMSRuntimeException;
+import javax.jms.Message;
 
-import static com.ibm.eventstreams.connect.mqsource.MQSourceTaskStartUpAction.REMOVE_DELIVERED_MESSAGES_FROM_SOURCE_QUEUE;
-import static com.ibm.eventstreams.connect.mqsource.MQSourceTaskStartUpAction.NORMAL_OPERATION;
-import static com.ibm.eventstreams.connect.mqsource.MQSourceTaskStartUpAction.REDELIVER_UNSENT_BATCH;
-import static com.ibm.eventstreams.connect.mqsource.sequencestate.SequenceState.LastKnownState.DELIVERED;
-import static com.ibm.eventstreams.connect.mqsource.sequencestate.SequenceState.LastKnownState.IN_FLIGHT;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.ibm.eventstreams.connect.mqsource.builders.RecordBuilderException;
+import com.ibm.eventstreams.connect.mqsource.sequencestate.SequenceState;
+import com.ibm.eventstreams.connect.mqsource.sequencestate.SequenceStateClient;
+import com.ibm.eventstreams.connect.mqsource.sequencestate.SequenceStateException;
+import com.ibm.eventstreams.connect.mqsource.util.ExceptionProcessor;
+import com.ibm.eventstreams.connect.mqsource.util.LogMessages;
+import com.ibm.eventstreams.connect.mqsource.util.QueueConfig;
 
 public class MQSourceTask extends SourceTask {
     private static final Logger log = LoggerFactory.getLogger(MQSourceTask.class);
 
     // The maximum number of records returned per call to poll()
     private int batchSize = CONFIG_VALUE_MQ_BATCH_SIZE_DEFAULT;
+    // The maximum time (in milliseconds) to spend polling messages before returning a batch; 0 means no time limit
+    private long maxPollTime = 0L;
 
     // Used to signal completion of a batch
     // After returning a batch of messages to Connect, the SourceTask waits
@@ -174,6 +177,7 @@ protected void start(final Map props, final JMSWorker reader, fi
         startUpAction = NORMAL_OPERATION;
 
         batchSize = config.getInt(CONFIG_NAME_MQ_BATCH_SIZE);
+        maxPollTime = config.getLong(CONFIG_MAX_POLL_TIME);
         try {
             reader.configure(config);
             reader.connect();
@@ -415,22 +419,31 @@ private void initOrResetBatchCompleteSignal(final boolean predicate, final List<
     private List<Message> pollSourceQueue(final int numberOfMessagesToBePolled) throws JMSException {
         final List<Message> localList = new ArrayList<>();
-
-        if (!stopNow.get()) {
-            log.debug("Polling for records");
-            Message message;
-            do {
-                message = reader.receive(sourceQueue, sourceQueueConfig, localList.size() == 0);
-                if (message != null) {
-                    localList.add(message);
-                }
-            } while (message != null && localList.size() < numberOfMessagesToBePolled && !stopNow.get());
-        } else {
+        if (stopNow.get()) {
             log.info("Stopping polling for records");
+            return localList;
         }
+
+        log.debug("Polling for records");
+        final long startTime = System.currentTimeMillis();
+
+        Message message;
+        do {
+            message = reader.receive(sourceQueue, sourceQueueConfig, localList.isEmpty());
+            if (message != null) {
+                localList.add(message);
+            }
+        } while (
+            message != null &&
+            localList.size() < numberOfMessagesToBePolled &&
+            !stopNow.get() &&
+            (maxPollTime <= 0 || (System.currentTimeMillis() - startTime) < maxPollTime)
+        );
+
         return localList;
     }
 
+
     private boolean isFirstMsgOnSourceQueueARequiredMsg(final List<String> msgIds) throws JMSException {
         final Message message = reader.browse(sourceQueue).get();
         return msgIds.contains(message.getJMSMessageID());

From bc3d17e81b910a0c8eb66e0fe89e10c02d114e2b Mon Sep 17 00:00:00 2001
From: Joel Hanson
Date: Mon, 23 Jun 2025 18:12:06 +0530
Subject: [PATCH 2/2] feat: Config changes and new unit tests

Signed-off-by: Joel Hanson
---
 README.md                                     |   1 +
 .../connect/mqsource/MQSourceTaskIT.java      | 122 ++++++++++++++++--
 .../connect/mqsource/MQSourceConnector.java   |  11 +-
 3 files changed, 122 insertions(+), 12 deletions(-)

diff --git a/README.md b/README.md
index 1e19519..2643e14 100644
--- a/README.md
+++ b/README.md
@@ -308,6 +308,7 @@ The configuration options for the Kafka Connect source connector for IBM MQ are
 | `mq.message.receive.timeout` | The timeout (in milliseconds) for receiving messages from the queue manager before returning to Kafka Connect. | long | 2000 | 1 or greater |
 | `mq.reconnect.delay.min.ms` | The minimum delay (in milliseconds) for reconnecting to the queue manager after a connection error. | long | 64 | 1 or greater |
 | `mq.reconnect.delay.max.ms` | The maximum delay (in milliseconds) for reconnecting to the queue manager after a connection error. | long | 8192 | 1 or greater |
+| `mq.receive.max.poll.time.ms` | The maximum time (in milliseconds) to spend polling messages in a single Kafka Connect task cycle. If set to 0, polling continues until the batch size is reached or a receive call returns null. | long | 0 | 0 or greater |
 | `errors.deadletterqueue.topic.name` | The name of the Kafka topic to use as the dead letter queue (DLQ) for poison messages that fail during processing within the record builder component of the connector. | string | | If left blank (default), failed messages will not be written to a DLQ. |
 | `errors.deadletterqueue.context.headers.enable` | Whether to add error context headers to messages written to the DLQ. | boolean | false | When enabled, additional headers describing the error will be included with each DLQ record. |
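As a usage sketch of the option documented above, the snippet below builds a connector property map in the same style as the integration tests in this patch. It only uses property names that appear in the patch; the class name and numeric values are illustrative assumptions rather than recommendations, and the receive timeouts are deliberately kept below `mq.receive.max.poll.time.ms` so a single blocking receive cannot push a poll cycle far past the bound.

    // Minimal, self-contained sketch (not part of the patch); values are illustrative only.
    import java.util.HashMap;
    import java.util.Map;

    public class MaxPollTimeExample {
        public static void main(final String[] args) {
            final Map<String, String> props = new HashMap<>();
            props.put("mq.batch.size", "250");                   // return at most 250 records per poll
            props.put("mq.message.receive.timeout", "100");      // first receive blocks up to 100 ms
            props.put("mq.receive.subsequent.timeout.ms", "10"); // later receives block up to 10 ms
            props.put("mq.receive.max.poll.time.ms", "500");     // stop accumulating after roughly 500 ms
            // A poll cycle ends when the batch is full, a receive returns null,
            // or about 500 ms have elapsed, whichever happens first.
            props.forEach((key, value) -> System.out.println(key + "=" + value));
        }
    }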
diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java
index 93e33b4..6cd3f95 100644
--- a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java
+++ b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java
@@ -1300,7 +1300,7 @@ public void verifyLoggingErrorsWithMessageHavingDefaultRecordBuilder() throws Ex
         assertThat(processedRecords.get(1).topic()).isEqualTo("mytopic");
     }
 
-    public void testMaxPollTimeTerminatesBatchEarly() throws Exception {
+    public void testMaxPollTimeTerminates() throws Exception {
         connectTask = getSourceTaskWithEmptyKafkaOffset();
 
         final Map<String, String> connectorConfigProps = createExactlyOnceConnectorProperties();
@@ -1309,7 +1309,7 @@ public void testMaxPollTimeTerminatesBatchEarly() throws Exception {
                 "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
         connectorConfigProps.put("mq.message.receive.timeout", "150");
         connectorConfigProps.put("mq.receive.subsequent.timeout.ms", "10");
-        connectorConfigProps.put("mq.receive.max.poll.time.ms", "200"); // stop after 200ms
+        connectorConfigProps.put("mq.receive.max.poll.time.ms", "200");
         connectorConfigProps.put("mq.batch.size", "5000");
 
         final JMSWorker shared = new JMSWorker();
@@ -1320,25 +1320,131 @@ public void testMaxPollTimeTerminatesBatchEarly() throws Exception {
 
         connectTask.start(connectorConfigProps, shared, dedicated, sequenceStateClient);
 
-        List<Message> messages = createAListOfMessages(getJmsContext(), 10, "msg ");
+        final List<Message> messages = createAListOfMessages(getJmsContext(), 10, "msg ");
         putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, messages);
-        messages = createAListOfMessages(getJmsContext(), 10, "msg ");
         putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, messages);
-        messages = createAListOfMessages(getJmsContext(), 10, "msg ");
         putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, messages);
+        final List<SourceRecord> kafkaMessages = connectTask.poll();
+
+        final List<Message> stateMsgs1 = browseAllMessagesFromQueue(DEFAULT_STATE_QUEUE);
+        assertThat(stateMsgs1.size()).isEqualTo(1);
+        final List<Message> sourceMsgs = getAllMessagesFromQueue(DEFAULT_SOURCE_QUEUE);
+        assertThat(sourceMsgs.size()).isEqualTo(0);
+        assertEquals(30, kafkaMessages.size());
+    }
+
+    @Test
+    public void testMaxPollTimeTerminatesBatchEarly() throws Exception {
+        connectTask = getSourceTaskWithEmptyKafkaOffset();
+
+        final Map<String, String> connectorConfigProps = createExactlyOnceConnectorProperties();
+        connectorConfigProps.put("mq.message.body.jms", "true");
+        connectorConfigProps.put("mq.record.builder",
+                "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
+        connectorConfigProps.put("mq.message.receive.timeout", "100");
+        connectorConfigProps.put("mq.receive.subsequent.timeout.ms", "10");
+        connectorConfigProps.put("mq.receive.max.poll.time.ms", "200"); // stop after 200ms
+        connectorConfigProps.put("mq.batch.size", "5000");
+
+        final JMSWorker shared = new JMSWorker();
+        shared.configure(getPropertiesConfig(connectorConfigProps));
+        final JMSWorker dedicated = new JMSWorker();
+        dedicated.configure(getPropertiesConfig(connectorConfigProps));
+        final SequenceStateClient sequenceStateClient = new SequenceStateClient(DEFAULT_STATE_QUEUE, shared, dedicated);
+
+        connectTask.start(connectorConfigProps, shared, dedicated, sequenceStateClient);
+
+        final List<Message> messages = createAListOfMessages(getJmsContext(), 10, "msg ");
+        putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, messages);
 
         final long start = System.nanoTime();
         final List<SourceRecord> kafkaMessages = connectTask.poll();
+
         final long durationMs = (System.nanoTime() - start) / 1_000_000;
 
-        System.out.println(durationMs);
         // Poll should end close to 200ms
-        assertThat(durationMs >= 180 && durationMs <= 500).isTrue();
+        assertThat(durationMs <= 210).isTrue();
 
         final List<Message> stateMsgs1 = browseAllMessagesFromQueue(DEFAULT_STATE_QUEUE);
         assertThat(stateMsgs1.size()).isEqualTo(1);
         final List<Message> sourceMsgs = getAllMessagesFromQueue(DEFAULT_SOURCE_QUEUE);
         assertThat(sourceMsgs.size()).isEqualTo(0);
-        assertEquals(30, kafkaMessages.size());
+        assertEquals(10, kafkaMessages.size());
+    }
+
+    @Test
+    public void testPollEndsWhenBatchSizeReached() throws Exception {
+        connectTask = getSourceTaskWithEmptyKafkaOffset();
+
+        final Map<String, String> config = createExactlyOnceConnectorProperties();
+        config.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
+        config.put("mq.message.receive.timeout", "100");
+        config.put("mq.receive.subsequent.timeout.ms", "10");
+        config.put("mq.receive.max.poll.time.ms", "1000");
+        config.put("mq.batch.size", "10");
+
+        final JMSWorker shared = new JMSWorker();
+        shared.configure(getPropertiesConfig(config));
+        final JMSWorker dedicated = new JMSWorker();
+        dedicated.configure(getPropertiesConfig(config));
+        final SequenceStateClient sequenceStateClient = new SequenceStateClient(DEFAULT_STATE_QUEUE, shared, dedicated);
+        connectTask.start(config, shared, dedicated, sequenceStateClient);
+
+        putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, createAListOfMessages(getJmsContext(), 12, "msg "));
+
+        final long start = System.nanoTime();
+        connectTask.poll();
+        final long durationMs = (System.nanoTime() - start) / 1_000_000;
+
+        assertThat(durationMs < 1000).isTrue();
+    }
+
+    @Test
+    public void testPollWithMaxPollTimeZeroBehavesAsDefault() throws Exception {
+        connectTask = getSourceTaskWithEmptyKafkaOffset();
+        final Map<String, String> config = createExactlyOnceConnectorProperties();
+        config.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
+        config.put("mq.message.receive.timeout", "400");
+        config.put("mq.receive.max.poll.time.ms", "0");
+        config.put("mq.batch.size", "100");
+
+        final JMSWorker shared = new JMSWorker();
+        shared.configure(getPropertiesConfig(config));
+        final JMSWorker dedicated = new JMSWorker();
+        dedicated.configure(getPropertiesConfig(config));
+        final SequenceStateClient sequenceStateClient = new SequenceStateClient(DEFAULT_STATE_QUEUE, shared, dedicated);
+        connectTask.start(config, shared, dedicated, sequenceStateClient);
+
+        // putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, createAListOfMessages(getJmsContext(), 3, "msg "));
+
+        final long start = System.nanoTime();
+        final List<SourceRecord> records = connectTask.poll();
+        final long durationMs = (System.nanoTime() - start) / 1_000_000;
+
+        assertThat(durationMs >= 400 && durationMs <= 450).isTrue();
+        assertEquals(0, records.size());
+    }
+
+    @Test
+    public void testPollWithShortMaxPollTime() throws Exception {
+        connectTask = getSourceTaskWithEmptyKafkaOffset();
+        final Map<String, String> config = createExactlyOnceConnectorProperties();
+        config.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
+        config.put("mq.receive.max.poll.time.ms", "50");
+        config.put("mq.message.receive.timeout", "1");
+        config.put("mq.receive.subsequent.timeout.ms", "0");
+        config.put("mq.batch.size", "5000");
+
+        final JMSWorker shared = new JMSWorker();
+        shared.configure(getPropertiesConfig(config));
+        final JMSWorker dedicated = new JMSWorker();
+        dedicated.configure(getPropertiesConfig(config));
+        final SequenceStateClient sequenceStateClient = new SequenceStateClient(DEFAULT_STATE_QUEUE, shared, dedicated);
+        connectTask.start(config, shared, dedicated, sequenceStateClient);
+
+        putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, createAListOfMessages(getJmsContext(), 100, "msg "));
+
+        final List<SourceRecord> records = connectTask.poll();
+
+        assertThat(records.size()).isLessThan(100);
+    }
 }
diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java
index a1209de..d920b2d 100644
--- a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java
+++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java
@@ -188,8 +188,11 @@ public class MQSourceConnector extends SourceConnector {
     public static final long CONFIG_RECONNECT_DELAY_MAX_MINIMUM = 10L;
 
     public static final String CONFIG_MAX_POLL_TIME = "mq.receive.max.poll.time.ms";
-    public static final String CONFIG_DOCUMENTATION_MAX_POLL_TIME = "Maximum time (in milliseconds) to spend polling messages before returning a batch to Kafka. "
-            + "If not set or set to 0, polling continues until batch size or receive timeout conditions are met.";
+    public static final String CONFIG_DOCUMENTATION_MAX_POLL_TIME = "Maximum time (in milliseconds) to poll for messages during a single Kafka Connect poll cycle. "
+            + "Acts as an upper bound on how long the task will try to accumulate a batch. "
+            + "If set to 0 or not defined, polling continues until either a message receive returns null or the batch size is met. "
+            + "Note: it is recommended to keep 'mq.message.receive.timeout' and 'mq.receive.subsequent.timeout.ms' less than or equal to this value, "
+            + "so that a long blocking receive call cannot delay the return of a batch far beyond this limit.";
     public static final String CONFIG_DISPLAY_MAX_POLL_TIME = "Max poll time (ms)";
     public static final long CONFIG_MAX_POLL_TIME_DEFAULT = 0L;
 
@@ -615,7 +618,7 @@ null, new ReadableFile(),
                 CONFIG_DISPLAY_MAX_RECEIVE_TIMEOUT);
         CONFIGDEF.define(CONFIG_SUBSEQUENT_RECEIVE_TIMEOUT,
                 ConfigDef.Type.LONG,
-                CONFIG_SUBSEQUENT_RECEIVE_TIMEOUT_DEFAULT,
+                CONFIG_SUBSEQUENT_RECEIVE_TIMEOUT_DEFAULT, ConfigDef.Range.atLeast(CONFIG_SUBSEQUENT_RECEIVE_TIMEOUT_DEFAULT),
                 ConfigDef.Importance.LOW,
                 CONFIG_DOCUMENTATION_SUBSEQUENT_RECEIVE_TIMEOUT,
                 CONFIG_GROUP_MQ,
@@ -656,7 +659,7 @@ null, new ReadableFile(),
                 DLQ_CONTEXT_HEADERS_ENABLE_DISPLAY);
         CONFIGDEF.define(CONFIG_MAX_POLL_TIME,
                 ConfigDef.Type.LONG,
-                CONFIG_MAX_POLL_TIME_DEFAULT,
+                CONFIG_MAX_POLL_TIME_DEFAULT, ConfigDef.Range.atLeast(CONFIG_MAX_POLL_TIME_DEFAULT),
                 ConfigDef.Importance.LOW,
                 CONFIG_DOCUMENTATION_MAX_POLL_TIME,
                 CONFIG_GROUP_MQ,