Skip to content

Commit 0a02292

Browse files
committed
refactor(interpreter): make lint
1 parent 15c8faf commit 0a02292

File tree

125 files changed

+316
-491
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

125 files changed

+316
-491
lines changed

src/query/pipeline/sources/src/processors/sources/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,11 @@ pub mod deserializer;
1818
pub mod empty_source;
1919
pub mod file_splitter;
2020
pub mod multi_file_splitter;
21+
mod one_block_source;
2122
pub mod stream_source;
2223
pub mod stream_source_v2;
2324
pub mod sync_source;
2425
pub mod sync_source_receiver;
25-
mod one_block_source;
2626

2727
pub use async_source::AsyncSource;
2828
pub use async_source::AsyncSourcer;
@@ -34,14 +34,14 @@ pub use file_splitter::FileSplitter;
3434
pub use file_splitter::FileSplitterState;
3535
pub use multi_file_splitter::MultiFileSplitter;
3636
pub use multi_file_splitter::OperatorInfo;
37+
pub use one_block_source::OneBlockSource;
3738
pub use stream_source::StreamSource;
3839
pub use stream_source::StreamSourceNoSkipEmpty;
3940
pub use stream_source_v2::StreamSourceV2;
4041
pub use sync_source::SyncSource;
4142
pub use sync_source::SyncSourcer;
4243
pub use sync_source::*;
4344
pub use sync_source_receiver::SyncReceiverSource;
44-
pub use one_block_source::OneBlockSource;
4545

