Skip to content

Commit 046052a

Browse files
committed
Support for message Priority - #51
1 parent bcf9c99 commit 046052a

File tree

9 files changed

+287
-1
lines changed

9 files changed

+287
-1
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ your own error handling or logging.
124124
* Request/reply messaging pattern - [requestreply_test.go](requestreply_test.go)
125125
* Send and receive under a local transaction - [local_transaction_test.go](local_transaction_test.go)
126126
* Sending a message that expires after a period of time - [timetolive_test.go](timetolive_test.go)
127+
* Sending a message with a specified priority - [priority_test.go](priority_test.go)
127128
* Handle error codes returned by the queue manager - [sample_errorhandling_test.go](sample_errorhandling_test.go)
128129
* Set the application name (ApplName) on connections - [applname_test.go](applname_test.go)
129130
* Receive messages over 32kb in size by setting the receive buffer size - [largemessage_test.go](largemessage_test.go)

jms20subset/JMSProducer.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,4 +59,11 @@ type JMSProducer interface {
5959
// GetTimeToLive returns the time to live (in milliseconds) that will be
6060
// applied to messages that are sent using this JMSProducer.
6161
GetTimeToLive() int
62+
63+
// SetPriority sets the message priority for all messages sent by this producer.
64+
SetPriority(priority int) JMSProducer
65+
66+
// GetPriority returns the priority for all messages sent by this producer.
67+
// Default priority is 4.
68+
GetPriority() int
6269
}

