Skip to content

Commit 523b5da

Browse files
dalelaneJoel Hanson
authored and
GitHub Enterprise
committed
fix: avoid dead-locking the SourceTask if all commits aren't received (#51)
* chore: comment updates - no functional changes Fixed some weird line-wrapping in the existing comments, edited comments that were technically inaccurate, and added some extra clarification to comments ahead of changes in the next commit. Signed-off-by: Dale Lane <dale.lane@uk.ibm.com> * test: new unit tests that illustrate desired behaviour poll() should quickly return an empty list if we are still waiting for a previous batch to complete, but throw an exception if this happens too many times. Signed-off-by: Dale Lane <dale.lane@uk.ibm.com> * fix: avoid deadlocking poll calls when commitRecord isnt called Signed-off-by: Dale Lane <dale.lane@uk.ibm.com> * chore: prepare new release I also removed an invalid config option that was included in the pom.xml at some point. Signed-off-by: Dale Lane <dale.lane@uk.ibm.com> * feat: Address review comments Contributes to: event-integration/eventstreams-planning#0 Signed-off-by: Joel Hanson <joel.hanson2@ibm.com> * fix: Update kafka connect-api and connect-json version Contributes to: event-integration/eventstreams-planning#0 Signed-off-by: Joel Hanson <joel.hanson2@ibm.com> --------- Signed-off-by: Dale Lane <dale.lane@uk.ibm.com> Signed-off-by: Joel Hanson <joel.hanson2@ibm.com> Co-authored-by: Joel Hanson <joel.hanson2@ibm.com>
1 parent 956db2d commit 523b5da

File tree

5 files changed

+225
-106
lines changed

5 files changed

+225
-106
lines changed

README.md

Lines changed: 27 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -277,32 +277,33 @@ For troubleshooting, or to better understand the handshake performed by the IBM
277277

278278
The configuration options for the Kafka Connect source connector for IBM MQ are as follows:
279279

