Skip to content

Commit 048ffb1

Browse files
author
chengyitian
committed
AJ-681: fix issue about backupSite issues;
1 parent c2a44b1 commit 048ffb1

File tree

2 files changed

+39
-22
lines changed

2 files changed

+39
-22
lines changed

src/com/xxdb/streaming/client/AbstractClient.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,11 +258,14 @@ public boolean tryReconnect(String topic) {
258258
int successfulSiteIndex = -1;
259259

260260
// Starting from currentSiteIndex, go around in a circle until you return to the position just before it (circular looping).
261+
Site lastSite = null;
261262
for (int offset = 0; offset < totalSites; offset++) {
262263
// Implement wrapping around using modulo operation
263264
int i = (currentSiteIndex + offset) % totalSites;
264265

265266
Site site = sites[i];
267+
if (offset == 0)
268+
lastSite = site;
266269
boolean siteReconnected = false;
267270

268271
for (int attempt = 0; attempt < 2; attempt++) {
@@ -286,7 +289,9 @@ public boolean tryReconnect(String topic) {
286289
if (subOnce && reconnected) {
287290
List<Site> siteList = new ArrayList<>(Arrays.asList(sites));
288291
// Remove the original currentSiteIndex node from the list.
289-
siteList.remove((int) currentSiteIndex);
292+
if (!(siteList.get(successfulSiteIndex).host.equals(lastSite.host) && siteList.get(successfulSiteIndex).port == lastSite.port))
293+
siteList.remove((int) currentSiteIndex);
294+
290295
// update sites
291296
sites = siteList.toArray(new Site[0]);
292297
trueTopicToSites.put(topic, sites);

src/com/xxdb/streaming/client/ThreadedClient.java

Lines changed: 33 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -154,24 +154,18 @@ else if(batchSize != -1 && throttle != -1 || batchSize != -1 && secondThrottle !
154154

155155
@Override
156156
protected boolean doReconnect(Site site) {
157-
synchronized (this) {
158-
log.info("ThreadedClient doReconnect: " + site.host + ":" + site.port);
157+
if (!AbstractClient.ifUseBackupSite) {
158+
// not enable backupSite, use original logic
159159
String topicStr = site.host + ":" + site.port + "/" + site.tableName + "/" + site.actionName;
160160
Thread handlerLopper = null;
161-
if (!handlerLoppers.containsKey(topicStr)) {
162-
if (!AbstractClient.ifUseBackupSite) {
161+
synchronized (handlerLoppers) {
162+
if (!handlerLoppers.containsKey(topicStr))
163163
throw new RuntimeException("Subscribe thread is not started");
164-
}
165-
} else {
166164
handlerLopper = handlerLoppers.get(topicStr);
167-
handlerLopper.interrupt();
168165
}
169-
166+
handlerLopper.interrupt();
170167
try {
171-
// System.out.println("doReconnect 尝试切换节点:" + site.host + ":" + site.port);
172-
// System.out.println("site msg id: " +site.msgId);
173-
subscribe(site.host, site.port, site.tableName, site.actionName, site.handler, site.msgId + 1, true, site.filter, site.deserializer, site.allowExistTopic, site.userName, site.passWord, false);
174-
// System.out.println("doReconnect 尝试切换节点成功:" + site.host + ":" + site.port);
168+
subscribe(site.host, site.port, site.tableName, site.actionName, site.handler, site.msgId + 1, true, site.filter, site.deserializer, site.allowExistTopic, site.userName, site.passWord);
175169
Date d = new Date();
176170
DateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
177171
log.info(df.format(d) + " Successfully reconnected and subscribed " + site.host + ":" + site.port + "/" + site.tableName + "/" + site.actionName);
@@ -183,6 +177,29 @@ protected boolean doReconnect(Site site) {
183177
ex.printStackTrace();
184178
return false;
185179
}
180+
} else {
181+
// enable backupSite, try to switch site and subscribe.
182+
synchronized (this) {
183+
log.info("ThreadedClient doReconnect: " + site.host + ":" + site.port);
184+
try {
185+
System.out.println("doReconnect 尝试切换节点:" + site.host + ":" + site.port);
186+
// System.out.println("site msg id: " +site.msgId);
187+
subscribe(site.host, site.port, site.tableName, site.actionName, site.handler, site.msgId + 1, true, site.filter, site.deserializer, site.allowExistTopic, site.userName, site.passWord, false);
188+
System.out.println("doReconnect 尝试切换节点成功:" + site.host + ":" + site.port);
189+
// System.out.println("切换后 handlerLoppers: " + handlerLoppers.get(topicStr).getName());
190+
// System.out.println("切换成功后,handlerLoppers size: " + handlerLoppers.size());
191+
Date d = new Date();
192+
DateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
193+
log.info(df.format(d) + " Successfully reconnected and subscribed " + site.host + ":" + site.port + "/" + site.tableName + "/" + site.actionName);
194+
return true;
195+
} catch (Exception ex) {
196+
Date d = new Date();
197+
DateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
198+
log.error(df.format(d) + " Unable to subscribe table. Will try again after 1 seconds." + site.host + ":" + site.port + "/" + site.tableName + "/" + site.actionName);
199+
ex.printStackTrace();
200+
return false;
201+
}
202+
}
186203
}
187204
}
188205

@@ -198,16 +215,11 @@ public void subscribe(String host, int port, String tableName, String actionName
198215
}
199216
}
200217

201-
public void subscribe(String host, int port, String tableName, String actionName, MessageHandler handler, long offset, boolean reconnect, Vector filter, StreamDeserializer deserializer, boolean allowExistTopic, String userName, String password, boolean createSubInfo) throws IOException {
218+
/**
219+
* This internal subscribe method only use for when enable backupSite, try to switch site and subscribe.
220+
*/
221+
protected void subscribe(String host, int port, String tableName, String actionName, MessageHandler handler, long offset, boolean reconnect, Vector filter, StreamDeserializer deserializer, boolean allowExistTopic, String userName, String password, boolean createSubInfo) throws IOException {
202222
BlockingQueue<List<IMessage>> queue = subscribeInternal(host, port, tableName, actionName, handler, offset, reconnect, filter, deserializer, allowExistTopic, userName, password, false, createSubInfo);
203-
HandlerLopper handlerLopper = new HandlerLopper(queue, handler);
204-
handlerLopper.start();
205-
String topicStr = host + ":" + port + "/" + tableName + "/" + actionName;
206-
List<String> usr = Arrays.asList(userName, password);
207-
synchronized (handlerLoppers) {
208-
handlerLoppers.put(topicStr, handlerLopper);
209-
// users.put(topicStr, usr);
210-
}
211223
}
212224

213225
public void subscribe(String host, int port, String tableName, String actionName, MessageHandler handler, long offset, boolean reconnect, Vector filter, StreamDeserializer deserializer, boolean allowExistTopic, int batchSize, int throttle, String userName, String password) throws IOException {

0 commit comments

Comments
 (0)