Skip to content

Commit af411bb

Browse files
strict writing data validation (#13190)
1 parent 6878f9b commit af411bb

File tree

4 files changed

+27
-14
lines changed

4 files changed

+27
-14
lines changed

ydb/core/kqp/common/kqp_tx_manager.cpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -278,17 +278,17 @@ class TKqpTransactionManager : public IKqpTransactionManager {
278278
for (auto& [shardId, shardInfo] : ShardsInfo) {
279279
if ((shardInfo.Flags & EAction::WRITE)) {
280280
ReceivingShards.insert(shardId);
281+
if (shardInfo.IsOlap) {
282+
receivingColumnShardsSet.insert(shardId);
283+
}
281284
if (IsVolatile()) {
282285
SendingShards.insert(shardId);
283286
}
284-
if (shardInfo.IsOlap) {
285-
sendingColumnShardsSet.insert(shardId);
286-
}
287287
}
288288
if (!shardInfo.Locks.empty()) {
289289
SendingShards.insert(shardId);
290290
if (shardInfo.IsOlap) {
291-
receivingColumnShardsSet.insert(shardId);
291+
sendingColumnShardsSet.insert(shardId);
292292
}
293293
}
294294

@@ -325,6 +325,7 @@ class TKqpTransactionManager : public IKqpTransactionManager {
325325
auto arbiterIterator = std::begin(shards);
326326
std::advance(arbiterIterator, index);
327327
ArbiterColumnShard = *arbiterIterator;
328+
ReceivingShards.insert(*ArbiterColumnShard);
328329
}
329330

330331
ShardsToWaitPrepare = ShardsIds;

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2561,6 +2561,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
25612561
auto arbiterIterator = std::begin(shards);
25622562
std::advance(arbiterIterator, index);
25632563
columnShardArbiter = *arbiterIterator;
2564+
receivingShardsSet.insert(*columnShardArbiter);
25642565
}
25652566
}
25662567

ydb/core/tx/columnshard/columnshard__write.cpp

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -315,18 +315,28 @@ class TCommitOperation {
315315
LockId = lock.GetLockId();
316316
SendingShards = std::set<ui64>(locks.GetSendingShards().begin(), locks.GetSendingShards().end());
317317
ReceivingShards = std::set<ui64>(locks.GetReceivingShards().begin(), locks.GetReceivingShards().end());
318-
const bool singleShardTx = SendingShards.empty() && ReceivingShards.empty();
319-
if (!singleShardTx) {
318+
if (SendingShards.empty() != ReceivingShards.empty()) {
319+
return TConclusionStatus::Fail("incorrect synchronization data (send/receiving lists)");
320+
}
321+
if (ReceivingShards.size() && SendingShards.size()) {
320322
if (!ReceivingShards.contains(TabletId) && !SendingShards.contains(TabletId)) {
321-
return TConclusionStatus::Fail("shard is absent in sending and receiving lists");
323+
return TConclusionStatus::Fail("current tablet_id is absent in sending and receiving lists");
322324
}
323-
if (locks.HasArbiterColumnShard()) {
324-
ArbiterColumnShard = locks.GetArbiterColumnShard();
325-
} else {
326-
AFL_VERIFY(!ReceivingShards.empty());
327-
ArbiterColumnShard = *ReceivingShards.begin();
325+
if (!locks.HasArbiterColumnShard()) {
326+
return TConclusionStatus::Fail("no arbiter info in request");
327+
}
328+
ArbiterColumnShard = locks.GetArbiterColumnShard();
329+
330+
if (IsPrimary() && !ReceivingShards.contains(ArbiterColumnShard)) {
331+
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "incorrect arbiter")("arbiter_id", ArbiterColumnShard)(
332+
"receiving", JoinSeq(", ", ReceivingShards))("sending", JoinSeq(", ", SendingShards));
333+
return TConclusionStatus::Fail("arbiter is absent in receiving lists");
334+
}
335+
if (!IsPrimary() && (!ReceivingShards.contains(ArbiterColumnShard) || !SendingShards.contains(ArbiterColumnShard))) {
336+
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "incorrect arbiter")("arbiter_id", ArbiterColumnShard)(
337+
"receiving", JoinSeq(", ", ReceivingShards))("sending", JoinSeq(", ", SendingShards));
338+
return TConclusionStatus::Fail("arbiter is absent in sending or receiving lists");
328339
}
329-
AFL_VERIFY(ArbiterColumnShard);
330340
}
331341

332342
Generation = lock.GetGeneration();

ydb/core/tx/columnshard/transactions/operators/ev_write/primary.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,8 @@ class TEvWriteCommitPrimaryTransactionOperator: public TEvWriteCommitSyncTransac
159159
};
160160

161161
virtual bool IsTxBroken() const override {
162-
return TxBroken.value_or(false);
162+
AFL_VERIFY(TxBroken);
163+
return *TxBroken;
163164
}
164165

165166
void InitializeRequests(TColumnShard& owner) {

0 commit comments

Comments
 (0)