Commit bc3d17e

feat: Config changes and new unit tests
Signed-off-by: Joel Hanson <joelhanson025@gmail.com>
1 parent def7c02 commit bc3d17e

File tree

3 files changed: +122 −12 lines changed


README.md

Lines changed: 1 addition & 0 deletions
@@ -308,6 +308,7 @@ The configuration options for the Kafka Connect source connector for IBM MQ are
 | `mq.message.receive.timeout` | The timeout (in milliseconds) for receiving messages from the queue manager before returning to Kafka Connect. | long | 2000 | 1 or greater |
 | `mq.reconnect.delay.min.ms` | The minimum delay (in milliseconds) for reconnecting to the queue manager after a connection error. | long | 64 | 1 or greater |
 | `mq.reconnect.delay.max.ms` | The maximum delay (in milliseconds) for reconnecting to the queue manager after a connection error. | long | 8192 | 1 or greater |
+| `mq.receive.max.poll.time.ms` | The maximum time (in milliseconds) to poll messages in a single Kafka Connect task cycle. If set to 0, polling continues until the batch size is reached or a receive returns null. | long | 0 | 0 or greater |
 | `errors.deadletterqueue.topic.name` | The name of the Kafka topic to use as the dead letter queue (DLQ) for poison messages that fail during processing within the record builder component of the connector. | string | | If left blank (default), failed messages will not be written to a DLQ. |
 | `errors.deadletterqueue.context.headers.enable` | Whether to add error context headers to messages written to the DLQ. | boolean | false | When enabled, additional headers describing the error will be included with each DLQ record. |
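
For illustration, here is a minimal sketch of how the new option sits alongside the existing receive timeouts, reusing the property keys and values exercised by the integration tests in this commit; the class and method names are hypothetical and the values are examples, not recommendations.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only: the property keys come from the connector's documented
// configuration; the class name and the chosen values are arbitrary examples.
public final class MaxPollTimeConfigExample {
    public static Map<String, String> exampleProps() {
        final Map<String, String> props = new HashMap<>();
        props.put("mq.message.receive.timeout", "150");      // first receive may block up to 150 ms
        props.put("mq.receive.subsequent.timeout.ms", "10");  // later receives within the same batch
        props.put("mq.receive.max.poll.time.ms", "200");      // cap a single poll cycle at ~200 ms
        props.put("mq.batch.size", "5000");                   // upper limit on records per batch
        return props;
    }
}
```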

src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java

Lines changed: 114 additions & 8 deletions
@@ -1300,7 +1300,7 @@ public void verifyLoggingErrorsWithMessageHavingDefaultRecordBuilder() throws Ex
         assertThat(processedRecords.get(1).topic()).isEqualTo("mytopic");
     }
 
-    public void testMaxPollTimeTerminatesBatchEarly() throws Exception {
+    public void testMaxPollTimeTerminates() throws Exception {
         connectTask = getSourceTaskWithEmptyKafkaOffset();
 
         final Map<String, String> connectorConfigProps = createExactlyOnceConnectorProperties();
@@ -1309,7 +1309,7 @@ public void testMaxPollTimeTerminatesBatchEarly() throws Exception {
                 "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
         connectorConfigProps.put("mq.message.receive.timeout", "150");
         connectorConfigProps.put("mq.receive.subsequent.timeout.ms", "10");
-        connectorConfigProps.put("mq.receive.max.poll.time.ms", "200"); // stop after 200ms
+        connectorConfigProps.put("mq.receive.max.poll.time.ms", "200");
         connectorConfigProps.put("mq.batch.size", "5000");
 
         final JMSWorker shared = new JMSWorker();
@@ -1320,25 +1320,131 @@ public void testMaxPollTimeTerminatesBatchEarly() throws Exception {
 
         connectTask.start(connectorConfigProps, shared, dedicated, sequenceStateClient);
 
-        List<Message> messages = createAListOfMessages(getJmsContext(), 10, "msg ");
+        final List<Message> messages = createAListOfMessages(getJmsContext(), 10, "msg ");
         putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, messages);
-        messages = createAListOfMessages(getJmsContext(), 10, "msg ");
         putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, messages);
-        messages = createAListOfMessages(getJmsContext(), 10, "msg ");
         putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, messages);
 
+        final List<SourceRecord> kafkaMessages = connectTask.poll();
+
+        final List<Message> stateMsgs1 = browseAllMessagesFromQueue(DEFAULT_STATE_QUEUE);
+        assertThat(stateMsgs1.size()).isEqualTo(1);
+        final List<Message> sourceMsgs = getAllMessagesFromQueue(DEFAULT_SOURCE_QUEUE);
+        assertThat(sourceMsgs.size()).isEqualTo(0);
+        assertEquals(30, kafkaMessages.size());
+    }
+
+    @Test
+    public void testMaxPollTimeTerminatesBatchEarly() throws Exception {
+        connectTask = getSourceTaskWithEmptyKafkaOffset();
+
+        final Map<String, String> connectorConfigProps = createExactlyOnceConnectorProperties();
+        connectorConfigProps.put("mq.message.body.jms", "true");
+        connectorConfigProps.put("mq.record.builder",
+                "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
+        connectorConfigProps.put("mq.message.receive.timeout", "100");
+        connectorConfigProps.put("mq.receive.subsequent.timeout.ms", "10");
+        connectorConfigProps.put("mq.receive.max.poll.time.ms", "200"); // stop after 200ms
+        connectorConfigProps.put("mq.batch.size", "5000");
+
+        final JMSWorker shared = new JMSWorker();
+        shared.configure(getPropertiesConfig(connectorConfigProps));
+        final JMSWorker dedicated = new JMSWorker();
+        dedicated.configure(getPropertiesConfig(connectorConfigProps));
+        final SequenceStateClient sequenceStateClient = new SequenceStateClient(DEFAULT_STATE_QUEUE, shared, dedicated);
+
+        connectTask.start(connectorConfigProps, shared, dedicated, sequenceStateClient);
+
+        final List<Message> messages = createAListOfMessages(getJmsContext(), 10, "msg ");
+        putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, messages);
         final long start = System.nanoTime();
         final List<SourceRecord> kafkaMessages = connectTask.poll();
         final long durationMs = (System.nanoTime() - start) / 1_000_000;
 
-        System.out.println(durationMs);
         // Poll should end close to 200ms
-        assertThat(durationMs >= 180 && durationMs <= 500).isTrue();
+        assertThat(durationMs <= 210).isTrue();
 
         final List<Message> stateMsgs1 = browseAllMessagesFromQueue(DEFAULT_STATE_QUEUE);
         assertThat(stateMsgs1.size()).isEqualTo(1);
         final List<Message> sourceMsgs = getAllMessagesFromQueue(DEFAULT_SOURCE_QUEUE);
         assertThat(sourceMsgs.size()).isEqualTo(0);
-        assertEquals(30, kafkaMessages.size());
+        assertEquals(10, kafkaMessages.size());
+    }
+
+    @Test
+    public void testPollEndsWhenBatchSizeReached() throws Exception {
+        connectTask = getSourceTaskWithEmptyKafkaOffset();
+
+        final Map<String, String> config = createExactlyOnceConnectorProperties();
+        config.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
+        config.put("mq.message.receive.timeout", "100");
+        config.put("mq.receive.subsequent.timeout.ms", "10");
+        config.put("mq.receive.max.poll.time.ms", "1000");
+        config.put("mq.batch.size", "10");
+
+        final JMSWorker shared = new JMSWorker();
+        shared.configure(getPropertiesConfig(config));
+        final JMSWorker dedicated = new JMSWorker();
+        dedicated.configure(getPropertiesConfig(config));
+        final SequenceStateClient sequenceStateClient = new SequenceStateClient(DEFAULT_STATE_QUEUE, shared, dedicated);
+        connectTask.start(config, shared, dedicated, sequenceStateClient);
+
+        putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, createAListOfMessages(getJmsContext(), 12, "msg "));
+
+        final long start = System.nanoTime();
+        connectTask.poll();
+        final long durationMs = (System.nanoTime() - start) / 1_000_000;
+
+        assertThat(durationMs < 1000).isTrue();
+    }
+
+    @Test
+    public void testPollWithMaxPollTimeZeroBehavesAsDefault() throws Exception {
+        connectTask = getSourceTaskWithEmptyKafkaOffset();
+        final Map<String, String> config = createExactlyOnceConnectorProperties();
+        config.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
+        config.put("mq.message.receive.timeout", "400");
+        config.put("mq.receive.max.poll.time.ms", "0");
+        config.put("mq.batch.size", "100");
+
+        final JMSWorker shared = new JMSWorker();
+        shared.configure(getPropertiesConfig(config));
+        final JMSWorker dedicated = new JMSWorker();
+        dedicated.configure(getPropertiesConfig(config));
+        final SequenceStateClient sequenceStateClient = new SequenceStateClient(DEFAULT_STATE_QUEUE, shared, dedicated);
+        connectTask.start(config, shared, dedicated, sequenceStateClient);
+
+        // putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, createAListOfMessages(getJmsContext(), 3, "msg "));
+
+        final long start = System.nanoTime();
+        final List<SourceRecord> records = connectTask.poll();
+        final long durationMs = (System.nanoTime() - start) / 1_000_000;
+
+        assertThat(durationMs >= 400 && durationMs <= 450).isTrue();
+        assertEquals(0, records.size());
+    }
+
+    @Test
+    public void testPollWithShortMaxPollTime() throws Exception {
+        connectTask = getSourceTaskWithEmptyKafkaOffset();
+        final Map<String, String> config = createExactlyOnceConnectorProperties();
+        config.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
+        config.put("mq.receive.max.poll.time.ms", "50");
+        config.put("mq.message.receive.timeout", "1");
+        config.put("mq.receive.subsequent.timeout.ms", "0");
+        config.put("mq.batch.size", "5000");
+
+        final JMSWorker shared = new JMSWorker();
+        shared.configure(getPropertiesConfig(config));
+        final JMSWorker dedicated = new JMSWorker();
+        dedicated.configure(getPropertiesConfig(config));
+        final SequenceStateClient sequenceStateClient = new SequenceStateClient(DEFAULT_STATE_QUEUE, shared, dedicated);
+        connectTask.start(config, shared, dedicated, sequenceStateClient);
+
+        putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, createAListOfMessages(getJmsContext(), 100, "msg "));
+
+        final List<SourceRecord> records = connectTask.poll();
+
+        assertThat(records.size()).isLessThan(100);
     }
 }

src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java

Lines changed: 7 additions & 4 deletions
@@ -188,8 +188,11 @@ public class MQSourceConnector extends SourceConnector {
     public static final long CONFIG_RECONNECT_DELAY_MAX_MINIMUM = 10L;
 
     public static final String CONFIG_MAX_POLL_TIME = "mq.receive.max.poll.time.ms";
-    public static final String CONFIG_DOCUMENTATION_MAX_POLL_TIME = "Maximum time (in milliseconds) to spend polling messages before returning a batch to Kafka. "
-            + "If not set or set to 0, polling continues until batch size or receive timeout conditions are met.";
+    public static final String CONFIG_DOCUMENTATION_MAX_POLL_TIME = "Maximum time (in milliseconds) to poll for messages during a single Kafka Connect poll cycle. "
+            + "Acts as a hard upper bound on how long the task will try to accumulate a batch. "
+            + "If set to 0 or not defined, polling continues until either a message receive returns null or the batch size is met. "
+            + "Note: It is recommended to keep this value greater than or equal to both 'mq.message.receive.timeout' "
+            + "and 'mq.receive.subsequent.timeout.ms', since a blocking receive call is not interrupted when this limit expires and could otherwise cause unexpected delays.";
     public static final String CONFIG_DISPLAY_MAX_POLL_TIME = "Max poll time (ms)";
     public static final long CONFIG_MAX_POLL_TIME_DEFAULT = 0L;
 
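The documented behaviour amounts to a time-bounded batching loop: accumulate records until the batch is full, a receive returns null, or the deadline passes, with 0 disabling the bound. Below is a minimal sketch of that logic, assuming the deadline is checked between receive calls; `timeBoundedBatch` and the `Supplier`-based receive are hypothetical stand-ins, not the connector's actual `MQSourceTask` code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Sketch of the max-poll-time bound described above; not the connector's real implementation.
public final class MaxPollTimeSketch {
    static <T> List<T> timeBoundedBatch(final Supplier<T> receive,
                                        final int batchSize,
                                        final long maxPollTimeMs) {
        final List<T> batch = new ArrayList<>();
        // A value of 0 disables the time bound, matching the documented default.
        final long deadline = maxPollTimeMs > 0
                ? System.currentTimeMillis() + maxPollTimeMs
                : Long.MAX_VALUE;
        while (batch.size() < batchSize && System.currentTimeMillis() < deadline) {
            final T msg = receive.get(); // may block up to the configured receive timeout
            if (msg == null) {
                break; // nothing available right now: return what has been gathered
            }
            batch.add(msg);
        }
        return batch;
    }
}
```

In a loop like this the deadline is only checked between receives, so a receive that blocks longer than the remaining time will overshoot the bound; keeping the receive timeouts at or below `mq.receive.max.poll.time.ms` keeps that overshoot small.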

@@ -615,7 +618,7 @@ null, new ReadableFile(),
                 CONFIG_DISPLAY_MAX_RECEIVE_TIMEOUT);
         CONFIGDEF.define(CONFIG_SUBSEQUENT_RECEIVE_TIMEOUT,
                 ConfigDef.Type.LONG,
-                CONFIG_SUBSEQUENT_RECEIVE_TIMEOUT_DEFAULT,
+                CONFIG_SUBSEQUENT_RECEIVE_TIMEOUT_DEFAULT, ConfigDef.Range.atLeast(CONFIG_SUBSEQUENT_RECEIVE_TIMEOUT_DEFAULT),
                 ConfigDef.Importance.LOW,
                 CONFIG_DOCUMENTATION_SUBSEQUENT_RECEIVE_TIMEOUT,
                 CONFIG_GROUP_MQ,
@@ -656,7 +659,7 @@ null, new ReadableFile(),
                 DLQ_CONTEXT_HEADERS_ENABLE_DISPLAY);
         CONFIGDEF.define(CONFIG_MAX_POLL_TIME,
                 ConfigDef.Type.LONG,
-                CONFIG_MAX_POLL_TIME_DEFAULT,
+                CONFIG_MAX_POLL_TIME_DEFAULT, ConfigDef.Range.atLeast(CONFIG_MAX_POLL_TIME_DEFAULT),
                 ConfigDef.Importance.LOW,
                 CONFIG_DOCUMENTATION_MAX_POLL_TIME,
                 CONFIG_GROUP_MQ,
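
Both `CONFIGDEF.define(...)` calls now attach a `ConfigDef.Range.atLeast(...)` validator, so values below the minimum are rejected when the configuration is validated rather than surfacing later at runtime. Here is a small standalone sketch of that Kafka `ConfigDef` behaviour; the class name and literal values are illustrative only and are not part of the connector.

```java
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;

// Standalone illustration of ConfigDef.Range.atLeast validation; not connector code.
public final class RangeValidationSketch {
    public static void main(final String[] args) {
        final ConfigDef def = new ConfigDef()
                .define("mq.receive.max.poll.time.ms",
                        ConfigDef.Type.LONG,
                        0L,                            // default value
                        ConfigDef.Range.atLeast(0L),   // reject anything below 0
                        ConfigDef.Importance.LOW,
                        "Maximum time (in milliseconds) to poll for messages.");

        def.parse(Map.of("mq.receive.max.poll.time.ms", "200"));    // accepted

        try {
            def.parse(Map.of("mq.receive.max.poll.time.ms", "-1")); // rejected by the validator
        } catch (final ConfigException e) {
            System.out.println("Invalid configuration: " + e.getMessage());
        }
    }
}
```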
