Skip to content

Commit 69eb9d6

Browse files
authored
Merge pull request #9090 from sundy-li/fix-agg2
fix(query): fix state merge bug for aggregation
2 parents 009bd0f + 645ed97 commit 69eb9d6

File tree

8 files changed

+18
-16
lines changed

8 files changed

+18
-16
lines changed

src/query/functions-v2/src/aggregates/adaptors/aggregate_null_unary_adaptor.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ impl<const NULLABLE_RESULT: bool> AggregateFunction for AggregateNullUnaryAdapto
9797
#[inline]
9898
fn state_layout(&self) -> Layout {
9999
let layout = self.nested.state_layout();
100-
let add = usize::from(NULLABLE_RESULT);
100+
let add = if NULLABLE_RESULT { layout.align() } else { 0 };
101101
Layout::from_size_align(layout.size() + add, layout.align()).unwrap()
102102
}
103103

@@ -182,9 +182,9 @@ impl<const NULLABLE_RESULT: bool> AggregateFunction for AggregateNullUnaryAdapto
182182

183183
fn deserialize(&self, place: StateAddr, reader: &mut &[u8]) -> Result<()> {
184184
if NULLABLE_RESULT {
185+
let flag = reader[reader.len() - 1];
185186
self.nested
186187
.deserialize(place, &mut &reader[..reader.len() - 1])?;
187-
let flag = reader[reader.len() - 1];
188188
self.set_flag(place, flag);
189189
} else {
190190
self.nested.deserialize(place, reader)?;

src/query/functions-v2/src/aggregates/adaptors/aggregate_null_variadic_adaptor.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ impl<const NULLABLE_RESULT: bool> AggregateFunction
9898
#[inline]
9999
fn state_layout(&self) -> Layout {
100100
let layout = self.nested.state_layout();
101-
let add = usize::from(NULLABLE_RESULT);
101+
let add = if NULLABLE_RESULT { layout.align() } else { 0 };
102102
Layout::from_size_align(layout.size() + add, layout.align()).unwrap()
103103
}
104104

@@ -184,9 +184,9 @@ impl<const NULLABLE_RESULT: bool> AggregateFunction
184184

185185
fn deserialize(&self, place: StateAddr, reader: &mut &[u8]) -> Result<()> {
186186
if NULLABLE_RESULT {
187+
let flag = reader[reader.len() - 1];
187188
self.nested
188189
.deserialize(place, &mut &reader[..reader.len() - 1])?;
189-
let flag = reader[reader.len() - 1];
190190
self.set_flag(place, flag);
191191
} else {
192192
self.nested.deserialize(place, reader)?;

src/query/functions-v2/src/aggregates/adaptors/aggregate_ornull_adaptor.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ impl AggregateFunction for AggregateFunctionOrNullAdaptor {
9090
#[inline]
9191
fn state_layout(&self) -> std::alloc::Layout {
9292
let layout = self.inner.state_layout();
93-
Layout::from_size_align(layout.size() + 1, layout.align()).unwrap()
93+
Layout::from_size_align(layout.size() + layout.align(), layout.align()).unwrap()
9494
}
9595

9696
#[inline]
@@ -174,17 +174,17 @@ impl AggregateFunction for AggregateFunctionOrNullAdaptor {
174174

175175
#[inline]
176176
fn deserialize(&self, place: StateAddr, reader: &mut &[u8]) -> Result<()> {
177+
let flag = reader[reader.len() - 1];
177178
self.inner
178179
.deserialize(place, &mut &reader[..reader.len() - 1])?;
179-
let flag = reader[reader.len() - 1];
180180
self.set_flag(place, flag);
181181
Ok(())
182182
}
183183

184184
fn merge(&self, place: StateAddr, rhs: StateAddr) -> Result<()> {
185185
self.inner.merge(place, rhs)?;
186-
let flag = self.get_flag(place) + self.get_flag(rhs);
187-
self.set_flag(place, flag);
186+
let flag = self.get_flag(place) > 0 || self.get_flag(rhs) > 0;
187+
self.set_flag(place, u8::from(flag));
188188
Ok(())
189189
}
190190

src/query/functions-v2/src/aggregates/aggregate_combinator_distinct.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ where State: DistinctStateFunc
7070

7171
fn state_layout(&self) -> Layout {
7272
let layout = Layout::new::<State>();
73+
7374
let netesed = self.nested.state_layout();
7475
Layout::from_size_align(layout.size() + netesed.size(), layout.align()).unwrap()
7576
}

src/query/functions/src/aggregates/adaptors/aggregate_null_unary_adaptor.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ impl<const NULLABLE_RESULT: bool> AggregateFunction for AggregateNullUnaryAdapto
9494
#[inline]
9595
fn state_layout(&self) -> Layout {
9696
let layout = self.nested.state_layout();
97-
let add = usize::from(NULLABLE_RESULT);
97+
let add = if NULLABLE_RESULT { layout.align() } else { 0 };
9898
Layout::from_size_align(layout.size() + add, layout.align()).unwrap()
9999
}
100100

@@ -183,9 +183,9 @@ impl<const NULLABLE_RESULT: bool> AggregateFunction for AggregateNullUnaryAdapto
183183

184184
fn deserialize(&self, place: StateAddr, reader: &mut &[u8]) -> Result<()> {
185185
if NULLABLE_RESULT {
186+
let flag = reader[reader.len() - 1];
186187
self.nested
187188
.deserialize(place, &mut &reader[..reader.len() - 1])?;
188-
let flag = reader[reader.len() - 1];
189189
self.set_flag(place, flag);
190190
} else {
191191
self.nested.deserialize(place, reader)?;

src/query/functions/src/aggregates/adaptors/aggregate_null_variadic_adaptor.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ impl<const NULLABLE_RESULT: bool> AggregateFunction
9595
#[inline]
9696
fn state_layout(&self) -> Layout {
9797
let layout = self.nested.state_layout();
98-
let add = usize::from(NULLABLE_RESULT);
98+
let add = if NULLABLE_RESULT { layout.align() } else { 0 };
9999
Layout::from_size_align(layout.size() + add, layout.align()).unwrap()
100100
}
101101

@@ -198,9 +198,9 @@ impl<const NULLABLE_RESULT: bool> AggregateFunction
198198

199199
fn deserialize(&self, place: StateAddr, reader: &mut &[u8]) -> Result<()> {
200200
if NULLABLE_RESULT {
201+
let flag = reader[reader.len() - 1];
201202
self.nested
202203
.deserialize(place, &mut &reader[..reader.len() - 1])?;
203-
let flag = reader[reader.len() - 1];
204204
self.set_flag(place, flag);
205205
} else {
206206
self.nested.deserialize(place, reader)?;

src/query/functions/src/aggregates/adaptors/aggregate_ornull_adaptor.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ impl AggregateFunction for AggregateFunctionOrNullAdaptor {
9292
#[inline]
9393
fn state_layout(&self) -> std::alloc::Layout {
9494
let layout = self.inner.state_layout();
95-
Layout::from_size_align(layout.size() + 1, layout.align()).unwrap()
95+
Layout::from_size_align(layout.size() + layout.align(), layout.align()).unwrap()
9696
}
9797

9898
#[inline]
@@ -178,17 +178,17 @@ impl AggregateFunction for AggregateFunctionOrNullAdaptor {
178178

179179
#[inline]
180180
fn deserialize(&self, place: StateAddr, reader: &mut &[u8]) -> Result<()> {
181+
let flag = reader[reader.len() - 1];
181182
self.inner
182183
.deserialize(place, &mut &reader[..reader.len() - 1])?;
183-
let flag = reader[reader.len() - 1];
184184
self.set_flag(place, flag);
185185
Ok(())
186186
}
187187

188188
fn merge(&self, place: StateAddr, rhs: StateAddr) -> Result<()> {
189189
self.inner.merge(place, rhs)?;
190-
let flag = self.get_flag(place) + self.get_flag(rhs);
191-
self.set_flag(place, flag);
190+
let flag = self.get_flag(place) > 0 || self.get_flag(rhs) > 0;
191+
self.set_flag(place, u8::from(flag));
192192
Ok(())
193193
}
194194

src/query/functions/src/aggregates/aggregate_combinator_distinct.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ where State: DistinctStateFunc
6666

6767
fn state_layout(&self) -> Layout {
6868
let layout = Layout::new::<State>();
69+
6970
let netesed = self.nested.state_layout();
7071
Layout::from_size_align(layout.size() + netesed.size(), layout.align()).unwrap()
7172
}

0 commit comments

Comments
 (0)