15
15
*/
16
16
package com .ibm .eventstreams .connect .mqsource ;
17
17
18
+ import com .ibm .eventstreams .connect .mqsource .builders .RecordBuilder ;
19
+
18
20
import com .ibm .mq .MQException ;
19
21
import com .ibm .mq .constants .MQConstants ;
20
22
import com .ibm .mq .jms .*;
21
- import com .ibm .eventstreams .connect .mqsource .builders .RecordBuilder ;
22
23
import com .ibm .msg .client .wmq .WMQConstants ;
23
24
24
25
import java .util .Map ;
@@ -215,10 +216,14 @@ public SourceRecord receive(boolean wait) {
215
216
inperil = false ;
216
217
}
217
218
}
218
- catch (JMSException | JMSRuntimeException | ConnectException exc ) {
219
+ catch (JMSException | JMSRuntimeException exc ) {
219
220
log .error ("JMS exception {}" , exc );
220
221
handleException (exc );
221
222
}
223
+ catch (ConnectException exc ) {
224
+ log .error ("Connect exception {}" , exc );
225
+ throw exc ;
226
+ }
222
227
223
228
log .trace ("[{}] Exit {}.receive, retval={}" , Thread .currentThread ().getId (), this .getClass ().getName (), sr );
224
229
return sr ;
@@ -266,7 +271,9 @@ public void close() {
266
271
try {
267
272
JMSContext ctxt = jmsCtxt ;
268
273
closeNow .set (true );
269
- ctxt .close ();
274
+ if (ctxt != null ) {
275
+ ctxt .close ();
276
+ }
270
277
}
271
278
catch (JMSRuntimeException jmse ) {
272
279
;
@@ -375,6 +382,11 @@ private ConnectException handleException(Throwable exc) {
375
382
reason = mqe .getReason ();
376
383
break ;
377
384
}
385
+ else if (t instanceof JMSException ) {
386
+ JMSException jmse = (JMSException )t ;
387
+ log .error ("JMS exception: error code {}" , jmse .getErrorCode ());
388
+ }
389
+
378
390
t = t .getCause ();
379
391
}
380
392
@@ -394,7 +406,7 @@ private ConnectException handleException(Throwable exc) {
394
406
isRetriable = true ;
395
407
break ;
396
408
397
- // These reason codes indicate that the connect is still OK, but just retrying later
409
+ // These reason codes indicate that the connection is still OK, but just retrying later
398
410
// will probably recover - possibly with administrative action on the queue manager
399
411
case MQConstants .MQRC_GET_INHIBITED :
400
412
isRetriable = true ;
@@ -403,6 +415,13 @@ private ConnectException handleException(Throwable exc) {
403
415
}
404
416
405
417
if (mustClose ) {
418
+ // Delay so that repeated reconnect loops don't run too fast
419
+ try {
420
+ Thread .sleep (RECONNECT_DELAY_MILLIS_MAX );
421
+ }
422
+ catch (InterruptedException ie ) {
423
+ ;
424
+ }
406
425
closeInternal ();
407
426
}
408
427
0 commit comments