Skip to content

Commit 8804cb8

Browse files
committed
AJ-433:add test case about ThreadPooledClient(int threadCount);
1 parent 1c5d68e commit 8804cb8

File tree

3 files changed

+49
-15
lines changed

3 files changed

+49
-15
lines changed

test/com/xxdb/streaming/reverse/PollingClientReverseTest.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,7 @@ insert into trades values(timev, symv, take(-1, 1), pricev, exchv,x)
1919

2020
import com.xxdb.DBConnection;
2121
import com.xxdb.data.*;
22-
import com.xxdb.streaming.client.BasicMessage;
23-
import com.xxdb.streaming.client.IMessage;
24-
import com.xxdb.streaming.client.PollingClient;
25-
import com.xxdb.streaming.client.TopicPoller;
22+
import com.xxdb.streaming.client.*;
2623
import org.junit.*;
2724

2825
import java.io.IOException;
@@ -516,7 +513,7 @@ public void test_subscribe_offset_reconnect() throws IOException {
516513
}
517514
}
518515

519-
@Test(timeout = 60000)
516+
@Test(timeout = 120000)
520517
public void test_subscribe_tableName_actionName_offset_reconnect() throws IOException {
521518
for (int j=0;j<10;j++) {
522519
TopicPoller poller1 = client.subscribe(HOST, PORT, "Trades1","subTrades1",-1,true);
@@ -553,7 +550,7 @@ public void test_subscribe_tableName_actionName_offset_reconnect() throws IOExce
553550
}
554551
}
555552

