Skip to content

Commit 18c12c2

Browse files
committed
Fix MessageHandle memory leak on receive - #53
1 parent af0d578 commit 18c12c2

File tree

5 files changed

+155
-0
lines changed

5 files changed

+155
-0
lines changed

memoryleaks_test.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Copyright (c) IBM Corporation 2023
3+
*
4+
* This program and the accompanying materials are made available under the
5+
* terms of the Eclipse Public License v. 2.0, which is available at
6+
* http://www.eclipse.org/legal/epl-2.0.
7+
*
8+
* SPDX-License-Identifier: EPL-2.0
9+
*/
10+
package main
11+
12+
import (
13+
"fmt"
14+
"runtime"
15+
"testing"
16+
"time"
17+
18+
"github.com/ibm-messaging/mq-golang-jms20/mqjms"
19+
"github.com/stretchr/testify/assert"
20+
)
21+
22+
/*
23+
* Test for memory leak when there is no message to be received.
24+
*/
25+
func DONTRUN_TestLeakOnEmptyGet(t *testing.T) {
26+
27+
// Loads CF parameters from connection_info.json and applicationApiKey.json in the Downloads directory
28+
cf, cfErr := mqjms.CreateConnectionFactoryFromDefaultJSONFiles()
29+
assert.Nil(t, cfErr)
30+
31+
// Creates a connection to the queue manager, using defer to close it automatically
32+
// at the end of the function (if it was created successfully)
33+
context, ctxErr := cf.CreateContext()
34+
assert.Nil(t, ctxErr)
35+
if context != nil {
36+
defer context.Close()
37+
}
38+
39+
// Now send the message and get it back again, to check that it roundtripped.
40+
queue := context.CreateQueue("DEV.QUEUE.1")
41+
42+
consumer, errCons := context.CreateConsumer(queue)
43+
if consumer != nil {
44+
defer consumer.Close()
45+
}
46+
assert.Nil(t, errCons)
47+
48+
for i := 1; i < 35000; i++ {
49+
50+
//time.Sleep(100 * time.Millisecond)
51+
rcvMsg, errRvc := consumer.ReceiveNoWait()
52+
assert.Nil(t, errRvc)
53+
assert.Nil(t, rcvMsg)
54+
55+
if i%1000 == 0 {
56+
fmt.Println("Messages:", i)
57+
}
58+
59+
}
60+
61+
fmt.Println("Finished receive calls - waiting for cooldown.")
62+
runtime.GC()
63+
64+
time.Sleep(30 * time.Second)
65+
66+
}

mqjms/ConnectionFactoryImpl.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ package mqjms
1111

