Commit fb5dbcb

fix: incorrect table data disk cache key (#16837)
* fix: incorrect table data disk cache key

  The `offset` and `len` of the column meta should be used as the corresponding parts of the table data cache key.

* refactor: enforce using the same column data cache key
1 parent 6c05298 commit fb5dbcb
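
To make the bug concrete: the old disk cache key was derived from the byte range a column occupied inside the merged I/O chunk, and that range is chunk-relative, so it shifts with the query's projection. Below is a minimal, self-contained Rust sketch of the effect; `ColMeta`, `range_based_key`, and `meta_based_key` are hypothetical stand-ins for illustration, not databend's actual types.

// Hypothetical sketch: why a cache key built from merged-I/O ranges is unstable
// across projections, while one built from the column meta's (offset, len) is not.
// `ColMeta` and both helper functions are illustrative, not databend APIs.

struct ColMeta {
    offset: u64, // position of the column's bytes within the block file
    len: u64,    // compressed length of the column's bytes
}

// Old scheme: key from the chunk-relative range produced by merge I/O.
fn range_based_key(block_path: &str, column_id: u32, range: std::ops::Range<u64>) -> String {
    format!("{}-{}-{}-{}", block_path, column_id, range.start, range.end - range.start)
}

// New scheme: key from the column meta, which depends only on the block layout.
fn meta_based_key(block_path: &str, column_id: u32, meta: &ColMeta) -> String {
    format!("{}-{}-{}-{}", block_path, column_id, meta.offset, meta.len)
}

fn main() {
    let col2_meta = ColMeta { offset: 4096, len: 1024 };

    // `SELECT col1, col2 FROM t;`: col2's bytes follow col1's in the merged chunk.
    let key_wide = range_based_key("blk_0001", 2, 2048..3072);
    // `SELECT col2 FROM t;`: col2 is the only column read, so its range starts at 0.
    let key_narrow = range_based_key("blk_0001", 2, 0..1024);
    assert_ne!(key_wide, key_narrow); // same column, two different cache keys

    // Meta-based keys are identical no matter which projection fetched the column.
    assert_eq!(
        meta_based_key("blk_0001", 2, &col2_meta),
        meta_based_key("blk_0001", 2, &col2_meta)
    );
}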

File tree

1 file changed: +31 -9


src/query/storages/fuse/src/io/read/block/block_reader_merge_io_async.rs

Lines changed: 31 additions & 9 deletions
@@ -48,6 +48,9 @@ impl BlockReader {
         let column_array_cache = CacheManager::instance().get_table_data_array_cache();
         let mut cached_column_data = vec![];
         let mut cached_column_array = vec![];
+
+        let column_cache_key_builder = ColumnCacheKeyBuilder::new(location);
+
         for (_index, (column_id, ..)) in self.project_indices.iter() {
             if let Some(ignore_column_ids) = ignore_column_ids {
                 if ignore_column_ids.contains(column_id) {
@@ -58,7 +61,7 @@ impl BlockReader {
             if let Some(column_meta) = columns_meta.get(column_id) {
                 let (offset, len) = column_meta.offset_length();
 
-                let column_cache_key = TableDataCacheKey::new(location, *column_id, offset, len);
+                let column_cache_key = column_cache_key_builder.cache_cache(column_id, column_meta);
 
                 // first, check in memory table data cache
                 // column_array_cache
@@ -91,20 +94,25 @@ impl BlockReader {
             .await?;
 
         if self.put_cache {
-            let table_data_cache = CacheManager::instance().get_table_data_cache();
             // add raw data (compressed raw bytes) to column cache
             for (column_id, (chunk_idx, range)) in &merge_io_result.columns_chunk_offsets {
-                let cache_key = TableDataCacheKey::new(
-                    &merge_io_result.block_path,
-                    *column_id,
-                    range.start as u64,
-                    (range.end - range.start) as u64,
-                );
+                // Should NOT use `range.start` as part of the cache key:
+                // it is not stable and can vary for the same column depending on the query's projection.
+                // For instance:
+                // - `SELECT col1, col2 FROM t;`
+                // - `SELECT col2 FROM t;`
+                // may result in different ranges for `col2`.
+                // This can lead to cache misses or inconsistencies.
+
+                // Safe to unwrap here: since this column has been fetched, its meta must be present.
+                let column_meta = columns_meta.get(column_id).unwrap();
+                let column_cache_key = column_cache_key_builder.cache_cache(column_id, column_meta);
+
                 let chunk_data = merge_io_result
                     .owner_memory
                     .get_chunk(*chunk_idx, &merge_io_result.block_path)?;
                 let data = chunk_data.slice(range.clone());
-                table_data_cache.insert(cache_key.as_ref().to_owned(), data);
+                column_data_cache.insert(column_cache_key.as_ref().to_owned(), data);
             }
         }
 
@@ -116,3 +124,17 @@ impl BlockReader {
         Ok(block_read_res)
     }
 }
+
+struct ColumnCacheKeyBuilder<'a> {
+    block_path: &'a str,
+}
+
+impl<'a> ColumnCacheKeyBuilder<'a> {
+    fn new(block_path: &'a str) -> Self {
+        Self { block_path }
+    }
+    fn cache_cache(&self, column_id: &ColumnId, column_meta: &ColumnMeta) -> TableDataCacheKey {
+        let (offset, len) = column_meta.offset_length();
+        TableDataCacheKey::new(self.block_path, *column_id, offset, len)
+    }
+}
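
The refactor half of the commit funnels every key construction through `ColumnCacheKeyBuilder`, so the probe path (checking the caches before I/O) and the populate path (inserting after I/O) cannot drift apart. A compact sketch of that single-choke-point invariant, again with simplified stand-in types rather than databend's real `ColumnMeta` and `TableDataCacheKey`:

// Simplified stand-ins for illustration only; not the actual databend types.
type ColumnId = u32;

struct ColumnMeta {
    offset: u64,
    len: u64,
}

impl ColumnMeta {
    // Mirrors the `offset_length()` accessor used in the diff above.
    fn offset_length(&self) -> (u64, u64) {
        (self.offset, self.len)
    }
}

struct ColumnCacheKeyBuilder<'a> {
    block_path: &'a str,
}

impl<'a> ColumnCacheKeyBuilder<'a> {
    fn new(block_path: &'a str) -> Self {
        Self { block_path }
    }

    // Single choke point: the key depends only on the block's on-disk layout,
    // never on a per-query merged I/O range.
    fn cache_cache(&self, column_id: &ColumnId, column_meta: &ColumnMeta) -> String {
        let (offset, len) = column_meta.offset_length();
        format!("{}-{}-{}-{}", self.block_path, column_id, offset, len)
    }
}

fn main() {
    let builder = ColumnCacheKeyBuilder::new("blk_0001");
    let meta = ColumnMeta { offset: 4096, len: 1024 };

    // Probe (before I/O) and populate (after I/O) use the same builder,
    // so their keys are guaranteed to match.
    let probe_key = builder.cache_cache(&7, &meta);
    let populate_key = builder.cache_cache(&7, &meta);
    assert_eq!(probe_key, populate_key);
}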
