Skip to content

Commit 82e1d41

Browse files
authored
Merge branch 'main' into move-old-to-planner
2 parents ac26c48 + e59bed1 commit 82e1d41

File tree

16 files changed

+38
-196
lines changed

16 files changed

+38
-196
lines changed

src/query/functions/src/scalars/logics/and.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ impl Function for LogicAndFiltersFunction {
7979
input_rows: usize,
8080
) -> Result<ColumnRef> {
8181
if columns.len() == 1 {
82-
return Ok(columns[1].column().clone());
82+
return Ok(columns[0].column().clone());
8383
}
8484

8585
let mut validity = None;

src/query/pipeline/core/src/processors/processor.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ use futures::FutureExt;
2323
use petgraph::graph::node_index;
2424
use petgraph::prelude::NodeIndex;
2525

26-
#[derive(Debug)]
2726
pub enum Event {
2827
NeedData,
2928
NeedConsume,

src/query/planner/src/metadata.rs

Lines changed: 25 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -163,12 +163,32 @@ impl Metadata {
163163
table_index
164164
}
165165

166-
pub fn find_smallest_column_within(&self, indices: &[usize]) -> usize {
167-
let entries = indices
166+
/// find_smallest_column in given indices.
167+
pub fn find_smallest_column(&self, indices: &[usize]) -> usize {
168+
let mut smallest_index = indices.iter().min().expect("indices must be valid");
169+
let mut smallest_size = usize::MAX;
170+
for idx in indices.iter() {
171+
let entry = self.column(*idx);
172+
if let Ok(bytes) = entry.data_type.data_type_id().numeric_byte_size() {
173+
if smallest_size > bytes {
174+
smallest_size = bytes;
175+
smallest_index = &entry.column_index;
176+
}
177+
}
178+
}
179+
*smallest_index
180+
}
181+
182+
/// find_smallest_column_by_table_index by given table_index
183+
pub fn find_smallest_column_by_table_index(&self, table_index: IndexType) -> usize {
184+
let indices: Vec<usize> = self
185+
.columns
168186
.iter()
169-
.map(|i| self.column(*i).clone())
170-
.collect::<Vec<_>>();
171-
find_smallest_column(entries.as_slice())
187+
.filter(|v| v.table_index == Some(table_index))
188+
.map(|v| v.column_index)
189+
.collect();
190+
191+
self.find_smallest_column(&indices)
172192
}
173193
}
174194

@@ -295,24 +315,3 @@ impl ColumnEntry {
295315
self.path_indices.is_some()
296316
}
297317
}
298-
299-
/// TODO(xuanwo): migrate this as a function of metadata.
300-
pub fn find_smallest_column(entries: &[ColumnEntry]) -> usize {
301-
debug_assert!(!entries.is_empty());
302-
let mut column_indexes = entries
303-
.iter()
304-
.map(|entry| entry.column_index)
305-
.collect::<Vec<IndexType>>();
306-
column_indexes.sort();
307-
let mut smallest_index = column_indexes[0];
308-
let mut smallest_size = usize::MAX;
309-
for (idx, column_entry) in entries.iter().enumerate() {
310-
if let Ok(bytes) = column_entry.data_type.data_type_id().numeric_byte_size() {
311-
if smallest_size > bytes {
312-
smallest_size = bytes;
313-
smallest_index = entries[idx].column_index;
314-
}
315-
}
316-
}
317-
smallest_index
318-
}

src/query/service/src/pipelines/executor/executor_graph.rs

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -242,15 +242,7 @@ impl ExecutingGraph {
242242
state_guard_cache = Some(node.state.lock().unwrap());
243243
}
244244

245-
let event = node.processor.event()?;
246-
tracing::debug!(
247-
"node id:{:?}, name:{:?}, event: {:?}",
248-
node.processor.id(),
249-
node.processor.name(),
250-
event
251-
);
252-
253-
let processor_state = match event {
245+
let processor_state = match node.processor.event()? {
254246
Event::Finished => State::Finished,
255247
Event::NeedData | Event::NeedConsume => State::Idle,
256248
Event::Sync => {

src/query/service/src/pipelines/executor/executor_worker_context.rs

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
use std::fmt::Debug;
1616
use std::fmt::Formatter;
1717
use std::sync::Arc;
18-
use std::time::Instant;
1918

2019
use common_base::base::TrySpawn;
2120
use common_exception::ErrorCode;
@@ -80,19 +79,7 @@ impl ExecutorWorkerContext {
8079
}
8180

8281
unsafe fn execute_sync_task(&mut self, processor: ProcessorPtr) -> Result<Option<NodeIndex>> {
83-
if tracing::enabled!(tracing::Level::DEBUG) {
84-
let start = Instant::now();
85-
processor.process()?;
86-
tracing::debug!(
87-
"sync processor, node id:{:?}, name:{:?}, event: {:?}",
88-
processor.id(),
89-
processor.name(),
90-
start.elapsed()
91-
);
92-
} else {
93-
processor.process()?;
94-
}
95-
82+
processor.process()?;
9683
Ok(Some(processor.id()))
9784
}
9885

src/query/service/src/pipelines/executor/processor_async_task.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -80,12 +80,6 @@ impl ProcessorAsyncTask {
8080
);
8181
}
8282
Either::Right((res, _)) => {
83-
tracing::debug!(
84-
"async processor, node id {:?} name: {:?}, elapsed:{:?}",
85-
wraning_processor.id(),
86-
wraning_processor.name(),
87-
start.elapsed()
88-
);
8983
return res;
9084
}
9185
}

src/query/service/src/servers/http/clickhouse_handler.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ async fn execute(
111111
.start()
112112
.await
113113
.map_err(|e| error!("interpreter.start.error: {:?}", e));
114-
let data_stream: SendableDataBlockStream = {
114+
let mut data_stream: SendableDataBlockStream = {
115115
let output_port = OutputPort::create();
116116
let stream_source = StreamSource::create(ctx.clone(), input_stream, output_port.clone())?;
117117
let mut source_pipe_builder = SourcePipeBuilder::create();
@@ -122,7 +122,6 @@ async fn execute(
122122
interpreter.execute(ctx.clone()).await?
123123
};
124124

125-
let mut data_stream = ctx.try_create_abortable(data_stream)?;
126125
let format_setting = ctx.get_format_settings()?;
127126
let mut output_format = format.create_format(schema, format_setting);
128127
let prefix = Ok(output_format.serialize_prefix()?);

src/query/service/src/servers/http/v1/query/execute_state.rs

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,6 @@ use common_exception::Result;
3030
use common_legacy_planners::PlanNode;
3131
use common_streams::DataBlockStream;
3232
use common_streams::SendableDataBlockStream;
33-
use futures::future::AbortHandle;
34-
use futures::future::Abortable;
3533
use futures::StreamExt;
3634
use futures_util::FutureExt;
3735
use serde::Deserialize;
@@ -322,8 +320,7 @@ async fn execute(
322320
block_buffer: Arc<BlockBuffer>,
323321
executor: Arc<RwLock<Executor>>,
324322
) -> Result<()> {
325-
let data_stream = interpreter.execute(ctx.clone()).await?;
326-
let mut data_stream = ctx.try_create_abortable(data_stream)?;
323+
let mut data_stream = interpreter.execute(ctx.clone()).await?;
327324
let use_result_cache = !ctx.get_config().query.management_mode;
328325

329326
match data_stream.next().await {
@@ -425,17 +422,10 @@ impl HttpQueryHandle {
425422
};
426423

427424
let (error_sender, mut error_receiver) = mpsc::channel::<Result<()>>(1);
428-
let (abort_handle, abort_registration) = AbortHandle::new_pair();
429425

430426
GlobalIORuntime::instance().spawn(async move {
431-
let error_receiver = Abortable::new(error_receiver.recv(), abort_registration);
432-
ctx.add_source_abort_handle(abort_handle);
433-
match error_receiver.await {
434-
Err(_) => {
435-
Executor::stop(&executor, Err(ErrorCode::AbortedQuery("")), false).await;
436-
block_buffer.stop_push().await.unwrap();
437-
}
438-
Ok(Some(Err(e))) => {
427+
match error_receiver.recv().await {
428+
Some(Err(e)) => {
439429
Executor::stop(&executor, Err(e), false).await;
440430
block_buffer.stop_push().await.unwrap();
441431
}

src/query/service/src/servers/mysql/mysql_interactive_worker.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -459,10 +459,7 @@ impl<W: AsyncWrite + Send + Unpin> InteractiveWorkerBase<W> {
459459
}
460460
};
461461

462-
let abortable_stream = ctx
463-
.try_create_abortable(intercepted_stream.boxed())?
464-
.boxed();
465-
Ok::<_, ErrorCode>(abortable_stream)
462+
Ok::<_, ErrorCode>(intercepted_stream.boxed())
466463
}
467464
.in_current_span()
468465
})?;

src/query/service/src/sessions/query_ctx.rs

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,6 @@ use common_legacy_planners::SourceInfo;
4141
use common_legacy_planners::StageTableInfo;
4242
use common_meta_app::schema::TableInfo;
4343
use common_meta_types::UserInfo;
44-
use common_streams::AbortStream;
45-
use common_streams::SendableDataBlockStream;
46-
use futures::future::AbortHandle;
4744
use opendal::Operator;
4845
use parking_lot::Mutex;
4946
use parking_lot::RwLock;
@@ -144,16 +141,6 @@ impl QueryContext {
144141
DataExchangeManager::instance()
145142
}
146143

147-
pub fn try_create_abortable(&self, input: SendableDataBlockStream) -> Result<AbortStream> {
148-
let (abort_handle, abort_stream) = AbortStream::try_create(input)?;
149-
self.shared.add_source_abort_handle(abort_handle);
150-
Ok(abort_stream)
151-
}
152-
153-
pub fn add_source_abort_handle(&self, abort_handle: AbortHandle) {
154-
self.shared.add_source_abort_handle(abort_handle);
155-
}
156-
157144
pub fn attach_http_query(&self, handle: HttpQueryHandle) {
158145
self.shared.attach_http_query_handle(handle);
159146
}

0 commit comments

Comments
 (0)