Skip to content

Commit 89da510

Browse files
committed
Remove old interpreters
Signed-off-by: Xuanwo <github@xuanwo.io>
1 parent 42c0883 commit 89da510

29 files changed

+40
-2670
lines changed

src/binaries/query/main.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ use common_tracing::set_panic_hook;
2525
use databend_query::api::HttpService;
2626
use databend_query::api::RpcService;
2727
use databend_query::clusters::ClusterDiscovery;
28-
use databend_query::interpreters::AsyncInsertManager;
2928
use databend_query::metrics::MetricService;
3029
use databend_query::servers::HttpHandler;
3130
use databend_query::servers::HttpHandlerKind;
@@ -165,13 +164,6 @@ async fn main(_global_tracker: Arc<RuntimeTracker>) -> common_exception::Result<
165164
);
166165
}
167166

168-
// Async Insert Queue
169-
{
170-
let async_insert_queue = AsyncInsertManager::instance();
171-
async_insert_queue.start().await;
172-
info!("Databend async insert has been enabled.")
173-
}
174-
175167
// Print information to users.
176168
println!("Databend Query");
177169
println!();

src/query/service/benches/suites/mod.rs

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,16 @@
1414

1515
use common_base::base::tokio;
1616
use common_exception::Result;
17-
use common_legacy_planners::PlanNode;
1817
use criterion::Criterion;
1918
use databend_query::interpreters::Interpreter;
20-
use databend_query::interpreters::SelectInterpreter;
19+
use databend_query::interpreters::SelectInterpreterV2;
2120
use databend_query::sessions::SessionManager;
2221
use databend_query::sessions::SessionType;
23-
use databend_query::sql::PlanParser;
22+
use databend_query::sql::plans::Plan;
23+
use databend_query::sql::Planner;
2424
use databend_query::Config;
2525
use databend_query::GlobalServices;
26-
use futures::StreamExt;
26+
use futures_util::StreamExt;
2727

