Skip to content

Commit f5d14bd

Browse files
author
chengyitian
committed
Merge branch 'dev_conn_pool_Prj2' of dolphindb.net:dolphindb/api-java into dev
2 parents 0726429 + 5f63ed1 commit f5d14bd

33 files changed

+1087
-130
lines changed

src/com/xxdb/DBConnection.java

Lines changed: 83 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import java.net.InetAddress;
66
import java.net.InetSocketAddress;
77
import java.net.Socket;
8+
import java.rmi.RemoteException;
89
import java.security.PublicKey;
910
import java.security.cert.CertificateException;
1011
import java.security.cert.X509Certificate;
@@ -55,6 +56,7 @@ public class DBConnection {
5556
private long runSeqNo_ = 0;
5657
private int[] serverVersion_;
5758
private boolean isReverseStreaming_ = false;
59+
private int tryReconnectNums = -1;
5860

5961
private static final Logger log = LoggerFactory.getLogger(DBConnection.class);
6062

@@ -189,7 +191,6 @@ private boolean connect()throws IOException{
189191
socket_.connect(new InetSocketAddress(hostName_,port_), 3000);
190192
}
191193
} catch (ConnectException ex) {
192-
log.error("com.xxdb.DBConnection.DBConnectionImpl.connect() has exception. Current node hostName: " + this.hostName_ + ", port: " + this.port_);
193194
log.error("Connect to " + this.hostName_ + ":" + this.port_ + " failed.");
194195
throw ex;
195196
}
@@ -668,6 +669,11 @@ public boolean connect(String hostName, int port, int timeout, boolean reconnect
668669
return connect(hostName, port, "", "", null, false, null, reconnect);
669670
}
670671

672+
public boolean connect(String hostName, int port, int timeout, boolean reconnect, int tryReconnectNums) throws IOException {
673+
this.connTimeout_ = timeout;
674+
return connect(hostName, port, "", "", null, false, null, reconnect, tryReconnectNums);
675+
}
676+
671677
public boolean connect(String hostName, int port, String initialScript) throws IOException {
672678
return connect(hostName, port, "", "", initialScript, false, null);
673679
}
@@ -723,14 +729,32 @@ public boolean connect(String hostName, int port, String userId, String password
723729
return connect(hostName, port, userId, password, initialScript, enableHighAvailability, highAvailabilitySites, reconnect, false);
724730
}
725731

