Skip to content

Commit d6b0a07

Browse files
b41sh, BohuTANG, SkyFan2002
authored
feat(query): read write inverted index (#14827)
* feat(query): read write inverted index * fix check * fix check * fix check * fix fmt * fix check * Update src/query/ee/src/inverted_index/indexer.rs Co-authored-by: Sky Fan <3374614481@qq.com> --------- Co-authored-by: Bohu <overred.shuttler@gmail.com> Co-authored-by: Sky Fan <3374614481@qq.com>
1 parent c3c40a2 commit d6b0a07

File tree

29 files changed

+1636
-2
lines changed

29 files changed

+1636
-2
lines changed

Cargo.lock

Lines changed: 266 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/common/exception/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,5 +31,6 @@ paste = "1.0.9"
3131
prost = { workspace = true }
3232
serde = { workspace = true }
3333
serde_json = { workspace = true }
34+
tantivy = "0.21.1"
3435
thiserror = { workspace = true }
3536
tonic = { workspace = true }

src/common/exception/src/exception_code.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,12 @@ build_exceptions! {
199199
IllegalCloudControlMessageFormat(1703),
200200

201201
// Geometry errors.
202-
GeometryError(1801)
202+
GeometryError(1801),
203+
204+
// Tantivy errors.
205+
TantivyError(1901),
206+
TantivyOpenReadError(1902),
207+
TantivyQueryParserError(1903)
203208
}
204209

205210
// Meta service errors [2001, 3000].

src/common/exception/src/exception_into.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,24 @@ impl From<GeozeroError> for ErrorCode {
237237
}
238238
}
239239

