Skip to content

Commit e1513fa

Browse files
committed
feat(query): copy uses the new input format framework.
1 parent 064dcc2 commit e1513fa

File tree

2 files changed

+28
-26
lines changed

2 files changed

+28
-26
lines changed

src/query/service/src/interpreters/interpreter_copy_v2.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,7 @@ impl CopyInterpreterV2 {
269269
tracing::info!("copy_files_to_table from source: {:?}", read_source_plan);
270270

271271
let from_table = self.ctx.build_table_from_source_plan(&read_source_plan)?;
272+
from_table.read_partitions(self.ctx.clone(), None).await?;
272273
from_table.read2(
273274
self.ctx.clone(),
274275
&read_source_plan,

src/query/service/src/storages/stage/stage_table.rs

Lines changed: 27 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
// limitations under the License.
1414

1515
use std::any::Any;
16-
use std::collections::VecDeque;
1716
use std::str::FromStr;
1817
use std::sync::Arc;
1918

@@ -29,15 +28,14 @@ use common_legacy_planners::Statistics;
2928
use common_meta_app::schema::TableInfo;
3029
use common_pipeline_core::processors::port::InputPort;
3130
use common_pipeline_core::SinkPipeBuilder;
31+
use common_pipeline_sources::processors::sources::input_formats::InputContext;
3232
use parking_lot::Mutex;
3333
use tracing::info;
3434

3535
use super::StageSourceHelper;
36-
use crate::pipelines::processors::port::OutputPort;
3736
use crate::pipelines::processors::ContextSink;
3837
use crate::pipelines::processors::TransformLimit;
3938
use crate::pipelines::Pipeline;
40-
use crate::pipelines::SourcePipeBuilder;
4139
use crate::sessions::TableContext;
4240
use crate::storages::Table;
4341

@@ -47,6 +45,7 @@ pub struct StageTable {
4745
// But the Table trait needs it:
4846
// fn get_table_info(&self) -> &TableInfo.
4947
table_info_placeholder: TableInfo,
48+
input_context: Mutex<Option<Arc<InputContext>>>,
5049
}
5150

5251
impl StageTable {
@@ -56,8 +55,14 @@ impl StageTable {
5655
Ok(Arc::new(Self {
5756
table_info,
5857
table_info_placeholder,
58+
input_context: Default::default(),
5959
}))
6060
}
61+
62+
fn get_input_context(&self) -> Option<Arc<InputContext>> {
63+
let guard = self.input_context.lock();
64+
guard.clone()
65+
}
6166
}
6267

6368
#[async_trait::async_trait]
@@ -73,39 +78,35 @@ impl Table for StageTable {
7378

7479
async fn read_partitions(
7580
&self,
76-
_ctx: Arc<dyn TableContext>,
81+
ctx: Arc<dyn TableContext>,
7782
_push_downs: Option<Extras>,
7883
) -> Result<(Statistics, Partitions)> {
84+
let operator = StageSourceHelper::get_op(&ctx, &self.table_info.stage_info).await?;
85+
let input_ctx = Arc::new(
86+
InputContext::try_create_from_copy(
87+
operator,
88+
ctx.get_settings().clone(),
89+
ctx.get_format_settings()?,
90+
self.table_info.schema.clone(),
91+
self.table_info.stage_info.clone(),
92+
self.table_info.files.clone(),
93+
ctx.get_scan_progress()
94+
)
95+
.await?,
96+
);
97+
let mut guard = self.input_context.lock();
98+
*guard = Some(input_ctx);
7999
Ok((Statistics::default(), vec![]))
80100
}
81101

82102
fn read2(
83103
&self,
84-
ctx: Arc<dyn TableContext>,
104+
_ctx: Arc<dyn TableContext>,
85105
_plan: &ReadDataSourcePlan,
86106
pipeline: &mut Pipeline,
87107
) -> Result<()> {
88-
let settings = ctx.get_settings();
89-
let mut builder = SourcePipeBuilder::create();
90-
let table_info = &self.table_info;
91-
let schema = table_info.schema.clone();
92-
let mut files_deque = VecDeque::with_capacity(table_info.files.len());
93-
for f in &table_info.files {
94-
files_deque.push_back(f.to_string());
95-
}
96-
let files = Arc::new(Mutex::new(files_deque));
97-
98-
let stage_source = StageSourceHelper::try_create(ctx, schema, table_info.clone(), files)?;
99-
100-
for _index in 0..settings.get_max_threads()? {
101-
let output = OutputPort::create();
102-
builder.add_source(output.clone(), stage_source.get_splitter(output)?);
103-
}
104-
pipeline.add_pipe(builder.finalize());
105-
106-
pipeline.add_transform(|transform_input_port, transform_output_port| {
107-
stage_source.get_deserializer(transform_input_port, transform_output_port)
108-
})?;
108+
let input_ctx = self.get_input_context().unwrap();
109+
input_ctx.format.exec_copy(input_ctx.clone(), pipeline)?;
109110

110111
let limit = self.table_info.stage_info.copy_options.size_limit;
111112
if limit > 0 {

0 commit comments

Comments
 (0)