Skip to content

Commit dbd8f68

Browse files
committed
feat(query): support sideload data for insert values
1 parent 76a228c commit dbd8f68

File tree

9 files changed

+74
-5
lines changed

9 files changed

+74
-5
lines changed

scripts/ci/deploy/config/databend-query-node-1.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ table_cache_bloom_index_data_bytes=1073741824
6060
[log]
6161

6262
[log.file]
63-
level = "ERROR"
63+
level = "INFO"
6464
format = "text"
6565
dir = "./.databend/logs_1"
6666

src/query/catalog/src/table_context.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,12 @@ pub struct ProcessInfo {
5454
pub created_time: SystemTime,
5555
}
5656

57+
#[derive(Debug, Clone)]
58+
pub struct SideloadOptions {
59+
pub uri: Option<String>,
60+
pub stage: Option<String>,
61+
}
62+
5763
#[async_trait::async_trait]
5864
pub trait TableContext: Send + Sync {
5965
/// Build a table instance the plan wants to operate on.
@@ -99,4 +105,5 @@ pub trait TableContext: Send + Sync {
99105
async fn get_table(&self, catalog: &str, database: &str, table: &str)
100106
-> Result<Arc<dyn Table>>;
101107
fn get_processes_info(&self) -> Vec<ProcessInfo>;
108+
fn get_sideload(&self) -> Option<SideloadOptions>;
102109
}

src/query/service/src/interpreters/interpreter_insert_v2.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,26 @@ impl Interpreter for InsertInterpreterV2 {
156156
.format
157157
.exec_stream(input_context.clone(), &mut build_res.main_pipeline)?;
158158
}
159+
InsertInputSource::Sideload(opts) => {
160+
tracing::warn!("sideload insert: {:?}", opts);
161+
// TODO:(everpcpc)
162+
let settings = self.ctx.get_settings();
163+
164+
build_res.main_pipeline.add_source(
165+
|output| {
166+
let name_resolution_ctx =
167+
NameResolutionContext::try_from(settings.as_ref())?;
168+
let inner = ValueSource::new(
169+
"".to_string(),
170+
self.ctx.clone(),
171+
name_resolution_ctx,
172+
plan.schema(),
173+
);
174+
AsyncSourcer::create(self.ctx.clone(), output, inner)
175+
},
176+
1,
177+
)?;
178+
}
159179
InsertInputSource::SelectPlan(plan) => {
160180
let table1 = table.clone();
161181
let (mut select_plan, select_column_bindings) = match plan.as_ref() {

src/query/service/src/servers/http/v1/query/http_query.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use common_base::base::tokio;
2121
use common_base::base::tokio::sync::Mutex as TokioMutex;
2222
use common_base::base::tokio::sync::RwLock;
2323
use common_base::runtime::TrySpawn;
24+
use common_catalog::table_context::SideloadOptions;
2425
use common_exception::ErrorCode;
2526
use common_exception::Result;
2627
use serde::Deserialize;
@@ -236,6 +237,14 @@ impl HttpQuery {
236237
let sql = &request.sql;
237238
tracing::info!("run query_id={id} in session_id={session_id}, sql='{sql}'");
238239

240+
match &request.sideload {
241+
Some(sideload) => ctx.attach_sideload(SideloadOptions {
242+
uri: sideload.url.clone(),
243+
stage: sideload.stage.clone(),
244+
}),
245+
None => {}
246+
};
247+
239248
let (block_sender, block_receiver) = sized_spsc(request.pagination.max_rows_in_buffer);
240249
let start_time = Instant::now();
241250
let state = Arc::new(RwLock::new(Executor {

src/query/service/src/sessions/query_ctx.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use common_catalog::plan::DataSourcePlan;
3232
use common_catalog::plan::PartInfoPtr;
3333
use common_catalog::plan::Partitions;
3434
use common_catalog::plan::StageTableInfo;
35+
use common_catalog::table_context::SideloadOptions;
3536
use common_config::DATABEND_COMMIT_VERSION;
3637
use common_datablocks::DataBlock;
3738
use common_datavalues::DataValue;
@@ -192,6 +193,10 @@ impl QueryContext {
192193
self.shared.set_executor(weak_ptr)
193194
}
194195

196+
pub fn attach_sideload(&self, sideload: SideloadOptions) {
197+
self.shared.attach_sideload(sideload);
198+
}
199+
195200
pub fn get_created_time(&self) -> SystemTime {
196201
self.shared.created_time
197202
}
@@ -354,6 +359,11 @@ impl TableContext for QueryContext {
354359
fn get_processes_info(&self) -> Vec<ProcessInfo> {
355360
SessionManager::instance().processes_info()
356361
}
362+
363+
// Get Sideload Options.
364+
fn get_sideload(&self) -> Option<SideloadOptions> {
365+
self.shared.get_sideload()
366+
}
357367
}
358368

359369
impl TrySpawn for QueryContext {

src/query/service/src/sessions/query_ctx_shared.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use std::time::SystemTime;
2222

2323
use common_base::base::Progress;
2424
use common_base::runtime::Runtime;
25+
use common_catalog::table_context::SideloadOptions;
2526
use common_config::Config;
2627
use common_datablocks::DataBlock;
2728
use common_exception::ErrorCode;
@@ -78,6 +79,7 @@ pub struct QueryContextShared {
7879
pub(in crate::sessions) data_operator: DataOperator,
7980
pub(in crate::sessions) executor: Arc<RwLock<Weak<PipelineExecutor>>>,
8081
pub(in crate::sessions) precommit_blocks: Arc<RwLock<Vec<DataBlock>>>,
82+
pub(in crate::sessions) sideload_config: Arc<RwLock<Option<SideloadOptions>>>,
8183
pub(in crate::sessions) created_time: SystemTime,
8284
}
8385

@@ -107,6 +109,7 @@ impl QueryContextShared {
107109
affect: Arc::new(Mutex::new(None)),
108110
executor: Arc::new(RwLock::new(Weak::new())),
109111
precommit_blocks: Arc::new(RwLock::new(vec![])),
112+
sideload_config: Arc::new(RwLock::new(None)),
110113
created_time: SystemTime::now(),
111114
}))
112115
}
@@ -316,6 +319,15 @@ impl QueryContextShared {
316319
swaped_precommit_blocks
317320
}
318321

322+
pub fn get_sideload(&self) -> Option<SideloadOptions> {
323+
self.sideload_config.read().clone()
324+
}
325+
326+
pub fn attach_sideload(&self, sideload: SideloadOptions) {
327+
let mut sideload_config = self.sideload_config.write();
328+
*sideload_config = Some(sideload);
329+
}
330+
319331
pub fn get_created_time(&self) -> SystemTime {
320332
self.created_time
321333
}

src/query/service/tests/it/storages/fuse/operations/commit.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use common_catalog::plan::PartInfoPtr;
2727
use common_catalog::plan::Partitions;
2828
use common_catalog::table::Table;
2929
use common_catalog::table_context::ProcessInfo;
30+
use common_catalog::table_context::SideloadOptions;
3031
use common_catalog::table_context::TableContext;
3132
use common_datablocks::DataBlock;
3233
use common_exception::ErrorCode;
@@ -453,6 +454,10 @@ impl TableContext for CtxDelegation {
453454
fn get_processes_info(&self) -> Vec<ProcessInfo> {
454455
todo!()
455456
}
457+
458+
fn get_sideload(&self) -> Option<SideloadOptions> {
459+
todo!()
460+
}
456461
}
457462

458463
#[derive(Clone)]

src/query/sql/src/planner/binder/insert.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -96,10 +96,13 @@ impl<'a> Binder {
9696
opts, start, None,
9797
))
9898
}
99-
InsertSource::Values { rest_str } => {
100-
let data = rest_str.trim_end_matches(';').trim_start().to_owned();
101-
Ok(InsertInputSource::Values(data))
102-
}
99+
InsertSource::Values { rest_str } => match self.ctx.get_sideload() {
100+
Some(sideload) => Ok(InsertInputSource::Sideload(Arc::new(sideload))),
101+
None => {
102+
let data = rest_str.trim_end_matches(';').trim_start().to_owned();
103+
Ok(InsertInputSource::Values(data))
104+
}
105+
},
103106
InsertSource::Select { query } => {
104107
let statement = Statement::Query(query);
105108
let select_plan = self.bind_statement(bind_context, &statement).await?;

src/query/sql/src/planner/plans/insert.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
use std::sync::Arc;
1616

17+
use common_catalog::table_context::SideloadOptions;
1718
use common_datablocks::DataBlock;
1819
use common_datavalues::DataSchemaRef;
1920
use common_meta_types::FileFormatOptions;
@@ -31,6 +32,8 @@ pub enum InsertInputSource {
3132
StreamingWithFileFormat(FileFormatOptions, usize, Option<Arc<InputContext>>),
3233
// From cloned String and format
3334
Values(String),
35+
// From sideload stage or uri
36+
Sideload(Arc<SideloadOptions>),
3437
}
3538

3639
#[derive(Clone)]

0 commit comments

Comments
 (0)