Skip to content

Commit 126afaf

Browse files
author
chengyitian
committed
AJ-672: fix issue about backupSite switch;
1 parent 13e6229 commit 126afaf

File tree

2 files changed

+55
-6
lines changed

2 files changed

+55
-6
lines changed

src/com/xxdb/streaming/client/AbstractClient.java

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ public abstract class AbstractClient implements MessageDispatcher {
4343
protected static Map<String, Long> lastExceptionTopicTimeMap = new ConcurrentHashMap<>();
4444
protected static Integer resubTimeout;
4545
protected static boolean subOnce;
46+
// protected static boolean createSubInfo;
47+
protected BlockingQueue<List<IMessage>> lastQueue;
4648

4749
private Daemon daemon = null;
4850

@@ -175,7 +177,7 @@ public Site getCurrentSiteByName(String site) {
175177
List<String> topics = this.getAllTopicsBySite(site);
176178
if (topics.size() > 0) {
177179
Site[] sites = trueTopicToSites.get(topics.get(0));
178-
Integer currentSiteIndex = currentSiteIndexMap.get(topics.get(0));
180+
Integer currentSiteIndex = currentSiteIndexMap.get(lastBackupSiteTopic);
179181
return sites[currentSiteIndex];
180182
}
181183

@@ -195,6 +197,12 @@ public void setMsgId(String topic, long msgId) {
195197
return;
196198
if (sites.length == 1)
197199
sites[0].msgId = msgId;
200+
201+
if (ifUseBackupSite) {
202+
for (Site site : sites) {
203+
site.msgId = msgId;
204+
}
205+
}
198206
}
199207
}
200208

@@ -243,7 +251,7 @@ public boolean tryReconnect(String topic) {
243251
}
244252

245253
boolean reconnected = false;
246-
Integer currentSiteIndex = currentSiteIndexMap.get(topic);
254+
Integer currentSiteIndex = currentSiteIndexMap.get(lastBackupSiteTopic);
247255
if (currentSiteIndex != null && currentSiteIndex != -1) {
248256
int totalSites = sites.length;
249257
// set successfulSiteIndex init value to -1.
@@ -295,12 +303,13 @@ public boolean tryReconnect(String topic) {
295303
}
296304
}
297305

298-
log.info("Successfully switched to node: " + sites[currentSiteIndexMap.get(topic)].host + ":" + sites[currentSiteIndexMap.get(topic)].port);
306+
// log.info("Successfully switched to node: " + sites[currentSiteIndexMap.get(topic)].host + ":" + sites[currentSiteIndexMap.get(topic)].port);
299307

300308
if (!reconnected) {
301309
waitReconnectTopic.add(topic);
302310
return false;
303311
} else {
312+
log.info("Successfully switched to node: " + sites[currentSiteIndexMap.get(topic)].host + ":" + sites[currentSiteIndexMap.get(topic)].port);
304313
reconnectTable.remove(topic.substring(0, topic.indexOf("/")));
305314
waitReconnectTopic.remove(topic);
306315
return true;
@@ -490,11 +499,19 @@ protected BlockingQueue<List<IMessage>> subscribeInternal(String host, int port,
490499
return subscribeInternal(host, port, tableName, actionName, handler, offset, reconnect, filter, deserializer, allowExistTopic, userName, passWord, msgAsTable, null, 100, false);
491500
}
492501

502+
protected BlockingQueue<List<IMessage>> subscribeInternal(String host, int port,
503+
String tableName, String actionName, MessageHandler handler,
504+
long offset, boolean reconnect, Vector filter, StreamDeserializer deserializer,
505+
boolean allowExistTopic, String userName, String passWord, boolean msgAsTable, boolean createSubInfo)
506+
throws IOException, RuntimeException {
507+
return subscribeInternal(host, port, tableName, actionName, handler, offset, reconnect, filter, deserializer, allowExistTopic, userName, passWord, msgAsTable, null, 100, false, createSubInfo);
508+
}
509+
493510
protected BlockingQueue<List<IMessage>> subscribeInternal(String host, int port,
494511
String tableName, String actionName, MessageHandler handler,
495512
long offset, boolean reconnect, Vector filter, StreamDeserializer deserializer,
496513
boolean allowExistTopic, String userName, String passWord, boolean msgAsTable,
497-
List<String> backupSites, int resubTimeout, boolean subOnce) throws IOException, RuntimeException {
514+
List<String> backupSites, int resubTimeout, boolean subOnce, boolean createSubInfo) throws IOException, RuntimeException {
498515
Entity re;
499516
String topic = "";
500517
DBConnection dbConn;
@@ -508,6 +525,7 @@ protected BlockingQueue<List<IMessage>> subscribeInternal(String host, int port,
508525
AbstractClient.resubTimeout = resubTimeout;
509526
AbstractClient.subOnce = subOnce;
510527
AbstractClient.ifUseBackupSite = true;
528+
// AbstractClient.createSubInfo = createSubInfo;
511529
// prepare backupSites
512530
for (int i = 0; i < backupSites.size() + 1; i++) {
513531
if (i == 0) {
@@ -634,6 +652,7 @@ protected BlockingQueue<List<IMessage>> subscribeInternal(String host, int port,
634652
subscribeInternalConnect(dbConn, host, port, userName, passWord);
635653
re = dbConn.run("getSubscriptionTopic", params);
636654
topic = ((BasicAnyVector) re).getEntity(0).getString();
655+
// lastBackupSiteTopic = topic;
637656
params.clear();
638657

639658
params.add(new BasicString(localIP));
@@ -721,10 +740,25 @@ protected BlockingQueue<List<IMessage>> subscribeInternal(String host, int port,
721740
}
722741
}
723742

724-
BlockingQueue<List<IMessage>> queue = queueManager.addQueue(topic);
743+
BlockingQueue<List<IMessage>> queue;
744+
if (createSubInfo) {
745+
queue = queueManager.addQueue(topic);
746+
lastQueue = queue;
747+
} else {
748+
queue = lastQueue;
749+
}
750+
725751
return queue;
726752
}
727753

754+
protected BlockingQueue<List<IMessage>> subscribeInternal(String host, int port,
755+
String tableName, String actionName, MessageHandler handler,
756+
long offset, boolean reconnect, Vector filter, StreamDeserializer deserializer,
757+
boolean allowExistTopic, String userName, String passWord, boolean msgAsTable,
758+
List<String> backupSites, int resubTimeout, boolean subOnce) throws IOException, RuntimeException {
759+
return subscribeInternal(host, port, tableName, actionName, handler, offset, reconnect, filter, deserializer, allowExistTopic, userName, passWord, msgAsTable, backupSites, resubTimeout, subOnce, true);
760+
}
761+
728762
protected BlockingQueue<List<IMessage>> subscribeInternal(String host, int port,
729763
String tableName, String actionName, long offset, boolean reconnect)
730764
throws IOException, RuntimeException {

src/com/xxdb/streaming/client/ThreadedClient.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,10 @@ protected boolean doReconnect(Site site) {
168168
}
169169

170170
try {
171-
subscribe(site.host, site.port, site.tableName, site.actionName, site.handler, site.msgId + 1, true, site.filter, site.deserializer, site.allowExistTopic, site.userName, site.passWord);
171+
// System.out.println("doReconnect 尝试切换节点:" + site.host + ":" + site.port);
172+
// System.out.println("site msg id: " +site.msgId);
173+
subscribe(site.host, site.port, site.tableName, site.actionName, site.handler, site.msgId + 1, true, site.filter, site.deserializer, site.allowExistTopic, site.userName, site.passWord, false);
174+
// System.out.println("doReconnect 尝试切换节点成功:" + site.host + ":" + site.port);
172175
Date d = new Date();
173176
DateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
174177
log.info(df.format(d) + " Successfully reconnected and subscribed " + site.host + ":" + site.port + "/" + site.tableName + "/" + site.actionName);
@@ -195,6 +198,18 @@ public void subscribe(String host, int port, String tableName, String actionName
195198
}
196199
}
197200

201+
public void subscribe(String host, int port, String tableName, String actionName, MessageHandler handler, long offset, boolean reconnect, Vector filter, StreamDeserializer deserializer, boolean allowExistTopic, String userName, String password, boolean createSubInfo) throws IOException {
202+
BlockingQueue<List<IMessage>> queue = subscribeInternal(host, port, tableName, actionName, handler, offset, reconnect, filter, deserializer, allowExistTopic, userName, password, false, createSubInfo);
203+
HandlerLopper handlerLopper = new HandlerLopper(queue, handler);
204+
handlerLopper.start();
205+
String topicStr = host + ":" + port + "/" + tableName + "/" + actionName;
206+
List<String> usr = Arrays.asList(userName, password);
207+
synchronized (handlerLoppers) {
208+
handlerLoppers.put(topicStr, handlerLopper);
209+
// users.put(topicStr, usr);
210+
}
211+
}
212+
198213
public void subscribe(String host, int port, String tableName, String actionName, MessageHandler handler, long offset, boolean reconnect, Vector filter, StreamDeserializer deserializer, boolean allowExistTopic, int batchSize, int throttle, String userName, String password) throws IOException {
199214
if(batchSize<=0)
200215
throw new IllegalArgumentException("BatchSize must be greater than zero");

0 commit comments

Comments
 (0)