Skip to content

Commit 28670ef

Browse files
authored
chore(ci): flaky test (#16207)
flaky test
1 parent 7dc8a1a commit 28670ef

File tree

11 files changed

+86
-186
lines changed

11 files changed

+86
-186
lines changed

src/meta/api/src/schema_api_impl.rs

Lines changed: 8 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3814,11 +3814,9 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
38143814
);
38153815

38163816
if succ {
3817-
break;
3817+
return Ok(CreateLockRevReply { revision });
38183818
}
38193819
}
3820-
3821-
Ok(CreateLockRevReply { revision })
38223820
}
38233821

38243822
#[logcall::logcall]
@@ -3842,7 +3840,12 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
38423840
let (tb_meta_seq, _) = get_table_by_id_or_err(self, &tbid, ctx).await?;
38433841

38443842
let (lock_seq, lock_meta_opt): (_, Option<LockMeta>) = get_pb_value(self, &key).await?;
3845-
table_lock_has_to_exist(lock_seq, table_id, ctx)?;
3843+
if lock_seq == 0 || lock_meta_opt.is_none() {
3844+
return Err(KVAppError::AppError(AppError::TableLockExpired(
3845+
TableLockExpired::new(table_id, ctx),
3846+
)));
3847+
}
3848+
38463849
let mut lock_meta = lock_meta_opt.unwrap();
38473850
// Set `acquire_lock = true` to initialize `acquired_on` when the
38483851
// first time this lock is acquired. Before the lock is
@@ -3879,10 +3882,9 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
38793882
);
38803883

38813884
if succ {
3882-
break;
3885+
return Ok(());
38833886
}
38843887
}
3885-
Ok(())
38863888
}
38873889

38883890
#[logcall::logcall]
@@ -5904,21 +5906,6 @@ async fn update_mask_policy(
59045906
Ok(())
59055907
}
59065908

5907-
/// Return OK if a table lock exists by checking the seq.
5908-
///
5909-
/// Otherwise returns TableLockExpired error
5910-
fn table_lock_has_to_exist(seq: u64, table_id: u64, msg: impl Display) -> Result<(), KVAppError> {
5911-
if seq == 0 {
5912-
debug!(seq = seq, table_id = table_id; "table lock does not exist");
5913-
5914-
Err(KVAppError::AppError(AppError::TableLockExpired(
5915-
TableLockExpired::new(table_id, format!("{}: {}", msg, table_id)),
5916-
)))
5917-
} else {
5918-
Ok(())
5919-
}
5920-
}
5921-
59225909
#[tonic::async_trait]
59235910
pub(crate) trait UndropTableStrategy {
59245911
fn table_name_ident(&self) -> &TableNameIdent;

src/meta/app/src/schema/lock.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,12 @@ impl LockKey {
9090
}
9191
}
9292