556-
@Test(timeout = 60000)
553+
@Test(timeout = 120000)
557554
public void test_subscribe_tableName_actionName_reconnect() throws IOException {
558555
TopicPoller poller1 = client.subscribe(HOST, PORT, "Trades1","subTrades1",true);
559556
PollingClient client1 = new PollingClient(HOST,0);

test/com/xxdb/streaming/reverse/ThreadPooledClientReverseTest.java

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ public void test_subscribe_ofst0() throws Exception {
229229
int ofst = 0;
230230
client.subscribe(HOST, PORT, "Trades", MessageHandler_handler, ofst);
231231
conn.run("n=10000;t=table(1..n as tag,now()+1..n as ts,rand(100.0,n) as data);" + "Trades.append!(t)");
232-
Thread.sleep(20000);
232+
Thread.sleep(30000);
233233
BasicTable re = (BasicTable) conn.run("select * from Receive order by tag");
234234
BasicTable tra = (BasicTable) conn.run("select * from Trades order by tag");
235235
client.unsubscribe(HOST, PORT, "Trades", "javaStreamingApi");
@@ -478,7 +478,7 @@ public void test_subscribe_other_some_user() throws IOException, InterruptedExce
478478

479479
@Test
480480
public void test_subscribe_one_user_some_table() throws IOException, InterruptedException {
481-
conn.run("def create_user(){try{deleteUser(`test1)}catch(ex){};createUser(`test1, '123456');};"+
481+
conn.run("login('admin','123456');def create_user(){try{deleteUser(`test1)}catch(ex){};createUser(`test1, '123456');};"+
482482
"rpc(getControllerAlias(),create_user);" +
483483
"share streamTable(1000000:0,`tag`ts`data,[INT,TIMESTAMP,DOUBLE]) as tmp_st1;"+
484484
"share streamTable(1000000:0,`tag`ts`data,[INT,TIMESTAMP,DOUBLE]) as tmp_st2;"+
@@ -489,7 +489,7 @@ public void test_subscribe_one_user_some_table() throws IOException, Interrupted
489489
client.subscribe(HOST, PORT, "tmp_st3", "subTread1", MessageHandler_handler, -1, true, null, true, "test1", "123456_error");
490490
fail("no exception thrown");
491491
}catch (Exception e){
492-
System.out.println(e.getMessage());
492+
System.out.println(e.getMessage()+"12345666");
493493
}
494494
conn.run("n=10000;t=table(1..n as tag,now()+1..n as ts,rand(100.0,n) as data);" + "tmp_st1.append!(t)");
495495
conn.run("n=10000;t=table(1..n as tag,now()+1..n as ts,rand(100.0,n) as data);" + "tmp_st2.append!(t)");
@@ -577,11 +577,11 @@ public void test_ThreadPooledClient_null() throws Exception {
577577
client1.subscribe(HOST, PORT, "Trades", "subTrades",MessageHandler_handler,true);
578578
conn.run("n=10000;t=table(1..n as tag,now()+1..n as ts,rand(100.0,n) as data);" + "Trades.append!(t)");
579579
conn.run("n=10000;t=table(1..n as tag,now()+1..n as ts,rand(100.0,n) as data);" + "Trades.append!(t)");
580-
Thread.sleep(10000);
580+
Thread.sleep(15000);
581581
BasicTable re = (BasicTable) conn.run("select * from Receive order by tag");
582582
BasicTable tra = (BasicTable) conn.run("select * from Trades order by tag");
583583
client1.unsubscribe(HOST, PORT, "Trades", "subTrades");
584-
//assertEquals(20000, re.rows());
584+
assertEquals(20000, re.rows());
585585
for (int i = 0; i < 1000; i++) {
586586
assertEquals(re.getColumn(0).get(i), tra.getColumn(0).get(i));
587587
assertEquals(re.getColumn(1).get(i), tra.getColumn(1).get(i));
@@ -684,4 +684,25 @@ public void test_StreamDeserializer_dataType_filters_subscribe_haStreamTable() t
684684
Assert.assertEquals(table2.rows(), msg2.size());
685685
client.unsubscribe(StreamLeaderHost, StreamLeaderPort, "outTables", "mutiSchema");
686686
}
687+
@Test
688+
public void test_ThreadPooledClient_threadCount() throws Exception {
689+
client = new ThreadPooledClient(10);
690+
Vector filter1 = (Vector) conn.run("1..1000");
691+
client.subscribe(HOST, PORT, "Trades", "subTrades", MessageHandler_handler, -1, true, filter1, true);
692+
conn.run("n=10000;t=table(1..n as tag,now()+1..n as ts,rand(100.0,n) as data);" + "Trades.append!(t)");
693+
conn.run("n=10000;t=table(1..n as tag,now()+1..n as ts,rand(100.0,n) as data);" + "Trades.append!(t)");
694+
Thread.sleep(10000);
695+
BasicTable re = (BasicTable) conn.run("select * from Receive order by tag");
696+
BasicTable tra = (BasicTable) conn.run("select * from Trades order by tag");
697+
client.unsubscribe(HOST, PORT, "Trades", "subTrades");
698+
assertEquals(2000, re.rows());
699+
for (int i = 0; i < 1000; i++) {
700+
assertEquals(re.getColumn(0).get(i), tra.getColumn(0).get(i));
701+
assertEquals(re.getColumn(1).get(i), tra.getColumn(1).get(i));
702+
assertEquals(((Scalar)re.getColumn(2).get(i)).getNumber().doubleValue(), ((Scalar)tra.getColumn(2).get(i)).getNumber().doubleValue(), 4);
703+
assertEquals(re.getColumn(0).get(i + 1000), tra.getColumn(0).get(i + 1000));
704+
assertEquals(re.getColumn(1).get(i + 1000), tra.getColumn(1).get(i + 1000));
705+
assertEquals(((Scalar)re.getColumn(2).get(i + 1000)).getNumber().doubleValue(), ((Scalar)tra.getColumn(2).get(i + 1000)).getNumber().doubleValue(), 4);
706+
}
707+
}
687708
}

test/com/xxdb/streaming/reverse/ThreadedClientsubscribeReverseTest.java

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ public void test_ThreadedClient_only_subscribePort() throws IOException {
177177

178178

179179

180-
@Test(timeout = 60000)
180+
@Test(timeout = 120000)
181181
public void test_subscribe_ex1() throws Exception {
182182
String script1 = "st1 = streamTable(1000000:0,`tag`ts`data,[INT,TIMESTAMP,DOUBLE])\n" +
183183
"share(st1,`Trades1)\t\n"
@@ -743,7 +743,7 @@ public void doEvent(IMessage msg) {
743743
}
744744
}
745745

746-
@Test(timeout = 60000)
746+
@Test(timeout = 120000)
747747
public void test_subscribe_batchSize_throttle() throws Exception {
748748
String script1 = "st1 = streamTable(1000000:0,`tag`ts`data,[INT,TIMESTAMP,DOUBLE])\n" +
749749
"share(st1,`Trades)\t\n"
@@ -1001,7 +1001,7 @@ public void test_func_BatchMessageHandler() throws IOException, InterruptedExcep
10011001
client.unsubscribe(HOST,PORT,"Trades","BatchMessageHandler");
10021002
}
10031003

1004-
@Test(timeout = 60000)
1004+
@Test(timeout = 120000)
10051005
public void test_func_BatchMessageHandler_not_set_batchSize() throws IOException, InterruptedException {
10061006
String script1 = "st1 = streamTable(1000000:0,`tag`ts`data,[INT,TIMESTAMP,DOUBLE])\n" +
10071007
"share(st1,`Trades)\t\n"
@@ -1038,7 +1038,7 @@ public void test_func_BatchMessageHandler_single_msg() throws IOException, Inter
10381038
client.unsubscribe(HOST,PORT,"Trades","single_msg");
10391039
}
10401040

1041-
@Test(timeout = 60000)
1041+
@Test(timeout = 120000)
10421042
public void test_func_BatchMessageHandler_mul_single_msg() throws IOException, InterruptedException {
10431043
String script1 = "st1 = streamTable(1000000:0,`tag`ts`data,[INT,TIMESTAMP,DOUBLE])\n" +
10441044
"share(st1,`Trades)\t\n"
@@ -2178,4 +2178,20 @@ public void test_StreamDeserializer_pair_stream_table_filters_subscribe_isomate_
21782178
client.unsubscribe(HOST, PORT, "outTables", "mutiSchema");
21792179
}
21802180

2181+
@Test
2182+
public void test_ThreadedClient_only_subscribePort1() throws IOException, InterruptedException {
2183+
ThreadedClient client2 = new ThreadedClient(0);
2184+
String script1 = "st1 = streamTable(1000000:0,`tag`ts`data,[INT,TIMESTAMP,DOUBLE])\n" +
2185+
"share(st1,`Trades1)\t\n"
2186+
+ "setStreamTableFilterColumn(objByName(`Trades1),`tag)";
2187+
conn.run(script1);
2188+
String script2 = "st2 = streamTable(1000000:0,`tag`ts`data,[INT,TIMESTAMP,DOUBLE])\n" +
2189+
"share(st2, `Receive)\t\n";
2190+
conn.run(script2);
2191+
client2.subscribe(HOST,PORT,"Trades1","subTrades1",MessageHandler_handler, -1, true);
2192+
// Thread.sleep(100000000);
2193+
client2.unsubscribe(HOST,PORT,"Trades1","subTrades1");
2194+
client2.close();
2195+
}
2196+
21812197
}

0 commit comments

Comments
 (0)