Skip to content

Commit c0c285c

Browse files
committed
feat: impl Task for private
1 parent b9f2227 commit c0c285c

35 files changed

+2255
-367
lines changed

Cargo.lock

Lines changed: 8 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/common/exception/src/exception_code.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -391,6 +391,20 @@ build_exceptions! {
391391
UDFDataError(2607),
392392
}
393393

394+
// Task Errors [2611-2614]
395+
build_exceptions! {
396+
/// Unknown Task
397+
UnknownTask(2611),
398+
/// Task already exists
399+
TaskAlreadyExists(2612),
400+
/// Task timezone invalid
401+
TaskTimezoneInvalid(2613),
402+
/// Task cron invalid
403+
TaskCronInvalid(2614),
404+
/// Task schedule and after conflict
405+
TaskScheduleAndAfterConflict(2615),
406+
}
407+
394408
// Search and External Service Errors [1901-1903, 1910]
395409
build_exceptions! {
396410
/// Tantivy error

src/meta/app/src/principal/mod.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@ pub mod procedure_id_to_name;
4848
pub mod procedure_identity;
4949
pub mod procedure_name_ident;
5050
pub mod stage_file_ident;
51+
pub mod task;
52+
pub mod task_ident;
53+
pub mod task_run_ident;
5154
pub mod tenant_ownership_object_ident;
5255
pub mod tenant_user_ident;
5356
pub mod user_defined_file_format_ident;
@@ -86,6 +89,16 @@ pub use role_info::RoleInfo;
8689
pub use role_info::RoleInfoSerdeError;
8790
pub use stage_file_ident::StageFileIdent;
8891
pub use stage_file_path::StageFilePath;
92+
pub use task::AfterTaskState;
93+
pub use task::ScheduleOptions;
94+
pub use task::ScheduleType;
95+
pub use task::State;
96+
pub use task::Status;
97+
pub use task::Task;
98+
pub use task::TaskRun;
99+
pub use task::WarehouseOptions;
100+
pub use task_ident::TaskIdent;
101+
pub use task_ident::TaskIdentRaw;
89102
pub use tenant_ownership_object_ident::TenantOwnershipObjectIdent;
90103
pub use tenant_user_ident::TenantUserIdent;
91104
pub use udf_ident::UdfIdent;

src/meta/app/src/principal/task.rs

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::collections::BTreeMap;
16+
use std::collections::HashSet;
17+
use std::sync::Arc;
18+
19+
use chrono::DateTime;
20+
use chrono::Utc;
21+
22+
pub const EMPTY_TASK_ID: u64 = 0;
23+
24+
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
25+
pub enum ScheduleType {
26+
IntervalType = 0,
27+
CronType = 1,
28+
}
29+
30+
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
31+
pub enum Status {
32+
Suspended = 0,
33+
Started = 1,
34+
}
35+
36+
#[derive(Debug, Clone, Copy, PartialEq)]
37+
pub enum State {
38+
Scheduled = 0,
39+
Executing = 1,
40+
Succeeded = 2,
41+
Failed = 3,
42+
Cancelled = 4,
43+
}
44+
45+
#[derive(Debug, Clone, PartialEq)]
46+
pub struct ScheduleOptions {
47+
pub interval: Option<i32>,
48+
pub cron: Option<String>,
49+
pub time_zone: Option<String>,
50+
pub schedule_type: ScheduleType,
51+
pub milliseconds_interval: Option<u64>,
52+
}
53+
54+
#[derive(Debug, Clone, PartialEq)]
55+
pub struct WarehouseOptions {
56+
pub warehouse: Option<String>,
57+
pub using_warehouse_size: Option<String>,
58+
}
59+
60+
#[derive(Debug, Clone, PartialEq)]
61+
pub struct Task {
62+
pub task_id: u64,
63+
pub task_name: String,
64+
pub query_text: String,
65+
pub when_condition: Option<String>,
66+
pub after: Vec<String>,
67+
pub comment: Option<String>,
68+
// expired useless
69+
pub owner: String,
70+
pub owner_user: String,
71+
pub schedule_options: Option<ScheduleOptions>,
72+
pub warehouse_options: Option<WarehouseOptions>,
73+
pub next_scheduled_at: Option<DateTime<Utc>>,
74+
pub suspend_task_after_num_failures: Option<u64>,
75+
// TODO
76+
pub error_integration: Option<String>,
77+
pub status: Status,
78+
pub created_at: DateTime<Utc>,
79+
pub updated_at: DateTime<Utc>,
80+
pub last_suspended_at: Option<DateTime<Utc>>,
81+
// TODO
82+
pub session_params: BTreeMap<String, String>,
83+
}
84+
85+
#[derive(Debug, Clone, PartialEq)]
86+
pub struct TaskRun {
87+
pub task: Task,
88+
pub run_id: u64,
89+
pub attempt_number: i32,
90+
pub state: State,
91+
pub scheduled_at: DateTime<Utc>,
92+
pub completed_at: Option<DateTime<Utc>>,
93+
pub error_code: i64,
94+
pub error_message: Option<String>,
95+
// expired useless
96+
pub root_task_id: u64,
97+
}
98+
99+
impl TaskRun {
100+
pub fn key(&self) -> String {
101+
format!("{}@{}", self.task.task_name, self.run_id)
102+
}
103+
}
104+
105+
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
106+
pub struct AfterTaskInfo {
107+
pub afters: Arc<Vec<String>>,
108+
}
109+
110+
pub struct AfterTaskState {
111+
waiting: HashSet<String>,
112+
}
113+
114+
impl From<&Task> for AfterTaskInfo {
115+
fn from(value: &Task) -> Self {
116+
AfterTaskInfo {
117+
afters: Arc::new(value.after.clone()),
118+
}
119+
}
120+
}
121+
122+
impl AfterTaskState {
123+
pub fn completed_task(&mut self, task_name: &str) -> bool {
124+
self.waiting.remove(task_name);
125+
self.waiting.is_empty()
126+
}
127+
}
128+
129+
impl From<&AfterTaskInfo> for AfterTaskState {
130+
fn from(value: &AfterTaskInfo) -> Self {
131+
Self {
132+
waiting: HashSet::from_iter(value.afters.to_vec()),
133+
}
134+
}
135+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use crate::tenant_key::ident::TIdent;
16+
17+
pub type TaskIdent = TIdent<Resource>;
18+
19+
pub type TaskIdentRaw = TIdent<Resource>;
20+
21+
pub use kvapi_impl::Resource;
22+
23+
mod kvapi_impl {
24+
use databend_common_meta_kvapi::kvapi;
25+
26+
use crate::principal::task::Task;
27+
use crate::principal::task_ident::TaskIdent;
28+
use crate::tenant_key::resource::TenantResource;
29+
30+
pub struct Resource;
31+
impl TenantResource for Resource {
32+
const PREFIX: &'static str = "__fd_tasks";
33+
const TYPE: &'static str = "TaskIdent";
34+
const HAS_TENANT: bool = true;
35+
type ValueType = Task;
36+
}
37+
38+
impl kvapi::Value for Task {
39+
type KeyType = TaskIdent;
40+
41+
fn dependency_keys(&self, _key: &Self::KeyType) -> impl IntoIterator<Item = String> {
42+
[]
43+
}
44+
}
45+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use crate::tenant_key::ident::TIdent;
16+
17+
pub type TaskRunIdent = TIdent<Resource>;
18+
19+
pub type TaskRunIdentRaw = TIdent<Resource>;
20+
21+
pub use kvapi_impl::Resource;
22+
23+
mod kvapi_impl {
24+
use databend_common_meta_kvapi::kvapi;
25+
26+
use crate::principal::task_run_ident::TaskRunIdent;
27+
use crate::principal::TaskRun;
28+
use crate::tenant_key::resource::TenantResource;
29+
30+
pub struct Resource;
31+
impl TenantResource for Resource {
32+
const PREFIX: &'static str = "__fd_task_runs";
33+
const TYPE: &'static str = "TaskRunIdent";
34+
const HAS_TENANT: bool = true;
35+
type ValueType = TaskRun;
36+
}
37+
38+
impl kvapi::Value for TaskRun {
39+
type KeyType = TaskRunIdent;
40+
41+
fn dependency_keys(&self, _key: &Self::KeyType) -> impl IntoIterator<Item = String> {
42+
[]
43+
}
44+
}
45+
}

src/meta/proto-conv/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ mod schema_from_to_protobuf_impl;
8181
mod sequence_from_to_protobuf_impl;
8282
mod stage_from_to_protobuf_impl;
8383
mod table_from_to_protobuf_impl;
84+
mod task_from_to_protobuf_impl;
8485
mod tenant_quota_from_to_protobuf_impl;
8586
mod tident_from_to_protobuf_impl;
8687
mod token_from_to_protobuf_impl;

0 commit comments

Comments
 (0)