Skip to content

Commit 8d3ceb7

Browse files
authored
fix(query): fix incorrect left mark join schema (#17642)
* fix(query): fix incorrect left mark join schema * update
1 parent 43ab83f commit 8d3ceb7

File tree

5 files changed

+81
-37
lines changed

5 files changed

+81
-37
lines changed

src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_probe_state.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -259,11 +259,18 @@ impl HashJoinProbeState {
259259
});
260260
let probe_keys = (&keys_columns).into();
261261

262-
if self.hash_join_state.hash_join_desc.join_type != JoinType::LeftMark {
263-
input = input.project(&self.probe_projections);
264-
}
265-
probe_state.generation_state.is_probe_projected = input.num_columns() > 0;
262+
let probe_has_null = if self.join_type() == JoinType::LeftMark {
263+
match &input.get_by_offset(0).value {
264+
Value::Scalar(Scalar::Null) => true,
265+
Value::Column(Column::Nullable(c)) if c.validity.null_count() > 0 => true,
266+
_ => false,
267+
}
268+
} else {
269+
false
270+
};
271+
input = input.project(&self.probe_projections);
266272

273+
probe_state.generation_state.is_probe_projected = input.num_columns() > 0;
267274
if self.hash_join_state.fast_return.load(Ordering::Acquire)
268275
&& matches!(
269276
self.hash_join_state.hash_join_desc.join_type,
@@ -310,6 +317,7 @@ impl HashJoinProbeState {
310317

311318
probe_state.process_state = Some(ProcessState {
312319
input,
320+
probe_has_null,
313321
keys_state,
314322
next_idx: 0,
315323
});

src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_mark_join.rs

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,8 @@ impl HashJoinProbeState {
5050
let max_block_size = probe_state.max_block_size;
5151
// `probe_column` is the subquery result column.
5252
// For sql: select * from t1 where t1.a in (select t2.a from t2); t2.a is the `probe_column`,
53-
let probe_column = process_state
54-
.input
55-
.get_by_offset(0)
56-
.to_column(process_state.input.num_rows());
5753
// Check if there is any null in the probe column.
58-
if matches!(probe_column.validity().1, Some(x) if x.null_count() > 0) {
54+
if process_state.probe_has_null {
5955
let mut has_null = self
6056
.hash_join_state
6157
.hash_join_desc
@@ -148,12 +144,8 @@ impl HashJoinProbeState {
148144
let max_block_size = probe_state.max_block_size;
149145
// `probe_column` is the subquery result column.
150146
// For sql: select * from t1 where t1.a in (select t2.a from t2); t2.a is the `probe_column`,
151-
let probe_column = process_state
152-
.input
153-
.get_by_offset(0)
154-
.to_column(process_state.input.num_rows());
155147
// Check if there is any null in the probe column.
156-
if matches!(probe_column.validity().1, Some(x) if x.null_count() > 0) {
148+
if process_state.probe_has_null {
157149
let mut has_null = self
158150
.hash_join_state
159151
.hash_join_desc

src/query/service/src/pipelines/processors/transforms/hash_join/probe_state.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ use crate::sql::plans::JoinType;
2727
#[derive(Debug)]
2828
pub struct ProcessState {
2929
pub input: DataBlock,
30+
// used in left mark join now
31+
pub probe_has_null: bool,
3032
pub keys_state: KeysState,
3133
pub next_idx: usize,
3234
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
statement ok
2+
CREATE OR REPLACE TABLE employees AS
3+
SELECT * FROM (
4+
VALUES
5+
(1, 1, 50000),
6+
(2, 2, 60000),
7+
(3, 3, 70000)
8+
) AS employees(employee_id, name, salary);
9+
10+
statement ok
11+
CREATE OR REPLACE TABLE departments AS
12+
SELECT * FROM (
13+
VALUES
14+
(101, 1, 1),
15+
(102, 2, 2),
16+
(103, 3, 3)
17+
) AS departments(department_id, department_name, manager_id);
18+
19+
statement ok
20+
CREATE OR REPLACE TABLE salaries AS
21+
SELECT * FROM (
22+
VALUES
23+
(1, 50000),
24+
(2, 60000),
25+
(3, 70000),
26+
(4, 80000),
27+
(5, 90000)
28+
) AS salaries(employee_id, salary);
29+
30+
31+
query TT
32+
select * from employees e where e.salary > ALL (
33+
SELECT salary
34+
FROM salaries
35+
WHERE employee_id <> e.employee_id
36+
);
37+
----
38+
39+
query TT
40+
SELECT
41+
e.employee_id,
42+
d.department_name
43+
FROM
44+
employees e
45+
INNER JOIN
46+
departments d
47+
ON
48+
e.employee_id = d.manager_id
49+
WHERE
50+
e.salary > ALL (
51+
SELECT salary
52+
FROM salaries
53+
WHERE employee_id <> e.employee_id
54+
)
55+
ORDER BY employee_id;
56+
----
57+

tests/sqllogictests/suites/query/subquery.test

Lines changed: 8 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,15 @@
11
statement ok
22
use default
33

4-
statement ok
5-
DROP TABLE IF EXISTS c
6-
7-
statement ok
8-
DROP TABLE IF EXISTS o
9-
104

115
query error 1065
126
SELECT * FROM (SELECT 1 AS x) AS ss1 LEFT OUTER JOIN (SELECT 2 DIV 228 AS y) AS ss2 ON TRUE, LATERAL (SELECT ss2.y AS z LIMIT 1) AS ss3
137

148
statement ok
15-
CREATE TABLE c (c_id INT NULL, bill VARCHAR NULL)
9+
CREATE OR REPLACE TABLE c (c_id INT NULL, bill VARCHAR NULL)
1610

1711
statement ok
18-
CREATE TABLE o (o_id INT NULL, c_id INT NULL, ship VARCHAR NULL)
12+
CREATE OR REPLACE TABLE o (o_id INT NULL, c_id INT NULL, ship VARCHAR NULL)
1913

2014
statement ok
2115
INSERT INTO c VALUES
@@ -645,20 +639,16 @@ select number from numbers(10) where number in (select unnest([1,2,3]))
645639
2
646640
3
647641

648-
statement ok
649-
drop table if exists t1
650642

651643
statement ok
652-
create table t1 (a int, b int);
644+
create or replace table t1 (a int, b int);
653645

654646
statement ok
655647
insert into t1 values(1, 2);
656648

657-
statement ok
658-
drop table if exists t2
659649

660650
statement ok
661-
create table t2 (a int, b int);
651+
create or replace table t2 (a int, b int);
662652

663653
statement ok
664654
insert into t2 values(1, 1);
@@ -668,10 +658,7 @@ select * from t2 where t2.b < ANY(select NULL from t1 where t1.a = t1.a)
668658
----
669659

670660
statement ok
671-
drop table if exists t3
672-
673-
statement ok
674-
create table t3(a int, b int);
661+
create or replace table t3(a int, b int);
675662

676663
statement ok
677664
insert into t3 values(1, 2), (3, 4), (6, 5);
@@ -823,11 +810,9 @@ select (select sum(a) from t1 where t1.a >= t2.a group by t1.a) from t1 as t2;
823810
statement ok
824811
drop table t1;
825812

826-
statement ok
827-
drop table if exists t;
828813

829814
statement ok
830-
create table t (i int);
815+
create or replace table t (i int);
831816

832817
statement ok
833818
insert into t values(1), (2), (3), (null);
@@ -850,14 +835,14 @@ statement ok
850835
drop table if exists merge_log;
851836

852837
statement ok
853-
CREATE TABLE `push_log` (
838+
create or replace TABLE `push_log` (
854839
`name` VARCHAR NULL,
855840
`data_count` INT NULL,
856841
`created_at` TIMESTAMP NULL
857842
) ENGINE = FUSE;
858843

859844
statement ok
860-
CREATE TABLE `merge_log` (
845+
create or replace TABLE `merge_log` (
861846
`name` VARCHAR NULL,
862847
`file_date` VARCHAR NULL,
863848
`inserted_count` INT NULL,

0 commit comments

Comments
 (0)