Skip to content

mq.receive.max.poll.time.ms, to limit the maximum time spent polling messages in a Kafka Connect task cycle #153

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from

Conversation

Joel-hanson
Copy link
Contributor

This pull request introduces a new configuration option, mq.receive.max.poll.time.ms, to limit the maximum time spent polling messages in a Kafka Connect task cycle. It also includes updates to the implementation and tests to support this feature.

New Configuration Option:

  • Added mq.receive.max.poll.time.ms to define the maximum time (in milliseconds) for polling messages during a Kafka Connect task cycle. If set to 0, polling continues until the batch size is met or no more messages are available. This configuration is documented in the README.md and integrated into the connector's configuration definitions.

Implementation Updates:

  • Updated the MQSourceTask class to use the new mq.receive.max.poll.time.ms configuration. The polling logic now respects the maximum poll time, ensuring the task terminates polling early if the time limit is reached.

Testing Enhancements:

  • Added multiple integration tests in MQSourceTaskIT.java to validate the behavior of the mq.receive.max.poll.time.ms configuration under different scenarios, such as terminating early, respecting batch size, and handling a value of 0.

These changes improve the flexibility and control of the Kafka Connect source connector for IBM MQ, allowing users to fine-tune message polling behavior.# Description

Type of change

  • New feature (non-breaking change which adds functionality)
  • This change requires a documentation update

How Has This Been Tested?

  • Integration tests

Checklist

  • My code follows the style guidelines of this project
  • I have performed a self-review of my code
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • My changes generate no new warnings
  • I have added tests that prove my fix is effective or that my feature works
  • New and existing unit tests pass locally with my changes
  • Any dependent changes have been merged and published in downstream modules

- Maximum time (in milliseconds) to spend polling
  messages before returning a batch to Kafka.

Signed-off-by: Joel Hanson <joelhanson025@gmail.com>
Signed-off-by: Joel Hanson <joelhanson025@gmail.com>
@Joel-hanson Joel-hanson changed the title Evi 14907 mq.receive.max.poll.time.ms, to limit the maximum time spent polling messages in a Kafka Connect task cycle Jun 23, 2025
@Joel-hanson Joel-hanson requested a review from dalelane June 26, 2025 10:18
Copy link
Contributor

@dalelane dalelane left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've suggested a small refactoring that means we do the sum once up-front, and then just a straight comparison every time around the loop (rather than doing the sum in every loop). In practice, I suspect the performance impact of such a micro-optimisation will be minimal, so feel free to ignore.

Otherwise this all looks good to me, thanks!

}

log.debug("Polling for records");
final long startTime = System.currentTimeMillis();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
final long startTime = System.currentTimeMillis();
final long pollEndTime = System.currentTimeMillis() + maxPollTime;

message != null &&
localList.size() < numberOfMessagesToBePolled &&
!stopNow.get() &&
(maxPollTime <= 0 || (System.currentTimeMillis() - startTime) < maxPollTime)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
(maxPollTime <= 0 || (System.currentTimeMillis() - startTime) < maxPollTime)
(maxPollTime <= 0 || (System.currentTimeMillis() < pollEndTime)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants