Skip to content

Commit c481421

Browse files
committed
feat(query): copy use new input format framework.
1 parent 42f4957 commit c481421

File tree

2 files changed

+28
-26
lines changed

2 files changed

+28
-26
lines changed

src/query/service/src/interpreters/interpreter_copy_v2.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ impl CopyInterpreterV2 {
171171
tracing::info!("copy_files_to_table from source: {:?}", read_source_plan);
172172

173173
let from_table = self.ctx.build_table_from_source_plan(&read_source_plan)?;
174+
from_table.read_partitions(self.ctx.clone(), None).await?;
174175
from_table.read2(
175176
self.ctx.clone(),
176177
&read_source_plan,

src/query/service/src/storages/stage/stage_table.rs

Lines changed: 27 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
// limitations under the License.
1414

1515
use std::any::Any;
16-
use std::collections::VecDeque;
1716
use std::str::FromStr;
1817
use std::sync::Arc;
1918

@@ -30,15 +29,14 @@ use common_legacy_planners::TruncateTablePlan;
3029
use common_meta_app::schema::TableInfo;
3130
use common_pipeline_core::processors::port::InputPort;
3231
use common_pipeline_core::SinkPipeBuilder;
32+
use common_pipeline_sources::processors::sources::input_formats::InputContext;
3333
use parking_lot::Mutex;
3434
use tracing::info;
3535

3636
use super::StageSourceHelper;
37-
use crate::pipelines::processors::port::OutputPort;
3837
use crate::pipelines::processors::ContextSink;
3938
use crate::pipelines::processors::TransformLimit;
4039
use crate::pipelines::Pipeline;
41-
use crate::pipelines::SourcePipeBuilder;
4240
use crate::sessions::TableContext;
4341
use crate::storages::Table;
4442

@@ -48,6 +46,7 @@ pub struct StageTable {
4846
// But the Table trait need it:
4947
// fn get_table_info(&self) -> &TableInfo).
5048
table_info_placeholder: TableInfo,
49+
input_context: Mutex<Option<Arc<InputContext>>>,
5150
}
5251

5352
impl StageTable {
@@ -57,8 +56,14 @@ impl StageTable {
5756
Ok(Arc::new(Self {
5857
table_info,
5958
table_info_placeholder,
59+
input_context: Default::default(),
6060
}))
6161
}
62+
63+
/// Returns a clone of the value currently held in `self.input_context`.
///
/// The result is `None` when nothing has been stored in the field yet;
/// otherwise it is a new `Arc` handle to the same `InputContext`.
fn get_input_context(&self) -> Option<Arc<InputContext>> {
64+
// Hold the lock only long enough to clone the `Option<Arc<_>>`.
let guard = self.input_context.lock();
65+
guard.clone()
66+
}
6267
}
6368

6469
#[async_trait::async_trait]
@@ -74,39 +79,35 @@ impl Table for StageTable {
7479

7580
/// Builds an `InputContext` for this stage table and stores it in
/// `self.input_context`.
///
/// The returned statistics are `Statistics::default()` and the returned
/// partition list is always empty. The stored `InputContext` is the only
/// state this method produces.
///
/// Errors from `StageSourceHelper::get_op`, `ctx.get_format_settings()` and
/// `InputContext::try_create_from_copy` are propagated with `?`.
async fn read_partitions(
7681
&self,
77-
_ctx: Arc<dyn TableContext>,
82+
ctx: Arc<dyn TableContext>,
7883
// Not read anywhere in this method body.
_push_downs: Option<Extras>,
7984
) -> Result<(Statistics, Partitions)> {
85+
// Obtain the operator for this table's stage from the query context.
let operator = StageSourceHelper::get_op(&ctx, &self.table_info.stage_info).await?;
86+
// Assemble the input context from the operator, the context's settings,
// format settings and scan progress, and this table's schema, stage info
// and file list.
let input_ctx = Arc::new(
87+
InputContext::try_create_from_copy(
88+
operator,
89+
ctx.get_settings().clone(),
90+
ctx.get_format_settings()?,
91+
self.table_info.schema.clone(),
92+
self.table_info.stage_info.clone(),
93+
self.table_info.files.clone(),
94+
ctx.get_scan_progress()
95+
)
96+
.await?,
97+
);
98+
// The lock is taken only after both awaits above have completed, so the
// guard is not held across an `.await`.
let mut guard = self.input_context.lock();
99+
// Overwrites any previously stored context.
*guard = Some(input_ctx);
80100
// No statistics or partitions are reported.
Ok((Statistics::default(), vec![]))
81101
}
82102

83103
fn read2(
84104
&self,
85-
ctx: Arc<dyn TableContext>,
105+
_ctx: Arc<dyn TableContext>,
86106
_plan: &ReadDataSourcePlan,
87107
pipeline: &mut Pipeline,
88108
) -> Result<()> {
89-
let settings = ctx.get_settings();
90-
let mut builder = SourcePipeBuilder::create();
91-
let table_info = &self.table_info;
92-
let schema = table_info.schema.clone();
93-
let mut files_deque = VecDeque::with_capacity(table_info.files.len());
94-
for f in &table_info.files {
95-
files_deque.push_back(f.to_string());
96-
}
97-
let files = Arc::new(Mutex::new(files_deque));
98-
99-
let stage_source = StageSourceHelper::try_create(ctx, schema, table_info.clone(), files)?;
100-
101-
for _index in 0..settings.get_max_threads()? {
102-
let output = OutputPort::create();
103-
builder.add_source(output.clone(), stage_source.get_splitter(output)?);
104-
}
105-
pipeline.add_pipe(builder.finalize());
106-
107-
pipeline.add_transform(|transform_input_port, transform_output_port| {
108-
stage_source.get_deserializer(transform_input_port, transform_output_port)
109-
})?;
109+
let input_ctx = self.get_input_context().unwrap();
110+
input_ctx.format.exec_copy(input_ctx.clone(), pipeline)?;
110111

111112
let limit = self.table_info.stage_info.copy_options.size_limit;
112113
if limit > 0 {

0 commit comments

Comments
 (0)