Skip to content

Commit cafeed8

Browse files
authored
Fix initialization of explicit messages groups (#8745)
1 parent 656b1b8 commit cafeed8

File tree

8 files changed

+44
-12
lines changed

8 files changed

+44
-12
lines changed

ydb/core/persqueue/events/internal.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -576,13 +576,15 @@ struct TEvPQ {
576576
};
577577

578578
struct TEvChangePartitionConfig : public TEventLocal<TEvChangePartitionConfig, EvChangePartitionConfig> {
579-
TEvChangePartitionConfig(const NPersQueue::TTopicConverterPtr& topicConverter, const NKikimrPQ::TPQTabletConfig& config)
579+
TEvChangePartitionConfig(const NPersQueue::TTopicConverterPtr& topicConverter, const NKikimrPQ::TPQTabletConfig& config, const NKikimrPQ::TBootstrapConfig& bootstrapConfig)
580580
: TopicConverter(topicConverter)
581581
, Config(config)
582+
, BootstrapConfig(bootstrapConfig)
582583
{}
583584

584585
NPersQueue::TTopicConverterPtr TopicConverter;
585586
NKikimrPQ::TPQTabletConfig Config;
587+
NKikimrPQ::TBootstrapConfig BootstrapConfig;
586588
};
587589

588590
struct TEvPartitionConfigChanged : public TEventLocal<TEvPartitionConfigChanged, EvPartitionConfigChanged> {
@@ -845,6 +847,7 @@ struct TEvPQ {
845847
ui64 TxId;
846848
NPersQueue::TTopicConverterPtr TopicConverter;
847849
NKikimrPQ::TPQTabletConfig Config;
850+
NKikimrPQ::TBootstrapConfig BootstrapConfig;
848851
};
849852

850853
struct TEvProposePartitionConfigResult : public TEventLocal<TEvProposePartitionConfigResult, EvProposePartitionConfigResult> {

ydb/core/persqueue/partition.cpp

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2037,7 +2037,8 @@ bool TPartition::ExecUserActionOrTransaction(TSimpleSharedPtr<TTransaction>& t,
20372037
} else if (t->ProposeConfig) {
20382038
Y_ABORT_UNLESS(ChangingConfig);
20392039
ChangeConfig = MakeSimpleShared<TEvPQ::TEvChangePartitionConfig>(TopicConverter,
2040-
t->ProposeConfig->Config);
2040+
t->ProposeConfig->Config,
2041+
t->ProposeConfig->BootstrapConfig);
20412042
PendingPartitionConfig = GetPartitionConfig(ChangeConfig->Config);
20422043
SendChangeConfigReply = false;
20432044
}
@@ -2123,7 +2124,8 @@ bool TPartition::BeginTransaction(const TEvPQ::TEvProposePartitionConfig& event)
21232124
{
21242125
ChangeConfig =
21252126
MakeSimpleShared<TEvPQ::TEvChangePartitionConfig>(TopicConverter,
2126-
event.Config);
2127+
event.Config,
2128+
event.BootstrapConfig);
21272129
PendingPartitionConfig = GetPartitionConfig(ChangeConfig->Config);
21282130

21292131
SendChangeConfigReply = false;
@@ -2360,6 +2362,7 @@ void TPartition::OnProcessTxsAndUserActsWriteComplete(const TActorContext& ctx)
23602362

23612363
if (ChangeConfig) {
23622364
EndChangePartitionConfig(std::move(ChangeConfig->Config),
2365+
std::move(ChangeConfig->BootstrapConfig),
23632366
ChangeConfig->TopicConverter,
23642367
ctx);
23652368
}
@@ -2426,12 +2429,24 @@ void TPartition::OnProcessTxsAndUserActsWriteComplete(const TActorContext& ctx)
24262429
}
24272430

24282431
void TPartition::EndChangePartitionConfig(NKikimrPQ::TPQTabletConfig&& config,
2432+
NKikimrPQ::TBootstrapConfig&& bootstrapConfig,
24292433
NPersQueue::TTopicConverterPtr topicConverter,
24302434
const TActorContext& ctx)
24312435
{
24322436
Config = std::move(config);
24332437
PartitionConfig = GetPartitionConfig(Config);
24342438
PartitionGraph = MakePartitionGraph(Config);
2439+
2440+
for (const auto& mg : bootstrapConfig.GetExplicitMessageGroups()) {
2441+
TMaybe<TPartitionKeyRange> keyRange;
2442+
if (mg.HasKeyRange()) {
2443+
keyRange = TPartitionKeyRange::Parse(mg.GetKeyRange());
2444+
}
2445+
2446+
TSourceIdInfo sourceId(0, 0, ctx.Now(), std::move(keyRange), false);
2447+
SourceIdStorage.RegisterSourceIdInfo(mg.GetId(), std::move(sourceId), true);
2448+
}
2449+
24352450
TopicConverter = topicConverter;
24362451
NewPartition = false;
24372452

@@ -2441,14 +2456,15 @@ void TPartition::EndChangePartitionConfig(NKikimrPQ::TPQTabletConfig&& config,
24412456
InitSplitMergeSlidingWindow();
24422457
}
24432458

2444-
Send(ReadQuotaTrackerActor, new TEvPQ::TEvChangePartitionConfig(TopicConverter, Config));
2445-
Send(WriteQuotaTrackerActor, new TEvPQ::TEvChangePartitionConfig(TopicConverter, Config));
2459+
Send(ReadQuotaTrackerActor, new TEvPQ::TEvChangePartitionConfig(TopicConverter, Config, bootstrapConfig));
2460+
Send(WriteQuotaTrackerActor, new TEvPQ::TEvChangePartitionConfig(TopicConverter, Config, bootstrapConfig));
24462461
TotalPartitionWriteSpeed = config.GetPartitionConfig().GetWriteSpeedInBytesPerSecond();
24472462

24482463
if (Config.GetPartitionConfig().HasMirrorFrom()) {
24492464
if (Mirrorer) {
24502465
ctx.Send(Mirrorer->Actor, new TEvPQ::TEvChangePartitionConfig(TopicConverter,
2451-
Config));
2466+
Config,
2467+
bootstrapConfig));
24522468
} else {
24532469
CreateMirrorerActor();
24542470
}

ydb/core/persqueue/partition.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -390,6 +390,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
390390
void OnProcessTxsAndUserActsWriteComplete(const TActorContext& ctx);
391391

392392
void EndChangePartitionConfig(NKikimrPQ::TPQTabletConfig&& config,
393+
NKikimrPQ::TBootstrapConfig&& bootstrapConfig,
393394
NPersQueue::TTopicConverterPtr topicConverter,
394395
const TActorContext& ctx);
395396
TString GetKeyConfig() const;

ydb/core/persqueue/partition_init.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,8 @@ void TInitConfigStep::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorCon
169169

170170
if (Partition()->Config.GetVersion() < Partition()->TabletConfig.GetVersion()) {
171171
auto event = MakeHolder<TEvPQ::TEvChangePartitionConfig>(Partition()->TopicConverter,
172-
Partition()->TabletConfig);
172+
Partition()->TabletConfig,
173+
NKikimrPQ::TBootstrapConfig());
173174
Partition()->PushFrontDistrTx(event.Release());
174175
}
175176
break;

