Skip to content

Create new parameter "mq.encoding" to override the destination character set #43

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ COPY --chown=esuser:esgroup --from=builder /opt/kafka/libs/ /opt/kafka/libs/
COPY --chown=esuser:esgroup --from=builder /opt/kafka/config/connect-distributed.properties /opt/kafka/config/
COPY --chown=esuser:esgroup --from=builder /opt/kafka/config/connect-log4j.properties /opt/kafka/config/
RUN mkdir /opt/kafka/logs && chown esuser:esgroup /opt/kafka/logs
COPY --chown=esuser:esgroup target/kafka-connect-mq-sink-1.3.1-jar-with-dependencies.jar /opt/kafka/libs/
COPY --chown=esuser:esgroup target/kafka-connect-mq-sink-1.3.2-jar-with-dependencies.jar /opt/kafka/libs/

WORKDIR /opt/kafka

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<groupId>com.ibm.eventstreams.connect</groupId>
<artifactId>kafka-connect-mq-sink</artifactId>
<packaging>jar</packaging>
<version>1.3.1</version>
<version>1.3.2</version>
<name>kafka-connect-mq-sink</name>
<organization>
<name>IBM Corporation</name>
Expand Down
43 changes: 20 additions & 23 deletions src/main/java/com/ibm/eventstreams/connect/mqsink/JMSWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,20 @@
*/
package com.ibm.eventstreams.connect.mqsink;

import com.ibm.eventstreams.connect.mqsink.builders.MessageBuilder;
import com.ibm.mq.MQException;
import com.ibm.mq.constants.MQConstants;
import com.ibm.mq.jms.*;
import com.ibm.eventstreams.connect.mqsink.builders.MessageBuilder;
import com.ibm.mq.jms.MQConnectionFactory;
import com.ibm.mq.jms.MQQueue;
import com.ibm.msg.client.wmq.WMQConstants;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jms.*;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could instead be:
import javax.jms.DeliveryMode; import javax.jms.JMSContext; import javax.jms.JMSException; import javax.jms.JMSProducer; import javax.jms.JMSRuntimeException; import javax.jms.Message;

Like it was before.

import javax.net.ssl.*;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could instead be:
import javax.net.ssl.KeyManager; import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManager; import javax.net.ssl.TrustManagerFactory;

Like it was before.

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
Expand All @@ -31,25 +39,6 @@
import java.security.SecureRandom;
import java.util.Map;

import javax.jms.DeliveryMode;
import javax.jms.JMSContext;
import javax.jms.JMSException;
import javax.jms.JMSProducer;
import javax.jms.JMSRuntimeException;
import javax.jms.Message;
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;

import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkRecord;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* Writes messages to MQ using JMS. Uses a transacted session, adding messages to the current
Expand All @@ -61,6 +50,7 @@ public class JMSWriter {
// Configs
private String userName;
private String password;
private String encoding;

// JMS factory and context
private MQConnectionFactory mqConnFactory;
Expand Down Expand Up @@ -99,6 +89,7 @@ public void configure(Map<String, String> props) {
String queueName = props.get(MQSinkConnector.CONFIG_NAME_MQ_QUEUE);
String userName = props.get(MQSinkConnector.CONFIG_NAME_MQ_USER_NAME);
String password = props.get(MQSinkConnector.CONFIG_NAME_MQ_PASSWORD);
String encoding = props.get(MQSinkConnector.CONFIG_NAME_MQ_ENCODING);
String ccdtUrl = props.get(MQSinkConnector.CONFIG_NAME_MQ_CCDT_URL);
String builderClass = props.get(MQSinkConnector.CONFIG_NAME_MQ_MESSAGE_BUILDER);
String mbj = props.get(MQSinkConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS);
Expand Down Expand Up @@ -174,7 +165,8 @@ else if (connectionMode.equals(MQSinkConnector.CONFIG_VALUE_MQ_CONNECTION_MODE_B

this.userName = userName;
this.password = password;

this.encoding = encoding;

queue.setMessageBodyStyle(WMQConstants.WMQ_MESSAGE_BODY_MQ);
if (mbj != null) {
if (Boolean.parseBoolean(mbj)) {
Expand Down Expand Up @@ -252,10 +244,15 @@ public void send(SinkRecord r) throws ConnectException, RetriableException {

try {
Message m = builder.fromSinkRecord(jmsCtxt, r);

if (encoding != null) {
m.setStringProperty(WMQConstants.JMS_IBM_CHARACTER_SET, encoding);
}

inflight = true;
jmsProd.send(queue, m);
}
catch (JMSRuntimeException jmse) {
catch (JMSRuntimeException | JMSException jmse) {
log.error("JMS exception {}", jmse);
throw handleException(jmse);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ public class MQSinkConnector extends SinkConnector {
public static final String CONFIG_DOCUMENTATION_MQ_PASSWORD = "The password for authenticating with the queue manager.";
public static final String CONFIG_DISPLAY_MQ_PASSWORD = "Password";

public static final String CONFIG_NAME_MQ_ENCODING = "mq.encoding";
public static final String CONFIG_DOCUMENTATION_MQ_ENCODING = "The encoding can be replaced in queue manager.";
public static final String CONFIG_DISPLAY_MQ_ENCODING = "Encoding";

public static final String CONFIG_NAME_MQ_CCDT_URL = "mq.ccdt.url";
public static final String CONFIG_DOCUMENTATION_MQ_CCDT_URL = "The CCDT URL to use to establish a connection to the queue manager.";
public static final String CONFIG_DISPLAY_MQ_CCDT_URL = "CCDT URL";
Expand Down Expand Up @@ -143,7 +147,7 @@ public class MQSinkConnector extends SinkConnector {
public static final String CONFIG_DOCUMENTATION_MQ_USER_AUTHENTICATION_MQCSP = "Whether to use MQ connection security parameters (MQCSP).";
public static final String CONFIG_DISPLAY_MQ_USER_AUTHENTICATION_MQCSP = "User authentication using MQCSP";

public static String VERSION = "1.3.1";
public static String VERSION = "1.3.2";

private Map<String, String> configProps;

Expand Down Expand Up @@ -328,6 +332,10 @@ public class MQSinkConnector extends SinkConnector {
CONFIG_DOCUMENTATION_MQ_SSL_USE_IBM_CIPHER_MAPPINGS, CONFIG_GROUP_MQ, 26, Width.SHORT,
CONFIG_DISPLAY_MQ_SSL_USE_IBM_CIPHER_MAPPINGS);

config.define(CONFIG_NAME_MQ_ENCODING, Type.STRING, null, Importance.LOW,
CONFIG_DOCUMENTATION_MQ_ENCODING, CONFIG_GROUP_MQ, 27, Width.SHORT,
CONFIG_DISPLAY_MQ_ENCODING);

return config;
}
}
}