Skip to content

Commit e8c3843

Browse files
authored
Pre-serialized bootstrap config (#9331)
1 parent 2d9c88c commit e8c3843

File tree

12 files changed

+88
-32
lines changed

12 files changed

+88
-32
lines changed

ydb/core/client/server/msgbus_server_pq_metarequest_ut.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ class TMessageBusServerPersQueueRequestTestBase: public TTestBase {
169169
static int version = 0;
170170
++version;
171171

172-
THolder<TEvPersQueue::TEvUpdateConfig> request(new TEvPersQueue::TEvUpdateConfig());
172+
auto request = MakeHolder<TEvPersQueue::TEvUpdateConfigBuilder>();
173173
for (size_t i : partitions) {
174174
request->Record.MutableTabletConfig()->AddPartitionIds(i);
175175
}

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2695,7 +2695,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
26952695
for (auto& [tabletId, t] : topicTxs) {
26962696
auto& transaction = t.tx;
26972697

2698-
auto ev = std::make_unique<TEvPersQueue::TEvProposeTransaction>();
2698+
auto ev = std::make_unique<TEvPersQueue::TEvProposeTransactionBuilder>();
26992699

27002700
if (t.hasWrite && writeId.Defined()) {
27012701
auto* w = transaction.MutableWriteId();

ydb/core/persqueue/events/global.h

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,11 +73,15 @@ struct TEvPersQueue {
7373
TEvResponse() {}
7474
};
7575

76-
struct TEvUpdateConfig: public TEventPB<TEvUpdateConfig,
76+
struct TEvUpdateConfig: public TEventPreSerializedPB<TEvUpdateConfig,
7777
NKikimrPQ::TUpdateConfig, EvUpdateConfig> {
7878
TEvUpdateConfig() {}
7979
};
8080

81+
struct TEvUpdateConfigBuilder: public TEvUpdateConfig {
82+
using TBase::Record;
83+
};
84+
8185
struct TEvUpdateBalancerConfig: public TEventPB<TEvUpdateBalancerConfig,
8286
NKikimrPQ::TUpdateBalancerConfig, EvUpdateBalancerConfig> {
8387
TEvUpdateBalancerConfig() {}
@@ -245,7 +249,11 @@ struct TEvPersQueue {
245249
{}
246250
};
247251

248-
struct TEvProposeTransaction : public TEventPB<TEvProposeTransaction, NKikimrPQ::TEvProposeTransaction, EvProposeTransaction> {
252+
struct TEvProposeTransaction : public TEventPreSerializedPB<TEvProposeTransaction, NKikimrPQ::TEvProposeTransaction, EvProposeTransaction> {
253+
};
254+
255+
struct TEvProposeTransactionBuilder: public TEvProposeTransaction {
256+
using TBase::Record;
249257
};
250258

251259
struct TEvProposeTransactionResult : public TEventPB<TEvProposeTransactionResult, NKikimrPQ::TEvProposeTransactionResult, EvProposeTransactionResult> {

ydb/core/persqueue/partition.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -899,7 +899,7 @@ void TPartition::Handle(TEvPQ::TEvUpdateWriteTimestamp::TPtr& ev, const TActorCo
899899

900900
void TPartition::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TActorContext& ctx)
901901
{
902-
const NKikimrPQ::TEvProposeTransaction& event = ev->Get()->Record;
902+
const NKikimrPQ::TEvProposeTransaction& event = ev->Get()->GetRecord();
903903
Y_ABORT_UNLESS(event.GetTxBodyCase() == NKikimrPQ::TEvProposeTransaction::kData);
904904
Y_ABORT_UNLESS(event.HasData());
905905
const NKikimrPQ::TDataTransaction& txBody = event.GetData();
@@ -1990,7 +1990,7 @@ TPartition::EProcessResult TPartition::PreProcessUserActionOrTransaction(TSimple
19901990
return EProcessResult::Continue;
19911991
}
19921992
t->Predicate.ConstructInPlace(true);
1993-
return PreProcessImmediateTx(t->ProposeTransaction->Record);
1993+
return PreProcessImmediateTx(t->ProposeTransaction->GetRecord());
19941994

19951995
} else if (t->Tx) { // Distributed TX
19961996
if (t->Predicate.Defined()) { // Predicate defined - either failed previously or Tx created with predicate defined.
@@ -2573,7 +2573,7 @@ TPartition::EProcessResult TPartition::PreProcessImmediateTx(const NKikimrPQ::TE
25732573
void TPartition::ExecImmediateTx(TTransaction& t)
25742574
{
25752575
--ImmediateTxCount;
2576-
auto& record = t.ProposeTransaction->Record;
2576+
const auto& record = t.ProposeTransaction->GetRecord();
25772577
Y_ABORT_UNLESS(record.GetTxBodyCase() == NKikimrPQ::TEvProposeTransaction::kData);
25782578
Y_ABORT_UNLESS(record.HasData());
25792579

@@ -2586,7 +2586,7 @@ void TPartition::ExecImmediateTx(TTransaction& t)
25862586
t.Message);
25872587
return;
25882588
}
2589-
for (auto& operation : record.GetData().GetOperations()) {
2589+
for (const auto& operation : record.GetData().GetOperations()) {
25902590
if (!operation.HasBegin() || !operation.HasEnd() || !operation.HasConsumer()) {
25912591
continue; //Write operation - handled separately via WriteInfo
25922592
}

ydb/core/persqueue/partition.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,9 @@ struct TTransaction {
7373
: ProposeTransaction(proposeTx)
7474
, State(ECommitState::Committed)
7575
{
76-
if (proposeTx->Record.HasSupportivePartitionActor()) {
77-
SupportivePartitionActor = ActorIdFromProto(proposeTx->Record.GetSupportivePartitionActor());
76+
const auto& record = proposeTx->GetRecord();
77+
if (record.HasSupportivePartitionActor()) {
78+
SupportivePartitionActor = ActorIdFromProto(record.GetSupportivePartitionActor());
7879
}
7980
Y_ABORT_UNLESS(ProposeTransaction);
8081
}

ydb/core/persqueue/pq_impl.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1626,7 +1626,7 @@ void TPersQueue::CreateTopicConverter(const NKikimrPQ::TPQTabletConfig& config,
16261626

16271627
void TPersQueue::ProcessUpdateConfigRequest(TAutoPtr<TEvPersQueue::TEvUpdateConfig> ev, const TActorId& sender, const TActorContext& ctx)
16281628
{
1629-
auto& record = ev->Record;
1629+
const auto& record = ev->GetRecord();
16301630

16311631
int oldConfigVersion = Config.HasVersion() ? static_cast<int>(Config.GetVersion()) : -1;
16321632
int newConfigVersion = NewConfig.HasVersion() ? static_cast<int>(NewConfig.GetVersion()) : oldConfigVersion;
@@ -3259,9 +3259,9 @@ void TPersQueue::Handle(TEvPersQueue::TEvCancelTransactionProposal::TPtr& ev, co
32593259

32603260
void TPersQueue::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TActorContext& ctx)
32613261
{
3262-
PQ_LOG_D("Handle TEvPersQueue::TEvProposeTransaction " << ev->Get()->Record.ShortDebugString());
3262+
const NKikimrPQ::TEvProposeTransaction& event = ev->Get()->GetRecord();
3263+
PQ_LOG_D("Handle TEvPersQueue::TEvProposeTransaction " << event.ShortDebugString());
32633264

3264-
NKikimrPQ::TEvProposeTransaction& event = ev->Get()->Record;
32653265
switch (event.GetTxBodyCase()) {
32663266
case NKikimrPQ::TEvProposeTransaction::kData:
32673267
HandleDataTransaction(ev->Release(), ctx);
@@ -3316,7 +3316,7 @@ bool TPersQueue::CheckTxWriteOperations(const NKikimrPQ::TDataTransaction& txBod
33163316
void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransaction> ev,
33173317
const TActorContext& ctx)
33183318
{
3319-
NKikimrPQ::TEvProposeTransaction& event = ev->Record;
3319+
NKikimrPQ::TEvProposeTransaction& event = *ev->MutableRecord();
33203320
Y_ABORT_UNLESS(event.GetTxBodyCase() == NKikimrPQ::TEvProposeTransaction::kData);
33213321
Y_ABORT_UNLESS(event.HasData());
33223322
const NKikimrPQ::TDataTransaction& txBody = event.GetData();
@@ -3427,7 +3427,7 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
34273427
void TPersQueue::HandleConfigTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransaction> ev,
34283428
const TActorContext& ctx)
34293429
{
3430-
NKikimrPQ::TEvProposeTransaction& event = ev->Record;
3430+
const NKikimrPQ::TEvProposeTransaction& event = ev->GetRecord();
34313431
Y_ABORT_UNLESS(event.GetTxBodyCase() == NKikimrPQ::TEvProposeTransaction::kConfig);
34323432
Y_ABORT_UNLESS(event.HasConfig());
34333433

@@ -3705,7 +3705,7 @@ void TPersQueue::ProcessProposeTransactionQueue(const TActorContext& ctx)
37053705
const auto front = std::move(EvProposeTransactionQueue.front());
37063706
EvProposeTransactionQueue.pop_front();
37073707

3708-
const NKikimrPQ::TEvProposeTransaction& event = front->Record;
3708+
const NKikimrPQ::TEvProposeTransaction& event = front->GetRecord();
37093709
TDistributedTransaction& tx = Txs[event.GetTxId()];
37103710

37113711
switch (tx.State) {

ydb/core/persqueue/ut/common/pq_ut_common.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ void PQTabletPrepare(const TTabletPreparationParameters& parameters,
4444
try {
4545
runtime.ResetScheduledCount();
4646

47-
THolder<TEvPersQueue::TEvUpdateConfig> request(new TEvPersQueue::TEvUpdateConfig());
47+
auto request = MakeHolder<TEvPersQueue::TEvUpdateConfigBuilder>();
4848
for (ui32 i = 0; i < parameters.partitions; ++i) {
4949
request->Record.MutableTabletConfig()->AddPartitionIds(i);
5050
}

ydb/core/persqueue/ut/partition_ut.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -882,7 +882,7 @@ void TPartitionFixture::SendProposeTransactionRequest(ui32 partition,
882882
bool immediate,
883883
ui64 txId)
884884
{
885-
auto event = MakeHolder<TEvPersQueue::TEvProposeTransaction>();
885+
auto event = MakeHolder<TEvPersQueue::TEvProposeTransactionBuilder>();
886886

887887
ActorIdToProto(Ctx->Edge, event->Record.MutableSourceActor());
888888
auto* body = event->Record.MutableData();
@@ -1606,7 +1606,7 @@ ui64 TPartitionTxTestHelper::MakeAndSendWriteTx(const TSrcIdMap& srcIdsAffected)
16061606
ui64 TPartitionTxTestHelper::MakeAndSendImmediateTx(const TSrcIdMap& srcIdsAffected) {
16071607
auto actIter = AddWriteTxImpl(srcIdsAffected, NextActId++, 0);
16081608

1609-
auto event = MakeHolder<TEvPersQueue::TEvProposeTransaction>();
1609+
auto event = MakeHolder<TEvPersQueue::TEvProposeTransactionBuilder>();
16101610

16111611
ActorIdToProto(Ctx->Edge, event->Record.MutableSourceActor());
16121612
auto* body = event->Record.MutableData();

ydb/core/persqueue/ut/pqtablet_ut.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -294,7 +294,7 @@ void TPQTabletFixture::SendToPipe(const TActorId& sender,
294294

295295
void TPQTabletFixture::SendProposeTransactionRequest(const TProposeTransactionParams& params)
296296
{
297-
auto event = MakeHolder<TEvPersQueue::TEvProposeTransaction>();
297+
auto event = MakeHolder<TEvPersQueue::TEvProposeTransactionBuilder>();
298298
THashSet<ui32> partitions;
299299

300300
ActorIdToProto(Ctx->Edge, event->Record.MutableSourceActor());

ydb/core/persqueue/ut/user_action_processor_ut.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -647,7 +647,7 @@ void TUserActionProcessorFixture::SendProposeTransactionRequest(ui32 partition,
647647
bool immediate,
648648
ui64 txId)
649649
{
650-
auto event = MakeHolder<TEvPersQueue::TEvProposeTransaction>();
650+
auto event = MakeHolder<TEvPersQueue::TEvProposeTransactionBuilder>();
651651

652652
ActorIdToProto(Ctx->Edge, event->Record.MutableSource());
653653
auto* body = event->Record.MutableTxBody();
@@ -665,7 +665,7 @@ void TUserActionProcessorFixture::SendProposeTransactionRequest(ui32 partition,
665665

666666
void TUserActionProcessorFixture::SendProposeTransactionRequest(const TProposeTransactionParams& params)
667667
{
668-
auto event = MakeHolder<TEvPersQueue::TEvProposeTransaction>();
668+
auto event = MakeHolder<TEvPersQueue::TEvProposeTransactionBuilder>();
669669

670670
//
671671
// Source

0 commit comments

Comments
 (0)