Skip to content

Commit 98dafa3

Browse files
committed
update test case about stream
1 parent 1bbf9cb commit 98dafa3

File tree

4 files changed

+331
-260
lines changed

4 files changed

+331
-260
lines changed

test/com/xxdb/Prepare.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.xxdb;
22

3+
import com.xxdb.data.BasicInt;
34
import com.xxdb.data.BasicTable;
45
import java.io.IOException;
56
import java.util.Arrays;
@@ -283,4 +284,18 @@ public static void checkData(BasicTable exception, BasicTable resTable) {
283284
assertEquals(exception.getColumn(i).getString(), resTable.getColumn(i).getString());
284285
}
285286
}
287+
public static void wait_data(String table_name, int data_row) throws IOException, InterruptedException {
288+
DBConnection conn = new DBConnection();
289+
conn.connect(HOST,PORT,"admin","123456");
290+
BasicInt row_num;
291+
for(int i=0;i<50;i++){
292+
row_num = (BasicInt)conn.run("(exec count(*) from "+table_name+")[0]");
293+
// System.out.println(row_num.getInt());
294+
if(row_num.getInt() == data_row){
295+
break;
296+
}
297+
Thread.sleep(300);
298+
i++;
299+
}
300+
}
286301
}

test/com/xxdb/streaming/reverse/PollingClientReverseTest.java

Lines changed: 17 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,13 @@ insert into trades values(timev, symv, take(-1, 1), pricev, exchv,x)
1717
*/
1818
package com.xxdb.streaming.reverse;
1919

20-
import com.xxdb.BasicDBTask;
20+
2121
import com.xxdb.DBConnection;
22-
import com.xxdb.DBTask;
23-
import com.xxdb.ExclusiveDBConnectionPool;
2422
import com.xxdb.data.*;
2523
import com.xxdb.data.Vector;
2624
import com.xxdb.streaming.client.*;
2725
import org.javatuples.Pair;
2826
import org.junit.*;
29-
3027
import java.io.IOException;
3128
import java.net.SocketException;
3229
import java.util.*;
@@ -99,35 +96,22 @@ public void after() throws IOException, InterruptedException {
9996
try {clear_env();}catch (Exception e){}
10097
//client.close();
10198
conn.close();
102-
Thread.sleep(2000);
10399
}
104100

105101
@AfterClass
106102
public static void clear_conn() {
107103
try {clear_env_1();}catch (Exception e){}
108104
}
109105