732+
public boolean connect(String hostName, int port, String userId, String password, String initialScript, boolean enableHighAvailability, String[] highAvailabilitySites, boolean reconnect, int tryReconnectNums) throws IOException {
733+
if (enableHighAvailability)
734+
return connect(hostName, port, userId, password, initialScript, enableHighAvailability, highAvailabilitySites, reconnect, true, tryReconnectNums);
735+
else
736+
return connect(hostName, port, userId, password, initialScript, enableHighAvailability, highAvailabilitySites, reconnect, false, tryReconnectNums);
737+
}
738+
726739
public boolean connect(String hostName, int port, String userId, String password, String initialScript, boolean enableHighAvailability, String[] highAvailabilitySites, boolean reconnect, boolean enableLoadBalance) throws IOException {
740+
return connect(hostName, port, userId, password, initialScript, enableHighAvailability, highAvailabilitySites, reconnect, enableLoadBalance, -1);
741+
}
742+
743+
public boolean connect(String hostName, int port, String userId, String password, String initialScript, boolean enableHighAvailability, String[] highAvailabilitySites, boolean reconnect, boolean enableLoadBalance, int tryReconnectNums) throws IOException {
727744
mutex_.lock();
728745
try {
729746
this.uid_ = userId;
730747
this.pwd_ = password;
731748
this.initialScript_ = initialScript;
732749
this.enableHighAvailability_ = enableHighAvailability;
733750
this.loadBalance_ = enableLoadBalance;
751+
if (tryReconnectNums <= 0) {
752+
this.tryReconnectNums = -1;
753+
log.warn("If the param 'tryReconnectNums' less than or equal to 0, when reconnect will be unlimited attempts.");
754+
} else {
755+
this.tryReconnectNums = tryReconnectNums;
756+
}
757+
734758
if (this.loadBalance_ && !this.enableHighAvailability_)
735759
throw new RuntimeException("Cannot only enable loadbalance but not enable highAvailablity.");
736760

@@ -746,20 +770,48 @@ public boolean connect(String hostName, int port, String userId, String password
746770

747771
Node connectedNode = new Node();
748772
BasicTable bt = null;
773+
749774
while (!closed_) {
775+
int totalConnectAttemptNums = this.tryReconnectNums * nodes_.size();
776+
int attempt = 0;
750777
while (!conn_.isConnected() && !closed_) {
751-
for (Node one : nodes_) {
752-
if (connectNode(one)) {
753-
connectedNode = one;
754-
break;
778+
if (this.tryReconnectNums > 0) {
779+
// finite try to connect.
780+
for (Node one : nodes_) {
781+
attempt ++;
782+
// System.out.println("Current init connect node: " + one.hostName + ":" + one.port);
783+
if (connectNode(one)) {
784+
connectedNode = one;
785+
break;
786+
}
787+
788+
try {
789+
Thread.sleep(100);
790+
} catch (Exception e){
791+
e.printStackTrace();
792+
return false;
793+
}
755794
}
756795

757-
try {
758-
Thread.sleep(100);
759-
} catch (Exception e){
760-
e.printStackTrace();
796+
if (attempt == totalConnectAttemptNums) {
797+
log.error("Connect failed after " + tryReconnectNums + " reconnect attemps for every node in high availability sites.");
761798
return false;
762799
}
800+
} else {
801+
// infinite try to connect.
802+
for (Node one : nodes_) {
803+
if (connectNode(one)) {
804+
connectedNode = one;
805+
break;
806+
}
807+
808+
try {
809+
Thread.sleep(100);
810+
} catch (Exception e){
811+
e.printStackTrace();
812+
return false;
813+
}
814+
}
763815
}
764816
}
765817

@@ -882,30 +934,44 @@ private void initConnection() throws IOException{
882934
}
883935

884936
public void switchDataNode(Node node) throws IOException{
937+
int attempt = 0;
938+
boolean isConnected = false;
885939
do {
886-
if (node.hostName != null && node.hostName.length() > 0){
887-
if (connectNode(node)){
940+
attempt ++;
941+
if (node.hostName != null && node.hostName.length() > 0) {
942+
if (connectNode(node)) {
888943
log.info("Switch to node: " + node.hostName + ":" + node.port + " successfully.");
944+
isConnected = true;
889945
break;
890946
}
891947
}
948+
892949
if (nodes_.isEmpty()){
893950
log.error("com.xxdb.DBConnection.switchDataNode nodes_ is empty. Current node hostName: " + node.hostName + ", port: " + node.port);
894951
log.error("Connect to " + node.hostName + ":" + node.port + " failed.");
895952
throw new RuntimeException("Connect to " + node.hostName + ":" + node.port + " failed.");
896953
}
897-
int index = nodeRandom_.nextInt(nodes_.size());
898-
if (connectNode(nodes_.get(index))){
899-
log.info("Switch to node: " + nodes_.get(index).hostName + ":" + nodes_.get(index).port + " successfully.");
900-
break;
954+
955+
if (nodes_.size() > 1) {
956+
int index = nodeRandom_.nextInt(nodes_.size());
957+
if (connectNode(nodes_.get(index))){
958+
log.info("Switch to node: " + nodes_.get(index).hostName + ":" + nodes_.get(index).port + " successfully.");
959+
isConnected = true;
960+
break;
961+
}
901962
}
963+
902964
try {
903965
Thread.sleep(1000);
904-
}catch (Exception e){
966+
} catch (Exception e){
905967
e.printStackTrace();
906968
return;
907969
}
908-
}while (!closed_);
970+
} while (!closed_ && (tryReconnectNums == -1 || attempt < tryReconnectNums));
971+
972+
if (!closed_ && !isConnected)
973+
throw new RuntimeException("Connect to " + node.hostName + ":" + node.port + " failed after " + attempt + " reconnect attemps.");
974+
909975
if (initialScript_!=null && initialScript_.length() > 0){
910976
run(initialScript_);
911977
}

0 commit comments

Comments
 (0)