diff --git a/Dockerfile b/Dockerfile
index 27c7ecd..dc6336f 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -10,7 +10,7 @@ COPY --chown=esuser:esgroup --from=builder /opt/kafka/libs/ /opt/kafka/libs/
COPY --chown=esuser:esgroup --from=builder /opt/kafka/config/connect-distributed.properties /opt/kafka/config/
COPY --chown=esuser:esgroup --from=builder /opt/kafka/config/connect-log4j.properties /opt/kafka/config/
RUN mkdir /opt/kafka/logs && chown esuser:esgroup /opt/kafka/logs
-COPY --chown=esuser:esgroup target/kafka-connect-mq-sink-1.3.1-jar-with-dependencies.jar /opt/kafka/libs/
+COPY --chown=esuser:esgroup target/kafka-connect-mq-sink-1.3.2-jar-with-dependencies.jar /opt/kafka/libs/
WORKDIR /opt/kafka
diff --git a/pom.xml b/pom.xml
index 51eb56a..7ec6132 100644
--- a/pom.xml
+++ b/pom.xml
@@ -20,7 +20,7 @@
com.ibm.eventstreams.connect
kafka-connect-mq-sink
jar
- 1.3.1
+ 1.3.2
kafka-connect-mq-sink
IBM Corporation
diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsink/JMSWriter.java b/src/main/java/com/ibm/eventstreams/connect/mqsink/JMSWriter.java
index a6f20ea..8279b0c 100644
--- a/src/main/java/com/ibm/eventstreams/connect/mqsink/JMSWriter.java
+++ b/src/main/java/com/ibm/eventstreams/connect/mqsink/JMSWriter.java
@@ -15,12 +15,20 @@
*/
package com.ibm.eventstreams.connect.mqsink;
+import com.ibm.eventstreams.connect.mqsink.builders.MessageBuilder;
import com.ibm.mq.MQException;
import com.ibm.mq.constants.MQConstants;
-import com.ibm.mq.jms.*;
-import com.ibm.eventstreams.connect.mqsink.builders.MessageBuilder;
+import com.ibm.mq.jms.MQConnectionFactory;
+import com.ibm.mq.jms.MQQueue;
import com.ibm.msg.client.wmq.WMQConstants;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.errors.RetriableException;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import javax.jms.*;
+import javax.net.ssl.*;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -31,25 +39,6 @@
import java.security.SecureRandom;
import java.util.Map;
-import javax.jms.DeliveryMode;
-import javax.jms.JMSContext;
-import javax.jms.JMSException;
-import javax.jms.JMSProducer;
-import javax.jms.JMSRuntimeException;
-import javax.jms.Message;
-import javax.net.ssl.KeyManager;
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.TrustManager;
-import javax.net.ssl.TrustManagerFactory;
-
-import org.apache.kafka.connect.errors.ConnectException;
-import org.apache.kafka.connect.errors.RetriableException;
-import org.apache.kafka.connect.sink.SinkRecord;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
/**
* Writes messages to MQ using JMS. Uses a transacted session, adding messages to the current
@@ -61,6 +50,7 @@ public class JMSWriter {
// Configs
private String userName;
private String password;
+ private String encoding;
// JMS factory and context
private MQConnectionFactory mqConnFactory;
@@ -99,6 +89,7 @@ public void configure(Map props) {
String queueName = props.get(MQSinkConnector.CONFIG_NAME_MQ_QUEUE);
String userName = props.get(MQSinkConnector.CONFIG_NAME_MQ_USER_NAME);
String password = props.get(MQSinkConnector.CONFIG_NAME_MQ_PASSWORD);
+ String encoding = props.get(MQSinkConnector.CONFIG_NAME_MQ_ENCODING);
String ccdtUrl = props.get(MQSinkConnector.CONFIG_NAME_MQ_CCDT_URL);
String builderClass = props.get(MQSinkConnector.CONFIG_NAME_MQ_MESSAGE_BUILDER);
String mbj = props.get(MQSinkConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS);
@@ -174,7 +165,8 @@ else if (connectionMode.equals(MQSinkConnector.CONFIG_VALUE_MQ_CONNECTION_MODE_B
this.userName = userName;
this.password = password;
-
+ this.encoding = encoding;
+
queue.setMessageBodyStyle(WMQConstants.WMQ_MESSAGE_BODY_MQ);
if (mbj != null) {
if (Boolean.parseBoolean(mbj)) {
@@ -252,10 +244,15 @@ public void send(SinkRecord r) throws ConnectException, RetriableException {
try {
Message m = builder.fromSinkRecord(jmsCtxt, r);
+
+ if (encoding != null) {
+ m.setStringProperty(WMQConstants.JMS_IBM_CHARACTER_SET, encoding);
+ }
+
inflight = true;
jmsProd.send(queue, m);
}
- catch (JMSRuntimeException jmse) {
+ catch (JMSRuntimeException | JMSException jmse) {
log.error("JMS exception {}", jmse);
throw handleException(jmse);
}
diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsink/MQSinkConnector.java b/src/main/java/com/ibm/eventstreams/connect/mqsink/MQSinkConnector.java
index a5d1427..b284975 100644
--- a/src/main/java/com/ibm/eventstreams/connect/mqsink/MQSinkConnector.java
+++ b/src/main/java/com/ibm/eventstreams/connect/mqsink/MQSinkConnector.java
@@ -66,6 +66,10 @@ public class MQSinkConnector extends SinkConnector {
public static final String CONFIG_DOCUMENTATION_MQ_PASSWORD = "The password for authenticating with the queue manager.";
public static final String CONFIG_DISPLAY_MQ_PASSWORD = "Password";
+ public static final String CONFIG_NAME_MQ_ENCODING = "mq.encoding";
+ public static final String CONFIG_DOCUMENTATION_MQ_ENCODING = "The encoding can be replaced in queue manager.";
+ public static final String CONFIG_DISPLAY_MQ_ENCODING = "Encoding";
+
public static final String CONFIG_NAME_MQ_CCDT_URL = "mq.ccdt.url";
public static final String CONFIG_DOCUMENTATION_MQ_CCDT_URL = "The CCDT URL to use to establish a connection to the queue manager.";
public static final String CONFIG_DISPLAY_MQ_CCDT_URL = "CCDT URL";
@@ -143,7 +147,7 @@ public class MQSinkConnector extends SinkConnector {
public static final String CONFIG_DOCUMENTATION_MQ_USER_AUTHENTICATION_MQCSP = "Whether to use MQ connection security parameters (MQCSP).";
public static final String CONFIG_DISPLAY_MQ_USER_AUTHENTICATION_MQCSP = "User authentication using MQCSP";
- public static String VERSION = "1.3.1";
+ public static String VERSION = "1.3.2";
private Map configProps;
@@ -328,6 +332,10 @@ public class MQSinkConnector extends SinkConnector {
CONFIG_DOCUMENTATION_MQ_SSL_USE_IBM_CIPHER_MAPPINGS, CONFIG_GROUP_MQ, 26, Width.SHORT,
CONFIG_DISPLAY_MQ_SSL_USE_IBM_CIPHER_MAPPINGS);
+ config.define(CONFIG_NAME_MQ_ENCODING, Type.STRING, null, Importance.LOW,
+ CONFIG_DOCUMENTATION_MQ_ENCODING, CONFIG_GROUP_MQ, 27, Width.SHORT,
+ CONFIG_DISPLAY_MQ_ENCODING);
+
return config;
}
-}
\ No newline at end of file
+}