Skip to content

Commit b93d8e8

Browse files
authored
Merge pull request #63 from matrober-uk/memory-leaks
Fix memory leaks due to MessageHandles
2 parents af0d578 + bd39c38 commit b93d8e8

File tree

6 files changed

+335
-5
lines changed

6 files changed

+335
-5
lines changed

memoryleaks_test.go

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
/*
2+
* Copyright (c) IBM Corporation 2023
3+
*
4+
* This program and the accompanying materials are made available under the
5+
* terms of the Eclipse Public License v. 2.0, which is available at
6+
* http://www.eclipse.org/legal/epl-2.0.
7+
*
8+
* SPDX-License-Identifier: EPL-2.0
9+
*/
10+
package main
11+
12+
import (
13+
"fmt"
14+
"runtime"
15+
"testing"
16+
"time"
17+
18+
"github.com/ibm-messaging/mq-golang-jms20/jms20subset"
19+
"github.com/ibm-messaging/mq-golang-jms20/mqjms"
20+
"github.com/stretchr/testify/assert"
21+
)
22+
23+
/*
24+
* Test for memory leak when there is no message to be received.
25+
*
26+
* This test is not included in the normal bucket as it sends an enormous number of
27+
* messages, and requires human observation of the total process size to establish whether
28+
* it passes or not, so can only be run under human supervision
29+
*/
30+
func DONT_RUNTestLeakOnEmptyGet(t *testing.T) {
31+
32+
// Loads CF parameters from connection_info.json and applicationApiKey.json in the Downloads directory
33+
//cf, cfErr := mqjms.CreateConnectionFactoryFromDefaultJSONFiles()
34+
//assert.Nil(t, cfErr)
35+
36+
// Initialise the attributes of the CF in whatever way you like
37+
cf := mqjms.ConnectionFactoryImpl{
38+
QMName: "QM1",
39+
Hostname: "localhost",
40+
PortNumber: 1414,
41+
ChannelName: "DEV.APP.SVRCONN",
42+
UserName: "app",
43+
Password: "passw0rd",
44+
}
45+
46+
// Creates a connection to the queue manager, using defer to close it automatically
47+
// at the end of the function (if it was created successfully)
48+
context, ctxErr := cf.CreateContext()
49+
assert.Nil(t, ctxErr)
50+
if context != nil {
51+
defer context.Close()
52+
}
53+
54+
// Now send the message and get it back again, to check that it roundtripped.
55+
queue := context.CreateQueue("DEV.QUEUE.1")
56+
57+
consumer, errCons := context.CreateConsumer(queue)
58+
if consumer != nil {
59+
defer consumer.Close()
60+
}
61+
assert.Nil(t, errCons)
62+
63+
for i := 1; i < 35000; i++ {
64+
65+
rcvMsg, errRvc := consumer.ReceiveNoWait()
66+
assert.Nil(t, errRvc)
67+
assert.Nil(t, rcvMsg)
68+
69+
if i%1000 == 0 {
70+
fmt.Println("Messages:", i)
71+
}
72+
73+
}
74+
75+
fmt.Println("Finished receive calls - waiting for cooldown.")
76+
runtime.GC()
77+
78+
time.Sleep(30 * time.Second)
79+
80+
}
81+
82+
/*
83+
* Test for memory leak when sending and receiving messages
84+
*
85+
* This test is not included in the normal bucket as it sends an enormous number of
86+
* messages, and requires human observation of the total process size to establish whether
87+
* it passes or not, so can only be run under human supervision
88+
*/
89+
func DONTRUN_TestLeakOnPutGet(t *testing.T) {
90+
91+
// Loads CF parameters from connection_info.json and applicationApiKey.json in the Downloads directory
92+
//cf, cfErr := mqjms.CreateConnectionFactoryFromDefaultJSONFiles()
93+
//assert.Nil(t, cfErr)
94+
95+
// Initialise the attributes of the CF in whatever way you like
96+
cf := mqjms.ConnectionFactoryImpl{
97+
QMName: "QM1",
98+
Hostname: "localhost",
99+
PortNumber: 1414,
100+
ChannelName: "DEV.APP.SVRCONN",
101+
UserName: "app",
102+
Password: "passw0rd",
103+
}
104+
105+
// Creates a connection to the queue manager, using defer to close it automatically
106+
// at the end of the function (if it was created successfully)
107+
context, ctxErr := cf.CreateContext()
108+
assert.Nil(t, ctxErr)
109+
if context != nil {
110+
defer context.Close()
111+
}
112+
113+
// Now send the message and get it back again, to check that it roundtripped.
114+
queue := context.CreateQueue("DEV.QUEUE.1")
115+
116+
consumer, errCons := context.CreateConsumer(queue)
117+
if consumer != nil {
118+
defer consumer.Close()
119+
}
120+
assert.Nil(t, errCons)
121+
122+
ttlMillis := 20000
123+
producer := context.CreateProducer().SetTimeToLive(ttlMillis)
124+
125+
for i := 1; i < 25000; i++ {
126+
127+
// Create a TextMessage and check that we can populate it
128+
msgBody := "Message " + fmt.Sprint(i)
129+
txtMsg := context.CreateTextMessage()
130+
txtMsg.SetText(msgBody)
131+
txtMsg.SetIntProperty("MessageNumber", i)
132+
133+
errSend := producer.Send(queue, txtMsg)
134+
assert.Nil(t, errSend)
135+
136+
rcvMsg, errRvc := consumer.ReceiveNoWait()
137+
assert.Nil(t, errRvc)
138+
assert.NotNil(t, rcvMsg)
139+
140+
// Check message body.
141+
switch msg := rcvMsg.(type) {
142+
case jms20subset.TextMessage:
143+
assert.Equal(t, msgBody, *msg.GetText())
144+
default:
145+
assert.Fail(t, "Got something other than a text message")
146+
}
147+
148+
// Check messageID
149+
assert.Equal(t, txtMsg.GetJMSMessageID(), rcvMsg.GetJMSMessageID())
150+
151+
// Check int property
152+
rcvMsgNum, propErr := rcvMsg.GetIntProperty("MessageNumber")
153+
assert.Nil(t, propErr)
154+
assert.Equal(t, i, rcvMsgNum)
155+
156+
if i%1000 == 0 {
157+
fmt.Println("Messages:", i)
158+
}
159+
160+
}
161+
162+
fmt.Println("Finished receive calls - waiting for cooldown.")
163+
runtime.GC()
164+
165+
time.Sleep(30 * time.Second)
166+
167+
}

