Skip to content

Commit cde909b

Browse files
authored
Merge pull request #78 from ibm-messaging/accept_truncated_msg
Add support for handling truncated messages
2 parents 0358b82 + c69d248 commit cde909b

File tree

6 files changed

+307
-44
lines changed

6 files changed

+307
-44
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ your own error handling or logging.
128128
* Handle error codes returned by the queue manager - [sample_errorhandling_test.go](sample_errorhandling_test.go)
129129
* Set the application name (ApplName) on connections - [applname_test.go](applname_test.go)
130130
* Receive messages over 32kb in size by setting the receive buffer size - [largemessage_test.go](largemessage_test.go)
131+
* Receive a truncated message body for the case where the body is larger than the allowed buffer size - [largemessage_test.go#301](largemessage_test.go#301)
131132
* Asynchronous put - [asyncput_test.go](asyncput_test.go)
132133
* Special header properties such as JMS_IBM_Format - [specialproperties_test.go](specialproperties_test.go)
133134

jms20subset/JMSException.go

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,10 @@ type JMSException interface {
2424

2525
// JMSExceptionImpl is a struct that implements the JMSException interface
2626
type JMSExceptionImpl struct {
27-
reason string
28-
errorCode string
29-
linkedErr error
27+
reason string
28+
errorCode string
29+
linkedErr error
30+
messageLength int
3031
}
3132

3233
// GetReason returns the provider-specific reason string describing the error.
@@ -51,6 +52,15 @@ func (ex JMSExceptionImpl) GetLinkedError() error {
5152

5253
}
5354

55+
// GetMessageLength gives the data length that was returned by a Receive (GET) call,
56+
// for example if the message was truncated, or could not be read because the receive
57+
// buffer was too small.
58+
func (ex JMSExceptionImpl) GetMessageLength() int {
59+
60+
return ex.messageLength
61+
62+
}
63+
5464
// Error allows the JMSExceptionImpl struct to be treated as a Golang error,
5565
// while also returning a human readable string representation of the error.
5666
func (ex JMSExceptionImpl) Error() string {
@@ -66,11 +76,17 @@ func (ex JMSExceptionImpl) Error() string {
6676

6777
// CreateJMSException is a helper function for creating a JMSException
6878
func CreateJMSException(reason string, errorCode string, linkedErr error) JMSException {
79+
return CreateJMSExceptionWithExtraParams(reason, errorCode, linkedErr, 0)
80+
}
81+
82+
// CreateJMSException is a helper function for creating a JMSException
83+
func CreateJMSExceptionWithExtraParams(reason string, errorCode string, linkedErr error, messageLength int) JMSException {
6984

7085
ex := JMSExceptionImpl{
71-
reason: reason,
72-
errorCode: errorCode,
73-
linkedErr: linkedErr,
86+
reason: reason,
87+
errorCode: errorCode,
88+
linkedErr: linkedErr,
89+
messageLength: messageLength,
7490
}
7591

7692
return ex

largemessage_test.go

Lines changed: 219 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,20 @@ func TestLargeTextMessage(t *testing.T) {
6262

6363
// The message is still left on the queue since it failed to be received successfully.
6464

65-
// Since the buffer size is configured using the ConnectionFactoy we will
66-
// create a second connection in order to successfully retrieve the message.
67-
cf.ReceiveBufferSize = len(txtOver32kb) + 50
65+
// Use a special attribute of the returned exception to look up what the actual length of the
66+
// message is, so that we can correctly increase the buffer size in order to receive the message.
67+
switch jmsExc := errRcv.(type) {
68+
case jms20subset.JMSExceptionImpl:
69+
realMessageLength := jmsExc.GetMessageLength()
70+
assert.Equal(t, len(txtOver32kb), realMessageLength) // check it matches the original (long) length
71+
72+
// Since the buffer size is configured using the ConnectionFactory we will
73+
// create a second connection in order to successfully retrieve the message.
74+
cf.ReceiveBufferSize = realMessageLength
75+
76+
default:
77+
assert.Fail(t, "Got something other than a JMSExceptionImpl")
78+
}
6879

6980
context2, ctxErr2 := cf.CreateContext()
7081
assert.Nil(t, ctxErr2)
@@ -78,7 +89,7 @@ func TestLargeTextMessage(t *testing.T) {
7889
defer consumer2.Close()
7990
}
8091

81-
rcvMsg2, errRcv2 := consumer2.ReceiveNoWait()
92+
rcvMsg2, errRcv2 := consumer2.ReceiveNoWait() // receive the message using the correct (larger) buffer size
8293
assert.Nil(t, errRcv2)
8394
assert.NotNil(t, rcvMsg2)
8495

@@ -134,9 +145,20 @@ func TestLargeReceiveStringBodyTextMessage(t *testing.T) {
134145

135146
// The message is still left on the queue since it failed to be received successfully.
136147

137-
// Since the buffer size is configured using the ConnectionFactoy we will
138-
// create a second connection in order to successfully retrieve the message.
139-
cf.ReceiveBufferSize = len(txtOver32kb) + 50
148+
// Use a special attribute of the returned exception to look up what the actual length of the
149+
// message is, so that we can correctly increase the buffer size in order to receive the message.
150+
switch jmsExc := errRcv.(type) {
151+
case jms20subset.JMSExceptionImpl:
152+
realMessageLength := jmsExc.GetMessageLength()
153+
assert.Equal(t, len(txtOver32kb), realMessageLength) // check it matches the original (long) length
154+
155+
// Since the buffer size is configured using the ConnectionFactory we will
156+
// create a second connection in order to successfully retrieve the message.
157+
cf.ReceiveBufferSize = realMessageLength
158+
159+
default:
160+
assert.Fail(t, "Got something other than a JMSExceptionImpl")
161+
}
140162

141163
context2, ctxErr2 := cf.CreateContext()
142164
assert.Nil(t, ctxErr2)
@@ -200,9 +222,20 @@ func TestLargeBytesMessage(t *testing.T) {
200222

201223
// The message is still left on the queue since it failed to be received successfully.
202224

203-
// Since the buffer size is configured using the ConnectionFactoy we will
204-
// create a second connection in order to successfully retrieve the message.
205-
cf.ReceiveBufferSize = len(bytesOver32kb) + 50
225+
// Use a special attribute of the returned exception to look up what the actual length of the
226+
// message is, so that we can correctly increase the buffer size in order to receive the message.
227+
switch jmsExc := errRcv.(type) {
228+
case jms20subset.JMSExceptionImpl:
229+
realMessageLength := jmsExc.GetMessageLength()
230+
assert.Equal(t, len(txtOver32kb), realMessageLength) // check it matches the original (long) length
231+
232+
// Since the buffer size is configured using the ConnectionFactory we will
233+
// create a second connection in order to successfully retrieve the message.
234+
cf.ReceiveBufferSize = realMessageLength
235+
236+
default:
237+
assert.Fail(t, "Got something other than a JMSExceptionImpl")
238+
}
206239

207240
context2, ctxErr2 := cf.CreateContext()
208241
assert.Nil(t, ctxErr2)
@@ -273,9 +306,20 @@ func TestLargeReceiveBytesBodyBytesMessage(t *testing.T) {
273306

274307
// The message is still left on the queue since it failed to be received successfully.
275308

276-
// Since the buffer size is configured using the ConnectionFactoy we will
277-
// create a second connection in order to successfully retrieve the message.
278-
cf.ReceiveBufferSize = len(bytesOver32kb) + 50
309+
// Use a special attribute of the returned exception to look up what the actual length of the
310+
// message is, so that we can correctly increase the buffer size in order to receive the message.
311+
switch jmsExc := errRcv.(type) {
312+
case jms20subset.JMSExceptionImpl:
313+
realMessageLength := jmsExc.GetMessageLength()
314+
assert.Equal(t, len(txtOver32kb), realMessageLength) // check it matches the original (long) length
315+
316+
// Since the buffer size is configured using the ConnectionFactory we will
317+
// create a second connection in order to successfully retrieve the message.
318+
cf.ReceiveBufferSize = realMessageLength
319+
320+
default:
321+
assert.Fail(t, "Got something other than a JMSExceptionImpl")
322+
}
279323

280324
context2, ctxErr2 := cf.CreateContext()
281325
assert.Nil(t, ctxErr2)
@@ -295,6 +339,168 @@ func TestLargeReceiveBytesBodyBytesMessage(t *testing.T) {
295339

296340
}
297341

342+
/*
343+
* Test receiving a truncated text message, where the body of the message is larger than the receive buffer.
344+
*/
345+
func TestTruncatedTextMessage(t *testing.T) {
346+
347+
// Loads CF parameters from connection_info.json and applicationApiKey.json in the Downloads directory
348+
cf, cfErr := mqjms.CreateConnectionFactoryFromDefaultJSONFiles()
349+
cf.ReceiveBufferSize = 1024
350+
cf.AcceptTruncatedMessage = true
351+
assert.Nil(t, cfErr)
352+
353+
// Creates a connection to the queue manager, using defer to close it automatically
354+
// at the end of the function (if it was created successfully)
355+
context, ctxErr := cf.CreateContext()
356+
assert.Nil(t, ctxErr)
357+
if context != nil {
358+
defer context.Close()
359+
}
360+
361+
// Get a long text string over 32kb in length
362+
txtOver32kb := getStringOver32kb()
363+
364+
// Create a TextMessage with it
365+
msg := context.CreateTextMessageWithString(txtOver32kb)
366+
367+
// Now send the message and get it back again.
368+
queue := context.CreateQueue("DEV.QUEUE.1")
369+
errSend := context.CreateProducer().SetTimeToLive(5000).Send(queue, msg)
370+
assert.Nil(t, errSend)
371+
372+
consumer, errCons := context.CreateConsumer(queue) // with 1kb buffer size as applied at the beginning of this test
373+
assert.Nil(t, errCons)
374+
if consumer != nil {
375+
defer consumer.Close()
376+
}
377+
378+
// Since we have set the flag to allow a truncated message to be returned, this will pass
379+
// but will only return the first 1024 bytes (cf.ReceiveBufferSize) of the message.
380+
truncMsg, errRcv := consumer.ReceiveNoWait()
381+
assert.NotNil(t, errRcv)
382+
assert.Equal(t, "MQRC_TRUNCATED_MSG_ACCEPTED", errRcv.GetReason())
383+
assert.Equal(t, "2079", errRcv.GetErrorCode())
384+
385+
assert.NotNil(t, truncMsg) // We have received the message, but it is truncated.
386+
387+
switch txtTruncMsg := truncMsg.(type) {
388+
case jms20subset.TextMessage:
389+
receivedTxt := txtTruncMsg.GetText()
390+
assert.Equal(t, cf.ReceiveBufferSize, len(*receivedTxt))
391+
default:
392+
assert.Fail(t, "Got something other than a text message")
393+
}
394+
395+
// Use a special attribute of the returned exception to look up what the actual length of the
396+
// message is, so that we can tidy up the message (read successfully) in the event the previous
397+
// step failed.
398+
switch jmsExc := errRcv.(type) {
399+
case jms20subset.JMSExceptionImpl:
400+
realMessageLength := jmsExc.GetMessageLength()
401+
assert.Equal(t, len(txtOver32kb), realMessageLength) // check it matches the original (long) length
402+
403+
// Since the buffer size is configured using the ConnectionFactory we will
404+
// create a second connection in order to successfully retrieve the message.
405+
cf.ReceiveBufferSize = realMessageLength
406+
407+
default:
408+
assert.Fail(t, "Got something other than a JMSExceptionImpl")
409+
}
410+
411+
context2, ctxErr2 := cf.CreateContext()
412+
assert.Nil(t, ctxErr2)
413+
if context2 != nil {
414+
defer context2.Close()
415+
}
416+
417+
consumer2, errCons2 := context2.CreateConsumer(queue)
418+
assert.Nil(t, errCons2)
419+
if consumer2 != nil {
420+
defer consumer2.Close()
421+
}
422+
423+
// If the first part of this text was successful then there should be no message to receive.
424+
tidyMsg, tidyErr := consumer2.ReceiveNoWait()
425+
assert.Nil(t, tidyErr)
426+
assert.Nil(t, tidyMsg)
427+
428+
}
429+
430+
/*
431+
* Test receiving a truncated bytes message, where the body of the message is larger than the receive buffer.
432+
*/
433+
func TestTruncatedBytesMessage(t *testing.T) {
434+
435+
// Loads CF parameters from connection_info.json and applicationApiKey.json in the Downloads directory
436+
cf, cfErr := mqjms.CreateConnectionFactoryFromDefaultJSONFiles()
437+
cf.ReceiveBufferSize = 1024
438+
cf.AcceptTruncatedMessage = true
439+
assert.Nil(t, cfErr)
440+
441+
// Creates a connection to the queue manager, using defer to close it automatically
442+
// at the end of the function (if it was created successfully)
443+
context, ctxErr := cf.CreateContext()
444+
assert.Nil(t, ctxErr)
445+
if context != nil {
446+
defer context.Close()
447+
}
448+
449+
// Get a long text string over 32kb in length
450+
txtOver32kb := getStringOver32kb()
451+
bytesOver32kb := []byte(txtOver32kb)
452+
453+
// Create a TextMessage with it
454+
msg := context.CreateBytesMessageWithBytes(bytesOver32kb)
455+
456+
// Now send the message and get it back again.
457+
queue := context.CreateQueue("DEV.QUEUE.1")
458+
errSend := context.CreateProducer().SetTimeToLive(5000).Send(queue, msg)
459+
assert.Nil(t, errSend)
460+
461+
consumer, errCons := context.CreateConsumer(queue) // with 1kb buffer size as applied at the beginning of this test
462+
assert.Nil(t, errCons)
463+
if consumer != nil {
464+
defer consumer.Close()
465+
}
466+
467+
// Since we have set the flag to allow a truncated message to be returned, this will pass
468+
// but will only return the first 1024 bytes (cf.ReceiveBufferSize) of the message.
469+
truncMsg, errRcv := consumer.ReceiveNoWait()
470+
assert.NotNil(t, errRcv)
471+
assert.Equal(t, "MQRC_TRUNCATED_MSG_ACCEPTED", errRcv.GetReason())
472+
assert.Equal(t, "2079", errRcv.GetErrorCode())
473+
474+
assert.NotNil(t, truncMsg) // We have received the message, but it is truncated.
475+
476+
switch bytesTruncMsg := truncMsg.(type) {
477+
case jms20subset.BytesMessage:
478+
receivedBytes := bytesTruncMsg.ReadBytes()
479+
assert.Equal(t, cf.ReceiveBufferSize, len(*receivedBytes))
480+
default:
481+
assert.Fail(t, "Got something other than a bytes message")
482+
}
483+
484+
// Make sure we tidy up in case the previous part of the test failed.
485+
cf.ReceiveBufferSize = len(txtOver32kb) + 50
486+
487+
context2, ctxErr2 := cf.CreateContext()
488+
assert.Nil(t, ctxErr2)
489+
if context2 != nil {
490+
defer context2.Close()
491+
}
492+
493+
consumer2, errCons2 := context2.CreateConsumer(queue)
494+
assert.Nil(t, errCons2)
495+
if consumer2 != nil {
496+
defer consumer2.Close()
497+
}
498+
499+
// Attempt to receive a message, and don't worry whether it does or not.
500+
consumer2.ReceiveNoWait()
501+
502+
}
503+
298504
func getStringOver32kb() string {
299505

300506
// Build a text string which is over 32KB (in a not very efficient way!)

mqjms/ConnectionFactoryImpl.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@ type ConnectionFactoryImpl struct {
4848
// Controls the size of the buffer used when receiving a message (default is 32kb if not set)
4949
ReceiveBufferSize int
5050

51+
// Controls whether a message should be returned if the body will be truncated (because the body is larger than the ReceiveBufferSize)
52+
AcceptTruncatedMessage bool
53+
5154
// SetCheckCount defines the number of messages that will be asynchronously put using
5255
// this Context between checks for errors. For example a value of 10 will cause an error
5356
// check to be triggered once for every 10 messages.
@@ -155,12 +158,13 @@ func (cf ConnectionFactoryImpl) CreateContextWithSessionMode(sessionMode int, mq
155158
// Connection was created successfully, so we wrap the MQI object into
156159
// a new ContextImpl and return it to the caller.
157160
ctx = ContextImpl{
158-
qMgr: qMgr,
159-
ctxLock: &sync.Mutex{},
160-
sessionMode: sessionMode,
161-
receiveBufferSize: cf.ReceiveBufferSize,
162-
sendCheckCount: cf.SendCheckCount,
163-
sendCheckCountInc: countInc,
161+
qMgr: qMgr,
162+
ctxLock: &sync.Mutex{},
163+
sessionMode: sessionMode,
164+
receiveBufferSize: cf.ReceiveBufferSize,
165+
acceptTruncatedMessage: cf.AcceptTruncatedMessage,
166+
sendCheckCount: cf.SendCheckCount,
167+
sendCheckCountInc: countInc,
164168
}
165169

166170
}

0 commit comments

Comments
 (0)