Skip to content

feat: impl Task for private #18311

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions src/common/exception/src/exception_code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,20 @@ build_exceptions! {
UDFDataError(2607),
}

// Task Errors [2611-2614]
build_exceptions! {
/// Unknown Task
UnknownTask(2611),
/// Task already exists
TaskAlreadyExists(2612),
/// Task timezone invalid
TaskTimezoneInvalid(2613),
/// Task cron invalid
TaskCronInvalid(2614),
/// Task schedule and after conflict
TaskScheduleAndAfterConflict(2615),
}

// Search and External Service Errors [1901-1903, 1910]
build_exceptions! {
/// Tantivy error
Expand Down
13 changes: 13 additions & 0 deletions src/meta/app/src/principal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ pub mod procedure_id_to_name;
pub mod procedure_identity;
pub mod procedure_name_ident;
pub mod stage_file_ident;
pub mod task;
pub mod task_ident;
pub mod task_run_ident;
pub mod tenant_ownership_object_ident;
pub mod tenant_user_ident;
pub mod user_defined_file_format_ident;
Expand Down Expand Up @@ -86,6 +89,16 @@ pub use role_info::RoleInfo;
pub use role_info::RoleInfoSerdeError;
pub use stage_file_ident::StageFileIdent;
pub use stage_file_path::StageFilePath;
pub use task::AfterTaskState;
pub use task::ScheduleOptions;
pub use task::ScheduleType;
pub use task::State;
pub use task::Status;
pub use task::Task;
pub use task::TaskRun;
pub use task::WarehouseOptions;
pub use task_ident::TaskIdent;
pub use task_ident::TaskIdentRaw;
pub use tenant_ownership_object_ident::TenantOwnershipObjectIdent;
pub use tenant_user_ident::TenantUserIdent;
pub use udf_ident::UdfIdent;
Expand Down
135 changes: 135 additions & 0 deletions src/meta/app/src/principal/task.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::collections::HashSet;
use std::sync::Arc;

use chrono::DateTime;
use chrono::Utc;

pub const EMPTY_TASK_ID: u64 = 0;

#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum ScheduleType {
IntervalType = 0,
CronType = 1,
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum Status {
Suspended = 0,
Started = 1,
}

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum State {
Scheduled = 0,
Executing = 1,
Succeeded = 2,
Failed = 3,
Cancelled = 4,
}

#[derive(Debug, Clone, PartialEq)]
pub struct ScheduleOptions {
pub interval: Option<i32>,
pub cron: Option<String>,
pub time_zone: Option<String>,
pub schedule_type: ScheduleType,
pub milliseconds_interval: Option<u64>,
}

#[derive(Debug, Clone, PartialEq)]
pub struct WarehouseOptions {
pub warehouse: Option<String>,
pub using_warehouse_size: Option<String>,
}

#[derive(Debug, Clone, PartialEq)]
pub struct Task {
pub task_id: u64,
pub task_name: String,
pub query_text: String,
pub when_condition: Option<String>,
pub after: Vec<String>,
pub comment: Option<String>,
// expired useless
pub owner: String,
pub owner_user: String,
pub schedule_options: Option<ScheduleOptions>,
pub warehouse_options: Option<WarehouseOptions>,
pub next_scheduled_at: Option<DateTime<Utc>>,
pub suspend_task_after_num_failures: Option<u64>,
// TODO
pub error_integration: Option<String>,
pub status: Status,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
pub last_suspended_at: Option<DateTime<Utc>>,
// TODO
pub session_params: BTreeMap<String, String>,
}

#[derive(Debug, Clone, PartialEq)]
pub struct TaskRun {
pub task: Task,
pub run_id: u64,
pub attempt_number: i32,
pub state: State,
pub scheduled_at: DateTime<Utc>,
pub completed_at: Option<DateTime<Utc>>,
pub error_code: i64,
pub error_message: Option<String>,
// expired useless
pub root_task_id: u64,
}

impl TaskRun {
pub fn key(&self) -> String {
format!("{}@{}", self.task.task_name, self.run_id)
}
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct AfterTaskInfo {
pub afters: Arc<Vec<String>>,
}

pub struct AfterTaskState {
waiting: HashSet<String>,
}

impl From<&Task> for AfterTaskInfo {
fn from(value: &Task) -> Self {
AfterTaskInfo {
afters: Arc::new(value.after.clone()),
}
}
}

impl AfterTaskState {
pub fn completed_task(&mut self, task_name: &str) -> bool {
self.waiting.remove(task_name);
self.waiting.is_empty()
}
}

impl From<&AfterTaskInfo> for AfterTaskState {
fn from(value: &AfterTaskInfo) -> Self {
Self {
waiting: HashSet::from_iter(value.afters.to_vec()),
}
}
}
45 changes: 45 additions & 0 deletions src/meta/app/src/principal/task_ident.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::tenant_key::ident::TIdent;

pub type TaskIdent = TIdent<Resource>;

pub type TaskIdentRaw = TIdent<Resource>;

pub use kvapi_impl::Resource;

mod kvapi_impl {
use databend_common_meta_kvapi::kvapi;

use crate::principal::task::Task;
use crate::principal::task_ident::TaskIdent;
use crate::tenant_key::resource::TenantResource;

pub struct Resource;
impl TenantResource for Resource {
const PREFIX: &'static str = "__fd_tasks";
const TYPE: &'static str = "TaskIdent";
const HAS_TENANT: bool = true;
type ValueType = Task;
}

impl kvapi::Value for Task {
type KeyType = TaskIdent;

fn dependency_keys(&self, _key: &Self::KeyType) -> impl IntoIterator<Item = String> {
[]
}
}
}
45 changes: 45 additions & 0 deletions src/meta/app/src/principal/task_run_ident.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::tenant_key::ident::TIdent;

pub type TaskRunIdent = TIdent<Resource>;

pub type TaskRunIdentRaw = TIdent<Resource>;

pub use kvapi_impl::Resource;

mod kvapi_impl {
use databend_common_meta_kvapi::kvapi;

use crate::principal::task_run_ident::TaskRunIdent;
use crate::principal::TaskRun;
use crate::tenant_key::resource::TenantResource;

pub struct Resource;
impl TenantResource for Resource {
const PREFIX: &'static str = "__fd_task_runs";
const TYPE: &'static str = "TaskRunIdent";
const HAS_TENANT: bool = true;
type ValueType = TaskRun;
}

impl kvapi::Value for TaskRun {
type KeyType = TaskRunIdent;

fn dependency_keys(&self, _key: &Self::KeyType) -> impl IntoIterator<Item = String> {
[]
}
}
}
1 change: 1 addition & 0 deletions src/meta/proto-conv/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ mod schema_from_to_protobuf_impl;
mod sequence_from_to_protobuf_impl;
mod stage_from_to_protobuf_impl;
mod table_from_to_protobuf_impl;
mod task_from_to_protobuf_impl;
mod tenant_quota_from_to_protobuf_impl;
mod tident_from_to_protobuf_impl;
mod token_from_to_protobuf_impl;
Expand Down
Loading
Loading