Skip to content

Commit 2e4d810

Browse files
committed
Async Put, with no checks for errors - #32
1 parent 6fda5e2 commit 2e4d810

File tree

6 files changed

+241
-9
lines changed

6 files changed

+241
-9
lines changed

asyncput_test.go

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
/*
2+
* Copyright (c) IBM Corporation 2021
3+
*
4+
* This program and the accompanying materials are made available under the
5+
* terms of the Eclipse Public License v. 2.0, which is available at
6+
* http://www.eclipse.org/legal/epl-2.0.
7+
*
8+
* SPDX-License-Identifier: EPL-2.0
9+
*/
10+
package main
11+
12+
import (
13+
"strconv"
14+
"testing"
15+
16+
"github.com/ibm-messaging/mq-golang-jms20/jms20subset"
17+
"github.com/ibm-messaging/mq-golang-jms20/mqjms"
18+
"github.com/stretchr/testify/assert"
19+
)
20+
21+
/*
22+
* Test the ability to send a message asynchronously, which can give a higher
23+
* rate of sending non-persistent messages, in exchange for less/no checking for errors.
24+
*/
25+
func TestAsyncPut(t *testing.T) {
26+
27+
// Loads CF parameters from connection_info.json and applicationApiKey.json in the Downloads directory
28+
cf, cfErr := mqjms.CreateConnectionFactoryFromDefaultJSONFiles()
29+
assert.Nil(t, cfErr)
30+
31+
// Creates a connection to the queue manager, using defer to close it automatically
32+
// at the end of the function (if it was created successfully)
33+
context, ctxErr := cf.CreateContext()
34+
assert.Nil(t, ctxErr)
35+
if context != nil {
36+
defer context.Close()
37+
}
38+
39+
// Set up the producer and consumer with the SYNCHRONOUS (not async yet) queue
40+
syncQueue := context.CreateQueue("DEV.QUEUE.1")
41+
producer := context.CreateProducer().SetDeliveryMode(jms20subset.DeliveryMode_NON_PERSISTENT).SetTimeToLive(60000)
42+
43+
consumer, errCons := context.CreateConsumer(syncQueue)
44+
assert.Nil(t, errCons)
45+
if consumer != nil {
46+
defer consumer.Close()
47+
}
48+
49+
// Create a unique message prefix representing this execution of the test case.
50+
testcasePrefix := strconv.FormatInt(currentTimeMillis(), 10)
51+
syncMsgPrefix := "syncput_" + testcasePrefix + "_"
52+
asyncMsgPrefix := "asyncput_" + testcasePrefix + "_"
53+
numberMessages := 50
54+
55+
// First get a baseline for how long it takes us to send the batch of messages
56+
// WITHOUT async put.
57+
syncStartTime := currentTimeMillis()
58+
for i := 0; i < numberMessages; i++ {
59+
60+
// Create a TextMessage and send it.
61+
msg := context.CreateTextMessageWithString(syncMsgPrefix + strconv.Itoa(i))
62+
63+
errSend := producer.Send(syncQueue, msg)
64+
assert.Nil(t, errSend)
65+
}
66+
syncEndTime := currentTimeMillis()
67+
68+
syncSendTime := syncEndTime - syncStartTime
69+
//fmt.Println("Took " + strconv.FormatInt(syncSendTime, 10) + "ms to send " + strconv.Itoa(numberMessages) + " synchronous messages.")
70+
71+
// Receive the messages back again
72+
finishedReceiving := false
73+
rcvCount := 0
74+
75+
for !finishedReceiving {
76+
rcvTxt, errRvc := consumer.ReceiveStringBodyNoWait()
77+
assert.Nil(t, errRvc)
78+
79+
if rcvTxt != nil {
80+
// Check the message bod matches what we expect
81+
assert.Equal(t, syncMsgPrefix+strconv.Itoa(rcvCount), *rcvTxt)
82+
rcvCount++
83+
} else {
84+
finishedReceiving = true
85+
}
86+
}
87+
88+
// --------------------------------------------------------
89+
// Now repeat the experiment but with ASYNC message put
90+
asyncQueue := context.CreateQueue("DEV.QUEUE.1").SetPutAsyncAllowed(jms20subset.Destination_PUT_ASYNC_ALLOWED_ENABLED)
91+
92+
asyncStartTime := currentTimeMillis()
93+
for i := 0; i < numberMessages; i++ {
94+
95+
// Create a TextMessage and send it.
96+
msg := context.CreateTextMessageWithString(asyncMsgPrefix + strconv.Itoa(i))
97+
98+
errSend := producer.Send(asyncQueue, msg)
99+
assert.Nil(t, errSend)
100+
}
101+
asyncEndTime := currentTimeMillis()
102+
103+
asyncSendTime := asyncEndTime - asyncStartTime
104+
//fmt.Println("Took " + strconv.FormatInt(asyncSendTime, 10) + "ms to send " + strconv.Itoa(numberMessages) + " ASYNC messages.")
105+
106+
// Receive the messages back again
107+
finishedReceiving = false
108+
rcvCount = 0
109+
110+
for !finishedReceiving {
111+
rcvTxt, errRvc := consumer.ReceiveStringBodyNoWait()
112+
assert.Nil(t, errRvc)
113+
114+
if rcvTxt != nil {
115+
// Check the message bod matches what we expect
116+
assert.Equal(t, asyncMsgPrefix+strconv.Itoa(rcvCount), *rcvTxt)
117+
rcvCount++
118+
} else {
119+
finishedReceiving = true
120+
}
121+
}
122+
123+
assert.Equal(t, numberMessages, rcvCount)
124+
125+
// Expect that async put is at least 10% faster than sync put for non-persistent messages
126+
// (in testing against a remote queue manager it was actually 30% faster)
127+
assert.True(t, 100*asyncSendTime < 90*syncSendTime)
128+
129+
}
130+
131+
/*
132+
* Test the getter/setter functions for controlling async put.
133+
*/
134+
func TestAsyncPutGetterSetter(t *testing.T) {
135+
136+
// Loads CF parameters from connection_info.json and applicationApiKey.json in the Downloads directory
137+
cf, cfErr := mqjms.CreateConnectionFactoryFromDefaultJSONFiles()
138+
assert.Nil(t, cfErr)
139+
140+
// Creates a connection to the queue manager, using defer to close it automatically
141+
// at the end of the function (if it was created successfully)
142+
context, ctxErr := cf.CreateContext()
143+
assert.Nil(t, ctxErr)
144+
if context != nil {
145+
defer context.Close()
146+
}
147+
148+
// Set up the producer and consumer
149+
queue := context.CreateQueue("DEV.QUEUE.1")
150+
151+
// Check the default
152+
assert.Equal(t, jms20subset.Destination_PUT_ASYNC_ALLOWED_AS_DEST, queue.GetPutAsyncAllowed())
153+
154+
// Check enabled
155+
queue = queue.SetPutAsyncAllowed(jms20subset.Destination_PUT_ASYNC_ALLOWED_ENABLED)
156+
assert.Equal(t, jms20subset.Destination_PUT_ASYNC_ALLOWED_ENABLED, queue.GetPutAsyncAllowed())
157+
158+
// Check disabled
159+
queue = queue.SetPutAsyncAllowed(jms20subset.Destination_PUT_ASYNC_ALLOWED_DISABLED)
160+
assert.Equal(t, jms20subset.Destination_PUT_ASYNC_ALLOWED_DISABLED, queue.GetPutAsyncAllowed())
161+
162+
// Check as-dest
163+
queue = queue.SetPutAsyncAllowed(jms20subset.Destination_PUT_ASYNC_ALLOWED_AS_DEST)
164+
assert.Equal(t, jms20subset.Destination_PUT_ASYNC_ALLOWED_AS_DEST, queue.GetPutAsyncAllowed())
165+
166+
}

