diff --git a/Dockerfile b/Dockerfile index 27c7ecd..dc6336f 100644 --- a/Dockerfile +++ b/Dockerfile @@ -10,7 +10,7 @@ COPY --chown=esuser:esgroup --from=builder /opt/kafka/libs/ /opt/kafka/libs/ COPY --chown=esuser:esgroup --from=builder /opt/kafka/config/connect-distributed.properties /opt/kafka/config/ COPY --chown=esuser:esgroup --from=builder /opt/kafka/config/connect-log4j.properties /opt/kafka/config/ RUN mkdir /opt/kafka/logs && chown esuser:esgroup /opt/kafka/logs -COPY --chown=esuser:esgroup target/kafka-connect-mq-sink-1.3.1-jar-with-dependencies.jar /opt/kafka/libs/ +COPY --chown=esuser:esgroup target/kafka-connect-mq-sink-1.3.2-jar-with-dependencies.jar /opt/kafka/libs/ WORKDIR /opt/kafka diff --git a/pom.xml b/pom.xml index 51eb56a..7ec6132 100644 --- a/pom.xml +++ b/pom.xml @@ -20,7 +20,7 @@ com.ibm.eventstreams.connect kafka-connect-mq-sink jar - 1.3.1 + 1.3.2 kafka-connect-mq-sink IBM Corporation diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsink/JMSWriter.java b/src/main/java/com/ibm/eventstreams/connect/mqsink/JMSWriter.java index a6f20ea..8279b0c 100644 --- a/src/main/java/com/ibm/eventstreams/connect/mqsink/JMSWriter.java +++ b/src/main/java/com/ibm/eventstreams/connect/mqsink/JMSWriter.java @@ -15,12 +15,20 @@ */ package com.ibm.eventstreams.connect.mqsink; +import com.ibm.eventstreams.connect.mqsink.builders.MessageBuilder; import com.ibm.mq.MQException; import com.ibm.mq.constants.MQConstants; -import com.ibm.mq.jms.*; -import com.ibm.eventstreams.connect.mqsink.builders.MessageBuilder; +import com.ibm.mq.jms.MQConnectionFactory; +import com.ibm.mq.jms.MQQueue; import com.ibm.msg.client.wmq.WMQConstants; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.errors.RetriableException; +import org.apache.kafka.connect.sink.SinkRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import javax.jms.*; +import javax.net.ssl.*; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; @@ -31,25 +39,6 @@ import java.security.SecureRandom; import java.util.Map; -import javax.jms.DeliveryMode; -import javax.jms.JMSContext; -import javax.jms.JMSException; -import javax.jms.JMSProducer; -import javax.jms.JMSRuntimeException; -import javax.jms.Message; -import javax.net.ssl.KeyManager; -import javax.net.ssl.KeyManagerFactory; -import javax.net.ssl.SSLContext; -import javax.net.ssl.TrustManager; -import javax.net.ssl.TrustManagerFactory; - -import org.apache.kafka.connect.errors.ConnectException; -import org.apache.kafka.connect.errors.RetriableException; -import org.apache.kafka.connect.sink.SinkRecord; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /** * Writes messages to MQ using JMS. Uses a transacted session, adding messages to the current @@ -61,6 +50,7 @@ public class JMSWriter { // Configs private String userName; private String password; + private String encoding; // JMS factory and context private MQConnectionFactory mqConnFactory; @@ -99,6 +89,7 @@ public void configure(Map props) { String queueName = props.get(MQSinkConnector.CONFIG_NAME_MQ_QUEUE); String userName = props.get(MQSinkConnector.CONFIG_NAME_MQ_USER_NAME); String password = props.get(MQSinkConnector.CONFIG_NAME_MQ_PASSWORD); + String encoding = props.get(MQSinkConnector.CONFIG_NAME_MQ_ENCODING); String ccdtUrl = props.get(MQSinkConnector.CONFIG_NAME_MQ_CCDT_URL); String builderClass = props.get(MQSinkConnector.CONFIG_NAME_MQ_MESSAGE_BUILDER); String mbj = props.get(MQSinkConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS); @@ -174,7 +165,8 @@ else if (connectionMode.equals(MQSinkConnector.CONFIG_VALUE_MQ_CONNECTION_MODE_B this.userName = userName; this.password = password; - + this.encoding = encoding; + queue.setMessageBodyStyle(WMQConstants.WMQ_MESSAGE_BODY_MQ); if (mbj != null) { if (Boolean.parseBoolean(mbj)) { @@ -252,10 +244,15 @@ public void send(SinkRecord r) throws ConnectException, RetriableException { try { Message m = builder.fromSinkRecord(jmsCtxt, r); + + if (encoding != null) { + m.setStringProperty(WMQConstants.JMS_IBM_CHARACTER_SET, encoding); + } + inflight = true; jmsProd.send(queue, m); } - catch (JMSRuntimeException jmse) { + catch (JMSRuntimeException | JMSException jmse) { log.error("JMS exception {}", jmse); throw handleException(jmse); } diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsink/MQSinkConnector.java b/src/main/java/com/ibm/eventstreams/connect/mqsink/MQSinkConnector.java index a5d1427..b284975 100644 --- a/src/main/java/com/ibm/eventstreams/connect/mqsink/MQSinkConnector.java +++ b/src/main/java/com/ibm/eventstreams/connect/mqsink/MQSinkConnector.java @@ -66,6 +66,10 @@ public class MQSinkConnector extends SinkConnector { public static final String CONFIG_DOCUMENTATION_MQ_PASSWORD = "The password for authenticating with the queue manager."; public static final String CONFIG_DISPLAY_MQ_PASSWORD = "Password"; + public static final String CONFIG_NAME_MQ_ENCODING = "mq.encoding"; + public static final String CONFIG_DOCUMENTATION_MQ_ENCODING = "The encoding can be replaced in queue manager."; + public static final String CONFIG_DISPLAY_MQ_ENCODING = "Encoding"; + public static final String CONFIG_NAME_MQ_CCDT_URL = "mq.ccdt.url"; public static final String CONFIG_DOCUMENTATION_MQ_CCDT_URL = "The CCDT URL to use to establish a connection to the queue manager."; public static final String CONFIG_DISPLAY_MQ_CCDT_URL = "CCDT URL"; @@ -143,7 +147,7 @@ public class MQSinkConnector extends SinkConnector { public static final String CONFIG_DOCUMENTATION_MQ_USER_AUTHENTICATION_MQCSP = "Whether to use MQ connection security parameters (MQCSP)."; public static final String CONFIG_DISPLAY_MQ_USER_AUTHENTICATION_MQCSP = "User authentication using MQCSP"; - public static String VERSION = "1.3.1"; + public static String VERSION = "1.3.2"; private Map configProps; @@ -328,6 +332,10 @@ public class MQSinkConnector extends SinkConnector { CONFIG_DOCUMENTATION_MQ_SSL_USE_IBM_CIPHER_MAPPINGS, CONFIG_GROUP_MQ, 26, Width.SHORT, CONFIG_DISPLAY_MQ_SSL_USE_IBM_CIPHER_MAPPINGS); + config.define(CONFIG_NAME_MQ_ENCODING, Type.STRING, null, Importance.LOW, + CONFIG_DOCUMENTATION_MQ_ENCODING, CONFIG_GROUP_MQ, 27, Width.SHORT, + CONFIG_DISPLAY_MQ_ENCODING); + return config; } -} \ No newline at end of file +}