Skip to content

Commit 9746790

Browse files
committed
Add ConnectionFactory option for AcceptTruncatedMessage - #77
1 parent c13f766 commit 9746790

File tree

5 files changed

+99
-15
lines changed

5 files changed

+99
-15
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ your own error handling or logging.
128128
* Handle error codes returned by the queue manager - [sample_errorhandling_test.go](sample_errorhandling_test.go)
129129
* Set the application name (ApplName) on connections - [applname_test.go](applname_test.go)
130130
* Receive messages over 32kb in size by setting the receive buffer size - [largemessage_test.go](largemessage_test.go)
131+
* Receive a truncated message body for the case where the body is larger than the allowed buffer size - [largemessage_test.go#301](largemessage_test.go#301)
131132
* Asynchronous put - [asyncput_test.go](asyncput_test.go)
132133
* Special header properties such as JMS_IBM_Format - [specialproperties_test.go](specialproperties_test.go)
133134

largemessage_test.go

Lines changed: 76 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -296,14 +296,14 @@ func TestLargeReceiveBytesBodyBytesMessage(t *testing.T) {
296296
}
297297

298298
/*
299-
* Test receiving a truncated message, where the body of the message is larger than the receive buffer.
299+
* Test receiving a truncated text message, where the body of the message is larger than the receive buffer.
300300
*/
301301
func TestTruncatedTextMessage(t *testing.T) {
302302

303303
// Loads CF parameters from connection_info.json and applicationApiKey.json in the Downloads directory
304304
cf, cfErr := mqjms.CreateConnectionFactoryFromDefaultJSONFiles()
305305
cf.ReceiveBufferSize = 1024
306-
// TODO - set parameter to allow receive of truncated message
306+
cf.AcceptTruncatedMessage = true
307307
assert.Nil(t, cfErr)
308308

309309
// Creates a connection to the queue manager, using defer to close it automatically
@@ -368,6 +368,80 @@ func TestTruncatedTextMessage(t *testing.T) {
368368

369369
}
370370

371+
/*
372+
* Test receiving a truncated bytes message, where the body of the message is larger than the receive buffer.
373+
*/
374+
func TestTruncatedBytesMessage(t *testing.T) {
375+
376+
// Loads CF parameters from connection_info.json and applicationApiKey.json in the Downloads directory
377+
cf, cfErr := mqjms.CreateConnectionFactoryFromDefaultJSONFiles()
378+
cf.ReceiveBufferSize = 1024
379+
cf.AcceptTruncatedMessage = true
380+
assert.Nil(t, cfErr)
381+
382+
// Creates a connection to the queue manager, using defer to close it automatically
383+
// at the end of the function (if it was created successfully)
384+
context, ctxErr := cf.CreateContext()
385+
assert.Nil(t, ctxErr)
386+
if context != nil {
387+
defer context.Close()
388+
}
389+
390+
// Get a long text string over 32kb in length
391+
txtOver32kb := getStringOver32kb()
392+
bytesOver32kb := []byte(txtOver32kb)
393+
394+
// Create a TextMessage with it
395+
msg := context.CreateBytesMessageWithBytes(bytesOver32kb)
396+
397+
// Now send the message and get it back again.
398+
queue := context.CreateQueue("DEV.QUEUE.1")
399+
errSend := context.CreateProducer().SetTimeToLive(5000).Send(queue, msg)
400+
assert.Nil(t, errSend)
401+
402+
consumer, errCons := context.CreateConsumer(queue) // with 1kb buffer size as applied at the beginning of this test
403+
assert.Nil(t, errCons)
404+
if consumer != nil {
405+
defer consumer.Close()
406+
}
407+
408+
// Since we have set the flag to allow a truncated message to be returned, this will pass
409+
// but will only return the first 1024 bytes (cf.ReceiveBufferSize) of the message.
410+
truncMsg, errRcv := consumer.ReceiveNoWait()
411+
assert.NotNil(t, errRcv)
412+
assert.Equal(t, "MQRC_TRUNCATED_MSG_ACCEPTED", errRcv.GetReason())
413+
assert.Equal(t, "2079", errRcv.GetErrorCode())
414+
415+
assert.NotNil(t, truncMsg) // We have received the message, but it is truncated.
416+
417+
switch bytesTruncMsg := truncMsg.(type) {
418+
case jms20subset.BytesMessage:
419+
receivedBytes := bytesTruncMsg.ReadBytes()
420+
assert.Equal(t, cf.ReceiveBufferSize, len(*receivedBytes))
421+
default:
422+
assert.Fail(t, "Got something other than a bytes message")
423+
}
424+
425+
// Make sure we tidy up in case the previous part of the test failed.
426+
cf.ReceiveBufferSize = len(txtOver32kb) + 50
427+
428+
context2, ctxErr2 := cf.CreateContext()
429+
assert.Nil(t, ctxErr2)
430+
if context2 != nil {
431+
defer context2.Close()
432+
}
433+
434+
consumer2, errCons2 := context2.CreateConsumer(queue)
435+
assert.Nil(t, errCons2)
436+
if consumer2 != nil {
437+
defer consumer2.Close()
438+
}
439+
440+
// Attempt to receive a message, and don't worry whether it does or not.
441+
consumer2.ReceiveNoWait()
442+
443+
}
444+
371445
func getStringOver32kb() string {
372446

373447
// Build a text string which is over 32KB (in a not very efficient way!)

mqjms/ConnectionFactoryImpl.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@ type ConnectionFactoryImpl struct {
4848
// Controls the size of the buffer used when receiving a message (default is 32kb if not set)
4949
ReceiveBufferSize int
5050

51+
// Controls whether a message should be returned if the body will be truncated (because the body is larger than the ReceiveBufferSize)
52+
AcceptTruncatedMessage bool
53+
5154
// SetCheckCount defines the number of messages that will be asynchronously put using
5255
// this Context between checks for errors. For example a value of 10 will cause an error
5356
// check to be triggered once for every 10 messages.
@@ -155,12 +158,13 @@ func (cf ConnectionFactoryImpl) CreateContextWithSessionMode(sessionMode int, mq
155158
// Connection was created successfully, so we wrap the MQI object into
156159
// a new ContextImpl and return it to the caller.
157160
ctx = ContextImpl{
158-
qMgr: qMgr,
159-
ctxLock: &sync.Mutex{},
160-
sessionMode: sessionMode,
161-
receiveBufferSize: cf.ReceiveBufferSize,
162-
sendCheckCount: cf.SendCheckCount,
163-
sendCheckCountInc: countInc,
161+
qMgr: qMgr,
162+
ctxLock: &sync.Mutex{},
163+
sessionMode: sessionMode,
164+
receiveBufferSize: cf.ReceiveBufferSize,
165+
acceptTruncatedMessage: cf.AcceptTruncatedMessage,
166+
sendCheckCount: cf.SendCheckCount,
167+
sendCheckCountInc: countInc,
164168
}
165169

166170
}

mqjms/ConsumerImpl.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,11 @@ func (consumer ConsumerImpl) receiveInternal(gmo *ibmmq.MQGMO) (jms20subset.Mess
8888
// Set the GMO (get message options)
8989
gmo.Options |= syncpointSetting
9090
gmo.Options |= ibmmq.MQGMO_FAIL_IF_QUIESCING
91-
gmo.Options |= ibmmq.MQGMO_ACCEPT_TRUNCATED_MSG
91+
92+
// Allow a truncated message to be returned if the user has requested it (default is to return an error on get)
93+
if consumer.ctx.acceptTruncatedMessage {
94+
gmo.Options |= ibmmq.MQGMO_ACCEPT_TRUNCATED_MSG
95+
}
9296

9397
// Include the message properties in the msgHandle
9498
gmo.Options |= ibmmq.MQGMO_PROPERTIES_IN_HANDLE

mqjms/ContextImpl.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,13 @@ import (
2121
// ContextImpl encapsulates the objects necessary to maintain an active
2222
// connection to an IBM MQ queue manager.
2323
type ContextImpl struct {
24-
qMgr ibmmq.MQQueueManager
25-
ctxLock *sync.Mutex // Mutex to synchronize MQRC calls to the queue manager
26-
sessionMode int
27-
receiveBufferSize int
28-
sendCheckCount int
29-
sendCheckCountInc *int // Internal counter to keep track of async-put messages sent
24+
qMgr ibmmq.MQQueueManager
25+
ctxLock *sync.Mutex // Mutex to synchronize MQRC calls to the queue manager
26+
sessionMode int
27+
acceptTruncatedMessage bool
28+
receiveBufferSize int
29+
sendCheckCount int
30+
sendCheckCountInc *int // Internal counter to keep track of async-put messages sent
3031
}
3132

3233
// CreateQueue implements the logic necessary to create a provider-specific

0 commit comments

Comments
 (0)