1212
import (
1313
"strconv"
14+
"sync"
1415

1516
"github.com/ibm-messaging/mq-golang-jms20/jms20subset"
1617
ibmmq "github.com/ibm-messaging/mq-golang/v5/ibmmq"
@@ -155,6 +156,7 @@ func (cf ConnectionFactoryImpl) CreateContextWithSessionMode(sessionMode int, mq
155156
// a new ContextImpl and return it to the caller.
156157
ctx = ContextImpl{
157158
qMgr: qMgr,
159+
ctxLock: &sync.Mutex{},
158160
sessionMode: sessionMode,
159161
receiveBufferSize: cf.ReceiveBufferSize,
160162
sendCheckCount: cf.SendCheckCount,

mqjms/ConsumerImpl.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ package mqjms
1111

1212
import (
1313
"errors"
14+
"fmt"
15+
"runtime"
1416
"strconv"
1517
"strings"
1618

@@ -57,6 +59,11 @@ func (consumer ConsumerImpl) Receive(waitMillis int32) (jms20subset.Message, jms
5759
// of receive.
5860
func (consumer ConsumerImpl) receiveInternal(gmo *ibmmq.MQGMO) (jms20subset.Message, jms20subset.JMSException) {
5961

62+
// Lock the context while we are making calls to the queue manager so that it
63+
// doesn't conflict with the finalizer we use (below) to delete unused MessageHandles.
64+
consumer.ctx.ctxLock.Lock()
65+
defer consumer.ctx.ctxLock.Unlock()
66+
6067
// Prepare objects to be used in receiving the message.
6168
var msg jms20subset.Message
6269
var jmsErr jms20subset.JMSException
@@ -99,6 +106,28 @@ func (consumer ConsumerImpl) receiveInternal(gmo *ibmmq.MQGMO) (jms20subset.Mess
99106

100107
if err == nil {
101108

109+
// Set a finalizer on the message handle to allow it to be deleted
110+
// when it is no longer referenced by an active object, to reduce/prevent
111+
// memory leaks.
112+
runtime.SetFinalizer(&thisMsgHandle, func(msgHandle *ibmmq.MQMessageHandle) {
113+
consumer.ctx.ctxLock.Lock()
114+
dmho := ibmmq.NewMQDMHO()
115+
err := msgHandle.DltMH(dmho)
116+
if err != nil {
117+
118+
mqret := err.(*ibmmq.MQReturn)
119+
120+
if mqret.MQRC == ibmmq.MQRC_HCONN_ERROR {
121+
// Expected if the connection is closed before the finalizer executes
122+
// (at which point it should get tidied up automatically by the connection)
123+
} else {
124+
fmt.Println("DltMH finalizer", err)
125+
}
126+
127+
}
128+
consumer.ctx.ctxLock.Unlock()
129+
})
130+
102131
// Message received successfully (without error).
103132
// Determine on the basis of the format field what sort of message to create.
104133

@@ -142,6 +171,11 @@ func (consumer ConsumerImpl) receiveInternal(gmo *ibmmq.MQGMO) (jms20subset.Mess
142171
// Error code was returned from MQ call.
143172
mqret := err.(*ibmmq.MQReturn)
144173

174+
// Delete the message handle object in-line here now that it is no longer required,
175+
// to avoid memory leak
176+
dmho := ibmmq.NewMQDMHO()
177+
gmo.MsgHandle.DltMH(dmho)
178+
145179
if mqret.MQRC == ibmmq.MQRC_NO_MSG_AVAILABLE {
146180

147181
// This isn't a real error - it's the way that MQ indicates that there

mqjms/ContextImpl.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ package mqjms
1212
import (
1313
"fmt"
1414
"strconv"
15+
"sync"
1516

1617
"github.com/ibm-messaging/mq-golang-jms20/jms20subset"
1718
ibmmq "github.com/ibm-messaging/mq-golang/v5/ibmmq"
@@ -21,6 +22,7 @@ import (
2122
// connection to an IBM MQ queue manager.
2223
type ContextImpl struct {
2324
qMgr ibmmq.MQQueueManager
25+
ctxLock *sync.Mutex // Mutex to synchronize MQRC calls to the queue manager
2426
sessionMode int
2527
receiveBufferSize int
2628
sendCheckCount int
@@ -65,6 +67,11 @@ func (ctx ContextImpl) CreateConsumer(dest jms20subset.Destination) (jms20subset
6567
// receive messages that match the specified selector from the given Destination.
6668
func (ctx ContextImpl) CreateConsumerWithSelector(dest jms20subset.Destination, selector string) (jms20subset.JMSConsumer, jms20subset.JMSException) {
6769

70+
// Lock the context while we are making calls to the queue manager so that it
71+
// doesn't conflict with the finalizer we use (below) to delete unused MessageHandles.
72+
ctx.ctxLock.Lock()
73+
defer ctx.ctxLock.Unlock()
74+
6875
// First validate the selector string format (we don't make use of it at
6976
// runtime until the receive is called)
7077
if selector != "" {
@@ -118,6 +125,11 @@ func (ctx ContextImpl) CreateConsumerWithSelector(dest jms20subset.Destination,
118125
// an application can look at messages without removing them.
119126
func (ctx ContextImpl) CreateBrowser(dest jms20subset.Destination) (jms20subset.QueueBrowser, jms20subset.JMSException) {
120127

128+
// Lock the context while we are making calls to the queue manager so that it
129+
// doesn't conflict with the finalizer we use (below) to delete unused MessageHandles.
130+
ctx.ctxLock.Lock()
131+
defer ctx.ctxLock.Unlock()
132+
121133
// Set up the necessary objects to open the queue
122134
mqod := ibmmq.NewMQOD()
123135
var openOptions int32
@@ -165,6 +177,11 @@ func (ctx ContextImpl) CreateBrowser(dest jms20subset.Destination) (jms20subset.
165177
// CreateTextMessage is a JMS standard mechanism for creating a TextMessage.
166178
func (ctx ContextImpl) CreateTextMessage() jms20subset.TextMessage {
167179

180+
// Lock the context while we are making calls to the queue manager so that it
181+
// doesn't conflict with the finalizer we use (below) to delete unused MessageHandles.
182+
ctx.ctxLock.Lock()
183+
defer ctx.ctxLock.Unlock()
184+
168185
var bodyStr *string
169186
thisMsgHandle := createMsgHandle(ctx.qMgr)
170187

@@ -198,6 +215,11 @@ func createMsgHandle(qMgr ibmmq.MQQueueManager) ibmmq.MQMessageHandle {
198215
// and initialise it with the chosen text string.
199216
func (ctx ContextImpl) CreateTextMessageWithString(txt string) jms20subset.TextMessage {
200217

218+
// Lock the context while we are making calls to the queue manager so that it
219+
// doesn't conflict with the finalizer we use (below) to delete unused MessageHandles.
220+
ctx.ctxLock.Lock()
221+
defer ctx.ctxLock.Unlock()
222+
201223
thisMsgHandle := createMsgHandle(ctx.qMgr)
202224

203225
msg := &TextMessageImpl{
@@ -213,6 +235,11 @@ func (ctx ContextImpl) CreateTextMessageWithString(txt string) jms20subset.TextM
213235
// CreateBytesMessage is a JMS standard mechanism for creating a BytesMessage.
214236
func (ctx ContextImpl) CreateBytesMessage() jms20subset.BytesMessage {
215237

238+
// Lock the context while we are making calls to the queue manager so that it
239+
// doesn't conflict with the finalizer we use (below) to delete unused MessageHandles.
240+
ctx.ctxLock.Lock()
241+
defer ctx.ctxLock.Unlock()
242+
216243
var thisBodyBytes *[]byte
217244
thisMsgHandle := createMsgHandle(ctx.qMgr)
218245

@@ -227,6 +254,11 @@ func (ctx ContextImpl) CreateBytesMessage() jms20subset.BytesMessage {
227254
// CreateBytesMessageWithBytes is a JMS standard mechanism for creating a BytesMessage.
228255
func (ctx ContextImpl) CreateBytesMessageWithBytes(bytes []byte) jms20subset.BytesMessage {
229256

257+
// Lock the context while we are making calls to the queue manager so that it
258+
// doesn't conflict with the finalizer we use (below) to delete unused MessageHandles.
259+
ctx.ctxLock.Lock()
260+
defer ctx.ctxLock.Unlock()
261+
230262
thisMsgHandle := createMsgHandle(ctx.qMgr)
231263

232264
return &BytesMessageImpl{
@@ -240,6 +272,11 @@ func (ctx ContextImpl) CreateBytesMessageWithBytes(bytes []byte) jms20subset.Byt
240272
// Commit confirms all messages that were sent under this transaction.
241273
func (ctx ContextImpl) Commit() jms20subset.JMSException {
242274

275+
// Lock the context while we are making calls to the queue manager so that it
276+
// doesn't conflict with the finalizer we use (below) to delete unused MessageHandles.
277+
ctx.ctxLock.Lock()
278+
defer ctx.ctxLock.Unlock()
279+
243280
var retErr jms20subset.JMSException
244281

245282
if (ibmmq.MQQueueManager{}) != ctx.qMgr {
@@ -294,6 +331,11 @@ func (ctx ContextImpl) Commit() jms20subset.JMSException {
294331
// Rollback releases all messages that were sent under this transaction.
295332
func (ctx ContextImpl) Rollback() jms20subset.JMSException {
296333

334+
// Lock the context while we are making calls to the queue manager so that it
335+
// doesn't conflict with the finalizer we use (below) to delete unused MessageHandles.
336+
ctx.ctxLock.Lock()
337+
defer ctx.ctxLock.Unlock()
338+
297339
var retErr jms20subset.JMSException
298340

299341
if (ibmmq.MQQueueManager{}) != ctx.qMgr {
@@ -321,6 +363,12 @@ func (ctx ContextImpl) Close() {
321363
ctx.Rollback()
322364

323365
if (ibmmq.MQQueueManager{}) != ctx.qMgr {
366+
367+
// Lock the context while we are making calls to the queue manager so that it
368+
// doesn't conflict with the finalizer we use (below) to delete unused MessageHandles.
369+
ctx.ctxLock.Lock()
370+
defer ctx.ctxLock.Unlock()
371+
324372
ctx.qMgr.Disc()
325373
}
326374

mqjms/ProducerImpl.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,11 @@ func (producer ProducerImpl) SendBytes(dest jms20subset.Destination, body []byte
5858
// that are defined on this JMSProducer.
5959
func (producer ProducerImpl) Send(dest jms20subset.Destination, msg jms20subset.Message) jms20subset.JMSException {
6060

61+
// Lock the context while we are making calls to the queue manager so that it
62+
// doesn't conflict with the finalizer we use (below) to delete unused MessageHandles.
63+
producer.ctx.ctxLock.Lock()
64+
defer producer.ctx.ctxLock.Unlock()
65+
6166
// Set up the basic objects we need to send the message.
6267
mqod := ibmmq.NewMQOD()
6368
putmqmd := ibmmq.NewMQMD()

0 commit comments

Comments
 (0)