Skip to content

Commit e1707ef

Browse files
authored
feat(query): Virtual column support stream write (#18319)
1 parent c45b4b9 commit e1707ef

File tree

10 files changed

+468
-118
lines changed

10 files changed

+468
-118
lines changed

src/query/ee/src/storages/fuse/operations/virtual_columns.rs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use std::time::Instant;
2020
use databend_common_catalog::plan::Projection;
2121
use databend_common_catalog::table::Table;
2222
use databend_common_catalog::table_context::TableContext;
23+
use databend_common_exception::ErrorCode;
2324
use databend_common_exception::Result;
2425
use databend_common_expression::BlockMetaInfoDowncast;
2526
use databend_common_expression::ComputedExpr;
@@ -99,8 +100,13 @@ pub async fn do_refresh_virtual_column(
99100
fields,
100101
..fuse_table.schema().as_ref().clone()
101102
});
102-
let virtual_column_builder =
103-
VirtualColumnBuilder::try_create(ctx.clone(), fuse_table, source_schema)?;
103+
104+
if !fuse_table.support_virtual_columns() {
105+
return Err(ErrorCode::VirtualColumnError(
106+
"table don't support virtual column".to_string(),
107+
));
108+
}
109+
let virtual_column_builder = VirtualColumnBuilder::try_create(ctx.clone(), source_schema)?;
104110