jms20subset/Destination.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,32 @@ type Destination interface {
2222
// is automatically implemented by every object, so we need to define at least
2323
// one method here in order to make it meet the JMS style semantics.
2424
GetDestinationName() string
25+
26+
// SetPutAsyncAllowed controls whether asynchronous put is allowed for this
27+
// destination.
28+
//
29+
// Permitted values are:
30+
// * Destination_PUT_ASYNC_ALLOWED_ENABLED - enables async put
31+
// * Destination_PUT_ASYNC_ALLOWED_DISABLED - disables async put
32+
// * Destination_PUT_ASYNC_ALLOWED_AS_DEST - delegate to queue configuration (default)
33+
SetPutAsyncAllowed(paa int) Queue
34+
35+
// GetPutAsyncAllowed returns whether asynchronous put is configured for this
36+
// destination.
37+
//
38+
// Returned value is one of:
39+
// * Destination_PUT_ASYNC_ALLOWED_ENABLED - async put is enabled
40+
// * Destination_PUT_ASYNC_ALLOWED_DISABLED - async put is disabled
41+
// * Destination_PUT_ASYNC_ALLOWED_AS_DEST - delegated to queue configuration (default)
42+
GetPutAsyncAllowed() int
2543
}
44+
45+
// Destination_PUT_ASYNC_ALLOWED_ENABLED is used to enable messages being sent asynchronously.
46+
const Destination_PUT_ASYNC_ALLOWED_ENABLED int = 1
47+
48+
// Destination_PUT_ASYNC_ALLOWED_DISABLED is used to disable messages being sent asynchronously.
49+
const Destination_PUT_ASYNC_ALLOWED_DISABLED int = 0
50+
51+
// Destination_PUT_ASYNC_ALLOWED_AS_DEST allows the async message behaviour to be controlled by
52+
// the queue on the queue manager.
53+
const Destination_PUT_ASYNC_ALLOWED_AS_DEST int = -1

