Skip to content

Commit 8a5751f

Browse files
committed
Check on a defined interval for errors from async put - #32
1 parent 2e4d810 commit 8a5751f

File tree

9 files changed

+330
-5
lines changed

9 files changed

+330
-5
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ your own error handling or logging.
124124
* Handle error codes returned by the queue manager - [sample_errorhandling_test.go](sample_errorhandling_test.go)
125125
* Set the application name (ApplName) on connections - [applname_test.go](applname_test.go)
126126
* Receive messages over 32kb in size by setting the receive buffer size - [largemessage_test.go](largemessage_test.go)
127+
* Asynchronous put - [asyncput_test.go](asyncput_test.go)
127128

128129
As normal with Go, you can run any individual testcase by executing a command such as;
129130
```bash

asyncput_test.go

Lines changed: 243 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@
1010
package main
1111

1212
import (
13+
"fmt"
1314
"strconv"
15+
"strings"
1416
"testing"
1517

1618
"github.com/ibm-messaging/mq-golang-jms20/jms20subset"
@@ -19,10 +21,10 @@ import (
1921
)
2022

2123
/*
22-
* Test the ability to send a message asynchronously, which can give a higher
24+
* Minimal example showing how to send a message asynchronously, which can give a higher
2325
* rate of sending non-persistent messages, in exchange for less/no checking for errors.
2426
*/
25-
func TestAsyncPut(t *testing.T) {
27+
func TestAsyncPutSample(t *testing.T) {
2628

2729
// Loads CF parameters from connection_info.json and applicationApiKey.json in the Downloads directory
2830
cf, cfErr := mqjms.CreateConnectionFactoryFromDefaultJSONFiles()
@@ -36,6 +38,55 @@ func TestAsyncPut(t *testing.T) {
3638
defer context.Close()
3739
}
3840

41+
// Set up a Producer for NonPersistent messages and Destination the PutAsyncAllowed=true
42+
producer := context.CreateProducer().SetDeliveryMode(jms20subset.DeliveryMode_NON_PERSISTENT)
43+
asyncQueue := context.CreateQueue("DEV.QUEUE.1").SetPutAsyncAllowed(jms20subset.Destination_PUT_ASYNC_ALLOWED_ENABLED)
44+
45+
// Send a message (asynchronously)
46+
msg := context.CreateTextMessageWithString("some text")
47+
errSend := producer.Send(asyncQueue, msg)
48+
assert.Nil(t, errSend)
49+
50+
// Tidy up the message to leave the test clean.
51+
consumer, errCons := context.CreateConsumer(asyncQueue)
52+
assert.Nil(t, errCons)
53+
if consumer != nil {
54+
defer consumer.Close()
55+
}
56+
_, errRvc := consumer.ReceiveStringBodyNoWait()
57+
assert.Nil(t, errRvc)
58+
59+
}
60+
61+
/*
62+
* Compare the performance benefit of sending messages non-persistent, non-transational
63+
* messages asynchronously - which can give a higher message rate, in exchange for
64+
* less/no checking for errors.
65+
*
66+
* The test checks that async put is at least 10% faster than synchronous put.
67+
* (in testing against a remote queue manager it was actually 30% faster)
68+
*/
69+
func TestAsyncPutComparison(t *testing.T) {
70+
71+
// Loads CF parameters from connection_info.json and applicationApiKey.json in the Downloads directory
72+
cf, cfErr := mqjms.CreateConnectionFactoryFromDefaultJSONFiles()
73+
assert.Nil(t, cfErr)
74+
75+
// Check the default value for SendCheckCount, which means never check for errors.
76+
assert.Equal(t, 0, cf.SendCheckCount)
77+
78+
// Creates a connection to the queue manager, using defer to close it automatically
79+
// at the end of the function (if it was created successfully)
80+
context, ctxErr := cf.CreateContext()
81+
assert.Nil(t, ctxErr)
82+
if context != nil {
83+
defer context.Close()
84+
}
85+
86+
// --------------------------------------------------------
87+
// Start by sending a set of messages using the normal synchronous approach, in
88+
// order that we can get a baseline timing.
89+
3990
// Set up the producer and consumer with the SYNCHRONOUS (not async yet) queue
4091
syncQueue := context.CreateQueue("DEV.QUEUE.1")
4192
producer := context.CreateProducer().SetDeliveryMode(jms20subset.DeliveryMode_NON_PERSISTENT).SetTimeToLive(60000)
@@ -53,7 +104,7 @@ func TestAsyncPut(t *testing.T) {
53104
numberMessages := 50
54105

55106
// First get a baseline for how long it takes us to send the batch of messages
56-
// WITHOUT async put.
107+
// WITHOUT async put (i.e. using normal synchronous put)
57108
syncStartTime := currentTimeMillis()
58109
for i := 0; i < numberMessages; i++ {
59110

@@ -68,7 +119,7 @@ func TestAsyncPut(t *testing.T) {
68119
syncSendTime := syncEndTime - syncStartTime
69120
//fmt.Println("Took " + strconv.FormatInt(syncSendTime, 10) + "ms to send " + strconv.Itoa(numberMessages) + " synchronous messages.")
70121

71-
// Receive the messages back again
122+
// Receive the messages back again to tidy the queue back to a clean state
72123
finishedReceiving := false
73124
rcvCount := 0
74125

@@ -103,7 +154,7 @@ func TestAsyncPut(t *testing.T) {
103154
asyncSendTime := asyncEndTime - asyncStartTime
104155
//fmt.Println("Took " + strconv.FormatInt(asyncSendTime, 10) + "ms to send " + strconv.Itoa(numberMessages) + " ASYNC messages.")
105156

106-
// Receive the messages back again
157+
// Receive the messages back again to tidy the queue back to a clean state
107158
finishedReceiving = false
108159
rcvCount = 0
109160

@@ -128,6 +179,193 @@ func TestAsyncPut(t *testing.T) {
128179

129180
}
130181

182+
/*
183+
* Test the ability to successfully send async messages with checking enabled.
184+
*
185+
* This test is checking that no failures are reported when the interval checking
186+
* is enabled.
187+
*/
188+
func TestAsyncPutCheckCount(t *testing.T) {
189+
190+
// Loads CF parameters from connection_info.json and applicationApiKey.json in the Downloads directory
191+
cf, cfErr := mqjms.CreateConnectionFactoryFromDefaultJSONFiles()
192+
assert.Nil(t, cfErr)
193+
194+
// Set the CF flag to enable checking for errors after a certain number of messages
195+
cf.SendCheckCount = 10
196+
197+
// Check the default value for SendCheckCount
198+
assert.Equal(t, 10, cf.SendCheckCount)
199+
200+
// Creates a connection to the queue manager, using defer to close it automatically
201+
// at the end of the function (if it was created successfully)
202+
context, ctxErr := cf.CreateContext()
203+
assert.Nil(t, ctxErr)
204+
if context != nil {
205+
defer context.Close()
206+
}
207+
208+
// Set up the producer and consumer with the async queue.
209+
asyncQueue := context.CreateQueue("DEV.QUEUE.1").SetPutAsyncAllowed(jms20subset.Destination_PUT_ASYNC_ALLOWED_ENABLED)
210+
producer := context.CreateProducer().SetDeliveryMode(jms20subset.DeliveryMode_NON_PERSISTENT)
211+
212+
// Create a unique message prefix representing this execution of the test case.
213+
testcasePrefix := strconv.FormatInt(currentTimeMillis(), 10)
214+
msgPrefix := "checkCount_" + testcasePrefix + "_"
215+
numberMessages := 50
216+
217+
// --------------------------------------------------------
218+
// Do ASYNC message put
219+
for i := 0; i < numberMessages; i++ {
220+
221+
// Create a TextMessage and send it.
222+
msg := context.CreateTextMessageWithString(msgPrefix + strconv.Itoa(i))
223+
224+
errSend := producer.Send(asyncQueue, msg)
225+
assert.Nil(t, errSend)
226+
}
227+
228+
// ----------------------------------
229+
// Receive the messages back again to tidy the queue back to a clean state
230+
consumer, errCons := context.CreateConsumer(asyncQueue)
231+
assert.Nil(t, errCons)
232+
if consumer != nil {
233+
defer consumer.Close()
234+
}
235+
236+
finishedReceiving := false
237+
238+
for !finishedReceiving {
239+
rcvMsg, errRvc := consumer.ReceiveNoWait()
240+
assert.Nil(t, errRvc)
241+
242+
if rcvMsg == nil {
243+
finishedReceiving = true
244+
}
245+
}
246+
}
247+
248+
/*
249+
* Validate that errors are reported at the correct interval when a problem occurs.
250+
*
251+
* This test case forces a failure to occur by sending 50 messages to a queue that has had its
252+
* maximum depth set to 25. With SendCheckCount of 10 we will not receive an error until message 30,
253+
* which is the first time the error check is made after the point at which the queue has filled up.
254+
*/
255+
func TestAsyncPutCheckCountWithFailure(t *testing.T) {
256+
257+
// Loads CF parameters from connection_info.json and applicationApiKey.json in the Downloads directory
258+
cf, cfErr := mqjms.CreateConnectionFactoryFromDefaultJSONFiles()
259+
assert.Nil(t, cfErr)
260+
261+
// Set the CF flag to enable checking for errors after a certain number of messages
262+
cf.SendCheckCount = 10
263+
264+
// Check the value for SendCheckCount was stored correctly.
265+
assert.Equal(t, 10, cf.SendCheckCount)
266+
267+
// Creates a connection to the queue manager, using defer to close it automatically
268+
// at the end of the function (if it was created successfully)
269+
context, ctxErr := cf.CreateContext()
270+
assert.Nil(t, ctxErr)
271+
if context != nil {
272+
defer context.Close()
273+
}
274+
275+
// Set up the producer and consumer with the async queue.
276+
QUEUE_25_NAME := "DEV.MAXDEPTH25"
277+
asyncQueue := context.CreateQueue(QUEUE_25_NAME).SetPutAsyncAllowed(jms20subset.Destination_PUT_ASYNC_ALLOWED_ENABLED)
278+
producer := context.CreateProducer().SetDeliveryMode(jms20subset.DeliveryMode_NON_PERSISTENT)
279+
280+
// Create a unique message prefix representing this execution of the test case.
281+
testcasePrefix := strconv.FormatInt(currentTimeMillis(), 10)
282+
msgPrefix := "checkCount_" + testcasePrefix + "_"
283+
numberMessages := 50
284+
285+
// Variable to track whether the queue exists or not.
286+
queueExists := true
287+
288+
// --------------------------------------------------------
289+
// Send ASYNC message put
290+
for i := 0; i < numberMessages; i++ {
291+
292+
// Create a TextMessage and send it.
293+
msg := context.CreateTextMessageWithString(msgPrefix + strconv.Itoa(i))
294+
295+
errSend := producer.Send(asyncQueue, msg)
296+
297+
// Messages will start to fail at number 25 but we don't get an error until
298+
// the next check which takes place at 30.
299+
if i == 0 && errSend != nil && errSend.GetReason() == "MQRC_UNKNOWN_OBJECT_NAME" {
300+
301+
fmt.Println("Skipping TestAsyncPutCheckCountWithFailure as " + QUEUE_25_NAME + " is not defined.")
302+
queueExists = false
303+
break // Stop the loop at this point as we know it won't change.
304+
305+
} else if i < 30 {
306+
assert.Nil(t, errSend)
307+
} else if i == 30 {
308+
309+
assert.NotNil(t, errSend)
310+
assert.Equal(t, "AsyncPutFailure", errSend.GetErrorCode())
311+
312+
// Message should be "N failures"
313+
assert.True(t, strings.Contains(errSend.GetReason(), "6 failures"))
314+
assert.True(t, strings.Contains(errSend.GetReason(), "0 warnings"))
315+
316+
// Linked message should have reason of MQRC_Q_FULL
317+
linkedErr := errSend.GetLinkedError()
318+
assert.NotNil(t, linkedErr)
319+
linkedReason := linkedErr.(jms20subset.JMSExceptionImpl).GetReason()
320+
assert.Equal(t, "MQRC_Q_FULL", linkedReason)
321+
322+
} else if i == 40 {
323+
324+
assert.NotNil(t, errSend)
325+
assert.Equal(t, "AsyncPutFailure", errSend.GetErrorCode())
326+
327+
// Message should be "N failures"
328+
assert.True(t, strings.Contains(errSend.GetReason(), "10 failures")) // all of these failed
329+
assert.True(t, strings.Contains(errSend.GetReason(), "0 warnings"))
330+
331+
// Linked message should have reason of MQRC_Q_FULL
332+
linkedErr := errSend.GetLinkedError()
333+
assert.NotNil(t, linkedErr)
334+
linkedReason := linkedErr.(jms20subset.JMSExceptionImpl).GetReason()
335+
assert.Equal(t, "MQRC_Q_FULL", linkedReason)
336+
337+
} else {
338+
// Messages 31, 32, ... 39, 41, 42, ...
339+
// do not give an error because we don't make an error check.
340+
assert.Nil(t, errSend)
341+
}
342+
}
343+
344+
// If the queue exists then tidy up the messages we sent.
345+
if queueExists {
346+
347+
// ----------------------------------
348+
// Receive the messages back again to tidy the queue back to a clean state
349+
consumer, errCons := context.CreateConsumer(asyncQueue)
350+
assert.Nil(t, errCons)
351+
if consumer != nil {
352+
defer consumer.Close()
353+
}
354+
355+
// Receive the messages back again
356+
finishedReceiving := false
357+
358+
for !finishedReceiving {
359+
rcvMsg, errRvc := consumer.ReceiveNoWait()
360+
assert.Nil(t, errRvc)
361+
362+
if rcvMsg == nil {
363+
finishedReceiving = true
364+
}
365+
}
366+
}
367+
}
368+
131369
/*
132370
* Test the getter/setter functions for controlling async put.
133371
*/

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ module github.com/ibm-messaging/mq-golang-jms20
33
go 1.13
44

55
require (
6+
github.com/ibm-messaging/mq-golang v3.0.0+incompatible
67
github.com/ibm-messaging/mq-golang/v5 v5.1.3
78
github.com/stretchr/testify v1.4.0
89
)

go.sum

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
33
github.com/ibm-messaging/mq-golang v1.0.1-0.20190820103725-19b946c185a8 h1:kUwSXeftVen12FRnShG+Ykhb2Kd6Cd/DbpWwbYal7j0=
44
github.com/ibm-messaging/mq-golang v1.0.1-0.20190820103725-19b946c185a8/go.mod h1:qjsZDb7m1oKnbPeDma2JVJTKgyCA91I4bcJ1qHY+gcA=
55
github.com/ibm-messaging/mq-golang v3.0.0+incompatible h1:Yc3c8emAyveT54uNDRMkgvS+EBAHeLNWHkc3hk5x+IY=
6+
github.com/ibm-messaging/mq-golang v3.0.0+incompatible/go.mod h1:qjsZDb7m1oKnbPeDma2JVJTKgyCA91I4bcJ1qHY+gcA=
67
github.com/ibm-messaging/mq-golang/v5 v5.0.0 h1:9J8bsDoCo60rbSgB7ZAURPG3L5Kpr+F8dYNOwQ7Qnnk=
78
github.com/ibm-messaging/mq-golang/v5 v5.0.0/go.mod h1:ywCwmYbJOU/E0rl+z4GiNoxVMty68O+LVO39a1VMXrE=
89
github.com/ibm-messaging/mq-golang/v5 v5.1.2 h1:u0e1Vce2TNqJpH088vF77rDMsnMRWnGaOIlxZo4DMZc=

jms20subset/Destination.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@ type Destination interface {
2626
// SetPutAsyncAllowed controls whether asynchronous put is allowed for this
2727
// destination.
2828
//
29+
// See also ConnectionFactoryImpl.SendCheckCount to control the frequency with
30+
// which checks will be made for errors. Default of 0 (zero) means no error checks
31+
// will be made for errors during async put.
32+
//
2933
// Permitted values are:
3034
// * Destination_PUT_ASYNC_ALLOWED_ENABLED - enables async put
3135
// * Destination_PUT_ASYNC_ALLOWED_DISABLED - disables async put

jms20subset/JMSException.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ type JMSException interface {
1919
GetReason() string
2020
GetErrorCode() string
2121
GetLinkedError() error
22+
Error() string
2223
}
2324

2425
// JMSExceptionImpl is a struct that implements the JMSException interface

mqjms/ConnectionFactoryImpl.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,15 @@ type ConnectionFactoryImpl struct {
4646

4747
// Controls the size of the buffer used when receiving a message (default is 32kb if not set)
4848
ReceiveBufferSize int
49+
50+
// SetCheckCount defines the number of messages that will be asynchronously put using
51+
// this Context between checks for errors. For example a value of 10 will cause an error
52+
// check to be triggered once for every 10 messages.
53+
//
54+
// See also Destination#SetPutAsyncAllowed(int)
55+
//
56+
// Default of 0 (zero) means that no checks are made for asynchronous put calls.
57+
SendCheckCount int
4958
}
5059

5160
// CreateContext implements the JMS method to create a connection to an IBM MQ
@@ -131,12 +140,20 @@ func (cf ConnectionFactoryImpl) CreateContextWithSessionMode(sessionMode int) (j
131140

132141
if err == nil {
133142

143+
// Initialize the countInc value to 1 so that if CheckCount is enabled (>0)
144+
// then an error check will be made after the first message - to catch any
145+
// failures quickly.
146+
countInc := new(int)
147+
*countInc = 1
148+
134149
// Connection was created successfully, so we wrap the MQI object into
135150
// a new ContextImpl and return it to the caller.
136151
ctx = ContextImpl{
137152
qMgr: qMgr,
138153
sessionMode: sessionMode,
139154
receiveBufferSize: cf.ReceiveBufferSize,
155+
sendCheckCount: cf.SendCheckCount,
156+
sendCheckCountInc: countInc,
140157
}
141158

142159
} else {

mqjms/ContextImpl.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ type ContextImpl struct {
2222
qMgr ibmmq.MQQueueManager
2323
sessionMode int
2424
receiveBufferSize int
25+
sendCheckCount int
26+
sendCheckCountInc *int // Internal counter to keep track of async-put messages sent
2527
}
2628

2729
// CreateQueue implements the logic necessary to create a provider-specific

0 commit comments

Comments
 (0)