Skip to content

Commit d016de5

Browse files
committed
Connection auto-recovery
1 parent f2a2c2b commit d016de5

File tree

3 files changed

+99
-18
lines changed

3 files changed

+99
-18
lines changed

RabbitMQ/Common.cls

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,18 @@ Property Encoding As %String;
4545
/// See property AdditionalPaths in that class.
4646
Property ClassPath As %String(MAXLEN = 32000);
4747

48+
/// How many times have we tried reconnecting
49+
/// empty - do not retry
50+
/// 0 - retry ad infinitum
51+
/// n - retry n times
52+
Property RetryCount As %Integer [ InitialExpression = 5 ];
53+
54+
/// How frequently to retry access to the output system.
55+
/// Pause in seconds between retry attempts.
56+
Property RetryInterval As %Numeric(MINVAL = 0) [ InitialExpression = 5 ];
57+
4858
/// These are the production settings for this object
49-
Parameter SETTINGS = "Host:Basic,Port:Basic,VirtualHost:Basic,Queue:Basic,Credentials:Basic:credentialsSelector,JGHost:Java Gateway,JGPort:Java Gateway,JGService:Java Gateway:selector?context={Ens.ContextSearch/ProductionItems?targets=0&productionName=@productionId},ClassPath:Basic,Encoding:Basic";
59+
Parameter SETTINGS = "Host:Basic,Port:Basic,VirtualHost:Basic,Queue:Basic,Credentials:Basic:credentialsSelector,JGHost:Java Gateway,JGPort:Java Gateway,JGService:Java Gateway:selector?context={Ens.ContextSearch/ProductionItems?targets=0&productionName=@productionId},ClassPath:Basic,Encoding:Basic,RetryCount:Alerting,RetryInterval:Alerting";
5060

5161
/// Connect to running JGW
5262
Method Connect() As %Status
@@ -87,14 +97,44 @@ Method ConnectToRabbitMQ() As %Status
8797
Set pass = "guest"
8898
}
8999

100+
Set port = $select(..Port="":-1, 1:..Port)
101+
90102
Try {
91-
Set ..API = ##class(isc.rabbitmq.API).%New(..JGW, ..Host, ..Port, user, pass, ..VirtualHost, ..Queue, $$$YES, ..Exchange)
103+
Set ..API = ##class(isc.rabbitmq.API).%New(..JGW, ..Host, port, user, pass, ..VirtualHost, ..Queue, $$$YES, ..Exchange)
92104
} Catch ex {
93105
Set sc = $$$ADDSC(ex.AsStatus(),$g(%objlasterror))
94106
}
95107

96108
Quit sc
97109
}
98110

111+
Method IsOpen() As %Status
112+
{
113+
#Dim sc As %Status = $$$OK
114+
Set retryCount = 1
115+
Try {
116+
While '..API.isOpen() {
117+
If ..RetryCount = "" {
118+
Set sc = $$$ERROR($$$GeneralError, "Connection problems. Consider specifying RetryCount and RetryInterval settings")
119+
} ElseIf ((..RetryCount = 0) || (retryCount < ..RetryCount)) {
120+
// wait and retry connecting
121+
Set retryCount = retryCount + 1
122+
Hang ..RetryInterval
123+
124+
// reconnect happens in isOpen method
125+
} Else {
126+
// we're out of reconnect attempts
127+
Set sc = $$$ERROR($$$GeneralError, $$$FormatText("Connection still closed after %1 attempts at reconnecting.", ..RetryCount))
128+
}
129+
Quit:$$$ISERR(sc)
130+
}
131+
} Catch ex {
132+
#Dim ex As %Exception.General
133+
Set sc = ex.AsStatus()
134+
}
135+
136+
Quit sc
137+
}
138+
99139
}
100140

