|
13 | 13 | // limitations under the License.
|
14 | 14 |
|
15 | 15 | use std::any::Any;
|
| 16 | +use std::collections::VecDeque; |
16 | 17 | use std::ops::BitAnd;
|
17 | 18 | use std::sync::Arc;
|
18 | 19 | use std::time::Instant;
|
@@ -65,7 +66,7 @@ pub struct DeserializeDataTransform {
|
65 | 66 |
|
66 | 67 | input: Arc<InputPort>,
|
67 | 68 | output: Arc<OutputPort>,
|
68 |
| - output_data: Vec<DataBlock>, |
| 69 | + output_data: VecDeque<DataBlock>, |
69 | 70 | src_schema: DataSchema,
|
70 | 71 | output_schema: DataSchema,
|
71 | 72 | parts: Vec<PartInfoPtr>,
|
@@ -132,7 +133,7 @@ impl DeserializeDataTransform {
|
132 | 133 | block_reader,
|
133 | 134 | input,
|
134 | 135 | output,
|
135 |
| - output_data: vec![], |
| 136 | + output_data: VecDeque::new(), |
136 | 137 | src_schema,
|
137 | 138 | output_schema,
|
138 | 139 | parts: vec![],
|
@@ -233,7 +234,7 @@ impl Processor for DeserializeDataTransform {
|
233 | 234 | return Ok(Event::NeedConsume);
|
234 | 235 | }
|
235 | 236 |
|
236 |
| - if let Some(data_block) = self.output_data.pop() { |
| 237 | + if let Some(data_block) = self.output_data.pop_front() { |
237 | 238 | self.output.push_data(Ok(data_block));
|
238 | 239 | return Ok(Event::NeedConsume);
|
239 | 240 | }
|
@@ -283,7 +284,7 @@ impl Processor for DeserializeDataTransform {
|
283 | 284 |
|
284 | 285 | self.update_scan_metrics(blocks.as_slice());
|
285 | 286 |
|
286 |
| - self.output_data = blocks; |
| 287 | + self.output_data = blocks.into(); |
287 | 288 | }
|
288 | 289 |
|
289 | 290 | ParquetDataSource::Normal((data, virtual_data)) => {
|
@@ -318,7 +319,7 @@ impl Processor for DeserializeDataTransform {
|
318 | 319 |
|
319 | 320 | self.update_scan_metrics(data_blocks.as_slice());
|
320 | 321 |
|
321 |
| - let mut output_blocks = Vec::with_capacity(data_blocks.len()); |
| 322 | + let mut output_blocks = VecDeque::with_capacity(data_blocks.len()); |
322 | 323 | for mut data_block in data_blocks {
|
323 | 324 | let origin_num_rows = data_block.num_rows();
|
324 | 325 |
|
@@ -359,7 +360,7 @@ impl Processor for DeserializeDataTransform {
|
359 | 360 | self.block_reader.query_internal_columns(),
|
360 | 361 | self.need_reserve_block_info,
|
361 | 362 | )?;
|
362 |
| - output_blocks.push(data_block); |
| 363 | + output_blocks.push_back(data_block); |
363 | 364 | }
|
364 | 365 |
|
365 | 366 | self.output_data = output_blocks;
|
|
0 commit comments