Skip to content

Commit b666aa2

Browse files
immediate writing with no tx (#8697)
1 parent 4c66728 commit b666aa2

File tree

7 files changed

+50
-27
lines changed

7 files changed

+50
-27
lines changed

.github/config/muted_ya.txt

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,6 @@ ydb/core/kqp/ut/olap KqpOlapStatistics.StatsUsageWithTTL
1515
ydb/core/kqp/ut/pg KqpPg.CreateIndex
1616
ydb/core/kqp/ut/query KqpLimits.QueryReplySize
1717
ydb/core/kqp/ut/query KqpQuery.QueryTimeout
18-
ydb/core/kqp/ut/query KqpQuery.OlapCreateAsSelect_Complex
19-
ydb/core/kqp/ut/query KqpQuery.OlapCreateAsSelect_Simple
20-
ydb/core/kqp/ut/federated_query/s3 KqpFederatedQuery.CreateTableAsSelectFromExternalDataSource
21-
ydb/core/kqp/ut/federated_query/s3 KqpFederatedQuery.CreateTableAsSelectFromExternalTable
2218
ydb/core/kqp/ut/scan KqpRequestContext.TraceIdInErrorMessage
2319
ydb/core/kqp/ut/scheme [*/*]*
2420
ydb/core/kqp/ut/scheme KqpOlapScheme.DropThenAddColumn

ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,13 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
9090
if (!writeMeta.HasLongTxId()) {
9191
auto operation = Self->OperationsManager->GetOperationVerified((TWriteId)writeMeta.GetWriteId());
9292
Y_ABORT_UNLESS(operation->GetStatus() == EOperationStatus::Started);
93-
operation->OnWriteFinish(txc, aggr->GetWriteIds());
94-
if (operation->GetBehaviour() == EOperationBehaviour::InTxWrite) {
93+
operation->OnWriteFinish(txc, aggr->GetWriteIds(), operation->GetBehaviour() == EOperationBehaviour::NoTxWrite);
94+
if (operation->GetBehaviour() == EOperationBehaviour::NoTxWrite) {
95+
auto ev = NEvents::TDataEvents::TEvWriteResult::BuildCompleted(Self->TabletID());
96+
Results.emplace_back(std::move(ev), writeMeta.GetSource(), operation->GetCookie());
97+
Self->OperationsManager->AddTemporaryTxLink(operation->GetLockId());
98+
Self->OperationsManager->CommitTransactionOnExecute(*Self, operation->GetLockId(), txc, Self->GetLastTxSnapshot());
99+
} else if (operation->GetBehaviour() == EOperationBehaviour::InTxWrite) {
95100
NKikimrTxColumnShard::TCommitWriteTxBody proto;
96101
proto.SetLockId(operation->GetLockId());
97102
TString txBody;
@@ -145,11 +150,15 @@ void TTxWrite::Complete(const TActorContext& ctx) {
145150
const auto& writeMeta = buffer.GetAggregations()[i]->GetWriteMeta();
146151
if (!writeMeta.HasLongTxId()) {
147152
auto op = Self->GetOperationsManager().GetOperationVerified(NOlap::TWriteId(writeMeta.GetWriteId()));
148-
if (op->GetBehaviour() == EOperationBehaviour::WriteWithLock) {
153+
if (op->GetBehaviour() == EOperationBehaviour::WriteWithLock || op->GetBehaviour() == EOperationBehaviour::NoTxWrite) {
149154
auto evWrite = std::make_shared<NOlap::NTxInteractions::TEvWriteWriter>(writeMeta.GetTableId(),
150155
buffer.GetAggregations()[i]->GetRecordBatch(), Self->GetIndexOptional()->GetVersionedIndex().GetPrimaryKey());
151156
Self->GetOperationsManager().AddEventForLock(*Self, op->GetLockId(), evWrite);
152157
}
158+
if (op->GetBehaviour() == EOperationBehaviour::NoTxWrite) {
159+
Self->OperationsManager->CommitTransactionOnComplete(*Self, op->GetLockId(), Self->GetLastTxSnapshot());
160+
}
161+
153162
}
154163
Self->Counters.GetCSCounters().OnWriteTxComplete(now - writeMeta.GetWriteStartInstant());
155164
Self->Counters.GetCSCounters().OnSuccessWriteResponse();

ydb/core/tx/columnshard/operations/manager.cpp

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ void TOperationsManager::CommitTransactionOnExecute(
7575
opPtr->CommitOnExecute(owner, txc, snapshot);
7676
commited.emplace_back(opPtr);
7777
}
78-
OnTransactionFinishOnExecute(commited, txId, txc);
78+
OnTransactionFinishOnExecute(commited, lock, txId, txc);
7979
}
8080

8181
void TOperationsManager::CommitTransactionOnComplete(
@@ -101,7 +101,7 @@ void TOperationsManager::CommitTransactionOnComplete(
101101
opPtr->CommitOnComplete(owner, snapshot);
102102
commited.emplace_back(opPtr);
103103
}
104-
OnTransactionFinishOnComplete(commited, txId);
104+
OnTransactionFinishOnComplete(commited, lock, txId);
105105
}
106106

107107
void TOperationsManager::AbortTransactionOnExecute(TColumnShard& owner, const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc) {
@@ -118,7 +118,7 @@ void TOperationsManager::AbortTransactionOnExecute(TColumnShard& owner, const ui
118118
aborted.emplace_back(opPtr);
119119
}
120120

121-
OnTransactionFinishOnExecute(aborted, txId, txc);
121+
OnTransactionFinishOnExecute(aborted, *lock, txId, txc);
122122
}
123123

124124
void TOperationsManager::AbortTransactionOnComplete(TColumnShard& owner, const ui64 txId) {
@@ -135,7 +135,7 @@ void TOperationsManager::AbortTransactionOnComplete(TColumnShard& owner, const u
135135
aborted.emplace_back(opPtr);
136136
}
137137

138-
OnTransactionFinishOnComplete(aborted, txId);
138+
OnTransactionFinishOnComplete(aborted, *lock, txId);
139139
}
140140

141141
TWriteOperation::TPtr TOperationsManager::GetOperation(const TWriteId writeId) const {
@@ -147,24 +147,20 @@ TWriteOperation::TPtr TOperationsManager::GetOperation(const TWriteId writeId) c
147147
}
148148

149149
void TOperationsManager::OnTransactionFinishOnExecute(
150-
const TVector<TWriteOperation::TPtr>& operations, const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc) {
151-
const ui64 lockId = GetLockForTxVerified(txId);
152-
auto itLock = LockFeatures.find(lockId);
153-
AFL_VERIFY(itLock != LockFeatures.end());
150+
const TVector<TWriteOperation::TPtr>& operations, const TLockFeatures& lock, const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc) {
154151
for (auto&& op : operations) {
155152
RemoveOperationOnExecute(op, txc);
156153
}
157154
NIceDb::TNiceDb db(txc.DB);
158-
db.Table<Schema::OperationTxIds>().Key(txId, lockId).Delete();
155+
db.Table<Schema::OperationTxIds>().Key(txId, lock.GetLockId()).Delete();
159156
}
160157

161158
void TOperationsManager::OnTransactionFinishOnComplete(
162-
const TVector<TWriteOperation::TPtr>& operations, const ui64 txId) {
163-
const ui64 lockId = GetLockForTxVerified(txId);
164-
auto itLock = LockFeatures.find(lockId);
165-
AFL_VERIFY(itLock != LockFeatures.end());
166-
itLock->second.RemoveInteractions(InteractionsContext);
167-
LockFeatures.erase(lockId);
159+
const TVector<TWriteOperation::TPtr>& operations, const TLockFeatures& lock, const ui64 txId) {
160+
{
161+
lock.RemoveInteractions(InteractionsContext);
162+
LockFeatures.erase(lock.GetLockId());
163+
}
168164
Tx2Lock.erase(txId);
169165
for (auto&& op : operations) {
170166
RemoveOperationOnComplete(op);
@@ -233,6 +229,11 @@ EOperationBehaviour TOperationsManager::GetBehaviour(const NEvents::TDataEvents:
233229
return EOperationBehaviour::Undefined;
234230
}
235231

232+
if (!evWrite.Record.HasLockTxId() && !evWrite.Record.HasLockNodeId() &&
233+
evWrite.Record.GetTxMode() == NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE) {
234+
return EOperationBehaviour::NoTxWrite;
235+
}
236+
236237
if (evWrite.Record.HasTxId() && evWrite.Record.GetTxMode() == NKikimrDataEvents::TEvWrite::MODE_PREPARE) {
237238
return EOperationBehaviour::InTxWrite;
238239
}

ydb/core/tx/columnshard/operations/manager.h

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ class TOperationsManager {
124124
TWriteId LastWriteId = TWriteId(0);
125125

126126
public:
127+
127128
bool Load(NTabletFlatExecutor::TTransactionContext& txc);
128129
void AddEventForTx(TColumnShard& owner, const ui64 txId, const std::shared_ptr<NOlap::NTxInteractions::ITxEventWriter>& writer);
129130
void AddEventForLock(TColumnShard& owner, const ui64 lockId, const std::shared_ptr<NOlap::NTxInteractions::ITxEventWriter>& writer);
@@ -139,6 +140,9 @@ class TOperationsManager {
139140
TColumnShard& owner, const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc, const NOlap::TSnapshot& snapshot);
140141
void CommitTransactionOnComplete(
141142
TColumnShard& owner, const ui64 txId, const NOlap::TSnapshot& snapshot);
143+
void AddTemporaryTxLink(const ui64 lockId) {
144+
AFL_VERIFY(Tx2Lock.emplace(lockId, lockId).second);
145+
}
142146
void LinkTransactionOnExecute(const ui64 lockId, const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc);
143147
void LinkTransactionOnComplete(const ui64 lockId, const ui64 txId);
144148
void AbortTransactionOnExecute(TColumnShard& owner, const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc);
@@ -198,7 +202,8 @@ class TOperationsManager {
198202
TWriteId BuildNextWriteId();
199203
void RemoveOperationOnExecute(const TWriteOperation::TPtr& op, NTabletFlatExecutor::TTransactionContext& txc);
200204
void RemoveOperationOnComplete(const TWriteOperation::TPtr& op);
201-
void OnTransactionFinishOnExecute(const TVector<TWriteOperation::TPtr>& operations, const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc);
202-
void OnTransactionFinishOnComplete(const TVector<TWriteOperation::TPtr>& operations, const ui64 txId);
205+
void OnTransactionFinishOnExecute(const TVector<TWriteOperation::TPtr>& operations, const TLockFeatures& lock, const ui64 txId,
206+
NTabletFlatExecutor::TTransactionContext& txc);
207+
void OnTransactionFinishOnComplete(const TVector<TWriteOperation::TPtr>& operations, const TLockFeatures& lock, const ui64 txId);
203208
};
204209
} // namespace NKikimr::NColumnShard

ydb/core/tx/columnshard/operations/write.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,11 +61,15 @@ void TWriteOperation::CommitOnComplete(TColumnShard& owner, const NOlap::TSnapsh
6161
owner.UpdateInsertTableCounters();
6262
}
6363

64-
void TWriteOperation::OnWriteFinish(NTabletFlatExecutor::TTransactionContext& txc, const TVector<TWriteId>& globalWriteIds) {
64+
void TWriteOperation::OnWriteFinish(NTabletFlatExecutor::TTransactionContext& txc, const TVector<TWriteId>& globalWriteIds, const bool ephemeralFlag) {
6565
Y_ABORT_UNLESS(Status == EOperationStatus::Started);
6666
Status = EOperationStatus::Prepared;
6767
GlobalWriteIds = globalWriteIds;
6868

69+
if (ephemeralFlag) {
70+
return;
71+
}
72+
6973
NIceDb::TNiceDb db(txc.DB);
7074
NKikimrTxColumnShard::TInternalOperationData proto;
7175
ToProto(proto);

ydb/core/tx/columnshard/operations/write.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ enum class EOperationBehaviour : ui32 {
3939
InTxWrite = 2,
4040
WriteWithLock = 3,
4141
CommitWriteLock = 4,
42-
AbortWriteLock = 5
42+
AbortWriteLock = 5,
43+
NoTxWrite = 6
4344
};
4445

4546
class TWriteOperation {
@@ -61,7 +62,7 @@ class TWriteOperation {
6162

6263
void Start(TColumnShard& owner, const ui64 tableId, const NEvWrite::IDataContainer::TPtr& data, const NActors::TActorId& source,
6364
const std::shared_ptr<NOlap::ISnapshotSchema>& schema, const TActorContext& ctx);
64-
void OnWriteFinish(NTabletFlatExecutor::TTransactionContext& txc, const TVector<TWriteId>& globalWriteIds);
65+
void OnWriteFinish(NTabletFlatExecutor::TTransactionContext& txc, const TVector<TWriteId>& globalWriteIds, const bool ephemeralFlag);
6566
void CommitOnExecute(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc, const NOlap::TSnapshot& snapshot) const;
6667
void CommitOnComplete(TColumnShard& owner, const NOlap::TSnapshot& snapshot) const;
6768
void AbortOnExecute(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) const;

ydb/core/tx/data_events/events.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,13 @@ struct TDataEvents {
103103
return result;
104104
}
105105

106+
static std::unique_ptr<TEvWriteResult> BuildCompleted(const ui64 origin) {
107+
auto result = std::make_unique<TEvWriteResult>();
108+
result->Record.SetOrigin(origin);
109+
result->Record.SetStatus(NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED);
110+
return result;
111+
}
112+
106113
static std::unique_ptr<TEvWriteResult> BuildCompleted(const ui64 origin, const ui64 txId) {
107114
auto result = std::make_unique<TEvWriteResult>();
108115
result->Record.SetOrigin(origin);

0 commit comments

Comments
 (0)