Skip to content

Commit d6ddbae

Browse files
authored
feat: impl Task for private (#18311)
* feat: impl Task for private * feat: imp when_condition for Task & store TaskRun on system table * feat: use Meta's Watch mechanism to distribute TaskMessage * refactor: Using SQL to simplify DAG * chore: fix meta_store init * chore: log error on spawn * test: add test for private task * chore: add license for private task * chore: fix test_display_license_info * chore: codefmt * chore: add `accept` for Delete & After * chore: add restart test * chore: codefmt * chore: add `system.task_history` for Private Task & use `TaskMgr::accept` replace `TaskMetaHandle::acquire_with_guard` * fix: TableFunctionFactory create fail * ci: rename to private task test * chore: add cron test * feat: add system table: `system.tasks` * chore: fix `update_or_create_task_run` correct on `ScheduleTask` * chore: add Task when test on `test-private-task.sh` * chore: add private task config check on `GlobalServices::init_with` * chore: update Task version on Meta * chore: codefmt * chore: remove `TaskMgr::list_task_fallback` * chore: codefmt
1 parent 39d0a75 commit d6ddbae

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+3909
-386
lines changed
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
name: "Test private task for databend query"
2+
description: "Test private task for databend query"
3+
runs:
4+
using: "composite"
5+
steps:
6+
- uses: ./.github/actions/setup_test
7+
8+
- name: Install lsof
9+
shell: bash
10+
run: sudo apt-get update -yq && sudo apt-get install -yq lsof
11+
12+
- name: Minio Setup for (ubuntu-latest only)
13+
shell: bash
14+
run: |
15+
docker run -d --network host --name minio \
16+
-e "MINIO_ACCESS_KEY=minioadmin" \
17+
-e "MINIO_SECRET_KEY=minioadmin" \
18+
-e "MINIO_ADDRESS=:9900" \
19+
-v /tmp/data:/data \
20+
-v /tmp/config:/root/.minio \
21+
minio/minio server /data
22+
23+
export AWS_ACCESS_KEY_ID=minioadmin
24+
export AWS_SECRET_ACCESS_KEY=minioadmin
25+
export AWS_EC2_METADATA_DISABLED=true
26+
27+
aws --endpoint-url http://127.0.0.1:9900/ s3 mb s3://testbucket
28+
aws --endpoint-url http://127.0.0.1:9900/ s3 cp tests/data s3://testbucket/data --recursive --no-progress
29+
30+
- name: Run Private Task Tests
31+
shell: bash
32+
run: |
33+
bash ./tests/task/test-private-task.sh
34+
35+
- name: Upload failure
36+
if: failure()
37+
uses: ./.github/actions/artifact_failure
38+
with:
39+
name: test-tasks

.github/workflows/reuse.linux.yml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,24 @@ jobs:
183183
- uses: ./.github/actions/test_logs
184184
timeout-minutes: 20
185185

186+
test_private_tasks:
187+
needs: [ build, check ]
188+
runs-on:
189+
- self-hosted
190+
- X64
191+
- Linux
192+
- 2c8g
193+
- "${{ inputs.runner_provider }}"
194+
- "${{ inputs.runner_capacity }}"
195+
steps:
196+
- uses: actions/checkout@v4
197+
- uses: ./.github/actions/setup_license
198+
with:
199+
runner_provider: ${{ inputs.runner_provider }}
200+
type: ${{ inputs.license_type }}
201+
- uses: ./.github/actions/test_private_tasks
202+
timeout-minutes: 20
203+
186204
test_meta_cluster:
187205
needs: [build, check]
188206
runs-on:

Cargo.lock

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/binaries/query/entry.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ use databend_query::servers::MySQLHandler;
4343
use databend_query::servers::MySQLTlsConfig;
4444
use databend_query::servers::Server;
4545
use databend_query::servers::ShutdownHandle;
46+
use databend_query::task::TaskService;
4647
use databend_query::GlobalServices;
4748
use log::info;
4849

@@ -306,6 +307,9 @@ pub async fn start_services(conf: &InnerConfig) -> Result<(), MainError> {
306307
}
307308
println!(" system history tables: {}", conf.log.history);
308309
}
310+
if conf.task.on {
311+
TaskService::instance().initialized();
312+
}
309313

