
Commit 5e38f34

Added support for JSON
1 parent e443557 commit 5e38f34


9 files changed, +338 -186 lines changed


README.md

Lines changed: 40 additions & 25 deletions
@@ -34,7 +34,7 @@ Build the connector using Maven:
mvn clean package
```

-Once built, the output is a single JAR called `target/kafka-connect-mq-source-0.1-SNAPSHOT-jar-with-dependencies.jar` which contains all of the required dependencies.
+Once built, the output is a single JAR called `target/kafka-connect-mq-source-0.2-SNAPSHOT-jar-with-dependencies.jar` which contains all of the required dependencies.


## Running the connector
@@ -60,51 +60,65 @@ bin/connect-standalone.sh connect-standalone.properties mq-source.properties
## Data formats
Kafka Connect is very flexible but it's important to understand the way that it processes messages to end up with a reliable system. When the connector encounters a message that it cannot process, it stops rather than throwing the message away. Therefore, you need to make sure that the configuration you use can handle the messages the connector will process.

+This is rather complicated and it's likely that a future update of the connector will simplify matters.
+
Each message in Kafka Connect is associated with a representation of the message format known as a *schema*. Each Kafka message actually has two parts, key and value, and each part has its own schema. The MQ source connector does not currently use message keys, but some of the configuration options use the word *Value* because they refer to the Kafka message value.

-When the MQ source connector reads a message from MQ, it chooses a schema to represent the message format and creates a Java object containing the message value. Each message is then processed using a *converter* which creates the message that's published on a Kafka topic. You need to choose a converter appropriate to the format of messages that will pass through the connector.
+When the MQ source connector reads a message from MQ, it chooses a schema to represent the message format and creates an internal object called a *record* containing the message value. This conversion is performed using a *record builder*. Each record is then processed using a *converter* which creates the message that's published on a Kafka topic.
+
+There are two record builders supplied with the connector, although you can write your own. The basic rule is that if you just want the message to be passed along to Kafka unchanged, the default record builder is probably the best choice. If the incoming data is in JSON format and you want to use a schema based on its structure, use the JSON record builder.
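A custom builder plugs in the same way as the supplied ones. As a minimal sketch (the class name below is hypothetical, and this assumes your class implements the connector's RecordBuilder interface and is on the Kafka Connect worker's classpath):

```
# Hypothetical custom record builder -- com.example.MyRecordBuilder is illustrative
mq.record.builder=com.example.MyRecordBuilder
```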
+
+There are three converters built into Apache Kafka and another which is part of the Confluent Platform. You need to make sure that the incoming message format, the setting of the *mq.message.body.jms* configuration, the record builder and the converter are all compatible. By default, everything is just treated as bytes, but if you want the connector to understand the message format and apply more sophisticated processing such as single-message transforms, you'll need a more complex configuration. The following table shows the basic options that work.
+
+| Record builder class                                  | Incoming MQ message   | mq.message.body.jms | Converter class                                        | Outgoing Kafka message  |
+| ----------------------------------------------------- | ---------------------- | ------------------- | ------------------------------------------------------ | ----------------------- |
+| com.ibm.mq.kafkaconnect.builders.DefaultRecordBuilder | Any                    | false (default)     | org.apache.kafka.connect.converters.ByteArrayConverter | **Binary data**         |
+| com.ibm.mq.kafkaconnect.builders.DefaultRecordBuilder | JMS BytesMessage       | true                | org.apache.kafka.connect.converters.ByteArrayConverter | **Binary data**         |
+| com.ibm.mq.kafkaconnect.builders.DefaultRecordBuilder | JMS TextMessage        | true                | org.apache.kafka.connect.storage.StringConverter       | **String data**         |
+| com.ibm.mq.kafkaconnect.builders.JsonRecordBuilder    | JSON, may have schema  | Not used            | org.apache.kafka.connect.json.JsonConverter            | **JSON, no schema**     |
+| com.ibm.mq.kafkaconnect.builders.JsonRecordBuilder    | JSON, may have schema  | Not used            | io.confluent.connect.avro.AvroConverter                | **Binary-encoded Avro** |
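To illustrate a complete combination from the table, here is a minimal sketch of the connector properties for the JSON rows; the `schemas.enable` setting is an assumption that keeps schema envelopes out of the Kafka messages, not something this commit prescribes:

```
# Minimal sketch: JSON in MQ -> JSON record builder -> built-in JSON converter
mq.record.builder=com.ibm.mq.kafkaconnect.builders.JsonRecordBuilder
value.converter=org.apache.kafka.connect.json.JsonConverter
# Assumption: publish plain JSON without a schema envelope
value.converter.schemas.enable=false
```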

There's no single configuration that will always be right, but here are some high-level suggestions.

-* Pass unchanged binary data as the Kafka message value
+* Pass unchanged binary (or string) data as the Kafka message value
```
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
```
+* Message format is MQSTR, pass string data as the Kafka message value
+```
+mq.message.body.jms=true
+value.converter=org.apache.kafka.connect.storage.StringConverter
+```
* Messages are JMS BytesMessage, pass byte array as the Kafka message value
```
mq.message.body.jms=true
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
```
-* Messages are JMS TextMessage, pass string as the Kafka message value
+* Messages are JMS TextMessage, pass string data as the Kafka message value
```
mq.message.body.jms=true
value.converter=org.apache.kafka.connect.storage.StringConverter
```

-### The gory detail
-The MQ source connector has a configuration option *mq.message.body.jms* that controls whether it interprets the MQ messages as JMS messages or regular MQ messages. By default, *mq.message.body.jms=false* which gives the following behaviour.
-
-| Incoming message format | Value schema   | Value class |
-| ----------------------- | -------------- | ----------- |
-| Any                     | OPTIONAL_BYTES | byte[]      |

-This means that all messages are treated as arrays of bytes, and the converter must be able to handle arrays of bytes.
-
-When you set *mq.message.body.jms=true*, the MQ messages are interpreted as JMS messages. This is appropriate if the applications sending the messages are themselves using JMS. This gives the following behaviour.
+### The gory detail
+The messages received from MQ are processed by a record builder which builds a Kafka Connect record to represent the message. There are two record builders supplied with the MQ source connector. The connector has a configuration option *mq.message.body.jms* that controls whether it interprets the MQ messages as JMS messages or regular MQ messages.

-| Incoming message format | Value schema | Value class      |
-| ----------------------- | ------------ | ---------------- |
-| JMS BytesMessage        | null         | byte[]           |
-| JMS TextMessage         | null         | java.lang.String |
-| Anything else           | *EXCEPTION*  | *EXCEPTION*      |
+| Record builder class                                  | mq.message.body.jms | Incoming message body | Value schema       | Value class        |
+| ----------------------------------------------------- | ------------------- | --------------------- | ------------------ | ------------------ |
+| com.ibm.mq.kafkaconnect.builders.DefaultRecordBuilder | false (default)     | Any                   | OPTIONAL_BYTES     | byte[]             |
+| com.ibm.mq.kafkaconnect.builders.DefaultRecordBuilder | true                | JMS BytesMessage      | null               | byte[]             |
+| com.ibm.mq.kafkaconnect.builders.DefaultRecordBuilder | true                | JMS TextMessage       | null               | String             |
+| com.ibm.mq.kafkaconnect.builders.DefaultRecordBuilder | true                | Everything else       | *EXCEPTION*        | *EXCEPTION*        |
+| com.ibm.mq.kafkaconnect.builders.JsonRecordBuilder    | Not used            | JSON                  | Depends on message | Depends on message |
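As an illustration tying this table to the MQSTR suggestion above: with the default record builder and *mq.message.body.jms=true*, an MQ message whose format is MQSTR is received as a JMS TextMessage, so it surfaces as a String with a null schema, which the StringConverter then passes through unchanged.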

-There are three basic converters built into Apache Kafka, with the likely useful combinations in **bold**.
+You must then choose a converter that can handle the value schema and class. There are three basic converters built into Apache Kafka, with the likely useful combinations in **bold**.

-| Converter class                                         | byte[]              | java.lang.String |
-| ------------------------------------------------------- | ------------------- | ---------------- |
-| org.apache.kafka.connect.converters.ByteArrayConverter  | **Binary data**     | *EXCEPTION*      |
-| org.apache.kafka.connect.storage.StringConverter        | Works, not useful   | **String data**  |
-| org.apache.kafka.connect.json.JsonConverter             | Base-64 JSON String | JSON String      |
+| Converter class                                         | Output for byte[]   | Output for String | Output for compound schema |
+| ------------------------------------------------------- | ------------------- | ----------------- | -------------------------- |
+| org.apache.kafka.connect.converters.ByteArrayConverter  | **Binary data**     | *EXCEPTION*       | *EXCEPTION*                |
+| org.apache.kafka.connect.storage.StringConverter        | Works, not useful   | **String data**   | Works, not useful          |
+| org.apache.kafka.connect.json.JsonConverter             | Base-64 JSON String | JSON String       | **JSON data**              |
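For example (an illustration based on Kafka's standard JsonConverter behaviour, not text from the original file): a String value `hello` is published by the JsonConverter as the JSON string `"hello"` when `schemas.enable=false`, or wrapped in an envelope such as `{"schema":null,"payload":"hello"}` when schemas are enabled; the ByteArrayConverter would reject it, and the StringConverter would publish it as-is.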

In addition, there is another converter for the Avro format that is part of the Confluent Platform. This has not been tested with the MQ source connector at this time.
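Should you want to experiment with it, the wiring follows the usual converter pattern. A sketch, assuming a Confluent Schema Registry at a placeholder URL (both settings are standard Confluent Platform configuration, not something this commit adds):

```
# Untested sketch: Confluent Avro converter with a Schema Registry
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
```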

@@ -135,6 +149,7 @@ The configuration options for the MQ Source Connector are as follows:
| mq.queue | The name of the source MQ queue | string | | MQ queue name |
| mq.user.name | The user name for authenticating with the queue manager | string | | User name |
| mq.password | The password for authenticating with the queue manager | string | | Password |
+| mq.record.builder | The class used to build the Kafka Connect record | string | | Class implementing RecordBuilder |
| mq.message.body.jms | Whether to interpret the message body as a JMS message type | boolean | false | |
| mq.ssl.cipher.suite | The name of the cipher suite for TLS (SSL) connection | string | | Blank or valid cipher suite |
| mq.ssl.peer.name | The distinguished name pattern of the TLS (SSL) peer | string | | Blank or DN pattern |
@@ -143,10 +158,10 @@ The configuration options for the MQ Source Connector are as follows:

## Future enhancements
The connector is intentionally basic. The idea is to enhance it over time with additional features to make it more capable. Some possible future enhancements are:
+* Simplification of handling message formats
* Message key support
* Configurable schema for MQ messages
* JMX metrics
-* JSON parsing so that the JSON type information is supplied to the converter
* Testing with the Confluent Platform Avro converter and Schema Registry
* Separate TLS configuration for the connector so that keystore location and so on can be specified as configurations

config/mq-source.properties

Lines changed: 4 additions & 0 deletions
@@ -36,6 +36,10 @@ mq.queue=
# The password for authenticating with the queue manager - optional
# mq.password=

+# The record builders control conversion of data between the messages in MQ and the internal Kafka Connect representation - required
+mq.record.builder=com.ibm.mq.kafkaconnect.builders.DefaultRecordBuilder
+# mq.record.builder=com.ibm.mq.kafkaconnect.builders.JsonRecordBuilder
+
# Whether to interpret the message body as a JMS message type (default false) - optional
# mq.message.body.jms=

pom.xml

Lines changed: 7 additions & 1 deletion
@@ -4,7 +4,7 @@
<groupId>com.ibm.mq.kafkaconnect</groupId>
<artifactId>kafka-connect-mq-source</artifactId>
<packaging>jar</packaging>
-<version>0.1-SNAPSHOT</version>
+<version>0.2-SNAPSHOT</version>
<name>kafka-connect-mq-source</name>
<organization>
<name>IBM Corporation</name>
@@ -28,6 +28,12 @@
<version>0.11.0.0</version>
<scope>provided</scope>
</dependency>
+<dependency>
+<groupId>org.apache.kafka</groupId>
+<artifactId>connect-json</artifactId>
+<version>0.11.0.0</version>
+<scope>provided</scope>
+</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
