Skip to content

Commit cdee4b0

Browse files
author
chengyitian
committed
AJ-687: fix issue about backupSites for PollingClient;
1 parent 218c038 commit cdee4b0

File tree

1 file changed

+41
-15
lines changed

1 file changed

+41
-15
lines changed

src/com/xxdb/streaming/client/PollingClient.java

Lines changed: 41 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
public class PollingClient extends AbstractClient {
2020
TopicPoller topicPoller = null;
21-
private HashMap<List<String>, List<String>> users = new HashMap<>();
21+
// private HashMap<List<String>, List<String>> users = new HashMap<>();
2222

2323
private static final Logger log = LoggerFactory.getLogger(PollingClient.class);
2424

@@ -36,20 +36,46 @@ public PollingClient(String subscribeHost, int subscribePort) throws SocketExcep
3636

3737
@Override
3838
protected boolean doReconnect(Site site) {
39-
try {
40-
log.info("PollingClient doReconnect: " + site.host + ":" + site.port);
41-
Thread.sleep(1000);
42-
BlockingQueue<List<IMessage>> queue = subscribeInternal(site.host, site.port, site.tableName, site.actionName, (MessageHandler) null, site.msgId + 1, true, site.filter, site.deserializer, site.allowExistTopic, site.userName, site.passWord, site.msgAstable);
43-
log.info("Successfully reconnected and subscribed " + site.host + ":" + site.port + ":" + site.tableName);
44-
topicPoller.setQueue(queue);
45-
return true;
46-
} catch (Exception ex) {
47-
log.error("Unable to subscribe table. Will try again after 1 seconds.");
48-
ex.printStackTrace();
49-
return false;
39+
if (!AbstractClient.ifUseBackupSite) {
40+
// not enable backupSite, use original logic.
41+
try {
42+
log.info("PollingClient doReconnect: " + site.host + ":" + site.port);
43+
Thread.sleep(1000);
44+
BlockingQueue<List<IMessage>> queue = subscribeInternal(site.host, site.port, site.tableName, site.actionName, (MessageHandler) null, site.msgId + 1, true, site.filter, site.deserializer, site.allowExistTopic, site.userName, site.passWord, site.msgAstable);
45+
log.info("Successfully reconnected and subscribed " + site.host + ":" + site.port + ":" + site.tableName);
46+
topicPoller.setQueue(queue);
47+
return true;
48+
} catch (Exception ex) {
49+
log.error("Unable to subscribe table. Will try again after 1 seconds.");
50+
ex.printStackTrace();
51+
return false;
52+
}
53+
} else {
54+
// enable backupSite, try to switch site and subscribe.
55+
try {
56+
log.info("PollingClient doReconnect: " + site.host + ":" + site.port);
57+
Thread.sleep(1000);
58+
subscribe(site.host, site.port, site.tableName, site.actionName, (MessageHandler) null, site.msgId + 1, true, site.filter, site.deserializer, site.allowExistTopic, site.userName, site.passWord, site.msgAstable, false);
59+
String topicStr = site.host + ":" + site.port + "/" + site.tableName + "/" + site.actionName;
60+
String curTopic = tableNameToTrueTopic.get(topicStr);
61+
BlockingQueue<List<IMessage>> queue = queueManager.addQueue(curTopic);
62+
queueManager.changeQueue(curTopic, lastQueue);
63+
64+
log.info("Successfully reconnected and subscribed " + site.host + ":" + site.port + ":" + site.tableName);
65+
topicPoller.setQueue(lastQueue);
66+
return true;
67+
} catch (Exception ex) {
68+
log.error("Unable to subscribe table. Will try again after 1 seconds.");
69+
ex.printStackTrace();
70+
return false;
71+
}
5072
}
5173
}
5274

75+
protected void subscribe(String host, int port, String tableName, String actionName, MessageHandler handler, long offset, boolean reconnect, Vector filter, StreamDeserializer deserializer, boolean allowExistTopic, String userName, String password, boolean msgAsTable, boolean createSubInfo) throws IOException {
76+
BlockingQueue<List<IMessage>> queue = subscribeInternal(host, port, tableName, actionName, handler, offset, reconnect, filter, deserializer, allowExistTopic, userName, password, false, createSubInfo);
77+
}
78+
5379
public TopicPoller subscribe(String host, int port, String tableName, String actionName, long offset, boolean reconnect, Vector filter, StreamDeserializer deserializer, String userName, String passWord) throws IOException {
5480
return subscribe(host, port, tableName, actionName, offset, reconnect, filter, deserializer, userName, passWord, false);
5581
}
@@ -58,7 +84,7 @@ public TopicPoller subscribe(String host, int port, String tableName, String act
5884
BlockingQueue<List<IMessage>> queue = subscribeInternal(host, port, tableName, actionName, (MessageHandler) null, offset, reconnect, filter, deserializer, false, userName, passWord, msgAsTable);
5985
List<String> tp = Arrays.asList(host, String.valueOf(port), tableName, actionName);
6086
List<String> usr = Arrays.asList(userName, passWord);
61-
users.put(tp, usr);
87+
// users.put(tp, usr);
6288
topicPoller = new TopicPoller(queue);
6389
return topicPoller;
6490
}
@@ -71,7 +97,7 @@ public TopicPoller subscribe(String host, int port, String tableName, String act
7197
BlockingQueue<List<IMessage>> queue = subscribeInternal(host, port, tableName, actionName, (MessageHandler) null, offset, reconnect, filter, deserializer, false, userName, passWord, msgAsTable, backupSites, resubTimeout, subOnce);
7298
List<String> tp = Arrays.asList(host, String.valueOf(port), tableName, actionName);
7399
List<String> usr = Arrays.asList(userName, passWord);
74-
users.put(tp, usr);
100+
// users.put(tp, usr);
75101
topicPoller = new TopicPoller(queue);
76102
return topicPoller;
77103
}
@@ -80,7 +106,7 @@ public TopicPoller subscribe(String host, int port, String tableName, String act
80106
BlockingQueue<List<IMessage>> queue = subscribeInternal(host, port, tableName, actionName, (MessageHandler) null, offset, reconnect, filter, deserializer, false, userName, passWord, msgAsTable, backupSites, 100, false);
81107
List<String> tp = Arrays.asList(host, String.valueOf(port), tableName, actionName);
82108
List<String> usr = Arrays.asList(userName, passWord);
83-
users.put(tp, usr);
109+
// users.put(tp, usr);
84110
topicPoller = new TopicPoller(queue);
85111
return topicPoller;
86112
}

0 commit comments

Comments
 (0)