Skip to content

Commit 7c9d3a2

Browse files
author
chengyitian
committed
Merge remote-tracking branch 'origin/dev-pool_loadbalance_fix' into dev-pool_loadbalance_fix
2 parents 2d05f22 + f9a5099 commit 7c9d3a2

File tree

3 files changed

+189
-44
lines changed

3 files changed

+189
-44
lines changed

test/com/xxdb/LoadBalanceTest.java

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -530,4 +530,149 @@ public void Test_getConnection_enableHighAvailability_true_enableLoadBalance_fal
530530
controller_conn.run("try{startDataNode('"+HOST+":"+node1.getInt()+"')}catch(ex){}");
531531
controller_conn.run("2000");
532532
}
533+
@Test
534+
public void Test_DBConnectionPool_enableHighAvailability_false_loadBalance_false() throws SQLException, ClassNotFoundException, IOException, InterruptedException {
535+
DBConnectionPool pool1 = new ExclusiveDBConnectionPool(HOST,PORT,"admin","123456",100,false,false);
536+
Thread.sleep(10000);
537+
DBConnection connection1 = new DBConnection();
538+
connection1.connect(HOST, PORT, "admin", "123456",false);
539+
connection1.run("sleep(3000)");
540+
BasicIntVector re = (BasicIntVector)connection1.run("EXEC connectionNum from rpc(getControllerAlias(),getClusterPerf) where port="+PORT);
541+
System.out.println(re.getInt(0));
542+
assertEquals(true,re.getInt(0)>=100);
543+
connection1.close();
544+
pool1.shutdown();
545+
}
546+
@Test
547+
public void Test_DBConnectionPool_enableHighAvailability_true_loadBalance_false() throws SQLException, ClassNotFoundException, IOException, InterruptedException {
548+
DBConnectionPool pool1 = new ExclusiveDBConnectionPool(HOST,PORT,"admin","123456",100,false,true);
549+
Thread.sleep(10000);
550+
DBConnection connection1 = new DBConnection();
551+
connection1.connect(HOST, PORT, "admin", "123456",false);
552+
connection1.run("sleep(3000)");
553+
BasicIntVector re = (BasicIntVector)connection1.run("EXEC connectionNum from rpc(getControllerAlias(),getClusterPerf) where port="+PORT);
554+
System.out.println(re.getInt(0));
555+
assertEquals(true,re.getInt(0)>100);
556+
connection1.close();
557+
pool1.shutdown();
558+
}
559+
560+
@Test//The current node is unavailable
561+
public void Test_DBConnectionPool_enableHighAvailability_true_loadBalance_false_1() throws SQLException, ClassNotFoundException, IOException {
562+
DBConnection controller_conn = new DBConnection();
563+
controller_conn.connect(controller_host, controller_port, "admin", "123456");
564+
controller_conn.run("try{stopDataNode('"+HOST+":"+PORT+"')}catch(ex){}");
565+
DBConnectionPool pool1 = new ExclusiveDBConnectionPool(HOST,PORT,"admin","123456",100,false,true,ipports,null, false, false, false);
566+
567+
controller_conn.run("try{startDataNode('"+HOST+":"+PORT+"')}catch(ex){}");
568+
controller_conn.run("sleep(3000);");
569+
DBConnection connection1 = new DBConnection();
570+
connection1.connect(HOST, PORT, "admin", "123456",false);
571+
int port1 = port_list[1];
572+
BasicTable re = (BasicTable) connection1.run("select port ,connectionNum from rpc(getControllerAlias(),getClusterPerf) where mode= 0");
573+
for (int i = 0; i < re.rows(); i++) {
574+
System.out.println("port:" + re.getColumn(0).get(i) + " connectionNum:" + re.getColumn(1).get(i));
575+
String port = re.getColumn(0).get(i).toString();
576+
String connectionNum = re.getColumn(1).get(i).toString();
577+
if(Integer.valueOf(port)==port1) {
578+
assertEquals(true, Integer.valueOf(connectionNum) >= 100);
579+
}
580+
}
581+
}
582+
@Test
583+
public void Test_DBConnectionPool_enableHighAvailability_true_loadBalance_true_highAvailabilitySites_null() throws SQLException, ClassNotFoundException, IOException {
584+
DBConnectionPool pool1 = new ExclusiveDBConnectionPool(HOST,PORT,"admin","123456",100,true,true,null,null, false, false, false);
585+
DBConnection connection1 = new DBConnection();
586+
connection1.connect(HOST, PORT, "admin", "123456",true);
587+
connection1.run("sleep(3000)");
588+
BasicTable re = (BasicTable) connection1.run("select port ,connectionNum from rpc(getControllerAlias(),getClusterPerf) where mode= 0");
589+
for (int i = 0; i < re.rows(); i++) {
590+
System.out.println("port:" + re.getColumn(0).get(i) + " connectionNum:" + re.getColumn(1).get(i));
591+
String port = re.getColumn(0).get(i).toString();
592+
String connectionNum = re.getColumn(1).get(i).toString();
593+
assertEquals(true, Integer.valueOf(connectionNum) >= 20);
594+
assertEquals(true, Integer.valueOf(connectionNum) < 50);
595+
}
596+
pool1.shutdown();
597+
}
598+
599+
@Test
600+
public void Test_DBConnectionPool_enableHighAvailability_true_loadBalance_true() throws SQLException, ClassNotFoundException, IOException {
601+
DBConnectionPool pool1 = new ExclusiveDBConnectionPool(HOST,PORT,"admin","123456",100,true,true,ipports,null, false, false, false);
602+
DBConnection connection1 = new DBConnection();
603+
connection1.connect(HOST, PORT, "admin", "123456",true);
604+
connection1.run("sleep(1000)");
605+
BasicTable re = (BasicTable) connection1.run("select port ,connectionNum from rpc(getControllerAlias(),getClusterPerf) where mode= 0");
606+
for (int i = 0; i < re.rows(); i++) {
607+
System.out.println("port:" + re.getColumn(0).get(i) + " connectionNum:" + re.getColumn(1).get(i));
608+
String port = re.getColumn(0).get(i).toString();
609+
String connectionNum = re.getColumn(1).get(i).toString();
610+
assertEquals(true, Integer.valueOf(connectionNum) >= 20);
611+
assertEquals(true, Integer.valueOf(connectionNum) < 50);
612+
}
613+
pool1.shutdown();
614+
}
615+
@Test//The current node is unavailable
616+
public void Test_DBConnectionPool_enableHighAvailability_true_loadBalance_true_1() throws SQLException, ClassNotFoundException, IOException {
617+
DBConnection controller_conn = new DBConnection();
618+
controller_conn.connect(controller_host, controller_port, "admin", "123456");
619+
controller_conn.run("try{stopDataNode('"+HOST+":"+PORT+"')}catch(ex){}");
620+
DBConnectionPool pool1 = new ExclusiveDBConnectionPool(HOST,PORT,"admin","123456",100,true,true,ipports,null, false, false, false);
621+
controller_conn.run("try{startDataNode('"+HOST+":"+PORT+"')}catch(ex){}");
622+
DBConnection connection1 = new DBConnection();
623+
connection1.connect(HOST, PORT, "admin", "123456",false);
624+
connection1.run("sleep(3000)");
625+
BasicTable re = (BasicTable) connection1.run("select port ,connectionNum from rpc(getControllerAlias(),getClusterPerf) where mode= 0");
626+
for (int i = 0; i < re.rows(); i++) {
627+
System.out.println("port:" + re.getColumn(0).get(i) + " connectionNum:" + re.getColumn(1).get(i));
628+
String port = re.getColumn(0).get(i).toString();
629+
String connectionNum = re.getColumn(1).get(i).toString();
630+
if(Integer.valueOf(port)!=PORT) {
631+
assertEquals(true, Integer.valueOf(connectionNum) > 25);
632+
assertEquals(true, Integer.valueOf(connectionNum) < 50);
633+
}
634+
}
635+
pool1.shutdown();
636+
}
637+
@Test
638+
public void Test_DBConnectionPool_enableHighAvailability_false_loadBalance_true() throws SQLException, ClassNotFoundException, IOException {
639+
String re = null;
640+
try{
641+
DBConnectionPool pool1 = new ExclusiveDBConnectionPool(HOST,PORT,"admin","123456",100,true,false,null,null, false, false, false);
642+
}catch(Exception ex){
643+
re = ex.getMessage();
644+
}
645+
Assert.assertEquals("Cannot only enable loadbalance but not enable highAvailablity.",re);
646+
}
647+
@Test
648+
public void Test_DBConnectionPool_enableHighAvailability_true_loadBalance_false_site_not_null() throws SQLException, ClassNotFoundException, IOException {
649+
DBConnection controller_conn = new DBConnection();
650+
controller_conn.connect(controller_host, controller_port, "admin", "123456");
651+
controller_conn.run("try{stopDataNode('"+HOST+":"+PORT+"')}catch(ex){}");
652+
controller_conn.run("2000");
653+
String[] ipportArray = new String[1];
654+
ipportArray[0] = ipports[2];
655+
DBConnectionPool pool1 = new ExclusiveDBConnectionPool(HOST,PORT,"admin","123456",100,false,true,ipportArray,null, false, false, false);
656+
controller_conn.run("20000");
657+
BasicTable node1 = (BasicTable)controller_conn.run("select port ,connectionNum from rpc(getControllerAlias(),getClusterPerf) where mode= 0 and port ="+ipportArray[0].split(":")[1]);
658+
System.out.println(node1.getString());
659+
Assert.assertEquals(true, Integer.valueOf(node1.getColumn(1).get(0).toString())>=100);
660+
controller_conn.run("try{startDataNode('"+HOST+":"+PORT+"')}catch(ex){}");
661+
controller_conn.run("2000");
662+
controller_conn.run("try{stopDataNode('"+HOST+":"+ipportArray[0].split(":")[1]+"')}catch(ex){}");
663+
controller_conn.run("5000");
664+
List<DBTask> tasks = new ArrayList<>();
665+
for (int i = 0; i < 100; i++){
666+
BasicDBTask task = new BasicDBTask("t = streamTable(10:0,`a`b,[INT,INT]);\n insert into t values(1,1);");
667+
tasks.add(task);
668+
}
669+
pool1.execute(tasks);
670+
BasicTable node2 = (BasicTable)controller_conn.run("select port ,connectionNum from rpc(getControllerAlias(),getClusterPerf) where mode= 0 and port ="+PORT);
671+
System.out.println(node2.getString());
672+
Assert.assertEquals(true, Integer.valueOf(node2.getColumn(1).get(0).toString())>=100);
673+
controller_conn.run("try{startDataNode('"+HOST+":"+ipportArray[0].split(":")[1]+"')}catch(ex){}");
674+
controller_conn.run("2000");
675+
pool1.shutdown();
676+
677+
}
533678
}

test/com/xxdb/streaming/client/cep/EventClientTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ public void after() throws IOException, InterruptedException {
4343
try{client.unsubscribe(HOST, PORT, "intput", "test1");}catch (Exception ex){}
4444
try{client.unsubscribe(HOST, PORT, "inputTable" ,"javaStreamingApi");}catch (Exception ex){}
4545
try{client.unsubscribe(HOST, PORT, "intput" ,"javaStreamingApi");}catch (Exception ex){}
46+
try{client.unsubscribe(HOST, PORT, "inputTable_1" ,"test1");}catch (Exception ex){}
4647
}
4748

4849
public static EventMessageHandler handler = new EventMessageHandler() {

0 commit comments

Comments
 (0)