Skip to content

Commit 4b00057

Browse files
authored
Merge pull request #49 from ayeshLK/main
Add Support for Retrieving Messages from IBM MQ by Matching Correlation ID and Message ID
2 parents 7b48395 + 17f0636 commit 4b00057

File tree

14 files changed

+376
-25
lines changed

14 files changed

+376
-25
lines changed

ballerina/Ballerina.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
[package]
22
org = "ballerinax"
33
name = "ibm.ibmmq"
4-
version = "1.0.0"
4+
version = "1.1.0"
55
authors = ["Ballerina"]
66
keywords = ["ibm.ibmmq", "client", "messaging", "network", "pubsub"]
77
repository = "https://github.com/ballerina-platform/module-ballerinax-ibm.ibmmq"
@@ -12,8 +12,8 @@ distribution = "2201.9.0"
1212
[[platform.java17.dependency]]
1313
groupId = "io.ballerina.stdlib"
1414
artifactId = "ibm.ibmmq-native"
15-
version = "1.0.0"
16-
path = "../native/build/libs/ibm.ibmmq-native-1.0.0.jar"
15+
version = "1.1.0"
16+
path = "../native/build/libs/ibm.ibmmq-native-1.1.0-SNAPSHOT.jar"
1717

1818
[[platform.java17.dependency]]
1919
groupId = "org.json"

