Skip to content

Commit 33b0c50

Browse files
committed
Update ThreadedClientsubscribeReverseTest.java
1 parent ad36741 commit 33b0c50

File tree

1 file changed

+43
-44
lines changed

1 file changed

+43
-44
lines changed

test/com/xxdb/streaming/reverse/ThreadedClientsubscribeReverseTest.java

Lines changed: 43 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -37,21 +37,21 @@ public void clear() throws IOException {
3737
} catch (IOException ex) {
3838
ex.printStackTrace();
3939
}
40-
try{client.unsubscribe(HOST, PORT, "Trades1");}catch (Exception ex){}
41-
try{client.unsubscribe(HOST, PORT, "Trades1", "subTrades2");}catch (Exception ex){}
42-
try{client.unsubscribe(HOST, PORT, "Trades1", "subTrades1");}catch (Exception ex){}
43-
try{client.unsubscribe(HOST, PORT, "Trades1", "subTrades");}catch (Exception ex){}
40+
try{client.unsubscribe(HOST, PORT, "Trades");}catch (Exception ex){}
41+
try{client.unsubscribe(HOST, PORT, "Trades", "subTrades2");}catch (Exception ex){}
42+
try{client.unsubscribe(HOST, PORT, "Trades", "subTrades");}catch (Exception ex){}
43+
try{client.unsubscribe(HOST, PORT, "Trades", "subTrades");}catch (Exception ex){}
4444
try{client.unsubscribe(HOST, PORT, "outTables", "mutiSchema");}catch (Exception ex){}
4545
try{client.unsubscribe(HOST, PORT, "outTables", "javaStreamingApi");}catch (Exception ex){}
4646
clear_env();
4747
}
4848

