Skip to content

Commit 805db70

Browse files
committed
feat(query): support sideload data for insert values
1 parent 76a228c commit 805db70

File tree

9 files changed

+214
-5
lines changed

9 files changed

+214
-5
lines changed

scripts/ci/deploy/config/databend-query-node-1.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ table_cache_bloom_index_data_bytes=1073741824
6060
[log]
6161

6262
[log.file]
63-
level = "ERROR"
63+
level = "WARN"
6464
format = "text"
6565
dir = "./.databend/logs_1"
6666

src/query/catalog/src/table_context.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,12 @@ pub struct ProcessInfo {
5454
pub created_time: SystemTime,
5555
}
5656

57+
/// Options describing where the sideloaded data of an insert comes from.
///
/// Both fields are optional; nothing in this type enforces that at least one
/// of them is set.
#[derive(Debug, Clone)]
58+
pub struct SideloadOptions {
59+
/// URI of the sideload data, if one was given.
pub uri: Option<String>,
60+
/// Stage location of the sideload data, if one was given.
pub stage: Option<String>,
61+
}
62+
5763
#[async_trait::async_trait]
5864
pub trait TableContext: Send + Sync {
5965
/// Build a table instance the plan wants to operate on.
@@ -99,4 +105,5 @@ pub trait TableContext: Send + Sync {
99105
async fn get_table(&self, catalog: &str, database: &str, table: &str)
100106
-> Result<Arc<dyn Table>>;
101107
fn get_processes_info(&self) -> Vec<ProcessInfo>;
108+
/// Returns the sideload options of this context, or `None` when there are none.
fn get_sideload(&self) -> Option<SideloadOptions>;
102109
}

src/query/service/src/interpreters/interpreter_insert_v2.rs

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,16 @@ use std::io::BufRead;
1717
use std::io::Cursor;
1818
use std::ops::Not;
1919
use std::sync::Arc;
20+
use std::time::Instant;
2021

2122
use aho_corasick::AhoCorasick;
2223
use common_ast::ast::Expr;
2324
use common_ast::parser::parse_comma_separated_exprs;
2425
use common_ast::parser::tokenize_sql;
2526
use common_ast::Backtrace;
2627
use common_base::runtime::GlobalIORuntime;
28+
use common_catalog::plan::StageFileStatus;
29+
use common_catalog::plan::StageTableInfo;
2730
use common_catalog::table::AppendMode;
2831
use common_datablocks::DataBlock;
2932
use common_datavalues::prelude::*;
@@ -33,19 +36,26 @@ use common_formats::parse_timezone;
3336
use common_formats::FastFieldDecoderValues;
3437
use common_io::cursor_ext::ReadBytesExt;
3538
use common_io::cursor_ext::ReadCheckPointExt;
39+
use common_meta_types::UserStageInfo;
40+
use common_pipeline_core::Pipeline;
3641
use common_pipeline_sources::processors::sources::AsyncSource;
3742
use common_pipeline_sources::processors::sources::AsyncSourcer;
3843
use common_pipeline_transforms::processors::transforms::Transform;
3944
use common_sql::evaluator::ChunkOperator;
4045
use common_sql::evaluator::CompoundChunkOperator;
46+
use common_sql::executor::table_read_plan::ToReadDataSourcePlan;
4147
use common_sql::Metadata;
4248
use common_sql::MetadataRef;
49+
use common_storages_factory::Table;
50+
use common_storages_stage::StageTable;
51+
use common_users::UserApiProvider;
4352
use parking_lot::Mutex;
4453
use parking_lot::RwLock;
4554

4655
use crate::interpreters::common::append2table;
4756
use crate::interpreters::Interpreter;
4857
use crate::interpreters::InterpreterPtr;
58+
use crate::pipelines::processors::TransformAddOn;
4959
use crate::pipelines::PipelineBuildResult;
5060
use crate::pipelines::SourcePipeBuilder;
5161
use crate::schedulers::build_query_pipeline;
@@ -101,6 +111,113 @@ impl InsertInterpreterV2 {
101111
let cast_needed = select_schema != *output_schema;
102112
Ok(cast_needed)
103113
}
114+
115+
// TODO:(everpcpc)
116+
async fn build_insert_from_stage_pipeline(
117+
&self,
118+
table: Arc<dyn Table>,
119+
stage_location: &str,
120+
pipeline: &mut Pipeline,
121+
) -> Result<()> {
122+
let start = Instant::now();
123+
let ctx = self.ctx.clone();
124+
let table_ctx: Arc<dyn TableContext> = ctx.clone();
125+
let source_schema = self.plan.schema();
126+
let target_schema = table.schema();
127+
let catalog_name = self.plan.catalog.clone();
128+
let overwrite = self.plan.overwrite;
129+
130+
let (stage_info, path) = parse_stage_location(&self.ctx, stage_location).await?;
131+
132+
let mut stage_table_info = StageTableInfo {
133+
schema: source_schema.clone(),
134+
user_stage_info: stage_info,
135+
path: path.to_string(),
136+
files: vec![],
137+
pattern: "".to_string(),
138+
files_to_copy: None,
139+
};
140+
141+
let all_source_file_infos = StageTable::list_files(&table_ctx, &stage_table_info).await?;
142+
143+
// TODO: color_copied_files
144+
145+
let mut need_copied_file_infos = vec![];
146+
for file in &all_source_file_infos {
147+
if file.status == StageFileStatus::NeedCopy {
148+
need_copied_file_infos.push(file.clone());
149+
}
150+
}
151+
152+
// DEBUG:
153+
tracing::warn!(
154+
"insert: read all sideload files finished, all:{}, need copy:{}, elapsed:{}",
155+
all_source_file_infos.len(),
156+
need_copied_file_infos.len(),
157+
start.elapsed().as_secs()
158+
);
159+
160+
if need_copied_file_infos.is_empty() {
161+
return Ok(());
162+
}
163+
164+
stage_table_info.files_to_copy = Some(need_copied_file_infos.clone());
165+
let stage_table = StageTable::try_create(stage_table_info.clone())?;
166+
let read_source_plan = {
167+
stage_table
168+
.read_plan_with_catalog(ctx.clone(), catalog_name, None)
169+
.await?
170+
};
171+
172+
stage_table.read_data(table_ctx, &read_source_plan, pipeline)?;
173+
174+
let need_fill_missing_columns = target_schema != source_schema;
175+
if need_fill_missing_columns {
176+
pipeline.add_transform(|transform_input_port, transform_output_port| {
177+
TransformAddOn::try_create(
178+
transform_input_port,
179+
transform_output_port,
180+
source_schema.clone(),
181+
target_schema.clone(),
182+
ctx.clone(),
183+
)
184+
})?;
185+
}
186+
table.append_data(ctx.clone(), pipeline, AppendMode::Copy, false)?;
187+
188+
pipeline.set_on_finished(move |may_error| {
189+
// capture out variable
190+
let overwrite = overwrite;
191+
let ctx = ctx.clone();
192+
let table = table.clone();
193+
194+
match may_error {
195+
Some(error) => {
196+
tracing::error!("insert stage file error: {}", error);
197+
Err(may_error.as_ref().unwrap().clone())
198+
}
199+
None => {
200+
let append_entries = ctx.consume_precommit_blocks();
201+
// We must put the commit operation to global runtime, which will avoid the "dispatch dropped without returning error" in tower
202+
return GlobalIORuntime::instance().block_on(async move {
203+
// DEBUG:
204+
tracing::warn!(
205+
"insert: try to commit append entries:{}, elapsed:{}",
206+
append_entries.len(),
207+
start.elapsed().as_secs()
208+
);
209+
table
210+
.commit_insertion(ctx, append_entries, overwrite)
211+
.await?;
212+
Ok(())
213+
// TODO: purge copied files
214+
});
215+
}
216+
}
217+
});
218+
219+
Ok(())
220+
}
104221
}
105222

