Skip to content

Commit 1cb5d02

Browse files
author
chengyitian
committed
AJ-636: add new logic for streaming backup;
1 parent ab10a15 commit 1cb5d02

File tree

7 files changed

+199
-50
lines changed

7 files changed

+199
-50
lines changed

src/com/xxdb/streaming/client/AbstractClient.java

Lines changed: 86 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,12 @@ public abstract class AbstractClient implements MessageDispatcher {
3737
protected HashMap<List<String>, List<String>> users = new HashMap<>();
3838
protected boolean isClose_ = false;
3939
protected LinkedBlockingQueue<DBConnection> connList = new LinkedBlockingQueue<>();
40+
protected static boolean ifUseBackupSite;
41+
protected String lastBackupSiteTopic = "";
4042
protected Map<String, Integer> currentSiteIndexMap = new ConcurrentHashMap<>();
43+
protected static Map<String, Long> lastExceptionTopicTimeMap = new ConcurrentHashMap<>();
44+
protected static Integer resubTimeout;
45+
protected static boolean subOnce;
4146

4247
private Daemon daemon = null;
4348

@@ -166,6 +171,17 @@ public Site getSiteByName(String site) {
166171
return null;
167172
}
168173

174+
public Site getCurrentSiteByName(String site) {
175+
List<String> topics = this.getAllTopicsBySite(site);
176+
if (topics.size() > 0) {
177+
Site[] sites = trueTopicToSites.get(topics.get(0));
178+
Integer currentSiteIndex = currentSiteIndexMap.get(topics.get(0));
179+
return sites[currentSiteIndex];
180+
}
181+
182+
return null;
183+
}
184+
169185
public Map<String, StreamDeserializer> getSubInfos(){
170186
return subInfos_;
171187
}
@@ -227,32 +243,65 @@ public boolean tryReconnect(String topic) {
227243
}
228244

229245
boolean reconnected = false;
230-
for (int i = 0; i < sites.length; i ++) {
231-
Integer currentSiteIndex = currentSiteIndexMap.get(topic);
232-
if (currentSiteIndexMap.get(topic) != -1)
233-
i = currentSiteIndex;
234-
Site site = sites[i];
235-
boolean siteReconnected = false;
236-
for (int attempt = 0; attempt < 2; attempt++) {
237-
// try twice every site
238-
if (doReconnect(site)) {
239-
siteReconnected = true;
240-
// if site current connect successfully, break
246+
Integer currentSiteIndex = currentSiteIndexMap.get(topic);
247+
if (currentSiteIndex != null && currentSiteIndex != -1) {
248+
int totalSites = sites.length;
249+
// set successfulSiteIndex init value to -1.
250+
int successfulSiteIndex = -1;
251+
252+
// Starting from currentSiteIndex, go around in a circle until you return to the position just before it (circular looping).
253+
for (int offset = 0; offset < totalSites; offset++) {
254+
// Implement wrapping around using modulo operation
255+
int i = (currentSiteIndex + offset) % totalSites;
256+
257+
Site site = sites[i];
258+
boolean siteReconnected = false;
259+
260+
for (int attempt = 0; attempt < 1; attempt++) {
261+
// try twice for every site.
262+
if (doReconnect(site)) {
263+
siteReconnected = true;
264+
// if site reconnect successfully, break.
265+
break;
266+
}
267+
}
268+
269+
if (siteReconnected) {
270+
reconnected = true;
271+
successfulSiteIndex = i;
272+
// if current site reconnect successfully, no continue try other sites, break.
241273
break;
242274
}
243275
}
244-
if (siteReconnected) {
245-
reconnected = true;
246-
currentSiteIndexMap.put(topic, i);
247-
// if current site connect successfully, no tryReconnect other site, break
248-
break;
276+
277+
// Determine whether to delete the original currentSiteIndex node based on the subOnce parameter.
278+
if (subOnce && reconnected) {
279+
List<Site> siteList = new ArrayList<>(Arrays.asList(sites));
280+
// Remove the original currentSiteIndex node from the list.
281+
siteList.remove((int) currentSiteIndex);
282+
// update sites
283+
sites = siteList.toArray(new Site[0]);
284+
285+
// Calculate the index of the newly successful connection node after a successful deletion.
286+
if (successfulSiteIndex > currentSiteIndex) {
287+
// If the successfully connected node is after the deleted node, reduce the index by 1.
288+
successfulSiteIndex -= 1;
289+
}
290+
// update currentSiteIndexMap to new successfully connected site's index;
291+
currentSiteIndexMap.put(topic, successfulSiteIndex);
292+
} else if (reconnected) {
293+
// not delete site, but update successfulSiteIndex.
294+
currentSiteIndexMap.put(topic, successfulSiteIndex);
249295
}
250296
}
251297

298+
log.info("Successfully switched to node: " + sites[currentSiteIndexMap.get(topic)].host + ":" + sites[currentSiteIndexMap.get(topic)].port);
299+
252300
if (!reconnected) {
253301
waitReconnectTopic.add(topic);
254302
return false;
255303
} else {
304+
reconnectTable.remove(topic.substring(0, topic.indexOf("/")));
256305
waitReconnectTopic.remove(topic);
257306
return true;
258307
}
@@ -438,13 +487,14 @@ protected BlockingQueue<List<IMessage>> subscribeInternal(String host, int port,
438487
long offset, boolean reconnect, Vector filter, StreamDeserializer deserializer,
439488
boolean allowExistTopic, String userName, String passWord, boolean msgAsTable)
440489
throws IOException, RuntimeException {
441-
return subscribeInternal(host, port, tableName, actionName, handler, offset, reconnect, filter, deserializer, allowExistTopic, userName, passWord, msgAsTable, null);
490+
return subscribeInternal(host, port, tableName, actionName, handler, offset, reconnect, filter, deserializer, allowExistTopic, userName, passWord, msgAsTable, null, 100, false);
442491
}
443492

444493
protected BlockingQueue<List<IMessage>> subscribeInternal(String host, int port,
445494
String tableName, String actionName, MessageHandler handler,
446495
long offset, boolean reconnect, Vector filter, StreamDeserializer deserializer,
447-
boolean allowExistTopic, String userName, String passWord, boolean msgAsTable, List<String> backupSites) throws IOException, RuntimeException {
496+
boolean allowExistTopic, String userName, String passWord, boolean msgAsTable,
497+
List<String> backupSites, int resubTimeout, boolean subOnce) throws IOException, RuntimeException {
448498
Entity re;
449499
String topic = "";
450500
DBConnection dbConn;
@@ -455,6 +505,9 @@ protected BlockingQueue<List<IMessage>> subscribeInternal(String host, int port,
455505

456506
List<Site> parsedBackupSites = new ArrayList<>();
457507
if (Objects.nonNull(backupSites) && !backupSites.isEmpty()) {
508+
AbstractClient.resubTimeout = resubTimeout;
509+
AbstractClient.subOnce = subOnce;
510+
AbstractClient.ifUseBackupSite = true;
458511
// prepare backupSites
459512
for (int i = 0; i < backupSites.size() + 1; i++) {
460513
if (i == 0) {
@@ -494,6 +547,7 @@ protected BlockingQueue<List<IMessage>> subscribeInternal(String host, int port,
494547
params.add(new BasicString(actionName));
495548
re = dbConn.run("getSubscriptionTopic", params);
496549
topic = ((BasicAnyVector) re).getEntity(0).getString();
550+
lastBackupSiteTopic = topic;
497551
params.clear();
498552

499553
// set current site index
@@ -598,7 +652,20 @@ protected BlockingQueue<List<IMessage>> subscribeInternal(String host, int port,
598652

599653
re = dbConn.run("publishTable", params);
600654
connList.add(dbConn);
601-
if (re instanceof BasicAnyVector) {
655+
if (ifUseBackupSite) {
656+
synchronized (subInfos_){
657+
subInfos_.put(topic, deserializer);
658+
}
659+
synchronized (tableNameToTrueTopic) {
660+
tableNameToTrueTopic.put(host + ":" + port + "/" + tableName + "/" + actionName, topic);
661+
}
662+
synchronized (HATopicToTrueTopic) {
663+
HATopicToTrueTopic.put(topic, topic);
664+
}
665+
synchronized (trueTopicToSites) {
666+
trueTopicToSites.put(topic, trueTopicToSites.get(lastBackupSiteTopic));
667+
}
668+
} else if (re instanceof BasicAnyVector) {
602669
BasicStringVector HASiteStrings = (BasicStringVector) (((BasicAnyVector) re).getEntity(1));
603670
int HASiteNum = HASiteStrings.rows();
604671
Site[] sites = new Site[HASiteNum];

src/com/xxdb/streaming/client/Daemon.java

Lines changed: 61 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,8 @@ public void setRunningThread(Thread runningThread){
118118
@Override
119119
public void run() {
120120
while (!pThread.isInterrupted()) {
121+
if (!AbstractClient.ifUseBackupSite) {
122+
// original logic:
121123
for (String site : this.dispatcher.getAllReconnectSites()) {
122124
if (dispatcher.getNeedReconnect(site) == 1) {
123125
Site s = dispatcher.getSiteByName(site);
@@ -150,10 +152,65 @@ public void run() {
150152
}
151153
}
152154

153-
try {
154-
Thread.sleep(1000);
155-
} catch (InterruptedException e) {
156-
break;
155+
try {
156+
Thread.sleep(1000);
157+
} catch (InterruptedException e) {
158+
break;
159+
}
160+
} else {
161+
// if set backupSite
162+
for (String site : this.dispatcher.getAllReconnectSites()) {
163+
if (dispatcher.getNeedReconnect(site) == 1) {
164+
System.out.println("flag site: " + site);
165+
Site s = dispatcher.getCurrentSiteByName(site);
166+
dispatcher.activeCloseConnection(s);
167+
String lastTopic = "";
168+
for (String topic : dispatcher.getAllTopicsBySite(site)) {
169+
System.out.println("Daemon need reconnect: " + topic);
170+
// reconnect every info.resubTimeout ms
171+
if (System.currentTimeMillis() - AbstractClient.lastExceptionTopicTimeMap.get(topic) <= AbstractClient.resubTimeout)
172+
continue;
173+
174+
log.info("flag1 try to reconnect topic " + topic);
175+
dispatcher.tryReconnect(topic);
176+
// lastTopic = topic;
177+
}
178+
// dispatcher.setNeedReconnect(lastTopic, 2);
179+
} else {
180+
// try reconnect after 3 second when reconnecting stat
181+
// long ts = dispatcher.getReconnectTimestamp(site);
182+
// if (System.currentTimeMillis() >= ts + 3000) {
183+
Site s = dispatcher.getSiteByName(site);
184+
dispatcher.activeCloseConnection(s);
185+
for (String topic : dispatcher.getAllTopicsBySite(site)) {
186+
// reconnect every info.resubTimeout ms
187+
if (System.currentTimeMillis() - AbstractClient.lastExceptionTopicTimeMap.get(topic) <= AbstractClient.resubTimeout)
188+
continue;
189+
190+
log.info("flag2 try to reconnect topic " + topic);
191+
dispatcher.tryReconnect(topic);
192+
// }
193+
dispatcher.setReconnectTimestamp(site, System.currentTimeMillis());
194+
}
195+
}
196+
}
197+
198+
Set<String> waitReconnectTopic = dispatcher.getAllReconnectTopic();
199+
synchronized (waitReconnectTopic) {
200+
for (String topic : waitReconnectTopic) {
201+
// reconnect every info.resubTimeout ms
202+
if (System.currentTimeMillis() - AbstractClient.lastExceptionTopicTimeMap.get(topic) <= AbstractClient.resubTimeout)
203+
continue;
204+
dispatcher.tryReconnect(topic);
205+
}
206+
}
207+
208+
try {
209+
// check reconnected interval time
210+
Thread.sleep(10);
211+
} catch (InterruptedException e) {
212+
break;
213+
}
157214
}
158215
}
159216
}

src/com/xxdb/streaming/client/MessageDispatcher.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ interface MessageDispatcher {
3232

3333
Site getSiteByName(String site);
3434

35+
Site getCurrentSiteByName(String site);
36+
3537
void activeCloseConnection(Site site);
3638

3739
List<String> getAllTopicsBySite(String site);

src/com/xxdb/streaming/client/MessageParser.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ public void run() {
107107
body = BasicEntityFactory.instance().createEntity(df, dt, in, extended);
108108
if (body.isTable() && body.rows() == 0) {
109109
for (String t : topic.split(",")) {
110+
AbstractClient.lastExceptionTopicTimeMap.put(topic, System.currentTimeMillis());
110111
dispatcher.setNeedReconnect(t, 0);
111112
}
112113
assert (body.rows() == 0);
@@ -161,13 +162,16 @@ public void run() {
161162
} catch (Exception e) {
162163
e.printStackTrace();
163164
if (dispatcher.isClosed(topic)) {
164-
log.error("check " + topic + " is unsubscribed");
165+
if (!AbstractClient.ifUseBackupSite)
166+
log.error("check " + topic + " is unsubscribed");
165167
return;
166168
} else {
169+
AbstractClient.lastExceptionTopicTimeMap.put(topic, System.currentTimeMillis());
167170
dispatcher.setNeedReconnect(topic, 1);
168171
}
169172
} catch (Throwable t) {
170173
t.printStackTrace();
174+
AbstractClient.lastExceptionTopicTimeMap.put(topic, System.currentTimeMillis());
171175
dispatcher.setNeedReconnect(topic, 1);
172176
} finally {
173177
try {

src/com/xxdb/streaming/client/PollingClient.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ public PollingClient(String subscribeHost, int subscribePort) throws SocketExcep
3737
@Override
3838
protected boolean doReconnect(Site site) {
3939
try {
40+
log.info("PollingClient doReconnect: " + site.host + ":" + site.port);
4041
Thread.sleep(1000);
4142
BlockingQueue<List<IMessage>> queue = subscribeInternal(site.host, site.port, site.tableName, site.actionName, (MessageHandler) null, site.msgId + 1, true, site.filter, site.deserializer, site.allowExistTopic, site.userName, site.passWord, site.msgAstable);
4243
log.info("Successfully reconnected and subscribed " + site.host + ":" + site.port + ":" + site.tableName);
@@ -62,8 +63,12 @@ public TopicPoller subscribe(String host, int port, String tableName, String act
6263
return topicPoller;
6364
}
6465

65-
public TopicPoller subscribe(String host, int port, String tableName, String actionName, long offset, boolean reconnect, Vector filter, StreamDeserializer deserializer, String userName, String passWord, boolean msgAsTable, List<String> backupSites) throws IOException {
66-
BlockingQueue<List<IMessage>> queue = subscribeInternal(host, port, tableName, actionName, (MessageHandler) null, offset, reconnect, filter, deserializer, false, userName, passWord, msgAsTable, backupSites);
66+
public TopicPoller subscribe(String host, int port, String tableName, String actionName, long offset, boolean reconnect, Vector filter, StreamDeserializer deserializer, String userName, String passWord, boolean msgAsTable, List<String> backupSites, int resubTimeout, boolean subOnce) throws IOException {
67+
if (resubTimeout < 0)
68+
// resubTimeout default: 100ms
69+
resubTimeout = 100;
70+
71+
BlockingQueue<List<IMessage>> queue = subscribeInternal(host, port, tableName, actionName, (MessageHandler) null, offset, reconnect, filter, deserializer, false, userName, passWord, msgAsTable, backupSites, resubTimeout, subOnce);
6772
List<String> tp = Arrays.asList(host, String.valueOf(port), tableName, actionName);
6873
List<String> usr = Arrays.asList(userName, passWord);
6974
users.put(tp, usr);

src/com/xxdb/streaming/client/ThreadPooledClient.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ public void run() {
119119
}
120120

121121
protected boolean doReconnect(Site site) {
122+
log.info("ThreadPooledClient doReconnect: " + site.host + ":" + site.port);
122123
threadPool.shutdownNow();
123124
try {
124125
Thread.sleep(1000);
@@ -146,8 +147,12 @@ public void subscribe(String host, int port, String tableName, String actionName
146147
}
147148
}
148149

149-
public void subscribe(String host, int port, String tableName, String actionName, MessageHandler handler, long offset, boolean reconnect, Vector filter, StreamDeserializer deserializer, boolean allowExistTopic, String userName, String passWord, boolean msgAsTable, List<String> backupSites) throws IOException {
150-
BlockingQueue<List<IMessage>> queue = subscribeInternal(host, port, tableName, actionName, handler, offset, reconnect, filter, deserializer, allowExistTopic, userName, passWord, msgAsTable, backupSites);
150+
public void subscribe(String host, int port, String tableName, String actionName, MessageHandler handler, long offset, boolean reconnect, Vector filter, StreamDeserializer deserializer, boolean allowExistTopic, String userName, String passWord, boolean msgAsTable, List<String> backupSites, int resubTimeout, boolean subOnce) throws IOException {
151+
if (resubTimeout < 0)
152+
// resubTimeout default: 100ms
153+
resubTimeout = 100;
154+
155+
BlockingQueue<List<IMessage>> queue = subscribeInternal(host, port, tableName, actionName, handler, offset, reconnect, filter, deserializer, allowExistTopic, userName, passWord, msgAsTable, backupSites, resubTimeout, subOnce);
151156
String topicStr = host + ":" + port + "/" + tableName + "/" + actionName;
152157
List<String> usr = Arrays.asList(userName, passWord);
153158
synchronized (queueHandlers) {

0 commit comments

Comments
 (0)