Skip to content

Commit d1cf7c9

Browse files
author
chengyitian
committed
AJ-689: optimize ThreadedClient backupSites unsubscribe logic;
1 parent cdee4b0 commit d1cf7c9

File tree

2 files changed

+89
-32
lines changed

2 files changed

+89
-32
lines changed

src/com/xxdb/streaming/client/AbstractClient.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ public abstract class AbstractClient implements MessageDispatcher {
4545
protected static boolean subOnce;
4646
// protected static boolean createSubInfo;
4747
protected BlockingQueue<List<IMessage>> lastQueue;
48+
protected String lastSuccessSubscribeTopic = "";
4849

4950
private Daemon daemon = null;
5051

@@ -312,9 +313,11 @@ public boolean tryReconnect(String topic) {
312313

313314
// update currentSiteIndexMap to new successfully connected site's index;
314315
currentSiteIndexMap.put(topic, successfulSiteIndex);
316+
currentSiteIndexMap.put(lastSuccessSubscribeTopic, successfulSiteIndex);
315317
} else if (reconnected) {
316318
// not delete site, but update successfulSiteIndex.
317319
currentSiteIndexMap.put(topic, successfulSiteIndex);
320+
currentSiteIndexMap.put(lastSuccessSubscribeTopic, successfulSiteIndex);
318321
}
319322
}
320323

