Skip to content

feat: impl Task for private #18311

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions src/binaries/query/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use databend_query::servers::MySQLHandler;
use databend_query::servers::MySQLTlsConfig;
use databend_query::servers::Server;
use databend_query::servers::ShutdownHandle;
use databend_query::task::TaskService;
use databend_query::GlobalServices;
use log::info;

Expand Down Expand Up @@ -302,6 +303,7 @@ pub async fn start_services(conf: &InnerConfig) -> Result<(), MainError> {
}
println!(" system history tables: {}", conf.log.history);
}
TaskService::instance().initialized();

println!();
println!(
Expand Down
16 changes: 16 additions & 0 deletions src/common/exception/src/exception_code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,22 @@ build_exceptions! {
UDFDataError(2607),
}

// Task Errors [2611-2614]
build_exceptions! {
/// Unknown Task
UnknownTask(2611),
/// Task already exists
TaskAlreadyExists(2612),
/// Task timezone invalid
TaskTimezoneInvalid(2613),
/// Task cron invalid
TaskCronInvalid(2614),
/// Task schedule and after conflict
TaskScheduleAndAfterConflict(2615),
/// Task when condition not met
TaskWhenConditionNotMet(2616),
}

// Search and External Service Errors [1901-1903, 1910]
build_exceptions! {
/// Tantivy error
Expand Down
2 changes: 1 addition & 1 deletion src/meta/api/src/kv_pb_api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use futures::TryStreamExt;
use itertools::Itertools;

pub(crate) use self::codec::decode_non_empty_item;
pub(crate) use self::codec::decode_seqv;
pub use self::codec::decode_seqv;
pub(crate) use self::codec::decode_transition;
pub(crate) use self::codec::encode_operation;
pub use self::upsert_pb::UpsertPB;
Expand Down
13 changes: 13 additions & 0 deletions src/meta/app/src/principal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ pub mod procedure_id_to_name;
pub mod procedure_identity;
pub mod procedure_name_ident;
pub mod stage_file_ident;
pub mod task;
pub mod task_ident;
pub mod task_message_ident;
pub mod tenant_ownership_object_ident;
pub mod tenant_user_ident;
pub mod user_defined_file_format_ident;
Expand Down Expand Up @@ -86,6 +89,16 @@ pub use role_info::RoleInfo;
pub use role_info::RoleInfoSerdeError;
pub use stage_file_ident::StageFileIdent;
pub use stage_file_path::StageFilePath;
pub use task::ScheduleOptions;
pub use task::ScheduleType;
pub use task::State;
pub use task::Status;
pub use task::Task;
pub use task::TaskMessage;
pub use task::TaskRun;
pub use task::WarehouseOptions;
pub use task_ident::TaskIdent;
pub use task_ident::TaskIdentRaw;
pub use tenant_ownership_object_ident::TenantOwnershipObjectIdent;
pub use tenant_user_ident::TenantUserIdent;
pub use udf_ident::UdfIdent;
Expand Down
138 changes: 138 additions & 0 deletions src/meta/app/src/principal/task.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;

use chrono::DateTime;
use chrono::Utc;

pub const EMPTY_TASK_ID: u64 = 0;

#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum ScheduleType {
IntervalType = 0,
CronType = 1,
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum Status {
Suspended = 0,
Started = 1,
}

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum State {
Scheduled = 0,
Executing = 1,
Succeeded = 2,
Failed = 3,
Cancelled = 4,
}

#[derive(Debug, Clone, PartialEq)]
pub struct ScheduleOptions {
pub interval: Option<i32>,
pub cron: Option<String>,
pub time_zone: Option<String>,
pub schedule_type: ScheduleType,
pub milliseconds_interval: Option<u64>,
}

#[derive(Debug, Clone, PartialEq)]
pub struct WarehouseOptions {
pub warehouse: Option<String>,
pub using_warehouse_size: Option<String>,
}

#[derive(Debug, Clone, PartialEq)]
pub struct Task {
pub task_id: u64,
pub task_name: String,
pub query_text: String,
pub when_condition: Option<String>,
pub after: Vec<String>,
pub comment: Option<String>,
// expired useless
pub owner: String,
pub owner_user: String,
pub schedule_options: Option<ScheduleOptions>,
pub warehouse_options: Option<WarehouseOptions>,
pub next_scheduled_at: Option<DateTime<Utc>>,
pub suspend_task_after_num_failures: Option<u64>,
// TODO
pub error_integration: Option<String>,
pub status: Status,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
pub last_suspended_at: Option<DateTime<Utc>>,
// TODO
pub session_params: BTreeMap<String, String>,
}

#[derive(Debug, Clone, PartialEq)]
pub struct TaskRun {
pub task: Task,
pub run_id: u64,
pub attempt_number: i32,
pub state: State,
pub scheduled_at: DateTime<Utc>,
pub completed_at: Option<DateTime<Utc>>,
pub error_code: i64,
pub error_message: Option<String>,
// expired useless
pub root_task_id: u64,
}

impl TaskRun {
pub fn key(&self) -> String {
format!("{}@{}", self.task.task_name, self.run_id)
}
}

#[derive(Debug, Clone, PartialEq)]
pub enum TaskMessage {
ExecuteTask(Task),
ScheduleTask(Task),
DeleteTask(String),
AfterTask(Task),
}

impl TaskMessage {
pub fn task_name(&self) -> &str {
match self {
TaskMessage::ExecuteTask(task)
| TaskMessage::ScheduleTask(task)
| TaskMessage::AfterTask(task) => task.task_name.as_str(),
TaskMessage::DeleteTask(task_name) => task_name.as_str(),
}
}

pub fn key(&self) -> String {
let ty = match self {
TaskMessage::ExecuteTask(_) => 0,
TaskMessage::ScheduleTask(_) => 1,
TaskMessage::DeleteTask(_) => 2,
TaskMessage::AfterTask(_) => 3,
};
format!("{}-{}-{}", TaskMessage::prefix(), ty, self.task_name())
}

pub fn prefix() -> i64 {
0
}

pub fn prefix_range() -> (i64, i64) {
(0, 1)
}
}
45 changes: 45 additions & 0 deletions src/meta/app/src/principal/task_ident.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::tenant_key::ident::TIdent;

pub type TaskIdent = TIdent<Resource>;

pub type TaskIdentRaw = TIdent<Resource>;

pub use kvapi_impl::Resource;

mod kvapi_impl {
use databend_common_meta_kvapi::kvapi;

use crate::principal::task::Task;
use crate::principal::task_ident::TaskIdent;
use crate::tenant_key::resource::TenantResource;

pub struct Resource;
impl TenantResource for Resource {
const PREFIX: &'static str = "__fd_tasks";
const TYPE: &'static str = "TaskIdent";
const HAS_TENANT: bool = true;
type ValueType = Task;
}

impl kvapi::Value for Task {
type KeyType = TaskIdent;

fn dependency_keys(&self, _key: &Self::KeyType) -> impl IntoIterator<Item = String> {
[]
}
}
}
45 changes: 45 additions & 0 deletions src/meta/app/src/principal/task_message_ident.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::tenant_key::ident::TIdent;

pub type TaskMessageIdent = TIdent<Resource>;

pub type TaskMessageIdentRaw = TIdent<Resource>;

pub use kvapi_impl::Resource;

mod kvapi_impl {
use databend_common_meta_kvapi::kvapi;

use crate::principal::task::TaskMessage;
use crate::principal::task_message_ident::TaskMessageIdent;
use crate::tenant_key::resource::TenantResource;

pub struct Resource;
impl TenantResource for Resource {
const PREFIX: &'static str = "__fd_task_messages";
const TYPE: &'static str = "TaskMessageIdent";
const HAS_TENANT: bool = true;
type ValueType = TaskMessage;
}

impl kvapi::Value for TaskMessage {
type KeyType = TaskMessageIdent;

fn dependency_keys(&self, _key: &Self::KeyType) -> impl IntoIterator<Item = String> {
[]
}
}
}
2 changes: 1 addition & 1 deletion src/meta/cache/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,6 @@

mod cache;
mod meta_cache_types;
mod meta_client_source;
pub mod meta_client_source;

pub use cache::Cache;
1 change: 1 addition & 0 deletions src/meta/proto-conv/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ mod schema_from_to_protobuf_impl;
mod sequence_from_to_protobuf_impl;
mod stage_from_to_protobuf_impl;
mod table_from_to_protobuf_impl;
mod task_from_to_protobuf_impl;
mod tenant_quota_from_to_protobuf_impl;
mod tident_from_to_protobuf_impl;
mod token_from_to_protobuf_impl;
Expand Down
Loading
Loading