Skip to content

Commit 040d21d

Browse files
committed
AJ-605: add test case about cep
1 parent 7682e7a commit 040d21d

File tree

2 files changed

+111
-173
lines changed

2 files changed

+111
-173
lines changed

test/com/xxdb/streaming/client/cep/EventClientTest.java

Lines changed: 47 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -583,9 +583,7 @@ public void test_EventClient_commonKeys_two_column() throws IOException, Interr
583583
eventSchemas.add(scheme1);
584584
List<String> eventTimeKeys = new ArrayList<>();
585585
List<String> commonKeys = Arrays.asList(new String[]{"time","market"});
586-
EventSender sender = EventSender.createEventSender(eventSchemas, eventTimeKeys, commonKeys);
587-
588-
sender.connect(conn, "inputTable");
586+
EventSender sender = new EventSender(conn, "inputTable",eventSchemas, eventTimeKeys, commonKeys);
589587
List<Entity> attributes = new ArrayList<>();
590588
attributes.add(new BasicString("123456"));
591589
attributes.add(new BasicTime(LocalTime.from(LocalDateTime.of(2024,3,22,10,45,3,100000000))));
@@ -614,7 +612,7 @@ public static void subscribePrepare() throws IOException {
614612
eventSchemas.add(scheme);
615613
List<String> eventTimeKeys = Arrays.asList(new String[]{"timestamp"});
616614
List<String> commonKeys = Arrays.asList(new String[]{"comment1"});
617-
sender = EventSender.createEventSender(eventSchemas, eventTimeKeys, commonKeys);
615+
sender = new EventSender(conn, "inputTable", eventSchemas, eventTimeKeys, commonKeys);
618616
client = new EventClient(eventSchemas, eventTimeKeys, commonKeys);
619617
}
620618
@Test
@@ -721,7 +719,6 @@ public void test_EventClient_subscribe_handler_null() throws IOException, Inter
721719
public void test_EventClient_subscribe_offset_negative_1() throws IOException, InterruptedException {
722720
subscribePrepare();
723721
conn.run("share table(100:0, `timestamp`comment1, [TIMESTAMP,STRING]) as outputTable;");
724-
sender.connect(conn,"inputTable");
725722
List<Entity> attributes = new ArrayList<>();
726723
attributes.add(new BasicTimestamp(LocalDateTime.of(2024,3,22,10,45,3,100000000)));
727724
attributes.add(new BasicString("123456"));
@@ -736,7 +733,6 @@ public void test_EventClient_subscribe_offset_negative_1() throws IOException,
736733
public void test_EventClient_subscribe_offset_negative_2() throws IOException, InterruptedException {
737734
subscribePrepare();
738735
conn.run("share table(100:0, `timestamp`comment1, [TIMESTAMP,STRING]) as outputTable;");
739-
sender.connect(conn,"inputTable");
740736
List<Entity> attributes = new ArrayList<>();
741737
attributes.add(new BasicTimestamp(LocalDateTime.of(2024,3,22,10,45,3,100000000)));
742738
attributes.add(new BasicString("123456"));
@@ -751,7 +747,6 @@ public void test_EventClient_subscribe_offset_negative_2() throws IOException,
751747
public void test_EventClient_subscribe_offset_0() throws IOException, InterruptedException {
752748
subscribePrepare();
753749
conn.run("share table(100:0, `timestamp`comment1, [TIMESTAMP,STRING]) as outputTable;");
754-
sender.connect(conn,"inputTable");
755750
List<Entity> attributes = new ArrayList<>();
756751
attributes.add(new BasicTimestamp(LocalDateTime.of(2024,3,22,10,45,3,100000000)));
757752
attributes.add(new BasicString("123456"));
@@ -767,7 +762,6 @@ public void test_EventClient_subscribe_offset_0() throws IOException, Interrupt
767762
public void test_EventClient_subscribe_offset_1() throws IOException, InterruptedException {
768763
subscribePrepare();
769764
conn.run("share table(100:0, `timestamp`comment1, [TIMESTAMP,STRING]) as outputTable;");
770-
sender.connect(conn,"inputTable");
771765
List<Entity> attributes = new ArrayList<>();
772766
attributes.add(new BasicTimestamp(LocalDateTime.of(2024,3,22,10,45,3,100000000)));
773767
attributes.add(new BasicString("123456"));
@@ -782,7 +776,6 @@ public void test_EventClient_subscribe_offset_1() throws IOException, Interrupt
782776
public void test_EventClient_subscribe_offset_not_match() throws IOException, InterruptedException {
783777
subscribePrepare();
784778
conn.run("share table(100:0, `timestamp`comment1, [TIMESTAMP,STRING]) as outputTable;");
785-
sender.connect(conn,"inputTable");
786779
List<Entity> attributes = new ArrayList<>();
787780
attributes.add(new BasicTimestamp(LocalDateTime.of(2024,3,22,10,45,3,100000000)));
788781
attributes.add(new BasicString("123456"));
@@ -799,7 +792,6 @@ public void test_EventClient_subscribe_offset_not_match() throws IOException, I
799792
public void test_EventClient_subscribe_reconnect_true() throws IOException, InterruptedException {
800793
subscribePrepare();
801794
conn.run("share table(100:0, `timestamp`comment1, [TIMESTAMP,STRING]) as outputTable;");
802-
sender.connect(conn,"inputTable");
803795
List<Entity> attributes = new ArrayList<>();
804796
attributes.add(new BasicTimestamp(LocalDateTime.of(2024,3,22,10,45,3,100000000)));
805797
attributes.add(new BasicString("123456"));
@@ -813,7 +805,6 @@ public void test_EventClient_subscribe_reconnect_true() throws IOException, Int
813805
public void test_EventClient_subscribe_reconnect_false() throws IOException, InterruptedException {
814806
subscribePrepare();
815807
conn.run("share table(100:0, `timestamp`comment1, [TIMESTAMP,STRING]) as outputTable;");
816-
sender.connect(conn,"inputTable");
817808
List<Entity> attributes = new ArrayList<>();
818809
attributes.add(new BasicTimestamp(LocalDateTime.of(2024,3,22,10,45,3,100000000)));
819810
attributes.add(new BasicString("123456"));
@@ -853,15 +844,15 @@ public void test_EventClient_subscribe_admin() throws IOException, InterruptedE
853844
PrepareUser("user1","123456");
854845
DBConnection conn = new DBConnection();
855846
conn.connect(HOST, PORT,"user1","123456");
856-
subscribePrepare();
857-
conn.run("share streamTable(1000000:0, `timestamp`eventType`event`comment1, [TIMESTAMP,STRING,BLOB,STRING]) as inputTable1;");
858-
conn.run("addAccessControl(`inputTable1)");
847+
conn.run("share streamTable(1000000:0, `timestamp`eventType`event`comment1, [TIMESTAMP,STRING,BLOB,STRING]) as inputTable;");
848+
conn.run("addAccessControl(`inputTable)");
859849
conn.run("share table(100:0, `timestamp`comment1, [TIMESTAMP,STRING]) as outputTable;");
860-
sender.connect(conn,"inputTable1");
850+
subscribePrepare();
851+
861852
List<Entity> attributes = new ArrayList<>();
862853
attributes.add(new BasicTimestamp(LocalDateTime.of(2024,3,22,10,45,3,100000000)));
863854
attributes.add(new BasicString("123456"));
864-
client.subscribe(HOST, PORT, "inputTable1", "test1", handler, -1, true, "admin", "123456");
855+
client.subscribe(HOST, PORT, "inputTable", "test1", handler, -1, true, "admin", "123456");
865856
sender.sendEvent("MarketData", attributes);
866857
Thread.sleep(1000);
867858
BasicTable re = (BasicTable)conn.run("select * from outputTable");
@@ -872,15 +863,15 @@ public void test_EventClient_subscribe_other_user() throws IOException, Interru
872863
PrepareUser("user1","123456");
873864
DBConnection conn = new DBConnection();
874865
conn.connect(HOST, PORT,"user1","123456");
875-
subscribePrepare();
876-
conn.run("share streamTable(1000000:0, `timestamp`eventType`event`comment1, [TIMESTAMP,STRING,BLOB,STRING]) as inputTable1;");
877-
conn.run("addAccessControl(`inputTable1)");
866+
conn.run("share streamTable(1000000:0, `timestamp`eventType`event`comment1, [TIMESTAMP,STRING,BLOB,STRING]) as inputTable;");
867+
conn.run("addAccessControl(`inputTable)");
878868
conn.run("share table(100:0, `timestamp`comment1, [TIMESTAMP,STRING]) as outputTable;");
879-
sender.connect(conn,"inputTable1");
869+
subscribePrepare();
870+
880871
List<Entity> attributes = new ArrayList<>();
881872
attributes.add(new BasicTimestamp(LocalDateTime.of(2024,3,22,10,45,3,100000000)));
882873
attributes.add(new BasicString("123456"));
883-
client.subscribe(HOST, PORT, "inputTable1", "test1", handler, -1, true, "user1", "123456");
874+
client.subscribe(HOST, PORT, "inputTable", "test1", handler, -1, true, "user1", "123456");
884875
sender.sendEvent("MarketData", attributes);
885876
Thread.sleep(1000);
886877
BasicTable re = (BasicTable)conn.run("select * from outputTable");
@@ -892,22 +883,31 @@ public void test_EventClient_other_user_unallow() throws IOException, Interrupt
892883
PrepareUser("user2","123456");
893884
DBConnection conn = new DBConnection();
894885
conn.connect(HOST, PORT,"user1","123456");
895-
subscribePrepare();
896-
conn.run("share streamTable(1000000:0, `timestamp`eventType`event`comment1, [TIMESTAMP,STRING,BLOB,STRING]) as inputTable1;");
897-
conn.run("addAccessControl(`inputTable1)");
886+
conn.run("share streamTable(1000000:0, `timestamp`eventType`event`comment1, [TIMESTAMP,STRING,BLOB,STRING]) as inputTable;");
887+
conn.run("addAccessControl(`inputTable)");
888+
EventSchema scheme = new EventSchema();
889+
scheme.setEventType("MarketData");
890+
scheme.setFieldNames(Arrays.asList("timestamp", "comment1"));
891+
scheme.setFieldTypes(Arrays.asList( DT_TIMESTAMP,DT_STRING));
892+
scheme.setFieldForms(Arrays.asList(DF_SCALAR, DF_SCALAR));
893+
List<EventSchema> eventSchemas = new ArrayList<>();
894+
eventSchemas.add(scheme);
895+
List<String> eventTimeKeys = Arrays.asList(new String[]{"timestamp"});
896+
List<String> commonKeys = Arrays.asList(new String[]{"comment1"});
897+
sender = new EventSender(conn, "inputTable", eventSchemas, eventTimeKeys, commonKeys);
898+
client = new EventClient(eventSchemas, eventTimeKeys, commonKeys);
898899
String re = null;
899900
try{
900-
client.subscribe(HOST, PORT, "inputTable1", "test1", handler1, -1, true, "user2", "123456");
901+
client.subscribe(HOST, PORT, "inputTable", "test1", handler1, -1, true, "user2", "123456");
901902
}catch(Exception ex){
902903
re = ex.getMessage();
903904
}
904-
Assert.assertEquals(true, re.contains("No access to shared table [inputTable1]"));
905+
Assert.assertEquals(true, re.contains("No access to shared table [inputTable]"));
905906
}
906907
@Test
907908
public void test_EventClient_subscribe_unsubscribe_resubscribe() throws IOException, InterruptedException {
908909
subscribePrepare();
909910
conn.run("share table(100:0, `timestamp`comment1, [TIMESTAMP,STRING]) as outputTable;");
910-
sender.connect(conn,"inputTable");
911911
List<Entity> attributes = new ArrayList<>();
912912
attributes.add(new BasicTimestamp(LocalDateTime.of(2024,3,22,10,45,3,100000000)));
913913
attributes.add(new BasicString("123456"));
@@ -955,23 +955,22 @@ public void test_EventClient_unsubscribe_duplicated() throws IOException, Inter
955955

956956
@Test
957957
public void test_EventClient_subscribe_haStreamTable() throws IOException, InterruptedException {
958-
subscribePrepare();
959-
conn.run("table = table(1000000:0, `timestamp`eventType`event`comment1, [TIMESTAMP,STRING,BLOB,STRING]) as inputTable1;");
960-
conn.run("haStreamTable(11, table, `inputTable1, 100000)");
958+
conn.run("table = table(1000000:0, `timestamp`eventType`event`comment1, [TIMESTAMP,STRING,BLOB,STRING]) as inputTable;");
959+
conn.run("haStreamTable(11, table, `inputTable, 100000)");
961960
conn.run("share table(100:0, `timestamp`comment1, [TIMESTAMP,STRING]) as outputTable;");
962-
sender.connect(conn,"inputTable1");
961+
subscribePrepare();
962+
963963
List<Entity> attributes = new ArrayList<>();
964964
attributes.add(new BasicTimestamp(LocalDateTime.of(2024,3,22,10,45,3,100000000)));
965965
attributes.add(new BasicString("123456"));
966-
client.subscribe(HOST, PORT, "inputTable1", "test1", handler, -1, true, "user1", "123456");
966+
client.subscribe(HOST, PORT, "inputTable", "test1", handler, -1, true, "user1", "123456");
967967
sender.sendEvent("MarketData", attributes);
968968
Thread.sleep(1000);
969969
BasicTable re = (BasicTable)conn.run("select * from outputTable");
970970
Assert.assertEquals(1,re.rows());
971971
}
972972
@Test
973973
public void test_EventClient_subscribe_haStreamTable_leader() throws IOException, InterruptedException {
974-
subscribePrepare();
975974
BasicString StreamLeaderTmp = (BasicString)conn.run(String.format("getStreamingLeader(%d)", GROUP_ID));
976975
String StreamLeader = StreamLeaderTmp.getString();
977976
BasicString StreamLeaderHostTmp = (BasicString)conn.run(String.format("(exec host from rpc(getControllerAlias(), getClusterPerf) where name=\"%s\")[0]", StreamLeader));
@@ -982,28 +981,28 @@ public void test_EventClient_subscribe_haStreamTable_leader() throws IOExceptio
982981
System.out.println(StreamLeaderPort);
983982
DBConnection conn1 = new DBConnection();
984983
conn1.connect(StreamLeaderHost, StreamLeaderPort, "admin", "123456");
985-
String script = "try{\ndropStreamTable(`inputTable1)\n}catch(ex){\n}\n"+
984+
String script = "try{\ndropStreamTable(`inputTable)\n}catch(ex){\n}\n"+
986985
"table = table(1000000:0, `timestamp`eventType`event`comment1, [TIMESTAMP,STRING,BLOB,STRING]);\n"+
987-
"haStreamTable("+GROUP_ID+", table, `inputTable1, 100000);\n"+
986+
"haStreamTable("+GROUP_ID+", table, `inputTable, 100000);\n"+
988987
"share table(100:0, `timestamp`comment1, [TIMESTAMP,STRING]) as outputTable;;\n";
989988
conn1.run(script);
990-
sender.connect(conn,"inputTable1");
989+
subscribePrepare();
990+
991991
List<Entity> attributes = new ArrayList<>();
992992
attributes.add(new BasicTimestamp(LocalDateTime.of(2024,3,22,10,45,3,100000000)));
993993
attributes.add(new BasicString("123456"));
994-
client.subscribe(StreamLeaderHost, StreamLeaderPort, "inputTable1", "test1", handler, -1, true, "user1", "123456");
994+
client.subscribe(StreamLeaderHost, StreamLeaderPort, "inputTable", "test1", handler, -1, true, "user1", "123456");
995995
sender.sendEvent("MarketData", attributes);
996996
Thread.sleep(1000);
997997
BasicTable re = (BasicTable)conn1.run("select * from outputTable");
998998
Assert.assertEquals(1,re.rows());
999999
Assert.assertEquals("2024.03.22T10:45:03.100",re.getColumn(0).get(0).getString());
10001000
Assert.assertEquals("123456",re.getColumn(1).get(0).getString());
1001-
client.unsubscribe(StreamLeaderHost, StreamLeaderPort, "inputTable1", "test1");
1001+
client.unsubscribe(StreamLeaderHost, StreamLeaderPort, "inputTable", "test1");
10021002
}
10031003

10041004
@Test//not support
10051005
public void test_EventClient_subscribe_haStreamTable_follower() throws IOException, InterruptedException {
1006-
subscribePrepare();
10071006
String script0 ="leader = getStreamingLeader("+GROUP_ID+");\n" +
10081007
"groupSitesStr = (exec sites from getStreamingRaftGroups() where id =="+GROUP_ID+")[0];\n"+
10091008
"groupSites = split(groupSitesStr, \",\");\n"+
@@ -1017,23 +1016,24 @@ public void test_EventClient_subscribe_haStreamTable_follower() throws IOExcept
10171016
System.out.println(StreamFollowerPort);
10181017
DBConnection conn1 = new DBConnection();
10191018
conn1.connect(StreamFollowerHost, StreamFollowerPort, "admin", "123456");
1020-
String script = "try{\ndropStreamTable(`inputTable1)\n}catch(ex){\n}\n"+
1019+
String script = "try{\ndropStreamTable(`inputTable)\n}catch(ex){\n}\n"+
10211020
"table = table(1000000:0, `timestamp`eventType`event`comment1, [TIMESTAMP,STRING,BLOB,STRING]);\n"+
1022-
"haStreamTable("+GROUP_ID+", table, `inputTable1, 100000);\n"+
1021+
"haStreamTable("+GROUP_ID+", table, `inputTable, 100000);\n"+
10231022
"share table(100:0, `timestamp`comment1, [TIMESTAMP,STRING]) as outputTable;\n";
10241023
conn1.run(script);
1025-
sender.connect(conn,"inputTable1");
1024+
1025+
subscribePrepare();
10261026
List<Entity> attributes = new ArrayList<>();
10271027
attributes.add(new BasicTimestamp(LocalDateTime.of(2024,3,22,10,45,3,100000000)));
10281028
attributes.add(new BasicString("123456"));
1029-
client.subscribe(StreamFollowerHost, StreamFollowerPort, "inputTable1", "test1", handler, -1, true, "user1", "123456");
1029+
client.subscribe(StreamFollowerHost, StreamFollowerPort, "inputTable", "test1", handler, -1, true, "user1", "123456");
10301030
sender.sendEvent("MarketData", attributes);
10311031
Thread.sleep(1000);
10321032
BasicTable re = (BasicTable)conn1.run("select * from outputTable");
10331033
Assert.assertEquals(1,re.rows());
10341034
Assert.assertEquals("2024.03.22T10:45:03.100",re.getColumn(0).get(0).getString());
10351035
Assert.assertEquals("123456",re.getColumn(1).get(0).getString());
1036-
client.unsubscribe(StreamFollowerHost, StreamFollowerPort, "inputTable1", "test1");
1036+
client.unsubscribe(StreamFollowerHost, StreamFollowerPort, "inputTable", "test1");
10371037
}
10381038

10391039
@Test
@@ -1441,8 +1441,7 @@ public void test_EventClient_all_dateType_vector_no_decimal() throws IOExceptio
14411441
List<EventSchema> eventSchemes = Collections.singletonList(scheme);
14421442
List<String> eventTimeKeys = new ArrayList<>();
14431443
List<String> commonKeys = new ArrayList<>();
1444-
EventSender sender = EventSender.createEventSender(eventSchemes, eventTimeKeys, commonKeys);
1445-
sender.connect(conn,"inputTable");
1444+
EventSender sender = new EventSender(conn, "inputTable", eventSchemes, eventTimeKeys, commonKeys);
14461445

14471446
EventClient client = new EventClient(eventSchemes, eventTimeKeys, commonKeys);
14481447
client.subscribe(HOST, PORT, "intput1", "test1", handler_array_no_decimal, -1, true, "admin", "123456");
@@ -1515,8 +1514,7 @@ public void test_EventClient_all_dateType_vector_decimal() throws IOException,
15151514
List<EventSchema> eventSchemes = Collections.singletonList(scheme);
15161515
List<String> eventTimeKeys = new ArrayList<>();
15171516
List<String> commonKeys = new ArrayList<>();
1518-
EventSender sender = EventSender.createEventSender(eventSchemes, eventTimeKeys, commonKeys);
1519-
sender.connect(conn,"inputTable");
1517+
EventSender sender = new EventSender(conn, "inputTable", eventSchemes, eventTimeKeys, commonKeys);
15201518

15211519
EventClient client = new EventClient(eventSchemes, eventTimeKeys, commonKeys);
15221520
client.subscribe(HOST, PORT, "intput1", "test1", handler_array_decimal, -1, true, "admin", "123456");
@@ -1568,10 +1566,9 @@ public void test_EventClient_all_dateType_array() throws IOException, Interrupt
15681566
List<EventSchema> eventSchemas = Collections.singletonList(scheme);
15691567
List<String> eventTimeKeys = new ArrayList<>();
15701568
List<String> commonKeys = new ArrayList<>();
1571-
EventSender sender = EventSender.createEventSender(eventSchemas, eventTimeKeys, commonKeys);
15721569
String script = "share streamTable(1000000:0, `eventType`event, [STRING,BLOB]) as inputTable;\n";
15731570
conn.run(script);
1574-
sender.connect(conn,"inputTable");
1571+
EventSender sender = new EventSender(conn, "inputTable", eventSchemas, eventTimeKeys, commonKeys);
15751572
EventClient client = new EventClient(eventSchemas, eventTimeKeys, commonKeys);
15761573
client.subscribe(HOST, PORT, "inputTable", "test1", handler_array, -1, true, "admin", "123456");
15771574
Preparedata_array(100,10);

0 commit comments

Comments
 (0)