Skip to content

Commit d64d2ec

Browse files
committed
fix
1 parent 028584e commit d64d2ec

File tree

14 files changed

+245
-208
lines changed

14 files changed

+245
-208
lines changed

src/common/base/src/runtime/profile/profile.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use crate::runtime::metrics::ScopedRegistry;
2323
use crate::runtime::profile::ProfileStatisticsName;
2424
use crate::runtime::ThreadTracker;
2525

26-
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
26+
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
2727
pub struct ProfileLabel {
2828
pub name: String,
2929
pub value: Vec<String>,

src/common/storage/src/copy.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use serde::Deserialize;
2020
use serde::Serialize;
2121
use thiserror::Error;
2222

23-
#[derive(Default, Clone, Serialize, Deserialize)]
23+
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
2424
pub struct CopyStatus {
2525
/// Key is file path.
2626
pub files: DashMap<String, FileStatus>,
@@ -45,7 +45,7 @@ impl CopyStatus {
4545
}
4646
}
4747

48-
#[derive(Default, Clone, Serialize, Deserialize)]
48+
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
4949
pub struct FileStatus {
5050
pub num_rows_loaded: usize,
5151
pub error: Option<FileErrorsInfo>,
@@ -79,7 +79,7 @@ impl FileStatus {
7979
}
8080
}
8181

82-
#[derive(Clone, Serialize, Deserialize)]
82+
#[derive(Debug, Clone, Serialize, Deserialize)]
8383
pub struct FileErrorsInfo {
8484
pub num_errors: usize,
8585
pub first_error: FileParseErrorAtLine,
@@ -156,7 +156,7 @@ impl FileParseError {
156156
}
157157
}
158158

159-
#[derive(Clone, Serialize, Deserialize)]
159+
#[derive(Debug, Clone, Serialize, Deserialize)]
160160
pub struct FileParseErrorAtLine {
161161
pub error: FileParseError,
162162
pub line: usize,

src/common/storage/src/merge.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
use serde::Deserialize;
1616
use serde::Serialize;
1717

18-
#[derive(Default, Clone, Serialize, Deserialize)]
18+
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
1919
pub struct MutationStatus {
2020
pub insert_rows: u64,
2121
pub deleted_rows: u64,

src/query/catalog/src/statistics/data_cache_statistics.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ pub struct DataCacheMetrics {
2424
bytes_from_memory: AtomicUsize,
2525
}
2626

27-
#[derive(Default, Clone, Serialize, Deserialize)]
27+
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
2828
pub struct DataCacheMetricValues {
2929
pub bytes_from_remote_disk: usize,
3030
pub bytes_from_local_disk: usize,

src/query/expression/src/aggregate/aggregate_function.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,12 @@ use super::AggrStateLoc;
2323
use super::AggrStateRegistry;
2424
use super::StateAddr;
2525
use crate::types::DataType;
26-
use crate::AggrStateSerdeType;
2726
use crate::BlockEntry;
2827
use crate::ColumnBuilder;
2928
use crate::ProjectedBlock;
3029
use crate::Scalar;
3130
use crate::ScalarRef;
31+
use crate::StateSerdeItem;
3232

3333
pub type AggregateFunctionRef = Arc<dyn AggregateFunction>;
3434

@@ -69,8 +69,8 @@ pub trait AggregateFunction: fmt::Display + Sync + Send {
6969
// Used in aggregate_null_adaptor
7070
fn accumulate_row(&self, place: AggrState, columns: ProjectedBlock, row: usize) -> Result<()>;
7171

72-
fn serialize_type(&self) -> Vec<AggrStateSerdeType> {
73-
vec![AggrStateSerdeType::Binary(self.serialize_size_per_row())]
72+
fn serialize_type(&self) -> Vec<StateSerdeItem> {
73+
vec![StateSerdeItem::Binary(self.serialize_size_per_row())]
7474
}
7575

7676
fn serialize(&self, place: AggrState, builder: &mut ColumnBuilder) -> Result<()> {

src/query/expression/src/aggregate/aggregate_function_state.rs

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ pub fn get_states_layout(funcs: &[AggregateFunctionRef]) -> Result<StatesLayout>
119119
for func in funcs {
120120
func.register_state(&mut registry);
121121
registry.commit();
122-
serialize_type.push(func.serialize_type().into_boxed_slice());
122+
serialize_type.push(StateSerdeType(func.serialize_type().into()));
123123
}
124124

125125
let AggrStateRegistry { states, offsets } = registry;
@@ -193,25 +193,49 @@ impl AggrStateLoc {
193193
}
194194
}
195195

196+
#[derive(Debug, Clone, Copy)]
197+
pub enum StateSerdeItem {
198+
Bool,
199+
Binary(Option<usize>),
200+
}
201+
202+
#[derive(Debug, Clone)]
203+
pub struct StateSerdeType(Box<[StateSerdeItem]>);
204+
205+
impl StateSerdeType {
206+
pub fn data_type(&self) -> DataType {
207+
DataType::Tuple(
208+
self.0
209+
.iter()
210+
.map(|item| match item {
211+
StateSerdeItem::Bool => DataType::Boolean,
212+
StateSerdeItem::Binary(_) => DataType::Binary,
213+
})
214+
.collect(),
215+
)
216+
}
217+
}
218+
196219
#[derive(Debug, Clone)]
197220
pub struct StatesLayout {
198221
pub layout: Layout,
199222
pub states_loc: Vec<Box<[AggrStateLoc]>>,
200-
serialize_type: Vec<Box<[AggrStateSerdeType]>>,
223+
pub(super) serialize_type: Vec<StateSerdeType>,
201224
}
202225

203226
impl StatesLayout {
204227
pub fn serialize_builders(&self, num_rows: usize) -> Vec<ColumnBuilder> {
205228
self.serialize_type
206229
.iter()
207-
.map(|item| {
208-
let builder = item
230+
.map(|serde_type| {
231+
let builder = serde_type
232+
.0
209233
.iter()
210-
.map(|serde_type| match serde_type {
211-
AggrStateSerdeType::Bool => {
234+
.map(|item| match item {
235+
StateSerdeItem::Bool => {
212236
ColumnBuilder::with_capacity(&DataType::Boolean, num_rows)
213237
}
214-
AggrStateSerdeType::Binary(size) => {
238+
StateSerdeItem::Binary(size) => {
215239
ColumnBuilder::Binary(BinaryColumnBuilder::with_capacity(
216240
num_rows,
217241
num_rows * size.unwrap_or(0),
@@ -306,12 +330,6 @@ pub enum AggrStateType {
306330
Custom(Layout),
307331
}
308332

309-
#[derive(Debug, Clone, Copy)]
310-
pub enum AggrStateSerdeType {
311-
Bool,
312-
Binary(Option<usize>),
313-
}
314-
315333
#[cfg(test)]
316334
mod tests {
317335
use proptest::prelude::*;

src/query/expression/src/aggregate/payload.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -423,9 +423,14 @@ impl Payload {
423423

424424
pub fn empty_block(&self, fake_rows: Option<usize>) -> DataBlock {
425425
let fake_rows = fake_rows.unwrap_or(0);
426-
let entries = (0..self.aggrs.len())
427-
.map(|_| {
428-
ColumnBuilder::repeat_default(&DataType::Binary, fake_rows)
426+
let entries = self
427+
.states_layout
428+
.as_ref()
429+
.unwrap()
430+
.serialize_type
431+
.iter()
432+
.map(|serde_type| {
433+
ColumnBuilder::repeat_default(&serde_type.data_type(), fake_rows)
429434
.build()
430435
.into()
431436
})

src/query/pipeline/core/src/processors/processor.rs

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -161,12 +161,24 @@ impl ProcessorPtr {
161161

162162
/// # Safety
163163
pub unsafe fn process(&self) -> Result<()> {
164-
let mut name = self.name();
165-
name.push_str("::process");
166-
let _span = LocalSpan::enter_with_local_parent(name)
164+
let span = LocalSpan::enter_with_local_parent(format!("{}::process", self.name()))
167165
.with_property(|| ("graph-node-id", self.id().index().to_string()));
168166

169-
(*self.inner.get()).process()
167+
match (*self.inner.get()).process() {
168+
Ok(_) => Ok(()),
169+
Err(err) => {
170+
let _ = span
171+
.with_property(|| ("error", "true"))
172+
.with_properties(|| {
173+
[
174+
("error.type", err.code().to_string()),
175+
("error.message", err.display_text()),
176+
]
177+
});
178+
log::info!(error = err.to_string(); "[PIPELINE-EXECUTOR] Error in process");
179+
Err(err)
180+
}
181+
}
170182
}
171183

172184
/// # Safety
@@ -190,10 +202,23 @@ impl ProcessorPtr {
190202
async move {
191203
let span = Span::enter_with_local_parent(name)
192204
.with_property(|| ("graph-node-id", id.index().to_string()));
193-
task.in_span(span).await?;
194205

195-
drop(inner);
196-
Ok(())
206+
match task.await {
207+
Ok(_) => {
208+
drop(inner);
209+
Ok(())
210+
}
211+
Err(err) => {
212+
span.with_property(|| ("error", "true")).add_properties(|| {
213+
[
214+
("error.type", err.code().to_string()),
215+
("error.message", err.display_text()),
216+
]
217+
});
218+
log::info!(error = err.to_string(); "[PIPELINE-EXECUTOR] Error in process");
219+
Err(err)
220+
}
221+
}
197222
}
198223
.boxed()
199224
}

src/query/pipeline/core/src/processors/profile.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ impl Drop for PlanScopeGuard {
4343
}
4444
}
4545

46-
#[derive(serde::Serialize, serde::Deserialize)]
46+
#[derive(Debug, serde::Serialize, serde::Deserialize)]
4747
pub struct ErrorInfoDesc {
4848
message: String,
4949
detail: String,
@@ -60,15 +60,15 @@ impl ErrorInfoDesc {
6060
}
6161
}
6262

63-
#[derive(serde::Serialize, serde::Deserialize)]
63+
#[derive(Debug, serde::Serialize, serde::Deserialize)]
6464
pub enum ErrorInfo {
6565
Other(ErrorInfoDesc),
6666
IoError(ErrorInfoDesc),
6767
ScheduleError(ErrorInfoDesc),
6868
CalculationError(ErrorInfoDesc),
6969
}
7070

71-
#[derive(Clone, serde::Serialize, serde::Deserialize)]
71+
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
7272
pub struct PlanProfile {
7373
pub id: Option<u32>,
7474
pub name: Option<String>,

src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_serializer.rs

Lines changed: 35 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -121,26 +121,19 @@ impl Processor for TransformAggregateSerializer {
121121
impl TransformAggregateSerializer {
122122
fn transform_input_data(&mut self, mut data_block: DataBlock) -> Result<Event> {
123123
debug_assert!(data_block.is_empty());
124-
if let Some(block_meta) = data_block.take_meta() {
125-
if let Some(block_meta) = AggregateMeta::downcast_from(block_meta) {
126-
match block_meta {
127-
AggregateMeta::Spilled(_) => unreachable!(),
128-
AggregateMeta::Serialized(_) => unreachable!(),
129-
AggregateMeta::BucketSpilled(_) => unreachable!(),
130-
AggregateMeta::Partitioned { .. } => unreachable!(),
131-
AggregateMeta::AggregateSpilling(_) => unreachable!(),
132-
AggregateMeta::AggregatePayload(p) => {
133-
self.input_data = Some(SerializeAggregateStream::create(
134-
&self.params,
135-
SerializePayload::AggregatePayload(p),
136-
));
137-
return Ok(Event::Sync);
138-
}
139-
}
140-
}
141-
}
142124

143-
unreachable!()
125+
let Some(AggregateMeta::AggregatePayload(p)) = data_block
126+
.take_meta()
127+
.and_then(AggregateMeta::downcast_from)
128+
else {
129+
unreachable!()
130+
};
131+
132+
self.input_data = Some(SerializeAggregateStream::create(
133+
&self.params,
134+
SerializePayload::AggregatePayload(p),
135+
));
136+
Ok(Event::Sync)
144137
}
145138
}
146139

@@ -218,41 +211,29 @@ impl SerializeAggregateStream {
218211
return Ok(None);
219212
}
220213

221-
match self.payload.as_ref().get_ref() {
222-
SerializePayload::AggregatePayload(p) => {
223-
let block = p.payload.aggregate_flush(&mut self.flush_state)?;
224-
225-
if block.is_none() {
226-
self.end_iter = true;
227-
}
228-
229-
match block {
230-
Some(block) => {
231-
self.nums += 1;
232-
Ok(Some(block.add_meta(Some(
233-
AggregateSerdeMeta::create_agg_payload(
234-
p.bucket,
235-
p.max_partition_count,
236-
false,
237-
),
238-
))?))
239-
}
240-
None => {
241-
// always return at least one block
242-
if self.nums == 0 {
243-
self.nums += 1;
244-
let block = p.payload.empty_block(Some(1));
245-
Ok(Some(block.add_meta(Some(
246-
AggregateSerdeMeta::create_agg_payload(
247-
p.bucket,
248-
p.max_partition_count,
249-
true,
250-
),
251-
))?))
252-
} else {
253-
Ok(None)
254-
}
255-
}
214+
let SerializePayload::AggregatePayload(p) = self.payload.as_ref().get_ref();
215+
match p.payload.aggregate_flush(&mut self.flush_state)? {
216+
Some(block) => {
217+
self.nums += 1;
218+
Ok(Some(block.add_meta(Some(
219+
AggregateSerdeMeta::create_agg_payload(p.bucket, p.max_partition_count, false),
220+
))?))
221+
}
222+
None => {
223+
self.end_iter = true;
224+
// always return at least one block
225+
if self.nums == 0 {
226+
self.nums += 1;
227+
let block = p.payload.empty_block(Some(1));
228+
Ok(Some(block.add_meta(Some(
229+
AggregateSerdeMeta::create_agg_payload(
230+
p.bucket,
231+
p.max_partition_count,
232+
true,
233+
),
234+
))?))
235+
} else {
236+
Ok(None)
256237
}
257238
}
258239
}

0 commit comments

Comments
 (0)