Skip to content

Commit 1e23cbd

Browse files
author
chengyitian
committed
Merge remote-tracking branch 'origin/dev-pool_loadbalance_fix' into dev-pool_loadbalance_fix
2 parents e202477 + d95fe9e commit 1e23cbd

File tree

3 files changed

+138
-51
lines changed

3 files changed

+138
-51
lines changed

test/com/xxdb/LoadBalanceTest.java

Lines changed: 66 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -415,6 +415,9 @@ public void Test_getConnection_enableHighAvailability_true_enableLoadBalance_fal
415415
DBConnection connection = new DBConnection();
416416
connection.connect(HOST, PORT, "admin", "123456",null,true,null,false,false);
417417
list.add(connection);
418+
// BasicInt re = (BasicInt)connection.run("getNodePort()");
419+
// System.out.println("current node is:"+re);
420+
// System.out.println("stop current node");
418421
}
419422
DBConnection connection1 = new DBConnection();
420423
connection1.connect(HOST, PORT, "admin", "123456",false);
@@ -533,7 +536,7 @@ public void Test_getConnection_enableHighAvailability_true_enableLoadBalance_fal
533536
@Test
534537
public void Test_DBConnectionPool_enableHighAvailability_false_loadBalance_false() throws SQLException, ClassNotFoundException, IOException, InterruptedException {
535538
DBConnectionPool pool1 = new ExclusiveDBConnectionPool(HOST,PORT,"admin","123456",100,false,false);
536-
Thread.sleep(10000);
539+
Thread.sleep(1000);
537540
DBConnection connection1 = new DBConnection();
538541
connection1.connect(HOST, PORT, "admin", "123456",false);
539542
connection1.run("sleep(3000)");
@@ -546,7 +549,7 @@ public void Test_DBConnectionPool_enableHighAvailability_false_loadBalance_false
546549
@Test
547550
public void Test_DBConnectionPool_enableHighAvailability_true_loadBalance_false() throws SQLException, ClassNotFoundException, IOException, InterruptedException {
548551
DBConnectionPool pool1 = new ExclusiveDBConnectionPool(HOST,PORT,"admin","123456",100,false,true);
549-
Thread.sleep(10000);
552+
Thread.sleep(1000);
550553
DBConnection connection1 = new DBConnection();
551554
connection1.connect(HOST, PORT, "admin", "123456",false);
552555
connection1.run("sleep(3000)");
@@ -579,6 +582,45 @@ public void Test_DBConnectionPool_enableHighAvailability_true_loadBalance_false_
579582
}
580583
}
581584
}
585+
//@Test//The current node is unavailable
586+
public void Test_DBConnectionPool_enableHighAvailability_true_loadBalance_false_2() throws SQLException, ClassNotFoundException, IOException, InterruptedException {
587+
DBConnection controller_conn = new DBConnection();
588+
controller_conn.connect(controller_host, controller_port, "admin", "123456");
589+
class MyThread extends Thread {
590+
@Override
591+
public void run() {
592+
try {
593+
DBConnectionPool pool1 = new ExclusiveDBConnectionPool(HOST,PORT,"admin","123456",100,false,true,null,null, false, false, false);
594+
Thread.sleep(1000);
595+
} catch (IOException e) {
596+
throw new RuntimeException(e);
597+
} catch (InterruptedException e) {
598+
throw new RuntimeException(e);
599+
}
600+
}
601+
}
602+
class MyThread1 extends Thread {
603+
@Override
604+
public void run() {
605+
try {
606+
controller_conn.run("try{stopDataNode('"+HOST+":"+PORT+"')}catch(ex){}");
607+
Thread.sleep(1000);
608+
} catch (Exception e) {
609+
// 捕获异常并打印错误信息
610+
System.err.println("Error executing task: " + e.getMessage());
611+
}
612+
}
613+
}
614+
MyThread thread = new MyThread();
615+
MyThread1 thread1 = new MyThread1();
616+
thread.start();
617+
Thread.sleep(15);
618+
System.err.println("thread1开始运行 ");
619+
thread1.start();
620+
thread.join();
621+
thread1.join();
622+
controller_conn.run("try{stopDataNode('"+HOST+":"+PORT+"')}catch(ex){}");
623+
}
582624
@Test
583625
public void Test_DBConnectionPool_enableHighAvailability_true_loadBalance_true_highAvailabilitySites_null() throws SQLException, ClassNotFoundException, IOException {
584626
DBConnectionPool pool1 = new ExclusiveDBConnectionPool(HOST,PORT,"admin","123456",100,true,true,null,null, false, false, false);
@@ -601,7 +643,7 @@ public void Test_DBConnectionPool_enableHighAvailability_true_loadBalance_true()
601643
DBConnectionPool pool1 = new ExclusiveDBConnectionPool(HOST,PORT,"admin","123456",100,true,true,ipports,null, false, false, false);
602644
DBConnection connection1 = new DBConnection();
603645
connection1.connect(HOST, PORT, "admin", "123456",true);
604-
connection1.run("sleep(1000)");
646+
connection1.run("sleep(2000)");
605647
BasicTable re = (BasicTable) connection1.run("select port ,connectionNum from rpc(getControllerAlias(),getClusterPerf) where mode= 0");
606648
for (int i = 0; i < re.rows(); i++) {
607649
System.out.println("port:" + re.getColumn(0).get(i) + " connectionNum:" + re.getColumn(1).get(i));
@@ -617,34 +659,41 @@ public void Test_DBConnectionPool_enableHighAvailability_true_loadBalance_true_1
617659
DBConnection controller_conn = new DBConnection();
618660
controller_conn.connect(controller_host, controller_port, "admin", "123456");
619661
controller_conn.run("try{stopDataNode('"+HOST+":"+PORT+"')}catch(ex){}");
662+
controller_conn.run("sleep(1000)");
620663
DBConnectionPool pool1 = new ExclusiveDBConnectionPool(HOST,PORT,"admin","123456",100,true,true,ipports,null, false, false, false);
621664
controller_conn.run("try{startDataNode('"+HOST+":"+PORT+"')}catch(ex){}");
665+
controller_conn.run("sleep(1000)");
622666
DBConnection connection1 = new DBConnection();
623667
connection1.connect(HOST, PORT, "admin", "123456",false);
624668
connection1.run("sleep(3000)");
625-
BasicTable re = (BasicTable) connection1.run("select port ,connectionNum from rpc(getControllerAlias(),getClusterPerf) where mode= 0");
669+
BasicTable node1 = (BasicTable)connection1.run("select port ,connectionNum from rpc(getControllerAlias(),getClusterPerf) where mode= 0 and port ="+ipports[1].split(":")[1]);
670+
System.out.println(node1.getString());
671+
Assert.assertEquals(true, Integer.valueOf(node1.getColumn(1).get(0).toString())>=50);
672+
673+
BasicTable node2 = (BasicTable)connection1.run("select port ,connectionNum from rpc(getControllerAlias(),getClusterPerf) where mode= 0 and port ="+ipports[2].split(":")[1]);
674+
System.out.println(node2.getString());
675+
Assert.assertEquals(true, Integer.valueOf(node2.getColumn(1).get(0).toString())>=25);
676+
pool1.shutdown();
677+
}
678+
@Test
679+
public void Test_DBConnectionPool_enableHighAvailability_false_loadBalance_true() throws SQLException, ClassNotFoundException, IOException {
680+
DBConnectionPool pool1 = new ExclusiveDBConnectionPool(HOST,PORT,"admin","123456",100,true,false,null,null, false, false, false);
681+
DBConnection connection1 = new DBConnection();
682+
connection1.connect(HOST, PORT, "admin", "123456",false);
683+
connection1.run("sleep(3000)");
684+
BasicTable re = (BasicTable) connection1.run("select port ,connectionNum from rpc(getControllerAlias(),getClusterPerf) where mode in [0,4];");
626685
for (int i = 0; i < re.rows(); i++) {
627686
System.out.println("port:" + re.getColumn(0).get(i) + " connectionNum:" + re.getColumn(1).get(i));
628687
String port = re.getColumn(0).get(i).toString();
629688
String connectionNum = re.getColumn(1).get(i).toString();
630689
if(Integer.valueOf(port)!=PORT) {
631-
assertEquals(true, Integer.valueOf(connectionNum) > 25);
690+
assertEquals(true, Integer.valueOf(connectionNum) > 20);
632691
assertEquals(true, Integer.valueOf(connectionNum) < 50);
633692
}
634693
}
635694
pool1.shutdown();
636695
}
637696
@Test
638-
public void Test_DBConnectionPool_enableHighAvailability_false_loadBalance_true() throws SQLException, ClassNotFoundException, IOException {
639-
String re = null;
640-
try{
641-
DBConnectionPool pool1 = new ExclusiveDBConnectionPool(HOST,PORT,"admin","123456",100,true,false,null,null, false, false, false);
642-
}catch(Exception ex){
643-
re = ex.getMessage();
644-
}
645-
Assert.assertEquals("Cannot only enable loadbalance but not enable highAvailablity.",re);
646-
}
647-
@Test
648697
public void Test_DBConnectionPool_enableHighAvailability_true_loadBalance_false_site_not_null() throws SQLException, ClassNotFoundException, IOException {
649698
DBConnection controller_conn = new DBConnection();
650699
controller_conn.connect(controller_host, controller_port, "admin", "123456");
@@ -663,16 +712,16 @@ public void Test_DBConnectionPool_enableHighAvailability_true_loadBalance_false_
663712
controller_conn.run("5000");
664713
List<DBTask> tasks = new ArrayList<>();
665714
for (int i = 0; i < 100; i++){
666-
BasicDBTask task = new BasicDBTask("t = streamTable(10:0,`a`b,[INT,INT]);\n insert into t values(1,1);");
715+
BasicDBTask task = new BasicDBTask("getNodePort();");
667716
tasks.add(task);
668717
}
669718
pool1.execute(tasks);
719+
pool1.waitForThreadCompletion();
670720
BasicTable node2 = (BasicTable)controller_conn.run("select port ,connectionNum from rpc(getControllerAlias(),getClusterPerf) where mode= 0 and port ="+PORT);
671721
System.out.println(node2.getString());
672722
Assert.assertEquals(true, Integer.valueOf(node2.getColumn(1).get(0).toString())>=100);
673723
controller_conn.run("try{startDataNode('"+HOST+":"+ipportArray[0].split(":")[1]+"')}catch(ex){}");
674724
controller_conn.run("2000");
675725
pool1.shutdown();
676-
677726
}
678727
}

