Skip to content

Commit 0cb67ae

Browse files
authored
Merge pull request #26 from ibm-messaging/buffer-length
Make receive buffer length configurable - #9
2 parents ea18000 + 2c11d70 commit 0cb67ae

File tree

6 files changed

+333
-6
lines changed

6 files changed

+333
-6
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ your own error handling or logging.
123123
* Sending a message that expires after a period of time - [timetolive_test.go](timetolive_test.go)
124124
* Handle error codes returned by the queue manager - [sample_errorhandling_test.go](sample_errorhandling_test.go)
125125
* Set the application name (ApplName) on connections - [applname_test.go](applname_test.go)
126-
126+
* Receive messages over 32kb in size by setting the receive buffer size - [largemessage_test.go](largemessage_test.go)
127127

128128
As normal with Go, you can run any individual testcase by executing a command such as;
129129
```bash

largemessage_test.go

Lines changed: 314 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,314 @@
1+
/*
2+
* Copyright (c) IBM Corporation 2019
3+
*
4+
* This program and the accompanying materials are made available under the
5+
* terms of the Eclipse Public License v. 2.0, which is available at
6+
* http://www.eclipse.org/legal/epl-2.0.
7+
*
8+
* SPDX-License-Identifier: EPL-2.0
9+
*/
10+
package main
11+
12+
import (
13+
"testing"
14+
"time"
15+
16+
"github.com/ibm-messaging/mq-golang-jms20/jms20subset"
17+
"github.com/ibm-messaging/mq-golang-jms20/mqjms"
18+
"github.com/stretchr/testify/assert"
19+
)
20+
21+
/*
22+
* Test the send/receive of a large Text message, over the default 32KB size.
23+
*/
24+
func TestLargeTextMessage(t *testing.T) {
25+
26+
// Loads CF parameters from connection_info.json and applicationApiKey.json in the Downloads directory
27+
cf, cfErr := mqjms.CreateConnectionFactoryFromDefaultJSONFiles()
28+
assert.Nil(t, cfErr)
29+
30+
// Creates a connection to the queue manager, using defer to close it automatically
31+
// at the end of the function (if it was created successfully)
32+
context, ctxErr := cf.CreateContext()
33+
assert.Nil(t, ctxErr)
34+
if context != nil {
35+
defer context.Close()
36+
}
37+
38+
// Get a long text string over 32kb in length
39+
txtOver32kb := getStringOver32kb()
40+
41+
// Create a TextMessage with it
42+
msg := context.CreateTextMessageWithString(txtOver32kb)
43+
44+
// Now send the message and get it back again.
45+
queue := context.CreateQueue("DEV.QUEUE.1")
46+
errSend := context.CreateProducer().SetTimeToLive(30000).Send(queue, msg)
47+
assert.Nil(t, errSend)
48+
49+
consumer, errCons := context.CreateConsumer(queue)
50+
assert.Nil(t, errCons)
51+
if consumer != nil {
52+
defer consumer.Close()
53+
}
54+
55+
// This will fail because the default buffer size when receiving a
56+
// message is 32kb.
57+
_, errRcv := consumer.ReceiveNoWait()
58+
assert.NotNil(t, errRcv)
59+
assert.Equal(t, "MQRC_TRUNCATED_MSG_FAILED", errRcv.GetReason())
60+
assert.Equal(t, "2080", errRcv.GetErrorCode())
61+
62+
// The message is still left on the queue since it failed to be received successfully.
63+
64+
// Since the buffer size is configured using the ConnectionFactoy we will
65+
// create a second connection in order to successfully retrieve the message.
66+
cf.ReceiveBufferSize = len(txtOver32kb) + 50
67+
68+
context2, ctxErr2 := cf.CreateContext()
69+
assert.Nil(t, ctxErr2)
70+
if context2 != nil {
71+
defer context2.Close()
72+
}
73+
74+
consumer2, errCons2 := context2.CreateConsumer(queue)
75+
assert.Nil(t, errCons2)
76+
if consumer2 != nil {
77+
defer consumer2.Close()
78+
}
79+
80+
rcvMsg2, errRcv2 := consumer2.ReceiveNoWait()
81+
assert.Nil(t, errRcv2)
82+
assert.NotNil(t, rcvMsg2)
83+
84+
switch msg2 := rcvMsg2.(type) {
85+
case jms20subset.TextMessage:
86+
assert.Equal(t, &txtOver32kb, msg2.GetText())
87+
default:
88+
assert.Fail(t, "Got something other than a text message")
89+
}
90+
91+
}
92+
93+
/*
94+
* Test the send/receive of a large Text message, over the default 32KB size.
95+
*/
96+
func TestLargeReceiveStringBodyTextMessage(t *testing.T) {
97+
98+
// Loads CF parameters from connection_info.json and applicationApiKey.json in the Downloads directory
99+
cf, cfErr := mqjms.CreateConnectionFactoryFromDefaultJSONFiles()
100+
assert.Nil(t, cfErr)
101+
102+
// Creates a connection to the queue manager, using defer to close it automatically
103+
// at the end of the function (if it was created successfully)
104+
context, ctxErr := cf.CreateContext()
105+
assert.Nil(t, ctxErr)
106+
if context != nil {
107+
defer context.Close()
108+
}
109+
110+
// Get a long text string over 32kb in length
111+
txtOver32kb := getStringOver32kb()
112+
113+
// Create a TextMessage with it
114+
msg := context.CreateTextMessageWithString(txtOver32kb)
115+
116+
// Now send the message and get it back again.
117+
queue := context.CreateQueue("DEV.QUEUE.1")
118+
errSend := context.CreateProducer().SetTimeToLive(30000).Send(queue, msg)
119+
assert.Nil(t, errSend)
120+
121+
consumer, errCons := context.CreateConsumer(queue)
122+
assert.Nil(t, errCons)
123+
if consumer != nil {
124+
defer consumer.Close()
125+
}
126+
127+
// This will fail because the default buffer size when receiving a
128+
// message is 32kb.
129+
_, errRcv := consumer.ReceiveStringBodyNoWait()
130+
assert.NotNil(t, errRcv)
131+
assert.Equal(t, "MQRC_TRUNCATED_MSG_FAILED", errRcv.GetReason())
132+
assert.Equal(t, "2080", errRcv.GetErrorCode())
133+
134+
// The message is still left on the queue since it failed to be received successfully.
135+
136+
// Since the buffer size is configured using the ConnectionFactoy we will
137+
// create a second connection in order to successfully retrieve the message.
138+
cf.ReceiveBufferSize = len(txtOver32kb) + 50
139+
140+
context2, ctxErr2 := cf.CreateContext()
141+
assert.Nil(t, ctxErr2)
142+
if context2 != nil {
143+
defer context2.Close()
144+
}
145+
146+
consumer2, errCons2 := context2.CreateConsumer(queue)
147+
assert.Nil(t, errCons2)
148+
if consumer2 != nil {
149+
defer consumer2.Close()
150+
}
151+
152+
rcvStr, errRcv2 := consumer2.ReceiveStringBodyNoWait()
153+
assert.Nil(t, errRcv2)
154+
assert.Equal(t, &txtOver32kb, rcvStr)
155+
156+
}
157+
158+
/*
159+
* Test the send/receive of a large Bytes message, over the default 32KB size.
160+
*/
161+
func TestLargeBytesMessage(t *testing.T) {
162+
163+
// Loads CF parameters from connection_info.json and applicationApiKey.json in the Downloads directory
164+
cf, cfErr := mqjms.CreateConnectionFactoryFromDefaultJSONFiles()
165+
assert.Nil(t, cfErr)
166+
167+
// Creates a connection to the queue manager, using defer to close it automatically
168+
// at the end of the function (if it was created successfully)
169+
context, ctxErr := cf.CreateContext()
170+
assert.Nil(t, ctxErr)
171+
if context != nil {
172+
defer context.Close()
173+
}
174+
175+
// Get a long text string over 32kb in length
176+
txtOver32kb := getStringOver32kb()
177+
bytesOver32kb := []byte(txtOver32kb)
178+
179+
// Create a TextMessage with it
180+
msg := context.CreateBytesMessageWithBytes(bytesOver32kb)
181+
182+
// Now send the message and get it back again.
183+
queue := context.CreateQueue("DEV.QUEUE.1")
184+
errSend := context.CreateProducer().SetTimeToLive(30000).Send(queue, msg)
185+
assert.Nil(t, errSend)
186+
187+
consumer, errCons := context.CreateConsumer(queue)
188+
assert.Nil(t, errCons)
189+
if consumer != nil {
190+
defer consumer.Close()
191+
}
192+
193+
// This will fail because the default buffer size when receiving a
194+
// message is 32kb.
195+
_, errRcv := consumer.ReceiveNoWait()
196+
assert.NotNil(t, errRcv)
197+
assert.Equal(t, "MQRC_TRUNCATED_MSG_FAILED", errRcv.GetReason())
198+
assert.Equal(t, "2080", errRcv.GetErrorCode())
199+
200+
// The message is still left on the queue since it failed to be received successfully.
201+
202+
// Since the buffer size is configured using the ConnectionFactoy we will
203+
// create a second connection in order to successfully retrieve the message.
204+
cf.ReceiveBufferSize = len(bytesOver32kb) + 50
205+
206+
context2, ctxErr2 := cf.CreateContext()
207+
assert.Nil(t, ctxErr2)
208+
if context2 != nil {
209+
defer context2.Close()
210+
}
211+
212+
consumer2, errCons2 := context2.CreateConsumer(queue)
213+
assert.Nil(t, errCons2)
214+
if consumer2 != nil {
215+
defer consumer2.Close()
216+
}
217+
218+
rcvMsg2, errRcv2 := consumer2.ReceiveNoWait()
219+
assert.Nil(t, errRcv2)
220+
assert.NotNil(t, rcvMsg2)
221+
222+
switch msg2 := rcvMsg2.(type) {
223+
case jms20subset.BytesMessage:
224+
assert.Equal(t, &bytesOver32kb, msg2.ReadBytes())
225+
default:
226+
assert.Fail(t, "Got something other than a text message")
227+
}
228+
229+
}
230+
231+
/*
232+
* Test the send/receive of a large Bytes message, over the default 32KB size.
233+
*/
234+
func TestLargeReceiveBytesBodyBytesMessage(t *testing.T) {
235+
236+
// Loads CF parameters from connection_info.json and applicationApiKey.json in the Downloads directory
237+
cf, cfErr := mqjms.CreateConnectionFactoryFromDefaultJSONFiles()
238+
assert.Nil(t, cfErr)
239+
240+
// Creates a connection to the queue manager, using defer to close it automatically
241+
// at the end of the function (if it was created successfully)
242+
context, ctxErr := cf.CreateContext()
243+
assert.Nil(t, ctxErr)
244+
if context != nil {
245+
defer context.Close()
246+
}
247+
248+
// Get a long text string over 32kb in length
249+
txtOver32kb := getStringOver32kb()
250+
bytesOver32kb := []byte(txtOver32kb)
251+
252+
// Create a TextMessage with it
253+
msg := context.CreateBytesMessageWithBytes(bytesOver32kb)
254+
255+
// Now send the message and get it back again.
256+
queue := context.CreateQueue("DEV.QUEUE.1")
257+
errSend := context.CreateProducer().SetTimeToLive(30000).Send(queue, msg)
258+
assert.Nil(t, errSend)
259+
260+
consumer, errCons := context.CreateConsumer(queue)
261+
assert.Nil(t, errCons)
262+
if consumer != nil {
263+
defer consumer.Close()
264+
}
265+
266+
// This will fail because the default buffer size when receiving a
267+
// message is 32kb.
268+
_, errRcv := consumer.ReceiveBytesBodyNoWait()
269+
assert.NotNil(t, errRcv)
270+
assert.Equal(t, "MQRC_TRUNCATED_MSG_FAILED", errRcv.GetReason())
271+
assert.Equal(t, "2080", errRcv.GetErrorCode())
272+
273+
// The message is still left on the queue since it failed to be received successfully.
274+
275+
// Since the buffer size is configured using the ConnectionFactoy we will
276+
// create a second connection in order to successfully retrieve the message.
277+
cf.ReceiveBufferSize = len(bytesOver32kb) + 50
278+
279+
context2, ctxErr2 := cf.CreateContext()
280+
assert.Nil(t, ctxErr2)
281+
if context2 != nil {
282+
defer context2.Close()
283+
}
284+
285+
consumer2, errCons2 := context2.CreateConsumer(queue)
286+
assert.Nil(t, errCons2)
287+
if consumer2 != nil {
288+
defer consumer2.Close()
289+
}
290+
291+
rcvBytes2, errRcv2 := consumer2.ReceiveBytesBodyNoWait()
292+
assert.Nil(t, errRcv2)
293+
assert.Equal(t, &bytesOver32kb, rcvBytes2)
294+
295+
}
296+
297+
func getStringOver32kb() string {
298+
299+
// Build a text string which is over 32KB (in a not very efficient way!)
300+
// First snippet is 100 chars long.
301+
txt100chars := "The quick brown fox jumped over the lazy dog into the flowing river that rushed over the cliff edge."
302+
txt1300chars := string(time.Now().UnixNano()) + txt100chars + txt100chars + txt100chars + txt100chars + txt100chars +
303+
txt100chars + txt100chars + txt100chars + txt100chars + txt100chars +
304+
txt100chars + txt100chars + txt100chars
305+
306+
txtOver32kb := txt1300chars
307+
308+
for len(txtOver32kb) < 32*1024 {
309+
txtOver32kb += txt1300chars
310+
}
311+
312+
return txtOver32kb
313+
314+
}

mqjms/ConnectionFactoryImpl.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ type ConnectionFactoryImpl struct {
4343

4444
// Allthough only available per MQ 9.1.2 it looks like a good idea to have this present in MQ-JMS
4545
ApplName string
46+
47+
// Controls the size of the buffer used when receiving a message (default is 32kb if not set)
48+
ReceiveBufferSize int
4649
}
4750

4851
// CreateContext implements the JMS method to create a connection to an IBM MQ
@@ -131,8 +134,9 @@ func (cf ConnectionFactoryImpl) CreateContextWithSessionMode(sessionMode int) (j
131134
// Connection was created successfully, so we wrap the MQI object into
132135
// a new ContextImpl and return it to the caller.
133136
ctx = ContextImpl{
134-
qMgr: qMgr,
135-
sessionMode: sessionMode,
137+
qMgr: qMgr,
138+
sessionMode: sessionMode,
139+
receiveBufferSize: cf.ReceiveBufferSize,
136140
}
137141

138142
} else {

mqjms/ConsumerImpl.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,14 @@ func (consumer ConsumerImpl) receiveInternal(gmo *ibmmq.MQGMO) (jms20subset.Mess
6262
var jmsErr jms20subset.JMSException
6363

6464
getmqmd := ibmmq.NewMQMD()
65-
buffer := make([]byte, 32768)
65+
66+
myBufferSize := 32768
67+
68+
if consumer.ctx.receiveBufferSize > 0 {
69+
myBufferSize = consumer.ctx.receiveBufferSize
70+
}
71+
72+
buffer := make([]byte, myBufferSize)
6673

6774
// Calculate the syncpoint value
6875
syncpointSetting := ibmmq.MQGMO_NO_SYNCPOINT

mqjms/ContextImpl.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,9 @@ import (
1919
// ContextImpl encapsulates the objects necessary to maintain an active
2020
// connection to an IBM MQ queue manager.
2121
type ContextImpl struct {
22-
qMgr ibmmq.MQQueueManager
23-
sessionMode int
22+
qMgr ibmmq.MQQueueManager
23+
sessionMode int
24+
receiveBufferSize int
2425
}
2526

2627
// CreateQueue implements the logic necessary to create a provider-specific

next-features.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ Not currently implemented:
1111
- Message Properties etc
1212
- Temporary destinations
1313
- Priority
14+
- Configurable option to auto-set the receive buffer length if the default 32kb is exceeded (less efficient that setting up front)
1415

1516
Client capabilities for participating in Uniform Clusters;
1617
- CCDT to allow listing queue managers

0 commit comments

Comments
 (0)