From 2bc021d729da0929b5efe45e75f61aec62b0df8d Mon Sep 17 00:00:00 2001 From: taichong Date: Thu, 9 May 2024 19:51:51 +0800 Subject: [PATCH 1/8] feat(query): task support ownership --- src/common/exception/src/exception_code.rs | 3 + .../app/src/principal/ownership_object.rs | 11 +++- .../tenant_ownership_object_ident.rs | 16 +++++ src/meta/app/src/principal/user_grant.rs | 11 +++- src/meta/app/src/principal/user_privilege.rs | 16 ++++- .../src/ownership_from_to_protobuf_impl.rs | 8 +++ .../src/user_from_to_protobuf_impl.rs | 6 ++ src/meta/proto-conv/src/util.rs | 1 + src/meta/proto-conv/tests/it/main.rs | 2 +- .../{v090_role_info.rs => v091_role_info.rs} | 6 +- src/meta/protos/proto/ownership.proto | 5 ++ src/meta/protos/proto/user.proto | 5 ++ src/query/ast/src/ast/statements/user.rs | 3 + src/query/ast/src/parser/statement.rs | 24 ++++++- src/query/ast/tests/it/parser.rs | 10 +++ src/query/management/src/role/role_mgr.rs | 1 + .../interpreters/access/privilege_access.rs | 31 +++++++++- .../service/src/interpreters/common/grant.rs | 62 ++++++++++++------- .../interpreter_privilege_grant.rs | 3 + .../interpreters/interpreter_show_grants.rs | 6 ++ .../interpreters/interpreter_task_create.rs | 18 ++++++ .../src/interpreters/interpreter_task_drop.rs | 25 ++++++++ .../sql/src/planner/binder/ddl/account.rs | 2 + src/query/users/src/visibility_checker.rs | 30 +++++++++ .../0_stateless/18_rbac/18_0011_task.result | 0 .../0_stateless/18_rbac/18_0011_task.sh | 8 +++ 26 files changed, 281 insertions(+), 32 deletions(-) rename src/meta/proto-conv/tests/it/{v090_role_info.rs => v091_role_info.rs} (91%) create mode 100644 tests/suites/0_stateless/18_rbac/18_0011_task.result create mode 100644 tests/suites/0_stateless/18_rbac/18_0011_task.sh diff --git a/src/common/exception/src/exception_code.rs b/src/common/exception/src/exception_code.rs index 32403c2fb0f11..f19a5607800d4 100644 --- a/src/common/exception/src/exception_code.rs +++ b/src/common/exception/src/exception_code.rs @@ -370,6 +370,9 @@ build_exceptions! { // sequence SequenceError(3101), + + // Task + UnknownTask(3201), } // Storage errors [3001, 4000]. diff --git a/src/meta/app/src/principal/ownership_object.rs b/src/meta/app/src/principal/ownership_object.rs index 7a6e660e96898..0bd7f3cdae475 100644 --- a/src/meta/app/src/principal/ownership_object.rs +++ b/src/meta/app/src/principal/ownership_object.rs @@ -50,6 +50,10 @@ pub enum OwnershipObject { UDF { name: String, }, + + Task { + name: String, + }, } impl OwnershipObject { @@ -97,6 +101,7 @@ impl KeyCodec for OwnershipObject { } OwnershipObject::Stage { name } => b.push_raw("stage-by-name").push_str(name), OwnershipObject::UDF { name } => b.push_raw("udf-by-name").push_str(name), + OwnershipObject::Task { name } => b.push_raw("task-by-name").push_str(name), } } @@ -143,9 +148,13 @@ impl KeyCodec for OwnershipObject { let name = p.next_str()?; Ok(OwnershipObject::UDF { name }) } + "task-by-name" => { + let name = p.next_str()?; + Ok(OwnershipObject::Task { name }) + } _ => Err(kvapi::KeyError::InvalidSegment { i: p.index(), - expect: "database-by-id|database-by-catalog-id|table-by-id|table-by-catalog-id|stage-by-name|udf-by-name" + expect: "database-by-id|database-by-catalog-id|table-by-id|table-by-catalog-id|stage-by-name|udf-by-name|task-by-name" .to_string(), got: q.to_string(), }), diff --git a/src/meta/app/src/principal/tenant_ownership_object_ident.rs b/src/meta/app/src/principal/tenant_ownership_object_ident.rs index 98be486bc969b..d3787846c71f2 100644 --- a/src/meta/app/src/principal/tenant_ownership_object_ident.rs +++ b/src/meta/app/src/principal/tenant_ownership_object_ident.rs @@ -252,6 +252,22 @@ mod tests { let parsed = TenantOwnershipObjectIdent::from_str_key(&key).unwrap(); assert_eq!(role_grantee, parsed); } + + // udf + { + let role_grantee = TenantOwnershipObjectIdent::new_unchecked( + Tenant::new_literal("test"), + OwnershipObject::Task { + name: "t1".to_string(), + }, + ); + + let key = role_grantee.to_string_key(); + assert_eq!("__fd_object_owners/test/task-by-name/t1", key); + + let parsed = TenantOwnershipObjectIdent::from_str_key(&key).unwrap(); + assert_eq!(role_grantee, parsed); + } } #[test] diff --git a/src/meta/app/src/principal/user_grant.rs b/src/meta/app/src/principal/user_grant.rs index 7ba3df94a0844..42e1135fa069c 100644 --- a/src/meta/app/src/principal/user_grant.rs +++ b/src/meta/app/src/principal/user_grant.rs @@ -30,6 +30,7 @@ pub enum GrantObject { TableById(String, u64, u64), UDF(String), Stage(String), + Task(String), } impl GrantObject { @@ -62,6 +63,7 @@ impl GrantObject { (GrantObject::Table(_, _, _), _) => false, (GrantObject::Stage(lstage), GrantObject::Stage(rstage)) => lstage == rstage, (GrantObject::UDF(udf), GrantObject::UDF(rudf)) => udf == rudf, + (GrantObject::Task(task), GrantObject::Task(rtask)) => task == rtask, _ => false, } } @@ -82,12 +84,18 @@ impl GrantObject { GrantObject::Stage(_) => { UserPrivilegeSet::available_privileges_on_stage(available_ownership) } + GrantObject::Task(_) => { + UserPrivilegeSet::available_privileges_on_task(available_ownership) + } } } pub fn catalog(&self) -> Option { match self { - GrantObject::Global | GrantObject::Stage(_) | GrantObject::UDF(_) => None, + GrantObject::Global + | GrantObject::Stage(_) + | GrantObject::UDF(_) + | GrantObject::Task(_) => None, GrantObject::Database(cat, _) | GrantObject::DatabaseById(cat, _) => Some(cat.clone()), GrantObject::Table(cat, _, _) | GrantObject::TableById(cat, _, _) => Some(cat.clone()), } @@ -108,6 +116,7 @@ impl fmt::Display for GrantObject { } GrantObject::UDF(udf) => write!(f, "UDF {udf}"), GrantObject::Stage(stage) => write!(f, "STAGE {stage}"), + GrantObject::Task(task) => write!(f, "task {task}"), } } } diff --git a/src/meta/app/src/principal/user_privilege.rs b/src/meta/app/src/principal/user_privilege.rs index 4f7374fabef32..c290e8615bff0 100644 --- a/src/meta/app/src/principal/user_privilege.rs +++ b/src/meta/app/src/principal/user_privilege.rs @@ -76,6 +76,8 @@ pub enum UserPrivilegeType { Write = 1 << 19, // Privilege to Create database CreateDatabase = 1 << 20, + // Privilege to Create task + CreateTask = 1 << 21, // Discard Privilege Type Set = 1 << 4, } @@ -102,6 +104,7 @@ const ALL_PRIVILEGES: BitFlags = make_bitflags!( | Read | Write | CreateDatabase + | CreateTask } ); @@ -129,6 +132,7 @@ impl Display for UserPrivilegeType { UserPrivilegeType::Read => "Read", UserPrivilegeType::Write => "Write", UserPrivilegeType::CreateDatabase => "CREATE DATABASE", + UserPrivilegeType::CreateTask => "CREATE TASK", }) } } @@ -199,10 +203,12 @@ impl UserPrivilegeSet { let database_privs = Self::available_privileges_on_database(false); let stage_privs_without_ownership = Self::available_privileges_on_stage(false); let udf_privs_without_ownership = Self::available_privileges_on_udf(false); - let privs = make_bitflags!(UserPrivilegeType::{ Usage | Super | CreateUser | DropUser | CreateRole | DropRole | CreateDatabase | Grant | CreateDataMask }); + let task_privs_without_ownership = Self::available_privileges_on_task(false); + let privs = make_bitflags!(UserPrivilegeType::{ Usage | Super | CreateUser | DropUser | CreateRole | DropRole | CreateDatabase | Grant | CreateDataMask | CreateTask }); (database_privs.privileges | privs | stage_privs_without_ownership.privileges + | task_privs_without_ownership.privileges | udf_privs_without_ownership.privileges) .into() } @@ -240,6 +246,14 @@ impl UserPrivilegeSet { } } + pub fn available_privileges_on_task(available_ownership: bool) -> Self { + if available_ownership { + make_bitflags!(UserPrivilegeType::{ Drop | Alter | Ownership }).into() + } else { + make_bitflags!(UserPrivilegeType::{ Drop | Alter }).into() + } + } + // TODO: remove this, as ALL has different meanings on different objects pub fn all_privileges() -> Self { ALL_PRIVILEGES.into() diff --git a/src/meta/proto-conv/src/ownership_from_to_protobuf_impl.rs b/src/meta/proto-conv/src/ownership_from_to_protobuf_impl.rs index 3e8aa2c34505b..7dda2fe22f784 100644 --- a/src/meta/proto-conv/src/ownership_from_to_protobuf_impl.rs +++ b/src/meta/proto-conv/src/ownership_from_to_protobuf_impl.rs @@ -82,6 +82,9 @@ impl FromToProto for mt::principal::OwnershipObject { Some(pb::ownership_object::Object::Stage( pb::ownership_object::OwnershipStageObject { stage }, )) => Ok(mt::principal::OwnershipObject::Stage { name: stage }), + Some(pb::ownership_object::Object::Task( + pb::ownership_object::OwnershipTaskObject { task }, + )) => Ok(mt::principal::OwnershipObject::Task { name: task }), _ => Err(Incompatible { reason: "OwnershipObject cannot be None".to_string(), }), @@ -115,6 +118,11 @@ impl FromToProto for mt::principal::OwnershipObject { pb::ownership_object::OwnershipUdfObject { udf: name.clone() }, )) } + mt::principal::OwnershipObject::Task { name } => { + Some(pb::ownership_object::Object::Task( + pb::ownership_object::OwnershipTaskObject { task: name.clone() }, + )) + } mt::principal::OwnershipObject::Stage { name } => Some( pb::ownership_object::Object::Stage(pb::ownership_object::OwnershipStageObject { stage: name.clone(), diff --git a/src/meta/proto-conv/src/user_from_to_protobuf_impl.rs b/src/meta/proto-conv/src/user_from_to_protobuf_impl.rs index 92c9c34e4c8f0..06ad04f3e9ff3 100644 --- a/src/meta/proto-conv/src/user_from_to_protobuf_impl.rs +++ b/src/meta/proto-conv/src/user_from_to_protobuf_impl.rs @@ -180,6 +180,9 @@ impl FromToProto for mt::principal::GrantObject { Some(pb::grant_object::Object::Stage(pb::grant_object::GrantStageObject { stage })) => { Ok(mt::principal::GrantObject::Stage(stage)) } + Some(pb::grant_object::Object::Task(pb::grant_object::GrantTaskObject { task })) => { + Ok(mt::principal::GrantObject::Task(task)) + } _ => Err(Incompatible { reason: "GrantObject cannot be None".to_string(), }), @@ -225,6 +228,9 @@ impl FromToProto for mt::principal::GrantObject { stage: stage.clone(), }, )), + mt::principal::GrantObject::Task(task) => Some(pb::grant_object::Object::Task( + pb::grant_object::GrantTaskObject { task: task.clone() }, + )), }; Ok(pb::GrantObject { ver: VER, diff --git a/src/meta/proto-conv/src/util.rs b/src/meta/proto-conv/src/util.rs index 40de0a8d141c1..b1b125e7e9455 100644 --- a/src/meta/proto-conv/src/util.rs +++ b/src/meta/proto-conv/src/util.rs @@ -120,6 +120,7 @@ const META_CHANGE_LOG: &[(u64, &str)] = &[ (88, "2024-04-17: Add: SequenceMeta"), (89, "2024-04-19: Add: geometry_output_format settings"), (90, "2024-05-13: Refactor: After reader_check_msg success, RoleInfo::from_pb should not return err"), + (91, "2024-05-09: Add: GrantTaskObject"), // Dear developer: // If you're gonna add a new metadata version, you'll have to add a test for it. // You could just copy an existing test file(e.g., `../tests/it/v024_table_meta.rs`) diff --git a/src/meta/proto-conv/tests/it/main.rs b/src/meta/proto-conv/tests/it/main.rs index 719879c148462..f8c69ee97bb0b 100644 --- a/src/meta/proto-conv/tests/it/main.rs +++ b/src/meta/proto-conv/tests/it/main.rs @@ -92,4 +92,4 @@ mod v086_table_index; mod v087_user_option_disabled; mod v088_sequence_meta; mod v089_geometry_output_format; -mod v090_role_info; +mod v091_role_info; diff --git a/src/meta/proto-conv/tests/it/v090_role_info.rs b/src/meta/proto-conv/tests/it/v091_role_info.rs similarity index 91% rename from src/meta/proto-conv/tests/it/v090_role_info.rs rename to src/meta/proto-conv/tests/it/v091_role_info.rs index 98eaa884b2e03..6d26cc1d463d8 100644 --- a/src/meta/proto-conv/tests/it/v090_role_info.rs +++ b/src/meta/proto-conv/tests/it/v091_role_info.rs @@ -31,8 +31,8 @@ use crate::common; // #[test] -fn test_decode_v90_role() -> anyhow::Result<()> { - let role_info_v90 = vec![ +fn test_decode_v91_role() -> anyhow::Result<()> { + let role_info_v91 = vec![ 10, 2, 114, 49, 18, 6, 160, 6, 90, 168, 6, 24, 160, 6, 90, 168, 6, 24, ]; @@ -41,7 +41,7 @@ fn test_decode_v90_role() -> anyhow::Result<()> { grants: UserGrantSet::new(vec![], HashSet::new()), }; common::test_pb_from_to(func_name!(), want())?; - common::test_load_old(func_name!(), role_info_v90.as_slice(), 90, want())?; + common::test_load_old(func_name!(), role_info_v91.as_slice(), 90, want())?; Ok(()) } diff --git a/src/meta/protos/proto/ownership.proto b/src/meta/protos/proto/ownership.proto index 227b7239cd1d4..51b53ebbeb4e0 100644 --- a/src/meta/protos/proto/ownership.proto +++ b/src/meta/protos/proto/ownership.proto @@ -46,10 +46,15 @@ message OwnershipObject { string stage = 1; } + message OwnershipTaskObject { + string task = 1; + } + oneof object { OwnershipDatabaseObject database = 1; OwnershipTableObject table = 2; OwnershipUdfObject udf = 3; OwnershipStageObject stage = 4; + OwnershipTaskObject task = 5; } } diff --git a/src/meta/protos/proto/user.proto b/src/meta/protos/proto/user.proto index 7d8190e74462f..9692557a27cd2 100644 --- a/src/meta/protos/proto/user.proto +++ b/src/meta/protos/proto/user.proto @@ -72,6 +72,10 @@ message GrantObject { string udf = 1; } + message GrantTaskObject { + string task = 1; + } + message GrantStageObject { string stage = 1; } @@ -84,6 +88,7 @@ message GrantObject { GrantStageObject stage = 5; GrantDatabaseIdObject databasebyid = 6; GrantTableIdObject tablebyid = 7; + GrantTaskObject task = 8; } } diff --git a/src/query/ast/src/ast/statements/user.rs b/src/query/ast/src/ast/statements/user.rs index 6e844eaa4d46f..bb17be94bfdad 100644 --- a/src/query/ast/src/ast/statements/user.rs +++ b/src/query/ast/src/ast/statements/user.rs @@ -176,6 +176,7 @@ impl Display for AccountMgrSource { } AccountMgrLevel::UDF(udf) => write!(f, " UDF {udf}")?, AccountMgrLevel::Stage(stage) => write!(f, " STAGE {stage}")?, + AccountMgrLevel::Task(task) => write!(f, " TASK {task}")?, } } AccountMgrSource::ALL { level, .. } => { @@ -199,6 +200,7 @@ impl Display for AccountMgrSource { } AccountMgrLevel::UDF(udf) => write!(f, " UDF {udf}")?, AccountMgrLevel::Stage(stage) => write!(f, " STAGE {stage}")?, + AccountMgrLevel::Task(task) => write!(f, " TASK {task}")?, } } } @@ -213,6 +215,7 @@ pub enum AccountMgrLevel { Table(#[drive(skip)] Option, #[drive(skip)] String), UDF(#[drive(skip)] String), Stage(#[drive(skip)] String), + Task(#[drive(skip)] String), } #[derive(Debug, Clone, PartialEq, Eq, Drive, DriveMut)] diff --git a/src/query/ast/src/parser/statement.rs b/src/query/ast/src/parser/statement.rs index f143fb536f0e7..b4435e8bd3368 100644 --- a/src/query/ast/src/parser/statement.rs +++ b/src/query/ast/src/parser/statement.rs @@ -2826,11 +2826,22 @@ pub fn grant_source(i: Input) -> IResult { }, ); + let task_privs = map( + rule! { + #comma_separated_list1(task_priv_type) ~ ON ~ TASK ~ #ident + }, + |(privileges, _, _, task_name)| AccountMgrSource::Privs { + privileges, + level: AccountMgrLevel::Task(task_name.to_string()), + }, + ); + rule!( #role : "ROLE " | #udf_privs: "USAGE ON UDF " | #privs : " ON " | #stage_privs : " ON STAGE " + | #task_privs : " ON TASK " | #udf_all_privs: "ALL [ PRIVILEGES ] ON UDF " | #all : "ALL [ PRIVILEGES ] ON " )(i) @@ -2850,6 +2861,7 @@ pub fn priv_type(i: Input) -> IResult { UserPrivilegeType::CreateDatabase, rule! { CREATE ~ DATABASE }, ), + value(UserPrivilegeType::CreateDatabase, rule! { CREATE ~ TASK }), value(UserPrivilegeType::DropUser, rule! { DROP ~ USER }), value(UserPrivilegeType::CreateRole, rule! { CREATE ~ ROLE }), value(UserPrivilegeType::DropRole, rule! { DROP ~ ROLE }), @@ -2868,6 +2880,13 @@ pub fn stage_priv_type(i: Input) -> IResult { ))(i) } +pub fn task_priv_type(i: Input) -> IResult { + alt(( + value(UserPrivilegeType::Drop, rule! { DROP }), + value(UserPrivilegeType::Alter, rule! { ALTER }), + ))(i) +} + pub fn priv_share_type(i: Input) -> IResult { alt(( value(ShareGrantObjectPrivilege::Usage, rule! { USAGE }), @@ -2991,10 +3010,12 @@ pub fn grant_ownership_level(i: Input) -> IResult { enum Object { Stage, Udf, + Task, } let object = alt(( value(Object::Udf, rule! { UDF }), value(Object::Stage, rule! { STAGE }), + value(Object::Task, rule! { TASK }), )); // Object object_name @@ -3003,13 +3024,14 @@ pub fn grant_ownership_level(i: Input) -> IResult { |(object, object_name)| match object { Object::Stage => AccountMgrLevel::Stage(object_name.to_string()), Object::Udf => AccountMgrLevel::UDF(object_name.to_string()), + Object::Task => AccountMgrLevel::Task(object_name.to_string()), }, ); rule!( #db : ".*" | #table : "." - | #object : "STAGE | UDF " + | #object : "STAGE | UDF | Task " )(i) } diff --git a/src/query/ast/tests/it/parser.rs b/src/query/ast/tests/it/parser.rs index aadde37734a4d..581af68be2427 100644 --- a/src/query/ast/tests/it/parser.rs +++ b/src/query/ast/tests/it/parser.rs @@ -659,6 +659,11 @@ fn test_statement() { SELECT avg(a), d FROM db.t GROUP BY d "#, // tasks + r#"GRANT CREATE TASK ON *.* TO role role1"#, + r#"GRANT ownership ON task MyTask1 TO role role1"#, + r#"GRANT ownership ON task MyTask1 TO role role2"#, + r#"GRANT drop ON task MyTask1 TO u2"#, + r#"GRANT alter ON task MyTask1 TO u1"#, r#"CREATE TASK IF NOT EXISTS MyTask1 WAREHOUSE = 'MyWarehouse' SCHEDULE = 15 MINUTE SUSPEND_TASK_AFTER_NUM_FAILURES = 3 ERROR_INTEGRATION = 'notification_name' COMMENT = 'This is test task 1' DATABASE = 'target', TIMEZONE = 'America/Los Angeles' AS SELECT * FROM MyTable1"#, r#"CREATE TASK IF NOT EXISTS MyTask1 WAREHOUSE = 'MyWarehouse' SCHEDULE = 15 SECOND SUSPEND_TASK_AFTER_NUM_FAILURES = 3 COMMENT = 'This is test task 1' AS SELECT * FROM MyTable1"#, r#"CREATE TASK IF NOT EXISTS MyTask1 WAREHOUSE = 'MyWarehouse' SCHEDULE = 1215 SECOND SUSPEND_TASK_AFTER_NUM_FAILURES = 3 COMMENT = 'This is test task 1' AS SELECT * FROM MyTable1"#, @@ -911,6 +916,11 @@ fn test_statement_error() { r#"drop table :a"#, r#"drop table IDENTIFIER(a)"#, r#"drop table IDENTIFIER(:a)"#, + // task + r#"GRANT CREATE TASK ON task mytask1 TO role role1"#, + r#"GRANT ownership ON task MyTask1 TO u1"#, + r#"GRANT select ON task MyTask1 TO u2"#, + r#"GRANT usage ON task MyTask1 TO u1"#, ]; for case in cases { diff --git a/src/query/management/src/role/role_mgr.rs b/src/query/management/src/role/role_mgr.rs index 5327709cf466d..d3383df0fc795 100644 --- a/src/query/management/src/role/role_mgr.rs +++ b/src/query/management/src/role/role_mgr.rs @@ -504,5 +504,6 @@ fn convert_to_grant_obj(owner_obj: &OwnershipObject) -> GrantObject { } => GrantObject::TableById(catalog_name.to_string(), *db_id, *table_id), OwnershipObject::Stage { name } => GrantObject::Stage(name.to_string()), OwnershipObject::UDF { name } => GrantObject::UDF(name.to_string()), + OwnershipObject::Task { name } => GrantObject::Task(name.to_string()), } } diff --git a/src/query/service/src/interpreters/access/privilege_access.rs b/src/query/service/src/interpreters/access/privilege_access.rs index c6a3fd7398de8..07c7a46acedaa 100644 --- a/src/query/service/src/interpreters/access/privilege_access.rs +++ b/src/query/service/src/interpreters/access/privilege_access.rs @@ -142,6 +142,9 @@ impl PrivilegeAccess { GrantObject::UDF(name) => OwnershipObject::UDF { name: name.to_string(), }, + GrantObject::Task(name) => OwnershipObject::Task { + name: name.to_string(), + }, GrantObject::Global => return Ok(None), }; @@ -364,6 +367,7 @@ impl PrivilegeAccess { | GrantObject::Table(_, _, _) | GrantObject::DatabaseById(_, _) | GrantObject::UDF(_) + | GrantObject::Task(_) | GrantObject::Stage(_) | GrantObject::TableById(_, _, _) => true, GrantObject::Global => false, @@ -394,6 +398,7 @@ impl PrivilegeAccess { GrantObject::DatabaseById(_, _) => Err(ErrorCode::PermissionDenied("")), GrantObject::Global | GrantObject::UDF(_) + | GrantObject::Task(_) | GrantObject::Stage(_) | GrantObject::Database(_, _) | GrantObject::Table(_, _, _) => Err(ErrorCode::PermissionDenied(format!( @@ -449,6 +454,17 @@ impl PrivilegeAccess { Ok(()) } + async fn validate_task_access( + &self, + task: &str, + privilege: UserPrivilegeType, + ) -> Result<()> { + self.validate_access(&GrantObject::Task(task.to_owned()), privilege) + .await?; + + Ok(()) + } + async fn convert_to_id( &self, tenant: &Tenant, @@ -616,6 +632,16 @@ impl AccessChecker for PrivilegeAccess { self.validate_access(&GrantObject::Global, UserPrivilegeType::CreateDatabase) .await?; } + Plan::CreateTask(_) => { + self.validate_access(&GrantObject::Global, UserPrivilegeType::CreateTask) + .await?; + } + Plan::AlterTask(plan) => { + self.validate_task_access(&plan.task_name, UserPrivilegeType::Alter).await?; + } + Plan::DropTask(plan) => { + self.validate_task_access(&plan.task_name, UserPrivilegeType::Drop).await?; + } Plan::DropDatabase(plan) => { self.validate_db_access(&plan.catalog, &plan.database, UserPrivilegeType::Drop, plan.if_exists).await?; } @@ -1078,12 +1104,11 @@ impl AccessChecker for PrivilegeAccess { | Plan::DropNotification(_) | Plan::DescNotification(_) | Plan::AlterNotification(_) - | Plan::CreateTask(_) // TODO: need to build ownership info for task + //| Plan::CreateTask(_) // TODO: need to build ownership info for task + //| Plan::DropTask(_) // TODO: need to build ownership info for task | Plan::ShowTasks(_) // TODO: need to build ownership info for task | Plan::DescribeTask(_) // TODO: need to build ownership info for task | Plan::ExecuteTask(_) // TODO: need to build ownership info for task - | Plan::DropTask(_) // TODO: need to build ownership info for task - | Plan::AlterTask(_) | Plan::CreateSequence(_) | Plan::DropSequence(_) => { self.validate_access(&GrantObject::Global, UserPrivilegeType::Super) diff --git a/src/query/service/src/interpreters/common/grant.rs b/src/query/service/src/interpreters/common/grant.rs index fb675b6284e44..e1ebcd810db52 100644 --- a/src/query/service/src/interpreters/common/grant.rs +++ b/src/query/service/src/interpreters/common/grant.rs @@ -15,10 +15,16 @@ use std::sync::Arc; use databend_common_catalog::table_context::TableContext; +use databend_common_cloud_control::client_config::make_request; +use databend_common_cloud_control::cloud_api::CloudControlApiProvider; +use databend_common_cloud_control::pb::DescribeTaskRequest; +use databend_common_config::GlobalConfig; +use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_meta_app::principal::GrantObject; use databend_common_users::UserApiProvider; +use crate::interpreters::common::get_task_client_config; use crate::sessions::QueryContext; #[async_backtrace::framed] @@ -39,7 +45,7 @@ pub async fn validate_grant_object_exists( .exists_table(&tenant, database_name, table_name) .await? { - return Err(databend_common_exception::ErrorCode::UnknownTable(format!( + return Err(ErrorCode::UnknownTable(format!( "table `{}`.`{}` not exists in catalog '{}'", database_name, table_name, catalog_name, ))); @@ -48,49 +54,63 @@ pub async fn validate_grant_object_exists( GrantObject::Database(catalog_name, database_name) => { let catalog = ctx.get_catalog(catalog_name).await?; if !catalog.exists_database(&tenant, database_name).await? { - return Err(databend_common_exception::ErrorCode::UnknownDatabase( - format!("database {} not exists", database_name,), - )); + return Err(ErrorCode::UnknownDatabase(format!( + "database {} not exists", + database_name, + ))); } } GrantObject::DatabaseById(catalog_name, db_id) => { let catalog = ctx.get_catalog(catalog_name).await?; if catalog.get_db_name_by_id(*db_id).await.is_err() { - return Err(databend_common_exception::ErrorCode::UnknownDatabaseId( - format!( - "database id {} not exists in catalog {}", - db_id, catalog_name - ), - )); + return Err(ErrorCode::UnknownDatabaseId(format!( + "database id {} not exists in catalog {}", + db_id, catalog_name + ))); } } GrantObject::TableById(catalog_name, db_id, table_id) => { let catalog = ctx.get_catalog(catalog_name).await?; if catalog.get_table_meta_by_id(*table_id).await?.is_none() { - return Err(databend_common_exception::ErrorCode::UnknownTableId( - format!( - "table id `{}`.`{}` not exists in catalog '{}'", - db_id, table_id, catalog_name, - ), - )); + return Err(ErrorCode::UnknownTableId(format!( + "table id `{}`.`{}` not exists in catalog '{}'", + db_id, table_id, catalog_name, + ))); } } GrantObject::UDF(udf) => { if !UserApiProvider::instance().exists_udf(&tenant, udf).await? { - return Err(databend_common_exception::ErrorCode::UnknownFunction( - format!("udf {udf} not exists"), + return Err(ErrorCode::UnknownFunction(format!("udf {udf} not exists"))); + } + } + GrantObject::Task(task) => { + let config = GlobalConfig::instance(); + if config.query.cloud_control_grpc_server_address.is_none() { + return Err(ErrorCode::CloudControlNotEnabled( + "cannot describe task without cloud control enabled, please set cloud_control_grpc_server_address in config", )); } + let cloud_api = CloudControlApiProvider::instance(); + let task_client = cloud_api.get_task_client(); + let req = DescribeTaskRequest { + task_name: task.to_string(), + tenant_id: tenant.tenant_name().to_string(), + if_exist: false, + }; + let config = get_task_client_config(ctx.clone(), cloud_api.get_timeout())?; + let req = make_request(req, config); + let resp = task_client.describe_task(req).await?; + if resp.task.is_none() { + return Err(ErrorCode::UnknownTask(format!("task {task} not exists"))); + } } GrantObject::Stage(stage) => { if !UserApiProvider::instance() .exists_stage(&ctx.get_tenant(), stage) .await? { - return Err(databend_common_exception::ErrorCode::UnknownStage(format!( - "stage {stage} not exists" - ))); + return Err(ErrorCode::UnknownStage(format!("stage {stage} not exists"))); } } GrantObject::Global => (), diff --git a/src/query/service/src/interpreters/interpreter_privilege_grant.rs b/src/query/service/src/interpreters/interpreter_privilege_grant.rs index 07156e7ef5191..0a901c7737307 100644 --- a/src/query/service/src/interpreters/interpreter_privilege_grant.rs +++ b/src/query/service/src/interpreters/interpreter_privilege_grant.rs @@ -101,6 +101,9 @@ impl GrantPrivilegeInterpreter { GrantObject::UDF(name) => Ok(OwnershipObject::UDF { name: name.to_string(), }), + GrantObject::Task(name) => Ok(OwnershipObject::Task { + name: name.to_string(), + }), GrantObject::Global => Err(ErrorCode::IllegalGrant( "Illegal GRANT/REVOKE command; please consult the manual to see which privileges can be used", )), diff --git a/src/query/service/src/interpreters/interpreter_show_grants.rs b/src/query/service/src/interpreters/interpreter_show_grants.rs index 38fb16e4240cf..1cdaf66bca871 100644 --- a/src/query/service/src/interpreters/interpreter_show_grants.rs +++ b/src/query/service/src/interpreters/interpreter_show_grants.rs @@ -174,6 +174,12 @@ impl Interpreter for ShowGrantsInterpreter { privileges.push(get_priv_str(&grant_entry)); grant_list.push(format!("{} TO {}", grant_entry, identity)); } + GrantObject::Task(task_name) => { + object_name.push(task_name.to_string()); + object_id.push(None); + privileges.push(get_priv_str(&grant_entry)); + grant_list.push(format!("{} TO {}", grant_entry, identity)); + } GrantObject::Global => { // grant all on *.* to a object_name.push("*.*".to_string()); diff --git a/src/query/service/src/interpreters/interpreter_task_create.rs b/src/query/service/src/interpreters/interpreter_task_create.rs index 813781156c92a..1ae6a0e8bcde6 100644 --- a/src/query/service/src/interpreters/interpreter_task_create.rs +++ b/src/query/service/src/interpreters/interpreter_task_create.rs @@ -23,7 +23,11 @@ use databend_common_cloud_control::pb::CreateTaskRequest; use databend_common_config::GlobalConfig; use databend_common_exception::ErrorCode; use databend_common_exception::Result; +use databend_common_management::RoleApi; +use databend_common_meta_app::principal::OwnershipObject; use databend_common_sql::plans::CreateTaskPlan; +use databend_common_users::RoleCacheManager; +use databend_common_users::UserApiProvider; use crate::interpreters::common::get_task_client_config; use crate::interpreters::common::make_schedule_options; @@ -110,6 +114,20 @@ impl Interpreter for CreateTaskInterpreter { let config = get_task_client_config(self.ctx.clone(), cloud_api.get_timeout())?; let req = make_request(req, config); task_client.create_task(req).await?; + + // Grant ownership as the current role + if let Some(current_role) = self.ctx.get_current_role() { + let role_api = UserApiProvider::instance().role_api(&self.plan.tenant); + role_api + .grant_ownership( + &OwnershipObject::Task { + name: self.plan.task_name.clone(), + }, + ¤t_role.name, + ) + .await?; + RoleCacheManager::instance().invalidate_cache(&self.plan.tenant); + } Ok(PipelineBuildResult::create()) } } diff --git a/src/query/service/src/interpreters/interpreter_task_drop.rs b/src/query/service/src/interpreters/interpreter_task_drop.rs index cc71d1dfaf1b4..877313c8f9eed 100644 --- a/src/query/service/src/interpreters/interpreter_task_drop.rs +++ b/src/query/service/src/interpreters/interpreter_task_drop.rs @@ -16,11 +16,16 @@ use std::sync::Arc; use databend_common_cloud_control::client_config::make_request; use databend_common_cloud_control::cloud_api::CloudControlApiProvider; +use databend_common_cloud_control::pb::DescribeTaskRequest; use databend_common_cloud_control::pb::DropTaskRequest; use databend_common_config::GlobalConfig; use databend_common_exception::ErrorCode; use databend_common_exception::Result; +use databend_common_management::RoleApi; +use databend_common_meta_app::principal::OwnershipObject; use databend_common_sql::plans::DropTaskPlan; +use databend_common_users::RoleCacheManager; +use databend_common_users::UserApiProvider; use crate::interpreters::common::get_task_client_config; use crate::interpreters::Interpreter; @@ -73,6 +78,26 @@ impl Interpreter for DropTaskInterpreter { let task_client = cloud_api.get_task_client(); let req = self.build_request(); let config = get_task_client_config(self.ctx.clone(), cloud_api.get_timeout())?; + + { + let des_req = DescribeTaskRequest { + task_name: self.plan.task_name.to_string(), + tenant_id: self.plan.tenant.tenant_name().to_string(), + if_exist: false, + }; + let des_req = make_request(des_req, config.clone()); + let resp = task_client.describe_task(des_req).await?; + if resp.task.is_some() { + let role_api = UserApiProvider::instance().role_api(&self.plan.tenant); + let owner_object = OwnershipObject::Task { + name: self.plan.task_name.to_string(), + }; + + role_api.revoke_ownership(&owner_object).await?; + RoleCacheManager::instance().invalidate_cache(&self.plan.tenant); + } + } + let req = make_request(req, config); task_client.drop_task(req).await?; Ok(PipelineBuildResult::create()) diff --git a/src/query/sql/src/planner/binder/ddl/account.rs b/src/query/sql/src/planner/binder/ddl/account.rs index c27840765ae16..8c8bd0dc0c9e5 100644 --- a/src/query/sql/src/planner/binder/ddl/account.rs +++ b/src/query/sql/src/planner/binder/ddl/account.rs @@ -171,6 +171,7 @@ impl Binder { } AccountMgrLevel::UDF(udf) => Ok(GrantObject::UDF(udf.clone())), AccountMgrLevel::Stage(stage) => Ok(GrantObject::Stage(stage.clone())), + AccountMgrLevel::Task(task) => Ok(GrantObject::Task(task.clone())), } } @@ -222,6 +223,7 @@ impl Binder { } AccountMgrLevel::UDF(udf) => Ok(vec![GrantObject::UDF(udf.clone())]), AccountMgrLevel::Stage(stage) => Ok(vec![GrantObject::Stage(stage.clone())]), + AccountMgrLevel::Task(task) => Ok(vec![GrantObject::Task(task.clone())]), } } diff --git a/src/query/users/src/visibility_checker.rs b/src/query/users/src/visibility_checker.rs index 4ce108b126cf6..836b244776f80 100644 --- a/src/query/users/src/visibility_checker.rs +++ b/src/query/users/src/visibility_checker.rs @@ -27,6 +27,7 @@ use enumflags2::BitFlags; /// It is used in `SHOW DATABASES` and `SHOW TABLES` statements. pub struct GrantObjectVisibilityChecker { granted_global_udf: bool, + granted_global_task: bool, granted_global_db_table: bool, granted_global_stage: bool, granted_global_read_stage: bool, @@ -37,6 +38,7 @@ pub struct GrantObjectVisibilityChecker { extra_databases: HashSet<(String, String)>, extra_databases_id: HashSet<(String, u64)>, granted_udfs: HashSet, + granted_tasks: HashSet, granted_write_stages: HashSet, granted_read_stages: HashSet, } @@ -44,12 +46,14 @@ pub struct GrantObjectVisibilityChecker { impl GrantObjectVisibilityChecker { pub fn new(user: &UserInfo, available_roles: &Vec) -> Self { let mut granted_global_udf = false; + let mut granted_global_task = false; let mut granted_global_db_table = false; let mut granted_global_stage = false; let mut granted_global_read_stage = false; let mut granted_databases = HashSet::new(); let mut granted_tables = HashSet::new(); let mut granted_udfs = HashSet::new(); + let mut granted_tasks = HashSet::new(); let mut granted_write_stages = HashSet::new(); let mut granted_read_stages = HashSet::new(); let mut extra_databases = HashSet::new(); @@ -95,6 +99,15 @@ impl GrantObjectVisibilityChecker { }, ); + check_privilege( + &mut granted_global_task, + ent.privileges().iter(), + |privilege| { + UserPrivilegeSet::available_privileges_on_task(false) + .has_privilege(privilege) + }, + ); + check_privilege( &mut granted_global_read_stage, ent.privileges().iter(), @@ -133,6 +146,9 @@ impl GrantObjectVisibilityChecker { GrantObject::UDF(udf) => { granted_udfs.insert(udf.to_string()); } + GrantObject::Task(task) => { + granted_tasks.insert(task.to_string()); + } GrantObject::Stage(stage) => { if ent .privileges() @@ -153,6 +169,7 @@ impl GrantObjectVisibilityChecker { Self { granted_global_udf, + granted_global_task, granted_global_db_table, granted_global_stage, granted_global_read_stage, @@ -163,6 +180,7 @@ impl GrantObjectVisibilityChecker { extra_databases, extra_databases_id, granted_udfs, + granted_tasks, granted_write_stages, granted_read_stages, } @@ -201,6 +219,18 @@ impl GrantObjectVisibilityChecker { false } + // TODO: Need to call this check in task_history.rs::get_full_data + pub fn check_task_visibility(&self, task: &str) -> bool { + if self.granted_global_task { + return true; + } + + if self.granted_tasks.contains(task) { + return true; + } + false + } + pub fn check_database_visibility(&self, catalog: &str, db: &str, db_id: u64) -> bool { // skip information_schema privilege check if db.to_lowercase() == "information_schema" || db.to_lowercase() == "system" { diff --git a/tests/suites/0_stateless/18_rbac/18_0011_task.result b/tests/suites/0_stateless/18_rbac/18_0011_task.result new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/tests/suites/0_stateless/18_rbac/18_0011_task.sh b/tests/suites/0_stateless/18_rbac/18_0011_task.sh new file mode 100644 index 0000000000000..481e6a3a06826 --- /dev/null +++ b/tests/suites/0_stateless/18_rbac/18_0011_task.sh @@ -0,0 +1,8 @@ +#!/usr/bin/env bash + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +. "$CURDIR"/../../../shell_env.sh + +echo "=== test UDF priv" +export TEST_USER_PASSWORD="password" +export TEST_USER_CONNECT="bendsql --user=test-user --password=password --host=${QUERY_MYSQL_HANDLER_HOST} --port ${QUERY_HTTP_HANDLER_PORT}" From 348da418b7c7c8684bfc610a91549cece801c1e7 Mon Sep 17 00:00:00 2001 From: TCeason Date: Fri, 10 May 2024 16:36:13 +0800 Subject: [PATCH 2/8] Task privilege Part2 1. add function: list_task_ownerships, only list task prefix. 2. add visibility in task/task_history system table. 3. refactor showtasks, directly use select * from system.task; 4. fix ut err --- src/common/cloud_control/proto/task.proto | 2 + .../tenant_ownership_object_ident.rs | 2 +- .../ast/tests/it/testdata/statement-error.txt | 45 ++ src/query/ast/tests/it/testdata/statement.txt | 114 +++ src/query/management/src/role/role_api.rs | 5 + src/query/management/src/role/role_mgr.rs | 22 + .../interpreters/access/privilege_access.rs | 663 ++++++++++++++---- .../src/interpreters/interpreter_factory.rs | 2 - .../interpreters/interpreter_tasks_show.rs | 96 --- src/query/service/src/interpreters/mod.rs | 1 - src/query/sql/src/planner/binder/binder.rs | 2 +- src/query/sql/src/planner/binder/ddl/task.rs | 27 +- .../sql/src/planner/format/display_plan.rs | 3 +- src/query/sql/src/planner/plans/ddl/task.rs | 13 - src/query/sql/src/planner/plans/plan.rs | 4 +- .../storages/system/src/task_history_table.rs | 31 +- src/query/storages/system/src/tasks_table.rs | 19 +- src/query/storages/system/src/util.rs | 38 + src/query/users/src/role_mgr.rs | 17 + src/query/users/src/visibility_checker.rs | 31 +- .../0_stateless/18_rbac/18_0011_task.sh | 0 21 files changed, 828 insertions(+), 309 deletions(-) delete mode 100644 src/query/service/src/interpreters/interpreter_tasks_show.rs mode change 100644 => 100755 tests/suites/0_stateless/18_rbac/18_0011_task.sh diff --git a/src/common/cloud_control/proto/task.proto b/src/common/cloud_control/proto/task.proto index b980acb7447ac..a49e7e0cbe129 100644 --- a/src/common/cloud_control/proto/task.proto +++ b/src/common/cloud_control/proto/task.proto @@ -119,6 +119,7 @@ message ShowTasksRequest {// every owner has a roles list like ["role1", "role2" int32 result_limit = 4; repeated string owners = 5; // all available roles under current client repeated string task_ids = 6; // all task ids which permit to access for given user + repeated string task_names = 7; // all task names which permit to access for given user } message ShowTasksResponse { @@ -170,6 +171,7 @@ message ShowTaskRunsRequest { repeated string owners = 6; repeated string task_ids = 7; string task_name = 8; + repeated string task_names = 9; optional int32 page_size = 90; // 100 by default optional int64 next_page_token = 91; diff --git a/src/meta/app/src/principal/tenant_ownership_object_ident.rs b/src/meta/app/src/principal/tenant_ownership_object_ident.rs index d3787846c71f2..29f8c290f1d9f 100644 --- a/src/meta/app/src/principal/tenant_ownership_object_ident.rs +++ b/src/meta/app/src/principal/tenant_ownership_object_ident.rs @@ -253,7 +253,7 @@ mod tests { assert_eq!(role_grantee, parsed); } - // udf + // task { let role_grantee = TenantOwnershipObjectIdent::new_unchecked( Tenant::new_literal("test"), diff --git a/src/query/ast/tests/it/testdata/statement-error.txt b/src/query/ast/tests/it/testdata/statement-error.txt index 7f4946109ebbf..bfd1ed58d0fd4 100644 --- a/src/query/ast/tests/it/testdata/statement-error.txt +++ b/src/query/ast/tests/it/testdata/statement-error.txt @@ -916,3 +916,48 @@ error: | while parsing `DROP TABLE [IF EXISTS] [.]
` +---------- Input ---------- +GRANT CREATE TASK ON task mytask1 TO role role1 +---------- Output --------- +error: + --> SQL:1:22 + | +1 | GRANT CREATE TASK ON task mytask1 TO role role1 + | ----- ------ ^^^^ unexpected `task`, expecting `FALSE`, , , `TRUE`, `IDENTIFIER`, , , `*`, or + | | | + | | while parsing ON + | while parsing `GRANT { ROLE | schemaObjectPrivileges | ALL [ PRIVILEGES ] ON } TO { [ROLE ] | [USER] }` + + +---------- Input ---------- +GRANT ownership ON task MyTask1 TO u1 +---------- Output --------- +error: + --> SQL:1:36 + | +1 | GRANT ownership ON task MyTask1 TO u1 + | ----- ^^ unexpected `u1`, expecting `ROLE` + | | + | while parsing GRANT OWNERSHIP ON TO ROLE + + +---------- Input ---------- +GRANT select ON task MyTask1 TO u2 +---------- Output --------- +error: + --> SQL:1:17 + | +1 | GRANT select ON task MyTask1 TO u2 + | ^^^^ unexpected `task`, expecting `FALSE`, `TABLE`, , , `TRUE`, `IDENTIFIER`, , , `DATABASE`, `*`, or + + +---------- Input ---------- +GRANT usage ON task MyTask1 TO u1 +---------- Output --------- +error: + --> SQL:1:16 + | +1 | GRANT usage ON task MyTask1 TO u1 + | ^^^^ unexpected `task`, expecting `FALSE`, `TABLE`, , , `TRUE`, `IDENTIFIER`, , , `DATABASE`, `UDF`, `*`, or + + diff --git a/src/query/ast/tests/it/testdata/statement.txt b/src/query/ast/tests/it/testdata/statement.txt index 3322650aa244a..cfe9460e2e149 100644 --- a/src/query/ast/tests/it/testdata/statement.txt +++ b/src/query/ast/tests/it/testdata/statement.txt @@ -18350,6 +18350,120 @@ CreateDynamicTable( ) +---------- Input ---------- +GRANT CREATE TASK ON *.* TO role role1 +---------- Output --------- +GRANT CREATE DATABASE ON *.* TO ROLE 'role1' +---------- AST ------------ +Grant( + GrantStmt { + source: Privs { + privileges: [ + CreateDatabase, + ], + level: Global, + }, + principal: Role( + "role1", + ), + }, +) + + +---------- Input ---------- +GRANT ownership ON task MyTask1 TO role role1 +---------- Output --------- +GRANT OWNERSHIP ON TASK MyTask1 TO ROLE 'role1' +---------- AST ------------ +Grant( + GrantStmt { + source: Privs { + privileges: [ + Ownership, + ], + level: Task( + "MyTask1", + ), + }, + principal: Role( + "role1", + ), + }, +) + + +---------- Input ---------- +GRANT ownership ON task MyTask1 TO role role2 +---------- Output --------- +GRANT OWNERSHIP ON TASK MyTask1 TO ROLE 'role2' +---------- AST ------------ +Grant( + GrantStmt { + source: Privs { + privileges: [ + Ownership, + ], + level: Task( + "MyTask1", + ), + }, + principal: Role( + "role2", + ), + }, +) + + +---------- Input ---------- +GRANT drop ON task MyTask1 TO u2 +---------- Output --------- +GRANT DROP ON TASK MyTask1 TO USER 'u2'@'%' +---------- AST ------------ +Grant( + GrantStmt { + source: Privs { + privileges: [ + Drop, + ], + level: Task( + "MyTask1", + ), + }, + principal: User( + UserIdentity { + username: "u2", + hostname: "%", + }, + ), + }, +) + + +---------- Input ---------- +GRANT alter ON task MyTask1 TO u1 +---------- Output --------- +GRANT ALTER ON TASK MyTask1 TO USER 'u1'@'%' +---------- AST ------------ +Grant( + GrantStmt { + source: Privs { + privileges: [ + Alter, + ], + level: Task( + "MyTask1", + ), + }, + principal: User( + UserIdentity { + username: "u1", + hostname: "%", + }, + ), + }, +) + + ---------- Input ---------- CREATE TASK IF NOT EXISTS MyTask1 WAREHOUSE = 'MyWarehouse' SCHEDULE = 15 MINUTE SUSPEND_TASK_AFTER_NUM_FAILURES = 3 ERROR_INTEGRATION = 'notification_name' COMMENT = 'This is test task 1' DATABASE = 'target', TIMEZONE = 'America/Los Angeles' AS SELECT * FROM MyTable1 ---------- Output --------- diff --git a/src/query/management/src/role/role_api.rs b/src/query/management/src/role/role_api.rs index 79da651665c2f..ab2bded232c59 100644 --- a/src/query/management/src/role/role_api.rs +++ b/src/query/management/src/role/role_api.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_meta_app::principal::OwnershipInfo; use databend_common_meta_app::principal::OwnershipObject; @@ -30,6 +31,10 @@ pub trait RoleApi: Sync + Send { async fn get_ownerships(&self) -> Result>>; + async fn list_tasks_ownerships( + &self, + ) -> std::result::Result>, ErrorCode>; + /// General role update. /// /// It fetches the role that matches the specified seq number, update it in place, then write it back with the seq it sees. diff --git a/src/query/management/src/role/role_mgr.rs b/src/query/management/src/role/role_mgr.rs index d3383df0fc795..fae49363746a1 100644 --- a/src/query/management/src/role/role_mgr.rs +++ b/src/query/management/src/role/role_mgr.rs @@ -217,6 +217,28 @@ impl RoleApi for RoleMgr { Ok(r) } + #[async_backtrace::framed] + #[minitrace::trace] + async fn list_tasks_ownerships(&self) -> Result>, ErrorCode> { + let mut task_object_owner_prefix = self.ownership_object_prefix(); + task_object_owner_prefix.push_str("task-by-name/"); + let values = self + .kv_api + .prefix_list_kv(task_object_owner_prefix.as_str()) + .await?; + + let mut r = vec![]; + + let mut quota = Quota::new(func_name!()); + + for (key, val) in values { + let u = check_and_upgrade_to_pb(&mut quota, key, &val, self.kv_api.as_ref()).await?; + r.push(u); + } + + Ok(r) + } + /// General role update. /// /// It fetch the role that matches the specified seq number, update it in place, then write it back with the seq it sees. diff --git a/src/query/service/src/interpreters/access/privilege_access.rs b/src/query/service/src/interpreters/access/privilege_access.rs index 07c7a46acedaa..011e593af1900 100644 --- a/src/query/service/src/interpreters/access/privilege_access.rs +++ b/src/query/service/src/interpreters/access/privilege_access.rs @@ -454,11 +454,7 @@ impl PrivilegeAccess { Ok(()) } - async fn validate_task_access( - &self, - task: &str, - privilege: UserPrivilegeType, - ) -> Result<()> { + async fn validate_task_access(&self, task: &str, privilege: UserPrivilegeType) -> Result<()> { self.validate_access(&GrantObject::Task(task.to_owned()), privilege) .await?; @@ -514,23 +510,34 @@ impl AccessChecker for PrivilegeAccess { Some(RewriteKind::ShowDatabases) | Some(RewriteKind::ShowEngines) | Some(RewriteKind::ShowFunctions) + | Some(RewriteKind::ShowTasks) | Some(RewriteKind::ShowUserFunctions) => { return Ok(()); } - | Some(RewriteKind::ShowTableFunctions) => { + Some(RewriteKind::ShowTableFunctions) => { return Ok(()); } Some(RewriteKind::ShowTables(catalog, database)) => { let session = self.ctx.get_current_session(); - if self.has_ownership(&session, &GrantObject::Database(catalog.clone(), database.clone())).await? { + if self + .has_ownership( + &session, + &GrantObject::Database(catalog.clone(), database.clone()), + ) + .await? + { return Ok(()); } let catalog = self.ctx.get_catalog(catalog).await?; - let (db_id, table_id) = match self.convert_to_id(&tenant, &catalog, database, None).await? { - ObjectId::Table(db_id, table_id) => { (db_id, Some(table_id)) } - ObjectId::Database(db_id) => { (db_id, None) } + let (db_id, table_id) = match self + .convert_to_id(&tenant, &catalog, database, None) + .await? + { + ObjectId::Table(db_id, table_id) => (db_id, Some(table_id)), + ObjectId::Database(db_id) => (db_id, None), }; - let has_priv = has_priv(&tenant, database, None, db_id, table_id, grant_set).await?; + let has_priv = + has_priv(&tenant, database, None, db_id, table_id, grant_set).await?; return if has_priv { Ok(()) } else { @@ -542,15 +549,25 @@ impl AccessChecker for PrivilegeAccess { } Some(RewriteKind::ShowStreams(database)) => { let session = self.ctx.get_current_session(); - if self.has_ownership(&session, &GrantObject::Database(catalog_name.clone(), database.clone())).await? { + if self + .has_ownership( + &session, + &GrantObject::Database(catalog_name.clone(), database.clone()), + ) + .await? + { return Ok(()); } let catalog = self.ctx.get_catalog(&catalog_name).await?; - let (db_id, table_id) = match self.convert_to_id(&tenant, &catalog, database, None).await? { - ObjectId::Table(db_id, table_id) => { (db_id, Some(table_id)) } - ObjectId::Database(db_id) => { (db_id, None) } + let (db_id, table_id) = match self + .convert_to_id(&tenant, &catalog, database, None) + .await? + { + ObjectId::Table(db_id, table_id) => (db_id, Some(table_id)), + ObjectId::Database(db_id) => (db_id, None), }; - let has_priv = has_priv(&tenant, database, None, db_id, table_id, grant_set).await?; + let has_priv = + has_priv(&tenant, database, None, db_id, table_id, grant_set).await?; return if has_priv { Ok(()) } else { @@ -562,15 +579,30 @@ impl AccessChecker for PrivilegeAccess { } Some(RewriteKind::ShowColumns(catalog_name, database, table)) => { let session = self.ctx.get_current_session(); - if self.has_ownership(&session, &GrantObject::Table(catalog_name.clone(), database.clone(), table.clone())).await? { + if self + .has_ownership( + &session, + &GrantObject::Table( + catalog_name.clone(), + database.clone(), + table.clone(), + ), + ) + .await? + { return Ok(()); } let catalog = self.ctx.get_catalog(catalog_name).await?; - let (db_id, table_id) = match self.convert_to_id(&tenant, &catalog, database, Some(table)).await? { - ObjectId::Table(db_id, table_id) => { (db_id, Some(table_id)) } - ObjectId::Database(db_id) => { (db_id, None) } + let (db_id, table_id) = match self + .convert_to_id(&tenant, &catalog, database, Some(table)) + .await? + { + ObjectId::Table(db_id, table_id) => (db_id, Some(table_id)), + ObjectId::Database(db_id) => (db_id, None), }; - let has_priv = has_priv(&tenant, database, Some(table), db_id, table_id, grant_set).await?; + let has_priv = + has_priv(&tenant, database, Some(table), db_id, table_id, grant_set) + .await?; return if has_priv { Ok(()) } else { @@ -601,12 +633,21 @@ impl AccessChecker for PrivilegeAccess { if enable_experimental_rbac_check && table.is_source_of_stage() { match table.table().get_data_source_info() { DataSourceInfo::StageSource(stage_info) => { - self.validate_stage_access(&stage_info.stage_info, UserPrivilegeType::Read).await?; + self.validate_stage_access( + &stage_info.stage_info, + UserPrivilegeType::Read, + ) + .await?; } DataSourceInfo::ParquetSource(stage_info) => { - self.validate_stage_access(&stage_info.stage_info, UserPrivilegeType::Read).await?; + self.validate_stage_access( + &stage_info.stage_info, + UserPrivilegeType::Read, + ) + .await?; } - DataSourceInfo::TableSource(_) | DataSourceInfo::ResultScanSource(_) => {} + DataSourceInfo::TableSource(_) + | DataSourceInfo::ResultScanSource(_) => {} } } if table.is_source_of_view() { @@ -616,7 +657,14 @@ impl AccessChecker for PrivilegeAccess { // like this sql: copy into t from (select * from @s3); will bind a mock table with name `system.read_parquet(s3)` // this is no means to check table `system.read_parquet(s3)` privilege if !table.is_source_of_stage() { - self.validate_table_access(catalog_name, table.database(), table.name(), UserPrivilegeType::Select, false).await? + self.validate_table_access( + catalog_name, + table.database(), + table.name(), + UserPrivilegeType::Select, + false, + ) + .await? } } } @@ -626,35 +674,93 @@ impl AccessChecker for PrivilegeAccess { // Database. Plan::ShowCreateDatabase(plan) => { - self.validate_db_access(&plan.catalog, &plan.database, UserPrivilegeType::Select, false).await? + self.validate_db_access( + &plan.catalog, + &plan.database, + UserPrivilegeType::Select, + false, + ) + .await? } Plan::CreateDatabase(_) => { self.validate_access(&GrantObject::Global, UserPrivilegeType::CreateDatabase) .await?; } + // Task + // Is it compatible with the permission check of the old version? Plan::CreateTask(_) => { - self.validate_access(&GrantObject::Global, UserPrivilegeType::CreateTask) - .await?; + if self + .validate_access(&GrantObject::Global, UserPrivilegeType::CreateTask) + .await + .is_err() + { + self.validate_access(&GrantObject::Global, UserPrivilegeType::Super) + .await?; + } } Plan::AlterTask(plan) => { - self.validate_task_access(&plan.task_name, UserPrivilegeType::Alter).await?; + if self + .validate_task_access(&plan.task_name, UserPrivilegeType::Alter) + .await + .is_err() + { + self.validate_access(&GrantObject::Global, UserPrivilegeType::Super) + .await?; + } } Plan::DropTask(plan) => { - self.validate_task_access(&plan.task_name, UserPrivilegeType::Drop).await?; + if self + .validate_task_access(&plan.task_name, UserPrivilegeType::Drop) + .await + .is_err() + { + self.validate_access(&GrantObject::Global, UserPrivilegeType::Super) + .await?; + } + } + Plan::DescribeTask(plan) => { + let session = self.ctx.get_current_session(); + if self + .has_ownership(&session, &GrantObject::Task(plan.task_name.to_owned())) + .await + .is_err() + { + self.validate_access(&GrantObject::Global, UserPrivilegeType::Super) + .await?; + } + } + Plan::ExecuteTask(plan) => { + let session = self.ctx.get_current_session(); + if self + .has_ownership(&session, &GrantObject::Task(plan.task_name.to_owned())) + .await + .is_err() + { + self.validate_access(&GrantObject::Global, UserPrivilegeType::Super) + .await?; + } } Plan::DropDatabase(plan) => { - self.validate_db_access(&plan.catalog, &plan.database, UserPrivilegeType::Drop, plan.if_exists).await?; + self.validate_db_access( + &plan.catalog, + &plan.database, + UserPrivilegeType::Drop, + plan.if_exists, + ) + .await?; } - Plan::UndropDatabase(_) - | Plan::DropIndex(_) - | Plan::DropTableIndex(_) => { + Plan::UndropDatabase(_) | Plan::DropIndex(_) | Plan::DropTableIndex(_) => { // undroptable/db need convert name to id. But because of drop, can not find the id. Upgrade Object to Database. self.validate_access(&GrantObject::Global, UserPrivilegeType::Drop) .await?; } Plan::DropUDF(plan) => { let udf_name = &plan.udf; - if !UserApiProvider::instance().exists_udf(&tenant, udf_name).await? && plan.if_exists { + if !UserApiProvider::instance() + .exists_udf(&tenant, udf_name) + .await? + && plan.if_exists + { return Ok(()); } if enable_experimental_rbac_check { @@ -662,14 +768,18 @@ impl AccessChecker for PrivilegeAccess { self.validate_udf_access(udf).await?; } else { self.validate_access(&GrantObject::Global, UserPrivilegeType::Drop) - .await?; + .await?; } } Plan::DropStage(plan) => { - match UserApiProvider::instance().get_stage(&tenant, &plan.name).await { + match UserApiProvider::instance() + .get_stage(&tenant, &plan.name) + .await + { Ok(stage) => { if enable_experimental_rbac_check { - let privileges = vec![UserPrivilegeType::Read, UserPrivilegeType::Write]; + let privileges = + vec![UserPrivilegeType::Read, UserPrivilegeType::Write]; for privilege in privileges { self.validate_stage_access(&stage, privilege).await?; } @@ -678,108 +788,245 @@ impl AccessChecker for PrivilegeAccess { .await?; } } - Err(e) => { - match e.code() { - ErrorCode::UNKNOWN_STAGE if plan.if_exists => - { - return Ok(()); - } - _ => return Err(e.add_message("error on validating stage access")), + Err(e) => match e.code() { + ErrorCode::UNKNOWN_STAGE if plan.if_exists => { + return Ok(()); } - } + _ => return Err(e.add_message("error on validating stage access")), + }, } } Plan::UseDatabase(plan) => { let session = self.ctx.get_current_session(); - if self.has_ownership(&session, &GrantObject::Database(catalog_name.clone(), plan.database.clone())).await? { + if self + .has_ownership( + &session, + &GrantObject::Database(catalog_name.clone(), plan.database.clone()), + ) + .await? + { return Ok(()); } let catalog = self.ctx.get_catalog(&catalog_name).await?; // Use db is special. Should not check the privilege. // Just need to check user grant objects contain the db that be used. - let (db_id, _) = match self.convert_to_id(&tenant, &catalog, &plan.database, None).await? { - ObjectId::Table(db_id, table_id) => { (db_id, Some(table_id)) } - ObjectId::Database(db_id) => { (db_id, None) } + let (db_id, _) = match self + .convert_to_id(&tenant, &catalog, &plan.database, None) + .await? + { + ObjectId::Table(db_id, table_id) => (db_id, Some(table_id)), + ObjectId::Database(db_id) => (db_id, None), }; - let has_priv = has_priv(&tenant, &plan.database, None, db_id, None, grant_set).await?; + let has_priv = + has_priv(&tenant, &plan.database, None, db_id, None, grant_set).await?; return if has_priv { Ok(()) } else { Err(ErrorCode::PermissionDenied(format!( "Permission denied: User {} does not have the required privileges for database '{}'", - identity, plan.database.clone() + identity, + plan.database.clone() ))) }; } // Virtual Column. Plan::CreateVirtualColumn(plan) => { - self.validate_table_access(&plan.catalog, &plan.database, &plan.table, UserPrivilegeType::Create, false).await? + self.validate_table_access( + &plan.catalog, + &plan.database, + &plan.table, + UserPrivilegeType::Create, + false, + ) + .await? } Plan::AlterVirtualColumn(plan) => { - self.validate_table_access(&plan.catalog, &plan.database, &plan.table, UserPrivilegeType::Alter, plan.if_exists).await? + self.validate_table_access( + &plan.catalog, + &plan.database, + &plan.table, + UserPrivilegeType::Alter, + plan.if_exists, + ) + .await? } Plan::DropVirtualColumn(plan) => { - self.validate_table_access(&plan.catalog, &plan.database, &plan.table, UserPrivilegeType::Drop, plan.if_exists).await? + self.validate_table_access( + &plan.catalog, + &plan.database, + &plan.table, + UserPrivilegeType::Drop, + plan.if_exists, + ) + .await? } Plan::RefreshVirtualColumn(plan) => { - self.validate_table_access(&plan.catalog, &plan.database, &plan.table, UserPrivilegeType::Super, false).await? + self.validate_table_access( + &plan.catalog, + &plan.database, + &plan.table, + UserPrivilegeType::Super, + false, + ) + .await? } // Table. Plan::ShowCreateTable(plan) => { - self.validate_table_access(&plan.catalog, &plan.database, &plan.table, UserPrivilegeType::Select, false).await? + self.validate_table_access( + &plan.catalog, + &plan.database, + &plan.table, + UserPrivilegeType::Select, + false, + ) + .await? } Plan::DescribeTable(plan) => { - self.validate_table_access(&plan.catalog, &plan.database, &plan.table, UserPrivilegeType::Select, false).await? + self.validate_table_access( + &plan.catalog, + &plan.database, + &plan.table, + UserPrivilegeType::Select, + false, + ) + .await? } Plan::CreateTable(plan) => { - self.validate_db_access(&plan.catalog, &plan.database, UserPrivilegeType::Create, false).await?; + self.validate_db_access( + &plan.catalog, + &plan.database, + UserPrivilegeType::Create, + false, + ) + .await?; if let Some(query) = &plan.as_select { self.check(ctx, query).await?; } } Plan::DropTable(plan) => { - self.validate_table_access(&plan.catalog, &plan.database, &plan.table, UserPrivilegeType::Drop, plan.if_exists).await?; + self.validate_table_access( + &plan.catalog, + &plan.database, + &plan.table, + UserPrivilegeType::Drop, + plan.if_exists, + ) + .await?; } Plan::UndropTable(plan) => { // undroptable/db need convert name to id. But because of drop, can not find the id. Upgrade Object to Database. - self.validate_db_access(&plan.catalog, &plan.database, UserPrivilegeType::Drop, false).await?; - + self.validate_db_access( + &plan.catalog, + &plan.database, + UserPrivilegeType::Drop, + false, + ) + .await?; } Plan::RenameTable(plan) => { // You must have ALTER and DROP privileges for the original table, // and CREATE for the new db. let privileges = vec![UserPrivilegeType::Alter, UserPrivilegeType::Drop]; for privilege in privileges { - self.validate_table_access(&plan.catalog, &plan.database, &plan.table, privilege, plan.if_exists).await?; + self.validate_table_access( + &plan.catalog, + &plan.database, + &plan.table, + privilege, + plan.if_exists, + ) + .await?; } - self.validate_db_access(&plan.catalog, &plan.new_database, UserPrivilegeType::Create, false).await?; + self.validate_db_access( + &plan.catalog, + &plan.new_database, + UserPrivilegeType::Create, + false, + ) + .await?; } Plan::SetOptions(plan) => { - self.validate_table_access(&plan.catalog, &plan.database, &plan.table, UserPrivilegeType::Alter, false).await? + self.validate_table_access( + &plan.catalog, + &plan.database, + &plan.table, + UserPrivilegeType::Alter, + false, + ) + .await? } Plan::AddTableColumn(plan) => { - self.validate_table_access(&plan.catalog, &plan.database, &plan.table, UserPrivilegeType::Alter, false).await? + self.validate_table_access( + &plan.catalog, + &plan.database, + &plan.table, + UserPrivilegeType::Alter, + false, + ) + .await? } Plan::RenameTableColumn(plan) => { - self.validate_table_access(&plan.catalog, &plan.database, &plan.table, UserPrivilegeType::Alter, false).await? + self.validate_table_access( + &plan.catalog, + &plan.database, + &plan.table, + UserPrivilegeType::Alter, + false, + ) + .await? } Plan::ModifyTableColumn(plan) => { - self.validate_table_access(&plan.catalog, &plan.database, &plan.table, UserPrivilegeType::Alter, false).await? + self.validate_table_access( + &plan.catalog, + &plan.database, + &plan.table, + UserPrivilegeType::Alter, + false, + ) + .await? } Plan::ModifyTableComment(plan) => { - self.validate_table_access(&plan.catalog, &plan.database, &plan.table, UserPrivilegeType::Alter, false).await? + self.validate_table_access( + &plan.catalog, + &plan.database, + &plan.table, + UserPrivilegeType::Alter, + false, + ) + .await? } Plan::DropTableColumn(plan) => { - self.validate_table_access(&plan.catalog, &plan.database, &plan.table, UserPrivilegeType::Alter, false).await? + self.validate_table_access( + &plan.catalog, + &plan.database, + &plan.table, + UserPrivilegeType::Alter, + false, + ) + .await? } Plan::AlterTableClusterKey(plan) => { - self.validate_table_access(&plan.catalog, &plan.database, &plan.table, UserPrivilegeType::Alter, false).await? + self.validate_table_access( + &plan.catalog, + &plan.database, + &plan.table, + UserPrivilegeType::Alter, + false, + ) + .await? } Plan::DropTableClusterKey(plan) => { - self.validate_table_access(&plan.catalog, &plan.database, &plan.table, UserPrivilegeType::Drop, false).await? + self.validate_table_access( + &plan.catalog, + &plan.database, + &plan.table, + UserPrivilegeType::Drop, + false, + ) + .await? } Plan::ReclusterTable(plan) => { if enable_experimental_rbac_check { @@ -788,25 +1035,67 @@ impl AccessChecker for PrivilegeAccess { self.validate_udf_access(udf).await?; } } - self.validate_table_access(&plan.catalog, &plan.database, &plan.table, UserPrivilegeType::Alter, false).await? + self.validate_table_access( + &plan.catalog, + &plan.database, + &plan.table, + UserPrivilegeType::Alter, + false, + ) + .await? } Plan::TruncateTable(plan) => { - self.validate_table_access(&plan.catalog, &plan.database, &plan.table, UserPrivilegeType::Delete, false).await? + self.validate_table_access( + &plan.catalog, + &plan.database, + &plan.table, + UserPrivilegeType::Delete, + false, + ) + .await? } Plan::OptimizeTable(plan) => { - self.validate_table_access(&plan.catalog, &plan.database, &plan.table, UserPrivilegeType::Super, false).await? + self.validate_table_access( + &plan.catalog, + &plan.database, + &plan.table, + UserPrivilegeType::Super, + false, + ) + .await? } Plan::VacuumTable(plan) => { - self.validate_table_access(&plan.catalog, &plan.database, &plan.table, UserPrivilegeType::Super, false).await? + self.validate_table_access( + &plan.catalog, + &plan.database, + &plan.table, + UserPrivilegeType::Super, + false, + ) + .await? } Plan::VacuumDropTable(plan) => { - self.validate_db_access(&plan.catalog, &plan.database, UserPrivilegeType::Super, false).await? + self.validate_db_access( + &plan.catalog, + &plan.database, + UserPrivilegeType::Super, + false, + ) + .await? } Plan::VacuumTemporaryFiles(_) => { - self.validate_access(&GrantObject::Global, UserPrivilegeType::Super).await? + self.validate_access(&GrantObject::Global, UserPrivilegeType::Super) + .await? } Plan::AnalyzeTable(plan) => { - self.validate_table_access(&plan.catalog, &plan.database, &plan.table, UserPrivilegeType::Super, false).await? + self.validate_table_access( + &plan.catalog, + &plan.database, + &plan.table, + UserPrivilegeType::Super, + false, + ) + .await? } // Others. Plan::Insert(plan) => { @@ -816,7 +1105,14 @@ impl AccessChecker for PrivilegeAccess { vec![UserPrivilegeType::Insert] }; for privilege in target_table_privileges { - self.validate_table_access(&plan.catalog, &plan.database, &plan.table, privilege, false).await?; + self.validate_table_access( + &plan.catalog, + &plan.database, + &plan.table, + privilege, + false, + ) + .await?; } match &plan.source { InsertInputSource::SelectPlan(plan) => { @@ -826,7 +1122,7 @@ impl AccessChecker for PrivilegeAccess { self.check(ctx, plan).await?; } InsertInputSource::StreamingWithFormat(..) - | InsertInputSource::StreamingWithFileFormat {..} + | InsertInputSource::StreamingWithFileFormat { .. } | InsertInputSource::Values(_) => {} } } @@ -836,18 +1132,37 @@ impl AccessChecker for PrivilegeAccess { } else { vec![UserPrivilegeType::Insert] }; - for target in plan.whens.iter().flat_map(|when|when.intos.iter()).chain(plan.opt_else.as_ref().into_iter().flat_map(|e|e.intos.iter())){ + for target in plan.whens.iter().flat_map(|when| when.intos.iter()).chain( + plan.opt_else + .as_ref() + .into_iter() + .flat_map(|e| e.intos.iter()), + ) { for privilege in target_table_privileges.clone() { - self.validate_table_access(&target.catalog, &target.database, &target.table, privilege, false).await?; + self.validate_table_access( + &target.catalog, + &target.database, + &target.table, + privilege, + false, + ) + .await?; } } self.check(ctx, &plan.input_source).await?; } Plan::Replace(plan) => { - //plan.delete_when is Expr no need to check privileges. + // plan.delete_when is Expr no need to check privileges. let privileges = vec![UserPrivilegeType::Insert, UserPrivilegeType::Delete]; for privilege in privileges { - self.validate_table_access(&plan.catalog, &plan.database, &plan.table, privilege, false).await?; + self.validate_table_access( + &plan.catalog, + &plan.database, + &plan.table, + privilege, + false, + ) + .await?; } match &plan.source { InsertInputSource::SelectPlan(plan) => { @@ -857,7 +1172,7 @@ impl AccessChecker for PrivilegeAccess { self.check(ctx, plan).await?; } InsertInputSource::StreamingWithFormat(..) - | InsertInputSource::StreamingWithFileFormat {..} + | InsertInputSource::StreamingWithFileFormat { .. } | InsertInputSource::Values(_) => {} } } @@ -901,7 +1216,14 @@ impl AccessChecker for PrivilegeAccess { } let privileges = vec![UserPrivilegeType::Insert, UserPrivilegeType::Delete]; for privilege in privileges { - self.validate_table_access(&plan.catalog, &plan.database, &plan.table, privilege, false).await?; + self.validate_table_access( + &plan.catalog, + &plan.database, + &plan.table, + privilege, + false, + ) + .await?; } } Plan::Delete(plan) => { @@ -923,7 +1245,14 @@ impl AccessChecker for PrivilegeAccess { } } } - self.validate_table_access(&plan.catalog_name, &plan.database_name, &plan.table_name, UserPrivilegeType::Delete, false).await? + self.validate_table_access( + &plan.catalog_name, + &plan.database_name, + &plan.table_name, + UserPrivilegeType::Delete, + false, + ) + .await? } Plan::Update(plan) => { if enable_experimental_rbac_check { @@ -948,7 +1277,14 @@ impl AccessChecker for PrivilegeAccess { } } } - self.validate_table_access(&plan.catalog, &plan.database, &plan.table, UserPrivilegeType::Update, false).await?; + self.validate_table_access( + &plan.catalog, + &plan.database, + &plan.table, + UserPrivilegeType::Update, + false, + ) + .await?; } Plan::CreateView(plan) => { let mut planner = Planner::new(self.ctx.clone()); @@ -956,52 +1292,77 @@ impl AccessChecker for PrivilegeAccess { self.check(ctx, &plan).await? } Plan::AlterView(plan) => { - self.validate_db_access(&plan.catalog, &plan.database, UserPrivilegeType::Alter, false).await?; + self.validate_db_access( + &plan.catalog, + &plan.database, + UserPrivilegeType::Alter, + false, + ) + .await?; let mut planner = Planner::new(self.ctx.clone()); let (plan, _) = planner.plan_sql(&plan.subquery).await?; self.check(ctx, &plan).await? } Plan::DropView(plan) => { - self.validate_db_access(&plan.catalog, &plan.database, UserPrivilegeType::Drop, plan.if_exists).await? + self.validate_db_access( + &plan.catalog, + &plan.database, + UserPrivilegeType::Drop, + plan.if_exists, + ) + .await? } Plan::DescribeView(plan) => { - self.validate_table_access(&plan.catalog, &plan.database, &plan.view_name, UserPrivilegeType::Select, false).await? + self.validate_table_access( + &plan.catalog, + &plan.database, + &plan.view_name, + UserPrivilegeType::Select, + false, + ) + .await? } Plan::CreateStream(plan) => { - self.validate_db_access(&plan.catalog, &plan.database, UserPrivilegeType::Create, false).await? + self.validate_db_access( + &plan.catalog, + &plan.database, + UserPrivilegeType::Create, + false, + ) + .await? } Plan::DropStream(plan) => { - self.validate_db_access(&plan.catalog, &plan.database, UserPrivilegeType::Drop, plan.if_exists).await? + self.validate_db_access( + &plan.catalog, + &plan.database, + UserPrivilegeType::Drop, + plan.if_exists, + ) + .await? } Plan::CreateDynamicTable(plan) => { - self.validate_db_access(&plan.catalog, &plan.database, UserPrivilegeType::Create, false).await?; + self.validate_db_access( + &plan.catalog, + &plan.database, + UserPrivilegeType::Create, + false, + ) + .await?; } Plan::CreateUser(_) => { - self.validate_access( - &GrantObject::Global, - UserPrivilegeType::CreateUser, - ) + self.validate_access(&GrantObject::Global, UserPrivilegeType::CreateUser) .await?; } Plan::DropUser(_) => { - self.validate_access( - &GrantObject::Global, - UserPrivilegeType::DropUser, - ) + self.validate_access(&GrantObject::Global, UserPrivilegeType::DropUser) .await?; } Plan::CreateRole(_) => { - self.validate_access( - &GrantObject::Global, - UserPrivilegeType::CreateRole, - ) + self.validate_access(&GrantObject::Global, UserPrivilegeType::CreateRole) .await?; } Plan::DropRole(_) => { - self.validate_access( - &GrantObject::Global, - UserPrivilegeType::DropRole, - ) + self.validate_access(&GrantObject::Global, UserPrivilegeType::DropRole) .await?; } Plan::GrantShareObject(_) @@ -1015,32 +1376,41 @@ impl AccessChecker for PrivilegeAccess { self.validate_access(&GrantObject::Global, UserPrivilegeType::Grant) .await?; } - Plan::SetVariable(_) | Plan::UnSetVariable(_) | Plan::Kill(_) | Plan::SetPriority(_) => { + Plan::SetVariable(_) + | Plan::UnSetVariable(_) + | Plan::Kill(_) + | Plan::SetPriority(_) => { self.validate_access(&GrantObject::Global, UserPrivilegeType::Super) .await?; } Plan::ShowGrants(plan) => { let current_user = self.ctx.get_current_user()?; if let Some(principal) = &plan.principal { - match principal { - PrincipalIdentity::User(user) => { - if current_user.identity() == *user { - return Ok(()); - } else { - self.validate_access(&GrantObject::Global, UserPrivilegeType::Grant) - .await?; - } - } - PrincipalIdentity::Role(role) => { - let roles=current_user.grants.roles(); - if roles.contains(role) || role.to_lowercase() == "public" { - return Ok(()); - } else { - self.validate_access(&GrantObject::Global, UserPrivilegeType::Grant) - .await?; - } - } - } + match principal { + PrincipalIdentity::User(user) => { + if current_user.identity() == *user { + return Ok(()); + } else { + self.validate_access( + &GrantObject::Global, + UserPrivilegeType::Grant, + ) + .await?; + } + } + PrincipalIdentity::Role(role) => { + let roles = current_user.grants.roles(); + if roles.contains(role) || role.to_lowercase() == "public" { + return Ok(()); + } else { + self.validate_access( + &GrantObject::Global, + UserPrivilegeType::Grant, + ) + .await?; + } + } + } } else { return Ok(()); } @@ -1056,19 +1426,32 @@ impl AccessChecker for PrivilegeAccess { .await?; } Plan::CopyIntoTable(plan) => { - self.validate_stage_access(&plan.stage_table_info.stage_info, UserPrivilegeType::Read).await?; - self.validate_table_access(plan.catalog_info.catalog_name(), &plan.database_name, &plan.table_name, UserPrivilegeType::Insert, false).await?; + self.validate_stage_access( + &plan.stage_table_info.stage_info, + UserPrivilegeType::Read, + ) + .await?; + self.validate_table_access( + plan.catalog_info.catalog_name(), + &plan.database_name, + &plan.table_name, + UserPrivilegeType::Insert, + false, + ) + .await?; if let Some(query) = &plan.query { self.check(ctx, query).await?; } } Plan::CopyIntoLocation(plan) => { - self.validate_stage_access(&plan.stage, UserPrivilegeType::Write).await?; + self.validate_stage_access(&plan.stage, UserPrivilegeType::Write) + .await?; let from = plan.from.clone(); return self.check(ctx, &from).await; } Plan::RemoveStage(plan) => { - self.validate_stage_access(&plan.stage, UserPrivilegeType::Write).await?; + self.validate_stage_access(&plan.stage, UserPrivilegeType::Write) + .await?; } Plan::CreateShareEndpoint(_) | Plan::ShowShareEndpoint(_) @@ -1104,21 +1487,13 @@ impl AccessChecker for PrivilegeAccess { | Plan::DropNotification(_) | Plan::DescNotification(_) | Plan::AlterNotification(_) - //| Plan::CreateTask(_) // TODO: need to build ownership info for task - //| Plan::DropTask(_) // TODO: need to build ownership info for task - | Plan::ShowTasks(_) // TODO: need to build ownership info for task - | Plan::DescribeTask(_) // TODO: need to build ownership info for task - | Plan::ExecuteTask(_) // TODO: need to build ownership info for task | Plan::CreateSequence(_) | Plan::DropSequence(_) => { self.validate_access(&GrantObject::Global, UserPrivilegeType::Super) .await?; } Plan::CreateDatamaskPolicy(_) | Plan::DropDatamaskPolicy(_) => { - self.validate_access( - &GrantObject::Global, - UserPrivilegeType::CreateDataMask, - ) + self.validate_access(&GrantObject::Global, UserPrivilegeType::CreateDataMask) .await?; } // Note: No need to check privileges diff --git a/src/query/service/src/interpreters/interpreter_factory.rs b/src/query/service/src/interpreters/interpreter_factory.rs index 00c9ae6a39ed5..2690596cd58ff 100644 --- a/src/query/service/src/interpreters/interpreter_factory.rs +++ b/src/query/service/src/interpreters/interpreter_factory.rs @@ -57,7 +57,6 @@ use crate::interpreters::interpreter_task_create::CreateTaskInterpreter; use crate::interpreters::interpreter_task_describe::DescribeTaskInterpreter; use crate::interpreters::interpreter_task_drop::DropTaskInterpreter; use crate::interpreters::interpreter_task_execute::ExecuteTaskInterpreter; -use crate::interpreters::interpreter_tasks_show::ShowTasksInterpreter; use crate::interpreters::interpreter_txn_abort::AbortInterpreter; use crate::interpreters::interpreter_txn_begin::BeginInterpreter; use crate::interpreters::interpreter_txn_commit::CommitInterpreter; @@ -547,7 +546,6 @@ impl InterpreterFactory { ctx, *p.clone(), )?)), - Plan::ShowTasks(p) => Ok(Arc::new(ShowTasksInterpreter::try_create(ctx, *p.clone())?)), Plan::CreateConnection(p) => Ok(Arc::new(CreateConnectionInterpreter::try_create( ctx, diff --git a/src/query/service/src/interpreters/interpreter_tasks_show.rs b/src/query/service/src/interpreters/interpreter_tasks_show.rs deleted file mode 100644 index cf63393157a11..0000000000000 --- a/src/query/service/src/interpreters/interpreter_tasks_show.rs +++ /dev/null @@ -1,96 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::sync::Arc; - -use databend_common_cloud_control::client_config::make_request; -use databend_common_cloud_control::cloud_api::CloudControlApiProvider; -use databend_common_cloud_control::pb::ShowTasksRequest; -use databend_common_config::GlobalConfig; -use databend_common_exception::ErrorCode; -use databend_common_exception::Result; -use databend_common_sql::plans::ShowTasksPlan; -use databend_common_storages_system::parse_tasks_to_datablock; - -use crate::interpreters::common::get_task_client_config; -use crate::interpreters::Interpreter; -use crate::pipelines::PipelineBuildResult; -use crate::sessions::QueryContext; - -#[derive(Debug)] -pub struct ShowTasksInterpreter { - ctx: Arc, - plan: ShowTasksPlan, -} - -impl ShowTasksInterpreter { - pub fn try_create(ctx: Arc, plan: ShowTasksPlan) -> Result { - Ok(ShowTasksInterpreter { ctx, plan }) - } -} - -impl ShowTasksInterpreter { - async fn build_request(&self) -> Result { - let plan = self.plan.clone(); - let available_roles = self - .ctx - .get_current_session() - .get_all_available_roles() - .await?; - let req = ShowTasksRequest { - tenant_id: plan.tenant.tenant_name().to_string(), - name_like: "".to_string(), - result_limit: 10000, // TODO: use plan.limit pushdown - owners: available_roles - .into_iter() - .map(|x| x.identity().to_string()) - .collect(), - task_ids: vec![], - }; - Ok(req) - } -} - -#[async_trait::async_trait] -impl Interpreter for ShowTasksInterpreter { - fn name(&self) -> &str { - "ShowTasksInterpreter" - } - - fn is_ddl(&self) -> bool { - true - } - - #[minitrace::trace] - #[async_backtrace::framed] - async fn execute2(&self) -> Result { - let config = GlobalConfig::instance(); - if config.query.cloud_control_grpc_server_address.is_none() { - return Err(ErrorCode::CloudControlNotEnabled( - "cannot drop task without cloud control enabled, please set cloud_control_grpc_server_address in config", - )); - } - let cloud_api = CloudControlApiProvider::instance(); - let task_client = cloud_api.get_task_client(); - let req = self.build_request().await?; - let config = get_task_client_config(self.ctx.clone(), cloud_api.get_timeout())?; - let req = make_request(req, config); - - let resp = task_client.show_tasks(req).await?; - let tasks = resp.tasks; - - let result = parse_tasks_to_datablock(tasks)?; - PipelineBuildResult::from_blocks(vec![result]) - } -} diff --git a/src/query/service/src/interpreters/mod.rs b/src/query/service/src/interpreters/mod.rs index 989a2954ea077..5c191d41367e3 100644 --- a/src/query/service/src/interpreters/mod.rs +++ b/src/query/service/src/interpreters/mod.rs @@ -122,7 +122,6 @@ mod interpreter_task_create; mod interpreter_task_describe; mod interpreter_task_drop; mod interpreter_task_execute; -mod interpreter_tasks_show; mod interpreter_txn_abort; mod interpreter_txn_begin; mod interpreter_txn_commit; diff --git a/src/query/sql/src/planner/binder/binder.rs b/src/query/sql/src/planner/binder/binder.rs index 8b6c40c9e2057..eb1369aa46904 100644 --- a/src/query/sql/src/planner/binder/binder.rs +++ b/src/query/sql/src/planner/binder/binder.rs @@ -613,7 +613,7 @@ impl<'a> Binder { self.bind_execute_task(stmt).await? } Statement::ShowTasks(stmt) => { - self.bind_show_tasks(stmt).await? + self.bind_show_tasks(bind_context, stmt).await? } // Streams diff --git a/src/query/sql/src/planner/binder/ddl/task.rs b/src/query/sql/src/planner/binder/ddl/task.rs index 30d224957f3f6..b9ac090e40361 100644 --- a/src/query/sql/src/planner/binder/ddl/task.rs +++ b/src/query/sql/src/planner/binder/ddl/task.rs @@ -23,6 +23,7 @@ use databend_common_ast::ast::DescribeTaskStmt; use databend_common_ast::ast::DropTaskStmt; use databend_common_ast::ast::ExecuteTaskStmt; use databend_common_ast::ast::ScheduleOptions; +use databend_common_ast::ast::ShowLimit; use databend_common_ast::ast::ShowTasksStmt; use databend_common_ast::ast::TaskSql; use databend_common_ast::parser::parse_sql; @@ -30,6 +31,7 @@ use databend_common_ast::parser::tokenize_sql; use databend_common_ast::parser::Dialect; use databend_common_exception::ErrorCode; use databend_common_exception::Result; +use log::debug; use crate::plans::AlterTaskPlan; use crate::plans::CreateTaskPlan; @@ -37,8 +39,10 @@ use crate::plans::DescribeTaskPlan; use crate::plans::DropTaskPlan; use crate::plans::ExecuteTaskPlan; use crate::plans::Plan; -use crate::plans::ShowTasksPlan; +use crate::plans::RewriteKind; +use crate::BindContext; use crate::Binder; +use crate::SelectBuilder; fn verify_single_statement(sql: &String) -> Result<()> { let tokens = tokenize_sql(sql.as_str()).map_err(|e| { @@ -243,16 +247,27 @@ impl Binder { #[async_backtrace::framed] pub(in crate::planner::binder) async fn bind_show_tasks( &mut self, + bind_context: &mut BindContext, stmt: &ShowTasksStmt, ) -> Result { let ShowTasksStmt { limit } = stmt; - let tenant = self.ctx.get_tenant(); + let mut select_builder = SelectBuilder::from("system.tasks"); - let plan = ShowTasksPlan { - tenant, - limit: limit.clone(), + let query = match limit { + None => select_builder.build(), + Some(ShowLimit::Like { pattern }) => { + select_builder.with_filter(format!("name LIKE '{pattern}'")); + select_builder.build() + } + Some(ShowLimit::Where { selection }) => { + select_builder.with_filter(format!("({selection})")); + select_builder.build() + } }; - Ok(Plan::ShowTasks(Box::new(plan))) + + debug!("show tasks rewrite to: {:?}", query); + self.bind_rewrite_to_query(bind_context, query.as_str(), RewriteKind::ShowTasks) + .await } } diff --git a/src/query/sql/src/planner/format/display_plan.rs b/src/query/sql/src/planner/format/display_plan.rs index cc2872b742f5c..8b433a36f21e0 100644 --- a/src/query/sql/src/planner/format/display_plan.rs +++ b/src/query/sql/src/planner/format/display_plan.rs @@ -202,9 +202,8 @@ impl Plan { Plan::AlterTask(_) => Ok("AlterTask".to_string()), Plan::DescribeTask(_) => Ok("DescribeTask".to_string()), Plan::ExecuteTask(_) => Ok("ExecuteTask".to_string()), - Plan::ShowTasks(_) => Ok("ShowTasks".to_string()), - // task + // Connection Plan::CreateConnection(_) => Ok("CreateConnection".to_string()), Plan::DescConnection(_) => Ok("DescConnection".to_string()), Plan::DropConnection(_) => Ok("DropConnection".to_string()), diff --git a/src/query/sql/src/planner/plans/ddl/task.rs b/src/query/sql/src/planner/plans/ddl/task.rs index bb2a812086092..866e6e3368f4c 100644 --- a/src/query/sql/src/planner/plans/ddl/task.rs +++ b/src/query/sql/src/planner/plans/ddl/task.rs @@ -17,7 +17,6 @@ use std::sync::Arc; use databend_common_ast::ast::AlterTaskOptions; use databend_common_ast::ast::ScheduleOptions; -use databend_common_ast::ast::ShowLimit; use databend_common_ast::ast::TaskSql; use databend_common_ast::ast::WarehouseOptions; use databend_common_expression::types::DataType; @@ -150,15 +149,3 @@ impl ExecuteTaskPlan { DataSchemaRefExt::create(vec![]) } } - -#[derive(Clone, Debug, PartialEq)] -pub struct ShowTasksPlan { - pub tenant: Tenant, - pub limit: Option, -} - -impl ShowTasksPlan { - pub fn schema(&self) -> DataSchemaRef { - task_schema() - } -} diff --git a/src/query/sql/src/planner/plans/plan.rs b/src/query/sql/src/planner/plans/plan.rs index 90d984fefed88..59fc0328c0b67 100644 --- a/src/query/sql/src/planner/plans/plan.rs +++ b/src/query/sql/src/planner/plans/plan.rs @@ -142,7 +142,6 @@ use crate::plans::ShowObjectGrantPrivilegesPlan; use crate::plans::ShowRolesPlan; use crate::plans::ShowShareEndpointPlan; use crate::plans::ShowSharesPlan; -use crate::plans::ShowTasksPlan; use crate::plans::TruncateTablePlan; use crate::plans::UnSettingPlan; use crate::plans::UndropDatabasePlan; @@ -343,7 +342,6 @@ pub enum Plan { AlterTask(Box), DropTask(Box), DescribeTask(Box), - ShowTasks(Box), ExecuteTask(Box), CreateDynamicTable(Box), @@ -381,6 +379,7 @@ pub enum RewriteKind { ShowDatabases, ShowTables(String, String), ShowColumns(String, String, String), + ShowTasks, ShowTablesStatus, ShowVirtualColumns, @@ -473,7 +472,6 @@ impl Plan { Plan::MergeInto(plan) => plan.schema(), Plan::CreateTask(plan) => plan.schema(), Plan::DescribeTask(plan) => plan.schema(), - Plan::ShowTasks(plan) => plan.schema(), Plan::ExecuteTask(plan) => plan.schema(), Plan::DescNotification(plan) => plan.schema(), Plan::DescConnection(plan) => plan.schema(), diff --git a/src/query/storages/system/src/task_history_table.rs b/src/query/storages/system/src/task_history_table.rs index 024f69bb2bf70..2b6353e7d4244 100644 --- a/src/query/storages/system/src/task_history_table.rs +++ b/src/query/storages/system/src/task_history_table.rs @@ -41,12 +41,15 @@ use databend_common_meta_app::schema::TableIdent; use databend_common_meta_app::schema::TableInfo; use databend_common_meta_app::schema::TableMeta; use databend_common_sql::plans::task_run_schema; +use databend_common_users::UserApiProvider; +use databend_common_users::BUILTIN_ROLE_ACCOUNT_ADMIN; use crate::table::AsyncOneBlockSystemTable; use crate::table::AsyncSystemTable; use crate::util::find_eq_filter; use crate::util::find_gt_filter; use crate::util::find_lt_filter; +use crate::util::get_owned_task_names; pub fn parse_task_runs_to_datablock(task_runs: Vec) -> Result { let mut name: Vec = Vec::with_capacity(task_runs.len()); @@ -130,6 +133,7 @@ impl AsyncSystemTable for TaskHistoryTable { ctx: Arc, push_downs: Option, ) -> Result { + let user_api = UserApiProvider::instance(); let config = GlobalConfig::instance(); if config.query.cloud_control_grpc_server_address.is_none() { return Err(ErrorCode::CloudControlNotEnabled( @@ -140,7 +144,12 @@ impl AsyncSystemTable for TaskHistoryTable { let tenant = ctx.get_tenant(); let query_id = ctx.get_id(); let user = ctx.get_current_user()?.identity().display().to_string(); - let available_roles = ctx.get_available_roles().await?; + let all_effective_roles: Vec = ctx + .get_all_effective_roles() + .await? + .into_iter() + .map(|x| x.identity().to_string()) + .collect(); // TODO: limit push_down does NOT work during tests,we need to fix it later. let result_limit = push_downs .as_ref() @@ -175,6 +184,20 @@ impl AsyncSystemTable for TaskHistoryTable { }); } } + + let owned_tasks_names = get_owned_task_names(user_api, &tenant, &all_effective_roles).await; + if let Some(task_name) = &task_name { + // The user does not have admin role and not own the task_name + // Need directly return empty block + if !all_effective_roles + .iter() + .any(|role| role.to_lowercase() == BUILTIN_ROLE_ACCOUNT_ADMIN) + && !owned_tasks_names.contains(task_name) + { + return parse_task_runs_to_datablock(vec![]); + } + } + let req = ShowTaskRunsRequest { tenant_id: tenant.tenant_name().to_string(), scheduled_time_start: scheduled_time_start.unwrap_or("".to_string()), @@ -182,14 +205,12 @@ impl AsyncSystemTable for TaskHistoryTable { task_name: task_name.unwrap_or("".to_string()), result_limit: result_limit.unwrap_or(0), // 0 means default error_only: false, - owners: available_roles - .into_iter() - .map(|x| x.identity().to_string()) - .collect(), + owners: all_effective_roles.clone(), next_page_token: None, page_size: None, previous_page_token: None, task_ids: vec![], + task_names: owned_tasks_names.clone(), }; let cloud_api = CloudControlApiProvider::instance(); diff --git a/src/query/storages/system/src/tasks_table.rs b/src/query/storages/system/src/tasks_table.rs index 7db5a7e6888c3..879f2e9daea6f 100644 --- a/src/query/storages/system/src/tasks_table.rs +++ b/src/query/storages/system/src/tasks_table.rs @@ -36,9 +36,11 @@ use databend_common_meta_app::schema::TableIdent; use databend_common_meta_app::schema::TableInfo; use databend_common_meta_app::schema::TableMeta; use databend_common_sql::plans::task_schema; +use databend_common_users::UserApiProvider; use crate::table::AsyncOneBlockSystemTable; use crate::table::AsyncSystemTable; +use crate::util::get_owned_task_names; pub fn parse_tasks_to_datablock(tasks: Vec) -> Result { let mut created_on: Vec = Vec::with_capacity(tasks.len()); @@ -120,6 +122,7 @@ impl AsyncSystemTable for TasksTable { ctx: Arc, _push_downs: Option, ) -> Result { + let user_api = UserApiProvider::instance(); let config = GlobalConfig::instance(); if config.query.cloud_control_grpc_server_address.is_none() { return Err(ErrorCode::CloudControlNotEnabled( @@ -130,16 +133,22 @@ impl AsyncSystemTable for TasksTable { let tenant = ctx.get_tenant(); let query_id = ctx.get_id(); let user = ctx.get_current_user()?.identity().display().to_string(); - let available_roles = ctx.get_available_roles().await?; + let all_effective_roles: Vec = ctx + .get_all_effective_roles() + .await? + .into_iter() + .map(|x| x.identity().to_string()) + .collect(); + + let owned_tasks_names = get_owned_task_names(user_api, &tenant, &all_effective_roles).await; + let req = ShowTasksRequest { tenant_id: tenant.tenant_name().to_string(), name_like: "".to_string(), result_limit: 10000, // TODO: use plan.limit pushdown - owners: available_roles - .into_iter() - .map(|x| x.identity().to_string()) - .collect(), + owners: all_effective_roles.clone(), task_ids: vec![], + task_names: owned_tasks_names.clone(), }; let cloud_api = CloudControlApiProvider::instance(); diff --git a/src/query/storages/system/src/util.rs b/src/query/storages/system/src/util.rs index 6f02811d562e0..c2c15c1b8f750 100644 --- a/src/query/storages/system/src/util.rs +++ b/src/query/storages/system/src/util.rs @@ -12,8 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + use databend_common_expression::Expr; use databend_common_expression::Scalar; +use databend_common_meta_app::principal::OwnershipObject; +use databend_common_meta_app::tenant::Tenant; +use databend_common_users::UserApiProvider; +use databend_common_users::BUILTIN_ROLE_ACCOUNT_ADMIN; pub fn find_eq_filter(expr: &Expr, visitor: &mut impl FnMut(&str, &Scalar)) { match expr { @@ -107,3 +113,35 @@ pub fn find_lt_filter(expr: &Expr, visitor: &mut impl FnMut(&str, &Scala } } } + +pub(crate) async fn get_owned_task_names( + user_api: Arc, + tenant: &Tenant, + all_effective_roles: &[String], +) -> Vec { + let mut owned_tasks_names = vec![]; + let has_admin_role = all_effective_roles + .iter() + .any(|role| role.to_lowercase() == BUILTIN_ROLE_ACCOUNT_ADMIN); + if has_admin_role { + // Note: In old version databend-query the hashmap maybe empty + let task_ownerships = user_api + .list_tasks_ownerships(tenant) + .await + .unwrap_or_default(); + for (ownership, role) in task_ownerships { + match ownership { + OwnershipObject::Database { .. } => {} + OwnershipObject::Table { .. } => {} + OwnershipObject::Stage { .. } => {} + OwnershipObject::UDF { .. } => {} + OwnershipObject::Task { name } => { + if all_effective_roles.contains(&role) && !owned_tasks_names.contains(&name) { + owned_tasks_names.push(name); + } + } + } + } + } + owned_tasks_names +} diff --git a/src/query/users/src/role_mgr.rs b/src/query/users/src/role_mgr.rs index 63a74fb30817f..5da08a63c2839 100644 --- a/src/query/users/src/role_mgr.rs +++ b/src/query/users/src/role_mgr.rs @@ -101,6 +101,23 @@ impl UserApiProvider { Ok(roles) } + pub async fn list_tasks_ownerships( + &self, + tenant: &Tenant, + ) -> Result> { + let seq_owns = self + .role_api(tenant) + .list_tasks_ownerships() + .await + .map_err(|e| e.add_message_back("(while get ownerships)."))?; + + let roles: HashMap = seq_owns + .into_iter() + .map(|r| (r.data.object, r.data.role)) + .collect(); + Ok(roles) + } + #[async_backtrace::framed] pub async fn exists_role(&self, tenant: &Tenant, role: String) -> Result { match self.get_role(tenant, role).await { diff --git a/src/query/users/src/visibility_checker.rs b/src/query/users/src/visibility_checker.rs index 836b244776f80..3a4d86ad4626b 100644 --- a/src/query/users/src/visibility_checker.rs +++ b/src/query/users/src/visibility_checker.rs @@ -27,7 +27,6 @@ use enumflags2::BitFlags; /// It is used in `SHOW DATABASES` and `SHOW TABLES` statements. pub struct GrantObjectVisibilityChecker { granted_global_udf: bool, - granted_global_task: bool, granted_global_db_table: bool, granted_global_stage: bool, granted_global_read_stage: bool, @@ -38,7 +37,6 @@ pub struct GrantObjectVisibilityChecker { extra_databases: HashSet<(String, String)>, extra_databases_id: HashSet<(String, u64)>, granted_udfs: HashSet, - granted_tasks: HashSet, granted_write_stages: HashSet, granted_read_stages: HashSet, } @@ -46,14 +44,12 @@ pub struct GrantObjectVisibilityChecker { impl GrantObjectVisibilityChecker { pub fn new(user: &UserInfo, available_roles: &Vec) -> Self { let mut granted_global_udf = false; - let mut granted_global_task = false; let mut granted_global_db_table = false; let mut granted_global_stage = false; let mut granted_global_read_stage = false; let mut granted_databases = HashSet::new(); let mut granted_tables = HashSet::new(); let mut granted_udfs = HashSet::new(); - let mut granted_tasks = HashSet::new(); let mut granted_write_stages = HashSet::new(); let mut granted_read_stages = HashSet::new(); let mut extra_databases = HashSet::new(); @@ -99,15 +95,6 @@ impl GrantObjectVisibilityChecker { }, ); - check_privilege( - &mut granted_global_task, - ent.privileges().iter(), - |privilege| { - UserPrivilegeSet::available_privileges_on_task(false) - .has_privilege(privilege) - }, - ); - check_privilege( &mut granted_global_read_stage, ent.privileges().iter(), @@ -146,9 +133,6 @@ impl GrantObjectVisibilityChecker { GrantObject::UDF(udf) => { granted_udfs.insert(udf.to_string()); } - GrantObject::Task(task) => { - granted_tasks.insert(task.to_string()); - } GrantObject::Stage(stage) => { if ent .privileges() @@ -163,13 +147,13 @@ impl GrantObjectVisibilityChecker { granted_read_stages.insert(stage.to_string()); } } + GrantObject::Task(_) => {} } } } Self { granted_global_udf, - granted_global_task, granted_global_db_table, granted_global_stage, granted_global_read_stage, @@ -180,7 +164,6 @@ impl GrantObjectVisibilityChecker { extra_databases, extra_databases_id, granted_udfs, - granted_tasks, granted_write_stages, granted_read_stages, } @@ -219,18 +202,6 @@ impl GrantObjectVisibilityChecker { false } - // TODO: Need to call this check in task_history.rs::get_full_data - pub fn check_task_visibility(&self, task: &str) -> bool { - if self.granted_global_task { - return true; - } - - if self.granted_tasks.contains(task) { - return true; - } - false - } - pub fn check_database_visibility(&self, catalog: &str, db: &str, db_id: u64) -> bool { // skip information_schema privilege check if db.to_lowercase() == "information_schema" || db.to_lowercase() == "system" { diff --git a/tests/suites/0_stateless/18_rbac/18_0011_task.sh b/tests/suites/0_stateless/18_rbac/18_0011_task.sh old mode 100644 new mode 100755 From dceca56b6884db9a2149c93777246868effb42ac Mon Sep 17 00:00:00 2001 From: taichong Date: Mon, 13 May 2024 11:40:50 +0800 Subject: [PATCH 3/8] 1. fix 2. add tasks and task_history to SYSTEM_TABLES_ALLOW_LIST 3. delete useless test --- src/query/ast/src/parser/statement.rs | 2 +- .../service/src/interpreters/access/privilege_access.rs | 4 +++- tests/suites/0_stateless/18_rbac/18_0011_task.result | 0 tests/suites/0_stateless/18_rbac/18_0011_task.sh | 8 -------- 4 files changed, 4 insertions(+), 10 deletions(-) delete mode 100644 tests/suites/0_stateless/18_rbac/18_0011_task.result delete mode 100755 tests/suites/0_stateless/18_rbac/18_0011_task.sh diff --git a/src/query/ast/src/parser/statement.rs b/src/query/ast/src/parser/statement.rs index b4435e8bd3368..f5c64adc909dd 100644 --- a/src/query/ast/src/parser/statement.rs +++ b/src/query/ast/src/parser/statement.rs @@ -2861,7 +2861,7 @@ pub fn priv_type(i: Input) -> IResult { UserPrivilegeType::CreateDatabase, rule! { CREATE ~ DATABASE }, ), - value(UserPrivilegeType::CreateDatabase, rule! { CREATE ~ TASK }), + value(UserPrivilegeType::CreateTask, rule! { CREATE ~ TASK }), value(UserPrivilegeType::DropUser, rule! { DROP ~ USER }), value(UserPrivilegeType::CreateRole, rule! { CREATE ~ ROLE }), value(UserPrivilegeType::DropRole, rule! { DROP ~ ROLE }), diff --git a/src/query/service/src/interpreters/access/privilege_access.rs b/src/query/service/src/interpreters/access/privilege_access.rs index 011e593af1900..56f4f013f68fd 100644 --- a/src/query/service/src/interpreters/access/privilege_access.rs +++ b/src/query/service/src/interpreters/access/privilege_access.rs @@ -54,7 +54,7 @@ enum ObjectId { // some statements like `SELECT 1`, `SHOW USERS`, `SHOW ROLES`, `SHOW TABLES` will be // rewritten to the queries on the system tables, we need to skip the privilege check on // these tables. -const SYSTEM_TABLES_ALLOW_LIST: [&str; 18] = [ +const SYSTEM_TABLES_ALLOW_LIST: [&str; 20] = [ "catalogs", "columns", "databases", @@ -73,6 +73,8 @@ const SYSTEM_TABLES_ALLOW_LIST: [&str; 18] = [ "processes", "user_functions", "functions", + "tasks", + "task_history", ]; impl PrivilegeAccess { diff --git a/tests/suites/0_stateless/18_rbac/18_0011_task.result b/tests/suites/0_stateless/18_rbac/18_0011_task.result deleted file mode 100644 index e69de29bb2d1d..0000000000000 diff --git a/tests/suites/0_stateless/18_rbac/18_0011_task.sh b/tests/suites/0_stateless/18_rbac/18_0011_task.sh deleted file mode 100755 index 481e6a3a06826..0000000000000 --- a/tests/suites/0_stateless/18_rbac/18_0011_task.sh +++ /dev/null @@ -1,8 +0,0 @@ -#!/usr/bin/env bash - -CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) -. "$CURDIR"/../../../shell_env.sh - -echo "=== test UDF priv" -export TEST_USER_PASSWORD="password" -export TEST_USER_CONNECT="bendsql --user=test-user --password=password --host=${QUERY_MYSQL_HANDLER_HOST} --port ${QUERY_HTTP_HANDLER_PORT}" From 59eee4a0050cd7e23fb6a0485592b64738442737 Mon Sep 17 00:00:00 2001 From: TCeason Date: Tue, 14 May 2024 18:08:18 +0800 Subject: [PATCH 4/8] fix build err --- src/meta/app/src/principal/user_privilege.rs | 3 +++ src/query/ast/src/ast/statements/principal.rs | 3 +++ 2 files changed, 6 insertions(+) diff --git a/src/meta/app/src/principal/user_privilege.rs b/src/meta/app/src/principal/user_privilege.rs index c290e8615bff0..7e1b6f02acf78 100644 --- a/src/meta/app/src/principal/user_privilege.rs +++ b/src/meta/app/src/principal/user_privilege.rs @@ -170,6 +170,9 @@ impl From for UserPrivilegeType { databend_common_ast::ast::UserPrivilegeType::CreateDatabase => { UserPrivilegeType::CreateDatabase } + databend_common_ast::ast::UserPrivilegeType::CreateTask => { + UserPrivilegeType::CreateTask + } databend_common_ast::ast::UserPrivilegeType::Set => UserPrivilegeType::Set, } } diff --git a/src/query/ast/src/ast/statements/principal.rs b/src/query/ast/src/ast/statements/principal.rs index c7f23d1227ff0..89ca90bf28f52 100644 --- a/src/query/ast/src/ast/statements/principal.rs +++ b/src/query/ast/src/ast/statements/principal.rs @@ -156,6 +156,8 @@ pub enum UserPrivilegeType { Write, // Privilege to Create database CreateDatabase, + // Privilege to Create database + CreateTask, // Discard Privilege Type Set, } @@ -184,6 +186,7 @@ impl Display for UserPrivilegeType { UserPrivilegeType::Read => "Read", UserPrivilegeType::Write => "Write", UserPrivilegeType::CreateDatabase => "CREATE DATABASE", + UserPrivilegeType::CreateTask => "CREATE TASK", }) } } From aef34a3ed43cef8098e20caa40e275ce8268a30f Mon Sep 17 00:00:00 2001 From: TCeason Date: Mon, 20 May 2024 11:44:26 +0800 Subject: [PATCH 5/8] fix execute/desc task access bug & tasks table bug --- .../interpreters/access/privilege_access.rs | 10 ++--- .../storages/system/src/task_history_table.rs | 14 ++++++ src/query/storages/system/src/tasks_table.rs | 44 ++++++++++++++++++- 3 files changed, 61 insertions(+), 7 deletions(-) diff --git a/src/query/service/src/interpreters/access/privilege_access.rs b/src/query/service/src/interpreters/access/privilege_access.rs index 56f4f013f68fd..e715516c3ad68 100644 --- a/src/query/service/src/interpreters/access/privilege_access.rs +++ b/src/query/service/src/interpreters/access/privilege_access.rs @@ -722,10 +722,9 @@ impl AccessChecker for PrivilegeAccess { } Plan::DescribeTask(plan) => { let session = self.ctx.get_current_session(); - if self + if !self .has_ownership(&session, &GrantObject::Task(plan.task_name.to_owned())) - .await - .is_err() + .await? { self.validate_access(&GrantObject::Global, UserPrivilegeType::Super) .await?; @@ -733,10 +732,9 @@ impl AccessChecker for PrivilegeAccess { } Plan::ExecuteTask(plan) => { let session = self.ctx.get_current_session(); - if self + if !self .has_ownership(&session, &GrantObject::Task(plan.task_name.to_owned())) - .await - .is_err() + .await? { self.validate_access(&GrantObject::Global, UserPrivilegeType::Super) .await?; diff --git a/src/query/storages/system/src/task_history_table.rs b/src/query/storages/system/src/task_history_table.rs index 2b6353e7d4244..58739468ebe9c 100644 --- a/src/query/storages/system/src/task_history_table.rs +++ b/src/query/storages/system/src/task_history_table.rs @@ -43,6 +43,7 @@ use databend_common_meta_app::schema::TableMeta; use databend_common_sql::plans::task_run_schema; use databend_common_users::UserApiProvider; use databend_common_users::BUILTIN_ROLE_ACCOUNT_ADMIN; +use log::info; use crate::table::AsyncOneBlockSystemTable; use crate::table::AsyncSystemTable; @@ -194,10 +195,23 @@ impl AsyncSystemTable for TaskHistoryTable { .any(|role| role.to_lowercase() == BUILTIN_ROLE_ACCOUNT_ADMIN) && !owned_tasks_names.contains(task_name) { + info!( + "--task_history:198 all_effective_roles is {:?}, owned_tasks_names is {:?}, task_name is {:?}", + all_effective_roles.clone(), + owned_tasks_names.clone(), + task_name.clone() + ); return parse_task_runs_to_datablock(vec![]); } } + info!( + "--task_history:203 all_effective_roles is {:?}, owned_tasks_names is {:?}, task_name is {:?}", + all_effective_roles.clone(), + owned_tasks_names.clone(), + task_name.clone() + ); + let req = ShowTaskRunsRequest { tenant_id: tenant.tenant_name().to_string(), scheduled_time_start: scheduled_time_start.unwrap_or("".to_string()), diff --git a/src/query/storages/system/src/tasks_table.rs b/src/query/storages/system/src/tasks_table.rs index 879f2e9daea6f..46085932b6a4b 100644 --- a/src/query/storages/system/src/tasks_table.rs +++ b/src/query/storages/system/src/tasks_table.rs @@ -32,14 +32,20 @@ use databend_common_expression::types::UInt64Type; use databend_common_expression::types::VariantType; use databend_common_expression::DataBlock; use databend_common_expression::FromData; +use databend_common_expression::Scalar; +use databend_common_functions::BUILTIN_FUNCTIONS; use databend_common_meta_app::schema::TableIdent; use databend_common_meta_app::schema::TableInfo; use databend_common_meta_app::schema::TableMeta; use databend_common_sql::plans::task_schema; use databend_common_users::UserApiProvider; +use databend_common_users::BUILTIN_ROLE_ACCOUNT_ADMIN; +use log::info; +use crate::parse_task_runs_to_datablock; use crate::table::AsyncOneBlockSystemTable; use crate::table::AsyncSystemTable; +use crate::util::find_eq_filter; use crate::util::get_owned_task_names; pub fn parse_tasks_to_datablock(tasks: Vec) -> Result { @@ -120,7 +126,7 @@ impl AsyncSystemTable for TasksTable { async fn get_full_data( &self, ctx: Arc, - _push_downs: Option, + push_downs: Option, ) -> Result { let user_api = UserApiProvider::instance(); let config = GlobalConfig::instance(); @@ -140,7 +146,43 @@ impl AsyncSystemTable for TasksTable { .map(|x| x.identity().to_string()) .collect(); + let mut task_name = None; + if let Some(push_downs) = push_downs { + if let Some(filter) = push_downs.filters.as_ref().map(|f| &f.filter) { + let expr = filter.as_expr(&BUILTIN_FUNCTIONS); + find_eq_filter(&expr, &mut |col_name, scalar| { + if col_name == "name" { + if let Scalar::String(s) = scalar { + task_name = Some(s.clone()); + } + } + }); + } + } let owned_tasks_names = get_owned_task_names(user_api, &tenant, &all_effective_roles).await; + if let Some(task_name) = &task_name { + // The user does not have admin role and not own the task_name + // Need directly return empty block + if !all_effective_roles + .iter() + .any(|role| role.to_lowercase() == BUILTIN_ROLE_ACCOUNT_ADMIN) + && !owned_tasks_names.contains(task_name) + { + info!( + "--tasks:171 all_effective_roles is {:?}, owned_tasks_names is {:?}, task_name is {:?}", + all_effective_roles.clone(), + owned_tasks_names.clone(), + task_name.clone() + ); + return parse_task_runs_to_datablock(vec![]); + } + } + info!( + "--tasks:175 all_effective_roles is {:?}, owned_tasks_names is {:?}, task_name is {:?}", + all_effective_roles.clone(), + owned_tasks_names.clone(), + task_name.clone() + ); let req = ShowTasksRequest { tenant_id: tenant.tenant_name().to_string(), From 9d9d0d3ade64353cacd17c8716258d9d53651ec8 Mon Sep 17 00:00:00 2001 From: TCeason Date: Mon, 20 May 2024 14:01:35 +0800 Subject: [PATCH 6/8] fix show task bug; --- .../ast/tests/it/testdata/statement-error.txt | 6 +- src/query/ast/tests/it/testdata/statement.txt | 4 +- .../storages/system/src/task_history_table.rs | 91 +++++++++++-------- src/query/storages/system/src/tasks_table.rs | 75 +++++++++------ src/query/storages/system/src/util.rs | 7 +- 5 files changed, 105 insertions(+), 78 deletions(-) diff --git a/src/query/ast/tests/it/testdata/statement-error.txt b/src/query/ast/tests/it/testdata/statement-error.txt index bfd1ed58d0fd4..7760881e18c7e 100644 --- a/src/query/ast/tests/it/testdata/statement-error.txt +++ b/src/query/ast/tests/it/testdata/statement-error.txt @@ -923,7 +923,7 @@ error: --> SQL:1:22 | 1 | GRANT CREATE TASK ON task mytask1 TO role role1 - | ----- ------ ^^^^ unexpected `task`, expecting `FALSE`, , , `TRUE`, `IDENTIFIER`, , , `*`, or + | ----- ------ ^^^^ unexpected `task`, expecting , `FALSE`, , `TRUE`, `IDENTIFIER`, , , `*`, or | | | | | while parsing ON | while parsing `GRANT { ROLE | schemaObjectPrivileges | ALL [ PRIVILEGES ] ON } TO { [ROLE ] | [USER] }` @@ -948,7 +948,7 @@ error: --> SQL:1:17 | 1 | GRANT select ON task MyTask1 TO u2 - | ^^^^ unexpected `task`, expecting `FALSE`, `TABLE`, , , `TRUE`, `IDENTIFIER`, , , `DATABASE`, `*`, or + | ^^^^ unexpected `task`, expecting , `FALSE`, `TABLE`, , `TRUE`, `IDENTIFIER`, , , `DATABASE`, `*`, or ---------- Input ---------- @@ -958,6 +958,6 @@ error: --> SQL:1:16 | 1 | GRANT usage ON task MyTask1 TO u1 - | ^^^^ unexpected `task`, expecting `FALSE`, `TABLE`, , , `TRUE`, `IDENTIFIER`, , , `DATABASE`, `UDF`, `*`, or + | ^^^^ unexpected `task`, expecting , `FALSE`, `TABLE`, , `TRUE`, `IDENTIFIER`, , , `DATABASE`, `UDF`, `*`, or diff --git a/src/query/ast/tests/it/testdata/statement.txt b/src/query/ast/tests/it/testdata/statement.txt index cfe9460e2e149..29778e8129dd9 100644 --- a/src/query/ast/tests/it/testdata/statement.txt +++ b/src/query/ast/tests/it/testdata/statement.txt @@ -18353,13 +18353,13 @@ CreateDynamicTable( ---------- Input ---------- GRANT CREATE TASK ON *.* TO role role1 ---------- Output --------- -GRANT CREATE DATABASE ON *.* TO ROLE 'role1' +GRANT CREATE TASK ON *.* TO ROLE 'role1' ---------- AST ------------ Grant( GrantStmt { source: Privs { privileges: [ - CreateDatabase, + CreateTask, ], level: Global, }, diff --git a/src/query/storages/system/src/task_history_table.rs b/src/query/storages/system/src/task_history_table.rs index 58739468ebe9c..b8c4fff817d2b 100644 --- a/src/query/storages/system/src/task_history_table.rs +++ b/src/query/storages/system/src/task_history_table.rs @@ -186,45 +186,60 @@ impl AsyncSystemTable for TaskHistoryTable { } } - let owned_tasks_names = get_owned_task_names(user_api, &tenant, &all_effective_roles).await; - if let Some(task_name) = &task_name { - // The user does not have admin role and not own the task_name - // Need directly return empty block - if !all_effective_roles - .iter() - .any(|role| role.to_lowercase() == BUILTIN_ROLE_ACCOUNT_ADMIN) - && !owned_tasks_names.contains(task_name) - { - info!( - "--task_history:198 all_effective_roles is {:?}, owned_tasks_names is {:?}, task_name is {:?}", - all_effective_roles.clone(), - owned_tasks_names.clone(), - task_name.clone() - ); - return parse_task_runs_to_datablock(vec![]); + let has_admin_role = all_effective_roles + .iter() + .any(|role| role.to_lowercase() == BUILTIN_ROLE_ACCOUNT_ADMIN); + let req = if has_admin_role { + ShowTaskRunsRequest { + tenant_id: tenant.tenant_name().to_string(), + scheduled_time_start: scheduled_time_start.unwrap_or("".to_string()), + scheduled_time_end: scheduled_time_end.unwrap_or("".to_string()), + task_name: task_name.unwrap_or("".to_string()), + result_limit: result_limit.unwrap_or(0), // 0 means default + error_only: false, + owners: all_effective_roles.clone(), + next_page_token: None, + page_size: None, + previous_page_token: None, + task_ids: vec![], + task_names: vec![], + } + } else { + let owned_tasks_names = + get_owned_task_names(user_api, &tenant, &all_effective_roles, has_admin_role).await; + if let Some(task_name) = &task_name { + // The user does not have admin role and not own the task_name + // Need directly return empty block + if !owned_tasks_names.contains(task_name) { + info!( + "--task_history:215 all_effective_roles is {:?}, owned_tasks_names is {:?}, task_name is {:?}", + all_effective_roles.clone(), + owned_tasks_names.clone(), + task_name.clone() + ); + return parse_task_runs_to_datablock(vec![]); + } + } + info!( + "--task_history:224 all_effective_roles is {:?}, owned_tasks_names is {:?}, task_name is {:?}", + all_effective_roles.clone(), + owned_tasks_names.clone(), + task_name.clone() + ); + ShowTaskRunsRequest { + tenant_id: tenant.tenant_name().to_string(), + scheduled_time_start: scheduled_time_start.unwrap_or("".to_string()), + scheduled_time_end: scheduled_time_end.unwrap_or("".to_string()), + task_name: task_name.unwrap_or("".to_string()), + result_limit: result_limit.unwrap_or(0), // 0 means default + error_only: false, + owners: all_effective_roles.clone(), + next_page_token: None, + page_size: None, + previous_page_token: None, + task_ids: vec![], + task_names: owned_tasks_names.clone(), } - } - - info!( - "--task_history:203 all_effective_roles is {:?}, owned_tasks_names is {:?}, task_name is {:?}", - all_effective_roles.clone(), - owned_tasks_names.clone(), - task_name.clone() - ); - - let req = ShowTaskRunsRequest { - tenant_id: tenant.tenant_name().to_string(), - scheduled_time_start: scheduled_time_start.unwrap_or("".to_string()), - scheduled_time_end: scheduled_time_end.unwrap_or("".to_string()), - task_name: task_name.unwrap_or("".to_string()), - result_limit: result_limit.unwrap_or(0), // 0 means default - error_only: false, - owners: all_effective_roles.clone(), - next_page_token: None, - page_size: None, - previous_page_token: None, - task_ids: vec![], - task_names: owned_tasks_names.clone(), }; let cloud_api = CloudControlApiProvider::instance(); diff --git a/src/query/storages/system/src/tasks_table.rs b/src/query/storages/system/src/tasks_table.rs index 46085932b6a4b..0ea39cab0f36a 100644 --- a/src/query/storages/system/src/tasks_table.rs +++ b/src/query/storages/system/src/tasks_table.rs @@ -159,38 +159,53 @@ impl AsyncSystemTable for TasksTable { }); } } - let owned_tasks_names = get_owned_task_names(user_api, &tenant, &all_effective_roles).await; - if let Some(task_name) = &task_name { - // The user does not have admin role and not own the task_name - // Need directly return empty block - if !all_effective_roles - .iter() - .any(|role| role.to_lowercase() == BUILTIN_ROLE_ACCOUNT_ADMIN) - && !owned_tasks_names.contains(task_name) - { - info!( - "--tasks:171 all_effective_roles is {:?}, owned_tasks_names is {:?}, task_name is {:?}", - all_effective_roles.clone(), - owned_tasks_names.clone(), - task_name.clone() - ); + + let has_admin_role = all_effective_roles + .iter() + .any(|role| role.to_lowercase() == BUILTIN_ROLE_ACCOUNT_ADMIN); + + let req = if has_admin_role { + ShowTasksRequest { + tenant_id: tenant.tenant_name().to_string(), + name_like: "".to_string(), + result_limit: 10000, // TODO: use plan.limit pushdown + owners: all_effective_roles.clone(), + task_ids: vec![], + task_names: vec![], + } + } else { + let owned_tasks_names = + get_owned_task_names(user_api, &tenant, &all_effective_roles, has_admin_role).await; + if let Some(task_name) = &task_name { + // The user does not have admin role and not own the task_name + // Need directly return empty block + if !owned_tasks_names.contains(task_name) { + info!( + "--tasks:184 all_effective_roles is {:?}, owned_tasks_names is {:?}, task_name is {:?}", + all_effective_roles.clone(), + owned_tasks_names.clone(), + task_name.clone() + ); + return parse_task_runs_to_datablock(vec![]); + } + } + info!( + "--tasks:193 all_effective_roles is {:?}, owned_tasks_names is {:?}, task_name is {:?}", + all_effective_roles.clone(), + owned_tasks_names.clone(), + task_name.clone() + ); + if owned_tasks_names.is_empty() { return parse_task_runs_to_datablock(vec![]); } - } - info!( - "--tasks:175 all_effective_roles is {:?}, owned_tasks_names is {:?}, task_name is {:?}", - all_effective_roles.clone(), - owned_tasks_names.clone(), - task_name.clone() - ); - - let req = ShowTasksRequest { - tenant_id: tenant.tenant_name().to_string(), - name_like: "".to_string(), - result_limit: 10000, // TODO: use plan.limit pushdown - owners: all_effective_roles.clone(), - task_ids: vec![], - task_names: owned_tasks_names.clone(), + ShowTasksRequest { + tenant_id: tenant.tenant_name().to_string(), + name_like: "".to_string(), + result_limit: 10000, // TODO: use plan.limit pushdown + owners: all_effective_roles.clone(), + task_ids: vec![], + task_names: owned_tasks_names.clone(), + } }; let cloud_api = CloudControlApiProvider::instance(); diff --git a/src/query/storages/system/src/util.rs b/src/query/storages/system/src/util.rs index c2c15c1b8f750..81aa3a8a137c9 100644 --- a/src/query/storages/system/src/util.rs +++ b/src/query/storages/system/src/util.rs @@ -19,7 +19,6 @@ use databend_common_expression::Scalar; use databend_common_meta_app::principal::OwnershipObject; use databend_common_meta_app::tenant::Tenant; use databend_common_users::UserApiProvider; -use databend_common_users::BUILTIN_ROLE_ACCOUNT_ADMIN; pub fn find_eq_filter(expr: &Expr, visitor: &mut impl FnMut(&str, &Scalar)) { match expr { @@ -118,12 +117,10 @@ pub(crate) async fn get_owned_task_names( user_api: Arc, tenant: &Tenant, all_effective_roles: &[String], + has_admin_role: bool, ) -> Vec { let mut owned_tasks_names = vec![]; - let has_admin_role = all_effective_roles - .iter() - .any(|role| role.to_lowercase() == BUILTIN_ROLE_ACCOUNT_ADMIN); - if has_admin_role { + if !has_admin_role { // Note: In old version databend-query the hashmap maybe empty let task_ownerships = user_api .list_tasks_ownerships(tenant) From 614460227c4761946c6978cdaf793f04e348a4a2 Mon Sep 17 00:00:00 2001 From: TCeason Date: Mon, 20 May 2024 14:57:47 +0800 Subject: [PATCH 7/8] consider plan.if_exists --- .../interpreters/access/privilege_access.rs | 52 +++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/src/query/service/src/interpreters/access/privilege_access.rs b/src/query/service/src/interpreters/access/privilege_access.rs index e715516c3ad68..dfd593b2ba6b0 100644 --- a/src/query/service/src/interpreters/access/privilege_access.rs +++ b/src/query/service/src/interpreters/access/privilege_access.rs @@ -18,6 +18,10 @@ use std::sync::Arc; use databend_common_catalog::catalog::Catalog; use databend_common_catalog::plan::DataSourceInfo; use databend_common_catalog::table_context::TableContext; +use databend_common_cloud_control::client_config::make_request; +use databend_common_cloud_control::cloud_api::CloudControlApiProvider; +use databend_common_cloud_control::pb::DescribeTaskRequest; +use databend_common_config::GlobalConfig; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_meta_app::principal::GrantObject; @@ -38,6 +42,7 @@ use databend_common_users::RoleCacheManager; use databend_common_users::UserApiProvider; use crate::interpreters::access::AccessChecker; +use crate::interpreters::common::get_task_client_config; use crate::sessions::QueryContext; use crate::sessions::Session; use crate::sql::plans::Plan; @@ -701,6 +706,18 @@ impl AccessChecker for PrivilegeAccess { } } Plan::AlterTask(plan) => { + if plan.if_exists + && !check_task_exists( + &self.ctx, + plan.task_name.to_string(), + plan.task_name.to_string(), + ) + .await? + { + // has if exists tag, but task not exists, should skip priv check. + return Ok(()); + } + if self .validate_task_access(&plan.task_name, UserPrivilegeType::Alter) .await @@ -711,6 +728,17 @@ impl AccessChecker for PrivilegeAccess { } } Plan::DropTask(plan) => { + if plan.if_exists + && !check_task_exists( + &self.ctx, + plan.task_name.to_string(), + plan.task_name.to_string(), + ) + .await? + { + // has if exists tag, but task not exists, should skip priv check. + return Ok(()); + } if self .validate_task_access(&plan.task_name, UserPrivilegeType::Drop) .await @@ -1576,3 +1604,27 @@ async fn has_priv( } })) } + +async fn check_task_exists( + ctx: &Arc, + task_name: String, + tenant_id: String, +) -> Result { + let config = GlobalConfig::instance(); + if config.query.cloud_control_grpc_server_address.is_none() { + return Err(ErrorCode::CloudControlNotEnabled( + "cannot describe task without cloud control enabled, please set cloud_control_grpc_server_address in config", + )); + } + let cloud_api = CloudControlApiProvider::instance(); + let task_client = cloud_api.get_task_client(); + let req = DescribeTaskRequest { + task_name, + tenant_id, + if_exist: false, + }; + let config = get_task_client_config(ctx.clone(), cloud_api.get_timeout())?; + let req = make_request(req, config); + let resp = task_client.describe_task(req).await?; + Ok(resp.task.is_some()) +} From 978cf42b75d364a550f33ce54c7da6d0d7bc35f5 Mon Sep 17 00:00:00 2001 From: TCeason Date: Mon, 20 May 2024 15:15:30 +0800 Subject: [PATCH 8/8] super priv can list all tasks and task_history --- .../storages/system/src/task_history_table.rs | 24 +++++++--------- src/query/storages/system/src/tasks_table.rs | 28 +++++++------------ 2 files changed, 20 insertions(+), 32 deletions(-) diff --git a/src/query/storages/system/src/task_history_table.rs b/src/query/storages/system/src/task_history_table.rs index b8c4fff817d2b..10f4c80b7dec2 100644 --- a/src/query/storages/system/src/task_history_table.rs +++ b/src/query/storages/system/src/task_history_table.rs @@ -37,13 +37,14 @@ use databend_common_expression::DataBlock; use databend_common_expression::FromData; use databend_common_expression::Scalar; use databend_common_functions::BUILTIN_FUNCTIONS; +use databend_common_meta_app::principal::GrantObject; +use databend_common_meta_app::principal::UserPrivilegeType; use databend_common_meta_app::schema::TableIdent; use databend_common_meta_app::schema::TableInfo; use databend_common_meta_app::schema::TableMeta; use databend_common_sql::plans::task_run_schema; use databend_common_users::UserApiProvider; use databend_common_users::BUILTIN_ROLE_ACCOUNT_ADMIN; -use log::info; use crate::table::AsyncOneBlockSystemTable; use crate::table::AsyncSystemTable; @@ -189,7 +190,11 @@ impl AsyncSystemTable for TaskHistoryTable { let has_admin_role = all_effective_roles .iter() .any(|role| role.to_lowercase() == BUILTIN_ROLE_ACCOUNT_ADMIN); - let req = if has_admin_role { + let has_super_priv = ctx + .get_current_user()? + .grants + .verify_privilege(&GrantObject::Global, UserPrivilegeType::Super); + let req = if has_admin_role || has_super_priv { ShowTaskRunsRequest { tenant_id: tenant.tenant_name().to_string(), scheduled_time_start: scheduled_time_start.unwrap_or("".to_string()), @@ -207,25 +212,16 @@ impl AsyncSystemTable for TaskHistoryTable { } else { let owned_tasks_names = get_owned_task_names(user_api, &tenant, &all_effective_roles, has_admin_role).await; + if owned_tasks_names.is_empty() { + return parse_task_runs_to_datablock(vec![]); + } if let Some(task_name) = &task_name { // The user does not have admin role and not own the task_name // Need directly return empty block if !owned_tasks_names.contains(task_name) { - info!( - "--task_history:215 all_effective_roles is {:?}, owned_tasks_names is {:?}, task_name is {:?}", - all_effective_roles.clone(), - owned_tasks_names.clone(), - task_name.clone() - ); return parse_task_runs_to_datablock(vec![]); } } - info!( - "--task_history:224 all_effective_roles is {:?}, owned_tasks_names is {:?}, task_name is {:?}", - all_effective_roles.clone(), - owned_tasks_names.clone(), - task_name.clone() - ); ShowTaskRunsRequest { tenant_id: tenant.tenant_name().to_string(), scheduled_time_start: scheduled_time_start.unwrap_or("".to_string()), diff --git a/src/query/storages/system/src/tasks_table.rs b/src/query/storages/system/src/tasks_table.rs index 0ea39cab0f36a..ea59492266abc 100644 --- a/src/query/storages/system/src/tasks_table.rs +++ b/src/query/storages/system/src/tasks_table.rs @@ -34,13 +34,14 @@ use databend_common_expression::DataBlock; use databend_common_expression::FromData; use databend_common_expression::Scalar; use databend_common_functions::BUILTIN_FUNCTIONS; +use databend_common_meta_app::principal::GrantObject; +use databend_common_meta_app::principal::UserPrivilegeType; use databend_common_meta_app::schema::TableIdent; use databend_common_meta_app::schema::TableInfo; use databend_common_meta_app::schema::TableMeta; use databend_common_sql::plans::task_schema; use databend_common_users::UserApiProvider; use databend_common_users::BUILTIN_ROLE_ACCOUNT_ADMIN; -use log::info; use crate::parse_task_runs_to_datablock; use crate::table::AsyncOneBlockSystemTable; @@ -163,8 +164,11 @@ impl AsyncSystemTable for TasksTable { let has_admin_role = all_effective_roles .iter() .any(|role| role.to_lowercase() == BUILTIN_ROLE_ACCOUNT_ADMIN); - - let req = if has_admin_role { + let has_super_priv = ctx + .get_current_user()? + .grants + .verify_privilege(&GrantObject::Global, UserPrivilegeType::Super); + let req = if has_admin_role || has_super_priv { ShowTasksRequest { tenant_id: tenant.tenant_name().to_string(), name_like: "".to_string(), @@ -176,28 +180,16 @@ impl AsyncSystemTable for TasksTable { } else { let owned_tasks_names = get_owned_task_names(user_api, &tenant, &all_effective_roles, has_admin_role).await; + if owned_tasks_names.is_empty() { + return parse_task_runs_to_datablock(vec![]); + } if let Some(task_name) = &task_name { // The user does not have admin role and not own the task_name // Need directly return empty block if !owned_tasks_names.contains(task_name) { - info!( - "--tasks:184 all_effective_roles is {:?}, owned_tasks_names is {:?}, task_name is {:?}", - all_effective_roles.clone(), - owned_tasks_names.clone(), - task_name.clone() - ); return parse_task_runs_to_datablock(vec![]); } } - info!( - "--tasks:193 all_effective_roles is {:?}, owned_tasks_names is {:?}, task_name is {:?}", - all_effective_roles.clone(), - owned_tasks_names.clone(), - task_name.clone() - ); - if owned_tasks_names.is_empty() { - return parse_task_runs_to_datablock(vec![]); - } ShowTasksRequest { tenant_id: tenant.tenant_name().to_string(), name_like: "".to_string(),