280-
| Name | Description | Type | Default | Valid values |
281-
| --------------------------------------- | ---------------------------------------------------------------------- | ------- | -------------- | ------------------------------------------------------- |
282-
| topic | The name of the target Kafka topic | string | | Topic name |
283-
| mq.queue.manager | The name of the MQ queue manager | string | | MQ queue manager name |
284-
| mq.connection.mode | The connection mode - bindings or client | string | client | client, bindings |
285-
| mq.connection.name.list | List of connection names for queue manager | string | | host(port)[,host(port),...] |
286-
| mq.channel.name | The name of the server-connection channel | string | | MQ channel name |
287-
| mq.queue | The name of the source MQ queue | string | | MQ queue name |
288-
| mq.exactly.once.state.queue | The name of the MQ queue used to store state when running with exactly-once semantics | string | | MQ state queue name |
289-
| mq.user.name | The user name for authenticating with the queue manager | string | | User name |
290-
| mq.password | The password for authenticating with the queue manager | string | | Password |
291-
| mq.user.authentication.mqcsp | Whether to use MQ connection security parameters (MQCSP) | boolean | true | |
292-
| mq.ccdt.url | The URL for the CCDT file containing MQ connection details | string | | URL for obtaining a CCDT file |
293-
| mq.record.builder | The class used to build the Kafka Connect record | string | | Class implementing RecordBuilder |
294-
| mq.message.body.jms | Whether to interpret the message body as a JMS message type | boolean | false | |
295-
| mq.record.builder.key.header | The JMS message header to use as the Kafka record key | string | | JMSMessageID, JMSCorrelationID, JMSCorrelationIDAsBytes, JMSDestination |
296-
| mq.jms.properties.copy.to.kafka.headers | Whether to copy JMS message properties to Kafka headers | boolean | false | |
297-
| mq.ssl.cipher.suite | The name of the cipher suite for TLS (SSL) connection | string | | Blank or valid cipher suite |
298-
| mq.ssl.peer.name | The distinguished name pattern of the TLS (SSL) peer | string | | Blank or DN pattern |
299-
| mq.ssl.keystore.location | The path to the JKS keystore to use for SSL (TLS) connections | string | JVM keystore | Local path to a JKS file |
300-
| mq.ssl.keystore.password | The password of the JKS keystore to use for SSL (TLS) connections | string | | |
301-
| mq.ssl.truststore.location | The path to the JKS truststore to use for SSL (TLS) connections | string | JVM truststore | Local path to a JKS file |
302-
| mq.ssl.truststore.password | The password of the JKS truststore to use for SSL (TLS) connections | string | | |
303-
| mq.ssl.use.ibm.cipher.mappings | Whether to set system property to control use of IBM cipher mappings | boolean | | |
304-
| mq.batch.size | The maximum number of messages in a batch (unit of work) | integer | 250 | 1 or greater |
305-
| mq.message.mqmd.read | Whether to enable reading of all MQMD fields | boolean | false | |
280+
| Name | Description | Type | Default | Valid values |
281+
| --------------------------------------- | ----------------------------------------------------------------------------------------------------------------------- | ------- | -------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
282+
| topic | The name of the target Kafka topic | string | | Topic name |
283+
| mq.queue.manager | The name of the MQ queue manager | string | | MQ queue manager name |
284+
| mq.connection.mode | The connection mode - bindings or client | string | client | client, bindings |
285+
| mq.connection.name.list | List of connection names for queue manager | string | | host(port)[,host(port),...] |
286+
| mq.channel.name | The name of the server-connection channel | string | | MQ channel name |
287+
| mq.queue | The name of the source MQ queue | string | | MQ queue name |
288+
| mq.exactly.once.state.queue | The name of the MQ queue used to store state when running with exactly-once semantics | string | | MQ state queue name |
289+
| mq.user.name | The user name for authenticating with the queue manager | string | | User name |
290+
| mq.password | The password for authenticating with the queue manager | string | | Password |
291+
| mq.user.authentication.mqcsp | Whether to use MQ connection security parameters (MQCSP) | boolean | true | |
292+
| mq.ccdt.url | The URL for the CCDT file containing MQ connection details | string | | URL for obtaining a CCDT file |
293+
| mq.record.builder | The class used to build the Kafka Connect record | string | | Class implementing RecordBuilder |
294+
| mq.message.body.jms | Whether to interpret the message body as a JMS message type | boolean | false | |
295+
| mq.record.builder.key.header | The JMS message header to use as the Kafka record key | string | | JMSMessageID, JMSCorrelationID, JMSCorrelationIDAsBytes, JMSDestination |
296+
| mq.jms.properties.copy.to.kafka.headers | Whether to copy JMS message properties to Kafka headers | boolean | false | |
297+
| mq.ssl.cipher.suite | The name of the cipher suite for TLS (SSL) connection | string | | Blank or valid cipher suite |
298+
| mq.ssl.peer.name | The distinguished name pattern of the TLS (SSL) peer | string | | Blank or DN pattern |
299+
| mq.ssl.keystore.location | The path to the JKS keystore to use for SSL (TLS) connections | string | JVM keystore | Local path to a JKS file |
300+
| mq.ssl.keystore.password | The password of the JKS keystore to use for SSL (TLS) connections | string | | |
301+
| mq.ssl.truststore.location | The path to the JKS truststore to use for SSL (TLS) connections | string | JVM truststore | Local path to a JKS file |
302+
| mq.ssl.truststore.password | The password of the JKS truststore to use for SSL (TLS) connections | string | | |
303+
| mq.ssl.use.ibm.cipher.mappings | Whether to set system property to control use of IBM cipher mappings | boolean | | |
304+
| mq.batch.size | The maximum number of messages in a batch (unit of work) | integer | 250 | 1 or greater |
305+
| mq.message.mqmd.read | Whether to enable reading of all MQMD fields | boolean | false | |
306+
| mq.max.poll.blocked.time.ms | How long the connector will wait for the previous batch of messages to be delivered to Kafka before starting a new poll | integer | 2000 | It is important that this is less than the time defined for `task.shutdown.graceful.timeout.ms` as that is how long connect will wait for the task to perform lifecycle operations. |
306307

