Skip to content

Commit 6166b6e

Browse files
committed
25-1 Allow to send duplicates through TDedicatedPipePool
1 parent 665f332 commit 6166b6e

File tree

3 files changed

+32
-28
lines changed

3 files changed

+32
-28
lines changed

ydb/core/tx/schemeshard/dedicated_pipe_pool.h

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,21 @@ class TDedicatedPipePool {
1515
TMap<TActorId, std::pair<TEntityId, TTabletId>> Owners;
1616

1717
public:
18-
void Create(const TEntityId& entityId, TTabletId dst, THolder<IEventBase> message, const TActorContext& ctx) {
19-
Y_ABORT_UNLESS(!Pipes[entityId].contains(dst));
18+
void Send(const TEntityId& entityId, TTabletId dst, THolder<IEventBase> message, const TActorContext& ctx) {
2019
using namespace NTabletPipe;
2120

22-
const auto clientId = ctx.Register(CreateClient(ctx.SelfID, ui64(dst), TClientRetryPolicy {
23-
.MinRetryTime = TDuration::MilliSeconds(100),
24-
.MaxRetryTime = TDuration::Seconds(30),
25-
}));
21+
if (!Pipes[entityId].contains(dst)) {
22+
const auto clientId = ctx.Register(CreateClient(ctx.SelfID, ui64(dst), TClientRetryPolicy {
23+
.MinRetryTime = TDuration::MilliSeconds(100),
24+
.MaxRetryTime = TDuration::Seconds(30),
25+
}));
2626

27-
Pipes[entityId][dst] = clientId;
28-
Owners[clientId] = std::make_pair(entityId, dst);
27+
Pipes[entityId][dst] = clientId;
28+
Owners[clientId] = std::make_pair(entityId, dst);
29+
}
2930

31+
const auto clientId = Pipes[entityId][dst];
32+
Y_ABORT_UNLESS(Owners[clientId] == std::make_pair(entityId, dst));
3033
SendData(ctx.SelfID, clientId, message.Release(), 0);
3134
}
3235

@@ -53,10 +56,10 @@ class TDedicatedPipePool {
5356
}
5457
}
5558

56-
ui64 CloseAll(const TEntityId& entityId, const TActorContext& ctx) {
59+
void CloseAll(const TEntityId& entityId, const TActorContext& ctx) {
5760
auto entityIt = Pipes.find(entityId);
5861
if (entityIt == Pipes.end()) {
59-
return 0;
62+
return;
6063
}
6164

6265
const auto& entityPipes = entityIt->second;
@@ -70,7 +73,7 @@ class TDedicatedPipePool {
7073
Close(entityId, tabletId, ctx);
7174
}
7275

73-
return tablets.size();
76+
return;
7477
}
7578

7679
void Shutdown(const TActorContext& ctx) {

ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -489,7 +489,7 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
489489
private:
490490
TIndexBuildId BuildId;
491491

492-
TDeque<std::tuple<TTabletId, ui64, THolder<IEventBase>>> ToTabletSend;
492+
TMap<TTabletId, THolder<IEventBase>> ToTabletSend;
493493

494494
template <typename Record>
495495
TTabletId CommonFillRecord(Record& record, TShardIdx shardIdx, TIndexBuildInfo& buildInfo) {
@@ -541,9 +541,9 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
541541

542542
auto shardId = CommonFillRecord(ev->Record, shardIdx, buildInfo);
543543
ev->Record.SetSeed(ui64(shardId));
544-
LOG_D("TTxBuildProgress: TEvSampleKRequest: " << ev->Record.ShortDebugString());
544+
LOG_N("TTxBuildProgress: TEvSampleKRequest: " << ev->Record.ShortDebugString());
545545

546-
ToTabletSend.emplace_back(shardId, ui64(BuildId), std::move(ev));
546+
ToTabletSend.emplace(shardId, std::move(ev));
547547
}
548548

549549
void SendKMeansReshuffleRequest(TShardIdx shardIdx, TIndexBuildInfo& buildInfo) {
@@ -585,9 +585,9 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
585585
r.ClearClusters();
586586
return r.ShortDebugString();
587587
};
588-
LOG_D("TTxBuildProgress: TEvReshuffleKMeansRequest: " << toDebugStr(ev->Record));
588+
LOG_N("TTxBuildProgress: TEvReshuffleKMeansRequest: " << toDebugStr(ev->Record));
589589

590-
ToTabletSend.emplace_back(shardId, ui64(BuildId), std::move(ev));
590+
ToTabletSend.emplace(shardId, std::move(ev));
591591
}
592592

593593
void SendKMeansLocalRequest(TShardIdx shardIdx, TIndexBuildInfo& buildInfo) {
@@ -637,9 +637,9 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
637637

638638
auto shardId = CommonFillRecord(ev->Record, shardIdx, buildInfo);
639639
ev->Record.SetSeed(ui64(shardId));
640-
LOG_D("TTxBuildProgress: TEvLocalKMeansRequest: " << ev->Record.ShortDebugString());
640+
LOG_N("TTxBuildProgress: TEvLocalKMeansRequest: " << ev->Record.ShortDebugString());
641641

642-
ToTabletSend.emplace_back(shardId, ui64(BuildId), std::move(ev));
642+
ToTabletSend.emplace(shardId, std::move(ev));
643643
}
644644

645645
void SendBuildIndexRequest(TShardIdx shardIdx, TIndexBuildInfo& buildInfo) {
@@ -688,9 +688,9 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
688688

689689
auto shardId = CommonFillRecord(ev->Record, shardIdx, buildInfo);
690690

691-
LOG_D("TTxBuildProgress: TEvBuildIndexCreateRequest: " << ev->Record.ShortDebugString());
691+
LOG_N("TTxBuildProgress: TEvBuildIndexCreateRequest: " << ev->Record.ShortDebugString());
692692

693-
ToTabletSend.emplace_back(shardId, ui64(BuildId), std::move(ev));
693+
ToTabletSend.emplace(shardId, std::move(ev));
694694
}
695695

696696
void SendUploadSampleKRequest(TIndexBuildInfo& buildInfo) {
@@ -712,6 +712,7 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
712712
buildInfo.InProgressShards = {};
713713
buildInfo.ToUploadShards = {};
714714

715+
ToTabletSend.clear();
715716
Self->IndexBuildPipes.CloseAll(BuildId, ctx);
716717
}
717718

@@ -968,8 +969,8 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
968969
Y_ABORT_UNLESS(buildInfoPtr);
969970
auto& buildInfo = *buildInfoPtr->Get();
970971

971-
LOG_I("TTxBuildProgress: Resume: id# " << BuildId);
972-
LOG_D("TTxBuildProgress: Resume: " << buildInfo);
972+
LOG_N("TTxBuildProgress: Execute: " << BuildId << " " << buildInfo.State);
973+
LOG_D("TTxBuildProgress: Execute: " << BuildId << " " << buildInfo.State << " " << buildInfo);
973974

974975
switch (buildInfo.State) {
975976
case TIndexBuildInfo::EState::Invalid:
@@ -1227,8 +1228,8 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
12271228
}
12281229

12291230
void DoComplete(const TActorContext& ctx) override {
1230-
for (auto& x: ToTabletSend) {
1231-
Self->IndexBuildPipes.Create(BuildId, std::get<0>(x), std::move(std::get<2>(x)), ctx);
1231+
for (auto& [shardId, ev]: ToTabletSend) {
1232+
Self->IndexBuildPipes.Send(BuildId, shardId, std::move(ev), ctx);
12321233
}
12331234
ToTabletSend.clear();
12341235
}
@@ -1303,7 +1304,7 @@ struct TSchemeShard::TIndexBuilder::TTxReplyRetry: public TSchemeShard::TIndexBu
13031304
const auto& tabletId = PipeRetry.TabletId;
13041305
const auto& shardIdx = Self->GetShardIdx(tabletId);
13051306

1306-
LOG_I("TTxReply : PipeRetry, id# " << buildId
1307+
LOG_N("TTxReply : PipeRetry, id# " << buildId
13071308
<< ", tabletId# " << tabletId
13081309
<< ", shardIdx# " << shardIdx);
13091310

@@ -2120,8 +2121,8 @@ struct TSchemeShard::TIndexBuilder::TTxReplyModify: public TSchemeShard::TIndexB
21202121
<< ", BuildIndexId: " << buildInfo.Id
21212122
<< ", status: " << Ydb::StatusIds::StatusCode_Name(status)
21222123
<< ", error: " << buildInfo.Issue
2123-
<< ", replyTo: " << buildInfo.CreateSender.ToString());
2124-
LOG_D("Message:\n" << responseEv->Record.ShortDebugString());
2124+
<< ", replyTo: " << buildInfo.CreateSender.ToString()
2125+
<< ", message: " << responseEv->Record.ShortDebugString());
21252126

21262127
Send(buildInfo.CreateSender, std::move(responseEv), 0, buildInfo.SenderCookie);
21272128
}

ydb/core/tx/schemeshard/schemeshard_cdc_stream_scan.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ struct TSchemeShard::TCdcStreamScan::TTxProgress: public TTransactionBase<TSchem
115115

116116
void Complete(const TActorContext& ctx) override {
117117
for (auto& [streamPathId, tabletId, ev] : ScanRequests) {
118-
Self->CdcStreamScanPipes.Create(streamPathId, tabletId, std::move(ev), ctx);
118+
Self->CdcStreamScanPipes.Send(streamPathId, tabletId, std::move(ev), ctx);
119119
}
120120

121121
if (StreamToProgress) {

0 commit comments

Comments
 (0)