Skip to content

Commit 7b70b72

Browse files
branch-3.0: [fix](path gc) Fix path gc race with publish task #50343 (#50487)
Cherry-picked from #50343 Co-authored-by: deardeng <dengxin@selectdb.com>
1 parent 7905e50 commit 7b70b72

14 files changed

+204
-69
lines changed

be/src/olap/data_dir.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -754,6 +754,14 @@ void DataDir::_perform_rowset_gc(const std::string& tablet_schema_hash_path) {
754754
[&rowsets_in_version_map](auto& rs) { rowsets_in_version_map.insert(rs->rowset_id()); },
755755
true);
756756

757+
DBUG_EXECUTE_IF("DataDir::_perform_rowset_gc.simulation.slow", {
758+
auto target_tablet_id = dp->param<int64_t>("tablet_id", -1);
759+
if (target_tablet_id == tablet_id) {
760+
LOG(INFO) << "debug point wait tablet to remove rsmgr tabletId=" << tablet_id;
761+
DBUG_BLOCK;
762+
}
763+
});
764+
757765
auto reclaim_rowset_file = [](const std::string& path) {
758766
auto st = io::global_local_filesystem()->delete_file(path);
759767
if (!st.ok()) [[unlikely]] {

be/src/olap/olap_server.cpp

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -465,9 +465,17 @@ void StorageEngine::_path_gc_thread_callback(DataDir* data_dir) {
465465
int32_t current_time = time(nullptr);
466466

467467
int32_t interval = _auto_get_interval_by_disk_capacity(data_dir);
468+
DBUG_EXECUTE_IF("_path_gc_thread_callback.interval.eq.1ms", {
469+
LOG(INFO) << "debug point change interval eq 1ms";
470+
interval = 1;
471+
while (DebugPoints::instance()->is_enable("_path_gc_thread_callback.always.do")) {
472+
data_dir->perform_path_gc();
473+
std::this_thread::sleep_for(std::chrono::milliseconds(10));
474+
}
475+
});
468476
if (interval <= 0) {
469477
LOG(WARNING) << "path gc thread check interval config is illegal:" << interval
470-
<< "will be forced set to half hour";
478+
<< " will be forced set to half hour";
471479
interval = 1800; // 0.5 hour
472480
}
473481
if (current_time - last_exec_time >= interval) {
@@ -483,8 +491,9 @@ void StorageEngine::_path_gc_thread_callback(DataDir* data_dir) {
483491
void StorageEngine::_tablet_checkpoint_callback(const std::vector<DataDir*>& data_dirs) {
484492
int64_t interval = config::generate_tablet_meta_checkpoint_tasks_interval_secs;
485493
do {
486-
LOG(INFO) << "begin to produce tablet meta checkpoint tasks.";
487494
for (auto data_dir : data_dirs) {
495+
LOG(INFO) << "begin to produce tablet meta checkpoint tasks, data_dir="
496+
<< data_dir->path();
488497
auto st = _tablet_meta_checkpoint_thread_pool->submit_func(
489498
[data_dir, this]() { _tablet_manager->do_tablet_meta_checkpoint(data_dir); });
490499
if (!st.ok()) {

be/src/olap/tablet.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1446,7 +1446,6 @@ bool Tablet::do_tablet_meta_checkpoint() {
14461446
_newly_created_rowset_num < config::tablet_meta_checkpoint_min_new_rowsets_num) {
14471447
return false;
14481448
}
1449-
14501449
// hold read-lock other than write-lock, because it will not modify meta structure
14511450
std::shared_lock rdlock(_meta_lock);
14521451
if (tablet_state() != TABLET_RUNNING) {

be/src/olap/task/engine_publish_version_task.cpp

Lines changed: 51 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -396,6 +396,49 @@ TabletPublishTxnTask::TabletPublishTxnTask(StorageEngine& engine,
396396

397397
TabletPublishTxnTask::~TabletPublishTxnTask() = default;
398398

399+
Status publish_version_and_add_rowset(StorageEngine& engine, int64_t partition_id,
400+
const TabletSharedPtr& tablet, const RowsetSharedPtr& rowset,
401+
int64_t transaction_id, const Version& version,
402+
EnginePublishVersionTask* engine_publish_version_task,
403+
TabletPublishStatistics& stats) {
404+
// ATTN: Here, the life cycle needs to be extended to prevent tablet_txn_info.pending_rs_guard in txn
405+
// from being released prematurely, causing path gc to mistakenly delete the dat file
406+
std::shared_ptr<TabletTxnInfo> extend_tablet_txn_info_lifetime = nullptr;
407+
408+
// Publish the transaction
409+
auto result = engine.txn_manager()->publish_txn(partition_id, tablet, transaction_id, version,
410+
&stats, extend_tablet_txn_info_lifetime);
411+
if (!result.ok()) {
412+
LOG(WARNING) << "failed to publish version. rowset_id=" << rowset->rowset_id()
413+
<< ", tablet_id=" << tablet->tablet_id() << ", txn_id=" << transaction_id
414+
<< ", res=" << result;
415+
if (engine_publish_version_task) {
416+
engine_publish_version_task->add_error_tablet_id(tablet->tablet_id());
417+
}
418+
return result;
419+
}
420+
421+
DBUG_EXECUTE_IF("EnginePublishVersionTask.handle.block_add_rowsets", DBUG_BLOCK);
422+
423+
// Add visible rowset to tablet
424+
int64_t start_time = MonotonicMicros();
425+
result = tablet->add_inc_rowset(rowset);
426+
DBUG_EXECUTE_IF("EnginePublishVersionTask.handle.after_add_inc_rowset_rowsets_block",
427+
DBUG_BLOCK);
428+
stats.add_inc_rowset_us = MonotonicMicros() - start_time;
429+
if (!result.ok() && !result.is<PUSH_VERSION_ALREADY_EXIST>()) {
430+
LOG(WARNING) << "fail to add visible rowset to tablet. rowset_id=" << rowset->rowset_id()
431+
<< ", tablet_id=" << tablet->tablet_id() << ", txn_id=" << transaction_id
432+
<< ", res=" << result;
433+
if (engine_publish_version_task) {
434+
engine_publish_version_task->add_error_tablet_id(tablet->tablet_id());
435+
}
436+
return result;
437+
}
438+
439+
return result;
440+
}
441+
399442
void TabletPublishTxnTask::handle() {
400443
std::shared_lock migration_rlock(_tablet->get_migration_lock(), std::chrono::seconds(5));
401444
SCOPED_ATTACH_TASK(_mem_tracker);
@@ -411,29 +454,14 @@ void TabletPublishTxnTask::handle() {
411454
rowset_update_lock.lock();
412455
}
413456
_stats.schedule_time_us = MonotonicMicros() - _stats.submit_time_us;
414-
_result = _engine.txn_manager()->publish_txn(_partition_id, _tablet, _transaction_id, _version,
415-
&_stats);
457+
_result = publish_version_and_add_rowset(_engine, _partition_id, _tablet, _rowset,
458+
_transaction_id, _version,
459+
_engine_publish_version_task, _stats);
460+
416461
if (!_result.ok()) {
417-
LOG(WARNING) << "failed to publish version. rowset_id=" << _rowset->rowset_id()
418-
<< ", tablet_id=" << _tablet_info.tablet_id << ", txn_id=" << _transaction_id
419-
<< ", res=" << _result;
420-
_engine_publish_version_task->add_error_tablet_id(_tablet_info.tablet_id);
421462
return;
422463
}
423464

424-
DBUG_EXECUTE_IF("EnginePublishVersionTask.handle.block_add_rowsets", DBUG_BLOCK);
425-
426-
// add visible rowset to tablet
427-
int64_t t1 = MonotonicMicros();
428-
_result = _tablet->add_inc_rowset(_rowset);
429-
_stats.add_inc_rowset_us = MonotonicMicros() - t1;
430-
if (!_result.ok() && !_result.is<PUSH_VERSION_ALREADY_EXIST>()) {
431-
LOG(WARNING) << "fail to add visible rowset to tablet. rowset_id=" << _rowset->rowset_id()
432-
<< ", tablet_id=" << _tablet_info.tablet_id << ", txn_id=" << _transaction_id
433-
<< ", res=" << _result;
434-
_engine_publish_version_task->add_error_tablet_id(_tablet_info.tablet_id);
435-
return;
436-
}
437465
int64_t cost_us = MonotonicMicros() - _stats.submit_time_us;
438466
g_tablet_publish_latency << cost_us;
439467
_stats.record_in_bvar();
@@ -466,27 +494,14 @@ void AsyncTabletPublishTask::handle() {
466494
}
467495
RowsetSharedPtr rowset = iter->second;
468496
Version version(_version, _version);
469-
auto publish_status = _engine.txn_manager()->publish_txn(_partition_id, _tablet,
470-
_transaction_id, version, &_stats);
471-
if (!publish_status.ok()) {
472-
LOG(WARNING) << "failed to publish version. rowset_id=" << rowset->rowset_id()
473-
<< ", tablet_id=" << _tablet->tablet_id() << ", txn_id=" << _transaction_id
474-
<< ", res=" << publish_status;
475-
return;
476-
}
477497

478-
DBUG_EXECUTE_IF("EnginePublishVersionTask.handle.block_add_rowsets", DBUG_BLOCK);
498+
auto publish_status = publish_version_and_add_rowset(_engine, _partition_id, _tablet, rowset,
499+
_transaction_id, version, nullptr, _stats);
479500

480-
// add visible rowset to tablet
481-
int64_t t1 = MonotonicMicros();
482-
publish_status = _tablet->add_inc_rowset(rowset);
483-
_stats.add_inc_rowset_us = MonotonicMicros() - t1;
484-
if (!publish_status.ok() && !publish_status.is<PUSH_VERSION_ALREADY_EXIST>()) {
485-
LOG(WARNING) << "fail to add visible rowset to tablet. rowset_id=" << rowset->rowset_id()
486-
<< ", tablet_id=" << _tablet->tablet_id() << ", txn_id=" << _transaction_id
487-
<< ", res=" << publish_status;
501+
if (!publish_status.ok()) {
488502
return;
489503
}
504+
490505
int64_t cost_us = MonotonicMicros() - _stats.submit_time_us;
491506
// print stats if publish cost > 500ms
492507
g_tablet_publish_latency << cost_us;

be/src/olap/txn_manager.cpp

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -200,9 +200,11 @@ Status TxnManager::commit_txn(TPartitionId partition_id, const Tablet& tablet,
200200

201201
Status TxnManager::publish_txn(TPartitionId partition_id, const TabletSharedPtr& tablet,
202202
TTransactionId transaction_id, const Version& version,
203-
TabletPublishStatistics* stats) {
203+
TabletPublishStatistics* stats,
204+
std::shared_ptr<TabletTxnInfo>& extend_tablet_txn_info) {
204205
return publish_txn(tablet->data_dir()->get_meta(), partition_id, transaction_id,
205-
tablet->tablet_id(), tablet->tablet_uid(), version, stats);
206+
tablet->tablet_id(), tablet->tablet_uid(), version, stats,
207+
extend_tablet_txn_info);
206208
}
207209

208210
void TxnManager::abort_txn(TPartitionId partition_id, TTransactionId transaction_id,
@@ -457,7 +459,8 @@ Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId partition_id,
457459
Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id,
458460
TTransactionId transaction_id, TTabletId tablet_id,
459461
TabletUid tablet_uid, const Version& version,
460-
TabletPublishStatistics* stats) {
462+
TabletPublishStatistics* stats,
463+
std::shared_ptr<TabletTxnInfo>& extend_tablet_txn_info) {
461464
auto tablet = _engine.tablet_manager()->get_tablet(tablet_id);
462465
if (tablet == nullptr) {
463466
return Status::OK();
@@ -483,6 +486,7 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id,
483486
// found load for txn,tablet
484487
// case 1: user commit rowset, then the load id must be equal
485488
tablet_txn_info = txn_info_iter->second;
489+
extend_tablet_txn_info = tablet_txn_info;
486490
rowset = tablet_txn_info->rowset;
487491
}
488492
}

be/src/olap/txn_manager.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,8 @@ class TxnManager {
166166

167167
Status publish_txn(TPartitionId partition_id, const TabletSharedPtr& tablet,
168168
TTransactionId transaction_id, const Version& version,
169-
TabletPublishStatistics* stats);
169+
TabletPublishStatistics* stats,
170+
std::shared_ptr<TabletTxnInfo>& extend_tablet_txn_info);
170171

171172
// delete the txn from manager if it is not committed(not have a valid rowset)
172173
Status rollback_txn(TPartitionId partition_id, const Tablet& tablet,
@@ -184,7 +185,8 @@ class TxnManager {
184185
// not persist rowset meta because
185186
Status publish_txn(OlapMeta* meta, TPartitionId partition_id, TTransactionId transaction_id,
186187
TTabletId tablet_id, TabletUid tablet_uid, const Version& version,
187-
TabletPublishStatistics* stats);
188+
TabletPublishStatistics* stats,
189+
std::shared_ptr<TabletTxnInfo>& extend_tablet_txn_info);
188190

189191
// only abort not committed txn
190192
void abort_txn(TPartitionId partition_id, TTransactionId transaction_id, TTabletId tablet_id,

be/test/olap/delta_writer_cluster_key_test.cpp

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -309,9 +309,11 @@ TEST_F(TestDeltaWriterClusterKey, vec_sequence_col) {
309309
std::cout << "start to publish txn" << std::endl;
310310
RowsetSharedPtr rowset = tablet_related_rs.begin()->second;
311311
TabletPublishStatistics pstats;
312-
res = engine_ref->txn_manager()->publish_txn(
313-
meta, write_req.partition_id, write_req.txn_id, write_req.tablet_id,
314-
tablet_related_rs.begin()->first.tablet_uid, version, &pstats);
312+
std::shared_ptr<TabletTxnInfo> extend_tablet_txn_info_lifetime = nullptr;
313+
res = engine_ref->txn_manager()->publish_txn(meta, write_req.partition_id, write_req.txn_id,
314+
write_req.tablet_id,
315+
tablet_related_rs.begin()->first.tablet_uid,
316+
version, &pstats, extend_tablet_txn_info_lifetime);
315317
ASSERT_TRUE(res.ok());
316318
std::cout << "start to add inc rowset:" << rowset->rowset_id()
317319
<< ", num rows:" << rowset->num_rows() << ", version:" << rowset->version().first

be/test/olap/delta_writer_test.cpp

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -669,9 +669,10 @@ TEST_F(TestDeltaWriter, vec_write) {
669669
std::cout << "start to publish txn" << std::endl;
670670
RowsetSharedPtr rowset = tablet_rs.second;
671671
TabletPublishStatistics stats;
672-
res = engine_ref->txn_manager()->publish_txn(meta, write_req.partition_id, write_req.txn_id,
673-
write_req.tablet_id,
674-
tablet_rs.first.tablet_uid, version, &stats);
672+
std::shared_ptr<TabletTxnInfo> extend_tablet_txn_info_lifetime = nullptr;
673+
res = engine_ref->txn_manager()->publish_txn(
674+
meta, write_req.partition_id, write_req.txn_id, write_req.tablet_id,
675+
tablet_rs.first.tablet_uid, version, &stats, extend_tablet_txn_info_lifetime);
675676
ASSERT_TRUE(res.ok());
676677
std::cout << "start to add inc rowset:" << rowset->rowset_id()
677678
<< ", num rows:" << rowset->num_rows() << ", version:" << rowset->version().first
@@ -763,9 +764,11 @@ TEST_F(TestDeltaWriter, vec_sequence_col) {
763764
std::cout << "start to publish txn" << std::endl;
764765
RowsetSharedPtr rowset = tablet_related_rs.begin()->second;
765766
TabletPublishStatistics pstats;
766-
res = engine_ref->txn_manager()->publish_txn(
767-
meta, write_req.partition_id, write_req.txn_id, write_req.tablet_id,
768-
tablet_related_rs.begin()->first.tablet_uid, version, &pstats);
767+
std::shared_ptr<TabletTxnInfo> extend_tablet_txn_info_lifetime = nullptr;
768+
res = engine_ref->txn_manager()->publish_txn(meta, write_req.partition_id, write_req.txn_id,
769+
write_req.tablet_id,
770+
tablet_related_rs.begin()->first.tablet_uid,
771+
version, &pstats, extend_tablet_txn_info_lifetime);
769772
ASSERT_TRUE(res.ok());
770773
std::cout << "start to add inc rowset:" << rowset->rowset_id()
771774
<< ", num rows:" << rowset->num_rows() << ", version:" << rowset->version().first
@@ -911,9 +914,11 @@ TEST_F(TestDeltaWriter, vec_sequence_col_concurrent_write) {
911914
std::cout << "start to publish txn" << std::endl;
912915
rowset1 = tablet_related_rs.begin()->second;
913916
TabletPublishStatistics pstats;
917+
std::shared_ptr<TabletTxnInfo> extend_tablet_txn_info_lifetime = nullptr;
914918
res = engine_ref->txn_manager()->publish_txn(
915919
meta, write_req.partition_id, write_req.txn_id, write_req.tablet_id,
916-
tablet_related_rs.begin()->first.tablet_uid, version, &pstats);
920+
tablet_related_rs.begin()->first.tablet_uid, version, &pstats,
921+
extend_tablet_txn_info_lifetime);
917922
ASSERT_TRUE(res.ok());
918923
std::cout << "start to add inc rowset:" << rowset1->rowset_id()
919924
<< ", num rows:" << rowset1->num_rows()
@@ -964,9 +969,11 @@ TEST_F(TestDeltaWriter, vec_sequence_col_concurrent_write) {
964969
ASSERT_TRUE(delete_bitmap->contains({rowset2->rowset_id(), 0, 0}, 1));
965970

966971
TabletPublishStatistics pstats;
972+
std::shared_ptr<TabletTxnInfo> extend_tablet_txn_info_lifetime = nullptr;
967973
res = engine_ref->txn_manager()->publish_txn(
968974
meta, write_req.partition_id, write_req.txn_id, write_req.tablet_id,
969-
tablet_related_rs.begin()->first.tablet_uid, version, &pstats);
975+
tablet_related_rs.begin()->first.tablet_uid, version, &pstats,
976+
extend_tablet_txn_info_lifetime);
970977
ASSERT_TRUE(res.ok());
971978
std::cout << "start to add inc rowset:" << rowset2->rowset_id()
972979
<< ", num rows:" << rowset2->num_rows()

be/test/olap/engine_storage_migration_task_test.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -226,9 +226,10 @@ TEST_F(TestEngineStorageMigrationTask, write_and_migration) {
226226
for (auto& tablet_rs : tablet_related_rs) {
227227
RowsetSharedPtr rowset = tablet_rs.second;
228228
TabletPublishStatistics stats;
229-
res = engine_ref->txn_manager()->publish_txn(meta, write_req.partition_id, write_req.txn_id,
230-
tablet->tablet_id(), tablet->tablet_uid(),
231-
version, &stats);
229+
std::shared_ptr<TabletTxnInfo> extend_tablet_txn_info_lifetime = nullptr;
230+
res = engine_ref->txn_manager()->publish_txn(
231+
meta, write_req.partition_id, write_req.txn_id, tablet->tablet_id(),
232+
tablet->tablet_uid(), version, &stats, extend_tablet_txn_info_lifetime);
232233
EXPECT_EQ(Status::OK(), res);
233234
res = tablet->add_inc_rowset(rowset);
234235
EXPECT_EQ(Status::OK(), res);

be/test/olap/segment_cache_test.cpp

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -293,9 +293,11 @@ TEST_F(SegmentCacheTest, vec_sequence_col) {
293293
std::cout << "start to publish txn" << std::endl;
294294
RowsetSharedPtr rowset = tablet_related_rs.begin()->second;
295295
TabletPublishStatistics pstats;
296-
res = engine_ref->txn_manager()->publish_txn(
297-
meta, write_req.partition_id, write_req.txn_id, write_req.tablet_id,
298-
tablet_related_rs.begin()->first.tablet_uid, version, &pstats);
296+
std::shared_ptr<TabletTxnInfo> extend_tablet_txn_info_lifetime = nullptr;
297+
res = engine_ref->txn_manager()->publish_txn(meta, write_req.partition_id, write_req.txn_id,
298+
write_req.tablet_id,
299+
tablet_related_rs.begin()->first.tablet_uid,
300+
version, &pstats, extend_tablet_txn_info_lifetime);
299301
ASSERT_TRUE(res.ok());
300302
std::cout << "start to add inc rowset:" << rowset->rowset_id()
301303
<< ", num rows:" << rowset->num_rows() << ", version:" << rowset->version().first

0 commit comments

Comments
 (0)