Skip to content

Commit 3cf3260

Browse files
committed
Transactional async put support - #32
1 parent c719c1c commit 3cf3260

File tree

4 files changed

+437
-24
lines changed

4 files changed

+437
-24
lines changed

asyncput_test.go

Lines changed: 343 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ func TestAsyncPutSample(t *testing.T) {
5959
}
6060

6161
/*
62-
* Compare the performance benefit of sending messages non-persistent, non-transational
62+
* Compare the performance benefit of sending messages non-persistent, non-transactional
6363
* messages asynchronously - which can give a higher message rate, in exchange for
6464
* less/no checking for errors.
6565
*
@@ -298,7 +298,7 @@ func TestAsyncPutCheckCountWithFailure(t *testing.T) {
298298
// the next check which takes place at 30.
299299
if i == 0 && errSend != nil && errSend.GetReason() == "MQRC_UNKNOWN_OBJECT_NAME" {
300300

301-
fmt.Println("Skipping TestAsyncPutCheckCountWithFailure as " + QUEUE_25_NAME + " is not defined.")
301+
fmt.Println("Skipping TestAsyncPutCheckCountWithFailure as queue " + QUEUE_25_NAME + " is not defined.")
302302
queueExists = false
303303
break // Stop the loop at this point as we know it won't change.
304304

@@ -402,3 +402,344 @@ func TestAsyncPutGetterSetter(t *testing.T) {
402402
assert.Equal(t, jms20subset.Destination_PUT_ASYNC_ALLOWED_AS_DEST, queue.GetPutAsyncAllowed())
403403

404404
}
405+
406+
/*
407+
* Compare the performance benefit of sending messages persistent, transactional
408+
* messages asynchronously - which can give a higher message rate, in exchange for
409+
* less/no checking for errors.
410+
*
411+
* The test checks that async put is at least 10% faster than synchronous put.
412+
* (in testing against a remote queue manager it was actually 30% faster)
413+
*/
414+
func TestAsyncPutPersistentTransactedComparison(t *testing.T) {
415+
416+
// Loads CF parameters from connection_info.json and applicationApiKey.json in the Downloads directory
417+
cf, cfErr := mqjms.CreateConnectionFactoryFromDefaultJSONFiles()
418+
assert.Nil(t, cfErr)
419+
420+
// Check the default value for SendCheckCount, which means never check for errors.
421+
assert.Equal(t, 0, cf.SendCheckCount)
422+
423+
// Creates a connection to the queue manager, using defer to close it automatically
424+
// at the end of the function (if it was created successfully)
425+
transactedContext, ctxErr := cf.CreateContextWithSessionMode(jms20subset.JMSContextSESSIONTRANSACTED)
426+
assert.Nil(t, ctxErr)
427+
if transactedContext != nil {
428+
defer transactedContext.Close()
429+
}
430+
431+
// --------------------------------------------------------
432+
// Start by sending a set of messages using the normal synchronous approach, in
433+
// order that we can get a baseline timing.
434+
435+
// Set up the producer and consumer with the SYNCHRONOUS (not async yet) queue
436+
syncQueue := transactedContext.CreateQueue("DEV.QUEUE.1")
437+
producer := transactedContext.CreateProducer().SetDeliveryMode(jms20subset.DeliveryMode_PERSISTENT).SetTimeToLive(60000)
438+
439+
consumer, errCons := transactedContext.CreateConsumer(syncQueue)
440+
assert.Nil(t, errCons)
441+
if consumer != nil {
442+
defer consumer.Close()
443+
}
444+
445+
// Create a unique message prefix representing this execution of the test case.
446+
testcasePrefix := strconv.FormatInt(currentTimeMillis(), 10)
447+
syncMsgPrefix := "syncput_" + testcasePrefix + "_"
448+
asyncMsgPrefix := "asyncput_" + testcasePrefix + "_"
449+
numberMessages := 50
450+
451+
// First get a baseline for how long it takes us to send the batch of messages
452+
// WITHOUT async put (i.e. using normal synchronous put)
453+
syncStartTime := currentTimeMillis()
454+
for i := 0; i < numberMessages; i++ {
455+
456+
// Create a TextMessage and send it.
457+
msg := transactedContext.CreateTextMessageWithString(syncMsgPrefix + strconv.Itoa(i))
458+
459+
errSend := producer.Send(syncQueue, msg)
460+
assert.Nil(t, errSend)
461+
}
462+
transactedContext.Commit()
463+
syncEndTime := currentTimeMillis()
464+
465+
syncSendTime := syncEndTime - syncStartTime
466+
//fmt.Println("Took " + strconv.FormatInt(syncSendTime, 10) + "ms to send " + strconv.Itoa(numberMessages) + " transacted synchronous messages.")
467+
468+
// Receive the messages back again to tidy the queue back to a clean state
469+
finishedReceiving := false
470+
rcvCount := 0
471+
472+
for !finishedReceiving {
473+
rcvTxt, errRvc := consumer.ReceiveStringBodyNoWait()
474+
assert.Nil(t, errRvc)
475+
476+
if rcvTxt != nil {
477+
// Check the message bod matches what we expect
478+
assert.Equal(t, syncMsgPrefix+strconv.Itoa(rcvCount), *rcvTxt)
479+
rcvCount++
480+
} else {
481+
finishedReceiving = true
482+
}
483+
}
484+
transactedContext.Commit()
485+
486+
// --------------------------------------------------------
487+
// Now repeat the experiment but with ASYNC message put
488+
asyncQueue := transactedContext.CreateQueue("DEV.QUEUE.1").SetPutAsyncAllowed(jms20subset.Destination_PUT_ASYNC_ALLOWED_ENABLED)
489+
490+
asyncStartTime := currentTimeMillis()
491+
for i := 0; i < numberMessages; i++ {
492+
493+
// Create a TextMessage and send it.
494+
msg := transactedContext.CreateTextMessageWithString(asyncMsgPrefix + strconv.Itoa(i))
495+
496+
errSend := producer.Send(asyncQueue, msg)
497+
assert.Nil(t, errSend)
498+
}
499+
transactedContext.Commit()
500+
asyncEndTime := currentTimeMillis()
501+
502+
asyncSendTime := asyncEndTime - asyncStartTime
503+
//fmt.Println("Took " + strconv.FormatInt(asyncSendTime, 10) + "ms to send " + strconv.Itoa(numberMessages) + " transacted ASYNC messages.")
504+
505+
// Receive the messages back again to tidy the queue back to a clean state
506+
finishedReceiving = false
507+
rcvCount = 0
508+
509+
for !finishedReceiving {
510+
rcvTxt, errRvc := consumer.ReceiveStringBodyNoWait()
511+
assert.Nil(t, errRvc)
512+
513+
if rcvTxt != nil {
514+
// Check the message bod matches what we expect
515+
assert.Equal(t, asyncMsgPrefix+strconv.Itoa(rcvCount), *rcvTxt)
516+
rcvCount++
517+
} else {
518+
finishedReceiving = true
519+
}
520+
}
521+
transactedContext.Commit()
522+
523+
assert.Equal(t, numberMessages, rcvCount)
524+
525+
// Expect that async put is at least 10% faster than sync put for non-persistent messages
526+
// (in testing against a remote queue manager it was actually 30% faster)
527+
assert.True(t, 100*asyncSendTime < 90*syncSendTime)
528+
529+
}
530+
531+
/*
532+
* Validate that errors are reported at the correct interval when a problem occurs during a
533+
* transactional put of persistent messages (i.e. when the commit occurs)
534+
*
535+
* This test case forces a failure to occur by sending 50 messages to a queue that has had its
536+
* maximum depth set to 25. In the transacted async put case the Send method always completes
537+
* successfully, but the error will be returned on Commit.
538+
*/
539+
func TestAsyncPutTransactedCheckCountWithFailure(t *testing.T) {
540+
541+
// Loads CF parameters from connection_info.json and applicationApiKey.json in the Downloads directory
542+
cf, cfErr := mqjms.CreateConnectionFactoryFromDefaultJSONFiles()
543+
assert.Nil(t, cfErr)
544+
545+
// The SendCheckCount is not used for async put under a transacted session.
546+
assert.Equal(t, 0, cf.SendCheckCount)
547+
548+
// Creates a connection to the queue manager, using defer to close it automatically
549+
// at the end of the function (if it was created successfully)
550+
transactedContext, ctxErr := cf.CreateContextWithSessionMode(jms20subset.JMSContextSESSIONTRANSACTED)
551+
assert.Nil(t, ctxErr)
552+
if transactedContext != nil {
553+
defer transactedContext.Close()
554+
}
555+
556+
// Set up the producer and consumer with the async queue.
557+
QUEUE_25_NAME := "DEV.MAXDEPTH25"
558+
asyncQueue := transactedContext.CreateQueue(QUEUE_25_NAME).SetPutAsyncAllowed(jms20subset.Destination_PUT_ASYNC_ALLOWED_ENABLED)
559+
producer := transactedContext.CreateProducer().SetDeliveryMode(jms20subset.DeliveryMode_PERSISTENT).SetTimeToLive(60000)
560+
561+
// Create a unique message prefix representing this execution of the test case.
562+
testcasePrefix := strconv.FormatInt(currentTimeMillis(), 10)
563+
msgPrefix := "checkCount_" + testcasePrefix + "_"
564+
numberMessages := 50
565+
566+
// Variable to track whether the queue exists or not.
567+
queueExists := true
568+
569+
// --------------------------------------------------------
570+
// Send ASYNC message put
571+
for i := 0; i < numberMessages; i++ {
572+
573+
// Create a TextMessage and send it.
574+
msg := transactedContext.CreateTextMessageWithString(msgPrefix + strconv.Itoa(i))
575+
576+
errSend2 := producer.Send(asyncQueue, msg)
577+
578+
// Skip the test if the destination does not exist on this queue manager.
579+
if i == 0 && errSend2 != nil && errSend2.GetReason() == "MQRC_UNKNOWN_OBJECT_NAME" {
580+
581+
fmt.Println("Skipping TestAsyncPutTransactedCheckCountWithFailure as queue " + QUEUE_25_NAME + " is not defined.")
582+
queueExists = false
583+
break // Stop the loop at this point as we know it won't change.
584+
585+
}
586+
587+
// In the Transacted case the response from Send is always Nil, because any errors
588+
// will be reflected on the Commit call.
589+
assert.Nil(t, errSend2)
590+
591+
if i%10 == 0 {
592+
commitErr := transactedContext.Commit()
593+
594+
if i == 30 || i == 40 {
595+
596+
assert.NotNil(t, commitErr)
597+
assert.Equal(t, "2003", commitErr.GetErrorCode())
598+
assert.Equal(t, "MQRC_BACKED_OUT", commitErr.GetReason())
599+
600+
// Linked error is out normal async put error with message about "N failures"
601+
linkedErr1 := commitErr.GetLinkedError()
602+
assert.Equal(t, "AsyncPutFailure", linkedErr1.(jms20subset.JMSExceptionImpl).GetErrorCode())
603+
assert.True(t, strings.Contains(linkedErr1.(jms20subset.JMSExceptionImpl).GetReason(), "6 failures"))
604+
assert.True(t, strings.Contains(linkedErr1.(jms20subset.JMSExceptionImpl).GetReason(), "0 warnings"))
605+
606+
// Second level linked message should have reason of MQRC_Q_FULL
607+
linkedErr2 := linkedErr1.(jms20subset.JMSExceptionImpl).GetLinkedError()
608+
assert.NotNil(t, linkedErr2)
609+
linkedReason := linkedErr2.(jms20subset.JMSExceptionImpl).GetReason()
610+
assert.Equal(t, "MQRC_Q_FULL", linkedReason)
611+
612+
} else {
613+
// Messages 31, 32, ... 39, 41, 42, ...
614+
// do not give an error because we don't make an error check.
615+
assert.Nil(t, commitErr)
616+
}
617+
}
618+
}
619+
620+
// Clear out the transaction context.
621+
transactedContext.Commit()
622+
623+
// If the queue exists then tidy up the messages we sent.
624+
if queueExists {
625+
626+
// ----------------------------------
627+
// Receive the messages back again to tidy the queue back to a clean state
628+
consumer, errCons := transactedContext.CreateConsumer(asyncQueue)
629+
assert.Nil(t, errCons)
630+
if consumer != nil {
631+
defer consumer.Close()
632+
}
633+
634+
// Receive the messages back again
635+
finishedReceiving := false
636+
637+
for !finishedReceiving {
638+
rcvMsg, errRvc := consumer.ReceiveNoWait()
639+
assert.Nil(t, errRvc)
640+
641+
if rcvMsg == nil {
642+
finishedReceiving = true
643+
}
644+
}
645+
646+
transactedContext.Commit()
647+
}
648+
}
649+
650+
/*
651+
* Validate that NO errors are reported when a problem occurs during a
652+
* transactional put of a non-persistent message.
653+
*
654+
* (per https://www.ibm.com/docs/en/ibm-mq/9.1?topic=application-putting-messages-asynchronously-in-mq-classes-jms)
655+
*
656+
* This test case forces a failure to occur by sending 50 messages to a queue that has had its
657+
* maximum depth set to 25.
658+
*/
659+
func TestAsyncPutTransactedNonPersistentCheckCountWithFailure(t *testing.T) {
660+
661+
// Loads CF parameters from connection_info.json and applicationApiKey.json in the Downloads directory
662+
cf, cfErr := mqjms.CreateConnectionFactoryFromDefaultJSONFiles()
663+
assert.Nil(t, cfErr)
664+
665+
// The SendCheckCount is not used for async put under a transacted session.
666+
assert.Equal(t, 0, cf.SendCheckCount)
667+
668+
// Creates a connection to the queue manager, using defer to close it automatically
669+
// at the end of the function (if it was created successfully)
670+
transactedContext, ctxErr := cf.CreateContextWithSessionMode(jms20subset.JMSContextSESSIONTRANSACTED)
671+
assert.Nil(t, ctxErr)
672+
if transactedContext != nil {
673+
defer transactedContext.Close()
674+
}
675+
676+
// Set up the producer and consumer with the async queue.
677+
QUEUE_25_NAME := "DEV.MAXDEPTH25"
678+
asyncQueue := transactedContext.CreateQueue(QUEUE_25_NAME).SetPutAsyncAllowed(jms20subset.Destination_PUT_ASYNC_ALLOWED_ENABLED)
679+
producer := transactedContext.CreateProducer().SetDeliveryMode(jms20subset.DeliveryMode_NON_PERSISTENT).SetTimeToLive(60000)
680+
681+
// Create a unique message prefix representing this execution of the test case.
682+
testcasePrefix := strconv.FormatInt(currentTimeMillis(), 10)
683+
msgPrefix := "checkCount_" + testcasePrefix + "_"
684+
numberMessages := 50
685+
686+
// Variable to track whether the queue exists or not.
687+
queueExists := true
688+
689+
// --------------------------------------------------------
690+
// Send ASYNC message put
691+
for i := 0; i < numberMessages; i++ {
692+
693+
// Create a TextMessage and send it.
694+
msg := transactedContext.CreateTextMessageWithString(msgPrefix + strconv.Itoa(i))
695+
696+
errSend2 := producer.Send(asyncQueue, msg)
697+
698+
// Skip the test if the destination does not exist on this queue manager.
699+
if i == 0 && errSend2 != nil && errSend2.GetReason() == "MQRC_UNKNOWN_OBJECT_NAME" {
700+
701+
fmt.Println("Skipping TestAsyncPutTransactedNonPersistentCheckCountWithFailure as queue " + QUEUE_25_NAME + " is not defined.")
702+
queueExists = false
703+
break // Stop the loop at this point as we know it won't change.
704+
705+
}
706+
707+
// In the Transacted case the response from Send is always Nil, because any errors
708+
// will be reflected on the Commit call.
709+
assert.Nil(t, errSend2)
710+
711+
if i%10 == 0 {
712+
commitErr := transactedContext.Commit()
713+
714+
// No errors reported for NonPersistent messages in a transaction.
715+
assert.Nil(t, commitErr)
716+
717+
}
718+
}
719+
720+
// If the queue exists then tidy up the messages we sent.
721+
if queueExists {
722+
723+
// ----------------------------------
724+
// Receive the messages back again to tidy the queue back to a clean state
725+
consumer, errCons := transactedContext.CreateConsumer(asyncQueue)
726+
assert.Nil(t, errCons)
727+
if consumer != nil {
728+
defer consumer.Close()
729+
}
730+
731+
// Receive the messages back again
732+
finishedReceiving := false
733+
734+
for !finishedReceiving {
735+
rcvMsg, errRvc := consumer.ReceiveNoWait()
736+
assert.Nil(t, errRvc)
737+
738+
if rcvMsg == nil {
739+
finishedReceiving = true
740+
}
741+
}
742+
743+
transactedContext.Commit()
744+
}
745+
}

0 commit comments

Comments
 (0)