Skip to content

Commit 4d92219

Browse files
authored
Merge pull request #7595 from sandflee/slow2
feat(hive): delay start the worker for simple select hive query
2 parents acd4841 + 10e4b02 commit 4d92219

File tree

2 files changed

+65
-2
lines changed

2 files changed

+65
-2
lines changed

src/query/storages/hive/src/hive_table.rs

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::collections::HashSet;
1516
use std::sync::Arc;
1617
use std::time::Instant;
1718

@@ -32,6 +33,7 @@ use common_legacy_planners::Extras;
3233
use common_legacy_planners::Partitions;
3334
use common_legacy_planners::Projection;
3435
use common_legacy_planners::ReadDataSourcePlan;
36+
use common_legacy_planners::RequireColumnsVisitor;
3537
use common_legacy_planners::Statistics;
3638
use common_legacy_planners::TruncateTablePlan;
3739
use common_meta_app::schema::TableInfo;
@@ -109,19 +111,69 @@ impl HiveTable {
109111
let max_threads = std::cmp::min(parts_len, max_threads);
110112

111113
let mut source_builder = SourcePipeBuilder::create();
114+
let delay_timer = if self.is_simple_select_query(plan) {
115+
// 0, 0, 200, 200, 400,400
116+
|x: usize| (x / 2).min(10) * 200
117+
} else {
118+
|_| 0
119+
};
112120

113-
for _index in 0..std::cmp::max(1, max_threads) {
121+
for index in 0..std::cmp::max(1, max_threads) {
114122
let output = OutputPort::create();
115123
source_builder.add_source(
116124
output.clone(),
117-
HiveTableSource::create(ctx.clone(), output, block_reader.clone())?,
125+
HiveTableSource::create(
126+
ctx.clone(),
127+
output,
128+
block_reader.clone(),
129+
delay_timer(index),
130+
)?,
118131
);
119132
}
120133

121134
pipeline.add_pipe(source_builder.finalize());
122135
Ok(())
123136
}
124137

138+
// simple select query is the sql likes `select * from xx limit 10` or
139+
// `select * from xx where p_date = '20220201' limit 10` where p_date is a partition column;
140+
// we just need to read few datas from table
141+
fn is_simple_select_query(&self, plan: &ReadDataSourcePlan) -> bool {
142+
// couldn't get groupby order by info
143+
if let Some(Extras {
144+
filters: f,
145+
limit: Some(lm),
146+
..
147+
}) = &plan.push_downs
148+
{
149+
if *lm > 100000 {
150+
return false;
151+
}
152+
153+
// filter out the partition column related expressions
154+
let partition_keys = self.get_partition_key_sets();
155+
let columns = Self::get_columns_from_expressions(f);
156+
if columns.difference(&partition_keys).count() == 0 {
157+
return true;
158+
}
159+
}
160+
false
161+
}
162+
163+
fn get_partition_key_sets(&self) -> HashSet<String> {
164+
match &self.table_options.partition_keys {
165+
Some(v) => v.iter().cloned().collect::<HashSet<_>>(),
166+
None => HashSet::new(),
167+
}
168+
}
169+
170+
fn get_columns_from_expressions(expressions: &[Expression]) -> HashSet<String> {
171+
expressions
172+
.iter()
173+
.flat_map(|e| RequireColumnsVisitor::collect_columns_from_expr(e).unwrap())
174+
.collect::<HashSet<_>>()
175+
}
176+
125177
fn create_block_reader(
126178
&self,
127179
ctx: &Arc<dyn TableContext>,

src/query/storages/hive/src/hive_table_source.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
use std::any::Any;
1616
use std::sync::Arc;
1717

18+
use common_base::base::tokio::time::sleep;
19+
use common_base::base::tokio::time::Duration;
1820
use common_base::base::Progress;
1921
use common_base::base::ProgressValues;
2022
use common_catalog::table_context::TableContext;
@@ -57,13 +59,15 @@ pub struct HiveTableSource {
5759
scan_progress: Arc<Progress>,
5860
block_reader: Arc<HiveParquetBlockReader>,
5961
output: Arc<OutputPort>,
62+
delay: usize,
6063
}
6164

6265
impl HiveTableSource {
6366
pub fn create(
6467
ctx: Arc<dyn TableContext>,
6568
output: Arc<OutputPort>,
6669
block_reader: Arc<HiveParquetBlockReader>,
70+
delay: usize,
6771
) -> Result<ProcessorPtr> {
6872
let scan_progress = ctx.get_scan_progress();
6973
let mut partitions = ctx.try_get_partitions(1)?;
@@ -74,13 +78,15 @@ impl HiveTableSource {
7478
block_reader,
7579
scan_progress,
7680
state: State::Finish,
81+
delay,
7782
}))),
7883
false => Ok(ProcessorPtr::create(Box::new(HiveTableSource {
7984
ctx,
8085
output,
8186
block_reader,
8287
scan_progress,
8388
state: State::ReadMeta(partitions.remove(0)),
89+
delay,
8490
}))),
8591
}
8692
}
@@ -179,6 +185,11 @@ impl Processor for HiveTableSource {
179185
async fn async_process(&mut self) -> Result<()> {
180186
match std::mem::replace(&mut self.state, State::Finish) {
181187
State::ReadMeta(part) => {
188+
if self.delay > 0 {
189+
sleep(Duration::from_millis(self.delay as u64)).await;
190+
tracing::debug!("sleep for {}ms", self.delay);
191+
self.delay = 0;
192+
}
182193
let part = HivePartInfo::from_part(&part)?;
183194
let file_meta = self
184195
.block_reader

0 commit comments

Comments
 (0)