jms20subset/Queue.go

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,11 @@ package jms20subset
1515
// specifies the identity of a queue to the JMS API functions.
1616
type Queue interface {
1717

18+
// Encapsulate the root Destination type so that this interface "inherits" the
19+
// accessors for standard attributes that apply to all destination types
20+
Destination
21+
1822
// GetQueueName returns the provider-specific name of the queue that is
1923
// represented by this object.
2024
GetQueueName() string
21-
22-
// GetDestinationName returns the provider-specific name of the queue that is
23-
// represented by this object.
24-
//
25-
// This method is implemented to allow us to consider the Queue interface
26-
// as a specialization of the Destination interface.
27-
GetDestinationName() string
2825
}

mqjms/ContextImpl.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ func (ctx ContextImpl) CreateQueue(queueName string) jms20subset.Queue {
3030

3131
// Store the name of the queue
3232
queue := QueueImpl{
33-
queueName: queueName,
33+
queueName: queueName,
34+
putAsyncAllowed: jms20subset.Destination_PUT_ASYNC_ALLOWED_AS_DEST,
3435
}
3536

3637
return queue

mqjms/ProducerImpl.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,11 @@ func (producer ProducerImpl) Send(dest jms20subset.Destination, msg jms20subset.
9191
// unique message ID
9292
pmo.Options = syncpointSetting | ibmmq.MQPMO_NEW_MSG_ID
9393

94+
// Is async put has been requested then apply the appropriate PMO option
95+
if dest.GetPutAsyncAllowed() == jms20subset.Destination_PUT_ASYNC_ALLOWED_ENABLED {
96+
pmo.Options |= ibmmq.MQPMO_ASYNC_RESPONSE
97+
}
98+
9499
// Convert the JMS persistence into the equivalent MQ message descriptor
95100
// attribute.
96101
if producer.deliveryMode == jms20subset.DeliveryMode_NON_PERSISTENT {

mqjms/QueueImpl.go

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,18 @@
99
// Package mqjms provides the implementation of the JMS style Golang interfaces to communicate with IBM MQ.
1010
package mqjms
1111

12+
import (
13+
"fmt"
14+
"strconv"
15+
16+
"github.com/ibm-messaging/mq-golang-jms20/jms20subset"
17+
)
18+
1219
// QueueImpl encapsulates the provider-specific attributes necessary to
1320
// communicate with an IBM MQ queue.
1421
type QueueImpl struct {
15-
queueName string
22+
queueName string
23+
putAsyncAllowed int
1624
}
1725

1826
// GetQueueName returns the provider-specific name of the queue that is
@@ -30,3 +38,30 @@ func (queue QueueImpl) GetDestinationName() string {
3038
return queue.queueName
3139

3240
}
41+
42+
// SetPutAsyncAllowed allows the async allowed setting to be updated.
43+
func (queue QueueImpl) SetPutAsyncAllowed(paa int) jms20subset.Queue {
44+
45+
// Check that the specified paa parameter is one of the values that we permit,
46+
// and if so store that value inside queue.
47+
if paa == jms20subset.Destination_PUT_ASYNC_ALLOWED_ENABLED ||
48+
paa == jms20subset.Destination_PUT_ASYNC_ALLOWED_DISABLED ||
49+
paa == jms20subset.Destination_PUT_ASYNC_ALLOWED_AS_DEST {
50+
51+
queue.putAsyncAllowed = paa
52+
53+
} else {
54+
// Normally we would throw an error here to indicate that an invalid value
55+
// was specified, however we have decided that it is more useful to support
56+
// method chaining, which prevents us from returning an error object.
57+
// Instead we settle for printing an error message to the console.
58+
fmt.Println("Invalid PutAsyncAllowed value specified: " + strconv.Itoa(paa))
59+
}
60+
61+
return queue
62+
}
63+
64+
// GetPutAsyncAllowed returns the current setting for async put.
65+
func (queue QueueImpl) GetPutAsyncAllowed() int {
66+
return queue.putAsyncAllowed
67+
}

0 commit comments

Comments
 (0)