Skip to content

Commit 304c2b2

Browse files
committed
IBM MQ implementation of interfaces
1 parent da4e7e8 commit 304c2b2

File tree

7 files changed

+1077
-0
lines changed

7 files changed

+1077
-0
lines changed

mqjms/ConnectionFactoryImpl.go

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
// Copyright (c) IBM Corporation 2019.
2+
//
3+
// This program and the accompanying materials are made available under the
4+
// terms of the Eclipse Public License 2.0, which is available at
5+
// http://www.eclipse.org/legal/epl-2.0.
6+
//
7+
// SPDX-License-Identifier: EPL-2.0
8+
9+
// Implementation of the JMS style Golang interfaces to communicate with IBM MQ.
10+
package mqjms
11+
12+
import (
13+
"github.com/ibm-messaging/mq-golang/ibmmq"
14+
"github.com/ibm-messaging/mq-golang-jms20/jms20subset"
15+
"strconv"
16+
)
17+
18+
// ConnectionFactoryImpl defines a struct that contains attributes for
19+
// each of the key properties required to establish a connection to an IBM MQ
20+
// queue manager.
21+
//
22+
// The fields are defined as Public so that the struct can be initialised
23+
// programmatically using whatever approach the application prefers.
24+
type ConnectionFactoryImpl struct {
25+
QMName string
26+
Hostname string
27+
PortNumber int
28+
ChannelName string
29+
UserName string
30+
Password string
31+
}
32+
33+
// CreateContext implements the JMS method to create a connection to an IBM MQ
34+
// queue manager.
35+
func (cf ConnectionFactoryImpl) CreateContext() (jms20subset.JMSContext, jms20subset.JMSException) {
36+
37+
// Allocate the internal structures required to create an connection to IBM MQ.
38+
cno := ibmmq.NewMQCNO()
39+
cd := ibmmq.NewMQCD()
40+
41+
// Fill in the required fields in the channel definition structure
42+
cd.ChannelName = cf.ChannelName
43+
cd.ConnectionName = cf.Hostname + "(" + strconv.Itoa(cf.PortNumber) + ")"
44+
45+
// Store the user credentials in an MQCSP, which ensures that long passwords
46+
// can be used.
47+
csp := ibmmq.NewMQCSP()
48+
csp.AuthenticationType = ibmmq.MQCSP_AUTH_USER_ID_AND_PWD
49+
csp.UserId = cf.UserName
50+
csp.Password = cf.Password
51+
52+
// Join the objects together as necessary so that they can be provided as
53+
// part of the connect call.
54+
cno.ClientConn = cd
55+
cno.SecurityParms = csp
56+
57+
// Indicate that we want to use a client (TCP) connection.
58+
cno.Options = ibmmq.MQCNO_CLIENT_BINDING
59+
60+
var ctx jms20subset.JMSContext
61+
var retErr jms20subset.JMSException
62+
63+
// Use the objects that we have configured to create a connection to the
64+
// queue manager.
65+
qMgr, err := ibmmq.Connx(cf.QMName, cno)
66+
67+
if err == nil {
68+
69+
// Connection was created successfully, so we wrap the MQI object into
70+
// a new ContextImpl and return it to the caller.
71+
ctx = ContextImpl{
72+
qMgr: qMgr,
73+
}
74+
75+
} else {
76+
77+
// The underlying MQI call returned an error, so extract the relevant
78+
// details and pass it back to the caller as a JMSException
79+
rcInt := int(err.(*ibmmq.MQReturn).MQRC)
80+
errCode := strconv.Itoa(rcInt)
81+
reason := ibmmq.MQItoString("RC", rcInt)
82+
retErr = jms20subset.CreateJMSException(reason, errCode, err)
83+
84+
}
85+
86+
return ctx, retErr
87+
88+
}

