Skip to content

Commit f2a2699

Browse files
committed
refactor(interpreter): remove useless code
1 parent 33d20df commit f2a2699

File tree

7 files changed

+5
-111
lines changed

7 files changed

+5
-111
lines changed

src/query/service/src/servers/http/clickhouse_handler.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ async fn execute(
111111
.start()
112112
.await
113113
.map_err(|e| error!("interpreter.start.error: {:?}", e));
114-
let data_stream: SendableDataBlockStream = {
114+
let mut data_stream: SendableDataBlockStream = {
115115
let output_port = OutputPort::create();
116116
let stream_source = StreamSource::create(ctx.clone(), input_stream, output_port.clone())?;
117117
let mut source_pipe_builder = SourcePipeBuilder::create();
@@ -122,7 +122,6 @@ async fn execute(
122122
interpreter.execute(ctx.clone()).await?
123123
};
124124

125-
let mut data_stream = ctx.try_create_abortable(data_stream)?;
126125
let format_setting = ctx.get_format_settings()?;
127126
let mut output_format = format.create_format(schema, format_setting);
128127
let prefix = Ok(output_format.serialize_prefix()?);

src/query/service/src/servers/http/v1/query/execute_state.rs

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,6 @@ use common_exception::Result;
3030
use common_legacy_planners::PlanNode;
3131
use common_streams::DataBlockStream;
3232
use common_streams::SendableDataBlockStream;
33-
use futures::future::AbortHandle;
34-
use futures::future::Abortable;
3533
use futures::StreamExt;
3634
use futures_util::FutureExt;
3735
use serde::Deserialize;
@@ -322,8 +320,7 @@ async fn execute(
322320
block_buffer: Arc<BlockBuffer>,
323321
executor: Arc<RwLock<Executor>>,
324322
) -> Result<()> {
325-
let data_stream = interpreter.execute(ctx.clone()).await?;
326-
let mut data_stream = ctx.try_create_abortable(data_stream)?;
323+
let mut data_stream = interpreter.execute(ctx.clone()).await?;
327324
let use_result_cache = !ctx.get_config().query.management_mode;
328325

329326
match data_stream.next().await {
@@ -425,17 +422,10 @@ impl HttpQueryHandle {
425422
};
426423

427424
let (error_sender, mut error_receiver) = mpsc::channel::<Result<()>>(1);
428-
let (abort_handle, abort_registration) = AbortHandle::new_pair();
429425

430426
GlobalIORuntime::instance().spawn(async move {
431-
let error_receiver = Abortable::new(error_receiver.recv(), abort_registration);
432-
ctx.add_source_abort_handle(abort_handle);
433-
match error_receiver.await {
434-
Err(_) => {
435-
Executor::stop(&executor, Err(ErrorCode::AbortedQuery("")), false).await;
436-
block_buffer.stop_push().await.unwrap();
437-
}
438-
Ok(Some(Err(e))) => {
427+
match error_receiver.recv().await {
428+
Some(Err(e)) => {
439429
Executor::stop(&executor, Err(e), false).await;
440430
block_buffer.stop_push().await.unwrap();
441431
}

src/query/service/src/servers/mysql/mysql_interactive_worker.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -459,10 +459,7 @@ impl<W: AsyncWrite + Send + Unpin> InteractiveWorkerBase<W> {
459459
}
460460
};
461461

462-
let abortable_stream = ctx
463-
.try_create_abortable(intercepted_stream.boxed())?
464-
.boxed();
465-
Ok::<_, ErrorCode>(abortable_stream)
462+
Ok::<_, ErrorCode>(intercepted_stream.boxed())
466463
}
467464
.in_current_span()
468465
})?;

src/query/service/src/sessions/query_ctx.rs

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,6 @@ use common_legacy_planners::SourceInfo;
4141
use common_legacy_planners::StageTableInfo;
4242
use common_meta_app::schema::TableInfo;
4343
use common_meta_types::UserInfo;
44-
use common_streams::AbortStream;
45-
use common_streams::SendableDataBlockStream;
46-
use futures::future::AbortHandle;
4744
use opendal::Operator;
4845
use parking_lot::Mutex;
4946
use parking_lot::RwLock;
@@ -144,16 +141,6 @@ impl QueryContext {
144141
DataExchangeManager::instance()
145142
}
146143

147-
pub fn try_create_abortable(&self, input: SendableDataBlockStream) -> Result<AbortStream> {
148-
let (abort_handle, abort_stream) = AbortStream::try_create(input)?;
149-
self.shared.add_source_abort_handle(abort_handle);
150-
Ok(abort_stream)
151-
}
152-
153-
pub fn add_source_abort_handle(&self, abort_handle: AbortHandle) {
154-
self.shared.add_source_abort_handle(abort_handle);
155-
}
156-
157144
pub fn attach_http_query(&self, handle: HttpQueryHandle) {
158145
self.shared.attach_http_query_handle(handle);
159146
}

src/query/service/src/sessions/query_ctx_shared.rs

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ use common_exception::ErrorCode;
2525
use common_exception::Result;
2626
use common_meta_types::UserInfo;
2727
use common_storage::StorageOperator;
28-
use futures::future::AbortHandle;
2928
use opendal::Operator;
3029
use parking_lot::Mutex;
3130
use parking_lot::RwLock;
@@ -68,7 +67,6 @@ pub struct QueryContextShared {
6867
pub(in crate::sessions) runtime: Arc<RwLock<Option<Arc<Runtime>>>>,
6968
pub(in crate::sessions) init_query_id: Arc<RwLock<String>>,
7069
pub(in crate::sessions) cluster_cache: Arc<Cluster>,
71-
pub(in crate::sessions) sources_abort_handle: Arc<RwLock<Vec<AbortHandle>>>,
7270
pub(in crate::sessions) subquery_index: Arc<AtomicUsize>,
7371
pub(in crate::sessions) running_query: Arc<RwLock<Option<String>>>,
7472
pub(in crate::sessions) http_query: Arc<RwLock<Option<HttpQueryHandle>>>,
@@ -99,7 +97,6 @@ impl QueryContextShared {
9997
write_progress: Arc::new(Progress::create()),
10098
error: Arc::new(Mutex::new(None)),
10199
runtime: Arc::new(RwLock::new(None)),
102-
sources_abort_handle: Arc::new(RwLock::new(Vec::new())),
103100
subquery_index: Arc::new(AtomicUsize::new(1)),
104101
running_query: Arc::new(RwLock::new(None)),
105102
http_query: Arc::new(RwLock::new(None)),
@@ -123,11 +120,6 @@ impl QueryContextShared {
123120
executor.finish(Some(cause));
124121
}
125122

126-
let mut sources_abort_handle = self.sources_abort_handle.write();
127-
128-
while let Some(source_abort_handle) = sources_abort_handle.pop() {
129-
source_abort_handle.abort();
130-
}
131123
// TODO: Wait for the query to be processed (write out the last error)
132124
}
133125

@@ -256,11 +248,6 @@ impl QueryContextShared {
256248
running_query.as_ref().unwrap_or(&"".to_string()).clone()
257249
}
258250

259-
pub fn add_source_abort_handle(&self, handle: AbortHandle) {
260-
let mut sources_abort_handle = self.sources_abort_handle.write();
261-
sources_abort_handle.push(handle);
262-
}
263-
264251
pub fn get_config(&self) -> Config {
265252
self.config.clone()
266253
}

src/query/streams/src/lib.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,13 @@
1414

1515
mod sources;
1616
mod stream;
17-
mod stream_abort;
1817
mod stream_datablock;
1918
mod stream_error;
2019
mod stream_progress;
2120
mod stream_take;
2221

2322
pub use sources::*;
2423
pub use stream::*;
25-
pub use stream_abort::AbortStream;
2624
pub use stream_datablock::DataBlockStream;
2725
pub use stream_error::ErrorStream;
2826
pub use stream_progress::ProgressStream;

src/query/streams/src/stream_abort.rs

Lines changed: 0 additions & 64 deletions
This file was deleted.

0 commit comments

Comments
 (0)