240+
/// Maps a general tantivy failure onto `ErrorCode::TantivyError`, carrying the
/// tantivy error's display text as the message.
impl From<tantivy::TantivyError> for ErrorCode {
    fn from(err: tantivy::TantivyError) -> Self {
        let message = err.to_string();
        Self::TantivyError(message)
    }
}
245+
246+
impl From<tantivy::directory::error::OpenReadError> for ErrorCode {
247+
fn from(error: tantivy::directory::error::OpenReadError) -> Self {
248+
ErrorCode::TantivyOpenReadError(error.to_string())
249+
}
250+
}
251+
252+
/// Maps a tantivy query-parser failure onto
/// `ErrorCode::TantivyQueryParserError`, carrying the error's display text as
/// the message.
impl From<tantivy::query::QueryParserError> for ErrorCode {
    fn from(err: tantivy::query::QueryParserError) -> Self {
        let message = err.to_string();
        Self::TantivyQueryParserError(message)
    }
}
257+
240258
// === prost error ===
241259
impl From<prost::EncodeError> for ErrorCode {
242260
fn from(error: prost::EncodeError) -> Self {

src/query/ee/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ databend-common-users = { path = "../users" }
3636
databend-enterprise-aggregating-index = { path = "../ee_features/aggregating_index" }
3737
databend-enterprise-background-service = { path = "../ee_features/background_service" }
3838
databend-enterprise-data-mask-feature = { path = "../ee_features/data_mask" }
39+
databend-enterprise-inverted-index = { path = "../ee_features/inverted_index" }
3940
databend-enterprise-storage-encryption = { path = "../ee_features/storage_encryption" }
4041
databend-enterprise-stream-handler = { path = "../ee_features/stream_handler" }
4142
databend-enterprise-vacuum-handler = { path = "../ee_features/vacuum_handler" }

src/query/ee/src/enterprise_services.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use databend_common_license::license_manager::LicenseManager;
1919
use crate::aggregating_index::RealAggregatingIndexHandler;
2020
use crate::background_service::RealBackgroundService;
2121
use crate::data_mask::RealDatamaskHandler;
22+
use crate::inverted_index::RealInvertedIndexHandler;
2223
use crate::license::license_mgr::RealLicenseManager;
2324
use crate::storage_encryption::RealStorageEncryptionHandler;
2425
use crate::storages::fuse::operations::RealVacuumHandler;
@@ -37,6 +38,7 @@ impl EnterpriseServices {
3738
RealBackgroundService::init(&cfg).await?;
3839
RealVirtualColumnHandler::init()?;
3940
RealStreamHandler::init()?;
41+
RealInvertedIndexHandler::init()?;
4042
Ok(())
4143
}
4244
}
src/query/ee/src/inverted_index/indexer.rs

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
// Copyright 2023 Databend Cloud
2+
//
3+
// Licensed under the Elastic License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.elastic.co/licensing/elastic-license
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::sync::Arc;
16+
17+
use databend_common_catalog::plan::Projection;
18+
use databend_common_catalog::table::Table;
19+
use databend_common_catalog::table_context::TableContext;
20+
use databend_common_exception::Result;
21+
use databend_common_expression::DataSchema;
22+
use databend_common_storages_fuse::io::InvertedIndexWriter;
23+
use databend_common_storages_fuse::io::MetaReaders;
24+
use databend_common_storages_fuse::io::ReadSettings;
25+
use databend_common_storages_fuse::FuseTable;
26+
use databend_storages_common_cache::LoadParams;
27+
use databend_storages_common_table_meta::meta::Location;
28+
29+
/// Stateless helper that builds an inverted index from the blocks of a fuse
/// table; all inputs are passed to its `index` method.
pub struct Indexer {}
30+
31+
impl Indexer {
32+
pub(crate) fn new() -> Indexer {
33+
Indexer {}
34+
}
35+
36+
#[async_backtrace::framed]
37+
pub(crate) async fn index(
38+
&self,
39+
fuse_table: &FuseTable,
40+
ctx: Arc<dyn TableContext>,
41+
schema: DataSchema,
42+
segment_locs: Option<Vec<Location>>,
43+
) -> Result<String> {
44+
let Some(snapshot) = fuse_table.read_table_snapshot().await? else {
45+
// no snapshot
46+
return Ok("".to_string());
47+
};
48+
if schema.fields.is_empty() {
49+
// no field for index
50+
return Ok("".to_string());
51+
}
52+
53+
let table_schema = &fuse_table.get_table_info().meta.schema;
54+
55+
// Collect field indices used by inverted index.
56+
let mut field_indices = Vec::new();
57+
for field in &schema.fields {
58+
let field_index = table_schema.index_of(field.name())?;
59+
field_indices.push(field_index);
60+
}
61+
62+
let projection = Projection::Columns(field_indices);
63+
let block_reader =
64+
fuse_table.create_block_reader(ctx.clone(), projection, false, false, false)?;
65+
66+
let segment_reader =
67+
MetaReaders::segment_info_reader(fuse_table.get_operator(), table_schema.clone());
68+
69+
let settings = ReadSettings::from_ctx(&ctx)?;
70+
let write_settings = fuse_table.get_write_settings();
71+
let storage_format = write_settings.storage_format;
72+
73+
let operator = fuse_table.get_operator_ref();
74+
75+
// If no segment locations are specified, iterates through all segments
76+
let segment_locs = if let Some(segment_locs) = segment_locs {
77+
segment_locs
78+
} else {
79+
snapshot.segments.clone()
80+
};
81+
82+
let mut index_writer = InvertedIndexWriter::try_create(schema)?;
83+
84+
for (location, ver) in segment_locs {
85+
let segment_info = segment_reader
86+
.read(&LoadParams {
87+
location: location.to_string(),
88+
len_hint: None,
89+
ver,
90+
put_cache: false,
91+
})
92+
.await?;
93+
94+
let block_metas = segment_info.block_metas()?;
95+
for block_meta in block_metas {
96+
let block = block_reader
97+
.read_by_meta(&settings, &block_meta, &storage_format)
98+
.await?;
99+
100+
index_writer.add_block(block)?;
101+
}
102+
}
103+
104+
let location_generator = fuse_table.meta_location_generator();
105+
106+
let index_location = index_writer.finalize(operator, location_generator).await?;
107+
// TODO: add index location to meta
108+
Ok(index_location)
109+
}
110+
}
src/query/ee/src/inverted_index/inverted_index_handler.rs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
// Copyright 2023 Databend Cloud
2+
//
3+
// Licensed under the Elastic License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.elastic.co/licensing/elastic-license
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::sync::Arc;
16+
17+
use databend_common_base::base::GlobalInstance;
18+
use databend_common_catalog::table_context::TableContext;
19+
use databend_common_exception::Result;
20+
use databend_common_expression::DataSchema;
21+
use databend_common_storages_fuse::FuseTable;
22+
use databend_enterprise_inverted_index::InvertedIndexHandler;
23+
use databend_enterprise_inverted_index::InvertedIndexHandlerWrapper;
24+
use databend_storages_common_table_meta::meta::Location;
25+
26+
use super::indexer::Indexer;
27+
28+
/// Implementation of `InvertedIndexHandler` that delegates index refreshes to
/// `Indexer`; it holds no state.
pub struct RealInvertedIndexHandler {}
29+
30+
#[async_trait::async_trait]
31+
impl InvertedIndexHandler for RealInvertedIndexHandler {
32+
#[async_backtrace::framed]
33+
async fn do_refresh_index(
34+
&self,
35+
fuse_table: &FuseTable,
36+
ctx: Arc<dyn TableContext>,
37+
schema: DataSchema,
38+
segment_locs: Option<Vec<Location>>,
39+
) -> Result<String> {
40+
let indexer = Indexer::new();
41+
indexer.index(fuse_table, ctx, schema, segment_locs).await
42+
}
43+
}
44+
45+
impl RealInvertedIndexHandler {
46+
pub fn init() -> Result<()> {
47+
let rm = RealInvertedIndexHandler {};
48+
let wrapper = InvertedIndexHandlerWrapper::new(Box::new(rm));
49+
GlobalInstance::set(Arc::new(wrapper));
50+
Ok(())
51+
}
52+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
// Copyright 2023 Databend Cloud
2+
//
3+
// Licensed under the Elastic License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.elastic.co/licensing/elastic-license
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
mod indexer;
16+
mod inverted_index_handler;
17+
pub use inverted_index_handler::RealInvertedIndexHandler;

src/query/ee/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ pub mod aggregating_index;
1616
pub mod background_service;
1717
pub mod data_mask;
1818
pub mod enterprise_services;
19+
pub mod inverted_index;
1920
pub mod license;
2021
pub mod storage_encryption;
2122
pub mod storages;

0 commit comments

Comments
 (0)