Skip to content

Commit 0cc048b

Browse files
committed
Update EventClientTest.java
1 parent 33b0c50 commit 0cc048b

File tree

1 file changed

+43
-9
lines changed

1 file changed

+43
-9
lines changed

test/com/xxdb/streaming/client/cep/EventClientTest.java

Lines changed: 43 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -855,9 +855,11 @@ public void test_EventClient_subscribe_unsubscribe_resubscribe() throws IOExcep
855855
for(int i=0;i<10;i++) {
856856
client.subscribe(HOST, PORT, "inputTable", "test1", handler, -1, true, "admin", "123456");
857857
sender.sendEvent("MarketData", attributes);
858+
Thread.sleep(200);
858859
client.unsubscribe(HOST, PORT, "inputTable", "test1");
859860
client.subscribe(HOST, PORT, "inputTable", "test1", handler, -1, true, "admin", "123456");
860861
sender.sendEvent("MarketData", attributes);
862+
Thread.sleep(200);
861863
client.unsubscribe(HOST, PORT, "inputTable", "test1");
862864
}
863865
Thread.sleep(1000);
@@ -895,15 +897,27 @@ public void test_EventClient_unsubscribe_duplicated() throws IOException, Inter
895897

896898
@Test
897899
public void test_EventClient_subscribe_haStreamTable() throws IOException, InterruptedException {
898-
conn.run("table = table(1000000:0, `timestamp`eventType`event`comment1, [TIMESTAMP,STRING,BLOB,STRING]) as inputTable;");
899-
conn.run("haStreamTable(11, table, `inputTable, 100000)");
900-
conn.run("share table(100:0, `timestamp`comment1, [TIMESTAMP,STRING]) as outputTable;");
901-
subscribePrepare();
900+
String script = "try{\ndropStreamTable(`inputTable)\n}catch(ex){\n}\n"+
901+
"table = table(1000000:0, `timestamp`eventType`event`comment1, [TIMESTAMP,STRING,BLOB,STRING]);\n"+
902+
"haStreamTable("+GROUP_ID+", table, `inputTable, 100000);\n"+
903+
"share table(100:0, `timestamp`comment1, [TIMESTAMP,STRING]) as outputTable;\n";
904+
conn.run(script);
905+
EventSchema scheme = new EventSchema();
906+
scheme.setEventType("MarketData");
907+
scheme.setFieldNames(Arrays.asList("timestamp", "comment1"));
908+
scheme.setFieldTypes(Arrays.asList( DT_TIMESTAMP,DT_STRING));
909+
scheme.setFieldForms(Arrays.asList(DF_SCALAR, DF_SCALAR));
910+
List<EventSchema> eventSchemas = new ArrayList<>();
911+
eventSchemas.add(scheme);
912+
List<String> eventTimeKeys = Arrays.asList(new String[]{"timestamp"});
913+
List<String> commonKeys = Arrays.asList(new String[]{"comment1"});
914+
sender = new EventSender(conn, "inputTable", eventSchemas, eventTimeKeys, commonKeys);
915+
client = new EventClient(eventSchemas, eventTimeKeys, commonKeys);
902916

903917
List<Entity> attributes = new ArrayList<>();
904918
attributes.add(new BasicTimestamp(LocalDateTime.of(2024,3,22,10,45,3,100000000)));
905919
attributes.add(new BasicString("123456"));
906-
client.subscribe(HOST, PORT, "inputTable", "test1", handler, -1, true, "user1", "123456");
920+
client.subscribe(HOST, PORT, "inputTable", "test1", handler, -1, true, "admin", "123456");
907921
sender.sendEvent("MarketData", attributes);
908922
Thread.sleep(1000);
909923
BasicTable re = (BasicTable)conn.run("select * from outputTable");
@@ -926,12 +940,22 @@ public void test_EventClient_subscribe_haStreamTable_leader() throws IOExceptio
926940
"haStreamTable("+GROUP_ID+", table, `inputTable_1, 100000);\n"+
927941
"share table(100:0, `timestamp`comment1, [TIMESTAMP,STRING]) as outputTable;\n";
928942
conn1.run(script);
929-
subscribePrepare();
943+
EventSchema scheme = new EventSchema();
944+
scheme.setEventType("MarketData");
945+
scheme.setFieldNames(Arrays.asList("timestamp", "comment1"));
946+
scheme.setFieldTypes(Arrays.asList( DT_TIMESTAMP,DT_STRING));
947+
scheme.setFieldForms(Arrays.asList(DF_SCALAR, DF_SCALAR));
948+
List<EventSchema> eventSchemas = new ArrayList<>();
949+
eventSchemas.add(scheme);
950+
List<String> eventTimeKeys = Arrays.asList(new String[]{"timestamp"});
951+
List<String> commonKeys = Arrays.asList(new String[]{"comment1"});
952+
sender = new EventSender(conn, "inputTable_1", eventSchemas, eventTimeKeys, commonKeys);
953+
client = new EventClient(eventSchemas, eventTimeKeys, commonKeys);
930954

931955
List<Entity> attributes = new ArrayList<>();
932956
attributes.add(new BasicTimestamp(LocalDateTime.of(2024,3,22,10,45,3,100000000)));
933957
attributes.add(new BasicString("123456"));
934-
client.subscribe(StreamLeaderHost, StreamLeaderPort, "inputTable_1", "test1", handler, -1, true, "user1", "123456");
958+
client.subscribe(StreamLeaderHost, StreamLeaderPort, "inputTable_1", "test1", handler, -1, true, "admin", "123456");
935959
sender.sendEvent("MarketData", attributes);
936960
Thread.sleep(1000);
937961
BasicTable re = (BasicTable)conn1.run("select * from outputTable");
@@ -941,7 +965,7 @@ public void test_EventClient_subscribe_haStreamTable_leader() throws IOExceptio
941965
client.unsubscribe(StreamLeaderHost, StreamLeaderPort, "inputTable_1", "test1");
942966
}
943967

