@@ -856,9 +856,11 @@ public void test_EventClient_subscribe_unsubscribe_resubscribe() throws IOExcep
856
856
for (int i =0 ;i <10 ;i ++) {
857
857
client .subscribe (HOST , PORT , "inputTable" , "test1" , handler , -1 , true , "admin" , "123456" );
858
858
sender .sendEvent ("MarketData" , attributes );
859
+ Thread .sleep (200 );
859
860
client .unsubscribe (HOST , PORT , "inputTable" , "test1" );
860
861
client .subscribe (HOST , PORT , "inputTable" , "test1" , handler , -1 , true , "admin" , "123456" );
861
862
sender .sendEvent ("MarketData" , attributes );
863
+ Thread .sleep (200 );
862
864
client .unsubscribe (HOST , PORT , "inputTable" , "test1" );
863
865
}
864
866
Thread .sleep (1000 );
@@ -896,15 +898,27 @@ public void test_EventClient_unsubscribe_duplicated() throws IOException, Inter
896
898
897
899
@ Test
898
900
public void test_EventClient_subscribe_haStreamTable () throws IOException , InterruptedException {
899
- conn .run ("table = table(1000000:0, `timestamp`eventType`event`comment1, [TIMESTAMP,STRING,BLOB,STRING]) as inputTable;" );
900
- conn .run ("haStreamTable(11, table, `inputTable, 100000)" );
901
- conn .run ("share table(100:0, `timestamp`comment1, [TIMESTAMP,STRING]) as outputTable;" );
902
- subscribePrepare ();
901
+ String script = "try{\n dropStreamTable(`inputTable)\n }catch(ex){\n }\n " +
902
+ "table = table(1000000:0, `timestamp`eventType`event`comment1, [TIMESTAMP,STRING,BLOB,STRING]);\n " +
903
+ "haStreamTable(" +GROUP_ID +", table, `inputTable, 100000);\n " +
904
+ "share table(100:0, `timestamp`comment1, [TIMESTAMP,STRING]) as outputTable;\n " ;
905
+ conn .run (script );
906
+ EventSchema scheme = new EventSchema ();
907
+ scheme .setEventType ("MarketData" );
908
+ scheme .setFieldNames (Arrays .asList ("timestamp" , "comment1" ));
909
+ scheme .setFieldTypes (Arrays .asList ( DT_TIMESTAMP ,DT_STRING ));
910
+ scheme .setFieldForms (Arrays .asList (DF_SCALAR , DF_SCALAR ));
911
+ List <EventSchema > eventSchemas = new ArrayList <>();
912
+ eventSchemas .add (scheme );
913
+ List <String > eventTimeKeys = Arrays .asList (new String []{"timestamp" });
914
+ List <String > commonKeys = Arrays .asList (new String []{"comment1" });
915
+ sender = new EventSender (conn , "inputTable" , eventSchemas , eventTimeKeys , commonKeys );
916
+ client = new EventClient (eventSchemas , eventTimeKeys , commonKeys );
903
917
904
918
List <Entity > attributes = new ArrayList <>();
905
919
attributes .add (new BasicTimestamp (LocalDateTime .of (2024 ,3 ,22 ,10 ,45 ,3 ,100000000 )));
906
920
attributes .add (new BasicString ("123456" ));
907
- client .subscribe (HOST , PORT , "inputTable" , "test1" , handler , -1 , true , "user1 " , "123456" );
921
+ client .subscribe (HOST , PORT , "inputTable" , "test1" , handler , -1 , true , "admin " , "123456" );
908
922
sender .sendEvent ("MarketData" , attributes );
909
923
Thread .sleep (1000 );
910
924
BasicTable re = (BasicTable )conn .run ("select * from outputTable" );
@@ -927,12 +941,22 @@ public void test_EventClient_subscribe_haStreamTable_leader() throws IOExceptio
927
941
"haStreamTable(" +GROUP_ID +", table, `inputTable_1, 100000);\n " +
928
942
"share table(100:0, `timestamp`comment1, [TIMESTAMP,STRING]) as outputTable;\n " ;
929
943
conn1 .run (script );
930
- subscribePrepare ();
944
+ EventSchema scheme = new EventSchema ();
945
+ scheme .setEventType ("MarketData" );
946
+ scheme .setFieldNames (Arrays .asList ("timestamp" , "comment1" ));
947
+ scheme .setFieldTypes (Arrays .asList ( DT_TIMESTAMP ,DT_STRING ));
948
+ scheme .setFieldForms (Arrays .asList (DF_SCALAR , DF_SCALAR ));
949
+ List <EventSchema > eventSchemas = new ArrayList <>();
950
+ eventSchemas .add (scheme );
951
+ List <String > eventTimeKeys = Arrays .asList (new String []{"timestamp" });
952
+ List <String > commonKeys = Arrays .asList (new String []{"comment1" });
953
+ sender = new EventSender (conn , "inputTable_1" , eventSchemas , eventTimeKeys , commonKeys );
954
+ client = new EventClient (eventSchemas , eventTimeKeys , commonKeys );
931
955
932
956
List <Entity > attributes = new ArrayList <>();
933
957
attributes .add (new BasicTimestamp (LocalDateTime .of (2024 ,3 ,22 ,10 ,45 ,3 ,100000000 )));
934
958
attributes .add (new BasicString ("123456" ));
935
- client .subscribe (StreamLeaderHost , StreamLeaderPort , "inputTable_1" , "test1" , handler , -1 , true , "user1 " , "123456" );
959
+ client .subscribe (StreamLeaderHost , StreamLeaderPort , "inputTable_1" , "test1" , handler , -1 , true , "admin " , "123456" );
936
960
sender .sendEvent ("MarketData" , attributes );
937
961
Thread .sleep (1000 );
938
962
BasicTable re = (BasicTable )conn1 .run ("select * from outputTable" );
@@ -942,7 +966,7 @@ public void test_EventClient_subscribe_haStreamTable_leader() throws IOExceptio
942
966
client .unsubscribe (StreamLeaderHost , StreamLeaderPort , "inputTable_1" , "test1" );
943
967
}
944
968
945
- @ Test //not support
969
+ // @Test//not support
946
970
public void test_EventClient_subscribe_haStreamTable_follower () throws IOException , InterruptedException {
947
971
String script0 ="leader = getStreamingLeader(" +GROUP_ID +");\n " +
948
972
"groupSitesStr = (exec sites from getStreamingRaftGroups() where id ==" +GROUP_ID +")[0];\n " +
@@ -962,8 +986,18 @@ public void test_EventClient_subscribe_haStreamTable_follower() throws IOExcept
962
986
"haStreamTable(" +GROUP_ID +", table, `inputTable_1, 100000);\n " +
963
987
"share table(100:0, `timestamp`comment1, [TIMESTAMP,STRING]) as outputTable;\n " ;
964
988
conn1 .run (script );
989
+ EventSchema scheme = new EventSchema ();
990
+ scheme .setEventType ("MarketData" );
991
+ scheme .setFieldNames (Arrays .asList ("timestamp" , "comment1" ));
992
+ scheme .setFieldTypes (Arrays .asList ( DT_TIMESTAMP ,DT_STRING ));
993
+ scheme .setFieldForms (Arrays .asList (DF_SCALAR , DF_SCALAR ));
994
+ List <EventSchema > eventSchemas = new ArrayList <>();
995
+ eventSchemas .add (scheme );
996
+ List <String > eventTimeKeys = Arrays .asList (new String []{"timestamp" });
997
+ List <String > commonKeys = Arrays .asList (new String []{"comment1" });
998
+ sender = new EventSender (conn , "inputTable_1" , eventSchemas , eventTimeKeys , commonKeys );
999
+ client = new EventClient (eventSchemas , eventTimeKeys , commonKeys );
965
1000
966
- subscribePrepare ();
967
1001
List <Entity > attributes = new ArrayList <>();
968
1002
attributes .add (new BasicTimestamp (LocalDateTime .of (2024 ,3 ,22 ,10 ,45 ,3 ,100000000 )));
969
1003
attributes .add (new BasicString ("123456" ));
0 commit comments