Skip to content

Commit b8499b1

Browse files
committed
Move from RabbitMQ to isc.rabbitmq package
1 parent d016de5 commit b8499b1

File tree

11 files changed

+136
-183
lines changed

11 files changed

+136
-183
lines changed

RabbitMQ/Operation.cls

Lines changed: 0 additions & 17 deletions
This file was deleted.

RabbitMQ/Service.cls

Lines changed: 0 additions & 14 deletions
This file was deleted.

RabbitMQ/Common.cls renamed to isc/rabbitmq/Common.cls

Lines changed: 7 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,16 @@
1-
Class RabbitMQ.Common Extends %RegisteredObject [ Abstract ]
1+
Class isc.rabbitmq.Common Extends %RegisteredObject [ Abstract ]
22
{
33

4-
/// This is the ID name of the set of credentials values (Username, Password) to be used to access the HTTP server
5-
/// Property Credentials As %String [ InitialExpression = "None" ];
6-
Property Host As %String [ InitialExpression = "localhost" ];
4+
/// Either host or Connection URI: amqp:\\
5+
/// If Connection URI is passed, then Port, Credentials and Virtual host properties are ignored.
6+
Property Host As %VarString [ InitialExpression = "localhost" ];
77

88
Property Port As %Integer [ InitialExpression = -1 ];
99

1010
Property VirtualHost As %String [ InitialExpression = "/" ];
1111

12-
/// Outbound adapter only (ignored for inbound adapter)
13-
/// If Exchange is not set, then Queue name.
14-
/// If Exchange is set, then Routing key.
1512
Property Queue As %String;
1613

17-
/// Exchange name. Optional, empty by default.
18-
Property Exchange As %String;
19-
2014
/// Config Name of the Java Gateway service controlling the Java Gateway server this item will use.
2115
/// Alternatively use JGHost and JGPort Settings, to specify Java gateway outside of Ensemble scope.
2216
Property JGService As %String;
@@ -45,18 +39,8 @@ Property Encoding As %String;
4539
/// See property AdditionalPaths in that class.
4640
Property ClassPath As %String(MAXLEN = 32000);
4741

48-
/// How many times have we tried reconnecting
49-
/// empty - do not retry
50-
/// 0 - retry ad infinitum
51-
/// n - retry n times
52-
Property RetryCount As %Integer [ InitialExpression = 5 ];
53-
54-
/// How frequently to retry access to the output system.
55-
/// Pause in seconds between retry attempts.
56-
Property RetryInterval As %Numeric(MINVAL = 0) [ InitialExpression = 5 ];
57-
5842
/// These are the production settings for this object
59-
Parameter SETTINGS = "Host:Basic,Port:Basic,VirtualHost:Basic,Queue:Basic,Credentials:Basic:credentialsSelector,JGHost:Java Gateway,JGPort:Java Gateway,JGService:Java Gateway:selector?context={Ens.ContextSearch/ProductionItems?targets=0&productionName=@productionId},ClassPath:Basic,Encoding:Basic,RetryCount:Alerting,RetryInterval:Alerting";
43+
Parameter SETTINGS = "Host:Basic,Port:Basic,VirtualHost:Basic,Queue:Basic,Credentials:Basic:credentialsSelector,JGHost:Java Gateway,JGPort:Java Gateway,JGService:Java Gateway:selector?context={Ens.ContextSearch/ProductionItems?targets=0&productionName=@productionId},ClassPath:Basic,Encoding:Basic";
6044

6145
/// Connect to running JGW
6246
Method Connect() As %Status
@@ -97,44 +81,14 @@ Method ConnectToRabbitMQ() As %Status
9781
Set pass = "guest"
9882
}
9983

100-
Set port = $select(..Port="":-1, 1:..Port)
101-
10284
Try {
103-
Set ..API = ##class(isc.rabbitmq.API).%New(..JGW, ..Host, port, user, pass, ..VirtualHost, ..Queue, $$$YES, ..Exchange)
104-
} Catch ex {
85+
Set ..API = ##class(isc.rabbitmq.API).%New(..JGW, ..Host, ..Port, user, pass, ..VirtualHost, ..Queue, $$$YES)
86+
} Catch ex {
10587
Set sc = $$$ADDSC(ex.AsStatus(),$g(%objlasterror))
10688
}
10789

10890
Quit sc
10991
}
11092