310314
println!();
311315
println!(

src/common/exception/src/exception_code.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -391,6 +391,22 @@ build_exceptions! {
391391
UDFDataError(2607),
392392
}
393393

394+
// Task Errors [2611-2614]
395+
build_exceptions! {
396+
/// Unknown Task
397+
UnknownTask(2611),
398+
/// Task already exists
399+
TaskAlreadyExists(2612),
400+
/// Task timezone invalid
401+
TaskTimezoneInvalid(2613),
402+
/// Task cron invalid
403+
TaskCronInvalid(2614),
404+
/// Task schedule and after conflict
405+
TaskScheduleAndAfterConflict(2615),
406+
/// Task when condition not met
407+
TaskWhenConditionNotMet(2616),
408+
}
409+
394410
// Search and External Service Errors [1901-1903, 1910]
395411
build_exceptions! {
396412
/// Tantivy error

src/common/license/src/license.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ pub enum Feature {
8383
SystemHistory,
8484
#[serde(alias = "vector_index", alias = "VECTOR_INDEX")]
8585
VectorIndex,
86+
#[serde(alias = "private_task", alias = "PRIVATE_TASK")]
87+
PrivateTask,
8688
#[serde(other)]
8789
Unknown,
8890
}
@@ -134,6 +136,7 @@ impl fmt::Display for Feature {
134136
Feature::WorkloadGroup => write!(f, "workload_group"),
135137
Feature::SystemHistory => write!(f, "system_history"),
136138
Feature::VectorIndex => write!(f, "vector_index"),
139+
Feature::PrivateTask => write!(f, "private_task"),
137140
Feature::Unknown => write!(f, "unknown"),
138141
}
139142
}
@@ -372,6 +375,11 @@ mod tests {
372375
serde_json::from_str::<Feature>("\"VectorIndex\"").unwrap()
373376
);
374377

378+
assert_eq!(
379+
Feature::PrivateTask,
380+
serde_json::from_str::<Feature>("\"private_task\"").unwrap()
381+
);
382+
375383
assert_eq!(
376384
Feature::Unknown,
377385
serde_json::from_str::<Feature>("\"ssss\"").unwrap()
@@ -408,11 +416,12 @@ mod tests {
408416
Feature::NgramIndex,
409417
Feature::WorkloadGroup,
410418
Feature::SystemHistory,
419+
Feature::PrivateTask,
411420
]),
412421
};
413422

414423
assert_eq!(
415-
"LicenseInfo{ type: enterprise, org: databend, tenants: [databend_tenant,foo], features: [aggregate_index,amend_table,attach_table,compute_quota(threads_num: 1, memory_usage: 1),computed_column,data_mask,hilbert_clustering,inverted_index,license_info,ngram_index,storage_encryption,storage_quota(storage_usage: 1),stream,system_history,vacuum,virtual_column,workload_group] }",
424+
"LicenseInfo{ type: enterprise, org: databend, tenants: [databend_tenant,foo], features: [aggregate_index,amend_table,attach_table,compute_quota(threads_num: 1, memory_usage: 1),computed_column,data_mask,hilbert_clustering,inverted_index,license_info,ngram_index,private_task,storage_encryption,storage_quota(storage_usage: 1),stream,system_history,vacuum,virtual_column,workload_group] }",
416425
license_info.to_string()
417426
);
418427
}

src/meta/api/src/kv_pb_api/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ use itertools::Itertools;
4343
use seq_marked::SeqValue;
4444

4545
pub(crate) use self::codec::decode_non_empty_item;
46-
pub(crate) use self::codec::decode_seqv;
46+
pub use self::codec::decode_seqv;
4747
pub(crate) use self::codec::decode_transition;
4848
pub(crate) use self::codec::encode_operation;
4949
pub use self::upsert_pb::UpsertPB;

src/meta/app/src/principal/mod.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@ pub mod procedure_id_to_name;
4848
pub mod procedure_identity;
4949
pub mod procedure_name_ident;
5050
pub mod stage_file_ident;
51+
pub mod task;
52+
pub mod task_ident;
53+
pub mod task_message_ident;
5154
pub mod tenant_ownership_object_ident;
5255
pub mod tenant_user_ident;
5356
pub mod user_defined_file_format_ident;
@@ -86,6 +89,16 @@ pub use role_info::RoleInfo;
8689
pub use role_info::RoleInfoSerdeError;
8790
pub use stage_file_ident::StageFileIdent;
8891
pub use stage_file_path::StageFilePath;
92+
pub use task::ScheduleOptions;
93+
pub use task::ScheduleType;
94+
pub use task::State;
95+
pub use task::Status;
96+
pub use task::Task;
97+
pub use task::TaskMessage;
98+
pub use task::TaskRun;
99+
pub use task::WarehouseOptions;
100+
pub use task_ident::TaskIdent;
101+
pub use task_ident::TaskIdentRaw;
89102
pub use tenant_ownership_object_ident::TenantOwnershipObjectIdent;
90103
pub use tenant_user_ident::TenantUserIdent;
91104
pub use udf_ident::UdfIdent;

