18
18
19
19
public class PollingClient extends AbstractClient {
20
20
TopicPoller topicPoller = null ;
21
- private HashMap <List <String >, List <String >> users = new HashMap <>();
21
+ // private HashMap<List<String>, List<String>> users = new HashMap<>();
22
22
23
23
private static final Logger log = LoggerFactory .getLogger (PollingClient .class );
24
24
@@ -36,19 +36,46 @@ public PollingClient(String subscribeHost, int subscribePort) throws SocketExcep
36
36
37
37
@ Override
38
38
protected boolean doReconnect (Site site ) {
39
- try {
40
- Thread .sleep (1000 );
41
- BlockingQueue <List <IMessage >> queue = subscribeInternal (site .host , site .port , site .tableName , site .actionName , (MessageHandler ) null , site .msgId + 1 , true , site .filter , site .deserializer , site .allowExistTopic , site .userName , site .passWord , site .msgAstable );
42
- log .info ("Successfully reconnected and subscribed " + site .host + ":" + site .port + ":" + site .tableName );
43
- topicPoller .setQueue (queue );
44
- return true ;
45
- } catch (Exception ex ) {
46
- log .error ("Unable to subscribe table. Will try again after 1 seconds." );
47
- ex .printStackTrace ();
48
- return false ;
39
+ if (!AbstractClient .ifUseBackupSite ) {
40
+ // not enable backupSite, use original logic.
41
+ try {
42
+ log .info ("PollingClient doReconnect: " + site .host + ":" + site .port );
43
+ Thread .sleep (1000 );
44
+ BlockingQueue <List <IMessage >> queue = subscribeInternal (site .host , site .port , site .tableName , site .actionName , (MessageHandler ) null , site .msgId + 1 , true , site .filter , site .deserializer , site .allowExistTopic , site .userName , site .passWord , site .msgAstable );
45
+ log .info ("Successfully reconnected and subscribed " + site .host + ":" + site .port + ":" + site .tableName );
46
+ topicPoller .setQueue (queue );
47
+ return true ;
48
+ } catch (Exception ex ) {
49
+ log .error ("Unable to subscribe table. Will try again after 1 seconds." );
50
+ ex .printStackTrace ();
51
+ return false ;
52
+ }
53
+ } else {
54
+ // enable backupSite, try to switch site and subscribe.
55
+ try {
56
+ log .info ("PollingClient doReconnect: " + site .host + ":" + site .port );
57
+ Thread .sleep (1000 );
58
+ subscribe (site .host , site .port , site .tableName , site .actionName , (MessageHandler ) null , site .msgId + 1 , true , site .filter , site .deserializer , site .allowExistTopic , site .userName , site .passWord , site .msgAstable , false );
59
+ String topicStr = site .host + ":" + site .port + "/" + site .tableName + "/" + site .actionName ;
60
+ String curTopic = tableNameToTrueTopic .get (topicStr );
61
+ BlockingQueue <List <IMessage >> queue = queueManager .addQueue (curTopic );
62
+ queueManager .changeQueue (curTopic , lastQueue );
63
+
64
+ log .info ("Successfully reconnected and subscribed " + site .host + ":" + site .port + ":" + site .tableName );
65
+ topicPoller .setQueue (lastQueue );
66
+ return true ;
67
+ } catch (Exception ex ) {
68
+ log .error ("Unable to subscribe table. Will try again after 1 seconds." );
69
+ ex .printStackTrace ();
70
+ return false ;
71
+ }
49
72
}
50
73
}
51
74
75
+ protected void subscribe (String host , int port , String tableName , String actionName , MessageHandler handler , long offset , boolean reconnect , Vector filter , StreamDeserializer deserializer , boolean allowExistTopic , String userName , String password , boolean msgAsTable , boolean createSubInfo ) throws IOException {
76
+ BlockingQueue <List <IMessage >> queue = subscribeInternal (host , port , tableName , actionName , handler , offset , reconnect , filter , deserializer , allowExistTopic , userName , password , false , createSubInfo );
77
+ }
78
+
52
79
public TopicPoller subscribe (String host , int port , String tableName , String actionName , long offset , boolean reconnect , Vector filter , StreamDeserializer deserializer , String userName , String passWord ) throws IOException {
53
80
return subscribe (host , port , tableName , actionName , offset , reconnect , filter , deserializer , userName , passWord , false );
54
81
}
@@ -57,7 +84,29 @@ public TopicPoller subscribe(String host, int port, String tableName, String act
57
84
BlockingQueue <List <IMessage >> queue = subscribeInternal (host , port , tableName , actionName , (MessageHandler ) null , offset , reconnect , filter , deserializer , false , userName , passWord , msgAsTable );
58
85
List <String > tp = Arrays .asList (host , String .valueOf (port ), tableName , actionName );
59
86
List <String > usr = Arrays .asList (userName , passWord );
60
- users .put (tp , usr );
87
+ // users.put(tp, usr);
88
+ topicPoller = new TopicPoller (queue );
89
+ return topicPoller ;
90
+ }
91
+
92
+ public TopicPoller subscribe (String host , int port , String tableName , String actionName , long offset , boolean reconnect , Vector filter , StreamDeserializer deserializer , String userName , String passWord , boolean msgAsTable , List <String > backupSites , int resubTimeout , boolean subOnce ) throws IOException {
93
+ if (resubTimeout < 0 )
94
+ // resubTimeout default: 100ms
95
+ resubTimeout = 100 ;
96
+
97
+ BlockingQueue <List <IMessage >> queue = subscribeInternal (host , port , tableName , actionName , (MessageHandler ) null , offset , reconnect , filter , deserializer , false , userName , passWord , msgAsTable , backupSites , resubTimeout , subOnce );
98
+ List <String > tp = Arrays .asList (host , String .valueOf (port ), tableName , actionName );
99
+ List <String > usr = Arrays .asList (userName , passWord );
100
+ // users.put(tp, usr);
101
+ topicPoller = new TopicPoller (queue );
102
+ return topicPoller ;
103
+ }
104
+
105
+ public TopicPoller subscribe (String host , int port , String tableName , String actionName , long offset , boolean reconnect , Vector filter , StreamDeserializer deserializer , String userName , String passWord , boolean msgAsTable , List <String > backupSites ) throws IOException {
106
+ BlockingQueue <List <IMessage >> queue = subscribeInternal (host , port , tableName , actionName , (MessageHandler ) null , offset , reconnect , filter , deserializer , false , userName , passWord , msgAsTable , backupSites , 100 , false );
107
+ List <String > tp = Arrays .asList (host , String .valueOf (port ), tableName , actionName );
108
+ List <String > usr = Arrays .asList (userName , passWord );
109
+ // users.put(tp, usr);
61
110
topicPoller = new TopicPoller (queue );
62
111
return topicPoller ;
63
112
}
@@ -120,47 +169,63 @@ public void unsubscribe(String host, int port, String tableName) throws IOExcept
120
169
121
170
@ Override
122
171
protected void unsubscribeInternal (String host , int port , String tableName , String actionName ) throws IOException {
123
- DBConnection dbConn = new DBConnection ();
124
- List <String > tp = Arrays .asList (host , String .valueOf (port ), tableName , actionName );
125
- List <String > usr = users .get (tp );
126
- String user = usr .get (0 );
127
- String pwd = usr .get (1 );
128
- if (!user .equals ("" ))
129
- dbConn .connect (host , port , user , pwd );
130
- else
131
- dbConn .connect (host , port );
132
- try {
133
- String localIP = this .listeningHost ;
134
- if (localIP .equals ("" ))
135
- localIP = dbConn .getLocalAddress ().getHostAddress ();
136
- List <Entity > params = new ArrayList <Entity >();
137
- params .add (new BasicString (localIP ));
138
- params .add (new BasicInt (this .listeningPort ));
139
- params .add (new BasicString (tableName ));
140
- params .add (new BasicString (actionName ));
141
-
142
- dbConn .run ("stopPublishTable" , params );
143
- String topic = null ;
144
- String fullTableName = host + ":" + port + "/" + tableName + "/" + actionName ;
145
- synchronized (tableNameToTrueTopic ) {
146
- topic = tableNameToTrueTopic .get (fullTableName );
172
+ synchronized (this ) {
173
+ DBConnection dbConn = new DBConnection ();
174
+ if (!currentSiteIndexMap .isEmpty ()) {
175
+ String topic = tableNameToTrueTopic .get ( host + ":" + port + "/" + tableName + "/" + actionName );
176
+ Integer currentSiteIndex = currentSiteIndexMap .get (topic );
177
+ Site [] sites = trueTopicToSites .get (topic );
178
+ host = sites [currentSiteIndex ].host ;
179
+ port = sites [currentSiteIndex ].port ;
147
180
}
148
- synchronized (trueTopicToSites ) {
181
+
182
+ List <String > tp = Arrays .asList (host , String .valueOf (port ), tableName , actionName );
183
+ List <String > usr = users .get (tp );
184
+ String user = usr .get (0 );
185
+ String pwd = usr .get (1 );
186
+ if (!user .equals ("" ))
187
+ dbConn .connect (host , port , user , pwd );
188
+ else
189
+ dbConn .connect (host , port );
190
+ try {
191
+ String localIP = this .listeningHost ;
192
+ if (localIP .equals ("" ))
193
+ localIP = dbConn .getLocalAddress ().getHostAddress ();
194
+ List <Entity > params = new ArrayList <Entity >();
195
+ params .add (new BasicString (localIP ));
196
+ params .add (new BasicInt (this .listeningPort ));
197
+ params .add (new BasicString (tableName ));
198
+ params .add (new BasicString (actionName ));
199
+
200
+ dbConn .run ("stopPublishTable" , params );
201
+ String topic = null ;
202
+ String fullTableName = host + ":" + port + "/" + tableName + "/" + actionName ;
203
+ //synchronized (tableNameToTrueTopic) {
204
+ topic = tableNameToTrueTopic .get (fullTableName );
205
+ // }
206
+ // synchronized (trueTopicToSites) {
149
207
Site [] sites = trueTopicToSites .get (topic );
150
208
if (sites == null || sites .length == 0 )
151
209
;
152
210
for (int i = 0 ; i < sites .length ; i ++)
153
211
sites [i ].closed = true ;
154
- }
155
- synchronized (queueManager ) {
212
+ // }
213
+ // synchronized (queueManager) {
156
214
queueManager .removeQueue (topic );
215
+ // }
216
+
217
+ // init backupSites related params.
218
+ if (AbstractClient .ifUseBackupSite ) {
219
+ AbstractClient .ifUseBackupSite = false ;
220
+ AbstractClient .subOnce = false ;
221
+ AbstractClient .resubTimeout = 100 ;
222
+ }
223
+ log .info ("Successfully unsubscribed table " + fullTableName );
224
+ } catch (Exception ex ) {
225
+ throw ex ;
226
+ } finally {
227
+ dbConn .close ();
157
228
}
158
- log .info ("Successfully unsubscribed table " + fullTableName );
159
- } catch (Exception ex ) {
160
- throw ex ;
161
- } finally {
162
- dbConn .close ();
163
229
}
164
- return ;
165
230
}
166
231
}
0 commit comments