Skip to content

Commit 8c85ace

Browse files
authored
feat: add create or replace network policy support (#14658)
1 parent baed42d commit 8c85ace

File tree

12 files changed

+99
-46
lines changed

12 files changed

+99
-46
lines changed

src/query/ast/src/ast/statements/network_policy.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,11 @@
1515
use std::fmt::Display;
1616
use std::fmt::Formatter;
1717

18+
use databend_common_meta_app::schema::CreateOption;
19+
1820
#[derive(Debug, Clone, PartialEq)]
1921
pub struct CreateNetworkPolicyStmt {
20-
pub if_not_exists: bool,
22+
pub create_option: CreateOption,
2123
pub name: String,
2224
pub allowed_ip_list: Vec<String>,
2325
pub blocked_ip_list: Option<Vec<String>>,
@@ -26,9 +28,15 @@ pub struct CreateNetworkPolicyStmt {
2628

2729
impl Display for CreateNetworkPolicyStmt {
2830
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
29-
write!(f, "CREATE NETWORK POLICY ")?;
30-
if self.if_not_exists {
31-
write!(f, "IF NOT EXISTS ")?;
31+
write!(f, "CREATE ")?;
32+
if let CreateOption::CreateOrReplace = self.create_option {
33+
write!(f, "OR REPLACE ")?;
34+
}
35+
write!(f, "NETWORK POLICY ")?;
36+
if let CreateOption::CreateIfNotExists(if_not_exists) = self.create_option {
37+
if if_not_exists {
38+
write!(f, "IF NOT EXISTS ")?;
39+
}
3240
}
3341
write!(f, "{}", self.name)?;
3442
write!(f, " ALLOWED_IP_LIST = (")?;

src/query/ast/src/parser/statement.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1619,15 +1619,16 @@ pub fn statement(i: Input) -> IResult<StatementWithFormat> {
16191619
},
16201620
);
16211621

1622-
let create_network_policy = map(
1622+
let create_network_policy = map_res(
16231623
rule! {
1624-
CREATE ~ NETWORK ~ ^POLICY ~ ( IF ~ ^NOT ~ ^EXISTS )? ~ ^#ident
1624+
CREATE ~ (OR ~ REPLACE)? ~ NETWORK ~ ^POLICY ~ ( IF ~ ^NOT ~ ^EXISTS )? ~ ^#ident
16251625
~ ALLOWED_IP_LIST ~ ^Eq ~ ^"(" ~ ^#comma_separated_list0(literal_string) ~ ^")"
16261626
~ ( BLOCKED_IP_LIST ~ ^Eq ~ ^"(" ~ ^#comma_separated_list0(literal_string) ~ ^")" ) ?
16271627
~ ( COMMENT ~ ^Eq ~ ^#literal_string)?
16281628
},
16291629
|(
16301630
_,
1631+
opt_or_replace,
16311632
_,
16321633
_,
16331634
opt_if_not_exists,
@@ -1640,8 +1641,10 @@ pub fn statement(i: Input) -> IResult<StatementWithFormat> {
16401641
opt_blocked_ip_list,
16411642
opt_comment,
16421643
)| {
1644+
let create_option =
1645+
parse_create_option(opt_or_replace.is_some(), opt_if_not_exists.is_some())?;
16431646
let stmt = CreateNetworkPolicyStmt {
1644-
if_not_exists: opt_if_not_exists.is_some(),
1647+
create_option,
16451648
name: name.to_string(),
16461649
allowed_ip_list,
16471650
blocked_ip_list: match opt_blocked_ip_list {
@@ -1653,7 +1656,7 @@ pub fn statement(i: Input) -> IResult<StatementWithFormat> {
16531656
None => None,
16541657
},
16551658
};
1656-
Statement::CreateNetworkPolicy(stmt)
1659+
Ok(Statement::CreateNetworkPolicy(stmt))
16571660
},
16581661
);
16591662
let alter_network_policy = map(

src/query/ast/tests/it/parser.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -519,6 +519,7 @@ fn test_statement() {
519519
r#"DROP VIRTUAL COLUMN FOR t"#,
520520
r#"REFRESH VIRTUAL COLUMN FOR t"#,
521521
r#"CREATE NETWORK POLICY mypolicy ALLOWED_IP_LIST=('192.168.10.0/24') BLOCKED_IP_LIST=('192.168.10.99') COMMENT='test'"#,
522+
r#"CREATE OR REPLACE NETWORK POLICY mypolicy ALLOWED_IP_LIST=('192.168.10.0/24') BLOCKED_IP_LIST=('192.168.10.99') COMMENT='test'"#,
522523
r#"ALTER NETWORK POLICY mypolicy SET ALLOWED_IP_LIST=('192.168.10.0/24','192.168.255.1') BLOCKED_IP_LIST=('192.168.1.99') COMMENT='test'"#,
523524
// tasks
524525
r#"CREATE TASK IF NOT EXISTS MyTask1 WAREHOUSE = 'MyWarehouse' SCHEDULE = 15 MINUTE SUSPEND_TASK_AFTER_NUM_FAILURES = 3 COMMENT = 'This is test task 1' DATABASE = 'target', TIMEZONE = 'America/Los Angeles' AS SELECT * FROM MyTable1"#,

src/query/ast/tests/it/testdata/statement.txt

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14839,7 +14839,33 @@ CREATE NETWORK POLICY mypolicy ALLOWED_IP_LIST = ('192.168.10.0/24') BLOCKED_IP_
1483914839
---------- AST ------------
1484014840
CreateNetworkPolicy(
1484114841
CreateNetworkPolicyStmt {
14842-
if_not_exists: false,
14842+
create_option: CreateIfNotExists(
14843+
false,
14844+
),
14845+
name: "mypolicy",
14846+
allowed_ip_list: [
14847+
"192.168.10.0/24",
14848+
],
14849+
blocked_ip_list: Some(
14850+
[
14851+
"192.168.10.99",
14852+
],
14853+
),
14854+
comment: Some(
14855+
"test",
14856+
),
14857+
},
14858+
)
14859+
14860+
14861+
---------- Input ----------
14862+
CREATE OR REPLACE NETWORK POLICY mypolicy ALLOWED_IP_LIST=('192.168.10.0/24') BLOCKED_IP_LIST=('192.168.10.99') COMMENT='test'
14863+
---------- Output ---------
14864+
CREATE OR REPLACE NETWORK POLICY mypolicy ALLOWED_IP_LIST = ('192.168.10.0/24') BLOCKED_IP_LIST = ('192.168.10.99') COMMENT = 'test'
14865+
---------- AST ------------
14866+
CreateNetworkPolicy(
14867+
CreateNetworkPolicyStmt {
14868+
create_option: CreateOrReplace,
1484314869
name: "mypolicy",
1484414870
allowed_ip_list: [
1484514871
"192.168.10.0/24",

src/query/management/src/network_policy/network_policy_api.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,17 @@
1414

1515
use databend_common_exception::Result;
1616
use databend_common_meta_app::principal::NetworkPolicy;
17+
use databend_common_meta_app::schema::CreateOption;
1718
use databend_common_meta_types::MatchSeq;
1819
use databend_common_meta_types::SeqV;
1920

2021
#[async_trait::async_trait]
2122
pub trait NetworkPolicyApi: Sync + Send {
22-
async fn add_network_policy(&self, network_policy: NetworkPolicy) -> Result<u64>;
23+
async fn add_network_policy(
24+
&self,
25+
network_policy: NetworkPolicy,
26+
create_option: &CreateOption,
27+
) -> Result<()>;
2328

2429
async fn update_network_policy(
2530
&self,

src/query/management/src/network_policy/network_policy_mgr.rs

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use databend_common_base::base::escape_for_key;
1818
use databend_common_exception::ErrorCode;
1919
use databend_common_exception::Result;
2020
use databend_common_meta_app::principal::NetworkPolicy;
21+
use databend_common_meta_app::schema::CreateOption;
2122
use databend_common_meta_kvapi::kvapi;
2223
use databend_common_meta_kvapi::kvapi::UpsertKVReq;
2324
use databend_common_meta_types::MatchSeq;
@@ -67,8 +68,12 @@ impl NetworkPolicyMgr {
6768
impl NetworkPolicyApi for NetworkPolicyMgr {
6869
#[async_backtrace::framed]
6970
#[minitrace::trace]
70-
async fn add_network_policy(&self, network_policy: NetworkPolicy) -> Result<u64> {
71-
let match_seq = MatchSeq::Exact(0);
71+
async fn add_network_policy(
72+
&self,
73+
network_policy: NetworkPolicy,
74+
create_option: &CreateOption,
75+
) -> Result<()> {
76+
let seq = MatchSeq::from(*create_option);
7277
let key = self.make_network_policy_key(network_policy.name.as_str())?;
7378
let value = Operation::Update(serialize_struct(
7479
&network_policy,
@@ -77,16 +82,20 @@ impl NetworkPolicyApi for NetworkPolicyMgr {
7782
)?);
7883

7984
let kv_api = self.kv_api.clone();
80-
let upsert_kv = kv_api.upsert_kv(UpsertKVReq::new(&key, match_seq, value, None));
85+
let res = kv_api
86+
.upsert_kv(UpsertKVReq::new(&key, seq, value, None))
87+
.await?;
8188

82-
let res_seq = upsert_kv.await?.added_seq_or_else(|_v| {
83-
ErrorCode::NetworkPolicyAlreadyExists(format!(
84-
"Network policy '{}' already exists.",
85-
network_policy.name
86-
))
87-
})?;
89+
if let CreateOption::CreateIfNotExists(false) = create_option {
90+
if res.prev.is_some() {
91+
return Err(ErrorCode::NetworkPolicyAlreadyExists(format!(
92+
"Network policy '{}' already exists.",
93+
network_policy.name
94+
)));
95+
}
96+
}
8897

89-
Ok(res_seq)
98+
Ok(())
9099
}
91100

92101
#[async_backtrace::framed]

src/query/service/src/interpreters/interpreter_network_policy_create.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ impl Interpreter for CreateNetworkPolicyInterpreter {
6262
update_on: None,
6363
};
6464
user_mgr
65-
.add_network_policy(&tenant, network_policy, plan.if_not_exists)
65+
.add_network_policy(&tenant, network_policy, &plan.create_option)
6666
.await?;
6767

6868
Ok(PipelineBuildResult::create())

src/query/sql/src/planner/binder/ddl/network_policy.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ impl Binder {
3232
stmt: &CreateNetworkPolicyStmt,
3333
) -> Result<Plan> {
3434
let CreateNetworkPolicyStmt {
35-
if_not_exists,
35+
create_option,
3636
name,
3737
allowed_ip_list,
3838
blocked_ip_list,
@@ -60,7 +60,7 @@ impl Binder {
6060

6161
let tenant = self.ctx.get_tenant();
6262
let plan = CreateNetworkPolicyPlan {
63-
if_not_exists: *if_not_exists,
63+
create_option: *create_option,
6464
tenant,
6565
name: name.to_string(),
6666
allowed_ip_list: allowed_ip_list.clone(),

src/query/sql/src/planner/plans/ddl/account.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ pub struct RevokePrivilegePlan {
129129

130130
#[derive(Clone, Debug, PartialEq)]
131131
pub struct CreateNetworkPolicyPlan {
132-
pub if_not_exists: bool,
132+
pub create_option: CreateOption,
133133
pub tenant: String,
134134
pub name: String,
135135
pub allowed_ip_list: Vec<String>,

src/query/users/src/network_policy.rs

Lines changed: 6 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use databend_common_exception::ErrorCode;
1717
use databend_common_exception::Result;
1818
use databend_common_management::NetworkPolicyApi;
1919
use databend_common_meta_app::principal::NetworkPolicy;
20+
use databend_common_meta_app::schema::CreateOption;
2021
use databend_common_meta_types::MatchSeq;
2122

2223
use crate::UserApiProvider;
@@ -28,28 +29,12 @@ impl UserApiProvider {
2829
&self,
2930
tenant: &str,
3031
network_policy: NetworkPolicy,
31-
if_not_exists: bool,
32-
) -> Result<u64> {
33-
if if_not_exists
34-
&& self
35-
.exists_network_policy(tenant, network_policy.name.as_str())
36-
.await?
37-
{
38-
return Ok(0);
39-
}
40-
32+
create_option: &CreateOption,
33+
) -> Result<()> {
4134
let client = self.get_network_policy_api_client(tenant)?;
42-
let add_network_policy = client.add_network_policy(network_policy);
43-
match add_network_policy.await {
44-
Ok(res) => Ok(res),
45-
Err(e) => {
46-
if if_not_exists && e.code() == ErrorCode::NETWORK_POLICY_ALREADY_EXISTS {
47-
Ok(0)
48-
} else {
49-
Err(e.add_message_back("(while add network policy)"))
50-
}
51-
}
52-
}
35+
client
36+
.add_network_policy(network_policy, create_option)
37+
.await
5338
}
5439

5540
// Update network policy.

0 commit comments

Comments
 (0)