Skip to content

Commit af9111d

Browse files
authored
feat: configurable retry interval after retriable exceptions (#54)
If the connector throws a retriable exception, there is no need for Connect's Kafka consumer to wait as long before re-consuming messages from the Kafka topic. It is therefore helpful to be able to specify that the consumer should use a shorter timeout for polls immediately following a retriable exception. This commit introduces a new config parameter, mq.retry.backoff.ms, which allows this interval to be controlled. By default, it is set to 1 minute, which is how long the consumer currently waits.

Signed-off-by: Dale Lane <dale.lane@uk.ibm.com>
1 parent ea8a06c commit af9111d

File tree

5 files changed

+37
-10
lines changed

5 files changed

+37
-10
lines changed

Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ COPY --chown=esuser:esgroup --from=builder /opt/kafka/libs/ /opt/kafka/libs/
1010
COPY --chown=esuser:esgroup --from=builder /opt/kafka/config/connect-distributed.properties /opt/kafka/config/
1111
COPY --chown=esuser:esgroup --from=builder /opt/kafka/config/connect-log4j.properties /opt/kafka/config/
1212
RUN mkdir /opt/kafka/logs && chown esuser:esgroup /opt/kafka/logs
13-
COPY --chown=esuser:esgroup target/kafka-connect-mq-sink-1.4.0-jar-with-dependencies.jar /opt/kafka/libs/
13+
COPY --chown=esuser:esgroup target/kafka-connect-mq-sink-1.5.0-jar-with-dependencies.jar /opt/kafka/libs/
1414

1515
WORKDIR /opt/kafka
1616

README.md

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -98,13 +98,13 @@ curl -X POST -H "Content-Type: application/json" http://localhost:8083/connector
9898
This repository includes an example Dockerfile to run Kafka Connect in distributed mode. It also adds in the MQ sink connector as an available connector plugin. It uses the default `connect-distributed.properties` and `connect-log4j.properties` files.
9999

100100
1. `mvn clean package`
101-
1. `docker build -t kafkaconnect-with-mq-sink:1.4.0 .`
102-
1. `docker run -p 8083:8083 kafkaconnect-with-mq-sink:1.4.0`
101+
1. `docker build -t kafkaconnect-with-mq-sink:1.5.0 .`
102+
1. `docker run -p 8083:8083 kafkaconnect-with-mq-sink:1.5.0`
103103

104104
**NOTE:** To provide custom properties files create a folder called `config` containing the `connect-distributed.properties` and `connect-log4j.properties` files and use a Docker volume to make them available when running the container like this:
105105

106106
``` shell
107-
docker run -v $(pwd)/config:/opt/kafka/config -p 8083:8083 kafkaconnect-with-mq-sink:1.4.0
107+
docker run -v $(pwd)/config:/opt/kafka/config -p 8083:8083 kafkaconnect-with-mq-sink:1.5.0
108108
```
109109

110110
To start the MQ connector, you can use `config/mq-sink.json` in this repository after replacing all placeholders and use a command like this:
@@ -312,6 +312,7 @@ The configuration options for the Kafka Connect sink connector for IBM MQ are as
312312
| mq.message.builder.partition.property | The JMS message property to set from the Kafka partition | string | | Blank or valid JMS property name |
313313
| mq.message.builder.offset.property | The JMS message property to set from the Kafka offset | string | | Blank or valid JMS property name |
314314
| mq.reply.queue | The name of the reply-to queue | string | | MQ queue name or queue URI |
315+
| mq.retry.backoff.ms | Wait time, in milliseconds, before retrying after retriable exceptions | long | 60000 | [0,...,99999999900] |
315316

316317

317318
### Using a CCDT file

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
<groupId>com.ibm.eventstreams.connect</groupId>
2121
<artifactId>kafka-connect-mq-sink</artifactId>
2222
<packaging>jar</packaging>
23-
<version>1.4.0</version>
23+
<version>1.5.0</version>
2424
<name>kafka-connect-mq-sink</name>
2525
<organization>
2626
<name>IBM Corporation</name>

src/main/java/com/ibm/eventstreams/connect/mqsink/MQSinkConnector.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,12 @@ public class MQSinkConnector extends SinkConnector {
147147
public static final String CONFIG_DOCUMENTATION_KAFKA_HEADERS_COPY_TO_JMS_PROPERTIES = "Whether to copy Kafka headers to JMS message properties.";
148148
public static final String CONFIG_DISPLAY_KAFKA_HEADERS_COPY_TO_JMS_PROPERTIES = "Copy Kafka headers to JMS message properties";
149149

150-
public static String VERSION = "1.4.0";
150+
public static final String CONFIG_NAME_MQ_RETRY_BACKOFF_MS = "mq.retry.backoff.ms";
151+
public static final String CONFIG_DOCUMENTATION_MQ_RETRY_BACKOFF_MS = "Time to wait, in milliseconds, before retrying after retriable exceptions";
152+
public static final String CONFIG_DISPLAY_MQ_RETRY_BACKOFF_MS = "Retry backoff (ms)";
153+
154+
155+
public static String VERSION = "1.5.0";
151156

152157
private Map<String, String> configProps;
153158

@@ -336,6 +341,10 @@ public class MQSinkConnector extends SinkConnector {
336341
CONFIG_DOCUMENTATION_KAFKA_HEADERS_COPY_TO_JMS_PROPERTIES, CONFIG_GROUP_MQ, 27, Width.SHORT,
337342
CONFIG_DISPLAY_KAFKA_HEADERS_COPY_TO_JMS_PROPERTIES);
338343

344+
config.define(CONFIG_NAME_MQ_RETRY_BACKOFF_MS, Type.LONG, 60000, Range.between(0L, 99999999900L), Importance.LOW,
345+
CONFIG_DOCUMENTATION_MQ_RETRY_BACKOFF_MS, CONFIG_GROUP_MQ, 28, Width.SHORT,
346+
CONFIG_DISPLAY_MQ_RETRY_BACKOFF_MS);
347+
339348
return config;
340349
}
341350
}

