|
5 | 5 | import com.xxdb.io.Double2;
|
6 | 6 | import com.xxdb.io.Long2;
|
7 | 7 | import com.xxdb.multithreadedtablewriter.MultithreadedTableWriter;
|
| 8 | +import com.xxdb.route.AutoFitTableUpsert; |
8 | 9 | import com.xxdb.route.PartitionedTableAppender;
|
9 | 10 | import org.junit.After;
|
10 | 11 | import org.junit.Before;
|
@@ -2289,5 +2290,244 @@ public void test_PartitionedTableAppender_allDataType_array_null() throws Except
|
2289 | 2290 | pool.shutdown();
|
2290 | 2291 | conn.close();
|
2291 | 2292 | }
|
| 2293 | + |
| 2294 | + @Test |
| 2295 | + public void Test_PartitionedTableAppender_iotAnyVector() throws Exception { |
| 2296 | + String script = "if(existsDatabase(\"dfs://testIOT_allDateType\")) dropDatabase(\"dfs://testIOT_allDateType\")\n" + |
| 2297 | + " create database \"dfs://testIOT_allDateType\" partitioned by VALUE(1..20),RANGE(2020.01.01 2022.01.01 2025.01.01), engine='TSDB'\n" + |
| 2298 | + " create table \"dfs://testIOT_allDateType\".\"pt\"(\n" + |
| 2299 | + " deviceId INT,\n" + |
| 2300 | + " timestamp TIMESTAMP,\n" + |
| 2301 | + " location SYMBOL,\n" + |
| 2302 | + " value IOTANY,\n" + |
| 2303 | + " )\n" + |
| 2304 | + "partitioned by deviceId, timestamp,\n" + |
| 2305 | + "sortColumns=[`deviceId, `location, `timestamp],\n" + |
| 2306 | + "latestKeyCache=true;\n" + |
| 2307 | + "pt = loadTable(\"dfs://testIOT_allDateType\",\"pt\");\n" ; |
| 2308 | + conn.run(script); |
| 2309 | + BasicTable bt = (BasicTable)conn.run("t=table([1] as deviceId, [now()] as timestamp, [`loc1] as location, [char('Q')] as value);\n select * from t"); |
| 2310 | + BasicTable bt1 = (BasicTable)conn.run("t=table([2] as deviceId, [now()] as timestamp, [`loc1] as location, [short(233)] as value);\n select * from t"); |
| 2311 | + BasicTable bt2 = (BasicTable)conn.run("t=table([3] as deviceId, [now()] as timestamp, [`loc1] as location, [int(-233)] as value);\n select * from t"); |
| 2312 | + BasicTable bt3 = (BasicTable)conn.run("t=table([4] as deviceId, [now()] as timestamp, [`loc1] as location, [long(233121)] as value);\n select * from t"); |
| 2313 | + BasicTable bt4 = (BasicTable)conn.run("t=table([5] as deviceId, [now()] as timestamp, [`loc1] as location, [true] as value);\n select * from t"); |
| 2314 | + BasicTable bt5 = (BasicTable)conn.run("t=table([6] as deviceId, [now()] as timestamp, [`loc1] as location, [233.34f] as value);\n select * from t"); |
| 2315 | + BasicTable bt6 = (BasicTable)conn.run("t=table([7] as deviceId, [now()] as timestamp, [`loc1] as location, [233.34] as value);\n select * from t"); |
| 2316 | + BasicTable bt7 = (BasicTable)conn.run("t=table([8] as deviceId, [now()] as timestamp, [`loc1] as location, [`loc1] as value);\n select * from t"); |
| 2317 | + BasicTable bt8 = (BasicTable)conn.run("t=table(12..14 as deviceId, [now(),2022.06.13 13:30:10.008,2020.06.13 13:30:10.008] as timestamp, [`loc1`loc2`loc3] as location, [symbol(`AAA`bbb`xxx)] as value);\n select * from t"); |
| 2318 | + System.out.println(bt8.getString()); |
| 2319 | + ExclusiveDBConnectionPool pool = new ExclusiveDBConnectionPool(HOST,PORT,"admin","123456",3,false,false); |
| 2320 | + PartitionedTableAppender appender = new PartitionedTableAppender("dfs://testIOT_allDateType","pt","deviceId", pool); |
| 2321 | + appender.append(bt); |
| 2322 | + appender.append(bt1); |
| 2323 | + appender.append(bt2); |
| 2324 | + appender.append(bt3); |
| 2325 | + appender.append(bt4); |
| 2326 | + appender.append(bt5); |
| 2327 | + appender.append(bt6); |
| 2328 | + appender.append(bt7); |
| 2329 | + appender.append(bt8); |
| 2330 | + BasicTable bt10 = (BasicTable) conn.run("select * from loadTable(\"dfs://testIOT_allDateType\",`pt);"); |
| 2331 | + assertEquals(11, bt10.rows()); |
| 2332 | + System.out.println(bt10.getString()); |
| 2333 | + assertEquals("['Q',233,-233,233121,true,233.33999634,233.34,loc1,AAA,bbb,xxx]", bt10.getColumn(3).getString()); |
| 2334 | + pool.shutdown(); |
| 2335 | + } |
| 2336 | + |
| 2337 | + @Test |
| 2338 | + public void Test_PartitionedTableAppender_iotAnyVector_compress_true() throws Exception { |
| 2339 | + String script = "if(existsDatabase(\"dfs://testIOT_allDateType\")) dropDatabase(\"dfs://testIOT_allDateType\")\n" + |
| 2340 | + " create database \"dfs://testIOT_allDateType\" partitioned by VALUE(1..20),RANGE(2020.01.01 2022.01.01 2025.01.01), engine='TSDB'\n" + |
| 2341 | + " create table \"dfs://testIOT_allDateType\".\"pt\"(\n" + |
| 2342 | + " deviceId INT,\n" + |
| 2343 | + " timestamp TIMESTAMP,\n" + |
| 2344 | + " location SYMBOL,\n" + |
| 2345 | + " value IOTANY,\n" + |
| 2346 | + " )\n" + |
| 2347 | + "partitioned by deviceId, timestamp,\n" + |
| 2348 | + "sortColumns=[`deviceId, `location, `timestamp],\n" + |
| 2349 | + "latestKeyCache=true;\n" + |
| 2350 | + "pt = loadTable(\"dfs://testIOT_allDateType\",\"pt\");\n" ; |
| 2351 | + conn.run(script); |
| 2352 | + BasicTable bt = (BasicTable)conn.run("t=table([1] as deviceId, [now()] as timestamp, [`loc1] as location, [char('Q')] as value);\n select * from t"); |
| 2353 | + BasicTable bt1 = (BasicTable)conn.run("t=table([2] as deviceId, [now()] as timestamp, [`loc1] as location, [short(233)] as value);\n select * from t"); |
| 2354 | + BasicTable bt2 = (BasicTable)conn.run("t=table([3] as deviceId, [now()] as timestamp, [`loc1] as location, [int(-233)] as value);\n select * from t"); |
| 2355 | + BasicTable bt3 = (BasicTable)conn.run("t=table([4] as deviceId, [now()] as timestamp, [`loc1] as location, [long(233121)] as value);\n select * from t"); |
| 2356 | + BasicTable bt4 = (BasicTable)conn.run("t=table([5] as deviceId, [now()] as timestamp, [`loc1] as location, [true] as value);\n select * from t"); |
| 2357 | + BasicTable bt5 = (BasicTable)conn.run("t=table([6] as deviceId, [now()] as timestamp, [`loc1] as location, [233.34f] as value);\n select * from t"); |
| 2358 | + BasicTable bt6 = (BasicTable)conn.run("t=table([7] as deviceId, [now()] as timestamp, [`loc1] as location, [233.34] as value);\n select * from t"); |
| 2359 | + BasicTable bt7 = (BasicTable)conn.run("t=table([8] as deviceId, [now()] as timestamp, [`loc1] as location, [`loc1] as value);\n select * from t"); |
| 2360 | + BasicTable bt8 = (BasicTable)conn.run("t=table(12..14 as deviceId, [now(),2022.06.13 13:30:10.008,2020.06.13 13:30:10.008] as timestamp, [`loc1`loc2`loc3] as location, [symbol(`AAA`bbb`xxx)] as value);\n select * from t"); |
| 2361 | + System.out.println(bt8.getString()); |
| 2362 | + DBConnectionPool pool = new ExclusiveDBConnectionPool(HOST,PORT,"admin","123456", 10, true, true, null,null,true,false,false); |
| 2363 | + PartitionedTableAppender appender = new PartitionedTableAppender("dfs://testIOT_allDateType","pt","deviceId", pool); |
| 2364 | + appender.append(bt); |
| 2365 | + appender.append(bt1); |
| 2366 | + appender.append(bt2); |
| 2367 | + appender.append(bt3); |
| 2368 | + appender.append(bt4); |
| 2369 | + appender.append(bt5); |
| 2370 | + appender.append(bt6); |
| 2371 | + appender.append(bt7); |
| 2372 | + appender.append(bt8); |
| 2373 | + BasicTable bt10 = (BasicTable) conn.run("select * from loadTable(\"dfs://testIOT_allDateType\",`pt);"); |
| 2374 | + assertEquals(11, bt10.rows()); |
| 2375 | + System.out.println(bt10.getString()); |
| 2376 | + assertEquals("['Q',233,-233,233121,true,233.33999634,233.34,loc1,AAA,bbb,xxx]", bt10.getColumn(3).getString()); |
| 2377 | + pool.shutdown(); |
| 2378 | + } |
| 2379 | + @Test |
| 2380 | + public void Test_PartitionedTableAppender_iotAnyVector_null() throws Exception { |
| 2381 | + String script = "if(existsDatabase(\"dfs://testIOT_allDateType\")) dropDatabase(\"dfs://testIOT_allDateType\")\n" + |
| 2382 | + " create database \"dfs://testIOT_allDateType\" partitioned by VALUE(1..20),RANGE(2020.01.01 2022.01.01 2025.01.01), engine='TSDB'\n" + |
| 2383 | + " create table \"dfs://testIOT_allDateType\".\"pt\"(\n" + |
| 2384 | + " deviceId INT,\n" + |
| 2385 | + " timestamp TIMESTAMP,\n" + |
| 2386 | + " location SYMBOL,\n" + |
| 2387 | + " value IOTANY,\n" + |
| 2388 | + " )\n" + |
| 2389 | + "partitioned by deviceId, timestamp,\n" + |
| 2390 | + "sortColumns=[`deviceId, `location, `timestamp],\n" + |
| 2391 | + "latestKeyCache=true;\n" + |
| 2392 | + "pt = loadTable(\"dfs://testIOT_allDateType\",\"pt\");\n" ; |
| 2393 | + conn.run(script); |
| 2394 | + BasicTable bt = (BasicTable)conn.run("t=table([1] as deviceId, [now()] as timestamp, [`loc1] as location, [char(NULL)] as value);\n select * from t"); |
| 2395 | + BasicTable bt1 = (BasicTable)conn.run("t=table([2] as deviceId, [now()] as timestamp, [`loc1] as location, [short(NULL)] as value);\n select * from t"); |
| 2396 | + BasicTable bt2 = (BasicTable)conn.run("t=table([3] as deviceId, [now()] as timestamp, [`loc1] as location, [int(NULL)] as value);\n select * from t"); |
| 2397 | + BasicTable bt3 = (BasicTable)conn.run("t=table([4] as deviceId, [now()] as timestamp, [`loc1] as location, [long(NULL)] as value);\n select * from t"); |
| 2398 | + BasicTable bt4 = (BasicTable)conn.run("t=table([5] as deviceId, [now()] as timestamp, [`loc1] as location, [bool(NULL)] as value);\n select * from t"); |
| 2399 | + BasicTable bt5 = (BasicTable)conn.run("t=table([6] as deviceId, [now()] as timestamp, [`loc1] as location, [float(NULL)] as value);\n select * from t"); |
| 2400 | + BasicTable bt6 = (BasicTable)conn.run("t=table([7] as deviceId, [now()] as timestamp, [`loc1] as location, [double(NULL)] as value);\n select * from t"); |
| 2401 | + BasicTable bt7 = (BasicTable)conn.run("t=table([8] as deviceId, [now()] as timestamp, [`loc1] as location, [string(NULL)] as value);\n select * from t"); |
| 2402 | + BasicTable bt8 = (BasicTable)conn.run("t=table(12..14 as deviceId, [now(),2022.06.13 13:30:10.008,2020.06.13 13:30:10.008] as timestamp, `loc1`loc2`loc3 as location, symbol([NULL,`bbb,`AAA]) as value);\n select * from t limit 1 "); |
| 2403 | + List<String> colNames = new ArrayList<String>(); |
| 2404 | + colNames.add("deviceId"); |
| 2405 | + colNames.add("timestamp"); |
| 2406 | + colNames.add("location"); |
| 2407 | + colNames.add("value"); |
| 2408 | + List<Vector> cols = new ArrayList<Vector>(); |
| 2409 | + cols.add(new BasicIntVector(0)); |
| 2410 | + cols.add(new BasicTimestampVector(0)); |
| 2411 | + cols.add(new BasicStringVector(0)); |
| 2412 | + cols.add(new BasicIntVector(0)); |
| 2413 | + BasicTable bt9 = new BasicTable(colNames,cols); |
| 2414 | + ExclusiveDBConnectionPool pool = new ExclusiveDBConnectionPool(HOST,PORT,"admin","123456",3,false,false); |
| 2415 | + PartitionedTableAppender appender = new PartitionedTableAppender("dfs://testIOT_allDateType","pt","deviceId", pool); |
| 2416 | + |
| 2417 | + appender.append(bt); |
| 2418 | + appender.append(bt1); |
| 2419 | + appender.append(bt2); |
| 2420 | + appender.append(bt3); |
| 2421 | + appender.append(bt4); |
| 2422 | + appender.append(bt5); |
| 2423 | + appender.append(bt6); |
| 2424 | + appender.append(bt7); |
| 2425 | + appender.append(bt8); |
| 2426 | + appender.append(bt9); |
| 2427 | + BasicTable bt10 = (BasicTable) conn.run("select * from loadTable(\"dfs://testIOT_allDateType\",`pt);"); |
| 2428 | + assertEquals(9, bt10.rows()); |
| 2429 | + System.out.println(bt10.getColumn(3).getString()); |
| 2430 | + assertEquals("[,,,,,,,,]", bt10.getColumn(3).getString()); |
| 2431 | + pool.shutdown(); |
| 2432 | + } |
| 2433 | + |
| 2434 | + @Test |
| 2435 | + public void Test_PartitionedTableAppender_iotAnyVector_null_compress_true() throws Exception { |
| 2436 | + String script = "if(existsDatabase(\"dfs://testIOT_allDateType\")) dropDatabase(\"dfs://testIOT_allDateType\")\n" + |
| 2437 | + " create database \"dfs://testIOT_allDateType\" partitioned by VALUE(1..20),RANGE(2020.01.01 2022.01.01 2025.01.01), engine='TSDB'\n" + |
| 2438 | + " create table \"dfs://testIOT_allDateType\".\"pt\"(\n" + |
| 2439 | + " deviceId INT,\n" + |
| 2440 | + " timestamp TIMESTAMP,\n" + |
| 2441 | + " location SYMBOL,\n" + |
| 2442 | + " value IOTANY,\n" + |
| 2443 | + " )\n" + |
| 2444 | + "partitioned by deviceId, timestamp,\n" + |
| 2445 | + "sortColumns=[`deviceId, `location, `timestamp],\n" + |
| 2446 | + "latestKeyCache=true;\n" + |
| 2447 | + "pt = loadTable(\"dfs://testIOT_allDateType\",\"pt\");\n" ; |
| 2448 | + conn.run(script); |
| 2449 | + BasicTable bt = (BasicTable)conn.run("t=table([1] as deviceId, [now()] as timestamp, [`loc1] as location, [char(NULL)] as value);\n select * from t"); |
| 2450 | + BasicTable bt1 = (BasicTable)conn.run("t=table([2] as deviceId, [now()] as timestamp, [`loc1] as location, [short(NULL)] as value);\n select * from t"); |
| 2451 | + BasicTable bt2 = (BasicTable)conn.run("t=table([3] as deviceId, [now()] as timestamp, [`loc1] as location, [int(NULL)] as value);\n select * from t"); |
| 2452 | + BasicTable bt3 = (BasicTable)conn.run("t=table([4] as deviceId, [now()] as timestamp, [`loc1] as location, [long(NULL)] as value);\n select * from t"); |
| 2453 | + BasicTable bt4 = (BasicTable)conn.run("t=table([5] as deviceId, [now()] as timestamp, [`loc1] as location, [bool(NULL)] as value);\n select * from t"); |
| 2454 | + BasicTable bt5 = (BasicTable)conn.run("t=table([6] as deviceId, [now()] as timestamp, [`loc1] as location, [float(NULL)] as value);\n select * from t"); |
| 2455 | + BasicTable bt6 = (BasicTable)conn.run("t=table([7] as deviceId, [now()] as timestamp, [`loc1] as location, [double(NULL)] as value);\n select * from t"); |
| 2456 | + BasicTable bt7 = (BasicTable)conn.run("t=table([8] as deviceId, [now()] as timestamp, [`loc1] as location, [string(NULL)] as value);\n select * from t"); |
| 2457 | + BasicTable bt8 = (BasicTable)conn.run("t=table(12..14 as deviceId, [now(),2022.06.13 13:30:10.008,2020.06.13 13:30:10.008] as timestamp, `loc1`loc2`loc3 as location, symbol([NULL,`bbb,`AAA]) as value);\n select * from t limit 1 "); |
| 2458 | + List<String> colNames = new ArrayList<String>(); |
| 2459 | + colNames.add("deviceId"); |
| 2460 | + colNames.add("timestamp"); |
| 2461 | + colNames.add("location"); |
| 2462 | + colNames.add("value"); |
| 2463 | + List<Vector> cols = new ArrayList<Vector>(); |
| 2464 | + cols.add(new BasicIntVector(0)); |
| 2465 | + cols.add(new BasicTimestampVector(0)); |
| 2466 | + cols.add(new BasicStringVector(0)); |
| 2467 | + cols.add(new BasicIntVector(0)); |
| 2468 | + BasicTable bt9 = new BasicTable(colNames,cols); |
| 2469 | + DBConnectionPool pool = new ExclusiveDBConnectionPool(HOST,PORT,"admin","123456", 10, true, true, null,null,true,false,false); |
| 2470 | + PartitionedTableAppender appender = new PartitionedTableAppender("dfs://testIOT_allDateType","pt","deviceId", pool); |
| 2471 | + appender.append(bt); |
| 2472 | + appender.append(bt1); |
| 2473 | + appender.append(bt2); |
| 2474 | + appender.append(bt3); |
| 2475 | + appender.append(bt4); |
| 2476 | + appender.append(bt5); |
| 2477 | + appender.append(bt6); |
| 2478 | + appender.append(bt7); |
| 2479 | + appender.append(bt8); |
| 2480 | + appender.append(bt9); |
| 2481 | + BasicTable bt10 = (BasicTable) conn.run("select * from loadTable(\"dfs://testIOT_allDateType\",`pt);"); |
| 2482 | + assertEquals(9, bt10.rows()); |
| 2483 | + System.out.println(bt10.getColumn(3).getString()); |
| 2484 | + assertEquals("[,,,,,,,,]", bt10.getColumn(3).getString()); |
| 2485 | + pool.shutdown(); |
| 2486 | + } |
| 2487 | + |
| 2488 | + @Test |
| 2489 | + public void Test_PartitionedTableAppender_iotAnyVector_big_data() throws Exception { |
| 2490 | + String script = "if(existsDatabase(\"dfs://testIOT_allDateType1\")) dropDatabase(\"dfs://testIOT_allDateType1\")\n" + |
| 2491 | + " create database \"dfs://testIOT_allDateType1\" partitioned by RANGE(100000*(0..10)),RANGE(2020.01.01 2022.01.01 2025.01.01), engine='TSDB'\n" + |
| 2492 | + " create table \"dfs://testIOT_allDateType1\".\"pt\"(\n" + |
| 2493 | + " deviceId INT,\n" + |
| 2494 | + " timestamp TIMESTAMP,\n" + |
| 2495 | + " location SYMBOL,\n" + |
| 2496 | + " value IOTANY,\n" + |
| 2497 | + " )\n" + |
| 2498 | + "partitioned by deviceId, timestamp,\n" + |
| 2499 | + "sortColumns=[`deviceId, `location, `timestamp],\n" + |
| 2500 | + "latestKeyCache=true;\n" + |
| 2501 | + "pt = loadTable(\"dfs://testIOT_allDateType1\",\"pt\");\n" ; |
| 2502 | + conn.run(script); |
| 2503 | + BasicTable bt = (BasicTable)conn.run("t=table(take(1..100000,1000000) as deviceId, take(now()+(0..100), 1000000) as timestamp, take(\"bb\"+string(0..100), 1000000) as location, take(char(1..100000),1000000) as value);\n select * from t"); |
| 2504 | + BasicTable bt1 = (BasicTable)conn.run("t=table(take(100001..200000,1000000) as deviceId, take(now()+(0..100), 1000000) as timestamp, take(\"bb\"+string(0..100), 1000000) as location, take(short(1..100000),1000000) as value);\n select * from t"); |
| 2505 | + BasicTable bt2 = (BasicTable)conn.run("t=table(take(200001..300000,1000000) as deviceId, take(now()+(0..100), 1000000) as timestamp, take(\"bb\"+string(0..100), 1000000) as location, take(int(1..100000),1000000) as value);\n select * from t"); |
| 2506 | + BasicTable bt3 = (BasicTable)conn.run("t=table(take(300001..400000,1000000) as deviceId, take(now()+(0..100), 1000000) as timestamp, take(\"bb\"+string(0..100), 1000000) as location, take(long(1..100000),1000000) as value);\n select * from t"); |
| 2507 | + BasicTable bt4 = (BasicTable)conn.run("t=table(take(400001..500000,1000000) as deviceId, take(now()+(0..100), 1000000) as timestamp, take(\"bb\"+string(0..100), 1000000) as location, take(true false null,1000000) as value);\n select * from t"); |
| 2508 | + BasicTable bt5 = (BasicTable)conn.run("t=table(take(500001..600000,1000000) as deviceId, take(now()+(0..100), 1000000) as timestamp, take(\"bb\"+string(0..100), 1000000) as location, take(-2.33f 0 4.44f,1000000) as value);\n select * from t"); |
| 2509 | + BasicTable bt6 = (BasicTable)conn.run("t=table(take(600001..700000,1000000) as deviceId, take(now()+(0..100), 1000000) as timestamp, take(\"bb\"+string(0..100), 1000000) as location, take(-2.33 0 4.44,1000000) as value);\n select * from t"); |
| 2510 | + BasicTable bt7 = (BasicTable)conn.run("t=table(take(700001..800000,1000000) as deviceId, take(now()+(0..100), 1000000) as timestamp, take(\"bb\"+string(0..100), 1000000) as location, take(\"bb\"+string(0..100000), 1000000) as value);\n select * from t"); |
| 2511 | + BasicTable bt8 = (BasicTable)conn.run("t=table(take(800001..900000,1000000) as deviceId, take(now()+(0..100), 1000000) as timestamp, take(\"bb\"+string(0..100), 1000000) as location, symbol(take(NULL`bbb`AAA,1000000)) as value);\n select * from t"); |
| 2512 | + System.out.println(bt8.getString()); |
| 2513 | + ExclusiveDBConnectionPool pool = new ExclusiveDBConnectionPool(HOST,PORT,"admin","123456",3,false,false); |
| 2514 | + PartitionedTableAppender appender = new PartitionedTableAppender("dfs://testIOT_allDateType1","pt","deviceId", pool); |
| 2515 | + long start = System.nanoTime(); |
| 2516 | + appender.append(bt); |
| 2517 | + appender.append(bt1); |
| 2518 | + appender.append(bt2); |
| 2519 | + appender.append(bt3); |
| 2520 | + appender.append(bt4); |
| 2521 | + appender.append(bt5); |
| 2522 | + appender.append(bt6); |
| 2523 | + appender.append(bt7); |
| 2524 | + appender.append(bt8); |
| 2525 | + long end = System.nanoTime(); |
| 2526 | + System.out.println((end - start) / 1000000); |
| 2527 | + |
| 2528 | + BasicTable bt10 = (BasicTable) conn.run("select count(*) from loadTable(\"dfs://testIOT_allDateType1\",`pt);"); |
| 2529 | + assertEquals(9000000, bt10.rows()); |
| 2530 | + pool.shutdown(); |
| 2531 | + } |
2292 | 2532 | }
|
2293 | 2533 |
|
0 commit comments