Skip to content

Commit 45ad9e3

Browse files
authored
fix(rust, python): fix streaming joins where the join order has been … (pola-rs#6143)
1 parent 3de901c commit 45ad9e3

File tree

28 files changed

+214
-22
lines changed

28 files changed

+214
-22
lines changed

polars/polars-lazy/polars-pipe/src/executors/operators/dummy.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,8 @@ impl Operator for Dummy {
1717
fn split(&self, _thread_no: usize) -> Box<dyn Operator> {
1818
Box::new(Self {})
1919
}
20+
21+
fn fmt(&self) -> &str {
22+
"dummy"
23+
}
2024
}

polars/polars-lazy/polars-pipe/src/executors/operators/filter.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,7 @@ impl Operator for FilterOperator {
3333
fn split(&self, _thread_no: usize) -> Box<dyn Operator> {
3434
Box::new(self.clone())
3535
}
36+
fn fmt(&self) -> &str {
37+
"filter"
38+
}
3639
}

polars/polars-lazy/polars-pipe/src/executors/operators/projection.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ impl Operator for FastProjectionOperator {
2424
fn split(&self, _thread_no: usize) -> Box<dyn Operator> {
2525
Box::new(self.clone())
2626
}
27+
fn fmt(&self) -> &str {
28+
"fast_join_projection"
29+
}
2730
}
2831

2932
#[derive(Clone)]
@@ -49,6 +52,9 @@ impl Operator for ProjectionOperator {
4952
fn split(&self, _thread_no: usize) -> Box<dyn Operator> {
5053
Box::new(self.clone())
5154
}
55+
fn fmt(&self) -> &str {
56+
"projection"
57+
}
5258
}
5359

5460
#[derive(Clone)]
@@ -79,4 +85,7 @@ impl Operator for HstackOperator {
7985
fn split(&self, _thread_no: usize) -> Box<dyn Operator> {
8086
Box::new(self.clone())
8187
}
88+
fn fmt(&self) -> &str {
89+
"hstack"
90+
}
8291
}

polars/polars-lazy/polars-pipe/src/executors/sinks/groupby/aggregates/convert.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use polars_core::datatypes::Field;
55
use polars_core::error::PolarsResult;
66
use polars_core::prelude::{DataType, SchemaRef, Series, IDX_DTYPE};
77
use polars_core::schema::Schema;
8+
use polars_plan::dsl::Expr;
89
use polars_plan::logical_plan::{ArenaExprIter, Context};
910
use polars_plan::prelude::{AAggExpr, AExpr};
1011
use polars_utils::arena::{Arena, Node};
@@ -30,6 +31,10 @@ impl PhysicalPipedExpr for Count {
3031
fn field(&self, _input_schema: &Schema) -> PolarsResult<Field> {
3132
todo!()
3233
}
34+
35+
fn expression(&self) -> Expr {
36+
Expr::Count
37+
}
3338
}
3439

3540
pub fn can_convert_to_hash_agg(

polars/polars-lazy/polars-pipe/src/executors/sinks/groupby/generic.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -428,4 +428,7 @@ impl Sink for GenericGroupbySink {
428428
fn as_any(&mut self) -> &mut dyn Any {
429429
self
430430
}
431+
fn fmt(&self) -> &str {
432+
"generic_groupby"
433+
}
431434
}

polars/polars-lazy/polars-pipe/src/executors/sinks/groupby/primitive.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,9 @@ where
360360
fn as_any(&mut self) -> &mut dyn Any {
361361
self
362362
}
363+
fn fmt(&self) -> &str {
364+
"primitive_groupby"
365+
}
363366
}
364367

365368
fn insert_and_get<T>(

polars/polars-lazy/polars-pipe/src/executors/sinks/groupby/string.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,9 @@ impl Sink for Utf8GroupbySink {
358358
fn as_any(&mut self) -> &mut dyn Any {
359359
self
360360
}
361+
fn fmt(&self) -> &str {
362+
"utf8_groupby"
363+
}
361364
}
362365

363366
// write agg_idx to the hashes buffer.

polars/polars-lazy/polars-pipe/src/executors/sinks/joins/cross.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,10 @@ impl Sink for CrossJoin {
6262
fn as_any(&mut self) -> &mut dyn Any {
6363
self
6464
}
65+
66+
fn fmt(&self) -> &str {
67+
"cross_join_sink"
68+
}
6569
}
6670

6771
#[derive(Clone)]
@@ -152,4 +156,8 @@ impl Operator for CrossJoinProbe {
152156
fn split(&self, _thread_no: usize) -> Box<dyn Operator> {
153157
Box::new(self.clone())
154158
}
159+
160+
fn fmt(&self) -> &str {
161+
"cross_join_probe"
162+
}
155163
}

polars/polars-lazy/polars-pipe/src/executors/sinks/joins/generic_build.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ impl GenericBuild {
151151
chunk: &DataChunk,
152152
) -> PolarsResult<&[Series]> {
153153
self.join_series.clear();
154+
154155
for phys_e in self.join_columns_left.iter() {
155156
let s = phys_e.evaluate(chunk, context.execution_state.as_any())?;
156157
let s = s.to_physical_repr();
@@ -337,7 +338,7 @@ impl Sink for GenericBuild {
337338
let suffix = self.suffix.clone();
338339
let hb = self.hb.clone();
339340
let hash_tables = Arc::new(std::mem::take(&mut self.hash_tables));
340-
let join_columns_left = self.join_columns_right.clone();
341+
let join_columns_left = self.join_columns_left.clone();
341342
let join_columns_right = self.join_columns_right.clone();
342343

343344
// take the buffers, this saves one allocation
@@ -369,6 +370,9 @@ impl Sink for GenericBuild {
369370
fn as_any(&mut self) -> &mut dyn Any {
370371
self
371372
}
373+
fn fmt(&self) -> &str {
374+
"generic_join_build"
375+
}
372376
}
373377

374378
pub(super) struct KeysIter<'a> {

polars/polars-lazy/polars-pipe/src/executors/sinks/joins/inner_left.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,15 +66,12 @@ impl GenericJoinProbe {
6666
hash_tables: Arc<Vec<PlIdHashMap<Key, Vec<ChunkId>>>>,
6767
join_columns_left: Arc<Vec<Arc<dyn PhysicalPipedExpr>>>,
6868
join_columns_right: Arc<Vec<Arc<dyn PhysicalPipedExpr>>>,
69-
mut swapped_or_left: bool,
69+
swapped_or_left: bool,
7070
join_series: Vec<Series>,
7171
hashes: Vec<u64>,
7272
context: &PExecutionContext,
7373
how: JoinType,
7474
) -> Self {
75-
if matches!(how, JoinType::Left) {
76-
swapped_or_left = true
77-
}
7875
if swapped_or_left {
7976
let tmp = DataChunk {
8077
data: df_a.slice(0, 1),
@@ -329,4 +326,7 @@ impl Operator for GenericJoinProbe {
329326
let new = self.clone();
330327
Box::new(new)
331328
}
329+
fn fmt(&self) -> &str {
330+
"generic_join_probe"
331+
}
332332
}

0 commit comments

Comments
 (0)