111-
Method IsOpen() As %Status
112-
{
113-
#Dim sc As %Status = $$$OK
114-
Set retryCount = 1
115-
Try {
116-
While '..API.isOpen() {
117-
If ..RetryCount = "" {
118-
Set sc = $$$ERROR($$$GeneralError, "Connection problems. Consider specifying RetryCount and RetryInterval settings")
119-
} ElseIf ((..RetryCount = 0) || (retryCount < ..RetryCount)) {
120-
// wait and retry connecting
121-
Set retryCount = retryCount + 1
122-
Hang ..RetryInterval
123-
124-
// reconnect happens in isOpen method
125-
} Else {
126-
// we're out of reconnect attempts
127-
Set sc = $$$ERROR($$$GeneralError, $$$FormatText("Connection still closed after %1 attempts at reconnecting.", ..RetryCount))
128-
}
129-
Quit:$$$ISERR(sc)
130-
}
131-
} Catch ex {
132-
#Dim ex As %Exception.General
133-
Set sc = ex.AsStatus()
134-
}
135-
136-
Quit sc
137-
}
138-
13993
}
14094

RabbitMQ/InboundAdapter.cls renamed to isc/rabbitmq/InboundAdapter.cls

Lines changed: 19 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,18 @@
1-
Class RabbitMQ.InboundAdapter Extends (Ens.InboundAdapter, RabbitMQ.Common)
1+
Class isc.rabbitmq.InboundAdapter Extends (Ens.InboundAdapter, isc.rabbitmq.Common)
22
{
33

44
/// Stream class to store message body. Leave empty to use strings.
55
Property BodyClass As %Dictionary.CacheClassname;
66

77
Parameter SETTINGS = "BodyClass:Basic";
88

9+
ClassMethod BodyClassIsValid(val) As %Status
10+
{
11+
quit:val="" $$$OK
12+
quit:val="%Stream.GlobalCharacter" $$$OK
13+
quit $$$ERROR($$$GeneralError, "No")
14+
}
15+
916
/// Establish gateway connection and init java API.
1017
Method OnInit() As %Status
1118
{
@@ -28,43 +35,28 @@ Method OnTearDown() As %Status
2835
/// Get Messages from RabbitMQ queue.
2936
Method OnTask() As %Status
3037
{
31-
#Dim sc As %Status = $$$OK
38+
Set sc = $$$OK
3239

3340
Set messageCount = 1
3441

3542
While messageCount > 0 {
36-
Set sc = ..IsOpen()
37-
Quit:$$$ISERR(sc)
38-
3943
// List containing metainformation and possibly body (in the case of string interaction) of the RabbitMQ message
4044
#Dim messageList As %ListOfDataTypes
4145

42-
Set messageList = ##class(%ListOfDataTypes).%New()
43-
For i=1:1:15 Do messageList.Insert("")
44-
45-
Try {
46-
If ..BodyClass = "" {
47-
Set messageList = ..API.readMessageString()
48-
} Else {
49-
#Dim tempStream As %Library.GlobalBinaryStream
50-
Set tempStream = ..API.readMessageStream(.messageList)
51-
}
52-
} Catch ex {
53-
#Dim ex As %Exception.General
54-
If ($ZE["<ZJGTW>") {
55-
Set sc = ..IsOpen()
56-
Set:$$$ISERR(sc) sc = $$$ADDSC(sc, ex.AsStatus())
57-
} Else {
58-
Set sc = ex.AsStatus()
59-
}
60-
Quit:$$$ISERR(sc)
46+
If ..BodyClass = "" {
47+
Set messageList = ..API.readMessageString()
48+
} Else {
49+
#Dim tempStream As %Library.GlobalBinaryStream
50+
Set messageList = ##class(%ListOfDataTypes).%New()
51+
For i=1:1:15 Do messageList.Insert("")
52+
Set tempStream = ..API.readMessageStream(.messageList)
6153
}
6254

6355
Set messageLength = messageList.GetAt(1)
6456
Set messageCount = messageList.GetAt(2)
6557

6658
If messageLength>0 {
67-
#Dim message As RabbitMQ.Message
59+
#Dim message As isc.rabbitmq.Message
6860
Set message = ..ListToMessage(messageList)
6961
If ..BodyClass = "" {
7062
Set message.BodyString = ..DecodeMessageBody(messageList.GetAt(16))
@@ -86,9 +78,9 @@ Method OnTask() As %Status
8678
}
8779

8880
/// Convert list containing metainformation into RabbitMQ message
89-
ClassMethod ListToMessage(list As %ListOfDataTypes) As RabbitMQ.Message
81+
ClassMethod ListToMessage(list As %ListOfDataTypes) As isc.rabbitmq.Message
9082
{
91-
Set message = ##class(RabbitMQ.Message).%New()
83+
Set message = ##class(isc.rabbitmq.Message).%New()
9284

9385
Set message.ContentType = list.GetAt(3)
9486
Set message.ContentEncoding = list.GetAt(4)

RabbitMQ/Message.cls renamed to isc/rabbitmq/Message.cls

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
Class RabbitMQ.Message Extends %Persistent
1+
Class isc.rabbitmq.Message Extends %Persistent
22
{
33

44
Property ContentType As %String;
@@ -96,11 +96,11 @@ Storage Default
9696
<Value>BodyStream</Value>
9797
</Value>
9898
</Data>
99-
<DataLocation>^RabbitMQ.MessageD</DataLocation>
99+
<DataLocation>^isc.rabbitmq.MessageD</DataLocation>
100100
<DefaultData>MessageDefaultData</DefaultData>
101-
<IdLocation>^RabbitMQ.MessageD</IdLocation>
102-
<IndexLocation>^RabbitMQ.MessageI</IndexLocation>
103-
<StreamLocation>^RabbitMQ.MessageS</StreamLocation>
101+
<IdLocation>^isc.rabbitmq.MessageD</IdLocation>
102+
<IndexLocation>^isc.rabbitmq.MessageI</IndexLocation>
103+
<StreamLocation>^isc.rabbitmq.MessageS</StreamLocation>
104104
<Type>%Library.CacheStorage</Type>
105105
}
106106

isc/rabbitmq/Operation.cls

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
Class isc.rabbitmq.Operation Extends Ens.BusinessOperation
2+
{
3+
4+
Parameter ADAPTER = "isc.rabbitmq.OutboundAdapter";
5+
6+
Property Adapter As isc.rabbitmq.OutboundAdapter;
7+
8+
Method OnMessage(request As Ens.StringContainer, response As Ens.Response) As %Status
9+
{
10+
#Dim sc As %Status = $$$OK
11+
Set response = ##class(Ens.Response).%New()
12+
Set sc = ..Adapter.SendMessage(request.StringValue)
13+
Quit sc
14+
}
15+
16+
}
17+

RabbitMQ/OutboundAdapter.cls renamed to isc/rabbitmq/OutboundAdapter.cls

Lines changed: 15 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
1-
Class RabbitMQ.OutboundAdapter Extends (Ens.OutboundAdapter, RabbitMQ.Common)
1+
Class isc.rabbitmq.OutboundAdapter Extends (Ens.OutboundAdapter, isc.rabbitmq.Common)
22
{
33

4-
Parameter SETTINGS = "Exchange:Basic";
5-
64
/// Establish gateway connection and init java API.
75
Method OnInit() As %Status
86
{
@@ -29,30 +27,21 @@ Method SendMessage(message As %Stream.Object) As %Status
2927
Set stream = ##class(%Library.GlobalBinaryStream).%New()
3028

3129
If $isObject(message) {
32-
While 'message.AtEnd {
33-
Do stream.Write(..EncodeMessageBody(message.Read($$$MaxStringLength)))
30+
If message.%IsA("%Library.GlobalBinaryStream") {
31+
Set stream = message
32+
} Else {
33+
While 'message.AtEnd {
34+
Do stream.Write(..EncodeMessageBody(message.Read($$$MaxStringLength)))
35+
}
3436
}
3537
} Else {
3638
Do stream.Write(..EncodeMessageBody(message))
3739
}
3840

39-
#Dim attempts As %Integer = 0
40-
41-
While attempts<2 {
42-
Set attempts = attempts + 1
43-
Try {
44-
Do ..API.sendMessage(stream)
45-
Return sc
46-
} Catch ex {
47-
#Dim ex As %Exception.General
48-
If ($ZE["<ZJGTW>") {
49-
Set sc = ..IsOpen()
50-
Set:$$$ISERR(sc) sc = $$$ADDSC(sc, ex.AsStatus())
51-
} Else {
52-
Set sc = ex.AsStatus()
53-
}
54-
Return:$$$ISERR(sc) sc // quit if reconnect is unsuccessful or we got unknown exception
55-
}
41+
Try {
42+
Do ..API.sendMessage(stream)
43+
} Catch ex {
44+
Set sc = ex.AsStatus()
5645
}
5746
Quit sc
5847
}
@@ -70,24 +59,11 @@ Method SendMessageToQueue(queue As %String, message As %Stream.Object) As %Statu
7059
} Else {
7160
Do stream.Write(..EncodeMessageBody(message))
7261
}
73-
74-
#Dim attempts As %Integer = 0
7562

76-
While attempts<2 {
77-
Set attempts = attempts + 1
78-
Try {
79-
Do ..API.sendMessageToQueue(queue, stream)
80-
Return sc
81-
} Catch ex {
82-
#Dim ex As %Exception.General
83-
If ($ZE["<ZJGTW>") {
84-
Set sc = ..IsOpen()
85-
Set:$$$ISERR(sc) sc = $$$ADDSC(sc, ex.AsStatus())
86-
} Else {
87-
Set sc = ex.AsStatus()
88-
}
89-
Return:$$$ISERR(sc) sc // quit if reconnect is unsuccessful or we got unknown exception
90-
}
63+
Try {
64+
Do ..API.sendMessageToQueue(queue, stream)
65+
} Catch ex {
66+
Set sc = ex.AsStatus()
9167
}
9268
Quit sc
9369
}

RabbitMQ/Production.cls renamed to isc/rabbitmq/Production.cls

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
1-
Class RabbitMQ.Production Extends Ens.Production
1+
Class isc.rabbitmq.Production Extends Ens.Production
22
{
33

44
XData ProductionDefinition
55
{
6-
<Production Name="RabbitMQ.Production" TestingEnabled="true" LogGeneralTraceEvents="false">
6+
<Production Name="isc.rabbitmq.Production" TestingEnabled="true" LogGeneralTraceEvents="false">
77
<Description></Description>
88
<ActorPoolSize>2</ActorPoolSize>
9-
<Item Name="RabbitMQ.Service" Category="" ClassName="RabbitMQ.Service" PoolSize="1" Enabled="true" Foreground="false" Comment="" LogTraceEvents="true" Schedule="">
9+
<Item Name="isc.rabbitmq.Service" Category="" ClassName="RabbitMQ.Service" PoolSize="1" Enabled="true" Foreground="false" Comment="" LogTraceEvents="true" Schedule="">
1010
<Setting Target="Adapter" Name="JGService">EnsLib.JavaGateway.Service</Setting>
11-
<Setting Target="Adapter" Name="ClassPath">C:\InterSystems\RabbitMQjava.jar</Setting>
11+
<Setting Target="Adapter" Name="ClassPath">C:\InterSystems\RabbitMQjava.jar;C:\InterSystems\Java\amqp-client-5.7.2.jar</Setting>
1212
<Setting Target="Adapter" Name="Queue">hello</Setting>
1313
<Setting Target="Adapter" Name="BodyClass"></Setting>
1414
<Setting Target="Adapter" Name="Encoding">UTF8</Setting>
@@ -18,12 +18,12 @@ XData ProductionDefinition
1818
<Setting Target="Host" Name="Port">55559</Setting>
1919
<Setting Target="Host" Name="JDKVersion">JDK18</Setting>
2020
<Setting Target="Host" Name="JavaHome">C:\Progra~1\Java\jdk1.8.0_121\</Setting>
21-
<Setting Target="Host" Name="ClassPath">C:\InterSystems\RabbitMQjava.jar</Setting>
21+
<Setting Target="Host" Name="ClassPath">C:\InterSystems\RabbitMQjava.jar;C:\InterSystems\Java\amqp-client-5.7.2.jar</Setting>
2222
</Item>
23-
<Item Name="RabbitMQ.Operation" Category="" ClassName="RabbitMQ.Operation" PoolSize="1" Enabled="true" Foreground="false" Comment="" LogTraceEvents="false" Schedule="">
23+
<Item Name="isc.rabbitmq.Operation" Category="" ClassName="RabbitMQ.Operation" PoolSize="1" Enabled="true" Foreground="false" Comment="" LogTraceEvents="false" Schedule="">
2424
<Setting Target="Adapter" Name="JGService">EnsLib.JavaGateway.Service</Setting>
2525
<Setting Target="Adapter" Name="Queue">hello</Setting>
26-
<Setting Target="Adapter" Name="ClassPath">C:\InterSystems\RabbitMQjava.jar</Setting>
26+
<Setting Target="Adapter" Name="ClassPath">C:\InterSystems\RabbitMQjava.jar;C:\InterSystems\Java\amqp-client-5.7.2.jar</Setting>
2727
<Setting Target="Adapter" Name="Encoding">UTF8</Setting>
2828
</Item>
2929
</Production>

isc/rabbitmq/Service.cls

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
Class isc.rabbitmq.Service Extends Ens.BusinessService
2+
{
3+
4+
Parameter ADAPTER = "isc.rabbitmq.InboundAdapter";
5+
6+
Property Adapter As isc.rabbitmq.InboundAdapter;
7+
8+
Method OnProcessInput(message As isc.rabbitmq.Message) As %Status
9+
{
10+
quit message.%Save()
11+
}
12+
13+
}
14+

0 commit comments

Comments
 (0)