Skip to content

Commit 9b3c2bd

Browse files
authored
Remove destination session after partitioning finish (#11411)
1 parent 02d2031 commit 9b3c2bd

File tree

4 files changed

+147
-9
lines changed

4 files changed

+147
-9
lines changed

.github/config/muted_ya.txt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,13 @@ ydb/core/kqp/ut/olap KqpOlapBlobsSharing.BlobsSharingSplit1_1_clean_with_restart
2424
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.TableReshardingConsistency64
2525
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.TableReshardingModuloN
2626
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.UpsertWhileSplitTest
27+
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.MultipleMerge
28+
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.MultipleSplits
29+
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.MultipleSplitsThenMerges
30+
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.MultipleSplitsWithRestartsAfterWait
31+
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.MultipleSplitsWithRestartsWhenWait
32+
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.MultipleMergesWithRestartsAfterWait
33+
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.MultipleMergesWithRestartsWhenWait
2734
ydb/core/kqp/ut/olap KqpOlapIndexes.IndexesActualization
2835
ydb/core/kqp/ut/olap KqpOlapIndexes.IndexesInBS
2936
ydb/core/kqp/ut/olap KqpOlapIndexes.IndexesInLocalMetadata

ydb/core/kqp/ut/olap/blobs_sharing_ut.cpp

Lines changed: 125 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
1-
#include "helpers/typed_local.h"
21
#include "helpers/local.h"
2+
#include "helpers/typed_local.h"
33
#include "helpers/writer.h"
4-
#include <ydb/core/tx/columnshard/data_sharing/initiator/controller/abstract.h>
5-
#include <ydb/core/tx/columnshard/hooks/testing/controller.h>
4+
5+
#include <ydb/core/base/tablet_pipecache.h>
66
#include <ydb/core/tx/columnshard/common/snapshot.h>
7-
#include <ydb/core/tx/columnshard/data_sharing/initiator/status/abstract.h>
87
#include <ydb/core/tx/columnshard/data_sharing/common/context/context.h>
9-
#include <ydb/core/tx/columnshard/data_sharing/destination/session/destination.h>
108
#include <ydb/core/tx/columnshard/data_sharing/destination/events/control.h>
11-
#include <ydb/core/base/tablet_pipecache.h>
9+
#include <ydb/core/tx/columnshard/data_sharing/destination/session/destination.h>
10+
#include <ydb/core/tx/columnshard/data_sharing/initiator/controller/abstract.h>
11+
#include <ydb/core/tx/columnshard/data_sharing/initiator/status/abstract.h>
12+
#include <ydb/core/tx/columnshard/hooks/testing/controller.h>
13+
1214
#include <ydb/public/sdk/cpp/client/ydb_operation/operation.h>
1315
#include <ydb/public/sdk/cpp/client/ydb_ss_tasks/task.h>
1416

@@ -276,7 +278,7 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) {
276278
void WaitResharding(const TString& hint = "") {
277279
const TInstant start = TInstant::Now();
278280
bool clean = false;
279-
while (TInstant::Now() - start < TDuration::Seconds(20)) {
281+
while (TInstant::Now() - start < TDuration::Seconds(200)) {
280282
NYdb::NOperation::TOperationClient operationClient(Kikimr.GetDriver());
281283
auto result = operationClient.List<NYdb::NSchemeShard::TBackgroundProcessesResponse>().GetValueSync();
282284
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
@@ -408,7 +410,7 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) {
408410

409411
public:
410412
TAsyncReshardingTest() {
411-
TLocalHelper(Kikimr).CreateTestOlapTable("olapTable", "olapStore", 128, 4);
413+
TLocalHelper(Kikimr).CreateTestOlapTable("olapTable", "olapStore", 1024, 32);
412414
}
413415

414416
void AddBatch(int numRows) {
@@ -561,5 +563,120 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) {
561563

562564
tester.CheckCount();
563565
}
566+
567+
Y_UNIT_TEST(MultipleMerge) {
568+
TAsyncReshardingTest tester;
569+
tester.DisableCompaction();
570+
571+
tester.AddBatch(10000);
572+
573+
for (int i = 0; i < 4; i++) {
574+
tester.StartResharding("MERGE");
575+
tester.WaitResharding();
576+
}
577+
578+
tester.RestartAllShards();
579+
580+
tester.CheckCount();
581+
}
582+
583+
Y_UNIT_TEST(MultipleSplits) {
584+
TAsyncReshardingTest tester;
585+
tester.DisableCompaction();
586+
587+
tester.AddBatch(10000);
588+
589+
for (int i = 0; i < 4; i++) {
590+
tester.StartResharding("SPLIT");
591+
tester.WaitResharding();
592+
}
593+
594+
tester.RestartAllShards();
595+
596+
tester.CheckCount();
597+
}
598+
599+
Y_UNIT_TEST(MultipleSplitsThenMerges) {
600+
TAsyncReshardingTest tester;
601+
tester.DisableCompaction();
602+
603+
tester.AddBatch(10000);
604+
605+
for (int i = 0; i < 4; i++) {
606+
tester.StartResharding("SPLIT");
607+
tester.WaitResharding();
608+
}
609+
610+
for (int i = 0; i < 8; i++) {
611+
tester.StartResharding("MERGE");
612+
tester.WaitResharding();
613+
}
614+
615+
tester.RestartAllShards();
616+
617+
tester.CheckCount();
618+
}
619+
620+
Y_UNIT_TEST(MultipleSplitsWithRestartsAfterWait) {
621+
TAsyncReshardingTest tester;
622+
tester.DisableCompaction();
623+
624+
tester.AddBatch(10000);
625+
626+
for (int i = 0; i < 4; i++) {
627+
tester.StartResharding("SPLIT");
628+
tester.WaitResharding();
629+
tester.RestartAllShards();
630+
}
631+
632+
tester.CheckCount();
633+
}
634+
635+
Y_UNIT_TEST(MultipleSplitsWithRestartsWhenWait) {
636+
TAsyncReshardingTest tester;
637+
tester.DisableCompaction();
638+
639+
tester.AddBatch(10000);
640+
641+
for (int i = 0; i < 4; i++) {
642+
tester.StartResharding("SPLIT");
643+
tester.RestartAllShards();
644+
tester.WaitResharding();
645+
}
646+
tester.RestartAllShards();
647+
648+
tester.CheckCount();
649+
}
650+
651+
Y_UNIT_TEST(MultipleMergesWithRestartsAfterWait) {
652+
TAsyncReshardingTest tester;
653+
tester.DisableCompaction();
654+
655+
tester.AddBatch(10000);
656+
657+
for (int i = 0; i < 4; i++) {
658+
tester.StartResharding("MERGE");
659+
tester.WaitResharding();
660+
tester.RestartAllShards();
661+
}
662+
663+
tester.CheckCount();
664+
}
665+
666+
Y_UNIT_TEST(MultipleMergesWithRestartsWhenWait) {
667+
TAsyncReshardingTest tester;
668+
tester.DisableCompaction();
669+
670+
tester.AddBatch(10000);
671+
672+
for (int i = 0; i < 4; i++) {
673+
tester.StartResharding("MERGE");
674+
tester.RestartAllShards();
675+
tester.WaitResharding();
676+
}
677+
tester.RestartAllShards();
678+
679+
tester.CheckCount();
680+
}
564681
}
565682
}

ydb/core/tx/columnshard/transactions/operators/sharing.cpp

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,24 @@ void TSharingTransactionOperator::DoStartProposeOnComplete(TColumnShard& /*owner
5252
}
5353

5454
bool TSharingTransactionOperator::ProgressOnExecute(
55-
TColumnShard& /*owner*/, const NOlap::TSnapshot& /*version*/, NTabletFlatExecutor::TTransactionContext& /*txc*/) {
55+
TColumnShard& owner, const NOlap::TSnapshot& /*version*/, NTabletFlatExecutor::TTransactionContext& txc) {
56+
if (!SharingTask) {
57+
return true;
58+
}
59+
if (!TxFinish) {
60+
TxFinish = SharingTask->AckInitiatorFinished(&owner, SharingTask).DetachResult();
61+
}
62+
TxFinish->Execute(txc, NActors::TActivationContext::AsActorContext());
63+
5664
return true;
5765
}
5866

5967
bool TSharingTransactionOperator::ProgressOnComplete(TColumnShard& owner, const TActorContext& ctx) {
68+
if (!SharingTask) {
69+
return true;
70+
}
71+
AFL_VERIFY(!!TxFinish);
72+
TxFinish->Complete(ctx);
6073
for (TActorId subscriber : NotifySubscribers) {
6174
auto event = MakeHolder<TEvColumnShard::TEvNotifyTxCompletionResult>(owner.TabletID(), GetTxId());
6275
ctx.Send(subscriber, event.Release(), 0, 0);

ydb/core/tx/columnshard/transactions/operators/sharing.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ class TSharingTransactionOperator: public IProposeTxOperator, public TMonitoring
1717
mutable std::unique_ptr<NTabletFlatExecutor::ITransaction> TxPropose;
1818
mutable std::unique_ptr<NTabletFlatExecutor::ITransaction> TxConfirm;
1919
mutable std::unique_ptr<NTabletFlatExecutor::ITransaction> TxAbort;
20+
mutable std::unique_ptr<NTabletFlatExecutor::ITransaction> TxFinish;
2021
static inline auto Registrator = TFactory::TRegistrator<TSharingTransactionOperator>(NKikimrTxColumnShard::TX_KIND_SHARING);
2122
THashSet<TActorId> NotifySubscribers;
2223
virtual TTxController::TProposeResult DoStartProposeOnExecute(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) override;

0 commit comments

Comments
 (0)