Skip to content

Commit 7d337c0

Browse files
clean useless case for evwrite (#9716)
1 parent 14936b2 commit 7d337c0

File tree

13 files changed

+529
-573
lines changed

13 files changed

+529
-573
lines changed

ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -97,17 +97,6 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
9797
if (operation->GetBehaviour() == EOperationBehaviour::NoTxWrite) {
9898
auto ev = NEvents::TDataEvents::TEvWriteResult::BuildCompleted(Self->TabletID());
9999
Results.emplace_back(std::move(ev), writeMeta.GetSource(), operation->GetCookie());
100-
} else if (operation->GetBehaviour() == EOperationBehaviour::InTxWrite) {
101-
NKikimrTxColumnShard::TCommitWriteTxBody proto;
102-
proto.SetLockId(operation->GetLockId());
103-
TString txBody;
104-
Y_ABORT_UNLESS(proto.SerializeToString(&txBody));
105-
auto op = Self->GetProgressTxController().StartProposeOnExecute(
106-
TTxController::TTxInfo(
107-
NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE, operation->GetLockId(), writeMeta.GetSource(), operation->GetCookie(), {}),
108-
txBody, txc);
109-
AFL_VERIFY(!op->IsFail());
110-
ResultOperators.emplace_back(op);
111100
} else {
112101
auto& info = Self->OperationsManager->GetLockVerified(operation->GetLockId());
113102
NKikimrDataEvents::TLock lock;

ydb/core/tx/columnshard/columnshard__write.cpp

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -459,12 +459,12 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
459459
if (conclusionParse.IsFail()) {
460460
sendError(conclusionParse.GetErrorMessage(), NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST);
461461
} else {
462-
if (commitOperation->NeedSyncLocks()) {
463-
auto* lockInfo = OperationsManager->GetLockOptional(commitOperation->GetLockId());
464-
if (!lockInfo) {
465-
sendError("haven't lock for commit: " + ::ToString(commitOperation->GetLockId()),
466-
NKikimrDataEvents::TEvWriteResult::STATUS_ABORTED);
467-
} else {
462+
auto* lockInfo = OperationsManager->GetLockOptional(commitOperation->GetLockId());
463+
if (!lockInfo) {
464+
sendError("haven't lock for commit: " + ::ToString(commitOperation->GetLockId()),
465+
NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST);
466+
} else {
467+
if (commitOperation->NeedSyncLocks()) {
468468
if (lockInfo->GetGeneration() != commitOperation->GetGeneration()) {
469469
sendError("tablet lock have another generation: " + ::ToString(lockInfo->GetGeneration()) +
470470
" != " + ::ToString(commitOperation->GetGeneration()),
@@ -477,9 +477,9 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
477477
} else {
478478
Execute(new TProposeWriteTransaction(this, commitOperation, source, cookie), ctx);
479479
}
480+
} else {
481+
Execute(new TProposeWriteTransaction(this, commitOperation, source, cookie), ctx);
480482
}
481-
} else {
482-
Execute(new TProposeWriteTransaction(this, commitOperation, source, cookie), ctx);
483483
}
484484
}
485485
return;
@@ -557,8 +557,6 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
557557
ui64 lockId = 0;
558558
if (behaviour == EOperationBehaviour::NoTxWrite) {
559559
lockId = BuildEphemeralTxId();
560-
} else if (behaviour == EOperationBehaviour::InTxWrite) {
561-
lockId = record.GetTxId();
562560
} else {
563561
lockId = record.GetLockTxId();
564562
}

ydb/core/tx/columnshard/operations/manager.cpp

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -255,9 +255,6 @@ TConclusion<EOperationBehaviour> TOperationsManager::GetBehaviour(const NEvents:
255255
return EOperationBehaviour::NoTxWrite;
256256
}
257257

258-
if (evWrite.Record.HasTxId() && evWrite.Record.GetTxMode() == NKikimrDataEvents::TEvWrite::MODE_PREPARE) {
259-
return EOperationBehaviour::InTxWrite;
260-
}
261258
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("proto", evWrite.Record.DebugString())("event", "undefined behaviour");
262259
return TConclusionStatus::Fail("undefined request for detect tx type");
263260
}

ydb/core/tx/columnshard/operations/write.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ enum class EOperationStatus : ui32 {
3737

3838
enum class EOperationBehaviour : ui32 {
3939
Undefined = 1,
40-
InTxWrite = 2,
4140
WriteWithLock = 3,
4241
CommitWriteLock = 4,
4342
AbortWriteLock = 5,

ydb/core/tx/columnshard/test_helper/columnshard_ut_common.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ bool ProposeSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, const TString
6363
return (res.GetStatus() == NKikimrTxColumnShard::PREPARED);
6464
}
6565

66-
void PlanSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot snap) {
66+
void PlanSchemaTx(TTestBasicRuntime& runtime, const TActorId& sender, NOlap::TSnapshot snap) {
6767
auto plan = std::make_unique<TEvTxProcessing::TEvPlanStep>(snap.GetPlanStep(), 0, TTestTxConfig::TxTablet0);
6868
auto tx = plan->Record.AddTransactions();
6969
tx->SetTxId(snap.GetTxId());
@@ -78,7 +78,7 @@ void PlanSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot
7878
UNIT_ASSERT_EQUAL(res.GetStatus(), NKikimrTxColumnShard::SUCCESS);
7979
}
8080

81-
void PlanWriteTx(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot snap, bool waitResult) {
81+
void PlanWriteTx(TTestBasicRuntime& runtime, const TActorId& sender, NOlap::TSnapshot snap, bool waitResult) {
8282
auto plan = std::make_unique<TEvTxProcessing::TEvPlanStep>(snap.GetPlanStep(), 0, TTestTxConfig::TxTablet0);
8383
auto tx = plan->Record.AddTransactions();
8484
tx->SetTxId(snap.GetTxId());
@@ -229,7 +229,7 @@ void PlanCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 planStep, con
229229
PlanCommit(runtime, sender, TTestTxConfig::TxTablet0, planStep, txIds);
230230
}
231231

232-
void Wakeup(TTestBasicRuntime& runtime, TActorId& sender, const ui64 shardId) {
232+
void Wakeup(TTestBasicRuntime& runtime, const TActorId& sender, const ui64 shardId) {
233233
auto wakeup = std::make_unique<TEvPrivate::TEvPeriodicWakeup>(true);
234234
ForwardToTablet(runtime, shardId, sender, wakeup.release());
235235
}

ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -402,9 +402,9 @@ struct TTestSchema {
402402

403403
bool ProposeSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, const TString& txBody, NOlap::TSnapshot snap);
404404
void ProvideTieringSnapshot(TTestBasicRuntime& runtime, const TActorId& sender, NMetadata::NFetcher::ISnapshot::TPtr snapshot);
405-
void PlanSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot snap);
405+
void PlanSchemaTx(TTestBasicRuntime& runtime, const TActorId& sender, NOlap::TSnapshot snap);
406406

407-
void PlanWriteTx(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot snap, bool waitResult = true);
407+
void PlanWriteTx(TTestBasicRuntime& runtime, const TActorId& sender, NOlap::TSnapshot snap, bool waitResult = true);
408408

409409
bool WriteData(TTestBasicRuntime& runtime, TActorId& sender, const ui64 shardId, const ui64 writeId, const ui64 tableId, const TString& data,
410410
const std::vector<NArrow::NTest::TTestColumn>& ydbSchema, std::vector<ui64>* writeIds,
@@ -435,7 +435,7 @@ inline void PlanCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 planSt
435435
PlanCommit(runtime, sender, planStep, ids);
436436
}
437437

438-
void Wakeup(TTestBasicRuntime& runtime, TActorId& sender, const ui64 shardId);
438+
void Wakeup(TTestBasicRuntime& runtime, const TActorId& sender, const ui64 shardId);
439439

440440
struct TTestBlobOptions {
441441
THashSet<TString> NullColumns;
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
#include "shard_reader.h"
2+
3+
namespace NKikimr::NTxUT {
4+
5+
std::unique_ptr<NKikimr::TEvDataShard::TEvKqpScan> TShardReader::BuildStartEvent() const {
6+
auto ev = std::make_unique<TEvDataShard::TEvKqpScan>();
7+
ev->Record.SetLocalPathId(PathId);
8+
ev->Record.MutableSnapshot()->SetStep(Snapshot.GetPlanStep());
9+
ev->Record.MutableSnapshot()->SetTxId(Snapshot.GetTxId());
10+
11+
ev->Record.SetStatsMode(NYql::NDqProto::DQ_STATS_MODE_FULL);
12+
ev->Record.SetTxId(Snapshot.GetTxId());
13+
14+
ev->Record.SetReverse(Reverse);
15+
ev->Record.SetItemsLimit(Limit);
16+
17+
ev->Record.SetDataFormat(NKikimrDataEvents::FORMAT_ARROW);
18+
19+
auto protoRanges = ev->Record.MutableRanges();
20+
protoRanges->Reserve(Ranges.size());
21+
for (auto& range : Ranges) {
22+
auto newRange = protoRanges->Add();
23+
range.Serialize(*newRange);
24+
}
25+
26+
if (ProgramProto) {
27+
NKikimrSSA::TOlapProgram olapProgram;
28+
{
29+
TString programBytes;
30+
TStringOutput stream(programBytes);
31+
ProgramProto->SerializeToArcadiaStream(&stream);
32+
olapProgram.SetProgram(programBytes);
33+
}
34+
{
35+
TString programBytes;
36+
TStringOutput stream(programBytes);
37+
olapProgram.SerializeToArcadiaStream(&stream);
38+
ev->Record.SetOlapProgram(programBytes);
39+
}
40+
ev->Record.SetOlapProgramType(NKikimrSchemeOp::EOlapProgramType::OLAP_PROGRAM_SSA_PROGRAM_WITH_PARAMETERS);
41+
} else if (SerializedProgram) {
42+
ev->Record.SetOlapProgram(*SerializedProgram);
43+
ev->Record.SetOlapProgramType(NKikimrSchemeOp::EOlapProgramType::OLAP_PROGRAM_SSA_PROGRAM_WITH_PARAMETERS);
44+
}
45+
46+
return ev;
47+
}
48+
49+
NKikimr::NTxUT::TShardReader& TShardReader::SetReplyColumns(const std::vector<TString>& replyColumns) {
50+
AFL_VERIFY(!SerializedProgram);
51+
if (!ProgramProto) {
52+
ProgramProto = NKikimrSSA::TProgram();
53+
}
54+
for (auto&& command : *ProgramProto->MutableCommand()) {
55+
if (command.HasProjection()) {
56+
NKikimrSSA::TProgram::TProjection proj;
57+
for (auto&& i : replyColumns) {
58+
proj.AddColumns()->SetName(i);
59+
}
60+
*command.MutableProjection() = proj;
61+
return *this;
62+
}
63+
}
64+
{
65+
auto* command = ProgramProto->AddCommand();
66+
NKikimrSSA::TProgram::TProjection proj;
67+
for (auto&& i : replyColumns) {
68+
proj.AddColumns()->SetName(i);
69+
}
70+
*command->MutableProjection() = proj;
71+
}
72+
return *this;
73+
}
74+
75+
NKikimr::NTxUT::TShardReader& TShardReader::SetReplyColumnIds(const std::vector<ui32>& replyColumnIds) {
76+
AFL_VERIFY(!SerializedProgram);
77+
if (!ProgramProto) {
78+
ProgramProto = NKikimrSSA::TProgram();
79+
}
80+
for (auto&& command : *ProgramProto->MutableCommand()) {
81+
if (command.HasProjection()) {
82+
NKikimrSSA::TProgram::TProjection proj;
83+
for (auto&& i : replyColumnIds) {
84+
proj.AddColumns()->SetId(i);
85+
}
86+
*command.MutableProjection() = proj;
87+
return *this;
88+
}
89+
}
90+
{
91+
auto* command = ProgramProto->AddCommand();
92+
NKikimrSSA::TProgram::TProjection proj;
93+
for (auto&& i : replyColumnIds) {
94+
proj.AddColumns()->SetId(i);
95+
}
96+
*command->MutableProjection() = proj;
97+
}
98+
return *this;
99+
}
100+
101+
}

ydb/core/tx/columnshard/test_helper/shard_reader.h

Lines changed: 3 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -28,53 +28,7 @@ class TShardReader {
2828
std::vector<TString> ReplyColumns;
2929
std::vector<TSerializedTableRange> Ranges;
3030

31-
std::unique_ptr<TEvDataShard::TEvKqpScan> BuildStartEvent() const {
32-
auto ev = std::make_unique<TEvDataShard::TEvKqpScan>();
33-
ev->Record.SetLocalPathId(PathId);
34-
ev->Record.MutableSnapshot()->SetStep(Snapshot.GetPlanStep());
35-
ev->Record.MutableSnapshot()->SetTxId(Snapshot.GetTxId());
36-
37-
ev->Record.SetStatsMode(NYql::NDqProto::DQ_STATS_MODE_FULL);
38-
ev->Record.SetTxId(Snapshot.GetTxId());
39-
40-
ev->Record.SetReverse(Reverse);
41-
ev->Record.SetItemsLimit(Limit);
42-
43-
ev->Record.SetDataFormat(NKikimrDataEvents::FORMAT_ARROW);
44-
45-
auto protoRanges = ev->Record.MutableRanges();
46-
protoRanges->Reserve(Ranges.size());
47-
for (auto& range : Ranges) {
48-
auto newRange = protoRanges->Add();
49-
range.Serialize(*newRange);
50-
}
51-
52-
if (ProgramProto) {
53-
NKikimrSSA::TOlapProgram olapProgram;
54-
{
55-
TString programBytes;
56-
TStringOutput stream(programBytes);
57-
ProgramProto->SerializeToArcadiaStream(&stream);
58-
olapProgram.SetProgram(programBytes);
59-
}
60-
{
61-
TString programBytes;
62-
TStringOutput stream(programBytes);
63-
olapProgram.SerializeToArcadiaStream(&stream);
64-
ev->Record.SetOlapProgram(programBytes);
65-
}
66-
ev->Record.SetOlapProgramType(
67-
NKikimrSchemeOp::EOlapProgramType::OLAP_PROGRAM_SSA_PROGRAM_WITH_PARAMETERS
68-
);
69-
} else if (SerializedProgram) {
70-
ev->Record.SetOlapProgram(*SerializedProgram);
71-
ev->Record.SetOlapProgramType(
72-
NKikimrSchemeOp::EOlapProgramType::OLAP_PROGRAM_SSA_PROGRAM_WITH_PARAMETERS
73-
);
74-
}
75-
76-
return ev;
77-
}
31+
std::unique_ptr<TEvDataShard::TEvKqpScan> BuildStartEvent() const;
7832

7933
std::vector<std::shared_ptr<arrow::RecordBatch>> ResultBatches;
8034
YDB_READONLY(ui32, IterationsCount, 0);
@@ -100,57 +54,9 @@ class TShardReader {
10054
return r ? r->num_rows() : 0;
10155
}
10256

103-
TShardReader& SetReplyColumns(const std::vector<TString>& replyColumns) {
104-
AFL_VERIFY(!SerializedProgram);
105-
if (!ProgramProto) {
106-
ProgramProto = NKikimrSSA::TProgram();
107-
}
108-
for (auto&& command : *ProgramProto->MutableCommand()) {
109-
if (command.HasProjection()) {
110-
NKikimrSSA::TProgram::TProjection proj;
111-
for (auto&& i : replyColumns) {
112-
proj.AddColumns()->SetName(i);
113-
}
114-
*command.MutableProjection() = proj;
115-
return *this;
116-
}
117-
}
118-
{
119-
auto* command = ProgramProto->AddCommand();
120-
NKikimrSSA::TProgram::TProjection proj;
121-
for (auto&& i : replyColumns) {
122-
proj.AddColumns()->SetName(i);
123-
}
124-
*command->MutableProjection() = proj;
125-
}
126-
return *this;
127-
}
57+
TShardReader& SetReplyColumns(const std::vector<TString>& replyColumns);
12858

129-
TShardReader& SetReplyColumnIds(const std::vector<ui32>& replyColumnIds) {
130-
AFL_VERIFY(!SerializedProgram);
131-
if (!ProgramProto) {
132-
ProgramProto = NKikimrSSA::TProgram();
133-
}
134-
for (auto&& command : *ProgramProto->MutableCommand()) {
135-
if (command.HasProjection()) {
136-
NKikimrSSA::TProgram::TProjection proj;
137-
for (auto&& i : replyColumnIds) {
138-
proj.AddColumns()->SetId(i);
139-
}
140-
*command.MutableProjection() = proj;
141-
return *this;
142-
}
143-
}
144-
{
145-
auto* command = ProgramProto->AddCommand();
146-
NKikimrSSA::TProgram::TProjection proj;
147-
for (auto&& i : replyColumnIds) {
148-
proj.AddColumns()->SetId(i);
149-
}
150-
*command->MutableProjection() = proj;
151-
}
152-
return *this;
153-
}
59+
TShardReader& SetReplyColumnIds(const std::vector<ui32>& replyColumnIds);
15460

15561
TShardReader& SetProgram(const NKikimrSSA::TProgram& p) {
15662
AFL_VERIFY(!ProgramProto);

0 commit comments

Comments
 (0)