Skip to content

Commit 62656ce

Browse files
committed
AJ-700: add test case about getOffset
1 parent 30c2bbf commit 62656ce

File tree

2 files changed

+52
-7
lines changed

2 files changed

+52
-7
lines changed

test/com/xxdb/streaming/reverse/ThreadPooledClientReverseTest.java

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2103,21 +2103,19 @@ public void Test_ThreadPooledClient_subscribe_backupSites_server_disconnect_1()
21032103
System.out.println("这里可以手工断掉这个集群下所有可用节点http://192.168.0.69:18920/?view=overview-old");
21042104
Thread.sleep(1000000);
21052105
}
2106-
21072106
public static MessageHandler MessageHandler_handler_getOffset = new MessageHandler() {
21082107
@Override
21092108
public void doEvent(IMessage msg) {
2110-
System.out.println(msg.getOffset());
21112109
try {
21122110
String script = String.format("insert into Receive values(%d,%s,%f)", Integer.parseInt(msg.getEntity(0).getString()), msg.getEntity(1).getString(), Double.valueOf(msg.getEntity(2).toString()));
21132111
conn.run(script);
2112+
System.out.println("msg.getOffset is :" + msg.getOffset());
2113+
System.out.println("total is :" + total);
21142114
assertEquals(total, msg.getOffset());
21152115
total++;
2116-
System.out.println("msg.getOffset is :" + msg.getOffset());
21172116
} catch (IOException e) {
21182117
e.printStackTrace();
21192118
}
2120-
total=total++;
21212119
}
21222120
};
21232121
@Test(timeout = 180000)
@@ -2144,4 +2142,29 @@ public void test_subscribe_getOffset() throws Exception{
21442142
client1.unsubscribe(HOST, PORT, "Trades");
21452143
}
21462144

2145+
@Test(timeout = 180000)
2146+
public void test_subscribe_filter_getOffset() throws Exception{
2147+
String script1 = "st1 = streamTable(1000000:0,`tag`ts`data,[INT,TIMESTAMP,DOUBLE])\n" +
2148+
"share(st1,`Trades)\t\n"
2149+
+ "setStreamTableFilterColumn(objByName(`Trades),`tag)";
2150+
conn.run(script1);
2151+
String script2 = "st2 = streamTable(1000000:0,`tag`ts`data,[INT,TIMESTAMP,DOUBLE])\n" +
2152+
"share(st2, `Receive)\t\n";
2153+
conn.run(script2);
2154+
ThreadPooledClient client1 = new ThreadPooledClient(HOST, 0,1);
2155+
BasicIntVector filter = new BasicIntVector(new int[]{1,2,3,4,5});
2156+
client1.subscribe(HOST, PORT, "Trades","ACTION1", MessageHandler_handler_getOffset, -1,filter);
2157+
conn.run("n=7;t=table(1..n as tag,now()+1..n as ts,rand(100.0,n) as data);" + "Trades.append!(t)");
2158+
Thread.sleep(5000);
2159+
BasicTable re = (BasicTable) conn.run("Receive");
2160+
BasicTable tra = (BasicTable) conn.run("Trades");
2161+
assertEquals(5, re.rows());
2162+
for (int i = 0; i < re.rows(); i++) {
2163+
assertEquals(re.getColumn(0).get(i), tra.getColumn(0).get(i));
2164+
assertEquals(re.getColumn(1).get(i), tra.getColumn(1).get(i));
2165+
assertEquals(((Scalar)re.getColumn(2).get(i)).getNumber().doubleValue(), ((Scalar)tra.getColumn(2).get(i)).getNumber().doubleValue(), 4);
2166+
}
2167+
client1.unsubscribe(HOST, PORT, "Trades","ACTION1");
2168+
}
2169+
21472170
}

test/com/xxdb/streaming/reverse/ThreadedClientsubscribeReverseTest.java

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3643,17 +3643,16 @@ public void test_ThreadClient_subscribe_backupSites_server_disconnect_1() throws
36433643
public static MessageHandler MessageHandler_handler_getOffset = new MessageHandler() {
36443644
@Override
36453645
public void doEvent(IMessage msg) {
3646-
System.out.println(msg.getOffset());
36473646
try {
36483647
String script = String.format("insert into Receive values(%d,%s,%f)", Integer.parseInt(msg.getEntity(0).getString()), msg.getEntity(1).getString(), Double.valueOf(msg.getEntity(2).toString()));
36493648
conn.run(script);
3649+
System.out.println("msg.getOffset is :" + msg.getOffset());
3650+
System.out.println("total is :" + total);
36503651
assertEquals(total, msg.getOffset());
36513652
total++;
3652-
System.out.println("msg.getOffset is :" + msg.getOffset());
36533653
} catch (IOException e) {
36543654
e.printStackTrace();
36553655
}
3656-
total=total++;
36573656
}
36583657
};
36593658
@Test(timeout = 180000)
@@ -3678,4 +3677,27 @@ public void test_subscribe_getOffset() throws Exception{
36783677
}
36793678
client.unsubscribe(HOST, PORT, "Trades");
36803679
}
3680+
@Test(timeout = 180000)
3681+
public void test_subscribe_filter_getOffset() throws Exception{
3682+
String script1 = "st1 = streamTable(1000000:0,`tag`ts`data,[INT,TIMESTAMP,DOUBLE])\n" +
3683+
"share(st1,`Trades)\t\n"
3684+
+ "setStreamTableFilterColumn(objByName(`Trades),`tag)";
3685+
conn.run(script1);
3686+
String script2 = "st2 = streamTable(1000000:0,`tag`ts`data,[INT,TIMESTAMP,DOUBLE])\n" +
3687+
"share(st2, `Receive)\t\n";
3688+
conn.run(script2);
3689+
BasicIntVector filter = new BasicIntVector(new int[]{1,2,3,4,5});
3690+
client.subscribe(HOST, PORT, "Trades","ACTION1", MessageHandler_handler_getOffset, -1,filter);
3691+
conn.run("n=7;t=table(1..n as tag,now()+1..n as ts,rand(100.0,n) as data);" + "Trades.append!(t)");
3692+
Thread.sleep(5000);
3693+
BasicTable re = (BasicTable) conn.run("Receive");
3694+
BasicTable tra = (BasicTable) conn.run("Trades");
3695+
assertEquals(5, re.rows());
3696+
for (int i = 0; i < re.rows(); i++) {
3697+
assertEquals(re.getColumn(0).get(i), tra.getColumn(0).get(i));
3698+
assertEquals(re.getColumn(1).get(i), tra.getColumn(1).get(i));
3699+
assertEquals(((Scalar)re.getColumn(2).get(i)).getNumber().doubleValue(), ((Scalar)tra.getColumn(2).get(i)).getNumber().doubleValue(), 4);
3700+
}
3701+
client.unsubscribe(HOST, PORT, "Trades","ACTION1");
3702+
}
36813703
}

0 commit comments

Comments
 (0)