ydb/core/persqueue/pq_impl.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -691,7 +691,7 @@ void TPersQueue::ApplyNewConfigAndReply(const TActorContext& ctx)
691691
ClearNewConfig();
692692

693693
for (auto& p : Partitions) { //change config for already created partitions
694-
ctx.Send(p.second.Actor, new TEvPQ::TEvChangePartitionConfig(TopicConverter, Config));
694+
ctx.Send(p.second.Actor, new TEvPQ::TEvChangePartitionConfig(TopicConverter, Config, BootstrapConfigTx ? *BootstrapConfigTx : NKikimrPQ::TBootstrapConfig()));
695695
}
696696
ChangePartitionConfigInflight += Partitions.size();
697697

@@ -1805,7 +1805,7 @@ void TPersQueue::AddCmdWriteConfig(TEvKeyValue::TEvRequest* request,
18051805
keyRange = TPartitionKeyRange::Parse(mg.GetKeyRange());
18061806
}
18071807

1808-
sourceIdWriter.RegisterSourceId(mg.GetId(), 0, 0, ctx.Now(), std::move(keyRange));
1808+
sourceIdWriter.RegisterSourceId(mg.GetId(), 0, 0, ctx.Now(), std::move(keyRange), false);
18091809
}
18101810

18111811
for (const auto& partition : cfg.GetPartitions()) {
@@ -4543,6 +4543,7 @@ void TPersQueue::SendEvProposePartitionConfig(const TActorContext& ctx,
45434543

45444544
event->TopicConverter = tx.TopicConverter;
45454545
event->Config = tx.TabletConfig;
4546+
event->BootstrapConfig = tx.BootstrapConfig;
45464547

45474548
ctx.Send(partition.Actor, std::move(event));
45484549
}

ydb/core/persqueue/sourceid.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ class TSourceIdStorage: private THeartbeatProcessor {
5151
void RegisterSourceId(const TString& sourceId, Args&&... args) {
5252
RegisterSourceIdInfo(sourceId, TSourceIdInfo(std::forward<Args>(args)...), false);
5353
}
54+
void RegisterSourceIdInfo(const TString& sourceId, TSourceIdInfo&& sourceIdInfo, bool load);
5455

5556
void DeregisterSourceId(const TString& sourceId);
5657

@@ -65,7 +66,6 @@ class TSourceIdStorage: private THeartbeatProcessor {
6566
private:
6667
void LoadRawSourceIdInfo(const TString& key, const TString& data, TInstant now);
6768
void LoadProtoSourceIdInfo(const TString& key, const TString& data);
68-
void RegisterSourceIdInfo(const TString& sourceId, TSourceIdInfo&& sourceIdInfo, bool load);
6969

7070
private:
7171
TSourceIdMap InMemorySourceIds;

ydb/core/persqueue/ut/partition_ut.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -988,7 +988,8 @@ void TPartitionFixture::SendChangePartitionConfig(const TConfigParams& config)
988988
auto event = MakeHolder<TEvPQ::TEvChangePartitionConfig>(TopicConverter, MakeConfig(config.Version,
989989
config.Consumers,
990990
1,
991-
config.MeteringMode));
991+
config.MeteringMode),
992+
NKikimrPQ::TBootstrapConfig());
992993
Ctx->Runtime->SingleSys()->Send(new IEventHandle(ActorId, Ctx->Edge, event.Release()));
993994
}
994995

