Skip to content

Commit 9b7dbcc

Browse files
author
chengyitian
committed
AJ-690: fix issue about listeningPort set and dbConn create order;
1 parent bbe750e commit 9b7dbcc

File tree

1 file changed

+28
-9
lines changed

1 file changed

+28
-9
lines changed

src/com/xxdb/streaming/client/AbstractClient.java

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -532,11 +532,7 @@ protected BlockingQueue<List<IMessage>> subscribeInternal(String host, int port,
532532
List<String> backupSites, int resubTimeout, boolean subOnce, boolean createSubInfo) throws IOException, RuntimeException {
533533
Entity re;
534534
String topic = "";
535-
DBConnection dbConn;
536-
if (listeningPort > 0)
537-
dbConn = new DBConnection();
538-
else
539-
dbConn = DBConnection.internalCreateEnableReverseStreamingDBConnection(false, false, false, false, false, SqlStdEnum.DolphinDB);
535+
DBConnection dbConn = null;
540536

541537
List<Site> parsedBackupSites = new ArrayList<>();
542538
if (Objects.nonNull(backupSites) && !backupSites.isEmpty()) {
@@ -563,6 +559,8 @@ protected BlockingQueue<List<IMessage>> subscribeInternal(String host, int port,
563559
for (int i = 0; i < parsedBackupSites.size() && !isConnected; i++) {
564560
Site site = parsedBackupSites.get(i);
565561
try {
562+
checkServerVersion(site.host, site.port);
563+
dbConn = createSubscribeInternalDBConnection();
566564
subscribeInternalConnect(dbConn, site.host, site.port, site.userName, site.passWord);
567565
if (deserializer!=null&&!deserializer.isInited())
568566
deserializer.init(dbConn);
@@ -610,20 +608,21 @@ protected BlockingQueue<List<IMessage>> subscribeInternal(String host, int port,
610608
}
611609

612610
re = dbConn.run("publishTable", params);
611+
connList.add(dbConn);
613612
} catch (IOException e) {
614613
log.error("Connect to site " + site.host + ":" + site.port + " failed: " + e.getMessage());
615614
}
616615
}
617616

618-
Site curConnectedSite = parsedBackupSites.get(currentSiteIndexMap.get(topic));
619-
checkServerVersion(curConnectedSite.host, curConnectedSite.port);
617+
// Site curConnectedSite = parsedBackupSites.get(currentSiteIndexMap.get(topic));
618+
// checkServerVersion(curConnectedSite.host, curConnectedSite.port);
620619

621620
if (!isConnected)
622621
throw new IOException("All sites try connect failed.");
623622
}
624623

625624
if (parsedBackupSites.size() != 0) {
626-
connList.add(dbConn);
625+
// connList.add(dbConn);
627626

628627
// prepare parsedBackupSites
629628
for (int i = 0; i < parsedBackupSites.size(); i++) {
@@ -658,6 +657,16 @@ protected BlockingQueue<List<IMessage>> subscribeInternal(String host, int port,
658657
List<String> usr = Arrays.asList(userName, passWord);
659658
users.put(tp, usr);
660659

660+
dbConn = createSubscribeInternalDBConnection();
661+
subscribeInternalConnect(dbConn, host, port, userName, passWord);
662+
663+
if (deserializer!=null&&!deserializer.isInited())
664+
deserializer.init(dbConn);
665+
if (deserializer != null){
666+
BasicDictionary schema = (BasicDictionary) dbConn.run(tableName + ".schema()");
667+
deserializer.checkSchema(schema);
668+
}
669+
661670
String localIP = this.listeningHost;
662671
if (localIP.equals(""))
663672
localIP = dbConn.getLocalAddress().getHostAddress();
@@ -668,7 +677,7 @@ protected BlockingQueue<List<IMessage>> subscribeInternal(String host, int port,
668677
List<Entity> params = new ArrayList<Entity>();
669678
params.add(new BasicString(tableName));
670679
params.add(new BasicString(actionName));
671-
subscribeInternalConnect(dbConn, host, port, userName, passWord);
680+
// subscribeInternalConnect(dbConn, host, port, userName, passWord);
672681
re = dbConn.run("getSubscriptionTopic", params);
673682
topic = ((BasicAnyVector) re).getEntity(0).getString();
674683
// lastBackupSiteTopic = topic;
@@ -921,4 +930,14 @@ private static void subscribeInternalConnect(DBConnection dbConn, String host, i
921930
else
922931
dbConn.connect(host, port);
923932
}
933+
934+
private DBConnection createSubscribeInternalDBConnection() {
935+
DBConnection dbConn;
936+
if (listeningPort > 0)
937+
dbConn = new DBConnection();
938+
else
939+
dbConn = DBConnection.internalCreateEnableReverseStreamingDBConnection(false, false, false, false, false, SqlStdEnum.DolphinDB);
940+
941+
return dbConn;
942+
}
924943
}

0 commit comments

Comments
 (0)