Skip to content

Commit 605e422

Browse files
authored
Optimize perfomance of creating topic write sessions (#10085)
1 parent 5440a6b commit 605e422

18 files changed

+83
-91
lines changed

ydb/core/persqueue/partition_scale_manager.cpp

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,13 @@ TPartitionScaleManager::TPartitionScaleManager(
1111
const TString& databasePath,
1212
ui64 pathId,
1313
int version,
14-
const NKikimrPQ::TPQTabletConfig& config
14+
const NKikimrPQ::TPQTabletConfig& config,
15+
const TPartitionGraph& partitionGraph
1516
)
1617
: TopicName(topicName)
1718
, DatabasePath(databasePath)
18-
, BalancerConfig(pathId, version, config) {
19+
, BalancerConfig(pathId, version, config)
20+
, PartitionGraph(partitionGraph) {
1921
}
2022

2123
void TPartitionScaleManager::HandleScaleStatusChange(const ui32 partitionId, NKikimrPQ::EScaleStatus scaleStatus, const TActorContext& ctx) {
@@ -69,7 +71,7 @@ std::pair<std::vector<TPartitionSplit>, std::vector<TPartitionMerge>> TPartition
6971
size_t allowedSplitsCount = BalancerConfig.MaxActivePartitions > BalancerConfig.CurPartitions ? BalancerConfig.MaxActivePartitions - BalancerConfig.CurPartitions : 0;
7072
auto partitionId = PartitionsToSplit.begin();
7173
while (allowedSplitsCount > 0 && partitionId != PartitionsToSplit.end()) {
72-
auto* node = BalancerConfig.PartitionGraph.GetPartition(*partitionId);
74+
auto* node = PartitionGraph.GetPartition(*partitionId);
7375
if (node->Children.empty()) {
7476
auto from = node->From;
7577
auto to = node->To;

ydb/core/persqueue/partition_scale_manager.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ class TPartitionScaleManager {
3030
)
3131
: PathId(pathId)
3232
, PathVersion(version)
33-
, PartitionGraph(MakePartitionGraph(config))
3433
, MaxActivePartitions(config.GetPartitionStrategy().GetMaxPartitionCount())
3534
, MinActivePartitions(config.GetPartitionStrategy().GetMinPartitionCount())
3635
, CurPartitions(std::count_if(config.GetAllPartitions().begin(), config.GetAllPartitions().end(), [](auto& p) {
@@ -40,14 +39,13 @@ class TPartitionScaleManager {
4039

4140
ui64 PathId;
4241
int PathVersion;
43-
TPartitionGraph PartitionGraph;
4442
ui64 MaxActivePartitions;
4543
ui64 MinActivePartitions;
4644
ui64 CurPartitions;
4745
};
4846

4947
public:
50-
TPartitionScaleManager(const TString& topicPath, const TString& databasePath, ui64 pathId, int version, const NKikimrPQ::TPQTabletConfig& config);
48+
TPartitionScaleManager(const TString& topicPath, const TString& databasePath, ui64 pathId, int version, const NKikimrPQ::TPQTabletConfig& config, const TPartitionGraph& partitionGraph);
5149

5250
public:
5351
void HandleScaleStatusChange(const ui32 partition, NKikimrPQ::EScaleStatus scaleStatus, const TActorContext& ctx);
@@ -79,6 +77,8 @@ class TPartitionScaleManager {
7977
std::unordered_set<ui32> PartitionsToSplit;
8078

8179
TBalancerConfig BalancerConfig;
80+
const TPartitionGraph& PartitionGraph;
81+
8282
bool RequestInflight = false;
8383
};
8484

ydb/core/persqueue/read_balancer.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -537,7 +537,7 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvUpdateBalancerConfig::TPtr
537537

538538
if (SplitMergeEnabled(TabletConfig)) {
539539
if (!PartitionsScaleManager) {
540-
PartitionsScaleManager = std::make_unique<TPartitionScaleManager>(Topic, DatabasePath, PathId, Version, TabletConfig);
540+
PartitionsScaleManager = std::make_unique<TPartitionScaleManager>(Topic, DatabasePath, PathId, Version, TabletConfig, PartitionGraph);
541541
} else {
542542
PartitionsScaleManager->UpdateBalancerConfig(PathId, Version, TabletConfig);
543543
}

ydb/core/persqueue/read_balancer__txinit.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ struct TPersQueueReadBalancer::TTxInit : public ITransaction {
6060
Self->PartitionGraph = MakePartitionGraph(Self->TabletConfig);
6161

6262
if (SplitMergeEnabled(Self->TabletConfig)) {
63-
Self->PartitionsScaleManager = std::make_unique<TPartitionScaleManager>(Self->Topic, Self->DatabasePath, Self->PathId, Self->Version, Self->TabletConfig);
63+
Self->PartitionsScaleManager = std::make_unique<TPartitionScaleManager>(Self->Topic, Self->DatabasePath, Self->PathId, Self->Version, Self->TabletConfig, Self->PartitionGraph);
6464
}
6565
Self->UpdateConfigCounters();
6666
}

ydb/core/persqueue/ut/partition_chooser_ut.cpp

Lines changed: 8 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,6 @@ Y_UNIT_TEST(THashChooser_GetTabletIdTest) {
162162
UNIT_ASSERT_VALUES_EQUAL(chooser.GetPartition(2)->PartitionId, 2);
163163

164164
// Not found
165-
UNIT_ASSERT(!chooser.GetPartition(3));
166165
UNIT_ASSERT(!chooser.GetPartition(666));
167166
}
168167

@@ -209,13 +208,17 @@ TWriteSessionMock* ChoosePartition(NPersQueue::TTestServer& server,
209208
NPersQueue::TTopicConverterPtr fullConverter = CreateTopicConverter();
210209
TWriteSessionMock* mock = new TWriteSessionMock();
211210

211+
auto chooser = NPQ::CreatePartitionChooser(config, true);
212+
auto graph = std::make_shared<NPQ::TPartitionGraph>(NPQ::MakePartitionGraph(config));
213+
212214
NActors::TActorId parentId = server.GetRuntime()->Register(mock);
213-
server.GetRuntime()->Register(NKikimr::NPQ::CreatePartitionChooserActorM(parentId,
215+
server.GetRuntime()->Register(NKikimr::NPQ::CreatePartitionChooserActor<NTabletPipe::NTest::TPipeMock>(parentId,
214216
config,
217+
chooser,
218+
graph,
215219
fullConverter,
216220
sourceId,
217-
preferedPartition,
218-
true));
221+
preferedPartition));
219222

220223
mock->Promise.GetFuture().GetValueSync();
221224

@@ -679,20 +682,6 @@ Y_UNIT_TEST(TPartitionChooserActor_SplitMergeDisabled_RegisteredSourceId_Test) {
679682
UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 1);
680683
}
681684

682-
Y_UNIT_TEST(TPartitionChooserActor_SplitMergeDisabled_Inactive_Test) {
683-
NPersQueue::TTestServer server = CreateServer();
684-
685-
auto config = CreateConfig0(false);
686-
AddPartition(config, 0, {}, {}, {1});
687-
AddPartition(config, 1);
688-
689-
WriteToTable(server, "A_Source", 0);
690-
auto r = ChoosePartition(server, config, "A_Source");
691-
692-
UNIT_ASSERT(r->Result);
693-
UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 1);
694-
}
695-
696685
Y_UNIT_TEST(TPartitionChooserActor_SplitMergeDisabled_PreferedPartition_Test) {
697686
NPersQueue::TTestServer server = CreateServer();
698687

@@ -706,23 +695,11 @@ Y_UNIT_TEST(TPartitionChooserActor_SplitMergeDisabled_PreferedPartition_Test) {
706695
UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 0);
707696
}
708697

709-
Y_UNIT_TEST(TPartitionChooserActor_SplitMergeDisabled_PreferedPartition_Inactive_Test) {
710-
NPersQueue::TTestServer server = CreateServer();
711-
712-
auto config = CreateConfig0(false);
713-
AddPartition(config, 0, {}, {}, {1});
714-
AddPartition(config, 1);
715-
716-
auto r = ChoosePartition(server, config, "A_Source", 0);
717-
718-
UNIT_ASSERT(r->Error);
719-
}
720-
721698
Y_UNIT_TEST(TPartitionChooserActor_SplitMergeDisabled_BadSourceId_Test) {
722699
NPersQueue::TTestServer server = CreateServer();
723700

724701
auto config = CreateConfig0(false);
725-
AddPartition(config, 0, {}, {});
702+
AddPartition(config, 0);
726703

727704
auto r = ChoosePartition(server, config, "base64:a***");
728705

ydb/core/persqueue/writer/partition_chooser.h

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
#include <ydb/library/actors/core/actor.h>
44
#include <ydb/core/base/events.h>
5+
#include <ydb/core/persqueue/utils.h>
6+
#include <ydb/core/persqueue/writer/pipe_utils.h>
57
#include <ydb/core/persqueue/writer/source_id_encoding.h>
68
#include <ydb/core/protos/flat_scheme_op.pb.h>
79
#include <ydb/library/persqueue/topic_parser/topic_parser.h>
@@ -53,7 +55,7 @@ struct TEvPartitionChooser {
5355
class IPartitionChooser {
5456
public:
5557
struct TPartitionInfo {
56-
TPartitionInfo(ui32 partitionId, ui64 tabletId)
58+
explicit TPartitionInfo(ui32 partitionId = 0, ui64 tabletId = 0)
5759
: PartitionId(partitionId)
5860
, TabletId(tabletId) {}
5961

@@ -70,11 +72,13 @@ class IPartitionChooser {
7072

7173
std::shared_ptr<IPartitionChooser> CreatePartitionChooser(const NKikimrSchemeOp::TPersQueueGroupDescription& config, bool withoutHash = false);
7274

75+
template<typename TPipeHelper = NTabletPipe::TPipeHelper>
7376
NActors::IActor* CreatePartitionChooserActor(TActorId parentId,
7477
const NKikimrSchemeOp::TPersQueueGroupDescription& config,
78+
const std::shared_ptr<NPQ::IPartitionChooser>& chooser,
79+
const std::shared_ptr<NPQ::TPartitionGraph>& graph,
7580
NPersQueue::TTopicConverterPtr& fullConverter,
7681
const TString& sourceId,
77-
std::optional<ui32> preferedPartition,
78-
bool withoutHash = false);
82+
std::optional<ui32> preferedPartition);
7983

8084
} // namespace NKikimr::NPQ

ydb/core/persqueue/writer/partition_chooser_impl.cpp

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,20 +47,39 @@ std::shared_ptr<IPartitionChooser> CreatePartitionChooser(const NKikimrSchemeOp:
4747
}
4848
}
4949

50+
template<typename TPipeHelper>
5051
IActor* CreatePartitionChooserActor(TActorId parentId,
5152
const NKikimrSchemeOp::TPersQueueGroupDescription& config,
53+
const std::shared_ptr<NPQ::IPartitionChooser>& chooser,
54+
const std::shared_ptr<NPQ::TPartitionGraph>& graph,
5255
NPersQueue::TTopicConverterPtr& fullConverter,
5356
const TString& sourceId,
54-
std::optional<ui32> preferedPartition,
55-
bool withoutHash) {
56-
auto chooser = CreatePartitionChooser(config, withoutHash);
57+
std::optional<ui32> preferedPartition) {
5758
if (SplitMergeEnabled(config.GetPQTabletConfig())) {
58-
return new NPartitionChooser::TSMPartitionChooserActor<NTabletPipe::TPipeHelper>(parentId, config, chooser, fullConverter, sourceId, preferedPartition);
59+
return new NPartitionChooser::TSMPartitionChooserActor<TPipeHelper>(parentId, chooser, graph, fullConverter, sourceId, preferedPartition);
5960
} else {
60-
return new NPartitionChooser::TPartitionChooserActor<NTabletPipe::TPipeHelper>(parentId, config, chooser, fullConverter, sourceId, preferedPartition);
61+
return new NPartitionChooser::TPartitionChooserActor<TPipeHelper>(parentId, config, chooser, fullConverter, sourceId, preferedPartition);
6162
}
6263
}
6364

65+
template
66+
IActor* CreatePartitionChooserActor<NTabletPipe::TPipeHelper>(TActorId parentId,
67+
const NKikimrSchemeOp::TPersQueueGroupDescription& config,
68+
const std::shared_ptr<NPQ::IPartitionChooser>& chooser,
69+
const std::shared_ptr<NPQ::TPartitionGraph>& graph,
70+
NPersQueue::TTopicConverterPtr& fullConverter,
71+
const TString& sourceId,
72+
std::optional<ui32> preferedPartition);
73+
74+
template
75+
IActor* CreatePartitionChooserActor<NTabletPipe::NTest::TPipeMock>(TActorId parentId,
76+
const NKikimrSchemeOp::TPersQueueGroupDescription& config,
77+
const std::shared_ptr<NPQ::IPartitionChooser>& chooser,
78+
const std::shared_ptr<NPQ::TPartitionGraph>& graph,
79+
NPersQueue::TTopicConverterPtr& fullConverter,
80+
const TString& sourceId,
81+
std::optional<ui32> preferedPartition);
82+
6483
} // namespace NKikimr::NPQ
6584

6685
std::unordered_map<ui64, NActors::TActorId> NKikimr::NTabletPipe::NTest::TPipeMock::Tablets;

ydb/core/persqueue/writer/partition_chooser_impl.h

Lines changed: 4 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -121,15 +121,10 @@ const typename TBoundaryChooser<THasher>::TPartitionInfo* TBoundaryChooser<THash
121121
//
122122
template<class THasher>
123123
THashChooser<THasher>::THashChooser(const NKikimrSchemeOp::TPersQueueGroupDescription& config) {
124+
Partitions.resize(config.GetPartitions().size());
124125
for(const auto& p : config.GetPartitions()) {
125-
if (NKikimrPQ::ETopicPartitionStatus::Active == p.GetStatus()) {
126-
Partitions.emplace_back(TPartitionInfo{p.GetPartitionId(),
127-
p.GetTabletId()});
128-
}
126+
Partitions[p.GetPartitionId()] = TPartitionInfo{p.GetPartitionId(), p.GetTabletId()};
129127
}
130-
131-
std::sort(Partitions.begin(), Partitions.end(),
132-
[](const TPartitionInfo& a, const TPartitionInfo& b) { return a.PartitionId < b.PartitionId; });
133128
}
134129

135130
template<class THasher>
@@ -142,12 +137,10 @@ const typename THashChooser<THasher>::TPartitionInfo* THashChooser<THasher>::Get
142137

143138
template<class THasher>
144139
const typename THashChooser<THasher>::TPartitionInfo* THashChooser<THasher>::GetPartition(ui32 partitionId) const {
145-
auto it = std::lower_bound(Partitions.begin(), Partitions.end(), partitionId,
146-
[](const TPartitionInfo& partition, const ui32 value) { return value > partition.PartitionId; });
147-
if (it == Partitions.end()) {
140+
if (partitionId >= Partitions.size()) {
148141
return nullptr;
149142
}
150-
return it->PartitionId == partitionId ? it : nullptr;
143+
return &Partitions[partitionId];
151144
}
152145

153146
template<class THasher>
@@ -161,21 +154,4 @@ const typename THashChooser<THasher>::TPartitionInfo* THashChooser<THasher>::Get
161154

162155

163156
} // namespace NPartitionChooser
164-
165-
166-
inline IActor* CreatePartitionChooserActorM(TActorId parentId,
167-
const NKikimrSchemeOp::TPersQueueGroupDescription& config,
168-
NPersQueue::TTopicConverterPtr& fullConverter,
169-
const TString& sourceId,
170-
std::optional<ui32> preferedPartition,
171-
bool withoutHash) {
172-
auto chooser = CreatePartitionChooser(config, withoutHash);
173-
if (SplitMergeEnabled(config.GetPQTabletConfig())) {
174-
return new NPartitionChooser::TSMPartitionChooserActor<NTabletPipe::NTest::TPipeMock>(parentId, config, chooser, fullConverter, sourceId, preferedPartition);
175-
} else {
176-
return new NPartitionChooser::TPartitionChooserActor<NTabletPipe::NTest::TPipeMock>(parentId, config, chooser, fullConverter, sourceId, preferedPartition);
177-
}
178-
}
179-
180-
181157
} // namespace NKikimr::NPQ

ydb/core/persqueue/writer/partition_chooser_impl__abstract_chooser_actor.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ class TAbstractPartitionChooserActor: public TActorBootstrapped<TDerived> {
3838

3939

4040
TAbstractPartitionChooserActor(TActorId parentId,
41-
std::shared_ptr<IPartitionChooser>& chooser,
41+
const std::shared_ptr<IPartitionChooser>& chooser,
4242
NPersQueue::TTopicConverterPtr& fullConverter,
4343
const TString& sourceId,
4444
std::optional<ui32> preferedPartition)

ydb/core/persqueue/writer/partition_chooser_impl__old_chooser_actor.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ class TPartitionChooserActor: public TAbstractPartitionChooserActor<TPartitionCh
2828

2929
TPartitionChooserActor(TActorId parentId,
3030
const NKikimrSchemeOp::TPersQueueGroupDescription& config,
31-
std::shared_ptr<IPartitionChooser>& chooser,
31+
const std::shared_ptr<IPartitionChooser>& chooser,
3232
NPersQueue::TTopicConverterPtr& fullConverter,
3333
const TString& sourceId,
3434
std::optional<ui32> preferedPartition)

0 commit comments

Comments
 (0)