Skip to content

Commit be5288a

Browse files
author
chengyitian
committed
Merge branch 'dev-pool_loadbalance_fix' of dolphindb.net:dolphindb/api-java into dev
2 parents 7ce7838 + 1e23cbd commit be5288a

File tree

4 files changed

+258
-44
lines changed

4 files changed

+258
-44
lines changed

src/com/xxdb/DBConnection.java

Lines changed: 52 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -788,51 +788,64 @@ public boolean connect(String hostName, int port, String userId, String password
788788
if ( bt!=null && bt.getDataForm() != Entity.DATA_FORM.DF_TABLE)
789789
throw new IOException("Run getClusterPerf() failed.");
790790

791-
if (bt!=null && loadBalance_) {
792-
//ignore very high load nodes, rand one in low load nodes
793-
List<Node> lowLoadNodes=new ArrayList<>();
794-
BasicStringVector colHost = (BasicStringVector) bt.getColumn("host");
795-
BasicIntVector colPort = (BasicIntVector) bt.getColumn("port");
796-
BasicDoubleVector memLoad = (BasicDoubleVector) bt.getColumn("memLoad");
797-
BasicDoubleVector connLoad = (BasicDoubleVector) bt.getColumn("connLoad");
798-
BasicDoubleVector avgLoad = (BasicDoubleVector) bt.getColumn("avgLoad");
799-
for (int i = 0; i < colHost.rows(); i++) {
800-
Node nodex = new Node(colHost.getString(i), colPort.getInt(i));
801-
Node pexistNode = null;
802-
if (highAvailabilitySites != null) {
803-
for (Node node : nodes_) {
804-
if ((node.hostName.equals(nodex.hostName) || nodex.hostName.equals("localhost")) && node.port == nodex.port){
805-
pexistNode = node;
806-
break;
791+
if (bt!=null) {
792+
if (!loadBalance_) {
793+
if (highAvailabilitySites == null) {
794+
BasicStringVector colHost = (BasicStringVector) bt.getColumn("host");
795+
BasicIntVector colPort = (BasicIntVector) bt.getColumn("port");
796+
for (int i = 0; i < colHost.rows(); i++) {
797+
Node curNode = new Node(colHost.getString(i), colPort.getInt(i));
798+
if (!(curNode.hostName.equals(hostName) && curNode.port == port))
799+
nodes_.add(curNode);
800+
}
801+
}
802+
} else {
803+
// enable loadBalance
804+
//ignore very high load nodes, rand one in low load nodes
805+
List<Node> lowLoadNodes=new ArrayList<>();
806+
BasicStringVector colHost = (BasicStringVector) bt.getColumn("host");
807+
BasicIntVector colPort = (BasicIntVector) bt.getColumn("port");
808+
BasicDoubleVector memLoad = (BasicDoubleVector) bt.getColumn("memLoad");
809+
BasicDoubleVector connLoad = (BasicDoubleVector) bt.getColumn("connLoad");
810+
BasicDoubleVector avgLoad = (BasicDoubleVector) bt.getColumn("avgLoad");
811+
for (int i = 0; i < colHost.rows(); i++) {
812+
Node nodex = new Node(colHost.getString(i), colPort.getInt(i));
813+
Node pexistNode = null;
814+
if (highAvailabilitySites != null) {
815+
for (Node node : nodes_) {
816+
if ((node.hostName.equals(nodex.hostName) || nodex.hostName.equals("localhost")) && node.port == nodex.port){
817+
pexistNode = node;
818+
break;
819+
}
807820
}
821+
//node is out of highAvailabilitySites
822+
if (pexistNode == null)
823+
continue;
808824
}
809-
//node is out of highAvailabilitySites
810-
if (pexistNode == null)
811-
continue;
825+
double load=(memLoad.getDouble(i)+connLoad.getDouble(i)+avgLoad.getDouble(i))/3.0;
826+
if (pexistNode != null) {
827+
pexistNode.load = load;
828+
} else {
829+
pexistNode=new Node(colHost.getString(i), colPort.getInt(i), load);
830+
nodes_.add(pexistNode);
831+
}
832+
// low load
833+
if (memLoad.getDouble(i)<0.8 && connLoad.getDouble(i)<0.9 && avgLoad.getDouble(i)<0.8)
834+
lowLoadNodes.add(pexistNode);
812835
}
813-
double load=(memLoad.getDouble(i)+connLoad.getDouble(i)+avgLoad.getDouble(i))/3.0;
814-
if (pexistNode != null) {
815-
pexistNode.load = load;
836+
837+
Node pMinNode;
838+
if (!lowLoadNodes.isEmpty()) {
839+
pMinNode=lowLoadNodes.get(nodeRandom_.nextInt(lowLoadNodes.size()));
816840
} else {
817-
pexistNode=new Node(colHost.getString(i), colPort.getInt(i), load);
818-
nodes_.add(pexistNode);
841+
pMinNode=nodes_.get(nodeRandom_.nextInt(nodes_.size()));
819842
}
820-
// low load
821-
if (memLoad.getDouble(i)<0.8 && connLoad.getDouble(i)<0.9 && avgLoad.getDouble(i)<0.8)
822-
lowLoadNodes.add(pexistNode);
823-
}
824843

825-
Node pMinNode;
826-
if (!lowLoadNodes.isEmpty()) {
827-
pMinNode=lowLoadNodes.get(nodeRandom_.nextInt(lowLoadNodes.size()));
828-
} else {
829-
pMinNode=nodes_.get(nodeRandom_.nextInt(nodes_.size()));
830-
}
831-
832-
if (pMinNode != null && !pMinNode.equals(connectedNode)){
833-
log.info("Switch to node: " + pMinNode.hostName + ":" + pMinNode.port);
834-
conn_.close();
835-
switchDataNode(pMinNode);
844+
if (pMinNode != null && !pMinNode.equals(connectedNode)){
845+
log.info("Switch to node: " + pMinNode.hostName + ":" + pMinNode.port);
846+
conn_.close();
847+
switchDataNode(pMinNode);
848+
}
836849
}
837850
}
838851
} else {

src/com/xxdb/ExclusiveDBConnectionPool.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -78,11 +78,18 @@ public ExclusiveDBConnectionPool(String host, int port, String uid, String pwd,
7878
if (count <= 0)
7979
throw new RuntimeException("The thread count can not be less than 0");
8080
if (!loadBalance) {
81+
// not enable loadBalance
8182
for (int i=0; i<count; ++i) {
8283
DBConnection conn = new DBConnection(false, useSSL, compress, usePython);
83-
conn.setLoadBalance(false);
84-
if(!conn.connect(host, port, uid, pwd, initialScript, enableHighAvailability, highAvailabilitySites))
85-
throw new RuntimeException("Can't connect to the specified host.");
84+
try {
85+
boolean isConnected = conn.connect(host, port, uid, pwd, initialScript, enableHighAvailability, highAvailabilitySites, false, loadBalance);
86+
if (!isConnected) {
87+
throw new RuntimeException("Can't connect to the specified host.");
88+
}
89+
} catch (Exception e) {
90+
throw new RuntimeException("Can't connect to the specified host: ", e);
91+
}
92+
8693
workers_.add(new AsyncWorker(conn));
8794
}
8895
} else {
@@ -108,8 +115,7 @@ public ExclusiveDBConnectionPool(String host, int port, String uid, String pwd,
108115
}
109116
for (int i=0; i<count; ++i) {
110117
DBConnection conn = new DBConnection(false, useSSL, compress, usePython);
111-
conn.setLoadBalance(false);
112-
if(!conn.connect(hosts[i % nodeCount], ports[i % nodeCount], uid, pwd, initialScript, enableHighAvailability, highAvailabilitySites))
118+
if(!conn.connect(hosts[i % nodeCount], ports[i % nodeCount], uid, pwd, initialScript, enableHighAvailability, highAvailabilitySites,false,false))
113119
throw new RuntimeException("Can't connect to the host " + nodes.getString(i));
114120
workers_.add(new AsyncWorker(conn));
115121
}

0 commit comments

Comments
 (0)