mqjms/ConsumerImpl.go

Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
// Copyright (c) IBM Corporation 2019.
2+
//
3+
// This program and the accompanying materials are made available under the
4+
// terms of the Eclipse Public License 2.0, which is available at
5+
// http://www.eclipse.org/legal/epl-2.0.
6+
//
7+
// SPDX-License-Identifier: EPL-2.0
8+
9+
//
10+
package mqjms
11+
12+
import (
13+
"errors"
14+
"github.com/ibm-messaging/mq-golang/ibmmq"
15+
"github.com/ibm-messaging/mq-golang-jms20/jms20subset"
16+
"strconv"
17+
"strings"
18+
)
19+
20+
// ConsumerImpl defines a struct that contains the necessary objects for
21+
// receiving messages from a queue on an IBM MQ queue manager.
22+
type ConsumerImpl struct {
23+
qObject ibmmq.MQObject
24+
selector string
25+
}
26+
27+
// ReceiveNoWait implements the IBM MQ logic necessary to receive a message from
28+
// a Destination, or immediately return a nil Message if there is no available
29+
// message to be received.
30+
func (consumer ConsumerImpl) ReceiveNoWait() (jms20subset.Message, jms20subset.JMSException) {
31+
32+
// Prepare objects to be used in receiving the message.
33+
var msg jms20subset.Message
34+
var jmsErr jms20subset.JMSException
35+
36+
getmqmd := ibmmq.NewMQMD()
37+
gmo := ibmmq.NewMQGMO()
38+
buffer := make([]byte, 32768)
39+
40+
// Set the GMO (get message options)
41+
gmo.Options = ibmmq.MQGMO_NO_SYNCPOINT | ibmmq.MQGMO_FAIL_IF_QUIESCING
42+
43+
// Apply the selector if one has been specified in the Consumer
44+
err := applySelector(consumer.selector, getmqmd, gmo)
45+
if err != nil {
46+
jmsErr = jms20subset.CreateJMSException("ErrorParsingSelector", "ErrorParsingSelector", err)
47+
return nil, jmsErr
48+
}
49+
50+
// Use the prepared objects to ask for a message from the queue.
51+
datalen, err := consumer.qObject.Get(getmqmd, gmo, buffer)
52+
53+
if err == nil {
54+
55+
// Message received successfully (without error).
56+
// Currently we only support TextMessage, so extract the content of the
57+
// message and populate it into a text string.
58+
var msgBodyStr *string
59+
60+
if datalen > 0 {
61+
strContent := strings.TrimSpace(string(buffer[:datalen]))
62+
msgBodyStr = &strContent
63+
}
64+
65+
msg = &TextMessageImpl{
66+
bodyStr: msgBodyStr,
67+
mqmd: getmqmd,
68+
}
69+
70+
} else {
71+
72+
// Error code was returned from MQ call.
73+
mqret := err.(*ibmmq.MQReturn)
74+
75+
if mqret.MQRC == ibmmq.MQRC_NO_MSG_AVAILABLE {
76+
77+
// This isn't a real error - it's the way that MQ indicates that there
78+
// is no message available to be received.
79+
msg = nil
80+
81+
} else {
82+
83+
// Parse the details of the error and return it to the caller as
84+
// a JMSException
85+
rcInt := int(mqret.MQRC)
86+
errCode := strconv.Itoa(rcInt)
87+
reason := ibmmq.MQItoString("RC", rcInt)
88+
89+
jmsErr = jms20subset.CreateJMSException(reason, errCode, err)
90+
}
91+
92+
}
93+
94+
return msg, jmsErr
95+
}
96+
97+
// ReceiveStringBodyNoWait implements the IBM MQ logic necessary to receive a
98+
// message from a Destination and return its body as a string.
99+
//
100+
// If no message is immediately available to be returned then a nil is returned.
101+
func (consumer ConsumerImpl) ReceiveStringBodyNoWait() (*string, jms20subset.JMSException) {
102+
103+
var msgBodyStrPtr *string
104+
var jmsErr jms20subset.JMSException
105+
106+
// Get a message from the queue if one is available.
107+
msg, jmsErr := consumer.ReceiveNoWait()
108+
109+
// If we receive a message without any errors
110+
if jmsErr == nil && msg != nil {
111+
112+
switch msg := msg.(type) {
113+
case jms20subset.TextMessage:
114+
msgBodyStrPtr = msg.GetText()
115+
default:
116+
jmsErr = jms20subset.CreateJMSException(
117+
"Received message is not a TextMessage", "MQJMS6068", nil)
118+
}
119+
120+
}
121+
122+
return msgBodyStrPtr, jmsErr
123+
124+
}
125+
126+
// applySelector is responsible for converting the JMS style selector string
127+
// into the relevant options on the MQI structures so that the correct messages
128+
// are received by the application.
129+
func applySelector(selector string, getmqmd *ibmmq.MQMD, gmo *ibmmq.MQGMO) error {
130+
131+
if selector == "" {
132+
// No selector is provided, so nothing to do here.
133+
return nil
134+
}
135+
136+
// looking for something like "JMSCorrelationID = '01020304050607'"
137+
clauseSplits := strings.Split(selector, "=")
138+
139+
if len(clauseSplits) != 2 {
140+
return errors.New("Unable to parse selector " + selector)
141+
}
142+
143+
if strings.TrimSpace(clauseSplits[0]) != "JMSCorrelationID" {
144+
// Currently we only support correlID selectors, so error out quickly
145+
// if we see anything else.
146+
return errors.New("Only selectors on JMSCorrelationID are currently supported.")
147+
}
148+
149+
// Trim the value.
150+
value := strings.TrimSpace(clauseSplits[1])
151+
152+
// Check for a quote delimited value for the selector clause.
153+
if strings.HasPrefix(value, "'") &&
154+
strings.HasSuffix(value, "'") {
155+
156+
// Parse out the value, and convert it to bytes
157+
stringSplits := strings.Split(value, "'")
158+
correlIDStr := stringSplits[1]
159+
160+
if correlIDStr != "" {
161+
correlBytes := convertStringToMQBytes(correlIDStr)
162+
getmqmd.CorrelId = correlBytes
163+
} else {
164+
return errors.New("No value was found for CorrelationID")
165+
}
166+
167+
} else {
168+
return errors.New("Unable to parse quoted string from " + selector)
169+
}
170+
171+
return nil
172+
}
173+
174+
// Closes the JMSConsumer, releasing any resources that were allocated on
175+
// behalf of that consumer.
176+
func (consumer ConsumerImpl) Close() {
177+
178+
if (ibmmq.MQObject{}) != consumer.qObject {
179+
consumer.qObject.Close(0)
180+
}
181+
182+
return
183+
}

0 commit comments

Comments
 (0)