src/meta/app/src/principal/task.rs

Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::collections::BTreeMap;
16+
17+
use chrono::DateTime;
18+
use chrono::Utc;
19+
20+
pub const EMPTY_TASK_ID: u64 = 0;
21+
22+
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
23+
pub enum ScheduleType {
24+
IntervalType = 0,
25+
CronType = 1,
26+
}
27+
28+
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
29+
pub enum Status {
30+
Suspended = 0,
31+
Started = 1,
32+
}
33+
34+
#[derive(Debug, Clone, Copy, PartialEq)]
35+
pub enum State {
36+
Scheduled = 0,
37+
Executing = 1,
38+
Succeeded = 2,
39+
Failed = 3,
40+
Cancelled = 4,
41+
}
42+
43+
#[derive(Debug, Clone, PartialEq)]
44+
pub struct ScheduleOptions {
45+
pub interval: Option<i32>,
46+
pub cron: Option<String>,
47+
pub time_zone: Option<String>,
48+
pub schedule_type: ScheduleType,
49+
pub milliseconds_interval: Option<u64>,
50+
}
51+
52+
#[derive(Debug, Clone, PartialEq)]
53+
pub struct WarehouseOptions {
54+
pub warehouse: Option<String>,
55+
pub using_warehouse_size: Option<String>,
56+
}
57+
58+
#[derive(Debug, Clone, PartialEq)]
59+
pub struct Task {
60+
pub task_id: u64,
61+
pub task_name: String,
62+
pub query_text: String,
63+
pub when_condition: Option<String>,
64+
pub after: Vec<String>,
65+
pub comment: Option<String>,
66+
// expired useless
67+
pub owner: String,
68+
pub owner_user: String,
69+
pub schedule_options: Option<ScheduleOptions>,
70+
pub warehouse_options: Option<WarehouseOptions>,
71+
pub next_scheduled_at: Option<DateTime<Utc>>,
72+
pub suspend_task_after_num_failures: Option<u64>,
73+
// TODO
74+
pub error_integration: Option<String>,
75+
pub status: Status,
76+
pub created_at: DateTime<Utc>,
77+
pub updated_at: DateTime<Utc>,
78+
pub last_suspended_at: Option<DateTime<Utc>>,
79+
// TODO
80+
pub session_params: BTreeMap<String, String>,
81+
}
82+
83+
#[derive(Debug, Clone, PartialEq)]
84+
pub struct TaskRun {
85+
pub task: Task,
86+
pub run_id: u64,
87+
pub attempt_number: i32,
88+
pub state: State,
89+
pub scheduled_at: DateTime<Utc>,
90+
pub completed_at: Option<DateTime<Utc>>,
91+
pub error_code: i64,
92+
pub error_message: Option<String>,
93+
// expired useless
94+
pub root_task_id: u64,
95+
}
96+
97+
#[derive(Debug, Clone, PartialEq)]
98+
pub enum TaskMessageType {
99+
Execute,
100+
Schedule,
101+
Delete,
102+
After,
103+
}
104+
105+
#[derive(Debug, Clone, PartialEq)]
106+
pub enum TaskMessage {
107+
// Execute Task immediately. If an error occurs, if it is a SQL error in the task,
108+
// it will be recorded in the error message of the task run.
109+
ExecuteTask(Task),
110+
// Schedule Task will try to spawn a thread in Query to continue running according to the time set in schedule
111+
ScheduleTask(Task),
112+
// Delete the task information and try to cancel the scheduled task in the query.
113+
DeleteTask(String),
114+
// After Task will bind Task to the tasks in Task.afters.
115+
// When Execute Task is executed, after all the after tasks of Task are completed,
116+
// the execution will continue.
117+
AfterTask(Task),
118+
}
119+
120+
impl TaskMessage {
121+
pub fn task_name(&self) -> &str {
122+
match self {
123+
TaskMessage::ExecuteTask(task)
124+
| TaskMessage::ScheduleTask(task)
125+
| TaskMessage::AfterTask(task) => task.task_name.as_str(),
126+
TaskMessage::DeleteTask(task_name) => task_name.as_str(),
127+
}
128+
}
129+
130+
pub fn ty(&self) -> TaskMessageType {
131+
match self {
132+
TaskMessage::ExecuteTask(_) => TaskMessageType::Execute,
133+
TaskMessage::ScheduleTask(_) => TaskMessageType::Schedule,
134+
TaskMessage::DeleteTask(_) => TaskMessageType::Delete,
135+
TaskMessage::AfterTask(_) => TaskMessageType::After,
136+
}
137+
}
138+
139+
pub fn key(&self) -> String {
140+
Self::key_with_type(self.ty(), self.task_name())
141+
}
142+
143+
pub fn key_with_type(ty: TaskMessageType, task_name: &str) -> String {
144+
let ty_num = match ty {
145+
TaskMessageType::Execute => 0,
146+
TaskMessageType::Schedule => 1,
147+
TaskMessageType::Delete => 2,
148+
TaskMessageType::After => 3,
149+
};
150+
format!("{}-{}-{}", TaskMessage::prefix_range().0, ty_num, task_name)
151+
}
152+
153+
/// Returns the inclusive range of key prefixes used by `TaskMessage`.
154+
///
155+
/// This range can be used to scan all keys generated by `TaskMessage::key()`
156+
/// and related methods (e.g., `schedule_key`). The prefix `0` is prepended
157+
/// to all task-related keys to group them under the same prefix range,
158+
/// enabling efficient key scanning or iteration.
159+
///
160+
/// The returned range is (0, 1), which includes all keys starting with `0-`
161+
/// (as produced by `TaskMessage::prefix()`), and excludes any other unrelated prefixes.
162+
pub fn prefix_range() -> (i64, i64) {
163+
(0, 1)
164+
}
165+
}

0 commit comments

Comments
 (0)