Skip to content

Commit d2fb9f4

Browse files
author
chengyitian
committed
AJ-829: support reconnect、tryReconnectNums for MTW;
1 parent d7c3f29 commit d2fb9f4

File tree

1 file changed

+56
-13
lines changed

1 file changed

+56
-13
lines changed

src/com/xxdb/multithreadedtablewriter/MultithreadedTableWriter.java

Lines changed: 56 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -337,7 +337,7 @@ public MultithreadedTableWriter(String hostName, int port, String userId, String
337337
int threadCount, String partitionCol,
338338
int[] compressTypes, Mode mode, String[] pModeOption) throws Exception{
339339
init(hostName,port,userId, password,dbName, tableName, useSSL,enableHighAvailability,highAvailabilitySites,
340-
batchSize, throttle,threadCount,partitionCol,compressTypes, mode, pModeOption, null, false);
340+
batchSize, throttle,threadCount,partitionCol,compressTypes, mode, pModeOption, null, false, false, -1);
341341
}
342342
public MultithreadedTableWriter(String hostName, int port, String userId, String password,
343343
String dbName, String tableName, boolean useSSL,
@@ -346,15 +346,15 @@ public MultithreadedTableWriter(String hostName, int port, String userId, String
346346
int threadCount, String partitionCol,
347347
int[] compressTypes) throws Exception{
348348
init(hostName,port,userId, password,dbName, tableName, useSSL,enableHighAvailability,highAvailabilitySites,
349-
batchSize, throttle,threadCount,partitionCol,compressTypes, Mode.M_Append, null, null, false);
349+
batchSize, throttle,threadCount,partitionCol,compressTypes, Mode.M_Append, null, null, false, false, -1);
350350
}
351351
public MultithreadedTableWriter(String hostName, int port, String userId, String password,
352352
String dbName, String tableName, boolean useSSL,
353353
boolean enableHighAvailability, String[] highAvailabilitySites,
354354
int batchSize, float throttle,
355355
int threadCount, String partitionCol) throws Exception{
356356
init(hostName,port,userId, password,dbName, tableName, useSSL,enableHighAvailability,highAvailabilitySites,
357-
batchSize, throttle,threadCount,partitionCol,null, Mode.M_Append, null, null, false);
357+
batchSize, throttle,threadCount,partitionCol,null, Mode.M_Append, null, null, false, false, -1);
358358
}
359359
public MultithreadedTableWriter(String hostName, int port, String userId, String password,
360360
String dbName, String tableName, boolean useSSL,
@@ -363,7 +363,7 @@ public MultithreadedTableWriter(String hostName, int port, String userId, String
363363
int threadCount, String partitionCol,
364364
int[] compressTypes, Callback callbackHandler) throws Exception{
365365
init(hostName,port,userId, password,dbName, tableName, useSSL,enableHighAvailability,highAvailabilitySites,
366-
batchSize, throttle,threadCount,partitionCol,compressTypes, Mode.M_Append, null, callbackHandler, false);
366+
batchSize, throttle,threadCount,partitionCol,compressTypes, Mode.M_Append, null, callbackHandler, false, false, -1);
367367
}
368368

369369
public MultithreadedTableWriter(String hostName, int port, String userId, String password,
@@ -374,7 +374,7 @@ public MultithreadedTableWriter(String hostName, int port, String userId, String
374374
int[] compressTypes, Mode mode, String[] pModeOption,
375375
boolean enableActualSendTime) throws Exception {
376376
init(hostName, port, userId, password, dbName, tableName, useSSL, enableHighAvailability, highAvailabilitySites,
377-
batchSize, throttle, threadCount, partitionCol, compressTypes, mode, pModeOption, null, enableActualSendTime);
377+
batchSize, throttle, threadCount, partitionCol, compressTypes, mode, pModeOption, null, enableActualSendTime, false, -1);
378378
}
379379

