Skip to content

Commit 3225272

Browse files
committed
feat: Expose one config adjust the blocking behavior
- Enhancement to the IBM MQ Source Connector to expose two configurable values to adjust the blocking behavior of the JMS receive methods when retrieving messages from MQ Signed-off-by: Joel Hanson <joelhanson025@gmail.com>
1 parent 9129d3a commit 3225272

File tree

3 files changed

+56
-23
lines changed

3 files changed

+56
-23
lines changed

src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -666,7 +666,8 @@ public void testJmsWorkerWithCustomReciveForConsumerAndCustomReconnectValues() t
666666
connectorConfigProps.put("mq.message.body.jms", "true");
667667
connectorConfigProps.put("mq.record.builder",
668668
"com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
669-
connectorConfigProps.put("mq.message.receive.timeout", "5000");
669+
connectorConfigProps.put("mq.receive.timeout.ms", "5000");
670+
connectorConfigProps.put("mq.receive.subsequent.timeout.ms", "2000");
670671
connectorConfigProps.put("mq.reconnect.delay.min.ms", "100");
671672
connectorConfigProps.put("mq.reconnect.delay.max.ms", "10000");
672673

@@ -688,7 +689,8 @@ public void testJmsWorkerWithCustomReciveForConsumerAndCustomReconnectValues() t
688689
shared.attemptRollback();
689690
assertThat(stateMsgs1.size()).isEqualTo(1);
690691

691-
assertEquals(5000L, shared.getReceiveTimeout());
692+
assertEquals(5000L, shared.getInitialReceiveTimeoutMs());
693+
assertEquals(2000L, shared.getSubsequentReceiveTimeoutMs());
692694
assertEquals(100L, shared.getReconnectDelayMillisMin());
693695
assertEquals(10000L, shared.getReconnectDelayMillisMax());
694696
}