ballerina/Dependencies.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ modules = [
9090
[[package]]
9191
org = "ballerinax"
9292
name = "ibm.ibmmq"
93-
version = "1.0.0"
93+
version = "1.1.0"
9494
dependencies = [
9595
{org = "ballerina", name = "crypto"},
9696
{org = "ballerina", name = "jballerina.java"},

ballerina/tests/queue_producer_consumer_tests.bal

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -518,3 +518,178 @@ function produceAndConsumerMessageWithMultipleHeaderTypesWithJsonPayloadTest() r
518518
check queueManager.disconnect();
519519
}
520520

521+
@test:Config {
522+
groups: ["ibmmqQueue", "matchOptions"]
523+
}
524+
function produceConsumeWithMsgId() returns error? {
525+
QueueManager queueManager = check new (
526+
name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN",
527+
userID = "app", password = "password");
528+
Queue queue = check queueManager.accessQueue("DEV.QUEUE.2", MQOO_OUTPUT | MQOO_INPUT_AS_Q_DEF);
529+
530+
byte[] providedMsgId = "msg-id-1".toBytes();
531+
string messageContent = "This is a sample message with a message-id.";
532+
check queue->put({
533+
messageId: providedMsgId,
534+
payload: messageContent.toBytes()
535+
});
536+
537+
Message? message = check queue->get(matchOptions = { messageId: providedMsgId });
538+
test:assertTrue(message is Message, "Could not retrieve a message for a valid message identifier");
539+
540+
byte[]? payload = message?.payload;
541+
test:assertEquals(string:fromBytes(check payload.ensureType()), messageContent);
542+
543+
check queue->close();
544+
check queueManager.disconnect();
545+
}
546+
547+
@test:Config {
548+
groups: ["ibmmqQueue", "matchOptions"]
549+
}
550+
function produceConsumeWithInvalidMsgId() returns error? {
551+
QueueManager queueManager = check new (
552+
name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN",
553+
userID = "app", password = "password");
554+
Queue queue = check queueManager.accessQueue("DEV.QUEUE.2", MQOO_OUTPUT | MQOO_INPUT_AS_Q_DEF);
555+
556+
string messageContent = "This is a sample message with a message-id.";
557+
check queue->put({
558+
payload: messageContent.toBytes()
559+
});
560+
561+
Message? message = check queue->get(matchOptions = { messageId: "test-msg-id-1".toBytes() });
562+
test:assertTrue(message is (), "Retrieved a message for an invalid message identifier");
563+
564+
check queue->close();
565+
check queueManager.disconnect();
566+
}
567+
568+
@test:Config {
569+
groups: ["ibmmqQueue", "matchOptions"]
570+
}
571+
function produceConsumeWithCorrId() returns error? {
572+
QueueManager queueManager = check new (
573+
name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN",
574+
userID = "app", password = "password");
575+
Queue queue = check queueManager.accessQueue("DEV.QUEUE.2", MQOO_OUTPUT | MQOO_INPUT_AS_Q_DEF);
576+
577+
byte[] providedCorrId = "corr-id-1".toBytes();
578+
string messageContent = "This is a sample message with a correlation-id.";
579+
check queue->put({
580+
correlationId: providedCorrId,
581+
payload: messageContent.toBytes()
582+
});
583+
584+
Message? message = check queue->get(matchOptions = { correlationId: providedCorrId });
585+
test:assertTrue(message is Message, "Could not retrieve a message for a valid correlation identifier");
586+
587+
byte[]? correlationId = message?.correlationId;
588+
test:assertTrue(correlationId is byte[], "Could not find the correlation identifier for the message");
589+
590+
byte[]? payload = message?.payload;
591+
test:assertEquals(string:fromBytes(check payload.ensureType()), messageContent);
592+
593+
check queue->close();
594+
check queueManager.disconnect();
595+
}
596+
597+
@test:Config {
598+
groups: ["ibmmqQueue", "matchOptions"]
599+
}
600+
function produceConsumeWithInvalidCorrId() returns error? {
601+
QueueManager queueManager = check new (
602+
name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN",
603+
userID = "app", password = "password");
604+
Queue queue = check queueManager.accessQueue("DEV.QUEUE.2", MQOO_OUTPUT | MQOO_INPUT_AS_Q_DEF);
605+
606+
string messageContent = "This is a sample message with a message-id.";
607+
check queue->put({
608+
payload: messageContent.toBytes()
609+
});
610+
611+
Message? message = check queue->get(matchOptions = { correlationId: "test-corr-id-1".toBytes() });
612+
test:assertTrue(message is (), "Retrieved a message for an invalid correlation identifier");
613+
614+
check queue->close();
615+
check queueManager.disconnect();
616+
}
617+
618+
@test:Config {
619+
groups: ["ibmmqQueue", "matchOptions"]
620+
}
621+
function produceConsumeWithMsgIdAndCorrId() returns error? {
622+
QueueManager queueManager = check new (
623+
name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN",
624+
userID = "app", password = "password");
625+
Queue queue = check queueManager.accessQueue("DEV.QUEUE.2", MQOO_OUTPUT | MQOO_INPUT_AS_Q_DEF);
626+
627+
byte[] providedMsgId = "msg-id-1".toBytes();
628+
byte[] providedCorrId = "corr-id-1".toBytes();
629+
string messageContent = "This is a sample message with a message-id and a correlation-id.";
630+
check queue->put({
631+
messageId: providedMsgId,
632+
correlationId: providedCorrId,
633+
payload: messageContent.toBytes()
634+
});
635+
636+
Message? message = check queue->get(matchOptions = { messageId: providedMsgId, correlationId: providedCorrId });
637+
test:assertTrue(message is Message, "Could not retrieve a message for a valid message identifier and correlation identifier");
638+
639+
byte[]? payload = message?.payload;
640+
test:assertEquals(string:fromBytes(check payload.ensureType()), messageContent);
641+
642+
check queue->close();
643+
check queueManager.disconnect();
644+
}
645+
646+
@test:Config {
647+
groups: ["ibmmqQueue", "matchOptions"],
648+
dependsOn: [produceConsumeWithMsgIdAndCorrId]
649+
}
650+
function produceConsumeWithInvalidMsgIdAndCorrId() returns error? {
651+
QueueManager queueManager = check new (
652+
name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN",
653+
userID = "app", password = "password");
654+
Queue queue = check queueManager.accessQueue("DEV.QUEUE.2", MQOO_OUTPUT | MQOO_INPUT_AS_Q_DEF);
655+
656+
byte[] providedMsgId = "msg-id-1".toBytes();
657+
byte[] providedCorrId = "corr-id-1".toBytes();
658+
string messageContent = "This is a sample message with a message-id and a correlation-id.";
659+
check queue->put({
660+
correlationId: providedCorrId,
661+
payload: messageContent.toBytes()
662+
});
663+
664+
Message? message = check queue->get(matchOptions = { messageId: providedMsgId, correlationId: providedCorrId });
665+
test:assertTrue(message is (), "Retrieved a message for an invalid message-id and a correct correlation identifier");
666+
667+
check queue->close();
668+
check queueManager.disconnect();
669+
}
670+
671+
@test:Config {
672+
groups: ["ibmmqQueue", "matchOptions"],
673+
dependsOn: [produceConsumeWithInvalidMsgIdAndCorrId]
674+
}
675+
function produceConsumeWithMsgIdAndInvalidCorrId() returns error? {
676+
QueueManager queueManager = check new (
677+
name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN",
678+
userID = "app", password = "password");
679+
Queue queue = check queueManager.accessQueue("DEV.QUEUE.2", MQOO_OUTPUT | MQOO_INPUT_AS_Q_DEF);
680+
681+
byte[] providedMsgId = "msg-id-1".toBytes();
682+
byte[] providedCorrId = "corr-id-1".toBytes();
683+
string messageContent = "This is a sample message with a message-id and a correlation-id.";
684+
check queue->put({
685+
messageId: providedMsgId,
686+
payload: messageContent.toBytes()
687+
});
688+
689+
Message? message = check queue->get(matchOptions = { messageId: providedMsgId, correlationId: providedCorrId });
690+
test:assertTrue(message is (), "Retrieved a message for a correct message-id and an invalid correlation identifier");
691+
692+
check queue->close();
693+
check queueManager.disconnect();
694+
}
695+

ballerina/types.bal

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,9 +83,20 @@ public type CertKey record {|
8383
# + options - Get message option
8484
# + waitInterval - The maximum time (in seconds) that a `get` call waits for a suitable message to
8585
# arrive. It is used in conjunction with `ibmmq.MQGMO_WAIT`.
86+
# + matchOptions - Message selection criteria
8687
public type GetMessageOptions record {|
8788
int options = MQGMO_NO_WAIT;
8889
int waitInterval = 10;
90+
MatchOptions matchOptions?;
91+
|};
92+
93+
# Represents the selection criteria that determine which message is retrieved.
94+
#
95+
# + messageId - The message identifier of the message which needs to be retrieved
96+
# + correlationId - The Correlation identifier of the message which needs to be retrieved
97+
public type MatchOptions record {|
98+
byte[] messageId?;
99+
byte[] correlationId?;
89100
|};
90101

91102
# Represents an IBM MQ message property.

changelog.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,12 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
55

66
## [Unreleased]
77

8+
### Added
9+
10+
- [Add Support for Retrieving Messages from IBM MQ by Matching Correlation ID and Message ID](https://github.com/ballerina-platform/ballerina-library/issues/6918)
11+
12+
## [1.0.0] - 2024-07-08
13+
814
### Changed
915

1016
- [Decouple IBM MQ java client jar from the IBM MQ connector](https://github.com/ballerina-platform/ballerina-library/issues/6287)

docs/spec/spec.md

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -408,14 +408,27 @@ public type Message record {|
408408

409409
## 4. Client Options
410410

411-
- GetMessageOptions record represents client options which can be used when retrieving messages from an IBM MQ destination.
411+
- `GetMessageOptions` record represents client options which can be used when retrieving messages from an IBM MQ destination.
412412

413413
```ballerina
414414
public type GetMessageOptions record {|
415415
# Get message option
416416
int options = MQGMO_NO_WAIT;
417417
# The maximum time (in seconds) that a `get` call waits for a suitable message to arrive. It is used in conjunction with `ibmmq.MQGMO_WAIT`.
418418
int waitInterval = 10;
419+
# Message selection criteria
420+
MatchOptions matchOptions?;
421+
|};
422+
```
423+
424+
- `MatchOptions` record represents the selection criteria that determine which message is retrieved.
425+
426+
```ballerina
427+
public type MatchOptions record {|
428+
# The message identifier of the message which needs to be retrieved
429+
byte[] messageId?;
430+
# The Correlation identifier of the message which needs to be retrieved
431+
byte[] correlationId?;
419432
|};
420433
```
421434

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
org.gradle.caching=true
22
group=io.ballerina.lib
3-
version=1.0.1-SNAPSHOT
3+
version=1.1.0-SNAPSHOT
44

55
ballerinaLangVersion=2201.9.0
66
checkstylePluginVersion=10.12.1

native/src/main/java/io/ballerina/lib/ibm.ibmmq/CommonUtils.java

Lines changed: 37 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,10 @@
2222
import com.ibm.mq.MQGetMessageOptions;
2323
import com.ibm.mq.MQMessage;
2424
import com.ibm.mq.MQPropertyDescriptor;
25+
import com.ibm.mq.constants.MQConstants;
2526
import com.ibm.mq.headers.MQHeaderList;
27+
import io.ballerina.lib.ibm.ibmmq.config.GetMessageOptions;
28+
import io.ballerina.lib.ibm.ibmmq.config.MatchOptions;
2629
import io.ballerina.lib.ibm.ibmmq.headers.MQRFH2Header;
2730
import io.ballerina.runtime.api.PredefinedTypes;
2831
import io.ballerina.runtime.api.Runtime;
@@ -66,7 +69,6 @@
6669
import static io.ballerina.lib.ibm.ibmmq.Constants.MESSAGE_PROPERTY;
6770
import static io.ballerina.lib.ibm.ibmmq.Constants.MESSAGE_PROPERTIES;
6871
import static io.ballerina.lib.ibm.ibmmq.Constants.MESSAGE_TYPE_FIELD;
69-
import static io.ballerina.lib.ibm.ibmmq.Constants.OPTIONS;
7072
import static io.ballerina.lib.ibm.ibmmq.Constants.PD_CONTEXT;
7173
import static io.ballerina.lib.ibm.ibmmq.Constants.PD_COPY_OPTIONS;
7274
import static io.ballerina.lib.ibm.ibmmq.Constants.PD_OPTIONS;
@@ -79,7 +81,6 @@
7981
import static io.ballerina.lib.ibm.ibmmq.Constants.PUT_APPLICATION_TYPE_FIELD;
8082
import static io.ballerina.lib.ibm.ibmmq.Constants.REPLY_TO_QM_NAME_FIELD;
8183
import static io.ballerina.lib.ibm.ibmmq.Constants.REPLY_TO_QUEUE_NAME_FIELD;
82-
import static io.ballerina.lib.ibm.ibmmq.Constants.WAIT_INTERVAL;
8384
import static io.ballerina.lib.ibm.ibmmq.ModuleUtils.getModule;
8485
import static io.ballerina.lib.ibm.ibmmq.headers.MQCIHHeader.createMQCIHHeaderFromBHeader;
8586
import static io.ballerina.lib.ibm.ibmmq.headers.MQIIHHeader.createMQIIHHeaderFromBHeader;
@@ -293,13 +294,40 @@ private static BMap populateDescriptorFromMQPropertyDescriptor(MQPropertyDescrip
293294
return descriptor;
294295
}
295296

296-
public static MQGetMessageOptions getGetMessageOptions(BMap<BString, Object> bOptions) {
297-
int waitInterval = bOptions.getIntValue(WAIT_INTERVAL).intValue();
298-
int options = bOptions.getIntValue(OPTIONS).intValue();
299-
MQGetMessageOptions getMessageOptions = new MQGetMessageOptions();
300-
getMessageOptions.waitInterval = waitInterval * 1000;
301-
getMessageOptions.options = options;
302-
return getMessageOptions;
297+
public static MQMessage getMqMessage(MatchOptions matchOptions) {
298+
MQMessage message = new MQMessage();
299+
if (Objects.isNull(matchOptions)) {
300+
return message;
301+
}
302+
303+
if (Objects.nonNull(matchOptions.messageId())) {
304+
message.messageId = matchOptions.messageId();
305+
}
306+
if (Objects.nonNull(matchOptions.correlationId())) {
307+
message.correlationId = matchOptions.correlationId();
308+
}
309+
return message;
310+
}
311+
312+
public static MQGetMessageOptions getMqGetMsgOptions(GetMessageOptions getMsgOptions) {
313+
MQGetMessageOptions mqGetMsgOptions = new MQGetMessageOptions();
314+
mqGetMsgOptions.waitInterval = getMsgOptions.waitInterval();
315+
mqGetMsgOptions.options = getMsgOptions.options();
316+
317+
MatchOptions matchOptions = getMsgOptions.matchOptions();
318+
if (Objects.isNull(matchOptions)) {
319+
return mqGetMsgOptions;
320+
}
321+
322+
int matchOpt = MQConstants.MQMO_NONE;
323+
if (Objects.nonNull(matchOptions.messageId())) {
324+
matchOpt |= MQConstants.MQMO_MATCH_MSG_ID;
325+
}
326+
if (Objects.nonNull(matchOptions.correlationId())) {
327+
matchOpt |= MQConstants.MQMO_MATCH_CORREL_ID;
328+
}
329+
mqGetMsgOptions.matchOptions = matchOpt;
330+
return mqGetMsgOptions;
303331
}
304332

305333
private static Object getBHeaders(Runtime runtime, MQMessage mqMessage) {

native/src/main/java/io/ballerina/lib/ibm.ibmmq/Constants.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ public interface Constants {
108108
BString PROPERTY_DESCRIPTOR = StringUtils.fromString("descriptor");
109109
BString WAIT_INTERVAL = StringUtils.fromString("waitInterval");
110110
BString OPTIONS = StringUtils.fromString("options");
111+
BString MATCH_OPTIONS = StringUtils.fromString("matchOptions");
111112
BString FORMAT_FIELD = StringUtils.fromString("format");
112113
BString MESSAGE_ID_FIELD = StringUtils.fromString("messageId");
113114
BString CORRELATION_ID_FIELD = StringUtils.fromString("correlationId");

native/src/main/java/io/ballerina/lib/ibm.ibmmq/Queue.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.ibm.mq.MQMessage;
2424
import com.ibm.mq.MQQueue;
2525
import com.ibm.mq.constants.CMQC;
26+
import io.ballerina.lib.ibm.ibmmq.config.GetMessageOptions;
2627
import io.ballerina.runtime.api.Environment;
2728
import io.ballerina.runtime.api.Future;
2829
import io.ballerina.runtime.api.values.BError;
@@ -60,15 +61,16 @@ public static Object put(Environment environment, BObject queueObject, BMap<BStr
6061
return null;
6162
}
6263

63-
public static Object get(Environment environment, BObject queueObject, BMap<BString, Object> options) {
64+
public static Object get(Environment environment, BObject queueObject, BMap<BString, Object> bGetMsgOptions) {
6465
MQQueue queue = (MQQueue) queueObject.getNativeData(Constants.NATIVE_QUEUE);
65-
MQGetMessageOptions getMessageOptions = CommonUtils.getGetMessageOptions(options);
66+
GetMessageOptions getMsgOptions = new GetMessageOptions(bGetMsgOptions);
67+
MQMessage mqMessage = CommonUtils.getMqMessage(getMsgOptions.matchOptions());
68+
MQGetMessageOptions mqGetMsgOptions = CommonUtils.getMqGetMsgOptions(getMsgOptions);
6669
Future future = environment.markAsync();
6770
QUEUE_EXECUTOR_SERVICE.execute(() -> {
6871
try {
69-
MQMessage message = new MQMessage();
70-
queue.get(message, getMessageOptions);
71-
future.complete(CommonUtils.getBMessageFromMQMessage(environment.getRuntime(), message));
72+
queue.get(mqMessage, mqGetMsgOptions);
73+
future.complete(CommonUtils.getBMessageFromMQMessage(environment.getRuntime(), mqMessage));
7274
} catch (MQException e) {
7375
if (e.reasonCode == CMQC.MQRC_NO_MSG_AVAILABLE) {
7476
future.complete(null);

0 commit comments

Comments
 (0)