Skip to content

Commit 55c423d

Browse files
committed
feat: imp when_condition for Task & store TaskRun on system table
1 parent c0c285c commit 55c423d

File tree

13 files changed

+1140
-385
lines changed

13 files changed

+1140
-385
lines changed

src/binaries/query/entry.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ use databend_query::servers::MySQLHandler;
4444
use databend_query::servers::MySQLTlsConfig;
4545
use databend_query::servers::Server;
4646
use databend_query::servers::ShutdownHandle;
47+
use databend_query::task::TaskService;
4748
use databend_query::GlobalServices;
4849
use log::info;
4950

@@ -302,6 +303,7 @@ pub async fn start_services(conf: &InnerConfig) -> Result<(), MainError> {
302303
}
303304
println!(" system history tables: {}", conf.log.history);
304305
}
306+
TaskService::instance().initialized();
305307

306308
println!();
307309
println!(

src/common/exception/src/exception_code.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -403,6 +403,8 @@ build_exceptions! {
403403
TaskCronInvalid(2614),
404404
/// Task schedule and after conflict
405405
TaskScheduleAndAfterConflict(2615),
406+
/// Task when condition not met
407+
TaskWhenConditionNotMet(2616),
406408
}
407409

408410
// Search and External Service Errors [1901-1903, 1910]

src/query/management/src/task/task_mgr.rs

Lines changed: 20 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -103,13 +103,21 @@ impl TaskMgr {
103103
) -> Result<Result<(), TaskError>, TaskApiError> {
104104
task.created_at = Utc::now();
105105

106-
self.create_task_inner(task, create_option).await
106+
self.create_task_inner(task, create_option, false).await
107+
}
108+
109+
#[async_backtrace::framed]
110+
#[fastrace::trace]
111+
pub async fn update_task(&self, task: Task) -> Result<Result<(), TaskError>, TaskApiError> {
112+
self.create_task_inner(task, &CreateOption::CreateOrReplace, true)
113+
.await
107114
}
108115

109116
async fn create_task_inner(
110117
&self,
111118
task: Task,
112119
create_option: &CreateOption,
120+
without_schedule: bool,
113121
) -> Result<Result<(), TaskError>, TaskApiError> {
114122
assert!(task.after.is_empty() || task.schedule_options.is_none());
115123
// check
@@ -152,9 +160,13 @@ impl TaskMgr {
152160
}
153161
}
154162
if !task.after.is_empty() {
155-
let _ = TaskChannel::instance().send(TaskMessage::AfterTask(task));
156-
} else if task.schedule_options.is_some() {
157-
let _ = TaskChannel::instance().send(TaskMessage::ScheduleTask(task));
163+
let _ = TaskChannel::instance()
164+
.send(TaskMessage::AfterTask(task))
165+
.await;
166+
} else if task.schedule_options.is_some() && !without_schedule {
167+
let _ = TaskChannel::instance()
168+
.send(TaskMessage::ScheduleTask(task))
169+
.await;
158170
}
159171

160172
Ok(Ok(()))
@@ -174,7 +186,9 @@ impl TaskMgr {
174186
context: "while execute task".to_string(),
175187
}));
176188
};
177-
let _ = TaskChannel::instance().send(TaskMessage::ExecuteTask(Task::clone(&task)));
189+
let _ = TaskChannel::instance()
190+
.send(TaskMessage::ExecuteTask(Task::clone(&task)))
191+
.await;
178192

