Skip to content

Commit 73a630b

Browse files
committed
AJ-486:update test case about mtw
1 parent f855284 commit 73a630b

File tree

1 file changed

+169
-3
lines changed

1 file changed

+169
-3
lines changed

test/com/xxdb/MultithreadedTableWriterTest.java

Lines changed: 169 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -872,7 +872,7 @@ public void test_ChunkInTransaction_insert() throws Exception {
872872
assertEquals(1000,status1.unsentRows+status.sendFailedRows+status.sentRows);
873873

874874
}else {
875-
assertTrue(status1.getErrorInfo().toString().contains(HOST+":"+PORT+" Server response: '<ChunkInTransaction>filepath '/test_MultithreadedTableWriter"));
875+
assertTrue(status1.getErrorInfo().toString().contains(HOST+":"+PORT+" Server response: '<ChunkInTransaction>The openChunks operation failed because the chunk '/test_MultithreadedTableWriter"));
876876
assertEquals("A5",status1.getErrorCode());
877877
assertTrue(status1.sendFailedRows >0);
878878
assertEquals(true,status1.hasError());
@@ -6608,7 +6608,7 @@ public void test_MultithreadedTableWriter_Callback_dfs_single_thread_false()thr
66086608
colNames.add("issuccess");
66096609
cols.add(bsv);
66106610
cols.add(bbv);
6611-
BasicTable callback = new BasicTable(colNames,cols);;
6611+
BasicTable callback = new BasicTable(colNames,cols);
66126612
Callback callbackHandler = new Callback(){
66136613
public void writeCompletion(Table callbackTable) {
66146614
BasicStringVector idV = (BasicStringVector) callbackTable.getColumn(0);
@@ -6648,7 +6648,7 @@ public void writeCompletion(Table callbackTable) {
66486648
}
66496649
System.out.println(mtw.getStatus().toString());
66506650
Assert.assertEquals(true,mtw.getStatus().sendFailedRows>0);
6651-
Assert.assertEquals(true,mtw.getStatus().unsentRows==0);
6651+
//Assert.assertEquals(true,mtw.getStatus().unsentRows==0);
66526652
mtw.waitForThreadCompletion();
66536653
try{conn1.run("startDataNode([\""+HOST+":"+PORT+"\"])");
66546654

@@ -7186,5 +7186,171 @@ public void test_MultithreadedTableWriter_illegal_string_1() throws Exception {
71867186
assertEquals("\0blob1AM\0ZN", table2.getColumn(3).get(1).getString());
71877187
assertEquals("blob1AMZN\0", table2.getColumn(3).get(2).getString());
71887188
}
7189+
//@Test(timeout = 120000)
7190+
public void test_MultithreadedTableWriter_write_block()throws Exception {
7191+
DBConnection conn= new DBConnection(false, false, false, true);
7192+
conn.connect(HOST, PORT, "admin", "123456");
7193+
StringBuilder sb = new StringBuilder();
7194+
sb.append("dbName = 'dfs://test_MultithreadedTableWriter';\n" +
7195+
"if(existsDatabase(dbName)){\n" +
7196+
"\tdropDB(dbName);\n" +
7197+
"}\n" +
7198+
"db = database(dbName, HASH, [STRING, 10], engine=\"TSDB\");\n"+
7199+
"dummy = table(100:0, [`id], [STRING]);\n" +
7200+
"db.createPartitionedTable(dummy, `pt, `id, , `id);");
7201+
conn.run(sb.toString());
7202+
List<Vector> cols = new ArrayList<>();
7203+
List<String> colNames = new ArrayList<>();
7204+
BasicStringVector bsv = new BasicStringVector(1);
7205+
BasicBooleanVector bbv = new BasicBooleanVector(1);
7206+
colNames.add("id");
7207+
colNames.add("issuccess");
7208+
cols.add(bsv);
7209+
cols.add(bbv);
7210+
BasicTable callback = new BasicTable(colNames,cols);;
7211+
Callback callbackHandler = new Callback(){
7212+
public void writeCompletion(Table callbackTable) {
7213+
BasicStringVector idV = (BasicStringVector) callbackTable.getColumn(0);
7214+
BasicBooleanVector successV = (BasicBooleanVector) callbackTable.getColumn(1);
7215+
synchronized (callback) {
7216+
try {
7217+
callback.getColumn(0).Append(idV);
7218+
callback.getColumn(1).Append(successV);
7219+
} catch (Exception e) {
7220+
throw new RuntimeException(e);
7221+
}
7222+
}
7223+
for (int i = 0; i < successV.rows(); i++){
7224+
System.out.println(idV.getString(i) + " " + successV.getBoolean(i));
7225+
}
7226+
}
7227+
};
7228+
MultithreadedTableWriter mtw = new MultithreadedTableWriter(HOST, PORT, "admin", "123456", "dfs://test_MultithreadedTableWriter", "pt", false,
7229+
false, null, 50000, 1, 1, "id", null, callbackHandler);
7230+
7231+
for (int i = 0; i < 65537; i++){
7232+
try{
7233+
ErrorCodeInfo pErrorInfo = mtw.insert(Integer.toString(i), Integer.toString(i));
7234+
}
7235+
catch(RuntimeException ex)
7236+
{
7237+
System.out.println(ex.getMessage());
7238+
}
7239+
}
7240+
mtw.waitForThreadCompletion();
7241+
7242+
System.out.println("callback rows"+callback.rows());
7243+
//assertEquals(1000000000, callback.rows()-1);
7244+
7245+
Map<String,Entity> map = new HashMap<>();
7246+
map.put("testUpload",callback);
7247+
conn.upload(map);
7248+
BasicTable act = (BasicTable) conn.run("select * from testUpload where issuccess = true order by id");
7249+
BasicTable act1 = (BasicTable) conn.run("select * from testUpload order by id");
7250+
7251+
BasicTable ex = (BasicTable)conn.run("select * from loadTable('dfs://test_MultithreadedTableWriter', 'pt') order by id");
7252+
assertEquals(ex.rows(), act.rows());
7253+
assertEquals(ex.rows(), act.rows());
7254+
for (int i = 0; i < ex.rows(); i++){
7255+
assertEquals(ex.getColumn(0).get(i).getString(), act.getColumn(0).get(i).getString());
7256+
}
7257+
conn.close();
7258+
}
7259+
7260+
//@Test
7261+
public void test_MultithreadedTableWriter_write_block_1()throws Exception {
7262+
DBConnection conn= new DBConnection(false, false, false, true);
7263+
conn.connect(HOST, PORT, "admin", "123456");
7264+
DBConnection conn1= new DBConnection(false, false, false, true);
7265+
conn1.connect(CONTROLLER_HOST, CONTROLLER_PORT, "admin", "123456");
7266+
StringBuilder sb = new StringBuilder();
7267+
sb.append("dbName = 'dfs://test_MultithreadedTableWriter';\n" +
7268+
"if(existsDatabase(dbName)){\n" +
7269+
"\tdropDB(dbName);\n" +
7270+
"}\n" +
7271+
"db = database(dbName, HASH, [STRING, 10], engine=\"TSDB\");\n"+
7272+
"dummy = table(100:0, [`id], [STRING]);\n" +
7273+
"db.createPartitionedTable(dummy, `pt, `id, , `id);");
7274+
conn.run(sb.toString());
7275+
List<Vector> cols = new ArrayList<>();
7276+
List<String> colNames = new ArrayList<>();
7277+
BasicStringVector bsv = new BasicStringVector(1);
7278+
BasicBooleanVector bbv = new BasicBooleanVector(1);
7279+
colNames.add("id");
7280+
colNames.add("issuccess");
7281+
cols.add(bsv);
7282+
cols.add(bbv);
7283+
BasicTable callback = new BasicTable(colNames,cols);
7284+
int count = 0;
7285+
Callback callbackHandler = new Callback(){
7286+
int count = 0;
7287+
public void writeCompletion(Table callbackTable) {
7288+
7289+
BasicStringVector idV = (BasicStringVector) callbackTable.getColumn(0);
7290+
BasicBooleanVector successV = (BasicBooleanVector) callbackTable.getColumn(1);
7291+
synchronized (callback) {
7292+
try {
7293+
callback.getColumn(0).Append(idV);
7294+
callback.getColumn(1).Append(successV);
7295+
} catch (Exception e) {
7296+
throw new RuntimeException(e);
7297+
}
7298+
}
7299+
count++;
7300+
System.out.println("COUNT:"+ count);
7301+
// for (int i = 0; i < successV.rows(); i++){
7302+
// System.out.println(idV.getString(i) + " " + successV.getBoolean(i));
7303+
// }
7304+
}
7305+
};
7306+
MultithreadedTableWriter mtw = new MultithreadedTableWriter(HOST, PORT, "admin", "123456", "dfs://test_MultithreadedTableWriter", "pt", false,
7307+
true, null, 1, 100, 1, "id", null, callbackHandler);
7308+
7309+
conn1.run("sleep(2000)");
7310+
// BasicTable re = (BasicTable) conn1.run("select port ,connectionNum from rpc(getControllerAlias(),getClusterPerf) where mode= 0");
7311+
// for (int i = 0; i < re.rows(); i++) {
7312+
// conn1.run("try{stopDataNode('"+HOST+":"+re.getColumn(0).get(i)+"')}catch(ex){}");
7313+
// conn1.run("sleep(2000)");
7314+
// }
7315+
for (int i = 0; i < 262139; i++){
7316+
try{
7317+
ErrorCodeInfo pErrorInfo = mtw.insert(Integer.toString(i), Integer.toString(i));
7318+
}
7319+
catch(RuntimeException ex)
7320+
{
7321+
System.out.println(ex.getMessage());
7322+
}
7323+
}
7324+
System.out.println(mtw.getStatus());
7325+
System.out.println("----------------------");
7326+
7327+
conn1.run("sleep(2000)");
7328+
// for (int i = 0; i < re.rows(); i++) {
7329+
// conn1.run("try{startDataNode('"+HOST+":"+re.getColumn(0).get(i)+"')}catch(ex){}");
7330+
// //conn1.run("sleep(2000)");
7331+
// }
7332+
conn1.run("sleep(2000)");
7333+
mtw.waitForThreadCompletion();
7334+
System.out.println(mtw.getStatus());
7335+
7336+
System.out.println("callback rows"+callback.rows());
7337+
//assertEquals(1000000000, callback.rows()-1);
7338+
7339+
Map<String,Entity> map = new HashMap<>();
7340+
map.put("testUpload",callback);
7341+
conn.connect(HOST, PORT, "admin", "123456");
7342+
conn.upload(map);
7343+
BasicTable act = (BasicTable) conn.run("select * from testUpload where issuccess = true order by id");
7344+
BasicTable act1 = (BasicTable) conn.run("select * from testUpload order by id");
7345+
7346+
BasicTable ex = (BasicTable)conn.run("select * from loadTable('dfs://test_MultithreadedTableWriter', 'pt') order by id");
7347+
assertEquals(ex.rows(), act.rows());
7348+
assertEquals(ex.rows(), act.rows());
7349+
for (int i = 0; i < ex.rows(); i++){
7350+
assertEquals(ex.getColumn(0).get(i).getString(), act.getColumn(0).get(i).getString());
7351+
}
7352+
conn.close();
7353+
}
7354+
71897355
}
71907356

0 commit comments

Comments
 (0)