src/main/java/com/ibm/eventstreams/connect/mqsink/MQSinkTask.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
2323
import org.apache.kafka.common.TopicPartition;
24+
import org.apache.kafka.connect.errors.RetriableException;
2425
import org.apache.kafka.connect.sink.SinkRecord;
2526
import org.apache.kafka.connect.sink.SinkTask;
2627

@@ -32,6 +33,8 @@ public class MQSinkTask extends SinkTask {
3233

3334
private JMSWriter writer;
3435

36+
private long retryBackoffMs = 60000;
37+
3538
public MQSinkTask() {
3639
}
3740

@@ -61,6 +64,13 @@ public MQSinkTask() {
6164
log.debug("Task props entry {} : {}", entry.getKey(), value);
6265
}
6366

67+
// check if a custom retry time is provided
68+
String retryBackoffMsStr = props.get(MQSinkConnector.CONFIG_NAME_MQ_RETRY_BACKOFF_MS);
69+
if (retryBackoffMsStr != null) {
70+
retryBackoffMs = Long.parseLong(retryBackoffMsStr);
71+
}
72+
log.debug("Setting retry backoff {}", retryBackoffMs);
73+
6474
// Construct a writer to interface with MQ
6575
writer = new JMSWriter();
6676
writer.configure(props);
@@ -85,12 +95,19 @@ public MQSinkTask() {
8595
@Override public void put(Collection<SinkRecord> records) {
8696
log.trace("[{}] Entry {}.put, records.size={}", Thread.currentThread().getId(), this.getClass().getName(), records.size());
8797

88-
for (SinkRecord r: records) {
89-
log.debug("Putting record for topic {}, partition {} and offset {}", r.topic(), r.kafkaPartition(), r.kafkaOffset());
90-
writer.send(r);
98+
try {
99+
for (SinkRecord r: records) {
100+
log.debug("Putting record for topic {}, partition {} and offset {}", r.topic(), r.kafkaPartition(), r.kafkaOffset());
101+
writer.send(r);
102+
}
103+
104+
writer.commit();
105+
}
106+
catch (RetriableException rte) {
107+
context.timeout(retryBackoffMs);
108+
throw rte;
91109
}
92110

93-
writer.commit();
94111
log.trace("[{}] Exit {}.put", Thread.currentThread().getId(), this.getClass().getName());
95112
}
96113

0 commit comments

Comments
 (0)