380380
public MultithreadedTableWriter(String hostName, int port, String userId, String password,
@@ -385,7 +385,7 @@ public MultithreadedTableWriter(String hostName, int port, String userId, String
385385
int[] compressTypes,
386386
boolean enableActualSendTime) throws Exception {
387387
init(hostName, port, userId, password, dbName, tableName, useSSL, enableHighAvailability, highAvailabilitySites,
388-
batchSize, throttle, threadCount, partitionCol, compressTypes, Mode.M_Append, null, null, enableActualSendTime);
388+
batchSize, throttle, threadCount, partitionCol, compressTypes, Mode.M_Append, null, null, enableActualSendTime, false, -1);
389389
}
390390

391391
public MultithreadedTableWriter(String hostName, int port, String userId, String password,
@@ -395,7 +395,7 @@ public MultithreadedTableWriter(String hostName, int port, String userId, String
395395
int threadCount, String partitionCol,
396396
boolean enableActualSendTime) throws Exception {
397397
init(hostName, port, userId, password, dbName, tableName, useSSL, enableHighAvailability, highAvailabilitySites,
398-
batchSize, throttle, threadCount, partitionCol, null, Mode.M_Append, null, null, enableActualSendTime);
398+
batchSize, throttle, threadCount, partitionCol, null, Mode.M_Append, null, null, enableActualSendTime, false, -1);
399399
}
400400

401401
public MultithreadedTableWriter(String hostName, int port, String userId, String password,
@@ -406,7 +406,50 @@ public MultithreadedTableWriter(String hostName, int port, String userId, String
406406
int[] compressTypes, Callback callbackHandler,
407407
boolean enableActualSendTime) throws Exception {
408408
init(hostName, port, userId, password, dbName, tableName, useSSL, enableHighAvailability, highAvailabilitySites,
409-
batchSize, throttle, threadCount, partitionCol, compressTypes, Mode.M_Append, null, callbackHandler, enableActualSendTime);
409+
batchSize, throttle, threadCount, partitionCol, compressTypes, Mode.M_Append, null, callbackHandler, enableActualSendTime, false, -1);
410+
}
411+
412+
public MultithreadedTableWriter(String hostName, int port, String userId, String password,
413+
String dbName, String tableName, boolean useSSL,
414+
boolean enableHighAvailability, String[] highAvailabilitySites,
415+
int batchSize, float throttle,
416+
int threadCount, String partitionCol,
417+
int[] compressTypes, Mode mode, String[] pModeOption,
418+
boolean enableActualSendTime, boolean reconnect, int tryReconnectNums) throws Exception {
419+
init(hostName, port, userId, password, dbName, tableName, useSSL, enableHighAvailability, highAvailabilitySites,
420+
batchSize, throttle, threadCount, partitionCol, compressTypes, mode, pModeOption, null, enableActualSendTime, reconnect, tryReconnectNums);
421+
}
422+
423+
public MultithreadedTableWriter(String hostName, int port, String userId, String password,
424+
String dbName, String tableName, boolean useSSL,
425+
boolean enableHighAvailability, String[] highAvailabilitySites,
426+
int batchSize, float throttle,
427+
int threadCount, String partitionCol,
428+
int[] compressTypes,
429+
boolean enableActualSendTime, boolean reconnect, int tryReconnectNums) throws Exception {
430+
init(hostName, port, userId, password, dbName, tableName, useSSL, enableHighAvailability, highAvailabilitySites,
431+
batchSize, throttle, threadCount, partitionCol, compressTypes, Mode.M_Append, null, null, enableActualSendTime, reconnect, tryReconnectNums);
432+
}
433+
434+
public MultithreadedTableWriter(String hostName, int port, String userId, String password,
435+
String dbName, String tableName, boolean useSSL,
436+
boolean enableHighAvailability, String[] highAvailabilitySites,
437+
int batchSize, float throttle,
438+
int threadCount, String partitionCol,
439+
boolean enableActualSendTime, boolean reconnect, int tryReconnectNums) throws Exception {
440+
init(hostName, port, userId, password, dbName, tableName, useSSL, enableHighAvailability, highAvailabilitySites,
441+
batchSize, throttle, threadCount, partitionCol, null, Mode.M_Append, null, null, enableActualSendTime, reconnect, tryReconnectNums);
442+
}
443+
444+
public MultithreadedTableWriter(String hostName, int port, String userId, String password,
445+
String dbName, String tableName, boolean useSSL,
446+
boolean enableHighAvailability, String[] highAvailabilitySites,
447+
int batchSize, float throttle,
448+
int threadCount, String partitionCol,
449+
int[] compressTypes, Callback callbackHandler,
450+
boolean enableActualSendTime, boolean reconnect, int tryReconnectNums) throws Exception {
451+
init(hostName, port, userId, password, dbName, tableName, useSSL, enableHighAvailability, highAvailabilitySites,
452+
batchSize, throttle, threadCount, partitionCol, compressTypes, Mode.M_Append, null, callbackHandler, enableActualSendTime, reconnect, tryReconnectNums);
410453
}
411454

