Skip to content

Commit 299bb63

Browse files
committed
Update ThreadedClientsubscribeReverseTest.java
1 parent c2a44b1 commit 299bb63

File tree

1 file changed

+96
-10
lines changed

1 file changed

+96
-10
lines changed

test/com/xxdb/streaming/reverse/ThreadedClientsubscribeReverseTest.java

Lines changed: 96 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -105,11 +105,9 @@ public void doEvent(IMessage msg) {
105105
String script = String.format("insert into Receive values(%d,%s,%f)", Integer.parseInt(msg.getEntity(0).getString()), msg.getEntity(1).getString(), Double.valueOf(msg.getEntity(2).toString()));
106106
conn.run(script);
107107
System.out.println(msg.getEntity(0).getString());
108-
Thread.sleep(5);
108+
//Thread.sleep(5);
109109
} catch (IOException e) {
110110
e.printStackTrace();
111-
} catch (InterruptedException e) {
112-
throw new RuntimeException(e);
113111
}
114112
}
115113
};
@@ -3423,6 +3421,7 @@ public void test_ThreadClient_subscribe_backupSites_server_disconnect_backupSite
34233421
conn3.connect(HOST,port_list[1],"admin","123456");
34243422
conn3.run(script1);
34253423
conn3.run(script2);
3424+
conn3.run("n=100000;t=table(1..n as tag,timestamp(1..n) as ts,take(100.0,n) as data);" + "Trades.append!(t)");
34263425
controller_conn.run("try{stopDataNode('"+HOST+":"+port_list[2]+"')}catch(ex){}");
34273426
System.out.println(port_list[2]+"节点断掉啦---------------------------------------------------");
34283427
Thread.sleep(10000);
@@ -3459,7 +3458,7 @@ public void test_ThreadClient_subscribe_backupSites_server_disconnect_backupSite
34593458
conn2.run(script2);
34603459
Vector filter1 = (Vector) conn.run("1..100000");
34613460
List<String> backupSites = new ArrayList<>(Collections.singleton(HOST+":"+port_list[2]));
3462-
client.subscribe(HOST,port_list[1],"Trades","subTread1",MessageHandler_handler, -1,true,filter1, (StreamDeserializer) null,true,1000, 1,"admin","123456",backupSites,10,false);
3461+
client.subscribe(HOST,port_list[1],"Trades","subTread1",MessageHandler_handler, 0,true,filter1, (StreamDeserializer) null,true,1000, 1,"admin","123456",backupSites,10,true);
34633462
System.out.println("Successful subscribe");
34643463
conn1.run("n=100000;t=table(1..n as tag,timestamp(1..n) as ts,take(100.0,n) as data);" + "Trades.append!(t)");
34653464
conn2.run("n=100000;t=table(1..n as tag,timestamp(1..n) as ts,take(100.0,n) as data);" + "Trades.append!(t)");
@@ -3474,14 +3473,21 @@ public void test_ThreadClient_subscribe_backupSites_server_disconnect_backupSite
34743473
conn3.run(script2);
34753474
conn3.run("n=100000;t=table(1..n as tag,timestamp(1..n) as ts,take(100.0,n) as data);" + "Trades.append!(t)");
34763475
controller_conn.run("try{stopDataNode('"+HOST+":"+port_list[2]+"')}catch(ex){}");
3476+
//System.out.println(port_list[2]+"节点断掉啦---------------------------------------------------");
34773477
Thread.sleep(10000);
34783478
controller_conn.run("try{startDataNode('"+HOST+":"+port_list[2]+"')}catch(ex){}");
3479-
Thread.sleep(10000);
34803479

3480+
Thread.sleep(2000);
3481+
DBConnection conn4 = new DBConnection();
3482+
conn4.connect(HOST,port_list[2],"admin","123456");
3483+
conn4.run(script1);
3484+
conn4.run(script2);
3485+
conn4.run("n=100000;t=table(1..n as tag,timestamp(1..n) as ts,take(100.0,n) as data);" + "Trades.append!(t)");
3486+
Thread.sleep(6000);
34813487
BasicTable row_num = (BasicTable)conn.run("select count(*) from Receive");
34823488
System.out.println(row_num.getColumn(0).get(0));
34833489
assertEquals("100000",row_num.getColumn(0).get(0).getString());
3484-
//client.unsubscribe(HOST,port_list[1],"Trades","subTread1");
3490+
// client.unsubscribe(HOST,port_list[1],"Trades","subTread1");
34853491
}
34863492

