Skip to content

Commit cbb548e

Browse files
authored
refactor: CrudMgr::update should return only when the value is changed (#16225)
1 parent d068568 commit cbb548e

File tree

3 files changed

+107
-10
lines changed

3 files changed

+107
-10
lines changed

src/meta/api/src/crud/mod.rs

Lines changed: 90 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
mod errors;
1818
use std::marker::PhantomData;
1919
use std::sync::Arc;
20+
use std::time::Duration;
2021

2122
use databend_common_meta_app::schema::CreateOption;
2223
use databend_common_meta_app::tenant::Tenant;
@@ -36,6 +37,7 @@ use databend_common_meta_types::With;
3637
use databend_common_proto_conv::FromToProto;
3738
pub use errors::CrudError;
3839
use futures::TryStreamExt;
40+
use log::info;
3941

4042
use crate::kv_pb_api::KVPbApi;
4143
use crate::kv_pb_api::UpsertPB;
@@ -80,18 +82,42 @@ where
8082
// As a kvapi::Key, the corresponding value contains a name.
8183
ValueOf<R>: ValueWithName + FromToProto + Clone,
8284
{
85+
/// Add a record.
86+
///
87+
/// `create_option` specifies the behavior when the record already exists.
8388
#[async_backtrace::framed]
8489
#[fastrace::trace]
8590
pub async fn add(
8691
&self,
8792
value: ValueOf<R>,
8893
create_option: &CreateOption,
94+
) -> Result<(), CrudError<ExistError<R>>> {
95+
self.add_with_ttl(value, None, create_option).await
96+
}
97+
98+
/// Add a record with an optional time-to-live(TTL) argument.
99+
///
100+
/// If `ttl` is `None`, the record will not expire.
101+
/// `create_option` specifies the behavior when the record already exists.
102+
#[async_backtrace::framed]
103+
#[fastrace::trace]
104+
pub async fn add_with_ttl(
105+
&self,
106+
value: ValueOf<R>,
107+
ttl: Option<Duration>,
108+
create_option: &CreateOption,
89109
) -> Result<(), CrudError<ExistError<R>>> {
90110
let ident = self.ident(value.name());
91111

92112
let seq = MatchSeq::from(*create_option);
93113
let upsert = UpsertPB::insert(ident, value.clone()).with(seq);
94114

115+
let upsert = if let Some(ttl) = ttl {
116+
upsert.with_ttl(ttl)
117+
} else {
118+
upsert
119+
};
120+
95121
let res = self.kv_api.upsert_pb(&upsert).await?;
96122

97123
if let CreateOption::Create = create_option {
@@ -109,15 +135,71 @@ where
109135
&self,
110136
value: ValueOf<R>,
111137
match_seq: MatchSeq,
138+
) -> Result<u64, CrudError<UnknownError<R>>> {
139+
self.update_with_ttl(value, match_seq, None).await
140+
}
141+
142+
/// Update a record with an optional time-to-live(TTL) argument.
143+
///
144+
/// Returns the seq of the updated record.
145+
/// If `ttl` is `None`, the record will not expire.
146+
/// `match_seq` specifies what existing value will be overridden,
147+
/// if the seq of existing value does not match the provided `match_seq`,
148+
/// nothing will be done.
149+
#[async_backtrace::framed]
150+
#[fastrace::trace]
151+
pub async fn update_with_ttl(
152+
&self,
153+
value: ValueOf<R>,
154+
match_seq: MatchSeq,
155+
ttl: Option<Duration>,
112156
) -> Result<u64, CrudError<UnknownError<R>>> {
113157
let ident = self.ident(value.name());
114158
let upsert = UpsertPB::update(ident, value.clone()).with(match_seq);
115159

160+
let upsert = if let Some(ttl) = ttl {
161+
upsert.with_ttl(ttl)
162+
} else {
163+
upsert
164+
};
165+
116166
let res = self.kv_api.upsert_pb(&upsert).await?;
117167

118-
match res.result {
119-
Some(SeqV { seq, .. }) => Ok(seq),
120-
None => Err(UnknownError::new(value.name(), "NotFound when update").into()),
168+
if res.is_changed() {
169+
Ok(res.result.seq())
170+
} else {
171+
Err(UnknownError::new(value.name(), match_seq, "NotFound when update").into())
172+
}
173+
}
174+
175+
/// Fetch the record with the given `name`, update it with the provided function `update`,
176+
/// then save it back if the seq number did not change.
177+
///
178+
/// The `seq` is the initial seq number to fetch the record.
179+
#[async_backtrace::framed]
180+
#[fastrace::trace]
181+
pub async fn cas_with(
182+
&self,
183+
name: &str,
184+
seq: MatchSeq,
185+
update: impl Fn(SeqV<ValueOf<R>>) -> ValueOf<R> + Send,
186+
) -> Result<u64, CrudError<UnknownError<R>>> {
187+
loop {
188+
let seq_v = self.get(name, seq).await?;
189+
190+
let seq = seq_v.seq;
191+
let new_value = update(seq_v);
192+
193+
let ident = self.ident(name);
194+
let upsert = UpsertPB::update(ident, new_value).with(MatchSeq::Exact(seq));
195+
196+
let res = self.kv_api.upsert_pb(&upsert).await?;
197+
198+
if res.is_changed() {
199+
return Ok(res.result.seq());
200+
} else {
201+
info!("cas: retrying, name: {}, seq: {}", name, seq);
202+
}
121203
}
122204
}
123205

@@ -136,6 +218,7 @@ where
136218
res.removed_or_else(|e| {
137219
UnknownError::new(
138220
name,
221+
seq,
139222
format_args!("NotFound when remove, seq of existing record: {}", e.seq()),
140223
)
141224
})?;
@@ -154,11 +237,13 @@ where
154237

155238
let res = self.kv_api.get_pb(&ident).await?;
156239

157-
let seq_value = res.ok_or_else(|| UnknownError::new(name, "NotFound when get"))?;
240+
let seq_value = res.ok_or_else(|| UnknownError::new(name, seq, "NotFound when get"))?;
158241

159242
match seq.match_seq(&seq_value) {
160243
Ok(_) => Ok(seq_value),
161-
Err(e) => Err(UnknownError::new(name, format_args!("NotFound when get: {}", e)).into()),
244+
Err(e) => {
245+
Err(UnknownError::new(name, seq, format_args!("NotFound when get: {}", e)).into())
246+
}
162247
}
163248
}
164249

src/meta/app/src/tenant_key/errors.rs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
use std::fmt;
1616
use std::fmt::Formatter;
1717

18+
use databend_common_meta_types::MatchSeq;
19+
1820
use crate::tenant_key::resource::TenantResource;
1921

2022
/// Error occurred when a record already exists for a key.
@@ -70,14 +72,16 @@ where R: TenantResource
7072
#[derive(Clone, PartialEq, Eq, thiserror::Error)]
7173
pub struct UnknownError<R> {
7274
name: String,
75+
match_seq: MatchSeq,
7376
ctx: String,
7477
_p: std::marker::PhantomData<R>,
7578
}
7679

7780
impl<R> UnknownError<R> {
78-
pub fn new(name: impl ToString, ctx: impl ToString) -> Self {
81+
pub fn new(name: impl ToString, match_seq: MatchSeq, ctx: impl ToString) -> Self {
7982
Self {
8083
name: name.to_string(),
84+
match_seq,
8185
ctx: ctx.to_string(),
8286
_p: Default::default(),
8387
}
@@ -101,6 +105,7 @@ where R: TenantResource
101105
f.debug_struct("UnknownError")
102106
.field("type", &typ)
103107
.field("name", &self.name)
108+
.field("match_seq", &self.match_seq)
104109
.field("ctx", &self.ctx)
105110
.finish()
106111
}
@@ -111,7 +116,11 @@ where R: TenantResource
111116
{
112117
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
113118
let typ = type_name::<R>();
114-
write!(f, "Unknown {typ} '{}': {}", self.name, self.ctx)
119+
write!(
120+
f,
121+
"Unknown {typ} '{}'(seq {}): {}",
122+
self.name, self.match_seq, self.ctx
123+
)
115124
}
116125
}
117126

@@ -123,6 +132,8 @@ fn type_name<R: TenantResource>() -> &'static str {
123132

124133
#[cfg(test)]
125134
mod tests {
135+
use databend_common_meta_types::MatchSeq;
136+
126137
use crate::principal::network_policy_ident;
127138

128139
#[test]
@@ -141,11 +152,12 @@ mod tests {
141152
fn test_unknown_error() {
142153
let err = super::UnknownError::<network_policy_ident::Resource> {
143154
name: "foo".to_string(),
155+
match_seq: MatchSeq::GE(1),
144156
ctx: "bar".to_string(),
145157
_p: Default::default(),
146158
};
147159

148160
let got = err.to_string();
149-
assert_eq!(got, "Unknown NetworkPolicy 'foo': bar")
161+
assert_eq!(got, "Unknown NetworkPolicy 'foo'(seq >= 1): bar")
150162
}
151163
}

src/meta/types/src/change.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ where ID: Clone + PartialEq
4040
impl<T, ID> Change<T, ID>
4141
where
4242
ID: Clone + PartialEq + Debug,
43-
T: PartialEq + Debug,
43+
T: Debug,
4444
{
4545
pub fn new(prev: Option<SeqV<T>>, result: Option<SeqV<T>>) -> Self {
4646
Change {
@@ -76,7 +76,7 @@ where
7676
}
7777

7878
pub fn is_changed(&self) -> bool {
79-
self.prev != self.result
79+
self.prev.seq() != self.result.seq()
8080
}
8181

8282
/// Assumes it is a state transition of an add operation and return Ok if the add operation succeed.

0 commit comments

Comments
 (0)