Skip to content

Commit 82cb56f

Browse files
committed
Support receive with wait
1 parent 1718dec commit 82cb56f

File tree

5 files changed

+217
-5
lines changed

5 files changed

+217
-5
lines changed

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,8 @@ your own error handling or logging.
113113
* Creating a ConnectionFactory that uses a client connection to a remote queue manager - [connectionfactory_test.go](connectionfactory_test.go)
114114
* Creating a ConnectionFactory that uses a bindings connection to a local queue manager - [local_bindings_test.go](local_bindings_test.go)
115115
* Create a connection using anonymous (one-way) TLS encryption or mutual TLS authentication - [tls_connections_test.go](tls_connections_test.go)
116-
* Send/receive a text string - [sample_sendreceive_test.go](sample_sendreceive_test.go)
116+
* Send/receive (with no wait) a text string - [sample_sendreceive_test.go](sample_sendreceive_test.go)
117+
* Receive with wait [receivewithwait_test.go](receivewithwait_test.go)
117118
* Send a message as Persistent or NonPersistent - [deliverymode_test.go](deliverymode_test.go)
118119
* Get by CorrelationID - [getbycorrelid_test.go](getbycorrelid_test.go)
119120
* Request/reply messaging pattern - [requestreply_test.go](requestreply_test.go)