944-
@Test//not support
968+
//@Test//not support
945969
public void test_EventClient_subscribe_haStreamTable_follower() throws IOException, InterruptedException {
946970
String script0 ="leader = getStreamingLeader("+GROUP_ID+");\n" +
947971
"groupSitesStr = (exec sites from getStreamingRaftGroups() where id =="+GROUP_ID+")[0];\n"+
@@ -961,8 +985,18 @@ public void test_EventClient_subscribe_haStreamTable_follower() throws IOExcept
961985
"haStreamTable("+GROUP_ID+", table, `inputTable_1, 100000);\n"+
962986
"share table(100:0, `timestamp`comment1, [TIMESTAMP,STRING]) as outputTable;\n";
963987
conn1.run(script);
988+
EventSchema scheme = new EventSchema();
989+
scheme.setEventType("MarketData");
990+
scheme.setFieldNames(Arrays.asList("timestamp", "comment1"));
991+
scheme.setFieldTypes(Arrays.asList( DT_TIMESTAMP,DT_STRING));
992+
scheme.setFieldForms(Arrays.asList(DF_SCALAR, DF_SCALAR));
993+
List<EventSchema> eventSchemas = new ArrayList<>();
994+
eventSchemas.add(scheme);
995+
List<String> eventTimeKeys = Arrays.asList(new String[]{"timestamp"});
996+
List<String> commonKeys = Arrays.asList(new String[]{"comment1"});
997+
sender = new EventSender(conn, "inputTable_1", eventSchemas, eventTimeKeys, commonKeys);
998+
client = new EventClient(eventSchemas, eventTimeKeys, commonKeys);
964999

965-
subscribePrepare();
9661000
List<Entity> attributes = new ArrayList<>();
9671001
attributes.add(new BasicTimestamp(LocalDateTime.of(2024,3,22,10,45,3,100000000)));
9681002
attributes.add(new BasicString("123456"));

0 commit comments

Comments
 (0)