1
1
package com .xxdb ;
2
2
3
3
import java .io .*;
4
- import java .net .ConnectException ;
5
- import java .net .InetAddress ;
6
- import java .net .InetSocketAddress ;
7
- import java .net .Socket ;
8
- import java .rmi .RemoteException ;
4
+ import java .net .*;
9
5
import java .security .PublicKey ;
10
6
import java .security .cert .CertificateException ;
11
7
import java .security .cert .X509Certificate ;
@@ -460,9 +456,14 @@ private Entity run(String script, String scriptType, ProgressListener listener,
460
456
header = in_ .readLine ();
461
457
}
462
458
}catch (IOException ex ){
463
- isConnected_ = false ;
464
- socket_ = null ;
465
- throw new IOException ("Failed to read response header from the socket with IO error " + ex .getMessage ());
459
+ if (ex instanceof SocketTimeoutException ) {
460
+ // isConnected_ = true;
461
+ throw ex ;
462
+ } else {
463
+ isConnected_ = false ;
464
+ socket_ = null ;
465
+ throw new IOException ("Failed to read response header from the socket with IO error " + ex .getMessage ());
466
+ }
466
467
}
467
468
468
469
String [] headers = header .split (" " );
@@ -1062,7 +1063,7 @@ else if (type == ExceptionType.ET_NODENOTAVAIL)
1062
1063
return false ;
1063
1064
}
1064
1065
1065
- public ExceptionType parseException (String msg , Node node ){
1066
+ public ExceptionType parseException (String msg , Node node ) {
1066
1067
if (msg ==null ){
1067
1068
node .hostName = "" ;
1068
1069
node .port = 0 ;
@@ -1095,7 +1096,7 @@ public ExceptionType parseException(String msg, Node node){
1095
1096
node .hostName = "" ;
1096
1097
node .port = 0 ;
1097
1098
return ExceptionType .ET_NOINITIALIZED ;
1098
- } else if (msg .contains ("Failed to read response header from the socket with IO error Read timed out" )) {
1099
+ } else if (msg .contains ("Read timed out" )) {
1099
1100
conn_ .getNode (node );
1100
1101
return ExceptionType .ET_READTIMEDOUT ;
1101
1102
} else {
@@ -1240,9 +1241,14 @@ public Entity run(String script, ProgressListener listener, int priority, int pa
1240
1241
return new Void ();
1241
1242
else if (type == ExceptionType .ET_UNKNOW )
1242
1243
throw e ;
1243
- }else {
1244
- parseException (e .getMessage (), node );
1244
+ else if (type == ExceptionType .ET_READTIMEDOUT )
1245
+ cancelConsoleJob (enableSeqNo , currentSeqNo , e );
1246
+ } else {
1247
+ ExceptionType type = parseException (e .getMessage (), node );
1248
+ if (type == ExceptionType .ET_READTIMEDOUT )
1249
+ cancelConsoleJob (enableSeqNo , currentSeqNo , e );
1245
1250
}
1251
+
1246
1252
switchDataNode (node );
1247
1253
}
1248
1254
}
@@ -1255,6 +1261,29 @@ else if (type == ExceptionType.ET_UNKNOW)
1255
1261
}
1256
1262
}
1257
1263
1264
+ private void cancelConsoleJob (boolean enableSeqNo , long currentSeqNo , IOException e ) throws IOException {
1265
+ String cancelConsoleJobScript =
1266
+ "jobs = exec rootJobId from getConsoleJobs() where sessionId = " + conn_ .sessionID_ + "\n " +
1267
+ (conn_ .python_ ? "if size(jobs):\n " : "if (size(jobs))\n " ) +
1268
+ " cancelConsoleJob(jobs)\n " ;
1269
+ conn_ .ifUrgent_ = true ;
1270
+ // conn_.asynTask_ = true;
1271
+
1272
+ if (enableSeqNo )
1273
+ currentSeqNo = newSeqNo ();
1274
+
1275
+ try {
1276
+ conn_ .run (cancelConsoleJobScript , currentSeqNo );
1277
+ conn_ .ifUrgent_ = false ;
1278
+ } catch (IOException ioe ) {
1279
+ conn_ .ifUrgent_ = false ;
1280
+ throw new RuntimeException ("Execute cancelConsoleJob failed after current connnection read timed out. " );
1281
+ }
1282
+
1283
+ log .error (e .getMessage ());
1284
+ throw e ;
1285
+ }
1286
+
1258
1287
public Entity run (String script , ProgressListener listener , int priority , int parallelism , int fetchSize , boolean clearSessionMemory , String tableName ) throws IOException {
1259
1288
return run (script , listener , priority , parallelism , fetchSize , clearSessionMemory , tableName , true );
1260
1289
}
0 commit comments