Skip to content

Commit d09ffa5

Browse files
authored
chore(query): add snapshot logs in read partitions (#16918)
1 parent 62e5f24 commit d09ffa5

File tree

2 files changed

+21
-2
lines changed

2 files changed

+21
-2
lines changed

src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use databend_common_catalog::plan::split_row_id;
2323
use databend_common_catalog::plan::PartInfoPtr;
2424
use databend_common_catalog::plan::Projection;
2525
use databend_common_catalog::table::Table;
26+
use databend_common_exception::ErrorCode;
2627
use databend_common_exception::Result;
2728
use databend_common_expression::DataBlock;
2829
use databend_common_expression::DataSchema;
@@ -142,6 +143,19 @@ impl<const BLOCKING_IO: bool> RowsFetcher for ParquetRowsFetcher<BLOCKING_IO> {
142143
})
143144
.collect::<Vec<_>>();
144145

146+
// check if row index is in valid bounds cause we don't ensure rowid is valid
147+
for (block_idx, row_idx, _) in indices.iter() {
148+
if *block_idx as usize >= blocks.len()
149+
|| *row_idx as usize >= blocks[*block_idx as usize].num_rows()
150+
{
151+
return Err(ErrorCode::Internal(format!(
152+
"RowID is invalid, block idx {block_idx}, row idx {row_idx}, blocks len {}, block idx len {:?}",
153+
blocks.len(),
154+
blocks.get(*block_idx as usize).map(|b| b.num_rows()),
155+
)));
156+
}
157+
}
158+
145159
Ok(DataBlock::take_blocks(&blocks, &indices, num_rows))
146160
}
147161

src/query/storages/fuse/src/operations/read_partitions.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ use databend_storages_common_pruner::BlockMetaIndex;
3939
use databend_storages_common_table_meta::meta::BlockMeta;
4040
use databend_storages_common_table_meta::meta::ColumnStatistics;
4141
use databend_storages_common_table_meta::table::ChangeType;
42-
use log::debug;
4342
use log::info;
4443
use sha2::Digest;
4544
use sha2::Sha256;
@@ -62,7 +61,6 @@ impl FuseTable {
6261
dry_run: bool,
6362
) -> Result<(PartStatistics, Partitions)> {
6463
let distributed_pruning = ctx.get_settings().get_enable_distributed_pruning()?;
65-
debug!("fuse table do read partitions, push downs:{:?}", push_downs);
6664
if let Some(changes_desc) = &self.changes_desc {
6765
// For "ANALYZE TABLE" statement, we need set the default change type to "Insert".
6866
let change_type = push_downs.as_ref().map_or(ChangeType::Insert, |v| {
@@ -74,6 +72,13 @@ impl FuseTable {
7472
}
7573

7674
let snapshot = self.read_table_snapshot().await?;
75+
76+
info!(
77+
"fuse table {} do read partitions, push downs:{:?}, snapshot id: {:?}",
78+
self.name(),
79+
push_downs,
80+
snapshot.as_ref().map(|sn| sn.snapshot_id)
81+
);
7782
match snapshot {
7883
Some(snapshot) => {
7984
let snapshot_loc = self

0 commit comments

Comments
 (0)