Skip to content

Commit 30c2bbf

Browse files
committed
AJ-700: add test case about getOffset
1 parent 2e11e72 commit 30c2bbf

File tree

4 files changed

+108
-4
lines changed

4 files changed

+108
-4
lines changed

test/com/xxdb/streaming/client/BasicMessageTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,6 @@ public void test_BasicMessage() throws Exception {
2525
assertEquals("(6.4, 9.2)",bm.getValue(1).toString());
2626
assertEquals("China",bm.getValue("mongoDB").toString());
2727
assertEquals("(5,(6.4, 9.2),China,15.48)",bm.getMsg().getString());
28+
assertEquals(0,bm.getOffset());
2829
}
29-
3030
}

test/com/xxdb/streaming/reverse/PollingClientReverseTest.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2302,4 +2302,27 @@ public void test_PollingClient_subscribe_backupSites_server_disconnect_1() throw
23022302
System.out.println("这里可以手工断掉这个集群下所有可用节点http://192.168.0.69:18920/?view=overview-old");
23032303
Thread.sleep(1000000);
23042304
}
2305+
@Test//(timeout = 60000)
2306+
public void test_subscribe_getOffset() throws IOException {
2307+
conn.run("setStreamTableFilterColumn(Trades1,`tag);" +
2308+
"filter=1..5");
2309+
BasicIntVector filter = (BasicIntVector) conn.run("filter");
2310+
TopicPoller poller1 = client.subscribe(HOST,PORT,"Trades1","subTread1",-1,filter);
2311+
ArrayList<IMessage> msg1;
2312+
conn.run("n=6;t=table(1..n as tag,now()+1..n as ts,rand(100.0,n) as data);" +
2313+
"Trades1.append!(t)");
2314+
msg1 = poller1.poll(500, 10000);
2315+
assertEquals(5, msg1.size());
2316+
System.out.println("msg1.get(0).getOffset() :" + msg1.get(0).getOffset());
2317+
System.out.println("msg1.get(1).getOffset() :" + msg1.get(1).getOffset());
2318+
System.out.println("msg1.get(2).getOffset() :" + msg1.get(2).getOffset());
2319+
System.out.println("msg1.get(3).getOffset() :" + msg1.get(3).getOffset());
2320+
System.out.println("msg1.get(4).getOffset() :" + msg1.get(4).getOffset());
2321+
assertEquals(1, msg1.get(0).getOffset());
2322+
assertEquals(2, msg1.get(1).getOffset());
2323+
assertEquals(3, msg1.get(2).getOffset());
2324+
assertEquals(4, msg1.get(3).getOffset());
2325+
assertEquals(5, msg1.get(4).getOffset());
2326+
client.unsubscribe(HOST,PORT,"Trades1","subTread1");
2327+
}
23052328
}

test/com/xxdb/streaming/reverse/ThreadPooledClientReverseTest.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ public class ThreadPooledClientReverseTest {
2525
static String HOST = bundle.getString("HOST");
2626
static int PORT = Integer.parseInt(bundle.getString("PORT"));
2727
//static int PORT = 9002;
28+
static long total = 0;
2829

2930
private static ThreadPooledClient client;
3031

@@ -2102,4 +2103,45 @@ public void Test_ThreadPooledClient_subscribe_backupSites_server_disconnect_1()
21022103
System.out.println("这里可以手工断掉这个集群下所有可用节点http://192.168.0.69:18920/?view=overview-old");
21032104
Thread.sleep(1000000);
21042105
}
2106+
2107+
public static MessageHandler MessageHandler_handler_getOffset = new MessageHandler() {
2108+
@Override
2109+
public void doEvent(IMessage msg) {
2110+
System.out.println(msg.getOffset());
2111+
try {
2112+
String script = String.format("insert into Receive values(%d,%s,%f)", Integer.parseInt(msg.getEntity(0).getString()), msg.getEntity(1).getString(), Double.valueOf(msg.getEntity(2).toString()));
2113+
conn.run(script);
2114+
assertEquals(total, msg.getOffset());
2115+
total++;
2116+
System.out.println("msg.getOffset is :" + msg.getOffset());
2117+
} catch (IOException e) {
2118+
e.printStackTrace();
2119+
}
2120+
total=total++;
2121+
}
2122+
};
2123+
@Test(timeout = 180000)
2124+
public void test_subscribe_getOffset() throws Exception{
2125+
String script1 = "st1 = streamTable(1000000:0,`tag`ts`data,[INT,TIMESTAMP,DOUBLE])\n" +
2126+
"share(st1,`Trades)\t\n"
2127+
+ "setStreamTableFilterColumn(objByName(`Trades),`tag)";
2128+
conn.run(script1);
2129+
String script2 = "st2 = streamTable(1000000:0,`tag`ts`data,[INT,TIMESTAMP,DOUBLE])\n" +
2130+
"share(st2, `Receive)\t\n";
2131+
conn.run(script2);
2132+
ThreadPooledClient client1 = new ThreadPooledClient(HOST, 0,1);
2133+
client1.subscribe(HOST, PORT, "Trades", MessageHandler_handler_getOffset, true);
2134+
conn.run("n=1000;t=table(1..n as tag,now()+1..n as ts,rand(100.0,n) as data);" + "Trades.append!(t)");
2135+
Thread.sleep(5000);
2136+
BasicTable re = (BasicTable) conn.run("Receive");
2137+
BasicTable tra = (BasicTable) conn.run("Trades");
2138+
assertEquals(1000, re.rows());
2139+
for (int i = 0; i < re.rows(); i++) {
2140+
assertEquals(re.getColumn(0).get(i), tra.getColumn(0).get(i));
2141+
assertEquals(re.getColumn(1).get(i), tra.getColumn(1).get(i));
2142+
assertEquals(((Scalar)re.getColumn(2).get(i)).getNumber().doubleValue(), ((Scalar)tra.getColumn(2).get(i)).getNumber().doubleValue(), 4);
2143+
}
2144+
client1.unsubscribe(HOST, PORT, "Trades");
2145+
}
2146+
21052147
}

