5
5
import com .xxdb .data .*;
6
6
import com .xxdb .route .Domain ;
7
7
import com .xxdb .route .DomainFactory ;
8
+ import org .slf4j .LoggerFactory ;
8
9
import java .io .IOException ;
9
10
import java .time .LocalDateTime ;
10
11
import java .util .ArrayList ;
11
12
import java .util .List ;
12
13
import java .util .Objects ;
13
- import java .util .logging .Logger ;
14
-
15
14
import static com .xxdb .data .Entity .DATA_TYPE .*;
16
15
17
16
public class MultithreadedTableWriter {
18
- private Logger logger_ = Logger .getLogger (getClass (). getName () );
17
+ private static final org . slf4j . Logger log = LoggerFactory .getLogger (MultithreadedTableWriter . class );
19
18
public static class ThreadStatus {
20
19
public long threadId ;
21
20
public long sentRows , unsentRows , sendFailedRows ;
@@ -173,7 +172,7 @@ boolean writeAllData(){
173
172
writeTable = new BasicTable (colNames , items );
174
173
}catch (Exception e ){
175
174
e .printStackTrace ();
176
- tableWriter_ .logger_ . warning ("threadid=" + writeThread_ .getId () + " sendindex=" + sentRows_ + " create table error: " + e );
175
+ tableWriter_ .log . warn ("threadid=" + writeThread_ .getId () + " sendindex=" + sentRows_ + " create table error: " + e );
177
176
tableWriter_ .setError (ErrorCodeInfo .Code .EC_Server , "Failed to createTable: " + e );
178
177
isWriteDone = false ;
179
178
}
@@ -200,7 +199,7 @@ boolean writeAllData(){
200
199
sentRows_ += addRowCount ;
201
200
} catch (Exception e ) {
202
201
e .printStackTrace ();
203
- tableWriter_ .logger_ . warning ("threadid=" + writeThread_ .getId () + " sendindex=" + sentRows_ + " Save table error: " + e + " script:" + runscript );
202
+ tableWriter_ .log . warn ("threadid=" + writeThread_ .getId () + " sendindex=" + sentRows_ + " Save table error: " + e + " script:" + runscript );
204
203
tableWriter_ .setError (ErrorCodeInfo .Code .EC_Server , "Failed to save the inserted data: " + e + " script: " + runscript );
205
204
isWriteDone = false ;
206
205
tableWriter_ .hasError_ = true ;
@@ -770,7 +769,7 @@ public ErrorCodeInfo insert(Object... args){
770
769
(enableActualSendTime_ && args .length != colInfos_ .length - 1 )) {
771
770
return new ErrorCodeInfo (ErrorCodeInfo .Code .EC_InvalidParameter , "Column counts don't match." );
772
771
}
773
- try {
772
+
774
773
List <Entity > prow =new ArrayList <>();
775
774
int colindex = 0 ;
776
775
Entity .DATA_TYPE dataType ;
@@ -779,25 +778,43 @@ public ErrorCodeInfo insert(Object... args){
779
778
dataType = colInfos_ [colindex ].type_ ;
780
779
Entity entity ;
781
780
isAllNull = false ;
782
- entity = BasicEntityFactory .createScalar (dataType , one , colInfos_ [colindex ].extra_ );
781
+ try {
782
+ entity = BasicEntityFactory .createScalar (dataType , one , colInfos_ [colindex ].extra_ );
783
+ } catch (Exception e ) {
784
+ String errorMsg = "Invalid object error when create scalar for column '" + colInfos_ [colindex ].name_ + "': " + e .getMessage ();
785
+ log .error (errorMsg );
786
+ return new ErrorCodeInfo (ErrorCodeInfo .Code .EC_InvalidObject , errorMsg );
787
+ }
788
+
783
789
if (entity == null ) {
784
790
return new ErrorCodeInfo (ErrorCodeInfo .Code .EC_InvalidObject , "Data conversion error: " + dataType );
785
791
}
792
+
786
793
prow .add (entity );
787
794
colindex ++;
788
795
}
796
+
789
797
if (enableActualSendTime_ ) {
790
798
dataType = colInfos_ [colindex ].type_ ;
791
799
if (dataType != DT_NANOTIMESTAMP )
792
800
return new ErrorCodeInfo (ErrorCodeInfo .Code .EC_InvalidObject , String .format ("Data type error: %s,should be NANOTIMESTAMP." , dataType ));
793
801
Entity entity ;
794
802
isAllNull = false ;
795
- entity = BasicEntityFactory .createScalar (dataType , null , colInfos_ [colindex ].extra_ );
803
+ try {
804
+ entity = BasicEntityFactory .createScalar (dataType , null , colInfos_ [colindex ].extra_ );
805
+ } catch (Exception e ) {
806
+ String errorMsg = "Invalid object error when create scalar for column '" + colInfos_ [colindex ].name_ + "': " + e .getMessage ();
807
+ log .error (errorMsg );
808
+ return new ErrorCodeInfo (ErrorCodeInfo .Code .EC_InvalidObject , errorMsg );
809
+ }
810
+
796
811
prow .add (entity );
797
812
}
798
- if (isAllNull ){
813
+
814
+ try {
815
+ if (isAllNull )
799
816
return new ErrorCodeInfo (ErrorCodeInfo .Code .EC_InvalidObject , "Can't insert a Null row." );
800
- }
817
+
801
818
int threadindex ;
802
819
if (threads_ .size () > 1 ){
803
820
if (isPartionedTable_ ){
@@ -820,7 +837,7 @@ public ErrorCodeInfo insert(Object... args){
820
837
insertThreadWrite (threadindex , prow );
821
838
return new ErrorCodeInfo ();
822
839
}catch (Exception e ){
823
- e . printStackTrace ( );
840
+ log . error ( e . getMessage () );
824
841
return new ErrorCodeInfo (ErrorCodeInfo .Code .EC_InvalidObject , "Invalid object error " + e );
825
842
}
826
843
}
0 commit comments