110-
public void wait_data(String table_name,int data_row) throws IOException, InterruptedException {
111-
BasicInt row_num;
112-
while(true){
113-
row_num = (BasicInt)conn.run("(exec count(*) from "+table_name+")[0]");
114-
// System.out.println(row_num.getInt());
115-
if(row_num.getInt() == data_row){
116-
break;
117-
}
118-
Thread.sleep(100);
119-
}
120-
}
121-
122106
public static void checkResult() throws IOException, InterruptedException {
123-
for (int i = 0; i < 10; i++)
107+
for (int i = 0; i < 20; i++)
124108
{
125109
BasicInt tmpNum = (BasicInt)conn.run("exec count(*) from sub1");
126110
if (tmpNum.getInt()==(1000))
127111
{
128112
break;
129113
}
130-
Thread.sleep(1000);
114+
Thread.sleep(200);
131115
}
132116
BasicTable except = (BasicTable)conn.run("select * from Trades order by permno");
133117
BasicTable res = (BasicTable)conn.run("select * from sub1 order by permno");
@@ -138,15 +122,15 @@ public static void checkResult() throws IOException, InterruptedException {
138122
}
139123
}
140124
public static void checkResult1() throws IOException, InterruptedException {
141-
for (int i = 0; i < 10; i++)
125+
for (int i = 0; i < 20; i++)
142126
{
143127
BasicInt tmpNum = (BasicInt)conn.run("exec count(*) from sub1 ");
144128
BasicInt tmpNum1 = (BasicInt)conn.run("exec count(*) from sub2 ");
145129
if (tmpNum.getInt()==(1000)&& tmpNum1.getInt()==(1000))
146130
{
147131
break;
148132
}
149-
Thread.sleep(3000);
133+
Thread.sleep(100);
150134
}
151135
BasicTable except = (BasicTable)conn.run("select * from pub_t1 order by timestampv");
152136
BasicTable res = (BasicTable)conn.run("select * from sub1 order by timestampv");
@@ -179,7 +163,7 @@ public void test_size() throws IOException {
179163
try {
180164
for (int i=0;i<10;i++){//data<size
181165
conn.run("n=50;t=table(1..n as tag,now()+1..n as ts,rand(100.0,n) as data);" + "Trades1.append!(t)");
182-
msgs = poller1.poll(100,1000);
166+
msgs = poller1.poll(1000,1000);
183167
if (msgs==null){
184168
continue;
185169
}
@@ -189,7 +173,7 @@ else if (msgs.size() > 0) {
189173
}
190174
for (int i=0;i<10;i++){//data>size
191175
conn.run("n=5000;t=table(1..n as tag,now()+1..n as ts,rand(100.0,n) as data);" + "Trades1.append!(t)");
192-
msgs = poller1.poll(100000,1000);
176+
msgs = poller1.poll(1000,1000);
193177
if (msgs==null){
194178
continue;
195179
}
@@ -200,7 +184,7 @@ else if (msgs.size() > 0) {
200184
}
201185
for (int i=0;i<10;i++){//data=size
202186
conn.run("n=5000;t=table(1..n as tag,now()+1..n as ts,rand(100.0,n) as data);" + "Trades1.append!(t)");
203-
msgs = poller1.poll(1000000,5000);
187+
msgs = poller1.poll(5000,5000);
204188
if (msgs==null){
205189
continue;
206190
}
@@ -344,7 +328,7 @@ public void test_subscribe_other_user() throws IOException, InterruptedException
344328
PrepareUser("test1","123456");
345329
TopicPoller poller1 = client.subscribe(HOST,PORT,"Trades1","subTread1",-1,true,null,"test1","123456");
346330
conn.run("n=10000;t=table(1..n as tag,now()+1..n as ts,rand(100.0,n) as data);" + "Trades1.append!(t)");
347-
Thread.sleep(5000);
331+
//Thread.sleep(5000);
348332
ArrayList<IMessage> msgs1 = poller1.poll(500, 10000);
349333
assertEquals(10000, msgs1.size());
350334
client.unsubscribe(HOST,PORT,"Trades1","subTread1");
@@ -2198,15 +2182,15 @@ public void run() {
21982182
MessageHandler_handler1(messages);
21992183
Thread.sleep(1000);
22002184
thread.join();
2201-
Thread.sleep(10000);
2185+
Thread.sleep(8000);
22022186
controller_conn.run("try{startDataNode('"+HOST+":"+port_list[1]+"')}catch(ex){}");
22032187
//Thread.sleep(1000);
22042188
List<IMessage> messages1 = poller.poll(1000,1000);
2205-
Thread.sleep(1000);
2189+
//Thread.sleep(1000);
22062190
System.out.println(messages1.size());
22072191
//Assert.assertEquals(1000,messages1.size());
22082192
MessageHandler_handler1(messages1);
2209-
Thread.sleep(1000);
2193+
//Thread.sleep(1000);
22102194
BasicTable re = (BasicTable)conn.run("select tag ,now,deltas(now) from Receive order by deltas(now) desc \n");
22112195
System.out.println(re.getString());
22122196
Assert.assertEquals(1000,re.rows());
@@ -2246,31 +2230,31 @@ public void test_PollingClient_subscribe_resubTimeout_subOnce_not_set() throws I
22462230
System.out.println("Successful subscribe");
22472231
conn1.run("n=1000;t=table(1..n as tag,timestamp(1..n) as ts,take(100.0,n) as data);" + "Trades.append!(t)");
22482232
conn2.run("n=1000;t=table(1..n as tag,timestamp(1..n) as ts,take(100.0,n) as data);" + "Trades.append!(t)");
2249-
Thread.sleep(1000);
2233+
//Thread.sleep(1000);
22502234
//List<IMessage> messages = poller.poll(1000,1000);
22512235
//MessageHandler_handler(messages);
22522236
controller_conn.run("try{stopDataNode('"+HOST+":"+port_list[1]+"')}catch(ex){}");
22532237
System.out.println(port_list[1]+"断掉啦---------------------------------------------------");
2254-
Thread.sleep(10000);
2238+
Thread.sleep(8000);
22552239
conn2.run("n=2000;t=table(1001..n as tag,timestamp(1001..n) as ts,take(100.0,1000) as data);" + "Trades.append!(t)");
22562240
Thread.sleep(2000);
22572241
//List<IMessage> messages1 = poller.poll(2000,2000);
22582242
// MessageHandler_handler(messages1);
22592243
controller_conn.run("try{startDataNode('"+HOST+":"+port_list[1]+"')}catch(ex){}");
2260-
Thread.sleep(10000);
2244+
Thread.sleep(8000);
22612245
DBConnection conn3 = new DBConnection();
22622246
conn3.connect(HOST,port_list[1],"admin","123456");
22632247
conn3.run(script1);
22642248
conn3.run(script2);
22652249
controller_conn.run("try{stopDataNode('"+HOST+":"+port_list[2]+"')}catch(ex){}");
22662250
System.out.println(port_list[2]+"节点断掉啦---------------------------------------------------");
2267-
Thread.sleep(20000);
2251+
Thread.sleep(8000);
22682252
conn3.run("n=3000;t=table(1..n as tag,timestamp(1..n) as ts,take(100.0,n) as data);" + "Trades.append!(t)");
22692253
Thread.sleep(5000);
22702254
List<IMessage> messages2 = poller.poll(3000,3000);
22712255
MessageHandler_handler(messages2);
22722256
controller_conn.run("try{startDataNode('"+HOST+":"+port_list[2]+"')}catch(ex){}");
2273-
Thread.sleep(10000);
2257+
Thread.sleep(8000);
22742258

22752259
BasicTable row_num = (BasicTable)conn.run("select count(*) from Receive");
22762260
System.out.println(row_num.getColumn(0).get(0));

0 commit comments

Comments
 (0)