4646
#[allow(dead_code)]
4747
mod source_example {

src/query/pipeline/sources/src/processors/sources/one_block_source.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,20 +13,15 @@
1313
// limitations under the License.
1414

1515
use std::any::Any;
16-
use std::collections::VecDeque;
1716
use std::sync::Arc;
1817

19-
use common_catalog::table_context::TableContext;
2018
use common_datablocks::DataBlock;
2119
use common_exception::Result;
2220
use common_pipeline_core::processors::port::OutputPort;
23-
use common_pipeline_core::processors::processor::{Event, ProcessorPtr};
24-
use parking_lot::Mutex;
21+
use common_pipeline_core::processors::processor::Event;
22+
use common_pipeline_core::processors::processor::ProcessorPtr;
2523
use common_pipeline_core::processors::Processor;
2624

27-
use crate::processors::sources::SyncSource;
28-
use crate::processors::sources::SyncSourcer;
29-
3025
pub struct OneBlockSource {
3126
output: Arc<OutputPort>,
3227
data_block: Option<DataBlock>,

src/query/service/benches/suites/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ pub async fn select_executor(sql: &str) -> Result<()> {
3737
let ctx = executor_session.create_query_context().await?;
3838

3939
if let PlanNode::Select(plan) = PlanParser::parse(ctx.clone(), sql).await? {
40-
let executor = SelectInterpreter::try_create(ctx, plan)?;
41-
let mut stream = executor.execute().await?;
40+
let executor = SelectInterpreter::try_create(ctx.clone(), plan)?;
41+
let mut stream = executor.execute(ctx.clone()).await?;
4242
while let Some(_block) = stream.next().await {}
4343
} else {
4444
unreachable!()

src/query/service/src/interpreters/execute_query.rs

Lines changed: 0 additions & 55 deletions
This file was deleted.

src/query/service/src/interpreters/interpreter.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,20 @@
1313
// limitations under the License.
1414

1515
use std::sync::Arc;
16+
1617
use common_catalog::table_context::TableContext;
17-
use common_datavalues::{DataSchema, DataSchemaRef};
18+
use common_datavalues::DataSchema;
19+
use common_datavalues::DataSchemaRef;
1820
use common_datavalues::DataSchemaRefExt;
1921
use common_exception::ErrorCode;
2022
use common_exception::Result;
21-
use common_streams::{DataBlockStream, SendableDataBlockStream};
22-
use crate::interpreters::ProcessorExecutorStream;
23-
use crate::pipelines::executor::{ExecutorSettings, PipelineCompleteExecutor, PipelinePullingExecutor};
23+
use common_streams::DataBlockStream;
24+
use common_streams::SendableDataBlockStream;
2425

26+
use crate::interpreters::ProcessorExecutorStream;
27+
use crate::pipelines::executor::ExecutorSettings;
28+
use crate::pipelines::executor::PipelineCompleteExecutor;
29+
use crate::pipelines::executor::PipelinePullingExecutor;
2530
use crate::pipelines::PipelineBuildResult;
2631
use crate::pipelines::SourcePipeBuilder;
2732
use crate::sessions::QueryContext;

src/query/service/src/interpreters/interpreter_call.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,9 @@ use std::sync::RwLock;
1717

1818
use common_datavalues::DataSchemaRef;
1919
use common_exception::Result;
20-
use common_pipeline_core::Pipeline;
2120
use common_planners::CallPlan;
22-
use common_streams::SendableDataBlockStream;
2321

2422
use super::Interpreter;
25-
use crate::interpreters::ProcessorExecutorStream;
26-
use crate::pipelines::executor::ExecutorSettings;
27-
use crate::pipelines::executor::PipelinePullingExecutor;
2823
use crate::pipelines::PipelineBuildResult;
2924
use crate::procedures::ProcedureFactory;
3025
use crate::sessions::QueryContext;
@@ -74,8 +69,12 @@ impl Interpreter for CallInterpreter {
7469
}
7570

7671
let mut build_res = PipelineBuildResult::create();
77-
func.eval(self.ctx.clone(), plan.args.clone(), &mut build_res.main_pipeline)
78-
.await?;
72+
func.eval(
73+
self.ctx.clone(),
74+
plan.args.clone(),
75+
&mut build_res.main_pipeline,
76+
)
77+
.await?;
7978

8079
Ok(build_res)
8180
}

src/query/service/src/interpreters/interpreter_cluster_key_alter.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,9 @@ use common_exception::Result;
1818
use common_meta_types::GrantObject;
1919
use common_meta_types::UserPrivilegeType;
2020
use common_planners::AlterTableClusterKeyPlan;
21-
use common_streams::DataBlockStream;
22-
use common_streams::SendableDataBlockStream;
23-
use crate::pipelines::PipelineBuildResult;
2421

2522
use super::Interpreter;
23+
use crate::pipelines::PipelineBuildResult;
2624
use crate::sessions::QueryContext;
2725
use crate::sessions::TableContext;
2826

src/query/service/src/interpreters/interpreter_cluster_key_drop.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,9 @@ use common_exception::Result;
1818
use common_meta_types::GrantObject;
1919
use common_meta_types::UserPrivilegeType;
2020
use common_planners::DropTableClusterKeyPlan;
21-
use common_streams::DataBlockStream;
22-
use common_streams::SendableDataBlockStream;
23-
use crate::pipelines::PipelineBuildResult;
2421

2522
use super::Interpreter;
23+
use crate::pipelines::PipelineBuildResult;
2624
use crate::sessions::QueryContext;
2725
use crate::sessions::TableContext;
2826

src/query/service/src/interpreters/interpreter_common.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ pub fn append2table(
8686
Err(cause) => Err(ErrorCode::PanicError(format!(
8787
"Maybe panic while in commit insert. {}",
8888
cause
89-
)))
89+
))),
9090
};
9191
}
9292

@@ -103,7 +103,8 @@ pub fn execute_pipeline(ctx: Arc<QueryContext>, mut res: PipelineBuildResult) ->
103103
res.set_max_threads(ctx.get_settings().get_max_threads()? as usize);
104104
let mut pipelines = res.sources_pipelines;
105105
pipelines.push(res.main_pipeline);
106-
let executor = PipelineCompleteExecutor::from_pipelines(query_need_abort, pipelines, executor_settings)?;
106+
let executor =
107+
PipelineCompleteExecutor::from_pipelines(query_need_abort, pipelines, executor_settings)?;
107108
executor.execute()
108109
}
109110

src/query/service/src/interpreters/interpreter_copy_v2.rs

Lines changed: 36 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,27 +15,22 @@ use std::collections::HashSet;
1515
use std::path::Path;
1616
use std::sync::Arc;
1717

18-
use common_datablocks::DataBlock;
18+
use common_base::base::GlobalIORuntime;
19+
use common_base::base::TrySpawn;
1920
use common_datavalues::prelude::*;
2021
use common_exception::ErrorCode;
2122
use common_exception::Result;
2223
use common_meta_types::UserStageInfo;
2324
use common_planners::ReadDataSourcePlan;
2425
use common_planners::SourceInfo;
2526
use common_planners::StageTableInfo;
26-
use common_streams::DataBlockStream;
27-
use common_streams::SendableDataBlockStream;
2827
use futures::TryStreamExt;
2928
use regex::Regex;
30-
use common_base::base::{GlobalIORuntime, TrySpawn};
3129