test/com/xxdb/streaming/reverse/ThreadedClientsubscribeReverseTest.java

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ public class ThreadedClientsubscribeReverseTest {
3232
static int controller_port = Integer.parseInt(bundle.getString("CONTROLLER_PORT"));
3333
//static int PORT = 9002;
3434
private static ThreadedClient client;
35+
static long total = 0;
3536

3637
@BeforeClass
3738
public static void setUp() throws IOException {
@@ -114,7 +115,7 @@ public void doEvent(IMessage msg) {
114115
}
115116
};
116117

117-
public void wait_data(String table_name,int data_row) throws IOException, InterruptedException {
118+
public static void wait_data(String table_name, int data_row) throws IOException, InterruptedException {
118119
BasicInt row_num;
119120
while(true){
120121
row_num = (BasicInt)conn.run("(exec count(*) from "+table_name+")[0]");
@@ -243,7 +244,7 @@ public void test_ThreadedClient_null() throws IOException {
243244
}
244245

245246
@Test
246-
public void test_ThreadedClient_only_subscribePort() throws IOException {
247+
public void test_ThreadedClient_only_subscribePort() throws IOException, InterruptedException {
247248
ThreadedClient client2 = new ThreadedClient(0);
248249
String script1 = "st1 = streamTable(1000000:0,`tag`ts`data,[INT,TIMESTAMP,DOUBLE])\n" +
249250
"share(st1,`Trades)\t\n"
@@ -3639,4 +3640,42 @@ public void test_ThreadClient_subscribe_backupSites_server_disconnect_1() throws
36393640
System.out.println("这里可以手工断掉这个集群下所有可用节点http://192.168.0.69:18920/?view=overview-old");
36403641
Thread.sleep(1000000);
36413642
}
3642-
}
3643+
public static MessageHandler MessageHandler_handler_getOffset = new MessageHandler() {
3644+
@Override
3645+
public void doEvent(IMessage msg) {
3646+
System.out.println(msg.getOffset());
3647+
try {
3648+
String script = String.format("insert into Receive values(%d,%s,%f)", Integer.parseInt(msg.getEntity(0).getString()), msg.getEntity(1).getString(), Double.valueOf(msg.getEntity(2).toString()));
3649+
conn.run(script);
3650+
assertEquals(total, msg.getOffset());
3651+
total++;
3652+
System.out.println("msg.getOffset is :" + msg.getOffset());
3653+
} catch (IOException e) {
3654+
e.printStackTrace();
3655+
}
3656+
total=total++;
3657+
}
3658+
};
3659+
@Test(timeout = 180000)
3660+
public void test_subscribe_getOffset() throws Exception{
3661+
String script1 = "st1 = streamTable(1000000:0,`tag`ts`data,[INT,TIMESTAMP,DOUBLE])\n" +
3662+
"share(st1,`Trades)\t\n"
3663+
+ "setStreamTableFilterColumn(objByName(`Trades),`tag)";
3664+
conn.run(script1);
3665+
String script2 = "st2 = streamTable(1000000:0,`tag`ts`data,[INT,TIMESTAMP,DOUBLE])\n" +
3666+
"share(st2, `Receive)\t\n";
3667+
conn.run(script2);
3668+
client.subscribe(HOST, PORT, "Trades", MessageHandler_handler_getOffset, true);
3669+
conn.run("n=5000;t=table(1..n as tag,now()+1..n as ts,rand(100.0,n) as data);" + "Trades.append!(t)");
3670+
wait_data("Receive",5000);
3671+
BasicTable re = (BasicTable) conn.run("Receive");
3672+
BasicTable tra = (BasicTable) conn.run("Trades");
3673+
assertEquals(5000, re.rows());
3674+
for (int i = 0; i < re.rows(); i++) {
3675+
assertEquals(re.getColumn(0).get(i), tra.getColumn(0).get(i));
3676+
assertEquals(re.getColumn(1).get(i), tra.getColumn(1).get(i));
3677+
assertEquals(((Scalar)re.getColumn(2).get(i)).getNumber().doubleValue(), ((Scalar)tra.getColumn(2).get(i)).getNumber().doubleValue(), 4);
3678+
}
3679+
client.unsubscribe(HOST, PORT, "Trades");
3680+
}
3681+
}

0 commit comments

Comments
 (0)