Skip to content

Commit c13f766

Browse files
committed
Ability to read a truncated text message - #77
1 parent 89f251f commit c13f766

File tree

2 files changed

+48
-18
lines changed

2 files changed

+48
-18
lines changed

largemessage_test.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -331,8 +331,8 @@ func TestTruncatedTextMessage(t *testing.T) {
331331
defer consumer.Close()
332332
}
333333

334-
// This will fail because the default buffer size when receiving a
335-
// message is 32kb.
334+
// Since we have set the flag to allow a truncated message to be returned, this will pass
335+
// but will only return the first 1024 bytes (cf.ReceiveBufferSize) of the message.
336336
truncMsg, errRcv := consumer.ReceiveNoWait()
337337
assert.NotNil(t, errRcv)
338338
assert.Equal(t, "MQRC_TRUNCATED_MSG_ACCEPTED", errRcv.GetReason())
@@ -363,9 +363,8 @@ func TestTruncatedTextMessage(t *testing.T) {
363363
defer consumer2.Close()
364364
}
365365

366-
rcvMsg2, errRcv2 := consumer2.ReceiveNoWait()
367-
assert.Nil(t, errRcv2)
368-
assert.NotNil(t, rcvMsg2)
366+
// Attempt to receive a message, and don't worry whether it does or not.
367+
consumer2.ReceiveNoWait()
369368

370369
}
371370

mqjms/ConsumerImpl.go

Lines changed: 44 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -71,13 +71,13 @@ func (consumer ConsumerImpl) receiveInternal(gmo *ibmmq.MQGMO) (jms20subset.Mess
7171

7272
getmqmd := ibmmq.NewMQMD()
7373

74-
myBufferSize := 32768
74+
bufferSize := 32768
7575

7676
if consumer.ctx.receiveBufferSize > 0 {
77-
myBufferSize = consumer.ctx.receiveBufferSize
77+
bufferSize = consumer.ctx.receiveBufferSize
7878
}
7979

80-
buffer := make([]byte, myBufferSize)
80+
buffer := make([]byte, bufferSize)
8181

8282
// Calculate the syncpoint value
8383
syncpointSetting := ibmmq.MQGMO_NO_SYNCPOINT
@@ -88,6 +88,7 @@ func (consumer ConsumerImpl) receiveInternal(gmo *ibmmq.MQGMO) (jms20subset.Mess
8888
// Set the GMO (get message options)
8989
gmo.Options |= syncpointSetting
9090
gmo.Options |= ibmmq.MQGMO_FAIL_IF_QUIESCING
91+
gmo.Options |= ibmmq.MQGMO_ACCEPT_TRUNCATED_MSG
9192

9293
// Include the message properties in the msgHandle
9394
gmo.Options |= ibmmq.MQGMO_PROPERTIES_IN_HANDLE
@@ -105,7 +106,24 @@ func (consumer ConsumerImpl) receiveInternal(gmo *ibmmq.MQGMO) (jms20subset.Mess
105106
// Use the prepared objects to ask for a message from the queue.
106107
datalen, err := consumer.qObject.Get(getmqmd, gmo, buffer)
107108

108-
if err == nil {
109+
// Establish the read length so that it does not exceed the size of the buffer.
110+
readLength := datalen
111+
if datalen > bufferSize {
112+
readLength = bufferSize
113+
}
114+
115+
// Check whether this is a truncated message.
116+
isTruncatedMessage := false
117+
if err != nil && ((err.(*ibmmq.MQReturn)).MQRC == ibmmq.MQRC_TRUNCATED_MSG_ACCEPTED) {
118+
isTruncatedMessage = true
119+
120+
// In the truncated message case we want to return the warning object as well as the message
121+
// so that the application can tell that some of the data is missing.
122+
jmsErr = CreateJMSExceptionFromMQReturn(err)
123+
}
124+
125+
// Golden path - typically a message was received without error.
126+
if err == nil || isTruncatedMessage {
109127

110128
// Set a finalizer on the message handle to allow it to be deleted
111129
// when it is no longer referenced by an active object, to reduce/prevent
@@ -119,8 +137,8 @@ func (consumer ConsumerImpl) receiveInternal(gmo *ibmmq.MQGMO) (jms20subset.Mess
119137

120138
var msgBodyStr *string
121139

122-
if datalen > 0 {
123-
strContent := string(buffer[:datalen])
140+
if readLength > 0 {
141+
strContent := string(buffer[:readLength])
124142
msgBodyStr = &strContent
125143
}
126144

@@ -135,11 +153,11 @@ func (consumer ConsumerImpl) receiveInternal(gmo *ibmmq.MQGMO) (jms20subset.Mess
135153

136154
} else {
137155

138-
if datalen == 0 {
156+
if readLength == 0 {
139157
buffer = []byte{}
140158
}
141159

142-
trimmedBuffer := buffer[0:datalen]
160+
trimmedBuffer := buffer[0:readLength]
143161

144162
// Not a string, so fall back to BytesMessage
145163
msg = &BytesMessageImpl{
@@ -172,11 +190,7 @@ func (consumer ConsumerImpl) receiveInternal(gmo *ibmmq.MQGMO) (jms20subset.Mess
172190

173191
// Parse the details of the error and return it to the caller as
174192
// a JMSException
175-
rcInt := int(mqret.MQRC)
176-
errCode := strconv.Itoa(rcInt)
177-
reason := ibmmq.MQItoString("RC", rcInt)
178-
179-
jmsErr = jms20subset.CreateJMSException(reason, errCode, err)
193+
jmsErr = CreateJMSExceptionFromMQReturn(err)
180194
}
181195

182196
}
@@ -417,3 +431,20 @@ func (consumer ConsumerImpl) Close() {
417431

418432
return
419433
}
434+
435+
func CreateJMSExceptionFromMQReturn(mqretError error) jms20subset.JMSException {
436+
437+
// Assumes this error code was returned from MQ call.
438+
mqret := mqretError.(*ibmmq.MQReturn)
439+
440+
// Parse the details of the error and return it to the caller as
441+
// a JMSException
442+
rcInt := int(mqret.MQRC)
443+
errCode := strconv.Itoa(rcInt)
444+
reason := ibmmq.MQItoString("RC", rcInt)
445+
446+
jmsErr := jms20subset.CreateJMSException(reason, errCode, mqretError)
447+
448+
return jmsErr
449+
450+
}

0 commit comments

Comments
 (0)