105111
let projection = Projection::Columns(field_indices);
106112
let block_reader =
@@ -299,11 +305,10 @@ impl AsyncTransform for VirtualColumnTransform {
299305
.and_then(BlockMeta::downcast_ref_from)
300306
.unwrap();
301307

302-
let virtual_column_state = self.virtual_column_builder.add_block(
303-
&data_block,
304-
&self.write_settings,
305-
&block_meta.location,
306-
)?;
308+
self.virtual_column_builder.add_block(&data_block)?;
309+
let virtual_column_state = self
310+
.virtual_column_builder
311+
.finalize(&self.write_settings, &block_meta.location)?;
307312

308313
if virtual_column_state
309314
.draft_virtual_block_meta

src/query/ee/tests/it/aggregating_index/index_scan.rs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use databend_common_expression::block_debug::pretty_format_blocks;
2323
use databend_common_expression::DataBlock;
2424
use databend_common_expression::SendableDataBlockStream;
2525
use databend_common_expression::SortColumnDescription;
26-
use databend_common_sql::optimizer::SExpr;
26+
use databend_common_sql::optimizer::ir::SExpr;
2727
use databend_common_sql::planner::plans::Plan;
2828
use databend_common_sql::plans::RelOperator;
2929
use databend_common_sql::Planner;
@@ -724,7 +724,6 @@ fn get_sort_col_descs(num_cols: usize) -> Vec<SortColumnDescription> {
724724
offset: i,
725725
nulls_first: false,
726726
asc: true,
727-
is_nullable: false,
728727
});
729728
}
730729
sorts
@@ -1041,10 +1040,7 @@ async fn test_fuzz_impl(format: &str, spill: bool) -> Result<()> {
10411040
"query_out_of_memory_behavior".to_string(),
10421041
"spilling".to_string(),
10431042
),
1044-
(
1045-
"max_query_memory_usage".to_string(),
1046-
"1".to_string(),
1047-
),
1043+
("max_query_memory_usage".to_string(), "1".to_string()),
10481044
]))
10491045
} else {
10501046
None
@@ -1057,7 +1053,7 @@ async fn test_fuzz_impl(format: &str, spill: bool) -> Result<()> {
10571053
if let Some(s) = spill_settings.as_ref() {
10581054
let settings = session.get_settings();
10591055
// Make sure the operator will spill the aggregation.
1060-
settings.set_batch_settings(s)?;
1056+
settings.set_batch_settings(s, false)?;
10611057
}
10621058

10631059
// Prepare table and data

src/query/ee/tests/it/storages/fuse/operations/virtual_columns_builder.rs

Lines changed: 188 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ async fn test_virtual_column_builder() -> Result<()> {
5454
0,
5555
); // Dummy location
5656

57-
let builder = VirtualColumnBuilder::try_create(ctx, fuse_table, schema).unwrap();
57+
let mut builder = VirtualColumnBuilder::try_create(ctx, schema).unwrap();
5858

5959
let block = DataBlock::new(
6060
vec![
@@ -83,7 +83,8 @@ async fn test_virtual_column_builder() -> Result<()> {
8383
3,
8484
);
8585

86-
let result = builder.add_block(&block, &write_settings, &location)?;
86+
builder.add_block(&block)?;
87+
let result = builder.finalize(&write_settings, &location)?;
8788

8889
assert!(!result.data.is_empty());
8990
assert_eq!(
@@ -195,7 +196,8 @@ async fn test_virtual_column_builder() -> Result<()> {
195196
8,
196197
);
197198

198-
let result = builder.add_block(&block, &write_settings, &location)?;
199+
builder.add_block(&block)?;
200+
let result = builder.finalize(&write_settings, &location)?;
199201

200202
// Expected columns: id, create, text, user.id, replies, geo.lat
201203
assert_eq!(
@@ -302,14 +304,196 @@ async fn test_virtual_column_builder() -> Result<()> {
302304
entries, 8, // Number of rows
303305
);
304306

305-
let result = builder.add_block(&block, &write_settings, &location)?;
307+
builder.add_block(&block)?;
308+
let result = builder.finalize(&write_settings, &location)?;
306309

307310
// all columns should be discarded due to > 70% nulls
308311
assert!(result.data.is_empty());
309312

310313
Ok(())
311314
}
312315

316+
#[tokio::test(flavor = "multi_thread")]
317+
async fn test_virtual_column_builder_stream_write() -> Result<()> {
318+
let fixture = TestFixture::setup_with_custom(EESetup::new()).await?;
319+
320+
fixture
321+
.default_session()
322+
.get_settings()
323+
.set_enable_experimental_virtual_column(1)?;
324+
fixture.create_default_database().await?;
325+
fixture.create_variant_table().await?;
326+
327+
let ctx = fixture.new_query_ctx().await?;
328+
329+
let table = fixture.latest_default_table().await?;
330+
let table_info = table.get_table_info();
331+
let schema = table_info.meta.schema.clone();
332+
333+
let fuse_table = FuseTable::try_from_table(table.as_ref())?;
334+
335+
let write_settings = fuse_table.get_write_settings();
336+
let location = (
337+
"_b/h0196236b460676369cfcf6fec0dedefa_v2.parquet".to_string(),
338+
0,
339+
); // Dummy location
340+
341+
let mut builder = VirtualColumnBuilder::try_create(ctx, schema).unwrap();
342+
343+
// Create blocks with consistent schema across all blocks
344+
let blocks = vec![
345+
// Block 1: Simple nested structure
346+
DataBlock::new(
347+
vec![
348+
(Int32Type::from_data(vec![1, 2, 3])).into(),
349+
(VariantType::from_opt_data(vec![
350+
Some(
351+
OwnedJsonb::from_str(r#"{"user": {"id": 1, "name": "Alice"}, "score": 100}"#)
352+
.unwrap()
353+
.to_vec(),
354+
),
355+
Some(
356+
OwnedJsonb::from_str(r#"{"user": {"id": 2, "name": "Bob"}, "score": 85}"#)
357+
.unwrap()
358+
.to_vec(),
359+
),
360+
Some(
361+
OwnedJsonb::from_str(r#"{"user": {"id": 3, "name": "Charlie"}, "score": 92}"#)
362+
.unwrap()
363+
.to_vec(),
364+
),
365+
]))
366+
.into(),
367+
],
368+
3,
369+
),
370+
// Block 2: Same structure, different values
371+
DataBlock::new(
372+
vec![
373+
(Int32Type::from_data(vec![4, 5, 6])).into(),
374+
(VariantType::from_opt_data(vec![
375+
Some(
376+
OwnedJsonb::from_str(r#"{"user": {"id": 4, "name": "Dave"}, "score": 78}"#)
377+
.unwrap()
378+
.to_vec(),
379+
),
380+
Some(
381+
OwnedJsonb::from_str(r#"{"user": {"id": 5, "name": "Eve"}, "score": 95}"#)
382+
.unwrap()
383+
.to_vec(),
384+
),
385+
Some(
386+
OwnedJsonb::from_str(r#"{"user": {"id": 6, "name": "Frank"}, "score": 88}"#)
387+
.unwrap()
388+
.to_vec(),
389+
),
390+
]))
391+
.into(),
392+
],
393+
3,
394+
),
395+
// Block 3: Same structure with additional fields
396+
DataBlock::new(
397+
vec![
398+
(Int32Type::from_data(vec![7, 8, 9])).into(),
399+
(VariantType::from_opt_data(vec![
400+
Some(
401+
OwnedJsonb::from_str(r#"{"user": {"id": 7, "name": "Grace", "active": true}, "score": 91, "tags": ["expert"]}"#)
402+
.unwrap()
403+
.to_vec(),
404+
),
405+
Some(
406+
OwnedJsonb::from_str(r#"{"user": {"id": 8, "name": "Heidi", "active": false}, "score": 75, "tags": ["novice"]}"#)
407+
.unwrap()
408+
.to_vec(),
409+
),
410+
Some(
411+
OwnedJsonb::from_str(r#"{"user": {"id": 9, "name": "Ivan", "active": true}, "score": 89, "tags": ["intermediate"]}"#)
412+
.unwrap()
413+
.to_vec(),
414+
),
415+
]))
416+
.into(),
417+
],
418+
3,
419+
),
420+
];
421+
422+
// Stream write: add each block to the builder
423+
for block in &blocks {
424+
builder.add_block(block)?;
425+
}
426+
427+
// Finalize once after adding all blocks
428+
let result = builder.finalize(&write_settings, &location)?;
429+
430+
// Verify the result
431+
assert!(!result.data.is_empty());
432+
433+
// We expect virtual columns for user.id, user.name, user.active, score, and tags[0]
434+
assert_eq!(
435+
result.draft_virtual_block_meta.virtual_column_metas.len(),
436+
5
437+
);
438+
439+
// Check user.id column
440+
let meta_user_id = find_virtual_col(
441+
&result.draft_virtual_block_meta.virtual_column_metas,
442+
1,
443+
"['user']['id']",
444+
)
445+
.expect("Virtual column ['user']['id'] not found");
446+
assert_eq!(meta_user_id.source_column_id, 1);
447+
assert_eq!(meta_user_id.name, "['user']['id']");
448+
assert_eq!(meta_user_id.data_type, VariantDataType::UInt64);
449+
450+
// Check user.name column
451+
let meta_user_name = find_virtual_col(
452+
&result.draft_virtual_block_meta.virtual_column_metas,
453+
1,
454+
"['user']['name']",
455+
)
456+
.expect("Virtual column ['user']['name'] not found");
457+
assert_eq!(meta_user_name.source_column_id, 1);
458+
assert_eq!(meta_user_name.name, "['user']['name']");
459+
assert_eq!(meta_user_name.data_type, VariantDataType::String);
460+
461+
// Check score column
462+
let meta_score = find_virtual_col(
463+
&result.draft_virtual_block_meta.virtual_column_metas,
464+
1,
465+
"['score']",
466+
)
467+
.expect("Virtual column ['score'] not found");
468+
assert_eq!(meta_score.source_column_id, 1);
469+
assert_eq!(meta_score.name, "['score']");
470+
assert_eq!(meta_score.data_type, VariantDataType::UInt64);
471+
472+
// Check user.active column (only present in the third block)
473+
let meta_user_active = find_virtual_col(
474+
&result.draft_virtual_block_meta.virtual_column_metas,
475+
1,
476+
"['user']['active']",
477+
)
478+
.expect("Virtual column ['user']['active'] not found");
479+
assert_eq!(meta_user_active.source_column_id, 1);
480+
assert_eq!(meta_user_active.name, "['user']['active']");
481+
assert_eq!(meta_user_active.data_type, VariantDataType::Boolean);
482+
483+
// Check tags[0] column (only present in the third block)
484+
let meta_tags = find_virtual_col(
485+
&result.draft_virtual_block_meta.virtual_column_metas,
486+
1,
487+
"['tags'][0]",
488+
)
489+
.expect("Virtual column ['tags'][0] not found");
490+
assert_eq!(meta_tags.source_column_id, 1);
491+
assert_eq!(meta_tags.name, "['tags'][0]");
492+
assert_eq!(meta_tags.data_type, VariantDataType::String);
493+
494+
Ok(())
495+
}
496+
313497
fn find_virtual_col<'a>(
314498
metas: &'a [DraftVirtualColumnMeta],
315499
source_id: ColumnId,

src/query/storages/fuse/src/fuse_table.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1204,7 +1204,19 @@ impl Table for FuseTable {
12041204
}
12051205

12061206
fn support_virtual_columns(&self) -> bool {
1207-
matches!(self.storage_format, FuseStorageFormat::Parquet)
1207+
if matches!(self.storage_format, FuseStorageFormat::Parquet)
1208+
&& matches!(self.table_type, FuseTableType::Standard)
1209+
{
1210+
// ignore persistent system tables {
1211+
if let Ok(database_name) = self.table_info.database_name() {
1212+
if database_name == "persistent_system" {
1213+
return false;
1214+
}
1215+
}
1216+
true
1217+
} else {
1218+
false
1219+
}
12081220
}
12091221

12101222
fn result_can_be_cached(&self) -> bool {

src/query/storages/fuse/src/io/write/block_writer.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -180,11 +180,10 @@ impl BlockBuilder {
180180

181181
let virtual_column_state =
182182
if let Some(ref virtual_column_builder) = self.virtual_column_builder {
183-
let virtual_column_state = virtual_column_builder.add_block(
184-
&data_block,
185-
&self.write_settings,
186-
&block_location,
187-
)?;
183+
let mut virtual_column_builder = virtual_column_builder.clone();
184+
virtual_column_builder.add_block(&data_block)?;
185+
let virtual_column_state =
186+
virtual_column_builder.finalize(&self.write_settings, &block_location)?;
188187
Some(virtual_column_state)
189188
} else {
190189
None

0 commit comments

Comments
 (0)