Skip to content

Commit 056c019

Browse files
author
chengyitian
committed
AJ-676: fix issue about streaming backupSites;
1 parent 9450ac9 commit 056c019

File tree

2 files changed

+12
-0
lines changed

2 files changed

+12
-0
lines changed

src/com/xxdb/streaming/client/QueueManager.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,14 @@ public synchronized BlockingQueue<List<IMessage>> addQueue(String topic) {
1616
throw new RuntimeException("Topic " + topic + " already subscribed");
1717
}
1818

19+
public synchronized BlockingQueue<List<IMessage>> changeQueue(String topic, BlockingQueue<List<IMessage>> q) {
20+
if (queueMap.containsKey(topic)) {
21+
queueMap.put(topic, q);
22+
return q;
23+
}
24+
throw new RuntimeException("Topic " + topic + " doesn't subscribed");
25+
}
26+
1927
public synchronized BlockingQueue<List<IMessage>> getQueue(String topic) {
2028
BlockingQueue<List<IMessage>> q = queueMap.get(topic);
2129
return q;

src/com/xxdb/streaming/client/ThreadedClient.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,10 @@ protected boolean doReconnect(Site site) {
186186
// System.out.println("site msg id: " +site.msgId);
187187
subscribe(site.host, site.port, site.tableName, site.actionName, site.handler, site.msgId + 1, true, site.filter, site.deserializer, site.allowExistTopic, site.userName, site.passWord, false);
188188
System.out.println("doReconnect 尝试切换节点成功:" + site.host + ":" + site.port);
189+
String topicStr = site.host + ":" + site.port + "/" + site.tableName + "/" + site.actionName;
190+
String curTopic = tableNameToTrueTopic.get(topicStr);
191+
BlockingQueue<List<IMessage>> queue = queueManager.addQueue(curTopic);
192+
queueManager.changeQueue(curTopic, lastQueue);
189193
// System.out.println("切换后 handlerLoppers: " + handlerLoppers.get(topicStr).getName());
190194
// System.out.println("切换成功后,handlerLoppers size: " + handlerLoppers.size());
191195
Date d = new Date();

0 commit comments

Comments
 (0)