93+
pub fn get_tenant(&self) -> &Tenant {
94+
match self {
95+
LockKey::Table { tenant, .. } => tenant,
96+
}
97+
}
98+
9399
pub fn get_extra_info(&self) -> BTreeMap<String, String> {
94100
match self {
95101
LockKey::Table { .. } => BTreeMap::new(),

src/query/catalog/src/lock.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,6 @@ pub enum LockTableOption {
3131
pub trait Lock: Sync + Send {
3232
fn lock_type(&self) -> LockType;
3333

34-
fn get_catalog(&self) -> &str;
35-
36-
fn get_table_id(&self) -> u64;
37-
38-
fn tenant_name(&self) -> &str;
39-
4034
async fn try_lock(
4135
&self,
4236
ctx: Arc<dyn TableContext>,

src/query/service/src/locks/lock_ext.rs

Lines changed: 0 additions & 68 deletions
This file was deleted.

src/query/service/src/locks/lock_holder.rs

Lines changed: 13 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,13 @@ use databend_common_base::base::tokio::time::sleep;
2424
use databend_common_base::runtime::GlobalIORuntime;
2525
use databend_common_base::runtime::TrySpawn;
2626
use databend_common_catalog::catalog::Catalog;
27-
use databend_common_catalog::lock::Lock;
2827
use databend_common_exception::ErrorCode;
2928
use databend_common_exception::Result;
29+
use databend_common_meta_app::schema::CreateLockRevReq;
3030
use databend_common_meta_app::schema::DeleteLockRevReq;
3131
use databend_common_meta_app::schema::ExtendLockRevReq;
32-
use databend_common_meta_app::schema::LockKey;
33-
use databend_common_meta_app::tenant::Tenant;
32+
use databend_common_metrics::lock::record_created_lock_nums;
3433
use databend_common_storages_fuse::operations::set_backoff;
35-
use fastrace::func_name;
3634
use futures::future::select;
3735
use futures::future::Either;
3836
use rand::thread_rng;
@@ -48,34 +46,26 @@ pub struct LockHolder {
4846

4947
impl LockHolder {
5048
#[async_backtrace::framed]
51-
pub async fn start<T: Lock + ?Sized>(
49+
pub async fn start(
5250
self: &Arc<Self>,
5351
query_id: String,
5452
catalog: Arc<dyn Catalog>,
55-
lock: &T,
56-
revision: u64,
57-
expire_secs: u64,
58-
) -> Result<()> {
53+
req: CreateLockRevReq,
54+
) -> Result<u64> {
55+
let lock_key = req.lock_key.clone();
56+
let expire_secs = req.expire_secs;
5957
let sleep_range = (expire_secs * 1000 / 3)..=(expire_secs * 1000 * 2 / 3);
6058

61-
let tenant_name = lock.tenant_name();
62-
let tenant = Tenant::new_or_err(tenant_name, func_name!())?;
63-
let lock_key = LockKey::Table {
64-
tenant: tenant.clone(),
65-
table_id: lock.get_table_id(),
66-
};
59+
// get a new table lock revision.
60+
let res = catalog.create_lock_revision(req).await?;
61+
let revision = res.revision;
62+
// metrics.
63+
record_created_lock_nums(lock_key.lock_type().to_string(), lock_key.get_table_id(), 1);
6764

6865
let delete_table_lock_req = DeleteLockRevReq::new(lock_key.clone(), revision);
6966
let extend_table_lock_req =
7067
ExtendLockRevReq::new(lock_key.clone(), revision, expire_secs, false);
7168

72-
self.try_extend_lock(
73-
catalog.clone(),
74-
extend_table_lock_req.clone(),
75-
Some(Duration::from_millis(expire_secs * 1000)),
76-
)
77-
.await?;
78-
7969
GlobalIORuntime::instance().spawn({
8070
let self_clone = self.clone();
8171
async move {
@@ -122,7 +112,7 @@ impl LockHolder {
122112
}
123113
});
124114

125-
Ok(())
115+
Ok(revision)
126116
}
127117

128118
pub fn shutdown(&self) {

src/query/service/src/locks/lock_manager.rs

Lines changed: 37 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use std::collections::HashMap;
1616
use std::sync::Arc;
1717
use std::time::Duration;
18+
use std::time::Instant;
1819

1920
use databend_common_base::base::tokio::sync::mpsc;
2021
use databend_common_base::base::tokio::time::timeout;
@@ -32,18 +33,15 @@ use databend_common_meta_app::schema::ListLockRevReq;
3233
use databend_common_meta_app::schema::LockKey;
3334
use databend_common_meta_app::schema::TableInfo;
3435
use databend_common_meta_app::schema::TableLockIdent;
35-
use databend_common_meta_app::tenant::Tenant;
3636
use databend_common_meta_kvapi::kvapi::Key;
3737
use databend_common_meta_types::protobuf::watch_request::FilterType;
3838
use databend_common_meta_types::protobuf::WatchRequest;
3939
use databend_common_metrics::lock::metrics_inc_shutdown_lock_holder_nums;
4040
use databend_common_metrics::lock::metrics_inc_start_lock_holder_nums;
4141
use databend_common_metrics::lock::record_acquired_lock_nums;
42-
use databend_common_metrics::lock::record_created_lock_nums;
4342
use databend_common_pipeline_core::LockGuard;
4443
use databend_common_pipeline_core::UnlockApi;
4544
use databend_common_users::UserApiProvider;
46-
use fastrace::func_name;
4745
use futures_util::StreamExt;
4846
use parking_lot::RwLock;
4947

@@ -91,39 +89,32 @@ impl LockManager {
9189
/// NOTICE: the lock holder is not 100% reliable.
9290
/// E.g., there is a very small probability of failure in extending or deleting the lock.
9391
#[async_backtrace::framed]
94-
pub async fn try_lock<T: Lock + ?Sized>(
92+
pub async fn try_lock(
9593
self: &Arc<Self>,
9694
ctx: Arc<dyn TableContext>,
97-
lock: &T,
95+
lock_key: LockKey,
96+
catalog_name: &str,
9897
should_retry: bool,
9998
) -> Result<Option<Arc<LockGuard>>> {
100-
let user = ctx.get_current_user()?.name;
101-
let node = ctx.get_cluster().local_id.clone();
102-
let query_id = ctx.get_current_session_id();
103-
let expire_secs = ctx.get_settings().get_table_lock_expire_secs()?;
104-
105-
let catalog = ctx.get_catalog(lock.get_catalog()).await?;
106-
107-
let tenant_name = lock.tenant_name();
108-
let tenant = Tenant::new_or_err(tenant_name, func_name!())?;
109-
let table_id = lock.get_table_id();
110-
let lock_key = LockKey::Table {
111-
tenant: tenant.clone(),
112-
table_id,
113-
};
99+
let start = Instant::now();
114100

115-
let req = CreateLockRevReq::new(lock_key.clone(), user, node, query_id, expire_secs);
101+
let lock_type = lock_key.lock_type().to_string();
102+
let table_id = lock_key.get_table_id();
103+
let tenant = lock_key.get_tenant();
104+
let expire_secs = ctx.get_settings().get_table_lock_expire_secs()?;
105+
let query_id = ctx.get_id();
106+
let req = CreateLockRevReq::new(
107+
lock_key.clone(),
108+
ctx.get_current_user()?.name, // user
109+
ctx.get_cluster().local_id.clone(), // node
110+
query_id.clone(), // query_id
111+
expire_secs,
112+
);
116113

117-
// get a new table lock revision.
118-
let res = catalog.create_lock_revision(req).await?;
119-
let revision = res.revision;
120-
// metrics.
121-
record_created_lock_nums(lock.lock_type().to_string(), table_id, 1);
114+
let catalog = ctx.get_catalog(catalog_name).await?;
122115

123116
let lock_holder = Arc::new(LockHolder::default());
124-
lock_holder
125-
.start(ctx.get_id(), catalog.clone(), lock, revision, expire_secs)
126-
.await?;
117+
let revision = lock_holder.start(query_id, catalog.clone(), req).await?;
127118

128119
self.insert_lock(revision, lock_holder);
129120
let guard = LockGuard::new(self.clone(), revision);
@@ -141,10 +132,15 @@ impl LockManager {
141132
let reply = catalog
142133
.list_lock_revisions(list_table_lock_req.clone())
143134
.await?;
144-
let position = reply.iter().position(|(x, _)| *x == revision).ok_or_else(||
135+
let rev_list = reply.into_iter().map(|(x, _)| x).collect::<Vec<_>>();
136+
let position = rev_list.iter().position(|x| *x == revision).ok_or_else(||
145137
// If the current is not found in list, it means that the current has been expired.
146-
ErrorCode::TableLockExpired("the acquired table lock has been expired".to_string()),
147-
)?;
138+
ErrorCode::TableLockExpired(format!(
139+
"the acquired table lock with revision '{}' is not in {:?}, maybe expired(elapsed: {:?})",
140+
revision,
141+
rev_list,
142+
start.elapsed(),
143+
)))?;
148144

149145
if position == 0 {
150146
// The lock is acquired by current session.
@@ -153,7 +149,7 @@ impl LockManager {
153149

154150
catalog.extend_lock_revision(extend_table_lock_req).await?;
155151
// metrics.
156-
record_acquired_lock_nums(lock.lock_type().to_string(), table_id, 1);
152+
record_acquired_lock_nums(lock_type, table_id, 1);
157153
break;
158154
}
159155

@@ -162,12 +158,13 @@ impl LockManager {
162158
catalog
163159
.delete_lock_revision(delete_table_lock_req.clone())
164160
.await?;
165-
return Err(ErrorCode::TableAlreadyLocked(
166-
"table is locked by other session, please retry later".to_string(),
167-
));
161+
return Err(ErrorCode::TableAlreadyLocked(format!(
162+
"table is locked by other session, please retry later(elapsed: {:?})",
163+
start.elapsed()
164+
)));
168165
}
169166

170-
let watch_delete_ident = TableLockIdent::new(&tenant, table_id, reply[position - 1].0);
167+
let watch_delete_ident = TableLockIdent::new(tenant, table_id, rev_list[position - 1]);
171168

172169
// Get the previous revision, watch the delete event.
173170
let req = WatchRequest {
@@ -193,9 +190,10 @@ impl LockManager {
193190
catalog
194191
.delete_lock_revision(delete_table_lock_req.clone())
195192
.await?;
196-
Err(ErrorCode::TableAlreadyLocked(
197-
"table is locked by other session, please retry later".to_string(),
198-
))
193+
Err(ErrorCode::TableAlreadyLocked(format!(
194+
"table is locked by other session, please retry later(elapsed: {:?})",
195+
start.elapsed()
196+
)))
199197
}
200198
}?;
201199
}

src/query/service/src/locks/mod.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,8 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
mod lock_ext;
1615
mod lock_holder;
1716
mod lock_manager;
1817
mod table_lock;
1918

20-
pub use lock_ext::LockExt;
2119
pub use lock_manager::LockManager;

0 commit comments

Comments
 (0)