@@ -581,6 +584,7 @@ protected BlockingQueue<List<IMessage>> subscribeInternal(String host, int port,
581584
re = dbConn.run("getSubscriptionTopic", params);
582585
topic = ((BasicAnyVector) re).getEntity(0).getString();
583586
lastBackupSiteTopic = topic;
587+
lastSuccessSubscribeTopic = topic;
584588
params.clear();
585589

586590
// set current site index
@@ -668,6 +672,7 @@ protected BlockingQueue<List<IMessage>> subscribeInternal(String host, int port,
668672
re = dbConn.run("getSubscriptionTopic", params);
669673
topic = ((BasicAnyVector) re).getEntity(0).getString();
670674
// lastBackupSiteTopic = topic;
675+
lastSuccessSubscribeTopic = topic;
671676
params.clear();
672677

673678
params.add(new BasicString(localIP));

src/com/xxdb/streaming/client/ThreadedClient.java

Lines changed: 84 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -447,20 +447,9 @@ public void unsubscribe(String host, int port, String tableName) throws IOExcept
447447

448448
@Override
449449
protected void unsubscribeInternal(String host, int port, String tableName, String actionName) throws IOException {
450-
String originHost = host;
451-
int originPort = port;
452-
453-
synchronized (this) {
450+
if (!AbstractClient.ifUseBackupSite) {
451+
// original logic
454452
DBConnection dbConn = new DBConnection();
455-
456-
if (!currentSiteIndexMap.isEmpty()) {
457-
String topic = tableNameToTrueTopic.get( host + ":" + port + "/" + tableName + "/" + actionName);
458-
Integer currentSiteIndex = currentSiteIndexMap.get(topic);
459-
Site[] sites = trueTopicToSites.get(topic);
460-
host = sites[currentSiteIndex].host;
461-
port = sites[currentSiteIndex].port;
462-
}
463-
464453
List<String> tp = Arrays.asList(host, String.valueOf(port), tableName, actionName);
465454
List<String> usr = users.get(tp);
466455
String user = usr.get(0);
@@ -482,31 +471,94 @@ protected void unsubscribeInternal(String host, int port, String tableName, Stri
482471
dbConn.run("stopPublishTable", params);
483472
String topic = null;
484473
String fullTableName = host + ":" + port + "/" + tableName + "/" + actionName;
485-
// synchronized (tableNameToTrueTopic) {
486-
topic = tableNameToTrueTopic.get(fullTableName);
487-
// }
488-
// synchronized (trueTopicToSites) {
489-
Site[] sites = trueTopicToSites.get(topic);
490-
if (sites == null || sites.length == 0)
491-
;
492-
for (int i = 0; i < sites.length; i++)
493-
sites[i].closed = true;
494-
// }
495-
// synchronized (queueManager) {
496-
queueManager.removeQueue(topic);
497-
// }
474+
synchronized (tableNameToTrueTopic) {
475+
topic = tableNameToTrueTopic.get(fullTableName);
476+
}
477+
synchronized (trueTopicToSites) {
478+
Site[] sites = trueTopicToSites.get(topic);
479+
if (sites == null || sites.length == 0)
480+
;
481+
for (int i = 0; i < sites.length; i++)
482+
sites[i].closed = true;
483+
}
484+
synchronized (queueManager) {
485+
queueManager.removeQueue(topic);
486+
}
498487
log.info("Successfully unsubscribed table " + fullTableName);
499488
} catch (Exception ex) {
500489
throw ex;
501490
} finally {
502491
dbConn.close();
503-
String topicStr = originHost + ":" + originPort + "/" + tableName + "/" + actionName;
492+
String topicStr = host + ":" + port + "/" + tableName + "/" + actionName;
504493
HandlerLopper handlerLopper = null;
505-
// synchronized (handlerLoppers) {
506-
handlerLopper = handlerLoppers.get(topicStr);
507-
handlerLoppers.remove(topicStr);
508-
handlerLopper.interrupt();
509-
// }
494+
synchronized (handlerLoppers) {
495+
handlerLopper = handlerLoppers.get(topicStr);
496+
handlerLoppers.remove(topicStr);
497+
handlerLopper.interrupt();
498+
}
499+
}
500+
} else {
501+
// use backBackSite
502+
String originHost = host;
503+
int originPort = port;
504+
505+
synchronized (this) {
506+
DBConnection dbConn = new DBConnection();
507+
508+
if (!currentSiteIndexMap.isEmpty()) {
509+
Integer currentSiteIndex = currentSiteIndexMap.get(lastSuccessSubscribeTopic);
510+
Site[] sites = trueTopicToSites.get(lastSuccessSubscribeTopic);
511+
host = sites[currentSiteIndex].host;
512+
port = sites[currentSiteIndex].port;
513+
}
514+
515+
List<String> tp = Arrays.asList(host, String.valueOf(port), tableName, actionName);
516+
List<String> usr = users.get(tp);
517+
String user = usr.get(0);
518+
String pwd = usr.get(1);
519+
if (!user.equals(""))
520+
dbConn.connect(host, port, user, pwd);
521+
else
522+
dbConn.connect(host, port);
523+
try {
524+
String localIP = this.listeningHost;
525+
if(localIP.equals(""))
526+
localIP = dbConn.getLocalAddress().getHostAddress();
527+
List<Entity> params = new ArrayList<Entity>();
528+
params.add(new BasicString(localIP));
529+
params.add(new BasicInt(this.listeningPort));
530+
params.add(new BasicString(tableName));
531+
params.add(new BasicString(actionName));
532+
533+
dbConn.run("stopPublishTable", params);
534+
String topic = null;
535+
String fullTableName = host + ":" + port + "/" + tableName + "/" + actionName;
536+
// synchronized (tableNameToTrueTopic) {
537+
topic = tableNameToTrueTopic.get(fullTableName);
538+
// }
539+
// synchronized (trueTopicToSites) {
540+
Site[] sites = trueTopicToSites.get(topic);
541+
if (sites == null || sites.length == 0)
542+
;
543+
for (int i = 0; i < sites.length; i++)
544+
sites[i].closed = true;
545+
// }
546+
// synchronized (queueManager) {
547+
queueManager.removeQueue(lastBackupSiteTopic);
548+
// }
549+
log.info("Successfully unsubscribed table " + fullTableName);
550+
} catch (Exception ex) {
551+
throw ex;
552+
} finally {
553+
dbConn.close();
554+
String topicStr = originHost + ":" + originPort + "/" + tableName + "/" + actionName;
555+
HandlerLopper handlerLopper = null;
556+
// synchronized (handlerLoppers) {
557+
handlerLopper = handlerLoppers.get(topicStr);
558+
handlerLoppers.remove(topicStr);
559+
handlerLopper.interrupt();
560+
// }
561+
}
510562
}
511563
}
512564
}

0 commit comments

Comments
 (0)