4949
@After
5050
public void after() throws IOException, InterruptedException {
51-
try{client.unsubscribe(HOST, PORT, "Trades1");}catch (Exception ex){}
52-
try{client.unsubscribe(HOST, PORT, "Trades1", "subTrades2");}catch (Exception ex){}
53-
try{client.unsubscribe(HOST, PORT, "Trades1", "subTrades1");}catch (Exception ex){}
54-
try{client.unsubscribe(HOST, PORT, "Trades1", "subTrades");}catch (Exception ex){}
51+
try{client.unsubscribe(HOST, PORT, "Trades");}catch (Exception ex){}
52+
try{client.unsubscribe(HOST, PORT, "Trades", "subTrades2");}catch (Exception ex){}
53+
try{client.unsubscribe(HOST, PORT, "Trades", "subTrades");}catch (Exception ex){}
54+
try{client.unsubscribe(HOST, PORT, "Trades", "subTrades");}catch (Exception ex){}
5555
try{client.unsubscribe(HOST, PORT, "outTables", "mutiSchema");}catch (Exception ex){}
5656
try{client.unsubscribe(HOST, PORT, "outTables", "javaStreamingApi");}catch (Exception ex){}
5757
clear_env();
@@ -234,14 +234,14 @@ public void test_ThreadedClient_null() throws IOException {
234234
public void test_ThreadedClient_only_subscribePort() throws IOException {
235235
ThreadedClient client2 = new ThreadedClient(0);
236236
String script1 = "st1 = streamTable(1000000:0,`tag`ts`data,[INT,TIMESTAMP,DOUBLE])\n" +
237-
"share(st1,`Trades1)\t\n"
238-
+ "setStreamTableFilterColumn(objByName(`Trades1),`tag)";
237+
"share(st1,`Trades)\t\n"
238+
+ "setStreamTableFilterColumn(objByName(`Trades),`tag)";
239239
conn.run(script1);
240240
String script2 = "st2 = streamTable(1000000:0,`tag`ts`data,[INT,TIMESTAMP,DOUBLE])\n" +
241241
"share(st2, `Receive)\t\n";
242242
conn.run(script2);
243-
client2.subscribe(HOST,PORT,"Trades1","subTrades1",MessageHandler_handler);
244-
client2.unsubscribe(HOST,PORT,"Trades1","subTrades1");
243+
client2.subscribe(HOST,PORT,"Trades","subTrades",MessageHandler_handler);
244+
client2.unsubscribe(HOST,PORT,"Trades","subTrades");
245245
client2.close();
246246
}
247247

@@ -250,25 +250,25 @@ public void test_ThreadedClient_only_subscribePort() throws IOException {
250250
@Test(timeout = 120000)
251251
public void test_subscribe_ex1() throws Exception {
252252
String script1 = "st1 = streamTable(1000000:0,`tag`ts`data,[INT,TIMESTAMP,DOUBLE])\n" +
253-
"share(st1,`Trades1)\t\n"
254-
+ "setStreamTableFilterColumn(objByName(`Trades1),`tag)";
253+
"share(st1,`Trades)\t\n"
254+
+ "setStreamTableFilterColumn(objByName(`Trades),`tag)";
255255
conn.run(script1);
256256
String script2 = "st2 = streamTable(1000000:0,`tag`ts`data,[INT,TIMESTAMP,DOUBLE])\n" +
257257
"share(st2, `Receive)\t\n";
258258
conn.run(script2);
259-
client.subscribe(HOST, PORT, "Trades1", MessageHandler_handler);
260-
conn.run("n=1000;t=table(1..n as tag,now()+1..n as ts,rand(100.0,n) as data);" + "Trades1.append!(t)");
259+
client.subscribe(HOST, PORT, "Trades", MessageHandler_handler);
260+
conn.run("n=1000;t=table(1..n as tag,now()+1..n as ts,rand(100.0,n) as data);" + "Trades.append!(t)");
261261
wait_data("Receive",1000);
262262
BasicTable re = (BasicTable) conn.run("Receive");
263-
BasicTable tra = (BasicTable) conn.run("Trades1");
263+
BasicTable tra = (BasicTable) conn.run("Trades");
264264
assertEquals(re.rows(), tra.rows());
265265
for (int i = 0; i < re.rows(); i++) {
266266
assertEquals(re.getColumn(0).get(i), tra.getColumn(0).get(i));
267267
assertEquals(re.getColumn(1).get(i), tra.getColumn(1).get(i));
268268
assertEquals(((Scalar)re.getColumn(2).get(i)).getNumber().doubleValue(), ((Scalar)tra.getColumn(2).get(i)).getNumber().doubleValue(), 4);
269269
}
270270
try {
271-
client.unsubscribe(HOST, PORT, "Trades1", "subtrades");
271+
client.unsubscribe(HOST, PORT, "Trades", "subtrades");
272272
}catch (Exception ex){
273273

274274
}
@@ -446,7 +446,7 @@ public void test_subscribe_tableName_actionName() throws Exception {
446446
"share(st2, `Receive)\t\n";
447447
conn.run(script2);
448448
conn.run("n=100;t=table(1..n as tag,now()+1..n as ts,rand(100.0,n) as data);" + "Trades.append!(t)");
449-
client.subscribe(HOST, PORT, "Trades","subTrades1",MessageHandler_handler);
449+
client.subscribe(HOST, PORT, "Trades","subTrades",MessageHandler_handler);
450450
conn.run("n=1000;t=table(1..n as tag,now()+1..n as ts,rand(100.0,n) as data);" + "Trades.append!(t)");
451451
conn.run("n=1000;t=table(1..n as tag,now()+1..n as ts,rand(100.0,n) as data);" + "Trades.append!(t)");
452452
conn.run("n=1000;t=table(1..n as tag,now()+1..n as ts,rand(100.0,n) as data);" + "Trades.append!(t)");
@@ -459,7 +459,7 @@ public void test_subscribe_tableName_actionName() throws Exception {
459459
assertEquals(re.getColumn(1).get(i), tra.getColumn(1).get(i + 100));
460460
assertEquals(((Scalar)re.getColumn(2).get(i)).getNumber().doubleValue(), ((Scalar)tra.getColumn(2).get(i + 100)).getNumber().doubleValue(),4);
461461
}
462-
client.unsubscribe(HOST, PORT, "Trades","subTrades1");
462+
client.unsubscribe(HOST, PORT, "Trades","subTrades");
463463
}
464464

465465
@Test(timeout = 180000)
@@ -486,7 +486,7 @@ public void test_subscribe_tableName_handler_offset_reconnect_success() throws I
486486
Thread.sleep(2000);
487487
BasicInt row_num2 = (BasicInt)conn.run("(exec count(*) from Receive)[0]");
488488
System.out.println(row_num2);
489-
assertEquals(true,row_num.getInt()<=row_num2.getInt());
489+
assertEquals(true,row_num.getInt()<row_num2.getInt());
490490
write_data.interrupt();
491491
client.unsubscribe(HOST,PORT,"Trades");
492492
}
@@ -501,14 +501,14 @@ public void test_subscribe_TableName_ActionName_Handler_reconnect() throws IOExc
501501
"share(st2, `Receive)\t\n";
502502
conn.run(script2);
503503
Vector filter1 = (Vector) conn.run("1..100000");
504-
client.subscribe(HOST,PORT,"Trades","subTrades1",MessageHandler_handler,true);
504+
client.subscribe(HOST,PORT,"Trades","subTrades",MessageHandler_handler,true);
505505
System.out.println("Successful subscribe");
506506
MyThread write_data = new MyThread ();
507507
write_data.start();
508508
Thread.sleep(2000);
509-
conn.run("stopPublishTable('"+HOST+"',9055,'Trades',\"subTrades1\")");
509+
conn.run("stopPublishTable('"+HOST+"',9055,'Trades',\"subTrades\")");
510510
Thread.sleep(2000);
511-
conn.run("stopPublishTable('"+HOST+"',9055,'Trades',\"subTrades1\")");
511+
conn.run("stopPublishTable('"+HOST+"',9055,'Trades',\"subTrades\")");
512512
Thread.sleep(3000);
513513
BasicInt row_num = (BasicInt)conn.run("(exec count(*) from Receive)[0]");
514514
System.out.println(row_num);
@@ -517,7 +517,7 @@ public void test_subscribe_TableName_ActionName_Handler_reconnect() throws IOExc
517517
System.out.println(row_num2);
518518
assertEquals(true,row_num.getInt()<=row_num2.getInt());
519519
write_data.interrupt();
520-
client.unsubscribe(HOST,PORT,"Trades","subTrades1");
520+
client.unsubscribe(HOST,PORT,"Trades","subTrades");
521521
}
522522

523523
@Test(timeout = 180000)
@@ -784,7 +784,7 @@ public void doEvent(IMessage msg) {
784784
};
785785
int ofst = -1;
786786
Vector filter1 = (Vector) conn.run("1..1000");
787-
client.subscribe(HOST, PORT, "Trades", "subTrades1", MessageHandler_handler, -1, filter1);
787+
client.subscribe(HOST, PORT, "Trades", "subTrades", MessageHandler_handler, -1, filter1);
788788
Vector filter2 = (Vector) conn.run("2001..3000");
789789
client.subscribe(HOST, PORT, "Trades", "subTrades2", handler1, -1, filter2);
790790
conn.run("n=4000;t=table(1..n as tag,now()+1..n as ts,rand(100.0,n) as data);" + "Trades.append!(t)");
@@ -794,7 +794,7 @@ public void doEvent(IMessage msg) {
794794
BasicTable tra = (BasicTable) conn.run("Trades");
795795
BasicTable fil = (BasicTable) conn.run("filter");
796796
client.unsubscribe(HOST, PORT, "Trades", "subTrades2");
797-
client.unsubscribe(HOST, PORT, "Trades", "subTrades1");
797+
client.unsubscribe(HOST, PORT, "Trades", "subTrades");
798798
conn.run("dropStreamTable(`filter)");
799799
assertEquals(1000, re.rows());
800800
assertEquals(1000, fil.rows());
@@ -876,11 +876,11 @@ public void test_subscribe_unsubscribe_resubscribe() throws Exception {
876876
conn.run(script2);
877877
Vector filter1 = (Vector) conn.run("1..1000");
878878
for (int i=0;i<10;i++){
879-
client.subscribe(HOST, PORT, "Trades", "subTrades1", MessageHandler_handler, -1, true, filter1, true, 10000, 5);
879+
client.subscribe(HOST, PORT, "Trades", "subTrades", MessageHandler_handler, -1, true, filter1, true, 10000, 5);
880880
client.subscribe(HOST, PORT, "Trades", "subTrades2", MessageHandler_handler, -1, true, filter1, true, 10000, 5);
881881
client.subscribe(HOST, PORT, "Trades", "subTrades3", MessageHandler_handler, -1, true, filter1, true, 10000, 5);
882882
conn.run("n=10000;t=table(1..n as tag,now()+1..n as ts,rand(100.0,n) as data);" + "Trades.append!(t)");
883-
client.unsubscribe(HOST, PORT, "Trades", "subTrades1");
883+
client.unsubscribe(HOST, PORT, "Trades", "subTrades");
884884
client.unsubscribe(HOST, PORT, "Trades", "subTrades2");
885885
client.unsubscribe(HOST, PORT, "Trades", "subTrades3");
886886
}
@@ -1237,7 +1237,7 @@ public class MyThread extends Thread {
12371237
public void run() {
12381238
try {
12391239
while (true) {
1240-
conn.run("n=1000;t=table(1..n as tag,now()+1..n as ts,rand(100.0,n) as data);" + "Trades1.append!(t)");
1240+
conn.run("n=1000;t=table(1..n as tag,now()+1..n as ts,rand(100.0,n) as data);" + "Trades.append!(t)");
12411241
Thread.sleep(100);
12421242
}
12431243
} catch (Exception e) {
@@ -1248,32 +1248,31 @@ public void run() {
12481248
@Test(timeout=120000)
12491249
public void test_subscribe_reconnect_successful() throws IOException, InterruptedException {
12501250
String script1 = "st1 = streamTable(1000000:0,`tag`ts`data,[INT,TIMESTAMP,DOUBLE])\n" +
1251-
"share st1 as Trades1\t\n"
1252-
+ "setStreamTableFilterColumn(objByName(`Trades1),`tag)";
1251+
"share st1 as Trades\t\n"
1252+
+ "setStreamTableFilterColumn(objByName(`Trades),`tag)";
12531253
conn.run(script1);
12541254
String script2 = "st2 = streamTable(1000000:0,`tag`ts`data,[INT,TIMESTAMP,DOUBLE])\n" +
12551255
"share(st2, `Receive)\t\n";
12561256
conn.run(script2);
12571257
Vector filter1 = (Vector) conn.run("1..100000");
1258-
client.subscribe(HOST,PORT,"Trades1","subTread1",MessageHandler_handler,-1,true,filter1,true,100,5,"admin","123456");
1258+
client.subscribe(HOST,PORT,"Trades","subTread1",MessageHandler_handler,-1,true,filter1,true,100,5,"admin","123456");
12591259
System.out.println("Successful subscribe");
1260-
conn.run("n=1000;t=table(1..n as tag,now()+1..n as ts,rand(100.0,n) as data);" + "Trades1.append!(t)");
12611260
Thread.sleep(100);
12621261
MyThread write_data = new MyThread ();
1263-
// write_data.start();
1262+
write_data.start();
12641263
Thread.sleep(2000);
1265-
conn.run("stopPublishTable('"+HOST+"',0,'Trades1','subTread1')");
1264+
conn.run("stopPublishTable('"+HOST+"',0,'Trades','subTread1')");
12661265
Thread.sleep(10000);
1267-
conn.run("stopPublishTable('"+HOST+"',0,'Trades1','subTread1')");
1266+
conn.run("stopPublishTable('"+HOST+"',0,'Trades','subTread1')");
12681267
Thread.sleep(3000);
12691268
BasicInt row_num = (BasicInt)conn.run("(exec count(*) from Receive)[0]");
12701269
System.out.println(row_num);
12711270
Thread.sleep(2000);
12721271
BasicInt row_num2 = (BasicInt)conn.run("(exec count(*) from Receive)[0]");
12731272
System.out.println(row_num2);
1274-
assertEquals(true,row_num.getInt()<=row_num2.getInt());
1275-
// write_data.interrupt();
1276-
client.unsubscribe(HOST,PORT,"Trades1","subTread1");
1273+
assertEquals(true,row_num.getInt()<row_num2.getInt());
1274+
write_data.interrupt();
1275+
client.unsubscribe(HOST,PORT,"Trades","subTread1");
12771276
}
12781277

12791278
@Test(timeout = 120000)
@@ -2281,15 +2280,15 @@ public void test_StreamDeserializer_pair_stream_table_filters_subscribe_isomate_
22812280
public void test_ThreadedClient_only_subscribePort1() throws IOException, InterruptedException {
22822281
ThreadedClient client2 = new ThreadedClient(0);
22832282
String script1 = "st1 = streamTable(1000000:0,`tag`ts`data,[INT,TIMESTAMP,DOUBLE])\n" +
2284-
"share(st1,`Trades1)\t\n"
2285-
+ "setStreamTableFilterColumn(objByName(`Trades1),`tag)";
2283+
"share(st1,`Trades)\t\n"
2284+
+ "setStreamTableFilterColumn(objByName(`Trades),`tag)";
22862285
conn.run(script1);
22872286
String script2 = "st2 = streamTable(1000000:0,`tag`ts`data,[INT,TIMESTAMP,DOUBLE])\n" +
22882287
"share(st2, `Receive)\t\n";
22892288
conn.run(script2);
2290-
client2.subscribe(HOST,PORT,"Trades1","subTrades1",MessageHandler_handler, -1, true);
2289+
client2.subscribe(HOST,PORT,"Trades","subTrades",MessageHandler_handler, -1, true);
22912290
// Thread.sleep(100000000);
2292-
client2.unsubscribe(HOST,PORT,"Trades1","subTrades1");
2291+
client2.unsubscribe(HOST,PORT,"Trades","subTrades");
22932292
client2.close();
22942293
}
22952294
public static MessageHandler Handler_array = new MessageHandler() {

0 commit comments

Comments
 (0)