Skip to content

Commit 23d0a43

Browse files
committed
fix: update tests to reflect change in handling jms reader
The existing tests were benefitting from the previous handling of the JMS reader. Now that the reader is being closed (without the reader committing what it's consumed from MQ) the messages fetched from the queue manager are being rolled back at the end of every test case when the source task is stopped. These messages are then retrieved from MQ again in the subsequent test case which is causing test failures. This is a byproduct of the source task design - that only makes commits to the MQ queue manager for messages retrieved in a poll() call in a subsequent poll. For normal operation this is okay, but for tests, we need to force there to be a subsequent poll. This commit adds a consistent way of doing this. Signed-off-by: Dale Lane <dale.lane@uk.ibm.com>
1 parent ddf6bfd commit 23d0a43

File tree

3 files changed

+136
-60
lines changed

3 files changed

+136
-60
lines changed

src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskAuthIT.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,12 +86,15 @@ public void testAuthenticatedQueueManager() throws Exception {
8686
for (SourceRecord kafkaMessage : kafkaMessages) {
8787
assertNull(kafkaMessage.key());
8888
assertEquals(Schema.OPTIONAL_BYTES_SCHEMA, kafkaMessage.valueSchema());
89+
90+
newConnectTask.commitRecord(kafkaMessage);
8991
}
9092

9193
assertArrayEquals("hello".getBytes(), (byte[]) kafkaMessages.get(0).value());
9294
assertArrayEquals("world".getBytes(), (byte[]) kafkaMessages.get(1).value());
9395

94-
newConnectTask.stop();
96+
SourceTaskStopper stopper = new SourceTaskStopper(newConnectTask);
97+
stopper.run();
9598
}
9699

97100

src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java

Lines changed: 69 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,20 @@
3131

3232
import org.apache.kafka.connect.data.Schema;
3333
import org.apache.kafka.connect.source.SourceRecord;
34+
import org.junit.After;
3435
import org.junit.Test;
3536

3637
public class MQSourceTaskIT extends AbstractJMSContextIT {
3738

39+
private MQSourceTask connectTask = null;
40+
41+
@After
42+
public void cleanup() throws InterruptedException {
43+
SourceTaskStopper stopper = new SourceTaskStopper(connectTask);
44+
stopper.run();
45+
}
46+
47+
3848
private static final String MQ_QUEUE = "DEV.QUEUE.1";
3949

4050
private Map<String, String> createDefaultConnectorProperties() {
@@ -51,42 +61,42 @@ private Map<String, String> createDefaultConnectorProperties() {
5161

5262
@Test
5363
public void verifyJmsTextMessages() throws Exception {
54-
MQSourceTask newConnectTask = new MQSourceTask();
64+
connectTask = new MQSourceTask();
5565

5666
Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
5767
connectorConfigProps.put("mq.message.body.jms", "true");
5868
connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
5969

60-
newConnectTask.start(connectorConfigProps);
70+
connectTask.start(connectorConfigProps);
6171

6272
TextMessage message1 = getJmsContext().createTextMessage("hello");
6373
TextMessage message2 = getJmsContext().createTextMessage("world");
6474
putAllMessagesToQueue(MQ_QUEUE, Arrays.asList(message1, message2));
6575

66-
List<SourceRecord> kafkaMessages = newConnectTask.poll();
76+
List<SourceRecord> kafkaMessages = connectTask.poll();
6777
assertEquals(2, kafkaMessages.size());
6878
for (SourceRecord kafkaMessage : kafkaMessages) {
6979
assertNull(kafkaMessage.key());
7080
assertNull(kafkaMessage.valueSchema());
81+
82+
connectTask.commitRecord(kafkaMessage);
7183
}
7284

7385
assertEquals("hello", kafkaMessages.get(0).value());
7486
assertEquals("world", kafkaMessages.get(1).value());
75-
76-
newConnectTask.stop();
7787
}
7888

7989

8090

8191
@Test
8292
public void verifyJmsJsonMessages() throws Exception {
83-
MQSourceTask newConnectTask = new MQSourceTask();
93+
connectTask = new MQSourceTask();
8494

8595
Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
8696
connectorConfigProps.put("mq.message.body.jms", "true");
8797
connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
8898

89-
newConnectTask.start(connectorConfigProps);
99+
connectTask.start(connectorConfigProps);
90100

91101
List<Message> messages = new ArrayList<>();
92102
for (int i = 0; i < 5; i++) {
@@ -97,7 +107,7 @@ public void verifyJmsJsonMessages() throws Exception {
97107
}
98108
putAllMessagesToQueue(MQ_QUEUE, messages);
99109

100-
List<SourceRecord> kafkaMessages = newConnectTask.poll();
110+
List<SourceRecord> kafkaMessages = connectTask.poll();
101111
assertEquals(5, kafkaMessages.size());
102112
for (int i = 0; i < 5; i++) {
103113
SourceRecord kafkaMessage = kafkaMessages.get(i);
@@ -106,23 +116,23 @@ public void verifyJmsJsonMessages() throws Exception {
106116

107117
Map<?, ?> value = (Map<?, ?>) kafkaMessage.value();
108118
assertEquals(Long.valueOf(i), value.get("i"));
109-
}
110119

111-
newConnectTask.stop();
120+
connectTask.commitRecord(kafkaMessage);
121+
}
112122
}
113123

114124

115125

116126
@Test
117127
public void verifyJmsMessageHeaders() throws Exception {
118-
MQSourceTask newConnectTask = new MQSourceTask();
128+
connectTask = new MQSourceTask();
119129

120130
Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
121131
connectorConfigProps.put("mq.message.body.jms", "true");
122132
connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
123133
connectorConfigProps.put("mq.jms.properties.copy.to.kafka.headers", "true");
124134

125-
newConnectTask.start(connectorConfigProps);
135+
connectTask.start(connectorConfigProps);
126136

127137
TextMessage message = getJmsContext().createTextMessage("helloworld");
128138
message.setStringProperty("teststring", "myvalue");
@@ -131,7 +141,7 @@ public void verifyJmsMessageHeaders() throws Exception {
131141

132142
putAllMessagesToQueue(MQ_QUEUE, Arrays.asList(message));
133143

134-
List<SourceRecord> kafkaMessages = newConnectTask.poll();
144+
List<SourceRecord> kafkaMessages = connectTask.poll();
135145
assertEquals(1, kafkaMessages.size());
136146
SourceRecord kafkaMessage = kafkaMessages.get(0);
137147
assertNull(kafkaMessage.key());
@@ -143,21 +153,21 @@ public void verifyJmsMessageHeaders() throws Exception {
143153
assertEquals("11", kafkaMessage.headers().lastWithName("volume").value());
144154
assertEquals("42.0", kafkaMessage.headers().lastWithName("decimalmeaning").value());
145155

146-
newConnectTask.stop();
156+
connectTask.commitRecord(kafkaMessage);
147157
}
148158

149159

150160

151161
@Test
152162
public void verifyMessageBatchIndividualCommits() throws Exception {
153-
MQSourceTask newConnectTask = new MQSourceTask();
163+
connectTask = new MQSourceTask();
154164

155165
Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
156166
connectorConfigProps.put("mq.message.body.jms", "true");
157167
connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
158168
connectorConfigProps.put("mq.batch.size", "10");
159169

160-
newConnectTask.start(connectorConfigProps);
170+
connectTask.start(connectorConfigProps);
161171

162172
List<Message> messages = new ArrayList<>();
163173
for (int i = 1; i <= 35; i++) {
@@ -169,49 +179,47 @@ public void verifyMessageBatchIndividualCommits() throws Exception {
169179

170180
List<SourceRecord> kafkaMessages;
171181

172-
kafkaMessages = newConnectTask.poll();
182+
kafkaMessages = connectTask.poll();
173183
assertEquals(10, kafkaMessages.size());
174184
for (SourceRecord kafkaMessage : kafkaMessages) {
175185
assertEquals("batch message " + (nextExpectedMessage++), kafkaMessage.value());
176-
newConnectTask.commitRecord(kafkaMessage);
186+
connectTask.commitRecord(kafkaMessage);
177187
}
178188

179-
kafkaMessages = newConnectTask.poll();
189+
kafkaMessages = connectTask.poll();
180190
assertEquals(10, kafkaMessages.size());
181191
for (SourceRecord kafkaMessage : kafkaMessages) {
182192
assertEquals("batch message " + (nextExpectedMessage++), kafkaMessage.value());
183-
newConnectTask.commitRecord(kafkaMessage);
193+
connectTask.commitRecord(kafkaMessage);
184194
}
185195

186-
kafkaMessages = newConnectTask.poll();
196+
kafkaMessages = connectTask.poll();
187197
assertEquals(10, kafkaMessages.size());
188198
for (SourceRecord kafkaMessage : kafkaMessages) {
189199
assertEquals("batch message " + (nextExpectedMessage++), kafkaMessage.value());
190-
newConnectTask.commitRecord(kafkaMessage);
200+
connectTask.commitRecord(kafkaMessage);
191201
}
192202

193-
kafkaMessages = newConnectTask.poll();
203+
kafkaMessages = connectTask.poll();
194204
assertEquals(5, kafkaMessages.size());
195205
for (SourceRecord kafkaMessage : kafkaMessages) {
196206
assertEquals("batch message " + (nextExpectedMessage++), kafkaMessage.value());
197-
newConnectTask.commitRecord(kafkaMessage);
207+
connectTask.commitRecord(kafkaMessage);
198208
}
199-
200-
newConnectTask.stop();
201209
}
202210

203211

204212

205213
@Test
206214
public void verifyMessageBatchGroupCommits() throws Exception {
207-
MQSourceTask newConnectTask = new MQSourceTask();
215+
connectTask = new MQSourceTask();
208216

209217
Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
210218
connectorConfigProps.put("mq.message.body.jms", "true");
211219
connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
212220
connectorConfigProps.put("mq.batch.size", "10");
213221

214-
newConnectTask.start(connectorConfigProps);
222+
connectTask.start(connectorConfigProps);
215223

216224
List<Message> messages = new ArrayList<>();
217225
for (int i = 1; i <= 35; i++) {
@@ -221,46 +229,48 @@ public void verifyMessageBatchGroupCommits() throws Exception {
221229

222230
List<SourceRecord> kafkaMessages;
223231

224-
kafkaMessages = newConnectTask.poll();
232+
kafkaMessages = connectTask.poll();
225233
assertEquals(10, kafkaMessages.size());
226-
newConnectTask.commit();
227-
newConnectTask.commit();
234+
for (SourceRecord m : kafkaMessages) {
235+
connectTask.commitRecord(m);
236+
}
228237

229-
kafkaMessages = newConnectTask.poll();
238+
kafkaMessages = connectTask.poll();
230239
assertEquals(10, kafkaMessages.size());
231-
newConnectTask.commit();
232-
newConnectTask.commit();
240+
for (SourceRecord m : kafkaMessages) {
241+
connectTask.commitRecord(m);
242+
}
233243

234-
kafkaMessages = newConnectTask.poll();
244+
kafkaMessages = connectTask.poll();
235245
assertEquals(10, kafkaMessages.size());
236-
newConnectTask.commit();
237-
newConnectTask.commit();
246+
for (SourceRecord m : kafkaMessages) {
247+
connectTask.commitRecord(m);
248+
}
238249

239-
kafkaMessages = newConnectTask.poll();
250+
kafkaMessages = connectTask.poll();
240251
assertEquals(5, kafkaMessages.size());
241-
newConnectTask.commit();
242-
newConnectTask.commit();
243-
244-
newConnectTask.stop();
252+
for (SourceRecord m : kafkaMessages) {
253+
connectTask.commitRecord(m);
254+
}
245255
}
246256

247257

248258

249259
@Test
250260
public void verifyMessageIdAsKey() throws Exception {
251-
MQSourceTask newConnectTask = new MQSourceTask();
261+
connectTask = new MQSourceTask();
252262

253263
Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
254264
connectorConfigProps.put("mq.message.body.jms", "true");
255265
connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
256266
connectorConfigProps.put("mq.record.builder.key.header", "JMSMessageID");
257267

258-
newConnectTask.start(connectorConfigProps);
268+
connectTask.start(connectorConfigProps);
259269

260270
TextMessage message = getJmsContext().createTextMessage("testmessage");
261271
putAllMessagesToQueue(MQ_QUEUE, Arrays.asList(message));
262272

263-
List<SourceRecord> kafkaMessages = newConnectTask.poll();
273+
List<SourceRecord> kafkaMessages = connectTask.poll();
264274
assertEquals(1, kafkaMessages.size());
265275

266276
SourceRecord kafkaMessage = kafkaMessages.get(0);
@@ -270,62 +280,62 @@ public void verifyMessageIdAsKey() throws Exception {
270280

271281
assertEquals("testmessage", kafkaMessage.value());
272282

273-
newConnectTask.stop();
283+
connectTask.commitRecord(kafkaMessage);
274284
}
275285

276286

277287

278288
@Test
279289
public void verifyCorrelationIdAsKey() throws Exception {
280-
MQSourceTask newConnectTask = new MQSourceTask();
290+
connectTask = new MQSourceTask();
281291

282292
Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
283293
connectorConfigProps.put("mq.message.body.jms", "true");
284294
connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
285295
connectorConfigProps.put("mq.record.builder.key.header", "JMSCorrelationID");
286296

287-
newConnectTask.start(connectorConfigProps);
297+
connectTask.start(connectorConfigProps);
288298

289299
TextMessage message1 = getJmsContext().createTextMessage("first message");
290300
message1.setJMSCorrelationID("verifycorrel");
291301
TextMessage message2 = getJmsContext().createTextMessage("second message");
292302
message2.setJMSCorrelationID("ID:5fb4a18030154fe4b09a1dfe8075bc101dfe8075bc104fe4");
293303
putAllMessagesToQueue(MQ_QUEUE, Arrays.asList(message1, message2));
294304

295-
List<SourceRecord> kafkaMessages = newConnectTask.poll();
305+
List<SourceRecord> kafkaMessages = connectTask.poll();
296306
assertEquals(2, kafkaMessages.size());
297307

298308
SourceRecord kafkaMessage1 = kafkaMessages.get(0);
299309
assertEquals("verifycorrel", kafkaMessage1.key());
300310
assertEquals(Schema.OPTIONAL_STRING_SCHEMA, kafkaMessage1.keySchema());
301311
assertEquals("first message", kafkaMessage1.value());
312+
connectTask.commitRecord(kafkaMessage1);
302313

303314
SourceRecord kafkaMessage2 = kafkaMessages.get(1);
304315
assertEquals("5fb4a18030154fe4b09a1dfe8075bc101dfe8075bc104fe4", kafkaMessage2.key());
305316
assertEquals(Schema.OPTIONAL_STRING_SCHEMA, kafkaMessage2.keySchema());
306317
assertEquals("second message", kafkaMessage2.value());
307-
308-
newConnectTask.stop();
318+
connectTask.commitRecord(kafkaMessage2);
309319
}
310320

311321

312322

313323
@Test
314324
public void verifyCorrelationIdBytesAsKey() throws Exception {
315-
MQSourceTask newConnectTask = new MQSourceTask();
325+
connectTask = new MQSourceTask();
316326

317327
Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
318328
connectorConfigProps.put("mq.message.body.jms", "true");
319329
connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
320330
connectorConfigProps.put("mq.record.builder.key.header", "JMSCorrelationIDAsBytes");
321331

322-
newConnectTask.start(connectorConfigProps);
332+
connectTask.start(connectorConfigProps);
323333

324334
TextMessage message = getJmsContext().createTextMessage("testmessagewithcorrelbytes");
325335
message.setJMSCorrelationID("verifycorrelbytes");
326336
putAllMessagesToQueue(MQ_QUEUE, Arrays.asList(message));
327337

328-
List<SourceRecord> kafkaMessages = newConnectTask.poll();
338+
List<SourceRecord> kafkaMessages = connectTask.poll();
329339
assertEquals(1, kafkaMessages.size());
330340

331341
SourceRecord kafkaMessage = kafkaMessages.get(0);
@@ -334,26 +344,26 @@ public void verifyCorrelationIdBytesAsKey() throws Exception {
334344

335345
assertEquals("testmessagewithcorrelbytes", kafkaMessage.value());
336346

337-
newConnectTask.stop();
347+
connectTask.commitRecord(kafkaMessage);
338348
}
339349

340350

341351

342352
@Test
343353
public void verifyDestinationAsKey() throws Exception {
344-
MQSourceTask newConnectTask = new MQSourceTask();
354+
connectTask = new MQSourceTask();
345355

346356
Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
347357
connectorConfigProps.put("mq.message.body.jms", "true");
348358
connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
349359
connectorConfigProps.put("mq.record.builder.key.header", "JMSDestination");
350360

351-
newConnectTask.start(connectorConfigProps);
361+
connectTask.start(connectorConfigProps);
352362

353363
TextMessage message = getJmsContext().createTextMessage("testmessagewithdest");
354364
putAllMessagesToQueue(MQ_QUEUE, Arrays.asList(message));
355365

356-
List<SourceRecord> kafkaMessages = newConnectTask.poll();
366+
List<SourceRecord> kafkaMessages = connectTask.poll();
357367
assertEquals(1, kafkaMessages.size());
358368

359369
SourceRecord kafkaMessage = kafkaMessages.get(0);
@@ -362,6 +372,6 @@ public void verifyDestinationAsKey() throws Exception {
362372

363373
assertEquals("testmessagewithdest", kafkaMessage.value());
364374

365-
newConnectTask.stop();
375+
connectTask.commitRecord(kafkaMessage);
366376
}
367377
}

0 commit comments

Comments
 (0)