2828
pub mod bench_aggregate_query_sql;
2929
pub mod bench_filter_query_sql;
@@ -35,9 +35,18 @@ pub async fn select_executor(sql: &str) -> Result<()> {
3535
let sessions = SessionManager::instance();
3636
let executor_session = sessions.create_session(SessionType::Dummy).await?;
3737
let ctx = executor_session.create_query_context().await?;
38+
let mut planner = Planner::new(ctx.clone());
3839

39-
if let PlanNode::Select(plan) = PlanParser::parse(ctx.clone(), sql).await? {
40-
let executor = SelectInterpreter::try_create(ctx.clone(), plan)?;
40+
let (plan, _, _) = planner.plan_sql(sql).await?;
41+
if let Plan::Query {
42+
s_expr,
43+
metadata,
44+
bind_context,
45+
..
46+
} = plan
47+
{
48+
let executor =
49+
SelectInterpreterV2::try_create(ctx.clone(), *bind_context, *s_expr, metadata)?;
4150
let mut stream = executor.execute(ctx.clone()).await?;
4251
while let Some(_block) = stream.next().await {}
4352
} else {

src/query/service/src/api/rpc/exchange/exchange_manager.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@ use crate::interpreters::QueryFragmentsActions;
5252
use crate::pipelines::executor::ExecutorSettings;
5353
use crate::pipelines::executor::PipelineCompleteExecutor;
5454
use crate::pipelines::PipelineBuildResult;
55-
use crate::pipelines::QueryPipelineBuilder;
5655
use crate::sessions::QueryContext;
5756
use crate::sessions::TableContext;
5857
use crate::sql::executor::PipelineBuilder as PipelineBuilderV2;
@@ -668,10 +667,6 @@ impl FragmentCoordinator {
668667
self.initialized = true;
669668

670669
match &self.payload {
671-
FragmentPayload::PlanV1(node) => {
672-
let pipeline_builder = QueryPipelineBuilder::create(ctx);
673-
self.pipeline_build_res = Some(pipeline_builder.finalize(node)?);
674-
}
675670
FragmentPayload::PlanV2(plan) => {
676671
let pipeline_builder = PipelineBuilderV2::create(ctx);
677672
self.pipeline_build_res = Some(pipeline_builder.finalize(plan)?);

src/query/service/src/api/rpc/packets/packet_fragment.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ use std::fmt::Formatter;
1717

1818
use common_datavalues::DataSchemaRef;
1919
use common_exception::Result;
20-
use common_legacy_planners::PlanNode;
2120

2221
use crate::api::DataExchange;
2322
use crate::sql::executor::PhysicalPlan;
@@ -27,14 +26,12 @@ use crate::sql::executor::PhysicalPlan;
2726
#[allow(clippy::large_enum_variant)]
2827
#[derive(Clone, serde::Serialize, serde::Deserialize)]
2928
pub enum FragmentPayload {
30-
PlanV1(PlanNode),
3129
PlanV2(PhysicalPlan),
3230
}
3331

3432
impl Debug for FragmentPayload {
3533
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
3634
match self {
37-
FragmentPayload::PlanV1(plan) => write!(f, "PlanNode({})", plan.name()),
3835
FragmentPayload::PlanV2(plan) => write!(f, "PhysicalPlan({:?})", plan),
3936
}
4037
}
@@ -43,7 +40,6 @@ impl Debug for FragmentPayload {
4340
impl FragmentPayload {
4441
pub fn schema(&self) -> Result<DataSchemaRef> {
4542
match self {
46-
FragmentPayload::PlanV1(node) => Ok(node.schema()),
4743
FragmentPayload::PlanV2(plan) => plan.output_schema(),
4844
}
4945
}

src/query/service/src/global_services.rs

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ use opendal::Operator;
3131
use crate::api::DataExchangeManager;
3232
use crate::catalogs::CatalogManagerHelper;
3333
use crate::clusters::ClusterDiscovery;
34-
use crate::interpreters::AsyncInsertManager;
3534
use crate::servers::http::v1::HttpQueryManager;
3635
use crate::sessions::SessionManager;
3736

@@ -40,7 +39,6 @@ pub struct GlobalServices {
4039
query_logger: UnsafeCell<Option<Arc<QueryLogger>>>,
4140
cluster_discovery: UnsafeCell<Option<Arc<ClusterDiscovery>>>,
4241
storage_operator: UnsafeCell<Option<Operator>>,
43-
async_insert_manager: UnsafeCell<Option<Arc<AsyncInsertManager>>>,
4442
cache_manager: UnsafeCell<Option<Arc<CacheManager>>>,
4543
catalog_manager: UnsafeCell<Option<Arc<CatalogManager>>>,
4644
http_query_manager: UnsafeCell<Option<Arc<HttpQueryManager>>>,
@@ -60,7 +58,6 @@ impl GlobalServices {
6058
query_logger: UnsafeCell::new(None),
6159
cluster_discovery: UnsafeCell::new(None),
6260
storage_operator: UnsafeCell::new(None),
63-
async_insert_manager: UnsafeCell::new(None),
6461
cache_manager: UnsafeCell::new(None),
6562
catalog_manager: UnsafeCell::new(None),
6663
http_query_manager: UnsafeCell::new(None),
@@ -81,7 +78,6 @@ impl GlobalServices {
8178
ClusterDiscovery::init(config.clone(), global_services.clone()).await?;
8279

8380
StorageOperator::init(&config.storage, global_services.clone()).await?;
84-
AsyncInsertManager::init(&config, global_services.clone())?;
8581
CacheManager::init(&config.query, global_services.clone())?;
8682
CatalogManager::init(&config, global_services.clone()).await?;
8783
HttpQueryManager::init(&config, global_services.clone()).await?;
@@ -169,25 +165,6 @@ impl SingletonImpl<Operator> for GlobalServices {
169165
}
170166
}
171167

172-
impl SingletonImpl<Arc<AsyncInsertManager>> for GlobalServices {
173-
fn get(&self) -> Arc<AsyncInsertManager> {
174-
unsafe {
175-
match &*self.async_insert_manager.get() {
176-
None => panic!("AsyncInsertManager is not init"),
177-
Some(async_insert_manager) => async_insert_manager.clone(),
178-
}
179-
}
180-
}
181-
182-
fn init(&self, value: Arc<AsyncInsertManager>) -> Result<()> {
183-
unsafe {
184-
*(self.async_insert_manager.get() as *mut Option<Arc<AsyncInsertManager>>) =
185-
Some(value);
186-
Ok(())
187-
}
188-
}
189-
}
190-
191168
impl SingletonImpl<Arc<CacheManager>> for GlobalServices {
192169
fn get(&self) -> Arc<CacheManager> {
193170
unsafe {

0 commit comments

Comments
 (0)