@@ -1463,7 +1463,7 @@ void reader::impl::preprocess_subpass_pages(read_mode mode, size_t chunk_read_limit)
                                 page_input,
                                 chunk_row_output_iter{pass.pages.device_ptr()});
 
-  // copy chunk row into the subpass pages
+  // copy chunk_row into the subpass pages
   // only need to do this if we are not processing the whole pass in one subpass
   if (!subpass.single_subpass) {
     thrust::for_each(rmm::exec_policy_nosync(_stream),
@@ -1481,31 +1481,42 @@ void reader::impl::preprocess_subpass_pages(read_mode mode, size_t chunk_read_limit)
   // able to decode for this pass. we will have selected a set of pages for each column in the
   // row group, but not every page will have the same number of rows. so, we can only read as many
   // rows as the smallest batch (by column) we have decompressed.
-  size_t page_index = 0;
-  size_t max_row    = std::numeric_limits<size_t>::max();
+  size_t first_page_index = 0;
+  size_t max_row          = std::numeric_limits<size_t>::max();
   auto const last_pass_row =
     _file_itm_data.input_pass_start_row_count[_file_itm_data._current_input_pass + 1];
+  // for each column
   for (size_t idx = 0; idx < subpass.column_page_count.size(); idx++) {
-    auto const& last_page = subpass.pages[page_index + (subpass.column_page_count[idx] - 1)];
-    auto const& chunk     = pass.chunks[last_page.chunk_idx];
+    // compute max row for this column in the subpass
+    auto const& last_page  = subpass.pages[first_page_index + (subpass.column_page_count[idx] - 1)];
+    auto const& last_chunk = pass.chunks[last_page.chunk_idx];
+    auto max_col_row       = static_cast<size_t>(last_chunk.start_row) +
+                             static_cast<size_t>(last_page.chunk_row) +
+                             static_cast<size_t>(last_page.num_rows);
 
-    size_t max_col_row =
-      static_cast<size_t>(chunk.start_row + last_page.chunk_row + last_page.num_rows);
     // special case. list rows can span page boundaries, but we can't tell if that is happening
     // here because we have not yet decoded the pages. the very last row starting in the page may
     // not terminate in the page. to handle this, only decode up to the second to last row in the
     // subpass since we know that will safely completed.
-    bool const is_list = chunk.max_level[level_type::REPETITION] > 0;
+    bool const is_list = last_chunk.max_level[level_type::REPETITION] > 0;
+    // corner case: only decode up to the second-to-last row, except if this is the last page in the
+    // entire pass. this handles the case where we only have 1 chunk, 1 page, and potentially even
+    // just 1 row.
     if (is_list && max_col_row < last_pass_row) {
-      auto const& first_page   = subpass.pages[page_index];
-      size_t const min_col_row = static_cast<size_t>(chunk.start_row + first_page.chunk_row);
+      // compute min row for this column in the subpass
+      auto const& first_page  = subpass.pages[first_page_index];
+      auto const& first_chunk = pass.chunks[first_page.chunk_idx];
+      auto const min_col_row =
+        static_cast<size_t>(first_chunk.start_row) + static_cast<size_t>(first_page.chunk_row);
+
+      // must have at least 2 rows in the subpass.
       CUDF_EXPECTS((max_col_row - min_col_row) > 1, "Unexpected short subpass");
       max_col_row--;
     }
 
     max_row = min(max_row, max_col_row);
 
-    page_index += subpass.column_page_count[idx];
+    first_page_index += subpass.column_page_count[idx];
   }
   subpass.skip_rows = pass.skip_rows + pass.processed_rows;
   auto const pass_end = pass.skip_rows + pass.num_rows;
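
To make the row-limit logic in this hunk concrete, here is a minimal host-side sketch of the same computation. ChunkDesc, PageDesc, and compute_max_row are hypothetical stand-ins invented for illustration; the real reader works on cudf's internal ColumnChunkDesc and PageInfo structs and uses CUDF_EXPECTS rather than a thrown exception.

#include <algorithm>
#include <cstddef>
#include <limits>
#include <stdexcept>
#include <vector>

// Hypothetical stand-ins for the fields of cudf's ColumnChunkDesc and PageInfo
// that this computation touches.
struct ChunkDesc {
  std::size_t start_row;  // first row of this chunk within the pass
  bool has_repetition;    // stands in for max_level[level_type::REPETITION] > 0
};

struct PageDesc {
  std::size_t chunk_idx;  // index of the owning chunk
  std::size_t chunk_row;  // first row of this page within its chunk
  std::size_t num_rows;   // number of rows that begin in this page
};

// Each column contributes the end row of its last decompressed page; the
// subpass can only decode up to the smallest of those. For list columns the
// last row starting in a page may not end there, so back off one row, unless
// the column already reaches the end of the whole pass.
std::size_t compute_max_row(std::vector<ChunkDesc> const& chunks,
                            std::vector<PageDesc> const& pages,
                            std::vector<std::size_t> const& column_page_count,
                            std::size_t last_pass_row)
{
  std::size_t first_page_index = 0;
  std::size_t max_row          = std::numeric_limits<std::size_t>::max();
  for (std::size_t idx = 0; idx < column_page_count.size(); idx++) {
    auto const& last_page  = pages[first_page_index + (column_page_count[idx] - 1)];
    auto const& last_chunk = chunks[last_page.chunk_idx];
    auto max_col_row = last_chunk.start_row + last_page.chunk_row + last_page.num_rows;

    if (last_chunk.has_repetition && max_col_row < last_pass_row) {
      auto const& first_page  = pages[first_page_index];
      auto const& first_chunk = chunks[first_page.chunk_idx];
      auto const min_col_row  = first_chunk.start_row + first_page.chunk_row;
      // mirrors CUDF_EXPECTS(..., "Unexpected short subpass")
      if ((max_col_row - min_col_row) <= 1) { throw std::runtime_error("Unexpected short subpass"); }
      max_col_row--;
    }

    max_row = std::min(max_row, max_col_row);
    first_page_index += column_page_count[idx];
  }
  return max_row;
}

int main()
{
  // two columns over one row group starting at pass row 0: column 0 (flat) has
  // a single 150-row page; column 1 (a list column) has two 60-row pages, so
  // its list back-off caps the subpass at 0 + 60 + 60 - 1 = 119 rows.
  std::vector<ChunkDesc> chunks = {{0, false}, {0, true}};
  std::vector<PageDesc> pages   = {{0, 0, 150}, {1, 0, 60}, {1, 60, 60}};
  std::vector<std::size_t> column_page_count = {1, 2};
  return compute_max_row(chunks, pages, column_page_count, /*last_pass_row=*/200) == 119 ? 0 : 1;
}

In the real reader, the resulting max_row is then used to bound how many rows the subpass decodes relative to the subpass.skip_rows value set at the end of the hunk.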