Skip to content

Commit 95eb6df

Browse files
lichuangkkk25641463drmingdrmer
authored
feat: add nextval table function (#15294)
* feat: add nextval table function * feat: add nextval table function * feat: add nextval table function * feat: add nextval table function * feat: add nextval table function * feat: add nextval table function * feat: add nextval table function * feat: add nextval table function * feat: add nextval table function * feat: add nextval table function as insert value * feat: add nextval table function * feat: add nextval table function * feat: add nextval table function * feat: add nextval table function * feat: add nextval table function, fix explain bug * feat: add nextval table function, fix explain bug * feat(query): implement ST_ENDPOINT (#15366) * ST_ENDPOINT Signed-off-by: Fan Yang <yangfanlinux@gmail.com> * Update geometry.rs * Update geometry.rs Signed-off-by: Fan Yang <yangfanlinux@gmail.com> --------- Signed-off-by: Fan Yang <yangfanlinux@gmail.com> * refactor: Define StageFileIdent with TIdent (#15371) * feat: add nextval table function * feat: add nextval table function * feat: add nextval table function * feat: add nextval table function * feat: add nextval table function * feat: add nextval table function * feat: add nextval table function * feat: add nextval table function --------- Signed-off-by: Fan Yang <yangfanlinux@gmail.com> Co-authored-by: Fan Yang <yangfanlinux@gmail.com> Co-authored-by: 张炎泼 <drdr.xp@gmail.com>
1 parent ddfca5d commit 95eb6df

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

50 files changed

+950
-11
lines changed

Cargo.lock

Lines changed: 13 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ members = [
3131
"src/common/license",
3232
# Query
3333
"src/query/ast",
34+
"src/query/async_functions",
3435
"src/query/codegen",
3536
"src/query/config",
3637
"src/query/constraint",

src/query/async_functions/Cargo.toml

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
[package]
2+
name = "databend-common-async-functions"
3+
version = { workspace = true }
4+
authors = { workspace = true }
5+
license = { workspace = true }
6+
publish = { workspace = true }
7+
edition = { workspace = true }
8+
9+
[lib]
10+
doctest = false
11+
12+
[dependencies] # In alphabetical order
13+
# Workspace dependencies
14+
databend-common-ast = { path = "../ast" }
15+
databend-common-catalog = { path = "../catalog" }
16+
databend-common-exception = { path = "../../common/exception" }
17+
databend-common-expression = { path = "../expression" }
18+
databend-common-meta-app = { path = "../../meta/app" }
19+
20+
# Crates.io dependencies
21+
educe = "0.4"
22+
23+
[dev-dependencies]
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::sync::Arc;
16+
17+
use databend_common_ast::ast::ColumnID;
18+
use databend_common_ast::ast::Expr;
19+
use databend_common_catalog::catalog::Catalog;
20+
use databend_common_exception::ErrorCode;
21+
use databend_common_exception::Result;
22+
use databend_common_exception::Span;
23+
use databend_common_expression::types::DataType;
24+
use databend_common_expression::types::NumberDataType;
25+
use databend_common_expression::Scalar;
26+
use databend_common_meta_app::schema::GetSequenceReq;
27+
use databend_common_meta_app::schema::SequenceIdent;
28+
use databend_common_meta_app::tenant::Tenant;
29+
use educe::Educe;
30+
31+
use crate::sequence_async_function::SequenceAsyncFunction;
32+
33+
#[derive(Clone, Debug, Educe)]
34+
#[educe(PartialEq, Eq, Hash)]
35+
pub enum AsyncFunction {
36+
SequenceAsyncFunction(SequenceAsyncFunction),
37+
}
38+
39+
#[derive(Clone, Debug, Educe)]
40+
#[educe(PartialEq, Eq, Hash)]
41+
pub struct AsyncFunctionCall {
42+
#[educe(Hash(ignore), PartialEq(ignore), Eq(ignore))]
43+
pub span: Span,
44+
pub func_name: String,
45+
pub display_name: String,
46+
pub return_type: Box<DataType>,
47+
pub arguments: Vec<String>,
48+
pub tenant: Tenant,
49+
pub function: AsyncFunction,
50+
}
51+
52+
impl AsyncFunction {
53+
pub async fn generate(
54+
&self,
55+
catalog: Arc<dyn Catalog>,
56+
async_func: &AsyncFunctionCall,
57+
) -> Result<Scalar> {
58+
match &async_func.function {
59+
AsyncFunction::SequenceAsyncFunction(async_function) => {
60+
async_function.generate(catalog, async_func).await
61+
}
62+
}
63+
}
64+
}
65+
66+
pub async fn resolve_async_function(
67+
span: Span,
68+
tenant: Tenant,
69+
catalog: Arc<dyn Catalog>,
70+
func_name: &str,
71+
arguments: &[&Expr],
72+
) -> Result<AsyncFunctionCall> {
73+
if func_name == "nextval" {
74+
resolve_nextval(span, tenant, catalog, arguments).await
75+
} else {
76+
Err(ErrorCode::SemanticError(format!(
77+
"cannot find function {}",
78+
func_name
79+
)))
80+
}
81+
}
82+
83+
async fn resolve_nextval(
84+
span: Span,
85+
tenant: Tenant,
86+
catalog: Arc<dyn Catalog>,
87+
arguments: &[&Expr],
88+
) -> Result<AsyncFunctionCall> {
89+
if arguments.len() != 1 {
90+
return Err(ErrorCode::SemanticError(format!(
91+
"nextval function need one argument but got {}",
92+
arguments.len()
93+
)));
94+
}
95+
let sequence_name = if let Expr::ColumnRef { column, .. } = arguments[0] {
96+
if let ColumnID::Name(name) = &column.column {
97+
name.name.clone()
98+
} else {
99+
return Err(ErrorCode::SemanticError(
100+
"async function can only used as column".to_string(),
101+
));
102+
}
103+
} else {
104+
return Err(ErrorCode::SemanticError(
105+
"async function can only used as column".to_string(),
106+
));
107+
};
108+
109+
let req = GetSequenceReq {
110+
ident: SequenceIdent::new(tenant.clone(), sequence_name.clone()),
111+
};
112+
113+
let _ = catalog.get_sequence(req).await?;
114+
115+
let table_func = AsyncFunctionCall {
116+
span,
117+
func_name: "nextval".to_string(),
118+
display_name: format!("nextval({})", sequence_name),
119+
return_type: Box::new(DataType::Number(NumberDataType::UInt64)),
120+
arguments: vec![sequence_name],
121+
tenant,
122+
function: AsyncFunction::SequenceAsyncFunction(SequenceAsyncFunction {}),
123+
};
124+
125+
Ok(table_func)
126+
}

src/query/async_functions/src/lib.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
pub mod async_function;
16+
pub mod sequence_async_function;
17+
18+
pub use async_function::resolve_async_function;
19+
pub use async_function::AsyncFunction;
20+
pub use async_function::AsyncFunctionCall;
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::sync::Arc;
16+
17+
use databend_common_catalog::catalog::Catalog;
18+
use databend_common_exception::Result;
19+
use databend_common_expression::types::NumberScalar;
20+
use databend_common_expression::Scalar;
21+
use databend_common_meta_app::schema::GetSequenceNextValueReq;
22+
use databend_common_meta_app::schema::SequenceIdent;
23+
use educe::Educe;
24+
25+
use crate::AsyncFunctionCall;
26+
27+
#[derive(Clone, Debug, Educe)]
28+
#[educe(PartialEq, Eq, Hash)]
29+
pub struct SequenceAsyncFunction {}
30+
31+
impl SequenceAsyncFunction {
32+
pub async fn generate(
33+
&self,
34+
catalog: Arc<dyn Catalog>,
35+
async_func: &AsyncFunctionCall,
36+
) -> Result<Scalar> {
37+
let tenant = &async_func.tenant;
38+
let req = GetSequenceNextValueReq {
39+
ident: SequenceIdent::new(tenant, async_func.arguments[0].clone()),
40+
count: 1,
41+
};
42+
43+
let reply = catalog.get_sequence_next_value(req).await?;
44+
45+
Ok(Scalar::Number(NumberScalar::UInt64(reply.start)))
46+
}
47+
}

src/query/expression/src/expression.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -516,8 +516,7 @@ impl<Index: ColumnIndex> Expr<Index> {
516516

517517
pub fn is_deterministic(&self, registry: &FunctionRegistry) -> bool {
518518
match self {
519-
Expr::Constant { .. } => true,
520-
Expr::ColumnRef { .. } => true,
519+
Expr::Constant { .. } | Expr::ColumnRef { .. } => true,
521520
Expr::Cast { expr, .. } => expr.is_deterministic(registry),
522521
Expr::FunctionCall { function, args, .. } => {
523522
!registry

src/query/functions/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,14 @@ pub fn is_builtin_function(name: &str) -> bool {
3939
|| GENERAL_WINDOW_FUNCTIONS.contains(&name)
4040
|| GENERAL_LAMBDA_FUNCTIONS.contains(&name)
4141
|| GENERAL_SEARCH_FUNCTIONS.contains(&name)
42+
|| ASYNC_FUNCTIONS.contains(&name)
4243
}
4344

4445
#[ctor]
4546
pub static BUILTIN_FUNCTIONS: FunctionRegistry = builtin_functions();
4647

48+
pub const ASYNC_FUNCTIONS: [&str; 1] = ["nextval"];
49+
4750
pub const GENERAL_WINDOW_FUNCTIONS: [&str; 13] = [
4851
"row_number",
4952
"rank",

src/query/service/src/interpreters/interpreter_sequence_drop.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
use std::sync::Arc;
1616

17+
use databend_common_exception::ErrorCode;
1718
use databend_common_exception::Result;
1819
use databend_common_meta_app::schema::DropSequenceReq;
1920
use databend_common_sql::plans::DropSequencePlan;
@@ -51,7 +52,13 @@ impl Interpreter for DropSequenceInterpreter {
5152
if_exists: self.plan.if_exists,
5253
};
5354
let catalog = self.ctx.get_default_catalog()?;
54-
let _reply = catalog.drop_sequence(req).await?;
55+
let reply = catalog.drop_sequence(req).await?;
56+
if reply.prev.is_none() && !self.plan.if_exists {
57+
return Err(ErrorCode::UnknownSequence(format!(
58+
"unknown sequence {:?}",
59+
self.plan.ident.name()
60+
)));
61+
}
5562

5663
Ok(PipelineBuildResult::create())
5764
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use databend_common_exception::Result;
16+
use databend_common_pipeline_core::processors::ProcessorPtr;
17+
use databend_common_sql::executor::physical_plans::AsyncFunction;
18+
19+
use crate::pipelines::processors::transforms::TransformSequenceNextval;
20+
use crate::pipelines::PipelineBuilder;
21+
22+
impl PipelineBuilder {
23+
pub(crate) fn build_async_function(&mut self, async_function: &AsyncFunction) -> Result<()> {
24+
self.build_pipeline(&async_function.input)?;
25+
26+
if async_function.func_name == "nextval" {
27+
self.main_pipeline.add_transform(|input, output| {
28+
Ok(ProcessorPtr::create(TransformSequenceNextval::try_create(
29+
input,
30+
output,
31+
self.ctx.clone(),
32+
&async_function.arguments[0],
33+
&async_function.return_type,
34+
)?))
35+
})
36+
} else {
37+
unreachable!()
38+
}
39+
}
40+
}

0 commit comments

Comments
 (0)