Commit 623b536
refactor(query): csv reader support prefetch (#14983)
* csv reader support prefetch.
* fix num of processor in CSV read pipeline.
* Update src/query/pipeline/sources/src/prefetch_async_source.rs
* Update prefetch_async_source.rs
* Update src/query/pipeline/sources/src/prefetch_async_source.rs

Co-authored-by: Winter Zhang <coswde@gmail.com>
1 parent 657cfdd commit 623b536

File tree

- src/query/pipeline/sources/src/lib.rs
- src/query/pipeline/sources/src/prefetch_async_source.rs
- src/query/storages/stage/src/read/row_based/processors/reader.rs
- src/query/storages/stage/src/read/row_based/read_pipeline.rs

4 files changed: +161 −8 lines changed

src/query/pipeline/sources/src/lib.rs

Lines changed: 3 additions & 0 deletions

```diff
@@ -28,12 +28,15 @@ mod sync_source;
 mod sync_source_receiver;
 
 pub mod input_formats;
+mod prefetch_async_source;
 
 pub use async_source::AsyncSource;
 pub use async_source::AsyncSourcer;
 pub use blocks_source::BlocksSource;
 pub use empty_source::EmptySource;
 pub use one_block_source::OneBlockSource;
+pub use prefetch_async_source::PrefetchAsyncSource;
+pub use prefetch_async_source::PrefetchAsyncSourcer;
 pub use stream_source::AsyncStreamSource;
 pub use stream_source::StreamSource;
 pub use stream_source::StreamSourceNoSkipEmpty;
```
src/query/pipeline/sources/src/prefetch_async_source.rs (new file)

Lines changed: 142 additions & 0 deletions

```rust
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;
use std::sync::Arc;

use databend_common_base::base::Progress;
use databend_common_base::base::ProgressValues;
use databend_common_base::runtime::profile::Profile;
use databend_common_base::runtime::profile::ProfileStatisticsName;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::Result;
use databend_common_expression::DataBlock;
use databend_common_pipeline_core::processors::Event;
use databend_common_pipeline_core::processors::EventCause;
use databend_common_pipeline_core::processors::OutputPort;
use databend_common_pipeline_core::processors::Processor;
use databend_common_pipeline_core::processors::ProcessorPtr;

#[async_trait::async_trait]
pub trait PrefetchAsyncSource: Send {
    const NAME: &'static str;
    const SKIP_EMPTY_DATA_BLOCK: bool = true;

    #[async_trait::unboxed_simple]
    async fn generate(&mut self) -> Result<Option<DataBlock>>;
    fn is_full(&self, prefetched: &[DataBlock]) -> bool;

    fn un_reacted(&self) -> Result<()> {
        Ok(())
    }
}

// TODO: This can be refactored using proc macros.
// TODO: Most of this code matches the sync source. We need to refactor it with
// better async scheduling once expanding processors is supported; it will be
// implemented using a similar dynamic window.
pub struct PrefetchAsyncSourcer<T: 'static + PrefetchAsyncSource> {
    is_inner_finish: bool,

    inner: T,
    output: Arc<OutputPort>,
    scan_progress: Arc<Progress>,
    generated_data: Vec<DataBlock>,
}

impl<T: 'static + PrefetchAsyncSource> PrefetchAsyncSourcer<T> {
    pub fn create(
        ctx: Arc<dyn TableContext>,
        output: Arc<OutputPort>,
        inner: T,
    ) -> Result<ProcessorPtr> {
        let scan_progress = ctx.get_scan_progress();
        Ok(ProcessorPtr::create(Box::new(Self {
            inner,
            output,
            scan_progress,
            is_inner_finish: false,
            generated_data: vec![],
        })))
    }
}

#[async_trait::async_trait]
impl<T: 'static + PrefetchAsyncSource> Processor for PrefetchAsyncSourcer<T> {
    fn name(&self) -> String {
        T::NAME.to_string()
    }

    fn as_any(&mut self) -> &mut dyn Any {
        self
    }

    fn event(&mut self) -> Result<Event> {
        if self.is_inner_finish && self.generated_data.is_empty() {
            self.output.finish();
            return Ok(Event::Finished);
        }

        if self.output.is_finished() {
            return Ok(Event::Finished);
        }

        if self.output.can_push() {
            if let Some(data_block) = self.generated_data.pop() {
                self.output.push_data(Ok(data_block));
            }
        }

        if self.is_inner_finish || self.inner.is_full(&self.generated_data) {
            Ok(Event::NeedConsume)
        } else {
            Ok(Event::Async)
        }
    }

    fn un_reacted(&self, cause: EventCause, _id: usize) -> Result<()> {
        if let EventCause::Output(_) = cause {
            self.inner.un_reacted()?;
        }

        Ok(())
    }

    #[async_backtrace::framed]
    async fn async_process(&mut self) -> Result<()> {
        match self.inner.generate().await? {
            None => self.is_inner_finish = true,
            Some(data_block) => {
                // No need to record the scan progress of `MaterializedCteSource`
                // because it reads data from memory.
                if !data_block.is_empty() && self.name() != "MaterializedCteSource" {
                    let progress_values = ProgressValues {
                        rows: data_block.num_rows(),
                        bytes: data_block.memory_size(),
                    };
                    self.scan_progress.incr(&progress_values);
                    Profile::record_usize_profile(
                        ProfileStatisticsName::ScanBytes,
                        data_block.memory_size(),
                    );
                }

                if !T::SKIP_EMPTY_DATA_BLOCK || !data_block.is_empty() {
                    self.generated_data.push(data_block)
                }
            }
        };

        Ok(())
    }
}
```
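To make the scheduling contract concrete, here is a self-contained, synchronous sketch of the same prefetch loop. `ToySource`, its fields, and the `String` stand-in for `DataBlock` are all invented for illustration; in the real processor the pipeline scheduler drives this logic through `event()` and `async_process()` rather than a plain loop.

```rust
// Minimal model of the PrefetchAsyncSourcer state machine (illustration only).
trait PrefetchSource {
    /// Produce the next block, or `None` once the source is exhausted.
    fn generate(&mut self) -> Option<String>;
    /// Back-pressure hook: stop generating once the queue is this full.
    fn is_full(&self, prefetched: &[String]) -> bool;
}

struct ToySource {
    remaining: usize,
    prefetch_num: usize,
}

impl PrefetchSource for ToySource {
    fn generate(&mut self) -> Option<String> {
        if self.remaining == 0 {
            return None;
        }
        self.remaining -= 1;
        Some(format!("block-{}", self.remaining))
    }

    fn is_full(&self, prefetched: &[String]) -> bool {
        prefetched.len() >= self.prefetch_num
    }
}

fn main() {
    let mut source = ToySource { remaining: 5, prefetch_num: 2 };
    let mut queue: Vec<String> = Vec::new();
    let mut finished = false;

    while !(finished && queue.is_empty()) {
        if !finished && !source.is_full(&queue) {
            // Corresponds to Event::Async: schedule another generate() call.
            match source.generate() {
                Some(block) => queue.push(block),
                None => finished = true,
            }
        } else if let Some(block) = queue.pop() {
            // Corresponds to Event::NeedConsume: the queue is full (or the
            // source is done), so hand a buffered block downstream.
            println!("consumed {block}");
        }
    }
}
```

With `prefetch_num = 2` the source stays at most two blocks ahead of the consumer, which is exactly the back-pressure that `is_full` expresses in the processor above.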

src/query/storages/stage/src/read/row_based/processors/reader.rs

Lines changed: 9 additions & 2 deletions

```diff
@@ -24,7 +24,7 @@ use databend_common_catalog::table_context::TableContext;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
 use databend_common_expression::DataBlock;
-use databend_common_pipeline_sources::AsyncSource;
+use databend_common_pipeline_sources::PrefetchAsyncSource;
 use log::debug;
 use opendal::Operator;
@@ -40,19 +40,22 @@ pub struct BytesReader {
     op: Operator,
     read_batch_size: usize,
     file_state: Option<FileState>,
+    prefetch_num: usize,
 }
 
 impl BytesReader {
     pub fn try_create(
         table_ctx: Arc<dyn TableContext>,
         op: Operator,
         read_batch_size: usize,
+        prefetch_num: usize,
     ) -> Result<Self> {
         Ok(Self {
             table_ctx,
             op,
             read_batch_size,
             file_state: None,
+            prefetch_num,
         })
     }
 
@@ -103,11 +106,15 @@ impl BytesReader {
 }
 
 #[async_trait::async_trait]
-impl AsyncSource for BytesReader {
+impl PrefetchAsyncSource for BytesReader {
     const NAME: &'static str = "BytesReader";
 
     const SKIP_EMPTY_DATA_BLOCK: bool = false;
 
+    fn is_full(&self, prefetched: &[DataBlock]) -> bool {
+        prefetched.len() >= self.prefetch_num
+    }
+
     #[async_trait::unboxed_simple]
     async fn generate(&mut self) -> Result<Option<DataBlock>> {
         if self.file_state.is_none() {
```
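`is_full` is the only back-pressure policy the trait requires, and `BytesReader` caps the queue by block count. A hypothetical variant (not part of this commit) could cap by buffered bytes instead; `is_full_by_bytes` and `max_prefetch_bytes` are invented names, and plain `Vec<u8>` stands in for `DataBlock`:

```rust
// Hypothetical alternative policy: bound the prefetch queue by total
// buffered bytes rather than by the number of blocks.
fn is_full_by_bytes(prefetched: &[Vec<u8>], max_prefetch_bytes: usize) -> bool {
    let buffered: usize = prefetched.iter().map(|chunk| chunk.len()).sum();
    buffered >= max_prefetch_bytes
}

fn main() {
    let queue = vec![vec![0u8; 4096], vec![0u8; 4096]];
    assert!(is_full_by_bytes(&queue, 8192)); // 8 KiB buffered >= 8 KiB budget
    assert!(!is_full_by_bytes(&queue, 16384)); // still under a 16 KiB budget
    println!("byte-budget policy behaves as expected");
}
```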

src/query/storages/stage/src/read/row_based/read_pipeline.rs

Lines changed: 7 additions & 6 deletions

```diff
@@ -25,8 +25,8 @@ use databend_common_meta_app::principal::StageFileCompression;
 use databend_common_pipeline_core::processors::ProcessorPtr;
 use databend_common_pipeline_core::Pipeline;
 use databend_common_pipeline_sources::input_formats::InputContext;
-use databend_common_pipeline_sources::AsyncSourcer;
 use databend_common_pipeline_sources::EmptySource;
+use databend_common_pipeline_sources::PrefetchAsyncSourcer;
 use databend_common_pipeline_transforms::processors::AccumulatingTransformer;
 use databend_common_settings::Settings;
 use databend_common_storage::init_stage_operator;
@@ -55,8 +55,8 @@ impl RowBasedReadPipelineBuilder<'_> {
         let batch_size = settings.get_input_read_buffer_size()? as usize;
         pipeline.add_source(
             |output| {
-                let reader = BytesReader::try_create(ctx.clone(), operator.clone(), batch_size)?;
-                AsyncSourcer::create(ctx.clone(), output, reader)
+                let reader = BytesReader::try_create(ctx.clone(), operator.clone(), batch_size, 1)?;
+                PrefetchAsyncSourcer::create(ctx.clone(), output, reader)
             },
             num_threads,
         )?;
@@ -94,8 +94,9 @@ impl RowBasedReadPipelineBuilder<'_> {
         let settings = ctx.get_settings();
         ctx.set_partitions(plan.parts.clone())?;
 
-        let threads = std::cmp::min(settings.get_max_threads()? as usize, plan.parts.len());
-        self.build_read_stage_source(ctx.clone(), pipeline, &settings, threads)?;
+        let max_threads = settings.get_max_threads()? as usize;
+        let num_sources = std::cmp::min(max_threads, plan.parts.len());
+        self.build_read_stage_source(ctx.clone(), pipeline, &settings, num_sources)?;
 
         let format =
             create_row_based_file_format(&self.stage_table_info.stage_info.file_format_params);
@@ -137,7 +138,7 @@ impl RowBasedReadPipelineBuilder<'_> {
         })?;
 
         // todo(youngsofun): no need to resize if it is unlikely to be unbalanced
-        pipeline.try_resize(threads)?;
+        pipeline.try_resize(max_threads)?;
 
         pipeline.add_transform(|input, output| {
             let transformer = BlockBuilder::create(load_ctx.clone(), &format)?;
```
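This change separates two widths the old code conflated: the number of source processors (capped at the number of file parts, since extra readers would sit idle) and the width the pipeline is resized to for decoding (the full thread count). A small sketch of that arithmetic, with invented function and variable names:

```rust
// Sketch of the processor-count logic after this commit: spawn at most one
// reader per file part, then resize the pipeline so decoding can still use
// every available thread.
fn plan_widths(max_threads: usize, num_parts: usize) -> (usize, usize) {
    let num_sources = max_threads.min(num_parts); // readers
    let decode_width = max_threads; // width after try_resize
    (num_sources, decode_width)
}

fn main() {
    // e.g. 16 threads loading 3 CSV files: 3 readers feed 16 decoders.
    assert_eq!(plan_widths(16, 3), (3, 16));
    // More parts than threads: both widths equal max_threads.
    assert_eq!(plan_widths(16, 40), (16, 16));
}
```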
