Skip to content

Commit 3b7d5d3

Browse files
committed
Merge branch 'dev' of https://dolphindb.net/dolphindb/api-java into dev
2 parents ed34ef7 + d638135 commit 3b7d5d3

File tree

4 files changed

+41
-43
lines changed

4 files changed

+41
-43
lines changed

src/com/xxdb/streaming/client/cep/EventClient.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@ public class EventClient extends AbstractClient {
2828

2929
private static final Logger log = LoggerFactory.getLogger(DBConnection.class);
3030

31-
public EventClient(List<EventSchema> eventSchemas, List<String> eventTimeKeys, List<String> commonKeys) throws SocketException {
31+
public EventClient(List<EventSchema> eventSchemas, List<String> eventTimeFields, List<String> commonFields) throws SocketException {
3232
super(0);
33-
eventHandler = new EventHandler(eventSchemas, eventTimeKeys, commonKeys);
33+
eventHandler = new EventHandler(eventSchemas, eventTimeFields, commonFields);
3434
}
3535

3636
public void subscribe(String host, int port, String tableName, String actionName, MessageHandler handler, long offset, boolean reconnect, String userName, String password) throws IOException {

src/com/xxdb/streaming/client/cep/EventHandler.java

Lines changed: 33 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,12 @@ public class EventHandler {
1616
private Map<String, EventInfo> eventInfos;
1717
private boolean isNeedEventTime;
1818
private int outputColNums;
19-
private int commonKeySize;
19+
private int commonFieldsSize;
2020

21-
public EventHandler(List<EventSchema> eventSchemas, List<String> eventTimeKeys, List<String> commonKeys) {
21+
public EventHandler(List<EventSchema> eventSchemas, List<String> eventTimeFields, List<String> commonFields) {
2222
this.isNeedEventTime = false;
2323
this.outputColNums = 0;
24-
this.commonKeySize = 0;
24+
this.commonFieldsSize = 0;
2525
this.eventInfos = new HashMap<>();
2626

2727
String funcName = "createEventSender";
@@ -40,7 +40,7 @@ public EventHandler(List<EventSchema> eventSchemas, List<String> eventTimeKeys,
4040
if (Utils.isEmpty(fieldName))
4141
throw new IllegalArgumentException("fieldName must be non-null and non-empty.");
4242

43-
// check if has duplicate key in fieldName
43+
// check if has duplicate field in fieldName
4444
if (!set.add(fieldName))
4545
throw new IllegalArgumentException("EventSchema cannot has duplicated fieldName in fieldNames.");
4646
}
@@ -54,15 +54,14 @@ public EventHandler(List<EventSchema> eventSchemas, List<String> eventTimeKeys,
5454
}
5555

5656
int length = event.getFieldNames().size();
57-
if (event.getFieldExtraParams().isEmpty()) {
57+
if (event.getFieldExtraParams().isEmpty())
5858
event.setFieldExtraParams(Collections.nCopies(length, 0));
59-
}
60-
if (length == 0) {
61-
throw new IllegalArgumentException("eventKey in eventSchema must be non-empty.");
62-
}
63-
if ((!event.getFieldExtraParams().isEmpty() && length != event.getFieldExtraParams().size()) || length != event.getFieldForms().size() || length != event.getFieldTypes().size()) {
64-
throw new IllegalArgumentException("the number of eventKey, eventTypes, eventForms and eventExtraParams (if set) must have the same length.");
65-
}
59+
60+
if (length == 0)
61+
throw new IllegalArgumentException("fieldName in eventSchema must be non-empty.");
62+
63+
if ((!event.getFieldExtraParams().isEmpty() && length != event.getFieldExtraParams().size()) || length != event.getFieldForms().size() || length != event.getFieldTypes().size())
64+
throw new IllegalArgumentException("the number of fieldName, fieldTypes, fieldForms and fieldExtraParams (if set) must have the same length.");
6665

6766
// check if fieldExtraParams valid
6867
if (Objects.nonNull(event.getFieldExtraParams()) && !event.getFieldExtraParams().isEmpty()) {
@@ -80,31 +79,30 @@ else if (fieldType == Entity.DATA_TYPE.DT_DECIMAL128 && (scale < 0 || scale > 38
8079
}
8180
int eventNum = eventSchemas.size();
8281

83-
// check eventTimeKeys
84-
List<String> expandTimeKeys = new ArrayList<>();
85-
if (!eventTimeKeys.isEmpty()) {
86-
// if eventTimeKeys only contain one element, it means every event has this key
87-
if (eventTimeKeys.size() == 1) {
88-
expandTimeKeys = Collections.nCopies(eventNum, eventTimeKeys.get(0));
82+
// check eventTimeFields
83+
List<String> expandTimeFields = new ArrayList<>();
84+
if (!eventTimeFields.isEmpty()) {
85+
// if eventTimeFields only contain one element, it means every event has this field
86+
if (eventTimeFields.size() == 1) {
87+
expandTimeFields = Collections.nCopies(eventNum, eventTimeFields.get(0));
8988
} else {
90-
if (eventTimeKeys.size() != eventNum) {
91-
throw new IllegalArgumentException(funcName + "the number of eventTimeKey is inconsistent with the number of events in eventSchemas.");
92-
}
93-
expandTimeKeys = new ArrayList<>(eventTimeKeys);
89+
if (eventTimeFields.size() != eventNum)
90+
throw new IllegalArgumentException(funcName + "the number of eventTimeField is inconsistent with the number of events in eventSchemas.");
91+
expandTimeFields = new ArrayList<>(eventTimeFields);
9492
}
9593
isNeedEventTime = true;
9694
}
9795

9896
// prepare eventInfos
9997
StringBuilder errMsg = new StringBuilder();
100-
if (!checkSchema(expandEventSchemas, expandTimeKeys, commonKeys, errMsg))
98+
if (!checkSchema(expandEventSchemas, expandTimeFields, commonFields, errMsg))
10199
throw new IllegalArgumentException(errMsg.toString());
102100

103-
this.commonKeySize = commonKeys.size();
101+
this.commonFieldsSize = commonFields.size();
104102
}
105103

106104
public boolean checkInputTable(String tableName, BasicTable outputTable, StringBuilder errMsg) {
107-
outputColNums = isNeedEventTime ? (3 + commonKeySize) : (2 + commonKeySize);
105+
outputColNums = isNeedEventTime ? (3 + commonFieldsSize) : (2 + commonFieldsSize);
108106
if (outputColNums != outputTable.columns()) {
109107
errMsg.append("Incompatible ")
110108
.append(tableName)
@@ -117,7 +115,7 @@ public boolean checkInputTable(String tableName, BasicTable outputTable, StringB
117115
int colIdx = 0;
118116
if (this.isNeedEventTime) {
119117
if (Entity.typeToCategory(outputTable.getColumn(0).getDataType()) != Entity.DATA_CATEGORY.TEMPORAL) {
120-
errMsg.append("First column of outputTable should be temporal if specified eventTimeKey.");
118+
errMsg.append("First column of outputTable should be temporal if specified eventTimeField.");
121119
return false;
122120
}
123121
colIdx++;
@@ -223,7 +221,7 @@ public boolean serializeEvent(String eventType, List<Entity> attributes, List<En
223221
throw new RuntimeException(e);
224222
}
225223

226-
for (int commonIndex : info.getEventSchema().getCommonKeyIndex()) {
224+
for (int commonIndex : info.getEventSchema().getCommonFieldIndex()) {
227225
try {
228226
serializedEvent.add(attributes.get(commonIndex));
229227
} catch (Exception e) {
@@ -300,7 +298,7 @@ public boolean deserializeEvent(List<IMessage> msgs, List<String> eventTypes, Li
300298
return true;
301299
}
302300

303-
private boolean checkSchema(List<EventSchema> eventSchemas, List<String> expandTimeKeys, List<String> commonKeys, StringBuilder errMsg) {
301+
private boolean checkSchema(List<EventSchema> eventSchemas, List<String> expandTimeFields, List<String> commonFields, StringBuilder errMsg) {
304302
int index = 0;
305303
for (EventSchema schema : eventSchemas) {
306304
if (eventInfos.containsKey(schema.getEventType())) {
@@ -312,21 +310,21 @@ private boolean checkSchema(List<EventSchema> eventSchemas, List<String> expandT
312310
schemaEx.setSchema(schema);
313311

314312
if (isNeedEventTime) {
315-
int timeIndex = schema.getFieldNames().indexOf(expandTimeKeys.get(index));
313+
int timeIndex = schema.getFieldNames().indexOf(expandTimeFields.get(index));
316314
if (timeIndex == -1) {
317-
errMsg.append("Event ").append(schema.getEventType()).append(" doesn't contain eventTimeKey ").append(expandTimeKeys.get(index)).append(".");
315+
errMsg.append("Event ").append(schema.getEventType()).append(" doesn't contain eventTimeField ").append(expandTimeFields.get(index)).append(".");
318316
return false;
319317
}
320318
schemaEx.setTimeIndex(timeIndex);
321319
}
322320

323-
for (String commonKey : commonKeys) {
324-
int commonKeyIndex = schema.getFieldNames().indexOf(commonKey);
325-
if (commonKeyIndex == -1) {
326-
errMsg.append("Event ").append(schema.getEventType()).append(" doesn't contain commonKey ").append(commonKey);
321+
for (String commonField : commonFields) {
322+
int commonFieldIndex = schema.getFieldNames().indexOf(commonField);
323+
if (commonFieldIndex == -1) {
324+
errMsg.append("Event ").append(schema.getEventType()).append(" doesn't contain commonField ").append(commonField);
327325
return false;
328326
}
329-
schemaEx.getCommonKeyIndex().add(commonKeyIndex);
327+
schemaEx.getCommonFieldIndex().add(commonFieldIndex);
330328
}
331329

332330
List<AttributeSerializer> serls = new ArrayList<>();

src/com/xxdb/streaming/client/cep/EventSchemaEx.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,11 @@ public class EventSchemaEx {
77

88
private EventSchema schema;
99
private int timeIndex;
10-
private List<Integer> commonKeyIndex;
10+
private List<Integer> commonFieldIndex;
1111

1212
public EventSchemaEx() {
1313
this.schema = new EventSchema();
14-
this.commonKeyIndex = new ArrayList<>();
14+
this.commonFieldIndex = new ArrayList<>();
1515
}
1616

1717
public EventSchema getSchema() {
@@ -30,7 +30,7 @@ public void setTimeIndex(int timeIndex) {
3030
this.timeIndex = timeIndex;
3131
}
3232

33-
public List<Integer> getCommonKeyIndex() {
34-
return this.commonKeyIndex;
33+
public List<Integer> getCommonFieldIndex() {
34+
return this.commonFieldIndex;
3535
}
3636
}

src/com/xxdb/streaming/client/cep/EventSender.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@ public class EventSender {
1313
private EventHandler eventHandler;
1414
private DBConnection conn;
1515

16-
public EventSender(DBConnection conn, String tableName, List<EventSchema> eventSchemas, List<String> eventTimeKeys, List<String> commonKeys) throws IOException {
17-
this.eventHandler = new EventHandler(eventSchemas, eventTimeKeys, commonKeys);
16+
public EventSender(DBConnection conn, String tableName, List<EventSchema> eventSchemas, List<String> eventTimeFields, List<String> commonFields) throws IOException {
17+
this.eventHandler = new EventHandler(eventSchemas, eventTimeFields, commonFields);
1818
this.conn = conn;
1919

2020
String sql = "select top 0 * from " + tableName;

0 commit comments

Comments
 (0)