jms20subset/Message.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,9 @@ type Message interface {
5252
// jms20subset.DeliveryMode_PERSISTENT and jms20subset.DeliveryMode_NON_PERSISTENT
5353
GetJMSDeliveryMode() int
5454

55+
// GetJMSPriority returns the priority that is specified for this message.
56+
GetJMSPriority() int
57+
5558
// SetStringProperty enables an application to set a string-type message property.
5659
//
5760
// value is *string which allows a nil value to be specified, to unset an individual

jms20subset/Priority.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
// Derived from the Eclipse Project for JMS, available at;
2+
// https://github.com/eclipse-ee4j/jms-api
3+
//
4+
// This program and the accompanying materials are made available under the
5+
// terms of the Eclipse Public License 2.0, which is available at
6+
// http://www.eclipse.org/legal/epl-2.0.
7+
//
8+
// SPDX-License-Identifier: EPL-2.0
9+
10+
// Package jms20subset provides interfaces for messaging applications in the style of the Java Message Service (JMS) API.
11+
package jms20subset
12+
13+
// Go doesn't allow constants in structs so the naming of this file is only for
14+
// logical grouping purposes. The constants are package scoped, but we use a
15+
// prefix to the naming in order to maintain similarity with Java JMS.
16+
17+
// Priority_DEFAULT is the default priority for messages sent using a Producer.
18+
const Priority_DEFAULT int = 4

mqjms/ContextImpl.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ func (ctx ContextImpl) CreateProducer() jms20subset.JMSProducer {
4949
producer := ProducerImpl{
5050
ctx: ctx,
5151
deliveryMode: jms20subset.DeliveryMode_PERSISTENT,
52+
priority: jms20subset.Priority_DEFAULT,
5253
}
5354

5455
return &producer

mqjms/MessageImpl.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,20 @@ func (msg *MessageImpl) GetJMSDeliveryMode() int {
5454
return jmsPersistence
5555
}
5656

57+
// GetJMSPriority extracts the message priority from the native MQ message descriptor.
58+
func (msg *MessageImpl) GetJMSPriority() int {
59+
60+
pri := 4
61+
62+
// Extract the MsgId field from the MQ message descriptor if one exists.
63+
// Note that if there is no MQMD then there is no messageID to return.
64+
if msg.mqmd != nil {
65+
pri = int(msg.mqmd.Priority)
66+
}
67+
68+
return pri
69+
}
70+
5771
// GetJMSMessageID extracts the message ID from the native MQ message descriptor.
5872
func (msg *MessageImpl) GetJMSMessageID() string {
5973
msgIDStr := ""

mqjms/ProducerImpl.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ type ProducerImpl struct {
2525
ctx ContextImpl
2626
deliveryMode int
2727
timeToLive int
28+
priority int
2829
}
2930

3031
// SendString sends a TextMessage with the specified body to the specified Destination
@@ -152,6 +153,10 @@ func (producer ProducerImpl) Send(dest jms20subset.Destination, msg jms20subset.
152153
putmqmd.Expiry = (int32(producer.timeToLive) / 100)
153154
}
154155

156+
// Convert the JMS priority into the equivalent MQ message descriptor
157+
// attribute.
158+
putmqmd.Priority = int32(producer.priority)
159+
155160
// Invoke the MQ command to put the message using MQPUT1 to avoid MQOPEN and MQCLOSE.
156161
// Any Err that occurs will be handled below.
157162
err := producer.ctx.qMgr.Put1(mqod, putmqmd, pmo, buffer)
@@ -310,3 +315,28 @@ func (producer *ProducerImpl) SetTimeToLive(timeToLive int) jms20subset.JMSProdu
310315
func (producer *ProducerImpl) GetTimeToLive() int {
311316
return producer.timeToLive
312317
}
318+
319+
// SetPriority contains the MQ logic necessary to store the specified
320+
// priority parameter inside the Producer object so that it can be
321+
// applied when sending messages using this Producer.
322+
func (producer *ProducerImpl) SetPriority(priority int) jms20subset.JMSProducer {
323+
324+
// Only accept a non-negative value for priority.
325+
if priority >= 0 {
326+
producer.priority = priority
327+
328+
} else {
329+
// Normally we would throw an error here to indicate that an invalid value
330+
// was specified, however we have decided that it is more useful to support
331+
// method chaining, which prevents us from returning an error object.
332+
// Instead we settle for printing an error message to the console.
333+
fmt.Println("Invalid Priority specified: " + strconv.FormatInt(int64(priority), 10))
334+
}
335+
336+
return producer
337+
}
338+
339+
// GetPriority returns the priority for all messages sent by this producer.
340+
func (producer *ProducerImpl) GetPriority() int {
341+
return producer.priority
342+
}

next-features.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ Not currently implemented:
99
- SendToQmgr, ReplyToQmgr
1010
- Topics (pub/sub)
1111
- Temporary destinations
12-
- Priority
1312
- Configurable option to auto-set the receive buffer length if the default 32kb is exceeded (less efficient that setting up front)
1413

1514
Client capabilities for participating in Uniform Clusters;

priority_test.go

Lines changed: 213 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,213 @@
1+
/*
2+
* Copyright (c) IBM Corporation 2022
3+
*
4+
* This program and the accompanying materials are made available under the
5+
* terms of the Eclipse Public License v. 2.0, which is available at
6+
* http://www.eclipse.org/legal/epl-2.0.
7+
*
8+
* SPDX-License-Identifier: EPL-2.0
9+
*/
10+
package main
11+
12+
import (
13+
"testing"
14+
15+
"github.com/ibm-messaging/mq-golang-jms20/jms20subset"
16+
"github.com/ibm-messaging/mq-golang-jms20/mqjms"
17+
"github.com/stretchr/testify/assert"
18+
)
19+
20+
/*
21+
* Test the retrieval of special header properties
22+
*/
23+
func TestPrioritySetGet(t *testing.T) {
24+
25+
// Loads CF parameters from connection_info.json and applicationApiKey.json in the Downloads directory
26+
cf, cfErr := mqjms.CreateConnectionFactoryFromDefaultJSONFiles()
27+
assert.Nil(t, cfErr)
28+
29+
// Creates a connection to the queue manager, using defer to close it automatically
30+
// at the end of the function (if it was created successfully)
31+
context, ctxErr := cf.CreateContext()
32+
assert.Nil(t, ctxErr)
33+
if context != nil {
34+
defer context.Close()
35+
}
36+
37+
// Create a TextMessage and check that we can populate it
38+
msgBody := "PriorityMsg"
39+
txtMsg := context.CreateTextMessage()
40+
txtMsg.SetText(msgBody)
41+
42+
// Set up objects for send/receive
43+
queue := context.CreateQueue("DEV.QUEUE.1")
44+
consumer, errCons := context.CreateConsumer(queue)
45+
if consumer != nil {
46+
defer consumer.Close()
47+
}
48+
assert.Nil(t, errCons)
49+
50+
// Now send the message and get it back again, to check that it roundtripped.
51+
ttlMillis := 20000
52+
producer := context.CreateProducer().SetTimeToLive(ttlMillis)
53+
54+
// Check the default priority.
55+
assert.Equal(t, jms20subset.Priority_DEFAULT, producer.GetPriority())
56+
57+
errSend := producer.Send(queue, txtMsg)
58+
assert.Nil(t, errSend)
59+
60+
rcvMsg, errRvc := consumer.ReceiveNoWait()
61+
assert.Nil(t, errRvc)
62+
assert.NotNil(t, rcvMsg)
63+
64+
switch msg := rcvMsg.(type) {
65+
case jms20subset.TextMessage:
66+
assert.Equal(t, msgBody, *msg.GetText())
67+
default:
68+
assert.Fail(t, "Got something other than a text message")
69+
}
70+
71+
// Check the Priority
72+
gotPropValue := rcvMsg.GetJMSPriority()
73+
assert.Equal(t, jms20subset.Priority_DEFAULT, gotPropValue)
74+
75+
// -------
76+
// Go again with a different priority.
77+
newMsgPriority := 2
78+
producer = producer.SetPriority(newMsgPriority)
79+
assert.Equal(t, newMsgPriority, producer.GetPriority())
80+
81+
errSend = producer.Send(queue, txtMsg)
82+
assert.Nil(t, errSend)
83+
84+
rcvMsg, errRvc = consumer.ReceiveNoWait()
85+
assert.Nil(t, errRvc)
86+
assert.NotNil(t, rcvMsg)
87+
88+
switch msg := rcvMsg.(type) {
89+
case jms20subset.TextMessage:
90+
assert.Equal(t, msgBody, *msg.GetText())
91+
default:
92+
assert.Fail(t, "Got something other than a text message")
93+
}
94+
95+
// Check the Priority
96+
gotPropValue = rcvMsg.GetJMSPriority()
97+
assert.Equal(t, newMsgPriority, gotPropValue)
98+
99+
// -------
100+
// Go again with a different priority.
101+
newMsgPriority2 := 7
102+
producer = producer.SetPriority(newMsgPriority2)
103+
assert.Equal(t, newMsgPriority2, producer.GetPriority())
104+
105+
errSend = producer.Send(queue, txtMsg)
106+
assert.Nil(t, errSend)
107+
108+
rcvMsg, errRvc = consumer.ReceiveNoWait()
109+
assert.Nil(t, errRvc)
110+
assert.NotNil(t, rcvMsg)
111+
112+
switch msg := rcvMsg.(type) {
113+
case jms20subset.TextMessage:
114+
assert.Equal(t, msgBody, *msg.GetText())
115+
default:
116+
assert.Fail(t, "Got something other than a text message")
117+
}
118+
119+
// Check the Priority
120+
gotPropValue = rcvMsg.GetJMSPriority()
121+
assert.Equal(t, newMsgPriority2, gotPropValue)
122+
123+
}
124+
125+
/*
126+
* Test the functional behaviour of messages sent with different priorities,
127+
* where the expectation is that they should be returned from the queue in priority
128+
* order (with FIFO within a priority group), and not FIFO across the entire queue.
129+
*/
130+
func TestPriorityOrdering(t *testing.T) {
131+
132+
// Loads CF parameters from connection_info.json and applicationApiKey.json in the Downloads directory
133+
cf, cfErr := mqjms.CreateConnectionFactoryFromDefaultJSONFiles()
134+
assert.Nil(t, cfErr)
135+
136+
// Creates a connection to the queue manager, using defer to close it automatically
137+
// at the end of the function (if it was created successfully)
138+
context, ctxErr := cf.CreateContext()
139+
assert.Nil(t, ctxErr)
140+
if context != nil {
141+
defer context.Close()
142+
}
143+
144+
// Set up objects for send/receive
145+
queue := context.CreateQueue("DEV.QUEUE.1")
146+
consumer, errCons := context.CreateConsumer(queue)
147+
if consumer != nil {
148+
defer consumer.Close()
149+
}
150+
assert.Nil(t, errCons)
151+
152+
// Send a sequence of messages with different priorities.
153+
id_msg1_pri5 := sendMessageWithPriority(t, context, queue, 5)
154+
id_msg2_pri2 := sendMessageWithPriority(t, context, queue, 2)
155+
id_msg3_pri8 := sendMessageWithPriority(t, context, queue, 8)
156+
id_msg4_pri2 := sendMessageWithPriority(t, context, queue, 2)
157+
id_msg5_pri4 := sendMessageWithPriority(t, context, queue, 4)
158+
id_msg6_pri5 := sendMessageWithPriority(t, context, queue, 5)
159+
id_msg7_pri2 := sendMessageWithPriority(t, context, queue, 2)
160+
id_msg8_pri2 := sendMessageWithPriority(t, context, queue, 2)
161+
id_msg9_pri4 := sendMessageWithPriority(t, context, queue, 4)
162+
id_msg10_pri1 := sendMessageWithPriority(t, context, queue, 1)
163+
164+
// We expect them to be returned in priority group order (highest first),
165+
// with FIFO within the priority group.
166+
//
167+
// This behaviour relies on MsgDeliverySequence for the queue
168+
// being set to MQMDS_PRIORITY (which is the default).
169+
receiveMessageAndCheck(t, consumer, id_msg3_pri8, 8)
170+
receiveMessageAndCheck(t, consumer, id_msg1_pri5, 5)
171+
receiveMessageAndCheck(t, consumer, id_msg6_pri5, 5)
172+
receiveMessageAndCheck(t, consumer, id_msg5_pri4, 4)
173+
receiveMessageAndCheck(t, consumer, id_msg9_pri4, 4)
174+
receiveMessageAndCheck(t, consumer, id_msg2_pri2, 2)
175+
receiveMessageAndCheck(t, consumer, id_msg4_pri2, 2)
176+
receiveMessageAndCheck(t, consumer, id_msg7_pri2, 2)
177+
receiveMessageAndCheck(t, consumer, id_msg8_pri2, 2)
178+
receiveMessageAndCheck(t, consumer, id_msg10_pri1, 1)
179+
180+
}
181+
182+
// Send a message with the specified priority.
183+
// Return the generated MessageID.
184+
func sendMessageWithPriority(t *testing.T, context jms20subset.JMSContext, queue jms20subset.Queue, priority int) string {
185+
186+
ttlMillis := 20000
187+
producer := context.CreateProducer().SetTimeToLive(ttlMillis)
188+
producer = producer.SetPriority(priority)
189+
190+
// Create a TextMessage and check that we can populate it
191+
msgBody := "PriorityOrderingMsg"
192+
txtMsg := context.CreateTextMessage()
193+
txtMsg.SetText(msgBody)
194+
195+
errSend := producer.Send(queue, txtMsg)
196+
assert.Nil(t, errSend)
197+
198+
return txtMsg.GetJMSMessageID()
199+
200+
}
201+
202+
// Try to receive a message from the queue and check that it matches
203+
// the expected attributes
204+
func receiveMessageAndCheck(t *testing.T, consumer jms20subset.JMSConsumer, expectedMsgID string, expectedPriority int) {
205+
206+
rcvMsg, errRvc := consumer.ReceiveNoWait()
207+
assert.Nil(t, errRvc)
208+
assert.NotNil(t, rcvMsg)
209+
210+
assert.Equal(t, expectedPriority, rcvMsg.GetJMSPriority())
211+
assert.Equal(t, expectedMsgID, rcvMsg.GetJMSMessageID())
212+
213+
}

0 commit comments

Comments
 (0)