This repository includes a Dockerfile to run Kafka Connect in distributed mode. It also adds in the MQ Source Connector as an available connector plugin. It uses the default connect-distributed.properties and connect-log4j.properties files.
1. `docker run -p 8083:8083 kafkaconnect-with-mq-source:0.0.1`
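
If the image has not been built yet, a build along these lines should produce it (a sketch; the tag matches the run command above, adjust it to your own conventions):

```shell
# Run from the repository root, where the Dockerfile lives
docker build -t kafkaconnect-with-mq-source:0.0.1 .
```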
**NOTE:** To provide custom properties files, create a folder called `config` containing the `connect-distributed.properties` and `connect-log4j.properties` files and use a Docker volume to make them available when running the container:
`docker run -v $(pwd)/config:/opt/kafka/config -p 8083:8083 kafkaconnect-with-mq-source:0.0.1`
## Deploying to Kubernetes
This repository includes a Kubernetes YAML file called `kafka-connect.yaml`. This will create a deployment to run Kafka Connect in distributed mode and a service to access the deployment.
The deployment assumes the existence of a Secret called `connect-distributed-config` and a ConfigMap called `connect-log4j-config`. These can be created using the default files in your Kafka install; however, they are easier to edit later if comments and whitespace are trimmed before creation.
### Creating Kafka Connect configuration Secret and ConfigMap
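
A minimal sketch with `kubectl`, assuming the default files have been copied from your Kafka install into the current directory; the object names must match those referenced by `kafka-connect.yaml`:

```shell
# Trim comments and blank lines so the objects are easier to edit later
grep -Ev '^(#|$)' connect-distributed.properties > connect-distributed-trimmed.properties
grep -Ev '^(#|$)' connect-log4j.properties > connect-log4j-trimmed.properties

# The deployment expects these exact object names
kubectl create secret generic connect-distributed-config \
  --from-file=connect-distributed.properties=connect-distributed-trimmed.properties
kubectl create configmap connect-log4j-config \
  --from-file=connect-log4j.properties=connect-log4j-trimmed.properties
```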
### Creating Kafka Connect deployment and service in Kubernetes
**NOTE:** Remember to [build the Docker image](#running-with-docker) and push it to your Kubernetes image repository. You might need to update the image name in the `kafka-connect.yaml` file.
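
Once the image is available, applying the YAML file creates both the deployment and the service (a sketch):

```shell
kubectl apply -f kafka-connect.yaml

# Look up the service IP and port for the verification step below
kubectl get service
```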
1. `curl <serviceIP>:<servicePort>/connector-plugins` to confirm that the MQ Source connector is available to use
## Data formats
Kafka Connect is very flexible, but it's important to understand the way that it processes messages to end up with a reliable system. When the connector encounters a message that it cannot process, it stops rather than throwing the message away. Therefore, you need to make sure that the configuration you use can handle the messages the connector will process.
When the MQ source connector reads a message from MQ, it chooses a schema to represent the message format.
There are two record builders supplied with the connector, although you can write your own. The basic rule is that if you just want the message to be passed along to Kafka unchanged, the default record builder is probably the best choice. If the incoming data is in JSON format and you want to use a schema based on its structure, use the JSON record builder.
There are three converters built into Apache Kafka. You need to make sure that the incoming message format, the setting of the `mq.message.body.jms` configuration, the record builder and converter are all compatible. By default, everything is just treated as bytes but if you want the connector to understand the message format and apply more sophisticated processing such as single-message transforms, you'll need a more complex configuration. The following table shows the basic options that work.

| Record builder class | Incoming MQ message | mq.message.body.jms | Converter class | Outgoing Kafka message |
| -------------------- | ------------------- | ------------------- | --------------- | ---------------------- |
| `com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder` | Any | `false` (default) | `org.apache.kafka.connect.converters.ByteArrayConverter` | Binary data |
| `com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder` | JMS `BytesMessage` | `true` | `org.apache.kafka.connect.converters.ByteArrayConverter` | Binary data |
| `com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder` | JMS `TextMessage` | `true` | `org.apache.kafka.connect.storage.StringConverter` | String data |
| `com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder` | JSON, may have a schema | Not used | `org.apache.kafka.connect.json.JsonConverter` | JSON, no schema |
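
For illustration, here is how one of these combinations might be submitted to the Kafka Connect REST API in distributed mode. This is a sketch only: the connector and record builder class names come from this repository, while the queue manager, connection, queue and topic names are placeholders to replace with your own.

```shell
curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
  "name": "mq-source",
  "config": {
    "connector.class": "com.ibm.eventstreams.connect.mqsource.MQSourceConnector",
    "mq.queue.manager": "QM1",
    "mq.connection.name.list": "localhost(1414)",
    "mq.channel.name": "MYSVRCONN",
    "mq.queue": "MYQSOURCE",
    "mq.record.builder": "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "topic": "TSOURCE"
  }
}'
```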
By default, the connector does not use keys for the Kafka messages it publishes. It can be configured to use the JMS message headers to set the key of the Kafka records. You could use this, for example, to use the MQMD correlation identifier as the partitioning key when the messages are published to Kafka. There are four valid values for `mq.record.builder.key.header`, the setting that controls this behavior.

| mq.record.builder.key.header | Key schema | Key class | Recommended value for key.converter |
| ---------------------------- | ---------- | --------- | ----------------------------------- |
| `JMSMessageID` | OPTIONAL_STRING | `String` | `org.apache.kafka.connect.storage.StringConverter` |
| `JMSCorrelationID` | OPTIONAL_STRING | `String` | `org.apache.kafka.connect.storage.StringConverter` |
| `JMSCorrelationIDAsBytes` | OPTIONAL_BYTES | `byte[]` | `org.apache.kafka.connect.converters.ByteArrayConverter` |
| `JMSDestination` | OPTIONAL_STRING | `String` | `org.apache.kafka.connect.storage.StringConverter` |

In MQ, the message ID and correlation ID are both 24-byte arrays. As strings, the connector represents them using a sequence of 48 hexadecimal characters.
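
For example, to use the correlation identifier as the partitioning key, the header setting is paired with the matching key converter (a minimal sketch of the relevant configuration lines):

```properties
# Use the MQMD correlation ID, rendered as 48 hex characters, as the record key
mq.record.builder.key.header=JMSCorrelationID
key.converter=org.apache.kafka.connect.storage.StringConverter
```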
### Accessing MQMD fields
If you write your own RecordBuilder, you can access the MQMD fields of the MQ messages as JMS message properties. By default, only a subset of the MQMD fields is available, but you can get access to all of them by setting the `mq.message.mqmd.read` configuration option. For more information, see [JMS message object properties](https://www.ibm.com/support/knowledgecenter/SSFKSJ_9.1.0/com.ibm.mq.dev.doc/q032350_.htm) in the MQ documentation.
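
Enabling the full set is a single flag in the connector configuration (a sketch):

```properties
# Expose all MQMD fields to the record builder as JMS message properties
mq.message.mqmd.read=true
```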
## Security
The connector supports authentication with user name and password and also connections secured with TLS using a server-side certificate and mutual authentication with client-side certificates.
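
As an illustration, a mutually authenticated TLS setup combines the connector's SSL settings with a JVM keystore and truststore supplied through the standard `javax.net.ssl` system properties. The cipher suite and DN pattern below are placeholders, not recommendations:

```properties
# Example values only; match these to your queue manager's channel definition
mq.ssl.cipher.suite=TLS_RSA_WITH_AES_128_CBC_SHA256
mq.ssl.peer.name=CN=QM1,OU=IBM MQ
```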

## Configuration

The configuration options for the Kafka Connect source connector for IBM MQ are as follows:

| Name | Description | Type | Default | Valid values |
| ---- | ----------- | ---- | ------- | ------------ |
| mq.ccdt.url | The URL for the CCDT file containing MQ connection details | string || URL for obtaining a CCDT file |
| mq.record.builder | The class used to build the Kafka Connect record | string || Class implementing RecordBuilder |
| mq.message.body.jms | Whether to interpret the message body as a JMS message type | boolean | false ||
| mq.record.builder.key.header | The JMS message header to use as the Kafka record key | string || JMSMessageID, JMSCorrelationID, JMSCorrelationIDAsBytes, JMSDestination |
| mq.ssl.cipher.suite | The name of the cipher suite for TLS (SSL) connection | string || Blank or valid cipher suite |
| mq.ssl.peer.name | The distinguished name pattern of the TLS (SSL) peer | string || Blank or DN pattern |
| mq.batch.size | The maximum number of messages in a batch (unit of work) | integer | 250 | 1 or greater |
| mq.message.mqmd.read | Whether to enable reading of all MQMD fields | boolean | false ||
| topic | The name of the target Kafka topic | string || Topic name |
### Using a CCDT file
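
When a CCDT file is used, `mq.ccdt.url` takes the place of explicit connection details in the connector configuration. A minimal sketch, in which the queue manager name and file path are illustrative:

```properties
mq.queue.manager=QM1
# The CCDT file must be readable at this location by the Connect worker
mq.ccdt.url=file:///opt/kafka/config/ccdt/AMQCLCHL.TAB
```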
Update the connector configuration file to reference `secret-key` in the file (the sketch below assumes the secret was stored under the key `secret-key` in a properties file called `mq-secret.properties`; adjust the file name to the one used when the secret was created):
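
```properties
# The file name mq-secret.properties is an assumption; use your own
mq.password=${file:mq-secret.properties:secret-key}
```

This substitution only works if the worker has `org.apache.kafka.common.config.provider.FileConfigProvider` enabled through its `config.providers` settings.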
To use a file for the `mq.password` in Kubernetes, you create a Secret using the file as described in [the Kubernetes docs](https://kubernetes.io/docs/concepts/configuration/secret/#using-secrets-as-files-from-a-pod).
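
A minimal sketch with `kubectl`; the Secret name and the `mq-secret.properties` file name are illustrative:

```shell
# Create the Secret from the file, then mount it into the Kafka Connect pod
# so FileConfigProvider can read it at the path referenced by mq.password
kubectl create secret generic mq-connection-secret --from-file=mq-secret.properties
```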