@@ -16,12 +16,12 @@ public class EventHandler {
16
16
private Map <String , EventInfo > eventInfos ;
17
17
private boolean isNeedEventTime ;
18
18
private int outputColNums ;
19
- private int commonKeySize ;
19
+ private int commonFieldsSize ;
20
20
21
- public EventHandler (List <EventSchema > eventSchemas , List <String > eventTimeKeys , List <String > commonKeys ) {
21
+ public EventHandler (List <EventSchema > eventSchemas , List <String > eventTimeFields , List <String > commonFields ) {
22
22
this .isNeedEventTime = false ;
23
23
this .outputColNums = 0 ;
24
- this .commonKeySize = 0 ;
24
+ this .commonFieldsSize = 0 ;
25
25
this .eventInfos = new HashMap <>();
26
26
27
27
String funcName = "createEventSender" ;
@@ -40,7 +40,7 @@ public EventHandler(List<EventSchema> eventSchemas, List<String> eventTimeKeys,
40
40
if (Utils .isEmpty (fieldName ))
41
41
throw new IllegalArgumentException ("fieldName must be non-null and non-empty." );
42
42
43
- // check if has duplicate key in fieldName
43
+ // check if has duplicate field in fieldName
44
44
if (!set .add (fieldName ))
45
45
throw new IllegalArgumentException ("EventSchema cannot has duplicated fieldName in fieldNames." );
46
46
}
@@ -54,15 +54,14 @@ public EventHandler(List<EventSchema> eventSchemas, List<String> eventTimeKeys,
54
54
}
55
55
56
56
int length = event .getFieldNames ().size ();
57
- if (event .getFieldExtraParams ().isEmpty ()) {
57
+ if (event .getFieldExtraParams ().isEmpty ())
58
58
event .setFieldExtraParams (Collections .nCopies (length , 0 ));
59
- }
60
- if (length == 0 ) {
61
- throw new IllegalArgumentException ("eventKey in eventSchema must be non-empty." );
62
- }
63
- if ((!event .getFieldExtraParams ().isEmpty () && length != event .getFieldExtraParams ().size ()) || length != event .getFieldForms ().size () || length != event .getFieldTypes ().size ()) {
64
- throw new IllegalArgumentException ("the number of eventKey, eventTypes, eventForms and eventExtraParams (if set) must have the same length." );
65
- }
59
+
60
+ if (length == 0 )
61
+ throw new IllegalArgumentException ("fieldName in eventSchema must be non-empty." );
62
+
63
+ if ((!event .getFieldExtraParams ().isEmpty () && length != event .getFieldExtraParams ().size ()) || length != event .getFieldForms ().size () || length != event .getFieldTypes ().size ())
64
+ throw new IllegalArgumentException ("the number of fieldName, fieldTypes, fieldForms and fieldExtraParams (if set) must have the same length." );
66
65
67
66
// check if fieldExtraParams valid
68
67
if (Objects .nonNull (event .getFieldExtraParams ()) && !event .getFieldExtraParams ().isEmpty ()) {
@@ -80,31 +79,30 @@ else if (fieldType == Entity.DATA_TYPE.DT_DECIMAL128 && (scale < 0 || scale > 38
80
79
}
81
80
int eventNum = eventSchemas .size ();
82
81
83
- // check eventTimeKeys
84
- List <String > expandTimeKeys = new ArrayList <>();
85
- if (!eventTimeKeys .isEmpty ()) {
86
- // if eventTimeKeys only contain one element, it means every event has this key
87
- if (eventTimeKeys .size () == 1 ) {
88
- expandTimeKeys = Collections .nCopies (eventNum , eventTimeKeys .get (0 ));
82
+ // check eventTimeFields
83
+ List <String > expandTimeFields = new ArrayList <>();
84
+ if (!eventTimeFields .isEmpty ()) {
85
+ // if eventTimeFields only contain one element, it means every event has this field
86
+ if (eventTimeFields .size () == 1 ) {
87
+ expandTimeFields = Collections .nCopies (eventNum , eventTimeFields .get (0 ));
89
88
} else {
90
- if (eventTimeKeys .size () != eventNum ) {
91
- throw new IllegalArgumentException (funcName + "the number of eventTimeKey is inconsistent with the number of events in eventSchemas." );
92
- }
93
- expandTimeKeys = new ArrayList <>(eventTimeKeys );
89
+ if (eventTimeFields .size () != eventNum )
90
+ throw new IllegalArgumentException (funcName + "the number of eventTimeField is inconsistent with the number of events in eventSchemas." );
91
+ expandTimeFields = new ArrayList <>(eventTimeFields );
94
92
}
95
93
isNeedEventTime = true ;
96
94
}
97
95
98
96
// prepare eventInfos
99
97
StringBuilder errMsg = new StringBuilder ();
100
- if (!checkSchema (expandEventSchemas , expandTimeKeys , commonKeys , errMsg ))
98
+ if (!checkSchema (expandEventSchemas , expandTimeFields , commonFields , errMsg ))
101
99
throw new IllegalArgumentException (errMsg .toString ());
102
100
103
- this .commonKeySize = commonKeys .size ();
101
+ this .commonFieldsSize = commonFields .size ();
104
102
}
105
103
106
104
public boolean checkInputTable (String tableName , BasicTable outputTable , StringBuilder errMsg ) {
107
- outputColNums = isNeedEventTime ? (3 + commonKeySize ) : (2 + commonKeySize );
105
+ outputColNums = isNeedEventTime ? (3 + commonFieldsSize ) : (2 + commonFieldsSize );
108
106
if (outputColNums != outputTable .columns ()) {
109
107
errMsg .append ("Incompatible " )
110
108
.append (tableName )
@@ -117,7 +115,7 @@ public boolean checkInputTable(String tableName, BasicTable outputTable, StringB
117
115
int colIdx = 0 ;
118
116
if (this .isNeedEventTime ) {
119
117
if (Entity .typeToCategory (outputTable .getColumn (0 ).getDataType ()) != Entity .DATA_CATEGORY .TEMPORAL ) {
120
- errMsg .append ("First column of outputTable should be temporal if specified eventTimeKey ." );
118
+ errMsg .append ("First column of outputTable should be temporal if specified eventTimeField ." );
121
119
return false ;
122
120
}
123
121
colIdx ++;
@@ -223,7 +221,7 @@ public boolean serializeEvent(String eventType, List<Entity> attributes, List<En
223
221
throw new RuntimeException (e );
224
222
}
225
223
226
- for (int commonIndex : info .getEventSchema ().getCommonKeyIndex ()) {
224
+ for (int commonIndex : info .getEventSchema ().getCommonFieldIndex ()) {
227
225
try {
228
226
serializedEvent .add (attributes .get (commonIndex ));
229
227
} catch (Exception e ) {
@@ -300,7 +298,7 @@ public boolean deserializeEvent(List<IMessage> msgs, List<String> eventTypes, Li
300
298
return true ;
301
299
}
302
300
303
- private boolean checkSchema (List <EventSchema > eventSchemas , List <String > expandTimeKeys , List <String > commonKeys , StringBuilder errMsg ) {
301
+ private boolean checkSchema (List <EventSchema > eventSchemas , List <String > expandTimeFields , List <String > commonFields , StringBuilder errMsg ) {
304
302
int index = 0 ;
305
303
for (EventSchema schema : eventSchemas ) {
306
304
if (eventInfos .containsKey (schema .getEventType ())) {
@@ -312,21 +310,21 @@ private boolean checkSchema(List<EventSchema> eventSchemas, List<String> expandT
312
310
schemaEx .setSchema (schema );
313
311
314
312
if (isNeedEventTime ) {
315
- int timeIndex = schema .getFieldNames ().indexOf (expandTimeKeys .get (index ));
313
+ int timeIndex = schema .getFieldNames ().indexOf (expandTimeFields .get (index ));
316
314
if (timeIndex == -1 ) {
317
- errMsg .append ("Event " ).append (schema .getEventType ()).append (" doesn't contain eventTimeKey " ).append (expandTimeKeys .get (index )).append ("." );
315
+ errMsg .append ("Event " ).append (schema .getEventType ()).append (" doesn't contain eventTimeField " ).append (expandTimeFields .get (index )).append ("." );
318
316
return false ;
319
317
}
320
318
schemaEx .setTimeIndex (timeIndex );
321
319
}
322
320
323
- for (String commonKey : commonKeys ) {
324
- int commonKeyIndex = schema .getFieldNames ().indexOf (commonKey );
325
- if (commonKeyIndex == -1 ) {
326
- errMsg .append ("Event " ).append (schema .getEventType ()).append (" doesn't contain commonKey " ).append (commonKey );
321
+ for (String commonField : commonFields ) {
322
+ int commonFieldIndex = schema .getFieldNames ().indexOf (commonField );
323
+ if (commonFieldIndex == -1 ) {
324
+ errMsg .append ("Event " ).append (schema .getEventType ()).append (" doesn't contain commonField " ).append (commonField );
327
325
return false ;
328
326
}
329
- schemaEx .getCommonKeyIndex ().add (commonKeyIndex );
327
+ schemaEx .getCommonFieldIndex ().add (commonFieldIndex );
330
328
}
331
329
332
330
List <AttributeSerializer > serls = new ArrayList <>();
0 commit comments