179193
Ok(Ok(()))
180194
}
@@ -256,7 +270,7 @@ impl TaskMgr {
256270
}
257271
}
258272
if let Err(e) = self
259-
.create_task_inner(task, &CreateOption::CreateOrReplace)
273+
.create_task_inner(task, &CreateOption::CreateOrReplace, false)
260274
.await?
261275
{
262276
return Ok(Err(TaskError::NotFound {
@@ -322,50 +336,6 @@ impl TaskMgr {
322336
Ok(tasks)
323337
}
324338

325-
#[async_backtrace::framed]
326-
#[fastrace::trace]
327-
pub async fn update_task_run(
328-
&self,
329-
task_run: TaskRun,
330-
) -> Result<Result<(), TaskError>, TaskApiError> {
331-
let seq = MatchSeq::from(CreateOption::CreateOrReplace);
332-
let key = TaskRunIdent::new(&self.tenant, task_run.key());
333-
let req = UpsertPB::insert(key, task_run.clone()).with(seq);
334-
let _ = self.kv_api.upsert_pb(&req).await?;
335-
336-
Ok(Ok(()))
337-
}
338-
339-
#[async_backtrace::framed]
340-
#[fastrace::trace]
341-
pub async fn lasted_task_run(
342-
&self,
343-
task_name: &str,
344-
) -> Result<Result<Option<TaskRun>, TaskError>, TaskApiError> {
345-
let key = DirName::new(TaskRunIdent::new(&self.tenant, task_name));
346-
let result = self
347-
.kv_api
348-
.list_pb_values(&key)
349-
.await?
350-
.next()
351-
.await
352-
.transpose()?;
353-
354-
Ok(Ok(result))
355-
}
356-
357-
#[async_backtrace::framed]
358-
#[fastrace::trace]
359-
pub async fn show_task_runs_full(
360-
&self,
361-
task_name: &str,
362-
) -> Result<Result<Vec<TaskRun>, TaskApiError>, TaskApiError> {
363-
let key = DirName::new(TaskRunIdent::new(&self.tenant, task_name));
364-
let stream = self.kv_api.list_pb_values(&key).await?;
365-
366-
Ok(Ok(stream.try_collect().await?))
367-
}
368-
369339
pub fn make_schedule_options(opt: ScheduleOptions) -> task::ScheduleOptions {
370340
match opt {
371341
ScheduleOptions::IntervalSecs(secs, ms) => {

src/query/service/src/global_services.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ use crate::servers::http::v1::ClientSessionManager;
5858
use crate::servers::http::v1::HttpQueryManager;
5959
use crate::sessions::QueriesQueueManager;
6060
use crate::sessions::SessionManager;
61-
use crate::task_service::TaskService;
61+
use crate::task::task_service::TaskService;
6262

6363
pub struct GlobalServices;
6464

@@ -123,8 +123,7 @@ impl GlobalServices {
123123
SessionManager::init(config)?;
124124
LockManager::init()?;
125125
AuthMgr::init(config)?;
126-
127-
126+
128127
let task_rx = TaskChannel::init(GlobalConfig::instance().query.tasks_channel_len)?;
129128
// Init user manager.
130129
// Builtin users and udfs are created here.
@@ -146,7 +145,6 @@ impl GlobalServices {
146145
)
147146
.await?;
148147
}
149-
TaskService::init(task_rx, config.query.tenant_id.clone())?;
150148
RoleCacheManager::init()?;
151149

152150
DataOperator::init(&config.storage, config.spill.storage_params.clone()).await?;
@@ -181,6 +179,7 @@ impl GlobalServices {
181179
if config.log.history.on {
182180
GlobalHistoryLog::init(config).await?;
183181
}
182+
TaskService::init(task_rx, config)?;
184183

185184
GLOBAL_QUERIES_MANAGER.set_gc_handle(memory_gc_handle);
186185

src/query/service/src/interpreters/task/mod.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
use std::sync::Arc;
1616

1717
use databend_common_cloud_control::task_utils;
18-
use databend_common_config::GlobalConfig;
1918
use databend_common_exception::Result;
2019
use databend_common_sql::plans::AlterTaskPlan;
2120
use databend_common_sql::plans::CreateTaskPlan;
@@ -37,7 +36,7 @@ impl TaskInterpreterFactory {
3736
pub fn build() -> TaskInterpreterImpl {
3837
// TODO: for test
3938
if true {
40-
// if GlobalConfig::instance().query.enable_private_task {
39+
// if GlobalConfig::instance().query.enable_private_task {
4140
return TaskInterpreterImpl::Private(PrivateTaskInterpreter);
4241
}
4342
TaskInterpreterImpl::Cloud(CloudTaskInterpreter)

src/query/service/src/interpreters/task/private.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -163,9 +163,6 @@ impl TaskInterpreter for PrivateTaskInterpreter {
163163
) -> Result<Vec<task_utils::Task>> {
164164
let tasks = UserApiProvider::instance().show_tasks(&plan.tenant).await?;
165165

166-
tasks
167-
.into_iter()
168-
.map(Self::task_trans)
169-
.try_collect()
166+
tasks.into_iter().map(Self::task_trans).try_collect()
170167
}
171168
}

src/query/service/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ pub mod table_functions;
6161
pub mod test_kits;
6262

6363
mod global_services;
64-
mod task_service;
64+
pub mod task;
6565

6666
pub use databend_common_sql as sql;
6767
pub use databend_common_storages_factory as storages;

src/query/service/src/task/meta.rs

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::sync::Arc;
16+
use std::time::Duration;
17+
18+
use databend_common_base::runtime::block_on;
19+
use databend_common_exception::Result;
20+
use databend_common_meta_client::ClientHandle;
21+
use databend_common_meta_kvapi::kvapi::KVApi;
22+
use databend_common_meta_semaphore::acquirer::Permit;
23+
use databend_common_meta_semaphore::Semaphore;
24+
use databend_common_meta_types::MatchSeq;
25+
use databend_common_meta_types::Operation;
26+
use databend_common_meta_types::UpsertKV;
27+
28+
/// refer to [crate::history_tables::meta::PermitGuard]
29+
pub struct PermitGuard {
30+
_permit: Permit,
31+
meta_handle: Arc<TaskMetaHandle>,
32+
meta_key: String,
33+
}
34+
35+
impl PermitGuard {
36+
pub fn new(permit: Permit, meta_handle: Arc<TaskMetaHandle>, meta_key: String) -> Self {
37+
Self {
38+
_permit: permit,
39+
meta_handle,
40+
meta_key,
41+
}
42+
}
43+
}
44+
45+
impl Drop for PermitGuard {
46+
fn drop(&mut self) {
47+
let meta_handle = self.meta_handle.clone();
48+
let meta_key = self.meta_key.clone();
49+
50+
block_on(async move {
51+
let _ = meta_handle.update_last_execution_timestamp(&meta_key).await;
52+
});
53+
}
54+
}
55+
56+
/// refer to [crate::history_tables::meta::HistoryMetaHandle]
57+
pub struct TaskMetaHandle {
58+
meta_client: Arc<ClientHandle>,
59+
node_id: String,
60+
}
61+
62+
impl TaskMetaHandle {
63+
pub fn new(meta_client: Arc<ClientHandle>, node_id: String) -> Self {
64+
Self {
65+
meta_client,
66+
node_id,
67+
}
68+
}
69+
70+
pub async fn acquire(&self, meta_key: &str, interval: u64) -> Result<Option<Permit>> {
71+
let acquired_guard = Semaphore::new_acquired(
72+
self.meta_client.clone(),
73+
meta_key,
74+
1,
75+
self.node_id.clone(),
76+
Duration::from_secs(3),
77+
)
78+
.await
79+
.map_err(|_e| "acquire semaphore failed from TaskService")?;
80+
if interval == 0 {
81+
return Ok(Some(acquired_guard));
82+
}
83+
if match self
84+
.meta_client
85+
.get_kv(&format!("{}/last_timestamp", meta_key))
86+
.await?
87+
{
88+
Some(v) => {
89+
let last: u64 = serde_json::from_slice(&v.data)?;
90+
chrono::Utc::now().timestamp_millis() as u64
91+
- Duration::from_secs(interval).as_millis() as u64
92+
> last
93+
}
94+
None => true,
95+
} {
96+
Ok(Some(acquired_guard))
97+
} else {
98+
drop(acquired_guard);
99+
Ok(None)
100+
}
101+
}
102+
103+
pub async fn acquire_with_guard(
104+
&self,
105+
meta_key: &str,
106+
interval: u64,
107+
) -> Result<Option<PermitGuard>> {
108+
if let Some(permit) = self.acquire(meta_key, interval).await? {
109+
Ok(Some(PermitGuard::new(
110+
permit,
111+
Arc::new(TaskMetaHandle {
112+
meta_client: self.meta_client.clone(),
113+
node_id: self.node_id.clone(),
114+
}),
115+
meta_key.to_string(),
116+
)))
117+
} else {
118+
Ok(None)
119+
}
120+
}
121+
122+
pub async fn update_last_execution_timestamp(&self, meta_key: &str) -> Result<()> {
123+
self.meta_client
124+
.upsert_kv(UpsertKV::new(
125+
format!("{}/last_timestamp", meta_key),
126+
MatchSeq::Any,
127+
Operation::Update(serde_json::to_vec(&chrono::Utc::now().timestamp_millis())?),
128+
None,
129+
))
130+
.await?;
131+
Ok(())
132+
}
133+
}

src/query/service/src/task/mod.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
mod meta;
16+
mod session;
17+
pub mod task_service;
18+
19+
pub use task_service::TaskService;

0 commit comments

Comments
 (0)