ydb/core/tx/datashard/datashard_ut_change_exchange.cpp

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2734,12 +2734,13 @@ Y_UNIT_TEST_SUITE(Cdc) {
27342734
}
27352735
}
27362736

2737-
Y_UNIT_TEST(InitialScan) {
2737+
void InitialScanTest(bool withTopicSchemeTx) {
27382738
TPortManager portManager;
27392739
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())
27402740
.SetUseRealThreads(false)
27412741
.SetDomainName("Root")
27422742
.SetEnableChangefeedInitialScan(true)
2743+
.SetEnablePQConfigTransactionsAtSchemeShard(withTopicSchemeTx)
27432744
);
27442745

27452746
auto& runtime = *server->GetRuntime();
@@ -2782,6 +2783,14 @@ Y_UNIT_TEST_SUITE(Cdc) {
27822783
});
27832784
}
27842785

2786+
Y_UNIT_TEST(InitialScan) {
2787+
InitialScanTest(false);
2788+
}
2789+
2790+
Y_UNIT_TEST(InitialScan_WithTopicSchemeTx) {
2791+
InitialScanTest(true);
2792+
}
2793+
27852794
Y_UNIT_TEST(InitialScanDebezium) {
27862795
TTestTopicEnv env(SimpleTable(), KeysOnly(NKikimrSchemeOp::ECdcStreamFormatDebeziumJson, "UnusedStream"));
27872796
auto& client = env.GetClient();

0 commit comments

Comments
 (0)