diff --git a/Cargo.lock b/Cargo.lock index e892d62b79e4e..908c6aaa8d8b9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3580,6 +3580,10 @@ version = "0.1.0" dependencies = [ "async-backtrace", "async-trait", + "chrono", + "chrono-tz 0.8.6", + "cron", + "databend-common-ast", "databend-common-base", "databend-common-exception", "databend-common-expression", @@ -3601,6 +3605,7 @@ dependencies = [ "serde", "serde_json", "thiserror 1.0.69", + "tokio", ] [[package]] @@ -5217,6 +5222,7 @@ dependencies = [ "chrono", "chrono-tz 0.8.6", "concurrent-queue", + "cron", "ctor", "dashmap 6.1.0", "databend-common-ast", @@ -5351,6 +5357,7 @@ dependencies = [ "tempfile", "tokio", "tokio-stream", + "tokio-util", "toml 0.8.22", "tonic", "tower 0.5.2", diff --git a/src/binaries/query/entry.rs b/src/binaries/query/entry.rs index 29cae01359ad0..fec1cfdde5559 100644 --- a/src/binaries/query/entry.rs +++ b/src/binaries/query/entry.rs @@ -44,6 +44,7 @@ use databend_query::servers::MySQLHandler; use databend_query::servers::MySQLTlsConfig; use databend_query::servers::Server; use databend_query::servers::ShutdownHandle; +use databend_query::task::TaskService; use databend_query::GlobalServices; use log::info; @@ -302,6 +303,7 @@ pub async fn start_services(conf: &InnerConfig) -> Result<(), MainError> { } println!(" system history tables: {}", conf.log.history); } + TaskService::instance().initialized(); println!(); println!( diff --git a/src/common/exception/src/exception_code.rs b/src/common/exception/src/exception_code.rs index 85f76ceca368b..803c8311b9a32 100644 --- a/src/common/exception/src/exception_code.rs +++ b/src/common/exception/src/exception_code.rs @@ -391,6 +391,22 @@ build_exceptions! { UDFDataError(2607), } +// Task Errors [2611-2614] +build_exceptions! 
{ + /// Unknown Task + UnknownTask(2611), + /// Task already exists + TaskAlreadyExists(2612), + /// Task timezone invalid + TaskTimezoneInvalid(2613), + /// Task cron invalid + TaskCronInvalid(2614), + /// Task schedule and after conflict + TaskScheduleAndAfterConflict(2615), + /// Task when condition not met + TaskWhenConditionNotMet(2616), +} + // Search and External Service Errors [1901-1903, 1910] build_exceptions! { /// Tantivy error diff --git a/src/meta/api/src/kv_pb_api/mod.rs b/src/meta/api/src/kv_pb_api/mod.rs index 71e7a5356a8d4..ab46c591e140d 100644 --- a/src/meta/api/src/kv_pb_api/mod.rs +++ b/src/meta/api/src/kv_pb_api/mod.rs @@ -43,7 +43,7 @@ use futures::TryStreamExt; use itertools::Itertools; pub(crate) use self::codec::decode_non_empty_item; -pub(crate) use self::codec::decode_seqv; +pub use self::codec::decode_seqv; pub(crate) use self::codec::decode_transition; pub(crate) use self::codec::encode_operation; pub use self::upsert_pb::UpsertPB; diff --git a/src/meta/app/src/principal/mod.rs b/src/meta/app/src/principal/mod.rs index 1159a9df46cb5..e802a84b276ed 100644 --- a/src/meta/app/src/principal/mod.rs +++ b/src/meta/app/src/principal/mod.rs @@ -48,6 +48,9 @@ pub mod procedure_id_to_name; pub mod procedure_identity; pub mod procedure_name_ident; pub mod stage_file_ident; +pub mod task; +pub mod task_ident; +pub mod task_message_ident; pub mod tenant_ownership_object_ident; pub mod tenant_user_ident; pub mod user_defined_file_format_ident; @@ -86,6 +89,16 @@ pub use role_info::RoleInfo; pub use role_info::RoleInfoSerdeError; pub use stage_file_ident::StageFileIdent; pub use stage_file_path::StageFilePath; +pub use task::ScheduleOptions; +pub use task::ScheduleType; +pub use task::State; +pub use task::Status; +pub use task::Task; +pub use task::TaskMessage; +pub use task::TaskRun; +pub use task::WarehouseOptions; +pub use task_ident::TaskIdent; +pub use task_ident::TaskIdentRaw; pub use tenant_ownership_object_ident::TenantOwnershipObjectIdent; 
pub use tenant_user_ident::TenantUserIdent; pub use udf_ident::UdfIdent; diff --git a/src/meta/app/src/principal/task.rs b/src/meta/app/src/principal/task.rs new file mode 100644 index 0000000000000..6929e3b3dcc34 --- /dev/null +++ b/src/meta/app/src/principal/task.rs @@ -0,0 +1,138 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::BTreeMap; + +use chrono::DateTime; +use chrono::Utc; + +pub const EMPTY_TASK_ID: u64 = 0; + +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub enum ScheduleType { + IntervalType = 0, + CronType = 1, +} + +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub enum Status { + Suspended = 0, + Started = 1, +} + +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum State { + Scheduled = 0, + Executing = 1, + Succeeded = 2, + Failed = 3, + Cancelled = 4, +} + +#[derive(Debug, Clone, PartialEq)] +pub struct ScheduleOptions { + pub interval: Option, + pub cron: Option, + pub time_zone: Option, + pub schedule_type: ScheduleType, + pub milliseconds_interval: Option, +} + +#[derive(Debug, Clone, PartialEq)] +pub struct WarehouseOptions { + pub warehouse: Option, + pub using_warehouse_size: Option, +} + +#[derive(Debug, Clone, PartialEq)] +pub struct Task { + pub task_id: u64, + pub task_name: String, + pub query_text: String, + pub when_condition: Option, + pub after: Vec, + pub comment: Option, + // expired useless + pub owner: String, + pub 
impl TaskMessage {
    /// Returns the name of the task this message refers to.
    ///
    /// For `DeleteTask` the name is carried directly in the variant; all
    /// other variants carry a full `Task` and the name is taken from it.
    pub fn task_name(&self) -> &str {
        match self {
            TaskMessage::ExecuteTask(task)
            | TaskMessage::ScheduleTask(task)
            | TaskMessage::AfterTask(task) => task.task_name.as_str(),
            TaskMessage::DeleteTask(task_name) => task_name.as_str(),
        }
    }

    /// Builds the queue key for this message: `"<prefix>-<type>-<task_name>"`.
    ///
    /// The numeric type tag (0 = execute, 1 = schedule, 2 = delete,
    /// 3 = after) keeps messages of different kinds for the same task from
    /// overwriting each other, while `prefix()` groups all messages under a
    /// common range so they can be scanned together (see `prefix_range`).
    pub fn key(&self) -> String {
        let ty = match self {
            TaskMessage::ExecuteTask(_) => 0,
            TaskMessage::ScheduleTask(_) => 1,
            TaskMessage::DeleteTask(_) => 2,
            TaskMessage::AfterTask(_) => 3,
        };
        format!("{}-{}-{}", TaskMessage::prefix(), ty, self.task_name())
    }

    /// Common numeric prefix shared by every task-message key.
    pub fn prefix() -> i64 {
        0
    }

    /// Half-open `[start, end)` prefix range covering all task-message keys;
    /// presumably used for range scans over the message queue — confirm
    /// against the consumer side.
    pub fn prefix_range() -> (i64, i64) {
        (0, 1)
    }
}
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::tenant_key::ident::TIdent; + +pub type TaskIdent = TIdent; + +pub type TaskIdentRaw = TIdent; + +pub use kvapi_impl::Resource; + +mod kvapi_impl { + use databend_common_meta_kvapi::kvapi; + + use crate::principal::task::Task; + use crate::principal::task_ident::TaskIdent; + use crate::tenant_key::resource::TenantResource; + + pub struct Resource; + impl TenantResource for Resource { + const PREFIX: &'static str = "__fd_tasks"; + const TYPE: &'static str = "TaskIdent"; + const HAS_TENANT: bool = true; + type ValueType = Task; + } + + impl kvapi::Value for Task { + type KeyType = TaskIdent; + + fn dependency_keys(&self, _key: &Self::KeyType) -> impl IntoIterator { + [] + } + } +} diff --git a/src/meta/app/src/principal/task_message_ident.rs b/src/meta/app/src/principal/task_message_ident.rs new file mode 100644 index 0000000000000..c4360ebdfb67b --- /dev/null +++ b/src/meta/app/src/principal/task_message_ident.rs @@ -0,0 +1,45 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use crate::tenant_key::ident::TIdent; + +pub type TaskMessageIdent = TIdent; + +pub type TaskMessageIdentRaw = TIdent; + +pub use kvapi_impl::Resource; + +mod kvapi_impl { + use databend_common_meta_kvapi::kvapi; + + use crate::principal::task::TaskMessage; + use crate::principal::task_message_ident::TaskMessageIdent; + use crate::tenant_key::resource::TenantResource; + + pub struct Resource; + impl TenantResource for Resource { + const PREFIX: &'static str = "__fd_task_messages"; + const TYPE: &'static str = "TaskMessageIdent"; + const HAS_TENANT: bool = true; + type ValueType = TaskMessage; + } + + impl kvapi::Value for TaskMessage { + type KeyType = TaskMessageIdent; + + fn dependency_keys(&self, _key: &Self::KeyType) -> impl IntoIterator { + [] + } + } +} diff --git a/src/meta/cache/src/lib.rs b/src/meta/cache/src/lib.rs index d2f52b73d9942..efcc947f7469c 100644 --- a/src/meta/cache/src/lib.rs +++ b/src/meta/cache/src/lib.rs @@ -96,6 +96,6 @@ mod cache; mod meta_cache_types; -mod meta_client_source; +pub mod meta_client_source; pub use cache::Cache; diff --git a/src/meta/proto-conv/src/lib.rs b/src/meta/proto-conv/src/lib.rs index 34721ea6d4f82..b3225879b59dd 100644 --- a/src/meta/proto-conv/src/lib.rs +++ b/src/meta/proto-conv/src/lib.rs @@ -81,6 +81,7 @@ mod schema_from_to_protobuf_impl; mod sequence_from_to_protobuf_impl; mod stage_from_to_protobuf_impl; mod table_from_to_protobuf_impl; +mod task_from_to_protobuf_impl; mod tenant_quota_from_to_protobuf_impl; mod tident_from_to_protobuf_impl; mod token_from_to_protobuf_impl; diff --git a/src/meta/proto-conv/src/task_from_to_protobuf_impl.rs b/src/meta/proto-conv/src/task_from_to_protobuf_impl.rs new file mode 100644 index 0000000000000..bc8ea4558330d --- /dev/null +++ b/src/meta/proto-conv/src/task_from_to_protobuf_impl.rs @@ -0,0 +1,181 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the 
License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use chrono::DateTime; +use chrono::Utc; +use databend_common_meta_app::principal as mt; +use databend_common_meta_app::principal::task::Status; +use databend_common_protos::pb; +use databend_common_protos::pb::task_message::Message; + +use crate::reader_check_msg; +use crate::FromToProto; +use crate::Incompatible; +use crate::MIN_READER_VER; +use crate::VER; + +impl FromToProto for mt::Task { + type PB = pb::Task; + fn get_pb_ver(p: &Self::PB) -> u64 { + p.ver + } + fn from_pb(p: pb::Task) -> Result { + reader_check_msg(p.ver, p.min_reader_ver)?; + + let status = match p.status { + 0 => Status::Suspended, + 1 => Status::Started, + s => { + return Err(Incompatible::new(format!("Status can not be {s}"))); + } + }; + let schedule = match p.schedule_options { + None => None, + Some(ref s) => { + if !p.after.is_empty() { + None + } else { + let schedule_type = match s.schedule_type { + 0 => mt::ScheduleType::IntervalType, + 1 => mt::ScheduleType::CronType, + s => { + return Err(Incompatible::new(format!("ScheduleType can not be {s}"))); + } + }; + + Some(mt::ScheduleOptions { + interval: s.interval, + cron: s.cron.clone(), + time_zone: s.time_zone.clone(), + schedule_type, + milliseconds_interval: s.milliseconds_interval, + }) + } + } + }; + + let warehouse = p.warehouse_options.as_ref().map(|w| mt::WarehouseOptions { + warehouse: w.warehouse.clone(), + using_warehouse_size: w.using_warehouse_size.clone(), + }); + Ok(Self { + task_id: p.task_id, + task_name: p.task_name, + query_text: p.query_text, + when_condition: p.when_condition.clone(), 
+ after: p.after, + comment: p.comment, + owner: p.owner, + owner_user: p.owner_user, + schedule_options: schedule, + warehouse_options: warehouse, + next_scheduled_at: match p.next_scheduled_at { + Some(c) => Some(DateTime::::from_pb(c)?), + None => None, + }, + suspend_task_after_num_failures: p.suspend_task_after_num_failures.map(|v| v as u64), + error_integration: p.error_integration.clone(), + status, + created_at: DateTime::::from_pb(p.created_at)?, + updated_at: DateTime::::from_pb(p.updated_at)?, + last_suspended_at: match p.last_suspended_at { + Some(c) => Some(DateTime::::from_pb(c)?), + None => None, + }, + session_params: p.session_parameters, + }) + } + + fn to_pb(&self) -> Result { + Ok(pb::Task { + ver: VER, + min_reader_ver: MIN_READER_VER, + task_id: self.task_id, + task_name: self.task_name.clone(), + query_text: self.query_text.clone(), + comment: self.comment.clone(), + owner: self.owner.clone(), + schedule_options: self.schedule_options.as_ref().map(|s| pb::ScheduleOptions { + interval: s.interval, + cron: s.cron.clone(), + time_zone: s.time_zone.clone(), + schedule_type: s.schedule_type as i32, + milliseconds_interval: s.milliseconds_interval, + }), + warehouse_options: self + .warehouse_options + .as_ref() + .map(|w| pb::WarehouseOptions { + warehouse: w.warehouse.clone(), + using_warehouse_size: w.using_warehouse_size.clone(), + }), + next_scheduled_at: match &self.next_scheduled_at { + None => None, + Some(d) => Some(d.to_pb()?), + }, + suspend_task_after_num_failures: self.suspend_task_after_num_failures.map(|v| v as i32), + status: self.status as i32, + created_at: self.created_at.to_pb()?, + updated_at: self.updated_at.to_pb()?, + last_suspended_at: match &self.last_suspended_at { + None => None, + Some(d) => Some(d.to_pb()?), + }, + after: self.after.clone(), + when_condition: self.when_condition.clone(), + session_parameters: self.session_params.clone(), + error_integration: self.error_integration.clone(), + owner_user: 
self.owner_user.clone(), + }) + } +} + +impl FromToProto for mt::TaskMessage { + type PB = pb::TaskMessage; + fn get_pb_ver(p: &Self::PB) -> u64 { + p.ver + } + + fn from_pb(p: Self::PB) -> Result + where Self: Sized { + Ok(match p.message { + None => return Err(Incompatible::new("message can not be empty")), + Some(message) => match message { + Message::ExecuteTask(task) => { + mt::TaskMessage::ExecuteTask(mt::Task::from_pb(task)?) + } + Message::ScheduleTask(task) => { + mt::TaskMessage::ScheduleTask(mt::Task::from_pb(task)?) + } + Message::DeleteTask(task_name) => mt::TaskMessage::DeleteTask(task_name), + Message::AfterTask(task) => mt::TaskMessage::AfterTask(mt::Task::from_pb(task)?), + }, + }) + } + + fn to_pb(&self) -> Result { + let message = match self { + mt::TaskMessage::ExecuteTask(task) => Message::ExecuteTask(task.to_pb()?), + mt::TaskMessage::ScheduleTask(task) => Message::ScheduleTask(task.to_pb()?), + mt::TaskMessage::DeleteTask(task_name) => Message::DeleteTask(task_name.clone()), + mt::TaskMessage::AfterTask(task) => Message::AfterTask(task.to_pb()?), + }; + + Ok(pb::TaskMessage { + ver: VER, + min_reader_ver: MIN_READER_VER, + message: Some(message), + }) + } +} diff --git a/src/meta/protos/proto/task.proto b/src/meta/protos/proto/task.proto new file mode 100644 index 0000000000000..157023fe4d36a --- /dev/null +++ b/src/meta/protos/proto/task.proto @@ -0,0 +1,74 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package databend_proto; + +message WarehouseOptions { + optional string warehouse = 1; // warehouse or using_warehouse_size + optional string using_warehouse_size = 2; +} + +message ScheduleOptions { + enum ScheduleType { + interval_type = 0; + cron_type = 1; + } + optional int32 interval = 1; // secs, INTERVAL= '5 second' means execute sql every 5 secs + optional string cron = 2; // CRON = '0 2 * * *' means Every night at 2 AM. UTC time zone. + optional string time_zone = 3; // "UTC..." + ScheduleType schedule_type = 4; + optional uint64 milliseconds_interval = 5; // milliseconds level interval +} + +message TaskMessage { + uint64 ver = 100; + uint64 min_reader_ver = 101; + + oneof message { + Task execute_task = 1; + Task schedule_task = 2; + string delete_task = 3; + Task after_task = 4; + } +} + +message Task { + uint64 ver = 100; + uint64 min_reader_ver = 101; + + enum Status { + Suspended = 0; + Started = 1; + } + uint64 task_id = 1; + string task_name = 2; + string query_text = 4; + optional string comment = 5; + string owner = 6; + ScheduleOptions schedule_options = 7; + WarehouseOptions warehouse_options = 8; + optional string next_scheduled_at = 9; + optional int32 suspend_task_after_num_failures = 10; //SUSPEND_TASK_AFTER_NUM_FAILURES + Status status = 12; + string created_at = 14; // RFC 3339 format time + string updated_at = 15; + optional string last_suspended_at = 16; + repeated string after = 17; + optional string when_condition = 18; + map session_parameters = 19; + optional string error_integration = 20; + string owner_user = 21; +} diff --git a/src/query/config/src/config.rs b/src/query/config/src/config.rs index ec18c472b5241..08e92ea4cc9be 100644 --- a/src/query/config/src/config.rs +++ b/src/query/config/src/config.rs @@ -1937,6 +1937,12 @@ pub struct QueryConfig { #[clap(long, value_name = "VALUE", default_value 
= "50")] pub max_cached_queries_profiles: usize, + #[clap(long, value_name = "VALUE", default_value = "false")] + pub enable_private_task: bool, + + #[clap(long, value_name = "VALUE", default_value = "1024")] + pub tasks_channel_len: usize, + /// A list of network that not to be checked by network policy. #[clap(long, value_name = "VALUE")] pub network_policy_whitelist: Vec, @@ -2040,6 +2046,8 @@ impl TryInto for QueryConfig { cloud_control_grpc_server_address: self.cloud_control_grpc_server_address, cloud_control_grpc_timeout: self.cloud_control_grpc_timeout, max_cached_queries_profiles: self.max_cached_queries_profiles, + enable_private_task: self.enable_private_task, + tasks_channel_len: self.tasks_channel_len, network_policy_whitelist: self.network_policy_whitelist, settings: self .settings @@ -2152,6 +2160,8 @@ impl From for QueryConfig { cloud_control_grpc_server_address: inner.cloud_control_grpc_server_address, cloud_control_grpc_timeout: inner.cloud_control_grpc_timeout, max_cached_queries_profiles: inner.max_cached_queries_profiles, + enable_private_task: inner.enable_private_task, + tasks_channel_len: inner.tasks_channel_len, network_policy_whitelist: inner.network_policy_whitelist, settings: HashMap::new(), resources_management: None, diff --git a/src/query/config/src/inner.rs b/src/query/config/src/inner.rs index 589f1ff424116..70fe721e6d29f 100644 --- a/src/query/config/src/inner.rs +++ b/src/query/config/src/inner.rs @@ -252,6 +252,9 @@ pub struct QueryConfig { pub cloud_control_grpc_timeout: u64, pub max_cached_queries_profiles: usize, + pub enable_private_task: bool, + pub tasks_channel_len: usize, + pub network_policy_whitelist: Vec, pub settings: HashMap, @@ -340,6 +343,8 @@ impl Default for QueryConfig { cloud_control_grpc_timeout: 0, data_retention_time_in_days_max: 90, max_cached_queries_profiles: 50, + enable_private_task: false, + tasks_channel_len: 1024, network_policy_whitelist: Vec::new(), settings: HashMap::new(), resources_management: 
None, diff --git a/src/query/management/Cargo.toml b/src/query/management/Cargo.toml index b69d9c56d9f7c..2928dd6a369d9 100644 --- a/src/query/management/Cargo.toml +++ b/src/query/management/Cargo.toml @@ -9,6 +9,10 @@ edition = { workspace = true } [dependencies] async-backtrace = { workspace = true } async-trait = { workspace = true } +chrono = { workspace = true } +chrono-tz = { workspace = true } +cron = { workspace = true } +databend-common-ast = { workspace = true } databend-common-base = { workspace = true } databend-common-exception = { workspace = true } databend-common-functions = { workspace = true } @@ -29,6 +33,7 @@ prost = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } thiserror = { workspace = true } +tokio = { workspace = true } [dev-dependencies] databend-common-expression = { workspace = true } diff --git a/src/query/management/src/lib.rs b/src/query/management/src/lib.rs index a25051d426089..a088eb6702d23 100644 --- a/src/query/management/src/lib.rs +++ b/src/query/management/src/lib.rs @@ -30,6 +30,7 @@ mod warehouse; mod client_session; pub mod errors; mod procedure; +pub mod task; mod workload; pub use client_session::ClientSessionMgr; @@ -48,6 +49,7 @@ pub use serde::serialize_struct; pub use setting::SettingMgr; pub use stage::StageApi; pub use stage::StageMgr; +pub use task::TaskMgr; pub use user::UserApi; pub use user::UserMgr; pub use warehouse::SelectedNode; diff --git a/src/query/management/src/task/errors.rs b/src/query/management/src/task/errors.rs new file mode 100644 index 0000000000000..703728729ad1c --- /dev/null +++ b/src/query/management/src/task/errors.rs @@ -0,0 +1,122 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::Display; + +use databend_common_exception::ErrorCode; +use databend_common_meta_types::MetaError; + +use crate::errors::TenantError; + +/// Task logic error, unrelated to the backend service providing Task management, or dependent component. +#[derive(Clone, Debug, thiserror::Error)] +pub enum TaskError { + // NOTE: do not expose tenant in a for-user error message. + #[error("Task not found: '{name}'; {context}")] + NotFound { + tenant: String, + name: String, + context: String, + }, + + // NOTE: do not expose tenant in a for-user error message. + #[error("Task already exists: '{name}'; {reason}")] + Exists { + tenant: String, + name: String, + reason: String, + }, + + // NOTE: do not expose tenant in a for-user error message. + #[error("Task timezone invalid: '{name}'; {reason}")] + InvalidTimezone { + tenant: String, + name: String, + reason: String, + }, + + // NOTE: do not expose tenant in a for-user error message. + #[error("Task cron invalid: '{name}'; {reason}")] + InvalidCron { + tenant: String, + name: String, + reason: String, + }, + + #[error("Task cannot have both `SCHEDULE` and `AFTER`: '{name}'")] + ScheduleAndAfterConflict { tenant: String, name: String }, +} + +impl From for ErrorCode { + fn from(value: TaskError) -> Self { + let s = value.to_string(); + match value { + TaskError::NotFound { .. } => ErrorCode::UnknownTask(s), + TaskError::Exists { .. } => ErrorCode::TaskAlreadyExists(s), + TaskError::InvalidTimezone { .. } => ErrorCode::TaskAlreadyExists(s), + TaskError::InvalidCron { .. 
} => ErrorCode::TaskCronInvalid(s), + TaskError::ScheduleAndAfterConflict { .. } => { + ErrorCode::TaskScheduleAndAfterConflict(s) + } + } + } +} + +/// The error occurred during accessing API providing Task management. +#[derive(Clone, Debug, thiserror::Error)] +pub enum TaskApiError { + #[error("TenantError: '{0}'")] + TenantError(#[from] TenantError), + + #[error("MetaService error: {meta_err}; {context}")] + MetaError { + meta_err: MetaError, + context: String, + }, +} + +impl From for TaskApiError { + fn from(meta_err: MetaError) -> Self { + TaskApiError::MetaError { + meta_err, + context: "".to_string(), + } + } +} + +impl From for ErrorCode { + fn from(value: TaskApiError) -> Self { + match value { + TaskApiError::TenantError(e) => ErrorCode::from(e), + TaskApiError::MetaError { meta_err, context } => { + ErrorCode::from(meta_err).add_message_back(context) + } + } + } +} + +impl TaskApiError { + pub fn append_context(self, context: impl Display) -> Self { + match self { + TaskApiError::TenantError(e) => TaskApiError::TenantError(e.append_context(context)), + TaskApiError::MetaError { + meta_err, + context: old, + } => TaskApiError::MetaError { + meta_err, + context: format!("{}; {}", old, context), + }, + } + } +} diff --git a/src/query/management/src/task/mod.rs b/src/query/management/src/task/mod.rs new file mode 100644 index 0000000000000..1820c819ad612 --- /dev/null +++ b/src/query/management/src/task/mod.rs @@ -0,0 +1,20 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +mod errors; +mod task_mgr; + +pub use errors::TaskApiError; +pub use errors::TaskError; +pub use task_mgr::TaskMgr; diff --git a/src/query/management/src/task/task_mgr.rs b/src/query/management/src/task/task_mgr.rs new file mode 100644 index 0000000000000..ddb74c1de1ed2 --- /dev/null +++ b/src/query/management/src/task/task_mgr.rs @@ -0,0 +1,353 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+use std::str::FromStr;
+use std::sync::Arc;
+
+use chrono::Utc;
+use chrono_tz::Tz;
+use cron::Schedule;
+use databend_common_ast::ast::AlterTaskOptions;
+use databend_common_ast::ast::ScheduleOptions;
+use databend_common_meta_api::kv_pb_api::errors::PbApiReadError;
+use databend_common_meta_api::kv_pb_api::KVPbApi;
+use databend_common_meta_api::kv_pb_api::UpsertPB;
+use databend_common_meta_app::principal::task;
+use databend_common_meta_app::principal::task::TaskMessage;
+use databend_common_meta_app::principal::task_message_ident::TaskMessageIdent;
+use databend_common_meta_app::principal::ScheduleType;
+use databend_common_meta_app::principal::Status;
+use databend_common_meta_app::principal::Task;
+use databend_common_meta_app::principal::TaskIdent;
+use databend_common_meta_app::schema::CreateOption;
+use databend_common_meta_app::tenant::Tenant;
+use databend_common_meta_kvapi::kvapi;
+use databend_common_meta_kvapi::kvapi::DirName;
+use databend_common_meta_kvapi::kvapi::Key;
+use databend_common_meta_types::MatchSeq;
+use databend_common_meta_types::MetaError;
+use databend_common_meta_types::With;
+use futures::TryStreamExt;
+
+use crate::task::errors::TaskApiError;
+use crate::task::errors::TaskError;
+
+#[derive(Clone)]
+pub struct TaskMgr {
+    kv_api: Arc<dyn kvapi::KVApi<Error = MetaError>>,
+    tenant: Tenant,
+}
+
+impl TaskMgr {
+    pub fn create(kv_api: Arc<dyn kvapi::KVApi<Error = MetaError>>, tenant: &Tenant) -> Self {
+        TaskMgr {
+            kv_api,
+            tenant: tenant.clone(),
+        }
+    }
+
+    #[async_backtrace::framed]
+    #[fastrace::trace]
+    pub async fn create_task(
+        &self,
+        mut task: Task,
+        create_option: &CreateOption,
+    ) -> Result<Result<(), TaskError>, TaskApiError> {
+        task.created_at = Utc::now();
+
+        self.create_task_inner(task, create_option, false).await
+    }
+
+    #[async_backtrace::framed]
+    #[fastrace::trace]
+    pub async fn update_task(&self, task: Task) -> Result<Result<(), TaskError>, TaskApiError> {
+        self.create_task_inner(task, &CreateOption::CreateOrReplace, true)
+            .await
+    }
+
+    #[async_backtrace::framed]
+    #[fastrace::trace]
+    pub async fn execute_task(
+        &self,
+        task_name: &str,
+    ) -> Result<Result<(), TaskError>, TaskApiError> {
+        let key = TaskIdent::new(&self.tenant, task_name);
+        let Some(task) = self.kv_api.get_pb(&key).await? else {
+            return Ok(Err(TaskError::NotFound {
+                tenant: self.tenant.tenant_name().to_string(),
+                name: task_name.to_string(),
+                context: "while execute task".to_string(),
+            }));
+        };
+        self.send(TaskMessage::ExecuteTask(Task::clone(&task)))
+            .await?;
+
+        Ok(Ok(()))
+    }
+
+    #[async_backtrace::framed]
+    #[fastrace::trace]
+    pub async fn alter_task(
+        &self,
+        task_name: &str,
+        alter_options: &AlterTaskOptions,
+    ) -> Result<Result<(), TaskError>, TaskApiError> {
+        let key = TaskIdent::new(&self.tenant, task_name);
+        let Some(task) = self.kv_api.get_pb(&key).await? else {
+            return Ok(Err(TaskError::NotFound {
+                tenant: self.tenant.tenant_name().to_string(),
+                name: task_name.to_string(),
+                context: "while alter task".to_string(),
+            }));
+        };
+        let mut task = Task::clone(&task);
+        task.updated_at = Utc::now();
+
+        match alter_options {
+            AlterTaskOptions::Resume => {
+                task.status = Status::Started;
+            }
+            AlterTaskOptions::Suspend => {
+                task.last_suspended_at = Some(Utc::now());
+                task.status = Status::Suspended;
+            }
+            AlterTaskOptions::Set {
+                schedule,
+                comments,
+                warehouse,
+                suspend_task_after_num_failures,
+                error_integration,
+                session_parameters,
+            } => {
+                task.schedule_options = schedule.clone().map(Self::make_schedule_options);
+                task.comment = comments.clone();
+                task.warehouse_options = Some(Self::make_warehouse_options(warehouse.clone()));
+                task.suspend_task_after_num_failures = *suspend_task_after_num_failures;
+                task.error_integration = error_integration.clone();
+                if let Some(session_parameters) = session_parameters {
+                    task.session_params = session_parameters.clone();
+                }
+            }
+            AlterTaskOptions::Unset { .. } => {
+                todo!()
+            }
+            AlterTaskOptions::ModifyAs(sql) => {
+                task.query_text = sql.to_string();
+            }
+            AlterTaskOptions::ModifyWhen(sql) => {
+                task.when_condition = Some(sql.to_string());
+            }
+            AlterTaskOptions::AddAfter(afters) => {
+                if task.schedule_options.is_some() {
+                    return Ok(Err(TaskError::ScheduleAndAfterConflict {
+                        tenant: self.tenant.tenant_name().to_string(),
+                        name: task_name.to_string(),
+                    }));
+                }
+                for after in afters {
+                    if task.after.contains(after) {
+                        continue;
+                    }
+                    task.after.push(after.clone());
+                }
+            }
+            AlterTaskOptions::RemoveAfter(afters) => {
+                if task.schedule_options.is_some() {
+                    return Ok(Err(TaskError::ScheduleAndAfterConflict {
+                        tenant: self.tenant.tenant_name().to_string(),
+                        name: task_name.to_string(),
+                    }));
+                }
+                task.after.retain(|task| !afters.contains(task));
+            }
+        }
+        if let Err(e) = self
+            .create_task_inner(task, &CreateOption::CreateOrReplace, false)
+            .await?
+        {
+            return Ok(Err(TaskError::NotFound {
+                tenant: self.tenant.tenant_name().to_string(),
+                name: task_name.to_string(),
+                context: format!("while alter task: {}", e),
+            }));
+        }
+
+        Ok(Ok(()))
+    }
+
+    #[async_backtrace::framed]
+    #[fastrace::trace]
+    pub async fn describe_task(
+        &self,
+        task_name: &str,
+    ) -> Result<Result<Option<Task>, TaskError>, TaskApiError> {
+        let key = TaskIdent::new(&self.tenant, task_name);
+        let task = self.kv_api.get_pb(&key).await?;
+
+        Ok(Ok(task.map(|task| Task::clone(&task))))
+    }
+
+    #[allow(clippy::useless_asref)]
+    #[async_backtrace::framed]
+    #[fastrace::trace]
+    pub async fn drop_task(&self, task_name: &str) -> Result<Option<Task>, MetaError> {
+        let key = TaskIdent::new(&self.tenant, task_name);
+        let req = UpsertPB::delete(key).with(MatchSeq::GE(1));
+        let res = self.kv_api.upsert_pb(&req).await?;
+
+        self.send(TaskMessage::DeleteTask(task_name.to_string()))
+            .await?;
+        if res.is_changed() {
+            Ok(res.prev.as_ref().map(|prev| Task::clone(prev)))
+        } else {
+            Ok(None)
+        }
+    }
+
+    #[async_backtrace::framed]
+    #[fastrace::trace]
+    pub async fn list_task(&self) -> Result<Vec<Task>, MetaError> {
+        let key = DirName::new(TaskIdent::new(&self.tenant, ""));
+        let strm = self.kv_api.list_pb_values(&key).await?;
+
+        match strm.try_collect().await {
+            Ok(tasks) => Ok(tasks),
+            Err(_) => self.list_task_fallback().await,
+        }
+    }
+
+    #[async_backtrace::framed]
+    #[fastrace::trace]
+    pub async fn list_task_fallback(&self) -> Result<Vec<Task>, MetaError> {
+        let key = TaskIdent::new(&self.tenant, "dummy");
+        let dir = DirName::new(key);
+        let tasks = self
+            .kv_api
+            .list_pb_values(&dir)
+            .await?
+            .try_collect::<Vec<_>>()
+            .await?;
+
+        Ok(tasks)
+    }
+
+    #[async_backtrace::framed]
+    #[fastrace::trace]
+    pub async fn send(&self, message: TaskMessage) -> Result<(), MetaError> {
+        let key = TaskMessageIdent::new(&self.tenant, message.key());
+        let seq = MatchSeq::from(CreateOption::CreateOrReplace);
+
+        let req = UpsertPB::insert(key, message).with(seq);
+        let _ = self.kv_api.upsert_pb(&req).await?;
+
+        Ok(())
+    }
+
+    /// mark the corresponding execute task as accepted and delete it from the queue
+    #[async_backtrace::framed]
+    #[fastrace::trace]
+    pub async fn execute_accept(&self, key: &str) -> Result<(), MetaError> {
+        let key = TaskMessageIdent::from_str_key(key).map_err(PbApiReadError::from)?;
+
+        let req = UpsertPB::delete(key).with(MatchSeq::GE(1));
+        let _ = self.kv_api.upsert_pb(&req).await?;
+
+        Ok(())
+    }
+
+    async fn create_task_inner(
+        &self,
+        task: Task,
+        create_option: &CreateOption,
+        without_schedule: bool,
+    ) -> Result<Result<(), TaskError>, TaskApiError> {
+        assert!(task.after.is_empty() || task.schedule_options.is_none());
+        // check
+        if let Some(schedule_options) = &task.schedule_options {
+            match schedule_options.schedule_type {
+                ScheduleType::IntervalType => (),
+                ScheduleType::CronType => {
+                    if let Err(e) = schedule_options.time_zone.as_ref().unwrap().parse::<Tz>() {
+                        return Ok(Err(TaskError::InvalidTimezone {
+                            tenant: self.tenant.tenant_name().to_string(),
+                            name: task.task_name.to_string(),
+                            reason: e.to_string(),
+                        }));
+                    }
+                    if let Err(e) = Schedule::from_str(schedule_options.cron.as_ref().unwrap()) {
+                        return Ok(Err(TaskError::InvalidCron {
+                            tenant: self.tenant.tenant_name().to_string(),
+                            name: task.task_name.to_string(),
+                            reason: e.to_string(),
+                        }));
+                    }
+                }
+            }
+        }
+
+        let seq = MatchSeq::from(*create_option);
+
+        let key = TaskIdent::new(&self.tenant, &task.task_name);
+        let req = UpsertPB::insert(key, task.clone()).with(seq);
+        let res = self.kv_api.upsert_pb(&req).await?;
+
+        if let CreateOption::Create = create_option {
+            if res.prev.is_some() {
+                let err = TaskError::Exists {
+                    tenant: self.tenant.tenant_name().to_string(),
+                    name: task.task_name.to_string(),
+                    reason: "".to_string(),
+                };
+                return Ok(Err(err));
+            }
+        }
+        if !task.after.is_empty() {
+            self.send(TaskMessage::AfterTask(task)).await?;
+        } else if task.schedule_options.is_some() && !without_schedule {
+            self.send(TaskMessage::ScheduleTask(task)).await?;
+        }
+
+        Ok(Ok(()))
+    }
+
+    pub fn make_schedule_options(opt: ScheduleOptions) -> task::ScheduleOptions {
+        match opt {
+            ScheduleOptions::IntervalSecs(secs, ms) => {
+                task::ScheduleOptions {
+                    interval: Some(secs as i32),
+                    // none if ms is 0, else some ms
+                    milliseconds_interval: if ms == 0 { None } else { Some(ms) },
+                    cron: None,
+                    time_zone: None,
+                    schedule_type: task::ScheduleType::IntervalType,
+                }
+            }
+
+            ScheduleOptions::CronExpression(expr, timezone) => task::ScheduleOptions {
+                interval: None,
+                milliseconds_interval: None,
+                cron: Some(expr),
+                time_zone: timezone,
+                schedule_type: task::ScheduleType::CronType,
+            },
+        }
+    }
+
+    pub fn make_warehouse_options(opt: Option<String>) -> task::WarehouseOptions {
+        task::WarehouseOptions {
+            warehouse: opt,
+            using_warehouse_size: None,
+        }
+    }
+}
diff --git a/src/query/service/Cargo.toml b/src/query/service/Cargo.toml
index e3d7f9ce3584a..705ab05bfea2d 100644
--- a/src/query/service/Cargo.toml
+++ b/src/query/service/Cargo.toml
@@ -46,6 +46,7 @@ bytes = { workspace = true }
 chrono = { workspace = true }
 chrono-tz = {
workspace = true } concurrent-queue = { workspace = true } +cron = { workspace = true } ctor = { workspace = true } dashmap = { workspace = true } databend-common-ast = { workspace = true } @@ -171,6 +172,7 @@ sysinfo = { workspace = true } tempfile = { workspace = true } tokio = { workspace = true } tokio-stream = { workspace = true, features = ["net"] } +tokio-util = { workspace = true } toml = { workspace = true, features = ["parse"] } tonic = { workspace = true } typetag = { workspace = true } diff --git a/src/query/service/src/global_services.rs b/src/query/service/src/global_services.rs index 19c332cc6caa8..f2d7fc880fa53 100644 --- a/src/query/service/src/global_services.rs +++ b/src/query/service/src/global_services.rs @@ -57,6 +57,7 @@ use crate::servers::http::v1::ClientSessionManager; use crate::servers::http::v1::HttpQueryManager; use crate::sessions::QueriesQueueManager; use crate::sessions::SessionManager; +use crate::task::service::TaskService; pub struct GlobalServices; @@ -142,7 +143,6 @@ impl GlobalServices { ) .await?; } - RoleCacheManager::init()?; DataOperator::init(&config.storage, config.spill.storage_params.clone()).await?; @@ -177,6 +177,7 @@ impl GlobalServices { if config.log.history.on { GlobalHistoryLog::init(config).await?; } + TaskService::init(config).await?; GLOBAL_QUERIES_MANAGER.set_gc_handle(memory_gc_handle); diff --git a/src/query/service/src/interpreters/interpreter_task_alter.rs b/src/query/service/src/interpreters/interpreter_task_alter.rs index 2f45da642685e..71eabd5a6261d 100644 --- a/src/query/service/src/interpreters/interpreter_task_alter.rs +++ b/src/query/service/src/interpreters/interpreter_task_alter.rs @@ -14,24 +14,11 @@ use std::sync::Arc; -use databend_common_ast::ast::AlterTaskOptions; -use databend_common_ast::ast::TaskSql; -use databend_common_catalog::table_context::TableContext; -use databend_common_cloud_control::client_config::make_request; -use 
databend_common_cloud_control::cloud_api::CloudControlApiProvider; -use databend_common_cloud_control::pb; -use databend_common_cloud_control::pb::alter_task_request::AlterTaskType; -use databend_common_cloud_control::pb::AlterTaskRequest; -use databend_common_cloud_control::pb::WarehouseOptions; -use databend_common_config::GlobalConfig; -use databend_common_exception::ErrorCode; use databend_common_exception::Result; -use databend_common_settings::DefaultSettings; -use databend_common_settings::SettingScope; use databend_common_sql::plans::AlterTaskPlan; -use crate::interpreters::common::get_task_client_config; -use crate::interpreters::common::make_schedule_options; +use crate::interpreters::task::TaskInterpreter; +use crate::interpreters::task::TaskInterpreterFactory; use crate::interpreters::Interpreter; use crate::pipelines::PipelineBuildResult; use crate::sessions::QueryContext; @@ -48,111 +35,7 @@ impl AlterTaskInterpreter { } } -impl AlterTaskInterpreter { - fn build_request(&self) -> AlterTaskRequest { - let plan = self.plan.clone(); - let owner = self - .ctx - .get_current_role() - .unwrap_or_default() - .identity() - .to_string(); - let mut req = AlterTaskRequest { - task_name: plan.task_name, - tenant_id: plan.tenant.tenant_name().to_string(), - owner, - alter_task_type: 0, - if_exist: plan.if_exists, - error_integration: None, - task_sql_type: 0, - query_text: None, - comment: None, - schedule_options: None, - warehouse_options: None, - suspend_task_after_num_failures: None, - when_condition: None, - add_after: vec![], - remove_after: vec![], - set_session_parameters: false, - session_parameters: Default::default(), - script_sql: None, - }; - match plan.alter_options { - AlterTaskOptions::Resume => { - req.alter_task_type = AlterTaskType::Resume as i32; - } - AlterTaskOptions::Suspend => { - req.alter_task_type = AlterTaskType::Suspend as i32; - } - AlterTaskOptions::Set { - schedule, - comments, - warehouse, - suspend_task_after_num_failures, - 
error_integration, - session_parameters, - } => { - req.alter_task_type = AlterTaskType::Set as i32; - req.schedule_options = schedule.map(make_schedule_options); - req.comment = comments; - req.warehouse_options = warehouse.map(|w| WarehouseOptions { - warehouse: Some(w), - using_warehouse_size: None, - }); - req.error_integration = error_integration; - req.suspend_task_after_num_failures = - suspend_task_after_num_failures.map(|i| i as i32); - if let Some(session_parameters) = session_parameters { - req.set_session_parameters = true; - req.session_parameters = session_parameters; - } - } - AlterTaskOptions::Unset { .. } => { - todo!() - } - AlterTaskOptions::ModifyAs(sql) => { - req.alter_task_type = AlterTaskType::ModifyAs as i32; - match sql { - TaskSql::SingleStatement(stmt) => { - req.task_sql_type = i32::from(pb::TaskSqlType::Sql); - req.query_text = Some(stmt); - } - TaskSql::ScriptBlock(ref sqls) => { - req.task_sql_type = i32::from(pb::TaskSqlType::Script); - req.query_text = Some(format!("{}", sql)); - req.script_sql = Some(pb::ScriptSql { sqls: sqls.clone() }) - } - } - } - AlterTaskOptions::AddAfter(tasks) => { - req.alter_task_type = AlterTaskType::AddAfter as i32; - req.add_after = tasks; - } - AlterTaskOptions::RemoveAfter(tasks) => { - req.alter_task_type = AlterTaskType::RemoveAfter as i32; - req.remove_after = tasks; - } - AlterTaskOptions::ModifyWhen(sql) => { - req.alter_task_type = AlterTaskType::ModifyWhen as i32; - req.when_condition = Some(sql.to_string()); - } - } - req - } - - fn validate_session_parameters(&self) -> Result<()> { - if let AlterTaskOptions::Set { - session_parameters: Some(session_parameters), - .. 
- } = &self.plan.alter_options - { - for (key, _) in session_parameters.iter() { - DefaultSettings::check_setting_scope(key, SettingScope::Session)?; - } - } - Ok(()) - } -} +impl AlterTaskInterpreter {} #[async_trait::async_trait] impl Interpreter for AlterTaskInterpreter { @@ -167,19 +50,10 @@ impl Interpreter for AlterTaskInterpreter { #[fastrace::trace] #[async_backtrace::framed] async fn execute2(&self) -> Result { - let config = GlobalConfig::instance(); - if config.query.cloud_control_grpc_server_address.is_none() { - return Err(ErrorCode::CloudControlNotEnabled( - "cannot alter task without cloud control enabled, please set cloud_control_grpc_server_address in config", - )); - } - self.validate_session_parameters()?; - let cloud_api = CloudControlApiProvider::instance(); - let task_client = cloud_api.get_task_client(); - let req = self.build_request(); - let config = get_task_client_config(self.ctx.clone(), cloud_api.get_timeout())?; - let req = make_request(req, config); - task_client.alter_task(req).await?; + TaskInterpreterFactory::build() + .alter_task(&self.ctx, &self.plan) + .await?; + Ok(PipelineBuildResult::create()) } } diff --git a/src/query/service/src/interpreters/interpreter_task_create.rs b/src/query/service/src/interpreters/interpreter_task_create.rs index f1a9a204b786b..88bd453bc0967 100644 --- a/src/query/service/src/interpreters/interpreter_task_create.rs +++ b/src/query/service/src/interpreters/interpreter_task_create.rs @@ -14,23 +14,11 @@ use std::sync::Arc; -use databend_common_ast::ast::TaskSql; -use databend_common_catalog::table_context::TableContext; -use databend_common_cloud_control::client_config::make_request; -use databend_common_cloud_control::cloud_api::CloudControlApiProvider; -use databend_common_cloud_control::pb; -use databend_common_cloud_control::pb::CreateTaskRequest; -use databend_common_cloud_control::pb::DropTaskRequest; -use databend_common_config::GlobalConfig; -use databend_common_exception::ErrorCode; use 
databend_common_exception::Result; -use databend_common_settings::DefaultSettings; -use databend_common_settings::SettingScope; use databend_common_sql::plans::CreateTaskPlan; -use crate::interpreters::common::get_task_client_config; -use crate::interpreters::common::make_schedule_options; -use crate::interpreters::common::make_warehouse_options; +use crate::interpreters::task::TaskInterpreter; +use crate::interpreters::task::TaskInterpreterFactory; use crate::interpreters::Interpreter; use crate::pipelines::PipelineBuildResult; use crate::sessions::QueryContext; @@ -47,55 +35,6 @@ impl CreateTaskInterpreter { } } -impl CreateTaskInterpreter { - fn build_request(&self) -> CreateTaskRequest { - let plan = self.plan.clone(); - let owner = self - .ctx - .get_current_role() - .unwrap_or_default() - .identity() - .to_string(); - let mut req = CreateTaskRequest { - task_name: plan.task_name, - tenant_id: plan.tenant.tenant_name().to_string(), - query_text: "".to_string(), - owner, - comment: plan.comment, - schedule_options: plan.schedule_opts.map(make_schedule_options), - warehouse_options: Some(make_warehouse_options(plan.warehouse)), - error_integration: plan.error_integration, - task_sql_type: 0, - suspend_task_after_num_failures: plan.suspend_task_after_num_failures.map(|x| x as i32), - if_not_exist: plan.create_option.if_not_exist(), - after: plan.after, - when_condition: plan.when_condition, - session_parameters: plan.session_parameters, - script_sql: None, - }; - match plan.sql { - TaskSql::SingleStatement(stmt) => { - req.task_sql_type = i32::from(pb::TaskSqlType::Sql); - req.query_text = stmt; - } - TaskSql::ScriptBlock(ref sqls) => { - req.task_sql_type = i32::from(pb::TaskSqlType::Script); - req.query_text = format!("{}", plan.sql); - req.script_sql = Some(pb::ScriptSql { sqls: sqls.clone() }) - } - } - req - } - - fn validate_session_parameters(&self) -> Result<()> { - let session_parameters = self.plan.session_parameters.clone(); - for (key, _) in 
session_parameters.iter() { - DefaultSettings::check_setting_scope(key, SettingScope::Session)?; - } - Ok(()) - } -} - #[async_trait::async_trait] impl Interpreter for CreateTaskInterpreter { fn name(&self) -> &str { @@ -109,31 +48,10 @@ impl Interpreter for CreateTaskInterpreter { #[fastrace::trace] #[async_backtrace::framed] async fn execute2(&self) -> Result { - let config = GlobalConfig::instance(); - if config.query.cloud_control_grpc_server_address.is_none() { - return Err(ErrorCode::CloudControlNotEnabled( - "cannot create task without cloud control enabled, please set cloud_control_grpc_server_address in config", - )); - } - self.validate_session_parameters()?; - let cloud_api = CloudControlApiProvider::instance(); - let task_client = cloud_api.get_task_client(); - let req = self.build_request(); - let config = get_task_client_config(self.ctx.clone(), cloud_api.get_timeout())?; - let req = make_request(req, config.clone()); - - // cloud don't support create or replace, let's remove the task in previous - if self.plan.create_option.is_overriding() { - let drop_req = DropTaskRequest { - task_name: self.plan.task_name.clone(), - tenant_id: self.plan.tenant.tenant_name().to_string(), - if_exist: true, - }; - let drop_req = make_request(drop_req, config); - task_client.drop_task(drop_req).await?; - } + TaskInterpreterFactory::build() + .create_task(&self.ctx, &self.plan) + .await?; - task_client.create_task(req).await?; Ok(PipelineBuildResult::create()) } } diff --git a/src/query/service/src/interpreters/interpreter_task_describe.rs b/src/query/service/src/interpreters/interpreter_task_describe.rs index 1625954f60d9e..e0814a0486495 100644 --- a/src/query/service/src/interpreters/interpreter_task_describe.rs +++ b/src/query/service/src/interpreters/interpreter_task_describe.rs @@ -14,16 +14,12 @@ use std::sync::Arc; -use databend_common_cloud_control::client_config::make_request; -use databend_common_cloud_control::cloud_api::CloudControlApiProvider; -use 
databend_common_cloud_control::pb::DescribeTaskRequest; -use databend_common_config::GlobalConfig; -use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_sql::plans::DescribeTaskPlan; use databend_common_storages_system::parse_tasks_to_datablock; -use crate::interpreters::common::get_task_client_config; +use crate::interpreters::task::TaskInterpreter; +use crate::interpreters::task::TaskInterpreterFactory; use crate::interpreters::Interpreter; use crate::pipelines::PipelineBuildResult; use crate::sessions::QueryContext; @@ -40,17 +36,6 @@ impl DescribeTaskInterpreter { } } -impl DescribeTaskInterpreter { - fn build_request(&self) -> DescribeTaskRequest { - let plan = self.plan.clone(); - DescribeTaskRequest { - task_name: plan.task_name, - tenant_id: plan.tenant.tenant_name().to_string(), - if_exist: false, - } - } -} - #[async_trait::async_trait] impl Interpreter for DescribeTaskInterpreter { fn name(&self) -> &str { @@ -64,23 +49,12 @@ impl Interpreter for DescribeTaskInterpreter { #[fastrace::trace] #[async_backtrace::framed] async fn execute2(&self) -> Result { - let config = GlobalConfig::instance(); - if config.query.cloud_control_grpc_server_address.is_none() { - return Err(ErrorCode::CloudControlNotEnabled( - "cannot describe task without cloud control enabled, please set cloud_control_grpc_server_address in config", - )); - } - let cloud_api = CloudControlApiProvider::instance(); - let task_client = cloud_api.get_task_client(); - let req = self.build_request(); - let config = get_task_client_config(self.ctx.clone(), cloud_api.get_timeout())?; - let req = make_request(req, config); - let resp = task_client.describe_task(req).await?; - if resp.task.is_none() { + let Some(task) = TaskInterpreterFactory::build() + .describe_task(&self.ctx, &self.plan) + .await? 
+ else { return Ok(PipelineBuildResult::create()); - } - let tasks = vec![resp.task.unwrap()]; - let result = parse_tasks_to_datablock(tasks)?; - PipelineBuildResult::from_blocks(vec![result]) + }; + PipelineBuildResult::from_blocks(vec![parse_tasks_to_datablock(vec![task])?]) } } diff --git a/src/query/service/src/interpreters/interpreter_task_drop.rs b/src/query/service/src/interpreters/interpreter_task_drop.rs index c8749399aafb3..ca4f5554ae4aa 100644 --- a/src/query/service/src/interpreters/interpreter_task_drop.rs +++ b/src/query/service/src/interpreters/interpreter_task_drop.rs @@ -14,15 +14,11 @@ use std::sync::Arc; -use databend_common_cloud_control::client_config::make_request; -use databend_common_cloud_control::cloud_api::CloudControlApiProvider; -use databend_common_cloud_control::pb::DropTaskRequest; -use databend_common_config::GlobalConfig; -use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_sql::plans::DropTaskPlan; -use crate::interpreters::common::get_task_client_config; +use crate::interpreters::task::TaskInterpreter; +use crate::interpreters::task::TaskInterpreterFactory; use crate::interpreters::Interpreter; use crate::pipelines::PipelineBuildResult; use crate::sessions::QueryContext; @@ -39,17 +35,6 @@ impl DropTaskInterpreter { } } -impl DropTaskInterpreter { - fn build_request(&self) -> DropTaskRequest { - let plan = self.plan.clone(); - DropTaskRequest { - task_name: plan.task_name, - tenant_id: plan.tenant.tenant_name().to_string(), - if_exist: plan.if_exists, - } - } -} - #[async_trait::async_trait] impl Interpreter for DropTaskInterpreter { fn name(&self) -> &str { @@ -63,18 +48,10 @@ impl Interpreter for DropTaskInterpreter { #[fastrace::trace] #[async_backtrace::framed] async fn execute2(&self) -> Result { - let config = GlobalConfig::instance(); - if config.query.cloud_control_grpc_server_address.is_none() { - return Err(ErrorCode::CloudControlNotEnabled( - "cannot drop task without 
cloud control enabled, please set cloud_control_grpc_server_address in config", - )); - } - let cloud_api = CloudControlApiProvider::instance(); - let task_client = cloud_api.get_task_client(); - let req = self.build_request(); - let config = get_task_client_config(self.ctx.clone(), cloud_api.get_timeout())?; - let req = make_request(req, config); - task_client.drop_task(req).await?; + TaskInterpreterFactory::build() + .drop_task(&self.ctx, &self.plan) + .await?; + Ok(PipelineBuildResult::create()) } } diff --git a/src/query/service/src/interpreters/interpreter_task_execute.rs b/src/query/service/src/interpreters/interpreter_task_execute.rs index bac2a587f3847..23ccc1a33bb41 100644 --- a/src/query/service/src/interpreters/interpreter_task_execute.rs +++ b/src/query/service/src/interpreters/interpreter_task_execute.rs @@ -14,15 +14,11 @@ use std::sync::Arc; -use databend_common_cloud_control::client_config::make_request; -use databend_common_cloud_control::cloud_api::CloudControlApiProvider; -use databend_common_cloud_control::pb::ExecuteTaskRequest; -use databend_common_config::GlobalConfig; -use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_sql::plans::ExecuteTaskPlan; -use crate::interpreters::common::get_task_client_config; +use crate::interpreters::task::TaskInterpreter; +use crate::interpreters::task::TaskInterpreterFactory; use crate::interpreters::Interpreter; use crate::pipelines::PipelineBuildResult; use crate::sessions::QueryContext; @@ -52,22 +48,10 @@ impl Interpreter for ExecuteTaskInterpreter { #[fastrace::trace] #[async_backtrace::framed] async fn execute2(&self) -> Result { - let config = GlobalConfig::instance(); - if config.query.cloud_control_grpc_server_address.is_none() { - return Err(ErrorCode::CloudControlNotEnabled( - "cannot execute task without cloud control enabled, please set cloud_control_grpc_server_address in config", - )); - } - let cloud_api = CloudControlApiProvider::instance(); - 
let task_client = cloud_api.get_task_client(); - let req = ExecuteTaskRequest { - task_name: self.plan.task_name.clone(), - tenant_id: self.plan.tenant.tenant_name().to_string(), - }; - let config = get_task_client_config(self.ctx.clone(), cloud_api.get_timeout())?; - let req = make_request(req, config); + TaskInterpreterFactory::build() + .execute_task(&self.ctx, &self.plan) + .await?; - task_client.execute_task(req).await?; Ok(PipelineBuildResult::create()) } } diff --git a/src/query/service/src/interpreters/interpreter_tasks_show.rs b/src/query/service/src/interpreters/interpreter_tasks_show.rs index 3d9fdc299b37b..62e58ee3badc6 100644 --- a/src/query/service/src/interpreters/interpreter_tasks_show.rs +++ b/src/query/service/src/interpreters/interpreter_tasks_show.rs @@ -14,16 +14,12 @@ use std::sync::Arc; -use databend_common_cloud_control::client_config::make_request; -use databend_common_cloud_control::cloud_api::CloudControlApiProvider; -use databend_common_cloud_control::pb::ShowTasksRequest; -use databend_common_config::GlobalConfig; -use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_sql::plans::ShowTasksPlan; use databend_common_storages_system::parse_tasks_to_datablock; -use crate::interpreters::common::get_task_client_config; +use crate::interpreters::task::TaskInterpreter; +use crate::interpreters::task::TaskInterpreterFactory; use crate::interpreters::Interpreter; use crate::pipelines::PipelineBuildResult; use crate::sessions::QueryContext; @@ -40,27 +36,7 @@ impl ShowTasksInterpreter { } } -impl ShowTasksInterpreter { - async fn build_request(&self) -> Result { - let plan = self.plan.clone(); - let available_roles = self - .ctx - .get_current_session() - .get_all_available_roles() - .await?; - let req = ShowTasksRequest { - tenant_id: plan.tenant.tenant_name().to_string(), - name_like: "".to_string(), - result_limit: 10000, // TODO: use plan.limit pushdown - owners: available_roles - .into_iter() - 
.map(|x| x.identity().to_string()) - .collect(), - task_ids: vec![], - }; - Ok(req) - } -} +impl ShowTasksInterpreter {} #[async_trait::async_trait] impl Interpreter for ShowTasksInterpreter { @@ -75,20 +51,9 @@ impl Interpreter for ShowTasksInterpreter { #[fastrace::trace] #[async_backtrace::framed] async fn execute2(&self) -> Result { - let config = GlobalConfig::instance(); - if config.query.cloud_control_grpc_server_address.is_none() { - return Err(ErrorCode::CloudControlNotEnabled( - "cannot drop task without cloud control enabled, please set cloud_control_grpc_server_address in config", - )); - } - let cloud_api = CloudControlApiProvider::instance(); - let task_client = cloud_api.get_task_client(); - let req = self.build_request().await?; - let config = get_task_client_config(self.ctx.clone(), cloud_api.get_timeout())?; - let req = make_request(req, config); - - let resp = task_client.show_tasks(req).await?; - let tasks = resp.tasks; + let tasks = TaskInterpreterFactory::build() + .show_tasks(&self.ctx, &self.plan) + .await?; let result = parse_tasks_to_datablock(tasks)?; PipelineBuildResult::from_blocks(vec![result]) diff --git a/src/query/service/src/interpreters/mod.rs b/src/query/service/src/interpreters/mod.rs index d221ac3d8c6b8..dfcfd4f2fe572 100644 --- a/src/query/service/src/interpreters/mod.rs +++ b/src/query/service/src/interpreters/mod.rs @@ -170,6 +170,7 @@ mod interpreter_view_create; mod interpreter_view_describe; mod interpreter_view_drop; mod interpreter_virtual_column_refresh; +mod task; mod util; pub use access::ManagementModeAccess; diff --git a/src/query/service/src/interpreters/task/cloud.rs b/src/query/service/src/interpreters/task/cloud.rs new file mode 100644 index 0000000000000..0e389638ee955 --- /dev/null +++ b/src/query/service/src/interpreters/task/cloud.rs @@ -0,0 +1,358 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance 
with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use databend_common_ast::ast::AlterTaskOptions; +use databend_common_ast::ast::TaskSql; +use databend_common_catalog::table_context::TableContext; +use databend_common_cloud_control::client_config::make_request; +use databend_common_cloud_control::cloud_api::CloudControlApiProvider; +use databend_common_cloud_control::pb; +use databend_common_cloud_control::pb::alter_task_request::AlterTaskType; +use databend_common_cloud_control::pb::AlterTaskRequest; +use databend_common_cloud_control::pb::CreateTaskRequest; +use databend_common_cloud_control::pb::DescribeTaskRequest; +use databend_common_cloud_control::pb::DropTaskRequest; +use databend_common_cloud_control::pb::ExecuteTaskRequest; +use databend_common_cloud_control::pb::ShowTasksRequest; +use databend_common_cloud_control::pb::WarehouseOptions; +use databend_common_cloud_control::task_utils; +use databend_common_config::GlobalConfig; +use databend_common_exception::ErrorCode; +use databend_common_exception::Result; +use databend_common_settings::DefaultSettings; +use databend_common_settings::SettingScope; +use databend_common_sql::plans::AlterTaskPlan; +use databend_common_sql::plans::CreateTaskPlan; +use databend_common_sql::plans::DescribeTaskPlan; +use databend_common_sql::plans::DropTaskPlan; +use databend_common_sql::plans::ExecuteTaskPlan; +use databend_common_sql::plans::ShowTasksPlan; + +use crate::interpreters::common::get_task_client_config; +use crate::interpreters::common::make_schedule_options; +use 
crate::interpreters::common::make_warehouse_options;
+use crate::interpreters::task::TaskInterpreter;
+use crate::sessions::QueryContext;
+
+pub(crate) struct CloudTaskInterpreter;
+
+impl CloudTaskInterpreter {
+    fn validate_create_session_parameters(plan: &CreateTaskPlan) -> Result<()> {
+        let session_parameters = plan.session_parameters.clone();
+        for (key, _) in session_parameters.iter() {
+            DefaultSettings::check_setting_scope(key, SettingScope::Session)?;
+        }
+        Ok(())
+    }
+
+    fn validate_alter_session_parameters(plan: &AlterTaskPlan) -> Result<()> {
+        if let AlterTaskOptions::Set {
+            session_parameters: Some(session_parameters),
+            ..
+        } = &plan.alter_options
+        {
+            for (key, _) in session_parameters.iter() {
+                DefaultSettings::check_setting_scope(key, SettingScope::Session)?;
+            }
+        }
+        Ok(())
+    }
+
+    fn build_create_request(ctx: &Arc<QueryContext>, plan: &CreateTaskPlan) -> CreateTaskRequest {
+        let plan = plan.clone();
+        let owner = ctx
+            .get_current_role()
+            .unwrap_or_default()
+            .identity()
+            .to_string();
+        let mut req = CreateTaskRequest {
+            task_name: plan.task_name,
+            tenant_id: plan.tenant.tenant_name().to_string(),
+            query_text: "".to_string(),
+            owner,
+            comment: plan.comment,
+            schedule_options: plan.schedule_opts.map(make_schedule_options),
+            warehouse_options: Some(make_warehouse_options(plan.warehouse)),
+            error_integration: plan.error_integration,
+            task_sql_type: 0,
+            suspend_task_after_num_failures: plan.suspend_task_after_num_failures.map(|x| x as i32),
+            if_not_exist: plan.create_option.if_not_exist(),
+            after: plan.after,
+            when_condition: plan.when_condition,
+            session_parameters: plan.session_parameters,
+            script_sql: None,
+        };
+        match plan.sql {
+            TaskSql::SingleStatement(stmt) => {
+                req.task_sql_type = i32::from(pb::TaskSqlType::Sql);
+                req.query_text = stmt;
+            }
+            TaskSql::ScriptBlock(ref sqls) => {
+                req.task_sql_type = i32::from(pb::TaskSqlType::Script);
+                req.query_text = format!("{}", plan.sql);
+                req.script_sql = Some(pb::ScriptSql { sqls: sqls.clone() })
+            }
+        }
+        req
+    }
+
+    fn build_alter_request(ctx: &Arc<QueryContext>, plan: &AlterTaskPlan) -> AlterTaskRequest {
+        let plan = plan.clone();
+        let owner = ctx
+            .get_current_role()
+            .unwrap_or_default()
+            .identity()
+            .to_string();
+        let mut req = AlterTaskRequest {
+            task_name: plan.task_name,
+            tenant_id: plan.tenant.tenant_name().to_string(),
+            owner,
+            alter_task_type: 0,
+            if_exist: plan.if_exists,
+            error_integration: None,
+            task_sql_type: 0,
+            query_text: None,
+            comment: None,
+            schedule_options: None,
+            warehouse_options: None,
+            suspend_task_after_num_failures: None,
+            when_condition: None,
+            add_after: vec![],
+            remove_after: vec![],
+            set_session_parameters: false,
+            session_parameters: Default::default(),
+            script_sql: None,
+        };
+        match plan.alter_options {
+            AlterTaskOptions::Resume => {
+                req.alter_task_type = AlterTaskType::Resume as i32;
+            }
+            AlterTaskOptions::Suspend => {
+                req.alter_task_type = AlterTaskType::Suspend as i32;
+            }
+            AlterTaskOptions::Set {
+                schedule,
+                comments,
+                warehouse,
+                suspend_task_after_num_failures,
+                error_integration,
+                session_parameters,
+            } => {
+                req.alter_task_type = AlterTaskType::Set as i32;
+                req.schedule_options = schedule.map(make_schedule_options);
+                req.comment = comments;
+                req.warehouse_options = warehouse.map(|w| WarehouseOptions {
+                    warehouse: Some(w),
+                    using_warehouse_size: None,
+                });
+                req.error_integration = error_integration;
+                req.suspend_task_after_num_failures =
+                    suspend_task_after_num_failures.map(|i| i as i32);
+                if let Some(session_parameters) = session_parameters {
+                    req.set_session_parameters = true;
+                    req.session_parameters = session_parameters;
+                }
+            }
+            AlterTaskOptions::Unset { .. } => {
+                todo!()
+            }
+            AlterTaskOptions::ModifyAs(sql) => {
+                req.alter_task_type = AlterTaskType::ModifyAs as i32;
+                match sql {
+                    TaskSql::SingleStatement(stmt) => {
+                        req.task_sql_type = i32::from(pb::TaskSqlType::Sql);
+                        req.query_text = Some(stmt);
+                    }
+                    TaskSql::ScriptBlock(ref sqls) => {
+                        req.task_sql_type = i32::from(pb::TaskSqlType::Script);
+                        req.query_text = Some(format!("{}", sql));
+                        req.script_sql = Some(pb::ScriptSql { sqls: sqls.clone() })
+                    }
+                }
+            }
+            AlterTaskOptions::AddAfter(tasks) => {
+                req.alter_task_type = AlterTaskType::AddAfter as i32;
+                req.add_after = tasks;
+            }
+            AlterTaskOptions::RemoveAfter(tasks) => {
+                req.alter_task_type = AlterTaskType::RemoveAfter as i32;
+                req.remove_after = tasks;
+            }
+            AlterTaskOptions::ModifyWhen(sql) => {
+                req.alter_task_type = AlterTaskType::ModifyWhen as i32;
+                req.when_condition = Some(sql.to_string());
+            }
+        }
+        req
+    }
+
+    async fn build_show_tasks_request(
+        ctx: &Arc<QueryContext>,
+        plan: &ShowTasksPlan,
+    ) -> Result<ShowTasksRequest> {
+        let plan = plan.clone();
+        let available_roles = ctx.get_current_session().get_all_available_roles().await?;
+        let req = ShowTasksRequest {
+            tenant_id: plan.tenant.tenant_name().to_string(),
+            name_like: "".to_string(),
+            result_limit: 10000, // TODO: use plan.limit pushdown
+            owners: available_roles
+                .into_iter()
+                .map(|x| x.identity().to_string())
+                .collect(),
+            task_ids: vec![],
+        };
+        Ok(req)
+    }
+}
+
+impl TaskInterpreter for CloudTaskInterpreter {
+    async fn create_task(&self, ctx: &Arc<QueryContext>, plan: &CreateTaskPlan) -> Result<()> {
+        let config = GlobalConfig::instance();
+        if config.query.cloud_control_grpc_server_address.is_none() {
+            return Err(ErrorCode::CloudControlNotEnabled(
+                "cannot create task without cloud control enabled, please set cloud_control_grpc_server_address in config",
+            ));
+        }
+        Self::validate_create_session_parameters(plan)?;
+        let cloud_api = CloudControlApiProvider::instance();
+        let task_client = cloud_api.get_task_client();
+        let req = Self::build_create_request(ctx, plan);
+        let config = get_task_client_config(ctx.clone(), cloud_api.get_timeout())?;
+        let req = make_request(req, config.clone());
+
+        // cloud don't support create or replace, let's remove the task in previous
+        if plan.create_option.is_overriding() {
+            let drop_req = DropTaskRequest {
+                task_name: plan.task_name.clone(),
+                tenant_id: plan.tenant.tenant_name().to_string(),
+                if_exist: true,
+            };
+            let drop_req = make_request(drop_req, config);
+            task_client.drop_task(drop_req).await?;
+        }
+
+        task_client.create_task(req).await?;
+        Ok(())
+    }
+
+    async fn execute_task(&self, ctx: &Arc<QueryContext>, plan: &ExecuteTaskPlan) -> Result<()> {
+        let config = GlobalConfig::instance();
+        if config.query.cloud_control_grpc_server_address.is_none() {
+            return Err(ErrorCode::CloudControlNotEnabled(
+                "cannot execute task without cloud control enabled, please set cloud_control_grpc_server_address in config",
+            ));
+        }
+        let cloud_api = CloudControlApiProvider::instance();
+        let task_client = cloud_api.get_task_client();
+        let req = ExecuteTaskRequest {
+            task_name: plan.task_name.clone(),
+            tenant_id: plan.tenant.tenant_name().to_string(),
+        };
+        let config = get_task_client_config(ctx.clone(), cloud_api.get_timeout())?;
+        let req = make_request(req, config);
+
+        task_client.execute_task(req).await?;
+
+        Ok(())
+    }
+
+    async fn alter_task(&self, ctx: &Arc<QueryContext>, plan: &AlterTaskPlan) -> Result<()> {
+        let config = GlobalConfig::instance();
+        if config.query.cloud_control_grpc_server_address.is_none() {
+            return Err(ErrorCode::CloudControlNotEnabled(
+                "cannot alter task without cloud control enabled, please set cloud_control_grpc_server_address in config",
+            ));
+        }
+        Self::validate_alter_session_parameters(plan)?;
+        let cloud_api = CloudControlApiProvider::instance();
+        let task_client = cloud_api.get_task_client();
+        let req = Self::build_alter_request(ctx, plan);
+        let config = get_task_client_config(ctx.clone(), cloud_api.get_timeout())?;
+        let req =
make_request(req, config); + task_client.alter_task(req).await?; + + Ok(()) + } + + async fn describe_task( + &self, + ctx: &Arc, + plan: &DescribeTaskPlan, + ) -> Result> { + let config = GlobalConfig::instance(); + if config.query.cloud_control_grpc_server_address.is_none() { + return Err(ErrorCode::CloudControlNotEnabled( + "cannot describe task without cloud control enabled, please set cloud_control_grpc_server_address in config", + )); + } + let cloud_api = CloudControlApiProvider::instance(); + let task_client = cloud_api.get_task_client(); + let plan = plan.clone(); + let req = DescribeTaskRequest { + task_name: plan.task_name, + tenant_id: plan.tenant.tenant_name().to_string(), + if_exist: false, + }; + let config = get_task_client_config(ctx.clone(), cloud_api.get_timeout())?; + let req = make_request(req, config); + let resp = task_client.describe_task(req).await?; + + resp.task.map(task_utils::Task::try_from).transpose() + } + + async fn drop_task(&self, ctx: &Arc, plan: &DropTaskPlan) -> Result<()> { + let config = GlobalConfig::instance(); + if config.query.cloud_control_grpc_server_address.is_none() { + return Err(ErrorCode::CloudControlNotEnabled( + "cannot drop task without cloud control enabled, please set cloud_control_grpc_server_address in config", + )); + } + let cloud_api = CloudControlApiProvider::instance(); + let task_client = cloud_api.get_task_client(); + let req = DropTaskRequest { + task_name: plan.task_name.clone(), + tenant_id: plan.tenant.tenant_name().to_string(), + if_exist: plan.if_exists, + }; + let config = get_task_client_config(ctx.clone(), cloud_api.get_timeout())?; + let req = make_request(req, config); + task_client.drop_task(req).await?; + + Ok(()) + } + + async fn show_tasks( + &self, + ctx: &Arc, + plan: &ShowTasksPlan, + ) -> Result> { + let config = GlobalConfig::instance(); + if config.query.cloud_control_grpc_server_address.is_none() { + return Err(ErrorCode::CloudControlNotEnabled( + "cannot drop task without cloud 
control enabled, please set cloud_control_grpc_server_address in config", + )); + } + let cloud_api = CloudControlApiProvider::instance(); + let task_client = cloud_api.get_task_client(); + let req = Self::build_show_tasks_request(ctx, plan).await?; + let config = get_task_client_config(ctx.clone(), cloud_api.get_timeout())?; + let req = make_request(req, config); + + let resp = task_client.show_tasks(req).await?; + resp.tasks.into_iter().map(|t| t.try_into()).try_collect() + } +} diff --git a/src/query/service/src/interpreters/task/mod.rs b/src/query/service/src/interpreters/task/mod.rs new file mode 100644 index 0000000000000..3599d664f72d2 --- /dev/null +++ b/src/query/service/src/interpreters/task/mod.rs @@ -0,0 +1,122 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::sync::Arc; + +use databend_common_cloud_control::task_utils; +use databend_common_config::GlobalConfig; +use databend_common_exception::Result; +use databend_common_sql::plans::AlterTaskPlan; +use databend_common_sql::plans::CreateTaskPlan; +use databend_common_sql::plans::DescribeTaskPlan; +use databend_common_sql::plans::DropTaskPlan; +use databend_common_sql::plans::ExecuteTaskPlan; +use databend_common_sql::plans::ShowTasksPlan; + +use crate::interpreters::task::cloud::CloudTaskInterpreter; +use crate::interpreters::task::private::PrivateTaskInterpreter; +use crate::sessions::QueryContext; + +mod cloud; +mod private; + +pub(crate) struct TaskInterpreterFactory; + +impl TaskInterpreterFactory { + pub fn build() -> TaskInterpreterImpl { + if GlobalConfig::instance().query.enable_private_task { + return TaskInterpreterImpl::Private(PrivateTaskInterpreter); + } + TaskInterpreterImpl::Cloud(CloudTaskInterpreter) + } +} + +pub(crate) enum TaskInterpreterImpl { + Cloud(CloudTaskInterpreter), + Private(PrivateTaskInterpreter), +} + +pub(crate) trait TaskInterpreter { + async fn create_task(&self, ctx: &Arc, plan: &CreateTaskPlan) -> Result<()>; + + async fn execute_task(&self, ctx: &Arc, plan: &ExecuteTaskPlan) -> Result<()>; + + async fn alter_task(&self, ctx: &Arc, plan: &AlterTaskPlan) -> Result<()>; + + async fn describe_task( + &self, + ctx: &Arc, + plan: &DescribeTaskPlan, + ) -> Result>; + + async fn drop_task(&self, ctx: &Arc, plan: &DropTaskPlan) -> Result<()>; + + async fn show_tasks( + &self, + ctx: &Arc, + plan: &ShowTasksPlan, + ) -> Result>; +} + +impl TaskInterpreter for TaskInterpreterImpl { + async fn create_task(&self, ctx: &Arc, plan: &CreateTaskPlan) -> Result<()> { + match self { + TaskInterpreterImpl::Cloud(interpreter) => interpreter.create_task(ctx, plan).await, + TaskInterpreterImpl::Private(interpreter) => interpreter.create_task(ctx, plan).await, + } + } + + async fn execute_task(&self, ctx: &Arc, plan: &ExecuteTaskPlan) -> 
Result<()> { + match self { + TaskInterpreterImpl::Cloud(interpreter) => interpreter.execute_task(ctx, plan).await, + TaskInterpreterImpl::Private(interpreter) => interpreter.execute_task(ctx, plan).await, + } + } + + async fn alter_task(&self, ctx: &Arc, plan: &AlterTaskPlan) -> Result<()> { + match self { + TaskInterpreterImpl::Cloud(interpreter) => interpreter.alter_task(ctx, plan).await, + TaskInterpreterImpl::Private(interpreter) => interpreter.alter_task(ctx, plan).await, + } + } + + async fn describe_task( + &self, + ctx: &Arc, + plan: &DescribeTaskPlan, + ) -> Result> { + match self { + TaskInterpreterImpl::Cloud(interpreter) => interpreter.describe_task(ctx, plan).await, + TaskInterpreterImpl::Private(interpreter) => interpreter.describe_task(ctx, plan).await, + } + } + + async fn drop_task(&self, ctx: &Arc, plan: &DropTaskPlan) -> Result<()> { + match self { + TaskInterpreterImpl::Cloud(interpreter) => interpreter.drop_task(ctx, plan).await, + TaskInterpreterImpl::Private(interpreter) => interpreter.drop_task(ctx, plan).await, + } + } + + async fn show_tasks( + &self, + ctx: &Arc, + plan: &ShowTasksPlan, + ) -> Result> { + match self { + TaskInterpreterImpl::Cloud(interpreter) => interpreter.show_tasks(ctx, plan).await, + TaskInterpreterImpl::Private(interpreter) => interpreter.show_tasks(ctx, plan).await, + } + } +} diff --git a/src/query/service/src/interpreters/task/private.rs b/src/query/service/src/interpreters/task/private.rs new file mode 100644 index 0000000000000..0322ddeb0ff86 --- /dev/null +++ b/src/query/service/src/interpreters/task/private.rs @@ -0,0 +1,168 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use chrono::Utc; +use databend_common_ast::ast::TaskSql; +use databend_common_catalog::table_context::TableContext; +use databend_common_cloud_control::pb; +use databend_common_cloud_control::task_utils; +use databend_common_exception::Result; +use databend_common_management::task::TaskMgr; +use databend_common_meta_app::principal::task::EMPTY_TASK_ID; +use databend_common_meta_app::principal::Status; +use databend_common_meta_app::principal::Task; +use databend_common_sql::plans::AlterTaskPlan; +use databend_common_sql::plans::CreateTaskPlan; +use databend_common_sql::plans::DescribeTaskPlan; +use databend_common_sql::plans::DropTaskPlan; +use databend_common_sql::plans::ExecuteTaskPlan; +use databend_common_sql::plans::ShowTasksPlan; +use databend_common_users::UserApiProvider; + +use crate::interpreters::task::TaskInterpreter; +use crate::sessions::QueryContext; + +pub(crate) struct PrivateTaskInterpreter; + +impl PrivateTaskInterpreter { + fn task_trans(task: Task) -> Result { + Ok(task_utils::Task { + task_id: task.task_id, + task_name: task.task_name, + query_text: task.query_text, + condition_text: task.when_condition.unwrap_or_default(), + after: task.after, + comment: task.comment, + owner: task.owner, + schedule_options: task + .schedule_options + .map(|schedule_options| { + let options = pb::ScheduleOptions { + interval: schedule_options.interval, + cron: schedule_options.cron, + time_zone: schedule_options.time_zone, + schedule_type: schedule_options.schedule_type as i32, + milliseconds_interval: 
schedule_options.milliseconds_interval, + }; + task_utils::format_schedule_options(&options) + }) + .transpose()?, + warehouse_options: task.warehouse_options.map(|warehouse_options| { + pb::WarehouseOptions { + warehouse: warehouse_options.warehouse, + using_warehouse_size: warehouse_options.using_warehouse_size, + } + }), + next_scheduled_at: task.next_scheduled_at, + suspend_task_after_num_failures: task.suspend_task_after_num_failures.map(|i| i as i32), + error_integration: task.error_integration, + status: match task.status { + Status::Suspended => task_utils::Status::Suspended, + Status::Started => task_utils::Status::Started, + }, + created_at: task.created_at, + updated_at: task.updated_at, + last_suspended_at: task.last_suspended_at, + session_params: task.session_params, + }) + } +} + +impl TaskInterpreter for PrivateTaskInterpreter { + async fn create_task(&self, ctx: &Arc, plan: &CreateTaskPlan) -> Result<()> { + let plan = plan.clone(); + + let owner = ctx + .get_current_role() + .unwrap_or_default() + .identity() + .to_string(); + + let task = databend_common_meta_app::principal::Task { + task_id: EMPTY_TASK_ID, + task_name: plan.task_name, + query_text: match plan.sql { + TaskSql::SingleStatement(s) => s, + TaskSql::ScriptBlock(_) => format!("{}", plan.sql), + }, + when_condition: plan.when_condition, + after: plan.after.clone(), + comment: plan.comment.clone(), + owner, + owner_user: ctx.get_current_user()?.identity().encode(), + schedule_options: plan.schedule_opts.map(TaskMgr::make_schedule_options), + warehouse_options: Some(TaskMgr::make_warehouse_options(plan.warehouse)), + next_scheduled_at: None, + suspend_task_after_num_failures: plan.suspend_task_after_num_failures, + error_integration: plan.error_integration.clone(), + status: Status::Suspended, + created_at: Utc::now(), + updated_at: Utc::now(), + last_suspended_at: None, + session_params: plan.session_parameters.clone(), + }; + UserApiProvider::instance() + .create_task(&plan.tenant, 
task, &plan.create_option) + .await?; + + Ok(()) + } + + async fn execute_task(&self, _ctx: &Arc, plan: &ExecuteTaskPlan) -> Result<()> { + UserApiProvider::instance() + .execute_task(&plan.tenant, &plan.task_name) + .await?; + + Ok(()) + } + + async fn alter_task(&self, _ctx: &Arc, plan: &AlterTaskPlan) -> Result<()> { + UserApiProvider::instance() + .alter_task(&plan.tenant, &plan.task_name, &plan.alter_options) + .await?; + + Ok(()) + } + + async fn describe_task( + &self, + _ctx: &Arc, + plan: &DescribeTaskPlan, + ) -> Result> { + let task = UserApiProvider::instance() + .describe_task(&plan.tenant, &plan.task_name) + .await?; + task.map(Self::task_trans).transpose() + } + + async fn drop_task(&self, _ctx: &Arc, plan: &DropTaskPlan) -> Result<()> { + UserApiProvider::instance() + .drop_task(&plan.tenant, &plan.task_name) + .await?; + + Ok(()) + } + + async fn show_tasks( + &self, + _ctx: &Arc, + plan: &ShowTasksPlan, + ) -> Result> { + let tasks = UserApiProvider::instance().show_tasks(&plan.tenant).await?; + + tasks.into_iter().map(Self::task_trans).try_collect() + } +} diff --git a/src/query/service/src/lib.rs b/src/query/service/src/lib.rs index a6115a346ab93..69a9ba3b420e5 100644 --- a/src/query/service/src/lib.rs +++ b/src/query/service/src/lib.rs @@ -61,6 +61,7 @@ pub mod table_functions; pub mod test_kits; mod global_services; +pub mod task; pub use databend_common_sql as sql; pub use databend_common_storages_factory as storages; diff --git a/src/query/service/src/task/meta.rs b/src/query/service/src/task/meta.rs new file mode 100644 index 0000000000000..8f19a9f9e9306 --- /dev/null +++ b/src/query/service/src/task/meta.rs @@ -0,0 +1,137 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; +use std::time::Duration; + +use databend_common_base::runtime::block_on; +use databend_common_exception::Result; +use databend_common_meta_client::ClientHandle; +use databend_common_meta_kvapi::kvapi::KVApi; +use databend_common_meta_semaphore::acquirer::Permit; +use databend_common_meta_semaphore::Semaphore; +use databend_common_meta_types::MatchSeq; +use databend_common_meta_types::Operation; +use databend_common_meta_types::UpsertKV; + +/// refer to [crate::history_tables::meta::PermitGuard] +pub struct PermitGuard { + _permit: Permit, + meta_handle: Arc, + meta_key: String, +} + +impl PermitGuard { + pub fn new(permit: Permit, meta_handle: Arc, meta_key: String) -> Self { + Self { + _permit: permit, + meta_handle, + meta_key, + } + } +} + +impl Drop for PermitGuard { + fn drop(&mut self) { + let meta_handle = self.meta_handle.clone(); + let meta_key = self.meta_key.clone(); + + block_on(async move { + let _ = meta_handle.update_last_execution_timestamp(&meta_key).await; + }); + } +} + +/// refer to [crate::history_tables::meta::HistoryMetaHandle] +pub struct TaskMetaHandle { + meta_client: Arc, + node_id: String, +} + +impl TaskMetaHandle { + pub fn new(meta_client: Arc, node_id: String) -> Self { + Self { + meta_client, + node_id, + } + } + + pub fn meta_client(&self) -> &Arc { + &self.meta_client + } + + pub async fn acquire(&self, meta_key: &str, interval_millis: u64) -> Result> { + let acquired_guard = Semaphore::new_acquired( + self.meta_client.clone(), + meta_key, + 1, + self.node_id.clone(), + 
Duration::from_secs(3), + ) + .await + .map_err(|_e| "acquire semaphore failed from TaskService")?; + if interval_millis == 0 { + return Ok(Some(acquired_guard)); + } + if match self + .meta_client + .get_kv(&format!("{}/last_timestamp", meta_key)) + .await? + { + Some(v) => { + let last: u64 = serde_json::from_slice(&v.data)?; + chrono::Utc::now().timestamp_millis() as u64 + - Duration::from_millis(interval_millis).as_millis() as u64 + > last + } + None => true, + } { + Ok(Some(acquired_guard)) + } else { + drop(acquired_guard); + Ok(None) + } + } + + pub async fn acquire_with_guard( + &self, + meta_key: &str, + interval: u64, + ) -> Result> { + if let Some(permit) = self.acquire(meta_key, interval).await? { + Ok(Some(PermitGuard::new( + permit, + Arc::new(TaskMetaHandle { + meta_client: self.meta_client.clone(), + node_id: self.node_id.clone(), + }), + meta_key.to_string(), + ))) + } else { + Ok(None) + } + } + + pub async fn update_last_execution_timestamp(&self, meta_key: &str) -> Result<()> { + self.meta_client + .upsert_kv(UpsertKV::new( + format!("{}/last_timestamp", meta_key), + MatchSeq::Any, + Operation::Update(serde_json::to_vec(&chrono::Utc::now().timestamp_millis())?), + None, + )) + .await?; + Ok(()) + } +} diff --git a/src/query/service/src/task/mod.rs b/src/query/service/src/task/mod.rs new file mode 100644 index 0000000000000..ca4084fdfc1d8 --- /dev/null +++ b/src/query/service/src/task/mod.rs @@ -0,0 +1,19 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +mod meta; +pub mod service; +mod session; + +pub use service::TaskService; diff --git a/src/query/service/src/task/service.rs b/src/query/service/src/task/service.rs new file mode 100644 index 0000000000000..e999a0b6da63e --- /dev/null +++ b/src/query/service/src/task/service.rs @@ -0,0 +1,1063 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::collections::BTreeMap; +use std::collections::HashMap; +use std::ops::Deref; +use std::str::FromStr; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; +use std::sync::Arc; +use std::time::Duration; + +use async_stream::stream; +use chrono::DateTime; +use chrono::Utc; +use chrono_tz::Tz; +use cron::Schedule; +use databend_common_ast::ast::AlterTaskOptions; +use databend_common_base::base::GlobalInstance; +use databend_common_base::runtime::Runtime; +use databend_common_base::runtime::TrySpawn; +use databend_common_catalog::cluster_info::Cluster; +use databend_common_config::InnerConfig; +use databend_common_exception::ErrorCode; +use databend_common_exception::Result; +use databend_common_expression::DataBlock; +use databend_common_meta_api::kv_pb_api::decode_seqv; +use databend_common_meta_app::principal::task::TaskMessage; +use databend_common_meta_app::principal::task::EMPTY_TASK_ID; +use databend_common_meta_app::principal::task_message_ident::TaskMessageIdent; +use databend_common_meta_app::principal::ScheduleOptions; +use databend_common_meta_app::principal::ScheduleType; +use databend_common_meta_app::principal::State; +use databend_common_meta_app::principal::Status; +use databend_common_meta_app::principal::Task; +use databend_common_meta_app::principal::TaskRun; +use databend_common_meta_app::principal::UserIdentity; +use databend_common_meta_app::principal::UserInfo; +use databend_common_meta_app::principal::WarehouseOptions; +use databend_common_meta_app::tenant::Tenant; +use databend_common_meta_kvapi::kvapi::Key; +use databend_common_meta_store::MetaStoreProvider; +use databend_common_meta_types::protobuf::WatchRequest; +use databend_common_meta_types::protobuf::WatchResponse; +use databend_common_meta_types::MetaError; +use databend_common_sql::Planner; +use databend_common_users::UserApiProvider; +use databend_common_users::BUILTIN_ROLE_ACCOUNT_ADMIN; +use futures::Stream; +use futures_util::stream::BoxStream; +use 
futures_util::TryStreamExt; +use itertools::Itertools; +use log::error; +use tokio::time::sleep; +use tokio_stream::StreamExt; +use tokio_util::sync::CancellationToken; + +use crate::interpreters::InterpreterFactory; +use crate::schedulers::ServiceQueryExecutor; +use crate::sessions::QueryContext; +use crate::task::meta::TaskMetaHandle; +use crate::task::session::create_session; +use crate::task::session::get_task_user; + +pub type TaskMessageStream = BoxStream<'static, Result<(String, TaskMessage)>>; + +/// - Multiple Query nodes can send the same Task at the same time, and the key will be distinguished by node_id +/// - Each Query node will grab the corresponding task through acquire task_name. +/// - Tasks with the same task_name cannot be executed at the same time. +pub struct TaskService { + initialized: AtomicBool, + interval: u64, + tenant: Tenant, + node_id: String, + cluster_id: String, + meta_handle: TaskMetaHandle, + _runtime: Arc, +} + +impl TaskService { + pub fn instance() -> Arc { + GlobalInstance::get() + } + + pub async fn prepare(&self) -> Result<()> { + let prepare_key = format!("{}/task_run_prepare/lock", self.tenant.tenant_name()); + let _guard = self.meta_handle.acquire(&prepare_key, 0).await?; + let create_db = "CREATE DATABASE IF NOT EXISTS system_task"; + self.execute_sql(None, create_db).await?; + + let create_task_run_table = "CREATE TABLE IF NOT EXISTS system_task.task_run(\ + task_id UINT64,\ + task_name TEXT NOT NULL,\ + query_text TEXT NOT NULL,\ + when_condition TEXT, + after TEXT,\ + comment TEXT,\ + owner TEXT, + owner_user TEXT,\ + warehouse_name TEXT,\ + using_warehouse_size TEXT,\ + schedule_type INTEGER,\ + interval INTEGER,\ + interval_secs INTEGER,\ + interval_milliseconds UINT64,\ + cron TEXT,\ + time_zone TEXT DEFAULT 'UTC',\ + run_id UINT64,\ + attempt_number INTEGER,\ + state TEXT NOT NULL DEFAULT 'SCHEDULED',\ + error_code BIGINT,\ + error_message TEXT, + root_task_id UINT64,\ + scheduled_at TIMESTAMP DEFAULT NOW(),\ + 
completed_at TIMESTAMP,\ + next_scheduled_at TIMESTAMP DEFAULT NOW(),\ + error_integration TEXT,\ + status TEXT,\ + created_at TIMESTAMP,\ + updated_at TIMESTAMP,\ + session_params VARIANT,\ + last_suspended_at TIMESTAMP,\ + suspend_task_after_num_failures INTEGER\ + );"; + self.execute_sql(None, create_task_run_table).await?; + + let create_task_after_table = "CREATE TABLE IF NOT EXISTS system_task.task_after(\ + task_name STRING NOT NULL,\ + next_task STRING NOT NULL\ + );"; + self.execute_sql(None, create_task_after_table).await?; + + Ok(()) + } + + pub fn initialized(&self) { + self.initialized.store(true, Ordering::SeqCst); + } + + pub async fn init(cfg: &InnerConfig) -> Result<()> { + let tenant = cfg.query.tenant_id.clone(); + let meta_store = MetaStoreProvider::new(cfg.meta.to_meta_grpc_client_conf()) + .create_meta_store() + .await + .map_err(|e| { + ErrorCode::MetaServiceError(format!("Failed to create meta store: {}", e)) + })?; + let meta_client = meta_store.deref().clone(); + let meta_handle = TaskMetaHandle::new(meta_client, cfg.query.node_id.clone()); + let runtime = Arc::new(Runtime::with_worker_threads( + 4, + Some("task-worker".to_owned()), + )?); + + let instance = TaskService { + initialized: AtomicBool::new(false), + interval: 200, + tenant: cfg.query.tenant_id.clone(), + node_id: cfg.query.node_id.clone(), + cluster_id: cfg.query.cluster_id.clone(), + meta_handle, + _runtime: runtime.clone(), + }; + GlobalInstance::set(Arc::new(instance)); + + runtime.clone().try_spawn( + async move { + let task_service = TaskService::instance(); + loop { + if !task_service.initialized.load(Ordering::SeqCst) { + tokio::time::sleep(Duration::from_secs(1)).await; + } else { + break; + } + } + if let Err(err) = task_service.prepare().await { + error!("[PRIVATE-TASKS] prepare failed due to {}", err); + } + if let Err(err) = task_service.work(&tenant, runtime).await { + error!("[PRIVATE-TASKS] prepare failed due to {}", err); + } + }, + None, + )?; + Ok(()) + } + + 
async fn work(&self, tenant: &Tenant, runtime: Arc) -> Result<()> { + let mut scheduled_tasks: HashMap = HashMap::new(); + let task_mgr = UserApiProvider::instance().task_api(tenant); + + let mut steam = self.subscribe().await?; + + let fn_lock = async |task_service: &TaskService, key: &str| { + task_service + .meta_handle + .acquire_with_guard(&format!("{}/lock", key), task_service.interval) + .await + }; + + while let Some(result) = steam.next().await { + let (task_key, task_message) = result?; + match task_message { + // ScheduleTask is always monitored by all Query nodes, and ExecuteTask is sent serially to avoid repeated sending. + TaskMessage::ScheduleTask(mut task) => { + debug_assert!(task.schedule_options.is_some()); + if let Some(schedule_options) = &task.schedule_options { + // clean old task if alter + if let Some(token) = scheduled_tasks.remove(&task.task_name) { + token.cancel(); + } + match task.status { + Status::Suspended => continue, + Status::Started => (), + } + + let token = CancellationToken::new(); + let child_token = token.child_token(); + let task_name = task.task_name.to_string(); + let task_name_clone = task_name.clone(); + let task_service = TaskService::instance(); + + task_service + .update_or_create_task_run(&TaskRun { + task: task.clone(), + run_id: Self::make_run_id(), + attempt_number: task.suspend_task_after_num_failures.unwrap_or(0) + as i32, + state: State::Scheduled, + scheduled_at: Utc::now(), + completed_at: None, + error_code: 0, + error_message: None, + root_task_id: EMPTY_TASK_ID, + }) + .await?; + + match schedule_options.schedule_type { + ScheduleType::IntervalType => { + let task_mgr = task_mgr.clone(); + let mut duration = + Duration::from_secs(schedule_options.interval.unwrap() as u64); + if let Some(ms) = &schedule_options.milliseconds_interval { + duration += Duration::from_millis(*ms); + } + + runtime + .spawn(async move { + let mut fn_work = async move || { + task.next_scheduled_at = Some(Utc::now() + duration); + 
task_mgr.update_task(task.clone()).await??; + loop { + tokio::select! { + _ = sleep(duration) => { + let Some(_guard) = fn_lock(&task_service, &task_name).await? else { + continue; + }; + task_mgr.send(TaskMessage::ExecuteTask(task.clone())).await?; + } + _ = child_token.cancelled() => { + break; + } + } + } + Result::Ok(()) + }; + if let Err(err) = fn_work().await { + error!("[PRIVATE-TASKS] interval schedule failed due to {}", err); + } + }); + } + ScheduleType::CronType => { + let task_mgr = task_mgr.clone(); + // SAFETY: check on CreateTask + let cron_expr = schedule_options.cron.as_ref().unwrap(); + let tz = schedule_options + .time_zone + .as_ref() + .unwrap() + .parse::() + .unwrap(); + let schedule = Schedule::from_str(cron_expr).unwrap(); + let task_name = task_name.clone(); + + runtime + .spawn(async move { + let mut fn_work = async move || { + let upcoming = schedule.upcoming(tz); + + for next_time in upcoming { + let now = Utc::now(); + let duration = (next_time.with_timezone(&Utc) - now) + .to_std() + .unwrap_or(Duration::ZERO); + + task.next_scheduled_at = Some(Utc::now() + duration); + tokio::select! { + _ = sleep(duration) => { + let Some(_guard) = fn_lock(&task_service, &task_name).await? else { + continue; + }; + task_mgr.send(TaskMessage::ExecuteTask(task.clone())).await?; + } + _ = child_token.cancelled() => { + break; + } + } + } + Result::Ok(()) + }; + if let Err(err) = fn_work().await { + error!("[PRIVATE-TASKS] cron schedule failed due to {}", err); + } + }); + } + } + let _ = scheduled_tasks.insert(task_name_clone, token); + } + } + TaskMessage::ExecuteTask(task) => { + let Some(_guard) = fn_lock(self, &task.task_name).await? else { + continue; + }; + let task_name = task.task_name.clone(); + let task_service = TaskService::instance(); + + let mut task_run = task_service + .lasted_task_run(&task_name) + .await? 
+ .unwrap_or_else(|| TaskRun { + task: task.clone(), + run_id: Self::make_run_id(), + attempt_number: task.suspend_task_after_num_failures.unwrap_or(0) + as i32, + state: State::Executing, + scheduled_at: Utc::now(), + completed_at: None, + error_code: 0, + error_message: None, + root_task_id: EMPTY_TASK_ID, + }); + task_service.update_or_create_task_run(&task_run).await?; + + let task_mgr = task_mgr.clone(); + let tenant = tenant.clone(); + let owner = Self::get_task_owner(&task, &tenant).await?; + + runtime.try_spawn( + async move { + let mut fn_work = async move || { + while task_run.attempt_number >= 0 { + let task_result = + Self::spawn_task(task.clone(), owner.clone()).await; + + match task_result { + Ok(()) => { + task_run.state = State::Succeeded; + task_run.completed_at = Some(Utc::now()); + task_service + .update_or_create_task_run(&task_run) + .await?; + + let mut stream = + Box::pin(task_service.check_next_tasks(&task_name)); + + while let Some(next_task) = stream.next().await { + let next_task = next_task?; + let dep_task = task_mgr + .describe_task(&next_task) + .await?? 
+ .ok_or_else(|| { + ErrorCode::UnknownTask(next_task) + })?; + + task_mgr + .send(TaskMessage::ExecuteTask(dep_task)) + .await?; + } + task_mgr.execute_accept(&task_key).await?; + break; + } + Err(err) => { + task_run.state = State::Failed; + task_run.completed_at = Some(Utc::now()); + task_run.attempt_number -= 1; + task_run.error_code = err.code() as i64; + task_run.error_message = Some(err.message()); + task_service + .update_or_create_task_run(&task_run) + .await?; + task_run.run_id = Self::make_run_id(); + } + } + task_mgr.execute_accept(&task_key).await?; + task_mgr + .alter_task(&task.task_name, &AlterTaskOptions::Suspend) + .await??; + } + + Result::Ok(()) + }; + if let Err(err) = fn_work().await { + error!("[PRIVATE-TASKS] execute failed due to {}", err); + } + }, + None, + )?; + } + TaskMessage::DeleteTask(task_name) => { + if let Some(_guard) = fn_lock(self, &task_name).await? { + self.clean_task_afters(&task_name).await?; + } + if let Some(token) = scheduled_tasks.remove(&task_name) { + token.cancel(); + } + } + TaskMessage::AfterTask(task) => { + let Some(_guard) = fn_lock(self, &task.task_name).await? 
else { + continue; + }; + match task.status { + Status::Suspended => continue, + Status::Started => (), + } + self.update_task_afters(&task).await?; + } + } + } + Ok(()) + } + + pub async fn subscribe(&self) -> Result { + let (min, max) = TaskMessage::prefix_range(); + let left = TaskMessageIdent::new(&self.tenant, min).to_string_key(); + let right = TaskMessageIdent::new(&self.tenant, max).to_string_key(); + + let watch = WatchRequest::new(left, Some(right)).with_initial_flush(true); + let stream = self + .meta_handle + .meta_client() + .watch_with_initialization(watch) + .await?; + + Ok(Box::pin(stream.filter_map(|result| { + result + .map(Self::decode) + .map_err(|_| ErrorCode::MetaServiceError("task watch-stream closed")) + .flatten() + .transpose() + }))) + } + + fn decode(resp: WatchResponse) -> Result> { + let Some((key, _, Some(value))) = resp.unpack() else { + return Ok(None); + }; + let message = decode_seqv::(value, || format!("decode value of {}", key)) + .map_err(MetaError::from)?; + + Ok(Some((key, TaskMessage::clone(message.deref())))) + } + + async fn get_task_owner(task: &Task, tenant: &Tenant) -> Result { + UserApiProvider::instance() + .get_user( + tenant, + UserIdentity::parse(&task.owner_user).map_err(|e| { + ErrorCode::MetaServiceError(format!("Failed to parse UserIdentity: {}", e)) + })?, + ) + .await + } + + async fn spawn_task(task: Task, user: UserInfo) -> Result<()> { + let task_service = TaskService::instance(); + + if let Some(when_condition) = &task.when_condition { + let result = task_service + .execute_sql(Some(user.clone()), &format!("SELECT {when_condition}")) + .await?; + let is_met = result + .first() + .and_then(|block| block.get_by_offset(0).index(0)) + .and_then(|scalar| { + scalar + .as_boolean() + .cloned() + .map(Ok) + .or_else(|| scalar.as_string().map(|str| str.trim().parse::())) + }) + .transpose() + .map_err(|err| { + ErrorCode::TaskWhenConditionNotMet(format!( + "when condition error for task: {}, {}", + 
task.task_name, err + )) + })? + .unwrap_or(false); + if !is_met { + return Err(ErrorCode::TaskWhenConditionNotMet(format!( + "when condition not met for task: {}", + task.task_name + ))); + } + } + task_service + .execute_sql(Some(user), &task.query_text) + .await?; + + Ok(()) + } + + pub async fn create_context(&self, other_user: Option) -> Result> { + let (user, role) = if let Some(other_user) = other_user { + (other_user, None) + } else { + ( + get_task_user(self.tenant.tenant_name(), &self.cluster_id), + Some(BUILTIN_ROLE_ACCOUNT_ADMIN.to_string()), + ) + }; + let session = create_session(user, role).await?; + // only need run the sql on the current node + session.create_query_context_with_cluster(Arc::new(Cluster { + unassign: false, + local_id: self.node_id.clone(), + nodes: vec![], + })) + } + + pub async fn lasted_task_run(&self, task_name: &str) -> Result> { + let blocks = self + .execute_sql( + None, + &format!( + "SELECT + task_id, + task_name, + query_text, + when_condition, + after, + comment, + owner, + owner_user, + warehouse_name, + using_warehouse_size, + schedule_type, + interval, + interval_milliseconds, + cron, + time_zone, + run_id, + attempt_number, + state, + error_code, + error_message, + root_task_id, + scheduled_at, + completed_at, + next_scheduled_at, + error_integration, + status, + created_at, + updated_at, + session_params, + last_suspended_at, + suspend_task_after_num_failures + FROM system_task.task_run WHERE task_name = '{task_name}' ORDER BY run_id DESC LIMIT 1;" + ), + ) + .await?; + + let Some(block) = blocks.first() else { + return Ok(None); + }; + if block.num_rows() == 0 { + return Ok(None); + } + Ok(Self::block2task_run(block, 0)) + } + + pub async fn update_or_create_task_run(&self, task_run: &TaskRun) -> Result<()> { + let state = match task_run.state { + State::Scheduled => "SCHEDULED".to_string(), + State::Executing => "EXECUTING".to_string(), + State::Succeeded => "SUCCEEDED".to_string(), + State::Failed => 
"FAILED".to_string(), + State::Cancelled => "CANCELLED".to_string(), + }; + let scheduled_at = task_run.scheduled_at.timestamp(); + let completed_at = task_run + .completed_at + .map(|time| time.timestamp().to_string()) + .unwrap_or_else(|| "null".to_string()); + let error_message = task_run + .error_message + .as_ref() + .map(|s| format!("'{s}'")) + .unwrap_or_else(|| "null".to_string()); + let root_task_id = task_run.root_task_id; + + let is_exists = self.execute_sql(None, &format!("UPDATE system_task.task_run SET run_id = {}, state = '{}', scheduled_at = {}, completed_at = {}, error_message = {}, root_task_id = {} WHERE task_name = '{}' AND run_id = {}", task_run.run_id, state, scheduled_at, completed_at, error_message, root_task_id, task_run.task.task_name, task_run.run_id)).await? + .first() + .and_then(|block| { + block.get_by_offset(0).index(0).and_then(|s| s.as_number().and_then(|n| n.as_u_int64().cloned())) + }) + .map(|v| v > 0) + .unwrap_or(false); + + if !is_exists { + self.execute_sql(None, &Self::task_run2insert(task_run)?) + .await?; + } + Ok(()) + } + + pub fn check_next_tasks<'a>( + &'a self, + task_name: &'a str, + ) -> impl Stream> + '_ { + stream! 
{ + let check = format!("WITH latest_task_run AS ( \ + SELECT \ + ranked.task_name, \ + ranked.state, \ + ranked.completed_at \ + FROM ( \ + SELECT \ + task_name, \ + state, \ + completed_at, \ + ROW_NUMBER() OVER (PARTITION BY task_name ORDER BY completed_at DESC) AS rn \ + FROM system_task.task_run \ + ) AS ranked \ + WHERE ranked.rn = 1 \ + ) \ + SELECT ta.next_task \ + FROM system_task.task_after ta \ + JOIN system_task.task_after ta2 \ + ON ta.next_task = ta2.next_task \ + LEFT JOIN latest_task_run tr \ + ON ta2.task_name = tr.task_name \ + AND tr.state = 'SUCCEEDED' \ + AND tr.completed_at IS NOT NULL \ + WHERE ta.task_name = '{task_name}' \ + GROUP BY ta.next_task \ + HAVING COUNT(DISTINCT ta2.task_name) = COUNT(DISTINCT tr.task_name);"); + if let Some(next_task) = self.execute_sql(None, &check).await?.first().and_then(|block| block.columns()[0].index(0).and_then(|scalar| { scalar.as_string().map(|s| s.to_string()) })) { + yield Result::Ok(next_task); + } + } + } + + pub async fn clean_task_afters(&self, task_name: &str) -> Result<()> { + self.execute_sql( + None, + &format!( + "DELETE FROM system_task.task_after WHERE next_task = '{}'", + task_name + ), + ) + .await?; + + Ok(()) + } + + pub async fn update_task_afters(&self, task: &Task) -> Result<()> { + self.clean_task_afters(&task.task_name).await?; + let values = task + .after + .iter() + .map(|after| format!("('{}', '{}')", after, task.task_name)) + .join(", "); + self.execute_sql( + None, + &format!( + "INSERT INTO system_task.task_after (task_name, next_task) VALUES {}", + values + ), + ) + .await?; + + Ok(()) + } + + fn task_run2insert(task_run: &TaskRun) -> Result { + let task = &task_run.task; + + let sql = format!( + "INSERT INTO system_task.task_run (\ + task_id, + task_name, + query_text, + when_condition, + after, + comment, + owner, + owner_user, + warehouse_name, + using_warehouse_size, + schedule_type, + interval, + interval_milliseconds, + cron, + time_zone, + run_id, + attempt_number, + 
state, + error_code, + error_message, + root_task_id, + scheduled_at, + completed_at, + next_scheduled_at, + error_integration, + status, + created_at, + updated_at, + session_params, + last_suspended_at, + suspend_task_after_num_failures + ) values ( + {}, + '{}', + '{}', + {}, + {}, + {}, + '{}', + '{}', + {}, + {}, + {}, + {}, + {}, + {}, + {}, + {}, + {}, + '{}', + {}, + {}, + {}, + {}, + {}, + {}, + {}, + '{}', + {}, + {}, + {}, + {}, + {} + );", + task.task_id, + task.task_name, + task.query_text.replace('\'', "''"), + task.when_condition + .as_ref() + .map(|s| format!("'{s}'").replace('\'', "''")) + .unwrap_or_else(|| "null".to_string()), + if !task.after.is_empty() { + format!("'{}'", task.after.join(", ")) + } else { + "null".to_string() + }, + task.comment + .as_ref() + .map(|s| format!("'{s}'")) + .unwrap_or_else(|| "null".to_string()), + task.owner, + task.owner_user.replace('\'', "''"), + task.warehouse_options + .as_ref() + .and_then(|w| w.warehouse.as_ref()) + .map(|s| format!("'{s}'")) + .unwrap_or_else(|| "null".to_string()), + task.warehouse_options + .as_ref() + .and_then(|w| w.using_warehouse_size.as_ref()) + .map(|s| format!("'{s}'")) + .unwrap_or_else(|| "null".to_string()), + task.schedule_options + .as_ref() + .map(|s| match s.schedule_type { + ScheduleType::IntervalType => "0".to_string(), + ScheduleType::CronType => "1".to_string(), + }) + .unwrap_or_else(|| "null".to_string()), + task.schedule_options + .as_ref() + .and_then(|s| s.interval) + .map(|v| v.to_string()) + .unwrap_or_else(|| "null".to_string()), + task.schedule_options + .as_ref() + .and_then(|s| s.milliseconds_interval) + .map(|v| v.to_string()) + .unwrap_or_else(|| "null".to_string()), + task.schedule_options + .as_ref() + .and_then(|s| s.cron.as_ref()) + .map(|s| format!("'{s}'")) + .unwrap_or_else(|| "null".to_string()), + task.schedule_options + .as_ref() + .and_then(|s| s.time_zone.as_ref()) + .map(|s| format!("'{s}'")) + .unwrap_or_else(|| "null".to_string()), + 
task_run.run_id, + task_run.attempt_number, + match task_run.state { + State::Scheduled => "SCHEDULED".to_string(), + State::Executing => "EXECUTING".to_string(), + State::Succeeded => "SUCCEEDED".to_string(), + State::Failed => "FAILED".to_string(), + State::Cancelled => "CANCELLED".to_string(), + }, + task_run.error_code, + task_run + .error_message + .as_ref() + .map(|s| format!("'{s}'")) + .unwrap_or_else(|| "null".to_string()), + task_run.root_task_id, + task_run.scheduled_at.timestamp(), + task_run + .completed_at + .as_ref() + .map(|d| d.to_string()) + .unwrap_or_else(|| "null".to_string()), + task.next_scheduled_at + .as_ref() + .map(|d| d.timestamp().to_string()) + .unwrap_or_else(|| "null".to_string()), + task.error_integration + .as_ref() + .map(|s| format!("'{s}'")) + .unwrap_or_else(|| "null".to_string()), + match task.status { + Status::Suspended => "SUSPENDED".to_string(), + Status::Started => "STARTED".to_string(), + }, + task.created_at.timestamp(), + task.updated_at.timestamp(), + serde_json::to_string(&task.session_params).map(|s| format!("'{s}'"))?, + task.last_suspended_at + .as_ref() + .map(|d| d.timestamp().to_string()) + .unwrap_or_else(|| "null".to_string()), + task.suspend_task_after_num_failures + .map(|s| s.to_string()) + .unwrap_or_else(|| "null".to_string()) + ); + Ok(sql) + } + + fn block2task_run(block: &DataBlock, row: usize) -> Option { + let task_id = *block + .get_by_offset(0) + .index(row)? + .as_number()? + .as_u_int64()?; + let task_name = block.get_by_offset(1).index(row)?.as_string()?.to_string(); + let query_text = block.get_by_offset(2).index(row)?.as_string()?.to_string(); + let when_condition = block + .get_by_offset(3) + .index(row) + .and_then(|s| s.as_string().map(|s| s.to_string())); + let after = block + .get_by_offset(4) + .index(row)? + .as_string()? 
+ .split(", ") + .map(str::to_string) + .collect::>(); + let comment = block + .get_by_offset(5) + .index(row) + .and_then(|s| s.as_string().map(|s| s.to_string())); + let owner = block.get_by_offset(6).index(row)?.as_string()?.to_string(); + let owner_user = block.get_by_offset(7).index(row)?.as_string()?.to_string(); + let warehouse_name = block + .get_by_offset(8) + .index(row) + .and_then(|s| s.as_string().map(|s| s.to_string())); + let using_warehouse_size = block + .get_by_offset(9) + .index(row) + .and_then(|s| s.as_string().map(|s| s.to_string())); + let schedule_type = block + .get_by_offset(10) + .index(row) + .and_then(|s| s.as_number().and_then(|n| n.as_int32()).cloned()); + let interval = block + .get_by_offset(11) + .index(row) + .and_then(|s| s.as_number().and_then(|n| n.as_int32()).cloned()); + let milliseconds_interval = block + .get_by_offset(12) + .index(row) + .and_then(|s| s.as_number().and_then(|n| n.as_u_int64()).cloned()); + let cron = block + .get_by_offset(13) + .index(row) + .and_then(|s| s.as_string().map(|s| s.to_string())); + let time_zone = block + .get_by_offset(14) + .index(row) + .and_then(|s| s.as_string().map(|s| s.to_string())); + let run_id = *block + .get_by_offset(15) + .index(row)? + .as_number()? + .as_u_int64()?; + let attempt_number = block + .get_by_offset(16) + .index(row) + .and_then(|s| s.as_number().and_then(|n| n.as_int32()).cloned()); + let state = block.get_by_offset(17).index(row)?.as_string()?.to_string(); + let error_code = *block + .get_by_offset(18) + .index(row)? + .as_number()? + .as_int64()?; + let error_message = block + .get_by_offset(19) + .index(row) + .and_then(|s| s.as_string().map(|s| s.to_string())); + let root_task_id = *block + .get_by_offset(20) + .index(row)? + .as_number()? 
+ .as_u_int64()?; + let scheduled_at = *block.get_by_offset(21).index(row)?.as_timestamp()?; + let completed_at = block + .get_by_offset(22) + .index(row) + .and_then(|s| s.as_timestamp().cloned()); + let next_scheduled_at = block + .get_by_offset(23) + .index(row) + .and_then(|s| s.as_timestamp().cloned()); + let error_integration = block + .get_by_offset(24) + .index(row) + .and_then(|s| s.as_string().map(|s| s.to_string())); + let status = block.get_by_offset(25).index(row)?.as_string()?.to_string(); + let created_at = *block.get_by_offset(26).index(row)?.as_timestamp()?; + let updated_at = *block.get_by_offset(27).index(row)?.as_timestamp()?; + let session_params = block.get_by_offset(28).index(row).and_then(|s| { + s.as_variant() + .and_then(|bytes| serde_json::from_slice::>(bytes).ok()) + })?; + let last_suspended_at = block + .get_by_offset(29) + .index(row) + .and_then(|s| s.as_timestamp().cloned()); + + let schedule_options = if let Some(s) = schedule_type { + let schedule_type = match s { + 0 => ScheduleType::IntervalType, + 1 => ScheduleType::CronType, + _ => { + return None; + } + }; + Some(ScheduleOptions { + interval, + cron, + time_zone, + schedule_type, + milliseconds_interval, + }) + } else { + None + }; + + let warehouse_options = if warehouse_name.is_some() && using_warehouse_size.is_some() { + Some(WarehouseOptions { + warehouse: warehouse_name, + using_warehouse_size, + }) + } else { + None + }; + let task = Task { + task_id, + task_name, + query_text, + when_condition, + after, + comment, + owner, + owner_user, + schedule_options, + warehouse_options, + next_scheduled_at: next_scheduled_at + .and_then(|i| DateTime::::from_timestamp(i, 0)), + suspend_task_after_num_failures: attempt_number.map(|i| i as u64), + error_integration, + status: match status.as_str() { + "SUSPENDED" => Status::Suspended, + "STARTED" => Status::Started, + _ => return None, + }, + created_at: DateTime::::from_timestamp(created_at, 0)?, + updated_at: 
DateTime::::from_timestamp(updated_at, 0)?, + last_suspended_at: last_suspended_at + .and_then(|i| DateTime::::from_timestamp(i, 0)), + session_params, + }; + + Some(TaskRun { + task, + run_id, + attempt_number: attempt_number.unwrap_or_default(), + state: match state.as_str() { + "SCHEDULED" => State::Scheduled, + "EXECUTING" => State::Executing, + "SUCCEEDED" => State::Succeeded, + "FAILED" => State::Failed, + "CANCELLED" => State::Cancelled, + _ => return None, + }, + scheduled_at: DateTime::::from_timestamp(scheduled_at, 0)?, + completed_at: completed_at.and_then(|i| DateTime::::from_timestamp(i, 0)), + error_code, + error_message, + root_task_id, + }) + } + + async fn execute_sql(&self, other_user: Option, sql: &str) -> Result> { + let context = self.create_context(other_user).await?; + + let mut planner = Planner::new_with_query_executor( + context.clone(), + Arc::new(ServiceQueryExecutor::new(QueryContext::create_from( + context.as_ref(), + ))), + ); + let (plan, _) = planner.plan_sql(sql).await?; + let executor = InterpreterFactory::get(context.clone(), &plan).await?; + let stream = executor.execute(context).await?; + stream.try_collect::>().await + } + + fn make_run_id() -> u64 { + Utc::now().timestamp_millis() as u64 + } +} diff --git a/src/query/service/src/task/session.rs b/src/query/service/src/task/session.rs new file mode 100644 index 0000000000000..dbed311cb993b --- /dev/null +++ b/src/query/service/src/task/session.rs @@ -0,0 +1,49 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use databend_common_catalog::session_type::SessionType; +use databend_common_exception::Result; +use databend_common_meta_app::principal::GrantObject; +use databend_common_meta_app::principal::UserInfo; +use databend_common_meta_app::principal::UserPrivilegeType; + +use crate::sessions::Session; +use crate::sessions::SessionManager; + +/// Create a user for task with necessary privileges +pub fn get_task_user(tenant_id: &str, cluster_id: &str) -> UserInfo { + let mut user = UserInfo::new_no_auth( + format!("{}-{}-task", tenant_id, cluster_id).as_str(), + "0.0.0.0", + ); + user.grants.grant_privileges( + &GrantObject::Global, + UserPrivilegeType::CreateDatabase.into(), + ); + user +} + +/// Create a dummy session for task +pub async fn create_session( + user: UserInfo, + restricted_role: Option, +) -> Result> { + let session_manager = SessionManager::instance(); + let session = session_manager.create_session(SessionType::MySQL).await?; + let session = session_manager.register_session(session)?; + session.set_authed_user(user, restricted_role).await?; + Ok(session) +} diff --git a/src/query/storages/system/src/tasks_table.rs b/src/query/storages/system/src/tasks_table.rs index a44a26e629b81..e9340254f5550 100644 --- a/src/query/storages/system/src/tasks_table.rs +++ b/src/query/storages/system/src/tasks_table.rs @@ -21,7 +21,7 @@ use databend_common_cloud_control::client_config::build_client_config; use databend_common_cloud_control::client_config::make_request; use databend_common_cloud_control::cloud_api::CloudControlApiProvider; use databend_common_cloud_control::pb::ShowTasksRequest; -use databend_common_cloud_control::pb::Task; +use databend_common_cloud_control::task_utils::Task; use databend_common_config::GlobalConfig; use databend_common_exception::ErrorCode; use databend_common_exception::Result; @@ -36,6 +36,7 @@ use 
databend_common_meta_app::schema::TableIdent; use databend_common_meta_app::schema::TableInfo; use databend_common_meta_app::schema::TableMeta; use databend_common_sql::plans::task_schema; +use itertools::Itertools; use crate::table::AsyncOneBlockSystemTable; use crate::table::AsyncSystemTable; @@ -59,25 +60,24 @@ pub fn parse_tasks_to_datablock(tasks: Vec) -> Result { let mut last_suspended_on: Vec> = Vec::with_capacity(tasks.len()); let mut session_params: Vec>> = Vec::with_capacity(tasks.len()); for task in tasks { - let tsk: databend_common_cloud_control::task_utils::Task = task.try_into()?; - created_on.push(tsk.created_at.timestamp_micros()); - name.push(tsk.task_name); - id.push(tsk.task_id); - owner.push(tsk.owner); - comment.push(tsk.comment); - warehouse.push(tsk.warehouse_options.and_then(|s| s.warehouse)); - schedule.push(tsk.schedule_options); - status.push(tsk.status.to_string()); - definition.push(tsk.query_text); - condition_text.push(tsk.condition_text); + created_on.push(task.created_at.timestamp_micros()); + name.push(task.task_name); + id.push(task.task_id); + owner.push(task.owner); + comment.push(task.comment); + warehouse.push(task.warehouse_options.and_then(|s| s.warehouse)); + schedule.push(task.schedule_options); + status.push(task.status.to_string()); + definition.push(task.query_text); + condition_text.push(task.condition_text); // join by comma - after.push(tsk.after.into_iter().collect::>().join(",")); - suspend_after_num_failures.push(tsk.suspend_task_after_num_failures.map(|v| v as u64)); - error_integration.push(tsk.error_integration); - next_schedule_time.push(tsk.next_scheduled_at.map(|t| t.timestamp_micros())); - last_committed_on.push(tsk.updated_at.timestamp_micros()); - last_suspended_on.push(tsk.last_suspended_at.map(|t| t.timestamp_micros())); - let serialized_params = serde_json::to_vec(&tsk.session_params).unwrap(); + after.push(task.after.into_iter().collect::>().join(",")); + 
suspend_after_num_failures.push(task.suspend_task_after_num_failures.map(|v| v as u64)); + error_integration.push(task.error_integration); + next_schedule_time.push(task.next_scheduled_at.map(|t| t.timestamp_micros())); + last_committed_on.push(task.updated_at.timestamp_micros()); + last_suspended_on.push(task.last_suspended_at.map(|t| t.timestamp_micros())); + let serialized_params = serde_json::to_vec(&task.session_params).unwrap(); session_params.push(Some(serialized_params)); } @@ -155,7 +155,7 @@ impl AsyncSystemTable for TasksTable { let resp = task_client.show_tasks(req).await?; let tasks = resp.tasks; - parse_tasks_to_datablock(tasks) + parse_tasks_to_datablock(tasks.into_iter().map(Task::try_from).try_collect()?) } } diff --git a/src/query/users/src/lib.rs b/src/query/users/src/lib.rs index 19f9d595ce55e..181309778098d 100644 --- a/src/query/users/src/lib.rs +++ b/src/query/users/src/lib.rs @@ -33,6 +33,7 @@ pub mod connection; pub mod file_format; pub mod role_cache_mgr; pub mod role_util; +mod user_task; pub use jwt::*; pub use password_policy::*; diff --git a/src/query/users/src/user_api.rs b/src/query/users/src/user_api.rs index d3c9c4005acf8..9151644e12c6b 100644 --- a/src/query/users/src/user_api.rs +++ b/src/query/users/src/user_api.rs @@ -23,6 +23,7 @@ use databend_common_config::GlobalConfig; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_grpc::RpcClientConf; +use databend_common_management::task::TaskMgr; use databend_common_management::udf::UdfMgr; use databend_common_management::ClientSessionMgr; use databend_common_management::ConnectionMgr; @@ -156,6 +157,10 @@ impl UserApiProvider { UdfMgr::create(self.client.clone(), tenant) } + pub fn task_api(&self, tenant: &Tenant) -> TaskMgr { + TaskMgr::create(self.client.clone(), tenant) + } + pub fn user_api(&self, tenant: &Tenant) -> Arc { let user_mgr = UserMgr::create(self.client.clone(), tenant); Arc::new(user_mgr) diff --git 
a/src/query/users/src/user_task.rs b/src/query/users/src/user_task.rs new file mode 100644 index 0000000000000..58fa175e0dc6b --- /dev/null +++ b/src/query/users/src/user_task.rs @@ -0,0 +1,78 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use databend_common_ast::ast::AlterTaskOptions; +use databend_common_exception::Result; +use databend_common_meta_app::principal::Task; +use databend_common_meta_app::schema::CreateOption; +use databend_common_meta_app::tenant::Tenant; + +use crate::UserApiProvider; + +impl UserApiProvider { + // Add a new Task. 
+ #[async_backtrace::framed] + pub async fn create_task( + &self, + tenant: &Tenant, + task: Task, + create_option: &CreateOption, + ) -> Result<()> { + let task_api = self.task_api(tenant); + task_api.create_task(task, create_option).await??; + Ok(()) + } + + #[async_backtrace::framed] + pub async fn execute_task(&self, tenant: &Tenant, task_name: &str) -> Result<()> { + let task_api = self.task_api(tenant); + task_api.execute_task(task_name).await??; + Ok(()) + } + + #[async_backtrace::framed] + pub async fn alter_task( + &self, + tenant: &Tenant, + task_name: &str, + alter_options: &AlterTaskOptions, + ) -> Result<()> { + let task_api = self.task_api(tenant); + task_api.alter_task(task_name, alter_options).await??; + Ok(()) + } + + #[async_backtrace::framed] + pub async fn describe_task(&self, tenant: &Tenant, task_name: &str) -> Result> { + let task_api = self.task_api(tenant); + let task = task_api.describe_task(task_name).await??; + Ok(task) + } + + #[async_backtrace::framed] + pub async fn drop_task(&self, tenant: &Tenant, task_name: &str) -> Result<()> { + let task_api = self.task_api(tenant); + task_api.drop_task(task_name).await?; + + Ok(()) + } + + #[async_backtrace::framed] + pub async fn show_tasks(&self, tenant: &Tenant) -> Result> { + let task_api = self.task_api(tenant); + let tasks = task_api.list_task().await?; + + Ok(tasks) + } +}