1
1
#include " tx_write.h"
2
+
3
+ #include < ydb/core/tx/columnshard/engines/insert_table/user_data.h>
2
4
#include < ydb/core/tx/columnshard/transactions/locks/write.h>
3
5
4
6
namespace NKikimr ::NColumnShard {
5
7
6
- bool TTxWrite::InsertOneBlob (TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TWriteId writeId) {
8
+ bool TTxWrite::InsertOneBlob (TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TInsertWriteId writeId) {
7
9
NKikimrTxColumnShard::TLogicalMetadata meta;
8
10
meta.SetNumRows (batch->GetRowsCount ());
9
11
meta.SetRawBytes (batch->GetRawBytes ());
@@ -23,9 +25,8 @@ bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSeriali
23
25
auto schemeVersion = batch.GetAggregation ().GetSchemaVersion ();
24
26
auto tableSchema = Self->TablesManager .GetPrimaryIndex ()->GetVersionedIndex ().GetSchemaVerified (schemeVersion);
25
27
26
- NOlap::TInsertedData insertData ((ui64)writeId, writeMeta.GetTableId (), writeMeta.GetDedupId (), blobRange,
27
- meta, tableSchema->GetVersion (),
28
- batch->GetData ());
28
+ auto userData = std::make_shared<NOlap::TUserData>(writeMeta.GetTableId (), blobRange, meta, tableSchema->GetVersion (), batch->GetData ());
29
+ NOlap::TInsertedData insertData (writeId, userData);
29
30
bool ok = Self->InsertTable ->Insert (dbTable, std::move (insertData));
30
31
if (ok) {
31
32
Self->UpdateInsertTableCounters ();
@@ -36,7 +37,8 @@ bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSeriali
36
37
37
38
bool TTxWrite::Execute (TTransactionContext& txc, const TActorContext&) {
38
39
TMemoryProfileGuard mpg (" TTxWrite::Execute" );
39
- NActors::TLogContextGuard logGuard = NActors::TLogContextBuilder::Build (NKikimrServices::TX_COLUMNSHARD_BLOBS)(" tablet_id" , Self->TabletID ())(" tx_state" , " execute" );
40
+ NActors::TLogContextGuard logGuard =
41
+ NActors::TLogContextBuilder::Build (NKikimrServices::TX_COLUMNSHARD_BLOBS)(" tablet_id" , Self->TabletID ())(" tx_state" , " execute" );
40
42
ACFL_DEBUG (" event" , " start_execute" );
41
43
const NOlap::TWritingBuffer& buffer = PutBlobResult->Get ()->MutableWritesBuffer ();
42
44
for (auto && aggr : buffer.GetAggregations ()) {
@@ -45,33 +47,27 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
45
47
txc.DB .NoMoreReadsForTx ();
46
48
TWriteOperation::TPtr operation;
47
49
if (writeMeta.HasLongTxId ()) {
50
+ NIceDb::TNiceDb db (txc.DB );
51
+ const TInsertWriteId insertWriteId =
52
+ Self->GetLongTxWrite (db, writeMeta.GetLongTxIdUnsafe (), writeMeta.GetWritePartId (), writeMeta.GetGranuleShardingVersion ());
53
+ aggr->AddInsertWriteId (insertWriteId);
48
54
if (writeMeta.IsGuaranteeWriter ()) {
49
55
AFL_VERIFY (aggr->GetSplittedBlobs ().size () == 1 )(" count" , aggr->GetSplittedBlobs ().size ());
50
56
} else {
51
57
AFL_VERIFY (aggr->GetSplittedBlobs ().size () <= 1 )(" count" , aggr->GetSplittedBlobs ().size ());
52
58
}
59
+ if (aggr->GetSplittedBlobs ().size () == 1 ) {
60
+ AFL_VERIFY (InsertOneBlob (txc, aggr->GetSplittedBlobs ().front (), insertWriteId))(" write_id" , writeMeta.GetWriteId ())(
61
+ " insert_write_id" , insertWriteId);
62
+ }
53
63
} else {
54
- operation = Self->OperationsManager ->GetOperation ((TWriteId)writeMeta.GetWriteId ());
55
- Y_ABORT_UNLESS (operation);
64
+ operation = Self->OperationsManager ->GetOperationVerified ((TOperationWriteId)writeMeta.GetWriteId ());
56
65
Y_ABORT_UNLESS (operation->GetStatus () == EOperationStatus::Started);
57
- }
58
-
59
- auto writeId = TWriteId (writeMeta.GetWriteId ());
60
- if (!operation) {
61
- NIceDb::TNiceDb db (txc.DB );
62
- writeId = Self->GetLongTxWrite (db, writeMeta.GetLongTxIdUnsafe (), writeMeta.GetWritePartId (), writeMeta.GetGranuleShardingVersion ());
63
- aggr->AddWriteId (writeId);
64
- }
65
-
66
- for (auto && i : aggr->GetSplittedBlobs ()) {
67
- if (operation) {
68
- writeId = Self->BuildNextWriteId (txc);
69
- aggr->AddWriteId (writeId);
70
- }
71
-
72
- if (!InsertOneBlob (txc, i, writeId)) {
73
- LOG_S_DEBUG (TxPrefix () << " duplicate writeId " << (ui64)writeId << TxSuffix ());
74
- Self->Counters .GetTabletCounters ()->IncCounter (COUNTER_WRITE_DUPLICATE);
66
+ for (auto && i : aggr->GetSplittedBlobs ()) {
67
+ const TInsertWriteId insertWriteId = Self->InsertTable ->BuildNextWriteId (txc);
68
+ aggr->AddInsertWriteId (insertWriteId);
69
+ AFL_VERIFY (InsertOneBlob (txc, i, insertWriteId))(" write_id" , writeMeta.GetWriteId ())(" insert_write_id" , insertWriteId)(
70
+ " size" , aggr->GetSplittedBlobs ().size ());
75
71
}
76
72
}
77
73
}
@@ -88,9 +84,10 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
88
84
for (auto && aggr : buffer.GetAggregations ()) {
89
85
const auto & writeMeta = aggr->GetWriteMeta ();
90
86
if (!writeMeta.HasLongTxId ()) {
91
- auto operation = Self->OperationsManager ->GetOperationVerified ((TWriteId )writeMeta.GetWriteId ());
87
+ auto operation = Self->OperationsManager ->GetOperationVerified ((TOperationWriteId )writeMeta.GetWriteId ());
92
88
Y_ABORT_UNLESS (operation->GetStatus () == EOperationStatus::Started);
93
- operation->OnWriteFinish (txc, aggr->GetWriteIds (), operation->GetBehaviour () == EOperationBehaviour::NoTxWrite);
89
+ operation->OnWriteFinish (txc, aggr->GetInsertWriteIds (), operation->GetBehaviour () == EOperationBehaviour::NoTxWrite);
90
+ Self->OperationsManager ->LinkInsertWriteIdToOperationWriteId (aggr->GetInsertWriteIds (), operation->GetWriteId ());
94
91
if (operation->GetBehaviour () == EOperationBehaviour::NoTxWrite) {
95
92
auto ev = NEvents::TDataEvents::TEvWriteResult::BuildCompleted (Self->TabletID ());
96
93
Results.emplace_back (std::move (ev), writeMeta.GetSource (), operation->GetCookie ());
@@ -119,8 +116,9 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
119
116
Results.emplace_back (std::move (ev), writeMeta.GetSource (), operation->GetCookie ());
120
117
}
121
118
} else {
122
- Y_ABORT_UNLESS (aggr->GetWriteIds ().size () == 1 );
123
- auto ev = std::make_unique<TEvColumnShard::TEvWriteResult>(Self->TabletID (), writeMeta, (ui64)aggr->GetWriteIds ().front (), NKikimrTxColumnShard::EResultStatus::SUCCESS);
119
+ Y_ABORT_UNLESS (aggr->GetInsertWriteIds ().size () == 1 );
120
+ auto ev = std::make_unique<TEvColumnShard::TEvWriteResult>(
121
+ Self->TabletID (), writeMeta, (ui64)aggr->GetInsertWriteIds ().front (), NKikimrTxColumnShard::EResultStatus::SUCCESS);
124
122
Results.emplace_back (std::move (ev), writeMeta.GetSource (), 0 );
125
123
}
126
124
}
@@ -129,7 +127,8 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
129
127
130
128
void TTxWrite::Complete (const TActorContext& ctx) {
131
129
TMemoryProfileGuard mpg (" TTxWrite::Complete" );
132
- NActors::TLogContextGuard logGuard = NActors::TLogContextBuilder::Build (NKikimrServices::TX_COLUMNSHARD_BLOBS)(" tablet_id" , Self->TabletID ())(" tx_state" , " complete" );
130
+ NActors::TLogContextGuard logGuard =
131
+ NActors::TLogContextBuilder::Build (NKikimrServices::TX_COLUMNSHARD_BLOBS)(" tablet_id" , Self->TabletID ())(" tx_state" , " complete" );
133
132
const auto now = TMonotonic::Now ();
134
133
const NOlap::TWritingBuffer& buffer = PutBlobResult->Get ()->MutableWritesBuffer ();
135
134
for (auto && i : buffer.GetAddActions ()) {
@@ -149,7 +148,7 @@ void TTxWrite::Complete(const TActorContext& ctx) {
149
148
for (ui32 i = 0 ; i < buffer.GetAggregations ().size (); ++i) {
150
149
const auto & writeMeta = buffer.GetAggregations ()[i]->GetWriteMeta ();
151
150
if (!writeMeta.HasLongTxId ()) {
152
- auto op = Self->GetOperationsManager ().GetOperationVerified (NOlap::TWriteId ( writeMeta.GetWriteId () ));
151
+ auto op = Self->GetOperationsManager ().GetOperationVerified ((TOperationWriteId) writeMeta.GetWriteId ());
153
152
if (op->GetBehaviour () == EOperationBehaviour::WriteWithLock || op->GetBehaviour () == EOperationBehaviour::NoTxWrite) {
154
153
auto evWrite = std::make_shared<NOlap::NTxInteractions::TEvWriteWriter>(writeMeta.GetTableId (),
155
154
buffer.GetAggregations ()[i]->GetRecordBatch (), Self->GetIndexOptional ()->GetVersionedIndex ().GetPrimaryKey ());
@@ -158,12 +157,11 @@ void TTxWrite::Complete(const TActorContext& ctx) {
158
157
if (op->GetBehaviour () == EOperationBehaviour::NoTxWrite) {
159
158
Self->OperationsManager ->CommitTransactionOnComplete (*Self, op->GetLockId (), Self->GetLastTxSnapshot ());
160
159
}
161
-
162
160
}
163
161
Self->Counters .GetCSCounters ().OnWriteTxComplete (now - writeMeta.GetWriteStartInstant ());
164
162
Self->Counters .GetCSCounters ().OnSuccessWriteResponse ();
165
163
}
166
164
Self->Counters .GetTabletCounters ()->IncCounter (COUNTER_IMMEDIATE_TX_COMPLETED);
167
165
}
168
166
169
- }
167
+ } // namespace NKikimr::NColumnShard
0 commit comments