3230
use super::append2table;
3331
use crate::interpreters::Interpreter;
34-
use crate::interpreters::interpreter_common::{execute_pipeline};
3532
use crate::interpreters::SelectInterpreterV2;
36-
use crate::pipelines::executor::ExecutorSettings;
37-
use crate::pipelines::executor::PipelineCompleteExecutor;
38-
use crate::pipelines::{Pipeline, PipelineBuildResult};
33+
use crate::pipelines::PipelineBuildResult;
3934
use crate::sessions::QueryContext;
4035
use crate::sessions::TableContext;
4136
use crate::sql::plans::CopyPlanV2;
@@ -117,7 +112,11 @@ impl CopyInterpreterV2 {
117112
}
118113
}
119114

120-
async fn purge_files(ctx: Arc<QueryContext>, from: &ReadDataSourcePlan, files: &Vec<String>) -> Result<()> {
115+
async fn purge_files(
116+
ctx: Arc<QueryContext>,
117+
from: &ReadDataSourcePlan,
118+
files: &Vec<String>,
119+
) -> Result<()> {
121120
match &from.source_info {
122121
SourceInfo::StageSource(table_info) => {
123122
if table_info.stage_info.copy_options.purge {
@@ -172,7 +171,11 @@ impl CopyInterpreterV2 {
172171
tracing::info!("copy_files_to_table from source: {:?}", read_source_plan);
173172

174173
let from_table = self.ctx.build_table_from_source_plan(&read_source_plan)?;
175-
from_table.read2(self.ctx.clone(), &read_source_plan, &mut build_res.main_pipeline)?;
174+
from_table.read2(
175+
self.ctx.clone(),
176+
&read_source_plan,
177+
&mut build_res.main_pipeline,
178+
)?;
176179

177180
let to_table = self.ctx.get_table(catalog_name, db_name, tbl_name).await?;
178181

@@ -195,7 +198,9 @@ impl CopyInterpreterV2 {
195198
let task = GlobalIORuntime::instance().spawn(async move {
196199
// Commit
197200
let operations = ctx.consume_precommit_blocks();
198-
to_table.commit_insertion(ctx.clone(), &catalog_name, operations, false).await?;
201+
to_table
202+
.commit_insertion(ctx.clone(), &catalog_name, operations, false)
203+
.await?;
199204

200205
// Purge
201206
CopyInterpreterV2::purge_files(ctx, &from, &files).await
@@ -207,7 +212,7 @@ impl CopyInterpreterV2 {
207212
Err(cause) => Err(ErrorCode::PanicError(format!(
208213
"Maybe panic while in commit insert. {}",
209214
cause
210-
)))
215+
))),
211216
};
212217
}
213218

@@ -263,7 +268,14 @@ impl CopyInterpreterV2 {
263268
let mut build_res = select_interpreter.execute2().await?;
264269
let table = StageTable::try_create(stage_table_info)?;
265270

266-
append2table(self.ctx.clone(), table.clone(), data_schema.clone(), &mut build_res, false, true)?;
271+
append2table(
272+
self.ctx.clone(),
273+
table.clone(),
274+
data_schema.clone(),
275+
&mut build_res,
276+
false,
277+
true,
278+
)?;
267279
Ok(build_res)
268280
}
269281
}
@@ -308,9 +320,18 @@ impl Interpreter for CopyInterpreterV2 {
308320
}
309321

310322
tracing::info!("matched files: {:?}, pattern: {}", &files, pattern);
311-
self.copy_files_to_table(catalog_name, database_name, table_name, from, files.clone()).await
323+
self.copy_files_to_table(
324+
catalog_name,
325+
database_name,
326+
table_name,
327+
from,
328+
files.clone(),
329+
)
330+
.await
312331
}
313-
CopyPlanV2::IntoStage { stage, from, path, .. } => self.execute_copy_into_stage(stage, path, from).await,
332+
CopyPlanV2::IntoStage {
333+
stage, from, path, ..
334+
} => self.execute_copy_into_stage(stage, path, from).await,
314335
}
315336
}
316337
}

0 commit comments

Comments
 (0)