RabbitMQ/InboundAdapter.cls

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,21 +28,36 @@ Method OnTearDown() As %Status
2828
/// Get Messages from RabbitMQ queue.
2929
Method OnTask() As %Status
3030
{
31-
Set sc = $$$OK
31+
#Dim sc As %Status = $$$OK
3232

3333
Set messageCount = 1
3434

3535
While messageCount > 0 {
36+
Set sc = ..IsOpen()
37+
Quit:$$$ISERR(sc)
38+
3639
// List containing metainformation and possibly body (in the case of string interaction) of the RabbitMQ message
3740
#Dim messageList As %ListOfDataTypes
3841

39-
If ..BodyClass = "" {
40-
Set messageList = ..API.readMessageString()
41-
} Else {
42-
#Dim tempStream As %Library.GlobalBinaryStream
43-
Set messageList = ##class(%ListOfDataTypes).%New()
44-
For i=1:1:15 Do messageList.Insert("")
45-
Set tempStream = ..API.readMessageStream(.messageList)
42+
Set messageList = ##class(%ListOfDataTypes).%New()
43+
For i=1:1:15 Do messageList.Insert("")
44+
45+
Try {
46+
If ..BodyClass = "" {
47+
Set messageList = ..API.readMessageString()
48+
} Else {
49+
#Dim tempStream As %Library.GlobalBinaryStream
50+
Set tempStream = ..API.readMessageStream(.messageList)
51+
}
52+
} Catch ex {
53+
#Dim ex As %Exception.General
54+
If ($ZE["<ZJGTW>") {
55+
Set sc = ..IsOpen()
56+
Set:$$$ISERR(sc) sc = $$$ADDSC(sc, ex.AsStatus())
57+
} Else {
58+
Set sc = ex.AsStatus()
59+
}
60+
Quit:$$$ISERR(sc)
4661
}
4762

4863
Set messageLength = messageList.GetAt(1)

RabbitMQ/OutboundAdapter.cls

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,23 @@ Method SendMessage(message As %Stream.Object) As %Status
3636
Do stream.Write(..EncodeMessageBody(message))
3737
}
3838

39-
Try {
40-
Do ..API.sendMessage(stream)
41-
} Catch ex {
42-
Set sc = ex.AsStatus()
39+
#Dim attempts As %Integer = 0
40+
41+
While attempts<2 {
42+
Set attempts = attempts + 1
43+
Try {
44+
Do ..API.sendMessage(stream)
45+
Return sc
46+
} Catch ex {
47+
#Dim ex As %Exception.General
48+
If ($ZE["<ZJGTW>") {
49+
Set sc = ..IsOpen()
50+
Set:$$$ISERR(sc) sc = $$$ADDSC(sc, ex.AsStatus())
51+
} Else {
52+
Set sc = ex.AsStatus()
53+
}
54+
Return:$$$ISERR(sc) sc // quit if reconnect is unsuccessful or we got unknown exception
55+
}
4356
}
4457
Quit sc
4558
}
@@ -57,11 +70,24 @@ Method SendMessageToQueue(queue As %String, message As %Stream.Object) As %Statu
5770
} Else {
5871
Do stream.Write(..EncodeMessageBody(message))
5972
}
73+
74+
#Dim attempts As %Integer = 0
6075

61-
Try {
62-
Do ..API.sendMessageToQueue(queue, stream)
63-
} Catch ex {
64-
Set sc = ex.AsStatus()
76+
While attempts<2 {
77+
Set attempts = attempts + 1
78+
Try {
79+
Do ..API.sendMessageToQueue(queue, stream)
80+
Return sc
81+
} Catch ex {
82+
#Dim ex As %Exception.General
83+
If ($ZE["<ZJGTW>") {
84+
Set sc = ..IsOpen()
85+
Set:$$$ISERR(sc) sc = $$$ADDSC(sc, ex.AsStatus())
86+
} Else {
87+
Set sc = ex.AsStatus()
88+
}
89+
Return:$$$ISERR(sc) sc // quit if reconnect is unsuccessful or we got unknown exception
90+
}
6591
}
6692
Quit sc
6793
}

0 commit comments

Comments
 (0)