412455
private void init(String hostName, int port, String userId, String password,
@@ -415,7 +458,7 @@ private void init(String hostName, int port, String userId, String password,
415458
int batchSize, float throttle,
416459
int threadCount, String partitionCol,
417460
int[] compressTypes, Mode mode, String[] pModeOption, Callback callbackHandler,
418-
boolean enableActualSendTime) throws Exception{
461+
boolean enableActualSendTime, boolean reconnect, int tryReconnectNums) throws Exception{
419462
dbName_=dbName;
420463
tableName_=tableName;
421464
batchSize_=batchSize;
@@ -450,7 +493,7 @@ private void init(String hostName, int port, String userId, String password,
450493
compressTypes_=new int[compressTypes.length];
451494
System.arraycopy(compressTypes,0,compressTypes_,0,compressTypes.length);
452495
}
453-
DBConnection pConn = newConn(hostName,port,userId,password,dbName,tableName,useSSL,enableHighAvailability,highAvailabilitySites,isCompress);
496+
DBConnection pConn = newConn(hostName,port,userId,password,dbName,tableName,useSSL,enableHighAvailability,highAvailabilitySites,isCompress, reconnect, tryReconnectNums);
454497
if(pConn==null){
455498
throw new RuntimeException("Failed to connect to server " + hostName + ":" + port);
456499
}
@@ -575,7 +618,7 @@ private void init(String hostName, int port, String userId, String password,
575618
// init done, start thread now.
576619
for(int i = 0; i < threadCount; i++){
577620
if (pConn == null) {
578-
pConn = newConn(hostName,port,userId,password,dbName,tableName,useSSL,enableHighAvailability,highAvailabilitySites,isCompress);
621+
pConn = newConn(hostName,port,userId,password,dbName,tableName,useSSL,enableHighAvailability,highAvailabilitySites,isCompress, reconnect, tryReconnectNums);
579622
}
580623
WriterThread writerThread = new WriterThread(this,pConn, callbackHandler);
581624
threads_.add(writerThread);
@@ -863,10 +906,10 @@ private List<Vector> createListVector(){
863906
private boolean isExiting() { return hasError_ || isExiting_; }
864907
private DBConnection newConn(String hostName, int port, String userId, String password,
865908
String dbName, String tableName, boolean useSSL,
866-
boolean enableHighAvailability, String[] highAvailabilitySites,boolean compress) throws IOException {
909+
boolean enableHighAvailability, String[] highAvailabilitySites,boolean compress, boolean reconnect, int tryReconnectNums) throws IOException {
867910
DBConnection pConn = new DBConnection(false,useSSL,compress);
868911
//String hostName, int port, String userId, String password, String initialScript, boolean enableHighAvailability, String[] highAvailabilitySites
869-
boolean ret = pConn.connect(hostName, port, userId, password, null,enableHighAvailability,highAvailabilitySites);
912+
boolean ret = pConn.connect(hostName, port, userId, password, null,enableHighAvailability,highAvailabilitySites, reconnect, tryReconnectNums);
870913
if (!ret)
871914
return null;
872915
return pConn;

0 commit comments

Comments
 (0)