mqjms/ConnectionFactoryImpl.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ package mqjms
1111

1212
import (
1313
"strconv"
14+
"sync"
1415

1516
"github.com/ibm-messaging/mq-golang-jms20/jms20subset"
1617
ibmmq "github.com/ibm-messaging/mq-golang/v5/ibmmq"
@@ -155,6 +156,7 @@ func (cf ConnectionFactoryImpl) CreateContextWithSessionMode(sessionMode int, mq
155156
// a new ContextImpl and return it to the caller.
156157
ctx = ContextImpl{
157158
qMgr: qMgr,
159+
ctxLock: &sync.Mutex{},
158160
sessionMode: sessionMode,
159161
receiveBufferSize: cf.ReceiveBufferSize,
160162
sendCheckCount: cf.SendCheckCount,

mqjms/ConsumerImpl.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,11 @@ package mqjms
1111

1212
import (
1313
"errors"
14+
"fmt"
15+
"runtime"
1416
"strconv"
1517
"strings"
18+
"sync"
1619

1720
"github.com/ibm-messaging/mq-golang-jms20/jms20subset"
1821
ibmmq "github.com/ibm-messaging/mq-golang/v5/ibmmq"
@@ -57,6 +60,11 @@ func (consumer ConsumerImpl) Receive(waitMillis int32) (jms20subset.Message, jms
5760
// of receive.
5861
func (consumer ConsumerImpl) receiveInternal(gmo *ibmmq.MQGMO) (jms20subset.Message, jms20subset.JMSException) {
5962

63+
// Lock the context while we are making calls to the queue manager so that it
64+
// doesn't conflict with the finalizer we use (below) to delete unused MessageHandles.
65+
consumer.ctx.ctxLock.Lock()
66+
defer consumer.ctx.ctxLock.Unlock()
67+
6068
// Prepare objects to be used in receiving the message.
6169
var msg jms20subset.Message
6270
var jmsErr jms20subset.JMSException
@@ -99,6 +107,11 @@ func (consumer ConsumerImpl) receiveInternal(gmo *ibmmq.MQGMO) (jms20subset.Mess
99107

100108
if err == nil {
101109

110+
// Set a finalizer on the message handle to allow it to be deleted
111+
// when it is no longer referenced by an active object, to reduce/prevent
112+
// memory leaks.
113+
setMessageHandlerFinalizer(thisMsgHandle, consumer.ctx.ctxLock)
114+
102115
// Message received successfully (without error).
103116
// Determine on the basis of the format field what sort of message to create.
104117

@@ -116,6 +129,7 @@ func (consumer ConsumerImpl) receiveInternal(gmo *ibmmq.MQGMO) (jms20subset.Mess
116129
MessageImpl: MessageImpl{
117130
mqmd: getmqmd,
118131
msgHandle: &thisMsgHandle,
132+
ctxLock: consumer.ctx.ctxLock,
119133
},
120134
}
121135

@@ -133,6 +147,7 @@ func (consumer ConsumerImpl) receiveInternal(gmo *ibmmq.MQGMO) (jms20subset.Mess
133147
MessageImpl: MessageImpl{
134148
mqmd: getmqmd,
135149
msgHandle: &thisMsgHandle,
150+
ctxLock: consumer.ctx.ctxLock,
136151
},
137152
}
138153
}
@@ -142,6 +157,11 @@ func (consumer ConsumerImpl) receiveInternal(gmo *ibmmq.MQGMO) (jms20subset.Mess
142157
// Error code was returned from MQ call.
143158
mqret := err.(*ibmmq.MQReturn)
144159

160+
// Delete the message handle object in-line here now that it is no longer required,
161+
// to avoid memory leak
162+
dmho := ibmmq.NewMQDMHO()
163+
gmo.MsgHandle.DltMH(dmho)
164+
145165
if mqret.MQRC == ibmmq.MQRC_NO_MSG_AVAILABLE {
146166

147167
// This isn't a real error - it's the way that MQ indicates that there
@@ -164,6 +184,36 @@ func (consumer ConsumerImpl) receiveInternal(gmo *ibmmq.MQGMO) (jms20subset.Mess
164184
return msg, jmsErr
165185
}
166186

187+
/*
188+
* Set a finalizer on the message handle to allow it to be deleted
189+
* when it is no longer referenced by an active object, to reduce/prevent
190+
* memory leaks.
191+
*/
192+
func setMessageHandlerFinalizer(thisMsgHandle ibmmq.MQMessageHandle, ctxLock *sync.Mutex) {
193+
194+
runtime.SetFinalizer(&thisMsgHandle, func(msgHandle *ibmmq.MQMessageHandle) {
195+
ctxLock.Lock()
196+
defer ctxLock.Unlock()
197+
198+
dmho := ibmmq.NewMQDMHO()
199+
err := msgHandle.DltMH(dmho)
200+
if err != nil {
201+
202+
mqret := err.(*ibmmq.MQReturn)
203+
204+
if mqret.MQRC == ibmmq.MQRC_HCONN_ERROR {
205+
// Expected if the connection is closed before the finalizer executes
206+
// (at which point it should get tidied up automatically by the connection)
207+
} else {
208+
fmt.Println("DltMH finalizer", err)
209+
}
210+
211+
}
212+
213+
})
214+
215+
}
216+
167217
// ReceiveStringBodyNoWait implements the IBM MQ logic necessary to receive a
168218
// message from a Destination and return its body as a string.
169219
//
@@ -356,6 +406,12 @@ func applySelector(selector string, getmqmd *ibmmq.MQMD, gmo *ibmmq.MQGMO) error
356406
func (consumer ConsumerImpl) Close() {
357407

358408
if (ibmmq.MQObject{}) != consumer.qObject {
409+
410+
// Lock the context while we are making calls to the queue manager so that it
411+
// doesn't conflict with the finalizer we use (below) to delete unused MessageHandles.
412+
consumer.ctx.ctxLock.Lock()
413+
defer consumer.ctx.ctxLock.Unlock()
414+
359415
consumer.qObject.Close(0)
360416
}
361417

0 commit comments

Comments
 (0)