jms20subset/JMSConsumer.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,21 @@ type JMSConsumer interface {
2424

2525
// ReceiveStringBodyNoWait receives the next message for this JMSConsumer
2626
// and returns its body as a string. If a message is not immediately
27-
//available a nil is returned.
27+
// available a nil is returned.
2828
ReceiveStringBodyNoWait() (*string, JMSException)
2929

30+
// Receive(waitMillis) returns a message if one is available, or otherwise
31+
// waits for up to the specified number of milliseconds for one to become
32+
// available. A value of zero or less indicates to wait indefinitely.
33+
Receive(waitMillis int32) (Message, JMSException)
34+
35+
// ReceiveStringBody returns the body of a message as a string if one is
36+
// available. If a message is not immediately available the method will
37+
// block for up to the specified number of milliseconds to wait for one
38+
// to become available. A value of zero or less indicates to wait
39+
// indefinitely.
40+
ReceiveStringBody(waitMillis int32) (*string, JMSException)
41+
3042
// Closes the JMSConsumer in order to free up any resources that were
3143
// allocated by the provider on behalf of this consumer.
3244
Close()

mqjms/ConsumerImpl.go

Lines changed: 58 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,16 +29,42 @@ type ConsumerImpl struct {
2929
// message to be received.
3030
func (consumer ConsumerImpl) ReceiveNoWait() (jms20subset.Message, jms20subset.JMSException) {
3131

32+
gmo := ibmmq.NewMQGMO()
33+
return consumer.receiveInternal(gmo)
34+
35+
}
36+
37+
// Receive(waitMillis) returns a message if one is available, or otherwise
38+
// waits for up to the specified number of milliseconds for one to become
39+
// available. A value of zero or less indicates to wait indefinitely.
40+
func (consumer ConsumerImpl) Receive(waitMillis int32) (jms20subset.Message, jms20subset.JMSException) {
41+
42+
if waitMillis <= 0 {
43+
waitMillis = ibmmq.MQWI_UNLIMITED
44+
}
45+
46+
gmo := ibmmq.NewMQGMO()
47+
gmo.Options |= ibmmq.MQGMO_WAIT
48+
gmo.WaitInterval = waitMillis
49+
50+
return consumer.receiveInternal(gmo)
51+
52+
}
53+
54+
// Internal method to provide common functionality across the different types
55+
// of receive.
56+
func (consumer ConsumerImpl) receiveInternal(gmo *ibmmq.MQGMO) (jms20subset.Message, jms20subset.JMSException) {
57+
3258
// Prepare objects to be used in receiving the message.
3359
var msg jms20subset.Message
3460
var jmsErr jms20subset.JMSException
3561

3662
getmqmd := ibmmq.NewMQMD()
37-
gmo := ibmmq.NewMQGMO()
3863
buffer := make([]byte, 32768)
3964

4065
// Set the GMO (get message options)
41-
gmo.Options = ibmmq.MQGMO_NO_SYNCPOINT | ibmmq.MQGMO_FAIL_IF_QUIESCING
66+
gmo.Options |= ibmmq.MQGMO_NO_SYNCPOINT
67+
gmo.Options |= ibmmq.MQGMO_FAIL_IF_QUIESCING
4268

4369
// Apply the selector if one has been specified in the Consumer
4470
err := applySelector(consumer.selector, getmqmd, gmo)
@@ -123,6 +149,36 @@ func (consumer ConsumerImpl) ReceiveStringBodyNoWait() (*string, jms20subset.JMS
123149

124150
}
125151

152+
// ReceiveStringBody implements the IBM MQ logic necessary to receive a
153+
// message from a Destination and return its body as a string.
154+
//
155+
// If no message is available the method blocks up to the specified number
156+
// of milliseconds for one to become available.
157+
func (consumer ConsumerImpl) ReceiveStringBody(waitMillis int32) (*string, jms20subset.JMSException) {
158+
159+
var msgBodyStrPtr *string
160+
var jmsErr jms20subset.JMSException
161+
162+
// Get a message from the queue if one is available.
163+
msg, jmsErr := consumer.Receive(waitMillis)
164+
165+
// If we receive a message without any errors
166+
if jmsErr == nil && msg != nil {
167+
168+
switch msg := msg.(type) {
169+
case jms20subset.TextMessage:
170+
msgBodyStrPtr = msg.GetText()
171+
default:
172+
jmsErr = jms20subset.CreateJMSException(
173+
"Received message is not a TextMessage", "MQJMS6068", nil)
174+
}
175+
176+
}
177+
178+
return msgBodyStrPtr, jmsErr
179+
180+
}
181+
126182
// applySelector is responsible for converting the JMS style selector string
127183
// into the relevant options on the MQI structures so that the correct messages
128184
// are received by the application.

next-features.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ project!
55

66
Not currently implemented:
77
--------------------------
8-
- ReceiveWithWait
98
- Cascade close from JMSContext to producer/consumer objects
109
- BytesMessage, receiveBytesBody
1110
- Local transactions (e.g. allow request/reply under transaction)

receivewithwait_test.go

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
/*
2+
* Copyright (c) IBM Corporation 2019
3+
*
4+
* This program and the accompanying materials are made available under the
5+
* terms of the Eclipse Public License v. 2.0, which is available at
6+
* http://www.eclipse.org/legal/epl-2.0.
7+
*
8+
* SPDX-License-Identifier: EPL-2.0
9+
*/
10+
package main
11+
12+
import (
13+
"github.com/ibm-messaging/mq-golang-jms20/mqjms"
14+
"github.com/stretchr/testify/assert"
15+
"testing"
16+
"time"
17+
)
18+
19+
/*
20+
* Test the behaviour of the receive-with-wait methods.
21+
*/
22+
func TestReceiveWait(t *testing.T) {
23+
24+
// Loads CF parameters from connection_info.json and apiKey.json in the Downloads directory
25+
cf, cfErr := mqjms.CreateConnectionFactoryFromDefaultJSONFiles()
26+
assert.Nil(t, cfErr)
27+
28+
// Creates a connection to the queue manager, using defer to close it automatically
29+
// at the end of the function (if it was created successfully)
30+
context, ctxErr := cf.CreateContext()
31+
assert.Nil(t, ctxErr)
32+
if context != nil {
33+
defer context.Close()
34+
}
35+
36+
// Equivalent to a JNDI lookup or other declarative definition
37+
queue := context.CreateQueue("DEV.QUEUE.1")
38+
39+
// Set up the consumer ready to receive messages.
40+
consumer, conErr := context.CreateConsumer(queue)
41+
assert.Nil(t, conErr)
42+
if consumer != nil {
43+
defer consumer.Close()
44+
}
45+
46+
// Check no message on the queue to start with
47+
testMsg, err1 := consumer.ReceiveNoWait()
48+
assert.Nil(t, err1)
49+
assert.Nil(t, testMsg)
50+
51+
waitTime := int32(500)
52+
53+
// Since there is no message on the queue, the receiveWait call should
54+
// wait for the requested period of time, before returning nil.
55+
startTime := currentTimeMillis()
56+
testMsg2, err2 := consumer.Receive(waitTime)
57+
endTime := currentTimeMillis()
58+
assert.Nil(t, testMsg2)
59+
assert.Nil(t, err2)
60+
61+
// Within a reasonable margin of the expected wait time.
62+
assert.True(t, (endTime-startTime-int64(waitTime)) > -100)
63+
64+
// Send a message.
65+
err := context.CreateProducer().SendString(queue, "MyMessage")
66+
assert.Nil(t, err)
67+
68+
// With a message on the queue the receiveWait call should return immediately
69+
// with the message.
70+
startTime2 := currentTimeMillis()
71+
testMsg3, err3 := consumer.Receive(waitTime)
72+
endTime2 := currentTimeMillis()
73+
assert.NotNil(t, testMsg3)
74+
assert.Nil(t, err3)
75+
76+
// Should not wait the entire waitTime duration
77+
assert.True(t, (endTime2-startTime2) < 300)
78+
79+
}
80+
81+
func TestReceiveStringBodyWait(t *testing.T) {
82+
83+
// Loads CF parameters from connection_info.json and apiKey.json in the Downloads directory
84+
cf, cfErr := mqjms.CreateConnectionFactoryFromDefaultJSONFiles()
85+
assert.Nil(t, cfErr)
86+
87+
// Creates a connection to the queue manager, using defer to close it automatically
88+
// at the end of the function (if it was created successfully)
89+
context, ctxErr := cf.CreateContext()
90+
assert.Nil(t, ctxErr)
91+
if context != nil {
92+
defer context.Close()
93+
}
94+
95+
// Equivalent to a JNDI lookup or other declarative definition
96+
queue := context.CreateQueue("DEV.QUEUE.1")
97+
98+
// Set up the consumer ready to receive messages.
99+
consumer, conErr := context.CreateConsumer(queue)
100+
assert.Nil(t, conErr)
101+
if consumer != nil {
102+
defer consumer.Close()
103+
}
104+
105+
// Check no message on the queue to start with
106+
msg, err1 := consumer.ReceiveStringBodyNoWait()
107+
assert.Nil(t, err1)
108+
assert.Nil(t, msg)
109+
110+
waitTime := int32(500)
111+
112+
// Since there is no message on the queue, the receiveWait call should
113+
// wait for the requested period of time, before returning nil.
114+
startTime := currentTimeMillis()
115+
msg2, err2 := consumer.ReceiveStringBody(waitTime)
116+
endTime := currentTimeMillis()
117+
assert.Nil(t, msg2)
118+
assert.Nil(t, err2)
119+
120+
// Within a reasonable margin of the expected wait time.
121+
assert.True(t, (endTime-startTime-int64(waitTime)) > -100)
122+
123+
// Send a message.
124+
inputMsg := "MyMessageTest"
125+
err := context.CreateProducer().SendString(queue, inputMsg)
126+
assert.Nil(t, err)
127+
128+
// With a message on the queue the receiveWait call should return immediately
129+
// with the message.
130+
startTime2 := currentTimeMillis()
131+
msg3, err3 := consumer.ReceiveStringBody(waitTime)
132+
endTime2 := currentTimeMillis()
133+
assert.Nil(t, err3)
134+
assert.Equal(t, inputMsg, *msg3)
135+
136+
// Should not wait the entire waitTime duration
137+
assert.True(t, (endTime2-startTime2) < 300)
138+
139+
}
140+
141+
// Return the current time in milliseconds
142+
func currentTimeMillis() int64 {
143+
return int64(time.Nanosecond) * time.Now().UnixNano() / int64(time.Millisecond)
144+
}

0 commit comments

Comments
 (0)