34873493
@Test(timeout = 180000)
@@ -3517,7 +3523,7 @@ public void doEvent(IMessage msg) {
35173523
}
35183524
};
35193525
@Test(timeout = 180000)
3520-
public void test_ThreadClient_subscribe_backupSites_resubTimeout() throws Exception {
3526+
public void test_ThreadClient_subscribe_backupSites_resubTimeout1() throws Exception {
35213527
DBConnection controller_conn = new DBConnection();
35223528
controller_conn.connect(controller_host,controller_port,"admin","123456");
35233529
controller_conn.run("try{startDataNode('"+HOST+":"+port_list[1]+"')}catch(ex){}");
@@ -3533,12 +3539,16 @@ public void test_ThreadClient_subscribe_backupSites_resubTimeout() throws Except
35333539
conn1.connect(HOST,port_list[1],"admin","123456");
35343540
conn1.run(script1);
35353541
conn1.run(script2);
3542+
DBConnection conn2 = new DBConnection();
3543+
conn2.connect(HOST,port_list[2],"admin","123456");
3544+
conn2.run(script1);
3545+
conn2.run(script2);
35363546
Vector filter1 = (Vector) conn.run("1..100000");
3537-
List<String> backupSites = new ArrayList<>(Collections.singleton(HOST+":"+PORT));
3538-
client.subscribe(HOST,port_list[1],"Trades","subTread1",MessageHandler_handler1, -1,true,filter1, (StreamDeserializer) null,true,1000, 1,"admin","123456",backupSites,10000,true);
3547+
List<String> backupSites = new ArrayList<>(Collections.singleton(HOST+":"+port_list[2]));
3548+
client.subscribe(HOST,port_list[1],"Trades","subTread1",MessageHandler_handler1, -1,true,filter1, (StreamDeserializer) null,true,1, 1,"admin","123456",backupSites,10000,true);
35393549
System.out.println("Successful subscribe");
3540-
conn.run("n=100000;t=table(1..n as tag,now()+1..n as ts,rand(100.0,n) as data);" + "Trades.append!(t)");
35413550
conn1.run("n=100000;t=table(1..n as tag,now()+1..n as ts,rand(100.0,n) as data);" + "Trades.append!(t)");
3551+
conn2.run("n=100000;t=table(1..n as tag,now()+1..n as ts,rand(100.0,n) as data);" + "Trades.append!(t)");
35423552
Thread.sleep(1000);
35433553
controller_conn.run("try{stopDataNode('"+HOST+":"+port_list[1]+"')}catch(ex){}");
35443554
Thread.sleep(10000);
@@ -3548,6 +3558,82 @@ public void test_ThreadClient_subscribe_backupSites_resubTimeout() throws Except
35483558
System.out.println(re.getString());
35493559
//client.unsubscribe(HOST,port_list[1],"Trades","subTread1");
35503560
}
3561+
public static MessageHandler MessageHandler_handler2 = new MessageHandler() {
3562+
List<Long> list = new ArrayList<>();
3563+
3564+
@Override
3565+
public void doEvent(IMessage msg) {
3566+
Long re = System.currentTimeMillis();
3567+
// client.
3568+
// String script = String.format("insert into Receive values(%d,%s,%f,%d)", Integer.parseInt(msg.getEntity(0).getString()), msg.getEntity(1).getString(), Double.valueOf(msg.getEntity(2).toString()),System.currentTimeMillis());
3569+
//conn.run(script);
3570+
//System.out.println("当前时间:" + re);
3571+
// list.add(re);
3572+
//System.out.println(list.toString());
3573+
3574+
}
3575+
};
3576+
@Test(timeout = 180000)
3577+
public void test_ThreadClient_subscribe_backupSites_resubTimeout() throws Exception {
3578+
DBConnection controller_conn = new DBConnection();
3579+
controller_conn.connect(controller_host,controller_port,"admin","123456");
3580+
controller_conn.run("try{startDataNode('"+HOST+":"+port_list[1]+"')}catch(ex){}");
3581+
controller_conn.run("sleep(1000)");
3582+
String script1 = "st1 = streamTable(1000000:0,`tag`ts`data,[INT,TIMESTAMP,DOUBLE])\n" +
3583+
"share(st1,`Trades)\t\n"
3584+
+ "setStreamTableFilterColumn(objByName(`Trades),`tag)";
3585+
conn.run(script1);
3586+
String script2 = "st2 = streamTable(1000000:0,`tag`ts`data`now,[INT,TIMESTAMP,DOUBLE,TIMESTAMP])\n" +
3587+
"share(st2, `Receive)\t\n";
3588+
conn.run(script2);
3589+
DBConnection conn1 = new DBConnection();
3590+
conn1.connect(HOST,port_list[1],"admin","123456");
3591+
conn1.run(script1);
3592+
conn1.run(script2);
3593+
Vector filter1 = (Vector) conn.run("1..10000000");
3594+
List<String> backupSites = new ArrayList<>(Collections.singleton(HOST+":"+PORT));
3595+
client.subscribe(HOST,port_list[1],"Trades","subTread1",MessageHandler_handler2, -1,true,filter1, (StreamDeserializer) null,true,1, 1,"admin","123456",backupSites,10000,true);
3596+
System.out.println("Successful subscribe");
3597+
conn.run("n=10000000;t=table(1..n as tag,now()+1..n as ts,rand(100.0,n) as data);" + "Trades.append!(t)");
3598+
conn1.run("n=10000000;t=table(1..n as tag,now()+1..n as ts,rand(100.0,n) as data);" + "Trades.append!(t)");
3599+
System.out.println("Successful subscribe111");
3600+
class MyThread extends Thread {
3601+
@Override
3602+
public void run() {
3603+
try {
3604+
conn.run("n=1000000;t=table(1..n as tag,now()+1..n as ts,rand(100.0,n) as data);" + "Trades.append!(t)");
3605+
conn1.run("n=1000000;t=table(1..n as tag,now()+1..n as ts,rand(100.0,n) as data);" + "Trades.append!(t)");
3606+
} catch (Exception e) {
3607+
// 捕获异常并打印错误信息
3608+
System.err.println( e.getMessage());
3609+
}
3610+
}
3611+
}
3612+
class MyThread1 extends Thread {
3613+
@Override
3614+
public void run() {
3615+
try {
3616+
controller_conn.run("try{stopDataNode('"+HOST+":"+port_list[1]+"')}catch(ex){}");
3617+
} catch (Exception e) {
3618+
// 捕获异常并打印错误信息
3619+
System.err.println(e.getMessage());
3620+
}
3621+
}
3622+
}
3623+
// MyThread thread = new MyThread();
3624+
// MyThread1 thread1 = new MyThread1();
3625+
// thread.start();
3626+
// Thread.sleep(5);
3627+
// thread1.start();
3628+
// thread.join();
3629+
// Thread.sleep(1000);
3630+
// Thread.sleep(5000);
3631+
controller_conn.run("try{startDataNode('"+HOST+":"+port_list[1]+"')}catch(ex){}");
3632+
//Thread.sleep(20000);
3633+
//BasicTable re = (BasicTable)conn.run("select tag ,now,deltas(now) from Receive where deltas(now)>0 order by deltas(now) desc limit 1\n");
3634+
//System.out.println(re.getString());
3635+
//client.unsubscribe(HOST,port_list[1],"Trades","subTread1");
3636+
}
35513637

35523638
// @Test(timeout = 180000)
35533639
public void test_ThreadClient_subscribe_backupSites_server_disconnect_1() throws IOException, InterruptedException {

0 commit comments

Comments
 (0)