1
1
package com .ibm .eventstreams .connect .mqsink ;
2
2
3
3
import static org .junit .Assert .assertEquals ;
4
+ import static org .junit .Assert .assertThrows ;
4
5
5
6
import java .util .ArrayList ;
6
7
import java .util .HashMap ;
16
17
import org .apache .kafka .connect .data .Schema ;
17
18
import org .apache .kafka .connect .data .SchemaBuilder ;
18
19
import org .apache .kafka .connect .data .Struct ;
20
+ import org .apache .kafka .connect .errors .ConnectException ;
19
21
import org .apache .kafka .connect .sink .SinkRecord ;
20
22
import org .junit .Test ;
21
23
22
- public class SimpleMQSinkTaskIT extends AbstractJMSContextIT {
24
+ public class MQSinkTaskIT extends AbstractJMSContextIT {
23
25
24
26
private static final String TOPIC = "SINK.TOPIC.NAME" ;
25
27
private static final int PARTITION = 3 ;
@@ -46,6 +48,35 @@ private Map<String, String> createDefaultConnectorProperties() {
46
48
}
47
49
48
50
51
+ @ Test
52
+ public void verifyUnsupportedReplyQueueName () {
53
+ Map <String , String > connectorConfigProps = createDefaultConnectorProperties ();
54
+ connectorConfigProps .put ("mq.message.builder" , "com.ibm.eventstreams.connect.mqsink.builders.DefaultMessageBuilder" );
55
+ connectorConfigProps .put ("mq.reply.queue" , "queue://QM2/Q2?persistence=2&priority=5" );
56
+
57
+ MQSinkTask newConnectTask = new MQSinkTask ();
58
+ ConnectException exc = assertThrows (ConnectException .class , () -> {
59
+ newConnectTask .start (connectorConfigProps );
60
+ });
61
+
62
+ assertEquals ("Reply-to queue URI must not contain properties" , exc .getMessage ());
63
+ }
64
+
65
+ @ Test
66
+ public void verifyUnsupportedKeyHeader () {
67
+ Map <String , String > connectorConfigProps = createDefaultConnectorProperties ();
68
+ connectorConfigProps .put ("mq.message.builder" , "com.ibm.eventstreams.connect.mqsink.builders.DefaultMessageBuilder" );
69
+ connectorConfigProps .put ("mq.message.builder.key.header" , "hello" );
70
+
71
+ MQSinkTask newConnectTask = new MQSinkTask ();
72
+ ConnectException exc = assertThrows (ConnectException .class , () -> {
73
+ newConnectTask .start (connectorConfigProps );
74
+ });
75
+
76
+ assertEquals ("Unsupported MQ message builder key header value" , exc .getMessage ());
77
+ }
78
+
79
+
49
80
@ Test
50
81
public void verifyStringMessages () throws JMSException {
51
82
MQSinkTask newConnectTask = new MQSinkTask ();
@@ -207,15 +238,15 @@ public void verifyJsonWithDefaultBuilder() throws JMSException {
207
238
connectorConfigProps .put ("mq.message.builder.value.converter" , "org.apache.kafka.connect.json.JsonConverter" );
208
239
connectorConfigProps .put ("mq.message.builder" , "com.ibm.eventstreams.connect.mqsink.builders.DefaultMessageBuilder" );
209
240
210
- verifyMessageConversion (connectorConfigProps , Schema .STRING_SCHEMA , "\" ABC\" " , "\" ABC\" " );
241
+ verifyMessageConversion (connectorConfigProps , Schema .STRING_SCHEMA , "ABC" , "ABC" );
211
242
}
212
243
@ Test
213
244
public void verifyJsonWithJsonBuilder () throws JMSException {
214
245
Map <String , String > connectorConfigProps = createDefaultConnectorProperties ();
215
246
connectorConfigProps .put ("mq.message.builder.value.converter" , "org.apache.kafka.connect.json.JsonConverter" );
216
247
connectorConfigProps .put ("mq.message.builder" , "com.ibm.eventstreams.connect.mqsink.builders.JsonMessageBuilder" );
217
248
218
- verifyMessageConversion (connectorConfigProps , Schema .STRING_SCHEMA , "\" ABC\" " , "\" \\ \" ABC\\ \" \" " );
249
+ verifyMessageConversion (connectorConfigProps , Schema .STRING_SCHEMA , "ABC" , "\" ABC\" " );
219
250
}
220
251
221
252
0 commit comments