|
| 1 | +# Using IBM MQ with Kafka Connect |
| 2 | +Many organizations use both IBM MQ and Apache Kafka for their messaging needs. Although they're often used to solve different kinds of messaging problems, people often want to connect them together. When connecting Apache Kafka to other systems, the technology of choice is the Kafka Connect framework. |
| 3 | + |
| 4 | +A pair of basic connectors for IBM MQ are available as source code available on GitHub. The source connector (https://github.com/ibm-messaging/kafka-connect-mq-source) is used to take messages from an MQ queue and transfer them to a Kafka topic, while the sink connector (https://github.com/ibm-messaging/kafka-connect-mq-sink) goes the other way. The GitHub projects have instructions for building the connectors, but once you've successfully built the JAR files, what next? |
| 5 | + |
| 6 | +These instructions tell you how to set up MQ and Apache Kafka from scratch and use the connectors to transfer messages between them. The instructions are for MQ v9 running on Linux, so if you're using a different version or platform, you might have to adjust them slightly. The instructions also expect Apache Kafka 0.10.2.0 or later. |
| 7 | + |
| 8 | + |
| 9 | +## Kafka Connect concepts |
| 10 | +The best place to read about Kafka Connect is of course the Apache Kafka [documentation](https://kafka.apache.org/documentation/). |
| 11 | + |
| 12 | +Kafka Connect connectors run inside a Java process called a *worker*. Kafka Connect can run in either standalone or distributed mode. Standalone mode is intended for testing and temporary connections between systems. Distributed mode is more appropriate for production use. These instructions focus on standalone mode because it's easier to see what's going on. |
| 13 | + |
| 14 | +When you run Kafka Connect with a standalone worker, there are two configuration files. The *worker configuration file* contains the properties needed to connect to Kafka. The *connector configuration file* contains the properties needed for the connector. So, configuration to connect to Kafka goes into the worker configuration file, while the MQ configuration goes into the connector configuration file. |
| 15 | + |
| 16 | +It's simplest to run just one connector in each standalone worker. Kafka Connect workers spew out a lot of messages and it's much simpler to read them if the messages from multiple connectors are not interleaved. |
| 17 | + |
| 18 | + |
| 19 | +## Setting it up from scratch |
| 20 | + |
| 21 | +### Create a queue manager |
| 22 | +These instructions set up a queue manager that uses the local operating system to authenticate the user ID and password, and the user ID is called `alice` and the password is `passw0rd`. |
| 23 | + |
| 24 | +It is assumed that you have installed MQ, you're logged in as a user authorized to administer MQ and the MQ commands are on the path. |
| 25 | + |
| 26 | +Create a queue manager with a TCP/IP listener (on port 1414 in this example): |
| 27 | +``` shell |
| 28 | +crtmqm -p 1414 MYQM |
| 29 | +``` |
| 30 | + |
| 31 | +Start the queue manager: |
| 32 | +``` shell |
| 33 | +strmqm MYQM |
| 34 | +``` |
| 35 | + |
| 36 | +Start the `runmqsc` tool to configure the queue manager: |
| 37 | +``` shell |
| 38 | +runmqsc MYQM |
| 39 | +``` |
| 40 | + |
| 41 | +In `runmqsc`, create a server-connection channel: |
| 42 | +``` |
| 43 | +DEFINE CHANNEL(MYSVRCONN) CHLTYPE(SVRCONN) |
| 44 | +``` |
| 45 | + |
| 46 | +Set the channel authentication rules to accept connections requiring userid and password: |
| 47 | +``` |
| 48 | +SET CHLAUTH(MYSVRCONN) TYPE(BLOCKUSER) USERLIST('nobody') |
| 49 | +SET CHLAUTH('*') TYPE(ADDRESSMAP) ADDRESS('*') USERSRC(NOACCESS) |
| 50 | +SET CHLAUTH(MYSVRCONN) TYPE(ADDRESSMAP) ADDRESS('*') USERSRC(CHANNEL) CHCKCLNT(REQUIRED) |
| 51 | +``` |
| 52 | + |
| 53 | +Set the identity of the client connections based on the supplied context, the user ID: |
| 54 | +``` |
| 55 | +ALTER AUTHINFO(SYSTEM.DEFAULT.AUTHINFO.IDPWOS) AUTHTYPE(IDPWOS) ADOPTCTX(YES) |
| 56 | +``` |
| 57 | + |
| 58 | +And refresh the connection authentication information: |
| 59 | +``` |
| 60 | +REFRESH SECURITY TYPE(CONNAUTH) |
| 61 | +``` |
| 62 | + |
| 63 | +Create an pair of queues for the Kafka Connect connectors to use: |
| 64 | +``` |
| 65 | +DEFINE QLOCAL(MYQSOURCE) |
| 66 | +DEFINE QLOCAL(MYQSINK) |
| 67 | +``` |
| 68 | + |
| 69 | +Authorize `alice` to connect to and inquire the queue manager: |
| 70 | +``` |
| 71 | +SET AUTHREC OBJTYPE(QMGR) PRINCIPAL('alice') AUTHADD(CONNECT,INQ) |
| 72 | +``` |
| 73 | + |
| 74 | +And finally authorize `alice` to use the queues: |
| 75 | +``` |
| 76 | +SET AUTHREC PROFILE(MYQSOURCE) OBJTYPE(QUEUE) PRINCIPAL('alice') AUTHADD(ALLMQI) |
| 77 | +SET AUTHREC PROFILE(MYQSINK) OBJTYPE(QUEUE) PRINCIPAL('alice') AUTHADD(ALLMQI) |
| 78 | +``` |
| 79 | + |
| 80 | +End `runmqsc`: |
| 81 | +``` |
| 82 | +END |
| 83 | +``` |
| 84 | + |
| 85 | +The queue manager is now ready to accept connection from Kafka Connect connectors. |
| 86 | + |
| 87 | + |
| 88 | +### Download and set up Apache Kafka |
| 89 | +If you do not already have Apache Kafka, you can download it from here: http://kafka.apache.org/downloads. Make sure you have the prerequisites installed, such as Java. |
| 90 | + |
| 91 | +Download the latest .tgz file (called something like `kafka_2.11-1.0.0.tgz`) and unpack it. The top-level directory of the unpacked .tgz file is referred to as the *Kafka root directory*. It contains several directories including `bin` for the Kafka executables and `config` for the configuration files. |
| 92 | + |
| 93 | +There are several components required to run a minimal Kafka cluster. It's easiest to run them each in a separate terminal window, starting in the Kafka root directory. |
| 94 | + |
| 95 | +First, start a ZooKeeper server: |
| 96 | +``` shell |
| 97 | +bin/zookeeper-server-start.sh config/zookeeper.properties |
| 98 | +``` |
| 99 | +Wait while it starts up and then prints a message like this: |
| 100 | +``` |
| 101 | +INFO binding to port 0.0.0.0/0.0.0.0:2181 |
| 102 | +``` |
| 103 | + |
| 104 | +In another terminal, start a Kafka server: |
| 105 | +``` shell |
| 106 | +bin/kafka-server-start.sh config/server.properties |
| 107 | +``` |
| 108 | +Wait while it starts up and then prints a message like this: |
| 109 | +``` |
| 110 | +INFO [KafkaServer id=0] started |
| 111 | +``` |
| 112 | + |
| 113 | +Now create a pair of topics `TSOURCE` and `TSINK`: |
| 114 | +``` shell |
| 115 | +bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic TSOURCE --partitions 1 --replication-factor 1 |
| 116 | +bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic TSINK --partitions 1 --replication-factor 1 |
| 117 | +``` |
| 118 | + |
| 119 | +You can see which topics exist like this: |
| 120 | +``` shell |
| 121 | +bin/kafka-topics.sh --zookeeper localhost:2181 --describe |
| 122 | +``` |
| 123 | + |
| 124 | +You now have a Kafka cluster consisting of a single node. This configuration is just a toy, but it works fine for a little testing. |
| 125 | + |
| 126 | +The configuration is as follows: |
| 127 | +* Kafka bootstrap server - `localhost:9092` |
| 128 | +* ZooKeeper server - `localhost:2181` |
| 129 | +* Topic name - `TSOURCE` and `TSINK` |
| 130 | + |
| 131 | +Note that this configuration of Kafka puts its data in `/tmp/kafka-logs`, while ZooKeeper uses `/tmp/zookeeper` and Kafka Connect uses `/tmp/connect.offsets`. You can clear out these directories to reset to an empty state, making sure beforehand that they're not being used for something else. |
| 132 | + |
| 133 | + |
| 134 | +## Running the MQ source connector |
| 135 | +The MQ source connector takes messages from an MQ queue and transfers them to a Kafka topic. |
| 136 | + |
| 137 | +Follow the [instructions](https://github.com/ibm-messaging/kafka-connect-mq-source) and build the connector JAR. The top-level directory that you used to build the connector is referred to as the *connector root directory*. |
| 138 | + |
| 139 | +In a terminal window, change directory into the connector root directory and copy the sample connector configuration file into your home directory so you can edit it safely: |
| 140 | +``` shell |
| 141 | +cp config/mq-source.properties ~ |
| 142 | +``` |
| 143 | + |
| 144 | +Edit the following properties in the `~/mq-source.properties` file to match the configuration so far: |
| 145 | +``` |
| 146 | +mq.queue.manager=MYQM |
| 147 | +mq.connection.name.list=localhost:1414 |
| 148 | +mq.channel.name=MYSVRCONN |
| 149 | +mq.queue=MYQSOURCE |
| 150 | +mq.user.name=alice |
| 151 | +mq.password=passw0rd |
| 152 | +topic=TSOURCE |
| 153 | +``` |
| 154 | + |
| 155 | +Change directory to the Kafka root directory. Start the connector worker replacing `<connector-root-directory>`: |
| 156 | +``` shell |
| 157 | +CLASSPATH=<connector-root-directory>/target/kafka-connect-mq-source-0.2-SNAPSHOT-jar-with-dependencies.jar bin/connect-standalone.sh config/connect-standalone.properties ~/mq-source.properties |
| 158 | +``` |
| 159 | + |
| 160 | +Wait while the worker starts and then prints: |
| 161 | +``` |
| 162 | +INFO Created connector mq-source |
| 163 | +``` |
| 164 | + |
| 165 | +If something goes wrong, you'll see familiar MQ reason codes in the error messages to help you diagnose the problems, such as: |
| 166 | +``` |
| 167 | +ERROR MQ error: CompCode 2, Reason 2538 MQRC_HOST_NOT_AVAILABLE |
| 168 | +``` |
| 169 | + |
| 170 | +You can just kill the worker, fix the problem and start it up again. |
| 171 | + |
| 172 | +Once it's started successfully and connected to MQ, you'll see this message: |
| 173 | +``` |
| 174 | +INFO Connection to MQ established |
| 175 | +``` |
| 176 | + |
| 177 | +In another terminal window, use the Kafka console consumer to start consuming messages from your topic and print them to the console: |
| 178 | +``` shell |
| 179 | +bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TSOURCE |
| 180 | +``` |
| 181 | + |
| 182 | +Now, run the `amqsput` sample and type in some messages to put on the MQ queue: |
| 183 | +``` shell |
| 184 | +/opt/mqm/samp/bin/amqsput MYQSOURCE MYQM |
| 185 | +``` |
| 186 | + |
| 187 | +After a short delay, you should see the messages printed by the Kafka console consumer. |
| 188 | + |
| 189 | +Congratulations! The messages were transferred from the MQ queue `MYQSOURCE` onto the Kafka topic `TSOURCE`. |
| 190 | + |
| 191 | + |
| 192 | +## Running the MQ sink connector |
| 193 | +The MQ sink connector takes messages from a Kafka topic and transfers them to an MQ queue. Running it is very similar to the source connector. |
| 194 | + |
| 195 | +Follow the [instructions](https://github.com/ibm-messaging/kafka-connect-mq-sink) and build the connector JAR. The top-level directory that you used to build the connector is referred to as the *connector root directory*. |
| 196 | + |
| 197 | +In a terminal window, change directory into the connector root directory and copy the sample connector configuration file into your home directory so you can edit it safely: |
| 198 | +``` shell |
| 199 | +cp config/mq-sink.properties ~ |
| 200 | +``` |
| 201 | + |
| 202 | +Edit the following properties in the `~/mq-sink.properties` to match the configuration so far: |
| 203 | +``` |
| 204 | +topics=TSINK |
| 205 | +mq.queue.manager=MYQM |
| 206 | +mq.connection.name.list=localhost:1414 |
| 207 | +mq.channel.name=MYSVRCONN |
| 208 | +mq.queue=MYQSINK |
| 209 | +mq.user.name=alice |
| 210 | +mq.password=passw0rd |
| 211 | +``` |
| 212 | + |
| 213 | +Change directory to the Kafka root directory. Start the connector worker replacing `<connector-root-directory>`: |
| 214 | +``` shell |
| 215 | +CLASSPATH=<connector-root-directory>/target/kafka-connect-mq-sink-0.2-SNAPSHOT-jar-with-dependencies.jar bin/kafka-connect-standalone config/connect-standalone.properties ~/mq-sink.properties |
| 216 | +``` |
| 217 | + |
| 218 | +Wait while the worker starts and then prints: |
| 219 | +``` |
| 220 | +INFO Created connector mq-sink |
| 221 | +``` |
| 222 | + |
| 223 | +If something goes wrong, you'll see familiar MQ reason codes in the error messages to help you diagnose the problems, such as: |
| 224 | +``` |
| 225 | +ERROR MQ error: CompCode 2, Reason 2538 MQRC_HOST_NOT_AVAILABLE |
| 226 | +``` |
| 227 | + |
| 228 | +You can just kill the worker, fix the problem and start it up again. |
| 229 | + |
| 230 | +Once it's started successfully and connected to MQ, you'll see this message: |
| 231 | +``` |
| 232 | +INFO Connection to MQ established |
| 233 | +``` |
| 234 | + |
| 235 | +In another terminal window, use the Kafka console producer to type in some messages and publish them on the Kafka topic: |
| 236 | +``` shell |
| 237 | +bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TSINK |
| 238 | +``` |
| 239 | + |
| 240 | +Now, use the `amqsget` sample to get the messages from the MQ queue: |
| 241 | +``` shell |
| 242 | +/opt/mqm/samp/bin/amqsget MYQSINK MYQM |
| 243 | +``` |
| 244 | + |
| 245 | +After a short delay, you should see the messages printed. |
| 246 | + |
| 247 | +Congratulations! The messages were transferred from the Kafka topic `TSINK` onto the MQ queue `MYQSINK`. |
| 248 | + |
| 249 | + |
| 250 | +## Stopping Apache Kafka |
| 251 | +After you have finished experimenting with this, you will probably want to stop Apache Kafka. You start by stopping any Kafka Connect workers and console producers and consumers that you may have left running. |
| 252 | + |
| 253 | +Then, in the Kafka root directory, stop Kafka: |
| 254 | +``` shell |
| 255 | +bin/kafka-server-stop.sh |
| 256 | +``` |
| 257 | + |
| 258 | +And finally, stop ZooKeeper: |
| 259 | +``` shell |
| 260 | +bin/zookeeper-server-stop.sh |
| 261 | +``` |
| 262 | + |
| 263 | + |
| 264 | +## Using existing MQ or Kafka installations |
| 265 | +You can use an existing MQ or Kafka installation, either locally or on the cloud. For performance reasons, it is recommended to run the Kafka Connect worker close to the queue manager to minimise the effect of network latency. So, if you have a queue manager in your datacenter and Kafka in the cloud, it's best to run the Kafka Connect worker in your datacenter. |
| 266 | + |
| 267 | +To use an existing queue manager, you'll need to specify the configuration information in the connector configuration file. You will need:ng configuration information: |
| 268 | +* The hostname (or IP address) and port number for the queue manager |
| 269 | +* The server-connection channel name |
| 270 | +* If using user ID/password authentication, the user ID and password for client connection |
| 271 | +* If using SSL/TLS, the name of the cipher suite to use |
| 272 | +* The queue manager name |
| 273 | +* The queue name |
| 274 | + |
| 275 | +To use an existing Kafka cluster, you specify the connection information in the worker configuration file. You will need: |
| 276 | +* A list of one or more servers for bootstrapping connections |
| 277 | +* Whether the cluster requires connections to use SSL/TLS |
| 278 | +* Authentication credentials if the cluster requires clients to authenticate |
| 279 | + |
| 280 | +You will also need to run the Kafka Connect worker. If you already have access to the Kafka installation, you probably have the Kafka Connect executables. Otherwise, follow the instructions earlier to download Kafka. |
| 281 | + |
| 282 | +Alternatively, IBM Message Hub is a fully managed cloud deployment of Apache Kafka running in IBM Cloud. In this case, the cluster is running in IBM Cloud but you will still need to run the Kafka Connect worker. If the queue manager is also in the cloud, you could also run the worker in the cloud. If the network latency between the queue manager and Kafka is high, you should run the worker near the queue manager. For information about how to configure Kafka Connect to work with IBM Message Hub read [this](https://console.bluemix.net/docs/services/MessageHub/messagehub113.html#kafka_connect). |
0 commit comments