307308
### Using a CCDT file
308309

pom.xml

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
<groupId>com.ibm.eventstreams.connect</groupId>
2121
<artifactId>kafka-connect-mq-source</artifactId>
2222
<packaging>jar</packaging>
23-
<version>2.0.0</version>
23+
<version>2.0.1</version>
2424
<name>kafka-connect-mq-source</name>
2525
<organization>
2626
<name>IBM Corporation</name>
@@ -50,13 +50,13 @@
5050
<dependency>
5151
<groupId>org.apache.kafka</groupId>
5252
<artifactId>connect-api</artifactId>
53-
<version>3.4.0</version>
53+
<version>3.6.1</version>
5454
<scope>provided</scope>
5555
</dependency>
5656
<dependency>
5757
<groupId>org.apache.kafka</groupId>
5858
<artifactId>connect-json</artifactId>
59-
<version>3.4.0</version>
59+
<version>3.6.1</version>
6060
<scope>provided</scope>
6161
</dependency>
6262

@@ -304,7 +304,6 @@
304304
<artifactId>maven-checkstyle-plugin</artifactId>
305305
<version>3.2.0</version>
306306
<configuration>
307-
<encoding>UTF-8</encoding>
308307
<consoleOutput>true</consoleOutput>
309308
<failsOnError>true</failsOnError>
310309
<violationSeverity>warning</violationSeverity>

src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,12 @@ public class MQSourceConnector extends SourceConnector {
147147
public static final String CONFIG_DOCUMENTATION_TOPIC = "The name of the target Kafka topic.";
148148
public static final String CONFIG_DISPLAY_TOPIC = "Target Kafka topic";
149149

150-
public static String version = "2.0.0";
150+
public static final String CONFIG_MAX_POLL_BLOCKED_TIME_MS = "mq.max.poll.blocked.time.ms";
151+
public static final String CONFIG_DOCUMENTATION_MAX_POLL_BLOCKED_TIME_MS = "How long the SourceTask will wait for a "
152+
+ "previous batch of messages to be delivered to Kafka before starting a new poll.";
153+
public static final String CONFIG_DISPLAY_MAX_POLL_BLOCKED_TIME_MS = "Max poll blocked time ms";
154+
155+
public static String version = "2.0.1";
151156

152157
private Map<String, String> configProps;
153158

@@ -450,6 +455,19 @@ null, new ReadableFile(),
450455
CONFIG_GROUP_MQ, 23, Width.LONG,
451456
CONFIG_DISPLAY_MQ_EXACTLY_ONCE_STATE_QUEUE);
452457

458+
// How long the SourceTask will wait for a previous batch of messages to
459+
// be delivered to Kafka before starting a new poll.
460+
// It is important that this is less than the time defined for
461+
// task.shutdown.graceful.timeout.ms as that is how long Connect will
462+
// wait for the task to perform lifecycle operations.
463+
CONFIGDEF.define(CONFIG_MAX_POLL_BLOCKED_TIME_MS,
464+
Type.INT,
465+
2000, ConfigDef.Range.atLeast(0),
466+
Importance.MEDIUM,
467+
CONFIG_DOCUMENTATION_MAX_POLL_BLOCKED_TIME_MS,
468+
null, 24, Width.MEDIUM,
469+
CONFIG_DISPLAY_MAX_POLL_BLOCKED_TIME_MS);
470+
453471
CONFIGDEF.define(CONFIG_NAME_TOPIC,
454472
Type.STRING,
455473
// user must specify the topic name

0 commit comments

Comments
 (0)