1
1
package com .xxdb ;
2
2
3
3
import java .io .*;
4
- import java .net .ConnectException ;
5
- import java .net .InetAddress ;
6
- import java .net .InetSocketAddress ;
7
- import java .net .Socket ;
8
- import java .rmi .RemoteException ;
4
+ import java .net .*;
9
5
import java .security .PublicKey ;
10
6
import java .security .cert .CertificateException ;
11
7
import java .security .cert .X509Certificate ;
@@ -50,6 +46,8 @@ public class DBConnection {
50
46
private List <Node > nodes_ = new ArrayList <>();
51
47
private Random nodeRandom_ = new Random ();
52
48
private int connTimeout_ = 0 ;
49
+ private int connectTimeout_ = 0 ;
50
+ private int readTimeout_ = 0 ;
53
51
private boolean closed_ = false ;
54
52
private boolean loadBalance_ = false ;
55
53
private String runClientId_ = null ;
@@ -66,7 +64,9 @@ private enum ExceptionType{
66
64
ET_NEWLEADER (2 ),
67
65
ET_NODENOTAVAIL (3 ),
68
66
ET_NOINITIALIZED (4 ),
69
- ET_NOTLEADER (5 );
67
+ ET_NOTLEADER (5 ),
68
+ ET_READTIMEDOUT (6 ),
69
+ ET_NORESPONSEHEADER (7 );
70
70
71
71
public int value ;
72
72
ExceptionType (int value ){
@@ -147,6 +147,8 @@ private class DBConnectionImpl{
147
147
private boolean compress_ = false ;
148
148
private boolean ifUrgent_ = false ;
149
149
private int connTimeout_ = 0 ;
150
+ private int connectTimeout_ = 0 ;
151
+ private int readTimeout_ = 0 ;
150
152
private ExtendedDataInput in_ ;
151
153
private ExtendedDataOutput out_ ;
152
154
private boolean remoteLittleEndian_ ;
@@ -168,35 +170,49 @@ private DBConnectionImpl(boolean asynTask, boolean sslEnable, boolean compress,
168
170
this .lock_ = new ReentrantLock ();
169
171
}
170
172
171
- private boolean connect (String hostName , int port , String userId , String password , int connTimeout ) throws IOException {
173
+ private boolean connect (String hostName , int port , String userId , String password , int connTimeout , int connectTimeout , int readTimeout ) throws IOException {
172
174
this .hostName_ = hostName ;
173
175
this .port_ = port ;
174
176
this .userId_ = userId ;
175
177
this .pwd_ = password ;
176
178
this .connTimeout_ = connTimeout ;
179
+ this .connectTimeout_ = connectTimeout ;
180
+ this .readTimeout_ = readTimeout ;
177
181
return connect ();
178
182
}
179
183
180
- private boolean connect ()throws IOException {
184
+ private boolean connect () throws IOException {
181
185
this .isConnected_ = false ;
182
186
183
187
try {
184
- if (sslEnable_ )
188
+ if (sslEnable_ )
185
189
socket_ = getSSLSocketFactory ().createSocket ();
186
190
else
187
191
socket_ = new Socket ();
188
- if (this .connTimeout_ > 0 ){
189
- socket_ .connect (new InetSocketAddress (hostName_ ,port_ ), connTimeout_ );
190
- }else {
191
- socket_ .connect (new InetSocketAddress (hostName_ ,port_ ), 3000 );
192
- }
193
- } catch (ConnectException ex ) {
192
+
193
+ // set 'connectTimeout' param to connect()
194
+ if (this .connTimeout_ > 0 && this .connectTimeout_ == 0 )
195
+ socket_ .connect (new InetSocketAddress (hostName_ , port_ ), connTimeout_ );
196
+ else if (this .connTimeout_ > 0 && this .connectTimeout_ > 0 )
197
+ socket_ .connect (new InetSocketAddress (hostName_ , port_ ), connectTimeout_ );
198
+ else if (this .connTimeout_ == 0 && this .connectTimeout_ > 0 )
199
+ socket_ .connect (new InetSocketAddress (hostName_ , port_ ), connectTimeout_ );
200
+ else if (this .connTimeout_ == 0 && this .connectTimeout_ == 0 )
201
+ socket_ .connect (new InetSocketAddress (hostName_ , port_ ), 3000 );
202
+ } catch (IOException ex ) {
194
203
log .error ("Connect to " + this .hostName_ + ":" + this .port_ + " failed." );
195
204
throw ex ;
196
205
}
197
- if (this .connTimeout_ > 0 ) {
206
+
207
+ // set 'readTimeout' param to setSoTimeout
208
+ if (this .connTimeout_ > 0 && this .readTimeout_ == 0 )
198
209
socket_ .setSoTimeout (this .connTimeout_ );
199
- }
210
+ else if (this .connTimeout_ > 0 && this .readTimeout_ > 0 )
211
+ socket_ .setSoTimeout (this .readTimeout_ );
212
+ else if (this .connTimeout_ == 0 && this .readTimeout_ > 0 )
213
+ socket_ .setSoTimeout (this .readTimeout_ );
214
+
215
+
200
216
socket_ .setKeepAlive (true );
201
217
socket_ .setTcpNoDelay (true );
202
218
out_ = new LittleEndianDataOutputStream (new BufferedOutputStream (socket_ .getOutputStream ()));
@@ -441,9 +457,14 @@ private Entity run(String script, String scriptType, ProgressListener listener,
441
457
header = in_ .readLine ();
442
458
}
443
459
}catch (IOException ex ){
444
- isConnected_ = false ;
445
- socket_ = null ;
446
- throw new IOException ("Failed to read response header from the socket with IO error " + ex .getMessage ());
460
+ if (ex instanceof SocketTimeoutException ) {
461
+ // isConnected_ = true;
462
+ throw ex ;
463
+ } else {
464
+ isConnected_ = false ;
465
+ socket_ = null ;
466
+ throw new IOException ("Failed to read response header from the socket with IO error " + ex .getMessage ());
467
+ }
447
468
}
448
469
449
470
String [] headers = header .split (" " );
@@ -664,16 +685,49 @@ public boolean connect(String hostName, int port, int timeout) throws IOExceptio
664
685
return connect (hostName , port , "" , "" , null , false , null );
665
686
}
666
687
688
+ public boolean connect (String hostName , int port , int connectTimeout , int readTimeout ) throws IOException {
689
+ if (connectTimeout < 0 || readTimeout < 0 ) {
690
+ log .error ("The param connectTimeout or readTimeout cannot less than zero." );
691
+ return false ;
692
+ }
693
+
694
+ this .connectTimeout_ = connectTimeout ;
695
+ this .readTimeout_ = readTimeout ;
696
+ return connect (hostName , port , "" , "" , null , false , null );
697
+ }
698
+
667
699
public boolean connect (String hostName , int port , int timeout , boolean reconnect ) throws IOException {
668
700
this .connTimeout_ = timeout ;
669
701
return connect (hostName , port , "" , "" , null , false , null , reconnect );
670
702
}
671
703
704
+ public boolean connect (String hostName , int port , int connectTimeout , int readTimeout , boolean reconnect ) throws IOException {
705
+ if (connectTimeout < 0 || readTimeout < 0 ) {
706
+ log .error ("The param connectTimeout or readTimeout cannot less than zero." );
707
+ return false ;
708
+ }
709
+
710
+ this .connectTimeout_ = connectTimeout ;
711
+ this .readTimeout_ = readTimeout ;
712
+ return connect (hostName , port , "" , "" , null , false , null , reconnect );
713
+ }
714
+
672
715
public boolean connect (String hostName , int port , int timeout , boolean reconnect , int tryReconnectNums ) throws IOException {
673
716
this .connTimeout_ = timeout ;
674
717
return connect (hostName , port , "" , "" , null , false , null , reconnect , tryReconnectNums );
675
718
}
676
719
720
+ public boolean connect (String hostName , int port , int connectTimeout , int readTimeout , boolean reconnect , int tryReconnectNums ) throws IOException {
721
+ if (connectTimeout < 0 || readTimeout < 0 ) {
722
+ log .error ("The param connectTimeout or readTimeout cannot less than zero." );
723
+ return false ;
724
+ }
725
+
726
+ this .connectTimeout_ = connectTimeout ;
727
+ this .readTimeout_ = readTimeout ;
728
+ return connect (hostName , port , "" , "" , null , false , null , reconnect , tryReconnectNums );
729
+ }
730
+
677
731
public boolean connect (String hostName , int port , String initialScript ) throws IOException {
678
732
return connect (hostName , port , "" , "" , initialScript , false , null );
679
733
}
@@ -900,6 +954,7 @@ public boolean connect(String hostName, int port, String userId, String password
900
954
}
901
955
} else {
902
956
if (reconnect ) {
957
+ nodes_ .clear ();
903
958
nodes_ .add (new Node (hostName , port ));
904
959
switchDataNode (new Node (hostName , port ));
905
960
} else {
@@ -938,7 +993,8 @@ public void switchDataNode(Node node) throws IOException{
938
993
attempt ++;
939
994
if (node .hostName != null && node .hostName .length () > 0 ) {
940
995
if (connectNode (node )) {
941
- log .info ("Switch to node: " + node .hostName + ":" + node .port + " successfully." );
996
+ if (nodes_ .size () > 1 )
997
+ log .info ("Switch to node: " + node .hostName + ":" + node .port + " successfully." );
942
998
isConnected = true ;
943
999
break ;
944
1000
}
@@ -974,12 +1030,12 @@ public void switchDataNode(Node node) throws IOException{
974
1030
}
975
1031
}
976
1032
977
- public boolean connectNode (Node node ) throws IOException {
1033
+ public boolean connectNode (Node node ) throws IOException {
978
1034
log .info ("Connect to " + node .hostName + ":" + node .port + "." );
979
1035
while (!closed_ ){
980
1036
try {
981
- return conn_ .connect (node .hostName , node .port , uid_ , pwd_ , connTimeout_ );
982
- }catch (Exception e ){
1037
+ return conn_ .connect (node .hostName , node .port , uid_ , pwd_ , connTimeout_ , connectTimeout_ , readTimeout_ );
1038
+ } catch (Exception e ) {
983
1039
if (isConnected ()){
984
1040
Node tmpNode = new Node ();
985
1041
tmpNode .hostName = node .hostName ;
@@ -993,11 +1049,13 @@ else if (type == ExceptionType.ET_NODENOTAVAIL)
993
1049
else
994
1050
throw e ;
995
1051
}
996
- }else {
997
- log .error (e .getMessage ());
1052
+ } else {
1053
+ if (Objects .nonNull (e .getMessage ()))
1054
+ log .error (e .getMessage ());
998
1055
return false ;
999
1056
}
1000
1057
}
1058
+
1001
1059
try {
1002
1060
Thread .sleep (100 );
1003
1061
}catch (Exception e ){
@@ -1008,8 +1066,7 @@ else if (type == ExceptionType.ET_NODENOTAVAIL)
1008
1066
return false ;
1009
1067
}
1010
1068
1011
- public ExceptionType parseException (String msg , Node node ){
1012
- log .info ("com.xxdb.DBConnection.parseException msg: " + msg );
1069
+ public ExceptionType parseException (String msg , Node node ) {
1013
1070
if (msg ==null ){
1014
1071
node .hostName = "" ;
1015
1072
node .port = 0 ;
@@ -1042,7 +1099,13 @@ public ExceptionType parseException(String msg, Node node){
1042
1099
node .hostName = "" ;
1043
1100
node .port = 0 ;
1044
1101
return ExceptionType .ET_NOINITIALIZED ;
1045
- }else {
1102
+ } else if (msg .contains ("Read timed out" )) {
1103
+ conn_ .getNode (node );
1104
+ return ExceptionType .ET_READTIMEDOUT ;
1105
+ } else if (msg .contains ("Failed to read response header from the socket with IO error null" )) {
1106
+ conn_ .getNode (node );
1107
+ return ExceptionType .ET_NORESPONSEHEADER ;
1108
+ } else {
1046
1109
node .hostName = "" ;
1047
1110
node .port = 0 ;
1048
1111
return ExceptionType .ET_UNKNOW ;
@@ -1184,9 +1247,14 @@ public Entity run(String script, ProgressListener listener, int priority, int pa
1184
1247
return new Void ();
1185
1248
else if (type == ExceptionType .ET_UNKNOW )
1186
1249
throw e ;
1187
- }else {
1188
- parseException (e .getMessage (), node );
1250
+ else if (type == ExceptionType .ET_READTIMEDOUT )
1251
+ cancelConsoleJob (enableSeqNo , currentSeqNo , e );
1252
+ } else {
1253
+ ExceptionType type = parseException (e .getMessage (), node );
1254
+ if (type == ExceptionType .ET_READTIMEDOUT )
1255
+ cancelConsoleJob (enableSeqNo , currentSeqNo , e );
1189
1256
}
1257
+
1190
1258
switchDataNode (node );
1191
1259
}
1192
1260
}
@@ -1199,6 +1267,29 @@ else if (type == ExceptionType.ET_UNKNOW)
1199
1267
}
1200
1268
}
1201
1269
1270
+ private void cancelConsoleJob (boolean enableSeqNo , long currentSeqNo , IOException e ) throws IOException {
1271
+ String cancelConsoleJobScript =
1272
+ "jobs = exec rootJobId from getConsoleJobs() where sessionId = " + conn_ .sessionID_ + "\n " +
1273
+ (conn_ .python_ ? "if size(jobs):\n " : "if (size(jobs))\n " ) +
1274
+ " cancelConsoleJob(jobs)\n " ;
1275
+ conn_ .ifUrgent_ = true ;
1276
+ // conn_.asynTask_ = true;
1277
+
1278
+ if (enableSeqNo )
1279
+ currentSeqNo = newSeqNo ();
1280
+
1281
+ try {
1282
+ conn_ .run (cancelConsoleJobScript , currentSeqNo );
1283
+ conn_ .ifUrgent_ = false ;
1284
+ } catch (IOException ioe ) {
1285
+ conn_ .ifUrgent_ = false ;
1286
+ throw new RuntimeException ("Execute cancelConsoleJob failed after current connnection read timed out. " );
1287
+ }
1288
+
1289
+ log .error (e .getMessage ());
1290
+ throw e ;
1291
+ }
1292
+
1202
1293
public Entity run (String script , ProgressListener listener , int priority , int parallelism , int fetchSize , boolean clearSessionMemory , String tableName ) throws IOException {
1203
1294
return run (script , listener , priority , parallelism , fetchSize , clearSessionMemory , tableName , true );
1204
1295
}
0 commit comments