106223
#[async_trait::async_trait]
@@ -156,6 +273,27 @@ impl Interpreter for InsertInterpreterV2 {
156273
.format
157274
.exec_stream(input_context.clone(), &mut build_res.main_pipeline)?;
158275
}
276+
InsertInputSource::Sideload(opts) => {
277+
// DEBUG:
278+
tracing::warn!("==> sideload insert: {:?}", opts);
279+
280+
match &opts.stage {
281+
None => {
282+
return Err(ErrorCode::BadDataValueType(
283+
"No stage location provided".to_string(),
284+
));
285+
}
286+
Some(stage_location) => {
287+
self.build_insert_from_stage_pipeline(
288+
table.clone(),
289+
stage_location,
290+
&mut build_res.main_pipeline,
291+
)
292+
.await?;
293+
}
294+
}
295+
return Ok(build_res);
296+
}
159297
InsertInputSource::SelectPlan(plan) => {
160298
let table1 = table.clone();
161299
let (mut select_plan, select_column_bindings) = match plan.as_ref() {
@@ -602,3 +740,25 @@ async fn exprs_to_datavalue<'a>(
602740
let datavalues: Vec<DataValue> = res.columns().iter().skip(1).map(|col| col.get(0)).collect();
603741
Ok(datavalues)
604742
}
743+
744+
// FIXME: tmp copy from src/query/sql/src/planner/binder/copy.rs
745+
async fn parse_stage_location(
746+
ctx: &Arc<QueryContext>,
747+
location: &str,
748+
) -> Result<(UserStageInfo, String)> {
749+
let s: Vec<&str> = location.split('@').collect();
750+
// @my_ext_stage/abc/
751+
let names: Vec<&str> = s[1].splitn(2, '/').filter(|v| !v.is_empty()).collect();
752+
753+
let stage = if names[0] == "~" {
754+
UserStageInfo::new_user_stage(&ctx.get_current_user()?.name)
755+
} else {
756+
UserApiProvider::instance()
757+
.get_stage(&ctx.get_tenant(), names[0])
758+
.await?
759+
};
760+
761+
let path = names.get(1).unwrap_or(&"").trim_start_matches('/');
762+
763+
Ok((stage, path.to_string()))
764+
}

src/query/service/src/servers/http/v1/query/http_query.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use common_base::base::tokio;
2121
use common_base::base::tokio::sync::Mutex as TokioMutex;
2222
use common_base::base::tokio::sync::RwLock;
2323
use common_base::runtime::TrySpawn;
24+
use common_catalog::table_context::SideloadOptions;
2425
use common_exception::ErrorCode;
2526
use common_exception::Result;
2627
use serde::Deserialize;
@@ -236,6 +237,14 @@ impl HttpQuery {
236237
let sql = &request.sql;
237238
tracing::info!("run query_id={id} in session_id={session_id}, sql='{sql}'");
238239

240+
match &request.sideload {
241+
Some(sideload) => ctx.attach_sideload(SideloadOptions {
242+
uri: sideload.url.clone(),
243+
stage: sideload.stage.clone(),
244+
}),
245+
None => {}
246+
};
247+
239248
let (block_sender, block_receiver) = sized_spsc(request.pagination.max_rows_in_buffer);
240249
let start_time = Instant::now();
241250
let state = Arc::new(RwLock::new(Executor {

src/query/service/src/sessions/query_ctx.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use common_catalog::plan::DataSourcePlan;
3232
use common_catalog::plan::PartInfoPtr;
3333
use common_catalog::plan::Partitions;
3434
use common_catalog::plan::StageTableInfo;
35+
use common_catalog::table_context::SideloadOptions;
3536
use common_config::DATABEND_COMMIT_VERSION;
3637
use common_datablocks::DataBlock;
3738
use common_datavalues::DataValue;
@@ -192,6 +193,10 @@ impl QueryContext {
192193
self.shared.set_executor(weak_ptr)
193194
}
194195

196+
/// Attaches `sideload` to this query by forwarding it to
/// `self.shared.attach_sideload`.
pub fn attach_sideload(&self, sideload: SideloadOptions) {
197+
self.shared.attach_sideload(sideload);
198+
}
199+
195200
pub fn get_created_time(&self) -> SystemTime {
196201
self.shared.created_time
197202
}
@@ -354,6 +359,11 @@ impl TableContext for QueryContext {
354359
fn get_processes_info(&self) -> Vec<ProcessInfo> {
355360
SessionManager::instance().processes_info()
356361
}
362+
363+
// Get the sideload options by delegating to `self.shared.get_sideload`.
364+
fn get_sideload(&self) -> Option<SideloadOptions> {
365+
self.shared.get_sideload()
366+
}
357367
}
358368

359369
impl TrySpawn for QueryContext {

src/query/service/src/sessions/query_ctx_shared.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use std::time::SystemTime;
2222

2323
use common_base::base::Progress;
2424
use common_base::runtime::Runtime;
25+
use common_catalog::table_context::SideloadOptions;
2526
use common_config::Config;
2627
use common_datablocks::DataBlock;
2728
use common_exception::ErrorCode;
@@ -78,6 +79,7 @@ pub struct QueryContextShared {
7879
pub(in crate::sessions) data_operator: DataOperator,
7980
pub(in crate::sessions) executor: Arc<RwLock<Weak<PipelineExecutor>>>,
8081
pub(in crate::sessions) precommit_blocks: Arc<RwLock<Vec<DataBlock>>>,
82+
pub(in crate::sessions) sideload_config: Arc<RwLock<Option<SideloadOptions>>>,
8183
pub(in crate::sessions) created_time: SystemTime,
8284
}
8385

@@ -107,6 +109,7 @@ impl QueryContextShared {
107109
affect: Arc::new(Mutex::new(None)),
108110
executor: Arc::new(RwLock::new(Weak::new())),
109111
precommit_blocks: Arc::new(RwLock::new(vec![])),
112+
sideload_config: Arc::new(RwLock::new(None)),
110113
created_time: SystemTime::now(),
111114
}))
112115
}
@@ -316,6 +319,15 @@ impl QueryContextShared {
316319
swaped_precommit_blocks
317320
}
318321

322+
/// Returns a clone of the stored sideload options, taken under the read lock
/// of `sideload_config`; `None` when nothing is stored.
pub fn get_sideload(&self) -> Option<SideloadOptions> {
323+
self.sideload_config.read().clone()
324+
}
325+
326+
/// Stores `sideload` under the write lock of `sideload_config`, replacing any
/// options stored earlier.
pub fn attach_sideload(&self, sideload: SideloadOptions) {
327+
let mut sideload_config = self.sideload_config.write();
328+
*sideload_config = Some(sideload);
329+
}
330+
319331
pub fn get_created_time(&self) -> SystemTime {
320332
self.created_time
321333
}

src/query/service/tests/it/storages/fuse/operations/commit.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use common_catalog::plan::PartInfoPtr;
2727
use common_catalog::plan::Partitions;
2828
use common_catalog::table::Table;
2929
use common_catalog::table_context::ProcessInfo;
30+
use common_catalog::table_context::SideloadOptions;
3031
use common_catalog::table_context::TableContext;
3132
use common_datablocks::DataBlock;
3233
use common_exception::ErrorCode;
@@ -453,6 +454,10 @@ impl TableContext for CtxDelegation {
453454
fn get_processes_info(&self) -> Vec<ProcessInfo> {
454455
todo!()
455456
}
457+
458+
// Unimplemented stub: panics through `todo!()` if it is ever called.
fn get_sideload(&self) -> Option<SideloadOptions> {
459+
todo!()
460+
}
456461
}
457462

458463
#[derive(Clone)]

src/query/sql/src/planner/binder/insert.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -96,10 +96,13 @@ impl<'a> Binder {
9696
opts, start, None,
9797
))
9898
}
99-
InsertSource::Values { rest_str } => {
100-
let data = rest_str.trim_end_matches(';').trim_start().to_owned();
101-
Ok(InsertInputSource::Values(data))
102-
}
99+
InsertSource::Values { rest_str } => match self.ctx.get_sideload() {
100+
Some(sideload) => Ok(InsertInputSource::Sideload(Arc::new(sideload))),
101+
None => {
102+
let data = rest_str.trim_end_matches(';').trim_start().to_owned();
103+
Ok(InsertInputSource::Values(data))
104+
}
105+
},
103106
InsertSource::Select { query } => {
104107
let statement = Statement::Query(query);
105108
let select_plan = self.bind_statement(bind_context, &statement).await?;

src/query/sql/src/planner/plans/insert.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
use std::sync::Arc;
1616

17+
use common_catalog::table_context::SideloadOptions;
1718
use common_datablocks::DataBlock;
1819
use common_datavalues::DataSchemaRef;
1920
use common_meta_types::FileFormatOptions;
@@ -31,6 +32,8 @@ pub enum InsertInputSource {
3132
StreamingWithFileFormat(FileFormatOptions, usize, Option<Arc<InputContext>>),
3233
// From cloned String and format
3334
Values(String),
35+
// From sideload stage or uri
36+
Sideload(Arc<SideloadOptions>),
3437
}
3538

3639
#[derive(Clone)]

0 commit comments

Comments
 (0)