src/main/java/com/ibm/eventstreams/connect/mqsource/JMSWorker.java

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -71,12 +71,17 @@ public class JMSWorker {
7171
private boolean connected = false; // Whether connected to MQ
7272
private AtomicBoolean closeNow; // Whether close has been requested
7373
private AbstractConfig config;
74-
private long receiveTimeout; // Receive timeout for the jms consumer
74+
private long initialReceiveTimeoutMs; // Receive timeout for the jms consumer
75+
private long subsequentReceiveTimeoutMs; // Receive timeout for the jms consumer on the subsequent calls
7576
private long reconnectDelayMillisMin; // Delay between repeated reconnect attempts min
7677
private long reconnectDelayMillisMax; // Delay between repeated reconnect attempts max
7778

78-
long getReceiveTimeout() {
79-
return receiveTimeout;
79+
long getInitialReceiveTimeoutMs() {
80+
return initialReceiveTimeoutMs;
81+
}
82+
83+
long getSubsequentReceiveTimeoutMs() {
84+
return subsequentReceiveTimeoutMs;
8085
}
8186

8287
long getReconnectDelayMillisMin() {
@@ -144,7 +149,8 @@ public void configure(final AbstractConfig config) {
144149
userName = config.getString(MQSourceConnector.CONFIG_NAME_MQ_USER_NAME);
145150
password = config.getPassword(MQSourceConnector.CONFIG_NAME_MQ_PASSWORD);
146151
topic = config.getString(MQSourceConnector.CONFIG_NAME_TOPIC);
147-
receiveTimeout = config.getLong(MQSourceConnector.CONFIG_MAX_RECEIVE_TIMEOUT);
152+
initialReceiveTimeoutMs = config.getLong(MQSourceConnector.CONFIG_MAX_RECEIVE_TIMEOUT);
153+
subsequentReceiveTimeoutMs = config.getLong(MQSourceConnector.CONFIG_SUBSEQUENT_RECEIVE_TIMEOUT);
148154
reconnectDelayMillisMin = config.getLong(MQSourceConnector.CONFIG_RECONNECT_DELAY_MIN);
149155
reconnectDelayMillisMax = config.getLong(MQSourceConnector.CONFIG_RECONNECT_DELAY_MAX);
150156
} catch (JMSException | JMSRuntimeException jmse) {
@@ -222,10 +228,14 @@ public void connect() {
222228
*
223229
* @param queueName The name of the queue to get messages from
224230
* @param queueConfig Any particular queue configuration that should be applied
225-
* @param wait Whether to wait indefinitely for a message
231+
* @param initialCall Indicates whether this is the initial receive call in the polling cycle.
232+
* Determines which configured timeout to use:
233+
* - If true, uses the initial receive timeout.
234+
* - If false, uses the subsequent receive timeout.
235+
* A timeout value of 0 results in a non-blocking receiveNoWait() call.
226236
* @return The Message retrieved from MQ
227237
*/
228-
public Message receive(final String queueName, final QueueConfig queueConfig, final boolean wait) throws JMSRuntimeException, JMSException {
238+
public Message receive(final String queueName, final QueueConfig queueConfig, final boolean initialCall) throws JMSRuntimeException, JMSException {
229239
log.trace("[{}] Entry {}.receive", Thread.currentThread().getId(), this.getClass().getName());
230240

231241
if (!maybeReconnect()) {
@@ -243,21 +253,25 @@ public Message receive(final String queueName, final QueueConfig queueConfig, fi
243253
jmsConsumers.put(queueName, internalConsumer);
244254
}
245255

246-
Message message = null;
247-
if (wait) {
248-
log.debug("Waiting {} ms for message", receiveTimeout);
249256

250-
message = internalConsumer.receive(receiveTimeout);
257+
final long timeoutMs = initialCall
258+
? initialReceiveTimeoutMs
259+
: subsequentReceiveTimeoutMs;
260+
261+
Message message = null;
262+
if (timeoutMs > 0) {
263+
// block up to timeoutMs
264+
message = internalConsumer.receive(timeoutMs);
251265

252266
if (message == null) {
253-
log.debug("No message received");
267+
log.debug("No message received within {} ms on queue={}", timeoutMs, queueName);
254268
}
255269
} else {
270+
// non‐blocking
256271
message = internalConsumer.receiveNoWait();
257272
}
258273

259274
log.trace("[{}] Exit {}.receive, retval={}", Thread.currentThread().getId(), this.getClass().getName(), message);
260-
261275
return message;
262276
}
263277

src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -163,12 +163,18 @@ public class MQSourceConnector extends SourceConnector {
163163
public static final String CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_DISABLED = "DISABLED";
164164
public static final String CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_ASDEF = "ASDEF";
165165

166-
public static final String CONFIG_MAX_RECEIVE_TIMEOUT = "mq.message.receive.timeout";
166+
public static final String CONFIG_MAX_RECEIVE_TIMEOUT = "mq.receive.timeout.ms";
167167
public static final String CONFIG_DOCUMENTATION_MAX_RECEIVE_TIMEOUT = "How long the connector should wait (in milliseconds) for a message to arrive if no message is available immediately";
168-
public static final String CONFIG_DISPLAY_MAX_RECEIVE_TIMEOUT = "message receive timeout";
168+
public static final String CONFIG_DISPLAY_MAX_RECEIVE_TIMEOUT = "Initial receive timeout (ms)";
169169
public static final long CONFIG_MAX_RECEIVE_TIMEOUT_DEFAULT = 2000L;
170170
public static final long CONFIG_MAX_RECEIVE_TIMEOUT_MINIMUM = 1L;
171171

172+
public static final String CONFIG_SUBSEQUENT_RECEIVE_TIMEOUT = "mq.receive.subsequent.timeout.ms";
173+
public static final String CONFIG_DOCUMENTATION_SUBSEQUENT_RECEIVE_TIMEOUT = "How long (in milliseconds) the connector should wait for subsequent receives, "
174+
+ "defaults to 0 (no-wait) and uses receiveNoWait().";
175+
public static final String CONFIG_DISPLAY_SUBSEQUENT_RECEIVE_TIMEOUT = "subsequent receive timeout (ms)";
176+
public static final long CONFIG_SUBSEQUENT_RECEIVE_TIMEOUT_DEFAULT = 0L;
177+
172178
public static final String CONFIG_RECONNECT_DELAY_MIN = "mq.reconnect.delay.min.ms";
173179
public static final String CONFIG_DOCUMENTATION_RECONNECT_DELAY_MIN = "The minimum delay in milliseconds for reconnect attempts.";
174180
public static final String CONFIG_DISPLAY_RECONNECT_DELAY_MIN = "reconnect minimum delay";
@@ -576,27 +582,38 @@ null, new ReadableFile(),
576582
Width.SHORT,
577583
CONFIG_DISPLAY_MQ_CLIENT_RECONNECT_OPTIONS);
578584
CONFIGDEF.define(CONFIG_MAX_RECEIVE_TIMEOUT,
579-
Type.LONG,
580-
CONFIG_MAX_RECEIVE_TIMEOUT_DEFAULT, ConfigDef.Range.atLeast(CONFIG_MAX_RECEIVE_TIMEOUT_MINIMUM),
581-
Importance.MEDIUM,
585+
ConfigDef.Type.LONG,
586+
CONFIG_MAX_RECEIVE_TIMEOUT_DEFAULT,
587+
ConfigDef.Range.atLeast(CONFIG_MAX_RECEIVE_TIMEOUT_MINIMUM),
588+
ConfigDef.Importance.MEDIUM,
582589
CONFIG_DOCUMENTATION_MAX_RECEIVE_TIMEOUT,
583-
CONFIG_GROUP_MQ, 26,
584-
Width.MEDIUM,
590+
"MQ",
591+
26,
592+
ConfigDef.Width.MEDIUM,
585593
CONFIG_DISPLAY_MAX_RECEIVE_TIMEOUT);
594+
CONFIGDEF.define(CONFIG_SUBSEQUENT_RECEIVE_TIMEOUT,
595+
ConfigDef.Type.LONG,
596+
CONFIG_SUBSEQUENT_RECEIVE_TIMEOUT_DEFAULT,
597+
ConfigDef.Importance.LOW,
598+
CONFIG_DOCUMENTATION_SUBSEQUENT_RECEIVE_TIMEOUT,
599+
"MQ",
600+
27,
601+
ConfigDef.Width.MEDIUM,
602+
CONFIG_DISPLAY_SUBSEQUENT_RECEIVE_TIMEOUT);
586603
CONFIGDEF.define(CONFIG_RECONNECT_DELAY_MIN,
587604
Type.LONG,
588605
CONFIG_RECONNECT_DELAY_MIN_DEFAULT, ConfigDef.Range.atLeast(CONFIG_RECONNECT_DELAY_MIN_MINIMUM),
589606
Importance.MEDIUM,
590607
CONFIG_DOCUMENTATION_RECONNECT_DELAY_MIN,
591-
CONFIG_GROUP_MQ, 27,
608+
CONFIG_GROUP_MQ, 28,
592609
Width.MEDIUM,
593610
CONFIG_DISPLAY_RECONNECT_DELAY_MIN);
594611
CONFIGDEF.define(CONFIG_RECONNECT_DELAY_MAX,
595612
Type.LONG,
596613
CONFIG_RECONNECT_DELAY_MAX_DEFAULT, ConfigDef.Range.atLeast(CONFIG_RECONNECT_DELAY_MAX_MINIMUM),
597614
Importance.MEDIUM,
598615
CONFIG_DOCUMENTATION_RECONNECT_DELAY_MAX,
599-
CONFIG_GROUP_MQ, 28,
616+
CONFIG_GROUP_MQ, 29,
600617
Width.MEDIUM,
601618
CONFIG_DISPLAY_RECONNECT_DELAY_MAX);
602619

0 commit comments

Comments
 (0)