Skip to content

Commit 16a7800

Browse files
committed
multi thread pruning
1 parent eeeda23 commit 16a7800

File tree

10 files changed

+345
-116
lines changed

10 files changed

+345
-116
lines changed

src/common/metrics/src/metrics/storage.rs

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -190,8 +190,8 @@ static BLOCK_VECTOR_INDEX_GENERATE_MILLISECONDS: LazyLock<Histogram> = LazyLock:
190190
static BLOCK_VECTOR_INDEX_READ_MILLISECONDS: LazyLock<Histogram> = LazyLock::new(|| {
191191
register_histogram_in_milliseconds("fuse_block_vector_index_read_milliseconds")
192192
});
193-
static BLOCK_VECTOR_INDEX_SEARCH_MILLISECONDS: LazyLock<Histogram> = LazyLock::new(|| {
194-
register_histogram_in_milliseconds("fuse_block_vector_index_search_milliseconds")
193+
static BLOCK_VECTOR_INDEX_PRUNING_MILLISECONDS: LazyLock<Histogram> = LazyLock::new(|| {
194+
register_histogram_in_milliseconds("fuse_block_vector_index_pruning_milliseconds")
195195
});
196196
static BLOCK_VECTOR_INDEX_READ_BYTES: LazyLock<Counter> =
197197
LazyLock::new(|| register_counter("fuse_block_vector_index_read_bytes"));
@@ -626,8 +626,8 @@ pub fn metrics_inc_block_vector_index_read_milliseconds(c: u64) {
626626
BLOCK_VECTOR_INDEX_READ_MILLISECONDS.observe(c as f64);
627627
}
628628

629-
pub fn metrics_inc_block_vector_index_search_milliseconds(c: u64) {
630-
BLOCK_VECTOR_INDEX_SEARCH_MILLISECONDS.observe(c as f64);
629+
pub fn metrics_inc_block_vector_index_pruning_milliseconds(c: u64) {
630+
BLOCK_VECTOR_INDEX_PRUNING_MILLISECONDS.observe(c as f64);
631631
}
632632

633633
pub fn metrics_inc_block_vector_index_read_bytes(c: u64) {

src/query/ast/src/ast/statements/statement.rs

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -675,6 +675,8 @@ impl Display for Statement {
675675
unreachable!();
676676
}
677677
write!(f, ") ")?;
678+
} else {
679+
write!(f, "SETTINGS ")?;
678680
}
679681
write!(f, "{stmt}")?;
680682
}

src/query/catalog/src/plan/pushdown.rs

Lines changed: 0 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -256,10 +256,6 @@ impl PushDownInfo {
256256
}
257257
}
258258

259-
pub fn vector_topn(&self) -> bool {
260-
!self.order_by.is_empty() && self.limit.is_some() && self.vector_index.is_some()
261-
}
262-
263259
pub fn prewhere_of_push_downs(push_downs: Option<&PushDownInfo>) -> Option<PrewhereInfo> {
264260
if let Some(PushDownInfo { prewhere, .. }) = push_downs {
265261
prewhere.clone()

src/query/storages/fuse/src/io/read/vector_index/vector_index_reader.rs

Lines changed: 2 additions & 25 deletions
Original file line number | Diff line number | Diff line change
@@ -12,10 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::time::Instant;
16-
1715
use databend_common_exception::Result;
18-
use databend_common_metrics::storage::metrics_inc_block_vector_index_search_milliseconds;
1916
use databend_storages_common_index::DistanceType;
2017
use databend_storages_common_index::HNSWIndex;
2118
use databend_storages_common_index::ScoredPointOffset;
@@ -61,8 +58,6 @@ impl VectorIndexReader {
6158
row_count: usize,
6259
location: &str,
6360
) -> Result<Vec<ScoredPointOffset>> {
64-
let start = Instant::now();
65-
6661
let binary_columns = load_vector_index_files(
6762
self.operator.clone(),
6863
&self.settings,
@@ -72,24 +67,14 @@ impl VectorIndexReader {
7267
.await?;
7368

7469
let hnsw_index = HNSWIndex::open(self.distance_type, self.dim, row_count, binary_columns)?;
75-
76-
let res = hnsw_index.search(limit, &self.query_values)?;
77-
78-
// Perf.
79-
{
80-
metrics_inc_block_vector_index_search_milliseconds(start.elapsed().as_millis() as u64);
81-
}
82-
83-
Ok(res)
70+
hnsw_index.search(limit, &self.query_values)
8471
}
8572

8673
pub async fn generate_scores(
8774
&self,
8875
row_count: usize,
8976
location: &str,
9077
) -> Result<Vec<ScoredPointOffset>> {
91-
let start = Instant::now();
92-
9378
let binary_columns = load_vector_index_files(
9479
self.operator.clone(),
9580
&self.settings,
@@ -99,14 +84,6 @@ impl VectorIndexReader {
9984
.await?;
10085

10186
let hnsw_index = HNSWIndex::open(self.distance_type, self.dim, row_count, binary_columns)?;
102-
103-
let res = hnsw_index.generate_scores(row_count as u32, &self.query_values)?;
104-
105-
// Perf.
106-
{
107-
metrics_inc_block_vector_index_search_milliseconds(start.elapsed().as_millis() as u64);
108-
}
109-
110-
Ok(res)
87+
hnsw_index.generate_scores(row_count as u32, &self.query_values)
11188
}
11289
}

src/query/storages/fuse/src/operations/read_partitions.rs

Lines changed: 24 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -83,6 +83,7 @@ use crate::pruning::BlockPruner;
8383
use crate::pruning::FusePruner;
8484
use crate::pruning::SegmentLocation;
8585
use crate::pruning::SegmentPruner;
86+
use crate::pruning::VectorIndexPruner;
8687
use crate::pruning_pipeline::AsyncBlockPruneTransform;
8788
use crate::pruning_pipeline::ColumnOrientedBlockPruneSink;
8889
use crate::pruning_pipeline::ExtractSegmentTransform;
@@ -95,6 +96,7 @@ use crate::pruning_pipeline::SendPartInfoSink;
9596
use crate::pruning_pipeline::SendPartState;
9697
use crate::pruning_pipeline::SyncBlockPruneTransform;
9798
use crate::pruning_pipeline::TopNPruneTransform;
99+
use crate::pruning_pipeline::VectorIndexPruneTransform;
98100
use crate::segment_format_from_location;
99101
use crate::FuseLazyPartInfo;
100102
use crate::FuseSegmentFormat;
@@ -157,15 +159,7 @@ impl FuseTable {
157159
nodes_num = cluster.nodes.len();
158160
}
159161

160-
let has_vector_topn = if let Some(ref push_downs) = push_downs {
161-
push_downs.vector_topn()
162-
} else {
163-
false
164-
};
165-
166-
if (self.is_column_oriented() || (segment_len > nodes_num && distributed_pruning))
167-
&& !has_vector_topn
168-
{
162+
if self.is_column_oriented() || (segment_len > nodes_num && distributed_pruning) {
169163
let mut segments = Vec::with_capacity(segment_locs.len());
170164
for (idx, segment_location) in segment_locs.into_iter().enumerate() {
171165
segments.push(FuseLazyPartInfo::create(idx, segment_location))
@@ -491,6 +485,27 @@ impl FuseTable {
491485
})?;
492486
}
493487

488+
if push_down
489+
.as_ref()
490+
.filter(|p| p.vector_index.is_some())
491+
.is_some()
492+
{
493+
let pruning_ctx = pruner.pruning_ctx.clone();
494+
let schema = pruner.table_schema.clone();
495+
let push_down = push_down.as_ref().unwrap();
496+
let filters = push_down.filters.clone();
497+
let sort = push_down.order_by.clone();
498+
let limit = push_down.limit;
499+
let vector_index = push_down.vector_index.clone().unwrap();
500+
501+
let vector_index_pruner =
502+
VectorIndexPruner::create(pruning_ctx, schema, vector_index, filters, sort, limit)?;
503+
prune_pipeline.resize(1, false)?;
504+
prune_pipeline.add_transform(move |input, output| {
505+
VectorIndexPruneTransform::create(input, output, vector_index_pruner.clone())
506+
})?;
507+
}
508+
494509
let top_k = push_down
495510
.as_ref()
496511
.filter(|_| self.is_native()) // Only native format supports topk push down.

src/query/storages/fuse/src/pruning/fuse_pruner.rs

Lines changed: 1 addition & 25 deletions
Original file line number | Diff line number | Diff line change
@@ -26,10 +26,6 @@ use databend_common_expression::RemoteExpr;
2626
use databend_common_expression::TableSchemaRef;
2727
use databend_common_expression::SEGMENT_NAME_COL_NAME;
2828
use databend_common_functions::BUILTIN_FUNCTIONS;
29-
use databend_common_metrics::storage::metrics_inc_blocks_vector_index_pruning_after;
30-
use databend_common_metrics::storage::metrics_inc_blocks_vector_index_pruning_before;
31-
use databend_common_metrics::storage::metrics_inc_bytes_block_vector_index_pruning_after;
32-
use databend_common_metrics::storage::metrics_inc_bytes_block_vector_index_pruning_before;
3329
use databend_common_sql::BloomIndexColumns;
3430
use databend_common_sql::DefaultExprBinder;
3531
use databend_storages_common_cache::CacheAccessor;
@@ -566,35 +562,15 @@ impl FusePruner {
566562
let vector_index = push_down.vector_index.clone().unwrap();
567563

568564
let vector_pruner = VectorIndexPruner::create(
569-
self.pruning_ctx.ctx.clone(),
570-
self.pruning_ctx.dal.clone(),
565+
self.pruning_ctx.clone(),
571566
schema,
572567
vector_index,
573568
filters,
574569
sort,
575570
limit,
576571
)?;
577572

578-
// Perf.
579-
{
580-
let block_size = metas.iter().map(|(_, m)| m.block_size).sum();
581-
metrics_inc_blocks_vector_index_pruning_before(metas.len() as u64);
582-
metrics_inc_bytes_block_vector_index_pruning_before(block_size);
583-
self.pruning_ctx
584-
.pruning_stats
585-
.set_blocks_vector_index_pruning_before(metas.len() as u64);
586-
}
587573
let pruned_metas = vector_pruner.prune(metas.clone()).await?;
588-
589-
// Perf.
590-
{
591-
let block_size = pruned_metas.iter().map(|(_, m)| m.block_size).sum();
592-
metrics_inc_blocks_vector_index_pruning_after(pruned_metas.len() as u64);
593-
metrics_inc_bytes_block_vector_index_pruning_after(block_size);
594-
self.pruning_ctx
595-
.pruning_stats
596-
.set_blocks_vector_index_pruning_after(pruned_metas.len() as u64);
597-
}
598574
return Ok(pruned_metas);
599575
}
600576
Ok(metas)

0 commit comments

Comments
 (0)