@@ -13,13 +13,103 @@ public abstract class AbstractClient : MessageDispatcher
13
13
protected static readonly int DEFAULT_PORT = 8849 ;
14
14
protected static readonly string DEFAULT_HOST = "localhost" ;
15
15
protected static readonly string DEFAULT_ACTION_NAME = "csharpStreamingApi" ;
16
- protected string localIP ;
17
16
protected int listeningPort ;
18
17
protected QueueManager queueManager = new QueueManager ( ) ;
19
18
protected Dictionary < string , List < IMessage > > messageCache = new Dictionary < string , List < IMessage > > ( ) ;
20
- protected Dictionary < string , string > tableName2Topic = new Dictionary < string , string > ( ) ;
19
+ protected Dictionary < string , string > tableNameToTopic = new Dictionary < string , string > ( ) ;
21
20
protected Dictionary < string , bool > hostEndian = new Dictionary < string , bool > ( ) ;
22
21
protected Thread pThread ;
22
+ protected Dictionary < string , Site > topicToSite = new Dictionary < string , Site > ( ) ;
23
+
24
+ protected class Site
25
+ {
26
+ public string host ;
27
+ public int port ;
28
+ public string tableName ;
29
+ public string actionName ;
30
+ public MessageHandler handler ;
31
+ public long msgId ;
32
+ public bool reconnect ;
33
+ public IVector filter = null ;
34
+ public bool closed = false ;
35
+
36
+ public Site ( string host , int port , string tableName , string actionName , MessageHandler handler , long msgId , bool reconnect , IVector filter )
37
+ {
38
+ this . host = host ;
39
+ this . port = port ;
40
+ this . tableName = tableName ;
41
+ this . actionName = actionName ;
42
+ this . handler = handler ;
43
+ this . msgId = msgId ;
44
+ this . reconnect = reconnect ;
45
+ this . filter = filter ;
46
+ }
47
+ }
48
+
49
+ abstract protected void doReconnect ( Site site ) ;
50
+
51
+ public void setMsgId ( string topic , long msgId )
52
+ {
53
+ lock ( topicToSite )
54
+ {
55
+ Site site = topicToSite [ topic ] ;
56
+ if ( site != null )
57
+ site . msgId = msgId ;
58
+ }
59
+ }
60
+
61
+ public void tryReconnect ( string topic )
62
+ {
63
+ Console . WriteLine ( "Trigger reconnect" ) ;
64
+ queueManager . removeQueue ( topic ) ;
65
+ Site site = null ;
66
+ lock ( topicToSite )
67
+ {
68
+ site = topicToSite [ topic ] ;
69
+ }
70
+ if ( site == null || ! site . reconnect )
71
+ return ;
72
+ tableNameToTopic . Remove ( site . host + ":" + site . port + ":" + site . tableName ) ;
73
+ topicToSite . Remove ( topic ) ;
74
+ activeCloseConnection ( site ) ;
75
+ doReconnect ( site ) ;
76
+ }
77
+
78
+ private void activeCloseConnection ( Site site )
79
+ {
80
+ while ( true )
81
+ {
82
+ try
83
+ {
84
+ DBConnection conn = new DBConnection ( ) ;
85
+ conn . connect ( site . host , site . port ) ;
86
+ try
87
+ {
88
+ string localIP = conn . LocalAddress ;
89
+ List < IEntity > @params = new List < IEntity >
90
+ {
91
+ new BasicString ( localIP ) ,
92
+ new BasicInt ( listeningPort )
93
+ } ;
94
+ conn . run ( "activeClosePublishConnection" , @params ) ;
95
+ }
96
+ catch ( Exception ioex )
97
+ {
98
+ throw ioex ;
99
+ }
100
+ finally
101
+ {
102
+ conn . close ( ) ;
103
+ }
104
+ return ;
105
+ }
106
+ catch ( Exception )
107
+ {
108
+ Console . WriteLine ( "Unable to actively close the publish connection from site " + site . host + ":" + site . port ) ;
109
+ }
110
+ Thread . Sleep ( 1000 ) ;
111
+ }
112
+ }
23
113
24
114
public AbstractClient ( ) : this ( DEFAULT_PORT ) { }
25
115
@@ -90,61 +180,121 @@ public bool isRemoteLittleEndian(string host)
90
180
return false ;
91
181
}
92
182
93
- protected BlockingCollection < List < IMessage > > subscribeInternal ( string host , int port , string tableName , string actionName , long offset )
183
+ public bool isClosed ( string topic )
184
+ {
185
+ lock ( topicToSite )
186
+ {
187
+ Site site = topicToSite [ topic ] ;
188
+ if ( site != null )
189
+ return site . closed ;
190
+ else
191
+ return true ;
192
+ }
193
+ }
194
+
195
+ protected BlockingCollection < List < IMessage > > subscribeInternal ( string host , int port , string tableName , string actionName , MessageHandler handler , long offset , bool reconnect , IVector filter )
94
196
{
95
- IEntity re ;
96
197
string topic = "" ;
198
+ IEntity re ;
97
199
98
200
DBConnection dbConn = new DBConnection ( ) ;
99
201
dbConn . connect ( host , port ) ;
100
- localIP = dbConn . LocalAddress ;
202
+ try
203
+ {
204
+ string localIP = dbConn . LocalAddress ;
101
205
102
- if ( ! hostEndian . ContainsKey ( host ) )
103
- hostEndian . Add ( host , dbConn . RemoteLittleEndian ) ;
206
+ if ( ! hostEndian . ContainsKey ( host ) )
207
+ hostEndian . Add ( host , dbConn . RemoteLittleEndian ) ;
104
208
105
- List < IEntity > @params = new List < IEntity >
106
- {
107
- new BasicString ( tableName ) ,
108
- new BasicString ( actionName )
109
- } ;
110
- re = dbConn . run ( "getSubscriptionTopic" , @params ) ;
111
- topic = ( ( BasicAnyVector ) re ) . getEntity ( 0 ) . getString ( ) ;
112
- BlockingCollection < List < IMessage > > queue = queueManager . addQueue ( topic ) ;
113
- @params . Clear ( ) ;
209
+ List < IEntity > @params = new List < IEntity >
210
+ {
211
+ new BasicString ( tableName ) ,
212
+ new BasicString ( actionName )
213
+ } ;
214
+ re = dbConn . run ( "getSubscriptionTopic" , @params ) ;
215
+ topic = ( ( BasicAnyVector ) re ) . getEntity ( 0 ) . getString ( ) ;
216
+ @params . Clear ( ) ;
114
217
115
- tableName2Topic . Add ( host + ":" + port + ":" + tableName , topic ) ;
218
+ lock ( tableNameToTopic )
219
+ {
220
+ tableNameToTopic . Add ( host + ":" + port + ":" + tableName , topic ) ;
221
+ }
222
+ lock ( topicToSite )
223
+ {
224
+ topicToSite . Add ( topic , new Site ( host , port , tableName , actionName , handler , offset - 1 , reconnect , filter ) ) ;
225
+ }
116
226
117
- @params . Add ( new BasicString ( localIP ) ) ;
118
- @params . Add ( new BasicInt ( listeningPort ) ) ;
119
- @params . Add ( new BasicString ( tableName ) ) ;
120
- @params . Add ( new BasicString ( actionName ) ) ;
121
- if ( offset != - 1 )
227
+ @params . Add ( new BasicString ( localIP ) ) ;
228
+ @params . Add ( new BasicInt ( listeningPort ) ) ;
229
+ @params . Add ( new BasicString ( tableName ) ) ;
230
+ @params . Add ( new BasicString ( actionName ) ) ;
122
231
@params . Add ( new BasicLong ( offset ) ) ;
123
- dbConn . run ( "publishTable" , @params ) ;
124
-
125
- dbConn . close ( ) ;
232
+ if ( filter != null )
233
+ @params . Add ( filter ) ;
234
+ re = dbConn . run ( "publishTable" , @params ) ;
235
+ }
236
+ catch ( Exception ex )
237
+ {
238
+ throw ex ;
239
+ }
240
+ finally
241
+ {
242
+ dbConn . close ( ) ;
243
+ }
244
+
245
+ BlockingCollection < List < IMessage > > queue = queueManager . addQueue ( topic ) ;
126
246
return queue ;
127
247
}
128
248
249
+ protected BlockingCollection < List < IMessage > > subscribeInternal ( string host , int port , string tableName , string actionName , long offset , bool reconnect )
250
+ {
251
+ return subscribeInternal ( host , port , tableName , actionName , null , offset , reconnect , null ) ;
252
+ }
253
+
129
254
protected BlockingCollection < List < IMessage > > subscribeInternal ( string host , int port , string tableName , long offset )
130
255
{
131
- return subscribeInternal ( host , port , tableName , DEFAULT_ACTION_NAME , offset ) ;
256
+ return subscribeInternal ( host , port , tableName , DEFAULT_ACTION_NAME , offset , false ) ;
132
257
}
133
258
134
259
protected void unsubscribeInternal ( string host , int port , string tableName , string actionName )
135
260
{
136
261
DBConnection dbConn = new DBConnection ( ) ;
137
262
dbConn . connect ( host , port ) ;
138
- localIP = dbConn . LocalAddress ;
139
- List < IEntity > @params = new List < IEntity >
140
- {
141
- new BasicString ( localIP ) ,
142
- new BasicInt ( port ) ,
143
- new BasicString ( tableName ) ,
144
- new BasicString ( actionName )
145
- } ;
146
- dbConn . run ( "stopPublishTable" , @params ) ;
147
- dbConn . close ( ) ;
263
+ try
264
+ {
265
+ string localIP = dbConn . LocalAddress ;
266
+ List < IEntity > @params = new List < IEntity >
267
+ {
268
+ new BasicString ( localIP ) ,
269
+ new BasicInt ( listeningPort ) ,
270
+ new BasicString ( tableName ) ,
271
+ new BasicString ( actionName )
272
+ } ;
273
+ dbConn . run ( "stopPublishTable" , @params ) ;
274
+ string topic = null ;
275
+ string fullTableName = host + ":" + port + ":" + tableName ;
276
+ lock ( tableNameToTopic )
277
+ {
278
+ topic = tableNameToTopic [ fullTableName ] ;
279
+ }
280
+ lock ( topicToSite )
281
+ {
282
+ Site site = topicToSite [ topic ] ;
283
+ if ( site != null )
284
+ site . closed = true ;
285
+ }
286
+ Console . WriteLine ( "Successfully unsubscribed table " + fullTableName ) ;
287
+ }
288
+ catch ( Exception ex )
289
+ {
290
+ Console . WriteLine ( ex ) ;
291
+ throw ex ;
292
+ }
293
+ finally
294
+ {
295
+ dbConn . close ( ) ;
296
+ }
297
+ return ;
148
298
}
149
299
150
300
protected void unsubscribeInternal ( string host , int port , string tableName )
0 commit comments