test/com/xxdb/Prepare.java

Lines changed: 29 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.xxdb.data.BasicTable;
44
import java.io.IOException;
5+
import java.util.Arrays;
56
import java.util.ResourceBundle;
67

78
import static org.junit.Assert.assertEquals;
@@ -10,33 +11,36 @@ public class Prepare {
1011
static ResourceBundle bundle = ResourceBundle.getBundle("com/xxdb/setup/settings");
1112
static String HOST = bundle.getString("HOST");
1213
static int PORT = Integer.parseInt(bundle.getString("PORT"));
14+
static int[] port_list = Arrays.stream(bundle.getString("PORTS").split(",")).mapToInt(Integer::parseInt).toArray();
1315

1416
public static void clear_env() throws IOException {
15-
DBConnection conn = new DBConnection();
16-
conn.connect(HOST,PORT,"admin","123456");
17-
conn.run("a = getStreamingStat().pubTables\n" +
18-
"for(i in a){\n" +
19-
"\ttry{stopPublishTable(i.subscriber.split(\":\")[0],int(i.subscriber.split(\":\")[1]),i.tableName,i.actions)}catch(ex){}\n" +
20-
"}");
21-
conn.run("def getAllShare(){\n" +
22-
"\treturn select name from objs(true) where shared=1\n" +
23-
"\t}\n" +
24-
"\n" +
25-
"def clearShare(){\n" +
26-
"\tlogin(`admin,`123456)\n" +
27-
"\tallShare=exec name from pnodeRun(getAllShare)\n" +
28-
"\tfor(i in allShare){\n" +
29-
"\t\ttry{\n" +
30-
"\t\t\trpc((exec node from pnodeRun(getAllShare) where name =i)[0],clearTablePersistence,objByName(i))\n" +
31-
"\t\t\t}catch(ex1){}\n" +
32-
"\t\trpc((exec node from pnodeRun(getAllShare) where name =i)[0],undef,i,SHARED)\n" +
33-
"\t}\n" +
34-
"\ttry{\n" +
35-
"\t\tPST_DIR=rpc(getControllerAlias(),getDataNodeConfig{getNodeAlias()})['persistenceDir']\n" +
36-
"\t}catch(ex1){}\n" +
37-
"}\n" +
38-
"clearShare()");
39-
conn.run("try{dropStreamEngine(\"serInput\");\n}catch(ex){\n}\n");
17+
for (int i = 0; i < port_list.length; i++) {
18+
DBConnection conn = new DBConnection();
19+
conn.connect(HOST, port_list[i], "admin", "123456");
20+
conn.run("a = getStreamingStat().pubTables\n" +
21+
"for(i in a){\n" +
22+
"\ttry{stopPublishTable(i.subscriber.split(\":\")[0],int(i.subscriber.split(\":\")[1]),i.tableName,i.actions)}catch(ex){}\n" +
23+
"}");
24+
conn.run("def getAllShare(){\n" +
25+
"\treturn select name from objs(true) where shared=1\n" +
26+
"\t}\n" +
27+
"\n" +
28+
"def clearShare(){\n" +
29+
"\tlogin(`admin,`123456)\n" +
30+
"\tallShare=exec name from pnodeRun(getAllShare)\n" +
31+
"\tfor(i in allShare){\n" +
32+
"\t\ttry{\n" +
33+
"\t\t\trpc((exec node from pnodeRun(getAllShare) where name =i)[0],clearTablePersistence,objByName(i))\n" +
34+
"\t\t\t}catch(ex1){}\n" +
35+
"\t\trpc((exec node from pnodeRun(getAllShare) where name =i)[0],undef,i,SHARED)\n" +
36+
"\t}\n" +
37+
"\ttry{\n" +
38+
"\t\tPST_DIR=rpc(getControllerAlias(),getDataNodeConfig{getNodeAlias()})['persistenceDir']\n" +
39+
"\t}catch(ex1){}\n" +
40+
"}\n" +
41+
"clearShare()");
42+
conn.run("try{dropStreamEngine(\"serInput\");\n}catch(ex){\n}\n");
43+
}
4044
}
4145

4246
public static void Preparedata(long count) throws IOException {

test/com/xxdb/streaming/client/cep/EventClientTest.java

Lines changed: 43 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -856,9 +856,11 @@ public void test_EventClient_subscribe_unsubscribe_resubscribe() throws IOExcep
856856
for(int i=0;i<10;i++) {
857857
client.subscribe(HOST, PORT, "inputTable", "test1", handler, -1, true, "admin", "123456");
858858
sender.sendEvent("MarketData", attributes);
859+
Thread.sleep(200);
859860
client.unsubscribe(HOST, PORT, "inputTable", "test1");
860861
client.subscribe(HOST, PORT, "inputTable", "test1", handler, -1, true, "admin", "123456");
861862
sender.sendEvent("MarketData", attributes);
863+
Thread.sleep(200);
862864
client.unsubscribe(HOST, PORT, "inputTable", "test1");
863865
}
864866
Thread.sleep(1000);
@@ -896,15 +898,27 @@ public void test_EventClient_unsubscribe_duplicated() throws IOException, Inter
896898

897899
@Test
898900
public void test_EventClient_subscribe_haStreamTable() throws IOException, InterruptedException {
899-
conn.run("table = table(1000000:0, `timestamp`eventType`event`comment1, [TIMESTAMP,STRING,BLOB,STRING]) as inputTable;");
900-
conn.run("haStreamTable(11, table, `inputTable, 100000)");
901-
conn.run("share table(100:0, `timestamp`comment1, [TIMESTAMP,STRING]) as outputTable;");
902-
subscribePrepare();
901+
String script = "try{\ndropStreamTable(`inputTable)\n}catch(ex){\n}\n"+
902+
"table = table(1000000:0, `timestamp`eventType`event`comment1, [TIMESTAMP,STRING,BLOB,STRING]);\n"+
903+
"haStreamTable("+GROUP_ID+", table, `inputTable, 100000);\n"+
904+
"share table(100:0, `timestamp`comment1, [TIMESTAMP,STRING]) as outputTable;\n";
905+
conn.run(script);
906+
EventSchema scheme = new EventSchema();
907+
scheme.setEventType("MarketData");
908+
scheme.setFieldNames(Arrays.asList("timestamp", "comment1"));
909+
scheme.setFieldTypes(Arrays.asList( DT_TIMESTAMP,DT_STRING));
910+
scheme.setFieldForms(Arrays.asList(DF_SCALAR, DF_SCALAR));
911+
List<EventSchema> eventSchemas = new ArrayList<>();
912+
eventSchemas.add(scheme);
913+
List<String> eventTimeKeys = Arrays.asList(new String[]{"timestamp"});
914+
List<String> commonKeys = Arrays.asList(new String[]{"comment1"});
915+
sender = new EventSender(conn, "inputTable", eventSchemas, eventTimeKeys, commonKeys);
916+
client = new EventClient(eventSchemas, eventTimeKeys, commonKeys);
903917

904918
List<Entity> attributes = new ArrayList<>();
905919
attributes.add(new BasicTimestamp(LocalDateTime.of(2024,3,22,10,45,3,100000000)));
906920
attributes.add(new BasicString("123456"));
907-
client.subscribe(HOST, PORT, "inputTable", "test1", handler, -1, true, "user1", "123456");
921+
client.subscribe(HOST, PORT, "inputTable", "test1", handler, -1, true, "admin", "123456");
908922
sender.sendEvent("MarketData", attributes);
909923
Thread.sleep(1000);
910924
BasicTable re = (BasicTable)conn.run("select * from outputTable");
@@ -927,12 +941,22 @@ public void test_EventClient_subscribe_haStreamTable_leader() throws IOExceptio
927941
"haStreamTable("+GROUP_ID+", table, `inputTable_1, 100000);\n"+
928942
"share table(100:0, `timestamp`comment1, [TIMESTAMP,STRING]) as outputTable;\n";
929943
conn1.run(script);
930-
subscribePrepare();
944+
EventSchema scheme = new EventSchema();
945+
scheme.setEventType("MarketData");
946+
scheme.setFieldNames(Arrays.asList("timestamp", "comment1"));
947+
scheme.setFieldTypes(Arrays.asList( DT_TIMESTAMP,DT_STRING));
948+
scheme.setFieldForms(Arrays.asList(DF_SCALAR, DF_SCALAR));
949+
List<EventSchema> eventSchemas = new ArrayList<>();
950+
eventSchemas.add(scheme);
951+
List<String> eventTimeKeys = Arrays.asList(new String[]{"timestamp"});
952+
List<String> commonKeys = Arrays.asList(new String[]{"comment1"});
953+
sender = new EventSender(conn, "inputTable_1", eventSchemas, eventTimeKeys, commonKeys);
954+
client = new EventClient(eventSchemas, eventTimeKeys, commonKeys);
931955

932956
List<Entity> attributes = new ArrayList<>();
933957
attributes.add(new BasicTimestamp(LocalDateTime.of(2024,3,22,10,45,3,100000000)));
934958
attributes.add(new BasicString("123456"));
935-
client.subscribe(StreamLeaderHost, StreamLeaderPort, "inputTable_1", "test1", handler, -1, true, "user1", "123456");
959+
client.subscribe(StreamLeaderHost, StreamLeaderPort, "inputTable_1", "test1", handler, -1, true, "admin", "123456");
936960
sender.sendEvent("MarketData", attributes);
937961
Thread.sleep(1000);
938962
BasicTable re = (BasicTable)conn1.run("select * from outputTable");
@@ -942,7 +966,7 @@ public void test_EventClient_subscribe_haStreamTable_leader() throws IOExceptio
942966
client.unsubscribe(StreamLeaderHost, StreamLeaderPort, "inputTable_1", "test1");
943967
}
944968

945-
@Test//not support
969+
//@Test//not support
946970
public void test_EventClient_subscribe_haStreamTable_follower() throws IOException, InterruptedException {
947971
String script0 ="leader = getStreamingLeader("+GROUP_ID+");\n" +
948972
"groupSitesStr = (exec sites from getStreamingRaftGroups() where id =="+GROUP_ID+")[0];\n"+
@@ -962,8 +986,18 @@ public void test_EventClient_subscribe_haStreamTable_follower() throws IOExcept
962986
"haStreamTable("+GROUP_ID+", table, `inputTable_1, 100000);\n"+
963987
"share table(100:0, `timestamp`comment1, [TIMESTAMP,STRING]) as outputTable;\n";
964988
conn1.run(script);
989+
EventSchema scheme = new EventSchema();
990+
scheme.setEventType("MarketData");
991+
scheme.setFieldNames(Arrays.asList("timestamp", "comment1"));
992+
scheme.setFieldTypes(Arrays.asList( DT_TIMESTAMP,DT_STRING));
993+
scheme.setFieldForms(Arrays.asList(DF_SCALAR, DF_SCALAR));
994+
List<EventSchema> eventSchemas = new ArrayList<>();
995+
eventSchemas.add(scheme);
996+
List<String> eventTimeKeys = Arrays.asList(new String[]{"timestamp"});
997+
List<String> commonKeys = Arrays.asList(new String[]{"comment1"});
998+
sender = new EventSender(conn, "inputTable_1", eventSchemas, eventTimeKeys, commonKeys);
999+
client = new EventClient(eventSchemas, eventTimeKeys, commonKeys);
9651000

966-
subscribePrepare();
9671001
List<Entity> attributes = new ArrayList<>();
9681002
attributes.add(new BasicTimestamp(LocalDateTime.of(2024,3,22,10,45,3,100000000)));
9691003
attributes.add(new BasicString("123456"));

0 commit comments

Comments
 (0)