Skip to content

Commit bd39c38

Browse files
committed
Additional locking to protect msg props actions - #53
1 parent e98919e commit bd39c38

File tree

4 files changed

+193
-37
lines changed

4 files changed

+193
-37
lines changed

memoryleaks_test.go

Lines changed: 105 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,33 @@ import (
1515
"testing"
1616
"time"
1717

18+
"github.com/ibm-messaging/mq-golang-jms20/jms20subset"
1819
"github.com/ibm-messaging/mq-golang-jms20/mqjms"
1920
"github.com/stretchr/testify/assert"
2021
)
2122

2223
/*
2324
* Test for memory leak when there is no message to be received.
25+
*
26+
* This test is not included in the normal bucket as it sends an enormous number of
27+
* messages, and requires human observation of the total process size to establish whether
28+
* it passes or not, so can only be run under human supervision
2429
*/
25-
func DONTRUN_TestLeakOnEmptyGet(t *testing.T) {
30+
func DONT_RUNTestLeakOnEmptyGet(t *testing.T) {
2631

2732
// Loads CF parameters from connection_info.json and applicationApiKey.json in the Downloads directory
28-
cf, cfErr := mqjms.CreateConnectionFactoryFromDefaultJSONFiles()
29-
assert.Nil(t, cfErr)
33+
//cf, cfErr := mqjms.CreateConnectionFactoryFromDefaultJSONFiles()
34+
//assert.Nil(t, cfErr)
35+
36+
// Initialise the attributes of the CF in whatever way you like
37+
cf := mqjms.ConnectionFactoryImpl{
38+
QMName: "QM1",
39+
Hostname: "localhost",
40+
PortNumber: 1414,
41+
ChannelName: "DEV.APP.SVRCONN",
42+
UserName: "app",
43+
Password: "passw0rd",
44+
}
3045

3146
// Creates a connection to the queue manager, using defer to close it automatically
3247
// at the end of the function (if it was created successfully)
@@ -47,7 +62,6 @@ func DONTRUN_TestLeakOnEmptyGet(t *testing.T) {
4762

4863
for i := 1; i < 35000; i++ {
4964

50-
//time.Sleep(100 * time.Millisecond)
5165
rcvMsg, errRvc := consumer.ReceiveNoWait()
5266
assert.Nil(t, errRvc)
5367
assert.Nil(t, rcvMsg)
@@ -64,3 +78,90 @@ func DONTRUN_TestLeakOnEmptyGet(t *testing.T) {
6478
time.Sleep(30 * time.Second)
6579

6680
}
81+
82+
/*
83+
* Test for memory leak when sending and receiving messages
84+
*
85+
* This test is not included in the normal bucket as it sends an enormous number of
86+
* messages, and requires human observation of the total process size to establish whether
87+
* it passes or not, so can only be run under human supervision
88+
*/
89+
func DONTRUN_TestLeakOnPutGet(t *testing.T) {
90+
91+
// Loads CF parameters from connection_info.json and applicationApiKey.json in the Downloads directory
92+
//cf, cfErr := mqjms.CreateConnectionFactoryFromDefaultJSONFiles()
93+
//assert.Nil(t, cfErr)
94+
95+
// Initialise the attributes of the CF in whatever way you like
96+
cf := mqjms.ConnectionFactoryImpl{
97+
QMName: "QM1",
98+
Hostname: "localhost",
99+
PortNumber: 1414,
100+
ChannelName: "DEV.APP.SVRCONN",
101+
UserName: "app",
102+
Password: "passw0rd",
103+
}
104+
105+
// Creates a connection to the queue manager, using defer to close it automatically
106+
// at the end of the function (if it was created successfully)
107+
context, ctxErr := cf.CreateContext()
108+
assert.Nil(t, ctxErr)
109+
if context != nil {
110+
defer context.Close()
111+
}
112+
113+
// Now send the message and get it back again, to check that it roundtripped.
114+
queue := context.CreateQueue("DEV.QUEUE.1")
115+
116+
consumer, errCons := context.CreateConsumer(queue)
117+
if consumer != nil {
118+
defer consumer.Close()
119+
}
120+
assert.Nil(t, errCons)
121+
122+
ttlMillis := 20000
123+
producer := context.CreateProducer().SetTimeToLive(ttlMillis)
124+
125+
for i := 1; i < 25000; i++ {
126+
127+
// Create a TextMessage and check that we can populate it
128+
msgBody := "Message " + fmt.Sprint(i)
129+
txtMsg := context.CreateTextMessage()
130+
txtMsg.SetText(msgBody)
131+
txtMsg.SetIntProperty("MessageNumber", i)
132+
133+
errSend := producer.Send(queue, txtMsg)
134+
assert.Nil(t, errSend)
135+
136+
rcvMsg, errRvc := consumer.ReceiveNoWait()
137+
assert.Nil(t, errRvc)
138+
assert.NotNil(t, rcvMsg)
139+
140+
// Check message body.
141+
switch msg := rcvMsg.(type) {
142+
case jms20subset.TextMessage:
143+
assert.Equal(t, msgBody, *msg.GetText())
144+
default:
145+
assert.Fail(t, "Got something other than a text message")
146+
}
147+
148+
// Check messageID
149+
assert.Equal(t, txtMsg.GetJMSMessageID(), rcvMsg.GetJMSMessageID())
150+
151+
// Check int property
152+
rcvMsgNum, propErr := rcvMsg.GetIntProperty("MessageNumber")
153+
assert.Nil(t, propErr)
154+
assert.Equal(t, i, rcvMsgNum)
155+
156+
if i%1000 == 0 {
157+
fmt.Println("Messages:", i)
158+
}
159+
160+
}
161+
162+
fmt.Println("Finished receive calls - waiting for cooldown.")
163+
runtime.GC()
164+
165+
time.Sleep(30 * time.Second)
166+
167+
}

mqjms/ConsumerImpl.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ func (consumer ConsumerImpl) receiveInternal(gmo *ibmmq.MQGMO) (jms20subset.Mess
129129
MessageImpl: MessageImpl{
130130
mqmd: getmqmd,
131131
msgHandle: &thisMsgHandle,
132+
ctxLock: consumer.ctx.ctxLock,
132133
},
133134
}
134135

@@ -146,6 +147,7 @@ func (consumer ConsumerImpl) receiveInternal(gmo *ibmmq.MQGMO) (jms20subset.Mess
146147
MessageImpl: MessageImpl{
147148
mqmd: getmqmd,
148149
msgHandle: &thisMsgHandle,
150+
ctxLock: consumer.ctx.ctxLock,
149151
},
150152
}
151153
}
@@ -404,6 +406,12 @@ func applySelector(selector string, getmqmd *ibmmq.MQMD, gmo *ibmmq.MQGMO) error
404406
func (consumer ConsumerImpl) Close() {
405407

406408
if (ibmmq.MQObject{}) != consumer.qObject {
409+
410+
// Lock the context while we are making calls to the queue manager so that it
411+
// doesn't conflict with the finalizer we use (below) to delete unused MessageHandles.
412+
consumer.ctx.ctxLock.Lock()
413+
defer consumer.ctx.ctxLock.Unlock()
414+
407415
consumer.qObject.Close(0)
408416
}
409417

mqjms/ContextImpl.go

Lines changed: 24 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ func (ctx ContextImpl) CreateConsumer(dest jms20subset.Destination) (jms20subset
6868
func (ctx ContextImpl) CreateConsumerWithSelector(dest jms20subset.Destination, selector string) (jms20subset.JMSConsumer, jms20subset.JMSException) {
6969

7070
// Lock the context while we are making calls to the queue manager so that it
71-
// doesn't conflict with the finalizer we use (below) to delete unused MessageHandles.
71+
// doesn't conflict with the finalizer we use to delete unused MessageHandles.
7272
ctx.ctxLock.Lock()
7373
defer ctx.ctxLock.Unlock()
7474

@@ -126,7 +126,7 @@ func (ctx ContextImpl) CreateConsumerWithSelector(dest jms20subset.Destination,
126126
func (ctx ContextImpl) CreateBrowser(dest jms20subset.Destination) (jms20subset.QueueBrowser, jms20subset.JMSException) {
127127

128128
// Lock the context while we are making calls to the queue manager so that it
129-
// doesn't conflict with the finalizer we use (below) to delete unused MessageHandles.
129+
// doesn't conflict with the finalizer we use to delete unused MessageHandles.
130130
ctx.ctxLock.Lock()
131131
defer ctx.ctxLock.Unlock()
132132

@@ -177,18 +177,14 @@ func (ctx ContextImpl) CreateBrowser(dest jms20subset.Destination) (jms20subset.
177177
// CreateTextMessage is a JMS standard mechanism for creating a TextMessage.
178178
func (ctx ContextImpl) CreateTextMessage() jms20subset.TextMessage {
179179

180-
// Lock the context while we are making calls to the queue manager so that it
181-
// doesn't conflict with the finalizer we use (below) to delete unused MessageHandles.
182-
ctx.ctxLock.Lock()
183-
defer ctx.ctxLock.Unlock()
184-
185180
var bodyStr *string
186181
thisMsgHandle := ctx.createMsgHandle(ctx.qMgr)
187182

188183
return &TextMessageImpl{
189184
bodyStr: bodyStr,
190185
MessageImpl: MessageImpl{
191186
msgHandle: &thisMsgHandle,
187+
ctxLock: ctx.ctxLock,
192188
},
193189
}
194190
}
@@ -197,6 +193,11 @@ func (ctx ContextImpl) CreateTextMessage() jms20subset.TextMessage {
197193
// store and retrieve message properties.
198194
func (ctx ContextImpl) createMsgHandle(qMgr ibmmq.MQQueueManager) ibmmq.MQMessageHandle {
199195

196+
// Lock the context while we are making calls to the queue manager so that it
197+
// doesn't conflict with the finalizer we use to delete unused MessageHandles.
198+
ctx.ctxLock.Lock()
199+
defer ctx.ctxLock.Unlock()
200+
200201
cmho := ibmmq.NewMQCMHO()
201202
thisMsgHandle, err := qMgr.CrtMH(cmho)
202203

@@ -220,17 +221,13 @@ func (ctx ContextImpl) createMsgHandle(qMgr ibmmq.MQQueueManager) ibmmq.MQMessag
220221
// and initialise it with the chosen text string.
221222
func (ctx ContextImpl) CreateTextMessageWithString(txt string) jms20subset.TextMessage {
222223

223-
// Lock the context while we are making calls to the queue manager so that it
224-
// doesn't conflict with the finalizer we use (below) to delete unused MessageHandles.
225-
ctx.ctxLock.Lock()
226-
defer ctx.ctxLock.Unlock()
227-
228224
thisMsgHandle := ctx.createMsgHandle(ctx.qMgr)
229225

230226
msg := &TextMessageImpl{
231227
bodyStr: &txt,
232228
MessageImpl: MessageImpl{
233229
msgHandle: &thisMsgHandle,
230+
ctxLock: ctx.ctxLock,
234231
},
235232
}
236233

@@ -240,51 +237,44 @@ func (ctx ContextImpl) CreateTextMessageWithString(txt string) jms20subset.TextM
240237
// CreateBytesMessage is a JMS standard mechanism for creating a BytesMessage.
241238
func (ctx ContextImpl) CreateBytesMessage() jms20subset.BytesMessage {
242239

243-
// Lock the context while we are making calls to the queue manager so that it
244-
// doesn't conflict with the finalizer we use (below) to delete unused MessageHandles.
245-
ctx.ctxLock.Lock()
246-
defer ctx.ctxLock.Unlock()
247-
248240
var thisBodyBytes *[]byte
249241
thisMsgHandle := ctx.createMsgHandle(ctx.qMgr)
250242

251243
return &BytesMessageImpl{
252244
bodyBytes: thisBodyBytes,
253245
MessageImpl: MessageImpl{
254246
msgHandle: &thisMsgHandle,
247+
ctxLock: ctx.ctxLock,
255248
},
256249
}
257250
}
258251

259252
// CreateBytesMessageWithBytes is a JMS standard mechanism for creating a BytesMessage.
260253
func (ctx ContextImpl) CreateBytesMessageWithBytes(bytes []byte) jms20subset.BytesMessage {
261254

262-
// Lock the context while we are making calls to the queue manager so that it
263-
// doesn't conflict with the finalizer we use (below) to delete unused MessageHandles.
264-
ctx.ctxLock.Lock()
265-
defer ctx.ctxLock.Unlock()
266-
267255
thisMsgHandle := ctx.createMsgHandle(ctx.qMgr)
268256

269257
return &BytesMessageImpl{
270258
bodyBytes: &bytes,
271259
MessageImpl: MessageImpl{
272260
msgHandle: &thisMsgHandle,
261+
ctxLock: ctx.ctxLock,
273262
},
274263
}
275264
}
276265

277266
// Commit confirms all messages that were sent under this transaction.
278267
func (ctx ContextImpl) Commit() jms20subset.JMSException {
279268

280-
// Lock the context while we are making calls to the queue manager so that it
281-
// doesn't conflict with the finalizer we use (below) to delete unused MessageHandles.
282-
ctx.ctxLock.Lock()
283-
defer ctx.ctxLock.Unlock()
284-
285269
var retErr jms20subset.JMSException
286270

287271
if (ibmmq.MQQueueManager{}) != ctx.qMgr {
272+
273+
// Lock the context while we are making calls to the queue manager so that it
274+
// doesn't conflict with the finalizer we use to delete unused MessageHandles.
275+
ctx.ctxLock.Lock()
276+
defer ctx.ctxLock.Unlock()
277+
288278
err := ctx.qMgr.Cmit()
289279

290280
if err != nil {
@@ -336,14 +326,15 @@ func (ctx ContextImpl) Commit() jms20subset.JMSException {
336326
// Rollback releases all messages that were sent under this transaction.
337327
func (ctx ContextImpl) Rollback() jms20subset.JMSException {
338328

339-
// Lock the context while we are making calls to the queue manager so that it
340-
// doesn't conflict with the finalizer we use (below) to delete unused MessageHandles.
341-
ctx.ctxLock.Lock()
342-
defer ctx.ctxLock.Unlock()
343-
344329
var retErr jms20subset.JMSException
345330

346331
if (ibmmq.MQQueueManager{}) != ctx.qMgr {
332+
333+
// Lock the context while we are making calls to the queue manager so that it
334+
// doesn't conflict with the finalizer we use to delete unused MessageHandles.
335+
ctx.ctxLock.Lock()
336+
defer ctx.ctxLock.Unlock()
337+
347338
err := ctx.qMgr.Back()
348339

349340
if err != nil {
@@ -370,7 +361,7 @@ func (ctx ContextImpl) Close() {
370361
if (ibmmq.MQQueueManager{}) != ctx.qMgr {
371362

372363
// Lock the context while we are making calls to the queue manager so that it
373-
// doesn't conflict with the finalizer we use (below) to delete unused MessageHandles.
364+
// doesn't conflict with the finalizer we use to delete unused MessageHandles.
374365
ctx.ctxLock.Lock()
375366
defer ctx.ctxLock.Unlock()
376367

0 commit comments

Comments
 (0)