Skip to content

Commit a9da353

Browse files
committed
Subscribe to metadata via managed subscription instead of explicit replyQ
1 parent 72cbde8 commit a9da353

File tree

2 files changed

+36
-17
lines changed

2 files changed

+36
-17
lines changed

mqmetric/discover.go

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,7 @@ func DiscoverAndSubscribe(queueList string, checkQueueList bool, metaPrefix stri
191191
func discoverClasses(metaPrefix string) error {
192192
var data []byte
193193
var sub ibmmq.MQObject
194+
var metaReplyQObj ibmmq.MQObject
194195
var err error
195196
var rootTopic string
196197

@@ -200,10 +201,10 @@ func discoverClasses(metaPrefix string) error {
200201
} else {
201202
rootTopic = metaPrefix + "/INFO/QMGR/" + resolvedQMgrName + "/Monitor/METADATA/CLASSES"
202203
}
203-
sub, err = subscribe(rootTopic)
204+
sub, err = subscribeManaged(rootTopic, &metaReplyQObj)
204205
if err == nil {
205-
data, err = getMessage(true)
206-
sub.Close(0)
206+
data, err = getMessageWithHObj(true, metaReplyQObj)
207+
defer sub.Close(0)
207208

208209
elemList, _ := parsePCFResponse(data)
209210

@@ -245,12 +246,13 @@ func discoverClasses(metaPrefix string) error {
245246
func discoverTypes(cl *MonClass) error {
246247
var data []byte
247248
var sub ibmmq.MQObject
249+
var metaReplyQObj ibmmq.MQObject
248250
var err error
249251

250-
sub, err = subscribe(cl.typesTopic)
252+
sub, err = subscribeManaged(cl.typesTopic, &metaReplyQObj)
251253
if err == nil {
252-
data, err = getMessage(true)
253-
sub.Close(0)
254+
data, err = getMessageWithHObj(true, metaReplyQObj)
255+
defer sub.Close(0)
254256

255257
elemList, _ := parsePCFResponse(data)
256258

@@ -293,12 +295,13 @@ func discoverElements(ty *MonType) error {
293295
var err error
294296
var data []byte
295297
var sub ibmmq.MQObject
298+
var metaReplyQObj ibmmq.MQObject
296299
var elem *MonElement
297300

298-
sub, err = subscribe(ty.elementTopic)
301+
sub, err = subscribeManaged(ty.elementTopic, &metaReplyQObj)
299302
if err == nil {
300-
data, err = getMessage(true)
301-
sub.Close(0)
303+
data, err = getMessageWithHObj(true, metaReplyQObj)
304+
defer sub.Close(0)
302305

303306
elemList, _ := parsePCFResponse(data)
304307

@@ -350,15 +353,16 @@ func discoverElementsNLS(ty *MonType, locale string) error {
350353
var err error
351354
var data []byte
352355
var sub ibmmq.MQObject
356+
var metaReplyQObj ibmmq.MQObject
353357

354358
if locale == "" {
355359
return nil
356360
}
357361

358-
sub, err = subscribe(ty.elementTopic + "/" + locale)
362+
sub, err = subscribe(ty.elementTopic+"/"+locale, &metaReplyQObj)
359363
if err == nil {
360364
// Don't wait - if there's nothing on that topic, then get out fast
361-
data, err = getMessage(false)
365+
data, err = getMessageWithHObj(false, metaReplyQObj)
362366
sub.Close(0)
363367

364368
if err != nil {
@@ -668,11 +672,11 @@ func createSubscriptions() error {
668672
continue
669673
}
670674
topic := fmt.Sprintf(ty.ObjectTopic, qList[i])
671-
sub, err = subscribe(topic)
675+
sub, err = subscribe(topic, &replyQObj)
672676
ty.subHobj[qList[i]] = sub
673677
}
674678
} else {
675-
sub, err = subscribe(ty.ObjectTopic)
679+
sub, err = subscribe(ty.ObjectTopic, &replyQObj)
676680
ty.subHobj[QMgrMapKey] = sub
677681
}
678682

mqmetric/mqif.go

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -218,10 +218,10 @@ func getMessageWithHObj(wait bool, hObj ibmmq.MQObject) ([]byte, error) {
218218

219219
if wait {
220220
gmo.Options |= ibmmq.MQGMO_WAIT
221-
gmo.WaitInterval = 30 * 1000
221+
gmo.WaitInterval = 10 * 1000
222222
}
223223

224-
datalen, err = replyQObj.Get(getmqmd, gmo, getBuffer)
224+
datalen, err = hObj.Get(getmqmd, gmo, getBuffer)
225225

226226
return getBuffer[0:datalen], err
227227
}
@@ -232,17 +232,32 @@ replyQ is used for publications; we do not use a managed queue here,
232232
so that everything can be read from one queue. The object handle for the
233233
subscription is returned so we can close it when it's no longer needed.
234234
*/
235-
func subscribe(topic string) (ibmmq.MQObject, error) {
235+
func subscribe(topic string, pubQObj *ibmmq.MQObject) (ibmmq.MQObject, error) {
236+
return subscribeWithOptions(topic, pubQObj, false)
237+
}
238+
239+
/*
240+
subscribe to the nominated topic, but ask the queue manager to
241+
allocate the replyQ for us
242+
*/
243+
func subscribeManaged(topic string, pubQObj *ibmmq.MQObject) (ibmmq.MQObject, error) {
244+
return subscribeWithOptions(topic, pubQObj, true)
245+
}
246+
247+
func subscribeWithOptions(topic string, pubQObj *ibmmq.MQObject, managed bool) (ibmmq.MQObject, error) {
236248
var err error
237249

238250
mqsd := ibmmq.NewMQSD()
239251
mqsd.Options = ibmmq.MQSO_CREATE
240252
mqsd.Options |= ibmmq.MQSO_NON_DURABLE
241253
mqsd.Options |= ibmmq.MQSO_FAIL_IF_QUIESCING
254+
if managed {
255+
mqsd.Options |= ibmmq.MQSO_MANAGED
256+
}
242257

243258
mqsd.ObjectString = topic
244259

245-
subObj, err := qMgr.Sub(mqsd, &replyQObj)
260+
subObj, err := qMgr.Sub(mqsd, pubQObj)
246261
if err != nil {
247262
return subObj, fmt.Errorf("Error subscribing to topic '%s': %v", topic, err)
248263
}

0 commit comments

Comments
 (0)