Skip to content

Commit a2ce45c

Browse files
authored
refactor: deprecate: Operation::AsIs and will be removed (#16913)
* refactor: remove put_with_expire_at; Use ttl instead of expire_at for BackgroundTask * refactor: simplify ClusterApi::heartbeat. heartbeat is barely an `upsert` operation no matter the node key present or not. * refactor: deprecate: Operation::AsIs and will be removed * chore: fix deprecated * M src/query/management/src/cluster/cluster_api.rs
1 parent 5a934a6 commit a2ce45c

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+244
-233
lines changed

src/meta/api/src/background_api_impl.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::time::Instant;
16+
1517
use chrono::Utc;
1618
use databend_common_meta_app::app_error::AppError;
1719
use databend_common_meta_app::background::background_job_id_ident::BackgroundJobId;
@@ -40,14 +42,14 @@ use databend_common_meta_app::KeyWithTenant;
4042
use databend_common_meta_kvapi::kvapi;
4143
use databend_common_meta_kvapi::kvapi::DirName;
4244
use databend_common_meta_kvapi::kvapi::Key;
43-
use databend_common_meta_kvapi::kvapi::UpsertKVReq;
4445
use databend_common_meta_types::seq_value::SeqValue;
4546
use databend_common_meta_types::MatchSeq::Any;
4647
use databend_common_meta_types::MetaError;
4748
use databend_common_meta_types::MetaSpec;
4849
use databend_common_meta_types::Operation;
4950
use databend_common_meta_types::SeqV;
5051
use databend_common_meta_types::TxnRequest;
52+
use databend_common_meta_types::UpsertKV;
5153
use fastrace::func_name;
5254
use futures::TryStreamExt;
5355
use log::debug;
@@ -222,18 +224,18 @@ impl<KV: kvapi::KVApi<Error = MetaError>> BackgroundApi for KV {
222224
let meta = req.task_info.clone();
223225

224226
let resp = self
225-
.upsert_kv(UpsertKVReq::new(
227+
.upsert_kv(UpsertKV::new(
226228
name_key.to_string_key().as_str(),
227229
Any,
228230
Operation::Update(serialize_struct(&meta)?),
229-
Some(MetaSpec::new_expire(req.expire_at)),
231+
Some(MetaSpec::new_ttl(req.ttl)),
230232
))
231233
.await?;
232234
// confirm a successful update
233235
assert!(resp.is_changed());
234236
Ok(UpdateBackgroundTaskReply {
235237
last_updated: Utc::now(),
236-
expire_at: req.expire_at,
238+
expire_at: Instant::now() + req.ttl,
237239
})
238240
}
239241

src/meta/api/src/background_api_test_suite.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::time::Duration;
16+
1517
use chrono::DateTime;
1618
use chrono::Utc;
1719
use databend_common_meta_app::background::BackgroundJobIdent;
@@ -127,13 +129,11 @@ impl BackgroundApiTestSuite {
127129

128130
info!("--- create a background task");
129131
let create_on = Utc::now();
130-
// expire after 5 secs
131-
let expire_at = create_on + chrono::Duration::seconds(5);
132132
{
133133
let req = UpdateBackgroundTaskReq {
134134
task_name: task_ident.clone(),
135135
task_info: new_background_task(BackgroundTaskState::STARTED, create_on),
136-
expire_at: expire_at.timestamp() as u64,
136+
ttl: Duration::from_secs(5),
137137
};
138138

139139
let res = mt.update_background_task(req).await;
@@ -155,7 +155,7 @@ impl BackgroundApiTestSuite {
155155
let req = UpdateBackgroundTaskReq {
156156
task_name: task_ident.clone(),
157157
task_info: new_background_task(BackgroundTaskState::DONE, create_on),
158-
expire_at: expire_at.timestamp() as u64,
158+
ttl: Duration::from_secs(5),
159159
};
160160

161161
let res = mt.update_background_task(req).await;

src/meta/api/src/kv_pb_api/codec.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,9 @@ where T: FromToProto {
3636
Ok(Operation::Update(buf))
3737
}
3838
Operation::Delete => Ok(Operation::Delete),
39-
Operation::AsIs => Ok(Operation::AsIs),
39+
_ => {
40+
unreachable!("Operation::AsIs is not supported")
41+
}
4042
}
4143
}
4244

src/meta/api/src/kv_pb_api/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -516,13 +516,13 @@ mod tests {
516516
use databend_common_meta_kvapi::kvapi::KVApi;
517517
use databend_common_meta_kvapi::kvapi::KVStream;
518518
use databend_common_meta_kvapi::kvapi::UpsertKVReply;
519-
use databend_common_meta_kvapi::kvapi::UpsertKVReq;
520519
use databend_common_meta_types::protobuf::StreamItem;
521520
use databend_common_meta_types::seq_value::SeqV;
522521
use databend_common_meta_types::seq_value::SeqValue;
523522
use databend_common_meta_types::MetaError;
524523
use databend_common_meta_types::TxnReply;
525524
use databend_common_meta_types::TxnRequest;
525+
use databend_common_meta_types::UpsertKV;
526526
use databend_common_proto_conv::FromToProto;
527527
use futures::StreamExt;
528528
use futures::TryStreamExt;
@@ -541,7 +541,7 @@ mod tests {
541541
impl KVApi for FooKV {
542542
type Error = MetaError;
543543

544-
async fn upsert_kv(&self, _req: UpsertKVReq) -> Result<UpsertKVReply, Self::Error> {
544+
async fn upsert_kv(&self, _req: UpsertKV) -> Result<UpsertKVReply, Self::Error> {
545545
unimplemented!()
546546
}
547547

src/meta/api/src/kv_pb_api/upsert_pb.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -90,10 +90,6 @@ impl<K: kvapi::Key> UpsertPB<K> {
9090
}
9191
}
9292

93-
pub fn with_expire_sec(self, expire_at_sec: u64) -> Self {
94-
self.with(MetaSpec::new_expire(expire_at_sec))
95-
}
96-
9793
/// Set the time to last for the value.
9894
/// When the ttl is passed, the value is deleted.
9995
pub fn with_ttl(self, ttl: Duration) -> Self {

src/meta/api/src/name_id_value_api.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -388,12 +388,12 @@ mod tests {
388388
use databend_common_meta_kvapi::kvapi::KVApi;
389389
use databend_common_meta_kvapi::kvapi::KVStream;
390390
use databend_common_meta_kvapi::kvapi::UpsertKVReply;
391-
use databend_common_meta_kvapi::kvapi::UpsertKVReq;
392391
use databend_common_meta_types::protobuf::StreamItem;
393392
use databend_common_meta_types::seq_value::SeqV;
394393
use databend_common_meta_types::MetaError;
395394
use databend_common_meta_types::TxnReply;
396395
use databend_common_meta_types::TxnRequest;
396+
use databend_common_meta_types::UpsertKV;
397397
use databend_common_proto_conv::FromToProto;
398398
use futures::StreamExt;
399399
use prost::Message;
@@ -408,7 +408,7 @@ mod tests {
408408
impl KVApi for Foo {
409409
type Error = MetaError;
410410

411-
async fn upsert_kv(&self, _req: UpsertKVReq) -> Result<UpsertKVReply, Self::Error> {
411+
async fn upsert_kv(&self, _req: UpsertKV) -> Result<UpsertKVReply, Self::Error> {
412412
unimplemented!()
413413
}
414414

src/meta/api/src/schema_api_test_suite.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,6 @@ use databend_common_meta_app::tenant::ToTenant;
130130
use databend_common_meta_app::KeyWithTenant;
131131
use databend_common_meta_kvapi::kvapi;
132132
use databend_common_meta_kvapi::kvapi::Key;
133-
use databend_common_meta_kvapi::kvapi::UpsertKVReq;
134133
use databend_common_meta_types::MatchSeq;
135134
use databend_common_meta_types::MetaError;
136135
use databend_common_meta_types::Operation;
@@ -238,7 +237,7 @@ async fn upsert_test_data(
238237
value: Vec<u8>,
239238
) -> Result<u64, KVAppError> {
240239
let res = kv_api
241-
.upsert_kv(UpsertKVReq {
240+
.upsert_kv(UpsertKV {
242241
key: key.to_string_key(),
243242
seq: MatchSeq::GE(0),
244243
value: Operation::Update(value),
@@ -255,7 +254,7 @@ async fn delete_test_data(
255254
key: &impl kvapi::Key,
256255
) -> Result<(), KVAppError> {
257256
let _res = kv_api
258-
.upsert_kv(UpsertKVReq {
257+
.upsert_kv(UpsertKV {
259258
key: key.to_string_key(),
260259
seq: MatchSeq::GE(0),
261260
value: Operation::Delete,

src/meta/api/src/util.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ use databend_common_meta_app::primitive::Id;
2525
use databend_common_meta_app::schema::database_name_ident::DatabaseNameIdent;
2626
use databend_common_meta_app::schema::TableNameIdent;
2727
use databend_common_meta_kvapi::kvapi;
28-
use databend_common_meta_kvapi::kvapi::UpsertKVReq;
2928
use databend_common_meta_types::seq_value::SeqV;
3029
use databend_common_meta_types::txn_condition::Target;
3130
use databend_common_meta_types::ConditionResult;
@@ -40,6 +39,7 @@ use databend_common_meta_types::TxnGetResponse;
4039
use databend_common_meta_types::TxnOp;
4140
use databend_common_meta_types::TxnOpResponse;
4241
use databend_common_meta_types::TxnRequest;
42+
use databend_common_meta_types::UpsertKV;
4343
use databend_common_proto_conv::FromToProto;
4444
use log::debug;
4545

@@ -200,7 +200,7 @@ pub async fn fetch_id<T: kvapi::Key>(
200200
generator: T,
201201
) -> Result<u64, MetaError> {
202202
let res = kv_api
203-
.upsert_kv(UpsertKVReq {
203+
.upsert_kv(UpsertKV {
204204
key: generator.to_string_key(),
205205
seq: MatchSeq::GE(0),
206206
value: Operation::Update(b"".to_vec()),

src/meta/app/src/background/background_task.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use std::fmt;
1616
use std::fmt::Display;
1717
use std::fmt::Formatter;
1818
use std::time::Duration;
19+
use std::time::Instant;
1920

2021
use chrono::DateTime;
2122
use chrono::Utc;
@@ -156,7 +157,7 @@ impl BackgroundTaskInfo {
156157
pub struct UpdateBackgroundTaskReq {
157158
pub task_name: BackgroundTaskIdent,
158159
pub task_info: BackgroundTaskInfo,
159-
pub expire_at: u64,
160+
pub ttl: Duration,
160161
}
161162

162163
impl Display for UpdateBackgroundTaskReq {
@@ -176,7 +177,7 @@ impl Display for UpdateBackgroundTaskReq {
176177
#[derive(Clone, Debug, PartialEq, Eq)]
177178
pub struct UpdateBackgroundTaskReply {
178179
pub last_updated: DateTime<Utc>,
179-
pub expire_at: u64,
180+
pub expire_at: Instant,
180181
}
181182

182183
#[derive(Clone, Debug, PartialEq, Eq)]

src/meta/binaries/meta/kvapi.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,15 @@
1515
use std::sync::Arc;
1616

1717
use databend_common_meta_kvapi::kvapi;
18-
use databend_common_meta_kvapi::kvapi::UpsertKVReq;
1918
use databend_common_meta_types::MetaError;
2019
use databend_common_meta_types::MetaSpec;
20+
use databend_common_meta_types::UpsertKV;
2121
use databend_common_meta_types::With;
2222
use databend_meta::configs::Config;
2323

2424
pub enum KvApiCommand {
2525
Get(String),
26-
Upsert(UpsertKVReq),
26+
Upsert(UpsertKV),
2727
MGet(Vec<String>),
2828
List(String),
2929
}
@@ -36,7 +36,7 @@ impl KvApiCommand {
3636
return Err("The number of keys must be 1".to_string());
3737
}
3838

39-
let req = UpsertKVReq::update(config.key[0].as_str(), config.value.as_bytes());
39+
let req = UpsertKV::update(config.key[0].as_str(), config.value.as_bytes());
4040

4141
let req = if let Some(expire_after) = config.expire_after {
4242
req.with(MetaSpec::new_ttl(std::time::Duration::from_secs(
@@ -52,7 +52,7 @@ impl KvApiCommand {
5252
if config.key.len() != 1 {
5353
return Err("The number of keys must be 1".to_string());
5454
}
55-
Self::Upsert(UpsertKVReq::delete(&config.key[0]))
55+
Self::Upsert(UpsertKV::delete(&config.key[0]))
5656
}
5757
"get" => {
5858
if config.key.len() != 1 {

0 commit comments

Comments
 (0)